
Asyncio.gather vs asyncio.wait


Although similar in general cases ("run and get results for many tasks"), each function has some specific functionality for other cases:

asyncio.gather()

Returns a single future that aggregates the results of its awaitables, allowing high-level grouping of tasks:

import asyncio
from pprint import pprint
import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(1, 3))
    print("<", tag)
    return tag


async def main():
    # Each gather() call returns one future that groups its child tasks.
    group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
    group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
    group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])
    # Groups can themselves be gathered; results keep the argument order.
    all_groups = asyncio.gather(group1, group2, group3)
    return await all_groups


# asyncio.run() replaces the deprecated get_event_loop()/run_until_complete() pattern.
results = asyncio.run(main())
pprint(results)

All tasks in a group can be cancelled by calling group2.cancel(), or even all_groups.cancel(). See also asyncio.gather(..., return_exceptions=True), which returns exceptions as items of the results list instead of raising the first one.
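A minimal sketch of cancelling a whole group, assuming Python 3.7+ (for asyncio.run) and a hypothetical worker() coroutine that sleeps long enough to still be running when cancel() is called. Cancelling the future returned by gather() cancels every child that has not finished yet, and awaiting the group then raises asyncio.CancelledError:

```python
import asyncio


async def worker(tag):
    # Hypothetical coroutine: sleeps long enough that cancellation happens first.
    await asyncio.sleep(10)
    return tag


async def main():
    group = asyncio.gather(worker("a"), worker("b"))
    await asyncio.sleep(0)  # let the child tasks start running
    group.cancel()  # cancels every unfinished child task in the group
    try:
        await group
    except asyncio.CancelledError:
        return "group cancelled"


print(asyncio.run(main()))  # group cancelled
```

The same call on an outer future such as all_groups would cascade into every nested group.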

asyncio.wait()

Supports returning as soon as the first task completes, or after a given timeout, allowing lower-level control over the operations:

import asyncio
import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag


async def main():
    # asyncio.wait() needs tasks/futures; bare coroutines are rejected since Python 3.11.
    tasks = [asyncio.create_task(coro(i)) for i in range(1, 11)]

    print("Get first result:")
    finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in finished:
        print(task.result())
    print("unfinished:", len(unfinished))

    print("Get more results in 2 seconds:")
    finished2, unfinished2 = await asyncio.wait(unfinished, timeout=2)
    for task in finished2:
        print(task.result())
    print("unfinished2:", len(unfinished2))

    print("Get all other results:")
    if unfinished2:  # asyncio.wait() raises ValueError on an empty set
        finished3, _ = await asyncio.wait(unfinished2)
        for task in finished3:
            print(task.result())


asyncio.run(main())


asyncio.wait is lower-level than asyncio.gather.

As the name suggests, asyncio.gather mainly focuses on gathering the results. It waits on a bunch of futures and returns their results in the order the awaitables were passed in.

asyncio.wait just waits on the futures. Instead of giving you the results directly, it returns two sets of tasks: done and pending. You have to collect the values manually.

Moreover, with wait you can choose (via return_when) whether to wait for all futures to finish, just the first one, or the first one that raises an exception.
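A small sketch contrasting the two return shapes, assuming a hypothetical square() coroutine whose later inputs finish sooner. gather() hands back a list in argument order; wait() hands back (done, pending) sets of tasks, so the values have to be pulled out with task.result():

```python
import asyncio


async def square(x):
    # Hypothetical coroutine: later inputs finish sooner.
    await asyncio.sleep(0.01 * (5 - x))
    return x * x


async def main():
    # gather: results come back in argument order, regardless of finish order.
    gathered = await asyncio.gather(*(square(x) for x in range(5)))

    # wait: returns (done, pending) sets of tasks; values are extracted manually.
    tasks = [asyncio.create_task(square(x)) for x in range(5)]
    done, pending = await asyncio.wait(tasks)
    waited = sorted(t.result() for t in done)  # sets are unordered, so sort
    return gathered, waited, len(pending)


gathered, waited, n_pending = asyncio.run(main())
print(gathered)   # [0, 1, 4, 9, 16]
print(waited)     # [0, 1, 4, 9, 16]
print(n_pending)  # 0
```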


A very important distinction, which is easy to miss, is how these two functions behave by default when a task raises an exception.


I'll use this example to simulate a coroutine that sometimes raises an exception:

import asyncio


async def a_flaky_tsk(i):
    await asyncio.sleep(i)  # stagger the tasks a bit to simulate a real-world workload
    if i % 2 == 0:
        print(i, "ok")
    else:
        print(i, "crashed!")
        raise ValueError


coros = [a_flaky_tsk(i) for i in range(10)]

Running await asyncio.gather(*coros) inside main() outputs:

0 ok
1 crashed!
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 20, in <module>
    asyncio.run(main())
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 17, in main
    await asyncio.gather(*coros)
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

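As you can see, the coroutines after index 1 never got to finish: gather() propagated the first exception immediately, main() exited, and asyncio.run() then cancelled the remaining tasks.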
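To illustrate what happens here, a sketch with a hypothetical job() coroutine and much shorter delays. Per the asyncio docs, with the default return_exceptions=False the other awaitables are not cancelled when gather() raises; they keep running as long as the event loop does. If main() stays alive a little longer, they complete:

```python
import asyncio


async def job(i, finished):
    # Hypothetical coroutine: job 1 fails, the others record that they completed.
    await asyncio.sleep(0.01 * i)
    if i == 1:
        raise ValueError(i)
    finished.append(i)


async def main():
    finished = []
    try:
        await asyncio.gather(*(job(i, finished) for i in range(4)))
    except ValueError:
        pass  # gather() raised as soon as job 1 failed...
    await asyncio.sleep(0.1)  # ...but jobs 2 and 3 are still running
    return sorted(finished)


print(asyncio.run(main()))  # [0, 2, 3]
```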


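But await asyncio.wait(coros) keeps running all the tasks, even if some of them fail (since Python 3.11, asyncio.wait() only accepts tasks or futures, so wrap the coroutines with asyncio.create_task() first):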

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
Task exception was never retrieved
future: <Task finished name='Task-10' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-8' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-9' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

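Of course, this behavior can be changed for both. Using

asyncio.gather(..., return_exceptions=True)

makes gather() return exceptions as items of the results list instead of raising the first one, or using

asyncio.wait([...], return_when=asyncio.FIRST_EXCEPTION)

makes wait() return as soon as any task raises, leaving the rest in the pending set.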
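A sketch of the return_when=asyncio.FIRST_EXCEPTION variant, assuming a hypothetical flaky() coroutine where only task 0 fails and the others are slow. wait() returns as soon as that task raises; cancelling and draining the leftover pending tasks is a choice made in this sketch, not something wait() does for you:

```python
import asyncio


async def flaky(i):
    # Hypothetical coroutine: task 0 fails quickly, the others are slow.
    if i == 0:
        await asyncio.sleep(0.01)
        raise ValueError("task 0 failed")
    await asyncio.sleep(10)
    return i


async def main():
    tasks = [asyncio.create_task(flaky(i)) for i in range(4)]
    # Returns as soon as any task raises; the others stay in `pending`.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for t in pending:
        t.cancel()  # stop work we no longer need
    await asyncio.gather(*pending, return_exceptions=True)  # let cancellations finish
    errors = [t.exception() for t in done if t.exception() is not None]
    return len(done), len(pending), errors


n_done, n_pending, errors = asyncio.run(main())
print(n_done, n_pending, errors)  # 1 3 [ValueError('task 0 failed')]
```

Calling t.exception() also marks the exception as retrieved, so no "Task exception was never retrieved" message is logged for it.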


But it doesn't end here!

Notice the "Task exception was never retrieved" messages in the logs above.

asyncio.wait() won't re-raise exceptions from the child tasks until you await them individually (or call task.result()). The stack traces in the logs are just messages; they cannot be caught!

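tasks = [asyncio.create_task(c) for c in coros]  # required on Python 3.11+
done, pending = await asyncio.wait(tasks)
for tsk in done:
    try:
        await tsk  # re-raises the task's exception, if it had one
    except Exception as e:
        print("I caught:", repr(e))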

Output:

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()

On the other hand, to catch exceptions with asyncio.gather(), you must pass return_exceptions=True and inspect the results:

results = await asyncio.gather(*coros, return_exceptions=True)
for result_or_exc in results:
    if isinstance(result_or_exc, Exception):
        print("I caught:", repr(result_or_exc))

(Same output as before)
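For reference, a self-contained sketch that runs both approaches end to end. It assumes Python 3.7+ (for asyncio.run and create_task) and uses a simplified variant of a_flaky_tsk() with much shorter delays that returns its index and raises ValueError(i), so the caught values can be compared; the helper names caught_with_wait() and caught_with_gather() are hypothetical:

```python
import asyncio


async def a_flaky_tsk(i):
    # Simplified variant (assumption): short delays, odd indices raise ValueError(i).
    await asyncio.sleep(i * 0.01)
    if i % 2:
        raise ValueError(i)
    return i


async def caught_with_wait():
    tasks = [asyncio.create_task(a_flaky_tsk(i)) for i in range(10)]
    done, _ = await asyncio.wait(tasks)
    caught = []
    for tsk in done:
        try:
            await tsk  # re-raises the task's exception, if any
        except ValueError as e:
            caught.append(e.args[0])
    return sorted(caught)  # `done` is a set, so sort for a stable order


async def caught_with_gather():
    results = await asyncio.gather(
        *(a_flaky_tsk(i) for i in range(10)), return_exceptions=True
    )
    # Results keep argument order; exceptions appear in place of return values.
    return [r.args[0] for r in results if isinstance(r, Exception)]


print(asyncio.run(caught_with_wait()))    # [1, 3, 5, 7, 9]
print(asyncio.run(caught_with_gather()))  # [1, 3, 5, 7, 9]
```

Because every failed task's exception is retrieved in both versions, neither run should log "Task exception was never retrieved".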