parallelized algorithm for evaluating a 1-d array of functions on a same-length 1d numpy array parallelized algorithm for evaluating a 1-d array of functions on a same-length 1d numpy array numpy numpy

parallelized algorithm for evaluating a 1-d array of functions on a same-length 1d numpy array


Warning/Caveat:

You may not want to apply multiprocessing to this problem. You'll find that relatively simple operations on large arrays, the problems will be memory bound with numpy. The bottleneck is moving data from RAM to the CPU caches. The CPU is starved for data, so throwing more CPUs at the problem doesn't help much. Furthermore, your current approach will pickle and make a copy of the entire array for every item in your input sequence, which adds lots of overhead.

There are plenty of cases where numpy + multiprocessing is very effective, but you need to make sure you're working with a CPU-bound problem. Ideally, it's a CPU-bound problem with relatively small inputs and outputs to alleviate the overhead of pickling the input and output. For many of the problems that numpy is most often used for, that's not the case.


Two Problems with Your Current Approach

On to answering your question:

Your immediate error is due to passing in a function that's not accessible from the global scope (i.e. a function defined inside a function).

However, you have another issue. You're treating the numpy arrays as though they're shared memory that can be modified by each process. Instead, when using multiprocessing the original array will be pickled (effectively making a copy) and passed to each process independently. The original array will never be modified.


Avoiding the PicklingError

As a minimal example to reproduce your error, consider the following:

import multiprocessingdef apply_parallel(input_sequence):    def func(x):        pass    pool = multiprocessing.Pool()    pool.map(func, input_sequence)    pool.close()foo = range(100)apply_parallel(foo)

This will result in:

PicklingError: Can't pickle <type 'function'>: attribute lookup                __builtin__.function failed

Of course, in this simple example, we could simply move the function definition back into the __main__ namespace. However, in yours, you need it to refer to data that's passed in. Let's look at an example that's a bit closer to what you're doing:

import numpy as npimport multiprocessingdef parallel_rolling_mean(data, window):    data = np.pad(data, window, mode='edge')    ind = np.arange(len(data)) + window    def func(i):        return data[i-window:i+window+1].mean()    pool = multiprocessing.Pool()    result = pool.map(func, ind)    pool.close()    return resultfoo = np.random.rand(20).cumsum()result = parallel_rolling_mean(foo, 10)

There are multiple ways you could handle this, but a common approach is something like:

import numpy as npimport multiprocessingclass RollingMean(object):    def __init__(self, data, window):        self.data = np.pad(data, window, mode='edge')        self.window = window    def __call__(self, i):        start = i - self.window        stop = i + self.window + 1        return self.data[start:stop].mean()def parallel_rolling_mean(data, window):    func = RollingMean(data, window)    ind = np.arange(len(data)) + window    pool = multiprocessing.Pool()    result = pool.map(func, ind)    pool.close()    return resultfoo = np.random.rand(20).cumsum()result = parallel_rolling_mean(foo, 10)

Great! It works!


However, if you scale this up to large arrays, you'll soon find that it will either run very slow (which you can speed up by increasing chunksize in the pool.map call) or you'll quickly run out of RAM (once you increase the chunksize).

multiprocessing pickles the input so that it can be passed to separate and independent python processes. This means you're making a copy of the entire array for every i you operate on.

We'll come back to this point in a bit...


multiprocessing Does not share memory between processes

The multiprocessing module works by pickling the inputs and passing them to independent processes. This means that if you modify something in one process the other process won't see the modification.

However, multiprocessing also provides primitives that live in shared memory and can be accessed and modified by child processes. There are a few different ways of adapting numpy arrays to use a shared memory multiprocessing.Array. However, I'd recommend avoiding those at first (read up on false sharing if you're not familiar with it). There are cases where it's very useful, but it's typically to conserve memory, not to improve performance.

Therefore, it's best to do all modifications to a large array in a single process (this is also a very useful pattern for general IO). It doesn't have to be the "main" process, but it's easiest to think about that way.

As an example, let's say we wanted to have our parallel_rolling_mean function take an output array to store things in. A useful pattern is something similar to the following. Notice the use of iterators and modifying the output only in the main process:

import numpy as npimport multiprocessingdef parallel_rolling_mean(data, window, output):    def windows(data, window):        padded = np.pad(data, window, mode='edge')        for i in xrange(len(data)):            yield padded[i:i + 2*window + 1]    pool = multiprocessing.Pool()    results = pool.imap(np.mean, windows(data, window))    for i, result in enumerate(results):        output[i] = result    pool.close()foo = np.random.rand(20000000).cumsum()output = np.zeros_like(foo)parallel_rolling_mean(foo, 10, output)print output

Hopefully that example helps clarify things a bit.


chunksize and performance

One quick note on performance: If we scale this up, it will get very slow very quickly. If you look at a system monitor (e.g. top/htop), you may notice that your cores are sitting idle most of the time.

By default, the master process pickles each input for each process and passes it in immediately and then waits until they're finished to pickle the next input. In many cases, this means that the master process works, then sits idle while the worker processes are busy, then the worker processes sit idle while the master process is pickling the next input.

The key is to increase the chunksize parameter. This will cause pool.imap to "pre-pickle" the specified number of inputs for each process. Basically, the master thread can stay busy pickling inputs and the worker processes can stay busy processing. The downside is that you're using more memory. If each input uses up a large amount of RAM, this can be a bad idea. If it doesn't, though, this can dramatically speed things up.

As a quick example:

import numpy as npimport multiprocessingdef parallel_rolling_mean(data, window, output):    def windows(data, window):        padded = np.pad(data, window, mode='edge')        for i in xrange(len(data)):            yield padded[i:i + 2*window + 1]    pool = multiprocessing.Pool()    results = pool.imap(np.mean, windows(data, window), chunksize=1000)    for i, result in enumerate(results):        output[i] = result    pool.close()foo = np.random.rand(2000000).cumsum()output = np.zeros_like(foo)parallel_rolling_mean(foo, 10, output)print output

With chunksize=1000, it takes 21 seconds to process a 2-million element array:

python ~/parallel_rolling_mean.py  83.53s user 1.12s system 401% cpu 21.087 total

But with chunksize=1 (the default) it takes about eight times as long (2 minutes, 41 seconds).

python ~/parallel_rolling_mean.py  358.26s user 53.40s system 246% cpu 2:47.09 total

In fact, with the default chunksize, it's actually far worse than a single-process implementation of the same thing, which takes only 45 seconds:

python ~/sequential_rolling_mean.py  45.11s user 0.06s system 99% cpu 45.187 total