
How to use multiprocessing queue in Python?


My main problem is that I really don't know how to implement multiprocessing.Queue correctly. You cannot really instantiate the object in each process, since that would give each process a separate queue, so how do you make sure that all processes work with a shared queue (or in this case, queues)?

This is a simple example of a reader and writer sharing a single queue... The writer sends a bunch of integers to the reader; when the writer runs out of numbers, it sends 'DONE', which lets the reader know to break out of the read loop.

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if msg == 'DONE':
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__ == '__main__':
    pqueue = Queue()  # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=(pqueue,))
        reader_p.daemon = True
        reader_p.start()          # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)     # Send a lot of stuff to reader()
        reader_p.join()           # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count,
              (time.time() - _start)))
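If you need several readers draining the same queue, the same idea extends: pass the one Queue object to every Process and put one 'DONE' marker per reader so each of them gets its own stop signal. A rough sketch of that (the reader count and names here are just placeholders of mine, not part of the answer above):

from multiprocessing import Process, Queue

def reader_proc(queue):
    # Every reader blocks on the same shared queue until it sees its 'DONE' marker
    while True:
        msg = queue.get()
        if msg == 'DONE':
            break

if __name__ == '__main__':
    shared_q = Queue()              # one queue object, passed to all readers
    num_readers = 4                 # arbitrary number of reader processes
    readers = [Process(target=reader_proc, args=(shared_q,)) for _ in range(num_readers)]
    for r in readers:
        r.start()
    for ii in range(1000):
        shared_q.put(ii)            # each item is consumed by whichever reader gets there first
    for _ in range(num_readers):
        shared_q.put('DONE')        # one sentinel per reader so ALL of them exit
    for r in readers:
        r.join()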


Here's a dead simple usage of multiprocessing.Queue and multiprocessing.Process that allows callers to send an "event" plus arguments to a separate process that dispatches the event to a "do_" method on the process. (Python 3.4+)

import multiprocessing as mp
import collections

Msg = collections.namedtuple('Msg', ['event', 'args'])

class BaseProcess(mp.Process):
    """A process backed by an internal queue for simple one-way message passing.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        """Puts the event and args as a `Msg` on the queue
        """
        msg = Msg(event, args)
        self.queue.put(msg)

    def dispatch(self, msg):
        event, args = msg
        handler = getattr(self, "do_%s" % event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % event)
        handler(*args)

    def run(self):
        while True:
            msg = self.queue.get()
            self.dispatch(msg)

Usage:

class MyProcess(BaseProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)

if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')

The send happens in the parent process, the do_* happens in the child process.

I left out exception handling; an uncaught exception will obviously interrupt the run loop and exit the child process. You can also customize it by overriding run to control blocking or whatever else.
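For instance, here is a sketch of one such customization (my own addition, not part of the code above): an overridden run that treats a 'stop' event as a shutdown signal, so the child can exit cleanly and be joined.

class StoppableProcess(BaseProcess):
    """Hypothetical subclass: a 'stop' event ends the run loop cleanly."""

    def run(self):
        while True:
            msg = self.queue.get()
            if msg.event == 'stop':      # treat 'stop' as a shutdown sentinel
                break
            self.dispatch(msg)

class MyProcess(StoppableProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)

if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')
    process.send('stop')                 # lets the child exit so join() returns
    process.join()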

This is really only useful in situations where you have a single worker process, but I think it's a relevant answer to this question to demonstrate a common scenario with a little more object-orientation.


I looked at multiple answers across Stack Overflow and the web while trying to set up a way of doing multiprocessing using queues for passing around large pandas DataFrames. It seemed to me that every answer was reiterating the same kind of solution without any consideration of the multitude of edge cases you will definitely come across when setting up calculations like these. The problem is that there are many things at play at the same time: the number of tasks, the number of workers, the duration of each task and possible exceptions during task execution. All of these make synchronization tricky, and most answers do not address how to deal with them. So this is my take after fiddling around for a few hours; hopefully it will be generic enough for most people to find it useful.

Some thoughts before any code examples. Since catching queue.Empty, calling queue.qsize(), or any other similar check is unreliable for flow control, any code like

while True:
    try:
        task = pending_queue.get_nowait()
    except queue.Empty:
        break

is bogus. This will kill the worker even if, milliseconds later, another task turns up in the queue. The worker will not recover, and after a while ALL the workers will disappear as they randomly find the queue momentarily empty. The end result will be that the main multiprocessing function (the one with the join() on the processes) returns without all the tasks having completed. Nice. Good luck debugging that if you have thousands of tasks and a few are missing.
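A more robust shape for that loop, sketched below, treats an empty queue as "nothing yet, check again shortly" rather than as a reason to exit; this is essentially what the full example further down does. The function name and the sleep interval are just placeholders of mine:

import queue
import time

def worker_loop(pending_queue, sentinel):
    while True:
        try:
            task = pending_queue.get_nowait()
        except queue.Empty:
            time.sleep(0.01)      # back off briefly instead of killing the worker
            continue
        if task == sentinel:
            break                 # only a real sentinel ends the loop
        # ... process the task ...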

The other issue is the use of sentinel values. Many people have suggested adding a sentinel value to the queue to flag the end of the queue. But to flag it to whom exactly? If there are N workers, N typically being the number of cores available give or take, then a single sentinel value will only flag the end of the queue to one worker. All the other workers will sit waiting for more work when there is none left. Typical examples I've seen are

while True:
    task = pending_queue.get()
    if task == SOME_SENTINEL_VALUE:
        break

One worker will get the sentinel value while the rest will wait indefinitely. No post I came across mentioned that you need to submit the sentinel value to the queue AT LEAST as many times as you have workers so that ALL of them get it.
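In other words, when there is no more work to submit, you put the sentinel once per worker, roughly like this (shutdown_workers is a name of my own, not something from the code below):

import multiprocessing as mp

SENTINEL = None   # any value that a real task can never equal

def shutdown_workers(pending_queue, num_workers):
    """Put one sentinel per worker so that every single worker sees the end-of-work marker."""
    for _ in range(num_workers):
        pending_queue.put(SENTINEL)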

The other issue is the handling of exceptions during task execution. Again, these should be caught and managed. Moreover, if you have a completed_tasks queue, you should count, in a deterministic way, how many items you expect back before you decide that the job is done. Once again, relying on queue sizes is bound to fail and return unexpected results.

In the example below, the par_proc() function receives a list of jobs, each containing the function with which its tasks should be executed alongside any named arguments and values.

import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil

SENTINEL = None


def do_work(tasks_pending, tasks_completed):
    # Get the current worker's name
    worker_name = mp.current_process().name

    while True:
        try:
            task = tasks_pending.get_nowait()
        except queue.Empty:
            print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
            time.sleep(0.01)
        else:
            try:
                if task == SENTINEL:
                    print(worker_name + ' no more work left to be done. Exiting...')
                    break

                print(worker_name + ' received some work... ')
                time_start = time.perf_counter()
                work_func = pickle.loads(task['func'])
                result = work_func(**task['task'])
                tasks_completed.put({work_func.__name__: result})
                time_end = time.perf_counter() - time_start
                print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))

            except Exception as e:
                print(worker_name + ' task failed. ' + str(e))
                tasks_completed.put({work_func.__name__: None})


def par_proc(job_list, num_cpus=None):

    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set-up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Use as many workers as there are cores (usually chokes the system so better use less)
    num_workers = num_cpus

    # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
    # work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set-up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1

    for p in processes:
        p.join()

    return results

And here is a test to run the above code against

def test_parallel_processing():
    def heavy_duty1(arg1, arg2, arg3):
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert job1 == 15
    assert job2 == 21

plus another one with some exceptions

def test_parallel_processing_exceptions():
    def heavy_duty1_raises(arg1, arg2, arg3):
        raise ValueError('Exception raised')
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert not job1
    assert job2 == 21

Hope that is helpful.