[pypy-svn] r30455 - in pypy/dist/pypy/objspace: . test
auc at codespeak.net
auc at codespeak.net
Mon Jul 24 16:54:30 CEST 2006
Author: auc
Date: Mon Jul 24 16:54:28 2006
New Revision: 30455
Modified:
pypy/dist/pypy/objspace/logic.py
pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
exceptions propagation to logic vars
Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py (original)
+++ pypy/dist/pypy/objspace/logic.py Mon Jul 24 16:54:28 2006
@@ -71,6 +71,7 @@
self._init_head(self._main)
self._init_blocked()
self._switch_count = 0
+ self._traced = {}
w (".. MAIN THREAD = ", str(id(self._main)))
def _init_blocked(self):
@@ -119,6 +120,7 @@
#XXX don't we need to notify the consumers ?
w(".. REMOVING", str(id(thread)))
assert thread not in self._blocked
+ del self._traced[thread]
l = thread.prev
r = thread.next
l.next = r
@@ -136,19 +138,28 @@
w(".. SWITCHING", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
self._switch_count += 1
to_be_run.w_switch()
+
+ def schedule_or_pass(self):
+ to_be_run = self._select_next(dont_pass=False)
+ curr = ClonableCoroutine.w_getcurrent(self.space)
+ if to_be_run == curr:
+ w(".. PASS")
+ return
+ w(".. SWITCHING", str(id(curr)), "=>", str(id(to_be_run)))
+ self._switch_count += 1
+ to_be_run.w_switch()
- def _select_next(self):
+ def _select_next(self, dont_pass=True):
to_be_run = self._head
sentinel = to_be_run
current = ClonableCoroutine.w_getcurrent(self.space)
while (to_be_run in self._blocked) \
- or (to_be_run == current) \
- or to_be_run.is_dead():
+ or to_be_run.is_dead() \
+ or (dont_pass and (to_be_run == current)):
to_be_run = to_be_run.next
if to_be_run == sentinel:
self.display_head()
## we RESET sched state so as to keep being usable beyond that
- # (for instance, allow other tests to be run)
self._init_head(self._main)
self._init_blocked()
w(".. SCHEDULER reinitialized")
@@ -214,18 +225,29 @@
blocked.append(uthread)
self._blocked[uthread] = True
- def unblock_byneed_on(self, space, w_var):
+ def unblock_byneed_on(self, w_var):
v(".. we UNBLOCK BYNEED dependants of var", str(id(w_var)))
assert isinstance(w_var, W_Var)
blocked = []
- for w_alias in aliases(space, w_var):
+ for w_alias in aliases(self.space, w_var):
if w_alias in self._blocked_byneed:
blocked += self._blocked_byneed[w_alias]
del self._blocked_byneed[w_alias]
- w_alias.w_needed = True
+ w_alias.needed = True
w(str([id(thr) for thr in blocked]))
for thr in blocked: del self._blocked[thr]
+ def trace_vars(self, thread, lvars):
+ w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
+ assert not self._traced.has_key(thread)
+ self._traced[thread] = lvars
+
+ def dirty_traced_vars(self, thread, failed_value):
+ w(".. DIRTYING traced vars")
+ for w_var in self._traced[thread]:
+ if self.space.is_true(self.space.is_free(w_var)):
+ bind(self.space, w_var, failed_value)
+
scheduler = []
class FutureThunk(_AppThunk):
@@ -235,12 +257,15 @@
self._coro = coro
def call(self):
+ scheduler[0].trace_vars(self._coro, logic_args(self.args.unpack()))
try:
try:
_AppThunk.call(self)
except Exception, exc:
w(".. exceptional EXIT of", str(id(self._coro)), "with", str(exc))
- bind(self.space, self.w_Result, W_FailedValue(exc))
+ failed_val = W_FailedValue(exc)
+ bind(self.space, self.w_Result, failed_val)
+ scheduler[0].dirty_traced_vars(self._coro, failed_val)
self._coro._dead = True
else:
w(".. clean EXIT of", str(id(self._coro)),
@@ -251,16 +276,27 @@
scheduler[0].remove_thread(self._coro)
scheduler[0].schedule()
+
+def logic_args(args):
+ "returns logic vars found in unpacked normalized args"
+ pos = args[0]
+ kwa = args[1]
+ pos_l = [arg for arg in pos
+ if isinstance(arg, W_Var)]
+ kwa_l = [arg for arg in kwa.keys()
+ if isinstance(arg, W_Var)]
+ return pos_l + kwa_l
+
def future(space, w_callable, __args__):
"""returns a future result"""
v(".. THREAD")
args = __args__.normalize()
- w_Future = W_Future(space)
# coro init
coro = ClonableCoroutine(space)
# prepare thread chaining, create missing slots
coro.next = coro.prev = None
# feed the coro
+ w_Future = W_Future(space)
thunk = FutureThunk(space, w_callable, args, w_Future, coro)
coro.bind(thunk)
w(str(id(coro)))
@@ -272,8 +308,7 @@
baseobjspace.W_Root,
argument.Arguments])
-# need : getcurrent(), getmain(),
-# wrapper for schedule() ?
+# need (applevel) : getcurrent(), getmain(),
def sched_stats(space):
sched = scheduler[0]
@@ -286,29 +321,17 @@
return w_ret
app_sched_stats = gateway.interp2app(sched_stats)
-#-- VARIABLE ---------------------
-
-#-- Exceptions -------
-
-## what we can know:
-
-## location of new var
-## transmission of vars to threads
-## => possibility to propagate exceptions amongst all threads that share a var
-## (kind of process linking in Erlang, where giving var => do link)
+def schedule(space):
+ scheduler[0].schedule_or_pass()
+app_schedule = gateway.interp2app(schedule)
-## what is sane ?
-
-## reraising FailedValues (case of futures)
-## XXX
-
-#-- /Exceptions ------
+#-- VARIABLE ---------------------
class W_Var(W_Root, object):
- def __init__(w_self):
+ def __init__(w_self, space):
w_self.w_bound_to = w_self
- w_self.w_needed = False
+ w_self.needed = False
def __repr__(w_self):
if w_self.w_bound_to:
@@ -318,12 +341,14 @@
prettyfy_id(str(id(w_self))))
def newvar(space):
- return W_Var()
+ w_v = W_Var(space)
+ w("VAR", str(id(w_v)))
+ return w_v
app_newvar = gateway.interp2app(newvar)
class W_Future(W_Var):
def __init__(w_self, space):
- W_Var.__init__(w_self)
+ W_Var.__init__(w_self, space)
w_self.client = ClonableCoroutine.w_getcurrent(space)
class W_FailedValue(W_Root, object):
@@ -341,7 +366,7 @@
def wait__Var(space, w_var):
w(":wait", str(id(ClonableCoroutine.w_getcurrent(space))))
if space.is_true(space.is_free(w_var)):
- scheduler[0].unblock_byneed_on(space, w_var)
+ scheduler[0].unblock_byneed_on(w_var)
scheduler[0].add_to_blocked_on(w_var, ClonableCoroutine.w_getcurrent(space))
scheduler[0].schedule()
assert space.is_true(space.is_bound(w_var))
@@ -365,7 +390,7 @@
def wait_needed__Var(space, w_var):
#print " :needed", w_var
if space.is_true(space.is_free(w_var)):
- if w_var.w_needed:
+ if w_var.needed:
return
scheduler[0].add_to_blocked_byneed(w_var, ClonableCoroutine.w_getcurrent(space))
scheduler[0].schedule()
@@ -528,7 +553,7 @@
bind__Var_Root(space, w_fut, w_obj) # call-next-method ?
def bind__Var_Var(space, w_v1, w_v2):
- w("var var")
+ #w("var var")
if space.is_true(space.is_bound(w_v1)):
if space.is_true(space.is_bound(w_v2)):
# we allow re-binding to same value, see 3.
@@ -928,6 +953,8 @@
space.wrap(app_wait_needed))
space.setitem(space.builtin.w_dict, space.wrap('sched_stats'),
space.wrap(app_sched_stats))
+ space.setitem(space.builtin.w_dict, space.wrap('schedule'),
+ space.wrap(app_schedule))
#-- misc -----
space.setitem(space.builtin.w_dict, space.wrap('interp_id'),
Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py (original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py Mon Jul 24 16:54:28 2006
@@ -247,7 +247,7 @@
return
assert False
- def test_exceptions_harder(self):
+ def test_exception_in_chain(self):
class FooException(Exception): pass
def raise_foo():
@@ -270,6 +270,36 @@
return
assert False
+ def test_exception_in_group(self):
+ class FooException(Exception): pass
+ from operator import add
+
+ def loop_or_raise(Canary, crit, Bomb_signal):
+ "Canary will be untouched there ..."
+ while(1):
+ if is_bound(Bomb_signal):
+ if Bomb_signal == crit:
+ raise FooException
+ else: # but returned
+ return Canary
+ schedule()
+ return 42
+
+ B, C = newvar(), newvar()
+ T = future(loop_or_raise, C, 'foo', B)
+ U = future(loop_or_raise, C, 'bar', B)
+
+ unify(B, 'foo')
+ try:
+ wait(T)
+ except FooException:
+ try:
+ # and contamined
+ wait(U)
+ except FooException:
+ return
+ assert False
+
def test_nested_threads(self):
"""check that a wait nested in a tree of
threads works correctly
More information about the Pypy-commit
mailing list