Equivalent of asyncio.Queues with worker "threads"
Yes, exactly. Tasks are your friends:
    import asyncio
    import random

    q = asyncio.Queue()

    @asyncio.coroutine
    def produce():
        while True:
            yield from q.put(random.random())
            yield from asyncio.sleep(0.5 + random.random())

    @asyncio.coroutine
    def consume():
        while True:
            value = yield from q.get()
            print("Consumed", value)

    loop = asyncio.get_event_loop()
    loop.create_task(produce())
    loop.create_task(consume())
    loop.run_forever()
asyncio.ensure_future() can also be used to create tasks.
Also keep in mind that q.put() is a coroutine, so you have to write yield from q.put(value).
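To illustrate why the call has to be awaited, here is a minimal sketch. It assumes the newer async/await syntax rather than yield from, and the bounded queue size and the name `demo` are only illustrative. With a full queue, `await q.put(...)` suspends until a consumer frees a slot, whereas the non-blocking `put_nowait()` raises `asyncio.QueueFull`.

```python
import asyncio

async def demo():
    q = asyncio.Queue(maxsize=1)    # bounded queue; size chosen for illustration
    await q.put("first")            # there is room, so this returns right away

    try:
        q.put_nowait("second")      # non-blocking variant fails on a full queue
    except asyncio.QueueFull:
        full = True
    else:
        full = False

    # Awaiting q.put() directly would suspend here, so run it as a task
    # and make room by consuming an item.
    pending = asyncio.ensure_future(q.put("second"))
    await asyncio.sleep(0)          # let the task run until it blocks
    blocked = not pending.done()

    first = await q.get()           # frees a slot and wakes the pending put
    await pending
    second = await q.get()
    return full, blocked, first, second

print(asyncio.run(demo()))
```

Calling `q.put(value)` without `yield from` or `await` only creates a coroutine object; nothing is actually enqueued.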
UPD: Switched the example from asyncio.Task()/asyncio.async() to the newer loop.create_task() and asyncio.ensure_future() API.
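On current Python versions the generator-based style (`@asyncio.coroutine` with `yield from`) is no longer available; it was removed in Python 3.11. A rough equivalent with async/await might look like the sketch below. The item count `n`, the `None` sentinel and the `main()` wrapper are assumptions added so the example terminates; the original loops forever.

```python
import asyncio
import random

async def produce(q, n):
    # n is a hypothetical item count so that the sketch finishes
    for _ in range(n):
        await q.put(random.random())
        await asyncio.sleep(0.01 * random.random())
    await q.put(None)               # sentinel: tells the consumer to stop

async def consume(q):
    consumed = []
    while True:
        value = await q.get()
        if value is None:           # producer is done
            return consumed
        print("Consumed", value)
        consumed.append(value)

async def main(n=5):
    q = asyncio.Queue()             # created inside the running event loop
    producer = asyncio.create_task(produce(q, n))
    consumer = asyncio.create_task(consume(q))
    await producer
    return await consumer

values = asyncio.run(main())
```

Creating the queue inside `main()` rather than at module level avoids binding it to a different event loop than the one `asyncio.run()` starts, which matters on older Python 3 releases.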
Here's what I use in production, moved to gist: https://gist.github.com/thehesiod/7081ab165b9a0d4de2e07d321cc2391d
A bit late and maybe off-topic, but keep in mind that you can consume from the Queue from multiple tasks, as if they were independent consumers.
The following snippet shows how you can achieve the same thread pool pattern with asyncio tasks.
    import asyncio

    async def process(x):
        await asyncio.sleep(0.1)  # simulates asynchronous work
        return x

    async def consumer(i, q):
        print("Consumer {} started".format(i))
        while True:
            f, x = await q.get()
            print("Consumer {} processing {}".format(i, x))
            r = await process(x)
            f.set_result(r)

    async def producer():
        q = asyncio.Queue()
        consumers = [asyncio.ensure_future(consumer(i, q)) for i in range(5)]
        tasks = [(asyncio.Future(), x) for x in range(10)]
        for task in tasks:
            await q.put(task)
        # wait until all futures are completed
        results = await asyncio.gather(*[f for f, _ in tasks])
        assert results == [r for _, r in tasks]
        # destroy tasks
        for c in consumers:
            c.cancel()

    asyncio.run(producer())
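A variant of the same pool pattern, sketched here as an alternative rather than as the approach used above, relies on `Queue.task_done()` and `Queue.join()` instead of one future per item. The names `worker` and `run_pool`, the squaring "work" and the pool size are illustrative assumptions.

```python
import asyncio

async def worker(q, results):
    while True:
        x = await q.get()
        try:
            await asyncio.sleep(0.01)   # stand-in for real asynchronous work
            results[x] = x * x
        finally:
            q.task_done()               # mark this item as fully processed

async def run_pool(items, n_workers=3):
    q = asyncio.Queue()
    results = {}
    workers = [asyncio.create_task(worker(q, results)) for _ in range(n_workers)]
    for x in items:
        q.put_nowait(x)                 # unbounded queue, so this never raises
    await q.join()                      # returns once every item got task_done()
    for w in workers:
        w.cancel()                      # workers are idle in q.get() by now
    await asyncio.gather(*workers, return_exceptions=True)
    return results

print(asyncio.run(run_pool(range(10))))
```

`q.join()` replaces the explicit futures as the completion signal, at the cost of collecting results through a shared dict instead of `asyncio.gather()`.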