python asyncio, how to create and cancel tasks from another thread


I think you may need to make your add_task method aware of whether or not it's being called from a thread other than the event loop's. That way, if it's being called from the same thread, you can just call asyncio.ensure_future (asyncio.async in older Python versions) directly; otherwise, it can do some extra work to pass the task from the loop's thread to the calling thread. Here's an example:

import time
import asyncio
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future

class B(Thread):
    def __init__(self, start_event):
        Thread.__init__(self)
        self.loop = None
        self.tid = None
        self.event = start_event

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.tid = current_thread()
        self.loop.call_soon(self.event.set)
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def add_task(self, coro):
        """this method should return a task object, that I
          can cancel, not a handle"""
        def _async_add(func, fut):
            try:
                ret = func()
                fut.set_result(ret)
            except Exception as e:
                fut.set_exception(e)

        # asyncio.ensure_future replaces the old asyncio.async spelling.
        f = functools.partial(asyncio.ensure_future, coro, loop=self.loop)
        if current_thread() == self.tid:
            return f() # We can call directly if we're not going between threads.
        else:
            # We're in a non-event loop thread so we use a Future
            # to get the task from the event loop thread once
            # it's ready.
            fut = Future()
            self.loop.call_soon_threadsafe(_async_add, f, fut)
            return fut.result()

    def cancel_task(self, task):
        self.loop.call_soon_threadsafe(task.cancel)

async def test():
    while True:
        print("running")
        await asyncio.sleep(1)

event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
t = b.add_task(test()) # This is a real task
time.sleep(10)
b.stop()

First, we save the event loop's thread (the object returned by current_thread()) in the run method, so we can figure out later whether calls to add_task are coming from other threads. If add_task is called from a non-event loop thread, we use call_soon_threadsafe to call a function that schedules the coroutine on the loop and then uses a concurrent.futures.Future to pass the resulting task back to the calling thread, which waits on the result of the Future.
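For comparison, here is a minimal sketch of the same hand-off using asyncio.run_coroutine_threadsafe, which is not what the example above uses. It schedules a coroutine on a loop running in another thread and returns a concurrent.futures.Future rather than a Task; calling cancel() on that future from the calling thread requests cancellation of the underlying task. The helper names start_loop_in_thread, ticker and demo are made up for illustration.

```python
import asyncio
import threading
import time


def start_loop_in_thread():
    """Start an event loop in a background thread; return (loop, thread)."""
    loop = asyncio.new_event_loop()
    ready = threading.Event()

    def run():
        asyncio.set_event_loop(loop)
        loop.call_soon(ready.set)  # signal once the loop is actually running
        loop.run_forever()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    ready.wait()
    return loop, thread


async def ticker(log):
    """Hypothetical coroutine that records when it runs and when it is cancelled."""
    try:
        while True:
            log.append("tick")
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


def demo():
    log = []
    loop, thread = start_loop_in_thread()
    # Thread-safe scheduling: returns a concurrent.futures.Future, not a Task.
    future = asyncio.run_coroutine_threadsafe(ticker(log), loop)
    time.sleep(0.2)
    future.cancel()   # forwarded to the task inside the loop's thread
    time.sleep(0.1)   # give the loop time to deliver the CancelledError
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
    return future, log


if __name__ == "__main__":
    fut, entries = demo()
    print(fut.cancelled(), entries[-1])
```

The trade-off is that the caller gets a future handle instead of the Task object itself, so this only fits if cancelling and waiting for the result are all that is needed.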

A note on cancelling a task: when you call cancel on a Task, a CancelledError will be raised in the coroutine the next time the event loop runs it. This means that the coroutine the Task is wrapping will be aborted by the exception at the yield point where it is suspended, unless the coroutine catches the CancelledError and prevents itself from aborting. Also note that this only works if the function being wrapped is actually an interruptible coroutine; an asyncio.Future returned by BaseEventLoop.run_in_executor, for example, can't really be cancelled, because it's actually wrapped around a concurrent.futures.Future, and those can't be cancelled once their underlying function actually starts executing. In those cases, the asyncio.Future will say it's cancelled, but the function actually running in the executor will continue to run.
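The two cases can be sketched side by side. This is only an illustration under assumed names (cooperative, blocking_work and main are hypothetical): the coroutine sees the CancelledError at its await and can clean up, while the function running in the executor keeps going even though its asyncio future reports being cancelled.

```python
import asyncio
import threading
import time


def blocking_work(state, started):
    """Plain function run in the executor; it cannot be interrupted."""
    started.set()
    time.sleep(0.2)
    state["finished"] = True  # still reached after the future is cancelled


async def cooperative(state):
    """Coroutine that observes cancellation at its await point."""
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        state["cleaned_up"] = True  # cleanup, then let the cancellation proceed
        raise


async def main():
    state = {"finished": False, "cleaned_up": False}
    loop = asyncio.get_running_loop()

    # Case 1: a real coroutine wrapped in a Task.
    task = asyncio.ensure_future(cooperative(state))
    await asyncio.sleep(0)  # let the task run up to its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # Case 2: a blocking function running in the default executor.
    started = threading.Event()
    exec_future = loop.run_in_executor(None, blocking_work, state, started)
    while not started.is_set():  # wait until the function is really running
        await asyncio.sleep(0.01)
    exec_future.cancel()
    reported_cancelled = exec_future.cancelled()
    await asyncio.sleep(0.4)  # the worker thread finishes regardless

    return task.cancelled(), reported_cancelled, state


if __name__ == "__main__":
    print(asyncio.run(main()))
```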

Edit: Updated the first example to use concurrent.futures.Future, instead of a queue.Queue, per Andrew Svetlov's suggestion.

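Note: asyncio.async has been deprecated since Python 3.4.4, and async is a reserved keyword from Python 3.7 onward, so writing asyncio.async is a syntax error there. Use asyncio.ensure_future instead.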


You're doing everything right. To stop a task, add a method like this:

class B(Thread):
    # ...
    def cancel(self, task):
        self.loop.call_soon_threadsafe(task.cancel)

BTW, you have to set up an event loop for the created thread explicitly with

self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

because asyncio creates an implicit event loop only for the main thread.
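A small sketch of what this means in practice, assuming nothing beyond the standard library: inside a fresh thread, asyncio.get_event_loop() raises RuntimeError unless a loop has been created and set for that thread first. The helper name loop_status_in_new_thread is made up for illustration.

```python
import asyncio
import threading


def loop_status_in_new_thread(create_loop):
    """Report what asyncio.get_event_loop() does inside a fresh thread."""
    outcome = {}

    def target():
        loop = None
        if create_loop:
            # Explicit setup, as in the two lines above.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        try:
            outcome["ok"] = asyncio.get_event_loop() is loop
        except RuntimeError as exc:
            # No loop was set for this (non-main) thread.
            outcome["error"] = str(exc)
        finally:
            if loop is not None:
                loop.close()

    thread = threading.Thread(target=target)
    thread.start()
    thread.join()
    return outcome


if __name__ == "__main__":
    print(loop_status_in_new_thread(create_loop=False))
    print(loop_status_in_new_thread(create_loop=True))
```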


Just for reference, here is the code I finally implemented based on the help I got on this site. It is simpler, since I did not need all the features. Thanks again!

import asyncio
from threading import Thread
from concurrent.futures import Future
import functools

class B(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.loop = None

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def _add_task(self, future, coro):
        task = self.loop.create_task(coro)
        future.set_result(task)

    def add_task(self, coro):
        future = Future()
        p = functools.partial(self._add_task, future, coro)
        self.loop.call_soon_threadsafe(p)
        return future.result() #block until result is available

    def cancel(self, task):
        self.loop.call_soon_threadsafe(task.cancel)
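One thing to watch with this simpler version is that self.loop is still None until run has started, so calling add_task immediately after start() can fail. Below is a hedged sketch of one way to use such a class safely. LoopThread, its ready event, worker and demo are hypothetical names rather than part of the code above, and the startup signal follows the Event idea from the first answer.

```python
import asyncio
import functools
import threading
import time
from concurrent.futures import Future


class LoopThread(threading.Thread):
    """Hypothetical variant of B that signals when its loop is running."""

    def __init__(self):
        super().__init__(daemon=True)
        self.loop = None
        self.ready = threading.Event()

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.call_soon(self.ready.set)
        self.loop.run_forever()
        self.loop.close()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def _add_task(self, future, coro):
        future.set_result(self.loop.create_task(coro))

    def add_task(self, coro):
        self.ready.wait()  # avoid touching self.loop before run() has set it
        future = Future()
        self.loop.call_soon_threadsafe(
            functools.partial(self._add_task, future, coro))
        return future.result()  # block until the task exists

    def cancel(self, task):
        self.loop.call_soon_threadsafe(task.cancel)


async def worker(log):
    """Hypothetical coroutine used only to show the cancellation."""
    try:
        while True:
            log.append("running")
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


def demo():
    log = []
    b = LoopThread()
    b.start()
    task = b.add_task(worker(log))
    time.sleep(0.2)
    b.cancel(task)
    time.sleep(0.1)  # let the loop process the cancellation before stopping
    b.stop()
    b.join()
    return task, log


if __name__ == "__main__":
    t, entries = demo()
    print(t.cancelled(), entries[-1])
```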