[pypy-svn] r30287 - in pypy/dist/pypy/objspace: . test

auc at codespeak.net auc at codespeak.net
Thu Jul 20 19:21:16 CEST 2006


Author: auc
Date: Thu Jul 20 19:21:13 2006
New Revision: 30287

Modified:
   pypy/dist/pypy/objspace/logic.py
   pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
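some bits of exception propagation (a future that fails now binds its result to a W_FailedValue, which is re-raised when the result is waited on)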

Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py	(original)
+++ pypy/dist/pypy/objspace/logic.py	Thu Jul 20 19:21:13 2006
@@ -12,18 +12,22 @@
 from pypy.objspace.std.stringobject import W_StringObject
 from pypy.objspace.std.model import StdObjSpaceMultiMethod
 
+#
+from pypy.interpreter.error import OperationError
+
 # misc
 import os
 
+NO_DEBUG_INFO = [False]
 def w(*msgs):
     """writeln"""
-    if we_are_translated(): return
+    if NO_DEBUG_INFO[0]: return
     v(*msgs)
     os.write(1, ' \n')
 
 def v(*msgs):
     """write"""
-    if we_are_translated(): return
+    if NO_DEBUG_INFO[0]: return
     for msg in msgs:
         os.write(1, msg)
         os.write(1, ' ')
@@ -121,6 +125,7 @@
             SETNEXT(thread, r)
 
     def remove_thread(self, thread):
+        #XXX don't we need to notify the consumers ?
         w(".. REMOVING", str(id(thread)))
         assert thread not in self._blocked
         l = thread.prev
@@ -145,7 +150,6 @@
         to_be_run = self._head
         sentinel = to_be_run
         current = ClonableCoroutine.w_getcurrent(self.space)
-        print type(to_be_run)
         while (to_be_run in self._blocked) \
                   or (to_be_run == current) \
                   or to_be_run.is_dead():
@@ -162,9 +166,10 @@
         return to_be_run
 
     def __len__(self):
+        "count of known threads (including dead ones)"
         curr = self._head
         sentinel = curr
-        count = 1
+        count = 1 # there is always a main thread
         while curr.next != sentinel:
             curr = curr.next
             count += 1
@@ -172,12 +177,11 @@
 
     def display_head(self):
         curr = self._head
-        v("HEAD : [prev, curr, next]")
-        v(str([id(self._head.prev), id(self._head), id(self._head.next)]))
+        v('Threads : [', '-'.join([str(id(curr)), str(curr in self._blocked)]))
         while curr.next != self._head:
             curr = curr.next
-            v(str([id(curr.prev), id(curr), id(curr.next)]))
-        w()
+            v('-'.join([str(id(curr)), str(curr in self._blocked)]))
+        w(']')
 
     def add_new_thread(self, thread):
         "insert 'thread' at end of running queue"
@@ -244,9 +248,13 @@
             try:
                 _AppThunk.call(self)
             except Exception, exc:
-                print "EXCEPTION in call", exc
-                bind(self.space, self.w_Result, exc)
+                w(".. exceptional EXIT of", str(id(self._coro)), "with", str(exc))
+                bind(self.space, self.w_Result, W_FailedValue(exc))
+                self._coro._dead = True
             else:
+                w(".. clean EXIT of", str(id(self._coro)),
+                  "-- setting future result to",
+                  str(self.costate.w_tempval))
                 unify(self.space, self.w_Result, self.costate.w_tempval)
         finally:
             scheduler[0].remove_thread(self._coro)
@@ -254,6 +262,7 @@
 
 def future(space, w_callable, __args__):
     """returns a future result"""
+    v(".. FUTURE")
     args = __args__.normalize()
     w_Future = W_Var()
     # coro init
@@ -263,9 +272,10 @@
     # feed the coro
     thunk = FutureThunk(space, w_callable, args, w_Future, coro)
     coro.bind(thunk)
+    w(str(id(coro)))
     scheduler[0].add_new_thread(coro)
     # XXX we should think about a way to make it read-only for the client
-    #     aka true futures
+    #     (i.e. the originator), aka true futures
     return w_Future
 app_future = gateway.interp2app(future, unwrap_spec=[baseobjspace.ObjSpace,
                                                      baseobjspace.W_Root,
@@ -276,17 +286,34 @@
 
 def sched_stats(space):
     sched = scheduler[0]
-    ret = space.newdict([])
-    space.setitem(ret, space.wrap('switches'), space.wrap(sched._switch_count))
-    space.setitem(ret, space.wrap('threads'), space.wrap(len(sched)))
-    space.setitem(ret, space.wrap('blocked'), space.wrap(len(sched._blocked)))
-    space.setitem(ret, space.wrap('blocked_on'), space.wrap(len(sched._blocked_on)))
-    space.setitem(ret, space.wrap('blocked_byneed'), space.wrap(len(sched._blocked_byneed)))
-    return ret
+    w_ret = space.newdict([])
+    space.setitem(w_ret, space.wrap('switches'), space.wrap(sched._switch_count))
+    space.setitem(w_ret, space.wrap('threads'), space.wrap(len(sched)))
+    space.setitem(w_ret, space.wrap('blocked'), space.wrap(len(sched._blocked)))
+    space.setitem(w_ret, space.wrap('blocked_on'), space.wrap(len(sched._blocked_on)))
+    space.setitem(w_ret, space.wrap('blocked_byneed'), space.wrap(len(sched._blocked_byneed)))
+    return w_ret
 app_sched_stats = gateway.interp2app(sched_stats)
 
 #-- VARIABLE ---------------------
 
+#-- Exceptions -------
+
+## what we can know:
+
+##     location of new var
+##     transmission of vars to threads
+
+##     => possibility to propagate exceptions amongst all threads that share a var
+##     (kind of process linking in Erlang, where giving var => do link)
+
+## what is sane ?
+
+##      reraising FailedValues (case of futures)
+##      XXX
+
+#-- /Exceptions ------
+
 class W_Var(W_Root, object):
     def __init__(w_self):
         w_self.w_bound_to = w_self 
@@ -304,6 +331,10 @@
 app_newvar = gateway.interp2app(newvar)
 
 
+class W_FailedValue(W_Root, object):
+    def __init__(w_self, exc):
+        w_self.exc = exc
+
 def wait__Root(space, w_obj):
     return w_obj
 
@@ -313,7 +344,12 @@
         scheduler[0].unblock_byneed_on(space, w_var)
         scheduler[0].add_to_blocked_on(w_var, ClonableCoroutine.w_getcurrent(space))
         scheduler[0].schedule()
-    return w_var.w_bound_to
+    assert space.is_true(space.is_bound(w_var))
+    w_ret = w_var.w_bound_to
+    if isinstance(w_ret, W_FailedValue):
+        w(".. reraising Failed Value")
+        raise w_ret.exc
+    return w_ret
 
 def wait(space, w_obj):
     assert isinstance(w_obj, W_Root)
@@ -457,6 +493,10 @@
     return space.newint(id(w_obj))
 app_interp_id = gateway.interp2app(interp_id)
 
+def switch_debug_info(space):
+    NO_DEBUG_INFO[0] = not NO_DEBUG_INFO[0]
+app_switch_debug_info = gateway.interp2app(switch_debug_info)
+
 #-- BIND -----------------------------
 
 def bind(space, w_var, w_obj):
@@ -466,12 +506,8 @@
     """
     v(" :bind")
     assert isinstance(w_var, W_Var)
-    try:
-        assert isinstance(w_obj, W_Root)
-    except: # we've got an interpreter exception, probably
-        _assign_exception(space, w_var, w_obj)
-    else:
-        space.bind(w_var, w_obj)
+    assert isinstance(w_obj, W_Root)
+    space.bind(w_var, w_obj)
 app_bind = gateway.interp2app(bind)
 
 def bind__Var_Var(space, w_v1, w_v2):
@@ -489,25 +525,6 @@
     else: # 1. both are unbound
         return _alias(space, w_v1, w_v2)
 
-#XXX 
-def _assign_exception(space, w_var, w_exc):
-    w()
-    w("  :assign an exception")
-    assert isinstance(w_var, W_Var)
-    w_curr = w_var
-    while 1:
-        w_next = w_curr.w_bound_to
-        w_curr.w_bound_to = w_exc
-        # notify the blocked threads
-        scheduler[0].unblock_on(w_curr)
-        if space.is_true(space.is_nb_(w_next, w_var)):
-            break
-        # switch to next
-        w_curr = w_next
-    w("  :assigned")
-    return space.w_None
-
-
 def bind__Var_Root(space, w_var, w_obj):
     w("var val", str(id(w_var)))
     # 3. var and value
@@ -583,7 +600,7 @@
 def unify(space, w_x, w_y):
     assert isinstance(w_x, W_Root)
     assert isinstance(w_y, W_Root)
-    #print ":unify ", id(w_x), id(w_y)
+    w(":unify ", str(id(w_x)), str(id(w_y)))
     return space.unify(w_x, w_y)
 app_unify = gateway.interp2app(unify)
 
@@ -609,7 +626,7 @@
         return bind(space, w_x, w_y) 
     
 def unify__Var_Root(space, w_x, w_y):
-    w(" :unify var val", str(id(w_x)))
+    w(" :unify var val", str(id(w_x)), str(w_y))
     if space.is_true(space.is_bound(w_x)):
         return space.unify(deref(space, w_x), w_y)            
     return bind(space, w_x, w_y)
@@ -895,7 +912,9 @@
     #-- misc -----
     space.setitem(space.builtin.w_dict, space.wrap('interp_id'),
                   space.wrap(app_interp_id))
-    
+    space.setitem(space.builtin.w_dict, space.wrap('switch_debug_info'),
+                  space.wrap(app_switch_debug_info))
+
     #-- path to the applevel modules --
     import pypy.objspace.constraint
     import os

Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py	(original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py	Thu Jul 20 19:21:13 2006
@@ -211,8 +211,7 @@
         bind(X, 42)
         assert Y == 43
 
-    def test_future_exception(self):
-        skip('wait until we have working exception propagation')
+    def test_one_future_exception(self):
         class FooException(Exception): pass
         
         def poop(X):
@@ -359,3 +358,24 @@
         assert X == Y == 42
         assert o == 2
         
+    def test_fib(self):
+        skip("recursion limits breakage")
+        def fib(X):
+            if X<2:
+                return 1
+            else:
+                return future(fib, X-1) + fib(X-2)
+
+        X = newvar()
+        F = future(fib, X)
+        unify(11, X)
+        assert F == 144
+
+        X = newvar()
+        F = future(fib, X)
+
+        try:
+            unify(50, X)
+            print F
+        except Exception, e:
+            print e



More information about the Pypy-commit mailing list