R. Lebrun
I played a little bit with SymbolicFunction and multiprocessing Pool, and got interesting results with the following code:
import openturns as ot
import multiprocessing as mp
from time import time

class ParallelSymbolic(ot.OpenTURNSPythonFunction):
    def __init__(self, f, ncpu=-1):
        super(ParallelSymbolic, self).__init__(f.getInputDimension(), f.getOutputDimension())
        self.setInputDescription(f.getInputDescription())
        self.setOutputDescription(f.getOutputDescription())
        self.f_ = f
        if ncpu >= 1:
            self.ncpu_ = ncpu
        else:
            self.ncpu_ = mp.cpu_count()

    def _exec(self, X):
        return self.f_(X)

    def _exec_sample(self, X):
        X = ot.Sample(X)
        size = X.getSize()
        # Split the sample into ncpu chunks of roughly equal size;
        # the last chunk absorbs the remainder
        block_size = size // self.ncpu_
        if block_size > 0:
            subsamples = [X[(i * block_size):((i + 1) * block_size)] for i in range(self.ncpu_ - 1)] + [X[((self.ncpu_ - 1) * block_size):size]]
        else:
            subsamples = [X]
        # Dispatch the chunks over worker processes
        p = mp.Pool(self.ncpu_)
        rs = p.map_async(self.f_, subsamples)
        p.close()
        res = rs.get()
        p.join()
        # Glue the per-chunk output samples back together
        Y = res[0]
        for i in range(1, len(res)):
            Y.add(res[i])
        return Y
if __name__ == "__main__":
    # Input dimension
    d = 4
    # Complexity
    rep = 1000
    # Build a HUGE formula
    variables = ["x" + str(i) for i in range(d)]
    formula = ""
    for i in range(d * rep):
        formula += "sin(" + str(i + 1) + "*" + variables[i % d] + ")+"
    formula += "1"
    f_sequential = ot.SymbolicFunction(variables, [formula])
    f_parallel = ot.Function(ParallelSymbolic(f_sequential))
    # Compare both models on a basic MC simulation
    block_size = 10000
    iter_max = 100
    threshold = (d * rep)**0.5
    distribution = ot.ComposedDistribution([ot.Uniform()] * d)
    X = ot.RandomVector(distribution)
    print("Sequential")
    Y_sequential = ot.CompositeRandomVector(f_sequential, X)
    E_sequential = ot.ThresholdEvent(Y_sequential, ot.Greater(), threshold)
    algo = ot.ProbabilitySimulationAlgorithm(E_sequential)
    algo.setBlockSize(block_size)
    algo.setMaximumOuterSampling(iter_max)
    ot.RandomGenerator.SetSeed(1234)
    t0 = time()
    algo.run()
    t1 = time()
    t_sequential = t1 - t0
    print("Sequential, P=%.5e" % algo.getResult().getProbabilityEstimate(), "t=", t_sequential)
    print("Parallel")
    Y_parallel = ot.CompositeRandomVector(f_parallel, X)
    E_parallel = ot.ThresholdEvent(Y_parallel, ot.Greater(), threshold)
    algo = ot.ProbabilitySimulationAlgorithm(E_parallel)
    algo.setBlockSize(block_size)
    algo.setMaximumOuterSampling(iter_max)
    ot.RandomGenerator.SetSeed(1234)
    t0 = time()
    algo.run()
    t1 = time()
    t_parallel = t1 - t0
    print("Parallel, P=%.5e" % algo.getResult().getProbabilityEstimate(), "t=", t_parallel)
    print("t_seq / t_par=", t_sequential / t_parallel)
The ParallelSymbolic class basically splits an input sample into parts of roughly the same size and dispatches the evaluation of the SymbolicFunction over these sub-samples. As it is based on separate processes, there is no thread-safety problem here. I illustrate its use on a basic Monte Carlo simulation, where you can play with the function complexity (the rep parameter), the block size (block_size) and the number of outer iterations (iter_max). On my 8-core machine I get:
rep=1, block_size=8, iter_max=1000, t_sequential=0.004s and t_parallel=17s
rep=1, block_size=1000, iter_max=8, t_sequential=0.0012s and t_parallel=0.29s
rep=10, block_size=8, iter_max=1000, t_sequential=0.01s and t_parallel=17s
rep=10, block_size=1000, iter_max=8, t_sequential=0.008s and t_parallel=0.29s
rep=100, block_size=8, iter_max=1000, t_sequential=0.08s and t_parallel=18s
rep=100, block_size=1000, iter_max=8, t_sequential=0.08s and t_parallel=0.30s
rep=1000, block_size=8, iter_max=1000, t_sequential=0.87s and t_parallel=43s
rep=1000, block_size=1000, iter_max=8, t_sequential=0.89s and t_parallel=0.58s
So if the model is huge and the block size is large you start to gain something; otherwise all the time is spent splitting the input sample and gluing the output samples back together. The best I got was with rep=1000, block_size=10000 and iter_max=10 (the number of outer iterations is not relevant here), for a speedup of 3.8 (the execution time is divided by 3.8 with the parallel version). I am sure that my implementation is naive and can be improved in many ways; it was just a test of mixing SymbolicFunction and Pool, as forking separate processes avoids the multithreading issue. Only tested under Linux.