[pypy-svn] r25309 - pypy/dist/pypy/objspace
auc at codespeak.net
auc at codespeak.net
Tue Apr 4 18:59:06 CEST 2006
Author: auc
Date: Tue Apr 4 18:59:06 2006
New Revision: 25309
Modified:
pypy/dist/pypy/objspace/logic.py
Log:
Placate the logic objspace with `assert isinstance(...)` checks.
Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py (original)
+++ pypy/dist/pypy/objspace/logic.py Tue Apr 4 18:59:06 2006
@@ -8,6 +8,7 @@
from pypy.objspace.std.dictobject import W_DictObject
from pypy.objspace.std.objectobject import W_ObjectObject
from pypy.objspace.std.intobject import W_IntObject
+from pypy.objspace.std.stringobject import W_StringObject
from pypy.objspace.std.model import StdObjSpaceMultiMethod
#-- THE BUILTINS ----------------------------------------------------------------------
@@ -50,9 +51,11 @@
return key
def add_to_runnable(self, uthread):
+ assert isinstance(uthread, GreenletCoroutine)
self.runnable_uthreads[uthread] = True
def remove_from_runnable(self, uthread):
+ assert isinstance(uthread, GreenletCoroutine)
del self.runnable_uthreads[uthread]
def have_runnable_threads(self):
@@ -62,6 +65,8 @@
return bool(self.uthreads_blocked_on)
def add_to_blocked(self, w_var, uthread):
+ assert isinstance(w_var, W_Var)
+ assert isinstance(uthread, GreenletCoroutine)
if w_var in self.uthreads_blocked_on:
blocked = self.uthreads_blocked_on[w_var]
else:
@@ -70,6 +75,7 @@
blocked.append(uthread)
def pop_blocked_on(self, w_var):
+ assert isinstance(w_var, W_Var)
if w_var not in self.uthreads_blocked_on:
return []
blocked = self.uthreads_blocked_on[w_var]
@@ -77,6 +83,8 @@
return blocked
def add_to_blocked_byneed(self, w_var, uthread):
+ assert isinstance(w_var, W_Var)
+ assert isinstance(uthread, GreenletCoroutine)
#print " adding", uthread, "to byneed on", w_var
if w_var in self.uthreads_blocked_byneed:
blocked = self.uthreads_blocked_byneed[w_var]
@@ -86,6 +94,7 @@
blocked.append(uthread)
def pop_blocked_byneed_on(self, w_var):
+ assert isinstance(w_var, W_Var)
if w_var not in self.uthreads_blocked_byneed:
#print " there was nobody to remove for", w_var
return []
@@ -128,9 +137,11 @@
return hash(self.greenlet)
def __eq__(self, other):
+ assert isinstance(other, GreenletCoroutine)
return self.greenlet == other.greenlet
def __ne__(self, other):
+ assert isinstance(other, GreenletCoroutine)
return not (self == other)
def __repr__(self):
@@ -170,9 +181,8 @@
#-- VARIABLE ---------------------
-class W_Var(baseobjspace.W_Root, object):
+class W_Var(W_Root, object):
def __init__(w_self):
- w_self.typedef = 'Variable'
w_self.w_bound_to = w_self
w_self.w_needed = False
@@ -224,6 +234,7 @@
return w_self.w_bound_to
def wait(space, w_obj):
+ assert isinstance(w_obj, W_Root)
return space.wait(w_obj)
app_wait = gateway.interp2app(wait)
@@ -264,6 +275,7 @@
space.wrap("wait_needed only supported on unbound variables"))
def wait_needed(space, w_var):
+ assert isinstance(w_var, W_Var)
return space.wait_needed(w_var)
app_wait_needed = gateway.interp2app(wait_needed)
@@ -282,6 +294,7 @@
app_is_aliased = gateway.interp2app(is_aliased)
def is_free(space, w_obj):
+ assert isinstance(w_obj, W_Root)
return space.is_free(w_obj)
app_is_free = gateway.interp2app(is_free)
@@ -297,6 +310,7 @@
all_mms['is_free'] = is_free_mm
def is_bound(space, w_obj):
+ assert isinstance(w_obj, W_Root)
return space.is_bound(w_obj)
app_is_bound = gateway.interp2app(is_bound)
@@ -313,6 +327,8 @@
def alias_of(space, w_var1, w_var2): # FIXME: appears to block
+ assert isinstance(w_var1, W_Var)
+ assert isinstance(w_var2, W_Var)
assert space.is_true(space.is_free(w_var1))
assert space.is_true(space.is_free(w_var2))
w_curr = w_var1
@@ -347,6 +363,7 @@
def aliases(space, w_var):
"""return the aliases of a var, including itself"""
+ assert isinstance(w_var, W_Var)
al = []
w_curr = w_var
while 1:
@@ -359,6 +376,7 @@
def get_ring_tail(space, w_start):
"""returns the last var of a ring of aliases"""
+ assert isinstance(w_start, W_Var)
w_curr = w_start
while 1:
w_next = w_curr.w_bound_to
@@ -371,6 +389,8 @@
"""raises a specific exception for bind/unify"""
#FIXME : really raise some specific exception
#print "failed to bind/unify"
+ assert isinstance(w_obj1, W_Root)
+ assert isinstance(w_obj2, W_Root)
raise OperationError(space.w_RuntimeError,
space.wrap("Unification failure"))
@@ -381,31 +401,36 @@
pass
def prettyfy_id(a_str):
+ assert isinstance(a_str, W_StringObject)
l = len(a_str) - 1
return a_str[l-3:l]
#FIXME : does not work at all,
# even a pure applevel version ...
-def _sleep(space, w_var, w_barrier):
- wait(space, w_var)
- bind(space, w_barrier, space.newint(1))
-
-def wait_two(space, w_1, w_2):
- """waits until one out of two logic variables
- becomes bound, then tells which one,
- with a bias toward the first if both are
- suddenly bound"""
- w_barrier = newvar(space)
- uthread(space, space.wrap(_sleep),
- argument.Arguments(space, [w_1, w_barrier]))
- uthread(space, space.wrap(_sleep),
- argument.Arguments(space, [w_2, w_barrier]))
- wait(space, w_barrier)
- if space.is_true(space.is_free(w_2)):
- return space.newint(1)
- return space.newint(2)
-app_wait_two = gateway.interp2app(wait_two)
+## def _sleep(space, w_var, w_barrier):
+## assert isinstance(w_var, W_Var)
+## assert isinstance(w_barrier, W_Var)
+## wait(space, w_var)
+## bind(space, w_barrier, space.newint(1))
+
+## def wait_two(space, w_v1, w_v2):
+## """waits until one out of two logic variables
+## becomes bound, then tells which one,
+## with a bias toward the first if both are
+## suddenly bound"""
+## assert isinstance(w_v1, W_Var)
+## assert isinstance(w_v2, W_Var)
+## w_barrier = newvar(space)
+## uthread(space, space.wrap(_sleep),
+## argument.Arguments(space, [w_v1, w_barrier]))
+## uthread(space, space.wrap(_sleep),
+## argument.Arguments(space, [w_v2, w_barrier]))
+## wait(space, w_barrier)
+## if space.is_true(space.is_free(w_v2)):
+## return space.newint(1)
+## return space.newint(2)
+## app_wait_two = gateway.interp2app(wait_two)
#-- BIND -----------------------------
@@ -415,6 +440,8 @@
3. assign value to unbound var
"""
#print " :bind", w_var, w_obj
+ assert isinstance(w_var, W_Var)
+ assert isinstance(w_obj, W_Root)
space.bind(w_var, w_obj)
app_bind = gateway.interp2app(bind)
@@ -446,6 +473,8 @@
all_mms['bind'] = bind_mm
def _assign(space, w_var, w_val):
+ assert isinstance(w_var, W_Var)
+ assert isinstance(w_val, W_Root)
#print " :assign", w_var, w_val, '[',
w_curr = w_var
ass_count = 0
@@ -468,6 +497,8 @@
def _alias(space, w_v1, w_v2):
"""appends one var to the alias chain of another
user must ensure freeness of both vars"""
+ assert isinstance(w_v1, W_Var)
+ assert isinstance(w_v2, W_Var)
#print " :alias", w_v1, w_v2
if space.is_true(space.is_nb_(w_v1, w_v2)):
return space.w_None
@@ -500,24 +531,21 @@
#-- UNIFY -------------------------
def unify(space, w_x, w_y):
+ assert isinstance(w_x, W_Root)
+ assert isinstance(w_y, W_Root)
#print ":unify ", w_x, w_y
return space.unify(w_x, w_y)
app_unify = gateway.interp2app(unify)
-def unify__Int_Int(space, w_x, w_y):
+def unify__Root_Root(space, w_x, w_y):
if not space.eq_w(w_x, w_y):
+ w_d1 = w_x.getdict()
+ w_d2 = w_y.getdict()
+ if None not in (w_d1, w_d2):
+ return space.unify(w_d1, w_d2)
+ else:
fail(space, w_x, w_y)
return space.w_None
-
-## def unify__Root_Root(space, w_x, w_y):
-## if not space.eq_w(w_x, w_y):
-## w_d1 = w_x.getdict()
-## w_d2 = w_y.getdict()
-## if None not in (w_d1, w_d2):
-## return space.unify(w_d1, w_d2)
-## else:
-## fail(space, w_x, w_y)
-## return space.w_None
def unify__Var_Var(space, w_x, w_y):
#print " :unify of two vars"
@@ -546,6 +574,8 @@
return _unify_iterables(space, w_x, w_y)
def _unify_iterables(space, w_i1, w_i2):
+ assert isinstance(w_i1, W_TupleObject) or isinstance(w_i1, W_ListObject)
+ assert isinstance(w_i2, W_TupleObject) or isinstance(w_i2, W_ListObject)
#print " :unify iterables", w_i1, w_i2
if len(w_i1.wrappeditems) != len(w_i2.wrappeditems):
fail(space, w_i1, w_i2)
@@ -559,6 +589,8 @@
unify(space, w_xi, w_yi)
def unify__Dict_Dict(space, w_m1, w_m2):
+ assert isinstance(w_m1, W_DictObject)
+ assert isinstance(w_m2, W_DictObject)
#print " :unify mappings", w_m1, w_m2
for w_xk in w_m1.content.keys():
w_xi = space.getitem(w_m1, w_xk)
@@ -569,16 +601,13 @@
unify_mm = StdObjSpaceMultiMethod('unify', 2)
-#unify_mm.register(unify__Root_Root, W_Root, W_Root)
-
-unify_mm.register(unify__Int_Int, W_IntObject, W_IntObject)
-
-## unify_mm.register(unify__Var_Var, W_Var, W_Var)
-## unify_mm.register(unify__Var_Root, W_Var, W_Root)
-## unify_mm.register(unify__Root_Var, W_Root, W_Var)
-## unify_mm.register(unify__Tuple_Tuple, W_TupleObject, W_TupleObject)
-## unify_mm.register(unify__List_List, W_ListObject, W_ListObject)
-## unify_mm.register(unify__Dict_Dict, W_DictObject, W_DictObject)
+unify_mm.register(unify__Root_Root, W_Root, W_Root)
+unify_mm.register(unify__Var_Var, W_Var, W_Var)
+unify_mm.register(unify__Var_Root, W_Var, W_Root)
+unify_mm.register(unify__Root_Var, W_Root, W_Var)
+unify_mm.register(unify__Tuple_Tuple, W_TupleObject, W_TupleObject)
+unify_mm.register(unify__List_List, W_ListObject, W_ListObject)
+unify_mm.register(unify__Dict_Dict, W_DictObject, W_DictObject)
all_mms['unify'] = unify_mm
@@ -621,6 +650,8 @@
def eqproxy(space, parentfn):
def eq(w_obj1, w_obj2):
+ assert isinstance(w_obj1, W_Root)
+ assert isinstance(w_obj2, W_Root)
if space.is_true(space.is_nb_(w_obj1, w_obj2)):
return space.newbool(True)
if space.is_true(space.is_free(w_obj1)):
@@ -632,6 +663,8 @@
def isproxy(space, parentfn):
def is_(w_obj1, w_obj2):
+ assert isinstance(w_obj1, W_Root)
+ assert isinstance(w_obj2, W_Root)
if space.is_true(space.is_nb_(w_obj1, w_obj2)):
return space.newbool(True)
return parentfn(wait(space, w_obj1), wait(space, w_obj2))
@@ -639,6 +672,8 @@
def cmpproxy(space, parentfn):
def cmp(w_obj1, w_obj2):
+ assert isinstance(w_obj1, W_Root)
+ assert isinstance(w_obj2, W_Root)
if space.is_true(space.is_nb_(w_obj1, w_obj2)):
return space.newbool(0)
if space.is_true(space.is_free(w_obj1)):
@@ -650,6 +685,8 @@
def neproxy(space, parentfn):
def ne(w_obj1, w_obj2):
+ assert isinstance(w_obj1, W_Root)
+ assert isinstance(w_obj2, W_Root)
if space.is_true(space.is_nb_(w_obj1, w_obj2)):
return space.newbool(False)
if space.is_true(space.is_free(w_obj1)):
More information about the Pypy-commit mailing list