Queue.Queue-like class without the busy-wait
Antoon Pardon
apardon at forel.vub.ac.be
Tue Mar 29 06:01:19 EST 2005
On 2005-03-25, Paul Rubin wrote <http>:
> Antoon Pardon <apardon at forel.vub.ac.be> writes:
>> Well, maybe you could use an os.pipe as a timeout lock then. When the lock is
>> instantiated you put one byte in it. Acquiring the lock is implemented by
>> reading one byte, releasing the lock is implemented by writing a byte.
>> Acquiring the lock with a timeout would make use of select.
>
> Hmm, if I understand what you're saying, you'd need a separate pipe
> for every lock. Interesting idea but I think it would burn too many
> file descriptors if you used a lot of them. My mentioning select also
> is showing my age. That was the way of doing short sleeps before
> usleep became widespread.
>
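For reference, the pipe-as-lock idea quoted above might look roughly like the sketch below. It is written for Python 3 and assumes a POSIX system where select works on pipes; the name PipeLock and its methods are made up for illustration. The read end is made non-blocking because two threads can both see the pipe as readable, and the loser must go back to waiting instead of blocking in the read.

```python
import os
import select
import time


class PipeLock:
    """Lock backed by a pipe that holds one byte while unlocked (hypothetical name)."""

    def __init__(self):
        self._rfd, self._wfd = os.pipe()
        os.set_blocking(self._rfd, False)  # a lost race must not block in read()
        os.write(self._wfd, b"-")          # one byte in the pipe means "unlocked"

    def acquire(self, timeout=None):
        """Return True once the byte has been taken, False if the timeout expired."""
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            if deadline is None:
                remaining = None  # wait indefinitely
            else:
                remaining = max(0.0, deadline - time.monotonic())
            readable, _, _ = select.select([self._rfd], [], [], remaining)
            if readable:
                try:
                    os.read(self._rfd, 1)  # taking the byte acquires the lock
                    return True
                except BlockingIOError:
                    pass  # another thread took the byte first; wait again
            if deadline is not None and time.monotonic() >= deadline:
                return False

    def release(self):
        os.write(self._wfd, b"-")  # putting the byte back releases the lock

    def close(self):
        os.close(self._rfd)
        os.close(self._wfd)
```

Each PipeLock costs two file descriptors, which is the objection raised in the reply above.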
>> > A signal handler in the main thread could release a lock that the
>> > thread is waiting on.
>>
>> This wouldn't work. A thread would have no way of knowing why the
>> lock was released: whether the thread holding the lock released it
>> or the signal handler did, both would look the same to the thread
>> acquiring the lock.
>
> Yes, you'd need a separate lock for each blocked thread. There would
> be a list of locks waiting for timeouts and the sigalarm handler would
> release any for which a wakeup was due. You could use a priority
> queue to maintain the timeout list, so that adding or servicing a
> timeout would be O(log n).
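The timeout list described in the quoted text could be kept in a heap along these lines. This is only a sketch in Python 3: the class name TimeoutList and its methods are hypothetical, and the signal handler (or whatever else calls service periodically) is left out. It assumes every registered lock is currently held, since releasing an unheld threading.Lock raises an error.

```python
import heapq
import itertools
import threading
import time


class TimeoutList:
    """Priority queue of (deadline, lock) entries (hypothetical helper)."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker so locks are never compared
        self._mutex = threading.Lock()

    def add(self, lock, timeout, now=None):
        """Register a held lock to be released `timeout` seconds after `now`."""
        now = time.monotonic() if now is None else now
        with self._mutex:
            # heappush is O(log n) in the number of pending timeouts.
            heapq.heappush(self._heap, (now + timeout, next(self._counter), lock))

    def service(self, now=None):
        """Release every lock whose deadline has passed; return how many."""
        now = time.monotonic() if now is None else now
        released = 0
        with self._mutex:
            # The earliest deadline is always at index 0 of the heap.
            while self._heap and self._heap[0][0] <= now:
                _, _, lock = heapq.heappop(self._heap)
                lock.release()
                released += 1
        return released
```

The `now` argument is there only so the behaviour can be checked without real sleeps; in normal use it would be omitted.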
Well, have a look at what I have written over the weekend. It uses
a separate thread with one pipe as a wakeup mechanism. I didn't
like using a signal handler because you never know what other
modules might use signals for and how they might interfere.
Try it out and let me know what you think.
Not thoroughly tested:
------------------------------- tlock.py --------------------------------------
import threading
import os
import time

from heapq import heappush, heappop
from weakref import ref
from select import select

heapmutex = threading.Lock()
heap = []
heappipe = os.pipe()

# Sentinel entry far in the future, so heap[0] always exists.
sentinel = 365.25 * 50 * 24 * 3600 # 50 years
heappush(heap, (time.time() + sentinel, None, None, None))

class _Plock:

  def __init__(self):
    self.mutex = threading.Lock()
    self.broken = False

  def acquire(self):
    self.mutex.acquire()

  def release(self):
    self.mutex.release()

class TimeOut(Exception):
  pass

class Tlock:

  def __init__(self):
    self.mutex = threading.Lock()
    self.locktable = [_Plock()]

  def acquire(self, timeout=None):
    self.mutex.acquire()
    newlock = _Plock()
    newlock.acquire()
    self.locktable.append(newlock)
    prevlock = self.locktable[-2]
    # Register a timeout only when we have to wait behind another thread.
    if len(self.locktable) > 2 and timeout is not None:
      heapmutex.acquire()
      heappush(heap, (time.time() + timeout, ref(prevlock), self.mutex, self.locktable))
      os.write(heappipe[1] , '-')   # wake the breaker thread
      heapmutex.release()
    self.mutex.release()
    prevlock.acquire()
    if prevlock.broken:
      raise TimeOut, "lock timed out"

  def release(self):
    self.mutex.acquire()
    self.locktable[0].release()
    del self.locktable[0]
    self.locktable[0].release()
    self.mutex.release()

def lock_breaker():
  heapfd = heappipe[0]
  while True:
    heapmutex.acquire()
    waketime, pl, mutex, table = heap[0]
    timeout = waketime - time.time()
    # Break every lock whose deadline has passed.
    while timeout <= 0.0:
      lck = pl()
      if lck is not None:
        mutex.acquire()
        try:
          try:
            i = table.index(lck, 1)
            del table[i]
            lck.broken = True
            lck.release()
            # lck.release()
          except ValueError:
            pass
        finally:
          mutex.release()
      heappop(heap)
      waketime, pl, mutex, table = heap[0]
      timeout = waketime - time.time()
    heapmutex.release()
    # Sleep until the next deadline, or until acquire() writes to the pipe.
    rdlst, wrlst, erlst = select([heapfd],[],[],timeout)
    if rdlst:
      os.read(rdlst[0],1)

breaker = threading.Thread(target = lock_breaker)
breaker.setDaemon(True)
breaker.start()

if __name__ == "__main__":
  from time import sleep
  from random import randint

  T = Tlock()
  rg = 5

  def thrd(Id):
    for Nr in xrange(20):
      try:
        print "Trying %d (loop %d)" % (Id, Nr)
        T.acquire(randint(0,rg))
        print "Entering %d (loop %d)" % (Id, Nr)
        sleep(randint(0,rg))
        print "Leaving %d (loop %d)" % (Id, Nr)
        T.release()
      except TimeOut, ErrId:
        print "Failed %d (loop %d)" % (Id, Nr)
      sleep(randint(0,rg))

  for i in xrange(rg):
    th = threading.Thread(target=thrd, args=(i,))
    th.start()
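
The wakeup mechanism that lock_breaker relies on, taken in isolation, is just a select on the read end of a pipe with a timeout, plus a one-byte write from any other thread to cut the sleep short. A minimal sketch follows, written for Python 3 (unlike the listing above) and assuming a POSIX system; the function names wait_for_wakeup, wake and demo are made up for illustration.

```python
import os
import select
import threading

read_fd, write_fd = os.pipe()


def wait_for_wakeup(timeout):
    """Sleep up to `timeout` seconds; return True if woken early via the pipe."""
    readable, _, _ = select.select([read_fd], [], [], timeout)
    if readable:
        os.read(read_fd, 1)  # consume the wakeup byte
        return True
    return False


def wake():
    """Interrupt a pending wait_for_wakeup() from any thread."""
    os.write(write_fd, b"-")


def demo():
    """Start a 5 second wait and have a timer thread cut it short."""
    threading.Timer(0.1, wake).start()
    return wait_for_wakeup(5.0)
```

This costs one pipe for the whole module rather than one per lock, and it does not touch signal handling at all.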