Queue.Queue-like class without the busy-wait

Antoon Pardon apardon at forel.vub.ac.be
Tue Mar 29 06:01:19 EST 2005


Op 2005-03-25, Paul Rubin schreef <http>:
> Antoon Pardon <apardon at forel.vub.ac.be> writes:
>> Well maybe you could use an os.pipe as a timeout lock then. When the lock is
>> instantiated you put one byte in it. Aquiring the lock is implemented by
>> reading one byte, releasing the lock is implemented by writing a byte.
>> Aquiring the lock with a timeout would make use of select.
>
> Hmm, if I understand what you're saying, you'd need a separate pipe
> for every lock.  Interesting idea but I think it would burn too many
> file descriptors if you used a lot of them.  My mentioning select also
> is showing my age.  That was the way of doing short sleeps before
> usleep became widespread.
>
>> > A signal handler in the main thread could release a lock that the
>> > thread is waiting on.
>> 
>> This wouldn't work. A thread would have no way knowing for what
>> purpose the lock was released, because the lock was released
>> by the thread holding the lock or because the signal handler
>> released the lock, both would look the same for the thread
>> aquiring the lock.
>
> Yes, you'd need a separate lock for each blocked thread.  There would
> be a list of locks waiting for timeouts and the sigalarm handler would
> release any for which a wakeup was due.  You could use a priority
> queue to maintain the timeout list, so that adding or servicing a
> timeout would be O(log n).

Well have a look at what I have written over the weekend. It uses
a seperate thread with one pipe for a wakeup mechanisme. I didn't
like using a signal handler because you never know what other
modules might have use for signals and how they might interfere.
Try it out and let me know what you think.


Not thoroughly tested:

------------------------------- tlock.py --------------------------------------

import threading
import os

import time

from heapq import heappush, heappop
from weakref import ref

from select import select

heapmutex = threading.Lock()
heap = []
heappipe = os.pipe()
sentinel = 365.25 * 50 * 24 * 3600  # 50 jaar
heappush(heap, (time.time() + sentinel, None, None, None))

class _Plock:
  
  def __init__(self):
    self.mutex = threading.Lock()
    self.broken = False

  def acquire(self):
    self.mutex.acquire()

  def release(self):
    self.mutex.release()

class TimeOut(Exception):
  pass


class Tlock:

  def __init__(self):
    self.mutex = threading.Lock()
    self.locktable = [_Plock()]

  def acquire(self, timeout=None):
    self.mutex.acquire()
    newlock = _Plock()
    newlock.acquire()
    self.locktable.append(newlock)
    prevlock = self.locktable[-2]
    if len(self.locktable) > 2 and timeout is not None:
      heapmutex.acquire()
      heappush(heap, (time.time() + timeout, ref(prevlock), self.mutex, self.locktable))
      os.write(heappipe[1] , '-')
      heapmutex.release()
    self.mutex.release()
    prevlock.acquire()
    if prevlock.broken:
      raise TimeOut, "lock timed out"

  def release(self):
    self.mutex.acquire()
    self.locktable[0].release()
    del self.locktable[0]
    self.locktable[0].release()
    self.mutex.release()


def lock_breaker():

  heapfd = heappipe[0]
  while True:
    heapmutex.acquire()
    waketime, pl, mutex, table = heap[0]
    timeout = waketime - time.time()
    while timeout <= 0.0:
      lck = pl()
      if lck is not None:
        mutex.acquire()
        try:
          try:
            i = table.index(lck, 1)
            del table[i]
            lck.broken = True
            lck.release()
            # lck.release()
          except ValueError:
            pass
        finally:
          mutex.release()
      heappop(heap)
      waketime, pl, mutex, table = heap[0]
      timeout = waketime - time.time()
    heapmutex.release()
    rdlst, wrlst, erlst = select([heapfd],[],[],timeout)
    if rdlst:
      os.read(rdlst[0],1)

breaker = threading.Thread(target = lock_breaker)
breaker.setDaemon(True)
breaker.start()

if __name__ == "__main__":

  from time import sleep
  from random import randint

  T = Tlock()

  rg = 5

  def thrd(Id):

    for Nr in xrange(20):
      try:
        print "Trying   %d (loop %d)" % (Id, Nr)
        T.acquire(randint(0,rg))
        print "Entering %d (loop %d)" % (Id, Nr)
        sleep(randint(0,rg))
        print "Leaving  %d (loop %d)" % (Id, Nr)
        T.release()
      except TimeOut, ErrId:
        print "Failed   %d (loop %d)" % (Id, Nr)
      sleep(randint(0,rg))


  for i in xrange(rg):
    th = threading.Thread(target=thrd, args=(i,))
    th.start()



More information about the Python-list mailing list