Accessing a shared generator from multiple threads.

Alan Kennedy alanmk at hotmail.com
Wed Jan 21 15:53:09 EST 2004


[Andrae Muys]
> Moved to email for higher bandwidth.  Feel free to quote to usenet if
> you desire. 

[Alan Kennedy]
>> I think I'm with Aahz on this one: when faced with this kind of 
>> problem, I think it is best to use a tried and tested inter-thread 
>> communication paradigm, such as Queue.Queue. In this case, Queue.Queue
>> fits the problem (which is just a variation of the producer/consumer
>> problem) naturally. Also, I doubt very much if there is much excessive
>> resource overhead when using Queue.Queues. 

[Andrae Muys]
>Well I'm puzzled, because I couldn't see an easy way to use Queue.Queue
>to achieve this because this isn't a strict producer/consumer problem.
>I am trying to synchronise multiple consumers, but I don't have a
>producer.  So the only way I can see to use Queue.Queue to achieve 
>this is to spawn a thread specifically to convert the iterator in to 
>a producer. 

Andrae,

I thought it best to continue this discussion on UseNet, to perhaps
get more opinions.

Yes, you're right. Using a Queue in this situation does require the
use of a dedicated thread for the producer. There is no way to "pull"
values from a generator to multiple consumers through a Queue.Queue.
The values have to be "pushed" onto the Queue.Queue by some producing
thread of execution.

The way I see it, the options are

Option 1. Spawn a separate thread to execute the producing generator.
However, this has problems:-

A: How do the threads recognise the end of the generated sequence?
This is not a simple problem: the Queue simply being empty does not
necessarily signify the end of the sequence (e.g., the producer thread
might not be getting its fair share of CPU time).

B: The Queue acts as a (potentially infinite) buffer for the generated
values, thus eliminating one of the primary benefits of generators:
their efficient "generate when required" nature. This can be helped
somewhat by limiting the number of entries in the Queue, but it is
still slightly unsatisfactory.

C: A thread of execution has to be dedicated to the producer, thus
consuming resources.

Option 2. Fill the Queue with values from a main thread which executes
the generator to exhaustion. The consuming threads simply peel values
from the Queue. Although this saves on thread overhead, it is the
least desirable in terms of memory overhead: the number of values
generated by the generator and buffered in the Queue could be very
large.

Option 3. Use the same paradigm as your original paradigm, i.e. there
is no producer thread and the consuming threads are themselves
responsible for calling the generator.next() method: access to this
method is synchronised on a threading.Lock. I really like this
solution, because values are only generated on demand, with no
temporary storage of values required.

I think that an ideal solution would be to create a dedicated class
for synchronising a generator, as my example did, BUT to implement the
same interface as Queue.Queue, so that client code would remain
ignorant that it was dealing with a generator.

Here is my version of such a beast

# -=-=-=-=-= file GenQueue.py =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
import threading

class Empty(Exception): pass
class Exhausted(StopIteration): pass
class IllegalOperation(Exception): pass

class GenQueue:
  "Simulate a Queue.Queue, with values produced from a generator"

  def __init__(self, gen):
    self.lock = threading.Lock()
    self.gen = gen

  def __iter__(self):
    return self

  def _get(self, block=1):
    if self.lock.acquire(block):
      try:
        try:
          return self.gen.next()
        except StopIteration:
          raise Exhausted
      finally:
        self.lock.release()
    else:
      raise Empty

  def next(self):
    return self._get(1)

  def get(self, block=1):
    return self._get(block)

  def get_nowait(self):
    return self._get(0)

  def put(self, item, block=1):
    raise IllegalOperation

  def put_nowait(self, item):
    self.put(item, 0)

  def full(self):
    return False

  def empty(self):
    return False

  def qsize(self):
    return 1j

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

And here is some code that tests it

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

import sys
import time
import thread
import GenQueue

def squares(n):
  i = 1
  while i <= n:
    yield i*i
    i = i+1

def test_blockget(results, queue):
  while 1:
    try:
      results.append(queue.get())
    except GenQueue.Empty:
      raise TestFailure
    except GenQueue.Exhausted:
      break

def test_iter(results, queue):
  for v in queue:
    results.append(v)

def test_noblockget(results, queue):
  while 1:
    try:
      results.append(queue.get_nowait())
    except GenQueue.Empty:
      pass
    except GenQueue.Exhausted:
      break

def threadwrap(func, queue, markers):
  markers[thread.get_ident()] = 1
  results = []
  func(results, queue)
  print "Test %s: Thread %5s: %d results." % (func.__name__, \
    thread.get_ident(), len(results))
  del markers[thread.get_ident()]

def test():
  numthreads = 10
  for tfunc in (test_blockget, test_iter, test_noblockget):
    print "Test: %s ------------------------------->" % tfunc.__name__
    threadmarkers = {}
    q = GenQueue.GenQueue(squares(100))
    threads = [thread.start_new_thread(threadwrap,\
                (tfunc, q, threadmarkers)) for t in
xrange(numthreads)]
    while len(threadmarkers.keys()) > 0:
      time.sleep(0.1)

if __name__ == "__main__":
  test()

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

I find the combination of the iteration protocol and a Queue
intriguing: in this case, Queue.get() and iter.next() mean the same
thing. Or maybe I'm just being sucked in by the syntactic niceties of
something like

def worker(inq, outq):
  for thing in inq: outq.put(thing.work()) 

I'm interested to hear other opinions about the commonalities and
differences between Queues and iterators. 

One problem that is always in the back of my mind these days is how
one could write a dispatch-based coroutine scheduler that would work
efficiently when in communication (through Queue.Queues?) with
independently executing coroutine schedulers running on other
processors in the box. (And before you jump in shouting "Global
Interpreter Lock!", remember jython + generators will be able to do
this).

Not that I need such a thing: it's just a fun thing to think about,
like crosswords :-)

cheers,

-- 
alan kennedy
------------------------------------------------------
check http headers here: http://xhaus.com/headers
email alan:              http://xhaus.com/contact/alan



More information about the Python-list mailing list