[py-svn] r61991 - in py/branch/pytestplugin/py/test: . plugin testing

hpk at codespeak.net hpk at codespeak.net
Wed Feb 18 09:07:23 CET 2009


Author: hpk
Date: Wed Feb 18 09:07:21 2009
New Revision: 61991

Removed:
   py/branch/pytestplugin/py/test/pmanage.py
   py/branch/pytestplugin/py/test/testing/test_pmanage.py
Modified:
   py/branch/pytestplugin/py/test/plugin/pytest_pocoo.py
   py/branch/pytestplugin/py/test/plugin/pytest_terminal.py
   py/branch/pytestplugin/py/test/plugin/pytest_xfail.py
   py/branch/pytestplugin/py/test/session.py
Log:
* getting rid of a base class in the Terminal Reporter 
  and moving yet more towards keyword based events. 
* removing old plugin manage and tests



Modified: py/branch/pytestplugin/py/test/plugin/pytest_pocoo.py
==============================================================================
--- py/branch/pytestplugin/py/test/plugin/pytest_pocoo.py	(original)
+++ py/branch/pytestplugin/py/test/plugin/pytest_pocoo.py	Wed Feb 18 09:07:21 2009
@@ -21,11 +21,11 @@
     def pytest_terminal_summary(self, terminalreporter):
         if terminalreporter.config.option.pocoo_sendfailures:
             tr = terminalreporter
-            if "failed" in tr.status2event and tr.config.option.tbstyle != "no":
+            if tr.stats.failed and tr.config.option.tbstyle != "no":
                 terminalreporter.write_sep("=", "Sending failures to %s" %(url.base,))
                 terminalreporter.write_line("xmlrpcurl: %s" %(url.xmlrpc,))
                 serverproxy = self.getproxy()
-                for ev in terminalreporter.status2event['failed']:
+                for ev in terminalreporter.stats.failed:
                     tw = py.io.TerminalWriter(stringio=True)
                     ev.toterminal(tw)
                     s = tw.stringio.getvalue()

Modified: py/branch/pytestplugin/py/test/plugin/pytest_terminal.py
==============================================================================
--- py/branch/pytestplugin/py/test/plugin/pytest_terminal.py	(original)
+++ py/branch/pytestplugin/py/test/plugin/pytest_terminal.py	Wed Feb 18 09:07:21 2009
@@ -22,20 +22,18 @@
                 assert hasattr(self.reporter._tw, name), name
                 setattr(self.reporter._tw, name, getattr(config, attr))
 
-    def pytest_event(self, (name, obj)):
+    def pytest_event(self, (name, event)):
         #print "processing event", event
         #print "looking for", name
-        repmethod = getattr(self.reporter, "pyevent_%s" % name, None)
+        evname = event.__class__.__name__ 
+        repmethod = getattr(self.reporter, "rep_%s" % evname, None)
         if repmethod is not None:
-            #print "found", repmethod
-            repmethod(obj)
-        else:
-            self.reporter.processevent(obj)
+            #print "calling", repmethod, event
+            repmethod(event)
 
 class OutcomeStats:
-    def __init__(self):
+    def __init__(self, bus):
         self._reset()
-    def _subscribeat(self, bus):
         bus.subscribe_methods(self)
 
     def _reset(self):
@@ -47,14 +45,14 @@
     def pyevent_itemtestreport(self, ev):
         for name in 'skipped failed passed'.split():
             if getattr(ev, name):
-                #print "appending", ev
-                getattr(self, name).append(ev)
+                l = getattr(self, name)
+                l.append(ev)
 
     def pyevent_collectionreport(self, ev):
-        for name in 'skipped failed'.split():
-            if getattr(ev, name):
-                getattr(self, name).append(ev)
-                return 
+        if ev.skipped:
+            self.skipped.append(ev)
+        if ev.failed:
+            self.failed.append(ev)
 
     def pyevent_deselected(self, ev):
         self.deselected.extend(ev.items)
@@ -62,20 +60,6 @@
     def pyevent_testrunstart(self, ev):
         self._reset()
 
-    def get_folded_skips(self):
-        d = {}
-        #print "self.status2event skipped",  self.status2event.get("skipped", [])
-        #print "self.stats.skipped", self.stats.skipped
-        assert len(self.status2event.get('skipped', ())) == len(self.stats.skipped)
-        for event in self.status2event.get("skipped", []):
-            entry = event.longrepr.reprcrash 
-            key = entry.path, entry.lineno, entry.message
-            d.setdefault(key, []).append(event)
-        l = []
-        for key, events in d.iteritems(): 
-            l.append((len(events),) + key)
-        return l 
-
 class TestOutcomeStats:
     def test_outcomestats(self):
         names = 'passed skipped deselected failed'.split()
@@ -84,18 +68,13 @@
         #    assert getattr(stats, name) == []
         #    bus = py.test._EventBus()
         #    bus.subscribe_methods(stats)
-        #    class X: pass
-        #    for name2 in names:
-        #        if name2 != name:
-        #            setattr(X, name2, False)
-        #        else:
-        #            setattr(X, name2, True)
-        #    bus.notify(itemtestreport=X)
-        #    assert getattr(stats, name) == [X]
+        #    rep = event.ItemTestReport(None)
+        #    setattr(rep, name, True)
+        #    bus.notify(itemtestreport=rep)
+        #    assert getattr(stats, name) == [rep]
 
-        stats = OutcomeStats()
         bus = py.test._EventBus()
-        stats._subscribeat(bus)
+        stats = OutcomeStats(bus)
         class P:
             passed = True
             failed = skipped = False
@@ -104,63 +83,25 @@
         assert stats.passed == [P]
         bus.notify(itemtestreport=P)
         assert stats.passed == [P, P]
+
         class F:
             skipped = passed = False 
             failed = True
         bus.notify(itemtestreport=F)
         assert stats.failed == [F]
+        bus.notify(collectionreport=F)
+        assert stats.failed == [F, F]
         class S:
             skipped = True
             passed = failed = False 
         bus.notify(itemtestreport=S)
         assert stats.skipped == [S]
            
-        bus.notify(collectionreport=F)
-        assert stats.failed == [F, F]
         class D:
             items = [42]
         bus.notify(deselected=D)
         assert stats.deselected == [42]
 
-    
-
-class BaseReporter(object):
-    def __init__(self):
-        self.stats = OutcomeStats()
-        self._reset()
-
-    def _reset(self):
-        self.status2event = {}
-
-    def processevent(self, ev):
-        evname = ev.__class__.__name__ 
-        repmethod = getattr(self, "rep_%s" % evname, None)
-        if repmethod is None:
-            self.rep(ev)
-        else:
-            repmethod(ev)
-
-    def rep(self, ev):
-        pass
-
-    def rep_ItemTestReport(self, ev):
-        for name in 'skipped failed passed'.split():
-            if getattr(ev, name):
-                self.status2event.setdefault(name, []).append(ev)
-                return 
-
-    def rep_CollectionReport(self, ev):
-        for name in 'skipped failed'.split():
-            if getattr(ev, name):
-                self.status2event.setdefault(name, []).append(ev)
-                return 
-
-    def rep_TestrunStart(self, ev):
-        self._reset()
-
-    def rep_Deselected(self, ev):
-        self.status2event.setdefault('deselected', []).extend(ev.items)
-
 def folded_skips(skipped):
     d = {}
     for event in skipped:
@@ -173,19 +114,15 @@
     return l 
 
 
-class TerminalReporter(BaseReporter):
+class TerminalReporter:
     def __init__(self, config, file=None):
-        super(TerminalReporter, self).__init__()
-        self.config = config
-        self.stats._subscribeat(config.bus)
+        self.config = config 
+        self.stats = OutcomeStats(config.bus)
         self.curdir = py.path.local()
         if file is None:
             file = py.std.sys.stdout
         self._tw = py.io.TerminalWriter(file)
-
-    def _reset(self):
         self.currentfspath = None 
-        super(TerminalReporter, self)._reset()
 
     def write_fspath_result(self, fspath, res):
         if fspath != self.currentfspath:
@@ -284,10 +221,11 @@
             self.write_sep("!", "RESCHEDULING %s " %(ev.items,))
     
     def rep_ItemTestReport(self, ev):
-        #super(TerminalReporter, self).rep_ItemTestReport(ev)
         fspath = ev.colitem.fspath 
         cat, letter, word = self.getcategoryletterword(ev)
-        self.status2event.setdefault(cat, []).append(ev)
+        l = self.stats.__dict__.setdefault(cat, [])
+        if ev not in l:
+            l.append(ev)
         if not self.config.option.verbose:
             self.write_fspath_result(fspath, letter)
         else:
@@ -296,7 +234,6 @@
             self.write_ensure_prefix(line, word)
 
     def rep_CollectionReport(self, ev):
-        super(TerminalReporter, self).rep_CollectionReport(ev)
         if not ev.passed:
             if ev.failed:
                 msg = ev.longrepr.reprcrash.message 
@@ -305,7 +242,6 @@
                 self.write_fspath_result(ev.colitem.fspath, "S")
 
     def rep_TestrunStart(self, ev):
-        super(TerminalReporter, self).rep_TestrunStart(ev)
         self.write_sep("=", "test session starts", bold=True)
         self._sessionstarttime = py.std.time.time()
         rev = py.__pkg__.getrev()
@@ -359,9 +295,9 @@
     #
 
     def summary_failures(self):
-        if "failed" in self.status2event and self.config.option.tbstyle != "no":
+        if self.stats.failed and self.config.option.tbstyle != "no":
             self.write_sep("=", "FAILURES")
-            for ev in self.status2event['failed']:
+            for ev in self.stats.failed:
                 self.write_sep("_")
                 ev.toterminal(self._tw)
 
@@ -371,21 +307,21 @@
         keys = "failed passed skipped deselected".split()
         parts = []
         for key in keys:
-            if key in self.status2event:
-                parts.append("%d %s" %(len(self.status2event[key]), key))
+            val = getattr(self.stats, key)
+            if val:
+                parts.append("%d %s" %(len(val), key))
         line = ", ".join(parts)
         # XXX coloring
         self.write_sep("=", "%s in %.2f seconds" %(line, session_duration))
 
     def summary_deselected(self):
-        l = self.status2event.get("deselected", None)
-        if l:
+        if self.stats.deselected:
             self.write_sep("=", "%d tests deselected by %r" %(
-                len(l), self.config.option.keyword), bold=True)
+                len(self.stats.deselected), self.config.option.keyword), bold=True)
 
     def summary_skips(self):
-        if "failed" not in self.status2event or self.config.option.showskipsummary:
-            fskips = folded_skips(self.status2event.get('skipped', []))
+        if not self.stats.failed or self.config.option.showskipsummary:
+            fskips = folded_skips(self.stats.skipped)
             if fskips:
                 self.write_sep("_", "skipped test summary")
                 for num, fspath, lineno, reason in fskips:
@@ -404,12 +340,11 @@
                         py.std.sys.executable, 
                         repr_pythonversion()))
 
-class CollectonlyReporter(BaseReporter):
+class CollectonlyReporter:
     INDENT = "  "
 
     def __init__(self, config, out=None):
-        super(CollectonlyReporter, self).__init__()
-        self.stats._subscribeat(config.bus)
+        self.config = config 
         if out is None:
             out = py.std.sys.stdout
         self.out = py.io.TerminalWriter(out)
@@ -427,7 +362,6 @@
         self.outindent(event.item)
 
     def rep_CollectionReport(self, ev):
-        super(CollectonlyReporter, self).rep_CollectionReport(ev)
         if not ev.passed:
             self.outindent("!!! %s !!!" % ev.longrepr.reprcrash.message)
         self.indent = self.indent[:-len(self.INDENT)]
@@ -459,7 +393,7 @@
         item = tsession.getitem("def test_func(): pass")
         rep = TerminalReporter(item._config, linecomp.stringio)
         host = Host("localhost")
-        rep.processevent(makehostup(host))
+        rep.rep_HostUp(makehostup(host))
         linecomp.assert_contains_lines([
             "*%s %s %s - Python %s" %(host.hostid, sys.platform, 
             sys.executable, repr_pythonversion(sys.version_info))
@@ -476,15 +410,16 @@
                 assert 0
         """)
         rep = TerminalReporter(modcol._config, file=linecomp.stringio)
-        rep.processevent(event.TestrunStart())
+        registerdispatcher(rep)
+        rep.config.bus.notify(testrunstart=event.TestrunStart())
+        
         for item in tsession.genitems([modcol]):
             ev = basic_run_report(item) 
-            rep.processevent(ev)
+            rep.config.bus.notify(itemtestreport=ev)
         linecomp.assert_contains_lines([
                 "*test_pass_skip_fail.py .sF"
         ])
-
-        rep.processevent(event.TestrunFinish())
+        rep.config.bus.notify(testrunfinish=event.TestrunFinish())
         linecomp.assert_contains_lines([
             "    def test_func():",
             ">       assert 0",
@@ -502,21 +437,21 @@
                 assert 0
         """, configargs=("-v",))
         rep = TerminalReporter(modcol._config, file=linecomp.stringio)
-        rep.processevent(event.TestrunStart())
+        registerdispatcher(rep)
+        rep.config.bus.notify(testrunstart=event.TestrunStart())
         items = modcol.collect()
         for item in items:
-            rep.processevent(event.ItemStart(item))
+            rep.config.bus.notify(itemstart=event.ItemStart(item))
             s = linecomp.stringio.getvalue().strip()
             assert s.endswith(item.name)
-            ev = basic_run_report(item) 
-            rep.processevent(ev)
+            rep.config.bus.notify(itemtestreport=basic_run_report(item))
 
         linecomp.assert_contains_lines([
             "*test_pass_skip_fail_verbose.py:2: *test_ok*PASS",
             "*test_pass_skip_fail_verbose.py:4: *test_skip*SKIP",
             "*test_pass_skip_fail_verbose.py:6: *test_func*FAIL",
         ])
-        rep.processevent(event.TestrunFinish())
+        rep.config.bus.notify(testrunfinish=event.TestrunFinish())
         linecomp.assert_contains_lines([
             "    def test_func():",
             ">       assert 0",
@@ -526,14 +461,14 @@
     def test_collect_fail(self, tsession, linecomp):
         modcol = tsession.getmodulecol("import xyz")
         rep = TerminalReporter(modcol._config, file=linecomp.stringio)
-        tsession.session.bus.subscribe(rep.processevent) 
-        rep.processevent(event.TestrunStart())
+        registerdispatcher(rep)
+        rep.config.bus.notify(testrunstart=event.TestrunStart())
         l = list(tsession.genitems([modcol]))
         assert len(l) == 0
         linecomp.assert_contains_lines([
             "*test_collect_fail.py F*"
         ])
-        rep.processevent(event.TestrunFinish())
+        rep.config.bus.notify(testrunfinish=event.TestrunFinish())
         linecomp.assert_contains_lines([
             ">   import xyz",
             "E   ImportError: No module named xyz"
@@ -543,7 +478,7 @@
         modcol = tsession.getmodulecol("def test_one(): pass")
         rep = TerminalReporter(modcol._config, file=linecomp.stringio)
         excinfo = py.test.raises(ValueError, "raise ValueError('hello')")
-        rep.processevent(event.InternalException(excinfo))
+        rep.rep_InternalException(event.InternalException(excinfo))
         linecomp.assert_contains_lines([
             "InternalException: >*raise ValueError*"
         ])
@@ -555,11 +490,11 @@
         """, configargs=("-v",))
         host1 = Host("localhost")
         rep = TerminalReporter(modcol._config, file=linecomp.stringio)
-        rep.processevent(event.HostGatewayReady(host1, None))
+        rep.rep_HostGatewayReady(event.HostGatewayReady(host1, None))
         linecomp.assert_contains_lines([
             "*HostGatewayReady*"
         ])
-        rep.processevent(event.HostDown(host1, "myerror"))
+        rep.rep_HostDown(event.HostDown(host1, "myerror"))
         linecomp.assert_contains_lines([
             "*HostDown*myerror*", 
         ])
@@ -584,7 +519,7 @@
         """)
         rep = TerminalReporter(modcol._config, file=linecomp.stringio)
         reports = [basic_run_report(x) for x in modcol.collect()]
-        rep.processevent(event.LooponfailingInfo(reports, [modcol._config.topdir]))
+        rep.rep_LooponfailingInfo(event.LooponfailingInfo(reports, [modcol._config.topdir]))
         linecomp.assert_contains_lines([
             "*test_looponfailingreport.py:2: assert 0",
             "*test_looponfailingreport.py:4: ValueError*",
@@ -593,7 +528,8 @@
         ])
 
     def test_tb_option(self, tsession, linecomp):
-        for tbopt in ["no", "short", "long"]:
+        # XXX usage of tsession and event bus
+        for tbopt in ["long", "short", "no"]:
             print 'testing --tb=%s...' % tbopt
             modcol = tsession.getmodulecol("""
                 import py
@@ -604,13 +540,14 @@
                     g()  # --calling--
             """, configargs=("--tb=%s" % tbopt,))
             rep = TerminalReporter(modcol._config, file=linecomp.stringio)
-            rep.processevent(event.TestrunStart())
+            registerdispatcher(rep)
+            rep.config.bus.notify(testrunstart=event.TestrunStart())
             for item in tsession.genitems([modcol]):
-                ev = basic_run_report(item) 
-                rep.processevent(ev)
-            rep.processevent(event.TestrunFinish())
+                rep.config.bus.notify(itemtestreport=basic_run_report(item))
+            rep.config.bus.notify(testrunfinish=event.TestrunFinish())
             s = linecomp.stringio.getvalue()
             if tbopt == "long":
+                print s
                 assert 'print 6*7' in s
             else:
                 assert 'print 6*7' not in s
@@ -629,9 +566,10 @@
                 pass
         """)
         rep = TerminalReporter(modcol._config, file=linecomp.stringio)
+        registerdispatcher(rep)
         l = list(tsession.genitems([modcol]))
         assert len(l) == 1
-        rep.processevent(event.ItemStart(l[0]))
+        rep.config.bus.notify(itemstart=event.ItemStart(l[0]))
         linecomp.assert_contains_lines([
             "*test_show_path_before_running_test.py*"
         ])
@@ -646,8 +584,8 @@
                 raise KeyboardInterrupt   # simulating the user
         """, configargs=("--showskipsummary",) + ("-v",)*verbose)
         rep = TerminalReporter(modcol._config, file=linecomp.stringio)
+        registerdispatcher(rep)
         bus = modcol._config.bus
-        bus.subscribe(rep.processevent) # XXX
         bus.notify(testrunstart=event.TestrunStart())
         try:
             for item in tsession.genitems([modcol]):
@@ -708,17 +646,19 @@
         """)
         #stringio = py.std.cStringIO.StringIO()
         rep = CollectonlyReporter(modcol._config, out=linecomp.stringio)
+        registerdispatcher(rep)
         indent = rep.indent
-        rep.processevent(event.CollectionStart(modcol))
+        rep.config.bus.notify(collectionstart=event.CollectionStart(modcol))
         linecomp.assert_contains_lines([
            "<Module 'test_collectonly_basic.py'>"
         ])
         item = modcol.join("test_func")
-        rep.processevent(event.ItemStart(item))
+        rep.config.bus.notify(collectionstart=event.ItemStart(item))
         linecomp.assert_contains_lines([
            "  <Function 'test_func'>", 
         ])
-        rep.processevent(event.CollectionReport(modcol, [], excinfo=None))
+        rep.config.bus.notify(
+            collectionreport=event.CollectionReport(modcol, [], excinfo=None))
         assert rep.indent == indent 
 
     def test_collectonly_skipped_module(self, tsession, linecomp):
@@ -727,7 +667,7 @@
             py.test.skip("nomod")
         """)
         rep = CollectonlyReporter(modcol._config, out=linecomp.stringio)
-        tsession.session.bus.subscribe(rep.processevent) 
+        registerdispatcher(rep)
         cols = list(tsession.genitems([modcol]))
         assert len(cols) == 0
         linecomp.assert_contains_lines("""
@@ -740,7 +680,7 @@
             raise ValueError(0)
         """)
         rep = CollectonlyReporter(modcol._config, out=linecomp.stringio)
-        tsession.session.bus.subscribe(rep.processevent) 
+        registerdispatcher(rep)
         cols = list(tsession.genitems([modcol]))
         assert len(cols) == 0
         linecomp.assert_contains_lines("""
@@ -748,46 +688,6 @@
               !!! ValueError: 0 !!!
         """)
 
-class TestBaseReporter:
-    def test_dispatch_to_matching_method(self):
-        l = []
-        class MyReporter(BaseReporter):
-            def rep_TestrunStart(self, ev):
-                l.append(ev)
-        rep = MyReporter()
-        ev = event.TestrunStart()
-        rep.processevent(ev)
-        assert len(l) == 1
-        assert l[0] is ev
-
-    def test_dispatch_to_default(self):
-        l = []
-        class MyReporter(BaseReporter):
-            def rep(self, ev):
-                l.append(ev)
-        rep = MyReporter()
-        ev = event.NOP()
-        rep.processevent(ev)
-        assert len(l) == 1
-        assert l[0] is ev
-
-    def test_TestItemReport_one(self):
-        for outcome in 'passed skipped failed'.split():
-            rep = BaseReporter()
-            ev = event.ItemTestReport(None)
-            setattr(ev, outcome, True)
-            rep.processevent(ev)
-            assert rep.status2event[outcome]
-
-    def test_CollectionReport_events_are_counted(self):
-        for outcome in 'skipped failed'.split():
-            rep = BaseReporter()
-            ev = event.CollectionReport(None, None)
-            setattr(ev, outcome, True)
-            rep.processevent(ev)
-            assert rep.status2event[outcome]
-
-
 def test_repr_python_version():
     py.magic.patch(sys, 'version_info', (2, 5, 1, 'final', 0))
     try:
@@ -796,3 +696,11 @@
         assert repr_pythonversion() == str(x) 
     finally: 
         py.magic.revert(sys, 'version_info') 
+
+def registerdispatcher(rep):
+    def dispatch((name, event)):
+        meth = getattr(rep, "rep_" + event.__class__.__name__, None)
+        if meth is not None:
+            meth(event)
+    rep.config.bus._bus.subscribe(dispatch)
+

Modified: py/branch/pytestplugin/py/test/plugin/pytest_xfail.py
==============================================================================
--- py/branch/pytestplugin/py/test/plugin/pytest_xfail.py	(original)
+++ py/branch/pytestplugin/py/test/plugin/pytest_xfail.py	Wed Feb 18 09:07:21 2009
@@ -19,7 +19,7 @@
     # a hook implemented called by the terminalreporter instance/plugin
     def pytest_terminal_summary(self, terminalreporter):
         tr = terminalreporter
-        xfailed = tr.status2event.get("xfailed")
+        xfailed = tr.stats.__dict__.get("xfailed")
         if xfailed:
             tr.write_sep("_", "EXPECTED XFAILURES")
             for event in xfailed:
@@ -29,7 +29,7 @@
                 modpath = event.colitem.getmodpath(stopatmodule=False)
                 tr._tw.line("%s %s:%d: %s" %(modpath, entry.path, entry.lineno, entry.message))
 
-        xpassed = terminalreporter.status2event.get("xpassed")
+        xpassed = terminalreporter.stats.__dict__.get("xpassed")
         if xpassed:
             tr.write_sep("_", "UNEXPECTEDLY PASSING")
             for event in xpassed:

Deleted: /py/branch/pytestplugin/py/test/pmanage.py
==============================================================================
--- /py/branch/pytestplugin/py/test/pmanage.py	Wed Feb 18 09:07:21 2009
+++ (empty file)
@@ -1,122 +0,0 @@
-"""
-manage bootstrap, lifecycle and interaction with plugins 
-"""
-import py
-
-pytest_plugins = "pytest_plugins"
-
-DEBUG = False
-
-class PluginManager(object):
-    def __init__(self):
-        # XXX turn this into ordered dict? 
-        self._plugins = {}
-
-    def addpluginclass(self, pluginclass):
-        name = pluginclass.__name__.lower()
-        if name in self._plugins:
-            self.trace("plugin with name %r aready loaded" % name)
-            return
-        plugin = pluginclass()
-        if hasattr(self, '_configured') and hasattr(plugin, 'pytest_configure'):
-            plugin.pytest_configure(config=self._configured)
-            self.trace("instantiating and configuring: %s" %(pluginclass,))
-        else:
-            self.trace("instantiating plugin: %s" %(pluginclass,))
-        self._plugins[name] = plugin
-        return plugin 
-
-    def trace(self, msg): 
-        if DEBUG:
-            print >>py.std.sys.stderr, msg 
-
-    def import_plugin(self, importspec):
-        if not isinstance(importspec, basestring):
-           return self.addpluginclass(importspec)
-        else:
-            modprefix = "pytest_"
-            if not importspec.startswith(modprefix):
-                importspec = modprefix + importspec
-            try:
-                mod = __import__(importspec) 
-            except ImportError, e:
-                try:
-                    mod = __import__("py.__.test.plugin.%s" %(importspec), None, None, '__doc__')
-                except ImportError:
-                    raise ImportError(importspec)
-            clsname = importspec[len(modprefix):].capitalize()
-            pluginclass = getattr(mod, clsname) 
-            result = self.addpluginclass(pluginclass)
-            self.consider_module(mod)
-            return result
-
-    def getplugin(self, pname):
-        return self._plugins[pname.lower()] 
-
-    def consider_module(self, mod):
-        attr = getattr(mod, pytest_plugins, ())
-        if not isinstance(attr, (list, tuple)):
-            attr = (attr,)
-        for spec in attr:
-            if spec:
-                self.import_plugin(spec) 
-
-    #
-    # API for interacting with registered and instantiated plugin objects 
-    #
-    def setinitial(self, modules):
-        for module in modules:
-            self.consider_module(module)
-        envspec  = py.std.os.environ.get("PYTEST_PLUGINS", None)
-        if envspec:
-            for spec in map(str.strip, envspec.split(",")):
-                if spec:
-                    self.import_plugin(spec)
-
-    def add_cmdlineoptions(self, config):
-        # XXX think about sorting/grouping of options from user-perspective 
-        opts = []
-        for name, options in self.listattr("pytest_cmdlineoptions"):
-            opts.extend(options)
-        config.addoptions("ungrouped options added by plugins", *opts) 
-
-    def callplugins(self, methname, **args):
-        """ call all plugins with the given method name and args. """ 
-        for name, method in self.listattr(methname):
-            method(**args)
-
-    def callfirst(self, methname, **args):
-        """ call all plugins with the given method name and args. """ 
-        for name, method in self.listattr(methname):
-            res = method(**args)
-            if res is not None:
-                return res 
-
-    def getfirst(self, methname):
-        """ call all plugins with the given method name and args. """ 
-        l = self.listattr(methname)
-        if l:
-            return l[0][1]
-
-    def forward_event(self, event):
-        self.callplugins("pytest_event", event=event)
-
-    def listattr(self, attrname):
-        l = []
-        for name, plugin in self._plugins.items():
-            try:
-                attrvalue = getattr(plugin, attrname)
-                l.append((name, attrvalue))
-            except AttributeError:
-                continue 
-        return l
-
-    def configure(self, config):
-        #assert self.forward_event not in config.bus._subscribers
-        config.bus.subscribe(self.forward_event)
-        self.callplugins("pytest_configure", config=config) 
-        self._configured = config
-
-    def unconfigure(self, config):
-        self.callplugins("pytest_unconfigure", config=config)
-        config.bus.unsubscribe(self.forward_event)

Modified: py/branch/pytestplugin/py/test/session.py
==============================================================================
--- py/branch/pytestplugin/py/test/session.py	(original)
+++ py/branch/pytestplugin/py/test/session.py	Wed Feb 18 09:07:21 2009
@@ -42,19 +42,21 @@
         """ yield Items from iterating over the given colitems. """
         while colitems: 
             next = colitems.pop(0)
+            assert self.bus is next._config.bus
+            notify = self.bus.notify 
             if isinstance(next, Item):
                 remaining = self.filteritems([next])
                 if remaining:
-                    self.bus.notify(itemstart=event.ItemStart(next))
+                    notify(itemstart=event.ItemStart(next))
                     yield next 
             else:
                 assert isinstance(next, Collector)
-                self.bus.notify(collectionstart=event.CollectionStart(next))
+                notify(collectionstart=event.CollectionStart(next))
                 ev = basic_collect_report(next)
                 if ev.passed:
                     for x in self.genitems(ev.result, keywordexpr):
                         yield x 
-                self.bus.notify(collectionfinish=ev)
+                notify(collectionreport=ev)
 
     def filteritems(self, colitems):
         """ return items to process (some may be deselected)"""
@@ -72,7 +74,7 @@
                     continue
             remaining.append(colitem)
         if deselected: 
-            self.bus.notify(deselectedtest=event.Deselected(deselected, ))
+            self.bus.notify(deselected=event.Deselected(deselected, ))
             if self.config.option.keyword.endswith(":"):
                 self._nomatch = True
         return remaining 

Deleted: /py/branch/pytestplugin/py/test/testing/test_pmanage.py
==============================================================================
--- /py/branch/pytestplugin/py/test/testing/test_pmanage.py	Wed Feb 18 09:07:21 2009
+++ (empty file)
@@ -1,230 +0,0 @@
-import os, sys
-import py
-from py.__.test.pmanage import PluginManager 
-from py.__.test.event import NOP
-from py.__.test.config import Config as pytestConfig
-
-class TestInitialization:
-    def setup_method(self, method):
-        self.tmpdir = py.test.ensuretemp("%s.%s.%s" %
-            (__name__, self.__class__.__name__, method.__name__))
-
-    def test_import_plugin_importname(self):
-        pm = PluginManager()
-        py.test.raises(ImportError, 'pm.import_plugin("x.y")')
-        py.test.raises(ImportError, 'pm.import_plugin("pytest_x.y")')
-
-        sys.path.insert(0, str(self.tmpdir))
-        try:
-            pluginname = "pytest_hello"
-            self.tmpdir.join(pluginname + ".py").write(py.code.Source("""
-                class Hello:
-                    pass
-            """))
-            pm.import_plugin("hello")
-            pm.import_plugin("pytest_hello")
-            plugin = pm.getplugin("hello")
-        finally:
-            sys.path.remove(str(self.tmpdir))
-
-    @py.test.keywords(xfail=True)
-    def test_import_plugin_defaults_to_pytestplugin(self):
-        from py.__.test.defaultconftest import pytest_plugins
-        for name in pytest_plugins:
-            if isinstance(name, str):
-                break
-        sys.path.insert(0, str(self.tmpdir))
-        try:
-            self.tmpdir.join(name + ".py").write(py.code.Source("""
-                class Terminal:
-                    pass
-                class AnotherPlugin:
-                    pass
-                pytest_plugins = AnotherPlugin
-            """))
-            pm = PluginManager()
-            pm.import_plugin(name) 
-            plugin = pm.getplugin("terminal") 
-            del sys.modules[name]
-            print pm._plugins
-            plugin = pm.getplugin("anotherplugin") 
-        finally:
-            sys.path.remove(str(self.tmpdir))
-
-    def test_import_plugin_class(self):
-        pm = PluginManager()
-        class SomePlugin:
-            pass
-        pm.import_plugin(SomePlugin)
-        plugin = pm.getplugin("someplugin") 
-        assert isinstance(plugin, SomePlugin)
-        i = len(pm._plugins)
-        pm.import_plugin(SomePlugin)
-        assert len(pm._plugins) == i
-
-    def test_addpluginclass_post_configure(self):
-        pm = PluginManager()
-        l = []
-        class SomePlugin:
-            def pytest_configure(self, config):
-                l.append(config)
-        conf = pytestConfig()
-        pm.configure(config=conf)
-        pm.import_plugin(SomePlugin)
-        assert len(l) == 1
-        assert l[0] is conf
-
-    def test_consider_module(self):
-        pm = PluginManager()
-        sys.path.insert(0, str(self.tmpdir))
-        try:
-            self.tmpdir.join("pytest_plug1.py").write("class Plug1: pass")
-            self.tmpdir.join("pytest_plug2.py").write("class Plug2: pass")
-            mod = py.std.new.module("temp")
-            mod.pytest_plugins = ["pytest_plug1", "pytest_plug2"]
-            pm.consider_module(mod)
-            assert pm.getplugin("plug1").__class__.__name__ == "Plug1"
-            assert pm.getplugin("plug2").__class__.__name__ == "Plug2"
-        finally:
-            sys.path.remove(str(self.tmpdir))
-
-
-    def test_addpluginclass(self):
-        pm = PluginManager()
-        class My:
-            pass 
-        pm.addpluginclass(My)
-        assert len(pm._plugins) == 1
-        pm.addpluginclass(My)
-        assert len(pm._plugins) == 1
-
-    def test_getplugin(self):
-        pm = PluginManager()
-        assert py.test.raises(LookupError, "pm.getplugin('_xxx')")
-
-        class PluGin: pass 
-        pm.addpluginclass(PluGin)
-        myplugin1 = pm.getplugin("plugin")
-        myplugin2 = pm.getplugin("Plugin") 
-        assert myplugin1 is myplugin2 
-
-class TestPluginInteraction:
-    def test_getfirst(self):
-        pm = PluginManager()
-        class My1:
-            x = 1
-        assert pm.getfirst("x") is None
-        pm.addpluginclass(My1)
-        assert pm.getfirst("x") == 1
-
-    def test_callplugins(self):
-        pm = PluginManager()
-        class My:
-            def method(self, arg):
-                pass
-        pm.addpluginclass(My)
-        py.test.raises(TypeError, 'pm.callplugins("method")')
-        py.test.raises(TypeError, 'pm.callplugins("method", 42)')
-        pm.callplugins("method", arg=42)
-        py.test.raises(TypeError, 'pm.callplugins("method", arg=42, s=13)')
-
-    def test_callfirst(self):
-        pm = PluginManager()
-        class My1:
-            def method(self):
-                pass
-        class My2:
-            def method(self):
-                return True 
-        class My3:
-            def method(self):
-                return None
-        assert pm.callfirst("method") is None
-        assert pm.callfirst("methodnotexists") is None
-        pm.addpluginclass(My1)
-        assert pm.callfirst("method") is None
-        pm.addpluginclass(My2)
-        assert pm.callfirst("method") == True
-        pm.addpluginclass(My3)
-        assert pm.callfirst("method") == True
-
-    def test_listattr(self):
-        pm = PluginManager()
-        class My2:
-            x = 42
-        pm.addpluginclass(My2)
-        assert not pm.listattr("hello")
-        assert pm.listattr("x") == [('my2', 42)]
-
-    def test_eventbus_interaction(self):
-        pm = PluginManager()
-        l = []
-        class My3:
-            def pytest_configure(self, config):
-                l.append("configure")
-            def pytest_unconfigure(self, config):
-                l.append("unconfigure")
-        pm.addpluginclass(My3)
-        config = pytestConfig()
-        pm.configure(config)
-        assert config.bus.issubscribed(pm.forward_event)
-        pm.unconfigure(config)
-        assert not config.bus.issubscribed(pm.forward_event)
-        assert l == ['configure', 'unconfigure']
-
-    def test_pytest_event(self):
-        pm = PluginManager()
-        l = []
-        class My2:
-            def pytest_event(self, event):
-                l.append(event) 
-        pm.addpluginclass(My2)
-        ev = NOP()
-        pm.forward_event(ev)
-        assert len(l) == 1
-        assert l[0] is ev
-
-    def test_addcmdlineoptions(self):
-        pm = PluginManager()
-        class My:
-            pytest_cmdlineoptions = [
-                py.test.config.Option("--hello", dest="dest", default=242)
-            ]
-        pm.addpluginclass(My)
-        config = pytestConfig()
-        pm.add_cmdlineoptions(config)
-        opt = config._parser.get_option("--hello")
-        assert opt
-        assert opt.default == 242
-
-    def test_setinitial_env(self):
-        pm = PluginManager()
-        KEY = "PYTEST_PLUGINS"
-        old = os.environ.get(KEY, None)
-        try:
-            os.environ[KEY] = "test_setinitial"
-            py.test.raises(ImportError, "pm.setinitial([])")
-        finally:
-            if old is None: 
-                del os.environ[KEY]
-            else:
-                os.environ[KEY] = old 
-
-    def test_conftest_specifies_plugin(self, fstester):
-        fstester.makepyfile(
-            conftest="""
-                import py
-                class MyPlugin:
-                    pytest_cmdlineoptions = [
-                        py.test.config.Option("--myplugin-option", dest="myplugin", 
-                        help="myplugin option",
-                        )
-                    ]
-                pytest_plugins = MyPlugin
-            """
-        )
-        result = fstester.runpytest(fstester.tmpdir, '-h')
-        result.stdout.fnmatch_lines([
-            "*--myplugin-option*", 
-        ])
-



More information about the pytest-commit mailing list