[pypy-svn] r30455 - in pypy/dist/pypy/objspace: . test

auc at codespeak.net auc at codespeak.net
Mon Jul 24 16:54:30 CEST 2006


Author: auc
Date: Mon Jul 24 16:54:28 2006
New Revision: 30455

Modified:
   pypy/dist/pypy/objspace/logic.py
   pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
exception propagation to logic vars

Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py	(original)
+++ pypy/dist/pypy/objspace/logic.py	Mon Jul 24 16:54:28 2006
@@ -71,6 +71,7 @@
         self._init_head(self._main)
         self._init_blocked()
         self._switch_count = 0
+        self._traced = {}
         w (".. MAIN THREAD = ", str(id(self._main)))
 
     def _init_blocked(self):
@@ -119,6 +120,7 @@
         #XXX don't we need to notify the consumers ?
         w(".. REMOVING", str(id(thread)))
         assert thread not in self._blocked
+        del self._traced[thread]
         l = thread.prev
         r = thread.next
         l.next = r
@@ -136,19 +138,28 @@
         w(".. SWITCHING", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
         self._switch_count += 1
         to_be_run.w_switch() 
+
+    def schedule_or_pass(self):
+        to_be_run = self._select_next(dont_pass=False)
+        curr = ClonableCoroutine.w_getcurrent(self.space)
+        if to_be_run == curr:
+            w(".. PASS")
+            return
+        w(".. SWITCHING", str(id(curr)), "=>", str(id(to_be_run)))
+        self._switch_count += 1
+        to_be_run.w_switch() 
         
-    def _select_next(self):
+    def _select_next(self, dont_pass=True):
         to_be_run = self._head
         sentinel = to_be_run
         current = ClonableCoroutine.w_getcurrent(self.space)
         while (to_be_run in self._blocked) \
-                  or (to_be_run == current) \
-                  or to_be_run.is_dead():
+                  or to_be_run.is_dead() \
+                  or (dont_pass and (to_be_run == current)):
             to_be_run = to_be_run.next
             if to_be_run == sentinel:
                 self.display_head()
                 ## we RESET sched state so as to keep being usable beyond that
-                #  (for instance, allow other tests to be run)
                 self._init_head(self._main)
                 self._init_blocked()
                 w(".. SCHEDULER reinitialized")
@@ -214,18 +225,29 @@
         blocked.append(uthread)
         self._blocked[uthread] = True
 
-    def unblock_byneed_on(self, space, w_var):
+    def unblock_byneed_on(self, w_var):
         v(".. we UNBLOCK BYNEED dependants of var", str(id(w_var)))
         assert isinstance(w_var, W_Var)
         blocked = []
-        for w_alias in aliases(space, w_var):
+        for w_alias in aliases(self.space, w_var):
             if w_alias in self._blocked_byneed:
                 blocked += self._blocked_byneed[w_alias]
                 del self._blocked_byneed[w_alias]
-            w_alias.w_needed = True
+            w_alias.needed = True
         w(str([id(thr) for thr in blocked]))
         for thr in blocked: del self._blocked[thr]
 
+    def trace_vars(self, thread, lvars):
+        w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
+        assert not self._traced.has_key(thread)
+        self._traced[thread] = lvars
+
+    def dirty_traced_vars(self, thread, failed_value):
+        w(".. DIRTYING traced vars")
+        for w_var in self._traced[thread]:
+            if self.space.is_true(self.space.is_free(w_var)):
+                bind(self.space, w_var, failed_value)
+
 scheduler = []
 
 class FutureThunk(_AppThunk):
@@ -235,12 +257,15 @@
         self._coro = coro
 
     def call(self):
+        scheduler[0].trace_vars(self._coro, logic_args(self.args.unpack()))
         try:
             try:
                 _AppThunk.call(self)
             except Exception, exc:
                 w(".. exceptional EXIT of", str(id(self._coro)), "with", str(exc))
-                bind(self.space, self.w_Result, W_FailedValue(exc))
+                failed_val = W_FailedValue(exc)
+                bind(self.space, self.w_Result, failed_val)
+                scheduler[0].dirty_traced_vars(self._coro, failed_val)
                 self._coro._dead = True
             else:
                 w(".. clean EXIT of", str(id(self._coro)),
@@ -251,16 +276,27 @@
             scheduler[0].remove_thread(self._coro)
             scheduler[0].schedule()
 
+
+def logic_args(args):
+    "returns logic vars found in unpacked normalized args"
+    pos = args[0]
+    kwa = args[1]
+    pos_l = [arg for arg in pos
+             if isinstance(arg, W_Var)]
+    kwa_l = [arg for arg in kwa.keys()
+             if isinstance(arg, W_Var)]
+    return pos_l + kwa_l
+
 def future(space, w_callable, __args__):
     """returns a future result"""
     v(".. THREAD")
     args = __args__.normalize()
-    w_Future = W_Future(space)
     # coro init
     coro = ClonableCoroutine(space)
     # prepare thread chaining, create missing slots
     coro.next = coro.prev = None
     # feed the coro
+    w_Future = W_Future(space)
     thunk = FutureThunk(space, w_callable, args, w_Future, coro)
     coro.bind(thunk)
     w(str(id(coro)))
@@ -272,8 +308,7 @@
                                                      baseobjspace.W_Root,
                                                      argument.Arguments])
     
-# need : getcurrent(), getmain(), 
-# wrapper for schedule() ?
+# need (applevel) : getcurrent(), getmain(), 
 
 def sched_stats(space):
     sched = scheduler[0]
@@ -286,29 +321,17 @@
     return w_ret
 app_sched_stats = gateway.interp2app(sched_stats)
 
-#-- VARIABLE ---------------------
-
-#-- Exceptions -------
-
-## what we can know:
-
-##     location of new var
-##     transmission of vars to threads
 
-##     => possibility to propagate exceptions amongst all threads that share a var
-##     (kind of process linking in Erlang, where giving var => do link)
+def schedule(space):
+    scheduler[0].schedule_or_pass()
+app_schedule = gateway.interp2app(schedule)
 
-## what is sane ?
-
-##      reraising FailedValues (case of futures)
-##      XXX
-
-#-- /Exceptions ------
+#-- VARIABLE ---------------------
 
 class W_Var(W_Root, object):
-    def __init__(w_self):
+    def __init__(w_self, space):
         w_self.w_bound_to = w_self 
-        w_self.w_needed = False    
+        w_self.needed = False
 
     def __repr__(w_self):
         if w_self.w_bound_to:
@@ -318,12 +341,14 @@
                                 prettyfy_id(str(id(w_self))))
 
 def newvar(space):
-    return W_Var()
+    w_v = W_Var(space)
+    w("VAR", str(id(w_v)))
+    return w_v
 app_newvar = gateway.interp2app(newvar)
 
 class W_Future(W_Var):
     def __init__(w_self, space):
-        W_Var.__init__(w_self)
+        W_Var.__init__(w_self, space)
         w_self.client = ClonableCoroutine.w_getcurrent(space)
 
 class W_FailedValue(W_Root, object):
@@ -341,7 +366,7 @@
 def wait__Var(space, w_var):
     w(":wait", str(id(ClonableCoroutine.w_getcurrent(space))))
     if space.is_true(space.is_free(w_var)):
-        scheduler[0].unblock_byneed_on(space, w_var)
+        scheduler[0].unblock_byneed_on(w_var)
         scheduler[0].add_to_blocked_on(w_var, ClonableCoroutine.w_getcurrent(space))
         scheduler[0].schedule()
     assert space.is_true(space.is_bound(w_var))
@@ -365,7 +390,7 @@
 def wait_needed__Var(space, w_var):
     #print " :needed", w_var
     if space.is_true(space.is_free(w_var)):
-        if w_var.w_needed:
+        if w_var.needed:
             return
         scheduler[0].add_to_blocked_byneed(w_var, ClonableCoroutine.w_getcurrent(space))
         scheduler[0].schedule()
@@ -528,7 +553,7 @@
     bind__Var_Root(space, w_fut, w_obj) # call-next-method ?
 
 def bind__Var_Var(space, w_v1, w_v2):
-    w("var var")
+    #w("var var")
     if space.is_true(space.is_bound(w_v1)):
         if space.is_true(space.is_bound(w_v2)):
             # we allow re-binding to same value, see 3.
@@ -928,6 +953,8 @@
                   space.wrap(app_wait_needed))
     space.setitem(space.builtin.w_dict, space.wrap('sched_stats'),
                   space.wrap(app_sched_stats))
+    space.setitem(space.builtin.w_dict, space.wrap('schedule'),
+                  space.wrap(app_schedule))
 
     #-- misc -----
     space.setitem(space.builtin.w_dict, space.wrap('interp_id'),

Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py	(original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py	Mon Jul 24 16:54:28 2006
@@ -247,7 +247,7 @@
             return
         assert False
 
-    def test_exceptions_harder(self):
+    def test_exception_in_chain(self):
         class FooException(Exception): pass
 
         def raise_foo():
@@ -270,6 +270,36 @@
             return
         assert False
 
+    def test_exception_in_group(self):
+        class FooException(Exception): pass
+        from operator import add
+
+        def loop_or_raise(Canary, crit, Bomb_signal):
+            "Canary will be untouched there ..."
+            while(1):
+                if is_bound(Bomb_signal):
+                    if Bomb_signal == crit:
+                        raise FooException
+                    else: # but returned
+                        return Canary
+                schedule()
+            return 42
+
+        B, C = newvar(), newvar()
+        T = future(loop_or_raise, C, 'foo', B)
+        U = future(loop_or_raise, C, 'bar', B)
+
+        unify(B, 'foo')
+        try:
+            wait(T)
+        except FooException:
+            try:
+                # and contaminated
+                wait(U)
+            except FooException:
+                return
+        assert False
+        
     def test_nested_threads(self):
         """check that a wait nested in a tree of
            threads works correctly



More information about the Pypy-commit mailing list