[pypy-svn] r30219 - in pypy/dist/pypy/objspace: . test
auc at codespeak.net
auc at codespeak.net
Wed Jul 19 12:46:36 CEST 2006
Author: auc
Date: Wed Jul 19 12:46:34 2006
New Revision: 30219
Modified:
pypy/dist/pypy/objspace/logic.py
pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
misc changes (exceptions, futures, sched stats)
Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py (original)
+++ pypy/dist/pypy/objspace/logic.py Wed Jul 19 12:46:34 2006
@@ -41,6 +41,7 @@
HAVE_GREENLETS = True
try:
from py.magic import greenlet
+ del greenlet
except ImportError:
HAVE_GREENLETS = False
@@ -55,8 +56,8 @@
assert USE_COROUTINES # once & for all
from pypy.module._stackless.coroutine import _AppThunk
-from pypy.module._stackless.coroutine import Coroutine
-from pypy.module._stackless.interp_clonable import InterpClonableCoroutine as ClonableCoroutine
+from pypy.module._stackless.coroutine import Coroutine # XXX (that's for main)
+from pypy.module._stackless.interp_clonable import ClonableCoroutine
def SETNEXT(obj, val):
obj.next = val
@@ -74,6 +75,7 @@
self._main = ClonableCoroutine.w_getcurrent(space)
self._init_head(self._main)
self._init_blocked()
+ self._switch_count = 0
w ("MAIN THREAD = ", str(id(self._main)))
def _init_blocked(self):
@@ -136,13 +138,17 @@
def schedule(self):
to_be_run = self._select_next()
w(".. SWITCHING", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
- to_be_run.w_switch() # <- "Variable object has no attribute 'dict'
+ self._switch_count += 1
+ to_be_run.w_switch()
def _select_next(self):
to_be_run = self._head
sentinel = to_be_run
current = ClonableCoroutine.w_getcurrent(self.space)
- while (to_be_run in self._blocked) or (to_be_run == current):
+ print type(to_be_run)
+ while (to_be_run in self._blocked) \
+ or (to_be_run == current) \
+ or to_be_run.is_dead():
to_be_run = to_be_run.next
if to_be_run == sentinel:
self.display_head()
@@ -155,6 +161,15 @@
self.space.wrap("can't schedule, possible deadlock in sight"))
return to_be_run
+ def __len__(self):
+ curr = self._head
+ sentinel = curr
+ count = 1
+ while curr.next != sentinel:
+ curr = curr.next
+ count += 1
+ return count
+
def display_head(self):
curr = self._head
v("HEAD : [prev, curr, next]")
@@ -218,20 +233,27 @@
scheduler = []
-class Thunk(_AppThunk):
+class FutureThunk(_AppThunk):
def __init__(self, space, w_callable, args, w_Result, coro):
_AppThunk.__init__(self, space, coro.costate, w_callable, args)
self.w_Result = w_Result
self._coro = coro
def call(self):
- _AppThunk.call(self)
- # bind does not suffice for we can have a right-hand logic var
- unify(self.space, self.w_Result, self.costate.w_tempval)
- scheduler[0].remove_thread(self._coro)
- scheduler[0].schedule()
+ try:
+ try:
+ _AppThunk.call(self)
+ except Exception, exc:
+ print "EXCEPTION in call", exc
+ bind(self.space, self.w_Result, exc)
+ else:
+ unify(self.space, self.w_Result, self.costate.w_tempval)
+ finally:
+ scheduler[0].remove_thread(self._coro)
+ scheduler[0].schedule()
-def uthread(space, w_callable, __args__):
+def future(space, w_callable, __args__):
+ """returns a future result"""
args = __args__.normalize()
w_Future = W_Var()
# coro init
@@ -239,20 +261,29 @@
# prepare thread chaining, create missing slots
coro.next = coro.prev = None
# feed the coro
- thunk = Thunk(space, w_callable, args, w_Future, coro)
+ thunk = FutureThunk(space, w_callable, args, w_Future, coro)
coro.bind(thunk)
scheduler[0].add_new_thread(coro)
# XXX we should think about a way to make it read-only for the client
# aka true futures
return w_Future
-app_uthread = gateway.interp2app(uthread, unwrap_spec=[baseobjspace.ObjSpace,
- baseobjspace.W_Root,
- argument.Arguments])
+app_future = gateway.interp2app(future, unwrap_spec=[baseobjspace.ObjSpace,
+ baseobjspace.W_Root,
+ argument.Arguments])
+# need : getcurrent(), getmain(),
+# wrapper for schedule() ?
-
-# need : complete scheduler info, getcurrent(), getmain(),
-# wrappers for schedule()
+def sched_stats(space):
+ sched = scheduler[0]
+ ret = space.newdict([])
+ space.setitem(ret, space.wrap('switches'), space.wrap(sched._switch_count))
+ space.setitem(ret, space.wrap('threads'), space.wrap(len(sched)))
+ space.setitem(ret, space.wrap('blocked'), space.wrap(len(sched._blocked)))
+ space.setitem(ret, space.wrap('blocked_on'), space.wrap(len(sched._blocked_on)))
+ space.setitem(ret, space.wrap('blocked_byneed'), space.wrap(len(sched._blocked_byneed)))
+ return ret
+app_sched_stats = gateway.interp2app(sched_stats)
#-- VARIABLE ---------------------
@@ -410,7 +441,6 @@
def fail(space, w_obj1, w_obj2):
"""raises a specific exception for bind/unify"""
#FIXME : really raise some specific exception
- #print "failed to bind/unify"
assert isinstance(w_obj1, W_Root)
assert isinstance(w_obj2, W_Root)
raise OperationError(space.w_RuntimeError,
@@ -423,7 +453,7 @@
return a_str[l-3:l]
def interp_id(space, w_obj):
- assert isinstance(w_obj, W_ObjectObject)
+ assert isinstance(w_obj, W_Root) # or W_Wrappable ?
return space.newint(id(w_obj))
app_interp_id = gateway.interp2app(interp_id)
@@ -436,8 +466,12 @@
"""
v(" :bind")
assert isinstance(w_var, W_Var)
- assert isinstance(w_obj, W_Root)
- space.bind(w_var, w_obj)
+ try:
+ assert isinstance(w_obj, W_Root)
+ except: # we've got an interpreter exception, probably
+ _assign_exception(space, w_var, w_obj)
+ else:
+ space.bind(w_var, w_obj)
app_bind = gateway.interp2app(bind)
def bind__Var_Var(space, w_v1, w_v2):
@@ -455,14 +489,30 @@
else: # 1. both are unbound
return _alias(space, w_v1, w_v2)
+#XXX
+def _assign_exception(space, w_var, w_exc):
+ w()
+ w(" :assign an exception")
+ assert isinstance(w_var, W_Var)
+ w_curr = w_var
+ while 1:
+ w_next = w_curr.w_bound_to
+ w_curr.w_bound_to = w_exc
+ # notify the blocked threads
+ scheduler[0].unblock_on(w_curr)
+ if space.is_true(space.is_nb_(w_next, w_var)):
+ break
+ # switch to next
+ w_curr = w_next
+ w(" :assigned")
+ return space.w_None
+
def bind__Var_Root(space, w_var, w_obj):
w("var val", str(id(w_var)))
# 3. var and value
if space.is_true(space.is_free(w_var)):
return _assign(space, w_var, w_obj)
- # for dataflow behaviour we should allow
- # rebinding of unifiable values
if space.is_true(space.eq(w_var.w_bound_to, w_obj)):
return
raise OperationError(space.w_RuntimeError,
@@ -478,11 +528,9 @@
assert isinstance(w_var, W_Var)
assert isinstance(w_val, W_Root)
w_curr = w_var
- ass_count = 0
while 1:
w_next = w_curr.w_bound_to
w_curr.w_bound_to = w_val
- ass_count += 1
# notify the blocked threads
scheduler[0].unblock_on(w_curr)
if space.is_true(space.is_nb_(w_next, w_var)):
@@ -835,12 +883,14 @@
## space.setitem(space.builtin.w_dict, space.wrap('DichotomyDistributor'),
## space.wrap(distributor.app_make_dichotomy_distributor))
#-- threading --
- space.setitem(space.builtin.w_dict, space.wrap('uthread'),
- space.wrap(app_uthread))
+ space.setitem(space.builtin.w_dict, space.wrap('future'),
+ space.wrap(app_future))
space.setitem(space.builtin.w_dict, space.wrap('wait'),
space.wrap(app_wait))
space.setitem(space.builtin.w_dict, space.wrap('wait_needed'),
space.wrap(app_wait_needed))
+ space.setitem(space.builtin.w_dict, space.wrap('sched_stats'),
+ space.wrap(app_sched_stats))
#-- misc -----
space.setitem(space.builtin.w_dict, space.wrap('interp_id'),
Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py (original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py Wed Jul 19 12:46:34 2006
@@ -1,4 +1,5 @@
from pypy.conftest import gettestobjspace
+from py.test import skip
class AppTest_Logic(object):
@@ -200,17 +201,60 @@
def setup_class(cls):
cls.space = gettestobjspace('logic', usemodules=("_stackless",))
- def test_one_thread(self):
+ def test_future_value(self):
def poop(X):
- wait(X)
return X + 1
X = newvar()
- Y = uthread(poop, X)
+ Y = future(poop, X)
bind(X, 42)
assert Y == 43
+ def test_future_exception(self):
+ skip('wait until we have working exception propagation')
+ class FooException(Exception): pass
+
+ def poop(X):
+ wait(X)
+ raise FooException
+
+ X=newvar()
+ Y=future(poop, X)
+ unify(X, 42)
+ try:
+ assert Y == 43
+ except FooException:
+ return
+ assert False
+
+ def test_exceptions_harder(self):
+ skip('wait until we have working exception propagation')
+ class FooException(Exception): pass
+
+ def raise_foo():
+ print "STATS", sched_stats()
+ raise FooException
+
+ def spawn(X, n):
+ print "SPAWN !"
+ if n>0:
+ F = future(spawn, X, n-1)
+ wait(X)
+ else:
+ raise_foo()
+
+ X = newvar()
+ Y = spawn(X, 3)
+ unify(X, 42)
+ try:
+ assert Y == 1
+ except FooException:
+ print "SUCCESS !"
+ print sched_stats()
+ return
+ assert False
+
def test_nested_threads(self):
"""check that a wait nested in a tree of
threads works correctly
@@ -220,10 +264,10 @@
return X
def call_sleep(X):
- return uthread(sleep, X)
+ return future(sleep, X)
X = newvar()
- v = uthread(call_sleep, X)
+ v = future(call_sleep, X)
bind(X, 42)
assert X == 42
assert is_free(v)
@@ -240,8 +284,8 @@
wait(V)
return V
- uthread(reader, X)
- uthread(binder, X)
+ future(reader, X)
+ future(binder, X)
assert X == 42
@@ -261,8 +305,8 @@
X = newvar()
S = newvar()
- unify(S, uthread(sum, X, 0))
- unify(X, uthread(generate, 0, 10))
+ unify(S, future(sum, X, 0))
+ unify(X, future(generate, 0, 10))
assert S == 45
@@ -288,8 +332,8 @@
Y = newvar()
T = newvar()
- uthread(lgenerate, 0, Y)
- unify(T, uthread(lsum, Y, 0, 10))
+ future(lgenerate, 0, Y)
+ unify(T, future(lsum, Y, 0, 10))
wait(T)
assert T == 45
@@ -301,15 +345,15 @@
def wait_two(X, Y):
Barrier = newvar()
- uthread(sleep, X, Barrier)
- uthread(sleep, Y, Barrier)
+ future(sleep, X, Barrier)
+ future(sleep, Y, Barrier)
wait(Barrier)
if is_free(Y):
return 1
return 2
X, Y = newvar(), newvar()
- o = uthread(wait_two, X, Y)
+ o = future(wait_two, X, Y)
unify(X, Y)
unify(Y, 42)
assert X == Y == 42
More information about the Pypy-commit mailing list