What kind of problems (if any) would there be combining asyncio with multiprocessing?


You should be able to safely combine asyncio and multiprocessing without too much trouble, though you shouldn't be using multiprocessing directly. The cardinal sin of asyncio (and any other event-loop based asynchronous framework) is blocking the event loop. If you try to use multiprocessing directly, any time you block to wait for a child process, you're going to block the event loop. Obviously, this is bad.
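To make the cost of blocking concrete, here is a minimal sketch (not from the original answer). The names `heartbeat` and `count_ticks` are made up for illustration, and `time.sleep` stands in for any blocking wait on a child process, such as `pool.apply()` or `Process.join()`. It counts how often a background coroutine gets to run while a 0.3-second blocking call is in flight, first called directly and then handed to an executor, which is the approach described next.

```python
import asyncio
import time


async def heartbeat(ticks):
    """Record a timestamp every 10 ms for as long as the loop lets us run."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(offload):
    """Return how many heartbeat ticks happen during a 0.3 s blocking call."""
    loop = asyncio.get_running_loop()
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # yield once so the heartbeat records its first tick
    start = len(ticks)
    if offload:
        # The call runs in the default executor; the loop keeps serving heartbeat().
        await loop.run_in_executor(None, time.sleep, 0.3)
    else:
        # Stand-in for pool.apply() or Process.join(): nothing else can run.
        time.sleep(0.3)
    seen = len(ticks) - start
    beat.cancel()
    return seen


if __name__ == "__main__":
    print("ticks while blocked:  ", asyncio.run(count_ticks(offload=False)))
    print("ticks while offloaded:", asyncio.run(count_ticks(offload=True)))
```

The blocked run should report no ticks at all, while the offloaded run should report a couple of dozen; the exact count depends on timer resolution and machine load.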

The simplest way to avoid this is to use BaseEventLoop.run_in_executor to execute a function in a concurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor is a process pool implemented using multiprocessing.Process, but asyncio has built-in support for executing a function in it without blocking the event loop. Here's a simple example:

    import asyncio
    import time
    from concurrent.futures import ProcessPoolExecutor

    def blocking_func(x):
        time.sleep(x)  # Pretend this is an expensive calculation
        return x * 5

    async def main():
        loop = asyncio.get_running_loop()
        # pool = multiprocessing.Pool()
        # out = pool.apply(blocking_func, args=(10,))  # This blocks the event loop.
        executor = ProcessPoolExecutor()
        out = await loop.run_in_executor(executor, blocking_func, 10)  # This does not
        print(out)

    if __name__ == "__main__":
        asyncio.run(main())
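The same call also works for fanning out several jobs at once. The following is a sketch under my own assumptions rather than part of the original answer: `slow_square` and `square_all` are hypothetical names, and the 0.2-second sleep stands in for real CPU-bound work.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def slow_square(x):
    time.sleep(0.2)  # pretend this is an expensive calculation
    return x * x


async def square_all(values, workers=4):
    loop = asyncio.get_running_loop()
    # Leaving the with-block calls executor.shutdown(), which blocks briefly,
    # but by then every job has already finished.
    with ProcessPoolExecutor(max_workers=workers) as executor:
        jobs = [loop.run_in_executor(executor, slow_square, v) for v in values]
        # gather() returns results in input order, not completion order.
        return await asyncio.gather(*jobs)


if __name__ == "__main__":
    print(asyncio.run(square_all([1, 2, 3, 4])))  # [1, 4, 9, 16]
```

Keep the `if __name__ == "__main__":` guard: with the spawn or forkserver start methods the child processes re-import the main module, and unguarded top-level code would try to start the pool again.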

For the majority of cases, this function alone is good enough. If you find yourself needing other constructs from multiprocessing, like Queue, Event, Manager, etc., there is a third-party library called aioprocessing (full disclosure: I wrote it) that provides asyncio-compatible versions of all the multiprocessing data structures. Here's an example demoing that:

    import asyncio
    import time

    import aioprocessing

    def func(queue, event, lock, items):
        """Runs in the child process and uses the ordinary blocking API."""
        with lock:
            event.set()
            for item in items:
                time.sleep(3)
                queue.put(item + 5)
        queue.close()

    async def example(queue, event, lock):
        items = [1, 2, 3, 4, 5]
        p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, items))
        p.start()
        while True:
            result = await queue.coro_get()
            if result is None:
                break
            print("Got result {}".format(result))
        await p.coro_join()

    async def example2(queue, event, lock):
        await event.coro_wait()
        async with lock:
            await queue.coro_put(78)
            await queue.coro_put(None)  # Sentinel: tells example() to stop reading

    async def main(queue, event, lock):
        await asyncio.gather(
            example(queue, event, lock),
            example2(queue, event, lock),
        )

    if __name__ == "__main__":
        queue = aioprocessing.AioQueue()
        lock = aioprocessing.AioLock()
        event = aioprocessing.AioEvent()
        asyncio.run(main(queue, event, lock))


Yes, there are quite a few bits that may (or may not) bite you.

  • When you run something like asyncio it expects to run on one thread or process. This does not (by itself) work with parallel processing. You somehow have to distribute the work while leaving the IO operations (specifically those on sockets) in a single thread/process.
  • While your idea to hand off individual connections to a different handler process is nice, it is hard to implement. The first obstacle is that you need a way to pull the connection out of asyncio without closing it. The next obstacle is that passing a file descriptor to a different process is platform-specific: historically it needed code from a C extension (probably Linux-only), and even the newer standard-library helpers such as socket.send_fds() (Python 3.9+) work only on Unix.
  • Note that the multiprocessing module is known to create a number of threads for communication. Most of the time when you use communication structures (such as Queues), a thread is spawned. Unfortunately those threads are not completely invisible. For instance, they can fail to tear down cleanly when you intend to terminate your program, and if there are many of them their resource usage may be noticeable on its own.
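The last point is easy to observe. As a rough sketch (the helper names `thread_names` and `feeder_thread_demo` are mine, and the behaviour described is that of CPython's multiprocessing.Queue as far as I know), the first put() on a queue starts a background feeder thread, which goes away only once the queue is closed and joined:

```python
import multiprocessing
import threading


def thread_names():
    """Names of all threads currently alive in this process."""
    return sorted(t.name for t in threading.enumerate())


def feeder_thread_demo():
    before = thread_names()
    queue = multiprocessing.Queue()
    queue.put("hello")   # the first put() starts a background feeder thread
    during = thread_names()
    queue.get()          # drain the queue so nothing is left to flush
    queue.close()        # tell the feeder thread to finish
    queue.join_thread()  # wait until it has actually exited
    after = thread_names()
    return before, during, after


if __name__ == "__main__":
    before, during, after = feeder_thread_demo()
    print("before:", before)
    print("during:", during)
    print("after: ", after)
```

If you skip close() and join_thread(), the extra thread stays around until interpreter shutdown, which is where the untidy teardown tends to show up.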

If you really intend to handle individual connections in individual processes, I suggest examining different approaches. For instance, you can put a socket into listen mode and then accept connections from multiple worker processes in parallel. Once a worker has finished processing a request, it can go on to accept the next connection, so you still use fewer resources than forking a process for each connection. SpamAssassin and Apache (mpm prefork) can use this worker model, for instance. It might end up easier and more robust, depending on your use case. Specifically, you can make your workers die after serving a configured number of requests and have a master process respawn them, thereby eliminating much of the negative effect of memory leaks.
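A minimal, Unix-only sketch of that pre-fork model follows. It is my illustration, not how SpamAssassin or Apache implement it: all function names are hypothetical, each worker simply replies with its own PID, and the request limit and respawning master mentioned above are left out to keep it short.

```python
import os
import signal
import socket


def serve_forever(listener):
    """Worker loop: accept on the shared socket and reply with our PID."""
    while True:
        conn, _ = listener.accept()
        with conn:
            conn.sendall(str(os.getpid()).encode())


def start_workers(listener, count):
    """Fork `count` workers that all inherit the same listening socket."""
    pids = []
    for _ in range(count):
        pid = os.fork()
        if pid == 0:  # child process
            try:
                serve_forever(listener)
            finally:
                os._exit(0)  # never fall back into the parent's code
        pids.append(pid)
    return pids


def stop_workers(pids):
    """Terminate the workers and reap them so no zombies are left behind."""
    for pid in pids:
        os.kill(pid, signal.SIGTERM)
        os.waitpid(pid, 0)


def ask(port):
    """Connect once and return the PID of whichever worker accepted."""
    with socket.create_connection(("127.0.0.1", port)) as client:
        data = b"".join(iter(lambda: client.recv(64), b""))  # read until EOF
    return int(data.decode())


def demo(workers=3, requests=6):
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen()
    port = listener.getsockname()[1]
    pids = start_workers(listener, workers)
    try:
        answers = [ask(port) for _ in range(requests)]
    finally:
        stop_workers(pids)
        listener.close()
    return pids, answers


if __name__ == "__main__":
    pids, answers = demo()
    print("workers: ", pids)
    print("answered:", answers)
```

The kernel decides which worker's accept() wins each connection, so the distribution of answers across PIDs is not predictable.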


See PEP 3156, in particular the section on Thread interaction:

http://www.python.org/dev/peps/pep-3156/#thread-interaction

This section clearly documents the new asyncio methods you might use, including run_in_executor(). Note that Executor is defined in concurrent.futures; I suggest you have a look there as well.
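Besides run_in_executor(), that section also describes call_soon_threadsafe(), which is the safe way for a thread you started yourself to hand a result back to the loop. A small sketch, assuming a hypothetical `wait_for_thread_result` coroutine and a trivial sum as the stand-in for blocking work:

```python
import asyncio
import threading


async def wait_for_thread_result():
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    def worker():
        result = sum(range(1000))  # stand-in for blocking work
        # asyncio futures are not thread-safe, so ask the loop to set the result.
        loop.call_soon_threadsafe(done.set_result, result)

    threading.Thread(target=worker).start()
    return await done  # the loop stays free while the thread works


if __name__ == "__main__":
    print(asyncio.run(wait_for_thread_result()))  # 499500
```

In most code run_in_executor() is the more convenient choice, since it wraps this hand-off for you.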