Suggested generator to add to threading module.

Alan Kennedy alanmk at hotmail.com
Fri Jan 16 17:57:50 EST 2004


[Andrae Muys]
>>> Found myself needing serialised access to a shared generator from
>>> multiple threads.  Came up with the following
>>>
>>> def serialise(gen):
>>>   lock = threading.Lock()
>>>   while 1:
>>>     lock.acquire()
>>>     try:
>>>       next = gen.next()
>>>     finally:
>>>       lock.release()
>>>     yield next

[Ype Kingma]
>> Is there any reason why the lock is not shared among threads?
>> From the looks of this, it doesn't synchronize anything
>> between different threads. Am I missing something?

[Jeff Epler]
> Yes, I think so.  You'd use the same "serialise" generator object in
> multiple threads, like this:
> 
> p = serialise(producer_generator())
> threads = [thread.start_new(worker_thread, (p,))
>                 for t in range(num_workers)]

Hmm. I think Ype is right: the above code does not correctly serialise
access to a generator.

The above serialise function is a generator which wraps another
generator, presumably to prevent the wrapped generator's .next() method
from being called simultaneously from multiple threads. That is
forbidden (PEP 255: "Restriction: A generator cannot be resumed while
it is actively running").

http://www.python.org/peps/pep-0255.html
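
For illustration, here is a minimal single-threaded way to trip over
that restriction (the names here are mine, not from the code under
discussion): resuming a generator from inside its own frame raises the
same ValueError that the threads below will run into.

def reenter():
  # Resuming gi here, while gi's own frame is still running, is barred
  yield gi.next()

gi = reenter()
gi.next()   # raises ValueError: generator already executing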

However, the above implementation re-creates the very problem it is
trying to solve: serialise is itself a generator, so its own .next()
method can in turn be called simultaneously by multiple threads. A
second thread arriving while the outer generator is running gets the
ValueError straight away; the lock never gets a chance to block it.
The following code illustrates the problem:

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
import time
import thread
import threading

# NB: this is the problematic version. serialise is itself a generator,
# so its own .next() method is subject to the very restriction that the
# lock is supposed to enforce on the wrapped generator.
def serialise(gen):
  lock = threading.Lock()
  while 1:
    lock.acquire()
    try:
      next = gen.next()
    finally:
      lock.release()
    yield next

def squares(n):
  i = 1
  while i < n:
    yield i*i
    i = i+1

def worker_thread(it, markers, tid):
  results = [] ; clashes = 0
  while 1:
    try:
      results.append(it.next())
    except StopIteration:
      break
    except ValueError, ve:
      # Raised when a second thread tries to resume the outer
      # generator while it is already running in another thread
      if str(ve) == "generator already executing":
        clashes = clashes + 1
      else:
        raise
  del markers[tid]
  print "Thread %5s: %d results: %d clashes." % (tid, len(results), clashes)

numthreads = 10
# Register every worker up front, so the main thread cannot fall
# through the wait loop before a slow-starting worker has begun
threadmarkers = dict([(t, 1) for t in xrange(numthreads)])
serp = serialise(squares(100))
threads = [thread.start_new_thread(worker_thread,
            (serp, threadmarkers, t)) for t in xrange(numthreads)]
while threadmarkers:
  time.sleep(0.1)
#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
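
Running this, the worker threads typically report nonzero clash
counts: each clash is the outer generator raising ValueError
("generator already executing"), i.e. exactly the condition that
serialise was supposed to prevent.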

I believe that the following definition of serialise corrects the
problem (iff I've understood the problem correctly :-). The crucial
difference is that this serialise is an ordinary class, not a
generator: its next() method can safely be entered by any number of
threads at once, and the lock makes them take turns at resuming the
wrapped generator.
#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
import time
import thread
import threading

class serialise:
  "Wrap a generator in an iterator for thread-safe access"

  def __init__(self, gen):
    self.lock = threading.Lock()
    self.gen = gen

  def __iter__(self):
    return self

  def next(self):
    # Unlike a generator frame, an ordinary method can be entered by
    # many threads at once; they simply take turns on the lock
    self.lock.acquire()
    try:
      return self.gen.next()
    finally:
      self.lock.release()

def squares(n):
  i = 1
  while i < n:
    yield i*i
    i = i+1

def worker_thread(it, markers, tid):
  results = [] ; clashes = 0
  while 1:
    try:
      results.append(it.next())
    except StopIteration:
      break
    except ValueError, ve:
      # Kept from the previous script: with the class-based
      # serialise, no clashes should ever be counted
      if str(ve) == "generator already executing":
        clashes = clashes + 1
      else:
        raise
  del markers[tid]
  print "Thread %5s: %d results: %d clashes." % (tid, len(results), clashes)

numthreads = 10
# Register every worker up front, as in the previous script
threadmarkers = dict([(t, 1) for t in xrange(numthreads)])
serp = serialise(squares(100))
threads = [thread.start_new_thread(worker_thread,
            (serp, threadmarkers, t)) for t in xrange(numthreads)]
while threadmarkers:
  time.sleep(0.1)
#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

Also, I don't know if I'm happy relying on the generator raising
StopIteration for *every* .next() call made after the actual generated
sequence has ended. The above code depends on the exhausted generator
raising StopIteration in every thread, and that seems to me the kind
of thing that might be Python-implementation specific. For example,
the original "Simple Generators" specification, PEP 255, makes no
mention of the expected behaviour when multiple calls are made to a
generator's .next() method after the iteration is exhausted. None that
I can see, anyway. Am I wrong?

http://www.python.org/peps/pep-0255.html
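
If that worry turns out to be justified, one defensive possibility is
to have the wrapper itself remember that the generator is finished,
instead of trusting the generator to keep raising StopIteration. A
sketch (the exhausted flag is my own addition; imports as in the
scripts above):

class serialise:
  "Wrap a generator, and remember when it is exhausted"

  def __init__(self, gen):
    self.lock = threading.Lock()
    self.gen = gen
    self.exhausted = 0

  def __iter__(self):
    return self

  def next(self):
    self.lock.acquire()
    try:
      if self.exhausted:
        # Raise StopIteration ourselves, whatever the generator would do
        raise StopIteration
      try:
        return self.gen.next()
      except StopIteration:
        self.exhausted = 1
        raise
    finally:
      self.lock.release()

This guarantees that every thread sees StopIteration once the sequence
has ended, regardless of how the underlying generator behaves on
repeated .next() calls.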

regards,

-- 
alan kennedy
------------------------------------------------------
check http headers here: http://xhaus.com/headers
email alan:              http://xhaus.com/contact/alan


