[pypy-svn] r23288 - pypy/dist/pypy/lib/logic/computation_space

auc at codespeak.net auc at codespeak.net
Mon Feb 13 18:21:48 CET 2006


Author: auc
Date: Mon Feb 13 18:21:46 2006
New Revision: 23288

Modified:
   pypy/dist/pypy/lib/logic/computation_space/test_variable.py
   pypy/dist/pypy/lib/logic/computation_space/variable.py
Log:
* fix tests with multiple stream readers
* test demand-driven list consumption


Modified: pypy/dist/pypy/lib/logic/computation_space/test_variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/test_variable.py	(original)
+++ pypy/dist/pypy/lib/logic/computation_space/test_variable.py	Mon Feb 13 18:21:46 2006
@@ -97,6 +97,8 @@
         assert s.__str__() == '1|...'
         assert s.length() == 2
 
+    #-- concurrent streams and lists ----------------
+
     def test_producer_consummer_stream(self):
         sp = space.ComputationSpace(dummy_problem)
         import time
@@ -161,43 +163,126 @@
 
         assert woman.result == 6
                 
-    def test_multiple_readers_list(self):
+    def test_multiple_readers_eager_list(self):
+        """the generator controls the flow"""
         sp = space.ComputationSpace(dummy_problem)
+
+        class EOL: pass
         
-        def generate(thread, L, N):
-            n=N.get()
-            assert 0 < n < 32768
-            l = v.Pair(0, None)
-            L.bind(l)
-            for i in range(1,n):
-                l.set_rest(v.Pair(i, None))
-                l = l.rest()
-            l.set_rest(v.NoValue)
-
-        def reduc(thread, L, fun):
-            l=L.get()
-            thread.result = 0
-            while l != v.NoValue:
-                val = l.first()
-                thread.result = fun(thread.result, val)
-                l = l.rest()
+        def generate(thread, Xs, n, limit):
+            """declare
+            fun {Generate N Limit}
+               if N<Limit then
+                  N|{Generate N+1 Limit}
+               else nil end
+            end"""
+            if n<limit:
+                sp = space.ComputationSpace(dummy_problem)
+                Xr = sp.var('Xr')
+                Xs.bind(v.CList(n, Xr))
+                generate(thread, Xr, n+1, limit)
+            else:
+                Xs.bind(EOL)
+                
+        def reduc(thread, Xs, a, fun):
+            """declare
+            fun {Sum Xs A}
+                case Xs
+                    of X|Xr then {Sum Xr A+X}
+                    [] nil then A
+                    else {Sum Xs A}
+                end
+            end"""
+            X_Xr = Xs.get()
+            if X_Xr == EOL:
+                thread.result = a
+                return
+            Xr = X_Xr.rest()
+            reduc(thread, Xr, fun(a, X_Xr.first()), fun)
             
-        L = sp.var('L')
-        N = sp.var('N')
+        Xs = sp.var('L')
 
-        r1 = FunThread(reduc, L, operator.add)
-        r2 = FunThread(reduc, L, operator.add)
-        r3 = FunThread(reduc, L, operator.add)
-        generator = FunThread(generate, L, N)
+        r1 = FunThread(reduc, Xs, 0, operator.add)
+        r2 = FunThread(reduc, Xs, 0, operator.add)
+        r3 = FunThread(reduc, Xs, 0, operator.add)
+        generator = FunThread(generate, Xs, 0, 42)
 
         r1.start()
         r2.start()
         r3.start()
         generator.start()
 
-        N.bind(42)
-
         generator.join()
         for r in (r1, r2, r3):
             r.join()
             assert r.result == 861
+
+    def test_lazy_list(self):
+        """the reader controls the flow"""
+        sp = space.ComputationSpace(dummy_problem)
+
+        def newspace():
+            return space.ComputationSpace(dummy_problem)
+
+        def dgenerate(thread, n, Xs):
+            """declare
+            proc {DGenerate N Xs}
+                case Xs of X|Xr then
+                   X=N
+                   {DGenerate N+1 Xr}
+                end
+            end"""
+            # new local space
+            sp = newspace()
+            # go ahead ...
+            print "GENERATOR waits on Xs"
+            X_Xr = Xs.get()      # destructure Xs
+            if X_Xr == None: return
+            X = X_Xr.first()     # ... into X
+            X.bind(n)            # bind X to n
+            print "GENERATOR binds X to", n
+            Xr = X_Xr.rest()     # ... and Xr
+            dgenerate(thread, n+1, Xr)
+
+        def dsum(thread, Xs, a, limit):
+            """declare
+            fun {DSum ?Xs A Limit}
+               if Limit>0 then
+                  X|Xr=Xs
+               in
+                  {DSum Xr A+X Limit-1}
+               else A end
+            end"""
+            if limit > 0:
+                sp = newspace()
+                # fill Xs with an empty pair
+                X = sp.var('X')
+                Xr = sp.var('Xr')
+                print "CLIENT binds Xs to X|Xr"
+                Xs.bind(v.Pair(X, Xr))
+                x = X.get() # wait on the value of X
+                print "CLIENT got", x
+                dsum(thread, Xr, a+x, limit-1)
+            else:
+                print "CLIENT binds Xs to None and exits"
+                Xs.bind(None)
+                thread.result = a
+
+        def run_test(t1, t2):
+            """
+            local Xs S in
+              thread {DGenerate 0 Xs} end
+              thread S={DSum Xs 0 15} end
+              {Browse S}
+            end"""
+            t1.start()
+            t2.start()
+            t1.join()
+            t2.join()
+
+        Xs = sp.var('Xs')
+        generator = FunThread(dgenerate, 0, Xs)
+        summer = FunThread(dsum, Xs, 0, 15)
+
+        run_test(generator, summer)
+        assert summer.result == 105

Modified: pypy/dist/pypy/lib/logic/computation_space/variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/variable.py	(original)
+++ pypy/dist/pypy/lib/logic/computation_space/variable.py	Mon Feb 13 18:21:46 2006
@@ -209,8 +209,8 @@
 class CList(Pair):
     """A List supporting concurrent access"""
 
-    def __init__(self, *args):
-        Pair.__init__(*args)
+    def __init__(self, car, cdr):
+        Pair.__init__(self, car, cdr)
         self.last_condition = threading.Condition()
 
     def set_rest(self, rest):



More information about the Pypy-commit mailing list