[pypy-svn] r29139 - pypy/dist/pypy/module/_stackless/test

pedronis at codespeak.net pedronis at codespeak.net
Thu Jun 22 15:17:26 CEST 2006


Author: pedronis
Date: Thu Jun 22 15:17:25 2006
New Revision: 29139

Modified:
   pypy/dist/pypy/module/_stackless/test/test_stackless.py
Log:
tests derived from the simple stackless.com examples



Modified: pypy/dist/pypy/module/_stackless/test/test_stackless.py
==============================================================================
--- pypy/dist/pypy/module/_stackless/test/test_stackless.py	(original)
+++ pypy/dist/pypy/module/_stackless/test/test_stackless.py	Thu Jun 22 15:17:25 2006
@@ -200,4 +200,130 @@
         t.kill()
         assert not t.alive
 
+    # tests inspired by the simple stackless.com examples
 
+    def test_construction(self):  # three ways to give a tasklet its callable and argument
+        output = []  # every print_() call is recorded here as a tuple of its arguments
+        def print_(*args):
+            output.append(args)
+
+        import stackless
+        def aCallable(value):
+            print_("aCallable:", value)
+
+        # case 1: callable passed to the constructor, argument supplied through setup()
+        task = stackless.tasklet(aCallable)
+        task.setup('Inline using setup')
+
+        stackless.run()
+        assert output == [("aCallable:", 'Inline using setup')]
+
+        # case 2: callable passed to the constructor, argument supplied by calling the tasklet
+        del output[:]  # clear in place: print_ closes over this same list object
+        task = stackless.tasklet(aCallable)
+        task('Inline using ()')
+
+        stackless.run()
+        assert output == [("aCallable:", 'Inline using ()')]
+        # case 3: tasklet created empty, callable attached with bind(), then called
+        del output[:]
+        task = stackless.tasklet()
+        task.bind(aCallable)
+        task('Bind using ()')
+
+        stackless.run()
+        assert output == [("aCallable:", 'Bind using ()')]
+
+    def test_simple_channel(self):  # one value passed between two tasklets over a channel
+        output = []  # every print_() call is recorded here as a tuple of its arguments
+        def print_(*args):
+            output.append(args)
+            
+        import stackless
+        
+        def Sending(channel):
+            print_("sending")
+            channel.send("foo")
+
+        def Receiving(channel):
+            print_("receiving")
+            print_(channel.receive())  # records whatever receive() returns
+
+        ch=stackless.channel()
+        # one tasklet per side of the same channel; schedule() is called with each new tasklet
+        task=stackless.tasklet(Sending)(ch)
+        stackless.schedule(task)
+        task2=stackless.tasklet(Receiving)(ch)
+        stackless.schedule(task2)
+
+        stackless.run()
+        # expected order: Sending's message, Receiving's message, then the received value
+        assert output == [('sending',), ('receiving',), ('foo',)]
+
+    def test_balance_zero(self):  # a freshly created channel reports balance 0
+        import stackless
+
+        ch=stackless.channel()
+        assert ch.balance == 0  # checked before any send() or receive() is issued
+        
+    def test_balance_send(self):  # balance after a send() that has no matching receive()
+        import stackless
+
+        def Sending(channel):
+            channel.send("foo")
+
+        ch=stackless.channel()
+        # only a sending tasklet is created; nothing in this test calls ch.receive()
+        task=stackless.tasklet(Sending)(ch)
+        stackless.schedule(task)
+        stackless.run()
+        # NOTE(review): presumably +1 counts the sender left waiting on ch -- confirm
+        assert ch.balance == 1
+
+    def test_balance_recv(self):  # balance after a receive() that has no matching send()
+        import stackless
+
+        def Receiving(channel):
+            channel.receive()
+
+        ch=stackless.channel()
+        # only a receiving tasklet is created; nothing in this test calls ch.send()
+        task=stackless.tasklet(Receiving)(ch)
+        stackless.schedule(task)
+        stackless.run()
+        # NOTE(review): presumably -1 counts the receiver left waiting on ch -- confirm
+        assert ch.balance == -1
+
+    def test_run(self):  # run() executes two tasklets in the order they were created
+        output = []  # every print_() call is recorded here as a tuple of its arguments
+        def print_(*args):
+            output.append(args)
+
+        import stackless
+        def f(i):
+            print_(i)
+        # create both tasklets first; the assert below expects neither ran out of order
+        stackless.tasklet(f)(1)
+        stackless.tasklet(f)(2)
+        stackless.run()
+        # f(1) is expected to be recorded before f(2)
+        assert output == [(1,), (2,)]
+
+    def test_cooperative(self):  # two looping tasklets that call schedule() take turns
+        output = []  # every print_() call is recorded here as a tuple of its arguments
+        def print_(*args):
+            output.append(args)
+
+        import stackless
+        
+        def Loop(i):
+            for x in range(3):
+                stackless.schedule()  # called before each record, three times per tasklet
+                print_("schedule", i)
+
+        stackless.tasklet(Loop)(1)
+        stackless.tasklet(Loop)(2)
+        stackless.run()
+        # expected: strict alternation 1, 2, 1, 2, 1, 2 -- three records from each tasklet
+        assert output == [('schedule', 1), ('schedule', 2),
+                          ('schedule', 1), ('schedule', 2),
+                          ('schedule', 1), ('schedule', 2),]



More information about the Pypy-commit mailing list