Python - how to run multiple coroutines concurrently using asyncio? Python - how to run multiple coroutines concurrently using asyncio? python python

Python - how to run multiple coroutines concurrently using asyncio?


TL;DR Use asyncio.ensure_future() to run several coroutines concurrently.


Maybe this scenario requires a framework based on events/callbacks rather than one based on coroutines? Tornado?

No, you don't need any other framework for this. The whole idea the asynchronous application vs synchronous is that it doesn't block, while waiting for result. It doesn't matter how it is implemented, using coroutines or callbacks.

I mean, because connection_handler is constantly waiting for incoming messages, the server can only take action after it has received a message from the client, right? What am I missing here?

In synchronous application you will write something like msg = websocket.recv(), which would block whole application until you receive message (as you described). But in the asynchronous application it's completely different.

When you do msg = yield from websocket.recv() you say something like: suspend execution of connection_handler() until websocket.recv() will produce something. Using yield from inside coroutine returns control back to the event loop, so some other code can be executed, while we're waiting for result of websocket.recv(). Please, refer to documentation to better understand how coroutines work.

Let's say we – additionally – wanted to send a message to the client whenever some event happens. For simplicity, let's send a message periodically every 60 seconds. How would we do that?

You can use asyncio.async() to run as many coroutines as you want, before executing blocking call for starting event loop.

import asyncioimport websockets# here we'll store all active connections to use for sending periodic messagesconnections = []@asyncio.coroutinedef connection_handler(connection, path):    connections.append(connection)  # add connection to pool    while True:        msg = yield from connection.recv()        if msg is None:  # connection lost            connections.remove(connection)  # remove connection from pool, when client disconnects            break        else:            print('< {}'.format(msg))        yield from connection.send(msg)        print('> {}'.format(msg))@asyncio.coroutinedef send_periodically():    while True:        yield from asyncio.sleep(5)  # switch to other code and continue execution in 5 seconds        for connection in connections:            print('> Periodic event happened.')            yield from connection.send('Periodic event happened.')  # send message to each connected clientstart_server = websockets.serve(connection_handler, 'localhost', 8000)asyncio.get_event_loop().run_until_complete(start_server)asyncio.async(send_periodically())  # before blocking call we schedule our coroutine for sending periodic messagesasyncio.get_event_loop().run_forever()

Here is an example client implementation. It asks you to enter name, receives it back from the echo server, waits for two more messages from server (which are our periodic messages) and closes connection.

import asyncioimport websockets@asyncio.coroutinedef hello():    connection = yield from websockets.connect('ws://localhost:8000/')    name = input("What's your name? ")    yield from connection.send(name)    print("> {}".format(name))    for _ in range(3):        msg = yield from connection.recv()        print("< {}".format(msg))    yield from connection.close()asyncio.get_event_loop().run_until_complete(hello())

Important points:

  1. In Python 3.4.4 asyncio.async() was renamed to asyncio.ensure_future().
  2. There are special methods for scheduling delayed calls, but they don't work with coroutines.


I'm surprised gather isn't mentioned.

From the Python documentation:

import asyncioasync def factorial(name, number):    f = 1    for i in range(2, number + 1):        print(f"Task {name}: Compute factorial({i})...")        await asyncio.sleep(1)        f *= i    print(f"Task {name}: factorial({number}) = {f}")async def main():    # Schedule three calls *concurrently*:    await asyncio.gather(        factorial("A", 2),        factorial("B", 3),        factorial("C", 4),    )asyncio.run(main())# Expected output:##     Task A: Compute factorial(2)...#     Task B: Compute factorial(2)...#     Task C: Compute factorial(2)...#     Task A: factorial(2) = 2#     Task B: Compute factorial(3)...#     Task C: Compute factorial(3)...#     Task B: factorial(3) = 6#     Task C: Compute factorial(4)...#     Task C: factorial(4) = 24


Same issue, can hardly got solution until I saw the perfect sample here: http://websockets.readthedocs.io/en/stable/intro.html#both

 done, pending = await asyncio.wait(        [listener_task, producer_task],        return_when=asyncio.FIRST_COMPLETED)  # Important

So, I can handle multi coroutine tasks such as heartbeat and redis subscribe.