[py-svn] py-trunk commit 687b208efc07: * refactor some setup/teardown/ensuretemp usages to use funcargs

commits-noreply at bitbucket.org commits-noreply at bitbucket.org
Sun Sep 6 13:39:32 CEST 2009


# HG changeset patch -- Bitbucket.org
# Project py-trunk
# URL http://bitbucket.org/hpk42/py-trunk/overview/
# User holger krekel <holger at merlinux.eu>
# Date 1252185718 -7200
# Node ID 687b208efc077843d12f69a9bdcce6c1813facc1
# Parent 8f764e29e5e9f13d1a3accef0e39f522b03da22a
* refactor some setup/teardown/ensuretemp usages to use funcargs
* introduce monkeypatch.syspath_prepend for safe monkey patching of module import path
* fix monkeypatch naming

--- a/py/test/plugin/test_pytest_capture.py
+++ b/py/test/plugin/test_pytest_capture.py
@@ -10,7 +10,7 @@ class TestCaptureManager:
         try:
             assert capman._getmethod(config, None) == "sys" 
         finally:
-            monkeypatch.finalize()
+            monkeypatch.undo()
 
     def test_configure_per_fspath(self, testdir):
         config = testdir.parseconfig(testdir.tmpdir)

--- a/py/execnet/testing/test_rsync.py
+++ b/py/execnet/testing/test_rsync.py
@@ -2,38 +2,37 @@ import py
 from py.execnet import RSync
 
 
-def setup_module(mod):
-    mod.gw = py.execnet.PopenGateway()
-    mod.gw2 = py.execnet.PopenGateway()
+def pytest_funcarg__gw1(request):
+    return request.cached_setup(
+        setup=py.execnet.PopenGateway,
+        teardown=lambda val: val.exit(),
+        scope="module"
+    )
+pytest_funcarg__gw2 = pytest_funcarg__gw1
 
-def teardown_module(mod):
-    mod.gw.exit()
-    mod.gw2.exit()
+def pytest_funcarg__dirs(request):
+    t = request.getfuncargvalue('tmpdir')
+    class dirs:
+        source = t.join("source")
+        dest1 = t.join("dest1")
+        dest2 = t.join("dest2")
+    return dirs
 
-
-class DirSetup:
-    def setup_method(self, method):
-        name = "%s.%s" %(self.__class__.__name__, method.__name__)
-        self.tmpdir = t = py.test.ensuretemp(name)
-        self.source = t.join("source")
-        self.dest1 = t.join("dest1")
-        self.dest2 = t.join("dest2")
-
-class TestRSync(DirSetup):
-    def test_notargets(self):
-        rsync = RSync(self.source)
+class TestRSync:
+    def test_notargets(self, dirs):
+        rsync = RSync(dirs.source)
         py.test.raises(IOError, "rsync.send()")
         assert rsync.send(raises=False) is None
 
-    def test_dirsync(self):
-        dest = self.dest1
-        dest2 = self.dest2
-        source = self.source
+    def test_dirsync(self, dirs, gw1, gw2):
+        dest = dirs.dest1
+        dest2 = dirs.dest2
+        source = dirs.source
 
         for s in ('content1', 'content2', 'content2-a-bit-longer'): 
             source.ensure('subdir', 'file1').write(s) 
-            rsync = RSync(self.source)
-            rsync.add_target(gw, dest)
+            rsync = RSync(dirs.source)
+            rsync.add_target(gw1, dest)
             rsync.add_target(gw2, dest2)
             rsync.send()
             assert dest.join('subdir').check(dir=1)
@@ -49,76 +48,70 @@ class TestRSync(DirSetup):
         source.join('subdir').remove('file1')
         rsync = RSync(source)
         rsync.add_target(gw2, dest2)
-        rsync.add_target(gw, dest)
+        rsync.add_target(gw1, dest)
         rsync.send()
         assert dest.join('subdir', 'file1').check(file=1)
         assert dest2.join('subdir', 'file1').check(file=1)
         rsync = RSync(source)
-        rsync.add_target(gw, dest, delete=True)
+        rsync.add_target(gw1, dest, delete=True)
         rsync.add_target(gw2, dest2)
         rsync.send()
         assert not dest.join('subdir', 'file1').check() 
         assert dest2.join('subdir', 'file1').check() 
 
-    def test_dirsync_twice(self):
-        source = self.source
+    def test_dirsync_twice(self, dirs, gw1, gw2):
+        source = dirs.source
         source.ensure("hello")
         rsync = RSync(source)
-        rsync.add_target(gw, self.dest1)
+        rsync.add_target(gw1, dirs.dest1)
         rsync.send()
-        assert self.dest1.join('hello').check()
+        assert dirs.dest1.join('hello').check()
         py.test.raises(IOError, "rsync.send()")
         assert rsync.send(raises=False) is None
-        rsync.add_target(gw, self.dest2)
+        rsync.add_target(gw1, dirs.dest2)
         rsync.send()
-        assert self.dest2.join('hello').check()
+        assert dirs.dest2.join('hello').check()
         py.test.raises(IOError, "rsync.send()")
         assert rsync.send(raises=False) is None
 
-    def test_rsync_default_reporting(self):
-        source = self.source
+    def test_rsync_default_reporting(self, capsys, dirs, gw1):
+        source = dirs.source
         source.ensure("hello")
-        cap = py.io.StdCapture()
-        try:
-            rsync = RSync(source)
-            rsync.add_target(gw, self.dest1)
-            rsync.send()
-        finally:
-            out, err = cap.reset()
+        rsync = RSync(source)
+        rsync.add_target(gw1, dirs.dest1)
+        rsync.send()
+        out, err = capsys.readouterr()
         assert out.find("hello") != -1
 
-    def test_rsync_non_verbose(self):
-        source = self.source
+    def test_rsync_non_verbose(self, capsys, dirs, gw1):
+        source = dirs.source
         source.ensure("hello")
-        cap = py.io.StdCapture()
-        try:
-            rsync = RSync(source, verbose=False)
-            rsync.add_target(gw, self.dest1)
-            rsync.send()
-        finally:
-            out, err = cap.reset()
+        rsync = RSync(source, verbose=False)
+        rsync.add_target(gw1, dirs.dest1)
+        rsync.send()
+        out, err = capsys.readouterr()
         assert not out
         assert not err
 
-    def test_symlink_rsync(self):
+    def test_symlink_rsync(self, dirs, gw1):
         if py.std.sys.platform == 'win32':
             py.test.skip("symlinks are unsupported on Windows.")
-        source = self.source
-        dest = self.dest1
-        self.source.ensure("existant")
+        source = dirs.source
+        dest = dirs.dest1
+        dirs.source.ensure("existant")
         source.join("rellink").mksymlinkto(source.join("existant"), absolute=0)
         source.join('abslink').mksymlinkto(source.join("existant"))
         
         rsync = RSync(source)
-        rsync.add_target(gw, dest)
+        rsync.add_target(gw1, dest)
         rsync.send()
         
         assert dest.join('rellink').readlink() == dest.join("existant")
         assert dest.join('abslink').readlink() == dest.join("existant")
 
-    def test_callback(self):
-        dest = self.dest1
-        source = self.source
+    def test_callback(self, dirs, gw1):
+        dest = dirs.dest1
+        source = dirs.source
         source.ensure("existant").write("a" * 100)
         source.ensure("existant2").write("a" * 10)
         total = {}
@@ -127,14 +120,14 @@ class TestRSync(DirSetup):
 
         rsync = RSync(source, callback=callback)
         #rsync = RSync()
-        rsync.add_target(gw, dest)
+        rsync.add_target(gw1, dest)
         rsync.send()
 
         assert total == {("list", 110):True, ("ack", 100):True, ("ack", 10):True}
 
-    def test_file_disappearing(self):
-        dest = self.dest1
-        source = self.source 
+    def test_file_disappearing(self, dirs, gw1):
+        dest = dirs.dest1
+        source = dirs.source 
         source.ensure("ex").write("a" * 100)
         source.ensure("ex2").write("a" * 100)
 
@@ -147,7 +140,7 @@ class TestRSync(DirSetup):
                 return True
 
         rsync = DRsync(source)
-        rsync.add_target(gw, dest)
+        rsync.add_target(gw1, dest)
         rsync.send()
         assert rsync.x == 1
         assert len(dest.listdir()) == 1

--- a/py/path/testing/test_local.py
+++ b/py/path/testing/test_local.py
@@ -325,9 +325,8 @@ class TestImport:
         from xxxpackage import module1 
         assert module1 is mod1
 
-def test_pypkgdir():
-    datadir = py.test.ensuretemp("pypkgdir")
-    pkg = datadir.ensure('pkg1', dir=1)
+def test_pypkgdir(tmpdir):
+    pkg = tmpdir.ensure('pkg1', dir=1)
     pkg.ensure("__init__.py")
     pkg.ensure("subdir/__init__.py")
     assert pkg.pypkgpath() == pkg

--- a/py/execnet/testing/test_gwmanage.py
+++ b/py/execnet/testing/test_gwmanage.py
@@ -6,6 +6,7 @@
 """
 
 import py
+import os
 from py.__.execnet.gwmanage import GatewayManager, HostRSync
 
 class TestGatewayManagerPopen:
@@ -75,7 +76,6 @@ class TestGatewayManagerPopen:
         call = rec.popcall("pyexecnet_gwmanage_rsyncfinish") 
 
     def test_multi_chdir_popen_with_path(self, testdir):
-        import os
         hm = GatewayManager(["popen//chdir=hello"] * 2)
         testdir.tmpdir.chdir()
         hellopath = testdir.tmpdir.mkdir("hello").realpath()
@@ -117,7 +117,7 @@ class TestGatewayManagerPopen:
 
 class pytest_funcarg__mysetup:
     def __init__(self, request):
-        tmp = request.config.mktemp(request.function.__name__, numbered=True)
+        tmp = request.getfuncargvalue('tmpdir')
         self.source = tmp.mkdir("source")
         self.dest = tmp.mkdir("dest")
         request.getfuncargvalue("_pytest") # to have patching of py._com.comregistry

--- a/py/execnet/testing/test_gateway.py
+++ b/py/execnet/testing/test_gateway.py
@@ -689,8 +689,7 @@ class TestSshGateway(BasicRemoteExecutio
         cls.sshhost = getspecssh().ssh
         cls.gw = py.execnet.SshGateway(cls.sshhost)
 
-    def test_sshconfig_functional(self):
-        tmpdir = py.test.ensuretemp("test_sshconfig")
+    def test_sshconfig_functional(self, tmpdir):
         ssh_config = tmpdir.join("ssh_config") 
         ssh_config.write(
             "Host alias123\n"

--- a/py/test/plugin/pytest_monkeypatch.py
+++ b/py/test/plugin/pytest_monkeypatch.py
@@ -48,7 +48,7 @@ object, however.
 .. _`monkeypatch blog post`: http://tetamap.wordpress.com/2009/03/03/monkeypatching-in-unit-tests-done-right/
 """
 
-import py, os
+import py, os, sys
 
 def pytest_funcarg__monkeypatch(request):
     """The returned ``monkeypatch`` funcarg provides these 
@@ -60,6 +60,7 @@ def pytest_funcarg__monkeypatch(request)
         monkeypatch.delitem(obj, name, raising=True)
         monkeypatch.setenv(name, value, prepend=False) 
         monkeypatch.delenv(name, value, raising=True)
+        monkeypatch.syspath_prepend(path)
 
     All modifications will be undone when the requesting 
     test function finished its execution.  For the ``del`` 
@@ -111,6 +112,11 @@ class MonkeyPatch:
     def delenv(self, name, raising=True):
         self.delitem(os.environ, name, raising=raising)
 
+    def syspath_prepend(self, path):
+        if not hasattr(self, '_savesyspath'):
+            self._savesyspath = sys.path[:]
+        sys.path.insert(0, str(path))
+
     def undo(self):
         for obj, name, value in self._setattr:
             if value is not notset:
@@ -124,7 +130,8 @@ class MonkeyPatch:
             else:
                 dictionary[name] = value
         self._setitem[:] = []
-
+        if hasattr(self, '_savesyspath'):
+            sys.path[:] = self._savesyspath
 
 def test_setattr():
     class A:
@@ -242,4 +249,20 @@ def test_monkeypatch_plugin(testdir):
     """)
     res = reprec.countoutcomes()
     assert tuple(res) == (1, 0, 0), res
-        
+
+def test_syspath_prepend():
+    old = list(sys.path)
+    try:
+        monkeypatch = MonkeyPatch()
+        monkeypatch.syspath_prepend('world')
+        monkeypatch.syspath_prepend('hello')
+        assert sys.path[0] == "hello"
+        assert sys.path[1] == "world"
+        monkeypatch.undo()
+        assert sys.path == old 
+        monkeypatch.undo()
+        assert sys.path == old 
+    finally:
+        sys.path[:] = old
+
+            

--- a/py/_testing/test_initpkg.py
+++ b/py/_testing/test_initpkg.py
@@ -48,7 +48,7 @@ def test_virtual_module_identity():
 
 def test_importall():
     base = py.path.local(py.__file__).dirpath()
-    nodirs = (
+    nodirs = [
         base.join('test', 'testing', 'data'),
         base.join('test', 'web'),
         base.join('path', 'gateway',),
@@ -57,10 +57,14 @@ def test_importall():
         base.join('test', 'testing', 'import_test'),
         base.join('bin'),
         base.join('code', 'oldmagic.py'),
-        base.join('code', '_assertionold.py'),
         base.join('execnet', 'script'),
         base.join('compat', 'testing'),
-    )
+    ]
+    if sys.version_info >= (3,0):
+        nodirs.append(base.join('code', '_assertionold.py'))
+    else:
+        nodirs.append(base.join('code', '_assertionnew.py'))
+        
     def recurse(p):
         return p.check(dotfile=0) and p.basename != "attic"
 

--- a/py/path/testing/conftest.py
+++ b/py/path/testing/conftest.py
@@ -10,8 +10,9 @@ def pytest_funcarg__repowc1(request):
         py.test.skip("svn binary not found")
 
     modname = request.module.__name__
+    tmpdir = request.getfuncargvalue("tmpdir")
     repo, wc = request.cached_setup(
-        setup=lambda: getrepowc("repo-"+modname, "wc-" + modname), 
+        setup=lambda: getrepowc(tmpdir, "repo-"+modname, "wc-" + modname), 
         scope="module", 
     )
     for x in ('test_remove', 'test_move', 'test_status_deleted'):
@@ -21,8 +22,9 @@ def pytest_funcarg__repowc1(request):
     return repo, wc
 
 def pytest_funcarg__repowc2(request):
+    tmpdir = request.getfuncargvalue("tmpdir")
     name = request.function.__name__
-    return getrepowc("%s-repo-2" % name, "%s-wc-2" % name)
+    return getrepowc(tmpdir, "%s-repo-2" % name, "%s-wc-2" % name)
 
 def getsvnbin():
     if svnbin is None:
@@ -32,9 +34,9 @@ def getsvnbin():
 # make a wc directory out of a given root url
 # cache previously obtained wcs!
 #
-def getrepowc(reponame='basetestrepo', wcname='wc'):
-    repo = py.test.ensuretemp(reponame)
-    wcdir = py.test.ensuretemp(wcname)
+def getrepowc(tmpdir, reponame='basetestrepo', wcname='wc'):
+    repo = tmpdir.mkdir(reponame)
+    wcdir = tmpdir.mkdir(wcname)
     repo.ensure(dir=1)
     py.process.cmdexec('svnadmin create "%s"' %
             svncommon._escape_helper(repo))



More information about the pytest-commit mailing list