[py-svn] r7148 - in py/dist/py: . test test/report/text
hpk at codespeak.net
hpk at codespeak.net
Thu Oct 28 15:24:18 CEST 2004
Author: hpk
Date: Thu Oct 28 15:24:17 2004
New Revision: 7148
Added:
py/dist/py/test/drive.py
- copied, changed from r7140, py/dist/py/test/run.py
Modified:
py/dist/py/__init__.py
py/dist/py/test/cmdline.py
py/dist/py/test/report/text/reporter.py
py/dist/py/test/report/text/summary.py
py/dist/py/test/run.py
Log:
- reorganized py.test implementation
- in "normal" mode, no subprocess and no thread is currently started;
in session mode, threads and a child process are started
- improved/enriched the reporter output slightly.
Modified: py/dist/py/__init__.py
==============================================================================
--- py/dist/py/__init__.py (original)
+++ py/dist/py/__init__.py Thu Oct 28 15:24:17 2004
@@ -17,13 +17,13 @@
'test.collect.PyCollector':'./test/collect.PyCollector',
'test.collect.Error': './test/collect.Error',
'test.Item': './test/item.Item',
- 'test.Driver': './test/run.Driver',
+ 'test.Driver': './test/drive.Driver',
'test.Option': './test/tool/optparse.Option',
'test.TextReporter': './test/report/text/reporter.TextReporter',
'test.MemoReporter': './test/report/memo.MemoReporter',
- 'test.exit': './test/run.exit',
- 'test.fail': './test/run.fail',
- 'test.skip': './test/run.skip',
+ 'test.exit': './test/drive.exit',
+ 'test.fail': './test/drive.fail',
+ 'test.skip': './test/drive.skip',
'test.raises': './test/raises.raises',
'test.config': './test/config.config',
'test.compat.TestCase': './test/compat.TestCase',
Modified: py/dist/py/test/cmdline.py
==============================================================================
--- py/dist/py/test/cmdline.py (original)
+++ py/dist/py/test/cmdline.py Thu Oct 28 15:24:17 2004
@@ -1,14 +1,13 @@
from __future__ import generators
import py
import sys
-from py.__impl__.execnet.channel import ChannelFile, receive2file
-from py.__impl__.test.config import configbasename
+from py.__impl__.test import run
#
# main entry point
#
-def old(argv):
+def _old(argv):
if argv is None:
argv = py.std.sys.argv
frame = py.std.sys._getframe(1)
@@ -18,178 +17,19 @@
import __main__
return [py.test.collect.Module(py.std.sys.argv[0])]
-def waitfilechange():
- """ wait until project files are changed. """
- rootdir = py.test.config.getfirst('rootdir')
- rootdir = py.path.local(rootdir)
- fil = py.path.checker(fnmatch='*.py')
- rec = py.path.checker(dotfile=0)
- statcache = {}
- for path in rootdir.visit(fil, rec):
- statcache[path] = path.stat()
-
- print "waiting for file change below", str(rootdir)
- while 1:
- py.std.time.sleep(0.4)
- for path, cst in statcache.items():
- try:
- st = path.stat()
- except path.NotFound:
- return
- else:
- if st.st_mtime != cst.st_mtime or \
- st.st_size != cst.st_size:
- return
-
-class FailingCollector(py.test.collect.Collector):
- def __init__(self, faileditems):
- self._faileditems = faileditems
-
- def __iter__(self):
- for x in self._faileditems:
- yield x
-
def main():
args = py.std.sys.argv[1:]
- py.test.config.readconfiguration(*getanchors(args))
+ py.test.config.readconfiguration(*run.getanchors(args))
filenames = py.test.config.parseargs(args)
if not filenames:
filenames.append(str(py.path.local()))
-
try:
- while 1:
- failures = master(args, filenames)
- if not failures or not py.test.config.option.session:
- break
- while failures:
- print "session mode: %d failures remaining" % len(failures)
- waitfilechange()
- failures = failure_master(args, filenames, failures)
+ if py.test.config.option.session:
+ run.session(args, filenames)
+ else:
+ run.inprocess(args, filenames)
except KeyboardInterrupt:
print
print "Keybordinterrupt"
raise SystemExit, 2
-
-class StdouterrProxy:
- def __init__(self, gateway):
- self.gateway = gateway
- def setup(self):
- channel = self.gateway.remote_exec("""
- import sys
- out, err = channel.newchannel(), channel.newchannel()
- channel.send(out)
- channel.send(err)
- sys.stdout, sys.stderr = out.open('w'), err.open('w')
- """)
- self.stdout = channel.receive()
- self.stderr = channel.receive()
- channel.waitclose(1.0)
- py.std.threading.Thread(target=receive2file,
- args=(self.stdout, sys.stdout)).start()
- py.std.threading.Thread(target=receive2file,
- args=(self.stderr, sys.stderr)).start()
- def teardown(self):
- self.stdout.close()
- self.stderr.close()
-
-def waitfinish(channel):
- try:
- while 1:
- try:
- channel.waitclose(0.1)
- except IOError:
- continue
- else:
- failures = channel.receive()
- return failures
- break
- finally:
- #print "closing down channel and gateway"
- channel.close()
- channel.gateway.exit()
-
-def failure_master(args, filenames, failures):
- gw = py.execnet.PopenGateway()
- outproxy = StdouterrProxy(gw)
- outproxy.setup()
- try:
- channel = gw.remote_exec("""
- from py.__impl__.test.cmdline import failure_slave
- failure_slave(channel)
- """)
- channel.send((args, filenames))
- channel.send(failures)
- return waitfinish(channel)
- finally:
- outproxy.teardown()
-
-def failure_slave(channel):
- """ we run this on the other side. """
- args, filenames = channel.receive()
- filenames = map(py.path.local, filenames)
- py.test.config.readconfiguration(*filenames)
- py.test.config.parseargs(args)
-
- failures = channel.receive()
- col = FailureCollector(failures)
- driver = py.test.Driver(channel)
- failures = driver.run(col)
- channel.send(failures)
-
-class FailureCollector(py.test.collect.Collector):
- def __init__(self, failures):
- self.failures = failures
- def __iter__(self):
- for root,modpath in self.failures:
- extpy = py.path.extpy(root, modpath)
- yield self.Item(extpy)
-
-def master(args, filenames):
- gw = py.execnet.PopenGateway()
- outproxy = StdouterrProxy(gw)
- outproxy.setup()
- try:
- channel = gw.remote_exec("""
- from py.__impl__.test.cmdline import slave
- slave(channel)
- """)
- channel.send((args, filenames))
- return waitfinish(channel)
- finally:
- outproxy.teardown()
-
-def slave(channel):
- """ we run this on the other side. """
- args, filenames = channel.receive()
- filenames = map(py.path.local, filenames)
- py.test.config.readconfiguration(*filenames)
- py.test.config.parseargs(args)
- fncollectors = list(getcollectors(filenames))
-
- driver = py.test.Driver(channel)
- failures = driver.run(fncollectors)
- channel.send(failures)
-
-def getcollectors(filenames):
- current = py.path.local()
- for fn in filenames:
- fullfn = current.join(fn, abs=1)
- if fullfn.check(file=1):
- yield py.test.collect.Module(fullfn)
- elif fullfn.check(dir=1):
- yield py.test.collect.Directory(fullfn)
- else:
- raise IOError, "%r does not exist" % fn
-
-def getanchors(args):
- """ yield "anchors" from skimming the args for existing files/dirs. """
- current = py.path.local()
- l = []
- for arg in args:
- anchor = current.join(arg, abs=1)
- if anchor.check():
- l.append(anchor)
- if not l:
- l = [current]
- return l
Copied: py/dist/py/test/drive.py (from r7140, py/dist/py/test/run.py)
==============================================================================
--- py/dist/py/test/run.py (original)
+++ py/dist/py/test/drive.py Thu Oct 28 15:24:17 2004
@@ -50,7 +50,7 @@
def run_collector_or_item(self, obj):
""" run (possibly many) testitems and/or collectors. """
- if self._channel.isclosed():
+ if self._channel and self._channel.isclosed():
raise SystemExit, "Channel is closed"
if isinstance(obj, py.test.Item):
if not self.option.collectonly:
Modified: py/dist/py/test/report/text/reporter.py
==============================================================================
--- py/dist/py/test/report/text/reporter.py (original)
+++ py/dist/py/test/report/text/reporter.py Thu Oct 28 15:24:17 2004
@@ -33,14 +33,17 @@
self.summary.option = self.option = py.test.config.option
def start(self, conf=None):
- self.summary.starttime = now()
if conf is not None and conf.mypath is not None:
self.out.line("using %s" % conf.mypath)
- self.out.sep("_")
- self.out.sep("_", " TESTS STARTING ")
- self.out.sep("_")
+ self.out.sep("=", "test process starts")
+ mode = py.test.config.option.session and 'session/child process' or 'inprocess'
+ self.out.line("testing-mode: %s" % mode)
+ self.out.line("executable : %s (%s)" %
+ (py.std.sys.executable, repr_pythonversion()))
+ self.out.sep("=")
if not self.option.nomagic:
py.magic.invoke(assertion=1)
+ self.summary.starttime = now()
def end(self):
if not self.option.nomagic:
@@ -175,3 +178,11 @@
# offendingline = misc.getline(tb)
# return (origin, offendingline)
+
+def repr_pythonversion():
+ v = py.std.sys.version_info
+ try:
+ return "%s.%s.%s-%s-%s" % v
+ except ValueError:
+ return str(v)
+
Modified: py/dist/py/test/report/text/summary.py
==============================================================================
--- py/dist/py/test/report/text/summary.py (original)
+++ py/dist/py/test/report/text/summary.py Thu Oct 28 15:24:17 2004
@@ -47,11 +47,13 @@
sum = 0
for typ in Item.Passed, Item.Failed, Item.Skipped:
l = self.getlist(typ)
- outlist.append('%d %s' % (len(l), typ.__name__.lower()))
+ if l:
+ outlist.append('%d %s' % (len(l), typ.__name__.lower()))
sum += len(l)
- self.out.sep('=', '%d TESTS FINISHED' % sum)
- self.out.write('%4.2f seconds' % (self.endtime-self.starttime))
- self.out.line(" (%s)" % ", ".join(outlist))
+ elapsed = self.endtime-self.starttime
+ status = "%s" % ", ".join(outlist)
+ self.out.sep('=', 'tests finished: %s in %4.2f seconds' %
+ (status, elapsed))
def getexinfo(self, res):
_,exc,tb = res.excinfo
Modified: py/dist/py/test/run.py
==============================================================================
--- py/dist/py/test/run.py (original)
+++ py/dist/py/test/run.py Thu Oct 28 15:24:17 2004
@@ -1,159 +1,179 @@
from __future__ import generators
import py
-collect = py.test.collect
-
-class Exit(Exception):
- """ for immediate program exits without tracebacks and reporter/summary. """
- def __init__(self, item=None):
- self.item = None
- Exception.__init__(self)
-
-def exit(*args):
- raise Exit(*args)
-
-def skip(msg="unknown reason"):
- """ skip with the given Message. """
- raise py.test.Item.Skipped(msg=msg, tbindex=-2)
-
-def fail(msg="unknown failure"):
- """ fail with the given Message. """
- raise py.test.Item.Failed(msg=msg, tbindex=-2)
-
-class Driver:
- option = py.test.config.option
- Exit = Exit
-
- def __init__(self, channel):
- self.reporter = py.test.config.getfirst('getreporter') ()
- self._setupstack = []
- self._instance = None
- self._channel = channel
- self._failed = []
-
- def run(self, collectors):
- """ main loop for running tests. """
- self.setup()
- try:
- try:
- self.reporter.start()
- for x in collectors:
- self.run_collector_or_item(x)
- finally:
- self.teardown()
- except self.Exit, ex:
- pass
- self.reporter.end()
- l = []
- for x in self._failed:
- l.append((str(x.extpy.root), str(x.extpy.modpath)))
- return l
-
- def run_collector_or_item(self, obj):
- """ run (possibly many) testitems and/or collectors. """
- if self._channel.isclosed():
- raise SystemExit, "Channel is closed"
- if isinstance(obj, py.test.Item):
- if not self.option.collectonly:
- #if self.option.args:
- # if str(obj.path).find(self.option.args[0]) == -1:
- # return
- self.runitem(obj)
- elif isinstance(obj, py.test.collect.Collector):
- self.runcollector(obj)
- elif isinstance(obj, py.test.collect.Collector.Error):
+from py.__impl__.execnet.channel import ChannelFile, receive2file
+from py.__impl__.test.config import configbasename
+import sys
+
+def waitfilechange():
+ """ wait until project files are changed. """
+ rootdir = py.test.config.getfirst('rootdir')
+ rootdir = py.path.local(rootdir)
+ fil = py.path.checker(fnmatch='*.py')
+ rec = py.path.checker(dotfile=0)
+ statcache = {}
+ for path in rootdir.visit(fil, rec):
+ statcache[path] = path.stat()
+
+ print "waiting for file change below", str(rootdir)
+ while 1:
+ py.std.time.sleep(0.4)
+ for path, cst in statcache.items():
try:
- self.reporter.report_collect_error(obj)
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- print "*" * 80
- print "Reporter Error"
- print "*" * 80
- import traceback
- traceback.print_exc()
- raise SystemExit, 1
- if self.option.exitfirstproblem:
- raise self.Exit()
- else:
- raise TypeError("%r is not a Item or Collector instance" % obj)
-
- def runcollector(self, collector):
- close = self.reporter.open(collector)
- try:
- for obj in collector:
- self.run_collector_or_item(obj)
- finally:
- if close:
- close()
-
- def runitem(self, item):
- self.reporter.startitem(item)
- try:
- res = item.execute(self) or item.Passed()
- except item.Outcome, res:
- res.excinfo = py.std.sys.exc_info()
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- res = item.Failed(excinfo=py.std.sys.exc_info())
- res.item = item
- self.reporter.enditem(res)
- if isinstance(res, (item.Failed,)):
- self._failed.append(item)
- if py.test.config.option.exitfirstproblem:
- raise self.Exit(res.item)
-
- def setup_path(self, extpy):
- """ setup objects along the path to the test-method
- (pointed to by extpy). Tear down any previously
- setup objects which are not directly needed.
- """
- # setupstack contains (extpy, obj)'s of already setup objects
- # strict ordering is maintained, i.e. each extpy in
- # the stack is "relto" the previous extpy.
- stack = self._setupstack
- while stack and not extpy.relto(stack[-1][0]):
- self._teardownone(stack.pop()[1])
- rest = extpy.parts()[len(stack):-1]
- for x in rest:
- stack.append((x, self._setupone(x)))
-
+ st = path.stat()
+ except path.NotFound:
+ return
+ else:
+ if st.st_mtime != cst.st_mtime or \
+ st.st_size != cst.st_size:
+ return
+
+class FailingCollector(py.test.collect.Collector):
+ def __init__(self, faileditems):
+ self._faileditems = faileditems
+
+ def __iter__(self):
+ for x in self._faileditems:
+ yield x
+
+
+class StdouterrProxy:
+ def __init__(self, gateway):
+ self.gateway = gateway
def setup(self):
- """ setup any neccessary resources. """
- self._failed[:] = []
-
+ channel = self.gateway.remote_exec("""
+ import sys
+ out, err = channel.newchannel(), channel.newchannel()
+ channel.send(out)
+ channel.send(err)
+ sys.stdout, sys.stderr = out.open('w'), err.open('w')
+ """)
+ self.stdout = channel.receive()
+ self.stderr = channel.receive()
+ channel.waitclose(1.0)
+ py.std.threading.Thread(target=receive2file,
+ args=(self.stdout, sys.stdout)).start()
+ py.std.threading.Thread(target=receive2file,
+ args=(self.stderr, sys.stderr)).start()
def teardown(self):
- """ teardown any resources the driver knows about. """
- while self._setupstack:
- self._teardownone(self._setupstack.pop()[1])
-
- def _setupone(self, extpy):
- obj = extpy.resolve()
- if py.std.inspect.ismodule(obj):
- if hasattr(obj, 'setup_module'):
- obj.setup_module(obj)
- elif py.std.inspect.isclass(obj):
- if hasattr(obj, 'setup_class'):
- obj.setup_class.im_func(obj)
- return obj
-
- def _teardownone(self, obj):
- if py.std.inspect.ismodule(obj):
- if hasattr(obj, 'teardown_module'):
- obj.teardown_module(obj)
- elif py.std.inspect.isclass(obj):
- if hasattr(obj, 'teardown_class'):
- obj.teardown_class.im_func(obj)
-
- def setup_method(self, extpy):
- """ return a tuple of (bound method or callable, teardown method). """
- method = extpy.resolve()
- if not hasattr(method, 'im_class'):
- return method, None
- if self._instance.__class__ != method.im_class:
- self._instance = method.im_class()
- method = method.__get__(self._instance, method.im_class)
- if hasattr(self._instance, 'setup_method'):
- print "execting setup", self._instance.setup_method
- self._instance.setup_method(method)
- return (method, getattr(self._instance, 'teardown_method', None))
+ self.stdout.close()
+ self.stderr.close()
+
+def waitfinish(channel):
+ try:
+ while 1:
+ try:
+ channel.waitclose(0.1)
+ except IOError:
+ continue
+ else:
+ failures = channel.receive()
+ return failures
+ break
+ finally:
+ #print "closing down channel and gateway"
+ channel.close()
+ channel.gateway.exit()
+
+def failure_master(args, filenames, failures):
+ gw = py.execnet.PopenGateway()
+ outproxy = StdouterrProxy(gw)
+ outproxy.setup()
+ try:
+ channel = gw.remote_exec("""
+ from py.__impl__.test.run import failure_slave
+ failure_slave(channel)
+ """)
+ channel.send((args, filenames))
+ channel.send(failures)
+ return waitfinish(channel)
+ finally:
+ outproxy.teardown()
+
+def failure_slave(channel):
+ """ we run this on the other side. """
+ args, filenames = channel.receive()
+ filenames = map(py.path.local, filenames)
+ py.test.config.readconfiguration(*filenames)
+ py.test.config.parseargs(args)
+
+ failures = channel.receive()
+ col = FailureCollector(failures)
+ driver = py.test.Driver(channel)
+ failures = driver.run(col)
+ channel.send(failures)
+
+class FailureCollector(py.test.collect.Collector):
+ def __init__(self, failures):
+ self.failures = failures
+ def __iter__(self):
+ for root,modpath in self.failures:
+ extpy = py.path.extpy(root, modpath)
+ yield self.Item(extpy)
+
+def master(args, filenames):
+ gw = py.execnet.PopenGateway()
+ outproxy = StdouterrProxy(gw)
+ outproxy.setup()
+ try:
+ channel = gw.remote_exec("""
+ from py.__impl__.test.run import slave
+ slave(channel)
+ """)
+ channel.send((args, filenames))
+ return waitfinish(channel)
+ finally:
+ outproxy.teardown()
+
+def slave(channel):
+ """ we run this on the other side. """
+ args, filenames = channel.receive()
+ filenames = map(py.path.local, filenames)
+ py.test.config.readconfiguration(*filenames)
+ py.test.config.parseargs(args)
+ fncollectors = list(getcollectors(filenames))
+
+ driver = py.test.Driver(channel)
+ failures = driver.run(fncollectors)
+ channel.send(failures)
+
+def getcollectors(filenames):
+ current = py.path.local()
+ for fn in filenames:
+ fullfn = current.join(fn, abs=1)
+ if fullfn.check(file=1):
+ yield py.test.collect.Module(fullfn)
+ elif fullfn.check(dir=1):
+ yield py.test.collect.Directory(fullfn)
+ else:
+ raise IOError, "%r does not exist" % fn
+
+def getanchors(args):
+ """ yield "anchors" from skimming the args for existing files/dirs. """
+ current = py.path.local()
+ l = []
+ for arg in args:
+ anchor = current.join(arg, abs=1)
+ if anchor.check():
+ l.append(anchor)
+ if not l:
+ l = [current]
+ return l
+
+#
+# Testing Modes
+#
+def inprocess(args, filenames):
+ """ we run this on the other side. """
+ fncollectors = list(getcollectors(filenames))
+ driver = py.test.Driver(None)
+ driver.run(fncollectors)
+
+def session(args, filenames):
+ while 1:
+ failures = master(args, filenames)
+ if not failures or not py.test.config.option.session:
+ break
+ while failures:
+ print "session mode: %d failures remaining" % len(failures)
+ waitfilechange()
+ failures = failure_master(args, filenames, failures)
More information about the pytest-commit
mailing list