import thread import functor # Don Beaudry's functor module class Condition: """Stolen from distribution's demo/threads/condition.py. Thanks, Tim.""" def __init__(self, mutex): # The monitor mutex that this condition is associated with self.mutex = mutex # lock used to block threads until a signal self.checkout = thread.allocate_lock() self.checkout.acquire() # internal critical-section lock, & the data it protects self.idlock = thread.allocate_lock() self.id = 0 self.waiting = 0 # num waiters subject to current release self.pending = 0 # num waiters awaiting next signal self.torelease = 0 # num waiters to release self.releasing = 0 # 1 iff release is in progress def acquire(self): self.mutex.acquire() def release(self): self.mutex.release() def wait(self): mutex, checkout, idlock = self.mutex, self.checkout, self.idlock if not mutex.locked(): raise ValueError, \ "condition must be .acquire'd when .wait() invoked" idlock.acquire() myid = self.id self.pending = self.pending + 1 idlock.release() mutex.release() while 1: checkout.acquire(); idlock.acquire() if myid < self.id: break checkout.release(); idlock.release() self.waiting = self.waiting - 1 self.torelease = self.torelease - 1 if self.torelease: checkout.release() else: self.releasing = 0 if self.waiting == self.pending == 0: self.id = 0 idlock.release() mutex.acquire() def signal(self): self.broadcast(1) def broadcast(self, num = -1): if num < -1: raise ValueError, '.broadcast called with num ' + `num` if num == 0: return self.idlock.acquire() if self.pending: self.waiting = self.waiting + self.pending self.pending = 0 self.id = self.id + 1 if num == -1: self.torelease = self.waiting else: self.torelease = min( self.waiting, self.torelease + num ) if self.torelease and not self.releasing: self.releasing = 1 self.checkout.release() self.idlock.release() def wrap_function(func): """Nest func inside a new function that does the locking. Uses Don Beaudry's functor module (see contrib software on python.org). """ def new_func(func, self, *args, **kw): self._fMonitor__mutex.acquire() try: return apply(func, (self,) + args, kw) finally: self._fMonitor__mutex.release() # wrap old function inside new function # man would I like nested lexical scope return functor.functor(new_func, (func,), {}) class fMonitor: """Functor monitor""" def __init__(self): if not hasattr(self, '__mutex'): self.__mutex = thread.allocate_lock() if not hasattr(Monitor, '__entry'): # race condition is possible self.__entry = 1 klass = self.__class__ for meth in dir(klass): if meth[:7] == 'entry__': wrapname = meth[7:] setattr(klass, wrapname, wrap_function(getattr(klass, meth))) def create_condition(self): if not hasattr(self, '_fMonitor__mutex'): self.__mutex = thread.allocate_lock() return Condition(self.__mutex) # used by Monitor (below) to wrap entry__ functions template = \ """def %s(self, *args, **kw): self._Monitor__mutex.acquire() try: apply(self.%s, args, kw) finally: self._Monitor__mutex.release() self.__class__.%s = %s """ class Monitor: """Exec monitor""" def __init__(self): if not hasattr(self, '__mutex'): self.__mutex = thread.allocate_lock() if not hasattr(self.__class__, '__entry'): # race condition is possible self.__entry = 1 klass = self.__class__ print dir(klass) for meth in dir(klass): if meth[:7] == 'entry__': wrapname = meth[7:] src_code = template % (wrapname, meth, wrapname, wrapname) exec src_code def create_condition(self): if not hasattr(self, '_Monitor__mutex'): self.__mutex = thread.allocate_lock() return Condition(self.__mutex) if __name__ == "__main__": import time import random class Collector(Monitor): def __init__(self, count): Monitor.__init__(self) self.count = count self.cond = self.create_condition() def entry__done(self): self.count = self.count - 1 self.cond.broadcast() def entry__wait(self): while self.count > 0: self.cond.wait() return class RdWrLock(Monitor): def __init__(self): Monitor.__init__(self) self.readers = 0 self.readWaiters = 0 self.writeWaiters = 0 self.readCond = self.create_condition() self.writeCond = self.create_condition() def __msg(self, txt): ## print "%d: %s" % (thread.get_ident(), txt) pass def entry__acquireShared(self): self.readWaiters = self.readWaiters + 1 if self.writeWaiters > 0: self.__msg('let the writer go') self.readCond.wait() while self.readers < 0: self.__msg('waiting for writer') self.readCond.wait() self.readWaiters = self.readWaiters - 1 self.readers = self.readers + 1 self.readCond.signal() def entry__releaseShared(self): self.readers = self.readers - 1 if self.readers == 0: self.writeCond.signal() def entry__acquireExclusive(self): self.writeWaiters = self.writeWaiters + 1 while self.readers > 0: self.__msg('waiting for reader(s)') self.writeCond.wait() self.writeWaiters = self.writeWaiters - 1 self.readers = -1 def entry__releaseExclusive(self): self.readers = 0 if self.readWaiters > 0: self.__msg('wakeup readers') self.readCond.broadcast() else: self.__msg('wakeup writers') self.writeCond.signal() def reader(done, rwl, times): while times > 0: time.sleep(random.randint(1, 5)) rwl.acquireShared() print "read(%d)" % thread.get_ident() time.sleep(1) rwl.releaseShared() times = times - 1 done.done() def writer(done, rwl, times): while times > 0: time.sleep(random.randint(5, 10)) rwl.acquireExclusive() print "write(%d)" % thread.get_ident() time.sleep(1) rwl.releaseExclusive() times = times - 1 done.done() RWL = RdWrLock() numReaders = 10 numWriters = 3 collector = Collector(numReaders + numWriters) for i in range(numReaders): thread.start_new_thread(reader, (collector, RWL, 15)) for i in range(numWriters): thread.start_new_thread(writer, (collector, RWL, 5)) collector.wait() print "Done"