How do I pass large numpy arrays between python subprocesses without saving to disk?



While googling around for more information about the code Joe Kington posted, I found the numpy-sharedmem package. Judging from this numpy/multiprocessing tutorial, it seems to share the same intellectual heritage (maybe largely the same authors? -- I'm not sure).

Using the sharedmem module, you can create a shared-memory numpy array (awesome!), and use it with multiprocessing like this:

import sharedmem as shm
import numpy as np
import multiprocessing as mp

def worker(q, arr):
    done = False
    while not done:
        cmd = q.get()
        if cmd == 'done':
            done = True
        elif cmd == 'data':
            ##Fake data. In real life, get data from hardware.
            rnd = np.random.randint(100)
            print('rnd={0}'.format(rnd))
            arr[:] = rnd
        q.task_done()

if __name__ == '__main__':
    N = 10
    arr = shm.zeros(N, dtype=np.uint8)
    q = mp.JoinableQueue()

    proc = mp.Process(target=worker, args=[q, arr])
    proc.daemon = True
    proc.start()

    for i in range(3):
        q.put('data')
        # Wait for the computation to finish
        q.join()
        print(arr.shape)
        print(arr)

    q.put('done')
    proc.join()

Running yields

rnd=53
(10,)
[53 53 53 53 53 53 53 53 53 53]
rnd=15
(10,)
[15 15 15 15 15 15 15 15 15 15]
rnd=87
(10,)
[87 87 87 87 87 87 87 87 87 87]


Basically, you just want to share a block of memory between processes and view it as a numpy array, right?

In that case, have a look at this (posted to numpy-discussion by Nadav Horesh a while back, not my work). There are a couple of similar implementations (some more flexible), but they all essentially use this principle.

#    "Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing"# Modified and corrected by Nadav Horesh, Mar 2010# No rights reservedimport numpy as Nimport ctypesimport multiprocessing as MP_ctypes_to_numpy = {    ctypes.c_char   : N.dtype(N.uint8),    ctypes.c_wchar  : N.dtype(N.int16),    ctypes.c_byte   : N.dtype(N.int8),    ctypes.c_ubyte  : N.dtype(N.uint8),    ctypes.c_short  : N.dtype(N.int16),    ctypes.c_ushort : N.dtype(N.uint16),    ctypes.c_int    : N.dtype(N.int32),    ctypes.c_uint   : N.dtype(N.uint32),    ctypes.c_long   : N.dtype(N.int64),    ctypes.c_ulong  : N.dtype(N.uint64),    ctypes.c_float  : N.dtype(N.float32),    ctypes.c_double : N.dtype(N.float64)}_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys()))def shmem_as_ndarray(raw_array, shape=None ):    address = raw_array._obj._wrapper.get_address()    size = len(raw_array)    if (shape is None) or (N.asarray(shape).prod() != size):        shape = (size,)    elif type(shape) is int:        shape = (shape,)    else:        shape = tuple(shape)    dtype = _ctypes_to_numpy[raw_array._obj._type_]    class Dummy(object): pass    d = Dummy()    d.__array_interface__ = {        'data' : (address, False),        'typestr' : dtype.str,        'descr' :   dtype.descr,        'shape' : shape,        'strides' : None,        'version' : 3}    return N.asarray(d)def empty_shared_array(shape, dtype, lock=True):    '''    Generate an empty MP shared array given ndarray parameters    '''    if type(shape) is not int:        shape = N.asarray(shape).prod()    try:        c_type = _numpy_to_ctypes[dtype]    except KeyError:        c_type = _numpy_to_ctypes[N.dtype(dtype)]    return MP.Array(c_type, shape, lock=lock)def emptylike_shared_array(ndarray, lock=True):    'Generate a empty shared array with size and dtype of a  given array'    return empty_shared_array(ndarray.size, ndarray.dtype, lock)


From the other answers, it seems that numpy-sharedmem is the way to go.

However, if you need a pure Python solution, or if installing extensions, Cython, or the like is a (big) hassle, you might want to use the following code, which is a simplified version of Nadav's code:

import numpy, ctypes, multiprocessing

_ctypes_to_numpy = {
    ctypes.c_char   : numpy.dtype(numpy.uint8),
    ctypes.c_wchar  : numpy.dtype(numpy.int16),
    ctypes.c_byte   : numpy.dtype(numpy.int8),
    ctypes.c_ubyte  : numpy.dtype(numpy.uint8),
    ctypes.c_short  : numpy.dtype(numpy.int16),
    ctypes.c_ushort : numpy.dtype(numpy.uint16),
    ctypes.c_int    : numpy.dtype(numpy.int32),
    ctypes.c_uint   : numpy.dtype(numpy.uint32),
    ctypes.c_long   : numpy.dtype(numpy.int64),
    ctypes.c_ulong  : numpy.dtype(numpy.uint64),
    ctypes.c_float  : numpy.dtype(numpy.float32),
    ctypes.c_double : numpy.dtype(numpy.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(),
                            _ctypes_to_numpy.keys()))

def shm_as_ndarray(mp_array, shape=None):
    '''Given a multiprocessing.Array, returns an ndarray pointing to
    the same data.'''
    # support SynchronizedArray:
    if not hasattr(mp_array, '_type_'):
        mp_array = mp_array.get_obj()
    dtype = _ctypes_to_numpy[mp_array._type_]
    result = numpy.frombuffer(mp_array, dtype)
    if shape is not None:
        result = result.reshape(shape)
    return numpy.asarray(result)

def ndarray_to_shm(array, lock=False):
    '''Generate a 1D multiprocessing.Array containing the data from
    the passed ndarray.  The data will be *copied* into shared
    memory.'''
    array1d = array.ravel(order='A')
    try:
        c_type = _numpy_to_ctypes[array1d.dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[numpy.dtype(array1d.dtype)]
    result = multiprocessing.Array(c_type, array1d.size, lock=lock)
    shm_as_ndarray(result)[:] = array1d
    return result

You would use it like this (a complete end-to-end sketch follows the list):

  1. Use sa = ndarray_to_shm(a) to convert the ndarray a into a shared multiprocessing.Array.
  2. Use multiprocessing.Process(target=somefunc, args=(sa,)) (and start, maybe join) to call somefunc in a separate process, passing the shared array.
  3. In somefunc, use a = shm_as_ndarray(sa) to get an ndarray pointing to the shared data. (Actually, you may want to do the same in the original process, immediately after creating sa, in order to have two ndarrays referencing the same data.)
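
To make the three steps concrete, here is a minimal sketch, assuming shm_as_ndarray and ndarray_to_shm from above are in scope; somefunc and the in-place doubling are just placeholders:

import multiprocessing
import numpy

def somefunc(sa):
    a = shm_as_ndarray(sa)    # step 3: view the shared data as an ndarray
    a *= 2                    # modify it in place

if __name__ == '__main__':
    a = numpy.arange(10, dtype=numpy.float64)
    sa = ndarray_to_shm(a)    # step 1: copy a into a shared multiprocessing.Array
    a = shm_as_ndarray(sa)    # view the same shared data in this process, too
    p = multiprocessing.Process(target=somefunc, args=(sa,))   # step 2
    p.start()
    p.join()
    print(a)                  # reflects the child's changes: [ 0.  2.  4. ... 18.]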

AFAICS, you don't need to set lock to True, since shm_as_ndarray does not use the lock anyway. If you need locking, you would set lock to True and call acquire/release on sa, as in the sketch below.
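
A minimal sketch of the locked variant, again assuming the helpers above are in scope (the increment is just an illustration):

import numpy

a = numpy.zeros(10)                 # float64, maps to ctypes.c_double
sa = ndarray_to_shm(a, lock=True)   # sa is now a SynchronizedArray
arr = shm_as_ndarray(sa)            # still works: it unwraps via get_obj()
with sa.get_lock():                 # same effect as sa.acquire()/sa.release()
    arr += 1                        # mutate only while holding the lock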

Also, if your array is not 1-dimensional, you might want to transfer the shape along with sa (e.g. use args = (sa, a.shape)).
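
For example, a hedged sketch of the non-1D case (the 3x4 shape and the value 99.0 are arbitrary):

import multiprocessing
import numpy

def somefunc(sa, shape):
    a = shm_as_ndarray(sa, shape)   # reshape the flat shared buffer
    a[0, 0] = 99.0                  # index it like the original 2D array

if __name__ == '__main__':
    a2 = numpy.ones((3, 4))
    sa2 = ndarray_to_shm(a2)
    p = multiprocessing.Process(target=somefunc, args=(sa2, a2.shape))
    p.start()
    p.join()
    print(shm_as_ndarray(sa2, a2.shape)[0, 0])   # prints 99.0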

This solution has the advantage that it does not need additional packages or extension modules, except multiprocessing (which is in the standard library).