Sharing a result queue among several processes

Try using multiprocessing.Manager to manage your queue and also make it accessible to different workers.

    import multiprocessing

    def worker(name, que):
        que.put("%d is done" % name)

    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=3)
        m = multiprocessing.Manager()
        q = m.Queue()
        workers = pool.apply_async(worker, (33, q))
        print(q.get())  # blocks until the worker has put its result
        pool.close()
        pool.join()


multiprocessing.Pool already has a shared result-queue; there is no need to additionally involve a Manager.Queue. Manager.Queue is a queue.Queue (multithreading-queue) under the hood, located on a separate server-process and exposed via proxies. This adds additional overhead compared to Pool's internal queue. Unlike with Pool's native result-handling, the results in the Manager.Queue are also not guaranteed to be ordered.
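
For comparison, here is a minimal sketch relying on Pool's built-in result-handling instead (the worker function is adapted from the question); pool.map() additionally guarantees that results come back in input order:

    from multiprocessing import Pool

    def worker(name):
        # Returning is enough; Pool transfers the return value back
        # to the parent over its internal result queue.
        return "%d is done" % name

    if __name__ == '__main__':
        with Pool(processes=3) as pool:
            # map() blocks until all results have arrived and preserves input order.
            print(pool.map(worker, range(3)))
            # -> ['0 is done', '1 is done', '2 is done']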

The worker processes are not started with .apply_async(); this already happens when you instantiate Pool. What is started when you call pool.apply_async() is a new "job". Pool's worker-processes run the multiprocessing.pool.worker-function under the hood. This function takes care of processing new "tasks" transferred over Pool's internal Pool._inqueue and of sending results back to the parent over the Pool._outqueue. Your specified func will be executed within multiprocessing.pool.worker. func only has to return something and the result will be automatically sent back to the parent.
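
You can observe that the workers already exist before any job is submitted; this minimal sketch peeks at the private Pool._pool attribute (an implementation detail, like the _outqueue demo further below), so it's for demonstration only:

    from multiprocessing import Pool

    if __name__ == '__main__':
        pool = Pool(3)
        # No job submitted yet, but the worker processes are already alive:
        print([p.is_alive() for p in pool._pool])  # DEMO -> [True, True, True]
        pool.close()
        pool.join()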

.apply_async() immediately (asynchronously) returns an AsyncResult object (an alias for ApplyResult). You need to call .get() (which blocks) on that object to receive the actual result. Another option would be to register a callback function, which gets fired as soon as the result becomes ready (a sketch of that option follows the example output below).

    from multiprocessing import Pool

    def busy_foo(i):
        """Dummy function simulating cpu-bound work."""
        for _ in range(int(10e6)):  # do stuff
            pass
        return i

    if __name__ == '__main__':
        with Pool(4) as pool:
            print(pool._outqueue)  # DEMO
            results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
            # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
            print(results[0])  # DEMO
            results = [res.get() for res in results]
            print(f'result: {results}')

Example Output:

    <multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
    <multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
    result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
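
The callback option mentioned above could look like the following minimal sketch, where square and on_ready are made-up names for demonstration:

    from multiprocessing import Pool

    def square(x):
        return x * x

    def on_ready(result):
        # Fired in the parent process as soon as the result is ready.
        print('callback received:', result)

    if __name__ == '__main__':
        with Pool(2) as pool:
            pool.apply_async(square, (7,), callback=on_ready)
            pool.close()
            pool.join()  # wait until the job (and its callback) has completed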

Note: Specifying the timeout-parameter for .get() will not stop the actual processing of the task within the worker; it only unblocks the waiting parent by raising a multiprocessing.TimeoutError.
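
A minimal sketch illustrating this behavior (slow is a made-up helper):

    from multiprocessing import Pool, TimeoutError
    import time

    def slow(i):
        time.sleep(2)  # simulate a long-running task
        return i

    if __name__ == '__main__':
        with Pool(1) as pool:
            res = pool.apply_async(slow, (1,))
            try:
                res.get(timeout=0.5)  # unblocks the parent after 0.5 s ...
            except TimeoutError:
                print('timed out, but the worker keeps processing')
            print(res.get())  # ... while the task itself still completes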