Minimize overhead in Python multiprocessing.Pool with numpy/scipy
Try to reduce interprocess communication. In the `multiprocessing` module, all (single-computer) interprocess communication is done through Queues. Objects passed through a Queue are pickled. So try to send fewer and/or smaller objects through the Queue.
Do not send `self`, the instance of `BigData`, through the Queue. It is rather big, and gets bigger as the amount of data in `self` grows:

```
In [6]: import pickle
In [14]: len(pickle.dumps(BigData(50)))
Out[14]: 1052187
```
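A rough way to see this growth without the full class is to pickle a stand-in payload of increasing size. The sketch below assumes a hypothetical `make_payload` helper (not part of the original code) that builds `n` arrays of shape `(n, n)`, loosely mimicking the per-slice data that `BigData` holds. The exact byte counts will differ from the real class.

```python
import pickle

import numpy as np


def make_payload(n):
    """Hypothetical stand-in for BigData(n): a list of n arrays of shape (n, n)."""
    return [np.zeros((n, n)) for _ in range(n)]


def pickled_size(obj):
    """Number of bytes that would have to travel through the Queue."""
    return len(pickle.dumps(obj))


# The payload holds n**3 float64 values, so the pickled size grows roughly as n**3.
sizes = {n: pickled_size(make_payload(n)) for n in (10, 20, 40)}
for n, size in sizes.items():
    print("n=%d: %d bytes" % (n, size))
```

Every task that carries such an object as an argument pays this cost again, once for pickling and once for unpickling.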
Every time `pool.apply_async(_do_chunk_wrapper, (self, k, xi, yi))` is called, `self` is pickled in the main process and unpickled in the worker process. The value of `len(pickle.dumps(BigData(N)))` grows as `N` increases.

Let the data be read from a global variable. On Linux, you can take advantage of Copy-on-Write. As Jan-Philip Gehrcke explains:
> After fork(), parent and child are in an equivalent state. It would be stupid to copy the entire memory of the parent to another place in the RAM. That's [where] the copy-on-write principle [comes] in. As long as the child does not change its memory state, it actually accesses the parent's memory. Only upon modification, the corresponding bits and pieces are copied into the memory space of the child.
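Here is a minimal sketch of that idea, assuming the `fork` start method is available (Linux). The names `TABLE`, `row_range_sum` and `parallel_sum` are hypothetical and only for illustration. The large object lives in a module-level global that exists before the pool is created, so the workers inherit it and only two small integers are pickled per task.

```python
import multiprocessing as mp

import numpy as np

# Module-level global, created before the pool, so forked workers inherit it.
# Hypothetical stand-in for the large BigData instance.
TABLE = np.arange(12.0).reshape(3, 4)


def row_range_sum(k_start, k_end):
    """Worker: reads TABLE from the global namespace instead of receiving it as an argument."""
    return float(TABLE[k_start:k_end].sum())


def parallel_sum(numproc=2):
    """Sum TABLE row by row in worker processes; only (k, k + 1) goes through the Queue."""
    # Assumption: "fork" is available. It is not on Windows, where workers would
    # re-import the module and rebuild TABLE rather than share the parent's memory.
    ctx = mp.get_context("fork")
    with ctx.Pool(numproc) as pool:
        jobs = [pool.apply_async(row_range_sum, (k, k + 1))
                for k in range(TABLE.shape[0])]
        return sum(job.get(timeout=30) for job in jobs)


if __name__ == "__main__":
    print(parallel_sum())
```

As long as the workers only read `TABLE`, its data does not need to be copied or pickled.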
Thus, you can avoid passing instances of `BigData` through the Queue by simply defining the instance as a global, `bd = BigData(n)` (as you are already doing), and referring to its values in the worker processes (e.g. `_do_chunk_wrapper`). It basically amounts to removing `self` from the call to `pool.apply_async`:

```python
p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
```

and accessing `bd` as a global, and making the necessary attendant changes to `_do_chunk_wrapper`'s call signature.

Try to pass longer-running functions, `func`, to `pool.apply_async`. If you have many quickly-completing calls to `pool.apply_async`, then the overhead of passing arguments and return values through the Queue becomes a significant part of the overall time. If instead you make fewer calls to `pool.apply_async` and give each `func` more work to do before returning a result, then interprocess communication becomes a smaller fraction of the overall time.

Below, I modified `_do_chunk_wrapper` to accept `k_start` and `k_end` arguments, so that each call to `pool.apply_async` would compute the sum for many values of `k` before returning a result.
```python
import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n,n,n))
        self.ff = []
        for i in range(n):
            f = interpolate.RectBivariateSpline(
                np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        n = self.n
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_chunk_of_chunks(self, k_start, k_end, xi, yi):
        s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi)))
                    for k in range(k_start, k_end))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = mp.Pool(numproc)
        stopwatch('\nPool setup')
        ks = list(map(int, np.linspace(0, self.n, numproc+1)))
        for i in range(len(ks)-1):
            k_start, k_end = ks[i:i+2]
            p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        total = 0.0
        for k, p in enumerate(procs):
            total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        print(total)
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return total

    def do_single(self, xi, yi):
        total = 0.0
        for k in range(self.n):
            total += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return total

def _do_chunk_wrapper(k_start, k_end, xi, yi):
    return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)
```
yields
```
Initialized: 0.15 seconds
Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 6.56 seconds
83963796.0404
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 5.19 seconds
83963796.0404
Jobs done: 1.57 seconds
...
All in single process: 12.13 seconds
```
compared to the original code:
```
Initialized: 0.10 seconds
Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds
```
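The batching step can also be looked at in isolation. The following is only a sketch with hypothetical helper names (`chunk_bounds`, `work_on_chunk`, `chunked_total`) and a toy workload in place of the spline evaluation. It splits `range(n)` into one contiguous chunk per process using the same `np.linspace` trick as `do_multi`, so each `apply_async` call returns a single result for many values of `k`.

```python
import multiprocessing as mp

import numpy as np


def chunk_bounds(n, numchunks):
    """Split range(n) into numchunks contiguous (start, end) pairs, end exclusive."""
    edges = [int(e) for e in np.linspace(0, n, numchunks + 1)]
    return list(zip(edges[:-1], edges[1:]))


def work_on_chunk(k_start, k_end):
    """Toy workload: process many k values, then send back one number."""
    return sum(k * k for k in range(k_start, k_end))


def chunked_total(n, numproc):
    """Submit one task per chunk rather than one task per k."""
    with mp.Pool(numproc) as pool:
        jobs = [pool.apply_async(work_on_chunk, bounds)
                for bounds in chunk_bounds(n, numproc)]
        return sum(job.get(timeout=30) for job in jobs)


if __name__ == "__main__":
    print(chunk_bounds(50, 3))
    print(chunked_total(50, 3))
```

With `n = 50` and 3 processes this makes 3 round trips through the Queue instead of 50, which matches the number of dots printed in the two runs above.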