Producer-consumer threading problem

George Sakkis george.sakkis at gmail.com
Thu Jun 12 02:08:24 EDT 2008


On Jun 11, 3:07 pm, Carl Banks <pavlovevide... at gmail.com> wrote:

> On Jun 10, 11:33 pm, George Sakkis <george.sak... at gmail.com> wrote:
>
> > I pasted my current solution athttp://codepad.org/FXF2SWmg. Any
> > feedback, especially if it has to do with proving or disproving its
> > correctness, will be appreciated.
>
> It seems like you're reinventing the wheel.  The Queue class does all
> this, and it's been thorougly battle-tested.

Synchronized queues are an extremely useful data structure in many
situations. The producer/consumer paradigm however is a more general
model and doesn't depend on any specific data structure.

> So first of all, can you tell us why the following wouldn't work?  It
> might help us understand the issue you're facing (never mind the
> produce and consume arguments for now--I'll cover that below).
>
> def iter_consumed(items):
>     q = Queue.Queue()
>     sentinel = object()
>     def produce_all()
>         for item in items:
>             q.put()
>         q.put(sentinel)
>     producer = threading.Thread(target=produce_all)
>     producer.start()
>     try:
>         while True:
>             item = q.get()
>             if item is sentinel:
>                 return
>             yield item
>     finally:
>         # for robustness, notify producer to wrap things up
>         # left as exercise
>         producer.join()

As it is, all this does is yield the original items, but slower, which
is pretty much useless. The whole idea is to transform some inputs to
some outputs. How exactly each input is mapped to an output is
irrelevant at this point; this is the power of the producer/consumer
model. Produce might mean "send an email to address X" and consume
might mean "wait for an automated email response, parse it and return
a value Y". No queue has to be involved; it *may* be involved of
course, but that's an implementation detail, it shouldn't make a
difference to iter_consumed().

If you replace "q.push"/"q.pop" with "produce"/"consume" respectively
and make the last two parameters, you're much closer to my idea.
What's left is getting rid of the sentinel, since the producer and the
consumer may have been written independently, not aware of
iter_consumed. E.g. for the email example, all producers and consumers
(there may be more than one) must agree in advance on a "sentinel
email". For a given situation that might not be a big issue, but then
again, if iter_consumed() is written once without assuming a sentinel,
it makes life easier for all future producers and consumers.

> If you want to customize the effect of getting and putting, you can
> subclass Queue and override the _get and _put methods (however, last
> time I checked, the Queue class expects _put to always add an item to
> the internal sequence representing the queue--not necessarily to the
> top--and _get to always remove an item--not necessarily from the
> bottom).

Assuming that you're talking about the stdlib Queue.Queue class and
not some abstract Queue interface, extending it may help for limited
customization but can only get you so far. For instance the producer
and the consumer may not live in the same address space, or even in
the same machine.

> One issue from your function.  This line:
>
> done_remaining[1] += 1
>
> is not atomic, but your code depends on it being so.

No, it doesn't; it is protected by the condition object.

George



More information about the Python-list mailing list