[Python-ideas] channel (synchronous queue)
shibturn
shibturn at gmail.com
Sun Feb 19 19:07:21 CET 2012
On 19/02/2012 5:05pm, Sturla Molden wrote:
> from multiprocessing import Event
> from math import ceil, log
> ...
I presume rank is the index of the process? Sounds very MPIish.
One problem is that multiprocessing's Event uses 5 semaphores. (Condition
uses 4 and Lock, RLock, Semaphore use 1). So your Barrier will use
5*numproc semaphores. This is likely to be a problem for those Unixes
(such as oldish versions of FreeBSD) which allow a very limited number
of semaphores.
It would probably be better to use something which has an API which is a
closer match to threading.Barrier. The code below gets closer in API
but does not implement reset() (which I think is pretty pointless
anyway), and wait() returns None instead of an index. It is not
properly tested though.
import multiprocessing as mp
class BrokenBarrierError(Exception):
    """Raised by Barrier.wait() when the barrier is (or becomes) broken."""
class Barrier(object):
    """Barrier for ``size`` processes built on multiprocessing semaphores.

    The API is modelled on ``threading.Barrier`` with two differences:
    ``reset()`` is not implemented, and ``wait()`` returns ``None`` rather
    than an arrival index.

    NOTE(review): the original author described this code as "not properly
    tested"; in particular, back-to-back reuse of the same barrier by fast
    and slow participants may be racy -- confirm before relying on it.
    """

    def __init__(self, size):
        """Create a barrier for ``size`` participants.

        Raises:
            ValueError: if ``size`` is not greater than zero.
        """
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if not size > 0:
            raise ValueError("size must be greater than 0")
        self.size = size
        # Serialises reads/writes of the "broken" flag semaphore.
        self._lock = mp.Lock()
        # The first size-1 arrivals each take one permit; the arrival that
        # finds no permit left is the last one and releases everybody.
        self._entry_sema = mp.Semaphore(size - 1)
        # Early arrivals block here until the last arrival (or abort()).
        self._exit_sema = mp.Semaphore(0)
        # Acts as a shared boolean: value 1 == intact, value 0 == broken.
        self._broken_sema = mp.BoundedSemaphore(1)

    def wait(self, timeout=None):
        """Block until ``size`` participants have called ``wait()``.

        Args:
            timeout: maximum number of seconds to wait for the remaining
                participants; ``None`` waits indefinitely.

        Returns:
            None.

        Raises:
            BrokenBarrierError: if the barrier was already broken, or
                became broken (timeout or abort()) during this call.
        """
        if self.broken:
            raise BrokenBarrierError
        try:
            if self._entry_sema.acquire(timeout=0):
                # Not the last arrival: wait to be released.
                if not self._exit_sema.acquire(timeout=timeout):
                    # Timed out: break the barrier for everyone.
                    self.abort()
            else:
                # Last arrival: wake the size-1 waiters, then re-arm the
                # entry semaphore for the next round.
                for _ in range(self.size - 1):
                    self._exit_sema.release()
                for _ in range(self.size - 1):
                    self._entry_sema.release()
        except BaseException:
            # Any interruption (including KeyboardInterrupt) leaves the
            # barrier in an unknown state, so break it and re-raise.
            self.abort()
            raise
        if self.broken:
            raise BrokenBarrierError

    def abort(self):
        """Put the barrier into the broken state and release all waiters.

        Safe to call repeatedly: the broken flag is taken with a
        non-blocking acquire, so a call on an already-broken barrier does
        not stall while holding the internal lock.
        """
        with self._lock:
            # Non-blocking: if the permit is already gone the barrier is
            # already broken and there is nothing to wait for.
            self._broken_sema.acquire(timeout=0)
            for _ in range(self.size):
                self._entry_sema.release()
                self._exit_sema.release()

    def reset(self):
        """Not supported by this implementation.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError

    @property
    def broken(self):
        """True if the barrier is in the broken state, else False."""
        with self._lock:
            # Probe the flag: failing to take the permit means broken.
            if not self._broken_sema.acquire(timeout=0):
                return True
            # Put the permit back so the probe has no lasting effect.
            self._broken_sema.release()
            return False
##
import time, random
def child(b, l):
    """Demo worker: pass through barrier ``b`` five times.

    Before each pass the worker sleeps for a random interval of up to five
    seconds, then reports entering and exiting the barrier.

    Args:
        b: barrier object providing ``wait()``.
        l: lock used as a context manager to keep the printed lines from
            different processes from interleaving.
    """
    for i in range(5):
        time.sleep(random.random() * 5)
        with l:
            print(i, "entering barrier:", mp.current_process().name)
        b.wait()
        with l:
            print('\t', i, "exiting barrier:", mp.current_process().name)
if __name__ == '__main__':
    # Demo: five workers share one barrier; after ten seconds the parent
    # aborts it, so workers in (or later reaching) wait() get
    # BrokenBarrierError.
    b = Barrier(5)
    l = mp.Lock()
    workers = [mp.Process(target=child, args=(b, l)) for _ in range(5)]
    for worker in workers:
        worker.start()
    time.sleep(10)
    print("ABORTING")
    b.abort()
    # Wait for the workers explicitly so shutdown does not rely on the
    # interpreter's implicit join at exit.
    for worker in workers:
        worker.join()
More information about the Python-ideas
mailing list