[pypy-svn] r46380 - in pypy/dist/pypy: interpreter lib lib/test2 module/_stackless

tismer at codespeak.net tismer at codespeak.net
Fri Sep 7 03:24:33 CEST 2007


Author: tismer
Date: Fri Sep  7 03:24:31 2007
New Revision: 46380

Added:
   pypy/dist/pypy/lib/test2/pickledtasklet.py   (contents, props changed)
Modified:
   pypy/dist/pypy/interpreter/callmethod.py
   pypy/dist/pypy/lib/stackless.py
   pypy/dist/pypy/lib/test2/test_stackless.py
   pypy/dist/pypy/module/_stackless/coroutine.py
Log:
Stackless pickling seems to be working now. There is still a problem, since we end up with a bus error -- hopefully not mine...

Modified: pypy/dist/pypy/interpreter/callmethod.py
==============================================================================
--- pypy/dist/pypy/interpreter/callmethod.py	(original)
+++ pypy/dist/pypy/interpreter/callmethod.py	Fri Sep  7 03:24:31 2007
@@ -14,6 +14,7 @@
 from pypy.rlib.jit import we_are_jitted
 from pypy.interpreter.argument import Arguments
 from pypy.objspace.std import StdObjSpace
+from pypy.rlib import rstack # for resume points
 
 
 def object_getattribute(space):
@@ -66,6 +67,7 @@
         n = nargs + (w_self is not None)
         try:
             w_result = f.space.call_valuestack(w_callable, n, f)
+            rstack.resume_point("CALL_METHOD", f, nargs, returns=w_result)
         finally:
             f.dropvalues(nargs + 2)
         f.pushvalue(w_result)

Modified: pypy/dist/pypy/lib/stackless.py
==============================================================================
--- pypy/dist/pypy/lib/stackless.py	(original)
+++ pypy/dist/pypy/lib/stackless.py	Fri Sep  7 03:24:31 2007
@@ -57,7 +57,7 @@
                arguments *argl, **argd
             """
             if self._frame is None or self._frame.dead:
-                self._frame = frame = GWrap()
+                self._frame = frame = MWrap(None)##GWrap()
                 frame.coro = self
             if hasattr(self._frame, 'run') and self._frame.run:
                 raise ValueError("cannot bind a bound coroutine")
@@ -93,6 +93,9 @@
                 return _maincoro
         getcurrent = staticmethod(getcurrent)
 
+        def __reduce__(self):
+            raise TypeError, 'pickling is not possible based upon greenlets'
+
     _maincoro = coroutine()
     maingreenlet = greenlet.getcurrent()
     _maincoro._frame = frame = MWrap(maingreenlet)
@@ -169,6 +172,36 @@
     def raise_(self):
         raise self.type, self.value, self.traceback
 
+#
+# helpers for pickling
+#
+
+_stackless_primitive_registry = {}
+
+def register_stackless_primitive(thang, retval_expr='None'):
+    import types
+    func = thang
+    if isinstance(thang, types.MethodType):
+        func = thang.im_func
+    code = func.func_code
+    _stackless_primitive_registry[code] = retval_expr
+    # It is not too nice to attach info via the code object, but
+    # I can't think of a better solution without a real transform.
+
+def rewrite_stackless_primitive(coro_state, alive, tempval):
+    flags, state, thunk, parent = coro_state
+    for i, frame in enumerate(state):
+        retval_expr = _stackless_primitive_registry.get(frame.f_code)
+        if retval_expr:
+            # this tasklet needs to stop pickling here and return its value.
+            tempval = eval(retval_expr, globals(), frame.f_locals)
+            state = state[:i]
+            coro_state = flags, state, thunk, parent
+    return coro_state, alive, tempval
+
+#
+#
+
 class channel(object):
     """
     A channel object is used for communication between tasklets.
@@ -214,6 +247,7 @@
         continue immediately, and the sender is put at the end of
         the runnables list.
         The above policy can be changed by setting channel flags.
+        XXX channel flags are not implemented, yet.
         """
         receiver = getcurrent()
         willblock = not self.balance > 0
@@ -232,11 +266,14 @@
             _scheduler_remove(getcurrent())
             schedule()
             assert not receiver.blocked
-            
+          
+        # XXX wrong. This check should happen on every context switch, not here.  
         msg = receiver.tempval
         if isinstance(msg, bomb):
             msg.raise_()
         return msg
+        
+    register_stackless_primitive(receive, retval_expr='receiver.tempval')
 
     def send_exception(self, exp_type, msg):
         self.send(bomb(exp_type, exp_type(msg)))
@@ -273,6 +310,8 @@
             schedule()
             assert not sender.blocked
             
+    register_stackless_primitive(send)
+            
 class tasklet(coroutine):
     """
     A tasklet object represents a tiny task in a Python thread.
@@ -282,7 +321,10 @@
     """
     tempval = None
     def __new__(cls, func=None, label=''):
-        return coroutine.__new__(cls)
+        res = coroutine.__new__(cls)
+        res.label = label
+        res._task_id = None
+        return res
 
     def __init__(self, func=None, label=''):
         coroutine.__init__(self)
@@ -354,19 +396,42 @@
         return self
 
     def run(self):
-        if _scheduler_contains(self):
-            return
-        else:
-            _scheduler_append(self)
+        self.insert()
+        _scheduler_switch(getcurrent(), self)
 
+    def insert(self):
+        if self.blocked:
+            raise RuntimeError, "You cannot run a blocked tasklet"
+	   if not self.alive:
+	       raise RuntimeError, "You cannot run an unbound(dead) tasklet"
+        _scheduler_append(self)
+
+    def remove(self):
+        if self.blocked:
+            raise RuntimeError, "You cannot remove a blocked tasklet."
+        if self is getcurrent():
+            raise RuntimeError, "The current tasklet cannot be removed."
+		    # not sure if I will revive this  " Use t=tasklet().capture()"
+        _scheduler_remove(self)
+        
     def __reduce__(self):
-        one, two, three = coroutine.__reduce__(self)
+        one, two, coro_state = coroutine.__reduce__(self)
         assert one is coroutine
         assert two == ()
-        return tasklet, (), (three, self.alive, self.tempval)
+        # we want to get rid of the parent thing.
+        # for now, we just drop it
+        a, b, c, d = coro_state
+        if d:
+            assert isinstance(d, coroutine)
+        coro_state = a, b, c, None
+        coro_state, alive, tempval = rewrite_stackless_primitive(coro_state, self.alive, self.tempval)
+        inst_dict = self.__dict__.copy()
+        del inst_dict['tempval']
+        return self.__class__, (), (coro_state, alive, tempval, inst_dict)
 
-    def __setstate__(self, (coro_state, alive, tempval)):
+    def __setstate__(self, (coro_state, alive, tempval, inst_dict)):
         coroutine.__setstate__(self, coro_state)
+        self.__dict__.update(inst_dict)
         self.alive = alive
         self.tempval = tempval
 

Added: pypy/dist/pypy/lib/test2/pickledtasklet.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/lib/test2/pickledtasklet.py	Fri Sep  7 03:24:31 2007
@@ -0,0 +1,27 @@
+import pickle, sys
+import stackless
+
+ch = stackless.channel()
+
+def recurs(depth, level=1):
+    print 'enter level %s%d' % (level*'  ', level)
+    if level >= depth:
+        ch.send('hi')
+    if level < depth:
+        recurs(depth, level+1)
+    print 'leave level %s%d' % (level*'  ', level)
+
+def demo(depth):
+    t = stackless.tasklet(recurs)(depth)
+    print ch.receive()
+    pickle.dump(t, file('tasklet.pickle', 'wb'))
+
+if __name__ == '__main__':
+    if len(sys.argv) > 1:
+        t = pickle.load(file(sys.argv[1], 'rb'))
+        t.insert()
+    else:
+        t = stackless.tasklet(demo)(14)
+    stackless.run()
+
+# remark: think of fixing cells etc. on the sprint

Modified: pypy/dist/pypy/lib/test2/test_stackless.py
==============================================================================
--- pypy/dist/pypy/lib/test2/test_stackless.py	(original)
+++ pypy/dist/pypy/lib/test2/test_stackless.py	Fri Sep  7 03:24:31 2007
@@ -7,25 +7,44 @@
         cls.space = space
 
     def test_pickle(self):
-        import pickle, sys
-        import stackless
-        
-        ch = stackless.channel()
-        
-        def recurs(depth, level=1):
-            print 'enter level %s%d' % (level*'  ', level)
-            if level >= depth:
-                ch.send('hi')
-            if level < depth:
-                recurs(depth, level+1)
-            print 'leave level %s%d' % (level*'  ', level)
-        
-        def demo(depth):
-            t = stackless.tasklet(recurs)(depth)
-            print ch.receive()
-            blob = pickle.dumps(t)
-        
-        t = stackless.tasklet(demo)(14)
-        stackless.run()
-        
-# remark: think of fixing cells etc. on the sprint
+        import new, sys
+
+        mod = new.module('mod')
+        sys.modules['mod'] = mod
+        try:
+            exec '''
+import pickle, sys
+import stackless
+lev = 14
+
+ch = stackless.channel()
+seen = []
+
+def recurs(depth, level=1):
+    print 'enter level %s%d' % (level*'  ', level)
+    seen.append(level)
+    if level >= depth:
+        ch.send('hi')
+    if level < depth:
+        recurs(depth, level+1)
+    seen.append(level)
+    print 'leave level %s%d' % (level*'  ', level)
+
+def demo(depth):
+    t = stackless.tasklet(recurs)(depth)
+    print ch.receive()
+    global blob
+    blob = pickle.dumps(t)
+    
+t = stackless.tasklet(demo)(lev)
+stackless.run()
+assert seen == range(1, lev+1) + range(lev, 0, -1)
+print "now running the clone"
+tt = pickle.loads(blob)
+tt.insert()
+seen = []
+stackless.run()
+assert seen == range(lev, 0, -1)
+''' in mod.__dict__
+        finally:
+            del sys.modules['mod']

Modified: pypy/dist/pypy/module/_stackless/coroutine.py
==============================================================================
--- pypy/dist/pypy/module/_stackless/coroutine.py	(original)
+++ pypy/dist/pypy/module/_stackless/coroutine.py	Fri Sep  7 03:24:31 2007
@@ -219,7 +219,10 @@
                                         ec)
             instr = frame.last_instr
             opcode = ord(code[instr])
-            assert opcode == pythonopcode.opmap['CALL_FUNCTION']
+            map = pythonopcode.opmap
+            call_ops = [map['CALL_FUNCTION'], map['CALL_FUNCTION_KW'], map['CALL_FUNCTION_VAR'], 
+                        map['CALL_FUNCTION_VAR_KW'], map['CALL_METHOD']]
+            assert opcode in call_ops
             # ("dispatch_call", self, co_code, next_instr, ec)
             chain = resume_state_create(chain, "dispatch_call", frame, code,
                                         instr+3, ec)
@@ -229,8 +232,12 @@
                 # Only positional arguments
                 nargs = oparg & 0xff
                 # case1: ("CALL_FUNCTION", f, nargs, returns=w_result)
-                chain = resume_state_create(chain, 'CALL_FUNCTION', frame,
-                                            nargs)
+                if space.config.objspace.opcodes.CALL_METHOD and opcode == map['CALL_METHOD']:
+                    chain = resume_state_create(chain, 'CALL_METHOD', frame,
+                                                nargs)
+                else:
+                    chain = resume_state_create(chain, 'CALL_FUNCTION', frame,
+                                                nargs)
             else:
                 # case2: ("call_function", f, returns=w_result)
                 chain = resume_state_create(chain, 'call_function', frame)



More information about the Pypy-commit mailing list