[py-svn] r62003 - in py/branch/pytestplugin/py/test: dsession/testing plugin

hpk at codespeak.net hpk at codespeak.net
Wed Feb 18 13:50:08 CET 2009


Author: hpk
Date: Wed Feb 18 13:50:06 2009
New Revision: 62003

Modified:
   py/branch/pytestplugin/py/test/dsession/testing/test_dsession.py
   py/branch/pytestplugin/py/test/plugin/pytest_pytester.py
   py/branch/pytestplugin/py/test/plugin/pytest_unittest.py
Log:
unify and test EventRecording 


Modified: py/branch/pytestplugin/py/test/dsession/testing/test_dsession.py
==============================================================================
--- py/branch/pytestplugin/py/test/dsession/testing/test_dsession.py	(original)
+++ py/branch/pytestplugin/py/test/dsession/testing/test_dsession.py	Wed Feb 18 13:50:06 2009
@@ -24,53 +24,6 @@
     while queue.qsize():
         print queue.get()
 
-class EventRecorder(object):
-    def __init__(self, bus):
-        self.events = []
-        self.bus = bus
-        bus.subscribe(self.record)
-
-    def record(self, (name, event)):
-        self.events.append((name, event))
-
-    def getfirstnamed(self, name):
-        for evname, event in self.events:
-            if evname == name:
-                return event
-
-    def getfailures(self):
-        l = []
-        for evname, ev in self.events:
-            if evname in ('itemtestreport', 'collectionreport'):
-                if ev.failed:
-                    l.append(ev)
-        return l
-
-    def clear(self):
-        self.events[:] = []
-
-    def unsubscribe(self):
-        self.bus.unsubscribe(self.record)
- 
-def test_eventrecorder():
-    bus = event.EventBus()
-    recorder = EventRecorder(bus)
-    bus.notify(anonymous=event.NOP())
-    assert recorder.events 
-    assert not recorder.getfailures()
-    rep = event.ItemTestReport(None, None)
-    rep.failed = True
-    bus.notify(itemtestreport=rep)
-    failures = recorder.getfailures()
-    assert failures == [rep]
-    recorder.clear() 
-    assert not recorder.events
-    assert not recorder.getfailures()
-    recorder.unsubscribe()
-    bus.notify(itemtestreport=rep)
-    assert not recorder.events 
-    assert not recorder.getfailures()
-
 class TestDSession(InlineCollection):
     def test_fixoptions(self):
         config = self.parseconfig("--exec=xxx")
@@ -199,7 +152,7 @@
         dumpqueue(session.queue)
         assert loopstate.exitstatus == outcome.EXIT_NOHOSTS
 
-    def test_hostdown_causes_reschedule_pending(self):
+    def test_hostdown_causes_reschedule_pending(self, EventRecorder):
         modcol = self.getmodulecol("""
             def test_crash(): 
                 assert 0
@@ -246,7 +199,7 @@
         session.loop_once(loopstate)
         assert len(session.host2pending) == 1
 
-    def test_event_propagation(self):
+    def test_event_propagation(self, EventRecorder):
         item = self.getitem("def test_func(): pass")
         session = DSession(item._config)
       
@@ -314,7 +267,7 @@
         assert loopstate.testsfailed
         assert loopstate.shuttingdown
 
-    def test_shuttingdown_filters_events(self):
+    def test_shuttingdown_filters_events(self, EventRecorder):
         item = self.getitem("def test_func(): pass")
         session = DSession(item._config)
         host = Host("localhost")
@@ -330,7 +283,7 @@
         session.loop_once(loopstate)
         assert evrec.getfirstnamed('hostdown') == ev
 
-    def test_filteritems(self):
+    def test_filteritems(self, EventRecorder):
         modcol = self.getmodulecol("""
             def test_fail(): 
                 assert 0

Modified: py/branch/pytestplugin/py/test/plugin/pytest_pytester.py
==============================================================================
--- py/branch/pytestplugin/py/test/plugin/pytest_pytester.py	(original)
+++ py/branch/pytestplugin/py/test/plugin/pytest_pytester.py	Wed Feb 18 13:50:06 2009
@@ -49,7 +49,7 @@
         config = self.parseconfig(*args)
         config.pluginmanager.configure(config)
         session = config.initsession()
-        sorter = EventSorter(config, session)
+        sorter = EventRecorder(config.bus)
         session.main()
         config.pluginmanager.unconfigure(config)
         return sorter
@@ -138,26 +138,52 @@
         return self.config.getfsnode(path)
 
 
-class EventSorter(object):
-    def __init__(self, config, session=None):
-        self.config = config
-        self.session = session
-        self.cls2events = d = {}
-        def app((name, event)):
-            print "[event]", event
-            for cls in py.std.inspect.getmro(event.__class__):
-                if cls is not object: 
-                    d.setdefault(cls, []).append(event) 
-        session.bus.subscribe(app)
+class EventRecorder(object):
+    def __init__(self, bus, debug=True):
+        self.events = []
+        self.bus = bus
+        bus.subscribe(self.record)
+        self.debug = debug
+
+    def record(self, (name, event)):
+        if self.debug:
+            print "[event] %s: %s" %(name, event)
+        self.events.append((name, event))
 
     def get(self, cls):
-        return self.cls2events.get(cls, [])
+        l = []
+        for name, value in self.events:
+            if isinstance(value, cls):
+                l.append(value)
+        return l 
+
+    def getnamed(self, *names):
+        l = []
+        for evname, event in self.events:
+            if evname in names:
+                l.append(event)
+        return l 
+
+    def getfirstnamed(self, name):
+        for evname, event in self.events:
+            if evname == name:
+                return event
+
+    def getfailures(self, names='itemtestreport collectionreport'):
+        l = []
+        for ev in self.getnamed(*names.split()):
+            if ev.failed:
+                l.append(ev)
+        return l
+
+    def getfailedcollections(self):
+        return self.getfailures('collectionreport')
 
     def listoutcomes(self):
         passed = []
         skipped = []
         failed = []
-        for ev in self.get(event.ItemTestReport):
+        for ev in self.getnamed('itemtestreport'): # , 'collectionreport'):
             if ev.passed: 
                 passed.append(ev) 
             elif ev.skipped: 
@@ -175,13 +201,6 @@
         assert skipped == len(realskipped)
         assert failed == len(realfailed)
 
-    def getfailedcollections(self):
-        l = []
-        for ev in self.get(event.CollectionReport):
-            if ev.failed:
-               l.append(ev) 
-        return l
-
     def getreport(self, inamepart): 
         """ return a testreport whose dotted import path matches """
         __tracebackhide__ = True
@@ -197,6 +216,55 @@
                              inamepart, l))
         return l[0]
 
+    def clear(self):
+        self.events[:] = []
+
+    def unsubscribe(self):
+        self.bus.unsubscribe(self.record)
+
+def test_eventrecorder():
+    bus = event.EventBus()
+    recorder = EventRecorder(bus)
+    bus.notify(anonymous=event.NOP())
+    assert recorder.events 
+    assert not recorder.getfailures()
+    rep = event.ItemTestReport(None, None)
+    rep.passed = False
+    rep.failed = True
+    bus.notify(itemtestreport=rep)
+    failures = recorder.getfailures()
+    assert failures == [rep]
+    failures = recorder.get(event.ItemTestReport)
+    assert failures == [rep]
+    failures = recorder.getnamed("itemtestreport")
+    assert failures == [rep]
+
+    rep = event.ItemTestReport(None, None)
+    rep.passed = False
+    rep.skipped = True
+    bus.notify(itemtestreport=rep)
+
+    rep = event.CollectionReport(None, None)
+    rep.passed = False
+    rep.failed = True
+    bus.notify(itemtestreport=rep)
+
+    passed, skipped, failed = recorder.listoutcomes()
+    assert not passed and skipped and failed
+
+    numpassed, numskipped, numfailed = recorder.countoutcomes()
+    assert numpassed == 0
+    assert numskipped == 1
+    assert numfailed == 2
+
+    recorder.clear() 
+    assert not recorder.events
+    assert not recorder.getfailures()
+    recorder.unsubscribe()
+    bus.notify(itemtestreport=rep)
+    assert not recorder.events 
+    assert not recorder.getfailures()
+
 class LineComp:
     def __init__(self):
         self.stringio = py.std.StringIO.StringIO()
@@ -266,3 +334,5 @@
         elif argname == "fstester":
             fstester = FSTester(pyfuncitem)
             return fstester, None
+        elif argname == "EventRecorder":
+            return EventRecorder, None  # or do some finalization

Modified: py/branch/pytestplugin/py/test/plugin/pytest_unittest.py
==============================================================================
--- py/branch/pytestplugin/py/test/plugin/pytest_unittest.py	(original)
+++ py/branch/pytestplugin/py/test/plugin/pytest_unittest.py	Wed Feb 18 13:50:06 2009
@@ -117,7 +117,8 @@
     """)
     sorter = fstester.inline_run(testpath)
     passed, skipped, failed = sorter.countoutcomes()
-    assert passed + skipped + failed == 2
+    print "COUNTS", passed, skipped, failed
     assert failed == 0, failed
     assert passed == 2
+    assert passed + skipped + failed == 2
 



More information about the pytest-commit mailing list