
How to use asyncio with existing blocking library?


There are (sort of) two questions here: first, how to run blocking code asynchronously, and second, how to run async code concurrently. (asyncio runs on a single thread, and threads in an executor still share the GIL, so none of this is truly parallel for CPU-bound work, but I digress.)

Concurrent tasks can be created using asyncio.ensure_future, as described in the asyncio documentation.
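As a minimal sketch of that second point, the snippet below schedules three coroutines with asyncio.ensure_future and awaits them together. The coroutine `fetch_number` and its delay are made up for illustration, and on Python 3.7+ `asyncio.create_task` is the more usual spelling when you are scheduling a coroutine.

```python
import asyncio


async def fetch_number(n, delay):
    # Hypothetical coroutine standing in for real async work.
    await asyncio.sleep(delay)
    return n * 10


async def run_concurrently():
    # ensure_future schedules each coroutine on the running loop and
    # returns a Task, so the three sleeps overlap instead of queueing.
    tasks = [asyncio.ensure_future(fetch_number(n, 0.1)) for n in range(3)]
    # gather returns results in the order the tasks were passed in.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(run_concurrently()))
```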

To run synchronous code, you will need to run the blocking code in an executor. Example:

    import asyncio
    import concurrent.futures
    import time


    def blocking(delay):
        time.sleep(delay)
        print('Completed.')


    async def non_blocking(loop, executor):
        # Run three of the blocking tasks concurrently. run_in_executor
        # returns awaitable futures, which asyncio.wait accepts directly.
        # If you want explicit access to the futures themselves, keep them
        # in a variable, or use "done, pending = await asyncio.wait(...)".
        await asyncio.wait(
            {
                # Returns after delay=12 seconds
                loop.run_in_executor(executor, blocking, 12),
                # Returns after delay=14 seconds
                loop.run_in_executor(executor, blocking, 14),
                # Returns after delay=16 seconds
                loop.run_in_executor(executor, blocking, 16),
            },
            return_when=asyncio.ALL_COMPLETED,
        )


    # asyncio.new_event_loop() avoids the deprecated implicit loop creation
    # of asyncio.get_event_loop() on recent Python versions.
    loop = asyncio.new_event_loop()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    loop.run_until_complete(non_blocking(loop, executor))

If you want to schedule these tasks using a for loop (as in your example), you have several different strategies, but the underlying approach is to schedule the tasks using the for loop (or list comprehension, etc), await them with asyncio.wait, and then retrieve the results. Example:

    # Inside a coroutine:
    done, pending = await asyncio.wait(
        [loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
        return_when=asyncio.ALL_COMPLETED,
    )

    # Note that any errors raised during the above will be re-raised here; to
    # handle errors you will need to call task.exception() and check whether
    # it is None before calling task.result()
    results = [task.result() for task in done]
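The error-handling note in that snippet can be sketched as follows. This is only an illustration: `blocking_foo` here is a stand-in that deliberately fails for one input, and `collect` is a hypothetical helper name. Because `done` is a set, the order of the results is arbitrary, so the sketch sorts them.

```python
import asyncio
import concurrent.futures


def blocking_foo(x):
    # Hypothetical blocking function that fails for one input.
    if x == 2:
        raise ValueError("bad input: 2")
    return x * x


async def collect(inps):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [loop.run_in_executor(executor, blocking_foo, x) for x in inps]
        done, pending = await asyncio.wait(
            futures, return_when=asyncio.ALL_COMPLETED
        )
    results, errors = [], []
    for fut in done:
        # exception() returns None on success; checking it first means
        # result() is only called on futures that will not raise.
        exc = fut.exception()
        if exc is not None:
            errors.append(exc)
        else:
            results.append(fut.result())
    return sorted(results), errors


if __name__ == "__main__":
    print(asyncio.run(collect([1, 2, 3])))
```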


Extending the accepted answer to actually solve the problem in question.

Note: Requires Python 3.7+

    import asyncio
    import functools
    from urllib.request import urlopen


    def legacy_blocking_function():  # You cannot change this function
        r = urlopen("https://example.com")
        return r.read().decode()


    def run_in_executor(f):
        @functools.wraps(f)
        def inner(*args, **kwargs):
            loop = asyncio.get_running_loop()
            return loop.run_in_executor(None, lambda: f(*args, **kwargs))

        return inner


    @run_in_executor
    def foo(arg):  # Your wrapper for async use
        resp = legacy_blocking_function()
        return f"{arg}{len(resp)}"


    @run_in_executor
    def bar(arg):  # Another wrapper
        resp = legacy_blocking_function()
        return f"{len(resp)}{arg}"


    async def process_input(inp):  # Modern async function (coroutine)
        res = await foo(inp)
        res = f"XXX{res}XXX"
        return await bar(res)


    async def main():
        inputs = ["one", "two", "three"]
        input_tasks = [asyncio.create_task(process_input(inp)) for inp in inputs]
        print([await t for t in asyncio.as_completed(input_tasks)])
        # This doesn't work as expected :(
        # print([await t for t in asyncio.as_completed([process_input(inp) for inp in input_tasks])])


    if __name__ == '__main__':
        asyncio.run(main())
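The decorator wraps the call in a lambda because loop.run_in_executor only forwards positional arguments to the function. A minimal sketch of the same idea using functools.partial instead, with a made-up blocking function `legacy_format` (and a short time.sleep) standing in for the network call so it runs offline:

```python
import asyncio
import functools
import time


def legacy_format(value, *, prefix="", suffix=""):
    # Hypothetical blocking function; time.sleep stands in for slow I/O.
    time.sleep(0.05)
    return f"{prefix}{value}{suffix}"


async def format_async(value, **kwargs):
    loop = asyncio.get_running_loop()
    # run_in_executor(executor, func, *args) takes no keyword arguments
    # for func, so they are bound up front with functools.partial.
    call = functools.partial(legacy_format, value, **kwargs)
    return await loop.run_in_executor(None, call)


async def demo():
    return await asyncio.gather(
        format_async("one", prefix="<"),
        format_async("two", suffix=">"),
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Either form works; partial keeps the bound arguments inspectable, while the lambda is shorter.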



    import asyncio
    import logging
    from time import sleep

    logging.basicConfig(
        level=logging.DEBUG, format="%(asctime)s %(thread)s %(funcName)s %(message)s"
    )


    def long_task(t):
        """Simulate long IO bound task."""
        logging.info("2. t: %s", t)
        sleep(t)
        logging.info("4. t: %s", t)
        return t ** 2


    async def main():
        loop = asyncio.get_running_loop()
        inputs = range(1, 5)
        logging.info("1.")
        futures = [loop.run_in_executor(None, long_task, i) for i in inputs]
        logging.info("3.")
        results = await asyncio.gather(*futures)
        logging.info("5.")
        for (i, result) in zip(inputs, results):
            logging.info("6. Result: %s, %s", i, result)


    if __name__ == "__main__":
        asyncio.run(main())

Output:

    2020-03-18 17:13:07,523 23964 main 1.
    2020-03-18 17:13:07,524 5008 long_task 2. t: 1
    2020-03-18 17:13:07,525 21232 long_task 2. t: 2
    2020-03-18 17:13:07,525 22048 long_task 2. t: 3
    2020-03-18 17:13:07,526 25588 long_task 2. t: 4
    2020-03-18 17:13:07,526 23964 main 3.
    2020-03-18 17:13:08,526 5008 long_task 4. t: 1
    2020-03-18 17:13:09,526 21232 long_task 4. t: 2
    2020-03-18 17:13:10,527 22048 long_task 4. t: 3
    2020-03-18 17:13:11,527 25588 long_task 4. t: 4
    2020-03-18 17:13:11,527 23964 main 5.
    2020-03-18 17:13:11,528 23964 main 6. Result: 1, 1
    2020-03-18 17:13:11,528 23964 main 6. Result: 2, 4
    2020-03-18 17:13:11,529 23964 main 6. Result: 3, 9
    2020-03-18 17:13:11,529 23964 main 6. Result: 4, 16
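The timestamps show all four tasks starting within a few milliseconds of each other, each on its own thread, and the whole batch finishing after about 4 seconds (the longest task) rather than 10 (the sum of the delays). A small sketch for checking this yourself, with the delays shortened to fractions of a second; `timed_batch` is a hypothetical helper name, and the sketch assumes the default executor has at least four worker threads available.

```python
import asyncio
import time


def long_task(t):
    # Same shape as the task above, minus the logging.
    time.sleep(t)
    return t ** 2


async def timed_batch(delays):
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # All tasks are handed to the default thread pool before any is awaited.
    futures = [loop.run_in_executor(None, long_task, d) for d in delays]
    results = await asyncio.gather(*futures)
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    delays = [0.1, 0.2, 0.3, 0.4]
    results, elapsed = asyncio.run(timed_batch(delays))
    print(f"results={results} elapsed={elapsed:.2f}s sum={sum(delays):.2f}s")
```

The elapsed time should land close to the largest delay, not the sum.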