multiprocessing.Pool() slower than just using ordinary functions
These problems usually boil down to the following:
The function you are trying to parallelize doesn't require enough CPU resources (i.e. CPU time) to justify parallelization!
Sure, when you parallelize with `multiprocessing.Pool(8)`, you theoretically (but not practically) could get an 8x speed up.
However, keep in mind that this isn't free - you gain this parallelization at the expense of the following overhead:
1. Creating a `task` for every `chunk` (of size `chunksize`) in your `iter` passed to `Pool.map(f, iter)`
2. For each `task`:
   - Serialize the `task`, and the `task`'s return value (think `pickle.dumps()`)
   - Deserialize the `task`, and the `task`'s return value (think `pickle.loads()`)
   - Waste significant time waiting for `Lock`s on shared-memory `Queue`s, while worker processes and parent processes `get()` and `put()` from/to these `Queue`s
3. One-time cost of calls to `os.fork()` for each worker process, which is expensive
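To get a feel for the serialization cost described above, you can time the pickle round trip that every task and its result go through (the payload size here is arbitrary):

```python
import pickle
import time

# an arbitrary payload standing in for one task's data
data = list(range(1_000_000))

t0 = time.perf_counter()
blob = pickle.dumps(data)      # what Pool does to each task on the way out
restored = pickle.loads(blob)  # ...and to each result on the way back
elapsed = time.perf_counter() - t0

print(f"{len(blob):,} bytes round-tripped in {elapsed:.3f}s")
```

That cost is paid per chunk, in addition to the actual work your function does.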
In essence, when using `Pool()` you want:
- High CPU resource requirements
- Low data footprint passed to each function call
- Reasonably long `iter` to justify the one-time cost of (3) above.
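A quick way to see this in practice is to time `Pool.map()` against plain `map()` on a function that does almost no CPU work (the worker count and input size here are arbitrary; on most machines the pool loses badly in this setup):

```python
import time
from multiprocessing import Pool

def cheap(x):
    # almost no CPU work per call, so IPC/serialization
    # overhead dominates the parallel version
    return x + 1

def compare(n=100_000):
    data = list(range(n))

    t0 = time.perf_counter()
    serial = list(map(cheap, data))
    t_serial = time.perf_counter() - t0

    with Pool(4) as p:
        t0 = time.perf_counter()
        parallel = p.map(cheap, data)
        t_pool = time.perf_counter() - t0

    # both paths compute the same thing
    assert serial == parallel
    return t_serial, t_pool

if __name__ == '__main__':
    t_serial, t_pool = compare()
    print(f"serial: {t_serial:.4f}s  pool: {t_pool:.4f}s")
```

Replace `cheap` with something genuinely CPU-heavy and the comparison flips.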
For a more in-depth exploration, this post and the linked talk walk through how passing large data to `Pool.map()` (and friends) gets you into trouble.
Raymond Hettinger also talks about proper use of Python's concurrency here.
My best guess is inter-process communication (IPC) overhead. In the single-process instance, the single process has the word list. When delegating to various other processes, the main process needs to constantly shuttle sections of the list to other processes.
Thus, it follows that a better approach might be to spin off n processes, each of which is responsible for loading/generating 1/n segment of the list and checking if the word is in that part of the list.
I'm not sure how to do that with Python's multiprocessing library, though.
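One way this could look (a sketch, not a drop-in solution: `load_segment` is a hypothetical stand-in for however each worker would load or generate its own 1/n slice of the word list):

```python
import multiprocessing as mp

def load_segment(seg_idx, n_segments, total=100_000):
    # hypothetical: each worker builds its own 1/n slice of the
    # word list instead of receiving it from the parent process
    start = seg_idx * total // n_segments
    stop = (seg_idx + 1) * total // n_segments
    return {f"word{i}" for i in range(start, stop)}

def check_segment(args):
    seg_idx, n_segments, word = args
    # only a small tuple crosses the process boundary going in,
    # and only a bool comes back -- no shuttling of list sections
    return word in load_segment(seg_idx, n_segments)

def word_in_list(word, n_workers=4):
    with mp.Pool(n_workers) as pool:
        results = pool.map(check_segment,
                           [(i, n_workers, word) for i in range(n_workers)])
    return any(results)

if __name__ == '__main__':
    print(word_in_list("word123"))   # True
    print(word_in_list("missing"))   # False
```

The key point is that the list data never travels over the IPC channel; only the query word and a boolean do.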
I experienced something similar with the Pool on a different problem. I'm not sure of the actual cause at this point...
The answer edited in by OP Karim Bahgat is the same solution that worked for me. After switching to a Process & Queue system, I was able to see speedups in line with the number of cores for a machine.
Here's an example.
```python
import multiprocessing as mp

def do_something(data):
    return data * 2

def consumer(inQ, outQ):
    while True:
        try:
            # get a new message
            val = inQ.get()
            # this is the 'TERM' signal
            if val is None:
                break
            # unpack the message; it's helpful to pass the
            # position in the array in/out with the data
            pos, data = val
            # process the data
            ret = do_something(data)
            # send the response / results
            outQ.put((pos, ret))
        except Exception as e:
            print("error!", e)
            break

def process_data(data_list, inQ, outQ):
    # send pos/data to workers
    for i, dat in enumerate(data_list):
        inQ.put((i, dat))
    # collect results (they may come back out of order,
    # which is why pos travels with the data)
    for _ in range(len(data_list)):
        pos, dat = outQ.get()
        data_list[pos] = dat

def main():
    # initialize things
    n_workers = 4
    inQ = mp.Queue()
    outQ = mp.Queue()
    # instantiate and start the workers
    workers = [mp.Process(target=consumer, args=(inQ, outQ))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    # gather some data
    data_list = [d for d in range(1000)]
    # let's process the data a few times
    for _ in range(4):
        process_data(data_list, inQ, outQ)
    # tell all workers: no more data (one msg for each)
    for _ in range(n_workers):
        inQ.put(None)
    # join on the workers
    for w in workers:
        w.join()
    # print out final results (i * 16)
    for i, dat in enumerate(data_list):
        print(i, dat)

if __name__ == '__main__':
    main()
```