[py-svn] r61991 - in py/branch/pytestplugin/py/test: . plugin testing
hpk at codespeak.net
hpk at codespeak.net
Wed Feb 18 09:07:23 CET 2009
Author: hpk
Date: Wed Feb 18 09:07:21 2009
New Revision: 61991
Removed:
py/branch/pytestplugin/py/test/pmanage.py
py/branch/pytestplugin/py/test/testing/test_pmanage.py
Modified:
py/branch/pytestplugin/py/test/plugin/pytest_pocoo.py
py/branch/pytestplugin/py/test/plugin/pytest_terminal.py
py/branch/pytestplugin/py/test/plugin/pytest_xfail.py
py/branch/pytestplugin/py/test/session.py
Log:
* getting rid of the BaseReporter base class of the terminal reporters
and moving yet more towards keyword-based events.
* removing the old plugin manager (pmanage.py) and its tests
Modified: py/branch/pytestplugin/py/test/plugin/pytest_pocoo.py
==============================================================================
--- py/branch/pytestplugin/py/test/plugin/pytest_pocoo.py (original)
+++ py/branch/pytestplugin/py/test/plugin/pytest_pocoo.py Wed Feb 18 09:07:21 2009
@@ -21,11 +21,11 @@
def pytest_terminal_summary(self, terminalreporter):
if terminalreporter.config.option.pocoo_sendfailures:
tr = terminalreporter
- if "failed" in tr.status2event and tr.config.option.tbstyle != "no":
+ if tr.stats.failed and tr.config.option.tbstyle != "no":
terminalreporter.write_sep("=", "Sending failures to %s" %(url.base,))
terminalreporter.write_line("xmlrpcurl: %s" %(url.xmlrpc,))
serverproxy = self.getproxy()
- for ev in terminalreporter.status2event['failed']:
+ for ev in terminalreporter.stats.failed:
tw = py.io.TerminalWriter(stringio=True)
ev.toterminal(tw)
s = tw.stringio.getvalue()
Modified: py/branch/pytestplugin/py/test/plugin/pytest_terminal.py
==============================================================================
--- py/branch/pytestplugin/py/test/plugin/pytest_terminal.py (original)
+++ py/branch/pytestplugin/py/test/plugin/pytest_terminal.py Wed Feb 18 09:07:21 2009
@@ -22,20 +22,18 @@
assert hasattr(self.reporter._tw, name), name
setattr(self.reporter._tw, name, getattr(config, attr))
- def pytest_event(self, (name, obj)):
+ def pytest_event(self, (name, event)):
#print "processing event", event
#print "looking for", name
- repmethod = getattr(self.reporter, "pyevent_%s" % name, None)
+ evname = event.__class__.__name__
+ repmethod = getattr(self.reporter, "rep_%s" % evname, None)
if repmethod is not None:
- #print "found", repmethod
- repmethod(obj)
- else:
- self.reporter.processevent(obj)
+ #print "calling", repmethod, event
+ repmethod(event)
class OutcomeStats:
- def __init__(self):
+ def __init__(self, bus):
self._reset()
- def _subscribeat(self, bus):
bus.subscribe_methods(self)
def _reset(self):
@@ -47,14 +45,14 @@
def pyevent_itemtestreport(self, ev):
for name in 'skipped failed passed'.split():
if getattr(ev, name):
- #print "appending", ev
- getattr(self, name).append(ev)
+ l = getattr(self, name)
+ l.append(ev)
def pyevent_collectionreport(self, ev):
- for name in 'skipped failed'.split():
- if getattr(ev, name):
- getattr(self, name).append(ev)
- return
+ if ev.skipped:
+ self.skipped.append(ev)
+ if ev.failed:
+ self.failed.append(ev)
def pyevent_deselected(self, ev):
self.deselected.extend(ev.items)
@@ -62,20 +60,6 @@
def pyevent_testrunstart(self, ev):
self._reset()
- def get_folded_skips(self):
- d = {}
- #print "self.status2event skipped", self.status2event.get("skipped", [])
- #print "self.stats.skipped", self.stats.skipped
- assert len(self.status2event.get('skipped', ())) == len(self.stats.skipped)
- for event in self.status2event.get("skipped", []):
- entry = event.longrepr.reprcrash
- key = entry.path, entry.lineno, entry.message
- d.setdefault(key, []).append(event)
- l = []
- for key, events in d.iteritems():
- l.append((len(events),) + key)
- return l
-
class TestOutcomeStats:
def test_outcomestats(self):
names = 'passed skipped deselected failed'.split()
@@ -84,18 +68,13 @@
# assert getattr(stats, name) == []
# bus = py.test._EventBus()
# bus.subscribe_methods(stats)
- # class X: pass
- # for name2 in names:
- # if name2 != name:
- # setattr(X, name2, False)
- # else:
- # setattr(X, name2, True)
- # bus.notify(itemtestreport=X)
- # assert getattr(stats, name) == [X]
+ # rep = event.ItemTestReport(None)
+ # setattr(rep, name, True)
+ # bus.notify(itemtestreport=rep)
+ # assert getattr(stats, name) == [rep]
- stats = OutcomeStats()
bus = py.test._EventBus()
- stats._subscribeat(bus)
+ stats = OutcomeStats(bus)
class P:
passed = True
failed = skipped = False
@@ -104,63 +83,25 @@
assert stats.passed == [P]
bus.notify(itemtestreport=P)
assert stats.passed == [P, P]
+
class F:
skipped = passed = False
failed = True
bus.notify(itemtestreport=F)
assert stats.failed == [F]
+ bus.notify(collectionreport=F)
+ assert stats.failed == [F, F]
class S:
skipped = True
passed = failed = False
bus.notify(itemtestreport=S)
assert stats.skipped == [S]
- bus.notify(collectionreport=F)
- assert stats.failed == [F, F]
class D:
items = [42]
bus.notify(deselected=D)
assert stats.deselected == [42]
-
-
-class BaseReporter(object):
- def __init__(self):
- self.stats = OutcomeStats()
- self._reset()
-
- def _reset(self):
- self.status2event = {}
-
- def processevent(self, ev):
- evname = ev.__class__.__name__
- repmethod = getattr(self, "rep_%s" % evname, None)
- if repmethod is None:
- self.rep(ev)
- else:
- repmethod(ev)
-
- def rep(self, ev):
- pass
-
- def rep_ItemTestReport(self, ev):
- for name in 'skipped failed passed'.split():
- if getattr(ev, name):
- self.status2event.setdefault(name, []).append(ev)
- return
-
- def rep_CollectionReport(self, ev):
- for name in 'skipped failed'.split():
- if getattr(ev, name):
- self.status2event.setdefault(name, []).append(ev)
- return
-
- def rep_TestrunStart(self, ev):
- self._reset()
-
- def rep_Deselected(self, ev):
- self.status2event.setdefault('deselected', []).extend(ev.items)
-
def folded_skips(skipped):
d = {}
for event in skipped:
@@ -173,19 +114,15 @@
return l
-class TerminalReporter(BaseReporter):
+class TerminalReporter:
def __init__(self, config, file=None):
- super(TerminalReporter, self).__init__()
- self.config = config
- self.stats._subscribeat(config.bus)
+ self.config = config
+ self.stats = OutcomeStats(config.bus)
self.curdir = py.path.local()
if file is None:
file = py.std.sys.stdout
self._tw = py.io.TerminalWriter(file)
-
- def _reset(self):
self.currentfspath = None
- super(TerminalReporter, self)._reset()
def write_fspath_result(self, fspath, res):
if fspath != self.currentfspath:
@@ -284,10 +221,11 @@
self.write_sep("!", "RESCHEDULING %s " %(ev.items,))
def rep_ItemTestReport(self, ev):
- #super(TerminalReporter, self).rep_ItemTestReport(ev)
fspath = ev.colitem.fspath
cat, letter, word = self.getcategoryletterword(ev)
- self.status2event.setdefault(cat, []).append(ev)
+ l = self.stats.__dict__.setdefault(cat, [])
+ if ev not in l:
+ l.append(ev)
if not self.config.option.verbose:
self.write_fspath_result(fspath, letter)
else:
@@ -296,7 +234,6 @@
self.write_ensure_prefix(line, word)
def rep_CollectionReport(self, ev):
- super(TerminalReporter, self).rep_CollectionReport(ev)
if not ev.passed:
if ev.failed:
msg = ev.longrepr.reprcrash.message
@@ -305,7 +242,6 @@
self.write_fspath_result(ev.colitem.fspath, "S")
def rep_TestrunStart(self, ev):
- super(TerminalReporter, self).rep_TestrunStart(ev)
self.write_sep("=", "test session starts", bold=True)
self._sessionstarttime = py.std.time.time()
rev = py.__pkg__.getrev()
@@ -359,9 +295,9 @@
#
def summary_failures(self):
- if "failed" in self.status2event and self.config.option.tbstyle != "no":
+ if self.stats.failed and self.config.option.tbstyle != "no":
self.write_sep("=", "FAILURES")
- for ev in self.status2event['failed']:
+ for ev in self.stats.failed:
self.write_sep("_")
ev.toterminal(self._tw)
@@ -371,21 +307,21 @@
keys = "failed passed skipped deselected".split()
parts = []
for key in keys:
- if key in self.status2event:
- parts.append("%d %s" %(len(self.status2event[key]), key))
+ val = getattr(self.stats, key)
+ if val:
+ parts.append("%d %s" %(len(val), key))
line = ", ".join(parts)
# XXX coloring
self.write_sep("=", "%s in %.2f seconds" %(line, session_duration))
def summary_deselected(self):
- l = self.status2event.get("deselected", None)
- if l:
+ if self.stats.deselected:
self.write_sep("=", "%d tests deselected by %r" %(
- len(l), self.config.option.keyword), bold=True)
+ len(self.stats.deselected), self.config.option.keyword), bold=True)
def summary_skips(self):
- if "failed" not in self.status2event or self.config.option.showskipsummary:
- fskips = folded_skips(self.status2event.get('skipped', []))
+ if not self.stats.failed or self.config.option.showskipsummary:
+ fskips = folded_skips(self.stats.skipped)
if fskips:
self.write_sep("_", "skipped test summary")
for num, fspath, lineno, reason in fskips:
@@ -404,12 +340,11 @@
py.std.sys.executable,
repr_pythonversion()))
-class CollectonlyReporter(BaseReporter):
+class CollectonlyReporter:
INDENT = " "
def __init__(self, config, out=None):
- super(CollectonlyReporter, self).__init__()
- self.stats._subscribeat(config.bus)
+ self.config = config
if out is None:
out = py.std.sys.stdout
self.out = py.io.TerminalWriter(out)
@@ -427,7 +362,6 @@
self.outindent(event.item)
def rep_CollectionReport(self, ev):
- super(CollectonlyReporter, self).rep_CollectionReport(ev)
if not ev.passed:
self.outindent("!!! %s !!!" % ev.longrepr.reprcrash.message)
self.indent = self.indent[:-len(self.INDENT)]
@@ -459,7 +393,7 @@
item = tsession.getitem("def test_func(): pass")
rep = TerminalReporter(item._config, linecomp.stringio)
host = Host("localhost")
- rep.processevent(makehostup(host))
+ rep.rep_HostUp(makehostup(host))
linecomp.assert_contains_lines([
"*%s %s %s - Python %s" %(host.hostid, sys.platform,
sys.executable, repr_pythonversion(sys.version_info))
@@ -476,15 +410,16 @@
assert 0
""")
rep = TerminalReporter(modcol._config, file=linecomp.stringio)
- rep.processevent(event.TestrunStart())
+ registerdispatcher(rep)
+ rep.config.bus.notify(testrunstart=event.TestrunStart())
+
for item in tsession.genitems([modcol]):
ev = basic_run_report(item)
- rep.processevent(ev)
+ rep.config.bus.notify(itemtestreport=ev)
linecomp.assert_contains_lines([
"*test_pass_skip_fail.py .sF"
])
-
- rep.processevent(event.TestrunFinish())
+ rep.config.bus.notify(testrunfinish=event.TestrunFinish())
linecomp.assert_contains_lines([
" def test_func():",
"> assert 0",
@@ -502,21 +437,21 @@
assert 0
""", configargs=("-v",))
rep = TerminalReporter(modcol._config, file=linecomp.stringio)
- rep.processevent(event.TestrunStart())
+ registerdispatcher(rep)
+ rep.config.bus.notify(testrunstart=event.TestrunStart())
items = modcol.collect()
for item in items:
- rep.processevent(event.ItemStart(item))
+ rep.config.bus.notify(itemstart=event.ItemStart(item))
s = linecomp.stringio.getvalue().strip()
assert s.endswith(item.name)
- ev = basic_run_report(item)
- rep.processevent(ev)
+ rep.config.bus.notify(itemtestreport=basic_run_report(item))
linecomp.assert_contains_lines([
"*test_pass_skip_fail_verbose.py:2: *test_ok*PASS",
"*test_pass_skip_fail_verbose.py:4: *test_skip*SKIP",
"*test_pass_skip_fail_verbose.py:6: *test_func*FAIL",
])
- rep.processevent(event.TestrunFinish())
+ rep.config.bus.notify(testrunfinish=event.TestrunFinish())
linecomp.assert_contains_lines([
" def test_func():",
"> assert 0",
@@ -526,14 +461,14 @@
def test_collect_fail(self, tsession, linecomp):
modcol = tsession.getmodulecol("import xyz")
rep = TerminalReporter(modcol._config, file=linecomp.stringio)
- tsession.session.bus.subscribe(rep.processevent)
- rep.processevent(event.TestrunStart())
+ registerdispatcher(rep)
+ rep.config.bus.notify(testrunstart=event.TestrunStart())
l = list(tsession.genitems([modcol]))
assert len(l) == 0
linecomp.assert_contains_lines([
"*test_collect_fail.py F*"
])
- rep.processevent(event.TestrunFinish())
+ rep.config.bus.notify(testrunfinish=event.TestrunFinish())
linecomp.assert_contains_lines([
"> import xyz",
"E ImportError: No module named xyz"
@@ -543,7 +478,7 @@
modcol = tsession.getmodulecol("def test_one(): pass")
rep = TerminalReporter(modcol._config, file=linecomp.stringio)
excinfo = py.test.raises(ValueError, "raise ValueError('hello')")
- rep.processevent(event.InternalException(excinfo))
+ rep.rep_InternalException(event.InternalException(excinfo))
linecomp.assert_contains_lines([
"InternalException: >*raise ValueError*"
])
@@ -555,11 +490,11 @@
""", configargs=("-v",))
host1 = Host("localhost")
rep = TerminalReporter(modcol._config, file=linecomp.stringio)
- rep.processevent(event.HostGatewayReady(host1, None))
+ rep.rep_HostGatewayReady(event.HostGatewayReady(host1, None))
linecomp.assert_contains_lines([
"*HostGatewayReady*"
])
- rep.processevent(event.HostDown(host1, "myerror"))
+ rep.rep_HostDown(event.HostDown(host1, "myerror"))
linecomp.assert_contains_lines([
"*HostDown*myerror*",
])
@@ -584,7 +519,7 @@
""")
rep = TerminalReporter(modcol._config, file=linecomp.stringio)
reports = [basic_run_report(x) for x in modcol.collect()]
- rep.processevent(event.LooponfailingInfo(reports, [modcol._config.topdir]))
+ rep.rep_LooponfailingInfo(event.LooponfailingInfo(reports, [modcol._config.topdir]))
linecomp.assert_contains_lines([
"*test_looponfailingreport.py:2: assert 0",
"*test_looponfailingreport.py:4: ValueError*",
@@ -593,7 +528,8 @@
])
def test_tb_option(self, tsession, linecomp):
- for tbopt in ["no", "short", "long"]:
+ # XXX usage of tsession and event bus
+ for tbopt in ["long", "short", "no"]:
print 'testing --tb=%s...' % tbopt
modcol = tsession.getmodulecol("""
import py
@@ -604,13 +540,14 @@
g() # --calling--
""", configargs=("--tb=%s" % tbopt,))
rep = TerminalReporter(modcol._config, file=linecomp.stringio)
- rep.processevent(event.TestrunStart())
+ registerdispatcher(rep)
+ rep.config.bus.notify(testrunstart=event.TestrunStart())
for item in tsession.genitems([modcol]):
- ev = basic_run_report(item)
- rep.processevent(ev)
- rep.processevent(event.TestrunFinish())
+ rep.config.bus.notify(itemtestreport=basic_run_report(item))
+ rep.config.bus.notify(testrunfinish=event.TestrunFinish())
s = linecomp.stringio.getvalue()
if tbopt == "long":
+ print s
assert 'print 6*7' in s
else:
assert 'print 6*7' not in s
@@ -629,9 +566,10 @@
pass
""")
rep = TerminalReporter(modcol._config, file=linecomp.stringio)
+ registerdispatcher(rep)
l = list(tsession.genitems([modcol]))
assert len(l) == 1
- rep.processevent(event.ItemStart(l[0]))
+ rep.config.bus.notify(itemstart=event.ItemStart(l[0]))
linecomp.assert_contains_lines([
"*test_show_path_before_running_test.py*"
])
@@ -646,8 +584,8 @@
raise KeyboardInterrupt # simulating the user
""", configargs=("--showskipsummary",) + ("-v",)*verbose)
rep = TerminalReporter(modcol._config, file=linecomp.stringio)
+ registerdispatcher(rep)
bus = modcol._config.bus
- bus.subscribe(rep.processevent) # XXX
bus.notify(testrunstart=event.TestrunStart())
try:
for item in tsession.genitems([modcol]):
@@ -708,17 +646,19 @@
""")
#stringio = py.std.cStringIO.StringIO()
rep = CollectonlyReporter(modcol._config, out=linecomp.stringio)
+ registerdispatcher(rep)
indent = rep.indent
- rep.processevent(event.CollectionStart(modcol))
+ rep.config.bus.notify(collectionstart=event.CollectionStart(modcol))
linecomp.assert_contains_lines([
"<Module 'test_collectonly_basic.py'>"
])
item = modcol.join("test_func")
- rep.processevent(event.ItemStart(item))
+ rep.config.bus.notify(collectionstart=event.ItemStart(item))
linecomp.assert_contains_lines([
" <Function 'test_func'>",
])
- rep.processevent(event.CollectionReport(modcol, [], excinfo=None))
+ rep.config.bus.notify(
+ collectionreport=event.CollectionReport(modcol, [], excinfo=None))
assert rep.indent == indent
def test_collectonly_skipped_module(self, tsession, linecomp):
@@ -727,7 +667,7 @@
py.test.skip("nomod")
""")
rep = CollectonlyReporter(modcol._config, out=linecomp.stringio)
- tsession.session.bus.subscribe(rep.processevent)
+ registerdispatcher(rep)
cols = list(tsession.genitems([modcol]))
assert len(cols) == 0
linecomp.assert_contains_lines("""
@@ -740,7 +680,7 @@
raise ValueError(0)
""")
rep = CollectonlyReporter(modcol._config, out=linecomp.stringio)
- tsession.session.bus.subscribe(rep.processevent)
+ registerdispatcher(rep)
cols = list(tsession.genitems([modcol]))
assert len(cols) == 0
linecomp.assert_contains_lines("""
@@ -748,46 +688,6 @@
!!! ValueError: 0 !!!
""")
-class TestBaseReporter:
- def test_dispatch_to_matching_method(self):
- l = []
- class MyReporter(BaseReporter):
- def rep_TestrunStart(self, ev):
- l.append(ev)
- rep = MyReporter()
- ev = event.TestrunStart()
- rep.processevent(ev)
- assert len(l) == 1
- assert l[0] is ev
-
- def test_dispatch_to_default(self):
- l = []
- class MyReporter(BaseReporter):
- def rep(self, ev):
- l.append(ev)
- rep = MyReporter()
- ev = event.NOP()
- rep.processevent(ev)
- assert len(l) == 1
- assert l[0] is ev
-
- def test_TestItemReport_one(self):
- for outcome in 'passed skipped failed'.split():
- rep = BaseReporter()
- ev = event.ItemTestReport(None)
- setattr(ev, outcome, True)
- rep.processevent(ev)
- assert rep.status2event[outcome]
-
- def test_CollectionReport_events_are_counted(self):
- for outcome in 'skipped failed'.split():
- rep = BaseReporter()
- ev = event.CollectionReport(None, None)
- setattr(ev, outcome, True)
- rep.processevent(ev)
- assert rep.status2event[outcome]
-
-
def test_repr_python_version():
py.magic.patch(sys, 'version_info', (2, 5, 1, 'final', 0))
try:
@@ -796,3 +696,11 @@
assert repr_pythonversion() == str(x)
finally:
py.magic.revert(sys, 'version_info')
+
+def registerdispatcher(rep):
+ def dispatch((name, event)):
+ meth = getattr(rep, "rep_" + event.__class__.__name__, None)
+ if meth is not None:
+ meth(event)
+ rep.config.bus._bus.subscribe(dispatch)
+
Modified: py/branch/pytestplugin/py/test/plugin/pytest_xfail.py
==============================================================================
--- py/branch/pytestplugin/py/test/plugin/pytest_xfail.py (original)
+++ py/branch/pytestplugin/py/test/plugin/pytest_xfail.py Wed Feb 18 09:07:21 2009
@@ -19,7 +19,7 @@
# a hook implemented called by the terminalreporter instance/plugin
def pytest_terminal_summary(self, terminalreporter):
tr = terminalreporter
- xfailed = tr.status2event.get("xfailed")
+ xfailed = tr.stats.__dict__.get("xfailed")
if xfailed:
tr.write_sep("_", "EXPECTED XFAILURES")
for event in xfailed:
@@ -29,7 +29,7 @@
modpath = event.colitem.getmodpath(stopatmodule=False)
tr._tw.line("%s %s:%d: %s" %(modpath, entry.path, entry.lineno, entry.message))
- xpassed = terminalreporter.status2event.get("xpassed")
+ xpassed = terminalreporter.stats.__dict__.get("xpassed")
if xpassed:
tr.write_sep("_", "UNEXPECTEDLY PASSING")
for event in xpassed:
Deleted: /py/branch/pytestplugin/py/test/pmanage.py
==============================================================================
--- /py/branch/pytestplugin/py/test/pmanage.py Wed Feb 18 09:07:21 2009
+++ (empty file)
@@ -1,122 +0,0 @@
-"""
-manage bootstrap, lifecycle and interaction with plugins
-"""
-import py
-
-pytest_plugins = "pytest_plugins"
-
-DEBUG = False
-
-class PluginManager(object):
- def __init__(self):
- # XXX turn this into ordered dict?
- self._plugins = {}
-
- def addpluginclass(self, pluginclass):
- name = pluginclass.__name__.lower()
- if name in self._plugins:
- self.trace("plugin with name %r aready loaded" % name)
- return
- plugin = pluginclass()
- if hasattr(self, '_configured') and hasattr(plugin, 'pytest_configure'):
- plugin.pytest_configure(config=self._configured)
- self.trace("instantiating and configuring: %s" %(pluginclass,))
- else:
- self.trace("instantiating plugin: %s" %(pluginclass,))
- self._plugins[name] = plugin
- return plugin
-
- def trace(self, msg):
- if DEBUG:
- print >>py.std.sys.stderr, msg
-
- def import_plugin(self, importspec):
- if not isinstance(importspec, basestring):
- return self.addpluginclass(importspec)
- else:
- modprefix = "pytest_"
- if not importspec.startswith(modprefix):
- importspec = modprefix + importspec
- try:
- mod = __import__(importspec)
- except ImportError, e:
- try:
- mod = __import__("py.__.test.plugin.%s" %(importspec), None, None, '__doc__')
- except ImportError:
- raise ImportError(importspec)
- clsname = importspec[len(modprefix):].capitalize()
- pluginclass = getattr(mod, clsname)
- result = self.addpluginclass(pluginclass)
- self.consider_module(mod)
- return result
-
- def getplugin(self, pname):
- return self._plugins[pname.lower()]
-
- def consider_module(self, mod):
- attr = getattr(mod, pytest_plugins, ())
- if not isinstance(attr, (list, tuple)):
- attr = (attr,)
- for spec in attr:
- if spec:
- self.import_plugin(spec)
-
- #
- # API for interacting with registered and instantiated plugin objects
- #
- def setinitial(self, modules):
- for module in modules:
- self.consider_module(module)
- envspec = py.std.os.environ.get("PYTEST_PLUGINS", None)
- if envspec:
- for spec in map(str.strip, envspec.split(",")):
- if spec:
- self.import_plugin(spec)
-
- def add_cmdlineoptions(self, config):
- # XXX think about sorting/grouping of options from user-perspective
- opts = []
- for name, options in self.listattr("pytest_cmdlineoptions"):
- opts.extend(options)
- config.addoptions("ungrouped options added by plugins", *opts)
-
- def callplugins(self, methname, **args):
- """ call all plugins with the given method name and args. """
- for name, method in self.listattr(methname):
- method(**args)
-
- def callfirst(self, methname, **args):
- """ call all plugins with the given method name and args. """
- for name, method in self.listattr(methname):
- res = method(**args)
- if res is not None:
- return res
-
- def getfirst(self, methname):
- """ call all plugins with the given method name and args. """
- l = self.listattr(methname)
- if l:
- return l[0][1]
-
- def forward_event(self, event):
- self.callplugins("pytest_event", event=event)
-
- def listattr(self, attrname):
- l = []
- for name, plugin in self._plugins.items():
- try:
- attrvalue = getattr(plugin, attrname)
- l.append((name, attrvalue))
- except AttributeError:
- continue
- return l
-
- def configure(self, config):
- #assert self.forward_event not in config.bus._subscribers
- config.bus.subscribe(self.forward_event)
- self.callplugins("pytest_configure", config=config)
- self._configured = config
-
- def unconfigure(self, config):
- self.callplugins("pytest_unconfigure", config=config)
- config.bus.unsubscribe(self.forward_event)
Modified: py/branch/pytestplugin/py/test/session.py
==============================================================================
--- py/branch/pytestplugin/py/test/session.py (original)
+++ py/branch/pytestplugin/py/test/session.py Wed Feb 18 09:07:21 2009
@@ -42,19 +42,21 @@
""" yield Items from iterating over the given colitems. """
while colitems:
next = colitems.pop(0)
+ assert self.bus is next._config.bus
+ notify = self.bus.notify
if isinstance(next, Item):
remaining = self.filteritems([next])
if remaining:
- self.bus.notify(itemstart=event.ItemStart(next))
+ notify(itemstart=event.ItemStart(next))
yield next
else:
assert isinstance(next, Collector)
- self.bus.notify(collectionstart=event.CollectionStart(next))
+ notify(collectionstart=event.CollectionStart(next))
ev = basic_collect_report(next)
if ev.passed:
for x in self.genitems(ev.result, keywordexpr):
yield x
- self.bus.notify(collectionfinish=ev)
+ notify(collectionreport=ev)
def filteritems(self, colitems):
""" return items to process (some may be deselected)"""
@@ -72,7 +74,7 @@
continue
remaining.append(colitem)
if deselected:
- self.bus.notify(deselectedtest=event.Deselected(deselected, ))
+ self.bus.notify(deselected=event.Deselected(deselected, ))
if self.config.option.keyword.endswith(":"):
self._nomatch = True
return remaining
Deleted: /py/branch/pytestplugin/py/test/testing/test_pmanage.py
==============================================================================
--- /py/branch/pytestplugin/py/test/testing/test_pmanage.py Wed Feb 18 09:07:21 2009
+++ (empty file)
@@ -1,230 +0,0 @@
-import os, sys
-import py
-from py.__.test.pmanage import PluginManager
-from py.__.test.event import NOP
-from py.__.test.config import Config as pytestConfig
-
-class TestInitialization:
- def setup_method(self, method):
- self.tmpdir = py.test.ensuretemp("%s.%s.%s" %
- (__name__, self.__class__.__name__, method.__name__))
-
- def test_import_plugin_importname(self):
- pm = PluginManager()
- py.test.raises(ImportError, 'pm.import_plugin("x.y")')
- py.test.raises(ImportError, 'pm.import_plugin("pytest_x.y")')
-
- sys.path.insert(0, str(self.tmpdir))
- try:
- pluginname = "pytest_hello"
- self.tmpdir.join(pluginname + ".py").write(py.code.Source("""
- class Hello:
- pass
- """))
- pm.import_plugin("hello")
- pm.import_plugin("pytest_hello")
- plugin = pm.getplugin("hello")
- finally:
- sys.path.remove(str(self.tmpdir))
-
- @py.test.keywords(xfail=True)
- def test_import_plugin_defaults_to_pytestplugin(self):
- from py.__.test.defaultconftest import pytest_plugins
- for name in pytest_plugins:
- if isinstance(name, str):
- break
- sys.path.insert(0, str(self.tmpdir))
- try:
- self.tmpdir.join(name + ".py").write(py.code.Source("""
- class Terminal:
- pass
- class AnotherPlugin:
- pass
- pytest_plugins = AnotherPlugin
- """))
- pm = PluginManager()
- pm.import_plugin(name)
- plugin = pm.getplugin("terminal")
- del sys.modules[name]
- print pm._plugins
- plugin = pm.getplugin("anotherplugin")
- finally:
- sys.path.remove(str(self.tmpdir))
-
- def test_import_plugin_class(self):
- pm = PluginManager()
- class SomePlugin:
- pass
- pm.import_plugin(SomePlugin)
- plugin = pm.getplugin("someplugin")
- assert isinstance(plugin, SomePlugin)
- i = len(pm._plugins)
- pm.import_plugin(SomePlugin)
- assert len(pm._plugins) == i
-
- def test_addpluginclass_post_configure(self):
- pm = PluginManager()
- l = []
- class SomePlugin:
- def pytest_configure(self, config):
- l.append(config)
- conf = pytestConfig()
- pm.configure(config=conf)
- pm.import_plugin(SomePlugin)
- assert len(l) == 1
- assert l[0] is conf
-
- def test_consider_module(self):
- pm = PluginManager()
- sys.path.insert(0, str(self.tmpdir))
- try:
- self.tmpdir.join("pytest_plug1.py").write("class Plug1: pass")
- self.tmpdir.join("pytest_plug2.py").write("class Plug2: pass")
- mod = py.std.new.module("temp")
- mod.pytest_plugins = ["pytest_plug1", "pytest_plug2"]
- pm.consider_module(mod)
- assert pm.getplugin("plug1").__class__.__name__ == "Plug1"
- assert pm.getplugin("plug2").__class__.__name__ == "Plug2"
- finally:
- sys.path.remove(str(self.tmpdir))
-
-
- def test_addpluginclass(self):
- pm = PluginManager()
- class My:
- pass
- pm.addpluginclass(My)
- assert len(pm._plugins) == 1
- pm.addpluginclass(My)
- assert len(pm._plugins) == 1
-
- def test_getplugin(self):
- pm = PluginManager()
- assert py.test.raises(LookupError, "pm.getplugin('_xxx')")
-
- class PluGin: pass
- pm.addpluginclass(PluGin)
- myplugin1 = pm.getplugin("plugin")
- myplugin2 = pm.getplugin("Plugin")
- assert myplugin1 is myplugin2
-
-class TestPluginInteraction:
- def test_getfirst(self):
- pm = PluginManager()
- class My1:
- x = 1
- assert pm.getfirst("x") is None
- pm.addpluginclass(My1)
- assert pm.getfirst("x") == 1
-
- def test_callplugins(self):
- pm = PluginManager()
- class My:
- def method(self, arg):
- pass
- pm.addpluginclass(My)
- py.test.raises(TypeError, 'pm.callplugins("method")')
- py.test.raises(TypeError, 'pm.callplugins("method", 42)')
- pm.callplugins("method", arg=42)
- py.test.raises(TypeError, 'pm.callplugins("method", arg=42, s=13)')
-
- def test_callfirst(self):
- pm = PluginManager()
- class My1:
- def method(self):
- pass
- class My2:
- def method(self):
- return True
- class My3:
- def method(self):
- return None
- assert pm.callfirst("method") is None
- assert pm.callfirst("methodnotexists") is None
- pm.addpluginclass(My1)
- assert pm.callfirst("method") is None
- pm.addpluginclass(My2)
- assert pm.callfirst("method") == True
- pm.addpluginclass(My3)
- assert pm.callfirst("method") == True
-
- def test_listattr(self):
- pm = PluginManager()
- class My2:
- x = 42
- pm.addpluginclass(My2)
- assert not pm.listattr("hello")
- assert pm.listattr("x") == [('my2', 42)]
-
- def test_eventbus_interaction(self):
- pm = PluginManager()
- l = []
- class My3:
- def pytest_configure(self, config):
- l.append("configure")
- def pytest_unconfigure(self, config):
- l.append("unconfigure")
- pm.addpluginclass(My3)
- config = pytestConfig()
- pm.configure(config)
- assert config.bus.issubscribed(pm.forward_event)
- pm.unconfigure(config)
- assert not config.bus.issubscribed(pm.forward_event)
- assert l == ['configure', 'unconfigure']
-
- def test_pytest_event(self):
- pm = PluginManager()
- l = []
- class My2:
- def pytest_event(self, event):
- l.append(event)
- pm.addpluginclass(My2)
- ev = NOP()
- pm.forward_event(ev)
- assert len(l) == 1
- assert l[0] is ev
-
- def test_addcmdlineoptions(self):
- pm = PluginManager()
- class My:
- pytest_cmdlineoptions = [
- py.test.config.Option("--hello", dest="dest", default=242)
- ]
- pm.addpluginclass(My)
- config = pytestConfig()
- pm.add_cmdlineoptions(config)
- opt = config._parser.get_option("--hello")
- assert opt
- assert opt.default == 242
-
- def test_setinitial_env(self):
- pm = PluginManager()
- KEY = "PYTEST_PLUGINS"
- old = os.environ.get(KEY, None)
- try:
- os.environ[KEY] = "test_setinitial"
- py.test.raises(ImportError, "pm.setinitial([])")
- finally:
- if old is None:
- del os.environ[KEY]
- else:
- os.environ[KEY] = old
-
- def test_conftest_specifies_plugin(self, fstester):
- fstester.makepyfile(
- conftest="""
- import py
- class MyPlugin:
- pytest_cmdlineoptions = [
- py.test.config.Option("--myplugin-option", dest="myplugin",
- help="myplugin option",
- )
- ]
- pytest_plugins = MyPlugin
- """
- )
- result = fstester.runpytest(fstester.tmpdir, '-h')
- result.stdout.fnmatch_lines([
- "*--myplugin-option*",
- ])
-
More information about the pytest-commit
mailing list