[py-svn] r31540 - py/branch/distributed/py/test/rsession/testing

hpk at codespeak.net hpk at codespeak.net
Wed Aug 23 14:33:51 CEST 2006


Author: hpk
Date: Wed Aug 23 14:33:48 2006
New Revision: 31540

Removed:
   py/branch/distributed/py/test/rsession/testing/test_dispatcher.py
Modified:
   py/branch/distributed/py/test/rsession/testing/test_master.py
Log:
merge tests 


Deleted: /py/branch/distributed/py/test/rsession/testing/test_dispatcher.py
==============================================================================
--- /py/branch/distributed/py/test/rsession/testing/test_dispatcher.py	Wed Aug 23 14:33:48 2006
+++ (empty file)
@@ -1,65 +0,0 @@
-
-""" test dispatching stuff for making 1-n master -> slave connection
-and test it locally
-"""
-
-import time, threading
-
-import py
-from py.__.test.rsession.master import dispatch_loop, setup_slave, MasterNode
-from py.__.test.rsession.outcome import ReprOutcome
-from py.__.test.rsession.testing.test_slave import funcpass_spec, funcfail_spec
-
-def setup_module(mod):
-    mod.rootdir = py.path.local(py.__file__).dirpath().dirpath()
-
-class DummyMasterNode(object):
-    def __init__(self):
-        self.pending = []
-    
-    def send(self, data):
-        self.pending.append(data)
-
-def test_dispatch_loop():
-    masternodes = [DummyMasterNode(), DummyMasterNode()]
-    itemgenerator = iter(range(100))
-    shouldstop = lambda : False
-    def waiter():
-        for node in masternodes:
-            node.pending.pop()
-    dispatch_loop(masternodes, itemgenerator, shouldstop, waiter=waiter)
-
-def test_slave_setup():
-    gw = py.execnet.PopenGateway()
-    channel = setup_slave(gw, rootdir)
-    channel.send(funcpass_spec)
-    output = ReprOutcome(channel.receive())
-    assert output.passed
-    channel.send(None)
-    channel.waitclose(10)
-    gw.exit()
-
-def test_slave_running():
-    def simple_reporter(result):
-        item, res = result
-        if item.code.name == 'funcpass':
-            assert res.passed
-        else:
-            assert not res.passed
-    
-    def open_gw():
-        gw = py.execnet.PopenGateway()
-        channel = setup_slave(gw, rootdir)
-        mn = MasterNode(channel, simple_reporter)
-        return mn
-    
-    master_nodes = [open_gw(), open_gw(), open_gw()]
-    rootcol = py.test.collect.Directory(rootdir)
-    funcpass_item = rootcol.getitembynames(funcpass_spec)
-    funcfail_item = rootcol.getitembynames(funcfail_spec)
-    # XXX: Just make this test pass
-    funcpass_item.parent.parent.parent.parent.parent.parent = None
-    funcfail_item.parent.parent.parent.parent.parent.parent = None
-    itemgenerator = iter([funcfail_item] + [funcpass_item] * 5 + [funcfail_item] * 5)
-    shouldstop = lambda : False
-    dispatch_loop(master_nodes, itemgenerator, shouldstop)

Modified: py/branch/distributed/py/test/rsession/testing/test_master.py
==============================================================================
--- py/branch/distributed/py/test/rsession/testing/test_master.py	(original)
+++ py/branch/distributed/py/test/rsession/testing/test_master.py	Wed Aug 23 14:33:48 2006
@@ -1,8 +1,16 @@
-""" test master node """
+""" master code and test dispatching for 
+    making 1-n master -> slave connection
+    and test it locally. 
+"""
 
+import time, threading
 import py
-from py.__.test.rsession.master import MasterNode
-from py.__.test.rsession.outcome import Outcome 
+from py.__.test.rsession.master import dispatch_loop, setup_slave, MasterNode
+from py.__.test.rsession.outcome import ReprOutcome, Outcome 
+from py.__.test.rsession.testing.test_slave import funcpass_spec, funcfail_spec
+
+def setup_module(mod):
+    mod.rootdir = py.path.local(py.__file__).dirpath().dirpath()
 
 class DummyChannel(object):
     def __init__(self):
@@ -26,3 +34,55 @@
     assert len(reportlist) == 2
     assert reportlist[0][1].passed 
     assert not reportlist[1][1].passed 
+
+
+class DummyMasterNode(object):
+    def __init__(self):
+        self.pending = []
+    
+    def send(self, data):
+        self.pending.append(data)
+
+def test_dispatch_loop():
+    masternodes = [DummyMasterNode(), DummyMasterNode()]
+    itemgenerator = iter(range(100))
+    shouldstop = lambda : False
+    def waiter():
+        for node in masternodes:
+            node.pending.pop()
+    dispatch_loop(masternodes, itemgenerator, shouldstop, waiter=waiter)
+
+def test_slave_setup():
+    gw = py.execnet.PopenGateway()
+    channel = setup_slave(gw, rootdir)
+    channel.send(funcpass_spec)
+    output = ReprOutcome(channel.receive())
+    assert output.passed
+    channel.send(None)
+    channel.waitclose(10)
+    gw.exit()
+
+def test_slave_running():
+    def simple_reporter(result):
+        item, res = result
+        if item.code.name == 'funcpass':
+            assert res.passed
+        else:
+            assert not res.passed
+    
+    def open_gw():
+        gw = py.execnet.PopenGateway()
+        channel = setup_slave(gw, rootdir)
+        mn = MasterNode(channel, simple_reporter)
+        return mn
+    
+    master_nodes = [open_gw(), open_gw(), open_gw()]
+    rootcol = py.test.collect.Directory(rootdir)
+    funcpass_item = rootcol.getitembynames(funcpass_spec)
+    funcfail_item = rootcol.getitembynames(funcfail_spec)
+    # XXX: Just make this test pass
+    funcpass_item.parent.parent.parent.parent.parent.parent = None
+    funcfail_item.parent.parent.parent.parent.parent.parent = None
+    itemgenerator = iter([funcfail_item] + [funcpass_item] * 5 + [funcfail_item] * 5)
+    shouldstop = lambda : False
+    dispatch_loop(master_nodes, itemgenerator, shouldstop)



More information about the pytest-commit mailing list