[py-svn] r39983 - in py/trunk/py/net: . test

fijal at codespeak.net fijal at codespeak.net
Tue Mar 6 14:54:35 CET 2007


Author: fijal
Date: Tue Mar  6 14:54:33 2007
New Revision: 39983

Modified:
   py/trunk/py/net/greenexecnet.py
   py/trunk/py/net/greensock2.py
   py/trunk/py/net/test/test_greensock2.py
Log:
(arigo, fijal) -

* oneof(*functions) runs all functions concurrently (as greenlets)
  until the first one returns, then interrupts the others and
  returns that function's return value

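* allof(*functions) also runs all functions concurrently, but waits
  until every one of them has returned, then returns all return
  values in a tuple (in the same order as the arguments)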


Modified: py/trunk/py/net/greenexecnet.py
==============================================================================
--- py/trunk/py/net/greenexecnet.py	(original)
+++ py/trunk/py/net/greenexecnet.py	Tue Mar  6 14:54:33 2007
@@ -189,7 +189,7 @@
         os.dup2(fd, 0)
         os.dup2(fd, 1)
         os.close(fd)
-        greensock2.wait(gw.greenlet)
+        greensock2._suspend_forever()
     run_server = staticmethod(run_server)
 
 class PopenGateway(PopenCmdGateway):

Modified: py/trunk/py/net/greensock2.py
==============================================================================
--- py/trunk/py/net/greensock2.py	(original)
+++ py/trunk/py/net/greensock2.py	Tue Mar  6 14:54:33 2007
@@ -196,80 +196,6 @@
     d.clear()
     d.extend(lst)
 
-# ____________________________________________________________
-
-##class Queue(object):
-
-##    def __init__(self):
-##        self.giver, self.accepter = meetingpoint()
-##        self.pending = deque()
-
-##    def put(self, item):    # preserve the caller's atomicity
-##        self.pending.append(item)
-##        if self.accepter.ready():
-##            self.accepter.accept()
-
-##    def get(self, block=True):
-##        if self.pending:
-##            return self.pending.popleft()
-##        elif block:
-##            self.giver.give(None)
-##            return self.pending.popleft()
-##        else:
-##            raise Empty
-
-##class Empty(Interrupted):
-##    pass
-
-##class Event(object):
-
-##    def __init__(self):
-##        self.giver, self.accepter = meetingpoint()
-
-##    clear = __init__
-
-##    def isSet(self):
-##        return self.accepter is None
-
-##    def set(self):        # preserve the caller's atomicity
-##        if self.accepter is not None:
-##            accepter = self.accepter
-##            self.giver = self.accepter = None
-##            while accepter.ready():   # wake up all waiters
-##                accepter.accept()
-
-##    def wait(self, timeout=None):
-##        if self.accepter is not None:
-##            if timeout is None:
-##                self.giver.give(None)
-##            else:
-##                timer = Timer(timeout)
-##                try:
-##                    try:
-##                        self.giver.give(None)
-##                    except Interrupted:
-##                        pass
-##                finally:
-##                    timer.stop()
-
-##class Semaphore(object):
-
-##    def __init__(self, value=1):
-##        self.giver, self.accepter = meetingpoint()
-##        for i in range(value):
-##            self.release()
-
-##    def acquire(self, blocking=True):
-##        if blocking or self.accepter.ready():
-##            return self.accepter.accept()
-##        else:
-##            return False
-
-##    def release(self):
-##        autogreenlet(self.giver.put, True)
-
-# ____________________________________________________________
-
 def wait_input(sock):
     _register(g_iwtd, sock)
 
@@ -339,28 +265,41 @@
         in_front = True
 
 
-def sleep(duration, *greenlets):
+def sleep(duration):
     timer = Timer(duration)
     try:
-        wait(*greenlets)
+        _suspend_forever()
     finally:
         ok = timer.finished
         timer.stop()
     if not ok:
         raise Interrupted
 
-def _wait():
+def _suspend_forever():
     g_dispatcher.switch()
 
-def wait(*greenlets):
-    assert greenlets#, "should not wait without events to wait on"
-    current = g_getcurrent()
+def oneof(*callables):
+    assert callables
+    for c in callables:
+        assert callable(c)
+    greenlets = [tracinggreenlet(c) for c in callables]
+    g_active.extend(greenlets)
+    res = g_dispatcher.switch()
     for g in greenlets:
-        if g in g_waiters:
-            g_waiters[g].append(current)
-        else:
-            g_waiters[g] = [current]
-    g_dispatcher.switch()
+        g.interrupt()
+    return res
+
+def allof(*callables):
+    for c in callables:
+        assert callable(c)
+    greenlets = [tracinggreenlet(lambda i=i, c=c: (i, c()))
+                 for i, c in enumerate(callables)]
+    g_active.extend(greenlets)
+    result = [None] * len(callables)
+    for _ in callables:
+        num, res = g_dispatcher.switch()
+        result[num] = res
+    return tuple(result)
 
 class Timer(object):
     started = False
@@ -387,31 +326,31 @@
 
 # ____________________________________________________________
 
-class autogreenlet(greenlet):
+class tracinggreenlet(greenlet):
     def __init__(self, function, *args, **kwds):
-        self.parent = g_dispatcher
         self.function = function
         self.args = args
         self.kwds = kwds
-        g_active.append(self)
+
+    def __repr__(self):
+##        args = ', '.join([repr(s) for s in self.args] +
+##                        ['%s=%r' % keyvalue for keyvalue in self.kwds.items()])
+##        return '<autogreenlet %s(%s)>' % (self.function.__name__, args)
+        return '<%s %s at %s>' % (self.__class__.__name__,
+                                  self.function.__name__,
+                                  hex(id(self)))
 
     def run(self):
         self.trace("start")
         try:
-            self.function(*self.args, **self.kwds)
+            res = self.function(*self.args, **self.kwds)
         except Exception, e:
             self.trace("stop (%s%s)", e.__class__.__name__,
                        str(e) and (': '+str(e)))
             raise
         else:
             self.trace("done")
-
-    def __repr__(self):
-##        args = ', '.join([repr(s) for s in self.args] +
-##                        ['%s=%r' % keyvalue for keyvalue in self.kwds.items()])
-##        return '<autogreenlet %s(%s)>' % (self.function.__name__, args)
-        return '<autogreenlet %s at %s>' % (self.function.__name__,
-                                            hex(id(self)))
+            return res
 
     def trace(self, msg, *args):
         if TRACE:
@@ -420,6 +359,11 @@
     def interrupt(self):
         self.throw(Interrupted)
 
+class autogreenlet(tracinggreenlet):
+    def __init__(self, *args, **kwargs):
+        super(autogreenlet, self).__init__(*args, **kwargs)
+        self.parent = g_dispatcher
+        g_active.append(self)
 
 g_active = deque()
 g_iwtd = {}
@@ -452,24 +396,25 @@
     for k in to_remove:
         del mapping[k]
 
-def check_waiters(active):
-    if active in g_waiters:
-        for g in g_waiters[active]:
-            g.switch()
-        del g_waiters[active]
+#def check_waiters(active):
+#    if active in g_waiters:
+#        for g in g_waiters[active]:
+#            g.switch()
+#        del g_waiters[active]
 
 
 def dispatcher_mainloop():
     global g_timers_mixed
+    GreenletExit = greenlet.GreenletExit
     while 1:
         try:
             while g_active:
-                print 'active:', g_active[0]
-                active = g_active.popleft()
-                active.switch()
-                if active.dead:
-                    check_waiters(active)
-                    del active
+                #print 'active:', g_active[0]
+                g_active.popleft().switch()
+#                active.switch()
+#                if active.dead:
+#                    check_waiters(active)
+#                    del active
             if g_timers:
                 if g_timers_mixed:
                     heapify(g_timers)
@@ -482,8 +427,8 @@
                         #print 'timeout:', g
                         timer.finished = True
                         timer.g.switch()
-                        if timer.g.dead:
-                            check_waiters(timer.g)
+#                        if timer.g.dead:
+#                            check_waiters(timer.g)
                         continue
                     delay = 0.0
                 timer.started = True
@@ -496,9 +441,9 @@
                     continue
                 delay = None
 
-            print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay
+            #print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay
             iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay)
-            print 'done'
+            #print 'done'
             for s in owtd:
                 if s in g_owtd:
                     d = g_owtd[s]
@@ -510,8 +455,8 @@
                         except KeyError:
                             pass
                     g.switch(g_owtd)
-                    if g.dead:
-                        check_waiters(g)
+#                    if g.dead:
+#                        check_waiters(g)
             for s in iwtd:
                 if s in g_iwtd:
                     d = g_iwtd[s]
@@ -523,11 +468,13 @@
                         except KeyError:
                             pass
                     g.switch(g_iwtd)
-                    if g.dead:
-                        check_waiters(g)
+#                    if g.dead:
+#                        check_waiters(g)
+        except GreenletExit:
+            raise
         except:
             import sys
             g_dispatcher.parent.throw(*sys.exc_info())
 
 g_dispatcher = greenlet(dispatcher_mainloop)
-g_waiters = {}
+#g_waiters = {}

Modified: py/trunk/py/net/test/test_greensock2.py
==============================================================================
--- py/trunk/py/net/test/test_greensock2.py	(original)
+++ py/trunk/py/net/test/test_greensock2.py	Tue Mar  6 14:54:33 2007
@@ -60,8 +60,7 @@
         except Interrupted:
             lst.append(8)
 
-    g = autogreenlet(cons)
-    wait(g)
+    oneof(cons)
     assert lst == [4, 5, 1, 145, 6, 2, 87, 7, 3, 8]
 
 
@@ -69,24 +68,33 @@
     lst = []
 
     def g1():
-        sleep(0.1, g_1)
+        sleep(0.1)
         lst.append(1)
-        sleep(0.2, g_1)
+        sleep(0.2)
         lst.append(3)
 
     def g2():
         lst.append(0)
-        sleep(0.2, g_2)
+        sleep(0.2)
         lst.append(2)
-        sleep(0.2, g_2)
+        sleep(0.2)
         lst.append(4)
 
-    g_1 = autogreenlet(g1)
-    g_2 = autogreenlet(g2)
-    wait(g_1)
-    wait(g_2)
-    assert lst == [0, 1, 2, 3, 4]
+    oneof(g1, g2)
+    assert lst == [0, 1, 2, 3]
 
+def test_kill_other():
+
+    def g1():
+        sleep(.1)
+        return 1
+
+    def g2():
+        sleep(.2)
+        return 2
+
+    res = oneof(g1, g2)
+    assert res == 1
 
 def test_socket():
     s1 = socket(AF_INET, SOCK_DGRAM)
@@ -105,6 +113,7 @@
         lst.append(3)
         sendall(s1, 'world')
         lst.append(4)
+        return 1
 
     def g2():
         lst.append(1)
@@ -113,13 +122,12 @@
         y = recv(s2, 5)
         assert y == 'world'
         lst.append(5)
+        return 2
 
-    g_1 = autogreenlet(g1)
-    g_2 = autogreenlet(g2)
-    wait(g_1)
-    wait(g_2)
+    one, two = allof(g1, g2)
     assert lst == [0, 1, 2, 3, 4, 5]
-
+    assert one == 1
+    assert two == 2
 
 ##def test_Queue():
 



More information about the pytest-commit mailing list