threading.Semaphore (quite long)

Thomas Rachel nutznetz-0c1b6768-bfa9-48d5-a470-7603bd3aa915 at spamschutz.glglgl.de
Mon Mar 28 16:27:44 EDT 2011


[posted in de.clp in German]

Hello,

I want to implement an alternative concept to worker threads processing 
a job queue. The alternative consists of threads being the jobs 
themselves and thus running only this one job. The job threads are 
started after a Semaphore's acquire() "giving the OK" to do so. The 
Semaphore's release() gets called inside the jobs/threads, saying 
"done, the next one please".

But the program doesn't always stop on Ctrl-C.

So I had a look on threading.Semaphore and found

     def acquire(self, blocking=1):
         rc = False
         self.__cond.acquire() [1]
	[...]
         self.__cond.release() [2]
         return rc

     __enter__ = acquire

     def release(self):
         self.__cond.acquire() [3]
	[...]
         self.__cond.release()

Due to Strg-C, after returning from [1] there was a KeyboardInterrupt. 
[2] was never reached, so the threads deadlocked on [3].

In the other classes (Event, Condition) the "helper lock calls" are 
wrapped with try...finally gepackt, where after Ctrl-C 
self.__cond.release() would be called as well, allowing for the 
currently running jobs to finish.

First I thought about writing a bug report. But does t make it better? 
Because, with the finally: solution mentionned, an exception still can 
happen between finally: and release() - or not? It is less probable, the 
time window being smaller, but not impossible, so this error becomes 
more subtle.


Thus, the following questions:

0. (First and in general) Am I doing "bad things" when calling release() 
at a completely different place than acquire() with a Semaphore? I used 
to believe that's why there are Semaphores?

1. Is a bug report useful, or would that worsen the problem, as the race 
condition doesn't disappear completely?

2. Is in, in general, an error to work with Semaphores (ore, more 
general with Locks) in the MainThread? Or should this be done in a 
separate thread being informed about a keyboard exception by the main 
thread?


Below is a minimal example (t.py) demonstrating the behaviour.

python -m t 1 does the IMHO bad thing: Ctrl-C stops the MainThread and 
has the others starve on the lock.

python -m t 2 does spawning in a separate thread. The MainThread reacts 
to Strg-C with run = False, telling the other threads to stop ASAP.

python -m t 3 improves Semaphore() by putting self.__cond.release() into 
a finally: clause.

python -m t 4 "destroys" this beautiful new concept by throwing an 
Asserion at the appropriate (worst case) place (instead of a 
KeyboardInterrupt).


So the question is: Bug(report) doe to finally:, or used the wrong way?

TIA and greetings,
Thomas

import threading
import time
import random

run = True
destroyed = False

sema = threading.Semaphore(5)
def target():
     print threading.current_thread(), ' works'
     time.sleep(max(random.normalvariate(2, .4), 0))
     sema.release()

class MySema(threading._Semaphore):
     def acquire(self, blocking=1):
         rc = False
         ok = False # for destroying
         try:
             self._Semaphore__cond.acquire()
             while self._Semaphore__value == 0:
                 if not blocking:
                     break
                 if __debug__:
                     self._note("%s.acquire(%s): blocked waiting, value=%s",
                             self, blocking, self._Semaphore__value)
                 self._Semaphore__cond.wait()
             else:
                 self._Semaphore__value = self._Semaphore__value - 1
                 if __debug__:
                     self._note("%s.acquire: success, value=%s",
                             self, self._Semaphore__value)
                 rc = True
             ok = True # for not destroying
         finally:
             if destroyed and not ok: assert 0, "hier kaputt, release() 
doch nicht"
             self._Semaphore__cond.release()
         return rc

     __enter__ = acquire

     def release(self):
         try:
             self._Semaphore__cond.acquire()
             self._Semaphore__value = self._Semaphore__value + 1
             if __debug__:
                 self._note("%s.release: success, value=%s",
                            self, self._Semaphore__value)
             self._Semaphore__cond.notify()
         finally:
             self._Semaphore__cond.release()

def thread():
     t = threading.Thread(target=target)
     t.start()
     return t

def erroneous():
     global run
     try:
         while True:
             sema.acquire()
             t = thread()
             print 'main thread spawned', t
     finally:
         run = False

def extrathread():
     global run
     def extra():
         while run:
             sema.acquire()
             t = thread()
             print 'control thread spawned', t
     threading.Thread(target=extra).start()
     try:
         while True:
             time.sleep(.1)
     finally:
         run = False

def alternative():
     global sema
     sema = MySema(5)
     erroneous()


if __name__ == '__main__':
     import sys
     if len(sys.argv) < 2 or sys.argv[1] == '1':
         f = erroneous
     elif sys.argv[1] == '2':
         f = extrathread
     elif sys.argv[1] == '3':
         f = alternative
     else:
         destroyed = True
         f = alternative
     print f
     f()



More information about the Python-list mailing list