[Python-ideas] asyncio: return from multiple coroutines

Kyle Stanley aeros167 at gmail.com
Thu Jun 25 01:35:56 EDT 2020


(Resending this email since it didn't originally go through to
python-list, sorry for the duplicate Pablo)

> Yes, I want to have multiple results: the connections listening forever, returning a result for each message received.

> I forgot to mention thatI did try to use asyncio.wait with `FIRST_COMPLETED`; however, the problem is that it seems to evict the not-completed coroutines, so the messenger that arrives second does not send the message. To check it, I have run that script without the random sleep. just msgr1 waits 1s and msgr2 waits 2s, so msgr1 always ends first. I expect a result like this (which I am currently getting with queues):

FYI, the other coroutines are not evicted or cancelled, they are
simply in the "pending" set of the "done, pending" tuple returned by
`asyncio.wait()` and were not returned. I misunderstood what you were
actually looking for, but I think that I understand now.

Since it seems like you want to be able to receive the results out of
order from both "messengers" and have them continuously listen to
their respective socket (without affecting the other), a queue is
likely going to be the best approach. I think you had the right
general idea with your example, but here's a way to make it
significantly less cumbersome and easy to expand upon (such as
expanding the number of websockets to listen to):

```
import asyncio

class MessengerQueue:
    def __init__(self):
        self._queue = asyncio.Queue()

    async def get(self):
        return await self._queue.get()

    async def start_messengers(self):
        # Could be easily modified to create any number of
"messengers" set to
        # listen on specific websockets, by passing a list and
creating a task for
        # each one.
        asyncio.create_task(self._messenger("Messenger 1", 1))
        asyncio.create_task(self._messender("Messenger 2", 2))

    async def _messenger(self, message: str, sleep_time: int):
        while True:
            await asyncio.sleep(sleep_time)
            await self._queue.put(f'{message} awaited for {sleep_time:.2f}s')


async def main():
    mqueue = MessengerQueue()
    asyncio.create_task(mqueue.start_messengers())
    while True:
        result = await mqueue.get()
        print(result)

asyncio.run(main())
```

This results in your desired output:
Messenger 1 awaited for 1.00s
Messenger 2 awaited for 2.00s
Messenger 1 awaited for 1.00s
Messenger 1 awaited for 1.00s
Messenger 2 awaited for 2.00s
Messenger 1 awaited for 1.00s
Messenger 1 awaited for 1.00s
Messenger 2 awaited for 2.00s

Note: it would probably be more idiomatic to call these "consumers" or
"listeners" rather than "messengers"/"messagers" (the websocket docs
refer to them as "consumer handlers"), but I used "messengers" to make
it a bit more easily comparable to the original queue example from the
OP: https://pastebin.com/BzaxRbtF.

I hope the above example is of some use. :-)

Regards,
Kyle Stanley

On Thu, Jun 25, 2020 at 1:28 AM Kyle Stanley <aeros167 at gmail.com> wrote:
>
> > Yes, I want to have multiple results: the connections listening forever, returning a result for each message received.
>
> > I forgot to mention thatI did try to use asyncio.wait with `FIRST_COMPLETED`; however, the problem is that it seems to evict the not-completed coroutines, so the messenger that arrives second does not send the message. To check it, I have run that script without the random sleep. just msgr1 waits 1s and msgr2 waits 2s, so msgr1 always ends first. I expect a result like this (which I am currently getting with queues):
>
> FYI, the other coroutines are not evicted or cancelled, they are
> simply in the "pending" set of the "done, pending" tuple returned by
> `asyncio.wait()` and were not returned. I misunderstood what you were
> actually looking for, but I think that I understand now.
>
> Since it seems like you want to be able to receive the results out of
> order from both "messengers" and have them continuously listen to
> their respective socket (without affecting the other), a queue is
> likely going to be the best approach. I think you had the right
> general idea with your example, but here's a way to make it
> significantly less cumbersome and easy to expand upon (such as
> expanding the number of websockets to listen to):
>
> ```
> import asyncio
>
> class MessengerQueue:
>     def __init__(self):
>         self._queue = asyncio.Queue()
>
>     async def get(self):
>         return await self._queue.get()
>
>     async def start_messengers(self):
>         # Could be easily modified to create any number of
> "messengers" set to
>         # listen on specific websockets, by passing a list and
> creating a task for
>         # each one.
>         asyncio.create_task(self._messenger("Messenger 1", 1))
>         asyncio.create_task(self._messender("Messenger 2", 2))
>
>     async def _messenger(self, message: str, sleep_time: int):
>         while True:
>             await asyncio.sleep(sleep_time)
>             await self._queue.put(f'{message} awaited for {sleep_time:.2f}s')
>
>
> async def main():
>     mqueue = MessengerQueue()
>     asyncio.create_task(mqueue.start_messengers())
>     while True:
>         result = await mqueue.get()
>         print(result)
>
> asyncio.run(main())
> ```
>
> This results in your desired output:
> Messenger 1 awaited for 1.00s
> Messenger 2 awaited for 2.00s
> Messenger 1 awaited for 1.00s
> Messenger 1 awaited for 1.00s
> Messenger 2 awaited for 2.00s
> Messenger 1 awaited for 1.00s
> Messenger 1 awaited for 1.00s
> Messenger 2 awaited for 2.00s
>
> Note: it would probably be more idiomatic to call these "consumers" or
> "listeners" rather than "messengers"/"messagers" (the websocket docs
> refer to them as "consumer handlers"), but I used "messengers" to make
> it a bit more easily comparable to the original queue example from the
> OP: https://pastebin.com/BzaxRbtF.
>
> I hope the above example is of some use. :-)
>
> Regards,
> Kyle Stanley
>
>
>
>
> On Tue, Jun 23, 2020 at 10:36 AM Pablo Alcain <pabloalcain at gmail.com> wrote:
> >
> > Thank you very much Kyle for your answer, I am moving this conversation to the more proper python-list for whoever wants to chime in. I summarize here the key points of my original question (full question on the quoted email):
> >
> > I have an application that listens on two websockets through the async library https://websockets.readthedocs.io/ and I have to perform the same function on the result, no matter where the message came from. I have implemented a rather cumbersome solution with async Queues: https://pastebin.com/BzaxRbtF, but i think there has to be a more async-friendly option I am missing.
> >
> > Now I move on to the comments that Kyle made
> >
> > On Tue, Jun 23, 2020 at 12:32 AM Kyle Stanley <aeros167 at gmail.com> wrote:
> >>
> >> I believe asyncio.wait() with "return_when=FIRST_COMPLETED" would
> >> perform the functionality you're looking for with the
> >> "asyncio.on_first_return()". For details on the functionality of
> >> asyncio.wait(), see
> >> https://docs.python.org/3/library/asyncio-task.html#asyncio.wait.
> >>
> >> > I understand that I can create two coroutines that call the same function, but it would be much cleaner (because of implementation issues) if I can simply create a coroutine that yields the result of whichever connection arrives first.
> >>
> >> You can use an asynchronous generator that will continuously yield the
> >> result of the first recv() that finishes (I'm assuming you mean
> >> "yields" literally and want multiple results from a generator, but I
> >> might be misinterpreting that part).
> >
> >
> > Yes, I want to have multiple results: the connections listening forever, returning a result for each message received.
> >
> >>
> >>
> >> Here's a brief example, using the recv() coroutine function from the
> >> pastebin linked:
> >>
> >> ```
> >> import asyncio
> >> import random
> >>
> >> async def recv(message: str, max_sleep: int):
> >>     sleep_time = max_sleep * random.random()
> >>     await asyncio.sleep(sleep_time)
> >>     return f'{message} awaited for {sleep_time:.2f}s'
> >>
> >> async def _start():
> >>     while True:
> >>         msgs = [
> >>             asyncio.create_task(recv("Messager 1", max_sleep=1)),
> >>             asyncio.create_task(recv("Messager 2", max_sleep=1))
> >>         ]
> >>         done, _ = await asyncio.wait(msgs,
> >>             return_when=asyncio.FIRST_COMPLETED)
> >>         result = done.pop()
> >>         yield await result
> >>
> >> async def main():
> >>     async for result in _start():
> >>         print(result)
> >>
> >> asyncio.run(main())
> >> ```
> >
> >
> > I forgot to mention thatI did try to use asyncio.wait with `FIRST_COMPLETED`; however, the problem is that it seems to evict the not-completed coroutines, so the messenger that arrives second does not send the message. To check it, I have run that script without the random sleep. just msgr1 waits 1s and msgr2 waits 2s, so msgr1 always ends first. I expect a result like this (which I am currently getting with queues):
> >
> > Messenger 1 waits for 1.0s
> > Messenger 1 waits for 1.0s
> > Messenger 2 waits for 2.0s
> > Messenger 1 waits for 1.0s
> > Messenger 1 waits for 1.0s
> > Messenger 2 waits for 2.0s
> > Messenger 1 waits for 1.0s
> > ...
> >
> > but instead I got this:
> >
> > Messenger 1 waits for 1.0s
> > Messenger 1 waits for 1.0s
> > Messenger 1 waits for 1.0s
> > Messenger 1 waits for 1.0s
> > Messenger 1 waits for 1.0s
> > ...
> >
> >
> >>
> >>
> >>
> >> Note that in the above example, in "msgs", you can technically pass
> >> the coroutine objects directly to asyncio.wait(), as they will be
> >> implicitly converted to tasks. However, we decided to deprecate that
> >> functionality in Python 3.8 since it can be rather confusing. So
> >> creating and passing the tasks is a better practice.
> >
> >
> > Thanks for that info, I am still trying to grasp the best practices surrounding mostly the explicitness in async.
> >
> >>
> >> > Again, it's quite likely I am not seeing something obvious, but I didn't know where else to ask.
> >>
> >> If you're not mostly certain or relatively inexperienced with the
> >> specific area that the question pertains to, I'd recommend asking on
> >> python-list first (or another Python user community). python-ideas is
> >> primarily intended for new feature proposals/suggestions. Although if
> >> you've tried other resources and haven't found an answer, it's
> >> perfectly fine to ask a question as part of the suggestion post.
> >>
> >>
> >
> > Original question, as posted in python-ideas:
> >
> >>
> >> On Mon, Jun 22, 2020 at 6:24 PM Pablo Alcain <pabloalcain at gmail.com> wrote:
> >> >
> >> > Hey everyone. I have been looking into asyncio lately, and even though I have had my fair share of work, I still have some of it very shaky, so first of all forgive me if what I am saying here is already implemented and I totally missed it (so far, it looks *really* likely).
> >> >
> >> > Basically this is the situation: I have an application that listens on two websockets through the async library https://websockets.readthedocs.io/ and I have to perform the same function on the result, no matter where the message came from. I understand that I can create two coroutines that call the same function, but it would be much cleaner (because of implementation issues) if I can simply create a coroutine that yields the result of whichever connection arrives first.
> >> >
> >> > I have implemented a rather cumbersome solution with async Queues (as I would do in threading), in which each connection puts its message in a queue and an adapter class awaits the first element of the queue on "receive". Here I attach a pastebin with the minimal working example: https://pastebin.com/BzaxRbtF
> >> >
> >> > However, it looks like a more async-friendly solution should exist, something like
> >> >
> >> > ```
> >> > async def _start():
> >> >     msg1 = recv("Messager 1", sleep_time=1)
> >> >     msg2 = recv("Messager 2", sleep_time=2)
> >> >     while True:
> >> >         result = await asyncio.on_first_return(msg1, msg2)
> >> >         print(result)
> >> > ```
> >> >
> >> > (I understand that this implementation would not work because the event loop doesn't know that it is "the same task repeated", but it's just to tell you the general idea)
> >> >
> >> > Again, it's quite likely I am not seeing something obvious, but I didn't know where else to ask.
> >> >
> >> > Thank you very much,
> >> > Pablo
> >> >
> >> > _______________________________________________
> >> > Python-ideas mailing list -- python-ideas at python.org
> >> > To unsubscribe send an email to python-ideas-leave at python.org
> >> > https://mail.python.org/mailman3/lists/python-ideas.python.org/
> >> > Message archived at https://mail.python.org/archives/list/python-ideas@python.org/message/XBR5QPXRBCCJELDVEWMKRBPTNG4SJM64/
> >> > Code of Conduct: http://python.org/psf/codeofconduct/


More information about the Python-list mailing list