asyncio questions

Frank Millman frank at chagford.com
Thu Jan 26 05:12:29 EST 2023


Hi all

I have written a simple HTTP server using asyncio. It works, but I don't 
always understand how it works, so I was pleased that Python 3.11 
introduced some new high-level concepts that hide the gory details. I 
want to refactor my code to use these concepts, but I am not finding it 
easy.

In simple terms my main loop looked like this -

     loop = asyncio.get_event_loop()
     server = loop.run_until_complete(
         asyncio.start_server(handle_client, host, port))
     loop.run_until_complete(setup_companies())
     session_check = asyncio.ensure_future(
         check_sessions())  # start background task
     print('Press Ctrl+C to stop')
     try:
         loop.run_forever()
     except KeyboardInterrupt:
         print()
     finally:
         session_check.cancel()  # tell session_check to stop running
         loop.run_until_complete(asyncio.wait([session_check]))
         server.close()
         loop.stop()

Using 3.11 it now looks like this -

     with asyncio.Runner() as runner:
         server = runner.run(asyncio.start_server(
             handle_client, host, port)
         runner.run(setup_companies())
         session_check = asyncio.ensure_future(
             check_sessions())  # start background task
         print('Press Ctrl+C to stop')
         try:
             runner.run(server.serve_forever())
         except KeyboardInterrupt:
             print()
         finally:
             session_check.cancel()  # tell session_check to stop running
             runner.run(asyncio.wait([session_check]))
             server.close()

It works, and I guess it looks a bit neater.

Problem 1.

The docs to 'asyncio.ensure_future' state 'See also the create_task() 
function which is the preferred way for creating new Tasks'

If I change 'ensure_future' to 'create_task', I get
     RuntimeError: no running event loop

I don't know how to fix this.

Problem 2.

The docs have a section on 'Handling Keyboard Interruption'

https://docs.python.org/3.11/library/asyncio-runner.html#asyncio.Runner

I have not figured out how to adapt my code to use this new approach.

Any suggestions appreciated.

Frank Millman

P.S. Might it be better to ask these questions on the Async_SIG 
Discussion Forum?


More information about the Python-list mailing list