[pypy-svn] r46380 - in pypy/dist/pypy: interpreter lib lib/test2 module/_stackless
tismer at codespeak.net
tismer at codespeak.net
Fri Sep 7 03:24:33 CEST 2007
Author: tismer
Date: Fri Sep 7 03:24:31 2007
New Revision: 46380
Added:
pypy/dist/pypy/lib/test2/pickledtasklet.py (contents, props changed)
Modified:
pypy/dist/pypy/interpreter/callmethod.py
pypy/dist/pypy/lib/stackless.py
pypy/dist/pypy/lib/test2/test_stackless.py
pypy/dist/pypy/module/_stackless/coroutine.py
Log:
Stackless pickling seems to be working now. There is still a problem, since we end up with a bus error; hopefully not mine...
Modified: pypy/dist/pypy/interpreter/callmethod.py
==============================================================================
--- pypy/dist/pypy/interpreter/callmethod.py (original)
+++ pypy/dist/pypy/interpreter/callmethod.py Fri Sep 7 03:24:31 2007
@@ -14,6 +14,7 @@
from pypy.rlib.jit import we_are_jitted
from pypy.interpreter.argument import Arguments
from pypy.objspace.std import StdObjSpace
+from pypy.rlib import rstack # for resume points
def object_getattribute(space):
@@ -66,6 +67,7 @@
n = nargs + (w_self is not None)
try:
w_result = f.space.call_valuestack(w_callable, n, f)
+ rstack.resume_point("CALL_METHOD", f, nargs, returns=w_result)
finally:
f.dropvalues(nargs + 2)
f.pushvalue(w_result)
Modified: pypy/dist/pypy/lib/stackless.py
==============================================================================
--- pypy/dist/pypy/lib/stackless.py (original)
+++ pypy/dist/pypy/lib/stackless.py Fri Sep 7 03:24:31 2007
@@ -57,7 +57,7 @@
arguments *argl, **argd
"""
if self._frame is None or self._frame.dead:
- self._frame = frame = GWrap()
+ self._frame = frame = MWrap(None)##GWrap()
frame.coro = self
if hasattr(self._frame, 'run') and self._frame.run:
raise ValueError("cannot bind a bound coroutine")
@@ -93,6 +93,9 @@
return _maincoro
getcurrent = staticmethod(getcurrent)
+ def __reduce__(self):
+ raise TypeError, 'pickling is not possible based upon greenlets'
+
_maincoro = coroutine()
maingreenlet = greenlet.getcurrent()
_maincoro._frame = frame = MWrap(maingreenlet)
@@ -169,6 +172,36 @@
def raise_(self):
raise self.type, self.value, self.traceback
+#
+# helpers for pickling
+#
+
+_stackless_primitive_registry = {}
+
+def register_stackless_primitive(thang, retval_expr='None'):
+ import types
+ func = thang
+ if isinstance(thang, types.MethodType):
+ func = thang.im_func
+ code = func.func_code
+ _stackless_primitive_registry[code] = retval_expr
+ # It is not too nice to attach info via the code object, but
+ # I can't think of a better solution without a real transform.
+
+def rewrite_stackless_primitive(coro_state, alive, tempval):
+ flags, state, thunk, parent = coro_state
+ for i, frame in enumerate(state):
+ retval_expr = _stackless_primitive_registry.get(frame.f_code)
+ if retval_expr:
+ # this tasklet needs to stop pickling here and return its value.
+ tempval = eval(retval_expr, globals(), frame.f_locals)
+ state = state[:i]
+ coro_state = flags, state, thunk, parent
+ return coro_state, alive, tempval
+
+#
+#
+
class channel(object):
"""
A channel object is used for communication between tasklets.
@@ -214,6 +247,7 @@
continue immediately, and the sender is put at the end of
the runnables list.
The above policy can be changed by setting channel flags.
+ XXX channel flags are not implemented, yet.
"""
receiver = getcurrent()
willblock = not self.balance > 0
@@ -232,11 +266,14 @@
_scheduler_remove(getcurrent())
schedule()
assert not receiver.blocked
-
+
+ # XXX wrong. This check should happen on every context switch, not here.
msg = receiver.tempval
if isinstance(msg, bomb):
msg.raise_()
return msg
+
+ register_stackless_primitive(receive, retval_expr='receiver.tempval')
def send_exception(self, exp_type, msg):
self.send(bomb(exp_type, exp_type(msg)))
@@ -273,6 +310,8 @@
schedule()
assert not sender.blocked
+ register_stackless_primitive(send)
+
class tasklet(coroutine):
"""
A tasklet object represents a tiny task in a Python thread.
@@ -282,7 +321,10 @@
"""
tempval = None
def __new__(cls, func=None, label=''):
- return coroutine.__new__(cls)
+ res = coroutine.__new__(cls)
+ res.label = label
+ res._task_id = None
+ return res
def __init__(self, func=None, label=''):
coroutine.__init__(self)
@@ -354,19 +396,42 @@
return self
def run(self):
- if _scheduler_contains(self):
- return
- else:
- _scheduler_append(self)
+ self.insert()
+ _scheduler_switch(getcurrent(), self)
+ def insert(self):
+ if self.blocked:
+ raise RuntimeError, "You cannot run a blocked tasklet"
+ if not self.alive:
+ raise RuntimeError, "You cannot run an unbound(dead) tasklet"
+ _scheduler_append(self)
+
+ def remove(self):
+ if self.blocked:
+ raise RuntimeError, "You cannot remove a blocked tasklet."
+ if self is getcurrent():
+ raise RuntimeError, "The current tasklet cannot be removed."
+ # not sure if I will revive this " Use t=tasklet().capture()"
+ _scheduler_remove(self)
+
def __reduce__(self):
- one, two, three = coroutine.__reduce__(self)
+ one, two, coro_state = coroutine.__reduce__(self)
assert one is coroutine
assert two == ()
- return tasklet, (), (three, self.alive, self.tempval)
+ # we want to get rid of the parent thing.
+ # for now, we just drop it
+ a, b, c, d = coro_state
+ if d:
+ assert isinstance(d, coroutine)
+ coro_state = a, b, c, None
+ coro_state, alive, tempval = rewrite_stackless_primitive(coro_state, self.alive, self.tempval)
+ inst_dict = self.__dict__.copy()
+ del inst_dict['tempval']
+ return self.__class__, (), (coro_state, alive, tempval, inst_dict)
- def __setstate__(self, (coro_state, alive, tempval)):
+ def __setstate__(self, (coro_state, alive, tempval, inst_dict)):
coroutine.__setstate__(self, coro_state)
+ self.__dict__.update(inst_dict)
self.alive = alive
self.tempval = tempval
Added: pypy/dist/pypy/lib/test2/pickledtasklet.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/lib/test2/pickledtasklet.py Fri Sep 7 03:24:31 2007
@@ -0,0 +1,27 @@
+import pickle, sys
+import stackless
+
+ch = stackless.channel()
+
+def recurs(depth, level=1):
+ print 'enter level %s%d' % (level*' ', level)
+ if level >= depth:
+ ch.send('hi')
+ if level < depth:
+ recurs(depth, level+1)
+ print 'leave level %s%d' % (level*' ', level)
+
+def demo(depth):
+ t = stackless.tasklet(recurs)(depth)
+ print ch.receive()
+ pickle.dump(t, file('tasklet.pickle', 'wb'))
+
+if __name__ == '__main__':
+ if len(sys.argv) > 1:
+ t = pickle.load(file(sys.argv[1], 'rb'))
+ t.insert()
+ else:
+ t = stackless.tasklet(demo)(14)
+ stackless.run()
+
+# remark: think of fixing cells etc. on the sprint
Modified: pypy/dist/pypy/lib/test2/test_stackless.py
==============================================================================
--- pypy/dist/pypy/lib/test2/test_stackless.py (original)
+++ pypy/dist/pypy/lib/test2/test_stackless.py Fri Sep 7 03:24:31 2007
@@ -7,25 +7,44 @@
cls.space = space
def test_pickle(self):
- import pickle, sys
- import stackless
-
- ch = stackless.channel()
-
- def recurs(depth, level=1):
- print 'enter level %s%d' % (level*' ', level)
- if level >= depth:
- ch.send('hi')
- if level < depth:
- recurs(depth, level+1)
- print 'leave level %s%d' % (level*' ', level)
-
- def demo(depth):
- t = stackless.tasklet(recurs)(depth)
- print ch.receive()
- blob = pickle.dumps(t)
-
- t = stackless.tasklet(demo)(14)
- stackless.run()
-
-# remark: think of fixing cells etc. on the sprint
+ import new, sys
+
+ mod = new.module('mod')
+ sys.modules['mod'] = mod
+ try:
+ exec '''
+import pickle, sys
+import stackless
+lev = 14
+
+ch = stackless.channel()
+seen = []
+
+def recurs(depth, level=1):
+ print 'enter level %s%d' % (level*' ', level)
+ seen.append(level)
+ if level >= depth:
+ ch.send('hi')
+ if level < depth:
+ recurs(depth, level+1)
+ seen.append(level)
+ print 'leave level %s%d' % (level*' ', level)
+
+def demo(depth):
+ t = stackless.tasklet(recurs)(depth)
+ print ch.receive()
+ global blob
+ blob = pickle.dumps(t)
+
+t = stackless.tasklet(demo)(lev)
+stackless.run()
+assert seen == range(1, lev+1) + range(lev, 0, -1)
+print "now running the clone"
+tt = pickle.loads(blob)
+tt.insert()
+seen = []
+stackless.run()
+assert seen == range(lev, 0, -1)
+''' in mod.__dict__
+ finally:
+ del sys.modules['mod']
Modified: pypy/dist/pypy/module/_stackless/coroutine.py
==============================================================================
--- pypy/dist/pypy/module/_stackless/coroutine.py (original)
+++ pypy/dist/pypy/module/_stackless/coroutine.py Fri Sep 7 03:24:31 2007
@@ -219,7 +219,10 @@
ec)
instr = frame.last_instr
opcode = ord(code[instr])
- assert opcode == pythonopcode.opmap['CALL_FUNCTION']
+ map = pythonopcode.opmap
+ call_ops = [map['CALL_FUNCTION'], map['CALL_FUNCTION_KW'], map['CALL_FUNCTION_VAR'],
+ map['CALL_FUNCTION_VAR_KW'], map['CALL_METHOD']]
+ assert opcode in call_ops
# ("dispatch_call", self, co_code, next_instr, ec)
chain = resume_state_create(chain, "dispatch_call", frame, code,
instr+3, ec)
@@ -229,8 +232,12 @@
# Only positional arguments
nargs = oparg & 0xff
# case1: ("CALL_FUNCTION", f, nargs, returns=w_result)
- chain = resume_state_create(chain, 'CALL_FUNCTION', frame,
- nargs)
+ if space.config.objspace.opcodes.CALL_METHOD and opcode == map['CALL_METHOD']:
+ chain = resume_state_create(chain, 'CALL_METHOD', frame,
+ nargs)
+ else:
+ chain = resume_state_create(chain, 'CALL_FUNCTION', frame,
+ nargs)
else:
# case2: ("call_function", f, returns=w_result)
chain = resume_state_create(chain, 'call_function', frame)
More information about the Pypy-commit
mailing list