[py-svn] r51708 - in py/branch/guido-svn-auth/py/path/svn: . testing

guido at codespeak.net guido at codespeak.net
Wed Feb 20 22:28:59 CET 2008


Author: guido
Date: Wed Feb 20 22:28:58 2008
New Revision: 51708

Modified:
   py/branch/guido-svn-auth/py/path/svn/auth.txt
   py/branch/guido-svn-auth/py/path/svn/svncommon.py
   py/branch/guido-svn-auth/py/path/svn/testing/test_auth.py
   py/branch/guido-svn-auth/py/path/svn/urlcommand.py
Log:
Added support for SvnAuth to urlcommand - one can either pass the object to
the constructor, or set the .auth property on py.path.svnurl objects. Made
some relatively small refactorings to SvnCommandPath - it's still a bit messy,
but the popen and cmdexec calls are factored out now to make testing easier.


Modified: py/branch/guido-svn-auth/py/path/svn/auth.txt
==============================================================================
--- py/branch/guido-svn-auth/py/path/svn/auth.txt	(original)
+++ py/branch/guido-svn-auth/py/path/svn/auth.txt	Wed Feb 20 22:28:58 2008
@@ -2,9 +2,9 @@
 ==========================
 
 This document describes authentication support for both py.path.svnwc and
-py.path.svnurl (yet in its implemention phase). This allows using the library
-in a completely automated fashion, without having to provide svn credentials
-interactively.
+py.path.svnurl (still in its implementation phase). This feature allows using the
+library in a completely automated fashion, without having to provide svn
+credentials interactively.
 
 Current implementation
 ----------------------
@@ -12,16 +12,17 @@
 The credentials are passed to the constructor of the path objects, and are used
 (transparently) for every action that accesses the server. Also, when provided,
 they are passed recursively to all child objects created by methods such as
-join(), ensure(), etc.
+join(), ensure(), etc. (XXX currently only true for svnurl, not for svnwc)
 
 To pass credentials to path objects, an SvnAuth class needs to be created to
 hold them. This is then passed to the constructor or methods as the 'auth'
-keyword argument.
+keyword argument. (XXX the latter currently only for svnwc, and preferably
+that should be removed in favour of an .auth attribute like in svnurl)
 
 It is configurable whether the credentials are stored on disk. Storing them is
 useful in certain situations (executive access to the repository will not
 require the credentials to be passed) but might not be desired in others - for
-instance if a webserver runs more than one application, you do not want to
+instance if a webserver runs more than one application, one does not want to
 pollute the webserver's home directory (if it even has one). This behaviour can
 be controlled by passing a False value for the 'cache_auth' argument to
 SvnAuth.
@@ -40,7 +41,8 @@
 
   >>> auth = py.path.SvnAuth('user', 'pass', cache_auth=False,
   ...                        interactive=False)
-  >>> wc = py.path.svnwc(url, auth=auth)
+  >>> wcpath = py.path.svnwc(path, auth=auth)
+  >>> urlpath = py.path.svnurl(url, auth=auth)
 
 Open issues
 -----------
@@ -57,7 +59,7 @@
   Current idea: ignore this and let the client handle (so no passing auth
   around to the children).
 
-* Affected methods
+* Affected methods for svnwc:
 
   - switch
   - checkout
@@ -68,3 +70,8 @@
   - commit
   - log
   - status (for locking, etc.?)
+
+* Affected methods for svnurl:
+
+  not applicable - the auth is either passed to the constructor or set as
+  path.auth, rather than being passed to every method

Modified: py/branch/guido-svn-auth/py/path/svn/svncommon.py
==============================================================================
--- py/branch/guido-svn-auth/py/path/svn/svncommon.py	(original)
+++ py/branch/guido-svn-auth/py/path/svn/svncommon.py	Wed Feb 20 22:28:58 2008
@@ -65,6 +65,7 @@
         """
         obj = object.__new__(self.__class__)
         obj.rev = kw.get('rev', self.rev)
+        obj.auth = kw.get('auth', self.auth)
         dirname, basename, purebasename, ext = self._getbyspec(
              "dirname,basename,purebasename,ext")
         if 'basename' in kw:
@@ -138,7 +139,7 @@
 
         args = tuple([arg.strip(self.sep) for arg in args])
         parts = (self.strpath, ) + args
-        newpath = self.__class__(self.sep.join(parts), self.rev)
+        newpath = self.__class__(self.sep.join(parts), self.rev, self.auth)
         return newpath
 
     def propget(self, name):

Modified: py/branch/guido-svn-auth/py/path/svn/testing/test_auth.py
==============================================================================
--- py/branch/guido-svn-auth/py/path/svn/testing/test_auth.py	(original)
+++ py/branch/guido-svn-auth/py/path/svn/testing/test_auth.py	Wed Feb 20 22:28:58 2008
@@ -30,7 +30,7 @@
     confdir.join('passwd').write(passwddata)
 
 def serve_bg(repopath):
-    pidfile = repopath.join('pid')
+    pidfile = py.path.local(repopath).join('pid')
     port = 10000
     e = None
     while port < 10010:
@@ -122,7 +122,164 @@
         assert wc.commands == [('co', 'url',
                                 '--username="user" --password="pass"')]
 
-class TestSvnAuthFunctional(object):
+class svnurl_no_svn(py.path.svnurl):
+    cmdexec_output = 'test'
+    popen_output = 'test'
+
+    def _cmdexec(self, cmd):
+        self.commands.append(cmd)
+        return self.cmdexec_output
+
+    def _popen(self, cmd):
+        self.commands.append(cmd)
+        return self.popen_output
+
+class TestSvnURLAuth(object):
+    def setup_method(self, meth):
+        svnurl_no_svn.commands = []
+
+    def test_init(self):
+        u = svnurl_no_svn('http://foo.bar/svn')
+        assert u.auth is None
+
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn', auth=auth)
+        assert u.auth is auth
+
+    def test_new(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn/foo', auth=auth)
+        new = u.new(basename='bar')
+        assert new.auth is auth
+        assert new.url == 'http://foo.bar/svn/bar'
+
+    def test_join(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn', auth=auth)
+        new = u.join('foo')
+        assert new.auth is auth
+        assert new.url == 'http://foo.bar/svn/foo'
+
+    def test_listdir(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn', auth=auth)
+        u.cmdexec_output = '''\
+   1717 johnny           1529 Nov 04 14:32 LICENSE.txt
+   1716 johnny           5352 Nov 04 14:28 README.txt
+'''
+        paths = u.listdir()
+        assert paths[0].auth is auth
+        assert paths[1].auth is auth
+        assert paths[0].basename == 'LICENSE.txt'
+
+    def test_info(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn/LICENSE.txt', auth=auth)
+        def dirpath(self):
+            return self
+        u.cmdexec_output = '''\
+   1717 johnny           1529 Nov 04 14:32 LICENSE.txt
+   1716 johnny           5352 Nov 04 14:28 README.txt
+'''
+        org_dp = u.__class__.dirpath
+        u.__class__.dirpath = dirpath
+        try:
+            info = u.info()
+        finally:
+            u.dirpath = org_dp
+        assert info.size == 1529
+
+    def test_open(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn', auth=auth)
+        foo = u.join('foo')
+        foo.check = lambda *args, **kwargs: True
+        ret = foo.open()
+        assert ret == 'test'
+        assert foo.commands[0].endswith('svn cat "http://foo.bar/svn/foo" '
+                                        '--username="foo" --password="bar"')
+
+    def test_dirpath(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn/foo', auth=auth)
+        parent = u.dirpath()
+        assert parent.auth is auth
+
+    def test_mkdir(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn', auth=auth)
+        u.mkdir('foo', msg='created dir foo')
+        assert u.commands[0].endswith('svn mkdir "-m" "created dir foo" '
+                                      '"http://foo.bar/svn/foo" '
+                                      '--username="foo" --password="bar"')
+
+    def test_copy(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn', auth=auth)
+        u2 = svnurl_no_svn('http://foo.bar/svn2')
+        u.copy(u2, 'copied dir')
+        assert u.commands[0].endswith('svn copy -m "copied dir" '
+                                      '"http://foo.bar/svn" '
+                                      '"http://foo.bar/svn2" '
+                                      '--username="foo" --password="bar"')
+
+    def test_rename(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn/foo', auth=auth)
+        u.rename('http://foo.bar/svn/bar', 'moved foo to bar')
+        assert u.commands[0].endswith('svn move -m "moved foo to bar" --force '
+                                      '"http://foo.bar/svn/foo" '
+                                      '"http://foo.bar/svn/bar" '
+                                      '--username="foo" --password="bar"')
+
+    def test_remove(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn/foo', auth=auth)
+        u.remove(msg='removing foo')
+        assert u.commands[0].endswith('svn rm -m "removing foo" '
+                                      '"http://foo.bar/svn/foo" '
+                                      '--username="foo" --password="bar"')
+
+    def test_export(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn', auth=auth)
+        target = py.path.local('/foo')
+        u.export(target)
+        assert u.commands[0].endswith('svn export "http://foo.bar/svn" "/foo" '
+                                      '--username="foo" --password="bar"')
+
+    def test_log(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn/foo', auth=auth)
+        u.popen_output = py.std.StringIO.StringIO('''\
+<?xml version="1.0"?>
+<log>
+<logentry revision="51381">
+<author>guido</author>
+<date>2008-02-11T12:12:18.476481Z</date>
+<msg>Creating branch to work on auth support for py.path.svn*.
+</msg>
+</logentry>
+</log>
+''')
+        u.check = lambda *args, **kwargs: True
+        ret = u.log(10, 20, verbose=True)
+        assert u.commands[0].endswith('svn log --xml -r 10:20 -v '
+                                      '"http://foo.bar/svn/foo" '
+                                      '--username="foo" --password="bar"')
+        assert len(ret) == 1
+        assert int(ret[0].rev) == 51381
+        assert ret[0].author == 'guido'
+
+    def test_propget(self):
+        auth = SvnAuth('foo', 'bar')
+        u = svnurl_no_svn('http://foo.bar/svn', auth=auth)
+        u.propget('foo')
+        assert u.commands[0].endswith('svn propget "foo" '
+                                      '"http://foo.bar/svn" '
+                                      '--username="foo" --password="bar"')
+
+class SvnAuthFunctionalTestBase(object):
     def setup_class(cls):
         if not option.runslowtests:
             py.test.skip('skipping slow functional tests - use --runslowtests '
@@ -136,6 +293,14 @@
         self.temppath = py.test.ensuretemp('TestSvnAuthFunctional.%s' % (
                                            func_name))
 
+    def _start_svnserve(self):
+        make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')})
+        try:
+            return serve_bg(self.repopath.dirpath())
+        except IOError, e:
+            py.test.skip(str(e))
+
+class TestSvnWCAuthFunctional(SvnAuthFunctionalTestBase):
     def test_checkout_constructor_arg(self):
         port, pid = self._start_svnserve()
         try:
@@ -281,9 +446,75 @@
         finally:
             killproc(pid)
 
-    def _start_svnserve(self):
-        make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')})
+class TestSvnURLAuthFunctional(SvnAuthFunctionalTestBase):
+    def test_listdir(self):
+        port, pid = self._start_svnserve()
         try:
-            return serve_bg(self.repopath.dirpath())
-        except IOError, e:
-            py.test.skip(str(e))
+            auth = SvnAuth('johnny', 'foo', cache_auth=False,
+                           interactive=False)
+            u = py.path.svnurl(
+                'svn://localhost:%s/%s' % (port, self.repopath.basename),
+                auth=auth)
+            u.ensure('foo')
+            paths = u.listdir()
+            assert len(paths) == 1
+            assert paths[0].auth is auth
+
+            auth = SvnAuth('foo', 'bar', interactive=False)
+            u = py.path.svnurl(
+                'svn://localhost:%s/%s' % (port, self.repopath.basename),
+                auth=auth)
+            py.test.raises(Exception, 'u.listdir()')
+        finally:
+            killproc(pid)
+
+    def test_copy(self):
+        port, pid = self._start_svnserve()
+        try:
+            auth = SvnAuth('johnny', 'foo', cache_auth=False,
+                           interactive=False)
+            u = py.path.svnurl(
+                'svn://localhost:%s/%s' % (port, self.repopath.basename),
+                auth=auth)
+            foo = u.ensure('foo')
+            bar = u.join('bar')
+            foo.copy(bar)
+            assert bar.check()
+            assert bar.auth is auth
+
+            auth = SvnAuth('foo', 'bar', interactive=False)
+            u = py.path.svnurl(
+                'svn://localhost:%s/%s' % (port, self.repopath.basename),
+                auth=auth)
+            foo = u.join('foo')
+            bar = u.join('bar')
+            py.test.raises(Exception, 'foo.copy(bar)')
+        finally:
+            killproc(pid)
+
+    def test_write_read(self):
+        port, pid = self._start_svnserve()
+        try:
+            auth = SvnAuth('johnny', 'foo', cache_auth=False,
+                           interactive=False)
+            u = py.path.svnurl(
+                'svn://localhost:%s/%s' % (port, self.repopath.basename),
+                auth=auth)
+            foo = u.ensure('foo')
+            fp = foo.open()
+            try:
+                data = fp.read()
+            finally:
+                fp.close()
+            assert data == ''
+
+            auth = SvnAuth('foo', 'bar', interactive=False)
+            u = py.path.svnurl(
+                'svn://localhost:%s/%s' % (port, self.repopath.basename),
+                auth=auth)
+            foo = u.join('foo')
+            py.test.raises(Exception, 'foo.open()')
+        finally:
+            killproc(pid)
+
+    # XXX rinse, repeat... :|

Modified: py/branch/guido-svn-auth/py/path/svn/urlcommand.py
==============================================================================
--- py/branch/guido-svn-auth/py/path/svn/urlcommand.py	(original)
+++ py/branch/guido-svn-auth/py/path/svn/urlcommand.py	Wed Feb 20 22:28:58 2008
@@ -21,10 +21,11 @@
     _lsrevcache = BuildcostAccessCache(maxentries=128)
     _lsnorevcache = AgingCache(maxentries=1000, maxseconds=60.0)
 
-    def __new__(cls, path, rev=None):
+    def __new__(cls, path, rev=None, auth=None):
         self = object.__new__(cls)
         if isinstance(path, cls): 
             rev = path.rev 
+            auth = path.auth
             path = path.strpath 
         proto, uri = path.split("://", 1)
         host, uripath = uri.split('/', 1)
@@ -36,6 +37,7 @@
         path = path.rstrip('/')
         self.strpath = path
         self.rev = rev
+        self.auth = auth
         return self
 
     def __repr__(self):
@@ -44,7 +46,8 @@
         else:
             return 'svnurl(%r, %r)' % (self.strpath, self.rev)
 
-    def _svn(self, cmd, *args):
+    def _svnwithrev(self, cmd, *args):
+        """ execute an svn command, append our own url and revision """
         if self.rev is None:
             return self._svnwrite(cmd, *args)
         else:
@@ -52,16 +55,28 @@
             return self._svnwrite(cmd, *args)
 
     def _svnwrite(self, cmd, *args):
+        """ execute an svn command, append our own url """
         l = ['svn %s' % cmd]
         args = ['"%s"' % self._escape(item) for item in args]
         l.extend(args)
         l.append('"%s"' % self._encodedurl())
         # fixing the locale because we can't otherwise parse
-        string = svncommon.fixlocale() + " ".join(l)
+        string = " ".join(l)
         if DEBUG:
             print "execing", string
+        out = self._svncmdexecauth(string)
+        return out
+
+    def _svncmdexecauth(self, cmd):
+        """ execute an svn command 'as is' """
+        cmd = svncommon.fixlocale() + cmd
+        if self.auth is not None:
+            cmd += ' ' + self.auth.makecmdoptions()
+        return self._cmdexec(cmd)
+
+    def _cmdexec(self, cmd):
         try:
-            out = process.cmdexec(string)
+            out = process.cmdexec(cmd)
         except py.process.cmdexec.Error, e:
             if (e.err.find('File Exists') != -1 or
                             e.err.find('File already exists') != -1):
@@ -69,6 +84,16 @@
             raise
         return out
 
+    def _svnpopenauth(self, cmd):
+        """ execute an svn command, return a pipe for reading its stdout """
+        cmd = svncommon.fixlocale() + cmd
+        if self.auth is not None:
+            cmd += ' ' + self.auth.makecmdoptions()
+        return self._popen(cmd)
+
+    def _popen(self, cmd):
+        return os.popen(cmd)
+
     def _encodedurl(self):
         return self._escape(self.strpath)
 
@@ -76,14 +101,12 @@
         """ return an opened file with the given mode. """
         assert 'w' not in mode and 'a' not in mode, "XXX not implemented for svn cmdline"
         assert self.check(file=1) # svn cat returns an empty file otherwise
-        def popen(cmd):
-            return os.popen(cmd)
         if self.rev is None:
-            return popen(svncommon.fixlocale() +
-                            'svn cat "%s"' % (self._escape(self.strpath), ))
+            return self._svnpopenauth('svn cat "%s"' % (
+                                      self._escape(self.strpath), ))
         else:
-            return popen(svncommon.fixlocale() +
-                            'svn cat -r %s "%s"' % (self.rev, self._escape(self.strpath)))
+            return self._svnpopenauth('svn cat -r %s "%s"' % (
+                                      self.rev, self._escape(self.strpath)))
 
     def dirpath(self, *args, **kwargs):
         """ return the directory path of the current path joined
@@ -104,33 +127,37 @@
         commit_msg=kwargs.get('msg', "mkdir by py lib invocation")
         createpath = self.join(*args)
         createpath._svnwrite('mkdir', '-m', commit_msg)
-        self._lsnorevcache.delentry(createpath.dirpath().strpath)
+        auth = self.auth and self.auth.makecmdoptions() or None
+        self._lsnorevcache.delentry((createpath.dirpath().strpath, auth))
         return createpath
 
     def copy(self, target, msg='copied by py lib invocation'):
         """ copy path to target with checkin message msg."""
         if getattr(target, 'rev', None) is not None:
             raise py.error.EINVAL(target, "revisions are immutable")
-        process.cmdexec('svn copy -m "%s" "%s" "%s"' %(msg, 
-                                                self._escape(self), self._escape(target)))
-        self._lsnorevcache.delentry(target.dirpath().strpath)
+        self._svncmdexecauth('svn copy -m "%s" "%s" "%s"' %(msg,
+                             self._escape(self), self._escape(target)))
+        auth = self.auth and self.auth.makecmdoptions() or None
+        self._lsnorevcache.delentry((target.dirpath().strpath, auth))
 
     def rename(self, target, msg="renamed by py lib invocation"):
         """ rename this path to target with checkin message msg. """
         if getattr(self, 'rev', None) is not None:
             raise py.error.EINVAL(self, "revisions are immutable")
-        py.process.cmdexec('svn move -m "%s" --force "%s" "%s"' %(
-                           msg, self._escape(self), self._escape(target)))
-        self._lsnorevcache.delentry(self.dirpath().strpath)
-        self._lsnorevcache.delentry(self.strpath)
+        self._svncmdexecauth('svn move -m "%s" --force "%s" "%s"' %(
+                             msg, self._escape(self), self._escape(target)))
+        auth = self.auth and self.auth.makecmdoptions() or None
+        self._lsnorevcache.delentry((self.dirpath().strpath, auth))
+        self._lsnorevcache.delentry((self.strpath, auth))
 
     def remove(self, rec=1, msg='removed by py lib invocation'):
         """ remove a file or directory (or a directory tree if rec=1) with
 checkin message msg."""
         if self.rev is not None:
             raise py.error.EINVAL(self, "revisions are immutable")
-        process.cmdexec('svn rm -m "%s" "%s"' %(msg, self._escape(self)))
-        self._lsnorevcache.delentry(self.dirpath().strpath)
+        self._svncmdexecauth('svn rm -m "%s" "%s"' %(msg, self._escape(self)))
+        auth = self.auth and self.auth.makecmdoptions() or None
+        self._lsnorevcache.delentry((self.dirpath().strpath, auth))
 
     def export(self, topath):
         """ export to a local path
@@ -143,7 +170,7 @@
                 '"%s"' % (self._escape(topath),)]
         if self.rev is not None:
             args = ['-r', str(self.rev)] + args
-        process.cmdexec('svn export %s' % (' '.join(args),))
+        self._svncmdexecauth('svn export %s' % (' '.join(args),))
         return topath
 
     def ensure(self, *args, **kwargs):
@@ -173,19 +200,20 @@
                     "ensure %s" % self._escape(tocreate), 
                     self._escape(tempdir.join(basename)), 
                     x.join(basename)._encodedurl())
-            process.cmdexec(cmd) 
-            self._lsnorevcache.delentry(x.strpath)  # !!! 
+            self._svncmdexecauth(cmd) 
+            auth = self.auth and self.auth.makecmdoptions() or None
+            self._lsnorevcache.delentry((x.strpath, auth)) # !!!
         finally:    
             tempdir.remove() 
         return target
 
     # end of modifying methods
     def _propget(self, name):
-        res = self._svn('propget', name)
+        res = self._svnwithrev('propget', name)
         return res[:-1] # strip trailing newline
 
     def _proplist(self):
-        res = self._svn('proplist')
+        res = self._svnwithrev('proplist')
         lines = res.split('\n')
         lines = map(str.strip, lines[1:])
         return svncommon.PropListDict(self, lines)
@@ -194,7 +222,7 @@
         """ return sequence of name-info directory entries of self """
         def builder():
             try:
-                res = self._svn('ls', '-v')
+                res = self._svnwithrev('ls', '-v')
             except process.cmdexec.Error, e:
                 if e.err.find('non-existent in that revision') != -1:
                     raise py.error.ENOENT(self, e.err)
@@ -214,10 +242,13 @@
                     info = InfoSvnCommand(lsline)
                     nameinfo_seq.append((info._name, info))
             return nameinfo_seq
+        auth = self.auth and self.auth.makecmdoptions() or None
         if self.rev is not None:
-            return self._lsrevcache.getorbuild((self.strpath, self.rev), builder)
+            return self._lsrevcache.getorbuild((self.strpath, self.rev, auth),
+                                               builder)
         else:
-            return self._lsnorevcache.getorbuild(self.strpath, builder)
+            return self._lsnorevcache.getorbuild((self.strpath, auth),
+                                                 builder)
 
     def log(self, rev_start=None, rev_end=1, verbose=False):
         """ return a list of LogEntry instances for this path.
@@ -234,9 +265,8 @@
         else:
             rev_opt = "-r %s:%s" % (rev_start, rev_end)
         verbose_opt = verbose and "-v" or ""
-        xmlpipe =  os.popen(svncommon.fixlocale() +
-                      'svn log --xml %s %s "%s"' %
-                      (rev_opt, verbose_opt, self.strpath))
+        xmlpipe =  self._svnpopenauth('svn log --xml %s %s "%s"' %
+                                      (rev_opt, verbose_opt, self.strpath))
         from xml.dom import minidom
         tree = minidom.parse(xmlpipe)
         result = []



More information about the pytest-commit mailing list