[Python-ideas] Add closing and iteration to threading.Queue

Nathaniel Smith njs at pobox.com
Tue Oct 23 01:13:38 EDT 2018


On Sun, Oct 21, 2018 at 8:31 PM, Guido van Rossum <guido at python.org> wrote:
> On Sun, Oct 21, 2018 at 6:08 PM Nathaniel Smith <njs at pobox.com> wrote:
>> I'm not sure if this is an issue the way Queue is used in practice, but in
>> general you have to be careful with this kind of circular flow because if
>> your queue communicates backpressure (which it should) then circular flows
>> can deadlock.
>
> Nathaniel, would you be able to elaborate more on the issue of backpressure?
> I think a lot of people here are not really familiar with the concept and
> its importance, and it changes how you have to think about queues and the
> like.

Sure.

Suppose you have some kind of producer connected to some kind of
consumer. If the producer consistently runs faster than the consumer,
what should happen? By default with queue.Queue, there's no limit on
its internal buffer, so if the producer puts, say, 10 items per
second, and the consumer only gets, say, 1 item per second, then the
internal buffer grows by 9 items per second. Basically you have a
memory leak, which will eventually crash your program. And well before
that, your latency will become terrible.
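
Here's a minimal sketch of the failure mode (the names and rates are
made up for illustration):

    import queue
    import threading
    import time

    q = queue.Queue()  # unbounded: maxsize defaults to 0, meaning "infinite"

    def producer():
        while True:
            q.put(object())  # never blocks on an unbounded queue
            time.sleep(0.1)  # ~10 items/second

    def consumer():
        while True:
            q.get()
            time.sleep(1)    # ~1 item/second

    threading.Thread(target=producer, daemon=True).start()
    threading.Thread(target=consumer, daemon=True).start()

    while True:
        time.sleep(1)
        print("buffered items:", q.qsize())  # climbs by ~9/second, forever

So how can we avoid this?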

I guess we could avoid this by carefully engineering our systems to
make sure that producers always run slower than consumers, but that's
difficult and fragile. Instead, what we usually want to do is to
dynamically detect when a producer is outrunning a consumer, and apply
*backpressure*. (It's called that because it involves the consumer
"pushing back" against the producer.) The simplest way is to put a
limit on how large our Queue's buffer can grow, and make put() block
if it would exceed this limit. That way producers are automatically
slowed down, because they have to wait for the consumer to drain the
buffer before they can continue executing.
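
With queue.Queue that's just a matter of passing maxsize; e.g., in the
sketch above (the exact limit is arbitrary):

    q = queue.Queue(maxsize=10)  # put() blocks while 10 items are buffered

Now the producer can only get 10 items ahead before its put() blocks,
so it's automatically throttled down to the consumer's 1 item/second.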

This simple approach also works well when you have several tasks
arranged in a pipeline like A -> B -> C, where B gets objects from A,
does some processing, and then puts new items onto C. If C is running
slow, this will eventually apply backpressure to B, which will block
in put(); and since B is blocked and not calling get(), A will
eventually get backpressure too. In fact, this works fine for any
acyclic network topology.
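
In code, stage B might look something like this (a sketch; process()
is a placeholder for whatever work B actually does):

    import queue

    a_to_b = queue.Queue(maxsize=10)
    b_to_c = queue.Queue(maxsize=10)

    def stage_b():
        while True:
            item = a_to_b.get()     # blocks while A hasn't produced anything
            result = process(item)  # placeholder for B's actual work
            # If C falls behind, b_to_c fills up and this put() blocks,
            # which stops our get()s, which eventually blocks A's put()s:
            # the backpressure propagates upstream by itself.
            b_to_c.put(result)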

If you have a cycle, though, like A -> B -> C -> A, then you at least
potentially have a risk of deadlock: every task is blocked in put(),
and can't continue until the downstream task calls get(), but the
downstream task never will, because it's blocked in put() too.
Sometimes a cycle is OK and won't deadlock, but you need to think
carefully about the details to figure that out.

If a task gets from and puts to the same queue, like someone suggested
doing for the sentinel value upthread, then that's a cycle and you
need to do some more analysis. (I guess if you have a single sentinel
value, then queue.Queue is probably OK, since the minimum buffer size
it supports is 1? So when the last thread get()s the sentinel, it
knows that there's at least 1 free space in the buffer, and can put()
it back without blocking. But if there's a risk of somehow getting
multiple sentinel values, or if Queues ever gain support for
zero-sized buffers, then this pattern could deadlock.)
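
Concretely, that sentinel pattern looks something like this (a sketch;
handle() is a placeholder, and I'm using maxsize=1 to show the tightest
case queue.Queue allows):

    import queue

    SENTINEL = object()
    q = queue.Queue(maxsize=1)  # the smallest bounded Queue still holds 1 item

    def worker():
        while True:
            item = q.get()
            if item is SENTINEL:
                # We just freed the buffer's only slot, so as long as
                # there's exactly one sentinel in flight (and no other
                # producers still putting), this put() can't block:
                q.put(SENTINEL)  # pass the shutdown signal along
                return
            handle(item)  # placeholder for the per-item work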

There's a runnable example here:
https://trio.readthedocs.io/en/latest/reference-core.html#buffering-in-channels
And I also wrote about backpressure and asyncio here:
https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#bug-1-backpressure

-n

-- 
Nathaniel J. Smith -- https://vorpus.org

