[py-svn] r39983 - in py/trunk/py/net: . test
fijal at codespeak.net
fijal at codespeak.net
Tue Mar 6 14:54:35 CET 2007
Author: fijal
Date: Tue Mar 6 14:54:33 2007
New Revision: 39983
Modified:
py/trunk/py/net/greenexecnet.py
py/trunk/py/net/greensock2.py
py/trunk/py/net/test/test_greensock2.py
Log:
(arigo, fijal) -
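* oneof(functions) runs all functions in parallel
until one returns a value, then interrupts the others and returns that value
* allof(functions) also runs all functions in parallel, but waits until
every one of them has returned and returns all return values in a tuple
(in the same order as the functions were passed)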
Modified: py/trunk/py/net/greenexecnet.py
==============================================================================
--- py/trunk/py/net/greenexecnet.py (original)
+++ py/trunk/py/net/greenexecnet.py Tue Mar 6 14:54:33 2007
@@ -189,7 +189,7 @@
os.dup2(fd, 0)
os.dup2(fd, 1)
os.close(fd)
- greensock2.wait(gw.greenlet)
+ greensock2._suspend_forever()
run_server = staticmethod(run_server)
class PopenGateway(PopenCmdGateway):
Modified: py/trunk/py/net/greensock2.py
==============================================================================
--- py/trunk/py/net/greensock2.py (original)
+++ py/trunk/py/net/greensock2.py Tue Mar 6 14:54:33 2007
@@ -196,80 +196,6 @@
d.clear()
d.extend(lst)
-# ____________________________________________________________
-
-##class Queue(object):
-
-## def __init__(self):
-## self.giver, self.accepter = meetingpoint()
-## self.pending = deque()
-
-## def put(self, item): # preserve the caller's atomicity
-## self.pending.append(item)
-## if self.accepter.ready():
-## self.accepter.accept()
-
-## def get(self, block=True):
-## if self.pending:
-## return self.pending.popleft()
-## elif block:
-## self.giver.give(None)
-## return self.pending.popleft()
-## else:
-## raise Empty
-
-##class Empty(Interrupted):
-## pass
-
-##class Event(object):
-
-## def __init__(self):
-## self.giver, self.accepter = meetingpoint()
-
-## clear = __init__
-
-## def isSet(self):
-## return self.accepter is None
-
-## def set(self): # preserve the caller's atomicity
-## if self.accepter is not None:
-## accepter = self.accepter
-## self.giver = self.accepter = None
-## while accepter.ready(): # wake up all waiters
-## accepter.accept()
-
-## def wait(self, timeout=None):
-## if self.accepter is not None:
-## if timeout is None:
-## self.giver.give(None)
-## else:
-## timer = Timer(timeout)
-## try:
-## try:
-## self.giver.give(None)
-## except Interrupted:
-## pass
-## finally:
-## timer.stop()
-
-##class Semaphore(object):
-
-## def __init__(self, value=1):
-## self.giver, self.accepter = meetingpoint()
-## for i in range(value):
-## self.release()
-
-## def acquire(self, blocking=True):
-## if blocking or self.accepter.ready():
-## return self.accepter.accept()
-## else:
-## return False
-
-## def release(self):
-## autogreenlet(self.giver.put, True)
-
-# ____________________________________________________________
-
def wait_input(sock):
_register(g_iwtd, sock)
@@ -339,28 +265,41 @@
in_front = True
-def sleep(duration, *greenlets):
+def sleep(duration):
timer = Timer(duration)
try:
- wait(*greenlets)
+ _suspend_forever()
finally:
ok = timer.finished
timer.stop()
if not ok:
raise Interrupted
-def _wait():
+def _suspend_forever():
g_dispatcher.switch()
-def wait(*greenlets):
- assert greenlets#, "should not wait without events to wait on"
- current = g_getcurrent()
+def oneof(*callables):
+ assert callables
+ for c in callables:
+ assert callable(c)
+ greenlets = [tracinggreenlet(c) for c in callables]
+ g_active.extend(greenlets)
+ res = g_dispatcher.switch()
for g in greenlets:
- if g in g_waiters:
- g_waiters[g].append(current)
- else:
- g_waiters[g] = [current]
- g_dispatcher.switch()
+ g.interrupt()
+ return res
+
+def allof(*callables):
+ for c in callables:
+ assert callable(c)
+ greenlets = [tracinggreenlet(lambda i=i, c=c: (i, c()))
+ for i, c in enumerate(callables)]
+ g_active.extend(greenlets)
+ result = [None] * len(callables)
+ for _ in callables:
+ num, res = g_dispatcher.switch()
+ result[num] = res
+ return tuple(result)
class Timer(object):
started = False
@@ -387,31 +326,31 @@
# ____________________________________________________________
-class autogreenlet(greenlet):
+class tracinggreenlet(greenlet):
def __init__(self, function, *args, **kwds):
- self.parent = g_dispatcher
self.function = function
self.args = args
self.kwds = kwds
- g_active.append(self)
+
+ def __repr__(self):
+## args = ', '.join([repr(s) for s in self.args] +
+## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()])
+## return '<autogreenlet %s(%s)>' % (self.function.__name__, args)
+ return '<%s %s at %s>' % (self.__class__.__name__,
+ self.function.__name__,
+ hex(id(self)))
def run(self):
self.trace("start")
try:
- self.function(*self.args, **self.kwds)
+ res = self.function(*self.args, **self.kwds)
except Exception, e:
self.trace("stop (%s%s)", e.__class__.__name__,
str(e) and (': '+str(e)))
raise
else:
self.trace("done")
-
- def __repr__(self):
-## args = ', '.join([repr(s) for s in self.args] +
-## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()])
-## return '<autogreenlet %s(%s)>' % (self.function.__name__, args)
- return '<autogreenlet %s at %s>' % (self.function.__name__,
- hex(id(self)))
+ return res
def trace(self, msg, *args):
if TRACE:
@@ -420,6 +359,11 @@
def interrupt(self):
self.throw(Interrupted)
+class autogreenlet(tracinggreenlet):
+ def __init__(self, *args, **kwargs):
+ super(autogreenlet, self).__init__(*args, **kwargs)
+ self.parent = g_dispatcher
+ g_active.append(self)
g_active = deque()
g_iwtd = {}
@@ -452,24 +396,25 @@
for k in to_remove:
del mapping[k]
-def check_waiters(active):
- if active in g_waiters:
- for g in g_waiters[active]:
- g.switch()
- del g_waiters[active]
+#def check_waiters(active):
+# if active in g_waiters:
+# for g in g_waiters[active]:
+# g.switch()
+# del g_waiters[active]
def dispatcher_mainloop():
global g_timers_mixed
+ GreenletExit = greenlet.GreenletExit
while 1:
try:
while g_active:
- print 'active:', g_active[0]
- active = g_active.popleft()
- active.switch()
- if active.dead:
- check_waiters(active)
- del active
+ #print 'active:', g_active[0]
+ g_active.popleft().switch()
+# active.switch()
+# if active.dead:
+# check_waiters(active)
+# del active
if g_timers:
if g_timers_mixed:
heapify(g_timers)
@@ -482,8 +427,8 @@
#print 'timeout:', g
timer.finished = True
timer.g.switch()
- if timer.g.dead:
- check_waiters(timer.g)
+# if timer.g.dead:
+# check_waiters(timer.g)
continue
delay = 0.0
timer.started = True
@@ -496,9 +441,9 @@
continue
delay = None
- print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay
+ #print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay
iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay)
- print 'done'
+ #print 'done'
for s in owtd:
if s in g_owtd:
d = g_owtd[s]
@@ -510,8 +455,8 @@
except KeyError:
pass
g.switch(g_owtd)
- if g.dead:
- check_waiters(g)
+# if g.dead:
+# check_waiters(g)
for s in iwtd:
if s in g_iwtd:
d = g_iwtd[s]
@@ -523,11 +468,13 @@
except KeyError:
pass
g.switch(g_iwtd)
- if g.dead:
- check_waiters(g)
+# if g.dead:
+# check_waiters(g)
+ except GreenletExit:
+ raise
except:
import sys
g_dispatcher.parent.throw(*sys.exc_info())
g_dispatcher = greenlet(dispatcher_mainloop)
-g_waiters = {}
+#g_waiters = {}
Modified: py/trunk/py/net/test/test_greensock2.py
==============================================================================
--- py/trunk/py/net/test/test_greensock2.py (original)
+++ py/trunk/py/net/test/test_greensock2.py Tue Mar 6 14:54:33 2007
@@ -60,8 +60,7 @@
except Interrupted:
lst.append(8)
- g = autogreenlet(cons)
- wait(g)
+ oneof(cons)
assert lst == [4, 5, 1, 145, 6, 2, 87, 7, 3, 8]
@@ -69,24 +68,33 @@
lst = []
def g1():
- sleep(0.1, g_1)
+ sleep(0.1)
lst.append(1)
- sleep(0.2, g_1)
+ sleep(0.2)
lst.append(3)
def g2():
lst.append(0)
- sleep(0.2, g_2)
+ sleep(0.2)
lst.append(2)
- sleep(0.2, g_2)
+ sleep(0.2)
lst.append(4)
- g_1 = autogreenlet(g1)
- g_2 = autogreenlet(g2)
- wait(g_1)
- wait(g_2)
- assert lst == [0, 1, 2, 3, 4]
+ oneof(g1, g2)
+ assert lst == [0, 1, 2, 3]
+def test_kill_other():
+
+ def g1():
+ sleep(.1)
+ return 1
+
+ def g2():
+ sleep(.2)
+ return 2
+
+ res = oneof(g1, g2)
+ assert res == 1
def test_socket():
s1 = socket(AF_INET, SOCK_DGRAM)
@@ -105,6 +113,7 @@
lst.append(3)
sendall(s1, 'world')
lst.append(4)
+ return 1
def g2():
lst.append(1)
@@ -113,13 +122,12 @@
y = recv(s2, 5)
assert y == 'world'
lst.append(5)
+ return 2
- g_1 = autogreenlet(g1)
- g_2 = autogreenlet(g2)
- wait(g_1)
- wait(g_2)
+ one, two = allof(g1, g2)
assert lst == [0, 1, 2, 3, 4, 5]
-
+ assert one == 1
+ assert two == 2
##def test_Queue():
More information about the pytest-commit
mailing list