[pypy-svn] r29139 - pypy/dist/pypy/module/_stackless/test
pedronis at codespeak.net
pedronis at codespeak.net
Thu Jun 22 15:17:26 CEST 2006
Author: pedronis
Date: Thu Jun 22 15:17:25 2006
New Revision: 29139
Modified:
pypy/dist/pypy/module/_stackless/test/test_stackless.py
Log:
tests derived from the simple stackless.com examples
Modified: pypy/dist/pypy/module/_stackless/test/test_stackless.py
==============================================================================
--- pypy/dist/pypy/module/_stackless/test/test_stackless.py (original)
+++ pypy/dist/pypy/module/_stackless/test/test_stackless.py Thu Jun 22 15:17:25 2006
@@ -200,4 +200,130 @@
t.kill()
assert not t.alive
+    # tests inspired from simple stackless.com examples
+    def test_construction(self):
+        output = []
+        def print_(*args):
+            output.append(args)
+
+        import stackless
+        def aCallable(value):
+            print_("aCallable:", value)
+
+        task = stackless.tasklet(aCallable)
+        task.setup('Inline using setup')
+
+        stackless.run()
+        assert output == [("aCallable:", 'Inline using setup')]
+
+
+        del output[:]
+        task = stackless.tasklet(aCallable)
+        task('Inline using ()')
+
+        stackless.run()
+        assert output == [("aCallable:", 'Inline using ()')]
+
+        del output[:]
+        task = stackless.tasklet()
+        task.bind(aCallable)
+        task('Bind using ()')
+
+        stackless.run()
+        assert output == [("aCallable:", 'Bind using ()')]
+
+    def test_simple_channel(self):
+        output = []
+        def print_(*args):
+            output.append(args)
+
+        import stackless
+
+        def Sending(channel):
+            print_("sending")
+            channel.send("foo")
+
+        def Receiving(channel):
+            print_("receiving")
+            print_(channel.receive())
+
+        ch=stackless.channel()
+
+        task=stackless.tasklet(Sending)(ch)
+        stackless.schedule(task)
+        task2=stackless.tasklet(Receiving)(ch)
+        stackless.schedule(task2)
+
+        stackless.run()
+
+        assert output == [('sending',), ('receiving',), ('foo',)]
+
+    def test_balance_zero(self):
+        import stackless
+
+        ch=stackless.channel()
+        assert ch.balance == 0
+
+    def test_balance_send(self):
+        import stackless
+
+        def Sending(channel):
+            channel.send("foo")
+
+        ch=stackless.channel()
+
+        task=stackless.tasklet(Sending)(ch)
+        stackless.schedule(task)
+        stackless.run()
+
+        assert ch.balance == 1
+
+    def test_balance_recv(self):
+        import stackless
+
+        def Receiving(channel):
+            channel.receive()
+
+        ch=stackless.channel()
+
+        task=stackless.tasklet(Receiving)(ch)
+        stackless.schedule(task)
+        stackless.run()
+
+        assert ch.balance == -1
+
+    def test_run(self):
+        output = []
+        def print_(*args):
+            output.append(args)
+
+        import stackless
+        def f(i):
+            print_(i)
+
+        stackless.tasklet(f)(1)
+        stackless.tasklet(f)(2)
+        stackless.run()
+
+        assert output == [(1,), (2,)]
+
+    def test_cooperative(self):
+        output = []
+        def print_(*args):
+            output.append(args)
+
+        import stackless
+
+        def Loop(i):
+            for x in range(3):
+                stackless.schedule()
+                print_("schedule", i)
+
+        stackless.tasklet(Loop)(1)
+        stackless.tasklet(Loop)(2)
+        stackless.run()
+
+        assert output == [('schedule', 1), ('schedule', 2),
+                          ('schedule', 1), ('schedule', 2),
+                          ('schedule', 1), ('schedule', 2),]
More information about the Pypy-commit
mailing list