Can SymbolicFunction be parallelized?

This post summarizes an interesting discussion from the OpenTUNRS Gitter thread.

In order to get a reference value for complicated toy-cases, a large Monte Carlo simulation is often the most reliable method (order of 10^9 simulations). In this case, having a parallel SymbolicFunction() would save some time. I then asked if the SymbolicFunction() could be parallelized, in the vein of the PythonFunction() that directly offers the argument “n_cpus”.

Here are the answers of J. Schueller and R. Lebrun, thanks to them for their very interesting inputs.

J. Schueller
yes, SymbolicFunction is quick but not thread-safe unfortunately there should be many more options with Python modules such as numpy, sympy, numba etc to achieve this, or maybe someone brave enough to write an LLVM backend for the SymbolicFunction ?

R. Lebrun
I played a little bit with SymbolicFunction and multiprocessing Pool, and got interesting results with the following code:

import openturns as ot
import multiprocessing as mp
from time import time

class ParallelSymbolic(ot.OpenTURNSPythonFunction):
    def __init__(self, f, ncpu=-1):
        super(ParallelSymbolic, self).__init__(f.getInputDimension(), f.getOutputDimension())
        self.f_ = f
        if ncpu >= 1:
            self.ncpu_ = ncpu
            self.ncpu_ = mp.cpu_count()

    def _exec(self, X):
        return self.f_(X)

    def _exec_sample(self, X):
        X = ot.Sample(X)
        size = X.getSize()
        block_size = size // self.ncpu_
        if block_size > 0:
            subsamples = [X[(i * block_size):((i + 1) * block_size)] for i in range(self.ncpu_-1)] + [X[((self.ncpu_-1) * block_size):size]]
            subsamples = [X]
        p = mp.Pool(self.ncpu_)
        rs = p.map_async(self.f_, subsamples)
        res = rs.get()
        Y = res[0]
        for i in range(1, len(res)):
        return Y

if __name__ == "__main__":
    # Input dimension
    d = 4
    # Complexity
    rep = 1000
    # Build a HUGE formula
    variables = ["x" + str(i) for i in range(d)]
    formula = ""
    for i in range(d*rep):
        formula += "sin(" + str(i+1) + "*" + variables[i % d] + ")+"
    formula += "1"

    f_sequential = ot.SymbolicFunction(variables, [formula])
    f_parallel = ot.Function(ParallelSymbolic(f_sequential))

    # Compare both model on a basic MC simulation
    block_size = 10000
    iter_max = 100
    threshold = (d * rep)**0.5
    distribution = ot.ComposedDistribution([ot.Uniform()]*d)
    X = ot.RandomVector(distribution)
    Y_sequential = ot.CompositeRandomVector(f_sequential, X)
    E_sequential = ot.ThresholdEvent(Y_sequential, ot.Greater(), threshold)
    algo = ot.ProbabilitySimulationAlgorithm(E_sequential)
    t0 = time()
    t1 = time()
    t_sequential = t1 - t0
    print("Sequential, P=%.5e" % algo.getResult().getProbabilityEstimate(), "t=", t_sequential)
    Y_parallel = ot.CompositeRandomVector(f_parallel, X)
    E_parallel = ot.ThresholdEvent(Y_parallel, ot.Greater(), threshold)
    algo = ot.ProbabilitySimulationAlgorithm(E_parallel)
    t0 = time()
    t1 = time()
    t_parallel = t1 - t0
    print("Parallel, P=%.5e" % algo.getResult().getProbabilityEstimate(), "t=", t_parallel)
    print("t_seq / t_par=", t_sequential / t_parallel)

The ParallelSymbolic class basically split an input sample into parts of roughly the same size and dispatch the execution of the SymbolicFunction over these sub-samples. As it is based on separate processes, there is no thread-safety problem here. I illustrate its use on a basic MonteCarlo simulation, where you can play with the function complexity (the rep parameter), the block size (block_size) and the number of outer iterations (iter_max). On my 8-cores machine I get:

rep=1, block_size=8, iter_max=1000, t_sequential=0.004s and t_parallel=17s
rep=1, block_size=1000, iter_max=8, t_sequential=0.0012s and t_parallel=0.29s
rep=10, block_size=8, iter_max=1000, t_sequential=0.01s and t_parallel=17s
rep=10, block_size=1000, iter_max=8, t_sequential=0.008s and t_parallel=0.29s
rep=100, block_size=8, iter_max=1000, t_sequential=0.08s and t_parallel=18s
rep=100, block_size=1000, iter_max=8, t_sequential=0.08s and t_parallel=0.30s
rep=1000, block_size=8, iter_max=1000, t_sequential=0.87s and t_parallel=43s
rep=1000, block_size=1000, iter_max=8, t_sequential=0.89s and t_parallel=0.58s

So if the model is huge and the block size is large you start to gain something, otherwise all the time is spent into splitting the input sample and gluing the output samples. The best I got was with rep=1000, block_max=10000, iter=10 (not relevant) for a speedup of 3.8 (the execution time is divided by 3.8 using the parallel version). I am sure that my implementation is naive and can be improved in many ways, it was just a test of mixing SymbolicFunction and Pool as forking processes solves the multi threading issue. Only tested under Linux.

E. Fekhari

Hi Julien and Régis, thanks for your very interesting answers. I didn’t realize the complexity of offering cpu parallelization for this function. At first I imagined that the functionality was comparable to the one from PythonFunction but I understand now that it doesn’t compare.
@Régis, thanks for this implementation, it works fine for me and provides very interesting results taking into account the complexity of the function. Once again I under-estimated the SymbolicFunction.

On my 12-cores computer I compared the execution time of the SymbolicFunction, your parallel developpment and the following vectorized numpy solution. Note that I restricted myself to the least complex formula your script offers for now (rep=1).

# Distribution
d = 4
distribution = ot.ComposedDistribution([ot.Uniform()]*d)
X = ot.RandomVector(distribution)
# MC size settings
block_size = 10000
iter_max = 1000
# Generate sample and reshape it into a np array
mc_sample = X.getSample(block_size * iter_max)
x0, x1, x2, x3 = [np.array(mc_sample.getMarginal(i)).flatten() for i in range(d)]
t0 = time()
output_sample = np.sin(x0) + np.sin(2 * x1) + np.sin(2 * x2) + np.sin(3 * x3) + 1
t_vectorized_numpy = time() - t0
print("Time elapsed: {:.4f}".format(interval))

Here are some results:

rep=1, block_size=100, iter_max=1000, t_sequential=0.0004s and t_parallel=0.1380s, t_vectorized_numpy=0.0144s
rep=1, block_size=1000, iter_max=10000, t_sequential=0.0005s and t_parallel=0.0598s, t_vectorized_numpy=0.6197s
rep=1, block_size=10000, iter_max=10000, t_sequential=0.0059s and t_parallel=0.2901s, t_vectorized_numpy=6.1185s