[pypy-svn] r30219 - in pypy/dist/pypy/objspace: . test

auc at codespeak.net auc at codespeak.net
Wed Jul 19 12:46:36 CEST 2006


Author: auc
Date: Wed Jul 19 12:46:34 2006
New Revision: 30219

Modified:
   pypy/dist/pypy/objspace/logic.py
   pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
misc changes (exceptions, futures, sched stats)

Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py	(original)
+++ pypy/dist/pypy/objspace/logic.py	Wed Jul 19 12:46:34 2006
@@ -41,6 +41,7 @@
 HAVE_GREENLETS = True
 try:
     from py.magic import greenlet
+    del greenlet
 except ImportError:
     HAVE_GREENLETS = False
 
@@ -55,8 +56,8 @@
 assert USE_COROUTINES # once & for all
 
 from pypy.module._stackless.coroutine import _AppThunk
-from pypy.module._stackless.coroutine import Coroutine
-from pypy.module._stackless.interp_clonable import InterpClonableCoroutine as ClonableCoroutine
+from pypy.module._stackless.coroutine import Coroutine # XXX (that's for main)
+from pypy.module._stackless.interp_clonable import ClonableCoroutine
 
 def SETNEXT(obj, val):
     obj.next = val
@@ -74,6 +75,7 @@
         self._main = ClonableCoroutine.w_getcurrent(space)
         self._init_head(self._main)
         self._init_blocked()
+        self._switch_count = 0
         w ("MAIN THREAD = ", str(id(self._main)))
 
     def _init_blocked(self):
@@ -136,13 +138,17 @@
     def schedule(self):
         to_be_run = self._select_next()
         w(".. SWITCHING", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
-        to_be_run.w_switch() # <- "Variable object has no attribute 'dict'
+        self._switch_count += 1
+        to_be_run.w_switch() 
         
     def _select_next(self):
         to_be_run = self._head
         sentinel = to_be_run
         current = ClonableCoroutine.w_getcurrent(self.space)
-        while (to_be_run in self._blocked) or (to_be_run == current): 
+        print type(to_be_run)
+        while (to_be_run in self._blocked) \
+                  or (to_be_run == current) \
+                  or to_be_run.is_dead():
             to_be_run = to_be_run.next
             if to_be_run == sentinel:
                 self.display_head()
@@ -155,6 +161,15 @@
                                      self.space.wrap("can't schedule, possible deadlock in sight"))
         return to_be_run
 
+    def __len__(self):
+        curr = self._head
+        sentinel = curr
+        count = 1
+        while curr.next != sentinel:
+            curr = curr.next
+            count += 1
+        return count
+
     def display_head(self):
         curr = self._head
         v("HEAD : [prev, curr, next]")
@@ -218,20 +233,27 @@
 
 scheduler = []
 
-class Thunk(_AppThunk):
+class FutureThunk(_AppThunk):
     def __init__(self, space, w_callable, args, w_Result, coro):
         _AppThunk.__init__(self, space, coro.costate, w_callable, args)
         self.w_Result = w_Result 
         self._coro = coro
 
     def call(self):
-        _AppThunk.call(self)
-        # bind does not suffice for we can have a right-hand logic var
-        unify(self.space, self.w_Result, self.costate.w_tempval)
-        scheduler[0].remove_thread(self._coro)
-        scheduler[0].schedule()
+        try:
+            try:
+                _AppThunk.call(self)
+            except Exception, exc:
+                print "EXCEPTION in call", exc
+                bind(self.space, self.w_Result, exc)
+            else:
+                unify(self.space, self.w_Result, self.costate.w_tempval)
+        finally:
+            scheduler[0].remove_thread(self._coro)
+            scheduler[0].schedule()
 
-def uthread(space, w_callable, __args__):
+def future(space, w_callable, __args__):
+    """returns a future result"""
     args = __args__.normalize()
     w_Future = W_Var()
     # coro init
@@ -239,20 +261,29 @@
     # prepare thread chaining, create missing slots
     coro.next = coro.prev = None
     # feed the coro
-    thunk = Thunk(space, w_callable, args, w_Future, coro)
+    thunk = FutureThunk(space, w_callable, args, w_Future, coro)
     coro.bind(thunk)
     scheduler[0].add_new_thread(coro)
     # XXX we should think about a way to make it read-only for the client
     #     aka true futures
     return w_Future
-app_uthread = gateway.interp2app(uthread, unwrap_spec=[baseobjspace.ObjSpace,
-                                                       baseobjspace.W_Root,
-                                                       argument.Arguments])
+app_future = gateway.interp2app(future, unwrap_spec=[baseobjspace.ObjSpace,
+                                                     baseobjspace.W_Root,
+                                                     argument.Arguments])
     
+# need : getcurrent(), getmain(), 
+# wrapper for schedule() ?
 
-
-# need : complete scheduler info, getcurrent(), getmain(), 
-# wrappers for schedule()
+def sched_stats(space):
+    sched = scheduler[0]
+    ret = space.newdict([])
+    space.setitem(ret, space.wrap('switches'), space.wrap(sched._switch_count))
+    space.setitem(ret, space.wrap('threads'), space.wrap(len(sched)))
+    space.setitem(ret, space.wrap('blocked'), space.wrap(len(sched._blocked)))
+    space.setitem(ret, space.wrap('blocked_on'), space.wrap(len(sched._blocked_on)))
+    space.setitem(ret, space.wrap('blocked_byneed'), space.wrap(len(sched._blocked_byneed)))
+    return ret
+app_sched_stats = gateway.interp2app(sched_stats)
 
 #-- VARIABLE ---------------------
 
@@ -410,7 +441,6 @@
 def fail(space, w_obj1, w_obj2):
     """raises a specific exception for bind/unify"""
     #FIXME : really raise some specific exception
-    #print "failed to bind/unify"
     assert isinstance(w_obj1, W_Root)
     assert isinstance(w_obj2, W_Root)
     raise OperationError(space.w_RuntimeError,
@@ -423,7 +453,7 @@
     return a_str[l-3:l]
 
 def interp_id(space, w_obj):
-    assert isinstance(w_obj, W_ObjectObject)
+    assert isinstance(w_obj, W_Root) # or W_Wrappable ?
     return space.newint(id(w_obj))
 app_interp_id = gateway.interp2app(interp_id)
 
@@ -436,8 +466,12 @@
     """
     v(" :bind")
     assert isinstance(w_var, W_Var)
-    assert isinstance(w_obj, W_Root)
-    space.bind(w_var, w_obj)
+    try:
+        assert isinstance(w_obj, W_Root)
+    except: # we've got an interpreter exception, probably
+        _assign_exception(space, w_var, w_obj)
+    else:
+        space.bind(w_var, w_obj)
 app_bind = gateway.interp2app(bind)
 
 def bind__Var_Var(space, w_v1, w_v2):
@@ -455,14 +489,30 @@
     else: # 1. both are unbound
         return _alias(space, w_v1, w_v2)
 
+#XXX 
+def _assign_exception(space, w_var, w_exc):
+    w()
+    w("  :assign an exception")
+    assert isinstance(w_var, W_Var)
+    w_curr = w_var
+    while 1:
+        w_next = w_curr.w_bound_to
+        w_curr.w_bound_to = w_exc
+        # notify the blocked threads
+        scheduler[0].unblock_on(w_curr)
+        if space.is_true(space.is_nb_(w_next, w_var)):
+            break
+        # switch to next
+        w_curr = w_next
+    w("  :assigned")
+    return space.w_None
+
 
 def bind__Var_Root(space, w_var, w_obj):
     w("var val", str(id(w_var)))
     # 3. var and value
     if space.is_true(space.is_free(w_var)):
         return _assign(space, w_var, w_obj)
-    # for dataflow behaviour we should allow
-    # rebinding of unifiable values
     if space.is_true(space.eq(w_var.w_bound_to, w_obj)):
         return
     raise OperationError(space.w_RuntimeError,
@@ -478,11 +528,9 @@
     assert isinstance(w_var, W_Var)
     assert isinstance(w_val, W_Root)
     w_curr = w_var
-    ass_count = 0
     while 1:
         w_next = w_curr.w_bound_to
         w_curr.w_bound_to = w_val
-        ass_count += 1
         # notify the blocked threads
         scheduler[0].unblock_on(w_curr)
         if space.is_true(space.is_nb_(w_next, w_var)):
@@ -835,12 +883,14 @@
 ##     space.setitem(space.builtin.w_dict, space.wrap('DichotomyDistributor'),
 ##                  space.wrap(distributor.app_make_dichotomy_distributor))
     #-- threading --
-    space.setitem(space.builtin.w_dict, space.wrap('uthread'),
-                 space.wrap(app_uthread))
+    space.setitem(space.builtin.w_dict, space.wrap('future'),
+                 space.wrap(app_future))
     space.setitem(space.builtin.w_dict, space.wrap('wait'),
                  space.wrap(app_wait))
     space.setitem(space.builtin.w_dict, space.wrap('wait_needed'),
                   space.wrap(app_wait_needed))
+    space.setitem(space.builtin.w_dict, space.wrap('sched_stats'),
+                  space.wrap(app_sched_stats))
 
     #-- misc -----
     space.setitem(space.builtin.w_dict, space.wrap('interp_id'),

Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py	(original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py	Wed Jul 19 12:46:34 2006
@@ -1,4 +1,5 @@
 from pypy.conftest import gettestobjspace
+from py.test import skip
  
 class AppTest_Logic(object):
 
@@ -200,17 +201,60 @@
     def setup_class(cls):
         cls.space = gettestobjspace('logic', usemodules=("_stackless",))
 
-    def test_one_thread(self):
+    def test_future_value(self):
         
         def poop(X):
-            wait(X)
             return X + 1
 
         X = newvar()
-        Y = uthread(poop, X)
+        Y = future(poop, X)
         bind(X, 42)
         assert Y == 43
 
+    def test_future_exception(self):
+        skip('wait until we have working exception propagation')
+        class FooException(Exception): pass
+        
+        def poop(X):
+            wait(X)
+            raise FooException
+
+        X=newvar()
+        Y=future(poop, X)
+        unify(X, 42)
+        try:
+            assert Y == 43
+        except FooException:
+            return
+        assert False
+
+    def test_exceptions_harder(self):
+        skip('wait until we have working exception propagation')
+        class FooException(Exception): pass
+
+        def raise_foo():
+            print "STATS", sched_stats()
+            raise FooException
+
+        def spawn(X, n):
+            print "SPAWN !"
+            if n>0:
+                F = future(spawn, X, n-1)
+                wait(X)
+            else:
+                raise_foo()
+
+        X = newvar()
+        Y = spawn(X, 3)
+        unify(X, 42)
+        try:
+            assert Y == 1
+        except FooException:
+            print "SUCCESS !"
+            print sched_stats()
+            return
+        assert False
+
     def test_nested_threads(self):
         """check that a wait nested in a tree of
            threads works correctly
@@ -220,10 +264,10 @@
             return X
 
         def call_sleep(X):
-            return uthread(sleep, X)
+            return future(sleep, X)
 
         X = newvar()
-        v = uthread(call_sleep, X)
+        v = future(call_sleep, X)
         bind(X, 42)
         assert X == 42
         assert is_free(v)
@@ -240,8 +284,8 @@
             wait(V)
             return V
 
-        uthread(reader, X)
-        uthread(binder, X)
+        future(reader, X)
+        future(binder, X)
 
         assert X == 42
 
@@ -261,8 +305,8 @@
 
         X = newvar()
         S = newvar()
-        unify(S, uthread(sum, X, 0))
-        unify(X, uthread(generate, 0, 10))
+        unify(S, future(sum, X, 0))
+        unify(X, future(generate, 0, 10))
         assert S == 45
 
 
@@ -288,8 +332,8 @@
         Y = newvar()
         T = newvar()
 
-        uthread(lgenerate, 0, Y)
-        unify(T, uthread(lsum, Y, 0, 10))
+        future(lgenerate, 0, Y)
+        unify(T, future(lsum, Y, 0, 10))
 
         wait(T)
         assert T == 45
@@ -301,15 +345,15 @@
         
         def wait_two(X, Y):
             Barrier = newvar()
-            uthread(sleep, X, Barrier)
-            uthread(sleep, Y, Barrier)
+            future(sleep, X, Barrier)
+            future(sleep, Y, Barrier)
             wait(Barrier)
             if is_free(Y):
                 return 1
             return 2
 
         X, Y = newvar(), newvar()
-        o = uthread(wait_two, X, Y)
+        o = future(wait_two, X, Y)
         unify(X, Y)
         unify(Y, 42)
         assert X == Y == 42



More information about the Pypy-commit mailing list