[py-svn] r62043 - in py/branch/pytestplugin/py: . misc/testing test test/testing

hpk at codespeak.net hpk at codespeak.net
Fri Feb 20 03:47:45 CET 2009


Author: hpk
Date: Fri Feb 20 03:47:43 2009
New Revision: 62043

Removed:
   py/branch/pytestplugin/py/test/testing/plugintester.py
Modified:
   py/branch/pytestplugin/py/_com.py
   py/branch/pytestplugin/py/misc/testing/test_com.py
   py/branch/pytestplugin/py/test/handleplugin.py
   py/branch/pytestplugin/py/test/pycollect.py
   py/branch/pytestplugin/py/test/testing/test_handleplugin.py
Log:
* have the Call object take a list of methods instead of callees plus a method name
* add callevent/subscribe/unsubscribe methods to PluginManager



Modified: py/branch/pytestplugin/py/_com.py
==============================================================================
--- py/branch/pytestplugin/py/_com.py	(original)
+++ py/branch/pytestplugin/py/_com.py	Fri Feb 20 03:47:43 2009
@@ -4,28 +4,20 @@
 class Call:
     NONEASRESULT = object()
 
-    def __init__(self, callees, methname, *args, **kwargs):
-        self.callees = list(callees) 
-        self.methname = methname
+    def __init__(self, methods, *args, **kwargs):
+        self.methods = methods
         self.args = args 
         self.kwargs = kwargs 
         self.results = []
 
-    def listmethods(self):
-        l = []
-        for callee in self.callees:
-            try:
-                l.append(getattr(callee, self.methname))
-            except AttributeError:
-                continue 
-        return l 
-
     def execute(self, firstresult=False):
-        self.methods = self.listmethods()
         while self.methods:
             self.currentmethod = self.methods.pop()
-            varnames = self.currentmethod.im_func.func_code.co_varnames
-            if varnames >1 and varnames[1] == 'call':
+            try:
+                varnames = self.currentmethod.im_func.func_code.co_varnames
+            except AttributeError:
+                varnames = ()
+            if len(varnames) > 1 and varnames[1] == 'call':
                 res = self.currentmethod(self, *self.args, **self.kwargs)
             else:
                 res = self.currentmethod(*self.args, **self.kwargs)
@@ -46,6 +38,7 @@
             bus = EventBus()
         self.bus = bus 
         self.plugins = []
+        self._callbacks = []
 
     def import_module(self, modspec):
         # XXX allow modspec to specify version / lookup 
@@ -75,27 +68,41 @@
         #print "registering", self, plugin
         self.bus.notify(pluginregistered=plugin)
 
-    def iterattr(self, attrname):
-        for plugin in self.plugins:
+    def listattr(self, attrname, plugins=None, extra=()):
+        l = []
+        if plugins is None:
+            plugins = self.plugins
+        if extra:
+            plugins += list(extra)
+        for plugin in plugins:
             try:
-                yield getattr(plugin, attrname)
+                l.append(getattr(plugin, attrname))
             except AttributeError:
                 continue 
+        return l
 
-    #def Call(self, methname, *args, **kwargs):
-    #    """ return call object for executing a plugin call. """
-    #    return Call(self.plugins, methname, args, kwargs)
+    def call_each(self, methname, *args, **kwargs):
+        """ return call object for executing a plugin call. """
+        return Call(self.listattr(methname), *args, **kwargs).execute()
 
     def call_firstresult(self, methname, *args, **kwargs):
         """ return first non-None result of a plugin method. """ 
-        return Call(self.plugins, methname, *args, **kwargs).execute(firstresult=True)
-
-    def call_each(self, methname, *args, **kwargs):
-        """ return call object for executing a plugin call. """
-        return Call(self.plugins, methname, *args, **kwargs).execute()
+        return Call(self.listattr(methname), *args, **kwargs).execute(firstresult=True)
 
     def callplugin(self, plugin, methname, *args, **kwargs):
-        return Call([plugin], methname, *args, **kwargs).execute(firstresult=True)
+        return Call(self.listattr(methname, plugins=[plugin]), 
+                    *args, **kwargs).execute(firstresult=True)
+
+    def callevent(self, eventname, *args, **kwargs):
+        return Call(
+            self.listattr("pyevent_" + eventname) + self.listattr('pyevent') + 
+            self._callbacks, *args, **kwargs).execute()
+
+    def subscribe(self, callback):
+        self._callbacks.append(callback)
+
+    def unsubscribe(self, callback):
+        self._callbacks.remove(callback)
 
 class EventBus(object): 
     """ Event Bus for distributing named and unnamed events. """ 

Modified: py/branch/pytestplugin/py/misc/testing/test_com.py
==============================================================================
--- py/branch/pytestplugin/py/misc/testing/test_com.py	(original)
+++ py/branch/pytestplugin/py/misc/testing/test_com.py	Fri Feb 20 03:47:43 2009
@@ -24,7 +24,7 @@
                
         p1 = P1() 
         p2 = P2() 
-        call = Call([p1, p2], 'm', 23)
+        call = Call([p1.m, p2.m], 23)
         reslist = call.execute()
         assert len(reslist) == 2
         # ensure reversed order 
@@ -34,7 +34,7 @@
         class P1:
             def m(self, x):
                 return x
-        call = Call([P1()], 'm', 23)
+        call = Call([P1().m], 23)
         assert call.execute() == [23]
         assert call.execute(firstresult=True) == 23
  
@@ -108,7 +108,7 @@
         assert pm.call_firstresult('m') is None
         assert pm.call_each('m') == [None, None]
 
-    def test_iterattr(self):
+    def test_listattr(self):
         pm = PluginManager()
         class api1:
             x = 42
@@ -116,10 +116,24 @@
             x = 41
         pm.register(api1())
         pm.register(api2())
-        l = list(pm.iterattr('x'))
+        l = list(pm.listattr('x'))
         l.sort()
         assert l == [41, 42]
 
+    def test_callevent(self):
+        pm = PluginManager()
+        class api1:
+            def pyevent_some(self, x): return x + 1
+        class api2:
+            def pyevent(self, x): return x + 2
+        l = []
+        pm.register(api1())
+        pm.register(api2())
+        pm.subscribe(l.append)
+        res = pm.callevent('some', 0)
+        assert res == [2, 1]
+        assert l == [0]
+
     def test_consider_env(self):
         # XXX write a helper for preserving os.environ 
         pm = PluginManager()

Modified: py/branch/pytestplugin/py/test/handleplugin.py
==============================================================================
--- py/branch/pytestplugin/py/test/handleplugin.py	(original)
+++ py/branch/pytestplugin/py/test/handleplugin.py	Fri Feb 20 03:47:43 2009
@@ -48,7 +48,7 @@
     #
     # 
     def getfirst(self, methname):
-        for x in self.pm.iterattr(methname):
+        for x in self.pm.listattr(methname):
             return x
         
     def call_firstresult(self, *args, **kwargs):
@@ -62,7 +62,7 @@
         # XXX think about sorting/grouping of options from user-perspective 
         #assert self.pm.list
         opts = []
-        for options in self.pm.iterattr("pytest_cmdlineoptions"):
+        for options in self.pm.listattr("pytest_cmdlineoptions"):
             opts.extend(options)
         config.addoptions("ungrouped options added by plugins", *opts) 
 

Modified: py/branch/pytestplugin/py/test/pycollect.py
==============================================================================
--- py/branch/pytestplugin/py/test/pycollect.py	(original)
+++ py/branch/pytestplugin/py/test/pycollect.py	Fri Feb 20 03:47:43 2009
@@ -355,9 +355,10 @@
 
     def fillarg(self, argname, kwargs):
         value = argfinalizer = None
+        pm = self._config.pluginmanager.pm
         call = py._com.Call(
-            self._config.pluginmanager.pm.plugins + [self.parent.obj], 
-            "pytest_pyfunc_arg", pyfuncitem=self, argname=argname
+            pm.listattr("pytest_pyfunc_arg", extra=[self.parent.obj]), 
+            pyfuncitem=self, argname=argname
         )
         res = call.execute(firstresult=True)
         if res:

Deleted: /py/branch/pytestplugin/py/test/testing/plugintester.py
==============================================================================
--- /py/branch/pytestplugin/py/test/testing/plugintester.py	Fri Feb 20 03:47:43 2009
+++ (empty file)
@@ -1,71 +0,0 @@
-import py
-from py.__.test.pmanage import PluginManager
-
-# generic test methods 
-
-def nocalls(impname):
-    pname = impname.split("_")[-1]
-    pm = PluginManager()
-    pm.import_plugin(impname) 
-    plugin = pm.getplugin(pname)
-    assert plugin
-    callhooklist = [
-        ("pytest_configure", "config"), 
-        ("pytest_unconfigure", "config"), 
-        ("pytest_report_teststatus", "event"),
-        ("pytest_terminal_summary", "terminalreporter"),
-        ("pytest_event", "event"), 
-    ]
-    plugin_pytest_methods = dict([item for item in vars(plugin.__class__).items() 
-                                if item[0].startswith("pytest_")])
-    print plugin_pytest_methods
-    for name, argnames in callhooklist:
-        if name in plugin_pytest_methods:
-            func = plugin_pytest_methods.pop(name)
-            for argname in argnames.split(","):
-                assert argname in func.func_code.co_varnames
-    try:
-        value = plugin_pytest_methods.pop("pytest_cmdlineoptions")
-    except KeyError:
-        pass
-    else:
-        assert isinstance(value, list) 
-    assert not plugin_pytest_methods
-
-def setupfs(tmpdir):
-    tmpdir.join("test_x.py").write(py.code.Source("""
-        import py
-        def test_pass():
-            pass
-        def test_fail():
-            assert 0
-        def test_skip():
-            py.test.skip('hello')
-    """))
-
-def functional(impname, *cmdlineopts):
-    from py.__.test.config import Config
-    tmpdir = py.test.ensuretemp("autotest_" + impname)
-    pname = impname.split("_")[-1]
-    olddir = tmpdir.chdir()
-    try:
-        config = Config()
-        setupfs(tmpdir)
-        config.parse([tmpdir] + list(cmdlineopts)) 
-        try:
-            config.pluginmanager.import_plugin(impname)
-        except ValueError:
-            pass # already registered 
-        plugin = config.pluginmanager.getplugin(pname)
-        assert plugin
-        config.pluginmanager.configure(config)
-        session = config.initsession()
-        exitstatus = session.main()
-        config.pluginmanager.unconfigure(config)
-        return tmpdir
-    finally:
-        olddir.chdir()
-
-def plugintester(impname):
-    plugintester_nocalls(impname)
-    plugintester_standard_events(impname)

Modified: py/branch/pytestplugin/py/test/testing/test_handleplugin.py
==============================================================================
--- py/branch/pytestplugin/py/test/testing/test_handleplugin.py	(original)
+++ py/branch/pytestplugin/py/test/testing/test_handleplugin.py	Fri Feb 20 03:47:43 2009
@@ -44,7 +44,7 @@
 
 def test_addcmdlineoptions():
     class PseudoPM:
-        def iterattr(self, name):
+        def listattr(self, name):
             assert name == "pytest_cmdlineoptions"
             return [("xxx", 
              [py.std.optparse.Option("--pseudopm", action="store", dest="x")])]



More information about the pytest-commit mailing list