How to use multiprocessing queue in Python?
My main problem is that I really don't know how to implement multiprocessing.queue correctly, you cannot really instantiate the object for each process since they will be separate queues, how do you make sure that all processes relate to a shared queue (or in this case, queues)
This is a simple example of a reader and writer sharing a single queue... The writer sends a bunch of integers to the reader; when the writer runs out of numbers, it sends 'DONE', which lets the reader know to break out of the read loop.
from multiprocessing import Process, Queueimport timeimport sysdef reader_proc(queue): ## Read from the queue; this will be spawned as a separate Process while True: msg = queue.get() # Read from the queue and do nothing if (msg == 'DONE'): breakdef writer(count, queue): ## Write to the queue for ii in range(0, count): queue.put(ii) # Write 'count' numbers into the queue queue.put('DONE')if __name__=='__main__': pqueue = Queue() # writer() writes to pqueue from _this_ process for count in [10**4, 10**5, 10**6]: ### reader_proc() reads from pqueue as a separate process reader_p = Process(target=reader_proc, args=((pqueue),)) reader_p.daemon = True reader_p.start() # Launch reader_proc() as a separate python process _start = time.time() writer(count, pqueue) # Send a lot of stuff to reader() reader_p.join() # Wait for the reader to finish print("Sending {0} numbers to Queue() took {1} seconds".format(count, (time.time() - _start)))
Here's a dead simple usage of multiprocessing.Queue
and multiprocessing.Process
that allows callers to send an "event" plus arguments to a separate process that dispatches the event to a "do_" method on the process. (Python 3.4+)
import multiprocessing as mpimport collectionsMsg = collections.namedtuple('Msg', ['event', 'args'])class BaseProcess(mp.Process): """A process backed by an internal queue for simple one-way message passing. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.queue = mp.Queue() def send(self, event, *args): """Puts the event and args as a `Msg` on the queue """ msg = Msg(event, args) self.queue.put(msg) def dispatch(self, msg): event, args = msg handler = getattr(self, "do_%s" % event, None) if not handler: raise NotImplementedError("Process has no handler for [%s]" % event) handler(*args) def run(self): while True: msg = self.queue.get() self.dispatch(msg)
Usage:
class MyProcess(BaseProcess): def do_helloworld(self, arg1, arg2): print(arg1, arg2)if __name__ == "__main__": process = MyProcess() process.start() process.send('helloworld', 'hello', 'world')
The send
happens in the parent process, the do_*
happens in the child process.
I left out any exception handling that would obviously interrupt the run loop and exit the child process. You can also customize it by overriding run
to control blocking or whatever else.
This is really only useful in situations where you have a single worker process, but I think it's a relevant answer to this question to demonstrate a common scenario with a little more object-orientation.
I had a look at multiple answers across stack overflow and the web while trying to set-up a way of doing multiprocessing using queues for passing around large pandas dataframes. It seemed to me that every answer was re-iterating the same kind of solutions without any consideration of the multitude of edge cases one will definitely come across when setting up calculations like these. The problem is that there is many things at play at the same time. The number of tasks, the number of workers, the duration of each task and possible exceptions during task execution. All of these make synchronization tricky and most answers do not address how you can go about it. So this is my take after fiddling around for a few hours, hopefully this will be generic enough for most people to find it useful.
Some thoughts before any coding examples. Since queue.Empty
or queue.qsize()
or any other similar method is unreliable for flow control, any code of the like
while True: try: task = pending_queue.get_nowait() except queue.Empty: break
is bogus. This will kill the worker even if milliseconds later another task turns up in the queue. The worker will not recover and after a while ALL the workers will disappear as they randomly find the queue momentarily empty. The end result will be that the main multiprocessing function (the one with the join() on the processes) will return without all the tasks having completed. Nice. Good luck debugging through that if you have thousands of tasks and a few are missing.
The other issue is the use of sentinel values. Many people have suggested adding a sentinel value in the queue to flag the end of the queue. But to flag it to whom exactly? If there is N workers, assuming N is the number of cores available give or take, then a single sentinel value will only flag the end of the queue to one worker. All the other workers will sit waiting for more work when there is none left. Typical examples I've seen are
while True: task = pending_queue.get() if task == SOME_SENTINEL_VALUE: break
One worker will get the sentinel value while the rest will wait indefinitely. No post I came across mentioned that you need to submit the sentinel value to the queue AT LEAST as many times as you have workers so that ALL of them get it.
The other issue is the handling of exceptions during task execution. Again these should be caught and managed. Moreover, if you have a completed_tasks
queue you should independently count in a deterministic way how many items are in the queue before you decide that the job is done. Again relying on queue sizes is bound to fail and returns unexpected results.
In the example below, the par_proc()
function will receive a list of tasks including the functions with which these tasks should be executed alongside any named arguments and values.
import multiprocessing as mpimport dill as pickleimport queueimport timeimport psutilSENTINEL = Nonedef do_work(tasks_pending, tasks_completed): # Get the current worker's name worker_name = mp.current_process().name while True: try: task = tasks_pending.get_nowait() except queue.Empty: print(worker_name + ' found an empty queue. Sleeping for a while before checking again...') time.sleep(0.01) else: try: if task == SENTINEL: print(worker_name + ' no more work left to be done. Exiting...') break print(worker_name + ' received some work... ') time_start = time.perf_counter() work_func = pickle.loads(task['func']) result = work_func(**task['task']) tasks_completed.put({work_func.__name__: result}) time_end = time.perf_counter() - time_start print(worker_name + ' done in {} seconds'.format(round(time_end, 5))) except Exception as e: print(worker_name + ' task failed. ' + str(e)) tasks_completed.put({work_func.__name__: None})def par_proc(job_list, num_cpus=None): # Get the number of cores if not num_cpus: num_cpus = psutil.cpu_count(logical=False) print('* Parallel processing') print('* Running on {} cores'.format(num_cpus)) # Set-up the queues for sending and receiving data to/from the workers tasks_pending = mp.Queue() tasks_completed = mp.Queue() # Gather processes and results here processes = [] results = [] # Count tasks num_tasks = 0 # Add the tasks to the queue for job in job_list: for task in job['tasks']: expanded_job = {} num_tasks = num_tasks + 1 expanded_job.update({'func': pickle.dumps(job['func'])}) expanded_job.update({'task': task}) tasks_pending.put(expanded_job) # Use as many workers as there are cores (usually chokes the system so better use less) num_workers = num_cpus # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more # work left to be done. for c in range(num_workers): tasks_pending.put(SENTINEL) print('* Number of tasks: {}'.format(num_tasks)) # Set-up and start the workers for c in range(num_workers): p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed)) p.name = 'worker' + str(c) processes.append(p) p.start() # Gather the results completed_tasks_counter = 0 while completed_tasks_counter < num_tasks: results.append(tasks_completed.get()) completed_tasks_counter = completed_tasks_counter + 1 for p in processes: p.join() return results
And here is a test to run the above code against
def test_parallel_processing(): def heavy_duty1(arg1, arg2, arg3): return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert job1 == 15 assert job2 == 21
plus another one with some exceptions
def test_parallel_processing_exceptions(): def heavy_duty1_raises(arg1, arg2, arg3): raise ValueError('Exception raised') return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert not job1 assert job2 == 21
Hope that is helpful.