UponAcquiring synchro. class

castironpi at gmail.com castironpi at gmail.com
Sat Mar 1 09:17:59 EST 2008


from __future__ import with_statement
'''
3)  upon_acquiring( lockA, lockB )( function, *ar, **kwar )

upon_acquiring spawns new thread upon acquiring locks A and B.  Locks
may be specified in any order, as none is acquired until all are free.

The options to spawn a new thread upon call, lock, and not release
until "it's its turn"; just block until then; and vary honoring order
are open.

6) @with_self
Prepends the function object itself to the parameter list

'''
'''
a lockA lockB
b lockA lockB
c lockA lockB
d lockA lockC
e lockB lockD
f lockE lockF

assert a< b
assert b< c
assert c< d
assert c< e
assert f in results.
'''
'''
A  (A,B)1 (A,C)2 (A,B)3
B  (A,B)1 (A,B)3
C  (A,C)2 (C,D,E)4
D  (C,D,E)4
E  (C,D,E)4
a.lock: A, a.state: Free, a.waiters: [ X1(a,b), X2(a,c), X3(a,b) ]
b.lock: B, b.state: Free, b.waiters: [ X1(a,b), X3(a,b) ]
c.lock: C, c.state: Free, c.waiters: [ X2(a,c), X4(c,d,e) ]
d.lock: D, d.state: Free, d.waiters: [ X4(c,d,e) ]
e.lock: E, e.state: Free, e.waiters: [ X4(c,d,e) ]

acq a,b
x1= a,b
a.waiters+= x1
b.waiters+= x1
#same as
if a.state is free and b.state is free:
  a.state= taken
  b.state= taken
  a.waiters-= x1
  b.waiters-= x1
  a.lock.release() #acq?
  b.lock.release() #acq?
  x1.lock.release()
acq a,c
x2= a,c
a.waiters+= x2
c.waiters+= x2
if a.state is free and c.state is free:
  a.state= taken
  c.state= taken
  a.waiters-= x2
  c.waiters-= x2
  a.lock.release() #acq?
  c.lock.release() #acq?
  x2.lock.release()
acq a,b
x3= a,b
a.waiters+= x3
b.waiters+= x3
acq c,d,e
x4= c,d,e
c.waiters+= x4
d.waiters+= x4
e.waiters+= x4
'''

from thread import start_new_thread
from threading import Lock, Thread, Event
import time
from functools import partial

class LockDeps:
    """Per-lock bookkeeping record.

    The constructor only stores its three arguments:

    lock    -- the lock object this record describes.
    state   -- flag for the lock; UponAcquiring stores False while the
               lock is free and True once it has been granted.
    waiters -- container of pending requests; UponAcquiring passes a
               list and appends LockSet objects to it.
    """
    def __init__(self, lock, state, waiters):
        self.lock = lock
        self.state = state
        self.waiters = waiters
class LockSet:
    """Handle for a group of locks that are requested together.

    Attributes:
        _locks   -- tuple of the locks, in the order they were given.
        _lock    -- private gate lock, created already acquired.  This
                    class never releases it; in this file
                    UponAcquiring._analyze releases it once every lock
                    in the group has been acquired.
        _remains -- the locks of the group not yet given back through
                    release() / releaseall().
        _doneevt -- Event created unset; set by the worker thread.
        th       -- worker Thread, filled in by the scheduler.
        retval   -- return value of the callback, filled in by the
                    worker thread.
    """
    #ok to use them elsewhere, just gets in line.
    def __init__( self, *locks ):
        self._locks= locks
        self._lock= Lock()
        # Start with the gate closed: a worker doing `with self._lock`
        # blocks until somebody else releases it.
        self._lock.acquire()
        self._remains= set( locks )
        self._doneevt= Event()
        self.th= None
        self.retval= None

    def release( self, *locks ):
        """Give back some of the group's locks early.

        Raises KeyError for a lock that is not (or no longer) among the
        remaining locks.  The membership check happens *before* the lock
        is touched, so a lock this set does not hold is never released
        by mistake.
        """
        for lock in locks:
            self._remains.remove( lock )
            lock.release()

    def releaseall( self ):
        """Give back every lock that has not been released yet.

        Calling it again afterwards is a no-op.
        """
        # Iterate over a snapshot because _remains shrinks as we go; each
        # lock leaves the bookkeeping before it is released, so a failing
        # release() cannot cause a second release of an earlier lock.
        for lock in list( self._remains ):
            self._remains.discard( lock )
            lock.release()

class UponAcquiring:
    """Run callbacks in their own threads once all their locks are free.

    Usage: ``ua.acq( lockA, lockB )( function, *ar, **kwar )``.  The
    worker thread is started immediately but blocks on the LockSet's
    private gate until every requested lock has been acquired on its
    behalf; no lock of a request is acquired until all of them are free.

    Attributes:
        _deps   -- maps each lock seen so far to its LockDeps record
                   ( state flag plus the list of queued LockSets ).
        _oplock -- guards _deps and every LockDeps record.
    """
    def __init__( self ):
        self._deps= {}
        self._oplock= Lock()

    def acq( self, *locks ):
        """Return a callable ``( fun, *ar, **kwar )`` that queues `fun`
        to run once all of `locks` are free and returns the LockSet."""
        lckset= LockSet( *locks )
        return partial( self._enqueue, lckset )

    def _enqueue( self, lckset, fun, *ar, **kwar ):
        """Queue `lckset` on each of its locks, start the worker thread
        and try to grant the locks right away.  Returns `lckset`."""
        with self._oplock:
            for lock in lckset._locks:
                dep= self._deps.get( lock )
                if dep is None:
                    # First time this lock is seen: free, nobody queued.
                    dep= LockDeps( lock, False, [] )
                    self._deps[ lock ]= dep
                dep.waiters.append( lckset )
        th= Thread( target= self._functhd,
            args= ( lckset, fun )+ ar,
            kwargs= kwar )
        lckset.th= th
        th.start()
        self._analyze( lckset )
        return lckset

    def _functhd( self, lckset, fun, *ar, **kwar ):
        """Worker thread body: wait for the gate, call
        ``fun( lckset, *ar, **kwar )``, then free the locks and wake
        whoever can run next.  An exception from `fun` still propagates
        out of the thread after the cleanup."""
        try:
            # Blocks until _grant() opens the gate.
            with lckset._lock:
                lckset.retval=\
                    fun( lckset, *ar, **kwar )
        finally:
            # Signal completion on every path.  Previously the event was
            # skipped when fun raised, which would leave anyone waiting
            # on it blocked forever; retval stays None in that case.
            lckset._doneevt.set()
            with self._oplock:
                # Locks the callback already released early are skipped
                # by releaseall(); the state flags are cleared only here.
                lckset.releaseall()
                for lock in lckset._locks:
                    self._deps[ lock ].state= False
            self._analyze( lckset )

    def _analyze( self, lckset ):
        """For every lock of `lckset` that is currently free, grant the
        first queued LockSet whose locks are all free.

        Waiters are tried in queue order, but one that is still blocked
        on another lock is passed over in favour of a later one.
        """
        with self._oplock:
            for lock in lckset._locks:
                dep= self._deps[ lock ]
                if dep.state:
                    continue
                # Distinct loop name: the original reused `lckset` here
                # and silently rebound the method's own argument.
                for waiter in dep.waiters:
                    assert lock in waiter._locks
                    if self._has_busy_lock( waiter ):
                        continue
                    self._grant( waiter )
                    # _grant() edited dep.waiters; stop iterating it.
                    # `lock` is taken now, so nobody else could get it.
                    break

    def _has_busy_lock( self, waiter ):
        """Return True if any lock of `waiter` is currently granted.
        Caller must hold _oplock."""
        for lock2 in waiter._locks:
            if self._deps[ lock2 ].state:
                return True
        return False

    def _grant( self, waiter ):
        """Mark all locks of `waiter` as taken, dequeue it, acquire the
        locks and open its gate.  Caller must hold _oplock."""
        for lock2 in waiter._locks:
            dep2= self._deps[ lock2 ]
            dep2.state= True
            assert dep2.waiters.count(
                waiter )== 1
            dep2.waiters.remove(
                waiter )
            # Blocking acquire while _oplock is held: if code outside
            # this scheduler holds the lock, we wait here for it.
            lock2.acquire()
        waiter._lock.release()

# Shared state for the stress test below.
# results: callback indices, appended by callback() in the order the
# callbacks actually ran; emptied at the end of every round.
results= []
# ver( i ) is the position of callback i in results ( results.index ).
ver= results.index
# LockSet handles returned by the scheduler during the current round.
lcksets= set()
import random
from sys import stdout
def callback(locks, i):
    """Test payload run by each worker thread.

    Writes a 'cb<i> ' marker to stdout, sleeps for a random interval of
    up to 0.01 s, records `i` in the module-level `results` list and
    then, half of the time, hands all its locks back early through
    `locks.releaseall()`.
    """
    stdout.write('cb%i ' % i)
    pause = random.uniform(0, .01)
    time.sleep(pause)
    results.append(i)
    release_early = random.choice([False, True])
    if release_early:
        locks.releaseall()
    # Disabled: occasionally raise to exercise the error path.
    #if random.random()< 0.1:
    #   raise Exception()

# Endless stress test: the loop has no exit, so it runs round after round
# until an assertion fails or the process is interrupted.
while 1:
    # The class statement is used only as a throwaway scope; its body
    # executes immediately, once per loop iteration.
    class Case1:
        # Fresh locks for every round.
        lockA, lockB, lockC= Lock(), Lock(), Lock()
        lockD, lockE, lockF= Lock(), Lock(), Lock()
        # Lock groups requested by callbacks 0..5, in submission order.
        a= ( lockA, lockB )
        b= ( lockA, lockB )
        c= ( lockA, lockB )
        d= ( lockA, lockC )
        e= ( lockB, lockD )
        f= ( lockE, lockF )

        ua= UponAcquiring()
        # Submit all six requests; callback receives its index i.
        for i, x in enumerate( [ a, b, c, d, e, f ] ):
            lcksets.add( ua.acq( *x )( callback, i ) )

        # Wait for every worker thread to finish before checking order.
        for lckset in lcksets:
            lckset.th.join()
        stdout.write( repr( results ) )
        stdout.write( '\n' )
        stdout.flush()
        # a, b and c all need lockA and lockB: they must run in order.
        assert ver( 0 )< ver( 1 )
        assert ver( 1 )< ver( 2 )
        # d shares lockA with c and e shares lockB with c: both run
        # after c.  f ( index 5 ) shares no lock, so it is unconstrained.
        assert ver( 2 )< ver( 3 )
        assert ver( 2 )< ver( 4 )
        # No callback may have run twice.
        assert len( set( results ) )== len( results )
        '''permissible orders e.g.:
        [0, 5, 1, 2, 3, 4]
        [5, 0, 1, 2, 3, 4]
        [0, 5, 1, 2, 4, 3]
        [5, 0, 1, 2, 4, 3]
        '''
        # Reset the shared state in place for the next round.
        del results[:]
        lcksets.clear()



More information about the Python-list mailing list