Equivalent of asyncio.Queues with worker "threads"



Yes, exactly. Tasks are your friends:

    import asyncio
    import random

    q = asyncio.Queue()

    @asyncio.coroutine
    def produce():
        while True:
            yield from q.put(random.random())
            yield from asyncio.sleep(0.5 + random.random())

    @asyncio.coroutine
    def consume():
        while True:
            value = yield from q.get()
            print("Consumed", value)

    loop = asyncio.get_event_loop()
    loop.create_task(produce())
    loop.create_task(consume())
    loop.run_forever()

asyncio.ensure_future() can also be used for task creation.

And please keep in mind: q.put() is a coroutine, so you should use yield from q.put(value).
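Note that the queue also has a synchronous counterpart, put_nowait(), which is a plain method rather than a coroutine and raises asyncio.QueueFull if a bounded queue has no free slot. A minimal sketch of the difference (hypothetical demo coroutine, Python 3.7+ for asyncio.run()):

```python
import asyncio

async def demo():
    q = asyncio.Queue(maxsize=1)
    await q.put("a")         # coroutine: must be awaited (yield from on Python 3.4)
    overflow = False
    try:
        q.put_nowait("b")    # plain method: no await, raises when the queue is full
    except asyncio.QueueFull:
        overflow = True
    return overflow, q.get_nowait()

result = asyncio.run(demo())
print(result)
```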

UPD

Switched the example from asyncio.Task()/asyncio.async() to the brand new loop.create_task() and asyncio.ensure_future() APIs.
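On Python 3.5+ the same producer/consumer pair can be written with async/await instead of @asyncio.coroutine/yield from. A sketch (intervals shortened from the original 0.5 s, consumed values collected into a list, and asyncio.run() used to drive the loop, which needs Python 3.7+):

```python
import asyncio
import random

async def produce(q):
    while True:
        await q.put(random.random())               # q.put() is a coroutine: await it
        await asyncio.sleep(0.1 + random.random() / 10)

async def consume(q, consumed):
    while True:
        value = await q.get()
        consumed.append(value)
        print("Consumed", value)

async def main():
    q = asyncio.Queue()
    consumed = []
    producer = asyncio.ensure_future(produce(q))
    consumer = asyncio.ensure_future(consume(q, consumed))
    await asyncio.sleep(1)    # let the pair run for a short while
    producer.cancel()
    consumer.cancel()
    return consumed

consumed = asyncio.run(main())
```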


A bit later, and maybe off-topic: keep in mind that you can consume from the Queue from multiple tasks, as if they were independent consumers.

The following snippet shows, as an example, how you can achieve the same thread-pool pattern with asyncio tasks.

    import asyncio

    q = asyncio.Queue()

    async def sum(x):
        await asyncio.sleep(0.1)  # simulates asynchronous work
        return x

    async def consumer(i):
        print("Consumer {} started".format(i))
        while True:
            f, x = await q.get()
            print("Consumer {} processing {}".format(i, x))
            r = await sum(x)
            f.set_result(r)

    async def producer():
        consumers = [asyncio.ensure_future(consumer(i)) for i in range(5)]
        tasks = [(asyncio.Future(), x) for x in range(10)]
        for task in tasks:
            await q.put(task)
        # wait until all futures are completed
        results = await asyncio.gather(*[f for f, _ in tasks])
        assert results == [r for _, r in tasks]
        # destroy tasks
        for c in consumers:
            c.cancel()

    asyncio.get_event_loop().run_until_complete(producer())
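Instead of pairing each item with a Future, you can also let the queue itself track completion: workers call q.task_done() after each item, and the producer awaits q.join(), which returns once every enqueued item has been marked done. A sketch of that variant (hypothetical worker doubling each item; asyncio.run() needs Python 3.7+):

```python
import asyncio

async def worker(name, q, results):
    while True:
        x = await q.get()
        results.append((name, x * 2))  # pretend work: double the item
        q.task_done()                  # mark this item as processed

async def main():
    q = asyncio.Queue()
    results = []
    workers = [asyncio.ensure_future(worker(i, q, results)) for i in range(3)]
    for x in range(10):
        await q.put(x)
    await q.join()                     # returns once every item is task_done()
    for w in workers:
        w.cancel()
    return results

results = asyncio.run(main())
print(len(results))
```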