feedback requested

castironpi at gmail.com castironpi at gmail.com
Thu Feb 28 11:09:01 EST 2008


I have a data structure I think would be very useful.  It passes a few
test cases, but one attempt to optimize it failed, so that may
indicate a bug.  Will anyone help me debug it, verify it, or clean it?

It pertains to multi-threading and is a synchronization structure.  If it
is not an interest of yours, please let it be someone else's.  It's an
augmented Lock that I described a few weeks ago (different from the
FlowStep structure I was developing with Mr. Bieber).

My goal is to return Deadlock from acquire() if its blocking would
directly create deadlock.  Basic example:

thread-1 acquires lockA
thread-2 acquires lockB
thread-1 calls to acquire lockB and blocks
thread-2 calls to acquire lockA and blocks

and neither can ever continue.  There is enough information to deny
thread-2's request, and return at once with failure.  The method is
already somewhat treated in other structures, but I did not find one
for Python.

thread-2 calls to acquire lockA and returns Deadlock flag.

I would appreciate a half-hour and a thought or observation you make,
bugs you spot, &c.  It lacks a blocking argument to acquire;
CustThread and Trylock should garbage collect themselves (note the set
of all); Trylock().lock should be an RLock; and I shouldn't need the
CustThread class at all, if anyone's going to publish this, but it can
be used for free right now.

It runs in Python 3.0 and 2.5.

The current test consists of 6 threads randomly acquiring 20 locks and
infinitely looping.  Thanks in advance.

from __future__ import with_statement
import threading
import thread
from time import sleep
from collections import deque

class CustThread:
    """Per-thread bookkeeping record used by Trylock.

    Attributes:
        id: 1-based serial number, assigned in creation order.
        has: set of locks this thread currently holds.
        waits: set of locks this thread is currently blocked on.
    """

    # Number of records created so far; the next record gets count + 1.
    count = 0
    # Consulted by the constructor's assertion.  Nothing in this class ever
    # adds an entry to it.
    all = {}

    def __init__(self, threadid):
        """Create the record for the thread identified by *threadid*.

        *threadid* is only checked against ``CustThread.all``; it is not
        stored on the instance.
        """
        assert threadid not in CustThread.all
        serial = CustThread.count + 1
        CustThread.count = serial
        self.id = serial
        self.has, self.waits = set(), set()

    def __repr__(self):
        return '<CustThread %i>' % (self.id,)

    def s(self):
        """Return a debugging tuple: (self, 'has', has-set, 'waits', waits-set)."""
        return (self, 'has', self.has, 'waits', self.waits)

# Pair is one (thread, lock) edge of the graph that Trylock._cycles walks.
try:
    from collections import namedtuple
    Pair = namedtuple("Pair", "thread lock")
except ImportError:
    # collections.namedtuple only exists from Python 2.6 on; fall back to a
    # minimal stand-in exposing the same two attributes.  Only ImportError is
    # caught so that any other failure is not silently masked.
    class Pair:
        """Minimal (thread, lock) record used when namedtuple is unavailable."""

        def __init__(self, thread, lock):
            self.thread, self.lock = thread, lock

# Unique sentinel outcomes.  Trylock.acquire() returns Acquires or Deadlocks;
# Blocks is defined alongside them but is not returned by the code in this
# file.  All three are plain truthy objects, so compare them with ``is``.
Acquires = object()
Blocks = object()
Deadlocks = object()

# Printable name for each sentinel.
resultdict = {
    Acquires: 'Acquires',
    Blocks: 'Blocks',
    Deadlocks: 'Deadlocks',
}

class Trylock:
    """Lock whose acquire() refuses a request that would complete a deadlock.

    Each thread that calls acquire() gets a CustThread record holding the set
    of Trylocks it owns (``has``) and the set it is blocked on (``waits``).
    Before blocking, acquire() asks _cycles() whether adding the edge
    "calling thread -> this lock" to that graph would close a cycle; if so it
    returns ``Deadlocks`` at once instead of blocking.

    All shared bookkeeping (``_threads`` and every record's ``has`` / ``waits``
    sets) is read and written only while holding the class-wide ``_goplock``.
    """

    count = 0                    # next value handed out as an instance ``id``
    locks = set()                # every Trylock created (strong refs, never pruned)
    _threads = {}                # thread ident -> CustThread record
    _goplock = threading.Lock()  # guards _threads and all has/waits sets

    def __init__(self, *ar, **kwar):
        """Create an unlocked Trylock; positional/keyword arguments are ignored."""
        # Non-reentrant on purpose for now: acquire() reports a re-acquire by
        # the holding thread as Deadlocks rather than blocking forever.
        self.lock = threading.Lock()
        # Kept for backward compatibility; the class itself no longer uses it
        # (release() now synchronises on _goplock like everything else).
        self._oplock = threading.Lock()
        Trylock.locks.add(self)
        self.id = Trylock.count
        Trylock.count += 1

    def __repr__(self):
        return '<Trylock %i>' % self.id

    def acquire(self):
        """Take the lock, blocking if needed, unless blocking would deadlock.

        Returns:
            ``Acquires`` once the calling thread holds the lock, or
            ``Deadlocks`` (without blocking and without taking the lock) when
            the caller already holds this lock or when waiting for it would
            close a wait cycle.  Both values are truthy; compare with ``is``.
        """
        callerid = thread.get_ident()
        with Trylock._goplock:
            caller = Trylock._threads.get(callerid)
            if caller is None:
                caller = CustThread(callerid)
                Trylock._threads[callerid] = caller
            # The underlying lock is not reentrant, so a second acquire by
            # the holder would block forever: the most direct deadlock.
            if self in caller.has or self._cycles():
                return Deadlocks
            # Record the wait edge in the same critical section as the check
            # so no other thread can slip a conflicting edge in between.
            caller.waits.add(self)
        self.lock.acquire()
        with Trylock._goplock:
            caller.waits.remove(self)
            caller.has.add(self)
        return Acquires

    def release(self):
        """Release the lock and drop it from its holder's ``has`` set.

        The holder is found by searching the thread records, so the call is
        not restricted to the thread that acquired the lock.

        Raises:
            AssertionError: if the lock is not recorded as held by exactly
                one thread.
        """
        # Must be _goplock (not a per-instance lock): acquire() and _cycles()
        # read these same sets and iterate _threads under _goplock.
        with Trylock._goplock:
            holders = [th for th in Trylock._threads.values()
                       if self in th.has]
            assert len(holders) == 1, \
                '%r is not held by exactly one thread' % (self,)
            holders[0].has.remove(self)
            self.lock.release()

    def __enter__(self):
        """Acquire for a ``with`` block; raise if that would deadlock.

        Raises:
            Exception: with message 'Deadlock' when acquire() does not
                return ``Acquires``.
        """
        # The result sentinels are all truthy objects, so the outcome has to
        # be checked by identity; a plain truth test never detects Deadlocks.
        if self.acquire() is not Acquires:
            raise Exception('Deadlock')
        return self

    def __exit__(self, t, v, tb):
        self.release()

    def _cycles(self):
        """Return True if the calling thread waiting on this lock closes a cycle.

        Must be called with ``Trylock._goplock`` held and after the calling
        thread has been registered in ``Trylock._threads``.

        The graph is a list of Pair(thread, lock) edges: one per lock each
        thread holds or waits on, plus the proposed edge (caller, self).  The
        walk starts from every edge on this lock and alternates "other locks
        of the same thread" / "other threads on that lock"; reaching the
        proposed edge again means a cycle.
        """
        inth = Trylock._threads[thread.get_ident()]
        inlock = self
        edges = [Pair(th, ck) for th in Trylock._threads.values()
                 for ck in th.has | th.waits]
        inpair = Pair(inth, inlock)
        edges.append(inpair)

        # NOTE(review): no visited set is kept; termination appears to rely
        # on the recorded graph being cycle-free before the proposed edge is
        # added.
        d = deque([e for e in edges if e.lock is inlock])
        while d:
            cur = d.popleft()
            # Every other lock the current edge's thread touches ...
            locks = [e.lock for e in edges
                     if e.thread is cur.thread and e.lock is not cur.lock]
            for ck in locks:
                # ... and every other thread touching each of those locks.
                nexts = [e for e in edges
                         if ck is e.lock and e.thread is not cur.thread]
                if inpair in nexts:
                    return True
                d.extend(nexts)
        return False

def main(func):
    """Decorator: call *func* immediately when this module runs as a script.

    Returns *func* unchanged, so the decorated name stays bound to the
    function instead of being rebound to ``None``.
    """
    if __name__ == '__main__':
        func()
    return func

@main
def fmain():
    """Stress test: six threads loop forever taking random Trylocks.

    Each worker repeatedly acquires one of 20 locks at random, then tries a
    second random lock while still holding the first, logging each step.
    The threads never terminate.
    """
    import random
    import sys

    locks = [Trylock() for _ in range(20)]

    def say(text):
        # One write per message, each carrying its own newline: the output is
        # the same on Python 2 and 3 (a trailing comma after print() only
        # suppresses the extra newline on Python 2).
        sys.stdout.write(text)

    def th1(i):
        while 1:
            lock = random.choice(locks)
            if lock.acquire() is not Acquires:
                continue
            say('%i th lock %s acquire\n' % (i, lock))
            sleep(.0001)
            lock2 = random.choice(locks)
            # Skip the second acquire when the same lock was drawn twice.
            if lock2 is not lock and lock2.acquire() is Acquires:
                # Report lock2 here (the message used to print the first lock).
                say('%i th lock2 %s acquire\n' % (i, lock2))
                sleep(.0001)
                lock2.release()
            lock.release()
            say('%i th lock %s release\n' % (i, lock))

    ths = [threading.Thread(target=th1, args=(i,)) for i in range(6)]
    for th in ths:
        th.start()




More information about the Python-list mailing list