[pypy-svn] r23176 - pypy/dist/pypy/lib/logic/computation_space
auc at codespeak.net
auc at codespeak.net
Thu Feb 9 14:55:26 CET 2006
Author: auc
Date: Thu Feb 9 14:55:22 2006
New Revision: 23176
Modified:
pypy/dist/pypy/lib/logic/computation_space/test_variable.py
pypy/dist/pypy/lib/logic/computation_space/variable.py
Log:
rewritten stream stuff
Modified: pypy/dist/pypy/lib/logic/computation_space/test_variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/test_variable.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/test_variable.py Thu Feb 9 14:55:22 2006
@@ -86,92 +86,79 @@
for i in range(10):
assert vars_[i].val == str(i)
-## def test_basic_producer_consummer_sream(self):
-## # this one is quite silly
-## sp = space.ComputationSpace(dummy_problem)
-
-## def generate(thread, var, n, limit):
-## s = var.get()
-## while n<limit:
-## print n
-## s.put(n)
-## n += 1
-## s.put(None)
-
-## def reduc(thread, var, fun):
-## stream = var.get()
-## val = stream.get()
-## while (val != None):
-## print val
-## thread.result = fun(thread.result, val)
-## val = stream.get()
+ def test_basic_list(self):
+ s = v.make_list([1, 2, 3])
+ assert s.__str__() == '1|2|3'
+ assert s.length() == 3
+ s.rest().rest().set_rest(s)
+ assert s.length() == 4
+ assert s.__str__() == '1|2|3|...'
+ s. set_rest(s)
+ assert s.__str__() == '1|...'
+ assert s.length() == 2
+
+ def test_producer_consummer_stream(self):
+ sp = space.ComputationSpace(dummy_problem)
+ import time
-## s = sp.var('s')
-## s.bind(v.Stream())
+ def generate(thread, var, n, limit):
+ s = var.get()
+ while n<limit:
+ s.put(limit-n)
+ n += 1
+ s.put(v.NoValue)
-## generator = FunThread(generate, s, 1, 5)
-## reductor = FunThread(reduc, s, operator.mul)
-## reductor.result = 2
-
-## generator.start()
-## reductor.start()
-## generator.join()
-## reductor.join()
+ def reduc(thread, var, fun):
+ stream = var.get()
+ val = stream.get()
+ while (val != v.NoValue):
+ thread.result = fun(thread.result, val)
+ val = stream.get()
-## print reductor.result
-## assert 0
+ s = sp.var('s')
+ s.bind(v.Stream())
+
+ generator = FunThread(generate, s, 1, 10)
+ reductor = FunThread(reduc, s, operator.mul)
+ reductor.result = 2
+
+ generator.start()
+ reductor.start()
+ generator.join()
+ reductor.join()
+
+ assert reductor.result == 725760
def test_daisychain_stream(self):
# chained stupidity
sp = space.ComputationSpace(dummy_problem)
- s1 = sp.var('s1')
- s2 = sp.var('s2')
-
- stream1 = v.Stream(stuff=[1, 2, 3, s2])
- stream2 = v.Stream(stuff=[4, 5, 6, None])
-
- s1.bind(stream1)
- s2.bind(stream2)
-
- def woman_in_chains(thread, stream_variable):
- stream = stream_variable.get()
+ def woman_in_chains(thread, S):
+ stream = S.get()
+ assert isinstance(stream, v.Stream)
val = stream.get()
- while val != None:
+ while val != v.NoValue:
+ print val
thread.result = val
val = stream.get()
if isinstance(val, v.Var):
stream = val.get()
val = stream.get()
+ s1 = sp.var('s1')
+ s2 = sp.var('s2')
+ stream1 = v.Stream(v.make_list([1, 2, 3, s2]))
+ stream2 = v.Stream(v.make_list([4, 5, 6, v.NoValue]))
+ assert str(stream1) == '1|2|3|s2'
+ assert str(stream2) == '4|5|6|variable.NoValue'
+
woman = FunThread(woman_in_chains, s1)
woman.start()
+
+ s1.bind(stream1)
+ s2.bind(stream2)
+
woman.join()
assert woman.result == 6
- def test_cyclicproducer_consummer_sream(self):
- # infinite sillyness
- sp = space.ComputationSpace(dummy_problem)
-
- circular = sp.var('circular')
- s = v.Stream(stuff=[0, 1, 2, circular])
- circular.bind(s)
-
- def touch10(thread, stream_variable):
- stream = stream_variable.get()
- val = None
- for i in range(10):
- val = stream.get()
- if isinstance(val, v.Var):
- # at stream tail is a var
- stream = val.get()
- val = stream.get()
- assert i % 3 == val
- thread.result = val
-
- toucher = FunThread(touch10, circular)
- toucher.start()
- toucher.join()
-
- assert toucher.result == 0
Modified: pypy/dist/pypy/lib/logic/computation_space/variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/variable.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/variable.py Thu Feb 9 14:55:22 2006
@@ -37,17 +37,17 @@
raise AlreadyInStore(name)
self.name = name
# the creation-time (top-level) space
- self.cs = cs
+ self._cs = cs
# top-level 'commited' binding
self._val = NoValue
# domains in multiple spaces
self._doms = {cs : FiniteDomain([])}
- # when updated in a 'transaction', keep track
- # of our initial value (for abort cases)
- self.previous = None
- self.changed = False
+ # when updated while unification happens, keep track
+ # of our initial value (for failure cases)
+ self._previous = None
+ self._changed = False
# a condition variable for concurrent access
- self.value_condition = threading.Condition()
+ self._value_condition = threading.Condition()
# for consumption by the global cs
@@ -58,22 +58,22 @@
# atomic unification support
def _commit(self):
- self.changed = False
+ self._changed = False
def _abort(self):
- self.val = self.previous
- self.changed = False
+ self.val = self._previous
+ self._changed = False
# value accessors
def _set_val(self, val):
- self.value_condition.acquire()
- if self.cs.in_transaction:
- if not self.changed:
- self.previous = self._val
- self.changed = True
+ self._value_condition.acquire()
+ if self._cs.in_transaction:
+ if not self._changed:
+ self._previous = self._val
+ self._changed = True
self._val = val
- self.value_condition.notifyAll()
- self.value_condition.release()
+ self._value_condition.notifyAll()
+ self._value_condition.release()
def _get_val(self):
return self._val
@@ -96,7 +96,7 @@
def bind(self, val):
"""top-level space bind"""
- self.cs.bind(self, val)
+ self._cs.bind(self, val)
is_bound = _is_bound
@@ -117,46 +117,150 @@
being bound in the top-level space
"""
try:
- self.value_condition.acquire()
+ self._value_condition.acquire()
while not self._is_bound():
- self.value_condition.wait()
+ self._value_condition.wait()
return self.val
finally:
- self.value_condition.release()
+ self._value_condition.release()
#-- stream stuff -----------------------------
-from Queue import Queue
+class Pair(object):
+ """similar to CONS in Lisp"""
-class StreamUserBug(Exception):
- pass
+ def __init__(self, car, cdr):
+ self._car = car
+ self._cdr = cdr
+
+ def first(self):
+ return self._car
+
+ def rest(self):
+ return self._cdr
+
+ def set_rest(self, stuff):
+ self._cdr = stuff
+
+ def is_empty(self):
+ return self._car is None and self._cdr is None
+
+ def length(self):
+ ln = 0
+ curr = self
+ if curr.first() != None:
+ ln += 1
+ while curr.rest() != None:
+ curr = curr.rest()
+ if curr.first() != None:
+ ln += 1
+ # check for circularity
+ if curr == self: return ln
+ return ln
-class Stream(Queue):
- """a stream is potentially unbounded list
- of messages, i.e a list whose tail is
- an unbound dataflow variable
- """
-
- def __init__(self, size=5, stuff=None):
- self.elts = stuff
- self.idx = 0
- Queue.__init__(self, size)
+ def __str__(self):
+ # This will show bogus stuff for trees ...
+ seen = set()
+ strs = []
+
+ def build_elt_str(elt):
+ if elt in seen:
+ strs.pop() ; strs.pop()
+ # show ellipsis when recursing
+ strs.append('...')
+ elif isinstance(elt, Pair):
+ seen.add(elt)
+ build_pair_str(elt)
+ else:
+ if elt is None:
+ strs.pop()
+ elif isinstance(elt, Var):
+ strs.append(elt.name)
+ else:
+ strs.append(str(elt))
+
+ def build_pair_str(pair):
+ build_elt_str(pair._car)
+ strs.append('|')
+ build_elt_str(pair._cdr)
+
+ if self._car is None:
+ return 'nil'
+ build_pair_str(self)
+ return ''.join(strs)
+
+def make_list(data=None):
+ """Builds a list with pairs"""
+ assert (data is None) \
+ or type(data) in (list, tuple, set)
+ if data is None:
+ return Pair(None, None)
+ curr = Pair(data[0], None)
+ head = curr
+ for datum in data[1:]:
+ curr.set_rest(Pair(datum, None))
+ curr = curr.rest()
+ return head
+
+class Stream(object):
+ """A FIFO stream"""
+
+ def __init__(self, elts=Pair(None, None)):
+ self.head = elts
+ if elts.first() == None:
+ self.tail = elts
+ else:
+ curr = elts.rest()
+ prev = elts
+ while isinstance(curr, Pair):
+ prev = curr
+ curr = curr.rest()
+ # last pair of the chain
+ self.tail = prev
+ # head hurts tail sometimes ...
+ self.empty_condition = threading.Condition()
def get(self):
- if self.elts is None:
- Queue.get(self)
- else:
- try:
- v = self.elts[self.idx]
- self.idx += 1
- return v
- except IndexError:
- self.idx = 0
- return self.get()
-
- def put(self, elt):
- if self.elts is None:
- Queue.put(self, elt)
- else:
- raise NoImplemented
+ print self.head
+ # first thing to check is whether
+ # there is stuff to feed
+ self.empty_condition.acquire()
+ try:
+ if self.head == self.tail:
+ # there might remain one element there
+ while self.head.is_empty():
+ self.empty_condition.wait()
+ # sky is clear : there is something to get
+ elt = self.head.first()
+ # we might want to advance to the next pair
+ if self.head != self.tail:
+ self.head = self.head.rest()
+ else:
+ # or just nullify what we read
+ # to avoid reading it again ...
+ self.head._car = None
+ finally:
+ self.empty_condition.release()
+ return elt
+
+ def put(self, val):
+ # first, check for emptyness special case
+ self.empty_condition.acquire()
+ try:
+ if self.head.is_empty():
+ # then we put stuff into head
+ # without consing and just return
+ self.head._car = val
+ self.empty_condition.notifyAll()
+ return
+ finally:
+ self.empty_condition.release()
+ # either we did put and return
+ # or nothing done yet
+ new_tail = Pair(val, None)
+ self.tail.set_rest(new_tail)
+ self.tail = new_tail
+
+ def __str__(self):
+ return str(self.head)
More information about the Pypy-commit
mailing list