[py-svn] r31540 - py/branch/distributed/py/test/rsession/testing
hpk at codespeak.net
hpk at codespeak.net
Wed Aug 23 14:33:51 CEST 2006
Author: hpk
Date: Wed Aug 23 14:33:48 2006
New Revision: 31540
Removed:
py/branch/distributed/py/test/rsession/testing/test_dispatcher.py
Modified:
py/branch/distributed/py/test/rsession/testing/test_master.py
Log:
merge tests
Deleted: /py/branch/distributed/py/test/rsession/testing/test_dispatcher.py
==============================================================================
--- /py/branch/distributed/py/test/rsession/testing/test_dispatcher.py Wed Aug 23 14:33:48 2006
+++ (empty file)
@@ -1,65 +0,0 @@
-
-""" test dispatching stuff for making 1-n master -> slave connection
-and test it locally
-"""
-
-import time, threading
-
-import py
-from py.__.test.rsession.master import dispatch_loop, setup_slave, MasterNode
-from py.__.test.rsession.outcome import ReprOutcome
-from py.__.test.rsession.testing.test_slave import funcpass_spec, funcfail_spec
-
-def setup_module(mod):
- mod.rootdir = py.path.local(py.__file__).dirpath().dirpath()
-
-class DummyMasterNode(object):
- def __init__(self):
- self.pending = []
-
- def send(self, data):
- self.pending.append(data)
-
-def test_dispatch_loop():
- masternodes = [DummyMasterNode(), DummyMasterNode()]
- itemgenerator = iter(range(100))
- shouldstop = lambda : False
- def waiter():
- for node in masternodes:
- node.pending.pop()
- dispatch_loop(masternodes, itemgenerator, shouldstop, waiter=waiter)
-
-def test_slave_setup():
- gw = py.execnet.PopenGateway()
- channel = setup_slave(gw, rootdir)
- channel.send(funcpass_spec)
- output = ReprOutcome(channel.receive())
- assert output.passed
- channel.send(None)
- channel.waitclose(10)
- gw.exit()
-
-def test_slave_running():
- def simple_reporter(result):
- item, res = result
- if item.code.name == 'funcpass':
- assert res.passed
- else:
- assert not res.passed
-
- def open_gw():
- gw = py.execnet.PopenGateway()
- channel = setup_slave(gw, rootdir)
- mn = MasterNode(channel, simple_reporter)
- return mn
-
- master_nodes = [open_gw(), open_gw(), open_gw()]
- rootcol = py.test.collect.Directory(rootdir)
- funcpass_item = rootcol.getitembynames(funcpass_spec)
- funcfail_item = rootcol.getitembynames(funcfail_spec)
- # XXX: Just make this test pass
- funcpass_item.parent.parent.parent.parent.parent.parent = None
- funcfail_item.parent.parent.parent.parent.parent.parent = None
- itemgenerator = iter([funcfail_item] + [funcpass_item] * 5 + [funcfail_item] * 5)
- shouldstop = lambda : False
- dispatch_loop(master_nodes, itemgenerator, shouldstop)
Modified: py/branch/distributed/py/test/rsession/testing/test_master.py
==============================================================================
--- py/branch/distributed/py/test/rsession/testing/test_master.py (original)
+++ py/branch/distributed/py/test/rsession/testing/test_master.py Wed Aug 23 14:33:48 2006
@@ -1,8 +1,16 @@
-""" test master node """
+""" master code and test dispatching for
+ making 1-n master -> slave connection
+ and test it locally.
+"""
+import time, threading
import py
-from py.__.test.rsession.master import MasterNode
-from py.__.test.rsession.outcome import Outcome
+from py.__.test.rsession.master import dispatch_loop, setup_slave, MasterNode
+from py.__.test.rsession.outcome import ReprOutcome, Outcome
+from py.__.test.rsession.testing.test_slave import funcpass_spec, funcfail_spec
+
+def setup_module(mod):
+ mod.rootdir = py.path.local(py.__file__).dirpath().dirpath()
class DummyChannel(object):
def __init__(self):
@@ -26,3 +34,55 @@
assert len(reportlist) == 2
assert reportlist[0][1].passed
assert not reportlist[1][1].passed
+
+
+class DummyMasterNode(object):
+ def __init__(self):
+ self.pending = []
+
+ def send(self, data):
+ self.pending.append(data)
+
+def test_dispatch_loop():
+ masternodes = [DummyMasterNode(), DummyMasterNode()]
+ itemgenerator = iter(range(100))
+ shouldstop = lambda : False
+ def waiter():
+ for node in masternodes:
+ node.pending.pop()
+ dispatch_loop(masternodes, itemgenerator, shouldstop, waiter=waiter)
+
+def test_slave_setup():
+ gw = py.execnet.PopenGateway()
+ channel = setup_slave(gw, rootdir)
+ channel.send(funcpass_spec)
+ output = ReprOutcome(channel.receive())
+ assert output.passed
+ channel.send(None)
+ channel.waitclose(10)
+ gw.exit()
+
+def test_slave_running():
+ def simple_reporter(result):
+ item, res = result
+ if item.code.name == 'funcpass':
+ assert res.passed
+ else:
+ assert not res.passed
+
+ def open_gw():
+ gw = py.execnet.PopenGateway()
+ channel = setup_slave(gw, rootdir)
+ mn = MasterNode(channel, simple_reporter)
+ return mn
+
+ master_nodes = [open_gw(), open_gw(), open_gw()]
+ rootcol = py.test.collect.Directory(rootdir)
+ funcpass_item = rootcol.getitembynames(funcpass_spec)
+ funcfail_item = rootcol.getitembynames(funcfail_spec)
+ # XXX: Just make this test pass
+ funcpass_item.parent.parent.parent.parent.parent.parent = None
+ funcfail_item.parent.parent.parent.parent.parent.parent = None
+ itemgenerator = iter([funcfail_item] + [funcpass_item] * 5 + [funcfail_item] * 5)
+ shouldstop = lambda : False
+ dispatch_loop(master_nodes, itemgenerator, shouldstop)
More information about the pytest-commit mailing list