[issue37334] Add a cancel method to asyncio Queues

Caleb Hattingh report at bugs.python.org
Fri Nov 22 09:06:05 EST 2019


Caleb Hattingh <caleb.hattingh at gmail.com> added the comment:

Ok, I see now. The improvement with only a single producer/consumer might be marginal, but the proposition that `queue.cancel()` might simplify the situation with multiple producers and/or consumers is more compelling.

Usually, assuming M producers and N threads (and 1 queue), one would typically stop the M producers outright, and then place N `None` values on the queue, and then each consumer shuts itself down when it receives a `None`.  It's clunky but it works. 

This from experience with threaded code, but the difference with asyncio is that we have cancellation available to us, whereas we could not cancel threads. I think this is why I've carried over my queue-idioms from threads to asyncio. So this is an interesting idea: if we introduce cancellation to queue handling, do things get better for the MxN producers and consumers design?

Rehashing, what I might expect to happen when I call `queue.cancel()` is that 

1. A CancelledError (or maybe`QueueCancelled`?) exception is raised in all producers and consumers ) - this gives a producer a chance to handle the error and do something with the waiting item that could not be `put()`
2. Items currently on the queue still get processed in the consumers before the consumers exit.

I think (1) is easy, but I expected (2) to be more tricky. You said it already works that way in your PR. I didn't believe it so I wrote a silly program to check, and it does! All pending items on the queue are still consumed by consumers even after the queue._getters futures are cancelled.  

So yes, both (1) and (2) appear to work.

> As for the "why don't I just cancel the task?", well, if you know it. There may be many consumer or producer tasks waiting for their turn. Sure, you can keep a list of all those tasks. But that's exactly the point of the proposed change: the Queue already knows all the waiting tasks, no need to keep another list up-to-date!

Finally - while I think the MxN producers/consumers case might be simplified by this, it's worth noting that usually in shutdown, *all* the currently-pending tasks are cancelled anyway. And as I said before, in an MxN queue scenario, one might place N `None` values on the queue, and then just send `CancelledError` to everything anyway (consumers will ignore the cancellation and just wait for the `None`s to exit). This works well.  

Thus, if `queue.cancel()` were available to me right now, the primary difference as I see it would be that during shutdown, instead of placing N `None` values on the queue, I would instead call `queue.cancel()`. I agree that's a bit neater.  (It however will still necessary to absorb CancelledError in the consumers, e.g. what is raised by `asyncio.run()` during shutdown, so that's unchanged).

I agree with Yury that I don't like `queue.close`. "Cancel" seems better after all.

I disagree with Yury that items are discarded - I checked that already-present items on the queue will still be consumed by consumers, before the `queue.close` cancellation is actually raised.

----------

_______________________________________
Python tracker <report at bugs.python.org>
<https://bugs.python.org/issue37334>
_______________________________________


More information about the Python-bugs-list mailing list