Shared-memory objects in multiprocessing


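If you use an operating system with copy-on-write fork() semantics (like any common Unix), then as long as you never alter your data structure it will be available to all child processes without taking up additional memory. You will not have to do anything special (except make absolutely sure you don't alter the object). One caveat: CPython updates reference counts inside object headers, so merely reading many small Python objects can still cause their pages to be copied; the data buffer of a large numpy array is mostly unaffected.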
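A minimal sketch of the read-only case, assuming a Unix-like system where the "fork" start method is available. The names BIG, _partial_sum and parallel_sum are made up for illustration; the workers only read the inherited array and send back small results.

```python
import multiprocessing as mp

import numpy as np

# Created before any fork, so child processes inherit it copy-on-write.
BIG = np.arange(1_000_000, dtype=np.float64)


def _partial_sum(start, stop, out):
    # Only reads BIG; the array's data buffer is never written to.
    out.put(float(BIG[start:stop].sum()))


def parallel_sum(n_workers=4):
    # Assumption: a Unix-like OS where the "fork" start method exists.
    ctx = mp.get_context("fork")
    out = ctx.Queue()
    bounds = np.linspace(0, BIG.size, n_workers + 1, dtype=int)
    procs = [
        ctx.Process(target=_partial_sum, args=(int(a), int(b), out))
        for a, b in zip(bounds[:-1], bounds[1:])
    ]
    for p in procs:
        p.start()
    partials = [out.get() for _ in procs]  # drain the queue before joining
    for p in procs:
        p.join()
    return sum(partials)


if __name__ == "__main__":
    print(parallel_sum() == float(BIG.sum()))
```

With the "spawn" start method (the default on Windows and macOS) each child re-imports the module and builds its own copy of BIG, so the memory saving does not apply there.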

The most efficient thing you can do for your problem would be to pack your array into an efficient array structure (using numpy or array), place that in shared memory, wrap it with multiprocessing.Array, and pass that to your functions. This answer shows how to do that.
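One way this could look, as a sketch rather than the linked answer's exact code: allocate a multiprocessing.Array, view it through numpy without copying, and let each worker write under the array's lock. The helper names as_numpy, _fill_row and fill_rows are hypothetical, and the "fork" start method is assumed.

```python
import ctypes
import multiprocessing as mp

import numpy as np


def as_numpy(shared, shape):
    # View the shared ctypes buffer as a numpy array; no data is copied.
    return np.frombuffer(shared.get_obj(), dtype=np.float64).reshape(shape)


def _fill_row(shared, shape, row):
    arr = as_numpy(shared, shape)
    with shared.get_lock():  # the lock that multiprocessing.Array carries
        arr[row, :] = row


def fill_rows(shape=(4, 3)):
    ctx = mp.get_context("fork")  # assumption: Unix-like OS
    # Allocated in shared memory and zero-initialized.
    shared = ctx.Array(ctypes.c_double, int(np.prod(shape)))
    procs = [
        ctx.Process(target=_fill_row, args=(shared, shape, r))
        for r in range(shape[0])
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Copy out so the result no longer depends on the shared buffer.
    return as_numpy(shared, shape).copy()


if __name__ == "__main__":
    print(fill_rows())
```

The parent sees the rows written by the children because every process maps the same buffer; only the small wrapper object is passed around.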

If you want a writeable shared object, then you will need to wrap it with some kind of synchronization or locking. multiprocessing provides two ways of doing this: one uses shared memory (suitable for simple values, arrays, or ctypes), and the other uses a Manager proxy, where one process holds the memory and a manager arbitrates access to it from other processes (even over a network).

The Manager approach can be used with arbitrary Python objects, but will be slower than the equivalent using shared memory because the objects need to be serialized/deserialized and sent between processes.
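The two options can be sketched side by side. This is an illustration under assumptions (the "fork" start method, and made-up helper names _bump, _record, shared_counter and managed_dict), not a benchmark: a multiprocessing.Value lives in shared memory and is guarded by its own lock, while a Manager dict lives in the manager process and every access goes through a proxy.

```python
import multiprocessing as mp


def _bump(counter, n):
    for _ in range(n):
        with counter.get_lock():  # guards the read-modify-write
            counter.value += 1


def _record(shared_dict, key):
    # The key and value are pickled and sent to the manager process.
    shared_dict[key] = key * key


def _run_all(procs):
    for p in procs:
        p.start()
    for p in procs:
        p.join()


def shared_counter(n_procs=4, n=1000):
    ctx = mp.get_context("fork")  # assumption: Unix-like OS
    counter = ctx.Value("i", 0)  # a C int in shared memory, with a lock
    _run_all([ctx.Process(target=_bump, args=(counter, n)) for _ in range(n_procs)])
    return counter.value


def managed_dict(n_procs=4):
    ctx = mp.get_context("fork")
    with ctx.Manager() as manager:
        shared = manager.dict()  # proxy to a dict held by the manager process
        _run_all([ctx.Process(target=_record, args=(shared, k)) for k in range(n_procs)])
        return shared.copy()  # plain dict, fetched before the manager shuts down


if __name__ == "__main__":
    print(shared_counter())
    print(managed_dict())
```

Without the get_lock() block the increments could interleave and updates could be lost, which is the reason a writeable shared value needs synchronization.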

There is a wealth of parallel processing libraries and approaches available in Python. multiprocessing is an excellent and well-rounded library, but if you have special needs, one of the other approaches may be better.


I ran into the same problem and wrote a little shared-memory utility class to work around it.

I'm using multiprocessing.RawArray, which has no lock, and access to the arrays is not synchronized at all either, so be careful not to shoot yourself in the foot.
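One pattern that stays safe without any lock is to give every worker its own disjoint slice, so no two processes ever write the same element. The sketch below is independent of the utility class and assumes the "fork" start method; _square_chunk and square_in_place are hypothetical names.

```python
import ctypes
import multiprocessing as mp

import numpy as np


def _square_chunk(raw, start, stop):
    arr = np.frombuffer(raw, dtype=np.float64)  # zero-copy view of the RawArray
    arr[start:stop] **= 2  # touches only this worker's slice


def square_in_place(values, n_workers=4):
    ctx = mp.get_context("fork")  # assumption: Unix-like OS
    raw = ctx.RawArray(ctypes.c_double, len(values))  # shared memory, no lock
    np.frombuffer(raw, dtype=np.float64)[:] = values
    # Split the index range into contiguous, non-overlapping chunks.
    bounds = np.linspace(0, len(values), n_workers + 1, dtype=int)
    procs = [
        ctx.Process(target=_square_chunk, args=(raw, int(a), int(b)))
        for a, b in zip(bounds[:-1], bounds[1:])
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return np.frombuffer(raw, dtype=np.float64).copy()


if __name__ == "__main__":
    print(square_in_place([1.0, 2.0, 3.0, 4.0, 5.0]))
```

As soon as two workers may write the same element, this no longer holds and a lock (or multiprocessing.Array) is needed.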

With this solution I get a speedup of approximately 3x on a quad-core i7.

Here's the code. Feel free to use and improve it, and please report back any bugs.

    '''
    Created on 14.05.2013

    @author: martin
    '''

    import multiprocessing
    import ctypes
    import numpy as np


    class SharedNumpyMemManagerError(Exception):
        pass


    class SharedNumpyMemManager:
        '''Singleton that hands out integer handles to numpy arrays in shared memory.'''

        _initSize = 1024
        _instance = None

        def __new__(cls, *args, **kwargs):
            if not cls._instance:
                cls._instance = super(SharedNumpyMemManager, cls).__new__(cls)
            return cls._instance

        def __init__(self):
            self.lock = multiprocessing.Lock()
            self.cur = 0
            self.cnt = 0
            self.shared_arrays = [None] * SharedNumpyMemManager._initSize

        def __createArray(self, dimensions, ctype=ctypes.c_double):
            with self.lock:
                # double size if necessary
                if self.cnt >= len(self.shared_arrays):
                    self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

                # next handle
                self.__getNextFreeHdl()

                # create array in shared memory segment
                # (RawArray expects a plain int size, not a numpy integer)
                shared_array_base = multiprocessing.RawArray(ctype, int(np.prod(dimensions)))

                # convert to numpy array via ctypeslib
                self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

                # reshape to the requested dimensions;
                # the result is a view on the same shared buffer
                self.shared_arrays[self.cur] = self.shared_arrays[self.cur].reshape(dimensions)

                # update cnt
                self.cnt += 1

                # return handle to the shared memory numpy array
                return self.cur

        def __getNextFreeHdl(self):
            orgCur = self.cur
            while self.shared_arrays[self.cur] is not None:
                self.cur = (self.cur + 1) % len(self.shared_arrays)
                if orgCur == self.cur:
                    raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

        def __freeArray(self, hdl):
            with self.lock:
                # set reference to None
                if self.shared_arrays[hdl] is not None:  # consider multiple calls to free
                    self.shared_arrays[hdl] = None
                    self.cnt -= 1

        def __getArray(self, i):
            return self.shared_arrays[i]

        @staticmethod
        def getInstance():
            if not SharedNumpyMemManager._instance:
                SharedNumpyMemManager._instance = SharedNumpyMemManager()
            return SharedNumpyMemManager._instance

        @staticmethod
        def createArray(*args, **kwargs):
            return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

        @staticmethod
        def getArray(*args, **kwargs):
            return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

        @staticmethod
        def freeArray(*args, **kwargs):
            return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)


    # Init Singleton on module load
    SharedNumpyMemManager.getInstance()

    if __name__ == '__main__':

        import timeit

        N_PROC = 8
        INNER_LOOP = 10000
        N = 1000

        def propagate(t):
            i, shm_hdl, evidence = t
            a = SharedNumpyMemManager.getArray(shm_hdl)
            for j in range(INNER_LOOP):
                a[i] = i

        class Parallel_Dummy_PF:

            def __init__(self, N):
                self.N = N
                self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)
                # The pool is created after the array so that, with fork(),
                # the workers inherit the manager and its shared arrays.
                self.pool = multiprocessing.Pool(processes=N_PROC)

            def update_par(self, evidence):
                self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

            def update_seq(self, evidence):
                for i in range(self.N):
                    propagate((i, self.arrayHdl, evidence))

            def getArray(self):
                return SharedNumpyMemManager.getArray(self.arrayHdl)

        def parallelExec():
            pf = Parallel_Dummy_PF(N)
            print(pf.getArray())
            pf.update_par(5)
            print(pf.getArray())

        def sequentialExec():
            pf = Parallel_Dummy_PF(N)
            print(pf.getArray())
            pf.update_seq(5)
            print(pf.getArray())

        t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
        t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

        print("Sequential: ", t1.timeit(number=1))
        print("Parallel: ", t2.timeit(number=1))


This is the intended use case for Ray, which is a library for parallel and distributed Python. Under the hood, it serializes objects using the Apache Arrow data layout (which is a zero-copy format) and stores them in a shared-memory object store so they can be accessed by multiple processes without creating copies.

The code would look like the following.

    import numpy as np
    import ray

    ray.init()

    @ray.remote
    def func(array, param):
        # Do stuff.
        return 1

    array = np.ones(10**6)
    # Store the array in the shared memory object store once
    # so it is not copied multiple times.
    array_id = ray.put(array)

    result_ids = [func.remote(array_id, i) for i in range(4)]
    output = ray.get(result_ids)

If you don't call ray.put then the array will still be stored in shared memory, but that will be done once per invocation of func, which is not what you want.

Note that this will work not only for arrays but also for objects that contain arrays, e.g., dictionaries mapping ints to arrays as below.

You can compare the performance of serialization in Ray versus pickle by running the following in IPython.

    import numpy as np
    import pickle
    import ray

    ray.init()

    x = {i: np.ones(10**7) for i in range(20)}

    # Time Ray.
    %time x_id = ray.put(x)  # 2.4s
    %time new_x = ray.get(x_id)  # 0.00073s

    # Time pickle.
    %time serialized = pickle.dumps(x)  # 2.6s
    %time deserialized = pickle.loads(serialized)  # 1.9s

Serialization with Ray is only slightly faster than pickle, but deserialization is 1000x faster because of the use of shared memory (this number will of course depend on the object).
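To get a feel for why avoiding the copy matters, here is a rough standard-library analogy. It is not how Ray is implemented, and it stays inside a single process: with pickle protocol 5 (Python 3.8+) and a recent NumPy, the array data can be handed over as an out-of-band buffer instead of being copied into the byte stream. The function name roundtrip_out_of_band is made up for this sketch.

```python
import pickle

import numpy as np


def roundtrip_out_of_band(arr):
    buffers = []
    # With protocol 5 and a buffer_callback, numpy passes the array's data
    # as a separate buffer instead of copying it into the pickle payload.
    payload = pickle.dumps(arr, protocol=5, buffer_callback=buffers.append)
    # Loading needs the same buffers; the payload holds only metadata.
    restored = pickle.loads(payload, buffers=buffers)
    return payload, restored


if __name__ == "__main__":
    arr = np.ones(10**6)
    payload, restored = roundtrip_out_of_band(arr)
    in_band = pickle.dumps(arr, protocol=5)  # ordinary pickle, data included
    print(len(payload), len(in_band), arr.nbytes)
    print(np.array_equal(arr, restored))
```

The out-of-band payload is tiny compared with the in-band one because it carries only the dtype and shape; a shared-memory object store applies the same idea across processes.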

See the Ray documentation. You can read more about fast serialization using Ray and Arrow. Note I'm one of the Ray developers.