Using a background thread with asyncio/futures with flask

Frank Millman frank at chagford.com
Sat Mar 23 09:25:28 EDT 2024


On 2024-03-22 12:08 PM, Thomas Nyberg via Python-list wrote:
> Hi,
> 
> Yeah so flask does support async (when installed with `pip3 install 
> flask[async]), but you are making a good point that flask in this case 
> is a distraction. Here's an example using just the standard library that 
> exhibits the same issue:
> 
> `app.py`
> ```
> import asyncio
> import threading
> import time
> from queue import Queue
> 
> 
> in_queue = Queue()
> out_queue = Queue()
> 
> 
> def worker():
>      print("worker started running")
>      while True:
>          future = in_queue.get()
>          print(f"worker got future: {future}")
>          time.sleep(5)
>          print("worker sleeped")
>          out_queue.put(future)
> 
> 
> def finalizer():
>      print("finalizer started running")
>      while True:
>          future = out_queue.get()
>          print(f"finalizer got future: {future}")
>          future.set_result("completed")
>          print("finalizer set result")
> 
> 
> threading.Thread(target=worker).start()
> threading.Thread(target=finalizer).start()
> 
> 
> async def main():
>      future = asyncio.get_event_loop().create_future()
>      in_queue.put(future)
>      print(f"main put future: {future}")
>      result = await future
>      print(result)
> 
> 
> if __name__ == "__main__":
>      loop = asyncio.get_event_loop()
>      loop.run_until_complete(main())
> ```
> 
> If I run that I see the following printed out (after which is just hangs):
> 
> ```

Combining Dieter's and Mark's ideas, here is a version that works.

It is not pretty! call_soon_threadsafe() is a method of the event loop, 
but the background threads have no way of their own to look up the loop 
that main() is running in. Therefore I include a reference to the loop in 
the message passed to in_queue, and the worker in turn passes it on to 
out_queue.

Frank

=======================================================

import asyncio
import threading
import time
from queue import Queue


# main() -> worker(): (loop, future) pairs waiting to be processed.
in_queue = Queue()
# worker() -> finalizer(): (loop, future) pairs whose work has finished.
out_queue = Queue()


def worker():
     """Simulate slow, blocking work on a background thread.

     Loops forever: takes a ``(loop, future)`` pair from ``in_queue``,
     sleeps for five seconds in place of real work, then forwards the same
     pair to ``out_queue``.  The loop is only carried along; it is not used
     here.
     """
     print("worker started running")
     while True:
         event_loop, pending = in_queue.get()
         print(f"worker got future: {pending}")
         time.sleep(5)  # stand-in for the slow, blocking part of the job
         print("worker sleeped")
         handoff = (event_loop, pending)
         out_queue.put(handoff)


def finalizer():
     """Resolve finished futures from a background thread.

     Loops forever: takes a ``(loop, future)`` pair from ``out_queue`` and
     asks the loop to set the future's result to ``"completed"``.
     """
     print("finalizer started running")
     while True:
         event_loop, pending = out_queue.get()
         print(f"finalizer got future: {pending}")
         # The result is not set from this thread directly; the call is
         # handed to the event loop, which runs it in its own thread.
         resolve = pending.set_result
         event_loop.call_soon_threadsafe(resolve, "completed")
         # Printed once the callback has been scheduled, not once it has run.
         print("finalizer set result")


# Start both background threads when the module is loaded.  They loop
# forever, so they are daemon threads: that way they do not keep the
# interpreter alive once the main thread has finished.
threading.Thread(target=worker, daemon=True).start()
threading.Thread(target=finalizer, daemon=True).start()


async def main():
     """Hand a future to the background threads and wait for its result.

     Creates a future on the running event loop, puts ``(loop, future)`` on
     ``in_queue`` so the threads can schedule the result back onto this
     loop, then awaits the future and prints the result.
     """
     # Inside a coroutine get_running_loop() is the recommended call: it
     # always returns the loop that is executing this coroutine, whereas
     # get_event_loop() has more complex, policy-dependent behaviour.
     loop = asyncio.get_running_loop()
     future = loop.create_future()
     in_queue.put((loop, future))
     print(f"main put future: {future}")
     result = await future
     print(result)


if __name__ == "__main__":
     # asyncio.run() creates a new event loop, runs main() until it
     # completes and then closes the loop.  It replaces the older form:
     #     loop = asyncio.get_event_loop()
     #     loop.run_until_complete(main())
     asyncio.run(main())



More information about the Python-list mailing list