Using a background thread with asyncio/futures with flask

Mark Bourne nntp.mbourne at spamgourmet.com
Fri Mar 22 16:58:31 EDT 2024


Thomas Nyberg wrote:
> Hi,
> 
> Yeah so flask does support async (when installed with `pip3 install 
> flask[async]), but you are making a good point that flask in this case 
> is a distraction. Here's an example using just the standard library that 
> exhibits the same issue:
> 
> `app.py`
> ```
> import asyncio
> import threading
> import time
> from queue import Queue
> 
> 
> in_queue = Queue()
> out_queue = Queue()
> 
> 
> def worker():
>      print("worker started running")
>      while True:
>          future = in_queue.get()
>          print(f"worker got future: {future}")
>          time.sleep(5)
>          print("worker sleeped")
>          out_queue.put(future)
> 
> 
> def finalizer():
>      print("finalizer started running")
>      while True:
>          future = out_queue.get()
>          print(f"finalizer got future: {future}")
>          future.set_result("completed")
>          print("finalizer set result")
> 
> 
> threading.Thread(target=worker).start()
> threading.Thread(target=finalizer).start()
> 
> 
> async def main():
>      future = asyncio.get_event_loop().create_future()
>      in_queue.put(future)
>      print(f"main put future: {future}")
>      result = await future
>      print(result)
> 
> 
> if __name__ == "__main__":
>      loop = asyncio.get_event_loop()
>      loop.run_until_complete(main())
> ```
> 
> If I run that I see the following printed out (after which is just hangs):
> 
> ```
> $ python3 app.py
> worker started running
> finalizer started running
> main put future: <Future pending>
> worker got future: <Future pending>
> worker sleeped
> finalizer got future: <Future pending cb=[Task.task_wakeup()]>
> finalizer set result
> ```
> 
> I believe async uses a cooperative multitasking setup under the hood, so 
> I presume the way I'm doing this threading just isn't playing well with 
> that (and presumably some csp yield isn't happening somewhere). Anyway 
> at this point I feel like the easiest approach is to just throw away 
> threads entirely and learn how to do all I want fully in the brave new 
> async world, but I'm still curious why this is failing and how to make 
> this sort of setup work since it points to my not understanding the 
> basic implementation/semantics of async in python.
> 
> Thanks for any help!
> 
> /Thomas
> 
> On 3/22/24 08:27, Lars Liedtke via Python-list wrote:
>> Hey,
>>
>> As far as I know (might be old news) flask does not support asyncio.
>>
>> You would have to use a different framework, like e.g. FastAPI or 
>> similar. Maybe someone has already written "flask with asyncio" but I 
>> don't know about that.
>>
>> Cheers
>>
>> Lars
>>
>>
>> Lars Liedtke
>> Lead Developer
>>
>> [Tel.]  +49 721 98993-
>> [Fax]   +49 721 98993-
>> [E-Mail]        lal at solute.de<mailto:lal at solute.de>
>>
>>
>> solute GmbH
>> Zeppelinstraße 15
>> 76185 Karlsruhe
>> Germany
>>
>> [Marken]
>>
>> Geschäftsführer | Managing Director: Dr. Thilo Gans, Bernd Vermaaten
>> Webseite | www.solute.de <http://www.solute.de/>
>> Sitz | Registered Office: Karlsruhe
>> Registergericht | Register Court: Amtsgericht Mannheim
>> Registernummer | Register No.: HRB 748044
>> USt-ID | VAT ID: DE234663798
>>
>>
>>
>> Informationen zum Datenschutz | Information about privacy policy
>> https://www.solute.de/ger/datenschutz/grundsaetze-der-datenverarbeitung.php 
>>
>>
>>
>>
>>
>> Am 20.03.24 um 09:22 schrieb Thomas Nyberg via Python-list:
>>
>> Hello,
>>
>> I have a simple (and not working) example of what I'm trying to do. 
>> This is a simplified version of what I'm trying to achieve (obviously 
>> the background workers and finalizer functions will do more later):
>>
>> `app.py`
>>
>> ```
>> import asyncio
>> import threading
>> import time
>> from queue import Queue
>>
>> from flask import Flask
>>
>> in_queue = Queue()
>> out_queue = Queue()
>>
>>
>> def worker():
>>     print("worker started running")
>>     while True:
>>         future = in_queue.get()
>>         print(f"worker got future: {future}")
>>         time.sleep(5)
>>         print("worker sleeped")
>>         out_queue.put(future)
>>
>>
>> def finalizer():
>>     print("finalizer started running")
>>     while True:
>>         future = out_queue.get()
>>         print(f"finalizer got future: {future}")
>>         future.set_result("completed")
>>         print("finalizer set result")
>>
>>
>> threading.Thread(target=worker, daemon=True).start()
>> threading.Thread(target=finalizer, daemon=True).start()
>>
>> app = Flask(__name__)
>>
>>
>> @app.route("/")
>> async def root():
>>     future = asyncio.get_event_loop().create_future()
>>     in_queue.put(future)
>>     print(f"root put future: {future}")
>>     result = await future
>>     return result
>>
>>
>> if __name__ == "__main__":
>>     app.run()
>> ```
>>
>> If I start up that server, and execute `curl http://localhost:5000`, 
>> it prints out the following in the server before hanging:
>>
>> ```
>> $ python3 app.py
>> worker started running
>> finalizer started running
>> * Serving Flask app 'app'
>> * Debug mode: off
>> WARNING: This is a development server. Do not use it in a production 
>> deployment. Use a production WSGI server instead.
>> * Running on http://127.0.0.1:5000
>> Press CTRL+C to quit
>> root put future: <Future pending>
>> worker got future: <Future pending cb=[Task.task_wakeup()]>
>> worker sleeped
>> finalizer got future: <Future pending cb=[Task.task_wakeup()]>
>> finalizer set result
>> ```
>>
>> Judging by what's printing out, the `final result = await future` 
>> doesn't seem to be happy here.
>>
>> Maybe someone sees something obvious I'm doing wrong here? I presume 
>> I'm mixing threads and asyncio in a way I shouldn't be.

Aside from possible issues mixing threads and asyncio (I'm no expert on 
asyncio), there's also the issue that there's nothing to cause the 
threads to exit.  The following doesn't use asyncio, but also hangs 
after the main thread has got the result:

```
import queue
import threading
import time

in_queue = queue.Queue()
out_queue = queue.Queue()
result_queue = queue.Queue()

def worker():
     print("worker started running")
     while True:
         item = in_queue.get()
         print(f"worker got item: {item}")
         time.sleep(5)
         print("worker sleeped")
         out_queue.put(item)


def finalizer():
     print("finalizer started running")
     while True:
         item = out_queue.get()
         print(f"finalizer got item: {item}")
         result_queue.put(item)
         print("finalizer set result")


threading.Thread(target=worker).start()
threading.Thread(target=finalizer).start()
# threading.Thread(target=worker, daemon=True).start()
# threading.Thread(target=finalizer, daemon=True).start()


def main():
     item = "Item to process"
     in_queue.put("Item to process")
     print(f"main put item: {item}")
     result = None
     while True:
         try:
             result = result_queue.get(timeout=1)
         except queue.Empty:
             # No result yet
             print("main waiting for result")
             continue
         break
     print(f"main got result {result}")


if __name__ == "__main__":
     main()
```

By default, the main process won't exit until there are no non-daemon 
threads still running.  You can either send some sort of signal to the 
threads signal the threads to exit the loop and return cleanly (you'd 
also need a timeout on the queue `get()` calls).  Or you can create the 
threads as "daemon" threads (as in the commented-out lines), in which 
case they'll be killed when all non-daemon threads have exited.  Daemon 
threads don't get a chance to do any cleanup, close resources, etc. when 
they're killed, though, so aren't always appropriate.

-- 
Mark.


More information about the Python-list mailing list