[py-svn] r57058 - in py/branch/event/py/test2/rsession: . testing
hpk at codespeak.net
hpk at codespeak.net
Thu Aug 7 13:34:16 CEST 2008
Author: hpk
Date: Thu Aug 7 13:34:15 2008
New Revision: 57058
Modified:
py/branch/event/py/test2/rsession/masterslave.py
py/branch/event/py/test2/rsession/testing/test_masterslave.py
Log:
working more on robustness of functional testing of nodes
Modified: py/branch/event/py/test2/rsession/masterslave.py
==============================================================================
--- py/branch/event/py/test2/rsession/masterslave.py (original)
+++ py/branch/event/py/test2/rsession/masterslave.py Thu Aug 7 13:34:15 2008
@@ -20,8 +20,9 @@
try:
if ev == self.ENDMARK:
err = self.channel._getremoteerror()
- ev = repevent.HostCrashed(self.host, self.pending, err)
- self.session.bus.notify(ev)
+ if not self._down or err:
+ ev = repevent.HostCrashed(self.host, self.pending, err)
+ self.session.bus.notify(ev)
return
if ev is None:
self.notify(repevent.HostDown(self.host, self.pending))
Modified: py/branch/event/py/test2/rsession/testing/test_masterslave.py
==============================================================================
--- py/branch/event/py/test2/rsession/testing/test_masterslave.py (original)
+++ py/branch/event/py/test2/rsession/testing/test_masterslave.py Thu Aug 7 13:34:15 2008
@@ -51,28 +51,25 @@
assert newtopdir == topdir
class TestMasterSlaveConnection(BasicRsessionTest):
- def makereportqueue(self, filterevent=repevent.ItemTestReport):
- self.queue = py.std.Queue.Queue()
- self.events = []
- self.filterevent = filterevent
- def append(event):
- self.events.append(event)
- if not filterevent or isinstance(event, filterevent):
- self.queue.put(event)
- self.session.bus.subscribe(append)
- return self.queue
-
- def getevent(self, TIMEOUT=2.0):
- try:
- return self.queue.get(timeout=TIMEOUT)
- except py.std.Queue.Empty:
- print "node channel", self.node.channel
- print "remoteerror", self.node.channel._getremoteerror()
- print "allevents", self.events
- py.test.fail("did not see %r events" % (self.filterevent,))
+ def getevent(self, eventtype=repevent.ItemTestReport, timeout=2.0):
+ events = []
+ while 1:
+ try:
+ ev = self.queue.get(timeout=timeout)
+ except py.std.Queue.Empty:
+ print "node channel", self.node.channel
+ print "remoteerror", self.node.channel._getremoteerror()
+ print "seen events", events
+ raise IOError("did not see %r events" % (eventtype))
+ else:
+ if isinstance(ev, eventtype):
+ return ev
+ events.append(ev)
def setup_method(self, method):
super(TestMasterSlaveConnection, self).setup_method(method)
+ self.queue = py.std.Queue.Queue()
+ self.session.bus.subscribe(self.queue.put)
self.host = Host("localhost")
self.host.initgateway()
self.node = MasterNode(self.host, self.session)
@@ -80,33 +77,33 @@
def teardown_method(self, method):
print "at teardown:", self.node.channel
+ self.session.bus.unsubscribe(self.queue.put)
def test_crash_invalid_item(self):
- self.makereportqueue(repevent.HostCrashed)
self.node.send(123) # invalid item
- ev = self.getevent()
+ ev = self.getevent(repevent.HostCrashed)
assert ev.host == self.host
assert ev.pending == [123]
assert str(ev.error).find("AttributeError") != -1
def test_crash_killed(self):
- self.makereportqueue(repevent.HostCrashed)
item = self.getfunc("kill15")
self.node.send(item) # invalid item
- ev = self.getevent()
+ ev = self.getevent(repevent.HostCrashed)
assert ev.host == self.host
assert ev.pending == [item]
assert not ev.error # currently this means crashed
def test_node_down(self):
- self.makereportqueue(repevent.HostDown)
self.node.send(None)
- event = self.getevent()
+ event = self.getevent(repevent.HostDown)
assert event.host == self.host
assert event.pending == []
+ self.node._callback(self.node.ENDMARK)
+ py.test.raises(IOError,
+ "self.getevent(repevent.HostCrashed, timeout=0.01)")
def test_send_one(self):
- self.makereportqueue()
item = self.getfunc("passed")
self.node.send(self.getfunc("passed"))
ev = self.getevent()
@@ -117,7 +114,6 @@
#assert event.item is not item
def test_send_multiple(self):
- self.makereportqueue()
for outcome in "passed failed skipped".split():
self.node.send(self.getfunc(outcome))
ev = self.getevent()
More information about the pytest-commit
mailing list