[Python-ideas] channel (synchronous queue)

shibturn shibturn at gmail.com
Sun Feb 19 19:07:21 CET 2012


On 19/02/2012 5:05pm, Sturla Molden wrote:
> from multiprocessing import Event
> from math import ceil, log
> ...

I presume rank is the index of the process?  Sounds very MPIish.

One problem is that multiprocessing's Event uses 5 semaphores.  (Condition 
uses 4, and Lock, RLock and Semaphore use 1 each.)  So your Barrier will use 
5*numproc semaphores.  This is likely to be a problem for those Unixes 
(such as oldish versions of FreeBSD) which allow only a very limited number 
of semaphores.

It would probably be better to use something whose API is a 
closer match to threading.Barrier.  The code below gets closer in API 
but does not implement reset() (which I think is pretty pointless 
anyway), and wait() returns None instead of an index.  It is not 
properly tested though.


import multiprocessing as mp

class BrokenBarrierError(Exception):
    """Raised by Barrier.wait() when the barrier is in the broken state."""

class Barrier(object):
    """Barrier for ``size`` processes built from multiprocessing semaphores.

    The API follows threading.Barrier, except that reset() is not
    implemented and wait() returns None instead of an index.

    Internal state:
      * ``_entry_sema`` starts with ``size - 1`` permits; a caller that
        obtains one is an early arrival, a caller that finds none is the
        last arrival of the round.
      * ``_exit_sema`` starts with 0 permits; early arrivals block on it
        until the last arrival releases them.
      * ``_broken_sema`` is a bounded semaphore holding one permit while
        the barrier is intact; abort() takes the permit to mark the
        barrier as broken.
      * ``_lock`` serialises every access to ``_broken_sema``.
    """

    def __init__(self, size):
        """Create a barrier for ``size`` participants (``size`` must be > 0)."""
        assert size > 0
        self.size = size
        self._lock = mp.Lock()
        self._entry_sema = mp.Semaphore(size-1)
        self._exit_sema = mp.Semaphore(0)
        self._broken_sema = mp.BoundedSemaphore(1)

    def wait(self, timeout=None):
        """Block until ``size`` callers have reached the barrier.

        ``timeout`` (seconds, or None for no limit) bounds how long an
        early arrival waits to be released; when it expires the barrier
        is aborted.

        Returns None.  Raises BrokenBarrierError if the barrier is broken
        on entry or by the time this call finishes.  Any exception raised
        while waiting aborts the barrier and is then re-raised.
        """
        if self.broken:
            raise BrokenBarrierError
        try:
            if self._entry_sema.acquire(timeout=0):
                # Early arrival: wait for the last arrival to release us.
                if not self._exit_sema.acquire(timeout=timeout):
                    self.abort()
            else:
                # Last arrival: wake the size-1 waiters, then hand back the
                # size-1 entry permits for the next round.
                for i in range(self.size-1):
                    self._exit_sema.release()
                for i in range(self.size-1):
                    self._entry_sema.release()
        except BaseException:
            # Same reach as a bare ``except:`` (includes KeyboardInterrupt);
            # break the barrier so the other participants do not hang.
            self.abort()
            raise
        if self.broken:
            raise BrokenBarrierError

    def abort(self):
        """Put the barrier into the broken state and wake all waiters."""
        with self._lock:
            # Non-blocking on purpose.  Every access to _broken_sema happens
            # under _lock, so the permit is either free right now or the
            # barrier is already broken and the permit will never come back.
            # Waiting here would only stall repeated abort() calls (and every
            # ``broken`` check queued on _lock) for the full timeout.
            self._broken_sema.acquire(timeout=0)
        # Release enough permits on both semaphores that any process blocked
        # in, or about to enter, wait() gets through to its broken check.
        for i in range(self.size):
            self._entry_sema.release()
            self._exit_sema.release()

    def reset(self):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError

    @property
    def broken(self):
        """True once abort() has taken the permit of ``_broken_sema``."""
        with self._lock:
            # Probe without blocking: failing to get the permit means
            # abort() holds it.  If we do get it, put it straight back.
            if not self._broken_sema.acquire(timeout=0):
                return True
            self._broken_sema.release()
            return False

##

import time, random

def child(b, l):
    """Demo worker: pass through barrier ``b`` five times.

    Before each pass the worker sleeps for a random time of up to five
    seconds.  ``l`` is a lock used as a context manager around each print
    so that output from different processes is not interleaved.

    The messages are built as single strings and printed with the
    function-call form of print, which behaves the same under the Python 2
    print statement and the Python 3 print function.  The emitted text is
    the same as the original multi-item print statements produced.
    """
    name = mp.current_process().name
    for i in range(5):
        time.sleep(random.random() * 5)
        with l:
            print("%s entering barrier: %s" % (i, name))
        b.wait()
        with l:
            # Python 2's print statement wrote no separator after the tab,
            # so the tab is followed directly by the round number.
            print("\t%s exiting barrier: %s" % (i, name))

if __name__ == '__main__':
    # Demo driver: start five workers that share one barrier and one print
    # lock, let them run for ten seconds, then abort the barrier.
    num_procs = 5
    barrier = Barrier(num_procs)
    print_lock = mp.Lock()
    for _ in range(num_procs):
        worker = mp.Process(target=child, args=(barrier, print_lock))
        worker.start()
    time.sleep(10)
    print("ABORTING")
    barrier.abort()




More information about the Python-ideas mailing list