Suggested generator to add to threading module.

Ype Kingma ykingma at accessforall.nl
Sat Jan 17 14:37:54 EST 2004


Alan,

you wrote:
> [Andrae Muys]
>>>> Found myself needing serialised access to a shared generator from
>>>> multiple threads.  Came up with the following
>>>>
>>>> def serialise(gen):
>>>>   lock = threading.Lock()
>>>>   while 1:
>>>>     lock.acquire()
>>>>     try:
>>>>       next = gen.next()
>>>>     finally:
>>>>       lock.release()
>>>>     yield next
> 
> [Ype Kingma]
>>> Is there any reason why the lock is not shared among threads?
>>> From the looks of this, it doesn't synchronize anything
>>> between different threads. Am I missing something?
> 
> [Jeff Epler]
>> Yes, I think so.  You'd use the same "serialise" generator object in
>> multiple threads, like this:
>> 
>> p = seralise(producer_generator())
>> threads = [thread.start_new(worker_thread, (p,))
>>                 for t in range(num_workers)]
> 
> Hmm. I think Ype is right: the above code does not correctly serialise
> access to a generator.

Well, I just reread PEP 255, and I can assure you a was missing something...
 
> The above serialise function is a generator which wraps a generator.
> This presumably is in order to prevent the wrapped generators .next()
> method being called simultaneously from multiple threads (which is
> barred: PEP 255: "Restriction:  A generator cannot be resumed while it
> is actively running")
> 
> http://www.python.org/peps/pep-0255.html
> 
> However, the above implementation re-creates the problem by using an
> outer generator to wrap the inner one. The outer's .next() method will
> then potentially be called simultaneously by multiple threads. The

I agree (after rereading the PEP.)

> following code illustrates the problem
> 
> #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
> import time
> import thread
> import threading
> 
> def serialise(gen):
>   lock = threading.Lock()
>   while 1:
>     lock.acquire()
>     try:
>       next = gen.next()
>     finally:
>       lock.release()
>     yield next
> 
> def squares(n):
>   i = 1
>   while i < n:
>     yield i*i
>     i = i+1
> 
> def worker_thread(iter, markers):
>   markers[thread.get_ident()] = 1
>   results = [] ; clashes = 0
>   while 1:
>     try:
>       results.append(iter.next())
>     except StopIteration:
>       break
>     except ValueError, ve:
>       if str(ve) == "generator already executing":
>         clashes = clashes + 1
>   del markers[thread.get_ident()]
>   print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),\
>    len(results), clashes)
> 
> numthreads = 10 ; threadmarkers = {}
> serp = serialise(squares(100))
> threads = [thread.start_new_thread(worker_thread,\
>             (serp, threadmarkers)) for t in xrange(numthreads)]
> while len(threadmarkers.keys()) > 0:
>   time.sleep(0.1)
> #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
> 
> I believe that the following definition of serialise will correct the
> problem (IFF I've understood the problem correctly :-)
> 
> #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
> import time
> import thread
> import threading
> 
> class serialise:
>   "Wrap a generator in an iterator for thread-safe access"
> 
>   def __init__(self, gen):
>     self.lock = threading.Lock()
>     self.gen = gen
> 
>   def __iter__(self):
>     return self
> 
>   def next(self):
>     self.lock.acquire()
>     try:
>       return self.gen.next()
>     finally:
>       self.lock.release()

Looks like a candidate for inclusion in a standard library to me.
 
> def squares(n):
>   i = 1
>   while i < n:
>     yield i*i
>     i = i+1
> 
> def worker_thread(iter, markers):
>   markers[thread.get_ident()] = 1
>   results = [] ; clashes = 0
>   while 1:
>     try:
>       results.append(iter.next())
>     except StopIteration:
>       break
>     except ValueError, ve:
>       if str(ve) == "generator already executing":
>         clashes = clashes + 1
>   del markers[thread.get_ident()]
>   print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),\
>    len(results), clashes)
> 
> numthreads = 10 ; threadmarkers = {}
> serp = serialise(squares(100))
> threads = [thread.start_new_thread(worker_thread,\
>             (serp, threadmarkers)) for t in xrange(numthreads)]
> while len(threadmarkers.keys()) > 0:
>   time.sleep(0.1)
> #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
> 
> Also, I don't know if I'm happy with relying on the fact that the
> generator raises StopIteration for *every* .next() call after the
> actual generated sequence has ended. The above code depends on the
> exhausted generator raising StopIteration in every thread. This seems
> to me the kind of thing that might be python-implementation specific.
> For example, the original "Simple Generators" specification, PEP 255,
> makes no mention of expected behaviour of generators when multiple
> calls are made to the its .next() method after the iteration is
> exhausted. That I can see anyway? Am I wrong?

Quoting from PEP 234:
http://www.python.org/peps/pep-0234.html

"Once a particular iterator object has raised StopIteration, will
it also raise StopIteration on all subsequent next() calls?
...
Resolution: once StopIteration is raised, calling it.next()
continues to raise StopIteration."

Thanks to all for the help,

Ype




More information about the Python-list mailing list