
Use numpy array in shared memory for multiprocessing


To add to @unutbu's answer (no longer available) and @Henry Gomersall's answer: you could use shared_arr.get_lock() to synchronize access when needed:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index, such as another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Example

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

If you don't need synchronized access, or if you create your own locks, then mp.Array() is unnecessary; you could use mp.sharedctypes.RawArray in this case.
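For instance, a minimal sketch of the unsynchronized variant might look like the following (the helper names init and negate_slice are just for illustration; each worker writes to a disjoint slice, so no lock is required):

import ctypes
import multiprocessing as mp
from multiprocessing.sharedctypes import RawArray

import numpy as np

def init(raw_arr_):
    # inherited by each worker process via the Pool initializer
    global raw_arr
    raw_arr = raw_arr_

def negate_slice(sl):
    # RawArray has no get_obj(); the buffer can be wrapped directly
    arr = np.frombuffer(raw_arr, dtype=np.float64)
    arr[sl] = -arr[sl]

if __name__ == '__main__':
    N = 100
    raw_arr = RawArray(ctypes.c_double, N)
    arr = np.frombuffer(raw_arr, dtype=np.float64)
    arr[:] = np.random.uniform(size=N)

    with mp.Pool(4, initializer=init, initargs=(raw_arr,)) as pool:
        # each worker writes to a disjoint slice, so no synchronization is needed
        pool.map(negate_slice, [slice(i, i + 10) for i in range(0, N, 10)])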


The Array object has a get_obj() method associated with it, which returns the ctypes array that presents a buffer interface. I think the following should work...

from multiprocessing import Process, Array
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = numpy.random.rand(N)
    a = Array('d', unshared_arr)
    print("Originally, the first two elements of arr = %s" % (a[:2]))

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print("Now, the first two elements of arr = %s" % a[:2])

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print(a[0])

When run, this prints out the first element of a now being 10.0, showing that a and b are just two views into the same memory.

In order to make sure it is still safe under multiprocessing, I believe you will have to use the acquire and release methods that exist on the Array object, a, and its built-in lock, to make sure it is all safely accessed (though I'm not an expert on the multiprocessing module).
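For example, a small sketch (the helper names are hypothetical) of guarding a read-modify-write with that lock:

# Sketch: making the read-modify-write on the shared Array atomic.
# 'a' is the multiprocessing.Array created in the example above.
def safe_negate_first(a):
    a.acquire()          # the synchronized wrapper exposes acquire/release
    try:
        a[0] = -a[0]     # read-modify-write happens while the lock is held
    finally:
        a.release()

# equivalent, using the underlying lock as a context manager:
def safe_negate_first_ctx(a):
    with a.get_lock():
        a[0] = -a[0]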


While the answers already given are good, there is a much easier solution to this problem provided two conditions are met:

  1. You are on a POSIX-compliant operating system (e.g. Linux, macOS); and
  2. Your child processes need read-only access to the shared array.

In this case you do not need to fiddle with explicitly making variables shared, as the child processes will be created with a fork. A forked child automatically shares the parent's memory space. In the context of Python multiprocessing, this means it shares all module-level variables; note that this does not hold for arguments that you explicitly pass to your child processes or to the functions that you call on a multiprocessing.Pool.

A simple example:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))