Can I somehow share an asynchronous queue with a subprocess?
Here is an implementation of a multiprocessing.Queue object that can be used with asyncio. It provides the blocking queue methods that a manager queue proxy supports (qsize, empty, full, put, put_nowait, get, get_nowait), plus coro_get and coro_put methods, which are coroutines that asynchronously get items from and put items into the queue. The implementation details are essentially the same as the second example of my other answer: a ThreadPoolExecutor makes get/put asynchronous, and a multiprocessing.managers.SyncManager.Queue shares the queue between processes. The only additional trick is implementing __getstate__ so the object stays picklable even though it holds a non-picklable ThreadPoolExecutor as an instance variable.
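The __getstate__ trick can be tried on its own, without any processes. Below is a minimal sketch in which Holder is a hypothetical class standing in for the queue wrapper: it creates its ThreadPoolExecutor lazily and leaves it out of the pickled state. It copies __dict__ before clearing the executor, so the original object keeps its thread pool and only the pickled copy starts without one.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor


class Holder:
    """Hypothetical class that owns a lazily created, unpicklable executor."""

    def __init__(self, value):
        self.value = value
        self._real_executor = None

    @property
    def _executor(self):
        # Build the thread pool on first use, in whichever process we're in
        if self._real_executor is None:
            self._real_executor = ThreadPoolExecutor(max_workers=1)
        return self._real_executor

    def __getstate__(self):
        # Copy so pickling doesn't clear the live object's executor;
        # the executor itself (locks, threads) can't be pickled
        state = self.__dict__.copy()
        state['_real_executor'] = None
        return state


h = Holder(42)
h._executor.submit(lambda: None).result()  # force the executor into existence
clone = pickle.loads(pickle.dumps(h))
print(clone.value, clone._real_executor)  # 42 None
h._executor.shutdown()
```

Without __getstate__, pickle.dumps(h) would fail once the executor exists, because a ThreadPoolExecutor holds thread locks.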
    import asyncio
    from multiprocessing import Manager, cpu_count
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


    def AsyncProcessQueue(maxsize=0):
        # The manager's server process owns the real queue; we get back a
        # picklable proxy that can be handed to other processes
        m = Manager()
        q = m.Queue(maxsize=maxsize)
        return _ProcQueue(q)


    class _ProcQueue(object):
        def __init__(self, q):
            self._queue = q
            self._real_executor = None
            self._cancelled_join = False

        @property
        def _executor(self):
            # Created lazily, so each process builds its own thread pool
            if not self._real_executor:
                self._real_executor = ThreadPoolExecutor(max_workers=cpu_count())
            return self._real_executor

        def __getstate__(self):
            # Copy the dict so pickling doesn't drop the executor from the
            # live object; the unpicklable executor is never sent
            self_dict = self.__dict__.copy()
            self_dict['_real_executor'] = None
            return self_dict

        def __getattr__(self, name):
            # Forward the blocking queue API to the manager proxy
            if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                        'get', 'get_nowait']:
                return getattr(self._queue, name)
            raise AttributeError("'%s' object has no attribute '%s'" %
                                 (self.__class__.__name__, name))

        async def coro_put(self, item):
            # Run the blocking put() in a worker thread
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(self._executor, self.put, item)

        async def coro_get(self):
            # Run the blocking get() in a worker thread
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(self._executor, self.get)

        def cancel_join_thread(self):
            # Manager queue proxies have no cancel_join_thread(); just
            # remember not to shut down the helper threads
            self._cancelled_join = True

        def join_thread(self):
            # Manager queue proxies have no join_thread(); shut down the
            # helper threads instead
            if self._real_executor and not self._cancelled_join:
                self._real_executor.shutdown()


    async def _do_coro_proc_work(q, stuff, stuff2):
        ok = stuff + stuff2
        print("Passing %s to parent" % ok)
        await q.coro_put(ok)  # Non-blocking
        item = q.get()  # Can be used with the normal blocking API, too
        print("got %s back from parent" % item)


    def do_coro_proc_work(q, stuff, stuff2):
        # Runs in the worker process, which needs its own event loop
        asyncio.run(_do_coro_proc_work(q, stuff, stuff2))


    async def do_work(q):
        loop = asyncio.get_running_loop()
        with ProcessPoolExecutor(max_workers=1) as pool:
            fut = loop.run_in_executor(pool, do_coro_proc_work, q, 1, 2)
            item = await q.coro_get()
            print("Got %s from worker" % item)
            item = item + 25
            q.put(item)
            await fut  # Wait for the worker to read the reply and finish


    if __name__ == "__main__":
        q = AsyncProcessQueue()
        asyncio.run(do_work(q))
Output:
    Passing 3 to parent
    Got 3 from worker
    got 28 back from parent
As you can see, you can use the AsyncProcessQueue both synchronously and asynchronously, from either the parent or the child process. It doesn't require any global state, and because it encapsulates most of the complexity in a class, it is more elegant to use than my original answer.
You'll probably get better performance using sockets directly, but getting that working in a cross-platform way seems to be pretty tricky. This approach also has the advantage of working with multiple workers and not requiring you to pickle/unpickle anything yourself.
The multiprocessing library isn't particularly well-suited for use with asyncio, unfortunately. Depending on how you were planning to use multiprocessing/multiprocessing.Queue, however, you may be able to replace it completely with a concurrent.futures.ProcessPoolExecutor:
    import asyncio
    from concurrent.futures import ProcessPoolExecutor


    def do_proc_work(stuff, stuff2):
        # This runs in a separate process
        return stuff + stuff2


    async def do_work():
        loop = asyncio.get_running_loop()
        with ProcessPoolExecutor(max_workers=1) as pool:
            out = await loop.run_in_executor(pool, do_proc_work, 1, 2)
        print(out)


    if __name__ == "__main__":
        asyncio.run(do_work())
Output:
    3
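When each piece of work only has to hand a result back to the parent, the executor's return values can stand in for the queue entirely. A minimal sketch, assuming several independent (stuff, stuff2) pairs: run_all is a hypothetical helper that submits every pair to one ProcessPoolExecutor and collects the results with asyncio.gather, which returns them in submission order rather than completion order.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def do_proc_work(stuff, stuff2):
    # Runs in a worker process; the return value is sent back to the parent
    return stuff + stuff2


async def run_all(pairs):
    # Hypothetical helper: one executor call per pair, awaited together
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = [loop.run_in_executor(pool, do_proc_work, a, b)
                   for a, b in pairs]
        # gather keeps the results in the order the calls were submitted
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(run_all([(1, 2), (3, 4), (5, 6)])))  # [3, 7, 11]
```

The __main__ guard matters here: with the spawn start method (the default on Windows and macOS), worker processes re-import the module, and without the guard they would try to start their own executors.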
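If you absolutely need a multiprocessing.Queue, it seems to behave fine when combined with a ProcessPoolExecutor, as long as the queue comes from a multiprocessing.Manager (a plain multiprocessing.Queue can't be passed to an executor's worker):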
    import asyncio
    import time
    import multiprocessing
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


    def do_proc_work(q, stuff, stuff2):
        ok = stuff + stuff2
        time.sleep(5)  # Artificial delay to show that it's running asynchronously
        print("putting output in queue")
        q.put(ok)


    async def async_get(q):
        """ Calls q.get() in a separate Thread.

        q.get is an I/O call, so it should release the GIL.
        Ideally there would be a real non-blocking I/O-based
        Queue.get call that could be used as a coroutine instead
        of this, but I don't think one exists.
        """
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=1) as pool:
            return await loop.run_in_executor(pool, q.get)


    async def do_work(q):
        loop = asyncio.get_running_loop()
        with ProcessPoolExecutor(max_workers=1) as pool:
            fut = loop.run_in_executor(pool, do_proc_work, q, 1, 2)
            # You could await async_get(q) directly; it's split up here
            # just to show that it's asynchronous
            coro = async_get(q)
            print("Getting queue result asynchronously")
            print(await coro)
            await fut  # Make sure the worker has finished


    if __name__ == "__main__":
        m = multiprocessing.Manager()
        # A Manager queue proxy can be pickled and passed to the worker;
        # a plain multiprocessing.Queue cannot
        q = m.Queue()
        asyncio.run(do_work(q))
Output:
    Getting queue result asynchronously
    putting output in queue
    3
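The example builds its queue with m.Queue() rather than multiprocessing.Queue() because only the manager proxy can be pickled, and pickling is how a ProcessPoolExecutor ships arguments to its workers. A plain multiprocessing.Queue refuses to be pickled outside of process creation and raises RuntimeError. A minimal sketch that checks this directly; can_pickle is a hypothetical helper written for this illustration:

```python
import multiprocessing
import pickle


def can_pickle(obj):
    """Hypothetical helper: True if obj pickles, False on RuntimeError."""
    try:
        pickle.dumps(obj)
        return True
    except RuntimeError:
        # multiprocessing.Queue raises this unless it is being inherited
        # by a child process at creation time
        return False


if __name__ == "__main__":
    plain_q = multiprocessing.Queue()
    with multiprocessing.Manager() as m:
        proxy_q = m.Queue()
        print("multiprocessing.Queue picklable:", can_pickle(plain_q))
        print("Manager().Queue() picklable:", can_pickle(proxy_q))
```

The first line should report False and the second True; the proxy pickles to a reference that the worker uses to reconnect to the manager's server process.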
aiopipe (https://pypi.org/project/aiopipe/) looks like it hits the nail on the head here.
At least, it helped me.