How do I pass large numpy arrays between python subprocesses without saving to disk?
While googling around for more information about the code Joe Kington posted, I found the numpy-sharedmem package. Judging from this numpy/multiprocessing tutorial it seems to share the same intellectual heritage (maybe largely the same authors? -- I'm not sure).
Using the sharedmem module, you can create a shared-memory numpy array (awesome!), and use it with multiprocessing like this:
```python
import sharedmem as shm
import numpy as np
import multiprocessing as mp

def worker(q, arr):
    done = False
    while not done:
        cmd = q.get()
        if cmd == 'done':
            done = True
        elif cmd == 'data':
            # Fake data. In real life, get data from hardware.
            rnd = np.random.randint(100)
            print('rnd={0}'.format(rnd))
            arr[:] = rnd
        q.task_done()

if __name__ == '__main__':
    N = 10
    arr = shm.zeros(N, dtype=np.uint8)
    q = mp.JoinableQueue()
    proc = mp.Process(target=worker, args=[q, arr])
    proc.daemon = True
    proc.start()

    for i in range(3):
        q.put('data')
        # Wait for the computation to finish
        q.join()
        print(arr.shape)
        print(arr)

    q.put('done')
    proc.join()
```
Running yields
```
rnd=53
(10,)
[53 53 53 53 53 53 53 53 53 53]
rnd=15
(10,)
[15 15 15 15 15 15 15 15 15 15]
rnd=87
(10,)
[87 87 87 87 87 87 87 87 87 87]
```
Basically, you just want to share a block of memory between processes and view it as a numpy array, right?
In that case, have a look at this (posted to numpy-discussion by Nadav Horesh a while back; not my work). There are a couple of similar implementations (some more flexible), but they all essentially use this principle.
# "Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing"# Modified and corrected by Nadav Horesh, Mar 2010# No rights reservedimport numpy as Nimport ctypesimport multiprocessing as MP_ctypes_to_numpy = { ctypes.c_char : N.dtype(N.uint8), ctypes.c_wchar : N.dtype(N.int16), ctypes.c_byte : N.dtype(N.int8), ctypes.c_ubyte : N.dtype(N.uint8), ctypes.c_short : N.dtype(N.int16), ctypes.c_ushort : N.dtype(N.uint16), ctypes.c_int : N.dtype(N.int32), ctypes.c_uint : N.dtype(N.uint32), ctypes.c_long : N.dtype(N.int64), ctypes.c_ulong : N.dtype(N.uint64), ctypes.c_float : N.dtype(N.float32), ctypes.c_double : N.dtype(N.float64)}_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys()))def shmem_as_ndarray(raw_array, shape=None ): address = raw_array._obj._wrapper.get_address() size = len(raw_array) if (shape is None) or (N.asarray(shape).prod() != size): shape = (size,) elif type(shape) is int: shape = (shape,) else: shape = tuple(shape) dtype = _ctypes_to_numpy[raw_array._obj._type_] class Dummy(object): pass d = Dummy() d.__array_interface__ = { 'data' : (address, False), 'typestr' : dtype.str, 'descr' : dtype.descr, 'shape' : shape, 'strides' : None, 'version' : 3} return N.asarray(d)def empty_shared_array(shape, dtype, lock=True): ''' Generate an empty MP shared array given ndarray parameters ''' if type(shape) is not int: shape = N.asarray(shape).prod() try: c_type = _numpy_to_ctypes[dtype] except KeyError: c_type = _numpy_to_ctypes[N.dtype(dtype)] return MP.Array(c_type, shape, lock=lock)def emptylike_shared_array(ndarray, lock=True): 'Generate a empty shared array with size and dtype of a given array' return empty_shared_array(ndarray.size, ndarray.dtype, lock)
From the other answers, it seems that numpy-sharedmem is the way to go.
However, if you need a pure Python solution, or if installing extensions, Cython, or the like is a (big) hassle, you might want to use the following code, which is a simplified version of Nadav's code:
```python
import numpy, ctypes, multiprocessing

_ctypes_to_numpy = {
    ctypes.c_char   : numpy.dtype(numpy.uint8),
    ctypes.c_wchar  : numpy.dtype(numpy.int16),
    ctypes.c_byte   : numpy.dtype(numpy.int8),
    ctypes.c_ubyte  : numpy.dtype(numpy.uint8),
    ctypes.c_short  : numpy.dtype(numpy.int16),
    ctypes.c_ushort : numpy.dtype(numpy.uint16),
    ctypes.c_int    : numpy.dtype(numpy.int32),
    ctypes.c_uint   : numpy.dtype(numpy.uint32),
    ctypes.c_long   : numpy.dtype(numpy.int64),
    ctypes.c_ulong  : numpy.dtype(numpy.uint64),
    ctypes.c_float  : numpy.dtype(numpy.float32),
    ctypes.c_double : numpy.dtype(numpy.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(),
                            _ctypes_to_numpy.keys()))

def shm_as_ndarray(mp_array, shape=None):
    '''Given a multiprocessing.Array, returns an ndarray pointing to
    the same data.'''
    # support SynchronizedArray:
    if not hasattr(mp_array, '_type_'):
        mp_array = mp_array.get_obj()

    dtype = _ctypes_to_numpy[mp_array._type_]
    result = numpy.frombuffer(mp_array, dtype)

    if shape is not None:
        result = result.reshape(shape)

    return numpy.asarray(result)

def ndarray_to_shm(array, lock=False):
    '''Generate a 1D multiprocessing.Array containing the data from
    the passed ndarray.  The data will be *copied* into shared
    memory.'''
    array1d = array.ravel(order='A')

    try:
        c_type = _numpy_to_ctypes[array1d.dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[numpy.dtype(array1d.dtype)]

    result = multiprocessing.Array(c_type, array1d.size, lock=lock)
    shm_as_ndarray(result)[:] = array1d
    return result
```
You would use it like this:
- Use `sa = ndarray_to_shm(a)` to convert the ndarray `a` into a shared `multiprocessing.Array`.
- Use `multiprocessing.Process(target=somefunc, args=(sa,))` (and `start`, maybe `join`) to call `somefunc` in a separate process, passing the shared array.
- In `somefunc`, use `a = shm_as_ndarray(sa)` to get an ndarray pointing to the shared data, as in the sketch below. (Actually, you may want to do the same in the original process, immediately after creating `sa`, in order to have two ndarrays referencing the same data.)
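Putting these steps together, here is a minimal sketch; `somefunc` and the array contents are my own illustration, and it assumes `shm_as_ndarray` and `ndarray_to_shm` are defined in (or imported into) the same module:

```python
import multiprocessing
import numpy

def somefunc(sa, shape):
    # Reinterpret the shared Array as an ndarray; writes land in shared memory.
    a = shm_as_ndarray(sa, shape)
    a += 1

if __name__ == '__main__':
    original = numpy.zeros((3, 4))
    sa = ndarray_to_shm(original)               # the data is *copied* into shared memory
    view = shm_as_ndarray(sa, original.shape)   # parent-side view of the same data

    p = multiprocessing.Process(target=somefunc, args=(sa, original.shape))
    p.start()
    p.join()

    print(view)   # all ones: the child wrote into the shared memory
```

The shape is passed alongside `sa` because `ndarray_to_shm` flattens the data to 1D (see the note on shapes below).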
AFAICS, you don't need to set lock to True, since `shm_as_ndarray` will not use the locking anyhow. If you need locking, you would set lock to True and call acquire/release on `sa`.
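For illustration only (not part of the original answer): with `lock=True`, `multiprocessing.Array` returns a synchronized wrapper exposing the standard lock interface, so locked access could look like this:

```python
sa = ndarray_to_shm(a, lock=True)   # synchronized wrapper around the shared data
with sa.get_lock():                 # equivalently: sa.acquire() ... sa.release()
    shm_as_ndarray(sa)[:] = 0       # shm_as_ndarray unwraps the wrapper via get_obj()
```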
Also, if your array is not 1-dimensional, you might want to transfer the shape along with `sa` (e.g. use `args=(sa, a.shape)`).
This solution has the advantage that it does not need additional packages or extension modules, except multiprocessing (which is in the standard library).