How to best perform Multiprocessing within requests with the python Tornado server? How to best perform Multiprocessing within requests with the python Tornado server? python python

How to best perform Multiprocessing within requests with the python Tornado server?


If you're willing to use concurrent.futures.ProcessPoolExecutor instead of multiprocessing, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future, so they'll play nicely together out of the box. concurrent.futures is included in Python 3.2+, and has been backported to Python 2.x.

Here's an example:

import timefrom concurrent.futures import ProcessPoolExecutorfrom tornado.ioloop import IOLoopfrom tornado import gendef f(a, b, c, blah=None):    print "got %s %s %s and %s" % (a, b, c, blah)    time.sleep(5)    return "hey there"@gen.coroutinedef test_it():    pool = ProcessPoolExecutor(max_workers=1)    fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future    print("running it asynchronously")    ret = yield fut    print("it returned %s" % ret)    pool.shutdown()IOLoop.instance().run_sync(test_it)

Output:

running it asynchronouslygot 1 2 3 and okit returned hey there

ProcessPoolExecutor has a more limited API than multiprocessing.Pool, but if you don't need the more advanced features of multiprocessing.Pool, it's worth using because the integration is so much simpler.


multiprocessing.Pool can be integrated into the tornado I/O loop, but it's a bit messy. A much cleaner integration can be done using concurrent.futures (see my other answer for details), but if you're stuck on Python 2.x and can't install the concurrent.futures backport, here is how you can do it strictly using multiprocessing:

The multiprocessing.Pool.apply_async and multiprocessing.Pool.map_async methods both have an optional callback parameter, which means that both can potentially be plugged into a tornado.gen.Task. So in most cases, running code asynchronously in a sub-process is as simple as this:

import multiprocessingimport contextlibfrom tornado import genfrom tornado.gen import Returnfrom tornado.ioloop import IOLoopfrom functools import partialdef worker():    print "async work here"@gen.coroutinedef async_run(func, *args, **kwargs):    result = yield gen.Task(pool.apply_async, func, args, kwargs)    raise Return(result)if __name__ == "__main__":    pool = multiprocessing.Pool(multiprocessing.cpu_count())    func = partial(async_run, worker)    IOLoop().run_sync(func)

As I mentioned, this works well in most cases. But if worker() throws an exception, callback is never called, which means the gen.Task never finishes, and you hang forever. Now, if you know that your work will never throw an exception (because you wrapped the whole thing in a try/except, for example), you can happily use this approach. However, if you want to let exceptions escape from your worker, the only solution I found was to subclass some multiprocessing components, and make them call callback even if the worker sub-process raised an exception:

from multiprocessing.pool import ApplyResult, Pool, RUNimport multiprocessingclass TornadoApplyResult(ApplyResult):    def _set(self, i, obj):        self._success, self._value = obj         if self._callback:            self._callback(self._value)        self._cond.acquire()        try:            self._ready = True            self._cond.notify()        finally:            self._cond.release()        del self._cache[self._job]class TornadoPool(Pool):    def apply_async(self, func, args=(), kwds={}, callback=None):        ''' Asynchronous equivalent of `apply()` builtin        This version will call `callback` even if an exception is        raised by `func`.        '''        assert self._state == RUN        result = TornadoApplyResult(self._cache, callback)        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))        return result ... if __name__ == "__main__":     pool = TornadoPool(multiprocessing.cpu_count())     ...

With these changes, the exception object will be returned by the gen.Task, rather than the gen.Task hanging indefinitely. I also updated my async_run method to re-raise the exception when its returned, and made some other changes to provide better tracebacks for exceptions thrown in the worker sub-processes. Here's the full code:

import multiprocessingfrom multiprocessing.pool import Pool, ApplyResult, RUNfrom functools import wrapsimport tornado.webfrom tornado.ioloop import IOLoopfrom tornado.gen import Returnfrom tornado import genclass WrapException(Exception):    def __init__(self):        exc_type, exc_value, exc_tb = sys.exc_info()        self.exception = exc_value        self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))    def __str__(self):        return '\n%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted)class TornadoApplyResult(ApplyResult):    def _set(self, i, obj):        self._success, self._value = obj         if self._callback:            self._callback(self._value)        self._cond.acquire()        try:            self._ready = True            self._cond.notify()        finally:            self._cond.release()        del self._cache[self._job]   class TornadoPool(Pool):    def apply_async(self, func, args=(), kwds={}, callback=None):        ''' Asynchronous equivalent of `apply()` builtin        This version will call `callback` even if an exception is        raised by `func`.        '''        assert self._state == RUN        result = TornadoApplyResult(self._cache, callback)        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))        return result@gen.coroutinedef async_run(func, *args, **kwargs):    """ Runs the given function in a subprocess.    This wraps the given function in a gen.Task and runs it    in a multiprocessing.Pool. It is meant to be used as a    Tornado co-routine. Note that if func returns an Exception     (or an Exception sub-class), this function will raise the     Exception, rather than return it.    """    result = yield gen.Task(pool.apply_async, func, args, kwargs)    if isinstance(result, Exception):        raise result    raise Return(result)def handle_exceptions(func):    """ Raise a WrapException so we get a more meaningful traceback"""    @wraps(func)    def inner(*args, **kwargs):        try:            return func(*args, **kwargs)        except Exception:            raise WrapException()    return inner# Test worker functions@handle_exceptionsdef test2(x):    raise Exception("eeee")@handle_exceptionsdef test(x):    print x    time.sleep(2)    return "done"class TestHandler(tornado.web.RequestHandler):    @gen.coroutine    def get(self):        try:            result = yield async_run(test, "inside get")            self.write("%s\n" % result)            result = yield async_run(test2, "hi2")        except Exception as e:            print("caught exception in get")            self.write("Caught an exception: %s" % e)        finally:            self.finish()app = tornado.web.Application([    (r"/test", TestHandler),])if __name__ == "__main__":    pool = TornadoPool(4)    app.listen(8888)    IOLoop.instance().start()

Here's how it behaves for the client:

dan@dan:~$ curl localhost:8888/testdoneCaught an exception: Original traceback:Traceback (most recent call last):  File "./mutli.py", line 123, in inner    return func(*args, **kwargs)  File "./mutli.py", line 131, in test2    raise Exception("eeee")Exception: eeee

And if I send two simultaneous curl requests, we can see they're handled asynchronously on the server-side:

dan@dan:~$ ./mutli.py inside getinside getcaught exception inside getcaught exception inside get

Edit:

Note that this code becomes simpler with Python 3, because it introduces an error_callback keyword argument to all asynchronous multiprocessing.Pool methods. This makes it much easier to integrate with Tornado:

class TornadoPool(Pool):    def apply_async(self, func, args=(), kwds={}, callback=None):        ''' Asynchronous equivalent of `apply()` builtin        This version will call `callback` even if an exception is        raised by `func`.        '''        super().apply_async(func, args, kwds, callback=callback,                            error_callback=callback)@gen.coroutinedef async_run(func, *args, **kwargs):    """ Runs the given function in a subprocess.    This wraps the given function in a gen.Task and runs it    in a multiprocessing.Pool. It is meant to be used as a    Tornado co-routine. Note that if func returns an Exception    (or an Exception sub-class), this function will raise the    Exception, rather than return it.    """    result = yield gen.Task(pool.apply_async, func, args, kwargs)    raise Return(result)

All we need to do in our overridden apply_async is call the parent with the error_callback keyword argument, in addition to the callback kwarg. No need to override ApplyResult.

We can get even fancier by using a MetaClass in our TornadoPool, to allow its *_async methods to be called directly as if they were coroutines:

import timefrom functools import wrapsfrom multiprocessing.pool import Poolimport tornado.webfrom tornado import genfrom tornado.gen import Returnfrom tornado import stack_contextfrom tornado.ioloop import IOLoopfrom tornado.concurrent import Futuredef _argument_adapter(callback):    def wrapper(*args, **kwargs):        if kwargs or len(args) > 1:            callback(Arguments(args, kwargs))        elif args:            callback(args[0])        else:            callback(None)    return wrapperdef PoolTask(func, *args, **kwargs):    """ Task function for use with multiprocessing.Pool methods.    This is very similar to tornado.gen.Task, except it sets the    error_callback kwarg in addition to the callback kwarg. This    way exceptions raised in pool worker methods get raised in the    parent when the Task is yielded from.    """    future = Future()    def handle_exception(typ, value, tb):        if future.done():            return False        future.set_exc_info((typ, value, tb))        return True    def set_result(result):        if future.done():            return        if isinstance(result, Exception):            future.set_exception(result)        else:            future.set_result(result)    with stack_context.ExceptionStackContext(handle_exception):        cb = _argument_adapter(set_result)        func(*args, callback=cb, error_callback=cb)    return futuredef coro_runner(func):    """ Wraps the given func in a PoolTask and returns it. """    @wraps(func)    def wrapper(*args, **kwargs):        return PoolTask(func, *args, **kwargs)    return wrapperclass MetaPool(type):    """ Wrap all *_async methods in Pool with coro_runner. """    def __new__(cls, clsname, bases, dct):        pdct = bases[0].__dict__        for attr in pdct:            if attr.endswith("async") and not attr.startswith('_'):                setattr(bases[0], attr, coro_runner(pdct[attr]))        return super().__new__(cls, clsname, bases, dct)class TornadoPool(Pool, metaclass=MetaPool):    pass# Test worker functionsdef test2(x):    print("hi2")    raise Exception("eeee")def test(x):    print(x)    time.sleep(2)    return "done"class TestHandler(tornado.web.RequestHandler):    @gen.coroutine    def get(self):        try:            result = yield pool.apply_async(test, ("inside get",))            self.write("%s\n" % result)            result = yield pool.apply_async(test2, ("hi2",))            self.write("%s\n" % result)        except Exception as e:            print("caught exception in get")            self.write("Caught an exception: %s" % e)            raise        finally:            self.finish()app = tornado.web.Application([    (r"/test", TestHandler),])if __name__ == "__main__":    pool = TornadoPool()    app.listen(8888)    IOLoop.instance().start()


If your get requests are taking that long then tornado is the wrong framework.

I suggest you use nginx to route the fast gets to tornado and the slower ones to a different server.

PeterBe has an interesting article where he runs multiple Tornado servers and sets one of them to be 'the slow one' for handling the long running requests see: worrying-about-io-blocking I would try this method.