How best to perform multiprocessing within requests with the Python Tornado server?
If you're willing to use `concurrent.futures.ProcessPoolExecutor` instead of `multiprocessing`, this is actually very simple. Tornado's ioloop already supports `concurrent.futures.Future`, so they'll play nicely together out of the box. `concurrent.futures` is included in Python 3.2+, and has been backported to Python 2.x.

Here's an example:
```python
import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen

def f(a, b, c, blah=None):
    print "got %s %s %s and %s" % (a, b, c, blah)
    time.sleep(5)
    return "hey there"

@gen.coroutine
def test_it():
    pool = ProcessPoolExecutor(max_workers=1)
    fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
    print("running it asynchronously")
    ret = yield fut
    print("it returned %s" % ret)
    pool.shutdown()

IOLoop.instance().run_sync(test_it)
```
Output:
```
running it asynchronously
got 1 2 3 and ok
it returned hey there
```
`ProcessPoolExecutor` has a more limited API than `multiprocessing.Pool`, but if you don't need the more advanced features of `multiprocessing.Pool`, it's worth using because the integration is so much simpler.
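And since the question asks about doing this within requests: the same kind of `Future` can be yielded from inside a `@gen.coroutine` request handler. Here's a minimal sketch of that, assuming a made-up `expensive()` worker and a hypothetical `/work` route:

```python
import tornado.web
from concurrent.futures import ProcessPoolExecutor
from tornado import gen
from tornado.ioloop import IOLoop

# Shared pool; size and worker function are made up for illustration.
pool = ProcessPoolExecutor(max_workers=4)

def expensive(n):
    # CPU-bound work; runs in a separate process.
    return sum(i * i for i in xrange(n))

class WorkHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        # pool.submit returns a concurrent.futures.Future, which
        # Tornado coroutines can yield directly; the ioloop stays
        # free to serve other requests while the subprocess works.
        result = yield pool.submit(expensive, 10000000)
        self.write("result: %d\n" % result)

if __name__ == "__main__":
    app = tornado.web.Application([(r"/work", WorkHandler)])
    app.listen(8888)
    IOLoop.instance().start()
```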
`multiprocessing.Pool` can be integrated into the `tornado` I/O loop, but it's a bit messy. A much cleaner integration can be done using `concurrent.futures` (see my other answer for details), but if you're stuck on Python 2.x and can't install the `concurrent.futures` backport, here is how you can do it strictly using `multiprocessing`:
The `multiprocessing.Pool.apply_async` and `multiprocessing.Pool.map_async` methods both have an optional `callback` parameter, which means that both can potentially be plugged into a `tornado.gen.Task`. So in most cases, running code asynchronously in a sub-process is as simple as this:
```python
import multiprocessing
from functools import partial

from tornado import gen
from tornado.gen import Return
from tornado.ioloop import IOLoop

def worker():
    print "async work here"

@gen.coroutine
def async_run(func, *args, **kwargs):
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    raise Return(result)

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    func = partial(async_run, worker)
    IOLoop().run_sync(func)
```
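Since `map_async` takes a `callback` too, the same wrapper pattern applies to it. A minimal sketch, reusing the `pool` and imports from the example above (the `square` worker is made up for illustration); note that the exception caveat discussed next applies here as well:

```python
@gen.coroutine
def async_map(func, iterable):
    # map_async also accepts a callback kwarg, so it can be
    # plugged into a gen.Task exactly like apply_async.
    results = yield gen.Task(pool.map_async, func, iterable)
    raise Return(results)

def square(x):
    return x * x

# Inside a coroutine:
#     results = yield async_map(square, range(10))
```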
As I mentioned, this works well in most cases. But if `worker()` throws an exception, `callback` is never called, which means the `gen.Task` never finishes, and you hang forever. Now, if you know that your work will never throw an exception (because you wrapped the whole thing in a `try`/`except`, for example), you can happily use this approach. However, if you want to let exceptions escape from your worker, the only solution I found was to subclass some multiprocessing components, and make them call `callback` even if the worker sub-process raised an exception:
```python
import multiprocessing
from multiprocessing.pool import ApplyResult, Pool, RUN

class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.
        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
    ...

if __name__ == "__main__":
    pool = TornadoPool(multiprocessing.cpu_count())
    ...
```
With these changes, the exception object will be returned by the `gen.Task`, rather than the `gen.Task` hanging indefinitely. I also updated my `async_run` method to re-raise the exception when it's returned, and made some other changes to provide better tracebacks for exceptions thrown in the worker sub-processes. Here's the full code:
```python
import sys
import time
import traceback
import multiprocessing
from multiprocessing.pool import Pool, ApplyResult, RUN
from functools import wraps

import tornado.web
from tornado.ioloop import IOLoop
from tornado.gen import Return
from tornado import gen

class WrapException(Exception):
    def __init__(self):
        exc_type, exc_value, exc_tb = sys.exc_info()
        self.exception = exc_value
        self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    def __str__(self):
        return '\n%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted)

class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.
        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception
    (or an Exception sub-class), this function will raise the
    Exception, rather than return it.
    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    if isinstance(result, Exception):
        raise result
    raise Return(result)

def handle_exceptions(func):
    """ Raise a WrapException so we get a more meaningful traceback"""
    @wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            raise WrapException()
    return inner

# Test worker functions
@handle_exceptions
def test2(x):
    raise Exception("eeee")

@handle_exceptions
def test(x):
    print x
    time.sleep(2)
    return "done"

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield async_run(test, "inside get")
            self.write("%s\n" % result)
            result = yield async_run(test2, "hi2")
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
        finally:
            self.finish()

app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool(4)
    app.listen(8888)
    IOLoop.instance().start()
```
Here's how it behaves for the client:
```
dan@dan:~$ curl localhost:8888/test
done
Caught an exception: 
Original traceback:
Traceback (most recent call last):
  File "./mutli.py", line 123, in inner
    return func(*args, **kwargs)
  File "./mutli.py", line 131, in test2
    raise Exception("eeee")
Exception: eeee
```
And if I send two simultaneous curl requests, we can see they're handled asynchronously on the server side:
```
dan@dan:~$ ./mutli.py
inside get
inside get
caught exception inside get
caught exception inside get
```
Edit:

Note that this code becomes simpler with Python 3, because it introduces an `error_callback` keyword argument to all asynchronous `multiprocessing.Pool` methods. This makes it much easier to integrate with Tornado:
```python
class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.
        '''
        super().apply_async(func, args, kwds, callback=callback,
                            error_callback=callback)

@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception
    (or an Exception sub-class), this function will raise the
    Exception, rather than return it.
    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    raise Return(result)
```
All we need to do in our overridden `apply_async` is call the parent with the `error_callback` keyword argument, in addition to the `callback` kwarg. No need to override `ApplyResult`.
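To see why this works, here's a minimal sketch, independent of Tornado, of `error_callback` delivering the exception object to the same callback that handles normal results (Python 3 only; the worker functions are made up for illustration):

```python
import multiprocessing

def ok():
    return "fine"

def boom():
    raise ValueError("broken")

def on_done(result):
    # With callback=on_done and error_callback=on_done, this fires
    # whether the worker returned normally or raised an exception.
    print("got: %r" % (result,))

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    pool.apply_async(ok, callback=on_done, error_callback=on_done)
    pool.apply_async(boom, callback=on_done, error_callback=on_done)
    pool.close()
    pool.join()
# Prints something like:
#   got: 'fine'
#   got: ValueError('broken')
```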
We can get even fancier by using a metaclass in our `TornadoPool`, to allow its `*_async` methods to be called directly as if they were coroutines:
```python
import time
from functools import wraps
from multiprocessing.pool import Pool

import tornado.web
from tornado import gen
from tornado.gen import Arguments, Return
from tornado import stack_context
from tornado.ioloop import IOLoop
from tornado.concurrent import Future

def _argument_adapter(callback):
    def wrapper(*args, **kwargs):
        if kwargs or len(args) > 1:
            callback(Arguments(args, kwargs))
        elif args:
            callback(args[0])
        else:
            callback(None)
    return wrapper

def PoolTask(func, *args, **kwargs):
    """ Task function for use with multiprocessing.Pool methods.

    This is very similar to tornado.gen.Task, except it sets the
    error_callback kwarg in addition to the callback kwarg. This
    way exceptions raised in pool worker methods get raised in the
    parent when the Task is yielded from.
    """
    future = Future()

    def handle_exception(typ, value, tb):
        if future.done():
            return False
        future.set_exc_info((typ, value, tb))
        return True

    def set_result(result):
        if future.done():
            return
        if isinstance(result, Exception):
            future.set_exception(result)
        else:
            future.set_result(result)

    with stack_context.ExceptionStackContext(handle_exception):
        cb = _argument_adapter(set_result)
        func(*args, callback=cb, error_callback=cb)
    return future

def coro_runner(func):
    """ Wraps the given func in a PoolTask and returns it. """
    @wraps(func)
    def wrapper(*args, **kwargs):
        return PoolTask(func, *args, **kwargs)
    return wrapper

class MetaPool(type):
    """ Wrap all *_async methods in Pool with coro_runner. """
    def __new__(cls, clsname, bases, dct):
        pdct = bases[0].__dict__
        for attr in pdct:
            if attr.endswith("async") and not attr.startswith('_'):
                setattr(bases[0], attr, coro_runner(pdct[attr]))
        return super().__new__(cls, clsname, bases, dct)

class TornadoPool(Pool, metaclass=MetaPool):
    pass

# Test worker functions
def test2(x):
    print("hi2")
    raise Exception("eeee")

def test(x):
    print(x)
    time.sleep(2)
    return "done"

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield pool.apply_async(test, ("inside get",))
            self.write("%s\n" % result)
            result = yield pool.apply_async(test2, ("hi2",))
            self.write("%s\n" % result)
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
            raise
        finally:
            self.finish()

app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool()
    app.listen(8888)
    IOLoop.instance().start()
```
If your GET requests are taking that long, then Tornado is the wrong framework.

I suggest you use nginx to route the fast GETs to Tornado and the slower ones to a different server. PeterBe has an interesting article where he runs multiple Tornado servers and sets one of them to be 'the slow one' for handling the long-running requests; see worrying-about-io-blocking. I would try this method.
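For concreteness, here's a minimal sketch of the nginx side of that idea; the ports, upstream names, and the /slow/ prefix are all made up for illustration:

```nginx
# Hypothetical nginx config: route long-running endpoints to a
# dedicated Tornado instance so they can't starve the fast ones.
upstream tornado_fast {
    server 127.0.0.1:8000;
}

upstream tornado_slow {
    server 127.0.0.1:8001;
}

server {
    listen 80;

    # Long-running requests go to the "slow" Tornado process.
    location /slow/ {
        proxy_pass http://tornado_slow;
    }

    # Everything else hits the fast process.
    location / {
        proxy_pass http://tornado_fast;
    }
}
```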