[py-svn] r7148 - in py/dist/py: . test test/report/text

hpk at codespeak.net hpk at codespeak.net
Thu Oct 28 15:24:18 CEST 2004


Author: hpk
Date: Thu Oct 28 15:24:17 2004
New Revision: 7148

Added:
   py/dist/py/test/drive.py
      - copied, changed from r7140, py/dist/py/test/run.py
Modified:
   py/dist/py/__init__.py
   py/dist/py/test/cmdline.py
   py/dist/py/test/report/text/reporter.py
   py/dist/py/test/report/text/summary.py
   py/dist/py/test/run.py
Log:
- reorganized py.test implementation 

- now in "normal" mode no subprocess and no thread is currently started 
  in session mode threads and a child process are started 

- improved/enriched the reporter output slightly. 



Modified: py/dist/py/__init__.py
==============================================================================
--- py/dist/py/__init__.py	(original)
+++ py/dist/py/__init__.py	Thu Oct 28 15:24:17 2004
@@ -17,13 +17,13 @@
     'test.collect.PyCollector':'./test/collect.PyCollector',
     'test.collect.Error':      './test/collect.Error',
     'test.Item':               './test/item.Item', 
-    'test.Driver':       './test/run.Driver', 
+    'test.Driver':       './test/drive.Driver', 
     'test.Option':       './test/tool/optparse.Option', 
     'test.TextReporter': './test/report/text/reporter.TextReporter',
     'test.MemoReporter': './test/report/memo.MemoReporter',
-    'test.exit':       './test/run.exit',
-    'test.fail':       './test/run.fail',
-    'test.skip':       './test/run.skip',
+    'test.exit':       './test/drive.exit',
+    'test.fail':       './test/drive.fail',
+    'test.skip':       './test/drive.skip',
     'test.raises':       './test/raises.raises',
     'test.config':       './test/config.config',
     'test.compat.TestCase': './test/compat.TestCase',

Modified: py/dist/py/test/cmdline.py
==============================================================================
--- py/dist/py/test/cmdline.py	(original)
+++ py/dist/py/test/cmdline.py	Thu Oct 28 15:24:17 2004
@@ -1,14 +1,13 @@
 from __future__ import generators
 import py 
 import sys
-from py.__impl__.execnet.channel import ChannelFile, receive2file 
-from py.__impl__.test.config import configbasename 
+from py.__impl__.test import run 
 
 #
 # main entry point
 #
 
-def old(argv): 
+def _old(argv): 
     if argv is None:
         argv = py.std.sys.argv
         frame = py.std.sys._getframe(1)
@@ -18,178 +17,19 @@
         import __main__
         return [py.test.collect.Module(py.std.sys.argv[0])]
 
-def waitfilechange(): 
-    """ wait until project files are changed. """ 
-    rootdir = py.test.config.getfirst('rootdir')
-    rootdir = py.path.local(rootdir) 
-    fil = py.path.checker(fnmatch='*.py')
-    rec = py.path.checker(dotfile=0) 
-    statcache = {}
-    for path in rootdir.visit(fil, rec):
-        statcache[path] = path.stat()
-
-    print "waiting for file change below", str(rootdir)
-    while 1: 
-        py.std.time.sleep(0.4)
-        for path, cst in statcache.items(): 
-            try:
-                st = path.stat()
-            except path.NotFound:
-                return 
-            else:
-                if st.st_mtime != cst.st_mtime or \
-                   st.st_size != cst.st_size: 
-                    return 
-
-class FailingCollector(py.test.collect.Collector):
-    def __init__(self, faileditems):
-        self._faileditems = faileditems 
-
-    def __iter__(self):
-        for x in self._faileditems: 
-            yield x
-
 def main(): 
     args = py.std.sys.argv[1:]
-    py.test.config.readconfiguration(*getanchors(args)) 
+    py.test.config.readconfiguration(*run.getanchors(args)) 
 
     filenames = py.test.config.parseargs(args)
     if not filenames: 
         filenames.append(str(py.path.local()))
-
     try:
-        while 1:
-            failures = master(args, filenames)
-            if not failures or not py.test.config.option.session: 
-                break 
-            while failures: 
-                print "session mode: %d failures remaining" % len(failures)
-                waitfilechange() 
-                failures = failure_master(args, filenames, failures)
+        if py.test.config.option.session: 
+            run.session(args, filenames) 
+        else:
+            run.inprocess(args, filenames) 
     except KeyboardInterrupt:
         print
         print "Keybordinterrupt"
         raise SystemExit, 2
-   
-class StdouterrProxy:
-    def __init__(self, gateway):
-        self.gateway = gateway
-    def setup(self):
-        channel = self.gateway.remote_exec("""
-            import sys
-            out, err = channel.newchannel(), channel.newchannel()
-            channel.send(out) 
-            channel.send(err) 
-            sys.stdout, sys.stderr = out.open('w'), err.open('w') 
-        """)
-        self.stdout = channel.receive() 
-        self.stderr = channel.receive() 
-        channel.waitclose(1.0)
-        py.std.threading.Thread(target=receive2file, 
-                                args=(self.stdout, sys.stdout)).start()
-        py.std.threading.Thread(target=receive2file, 
-                                args=(self.stderr, sys.stderr)).start()
-    def teardown(self):
-        self.stdout.close()
-        self.stderr.close()
-
-def waitfinish(channel):
-    try:
-        while 1:
-            try:
-                channel.waitclose(0.1) 
-            except IOError:
-                continue
-            else:
-                failures = channel.receive()
-                return failures
-            break
-    finally:
-        #print "closing down channel and gateway"
-        channel.close()
-        channel.gateway.exit()
-
-def failure_master(args, filenames, failures): 
-    gw = py.execnet.PopenGateway() 
-    outproxy = StdouterrProxy(gw) 
-    outproxy.setup()
-    try:
-        channel = gw.remote_exec("""
-                from py.__impl__.test.cmdline import failure_slave
-                failure_slave(channel)
-        """) 
-        channel.send((args, filenames))
-        channel.send(failures)
-        return waitfinish(channel)
-    finally:
-        outproxy.teardown()
-
-def failure_slave(channel):
-    """ we run this on the other side. """
-    args, filenames = channel.receive()
-    filenames = map(py.path.local, filenames)
-    py.test.config.readconfiguration(*filenames) 
-    py.test.config.parseargs(args)
-
-    failures = channel.receive()
-    col = FailureCollector(failures)
-    driver = py.test.Driver(channel) 
-    failures = driver.run(col)
-    channel.send(failures) 
-
-class FailureCollector(py.test.collect.Collector):
-    def __init__(self, failures):
-        self.failures = failures
-    def __iter__(self):
-        for root,modpath in self.failures:
-            extpy = py.path.extpy(root, modpath) 
-            yield self.Item(extpy) 
-            
-def master(args, filenames): 
-    gw = py.execnet.PopenGateway() 
-    outproxy = StdouterrProxy(gw) 
-    outproxy.setup()
-    try:
-        channel = gw.remote_exec("""
-                from py.__impl__.test.cmdline import slave
-                slave(channel)
-        """) 
-        channel.send((args, filenames))
-        return waitfinish(channel)
-    finally:
-        outproxy.teardown()
-
-def slave(channel):
-    """ we run this on the other side. """
-    args, filenames = channel.receive()
-    filenames = map(py.path.local, filenames)
-    py.test.config.readconfiguration(*filenames) 
-    py.test.config.parseargs(args)
-    fncollectors = list(getcollectors(filenames))
-
-    driver = py.test.Driver(channel) 
-    failures = driver.run(fncollectors)
-    channel.send(failures) 
-
-def getcollectors(filenames):
-    current = py.path.local()
-    for fn in filenames:
-        fullfn = current.join(fn, abs=1)
-        if fullfn.check(file=1):
-            yield py.test.collect.Module(fullfn)
-        elif fullfn.check(dir=1):
-            yield py.test.collect.Directory(fullfn)
-        else:
-            raise IOError, "%r does not exist" % fn 
-        
-def getanchors(args):
-    """ yield "anchors" from skimming the args for existing files/dirs. """
-    current = py.path.local()
-    l = []
-    for arg in args: 
-        anchor = current.join(arg, abs=1) 
-        if anchor.check():
-            l.append(anchor) 
-    if not l:
-        l = [current]
-    return l 

Copied: py/dist/py/test/drive.py (from r7140, py/dist/py/test/run.py)
==============================================================================
--- py/dist/py/test/run.py	(original)
+++ py/dist/py/test/drive.py	Thu Oct 28 15:24:17 2004
@@ -50,7 +50,7 @@
 
     def run_collector_or_item(self, obj):
         """ run (possibly many) testitems and/or collectors. """
-        if self._channel.isclosed():
+        if self._channel and self._channel.isclosed():
             raise SystemExit, "Channel is closed"
         if isinstance(obj, py.test.Item):
             if not self.option.collectonly:

Modified: py/dist/py/test/report/text/reporter.py
==============================================================================
--- py/dist/py/test/report/text/reporter.py	(original)
+++ py/dist/py/test/report/text/reporter.py	Thu Oct 28 15:24:17 2004
@@ -33,14 +33,17 @@
         self.summary.option = self.option = py.test.config.option 
 
     def start(self, conf=None):
-        self.summary.starttime = now() 
         if conf is not None and conf.mypath is not None:
             self.out.line("using %s" % conf.mypath) 
-        self.out.sep("_")
-        self.out.sep("_", " TESTS STARTING ") 
-        self.out.sep("_")
+        self.out.sep("=", "test process starts") 
+        mode = py.test.config.option.session and 'session/child process' or 'inprocess'
+        self.out.line("testing-mode: %s" % mode) 
+        self.out.line("executable  : %s  (%s)" % 
+                          (py.std.sys.executable, repr_pythonversion()))
+        self.out.sep("=")
         if not self.option.nomagic:
             py.magic.invoke(assertion=1) 
+        self.summary.starttime = now() 
 
     def end(self):
         if not self.option.nomagic:
@@ -175,3 +178,11 @@
     #    offendingline = misc.getline(tb)
     #    return (origin, offendingline)
 
+
+def repr_pythonversion():
+    v = py.std.sys.version_info
+    try:
+        return "%s.%s.%s-%s-%s" % v
+    except ValueError:
+        return str(v)
+    

Modified: py/dist/py/test/report/text/summary.py
==============================================================================
--- py/dist/py/test/report/text/summary.py	(original)
+++ py/dist/py/test/report/text/summary.py	Thu Oct 28 15:24:17 2004
@@ -47,11 +47,13 @@
         sum = 0
         for typ in Item.Passed, Item.Failed, Item.Skipped:
             l = self.getlist(typ)
-            outlist.append('%d %s' % (len(l), typ.__name__.lower()))
+            if l:
+                outlist.append('%d %s' % (len(l), typ.__name__.lower()))
             sum += len(l)
-        self.out.sep('=', '%d TESTS FINISHED' % sum)
-        self.out.write('%4.2f seconds' % (self.endtime-self.starttime))
-        self.out.line(" (%s)" % ", ".join(outlist))
+        elapsed = self.endtime-self.starttime
+        status = "%s" % ", ".join(outlist)
+        self.out.sep('=', 'tests finished: %s in %4.2f seconds' % 
+                         (status, elapsed))
 
     def getexinfo(self, res):
         _,exc,tb = res.excinfo

Modified: py/dist/py/test/run.py
==============================================================================
--- py/dist/py/test/run.py	(original)
+++ py/dist/py/test/run.py	Thu Oct 28 15:24:17 2004
@@ -1,159 +1,179 @@
 from __future__ import generators
 import py 
-collect = py.test.collect 
-
-class Exit(Exception): 
-    """ for immediate program exits without tracebacks and reporter/summary. """
-    def __init__(self, item=None):
-        self.item = None
-        Exception.__init__(self) 
-
-def exit(*args):
-    raise Exit(*args) 
-
-def skip(msg="unknown reason"): 
-    """ skip with the given Message. """
-    raise py.test.Item.Skipped(msg=msg, tbindex=-2) 
-
-def fail(msg="unknown failure"): 
-    """ fail with the given Message. """
-    raise py.test.Item.Failed(msg=msg, tbindex=-2) 
-
-class Driver:
-    option = py.test.config.option
-    Exit = Exit
-
-    def __init__(self, channel): 
-        self.reporter = py.test.config.getfirst('getreporter') ()
-        self._setupstack = []
-        self._instance = None 
-        self._channel = channel 
-        self._failed = []
-
-    def run(self, collectors):
-        """ main loop for running tests. """
-        self.setup()
-        try:
-            try:
-                self.reporter.start()
-                for x in collectors: 
-                    self.run_collector_or_item(x) 
-            finally:
-                self.teardown()
-        except self.Exit, ex:
-            pass
-        self.reporter.end() 
-        l = []
-        for x in self._failed: 
-            l.append((str(x.extpy.root), str(x.extpy.modpath)))
-        return l 
-
-    def run_collector_or_item(self, obj):
-        """ run (possibly many) testitems and/or collectors. """
-        if self._channel.isclosed():
-            raise SystemExit, "Channel is closed"
-        if isinstance(obj, py.test.Item):
-            if not self.option.collectonly:
-                #if self.option.args:
-                #    if str(obj.path).find(self.option.args[0]) == -1:
-                #        return
-                self.runitem(obj)
-        elif isinstance(obj, py.test.collect.Collector):
-            self.runcollector(obj)
-        elif isinstance(obj, py.test.collect.Collector.Error): 
+from py.__impl__.execnet.channel import ChannelFile, receive2file
+from py.__impl__.test.config import configbasename
+import sys
+
+def waitfilechange(): 
+    """ wait until project files are changed. """ 
+    rootdir = py.test.config.getfirst('rootdir')
+    rootdir = py.path.local(rootdir) 
+    fil = py.path.checker(fnmatch='*.py')
+    rec = py.path.checker(dotfile=0) 
+    statcache = {}
+    for path in rootdir.visit(fil, rec):
+        statcache[path] = path.stat()
+
+    print "waiting for file change below", str(rootdir)
+    while 1: 
+        py.std.time.sleep(0.4)
+        for path, cst in statcache.items(): 
             try:
-                self.reporter.report_collect_error(obj)
-            except (KeyboardInterrupt, SystemExit):
-                raise 
-            except: 
-                print "*" * 80
-                print "Reporter Error"
-                print "*" * 80
-                import traceback
-                traceback.print_exc()
-                raise SystemExit, 1
-            if self.option.exitfirstproblem:
-                raise self.Exit() 
-        else:
-            raise TypeError("%r is not a Item or Collector instance" % obj)
-
-    def runcollector(self, collector):
-        close = self.reporter.open(collector)
-        try:
-            for obj in collector:
-                self.run_collector_or_item(obj)
-        finally:
-            if close:
-                close()
-
-    def runitem(self, item):
-        self.reporter.startitem(item)
-        try:
-            res = item.execute(self) or item.Passed()
-        except item.Outcome, res:
-            res.excinfo = py.std.sys.exc_info()
-        except (KeyboardInterrupt, SystemExit):
-            raise
-        except:
-            res = item.Failed(excinfo=py.std.sys.exc_info())
-        res.item = item
-        self.reporter.enditem(res)
-        if isinstance(res, (item.Failed,)): 
-            self._failed.append(item) 
-            if py.test.config.option.exitfirstproblem:
-                raise self.Exit(res.item) 
-
-    def setup_path(self, extpy):
-        """ setup objects along the path to the test-method 
-            (pointed to by extpy). Tear down any previously 
-            setup objects which are not directly needed. 
-        """ 
-        # setupstack contains (extpy, obj)'s of already setup objects 
-        # strict ordering is maintained, i.e. each extpy in
-        # the stack is "relto" the previous extpy. 
-        stack = self._setupstack 
-        while stack and not extpy.relto(stack[-1][0]): 
-            self._teardownone(stack.pop()[1])
-        rest = extpy.parts()[len(stack):-1]
-        for x in rest: 
-            stack.append((x, self._setupone(x)))
-
+                st = path.stat()
+            except path.NotFound:
+                return 
+            else:
+                if st.st_mtime != cst.st_mtime or \
+                   st.st_size != cst.st_size: 
+                    return 
+
+class FailingCollector(py.test.collect.Collector):
+    def __init__(self, faileditems):
+        self._faileditems = faileditems 
+
+    def __iter__(self):
+        for x in self._faileditems: 
+            yield x
+
+   
+class StdouterrProxy:
+    def __init__(self, gateway):
+        self.gateway = gateway
     def setup(self):
-        """ setup any neccessary resources. """ 
-        self._failed[:] = []
-
+        channel = self.gateway.remote_exec("""
+            import sys
+            out, err = channel.newchannel(), channel.newchannel()
+            channel.send(out) 
+            channel.send(err) 
+            sys.stdout, sys.stderr = out.open('w'), err.open('w') 
+        """)
+        self.stdout = channel.receive() 
+        self.stderr = channel.receive() 
+        channel.waitclose(1.0)
+        py.std.threading.Thread(target=receive2file, 
+                                args=(self.stdout, sys.stdout)).start()
+        py.std.threading.Thread(target=receive2file, 
+                                args=(self.stderr, sys.stderr)).start()
     def teardown(self):
-        """ teardown any resources the driver knows about. """ 
-        while self._setupstack: 
-            self._teardownone(self._setupstack.pop()[1]) 
-                
-    def _setupone(self, extpy):
-        obj = extpy.resolve() 
-        if py.std.inspect.ismodule(obj): 
-            if hasattr(obj, 'setup_module'):
-                obj.setup_module(obj) 
-        elif py.std.inspect.isclass(obj):
-            if hasattr(obj, 'setup_class'):
-                obj.setup_class.im_func(obj) 
-        return obj 
-
-    def _teardownone(self, obj):
-        if py.std.inspect.ismodule(obj): 
-            if hasattr(obj, 'teardown_module'):
-                obj.teardown_module(obj) 
-        elif py.std.inspect.isclass(obj):
-            if hasattr(obj, 'teardown_class'):
-                obj.teardown_class.im_func(obj) 
-
-    def setup_method(self, extpy): 
-        """ return a tuple of (bound method or callable, teardown method). """
-        method = extpy.resolve()
-        if not hasattr(method, 'im_class'):
-            return method, None
-        if self._instance.__class__ != method.im_class: 
-            self._instance = method.im_class() 
-        method = method.__get__(self._instance, method.im_class) 
-        if hasattr(self._instance, 'setup_method'):
-            print "execting setup", self._instance.setup_method
-            self._instance.setup_method(method) 
-        return (method, getattr(self._instance, 'teardown_method', None))
+        self.stdout.close()
+        self.stderr.close()
+
+def waitfinish(channel):
+    try:
+        while 1:
+            try:
+                channel.waitclose(0.1) 
+            except IOError:
+                continue
+            else:
+                failures = channel.receive()
+                return failures
+            break
+    finally:
+        #print "closing down channel and gateway"
+        channel.close()
+        channel.gateway.exit()
+
+def failure_master(args, filenames, failures): 
+    gw = py.execnet.PopenGateway() 
+    outproxy = StdouterrProxy(gw) 
+    outproxy.setup()
+    try:
+        channel = gw.remote_exec("""
+                from py.__impl__.test.run import failure_slave
+                failure_slave(channel)
+        """) 
+        channel.send((args, filenames))
+        channel.send(failures)
+        return waitfinish(channel)
+    finally:
+        outproxy.teardown()
+
+def failure_slave(channel):
+    """ we run this on the other side. """
+    args, filenames = channel.receive()
+    filenames = map(py.path.local, filenames)
+    py.test.config.readconfiguration(*filenames) 
+    py.test.config.parseargs(args)
+
+    failures = channel.receive()
+    col = FailureCollector(failures)
+    driver = py.test.Driver(channel) 
+    failures = driver.run(col)
+    channel.send(failures) 
+
+class FailureCollector(py.test.collect.Collector):
+    def __init__(self, failures):
+        self.failures = failures
+    def __iter__(self):
+        for root,modpath in self.failures:
+            extpy = py.path.extpy(root, modpath) 
+            yield self.Item(extpy) 
+            
+def master(args, filenames): 
+    gw = py.execnet.PopenGateway() 
+    outproxy = StdouterrProxy(gw) 
+    outproxy.setup()
+    try:
+        channel = gw.remote_exec("""
+                from py.__impl__.test.run import slave
+                slave(channel)
+        """) 
+        channel.send((args, filenames))
+        return waitfinish(channel)
+    finally:
+        outproxy.teardown()
+
+def slave(channel):
+    """ we run this on the other side. """
+    args, filenames = channel.receive()
+    filenames = map(py.path.local, filenames)
+    py.test.config.readconfiguration(*filenames) 
+    py.test.config.parseargs(args)
+    fncollectors = list(getcollectors(filenames))
+
+    driver = py.test.Driver(channel) 
+    failures = driver.run(fncollectors)
+    channel.send(failures) 
+
+def getcollectors(filenames):
+    current = py.path.local()
+    for fn in filenames:
+        fullfn = current.join(fn, abs=1)
+        if fullfn.check(file=1):
+            yield py.test.collect.Module(fullfn)
+        elif fullfn.check(dir=1):
+            yield py.test.collect.Directory(fullfn)
+        else:
+            raise IOError, "%r does not exist" % fn 
+        
+def getanchors(args):
+    """ yield "anchors" from skimming the args for existing files/dirs. """
+    current = py.path.local()
+    l = []
+    for arg in args: 
+        anchor = current.join(arg, abs=1) 
+        if anchor.check():
+            l.append(anchor) 
+    if not l:
+        l = [current]
+    return l 
+
+#
+# Testing Modes 
+#
+def inprocess(args, filenames): 
+    """ we run this on the other side. """
+    fncollectors = list(getcollectors(filenames))
+    driver = py.test.Driver(None) 
+    driver.run(fncollectors)
+
+def session(args, filenames):
+    while 1:
+        failures = master(args, filenames)
+        if not failures or not py.test.config.option.session: 
+            break 
+        while failures: 
+            print "session mode: %d failures remaining" % len(failures)
+            waitfilechange() 
+            failures = failure_master(args, filenames, failures)



More information about the pytest-commit mailing list