Making my NumPy array shared across processes (tagged: numpy)

Making my NumPy array shared across processes


Note that you can start out with an array of complex (that is, structured — not complex-number) dtype:

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')

and view it as an array of homogeneous dtype:

In [5]: data2 = data.view('float32')

and later convert it back to the complex (structured) dtype:

In [7]: data3 = data2.view('float32, (250000,2)float32')

Changing the dtype is a very quick operation; it does not affect the underlying data, only the way NumPy interprets it. So changing the dtype is virtually costless.

So what you've read about arrays with simple (homogeneous) dtypes can be readily applied to your complex (structured) dtype with the trick above.


The code below borrows many ideas from J.F. Sebastian's answer, here.

import base64
import contextlib
import ctypes
import multiprocessing as mp
import struct

import numpy as np

# Structured layout the flat float32 buffer is viewed as: each record is one
# float32 scalar ('f0') followed by a (250000, 2) float32 block ('f1').
# decode() writes value pairs into the 'f1' block of record `counter`.
RECORD_DTYPE = [('f0', '<f4'), ('f1', '<f4', (250000, 2))]


def decode(arg):
    """Decode a chunk of base64 strings into the shared structured array.

    Runs inside a pool worker; relies on the module-global ``shared_arr``
    installed by ``pool_init``.

    Args:
        arg: A ``(chunk, counter)`` tuple. ``chunk`` is a sequence of
            base64-encoded strings whose payload is a run of big-endian
            32-bit floats; ``counter`` is the index of the record that the
            first string of the chunk is written to (subsequent strings go
            to ``counter + 1``, ``counter + 2``, ...).
    """
    chunk, counter = arg
    print("%d %d" % (len(chunk), counter))
    # Building the view touches no data, so it is done once per chunk rather
    # than once per decoded value.
    data = tonumpyarray(shared_arr).view(RECORD_DTYPE)
    for x in chunk:
        peak_counter = 0
        data_buff = base64.b64decode(x)
        buff_size = len(data_buff) // 4
        # Read the payload directly as big-endian floats; this yields the
        # same values as unpacking uint32s and reinterpreting their bits.
        values = struct.unpack(">%df" % buff_size, data_buff)
        for index, buff2 in enumerate(values):
            with shared_arr.get_lock():
                # Values alternate: even positions fill column 0, odd
                # positions fill column 1 and complete the current row.
                if (index % 2 == 0):
                    data[counter][1][peak_counter][0] = float(buff2)
                else:
                    data[counter][1][peak_counter][1] = float(buff2)
                    peak_counter += 1
        counter += 1


def pool_init(shared_arr_):
    """Pool initializer: publish the shared array as a worker-side global."""
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr):
    """Return a float32 NumPy view (no copy) of a ``c_float`` shared array.

    The dtype is given explicitly because ``np.frombuffer`` defaults to
    float64, which would misread a buffer of 32-bit floats.
    """
    return np.frombuffer(mp_arr.get_obj(), dtype=np.float32)


def split_chunks(peaks, n_chunks):
    """Split ``peaks`` into at most ``n_chunks`` contiguous pieces.

    Returns a list of ``(chunk, counter)`` tuples, where ``counter`` is the
    index in ``peaks`` of the chunk's first element. Every element of
    ``peaks`` is covered exactly once; the last chunk may be shorter.
    """
    total = len(peaks)
    if total == 0:
        return []
    # Ceiling division, so the remainder is not dropped and a short input
    # (fewer items than chunks) still produces non-empty chunks.
    chunk_size = (total + n_chunks - 1) // n_chunks
    # The right slice index is non-inclusive, so no "- 1" is needed.
    return [(peaks[start:start + chunk_size], start)
            for start in range(0, total, chunk_size)]


def numpy_array(shared_arr, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing

    Args:
        shared_arr: ``multiprocessing.Array`` of ``c_float`` that the
            workers write into.
        peaks: Sequence of base64-encoded strings, one per record.
    """
    if len(peaks) == 0:
        return  # nothing to decode; do not start any worker processes
    processors = mp.cpu_count()
    map_parameters = split_chunks(peaks, processors)
    with contextlib.closing(mp.Pool(processes=min(processors,
                                                  len(map_parameters)),
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        pool.map(decode, map_parameters)
    pool.join()  # wait for the workers to exit after close()


if __name__ == '__main__':
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
    peaks = ...  # placeholder: supply the list of base64-encoded strings
    numpy_array(shared_arr, peaks)

If you can guarantee that the various processes which execute the assignments

# Excerpt from decode(): even positions fill column 0, odd positions fill
# column 1 of the current row.
if (index % 2 == 0):
    data[counter][1][peak_counter][0] = float(buff2)
else:
    data[counter][1][peak_counter][1] = float(buff2)

never compete to alter the data in the same locations, then I believe you can actually forgo using the lock

with shared_arr.get_lock():

but I don't grok your code well enough to know for sure, so to be on the safe side, I included the lock.


from multiprocessing import Process, Array
import numpy as np
import time
import ctypes


def as_float32_view(shared):
    """Return a float32 NumPy view (no copy) of a ``c_float`` shared Array."""
    return np.frombuffer(shared.get_obj(), dtype=np.float32)


def fun(a):
    """Child-process target: negate the first shared element, then report.

    After flipping the sign of ``a[0]`` it loops forever, printing the
    shared buffer as a 3x3 matrix every two seconds.

    Args:
        a: ``multiprocessing.Array`` of ``c_float`` holding 9 values.
    """
    a[0] = -a[0]
    while True:
        time.sleep(2)
        c = as_float32_view(a)
        c.shape = 3, 3
        print("haha %s" % (c,))


def main():
    """Show that a write made by a child process is visible in the parent."""
    a = np.random.rand(3, 3).astype(np.float32)
    # The shared Array is initialised from the flattened values; `a` itself
    # stays a private 3x3 copy in the parent.
    h = Array(ctypes.c_float, a.ravel())
    print("Originally, %s" % (h,))

    # Create and start the child process. It never returns on its own, so it
    # is marked as a daemon: it is terminated when the parent exits instead
    # of keeping the script alive forever.
    p = Process(target=fun, args=(h,))
    p.daemon = True
    p.start()

    # The parent's private copy is unchanged ...
    print("first %s" % (a,))
    time.sleep(3)
    # ... while the shared buffer reflects the child's modification.
    print("main %s" % (as_float32_view(h),))


if __name__ == "__main__":
    main()