[py-svn] r53735 - in py/branch/code-coverage/py: . apigen/testing doc misc/testing path/svn path/svn/testing test
nilton at codespeak.net
nilton at codespeak.net
Sun Apr 13 04:48:14 CEST 2008
Author: nilton
Date: Sun Apr 13 04:48:13 2008
New Revision: 53735
Added:
py/branch/code-coverage/py/path/svn/testing/test_auth.py
- copied unchanged from r53733, py/trunk/py/path/svn/testing/test_auth.py
Modified:
py/branch/code-coverage/py/__init__.py
py/branch/code-coverage/py/apigen/testing/test_apigen_functional.py
py/branch/code-coverage/py/conftest.py
py/branch/code-coverage/py/doc/path.txt
py/branch/code-coverage/py/misc/testing/test_oskill.py
py/branch/code-coverage/py/path/svn/svncommon.py
py/branch/code-coverage/py/path/svn/testing/test_wccommand.py
py/branch/code-coverage/py/path/svn/urlcommand.py
py/branch/code-coverage/py/path/svn/wccommand.py
py/branch/code-coverage/py/test/reporter.py
Log:
Merged py changes r51711:53733 from trunk into py-coverage branch
Modified: py/branch/code-coverage/py/__init__.py
==============================================================================
--- py/branch/code-coverage/py/__init__.py (original)
+++ py/branch/code-coverage/py/__init__.py Sun Apr 13 04:48:13 2008
@@ -67,6 +67,7 @@
'path.svnwc' : ('./path/svn/wccommand.py', 'SvnWCCommandPath'),
'path.svnurl' : ('./path/svn/urlcommand.py', 'SvnCommandPath'),
'path.local' : ('./path/local/local.py', 'LocalPath'),
+ 'path.SvnAuth' : ('./path/svn/svncommon.py', 'SvnAuth'),
# some nice slightly magic APIs
'magic.__doc__' : ('./magic/__init__.py', '__doc__'),
Modified: py/branch/code-coverage/py/apigen/testing/test_apigen_functional.py
==============================================================================
--- py/branch/code-coverage/py/apigen/testing/test_apigen_functional.py (original)
+++ py/branch/code-coverage/py/apigen/testing/test_apigen_functional.py Sun Apr 13 04:48:13 2008
@@ -117,7 +117,9 @@
pkgname, documentable = apigen.get_documentable_items_pkgdir(
fs_root.join(package_name))
assert pkgname == 'pak'
- assert sorted(documentable.keys()) == [
+ keys = documentable.keys()
+ keys.sort()
+ assert keys == [
'main.SomeTestClass', 'main.SomeTestSubClass', 'main.func',
'main.sub.func', 'somenamespace.baz', 'somenamespace.foo']
Modified: py/branch/code-coverage/py/conftest.py
==============================================================================
--- py/branch/code-coverage/py/conftest.py (original)
+++ py/branch/code-coverage/py/conftest.py Sun Apr 13 04:48:13 2008
@@ -33,6 +33,9 @@
action='store', dest='docpath',
default="doc", type='string',
help="relative path to doc output location (relative from py/)"),
+ Option('', '--runslowtests',
+ action="store_true", dest="runslowtests", default=False,
+ help="run slow tests)"),
)
dist_rsync_roots = ['.']
Modified: py/branch/code-coverage/py/doc/path.txt
==============================================================================
--- py/branch/code-coverage/py/doc/path.txt (original)
+++ py/branch/code-coverage/py/doc/path.txt Sun Apr 13 04:48:13 2008
@@ -187,6 +187,23 @@
>>> len(wc.status().prop_modified)
0
+SVN authentication
+++++++++++++++++++++++
+
+Some uncommon functionality can also be provided as extensions, such as SVN
+authentication::
+
+ >>> auth = py.path.SvnAuth('anonymous', 'user', cache_auth=False,
+ ... interactive=False)
+ >>> wc.auth = auth
+ >>> wc.update() # this should work
+ >>> path = wc.ensure('thisshouldnotexist.txt')
+ >>> try:
+ ... path.commit('testing')
+ ... except py.process.cmdexec.Error, e:
+ ... pass
+ >>> 'authorization failed' in str(e)
+ True
Known problems / limitations
===================================
Modified: py/branch/code-coverage/py/misc/testing/test_oskill.py
==============================================================================
--- py/branch/code-coverage/py/misc/testing/test_oskill.py (original)
+++ py/branch/code-coverage/py/misc/testing/test_oskill.py Sun Apr 13 04:48:13 2008
@@ -4,6 +4,10 @@
from py.__.misc.killproc import killproc
def test_win_killsubprocess():
+ if sys.platform == 'win32' and not py.path.local.sysfind('taskkill'):
+ py.test.skip("you\'re using an older version of windows, which "
+ "doesn\'t support 'taskkill' - py.misc.killproc is not "
+ "available")
tmp = py.test.ensuretemp("test_win_killsubprocess")
t = tmp.join("t.py")
t.write("import time ; time.sleep(100)")
Modified: py/branch/code-coverage/py/path/svn/svncommon.py
==============================================================================
--- py/branch/code-coverage/py/path/svn/svncommon.py (original)
+++ py/branch/code-coverage/py/path/svn/svncommon.py Sun Apr 13 04:48:13 2008
@@ -65,6 +65,7 @@
"""
obj = object.__new__(self.__class__)
obj.rev = kw.get('rev', self.rev)
+ obj.auth = kw.get('auth', self.auth)
dirname, basename, purebasename, ext = self._getbyspec(
"dirname,basename,purebasename,ext")
if 'basename' in kw:
@@ -138,7 +139,7 @@
args = tuple([arg.strip(self.sep) for arg in args])
parts = (self.strpath, ) + args
- newpath = self.__class__(self.sep.join(parts), self.rev)
+ newpath = self.__class__(self.sep.join(parts), self.rev, self.auth)
return newpath
def propget(self, name):
@@ -330,3 +331,27 @@
fspath = '%s at HEAD' % (fspath,)
return 'file://%s' % (fspath,)
+class SvnAuth(object):
+ """ container for auth information for Subversion """
+ def __init__(self, username, password, cache_auth=True, interactive=True):
+ self.username = username
+ self.password = password
+ self.cache_auth = cache_auth
+ self.interactive = interactive
+
+ def makecmdoptions(self):
+ uname = self.username.replace('"', '\\"')
+ passwd = self.password.replace('"', '\\"')
+ ret = []
+ if uname:
+ ret.append('--username="%s"' % (uname,))
+ if passwd:
+ ret.append('--password="%s"' % (passwd,))
+ if not self.cache_auth:
+ ret.append('--no-auth-cache')
+ if not self.interactive:
+ ret.append('--non-interactive')
+ return ' '.join(ret)
+
+ def __str__(self):
+ return "<SvnAuth username=%s ...>" %(self.username,)
Modified: py/branch/code-coverage/py/path/svn/testing/test_wccommand.py
==============================================================================
--- py/branch/code-coverage/py/path/svn/testing/test_wccommand.py (original)
+++ py/branch/code-coverage/py/path/svn/testing/test_wccommand.py Sun Apr 13 04:48:13 2008
@@ -1,13 +1,27 @@
import py
+import sys
from py.__.path.svn.testing.svntestbase import CommonSvnTests, getrepowc
from py.__.path.svn.wccommand import InfoSvnWCCommand
from py.__.path.svn.wccommand import parse_wcinfotime
from py.__.path.svn import svncommon
-
if py.path.local.sysfind('svn') is None:
py.test.skip("cannot test py.path.svn, 'svn' binary not found")
+if sys.platform != 'win32':
+ def normpath(p):
+ return p
+else:
+ try:
+ import win32api
+ except ImportError:
+ def normpath(p):
+ py.test.skip('this test requires win32api to run on windows')
+ else:
+ import os
+ def normpath(p):
+ p = win32api.GetShortPathName(p)
+ return os.path.normpath(os.path.normcase(p))
class TestWCSvnCommandPath(CommonSvnTests):
@@ -253,7 +267,7 @@
try:
locked = root.status().locked
assert len(locked) == 1
- assert str(locked[0]) == str(somefile)
+ assert normpath(str(locked[0])) == normpath(str(somefile))
#assert somefile.locked()
py.test.raises(Exception, 'somefile.lock()')
finally:
Modified: py/branch/code-coverage/py/path/svn/urlcommand.py
==============================================================================
--- py/branch/code-coverage/py/path/svn/urlcommand.py (original)
+++ py/branch/code-coverage/py/path/svn/urlcommand.py Sun Apr 13 04:48:13 2008
@@ -21,10 +21,11 @@
_lsrevcache = BuildcostAccessCache(maxentries=128)
_lsnorevcache = AgingCache(maxentries=1000, maxseconds=60.0)
- def __new__(cls, path, rev=None):
+ def __new__(cls, path, rev=None, auth=None):
self = object.__new__(cls)
if isinstance(path, cls):
rev = path.rev
+ auth = path.auth
path = path.strpath
proto, uri = path.split("://", 1)
host, uripath = uri.split('/', 1)
@@ -36,6 +37,7 @@
path = path.rstrip('/')
self.strpath = path
self.rev = rev
+ self.auth = auth
return self
def __repr__(self):
@@ -44,7 +46,8 @@
else:
return 'svnurl(%r, %r)' % (self.strpath, self.rev)
- def _svn(self, cmd, *args):
+ def _svnwithrev(self, cmd, *args):
+ """ execute an svn command, append our own url and revision """
if self.rev is None:
return self._svnwrite(cmd, *args)
else:
@@ -52,16 +55,28 @@
return self._svnwrite(cmd, *args)
def _svnwrite(self, cmd, *args):
+ """ execute an svn command, append our own url """
l = ['svn %s' % cmd]
args = ['"%s"' % self._escape(item) for item in args]
l.extend(args)
l.append('"%s"' % self._encodedurl())
# fixing the locale because we can't otherwise parse
- string = svncommon.fixlocale() + " ".join(l)
+ string = " ".join(l)
if DEBUG:
print "execing", string
+ out = self._svncmdexecauth(string)
+ return out
+
+ def _svncmdexecauth(self, cmd):
+ """ execute an svn command 'as is' """
+ cmd = svncommon.fixlocale() + cmd
+ if self.auth is not None:
+ cmd += ' ' + self.auth.makecmdoptions()
+ return self._cmdexec(cmd)
+
+ def _cmdexec(self, cmd):
try:
- out = process.cmdexec(string)
+ out = process.cmdexec(cmd)
except py.process.cmdexec.Error, e:
if (e.err.find('File Exists') != -1 or
e.err.find('File already exists') != -1):
@@ -69,21 +84,33 @@
raise
return out
+ def _svnpopenauth(self, cmd):
+ """ execute an svn command, return a pipe for reading stdin """
+ cmd = svncommon.fixlocale() + cmd
+ if self.auth is not None:
+ cmd += ' ' + self.auth.makecmdoptions()
+ return self._popen(cmd)
+
+ def _popen(self, cmd):
+ return os.popen(cmd)
+
def _encodedurl(self):
return self._escape(self.strpath)
+ def _norev_delentry(self, path):
+ auth = self.auth and self.auth.makecmdoptions() or None
+ self._lsnorevcache.delentry((str(path), auth))
+
def open(self, mode='r'):
""" return an opened file with the given mode. """
assert 'w' not in mode and 'a' not in mode, "XXX not implemented for svn cmdline"
assert self.check(file=1) # svn cat returns an empty file otherwise
- def popen(cmd):
- return os.popen(cmd)
if self.rev is None:
- return popen(svncommon.fixlocale() +
- 'svn cat "%s"' % (self._escape(self.strpath), ))
+ return self._svnpopenauth('svn cat "%s"' % (
+ self._escape(self.strpath), ))
else:
- return popen(svncommon.fixlocale() +
- 'svn cat -r %s "%s"' % (self.rev, self._escape(self.strpath)))
+ return self._svnpopenauth('svn cat -r %s "%s"' % (
+ self.rev, self._escape(self.strpath)))
def dirpath(self, *args, **kwargs):
""" return the directory path of the current path joined
@@ -104,33 +131,33 @@
commit_msg=kwargs.get('msg', "mkdir by py lib invocation")
createpath = self.join(*args)
createpath._svnwrite('mkdir', '-m', commit_msg)
- self._lsnorevcache.delentry(createpath.dirpath().strpath)
+ self._norev_delentry(createpath.dirpath())
return createpath
def copy(self, target, msg='copied by py lib invocation'):
""" copy path to target with checkin message msg."""
if getattr(target, 'rev', None) is not None:
raise py.error.EINVAL(target, "revisions are immutable")
- process.cmdexec('svn copy -m "%s" "%s" "%s"' %(msg,
- self._escape(self), self._escape(target)))
- self._lsnorevcache.delentry(target.dirpath().strpath)
+ self._svncmdexecauth('svn copy -m "%s" "%s" "%s"' %(msg,
+ self._escape(self), self._escape(target)))
+ self._norev_delentry(target.dirpath())
def rename(self, target, msg="renamed by py lib invocation"):
""" rename this path to target with checkin message msg. """
if getattr(self, 'rev', None) is not None:
raise py.error.EINVAL(self, "revisions are immutable")
- py.process.cmdexec('svn move -m "%s" --force "%s" "%s"' %(
- msg, self._escape(self), self._escape(target)))
- self._lsnorevcache.delentry(self.dirpath().strpath)
- self._lsnorevcache.delentry(self.strpath)
+ self._svncmdexecauth('svn move -m "%s" --force "%s" "%s"' %(
+ msg, self._escape(self), self._escape(target)))
+ self._norev_delentry(self.dirpath())
+ self._norev_delentry(self)
def remove(self, rec=1, msg='removed by py lib invocation'):
""" remove a file or directory (or a directory tree if rec=1) with
checkin message msg."""
if self.rev is not None:
raise py.error.EINVAL(self, "revisions are immutable")
- process.cmdexec('svn rm -m "%s" "%s"' %(msg, self._escape(self)))
- self._lsnorevcache.delentry(self.dirpath().strpath)
+ self._svncmdexecauth('svn rm -m "%s" "%s"' %(msg, self._escape(self)))
+ self._norev_delentry(self.dirpath())
def export(self, topath):
""" export to a local path
@@ -143,7 +170,7 @@
'"%s"' % (self._escape(topath),)]
if self.rev is not None:
args = ['-r', str(self.rev)] + args
- process.cmdexec('svn export %s' % (' '.join(args),))
+ self._svncmdexecauth('svn export %s' % (' '.join(args),))
return topath
def ensure(self, *args, **kwargs):
@@ -173,19 +200,19 @@
"ensure %s" % self._escape(tocreate),
self._escape(tempdir.join(basename)),
x.join(basename)._encodedurl())
- process.cmdexec(cmd)
- self._lsnorevcache.delentry(x.strpath) # !!!
+ self._svncmdexecauth(cmd)
+ self._norev_delentry(x)
finally:
tempdir.remove()
return target
# end of modifying methods
def _propget(self, name):
- res = self._svn('propget', name)
+ res = self._svnwithrev('propget', name)
return res[:-1] # strip trailing newline
def _proplist(self):
- res = self._svn('proplist')
+ res = self._svnwithrev('proplist')
lines = res.split('\n')
lines = map(str.strip, lines[1:])
return svncommon.PropListDict(self, lines)
@@ -194,7 +221,7 @@
""" return sequence of name-info directory entries of self """
def builder():
try:
- res = self._svn('ls', '-v')
+ res = self._svnwithrev('ls', '-v')
except process.cmdexec.Error, e:
if e.err.find('non-existent in that revision') != -1:
raise py.error.ENOENT(self, e.err)
@@ -214,10 +241,13 @@
info = InfoSvnCommand(lsline)
nameinfo_seq.append((info._name, info))
return nameinfo_seq
+ auth = self.auth and self.auth.makecmdoptions() or None
if self.rev is not None:
- return self._lsrevcache.getorbuild((self.strpath, self.rev), builder)
+ return self._lsrevcache.getorbuild((self.strpath, self.rev, auth),
+ builder)
else:
- return self._lsnorevcache.getorbuild(self.strpath, builder)
+ return self._lsnorevcache.getorbuild((self.strpath, auth),
+ builder)
def log(self, rev_start=None, rev_end=1, verbose=False):
""" return a list of LogEntry instances for this path.
@@ -234,9 +264,8 @@
else:
rev_opt = "-r %s:%s" % (rev_start, rev_end)
verbose_opt = verbose and "-v" or ""
- xmlpipe = os.popen(svncommon.fixlocale() +
- 'svn log --xml %s %s "%s"' %
- (rev_opt, verbose_opt, self.strpath))
+ xmlpipe = self._svnpopenauth('svn log --xml %s %s "%s"' %
+ (rev_opt, verbose_opt, self.strpath))
from xml.dom import minidom
tree = minidom.parse(xmlpipe)
result = []
@@ -254,7 +283,7 @@
# the '0?' part in the middle is an indication of whether the resource is
# locked, see 'svn help ls'
lspattern = re.compile(
- r'^ *(?P<rev>\d+) +(?P<author>\S+) +(0? *(?P<size>\d+))? '
+ r'^ *(?P<rev>\d+) +(?P<author>.+?) +(0? *(?P<size>\d+))? '
'*(?P<date>\w+ +\d{2} +[\d:]+) +(?P<file>.*)$')
def __init__(self, line):
# this is a typical line from 'svn ls http://...'
Modified: py/branch/code-coverage/py/path/svn/wccommand.py
==============================================================================
--- py/branch/code-coverage/py/path/svn/wccommand.py (original)
+++ py/branch/code-coverage/py/path/svn/wccommand.py Sun Apr 13 04:48:13 2008
@@ -25,7 +25,7 @@
"""
sep = os.sep
- def __new__(cls, wcpath=None):
+ def __new__(cls, wcpath=None, auth=None):
self = object.__new__(cls)
if isinstance(wcpath, cls):
if wcpath.__class__ == cls:
@@ -35,6 +35,7 @@
svncommon.ALLOWED_CHARS):
raise ValueError("bad char in wcpath %s" % (wcpath, ))
self.localpath = py.path.local(wcpath)
+ self.auth = auth
return self
strpath = property(lambda x: str(x.localpath), None, None, "string path")
@@ -63,13 +64,22 @@
info = self.info()
return py.path.svnurl(info.url)
-
def __repr__(self):
return "svnwc(%r)" % (self.strpath) # , self._url)
def __str__(self):
return str(self.localpath)
+ def _makeauthoptions(self):
+ if self.auth is None:
+ return ''
+ return self.auth.makecmdoptions()
+
+ def _authsvn(self, cmd, args=None):
+ args = args and list(args) or []
+ args.append(self._makeauthoptions())
+ return self._svn(cmd, *args)
+
def _svn(self, cmd, *args):
l = ['svn %s' % cmd]
args = [self._escape(item) for item in args]
@@ -101,9 +111,9 @@
raise
return out
- def switch(self, url):
+ def switch(self, url):
""" switch to given URL. """
- self._svn('switch', url)
+ self._authsvn('switch', [url])
def checkout(self, url=None, rev=None):
""" checkout from url to local wcpath. """
@@ -119,11 +129,12 @@
url += "@%d" % rev
else:
args.append('-r' + str(rev))
- self._svn('co', url, *args)
+ args.append(url)
+ self._authsvn('co', args)
def update(self, rev = 'HEAD'):
""" update working copy item to given revision. (None -> HEAD). """
- self._svn('up -r %s' % rev)
+ self._authsvn('up', ['-r', rev])
def write(self, content, mode='wb'):
""" write content into local filesystem wc. """
@@ -131,7 +142,7 @@
def dirpath(self, *args):
""" return the directory Path of the current Path. """
- return self.__class__(self.localpath.dirpath(*args))
+ return self.__class__(self.localpath.dirpath(*args), auth=self.auth)
def _ensuredirs(self):
parent = self.dirpath()
@@ -197,18 +208,21 @@
""" rename this path to target. """
py.process.cmdexec("svn move --force %s %s" %(str(self), str(target)))
- _rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(\S+)\s+(.*)')
+ # XXX a bit scary to assume there's always 2 spaces between username and
+ # path, however with win32 allowing spaces in user names there doesn't
+ # seem to be a more solid approach :(
+ _rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(.+?)\s{2,}(.*)')
def lock(self):
""" set a lock (exclusive) on the resource """
- out = self._svn('lock').strip()
+ out = self._authsvn('lock').strip()
if not out:
# warning or error, raise exception
raise Exception(out[4:])
def unlock(self):
""" unset a previously set lock """
- out = self._svn('unlock').strip()
+ out = self._authsvn('unlock').strip()
if out.startswith('svn:'):
# warning or error, raise exception
raise Exception(out[4:])
@@ -248,7 +262,8 @@
update_rev = None
- out = self._svn('status -v %s %s %s' % (updates, rec, externals))
+ cmd = 'status -v %s %s %s' % (updates, rec, externals)
+ out = self._authsvn(cmd)
rootstatus = WCStatus(self)
for line in out.split('\n'):
if not line.strip():
@@ -266,7 +281,8 @@
wcpath = self.join(fn, abs=1)
rootstatus.unknown.append(wcpath)
elif c0 == 'X':
- wcpath = self.__class__(self.localpath.join(fn, abs=1))
+ wcpath = self.__class__(self.localpath.join(fn, abs=1),
+ auth=self.auth)
rootstatus.external.append(wcpath)
elif c0 == 'I':
wcpath = self.join(fn, abs=1)
@@ -334,10 +350,10 @@
""" return a diff of the current path against revision rev (defaulting
to the last one).
"""
- if rev is None:
- out = self._svn('diff')
- else:
- out = self._svn('diff -r %d' % rev)
+ args = []
+ if rev is not None:
+ args.append("-r %d" % rev)
+ out = self._authsvn('diff', args)
return out
def blame(self):
@@ -365,7 +381,7 @@
cmd = 'commit -m "%s" --force-log' % (msg.replace('"', '\\"'),)
if not rec:
cmd += ' -N'
- out = self._svn(cmd)
+ out = self._authsvn(cmd)
try:
del cache.info[self]
except KeyError:
@@ -431,7 +447,7 @@
localpath = self.localpath.new(**kw)
else:
localpath = self.localpath
- return self.__class__(localpath)
+ return self.__class__(localpath, auth=self.auth)
def join(self, *args, **kwargs):
""" return a new Path (with the same revision) which is composed
@@ -440,7 +456,7 @@
if not args:
return self
localpath = self.localpath.join(*args, **kwargs)
- return self.__class__(localpath)
+ return self.__class__(localpath, auth=self.auth)
def info(self, usecache=1):
""" return an Info structure with svn-provided information. """
@@ -483,7 +499,7 @@
paths = []
for localpath in self.localpath.listdir(notsvn):
- p = self.__class__(localpath)
+ p = self.__class__(localpath, auth=self.auth)
paths.append(p)
if fil or sort:
@@ -534,11 +550,13 @@
else:
rev_opt = "-r %s:%s" % (rev_start, rev_end)
verbose_opt = verbose and "-v" or ""
- s = svncommon.fixlocale()
+ locale_env = svncommon.fixlocale()
# some blather on stderr
- stdin, stdout, stderr = os.popen3(s + 'svn log --xml %s %s "%s"' % (
- rev_opt, verbose_opt,
- self.strpath))
+ auth_opt = self._makeauthoptions()
+ stdin, stdout, stderr = os.popen3(locale_env +
+ 'svn log --xml %s %s %s "%s"' % (
+ rev_opt, verbose_opt, auth_opt,
+ self.strpath))
from xml.dom import minidom
from xml.parsers.expat import ExpatError
try:
@@ -562,7 +580,7 @@
return self.info().mtime
def __hash__(self):
- return hash((self.strpath, self.__class__))
+ return hash((self.strpath, self.__class__, self.auth))
class WCStatus:
Modified: py/branch/code-coverage/py/test/reporter.py
==============================================================================
--- py/branch/code-coverage/py/test/reporter.py (original)
+++ py/branch/code-coverage/py/test/reporter.py Sun Apr 13 04:48:13 2008
@@ -49,12 +49,15 @@
return self.flag
class AbstractReporter(object):
- def __init__(self, config, hosts):
+ def __init__(self, config, hosts, out=None):
self.config = config
self.hosts = hosts
self.failed_tests_outcome = []
self.skipped_tests_outcome = []
- self.out = getout(py.std.sys.stdout)
+ if out is None:
+ self.out = getout(py.std.sys.stdout)
+ else:
+ self.out = out
self.presenter = Presenter(self.out, config)
self.to_rsync = {}
More information about the pytest-commit
mailing list