[pypy-svn] r23176 - pypy/dist/pypy/lib/logic/computation_space

auc at codespeak.net auc at codespeak.net
Thu Feb 9 14:55:26 CET 2006


Author: auc
Date: Thu Feb  9 14:55:22 2006
New Revision: 23176

Modified:
   pypy/dist/pypy/lib/logic/computation_space/test_variable.py
   pypy/dist/pypy/lib/logic/computation_space/variable.py
Log:
rewritten stream stuff


Modified: pypy/dist/pypy/lib/logic/computation_space/test_variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/test_variable.py	(original)
+++ pypy/dist/pypy/lib/logic/computation_space/test_variable.py	Thu Feb  9 14:55:22 2006
@@ -86,92 +86,79 @@
         for i in range(10):
             assert vars_[i].val == str(i)
 
-##     def test_basic_producer_consummer_sream(self):
-##         # this one is quite silly
-##         sp = space.ComputationSpace(dummy_problem)
-
-##         def generate(thread, var, n, limit):
-##             s = var.get()
-##             while n<limit:
-##                 print n
-##                 s.put(n)
-##                 n += 1
-##             s.put(None)
-        
-##         def reduc(thread, var, fun):
-##             stream = var.get()
-##             val = stream.get()
-##             while (val != None):
-##                 print val
-##                 thread.result = fun(thread.result, val)
-##                 val = stream.get()
+    def test_basic_list(self):
+        s = v.make_list([1, 2, 3])
+        assert s.__str__() == '1|2|3'
+        assert s.length() == 3
+        s.rest().rest().set_rest(s)
+        assert s.length() == 4
+        assert s.__str__() == '1|2|3|...'
+        s. set_rest(s)
+        assert s.__str__() == '1|...'
+        assert s.length() == 2
+
+    def test_producer_consummer_stream(self):
+        sp = space.ComputationSpace(dummy_problem)
+        import time
 
-##         s = sp.var('s')
-##         s.bind(v.Stream())
+        def generate(thread, var, n, limit):
+            s = var.get()
+            while n<limit:
+                s.put(limit-n)
+                n += 1
+            s.put(v.NoValue)
         
-##         generator = FunThread(generate, s, 1, 5)
-##         reductor = FunThread(reduc, s, operator.mul)
-##         reductor.result = 2
-
-##         generator.start()
-##         reductor.start()
-##         generator.join()
-##         reductor.join()
+        def reduc(thread, var, fun):
+            stream = var.get()
+            val = stream.get()
+            while (val != v.NoValue):
+                thread.result = fun(thread.result, val)
+                val = stream.get()
 
-##         print  reductor.result
-##         assert 0
+        s = sp.var('s')
+        s.bind(v.Stream())
+        
+        generator = FunThread(generate, s, 1, 10)
+        reductor = FunThread(reduc, s, operator.mul)
+        reductor.result = 2
+
+        generator.start()
+        reductor.start()
+        generator.join()
+        reductor.join()
+        
+        assert reductor.result == 725760
 
     def test_daisychain_stream(self):
         # chained stupidity
         sp = space.ComputationSpace(dummy_problem)
 
-        s1 = sp.var('s1')
-        s2 = sp.var('s2')
-
-        stream1 = v.Stream(stuff=[1, 2, 3, s2])
-        stream2 = v.Stream(stuff=[4, 5, 6, None])
-
-        s1.bind(stream1)
-        s2.bind(stream2)
-
-        def woman_in_chains(thread, stream_variable):
-            stream = stream_variable.get()
+        def woman_in_chains(thread, S):
+            stream = S.get()
+            assert isinstance(stream, v.Stream)
             val = stream.get()
-            while val != None:
+            while val != v.NoValue:
+                print val
                 thread.result = val
                 val = stream.get()
                 if isinstance(val, v.Var):
                     stream = val.get()
                     val = stream.get()
 
+        s1 = sp.var('s1')
+        s2 = sp.var('s2')
+        stream1 = v.Stream(v.make_list([1, 2, 3, s2]))
+        stream2 = v.Stream(v.make_list([4, 5, 6, v.NoValue]))
+        assert str(stream1) == '1|2|3|s2'
+        assert str(stream2) == '4|5|6|variable.NoValue'
+        
         woman = FunThread(woman_in_chains, s1)
         woman.start()
+
+        s1.bind(stream1)
+        s2.bind(stream2)
+
         woman.join()
 
         assert woman.result == 6
                 
-    def test_cyclicproducer_consummer_sream(self):
-        # infinite sillyness
-        sp = space.ComputationSpace(dummy_problem)
-
-        circular = sp.var('circular')
-        s = v.Stream(stuff=[0, 1, 2, circular])
-        circular.bind(s)
-
-        def touch10(thread, stream_variable):
-            stream = stream_variable.get()
-            val = None
-            for i in range(10):
-                val = stream.get()
-                if isinstance(val, v.Var):
-                    # at stream tail is a var
-                    stream = val.get()
-                    val = stream.get()
-                assert i % 3 == val 
-            thread.result = val
-
-        toucher = FunThread(touch10, circular)
-        toucher.start()
-        toucher.join()
-
-        assert toucher.result == 0

Modified: pypy/dist/pypy/lib/logic/computation_space/variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/variable.py	(original)
+++ pypy/dist/pypy/lib/logic/computation_space/variable.py	Thu Feb  9 14:55:22 2006
@@ -37,17 +37,17 @@
             raise AlreadyInStore(name)
         self.name = name
         # the creation-time (top-level) space
-        self.cs = cs
+        self._cs = cs
         # top-level 'commited' binding
         self._val = NoValue
         # domains in multiple spaces
         self._doms = {cs : FiniteDomain([])}
-        # when updated in a 'transaction', keep track
-        # of our initial value (for abort cases)
-        self.previous = None
-        self.changed = False
+        # when updated while unification happens, keep track
+        # of our initial value (for failure cases)
+        self._previous = None
+        self._changed = False
         # a condition variable for concurrent access
-        self.value_condition = threading.Condition()
+        self._value_condition = threading.Condition()
 
     # for consumption by the global cs
 
@@ -58,22 +58,22 @@
     # atomic unification support
 
     def _commit(self):
-        self.changed = False
+        self._changed = False
 
     def _abort(self):
-        self.val = self.previous
-        self.changed = False
+        self.val = self._previous
+        self._changed = False
 
     # value accessors
     def _set_val(self, val):
-        self.value_condition.acquire()
-        if self.cs.in_transaction:
-            if not self.changed:
-                self.previous = self._val
-                self.changed = True
+        self._value_condition.acquire()
+        if self._cs.in_transaction:
+            if not self._changed:
+                self._previous = self._val
+                self._changed = True
         self._val = val
-        self.value_condition.notifyAll()
-        self.value_condition.release()
+        self._value_condition.notifyAll()
+        self._value_condition.release()
         
     def _get_val(self):
         return self._val
@@ -96,7 +96,7 @@
 
     def bind(self, val):
         """top-level space bind"""
-        self.cs.bind(self, val)
+        self._cs.bind(self, val)
 
     is_bound = _is_bound
 
@@ -117,46 +117,150 @@
            being bound in the top-level space
         """
         try:
-            self.value_condition.acquire()
+            self._value_condition.acquire()
             while not self._is_bound():
-                self.value_condition.wait()
+                self._value_condition.wait()
             return self.val
         finally:
-            self.value_condition.release()
+            self._value_condition.release()
 
 
 #-- stream stuff -----------------------------
 
-from Queue import Queue
+class Pair(object):
+    """similar to CONS in Lisp"""
 
-class StreamUserBug(Exception):
-    pass
+    def __init__(self, car, cdr):
+        self._car = car
+        self._cdr = cdr
+
+    def first(self):
+        return self._car
+
+    def rest(self):
+        return self._cdr
+
+    def set_rest(self, stuff):
+        self._cdr = stuff
+
+    def is_empty(self):
+        return self._car is None and self._cdr is None
+
+    def length(self):
+        ln = 0
+        curr = self
+        if curr.first() != None:
+            ln += 1
+        while curr.rest() != None:
+            curr = curr.rest()
+            if curr.first() != None:
+                ln += 1
+            # check for circularity
+            if curr == self: return ln
+        return ln
 
-class Stream(Queue):
-    """a stream is potentially unbounded list
-       of messages, i.e a list whose tail is
-       an unbound dataflow variable
-    """
-
-    def __init__(self, size=5, stuff=None):
-        self.elts = stuff
-        self.idx = 0
-        Queue.__init__(self, size)
+    def __str__(self):
+        # This will show bogus stuff for trees ...
+        seen = set()
+        strs = []
+
+        def build_elt_str(elt):
+            if elt in seen:
+                strs.pop() ; strs.pop()
+                # show ellipsis when recursing
+                strs.append('...')
+            elif isinstance(elt, Pair):
+                seen.add(elt)
+                build_pair_str(elt)
+            else:
+                if elt is None:
+                    strs.pop()
+                elif isinstance(elt, Var):
+                    strs.append(elt.name)
+                else:
+                    strs.append(str(elt))
+
+        def build_pair_str(pair):
+            build_elt_str(pair._car)
+            strs.append('|')
+            build_elt_str(pair._cdr)
+
+        if self._car is None:
+            return 'nil'
+        build_pair_str(self)
+        return ''.join(strs)
+
+def make_list(data=None):
+    """Builds a list with pairs"""
+    assert (data is None) \
+           or type(data) in (list, tuple, set)
+    if data is None:
+        return Pair(None, None)
+    curr = Pair(data[0], None)
+    head = curr
+    for datum in data[1:]:
+        curr.set_rest(Pair(datum, None))
+        curr = curr.rest()
+    return head
+
+class Stream(object):
+    """A FIFO stream"""
+
+    def __init__(self, elts=Pair(None, None)):
+        self.head = elts
+        if elts.first() == None:
+            self.tail = elts
+        else:
+            curr = elts.rest()
+            prev = elts
+            while isinstance(curr, Pair):
+                prev = curr
+                curr = curr.rest()
+            # last pair of the chain
+            self.tail = prev
+        # head hurts tail sometimes ...
+        self.empty_condition = threading.Condition()
 
     def get(self):
-        if self.elts is None:
-            Queue.get(self)
-        else:
-            try:
-                v = self.elts[self.idx]
-                self.idx += 1
-                return v
-            except IndexError:
-                self.idx = 0
-                return self.get()
-
-    def put(self, elt):
-        if self.elts is None:
-            Queue.put(self, elt)
-        else:
-            raise NoImplemented
+        print self.head
+        # first thing to check is whether
+        # there is stuff to feed
+        self.empty_condition.acquire()
+        try:
+            if self.head == self.tail:
+                # there might remain one element there
+                while self.head.is_empty():
+                    self.empty_condition.wait()
+            # sky is clear : there is something to get
+            elt = self.head.first()
+            # we might want to advance to the next pair
+            if self.head != self.tail:
+                self.head = self.head.rest()
+            else:
+                # or just nullify what we read
+                # to avoid reading it again ...
+                self.head._car = None
+        finally:
+            self.empty_condition.release()
+        return elt
+
+    def put(self, val):
+        # first, check for emptyness special case
+        self.empty_condition.acquire()
+        try:
+            if self.head.is_empty():
+                # then we put stuff into head
+                # without consing and just return
+                self.head._car = val
+                self.empty_condition.notifyAll()
+                return
+        finally:
+            self.empty_condition.release()
+        # either we did put and return
+        # or nothing done yet
+        new_tail = Pair(val, None)
+        self.tail.set_rest(new_tail)
+        self.tail = new_tail
+
+    def __str__(self):
+        return str(self.head)



More information about the Pypy-commit mailing list