Is it possible to run only a single step of the asyncio event loop? (tagged: tkinter)

Is it possible to run only a single step of the asyncio event loop?


The absence of a public method like loop.run_once() is intentional. Not every supported event loop has a method to iterate a single step. Often the underlying API has methods for creating an event loop and running it forever, but emulating a single step may be very inefficient.

If you really need it, you can implement single-step iteration easily:

import asyncio


def run_once(loop):
    """Run a single iteration of *loop*, then return.

    ``loop.stop`` is queued behind the callbacks that are already ready, so
    ``run_forever()`` processes that batch and exits instead of blocking.

    Args:
        loop: An asyncio event loop that is not currently running.
    """
    loop.call_soon(loop.stop)
    loop.run_forever()


# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# current loop is set and raises RuntimeError on newer Python versions.
loop = asyncio.new_event_loop()
try:
    for i in range(100):
        print('Iteration', i)
        run_once(loop)
finally:
    # Release the selector and other resources held by the loop.
    loop.close()


Take a look at this example.

import asyncio
from tkinter import *


class asyncTk(Tk):
    """Tk root window that can be awaited from an asyncio coroutine.

    Awaiting an instance repeatedly calls ``update()`` and yields back to the
    event loop between calls, until the window is closed.
    """

    def __init__(self):
        super().__init__()
        # Flipped to False by on_closing(); ends the loop in __await__.
        self.running = True
        self.protocol("WM_DELETE_WINDOW", self.on_closing)

    def on_closing(self):
        """Handle the window-close request: stop the await loop and destroy the window."""
        self.running = False
        self.destroy()

    def __await__(self):
        while self.running:
            self.update()
            # A bare yield hands control back to the event loop for one
            # iteration so other tasks can run between Tk updates.
            yield


async def asd():
    """Print 1 through 9, one number per second."""
    for x in range(1, 10):
        await asyncio.sleep(1)
        print(x)


async def main():
    """Open the window and run the counter alongside it until the window closes."""
    w = asyncTk()
    # Keep a reference to the task: the event loop only holds weak references
    # to tasks, so an unreferenced task may be garbage-collected mid-execution.
    counter_task = asyncio.create_task(asd())
    await w
    # The window is gone; stop the counter explicitly (no-op if it finished).
    counter_task.cancel()


asyncio.run(main())


I use the following approach to create my own run_once() and run_forever().

Here's a simplified example:

import asyncio


async def worker(**kwargs):
    """Run one unit of work, then pause.

    Keyword Args:
        id: Label printed when the worker starts (default ``'0.0.0.0.0.0'``).
        time: Seconds to sleep after the work (default ``1``).
    """
    # Local names avoid shadowing the ``id`` builtin and the ``time`` module.
    worker_id = kwargs.get('id', '0.0.0.0.0.0')
    delay = kwargs.get('time', 1)
    try:
        # Do stuff.
        print('start: ' + worker_id)
    finally:
        # The pause runs even when the work above raises.
        await asyncio.sleep(delay)


async def worker_forever(**kwargs):
    """Call ``worker(**kwargs)`` repeatedly until the task is cancelled."""
    while True:
        await worker(**kwargs)


async def _gather(tasks, return_exceptions=False):
    """Await all *tasks* from inside the running loop (also safe when empty)."""
    return await asyncio.gather(*tasks, return_exceptions=return_exceptions)


def init_loop(configs, forever=True):
    """Create a fresh event loop and schedule one worker task per config.

    Args:
        configs: Iterable of dicts, each providing ``'id'`` and ``'time'``.
        forever: When true, schedule ``worker_forever``; otherwise ``worker``.

    Returns:
        A ``(loop, tasks)`` tuple. The caller is responsible for running and
        closing the loop.
    """
    # A new loop per call: asyncio.get_event_loop() is deprecated without a
    # current loop, and it would hand back a loop that an earlier
    # run_forever() call has already closed.
    loop = asyncio.new_event_loop()
    job = worker_forever if forever else worker
    tasks = [
        loop.create_task(job(id=conf['id'], time=conf['time']))
        for conf in configs
    ]
    return loop, tasks


def run_once(configs):
    """Run every configured worker exactly once and print the gathered results."""
    print('RUN_ONCE')
    loop, futures = init_loop(configs, forever=False)
    try:
        result = loop.run_until_complete(_gather(futures))
        print(result)
    finally:
        loop.close()


def run_forever(configs):
    """Run every configured worker in an endless loop until interrupted."""
    print('RUN_FOREVER')
    loop, tasks = init_loop(configs, forever=True)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        print("Closing Loop")
        try:
            # Cancel the workers and let them unwind before closing, so no
            # task is destroyed while still pending.
            for task in tasks:
                task.cancel()
            loop.run_until_complete(_gather(tasks, return_exceptions=True))
        finally:
            loop.close()


if __name__ == '__main__':
    configurations = [
        {'time': 5, 'id': '4'},
        {'time': 6, 'id': '5'},
        {'time': 1, 'id': '6'},
    ]  # TODO :: DUMMY
    run_once(configurations)
    run_forever(configurations)