
Fast Queue of read only numpy arrays


Sharing memory between threads or processes

Use threading instead of multiprocessing

Since you're using numpy, you can take advantage of the fact that the global interpreter lock is released during numpy computations. This means you can do parallel processing with standard threads and shared memory, instead of multiprocessing and inter-process communication. Here's a version of your code, tweaked to use threading.Thread and Queue.Queue instead of multiprocessing.Process and multiprocessing.Queue. This passes a numpy ndarray via a queue without pickling it. On my computer, this runs about 3 times faster than your code. (However, it's only about 20% faster than the serial version of your code. I have suggested some other approaches further down.)

from threading import Thread
from Queue import Queue
import numpy as np

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())
        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Thread(target=consumer, args=(f_xs, q,))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x
        def f_wrapper(xs):
            return parallel_generator(f(xs))
        return f_wrapper
    return parallel_pipeline_with_args

@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

rs = f(g(h(xs())))
%time print sum(r.sum() for r in rs)  # 12.2s

Store numpy arrays in shared memory

Another option, close to what you requested, would be to continue using the multiprocessing package, but pass data between processes using arrays stored in shared memory. The code below creates a new ArrayQueue class to do that. The ArrayQueue object should be created before spawning subprocesses. It creates and manages a pool of numpy arrays backed by shared memory. When a result array is pushed onto the queue, ArrayQueue copies the data from that array into an existing shared-memory array, then passes the id of the shared-memory array through the queue. This is much faster than sending the whole array through the queue, since it avoids pickling the arrays. It has similar performance to the threaded version above (about 10% slower), and may scale better if the global interpreter lock is an issue (i.e., if you run a lot of Python code in your functions).

from multiprocessing import Process, Queue, Array
import numpy as np

class ArrayQueue(object):
    def __init__(self, template, maxsize=0):
        if type(template) is not np.ndarray:
            raise ValueError('ArrayQueue(template, maxsize) must use a numpy.ndarray as the template.')
        if maxsize == 0:
            # this queue cannot be infinite, because it will be backed by real objects
            raise ValueError('ArrayQueue(template, maxsize) must use a finite value for maxsize.')

        # find the size and data type for the arrays
        # note: every ndarray put on the queue must be this size
        self.dtype = template.dtype
        self.shape = template.shape
        self.byte_count = len(template.data)

        # make a pool of numpy arrays, each backed by shared memory,
        # and create a queue to keep track of which ones are free
        self.array_pool = [None] * maxsize
        self.free_arrays = Queue(maxsize)
        for i in range(maxsize):
            buf = Array('c', self.byte_count, lock=False)
            self.array_pool[i] = np.frombuffer(buf, dtype=self.dtype).reshape(self.shape)
            self.free_arrays.put(i)

        self.q = Queue(maxsize)

    def put(self, item, *args, **kwargs):
        if type(item) is np.ndarray:
            if item.dtype == self.dtype and item.shape == self.shape and len(item.data) == self.byte_count:
                # get the ID of an available shared-memory array
                id = self.free_arrays.get()
                # copy item to the shared-memory array
                self.array_pool[id][:] = item
                # put the array's id (not the whole array) onto the queue
                new_item = id
            else:
                raise ValueError(
                    'ndarray does not match type or shape of template used to initialize ArrayQueue'
                )
        else:
            # not an ndarray
            # put the original item on the queue (as a tuple, so we know it's not an ID)
            new_item = (item,)
        self.q.put(new_item, *args, **kwargs)

    def get(self, *args, **kwargs):
        item = self.q.get(*args, **kwargs)
        if type(item) is tuple:
            # unpack the original item
            return item[0]
        else:
            # item is the id of a shared-memory array
            # copy the array
            arr = self.array_pool[item].copy()
            # put the shared-memory array back into the pool
            self.free_arrays.put(item)
            return arr

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())
        def parallel_generator(f_xs):
            # the template must match the size and dtype of the arrays yielded by xs()
            q = ArrayQueue(template=np.zeros((500,2000)), maxsize=buffer_size)
            consumer_process = Process(target=consumer, args=(f_xs, q,))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x
        def f_wrapper(xs):
            return parallel_generator(f(xs))
        return f_wrapper
    return parallel_pipeline_with_args

@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

print "multiprocessing with shared-memory arrays:"
%time print sum(r.sum() for r in f(g(h(xs()))))   # 13.5s

Parallel processing of samples instead of functions

The code above is only about 20% faster than a single-threaded version (12.2s vs. 14.8s for the serial version shown below). That is because each function is run in a single thread or process, and most of the work is done by xs(). The execution time for the example above is nearly the same as if you just ran %time print sum(1 for x in xs()).

If your real project has many more intermediate functions and/or they are more complex than the ones you showed, then the workload may be distributed better among processors, and this may not be a problem. However, if your workload really does resemble the code you provided, then you may want to refactor your code to allocate one sample to each thread instead of one function to each thread. That would look like the code below (both threading and multiprocessing versions are shown):

import multiprocessing
import threading, Queue
import numpy as np

def f(x):
    return x + 1.0

def g(x):
    return x * 3

def h(x):
    return x * x

def final(i):
    return f(g(h(x(i))))

def final_sum(i):
    return f(g(h(x(i)))).sum()

def x(i):
    # produce sample number i
    return np.random.uniform(0, 1, (500, 2000))

def rs_serial(func, n):
    for i in range(n):
        yield func(i)

def rs_parallel_threaded(func, n):
    todo = range(n)
    q = Queue.Queue(2*n_workers)
    def worker():
        while True:
            try:
                # the global interpreter lock ensures only one thread does this at a time
                i = todo.pop()
                q.put(func(i))
            except IndexError:
                # none left to do
                q.put(None)
                break
    threads = []
    for j in range(n_workers):
        t = threading.Thread(target=worker)
        t.daemon = False
        threads.append(t)   # in case it's needed later
        t.start()
    while True:
        x = q.get()
        if x is None:
            break
        else:
            yield x

def rs_parallel_mp(func, n):
    pool = multiprocessing.Pool(n_workers)
    return pool.imap_unordered(func, range(n))

n_workers = 4
n_samples = 1000

print "serial:"  # 14.8s
%time print sum(r.sum() for r in rs_serial(final, n_samples))
print "threaded:"  # 10.1s
%time print sum(r.sum() for r in rs_parallel_threaded(final, n_samples))
print "mp return arrays:"  # 19.6s
%time print sum(r.sum() for r in rs_parallel_mp(final, n_samples))
print "mp return results:"  # 8.4s
%time print sum(r_sum for r_sum in rs_parallel_mp(final_sum, n_samples))

The threaded version of this code is only slightly faster than the first example I gave, and only about 30% faster than the serial version. That's not as much of a speedup as I would have expected; maybe Python is still getting partly bogged down by the GIL?

The multiprocessing version performs significantly faster than your original multiprocessing code, primarily because all the functions get chained together in a single process, rather than queueing (and pickling) intermediate results. However, it is still slower than the serial version because all the result arrays have to get pickled (in the worker process) and unpickled (in the main process) before being returned by imap_unordered. If you can arrange things so that your pipeline returns aggregate results instead of the complete arrays, then you avoid that pickling overhead, and the multiprocessing version becomes the fastest: about 43% faster than the serial version.

OK, now for the sake of completeness, here's a version of the second example that uses multiprocessing with your original generator functions instead of the finer-scale functions shown above. This uses some tricks to spread the samples among multiple processes, which may make it unsuitable for many workflows. But using generators does seem to be slightly faster than using the finer-scale functions, and this method can get you up to a 54% speedup vs. the serial version shown above. However, that is only available if you don't need to return the full arrays from the worker functions.

import multiprocessing, itertools, math
import numpy as np

def f(xs):
    for x in xs:
        yield x + 1.0

def g(xs):
    for x in xs:
        yield x * 3

def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

def final():
    return f(g(h(xs())))

def final_sum():
    for x in f(g(h(xs()))):
        yield x.sum()

def get_chunk(args):
    """Retrieve n values (n=args[1]) from a generator function (f=args[0]) and return them as a list.
    This runs in a worker process and does all the computation."""
    return list(itertools.islice(args[0](), args[1]))

def parallelize(gen_func, max_items, n_workers=4, chunk_size=50):
    """Pull up to max_items items from several copies of gen_func, in small groups in parallel processes.
    chunk_size should be big enough to improve efficiency (one copy of gen_func will be run for each chunk)
    but small enough to avoid exhausting memory (each worker will keep chunk_size items in memory)."""
    pool = multiprocessing.Pool(n_workers)

    # how many chunks will be needed to yield at least max_items items?
    n_chunks = int(math.ceil(float(max_items)/float(chunk_size)))

    # generate a suitable series of arguments for get_chunk()
    args_list = itertools.repeat((gen_func, chunk_size), n_chunks)

    # chunk_gen will yield a series of chunks (lists of results) from the generator function,
    # totaling n_chunks * chunk_size items (which is >= max_items)
    chunk_gen = pool.imap_unordered(get_chunk, args_list)

    # parallel_gen flattens the chunks, and yields individual items
    parallel_gen = itertools.chain.from_iterable(chunk_gen)

    # limit the output to max_items items
    return itertools.islice(parallel_gen, max_items)

# in this case, the parallel version is slower than a single process, probably
# due to overhead of gathering numpy arrays in imap_unordered (via pickle?)
print "serial, return arrays:"  # 15.3s
%time print sum(r.sum() for r in final())
print "parallel, return arrays:"  # 24.2s
%time print sum(r.sum() for r in parallelize(final, max_items=1000))

# in this case, the parallel version is more than twice as fast as the single-thread version
print "serial, return result:"  # 15.1s
%time print sum(r for r in final_sum())
print "parallel, return result:"  # 6.8s
%time print sum(r for r in parallelize(final_sum, max_items=1000))


Your example does not seem to run on my computer, although that may have to do with the fact that I'm running Windows (issues pickling anything not in the __main__ namespace, i.e. anything decorated)... would something like this help? (You would have to put package() and unpack() inside each of f(), g(), and h().)

Note: I'm not sure this would actually be any faster... just a stab at what others have suggested.

from multiprocessing import Process, freeze_support
from multiprocessing.sharedctypes import Value, Array
import numpy as np

def package(arr):
    shape = Array('i', arr.shape, lock=False)
    if arr.dtype == float:
        ctype = Value('c', b'd')  # d for double, f for single
    if arr.dtype == int:
        ctype = Value('c', b'i')  # if statements could be avoided if data is always the same
    data = Array(ctype.value, arr.reshape(-1), lock=False)
    return data, shape

def unpack(data, shape):
    return np.array(data[:]).reshape(shape[:])

# test
def f(args):
    print(unpack(*args))

if __name__ == '__main__':
    freeze_support()
    a = np.array([1,2,3,4,5])
    a_packed = package(a)
    print('array has been packaged')
    p = Process(target=f, args=(a_packed,))
    print('passing to parallel process')
    p.start()
    print('joining to parent process')
    p.join()
    print('finished')
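For illustration only, here's a rough, untested sketch of what putting package() and unpack() inside one of your pipeline generators (e.g. g()) might look like, reusing the helpers defined above; the packed_xs name is my own placeholder:

# rough sketch: each stage unpacks the shared ctypes data into a regular ndarray,
# does its computation, then re-packages the result for the next stage
def g(packed_xs):
    for packed in packed_xs:
        x = unpack(*packed)    # rebuild a normal ndarray from (data, shape)
        y = x * 3              # the actual computation for this stage
        yield package(y)       # wrap the result so the next stage can unpack it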


Check out the Pathos-multiprocessing project, which avoids the standard multiprocessing module's reliance on pickling. This should let you get around the inefficiencies of pickling and give you access to common memory for read-only shared resources. Note that while Pathos is nearing deployment as a full pip package, in the interim I'd recommend installing with pip install git+https://github.com/uqfoundation/pathos
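For reference, basic usage looks roughly like the sketch below. This is only an illustration, not from the pathos docs: the sample_sum function, nodes=4, and collapsing f/g/h into one step are my own assumptions, and it only pays off if you return aggregate results rather than the full arrays.

# minimal sketch, assuming pathos.multiprocessing.ProcessingPool is available
from pathos.multiprocessing import ProcessingPool
import numpy as np

def sample_sum(i):
    # stand-in for the real f(g(h(...))) pipeline applied to one sample,
    # returning a scalar so the full array never has to cross process boundaries
    x = np.random.uniform(0, 1, (500, 2000))
    return ((x * x) * 3 + 1.0).sum()

pool = ProcessingPool(nodes=4)
print(sum(pool.uimap(sample_sum, range(1000))))  # unordered map, like imap_unordered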