[Python-checkins] distutils2: Fix backport changesets part 3: backported modules.
eric.araujo
python-checkins at python.org
Mon Sep 19 15:12:39 CEST 2011
http://hg.python.org/distutils2/rev/d1d251292ee7
changeset: 1152:d1d251292ee7
user: Éric Araujo <merwok at netwok.org>
date: Sun Sep 18 23:10:58 2011 +0200
summary:
Fix backport changesets part 3: backported modules.
shutil, sysconfig, tarfile and their tests have been updated to the
latest 3.2 version (except for test_tarfile, which is not backported yet)
and edited to be compatible with 2.4.
Duplicates added in util during the Great Update have been deleted, as
well as functions I removed recently in packaging. Unneeded modules in
_backport have been deleted or moved to d2.compat.
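To make the 2.4 edits concrete, here is a minimal standalone sketch of the
idioms the backported modules fall back to (old-style "except E, e" clauses,
try/finally instead of with-statements, print statements, sys.maxint). It is
an illustration only, not code from the changeset, and the file name used is
a made-up example.

    import sys

    def read_first_line(path):
        f = open(path)
        try:                        # try/finally instead of a with-statement
            return f.readline()
        finally:
            f.close()

    try:
        line = read_first_line("setup.cfg")   # hypothetical file name
    except IOError, e:              # 2.4 except syntax, no "as" keyword
        print "could not read file:", e       # print statement, not print()
        line = None

    if sys.maxint >= 2 ** 32:       # sys.maxint instead of sys.maxsize
        print "running on a 64-bit build"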
files:
distutils2/_backport/__init__.py | 10 +-
distutils2/_backport/functools.py | 56 -
distutils2/_backport/path.py | 15 -
distutils2/_backport/shutil.py | 27 +-
distutils2/_backport/sysconfig.py | 24 +-
distutils2/_backport/tarfile.py | 894 +++++----
distutils2/_backport/tests/test_shutil.py | 233 +-
distutils2/_backport/tests/test_sysconfig.py | 212 +-
distutils2/command/build_scripts.py | 5 +-
distutils2/command/cmd.py | 3 +-
distutils2/command/sdist.py | 4 +-
distutils2/compat.py | 131 +-
distutils2/create.py | 26 +-
distutils2/install.py | 3 +-
distutils2/pypi/dist.py | 14 +-
distutils2/pypi/simple.py | 5 +-
distutils2/tests/pypi_server.py | 5 +-
distutils2/tests/support.py | 2 +-
distutils2/tests/test_command_sdist.py | 3 +-
distutils2/tests/test_util.py | 9 +-
distutils2/util.py | 727 +-------
21 files changed, 969 insertions(+), 1439 deletions(-)
diff --git a/distutils2/_backport/__init__.py b/distutils2/_backport/__init__.py
--- a/distutils2/_backport/__init__.py
+++ b/distutils2/_backport/__init__.py
@@ -1,8 +1,4 @@
-"""Things that will land in the Python 3.3 std lib but which we must drag along
- us for now to support 2.x."""
+"""Modules copied from the Python 3.2 standard library.
-def any(seq):
- for elem in seq:
- if elem:
- return True
- return False
+Individual classes and objects like the any function are in compat.
+"""
diff --git a/distutils2/_backport/functools.py b/distutils2/_backport/functools.py
deleted file mode 100644
--- a/distutils2/_backport/functools.py
+++ /dev/null
@@ -1,56 +0,0 @@
-"""functools.py - Tools for working with functions and callable objects
-Copied from:
-https://github.com/dln/pycassa/commit/90736f8146c1cac8287f66e8c8b64cb80e011513#diff-1
-
-"""
-
-try:
- from _functools import partial
-except:
- class partial(object):
- "A simple replacement of functools.partial"
- def __init__(self, func, *args, **kw):
- self.func = func
- self.args = args
- self.keywords = kw
- def __call__(self, *otherargs, **otherkw):
- kw = self.keywords.copy()
- kw.update(otherkw)
- return self.func(*(self.args + otherargs), **kw)
-
-# update_wrapper() and wraps() are tools to help write
-# wrapper functions that can handle naive introspection
-
-WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__doc__')
-WRAPPER_UPDATES = ('__dict__',)
-def update_wrapper(wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS,
- updated=WRAPPER_UPDATES):
- """Update a wrapper function to look like the wrapped function
-
- wrapper is the function to be updated
- wrapped is the original function
- assigned is a tuple naming the attributes assigned directly
- from the wrapped function to the wrapper function (defaults to
- functools.WRAPPER_ASSIGNMENTS)
- updated is a tuple naming the attributes of the wrapper that
- are updated with the corresponding attribute from the wrapped
- function (defaults to functools.WRAPPER_UPDATES)
- """
- for attr in assigned:
- setattr(wrapper, attr, getattr(wrapped, attr))
- for attr in updated:
- getattr(wrapper, attr).update(getattr(wrapped, attr, {}))
- # Return the wrapper so this can be used as a decorator via partial()
- return wrapper
-
-def wraps(wrapped, assigned=WRAPPER_ASSIGNMENTS, updated=WRAPPER_UPDATES):
- """Decorator factory to apply update_wrapper() to a wrapper function
-
- Returns a decorator that invokes update_wrapper() with the decorated
- function as the wrapper argument and the arguments to wraps() as the
- remaining arguments. Default arguments are as for update_wrapper().
- This is a convenience function to simplify applying partial() to
- update_wrapper().
- """
- return partial(update_wrapper, wrapped=wrapped,
- assigned=assigned, updated=updated)
diff --git a/distutils2/_backport/path.py b/distutils2/_backport/path.py
deleted file mode 100644
--- a/distutils2/_backport/path.py
+++ /dev/null
@@ -1,15 +0,0 @@
-from posixpath import curdir, sep, pardir, join, abspath, commonprefix
-
-def relpath(path, start=curdir):
- """Return a relative version of a path"""
- if not path:
- raise ValueError("no path specified")
- start_list = abspath(start).split(sep)
- path_list = abspath(path).split(sep)
- # Work out how much of the filepath is shared by start and path.
- i = len(commonprefix([start_list, path_list]))
- rel_list = [pardir] * (len(start_list)-i) + path_list[i:]
- if not rel_list:
- return curdir
- return join(*rel_list)
-
diff --git a/distutils2/_backport/shutil.py b/distutils2/_backport/shutil.py
--- a/distutils2/_backport/shutil.py
+++ b/distutils2/_backport/shutil.py
@@ -32,7 +32,7 @@
"ExecError", "make_archive", "get_archive_formats",
"register_archive_format", "unregister_archive_format",
"get_unpack_formats", "register_unpack_format",
- "unregister_unpack_format", "unpack_archive"]
+ "unregister_unpack_format", "unpack_archive", "ignore_patterns"]
class Error(EnvironmentError):
pass
@@ -202,8 +202,11 @@
else:
ignored_names = set()
- if not os.path.exists(dst):
+ try:
os.makedirs(dst)
+ except OSError, e:
+ if e.errno != errno.EEXIST:
+ raise
errors = []
for name in names:
@@ -317,6 +320,12 @@
"""
real_dst = dst
if os.path.isdir(dst):
+ if _samefile(src, dst):
+ # We might be on a case insensitive filesystem,
+ # perform the rename anyway.
+ os.rename(src, dst)
+ return
+
real_dst = os.path.join(dst, _basename(src))
if os.path.exists(real_dst):
raise Error("Destination path '%s' already exists" % real_dst)
@@ -408,7 +417,7 @@
from distutils2._backport import tarfile
if logger is not None:
- logger.info('creating tar archive')
+ logger.info('Creating tar archive')
uid = _get_uid(owner)
gid = _get_gid(group)
@@ -696,6 +705,7 @@
def _unpack_tarfile(filename, extract_dir):
"""Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
"""
+ # late import because of circular dependency
from distutils2._backport import tarfile
try:
tarobj = tarfile.open(filename)
@@ -742,16 +752,14 @@
if extract_dir is None:
extract_dir = os.getcwd()
- func = None
-
if format is not None:
try:
format_info = _UNPACK_FORMATS[format]
except KeyError:
raise ValueError("Unknown unpack format '%s'" % format)
- func = format_info[0]
- func(filename, extract_dir, **dict(format_info[1]))
+ func = format_info[1]
+ func(filename, extract_dir, **dict(format_info[2]))
else:
# we need to look at the registered unpackers supported extensions
format = _find_unpack_format(filename)
@@ -761,8 +769,3 @@
func = _UNPACK_FORMATS[format][1]
kwargs = dict(_UNPACK_FORMATS[format][2])
func(filename, extract_dir, **kwargs)
-
- if func is None:
- raise ValueError('Unknown archive format: %s' % filename)
-
- return extract_dir
diff --git a/distutils2/_backport/sysconfig.py b/distutils2/_backport/sysconfig.py
--- a/distutils2/_backport/sysconfig.py
+++ b/distutils2/_backport/sysconfig.py
@@ -338,8 +338,10 @@
config_h = get_config_h_filename()
try:
f = open(config_h)
- parse_config_h(f, vars)
- f.close()
+ try:
+ parse_config_h(f, vars)
+ finally:
+ f.close()
except IOError, e:
msg = "invalid Python installation: unable to open %s" % config_h
if hasattr(e, "strerror"):
@@ -730,13 +732,13 @@
# On OSX the machine type returned by uname is always the
# 32-bit variant, even if the executable architecture is
# the 64-bit variant
- if sys.maxsize >= 2**32:
+ if sys.maxint >= 2**32:
machine = 'x86_64'
elif machine in ('PowerPC', 'Power_Macintosh'):
# Pick a sane name for the PPC architecture.
# See 'i386' case
- if sys.maxsize >= 2**32:
+ if sys.maxint >= 2**32:
machine = 'ppc64'
else:
machine = 'ppc'
@@ -751,18 +753,18 @@
def _print_dict(title, data):
for index, (key, value) in enumerate(sorted(data.items())):
if index == 0:
- print('%s: ' % (title))
- print('\t%s = "%s"' % (key, value))
+ print '%s: ' % (title)
+ print '\t%s = "%s"' % (key, value)
def _main():
"""Display all information sysconfig detains."""
- print('Platform: "%s"' % get_platform())
- print('Python version: "%s"' % get_python_version())
- print('Current installation scheme: "%s"' % _get_default_scheme())
- print(u'')
+ print 'Platform: "%s"' % get_platform()
+ print 'Python version: "%s"' % get_python_version()
+ print 'Current installation scheme: "%s"' % _get_default_scheme()
+ print
_print_dict('Paths', get_paths())
- print(u'')
+ print
_print_dict('Variables', get_config_vars())
diff --git a/distutils2/_backport/tarfile.py b/distutils2/_backport/tarfile.py
--- a/distutils2/_backport/tarfile.py
+++ b/distutils2/_backport/tarfile.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python
-# -*- coding: iso-8859-1 -*-
+# encoding: utf-8
#-------------------------------------------------------------------
# tarfile.py
#-------------------------------------------------------------------
-# Copyright (C) 2002 Lars Gustäbel <lars at gustaebel.de>
+# Copyright (C) 2002 Lars Gustäbel <lars at gustaebel.de>
# All rights reserved.
#
# Permission is hereby granted, free of charge, to any person
@@ -30,14 +30,13 @@
"""Read from and write to tar format archives.
"""
-__version__ = "$Revision: 76780 $"
-# $Source$
+__version__ = "$Revision$"
version = "0.9.0"
-__author__ = "Lars Gustäbel (lars at gustaebel.de)"
-__date__ = "$Date: 2009-12-13 12:32:27 +0100 (Dim 13 déc 2009) $"
-__cvsid__ = "$Id: tarfile.py 76780 2009-12-13 11:32:27Z lars.gustaebel $"
-__credits__ = "Gustavo Niemeyer, Niels Gustäbel, Richard Townsend."
+__author__ = u"Lars Gust\u00e4bel (lars at gustaebel.de)"
+__date__ = "$Date: 2011-02-25 17:42:01 +0200 (Fri, 25 Feb 2011) $"
+__cvsid__ = "$Id: tarfile.py 88586 2011-02-25 15:42:01Z marc-andre.lemburg $"
+__credits__ = u"Gustavo Niemeyer, Niels Gust\u00e4bel, Richard Townsend."
#---------
# Imports
@@ -51,19 +50,26 @@
import struct
import copy
import re
-import operator
-
-if not hasattr(os, 'SEEK_SET'):
- os.SEEK_SET = 0
try:
import grp, pwd
except ImportError:
grp = pwd = None
+# os.symlink on Windows prior to 6.0 raises NotImplementedError
+symlink_exception = (AttributeError, NotImplementedError)
+try:
+ # WindowsError (1314) will be raised if the caller does not hold the
+ # SeCreateSymbolicLinkPrivilege privilege
+ symlink_exception += (WindowsError,)
+except NameError:
+ pass
+
# from tarfile import *
__all__ = ["TarFile", "TarInfo", "is_tarfile", "TarError"]
+from __builtin__ import open as _open # Since 'open' is TarFile.open
+
#---------------------------------------------------------
# tar constants
#---------------------------------------------------------
@@ -122,6 +128,9 @@
PAX_FIELDS = ("path", "linkpath", "size", "mtime",
"uid", "gid", "uname", "gname")
+# Fields from a pax header that are affected by hdrcharset.
+PAX_NAME_FIELDS = set(("path", "linkpath", "uname", "gname"))
+
# Fields in a pax header that are numbers, all other fields
# are treated as strings.
PAX_NUMBER_FIELDS = {
@@ -160,27 +169,28 @@
#---------------------------------------------------------
# initialization
#---------------------------------------------------------
-ENCODING = sys.getfilesystemencoding()
-if ENCODING is None:
- ENCODING = sys.getdefaultencoding()
+if os.name in ("nt", "ce"):
+ ENCODING = "utf-8"
+else:
+ ENCODING = sys.getfilesystemencoding()
#---------------------------------------------------------
# Some useful functions
#---------------------------------------------------------
-def stn(s, length):
- """Convert a python string to a null-terminated string buffer.
+def stn(s, length, encoding, errors):
+ """Convert a string to a null-terminated bytes object.
"""
+ s = s.encode(encoding, errors)
return s[:length] + (length - len(s)) * NUL
-def nts(s):
- """Convert a null-terminated string field to a python string.
+def nts(s, encoding, errors):
+ """Convert a null-terminated bytes object to a string.
"""
- # Use the string up to the first null char.
p = s.find("\0")
- if p == -1:
- return s
- return s[:p]
+ if p != -1:
+ s = s[:p]
+ return s.decode(encoding, errors)
def nti(s):
"""Convert a number field to a python number.
@@ -189,7 +199,7 @@
# itn() below.
if s[0] != chr(0200):
try:
- n = int(nts(s) or "0", 8)
+ n = int(nts(s, "ascii", "strict") or "0", 8)
except ValueError:
raise InvalidHeaderError("invalid header")
else:
@@ -226,26 +236,6 @@
s = chr(0200) + s
return s
-def uts(s, encoding, errors):
- """Convert a unicode object to a string.
- """
- if errors == "utf-8":
- # An extra error handler similar to the -o invalid=UTF-8 option
- # in POSIX.1-2001. Replace untranslatable characters with their
- # UTF-8 representation.
- try:
- return s.encode(encoding, "strict")
- except UnicodeEncodeError:
- x = []
- for c in s:
- try:
- x.append(c.encode(encoding, "strict"))
- except UnicodeEncodeError:
- x.append(c.encode("utf8"))
- return "".join(x)
- else:
- return s.encode(encoding, errors)
-
def calc_chksums(buf):
"""Calculate the checksum for a member's header by summing up all
characters except for the chksum field which is treated as if
@@ -376,7 +366,7 @@
}[mode]
if hasattr(os, "O_BINARY"):
mode |= os.O_BINARY
- self.fd = os.open(name, mode)
+ self.fd = os.open(name, mode, 0666)
def close(self):
os.close(self.fd)
@@ -421,28 +411,34 @@
self.pos = 0L
self.closed = False
- if comptype == "gz":
- try:
- import zlib
- except ImportError:
- raise CompressionError("zlib module is not available")
- self.zlib = zlib
- self.crc = zlib.crc32("") & 0xffffffffL
- if mode == "r":
- self._init_read_gz()
- else:
- self._init_write_gz()
+ try:
+ if comptype == "gz":
+ try:
+ import zlib
+ except ImportError:
+ raise CompressionError("zlib module is not available")
+ self.zlib = zlib
+ self.crc = zlib.crc32("")
+ if mode == "r":
+ self._init_read_gz()
+ else:
+ self._init_write_gz()
- if comptype == "bz2":
- try:
- import bz2
- except ImportError:
- raise CompressionError("bz2 module is not available")
- if mode == "r":
- self.dbuf = ""
- self.cmp = bz2.BZ2Decompressor()
- else:
- self.cmp = bz2.BZ2Compressor()
+ if comptype == "bz2":
+ try:
+ import bz2
+ except ImportError:
+ raise CompressionError("bz2 module is not available")
+ if mode == "r":
+ self.dbuf = ""
+ self.cmp = bz2.BZ2Decompressor()
+ else:
+ self.cmp = bz2.BZ2Compressor()
+ except:
+ if not self._extfileobj:
+ self.fileobj.close()
+ self.closed = True
+ raise
def __del__(self):
if hasattr(self, "closed") and not self.closed:
@@ -459,7 +455,8 @@
self.__write("\037\213\010\010%s\002\377" % timestamp)
if self.name.endswith(".gz"):
self.name = self.name[:-3]
- self.__write(self.name + NUL)
+ # RFC1952 says we must use ISO-8859-1 for the FNAME field.
+ self.__write(self.name.encode("iso-8859-1", "replace") + NUL)
def write(self, s):
"""Write string s to the stream.
@@ -582,7 +579,6 @@
return self.__read(size)
c = len(self.dbuf)
- t = [self.dbuf]
while c < size:
buf = self.__read(self.bufsize)
if not buf:
@@ -591,27 +587,26 @@
buf = self.cmp.decompress(buf)
except IOError:
raise ReadError("invalid compressed data")
- t.append(buf)
+ self.dbuf += buf
c += len(buf)
- t = "".join(t)
- self.dbuf = t[size:]
- return t[:size]
+ buf = self.dbuf[:size]
+ self.dbuf = self.dbuf[size:]
+ return buf
def __read(self, size):
"""Return size bytes from stream. If internal buffer is empty,
read another block from the stream.
"""
c = len(self.buf)
- t = [self.buf]
while c < size:
buf = self.fileobj.read(self.bufsize)
if not buf:
break
- t.append(buf)
+ self.buf += buf
c += len(buf)
- t = "".join(t)
- self.buf = t[size:]
- return t[:size]
+ buf = self.buf[:size]
+ self.buf = self.buf[size:]
+ return buf
# class _Stream
class _StreamProxy(object):
@@ -665,16 +660,14 @@
self.bz2obj = bz2.BZ2Compressor()
def read(self, size):
- b = [self.buf]
x = len(self.buf)
while x < size:
raw = self.fileobj.read(self.blocksize)
if not raw:
break
data = self.bz2obj.decompress(raw)
- b.append(data)
+ self.buf += data
x += len(data)
- self.buf = "".join(b)
buf = self.buf[:size]
self.buf = self.buf[size:]
@@ -709,13 +702,35 @@
object.
"""
- def __init__(self, fileobj, offset, size, sparse=None):
+ def __init__(self, fileobj, offset, size, blockinfo=None):
self.fileobj = fileobj
self.offset = offset
self.size = size
- self.sparse = sparse
self.position = 0
+ if blockinfo is None:
+ blockinfo = [(0, size)]
+
+ # Construct a map with data and zero blocks.
+ self.map_index = 0
+ self.map = []
+ lastpos = 0
+ realpos = self.offset
+ for offset, size in blockinfo:
+ if offset > lastpos:
+ self.map.append((False, lastpos, offset, None))
+ self.map.append((True, offset, offset + size, realpos))
+ realpos += size
+ lastpos = offset + size
+ if lastpos < self.size:
+ self.map.append((False, lastpos, self.size, None))
+
+ def seekable(self):
+ if not hasattr(self.fileobj, "seekable"):
+ # XXX gzip.GzipFile and bz2.BZ2File
+ return True
+ return self.fileobj.seekable()
+
def tell(self):
"""Return the current file position.
"""
@@ -734,48 +749,25 @@
else:
size = min(size, self.size - self.position)
- if self.sparse is None:
- return self.readnormal(size)
- else:
- return self.readsparse(size)
-
- def readnormal(self, size):
- """Read operation for regular files.
- """
- self.fileobj.seek(self.offset + self.position)
- self.position += size
- return self.fileobj.read(size)
-
- def readsparse(self, size):
- """Read operation for sparse files.
- """
- data = []
+ buf = ""
while size > 0:
- buf = self.readsparsesection(size)
- if not buf:
- break
- size -= len(buf)
- data.append(buf)
- return "".join(data)
-
- def readsparsesection(self, size):
- """Read a single section of a sparse file.
- """
- section = self.sparse.find(self.position)
-
- if section is None:
- return ""
-
- size = min(size, section.offset + section.size - self.position)
-
- if isinstance(section, _data):
- realpos = section.realpos + self.position - section.offset
- self.fileobj.seek(self.offset + realpos)
- self.position += size
- return self.fileobj.read(size)
- else:
- self.position += size
- return NUL * size
+ while True:
+ data, start, stop, offset = self.map[self.map_index]
+ if start <= self.position < stop:
+ break
+ else:
+ self.map_index += 1
+ if self.map_index == len(self.map):
+ self.map_index = 0
+ length = min(size, stop - self.position)
+ if data:
+ self.fileobj.seek(offset + (self.position - start))
+ buf += self.fileobj.read(length)
+ else:
+ buf += NUL * length
+ size -= length
+ self.position += length
+ return buf
#class _FileInFile
@@ -789,7 +781,7 @@
self.fileobj = _FileInFile(tarfile.fileobj,
tarinfo.offset_data,
tarinfo.size,
- getattr(tarinfo, "sparse", None))
+ tarinfo.sparse)
self.name = tarinfo.name
self.mode = "r"
self.closed = False
@@ -798,6 +790,15 @@
self.position = 0
self.buffer = ""
+ def readable(self):
+ return True
+
+ def writable(self):
+ return False
+
+ def seekable(self):
+ return self.fileobj.seekable()
+
def read(self, size=None):
"""Read at most size bytes from the file. If size is not
present or None, read all data until EOF is reached.
@@ -822,6 +823,9 @@
self.position += len(buf)
return buf
+ # XXX TextIOWrapper uses the read1() method.
+ read1 = read
+
def readline(self, size=-1):
"""Read one entire line from the file. If size is present
and non-negative, return a string with at most that
@@ -830,15 +834,13 @@
if self.closed:
raise ValueError("I/O operation on closed file")
- if "\n" in self.buffer:
- pos = self.buffer.find("\n") + 1
- else:
- buffers = [self.buffer]
+ pos = self.buffer.find("\n") + 1
+ if pos == 0:
+ # no newline found.
while True:
buf = self.fileobj.read(self.blocksize)
- buffers.append(buf)
+ self.buffer += buf
if not buf or "\n" in buf:
- self.buffer = "".join(buffers)
pos = self.buffer.find("\n") + 1
if pos == 0:
# no newline found.
@@ -871,20 +873,20 @@
return self.position
- def seek(self, pos, whence=os.SEEK_SET):
+ def seek(self, pos, whence=0):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
- if whence == os.SEEK_SET:
+ if whence == 0: # os.SEEK_SET
self.position = min(max(pos, 0), self.size)
- elif whence == os.SEEK_CUR:
+ elif whence == 1: # os.SEEK_CUR
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
- elif whence == os.SEEK_END:
+ elif whence == 2: # os.SEEK_END
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
@@ -918,6 +920,12 @@
usually created internally.
"""
+ __slots__ = ("name", "mode", "uid", "gid", "size", "mtime",
+ "chksum", "type", "linkname", "uname", "gname",
+ "devmajor", "devminor",
+ "offset", "offset_data", "pax_headers", "sparse",
+ "tarfile", "_sparse_structs", "_link_target")
+
def __init__(self, name=""):
"""Construct a TarInfo object. name is the optional name
of the member.
@@ -931,14 +939,15 @@
self.chksum = 0 # header checksum
self.type = REGTYPE # member type
self.linkname = "" # link name
- self.uname = "root" # user name
- self.gname = "root" # group name
+ self.uname = "" # user name
+ self.gname = "" # group name
self.devmajor = 0 # device major number
self.devminor = 0 # device minor number
self.offset = 0 # the tar header starts here
self.offset_data = 0 # the file's data starts here
+ self.sparse = None # sparse member information
self.pax_headers = {} # pax header information
# In pax headers the "name" and "linkname" field are called
@@ -958,7 +967,7 @@
def __repr__(self):
return "<%s %r at %#x>" % (self.__class__.__name__,self.name,id(self))
- def get_info(self, encoding, errors):
+ def get_info(self):
"""Return the TarInfo's attributes as a dictionary.
"""
info = {
@@ -980,27 +989,23 @@
if info["type"] == DIRTYPE and not info["name"].endswith("/"):
info["name"] += "/"
- for key in ("name", "linkname", "uname", "gname"):
- if type(info[key]) is unicode:
- info[key] = info[key].encode(encoding, errors)
-
return info
def tobuf(self, format=DEFAULT_FORMAT, encoding=ENCODING, errors="strict"):
"""Return a tar header as a string of 512 byte blocks.
"""
- info = self.get_info(encoding, errors)
+ info = self.get_info()
if format == USTAR_FORMAT:
- return self.create_ustar_header(info)
+ return self.create_ustar_header(info, encoding, errors)
elif format == GNU_FORMAT:
- return self.create_gnu_header(info)
+ return self.create_gnu_header(info, encoding, errors)
elif format == PAX_FORMAT:
- return self.create_pax_header(info, encoding, errors)
+ return self.create_pax_header(info, encoding)
else:
raise ValueError("invalid format")
- def create_ustar_header(self, info):
+ def create_ustar_header(self, info, encoding, errors):
"""Return the object as a ustar header block.
"""
info["magic"] = POSIX_MAGIC
@@ -1011,23 +1016,23 @@
if len(info["name"]) > LENGTH_NAME:
info["prefix"], info["name"] = self._posix_split_name(info["name"])
- return self._create_header(info, USTAR_FORMAT)
+ return self._create_header(info, USTAR_FORMAT, encoding, errors)
- def create_gnu_header(self, info):
+ def create_gnu_header(self, info, encoding, errors):
"""Return the object as a GNU header block sequence.
"""
info["magic"] = GNU_MAGIC
buf = ""
if len(info["linkname"]) > LENGTH_LINK:
- buf += self._create_gnu_long_header(info["linkname"], GNUTYPE_LONGLINK)
+ buf += self._create_gnu_long_header(info["linkname"], GNUTYPE_LONGLINK, encoding, errors)
if len(info["name"]) > LENGTH_NAME:
- buf += self._create_gnu_long_header(info["name"], GNUTYPE_LONGNAME)
+ buf += self._create_gnu_long_header(info["name"], GNUTYPE_LONGNAME, encoding, errors)
- return buf + self._create_header(info, GNU_FORMAT)
+ return buf + self._create_header(info, GNU_FORMAT, encoding, errors)
- def create_pax_header(self, info, encoding, errors):
+ def create_pax_header(self, info, encoding):
"""Return the object as a ustar header block. If it cannot be
represented this way, prepend a pax extended header sequence
with supplement information.
@@ -1045,17 +1050,15 @@
# The pax header has priority.
continue
- val = info[name].decode(encoding, errors)
-
# Try to encode the string as ASCII.
try:
- val.encode("ascii")
+ info[name].encode("ascii", "strict")
except UnicodeEncodeError:
- pax_headers[hname] = val
+ pax_headers[hname] = info[name]
continue
if len(info[name]) > length:
- pax_headers[hname] = val
+ pax_headers[hname] = info[name]
# Test number fields for values that exceed the field limit or values
# that like to be stored as float.
@@ -1072,17 +1075,17 @@
# Create a pax extended header if necessary.
if pax_headers:
- buf = self._create_pax_generic_header(pax_headers)
+ buf = self._create_pax_generic_header(pax_headers, XHDTYPE, encoding)
else:
buf = ""
- return buf + self._create_header(info, USTAR_FORMAT)
+ return buf + self._create_header(info, USTAR_FORMAT, "ascii", "replace")
@classmethod
def create_pax_global_header(cls, pax_headers):
"""Return the object as a pax global header block sequence.
"""
- return cls._create_pax_generic_header(pax_headers, type=XGLTYPE)
+ return cls._create_pax_generic_header(pax_headers, XGLTYPE, "utf8")
def _posix_split_name(self, name):
"""Split a name longer than 100 chars into a prefix
@@ -1100,12 +1103,12 @@
return prefix, name
@staticmethod
- def _create_header(info, format):
+ def _create_header(info, format, encoding, errors):
"""Return a header block. info is a dictionary with file
information, format must be one of the *_FORMAT constants.
"""
parts = [
- stn(info.get("name", ""), 100),
+ stn(info.get("name", ""), 100, encoding, errors),
itn(info.get("mode", 0) & 07777, 8, format),
itn(info.get("uid", 0), 8, format),
itn(info.get("gid", 0), 8, format),
@@ -1113,13 +1116,13 @@
itn(info.get("mtime", 0), 12, format),
" ", # checksum field
info.get("type", REGTYPE),
- stn(info.get("linkname", ""), 100),
- stn(info.get("magic", POSIX_MAGIC), 8),
- stn(info.get("uname", "root"), 32),
- stn(info.get("gname", "root"), 32),
+ stn(info.get("linkname", ""), 100, encoding, errors),
+ info.get("magic", POSIX_MAGIC),
+ stn(info.get("uname", ""), 32, encoding, errors),
+ stn(info.get("gname", ""), 32, encoding, errors),
itn(info.get("devmajor", 0), 8, format),
itn(info.get("devminor", 0), 8, format),
- stn(info.get("prefix", ""), 155)
+ stn(info.get("prefix", ""), 155, encoding, errors)
]
buf = struct.pack("%ds" % BLOCKSIZE, "".join(parts))
@@ -1138,11 +1141,11 @@
return payload
@classmethod
- def _create_gnu_long_header(cls, name, type):
+ def _create_gnu_long_header(cls, name, type, encoding, errors):
"""Return a GNUTYPE_LONGNAME or GNUTYPE_LONGLINK sequence
for name.
"""
- name += NUL
+ name = name.encode(encoding, errors) + NUL
info = {}
info["name"] = "././@LongLink"
@@ -1151,19 +1154,39 @@
info["magic"] = GNU_MAGIC
# create extended header + name blocks.
- return cls._create_header(info, USTAR_FORMAT) + \
+ return cls._create_header(info, USTAR_FORMAT, encoding, errors) + \
cls._create_payload(name)
@classmethod
- def _create_pax_generic_header(cls, pax_headers, type=XHDTYPE):
- """Return a POSIX.1-2001 extended or global header sequence
+ def _create_pax_generic_header(cls, pax_headers, type, encoding):
+ """Return a POSIX.1-2008 extended or global header sequence
that contains a list of keyword, value pairs. The values
must be unicode objects.
"""
- records = []
- for keyword, value in pax_headers.iteritems():
+ # Check if one of the fields contains surrogate characters and thereby
+ # forces hdrcharset=BINARY, see _proc_pax() for more information.
+ binary = False
+ for keyword, value in pax_headers.items():
+ try:
+ value.encode("utf8", "strict")
+ except UnicodeEncodeError:
+ binary = True
+ break
+
+ records = ""
+ if binary:
+ # Put the hdrcharset field at the beginning of the header.
+ records += "21 hdrcharset=BINARY\n"
+
+ for keyword, value in pax_headers.items():
keyword = keyword.encode("utf8")
- value = value.encode("utf8")
+ if binary:
+ # Try to restore the original byte representation of `value'.
+ # Needless to say, that the encoding must match the string.
+ value = value.encode(encoding, "surrogateescape")
+ else:
+ value = value.encode("utf8")
+
l = len(keyword) + len(value) + 3 # ' ' + '=' + '\n'
n = p = 0
while True:
@@ -1171,8 +1194,7 @@
if n == p:
break
p = n
- records.append("%d %s=%s\n" % (p, keyword, value))
- records = "".join(records)
+ records += bytes(str(p), "ascii") + " " + keyword + "=" + value + "\n"
# We use a hardcoded "././@PaxHeader" name like star does
# instead of the one that POSIX recommends.
@@ -1183,12 +1205,12 @@
info["magic"] = POSIX_MAGIC
# Create pax header + record blocks.
- return cls._create_header(info, USTAR_FORMAT) + \
+ return cls._create_header(info, USTAR_FORMAT, "ascii", "replace") + \
cls._create_payload(records)
@classmethod
- def frombuf(cls, buf):
- """Construct a TarInfo object from a 512 byte string buffer.
+ def frombuf(cls, buf, encoding, errors):
+ """Construct a TarInfo object from a 512 byte bytes object.
"""
if len(buf) == 0:
raise EmptyHeaderError("empty header")
@@ -1202,8 +1224,7 @@
raise InvalidHeaderError("bad checksum")
obj = cls()
- obj.buf = buf
- obj.name = nts(buf[0:100])
+ obj.name = nts(buf[0:100], encoding, errors)
obj.mode = nti(buf[100:108])
obj.uid = nti(buf[108:116])
obj.gid = nti(buf[116:124])
@@ -1211,18 +1232,36 @@
obj.mtime = nti(buf[136:148])
obj.chksum = chksum
obj.type = buf[156:157]
- obj.linkname = nts(buf[157:257])
- obj.uname = nts(buf[265:297])
- obj.gname = nts(buf[297:329])
+ obj.linkname = nts(buf[157:257], encoding, errors)
+ obj.uname = nts(buf[265:297], encoding, errors)
+ obj.gname = nts(buf[297:329], encoding, errors)
obj.devmajor = nti(buf[329:337])
obj.devminor = nti(buf[337:345])
- prefix = nts(buf[345:500])
+ prefix = nts(buf[345:500], encoding, errors)
# Old V7 tar format represents a directory as a regular
# file with a trailing slash.
if obj.type == AREGTYPE and obj.name.endswith("/"):
obj.type = DIRTYPE
+ # The old GNU sparse format occupies some of the unused
+ # space in the buffer for up to 4 sparse structures.
+ # Save the them for later processing in _proc_sparse().
+ if obj.type == GNUTYPE_SPARSE:
+ pos = 386
+ structs = []
+ for i in range(4):
+ try:
+ offset = nti(buf[pos:pos + 12])
+ numbytes = nti(buf[pos + 12:pos + 24])
+ except ValueError:
+ break
+ structs.append((offset, numbytes))
+ pos += 24
+ isextended = bool(buf[482])
+ origsize = nti(buf[483:495])
+ obj._sparse_structs = (structs, isextended, origsize)
+
# Remove redundant slashes from directories.
if obj.isdir():
obj.name = obj.name.rstrip("/")
@@ -1238,7 +1277,7 @@
tarfile.
"""
buf = tarfile.fileobj.read(BLOCKSIZE)
- obj = cls.frombuf(buf)
+ obj = cls.frombuf(buf, tarfile.encoding, tarfile.errors)
obj.offset = tarfile.fileobj.tell() - BLOCKSIZE
return obj._proc_member(tarfile)
@@ -1299,41 +1338,21 @@
# the longname information.
next.offset = self.offset
if self.type == GNUTYPE_LONGNAME:
- next.name = nts(buf)
+ next.name = nts(buf, tarfile.encoding, tarfile.errors)
elif self.type == GNUTYPE_LONGLINK:
- next.linkname = nts(buf)
+ next.linkname = nts(buf, tarfile.encoding, tarfile.errors)
return next
def _proc_sparse(self, tarfile):
"""Process a GNU sparse header plus extra headers.
"""
- buf = self.buf
- sp = _ringbuffer()
- pos = 386
- lastpos = 0L
- realpos = 0L
- # There are 4 possible sparse structs in the
- # first header.
- for i in xrange(4):
- try:
- offset = nti(buf[pos:pos + 12])
- numbytes = nti(buf[pos + 12:pos + 24])
- except ValueError:
- break
- if offset > lastpos:
- sp.append(_hole(lastpos, offset - lastpos))
- sp.append(_data(offset, numbytes, realpos))
- realpos += numbytes
- lastpos = offset + numbytes
- pos += 24
+ # We already collected some sparse structures in frombuf().
+ structs, isextended, origsize = self._sparse_structs
+ del self._sparse_structs
- isextended = ord(buf[482])
- origsize = nti(buf[483:495])
-
- # If the isextended flag is given,
- # there are extra headers to process.
- while isextended == 1:
+ # Collect sparse structures from extended header blocks.
+ while isextended:
buf = tarfile.fileobj.read(BLOCKSIZE)
pos = 0
for i in xrange(21):
@@ -1342,28 +1361,20 @@
numbytes = nti(buf[pos + 12:pos + 24])
except ValueError:
break
- if offset > lastpos:
- sp.append(_hole(lastpos, offset - lastpos))
- sp.append(_data(offset, numbytes, realpos))
- realpos += numbytes
- lastpos = offset + numbytes
+ if offset and numbytes:
+ structs.append((offset, numbytes))
pos += 24
- isextended = ord(buf[504])
-
- if lastpos < origsize:
- sp.append(_hole(lastpos, origsize - lastpos))
-
- self.sparse = sp
+ isextended = bool(buf[504])
+ self.sparse = structs
self.offset_data = tarfile.fileobj.tell()
tarfile.offset = self.offset_data + self._block(self.size)
self.size = origsize
-
return self
def _proc_pax(self, tarfile):
"""Process an extended or global header as described in
- POSIX.1-2001.
+ POSIX.1-2008.
"""
# Read the header information.
buf = tarfile.fileobj.read(self._block(self.size))
@@ -1376,11 +1387,29 @@
else:
pax_headers = tarfile.pax_headers.copy()
+ # Check if the pax header contains a hdrcharset field. This tells us
+ # the encoding of the path, linkpath, uname and gname fields. Normally,
+ # these fields are UTF-8 encoded but since POSIX.1-2008 tar
+ # implementations are allowed to store them as raw binary strings if
+ # the translation to UTF-8 fails.
+ match = re.search(r"\d+ hdrcharset=([^\n]+)\n", buf)
+ if match is not None:
+ pax_headers["hdrcharset"] = match.group(1).decode("utf8")
+
+ # For the time being, we don't care about anything other than "BINARY".
+ # The only other value that is currently allowed by the standard is
+ # "ISO-IR 10646 2000 UTF-8" in other words UTF-8.
+ hdrcharset = pax_headers.get("hdrcharset")
+ if hdrcharset == "BINARY":
+ encoding = tarfile.encoding
+ else:
+ encoding = "utf8"
+
# Parse pax header information. A record looks like that:
# "%d %s=%s\n" % (length, keyword, value). length is the size
# of the complete record including the length field itself and
# the newline. keyword and value are both UTF-8 encoded strings.
- regex = re.compile(r"(\d+) ([^=]+)=", re.U)
+ regex = re.compile(r"(\d+) ([^=]+)=")
pos = 0
while True:
match = regex.match(buf, pos)
@@ -1391,8 +1420,21 @@
length = int(length)
value = buf[match.end(2) + 1:match.start(1) + length - 1]
- keyword = keyword.decode("utf8")
- value = value.decode("utf8")
+ # Normally, we could just use "utf8" as the encoding and "strict"
+ # as the error handler, but we better not take the risk. For
+ # example, GNU tar <= 1.23 is known to store filenames it cannot
+ # translate to UTF-8 as raw strings (unfortunately without a
+ # hdrcharset=BINARY header).
+ # We first try the strict standard encoding, and if that fails we
+ # fall back on the user's encoding and error handler.
+ keyword = self._decode_pax_field(keyword, "utf8", "utf8",
+ tarfile.errors)
+ if keyword in PAX_NAME_FIELDS:
+ value = self._decode_pax_field(value, encoding, tarfile.encoding,
+ tarfile.errors)
+ else:
+ value = self._decode_pax_field(value, "utf8", "utf8",
+ tarfile.errors)
pax_headers[keyword] = value
pos += length
@@ -1403,6 +1445,19 @@
except HeaderError:
raise SubsequentHeaderError("missing or bad subsequent header")
+ # Process GNU sparse information.
+ if "GNU.sparse.map" in pax_headers:
+ # GNU extended sparse format version 0.1.
+ self._proc_gnusparse_01(next, pax_headers)
+
+ elif "GNU.sparse.size" in pax_headers:
+ # GNU extended sparse format version 0.0.
+ self._proc_gnusparse_00(next, pax_headers, buf)
+
+ elif pax_headers.get("GNU.sparse.major") == "1" and pax_headers.get("GNU.sparse.minor") == "0":
+ # GNU extended sparse format version 1.0.
+ self._proc_gnusparse_10(next, pax_headers, tarfile)
+
if self.type in (XHDTYPE, SOLARIS_XHDTYPE):
# Patch the TarInfo object with the extended header info.
next._apply_pax_info(pax_headers, tarfile.encoding, tarfile.errors)
@@ -1419,29 +1474,70 @@
return next
+ def _proc_gnusparse_00(self, next, pax_headers, buf):
+ """Process a GNU tar extended sparse header, version 0.0.
+ """
+ offsets = []
+ for match in re.finditer(r"\d+ GNU.sparse.offset=(\d+)\n", buf):
+ offsets.append(int(match.group(1)))
+ numbytes = []
+ for match in re.finditer(r"\d+ GNU.sparse.numbytes=(\d+)\n", buf):
+ numbytes.append(int(match.group(1)))
+ next.sparse = list(zip(offsets, numbytes))
+
+ def _proc_gnusparse_01(self, next, pax_headers):
+ """Process a GNU tar extended sparse header, version 0.1.
+ """
+ sparse = [int(x) for x in pax_headers["GNU.sparse.map"].split(",")]
+ next.sparse = list(zip(sparse[::2], sparse[1::2]))
+
+ def _proc_gnusparse_10(self, next, pax_headers, tarfile):
+ """Process a GNU tar extended sparse header, version 1.0.
+ """
+ fields = None
+ sparse = []
+ buf = tarfile.fileobj.read(BLOCKSIZE)
+ fields, buf = buf.split("\n", 1)
+ fields = int(fields)
+ while len(sparse) < fields * 2:
+ if "\n" not in buf:
+ buf += tarfile.fileobj.read(BLOCKSIZE)
+ number, buf = buf.split("\n", 1)
+ sparse.append(int(number))
+ next.offset_data = tarfile.fileobj.tell()
+ next.sparse = list(zip(sparse[::2], sparse[1::2]))
+
def _apply_pax_info(self, pax_headers, encoding, errors):
"""Replace fields with supplemental information from a previous
pax extended or global header.
"""
- for keyword, value in pax_headers.iteritems():
- if keyword not in PAX_FIELDS:
- continue
-
- if keyword == "path":
- value = value.rstrip("/")
-
- if keyword in PAX_NUMBER_FIELDS:
- try:
- value = PAX_NUMBER_FIELDS[keyword](value)
- except ValueError:
- value = 0
- else:
- value = uts(value, encoding, errors)
-
- setattr(self, keyword, value)
+ for keyword, value in pax_headers.items():
+ if keyword == "GNU.sparse.name":
+ setattr(self, "path", value)
+ elif keyword == "GNU.sparse.size":
+ setattr(self, "size", int(value))
+ elif keyword == "GNU.sparse.realsize":
+ setattr(self, "size", int(value))
+ elif keyword in PAX_FIELDS:
+ if keyword in PAX_NUMBER_FIELDS:
+ try:
+ value = PAX_NUMBER_FIELDS[keyword](value)
+ except ValueError:
+ value = 0
+ if keyword == "path":
+ value = value.rstrip("/")
+ setattr(self, keyword, value)
self.pax_headers = pax_headers.copy()
+ def _decode_pax_field(self, value, encoding, fallback_encoding, fallback_errors):
+ """Decode a single field from a pax record.
+ """
+ try:
+ return value.decode(encoding, "strict")
+ except UnicodeDecodeError:
+ return value.decode(fallback_encoding, fallback_errors)
+
def _block(self, count):
"""Round up a byte count by BLOCKSIZE and return it,
e.g. _block(834) => 1024.
@@ -1468,7 +1564,7 @@
def isfifo(self):
return self.type == FIFOTYPE
def issparse(self):
- return self.type == GNUTYPE_SPARSE
+ return self.sparse is not None
def isdev(self):
return self.type in (CHRTYPE, BLKTYPE, FIFOTYPE)
# class TarInfo
@@ -1501,7 +1597,7 @@
def __init__(self, name=None, mode="r", fileobj=None, format=None,
tarinfo=None, dereference=None, ignore_zeros=None, encoding=None,
- errors=None, pax_headers=None, debug=None, errorlevel=None):
+ errors="strict", pax_headers=None, debug=None, errorlevel=None):
"""Open an (uncompressed) tar archive `name'. `mode' is either 'r' to
read from an existing archive, 'a' to append data to an existing
file or 'w' to create a new file overwriting an existing one. `mode'
@@ -1545,13 +1641,7 @@
self.ignore_zeros = ignore_zeros
if encoding is not None:
self.encoding = encoding
-
- if errors is not None:
- self.errors = errors
- elif mode == "r":
- self.errors = "utf-8"
- else:
- self.errors = "strict"
+ self.errors = errors
if pax_headers is not None and self.format == PAX_FORMAT:
self.pax_headers = pax_headers
@@ -1604,18 +1694,6 @@
self.closed = True
raise
- def _getposix(self):
- return self.format == USTAR_FORMAT
- def _setposix(self, value):
- import warnings
- warnings.warn("use the format attribute instead", DeprecationWarning,
- 2)
- if value:
- self.format = USTAR_FORMAT
- else:
- self.format = GNU_FORMAT
- posix = property(_getposix, _setposix)
-
#--------------------------------------------------------------------------
# Below are the classmethods which act as alternate constructors to the
# TarFile class. The open() method is the only one that is needed for
@@ -1689,9 +1767,12 @@
if filemode not in "rw":
raise ValueError("mode must be 'r' or 'w'")
- t = cls(name, filemode,
- _Stream(name, filemode, comptype, fileobj, bufsize),
- **kwargs)
+ stream = _Stream(name, filemode, comptype, fileobj, bufsize)
+ try:
+ t = cls(name, filemode, stream, **kwargs)
+ except:
+ stream.close()
+ raise
t._extfileobj = False
return t
@@ -1722,16 +1803,21 @@
except (ImportError, AttributeError):
raise CompressionError("gzip module is not available")
- if fileobj is None:
- fileobj = bltn_open(name, mode + "b")
-
+ extfileobj = fileobj is not None
try:
- t = cls.taropen(name, mode,
- gzip.GzipFile(name, mode, compresslevel, fileobj),
- **kwargs)
+ fileobj = gzip.GzipFile(name, mode + "b", compresslevel, fileobj)
+ t = cls.taropen(name, mode, fileobj, **kwargs)
except IOError:
+ if not extfileobj and fileobj is not None:
+ fileobj.close()
+ if fileobj is None:
+ raise
raise ReadError("not a gzip file")
- t._extfileobj = False
+ except:
+ if not extfileobj and fileobj is not None:
+ fileobj.close()
+ raise
+ t._extfileobj = extfileobj
return t
@classmethod
@@ -1755,6 +1841,7 @@
try:
t = cls.taropen(name, mode, fileobj, **kwargs)
except (IOError, EOFError):
+ fileobj.close()
raise ReadError("not a bzip2 file")
t._extfileobj = False
return t
@@ -1890,10 +1977,10 @@
tarinfo.mode = stmd
tarinfo.uid = statres.st_uid
tarinfo.gid = statres.st_gid
- if stat.S_ISREG(stmd):
+ if type == REGTYPE:
tarinfo.size = statres.st_size
else:
- tarinfo.size = 0L
+ tarinfo.size = 0
tarinfo.mtime = statres.st_mtime
tarinfo.type = type
tarinfo.linkname = linkname
@@ -1938,7 +2025,7 @@
sep = "/"
else:
sep = ""
- print tarinfo.name + (sep),
+ print tarinfo.name + sep,
if verbose:
if tarinfo.issym():
@@ -1996,17 +2083,15 @@
# Append the tar header and data to the archive.
if tarinfo.isreg():
f = bltn_open(name, "rb")
- try:
- self.addfile(tarinfo, f)
- finally:
- f.close()
+ self.addfile(tarinfo, f)
+ f.close()
elif tarinfo.isdir():
self.addfile(tarinfo)
if recursive:
for f in os.listdir(name):
self.add(os.path.join(name, f), os.path.join(arcname, f),
- recursive, exclude, filter)
+ recursive, exclude, filter=filter)
else:
self.addfile(tarinfo)
@@ -2055,10 +2140,11 @@
directories.append(tarinfo)
tarinfo = copy.copy(tarinfo)
tarinfo.mode = 0700
- self.extract(tarinfo, path)
+ # Do not set_attrs directories, as we will do that further down
+ self.extract(tarinfo, path, set_attrs=not tarinfo.isdir())
# Reverse sort directories.
- directories.sort(key=operator.attrgetter('name'))
+ directories.sort(key=lambda a: a.name)
directories.reverse()
# Set correct owner, mtime and filemode on directories.
@@ -2074,11 +2160,12 @@
else:
self._dbg(1, "tarfile: %s" % e)
- def extract(self, member, path=""):
+ def extract(self, member, path="", set_attrs=True):
"""Extract a member from the archive to the current working directory,
using its full name. Its file information is extracted as accurately
as possible. `member' may be a filename or a TarInfo object. You can
- specify a different directory using `path'.
+ specify a different directory using `path'. File attributes (owner,
+ mtime, mode) are set unless `set_attrs' is False.
"""
self._check("r")
@@ -2092,7 +2179,8 @@
tarinfo._link_target = os.path.join(path, tarinfo.linkname)
try:
- self._extract_member(tarinfo, os.path.join(path, tarinfo.name))
+ self._extract_member(tarinfo, os.path.join(path, tarinfo.name),
+ set_attrs=set_attrs)
except EnvironmentError, e:
if self.errorlevel > 0:
raise
@@ -2139,14 +2227,13 @@
raise StreamError("cannot extract (sym)link as file object")
else:
# A (sym)link's file object is its target's file object.
- return self.extractfile(self._getmember(tarinfo.linkname,
- tarinfo))
+ return self.extractfile(self._find_link_target(tarinfo))
else:
# If there's no data associated with the member (directory, chrdev,
# blkdev, etc.), return None instead of a file object.
return None
- def _extract_member(self, tarinfo, targetpath):
+ def _extract_member(self, tarinfo, targetpath, set_attrs=True):
"""Extract the TarInfo object tarinfo to a physical
file called targetpath.
"""
@@ -2183,10 +2270,11 @@
else:
self.makefile(tarinfo, targetpath)
- self.chown(tarinfo, targetpath)
- if not tarinfo.issym():
- self.chmod(tarinfo, targetpath)
- self.utime(tarinfo, targetpath)
+ if set_attrs:
+ self.chown(tarinfo, targetpath)
+ if not tarinfo.issym():
+ self.chmod(tarinfo, targetpath)
+ self.utime(tarinfo, targetpath)
#--------------------------------------------------------------------------
# Below are the different file methods. They are called via
@@ -2207,13 +2295,18 @@
def makefile(self, tarinfo, targetpath):
"""Make a file called targetpath.
"""
- source = self.extractfile(tarinfo)
+ source = self.fileobj
+ source.seek(tarinfo.offset_data)
target = bltn_open(targetpath, "wb")
- try:
- copyfileobj(source, target)
- finally:
- source.close()
- target.close()
+ if tarinfo.sparse is not None:
+ for offset, size in tarinfo.sparse:
+ target.seek(offset)
+ copyfileobj(source, target, size)
+ else:
+ copyfileobj(source, target, tarinfo.size)
+ target.seek(tarinfo.size)
+ target.truncate()
+ target.close()
def makeunknown(self, tarinfo, targetpath):
"""Make a file from a TarInfo object with an unknown type
@@ -2252,26 +2345,28 @@
instead of a link.
"""
try:
+ # For systems that support symbolic and hard links.
if tarinfo.issym():
os.symlink(tarinfo.linkname, targetpath)
else:
# See extract().
- os.link(tarinfo._link_target, targetpath)
- except AttributeError:
+ if os.path.exists(tarinfo._link_target):
+ os.link(tarinfo._link_target, targetpath)
+ else:
+ self._extract_member(self._find_link_target(tarinfo),
+ targetpath)
+ except symlink_exception:
if tarinfo.issym():
- linkpath = os.path.dirname(tarinfo.name) + "/" + \
- tarinfo.linkname
+ linkpath = os.path.join(os.path.dirname(tarinfo.name),
+ tarinfo.linkname)
else:
linkpath = tarinfo.linkname
-
+ else:
try:
- self._extract_member(self.getmember(linkpath), targetpath)
- except (EnvironmentError, KeyError), e:
- linkpath = linkpath.replace("/", os.sep)
- try:
- shutil.copy2(linkpath, targetpath)
- except EnvironmentError, e:
- raise IOError("link could not be created")
+ self._extract_member(self._find_link_target(tarinfo),
+ targetpath)
+ except KeyError:
+ raise ExtractError("unable to resolve link inside archive")
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
@@ -2281,17 +2376,11 @@
try:
g = grp.getgrnam(tarinfo.gname)[2]
except KeyError:
- try:
- g = grp.getgrgid(tarinfo.gid)[2]
- except KeyError:
- g = os.getgid()
+ g = tarinfo.gid
try:
u = pwd.getpwnam(tarinfo.uname)[2]
except KeyError:
- try:
- u = pwd.getpwuid(tarinfo.uid)[2]
- except KeyError:
- u = os.getuid()
+ u = tarinfo.uid
try:
if tarinfo.issym() and hasattr(os, "lchown"):
os.lchown(targetpath, u, g)
@@ -2370,21 +2459,28 @@
#--------------------------------------------------------------------------
# Little helper methods:
- def _getmember(self, name, tarinfo=None):
+ def _getmember(self, name, tarinfo=None, normalize=False):
"""Find an archive member by name from bottom to top.
If tarinfo is given, it is used as the starting point.
"""
# Ensure that all members have been loaded.
members = self.getmembers()
- if tarinfo is None:
- end = len(members)
- else:
- end = members.index(tarinfo)
+ # Limit the member search list up to tarinfo.
+ if tarinfo is not None:
+ members = members[:members.index(tarinfo)]
- for i in xrange(end - 1, -1, -1):
- if name == members[i].name:
- return members[i]
+ if normalize:
+ name = os.path.normpath(name)
+
+ for member in reversed(members):
+ if normalize:
+ member_name = os.path.normpath(member.name)
+ else:
+ member_name = member.name
+
+ if name == member_name:
+ return member
def _load(self):
"""Read through the entire archive file and look for readable
@@ -2405,6 +2501,25 @@
if mode is not None and self.mode not in mode:
raise IOError("bad operation for mode %r" % self.mode)
+ def _find_link_target(self, tarinfo):
+ """Find the target member of a symlink or hardlink member in the
+ archive.
+ """
+ if tarinfo.issym():
+ # Always search the entire archive.
+ linkname = os.path.dirname(tarinfo.name) + "/" + tarinfo.linkname
+ limit = None
+ else:
+ # Search the archive before the link, because a hard link is
+ # just a reference to an already archived file.
+ linkname = tarinfo.linkname
+ limit = tarinfo
+
+ member = self._getmember(linkname, tarinfo=limit, normalize=True)
+ if member is None:
+ raise KeyError("linkname %r not found" % linkname)
+ return member
+
def __iter__(self):
"""Provide an iterator object.
"""
@@ -2418,6 +2533,20 @@
"""
if level <= self.debug:
print >> sys.stderr, msg
+
+ def __enter__(self):
+ self._check()
+ return self
+
+ def __exit__(self, type, value, traceback):
+ if type is None:
+ self.close()
+ else:
+ # An exception occurred. We must not call close() because
+ # it would try to write end-of-archive blocks and padding.
+ if not self._extfileobj:
+ self.fileobj.close()
+ self.closed = True
# class TarFile
class TarIter(object):
@@ -2456,103 +2585,6 @@
self.index += 1
return tarinfo
-# Helper classes for sparse file support
-class _section(object):
- """Base class for _data and _hole.
- """
- def __init__(self, offset, size):
- self.offset = offset
- self.size = size
- def __contains__(self, offset):
- return self.offset <= offset < self.offset + self.size
-
-class _data(_section):
- """Represent a data section in a sparse file.
- """
- def __init__(self, offset, size, realpos):
- _section.__init__(self, offset, size)
- self.realpos = realpos
-
-class _hole(_section):
- """Represent a hole section in a sparse file.
- """
- pass
-
-class _ringbuffer(list):
- """Ringbuffer class which increases performance
- over a regular list.
- """
- def __init__(self):
- self.idx = 0
- def find(self, offset):
- idx = self.idx
- while True:
- item = self[idx]
- if offset in item:
- break
- idx += 1
- if idx == len(self):
- idx = 0
- if idx == self.idx:
- # End of File
- return None
- self.idx = idx
- return item
-
-#---------------------------------------------
-# zipfile compatible TarFile class
-#---------------------------------------------
-TAR_PLAIN = 0 # zipfile.ZIP_STORED
-TAR_GZIPPED = 8 # zipfile.ZIP_DEFLATED
-class TarFileCompat(object):
- """TarFile class compatible with standard module zipfile's
- ZipFile class.
- """
- def __init__(self, file, mode="r", compression=TAR_PLAIN):
- from warnings import warnpy3k
- warnpy3k("the TarFileCompat class has been removed in Python 3.0",
- stacklevel=2)
- if compression == TAR_PLAIN:
- self.tarfile = TarFile.taropen(file, mode)
- elif compression == TAR_GZIPPED:
- self.tarfile = TarFile.gzopen(file, mode)
- else:
- raise ValueError("unknown compression constant")
- if mode[0:1] == "r":
- members = self.tarfile.getmembers()
- for m in members:
- m.filename = m.name
- m.file_size = m.size
- m.date_time = time.gmtime(m.mtime)[:6]
- def namelist(self):
- return map(lambda m: m.name, self.infolist())
- def infolist(self):
- return filter(lambda m: m.type in REGULAR_TYPES,
- self.tarfile.getmembers())
- def printdir(self):
- self.tarfile.list()
- def testzip(self):
- return
- def getinfo(self, name):
- return self.tarfile.getmember(name)
- def read(self, name):
- return self.tarfile.extractfile(self.tarfile.getmember(name)).read()
- def write(self, filename, arcname=None, compress_type=None):
- self.tarfile.add(filename, arcname)
- def writestr(self, zinfo, bytes):
- try:
- from cStringIO import StringIO
- except ImportError:
- from StringIO import StringIO
- import calendar
- tinfo = TarInfo(zinfo.filename)
- tinfo.size = len(bytes)
- tinfo.mtime = calendar.timegm(zinfo.date_time)
- self.tarfile.addfile(tinfo, StringIO(bytes))
- def close(self):
- self.tarfile.close()
-#class TarFileCompat
-
#--------------------
# exported functions
#--------------------
@@ -2561,10 +2593,8 @@
are able to handle, else return False.
"""
try:
- try:
- t = open(name)
- finally:
- t.close()
+ t = open(name)
+ t.close()
return True
except TarError:
return False
diff --git a/distutils2/_backport/tests/test_shutil.py b/distutils2/_backport/tests/test_shutil.py
--- a/distutils2/_backport/tests/test_shutil.py
+++ b/distutils2/_backport/tests/test_shutil.py
@@ -1,12 +1,13 @@
import os
import sys
-import tempfile
import stat
import tarfile
+import tempfile
from os.path import splitdrive
from StringIO import StringIO
from distutils.spawn import find_executable, spawn
+from distutils2.compat import wraps
from distutils2._backport import shutil
from distutils2._backport.shutil import (
_make_tarball, _make_zipfile, make_archive, unpack_archive,
@@ -17,6 +18,7 @@
from distutils2.tests import unittest, support
from test.test_support import TESTFN
+
try:
import bz2
BZ2_SUPPORTED = True
@@ -43,6 +45,21 @@
except ImportError:
ZIP_SUPPORT = find_executable('zip')
+def _fake_rename(*args, **kwargs):
+ # Pretend the destination path is on a different filesystem.
+ raise OSError()
+
+def mock_rename(func):
+ @wraps(func)
+ def wrap(*args, **kwargs):
+ try:
+ builtin_rename = os.rename
+ os.rename = _fake_rename
+ return func(*args, **kwargs)
+ finally:
+ os.rename = builtin_rename
+ return wrap
+
class TestShutil(unittest.TestCase):
def setUp(self):
@@ -266,27 +283,45 @@
shutil.rmtree(src_dir)
shutil.rmtree(os.path.dirname(dst_dir))
- @support.skip_unless_symlink
+ @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
def test_dont_copy_file_onto_link_to_itself(self):
+ # Temporarily disable test on Windows.
+ if os.name == 'nt':
+ return
# bug 851123.
os.mkdir(TESTFN)
src = os.path.join(TESTFN, 'cheese')
dst = os.path.join(TESTFN, 'shop')
try:
f = open(src, 'w')
- f.write('cheddar')
- f.close()
+ try:
+ f.write('cheddar')
+ finally:
+ f.close()
- if hasattr(os, "link"):
- os.link(src, dst)
- self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
- f = open(src, 'r')
- try:
- self.assertEqual(f.read(), 'cheddar')
- finally:
- f.close()
- os.remove(dst)
+ os.link(src, dst)
+ self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
+ f = open(src, 'r')
+ try:
+ self.assertEqual(f.read(), 'cheddar')
+ finally:
+ f.close()
+ os.remove(dst)
+ finally:
+ shutil.rmtree(TESTFN, ignore_errors=True)
+ @support.skip_unless_symlink
+ def test_dont_copy_file_onto_symlink_to_itself(self):
+ # bug 851123.
+ os.mkdir(TESTFN)
+ src = os.path.join(TESTFN, 'cheese')
+ dst = os.path.join(TESTFN, 'shop')
+ try:
+ f = open(src, 'w')
+ try:
+ f.write('cheddar')
+ finally:
+ f.close()
# Using `src` here would mean we end up with a symlink pointing
# to TESTFN/TESTFN/cheese, while it should point at
# TESTFN/cheese.
@@ -299,10 +334,7 @@
f.close()
os.remove(dst)
finally:
- try:
- shutil.rmtree(TESTFN)
- except OSError:
- pass
+ shutil.rmtree(TESTFN, ignore_errors=True)
@support.skip_unless_symlink
def test_rmtree_on_symlink(self):
@@ -329,26 +361,26 @@
finally:
os.remove(TESTFN)
- @unittest.skipUnless(hasattr(os, 'mkfifo'), 'requires os.mkfifo')
- def test_copytree_named_pipe(self):
- os.mkdir(TESTFN)
- try:
- subdir = os.path.join(TESTFN, "subdir")
- os.mkdir(subdir)
- pipe = os.path.join(subdir, "mypipe")
- os.mkfifo(pipe)
+ @support.skip_unless_symlink
+ def test_copytree_named_pipe(self):
+ os.mkdir(TESTFN)
try:
- shutil.copytree(TESTFN, TESTFN2)
- except shutil.Error, e:
- errors = e.args[0]
- self.assertEqual(len(errors), 1)
- src, dst, error_msg = errors[0]
- self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
- else:
- self.fail("shutil.Error should have been raised")
- finally:
- shutil.rmtree(TESTFN, ignore_errors=True)
- shutil.rmtree(TESTFN2, ignore_errors=True)
+ subdir = os.path.join(TESTFN, "subdir")
+ os.mkdir(subdir)
+ pipe = os.path.join(subdir, "mypipe")
+ os.mkfifo(pipe)
+ try:
+ shutil.copytree(TESTFN, TESTFN2)
+ except shutil.Error, e:
+ errors = e.args[0]
+ self.assertEqual(len(errors), 1)
+ src, dst, error_msg = errors[0]
+ self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
+ else:
+ self.fail("shutil.Error should have been raised")
+ finally:
+ shutil.rmtree(TESTFN, ignore_errors=True)
+ shutil.rmtree(TESTFN2, ignore_errors=True)
def test_copytree_special_func(self):
@@ -363,7 +395,7 @@
copied.append((src, dst))
shutil.copytree(src_dir, dst_dir, copy_function=_copy)
- self.assertEquals(len(copied), 2)
+ self.assertEqual(len(copied), 2)
@support.skip_unless_symlink
def test_copytree_dangling_symlinks(self):
@@ -386,6 +418,41 @@
shutil.copytree(src_dir, dst_dir, symlinks=True)
self.assertIn('test.txt', os.listdir(dst_dir))
+ def _copy_file(self, method):
+ fname = 'test.txt'
+ tmpdir = self.mkdtemp()
+ self.write_file([tmpdir, fname])
+ file1 = os.path.join(tmpdir, fname)
+ tmpdir2 = self.mkdtemp()
+ method(file1, tmpdir2)
+ file2 = os.path.join(tmpdir2, fname)
+ return (file1, file2)
+
+ @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
+ def test_copy(self):
+ # Ensure that the copied file exists and has the same mode bits.
+ file1, file2 = self._copy_file(shutil.copy)
+ self.assertTrue(os.path.exists(file2))
+ self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
+
+ @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
+ @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
+ def test_copy2(self):
+ # Ensure that the copied file exists and has the same mode and
+ # modification time bits.
+ file1, file2 = self._copy_file(shutil.copy2)
+ self.assertTrue(os.path.exists(file2))
+ file1_stat = os.stat(file1)
+ file2_stat = os.stat(file2)
+ self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
+ for attr in 'st_atime', 'st_mtime':
+ # The modification times may be truncated in the new file.
+ self.assertLessEqual(getattr(file1_stat, attr),
+ getattr(file2_stat, attr) + 1)
+ if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
+ self.assertEqual(getattr(file1_stat, 'st_flags'),
+ getattr(file2_stat, 'st_flags'))
+
@unittest.skipUnless(zlib, "requires zlib")
def test_make_tarball(self):
# creating something to tar
@@ -396,6 +463,8 @@
self.write_file([tmpdir, 'sub', 'file3'], 'xxx')
tmpdir2 = self.mkdtemp()
+ # force shutil to create the directory
+ os.rmdir(tmpdir2)
unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
"source and target should be on same drive")
@@ -481,7 +550,7 @@
self.assertTrue(os.path.exists(tarball2))
# let's compare both tarballs
- self.assertEquals(self._tarinfo(tarball), self._tarinfo(tarball2))
+ self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
# trying an uncompressed one
base_name = os.path.join(tmpdir2, 'archive')
@@ -514,6 +583,8 @@
self.write_file([tmpdir, 'file2'], 'xxx')
tmpdir2 = self.mkdtemp()
+ # force shutil to create the directory
+ os.rmdir(tmpdir2)
base_name = os.path.join(tmpdir2, 'archive')
_make_zipfile(base_name, tmpdir)
@@ -576,8 +647,8 @@
archive = tarfile.open(archive_name)
try:
for member in archive.getmembers():
- self.assertEquals(member.uid, 0)
- self.assertEquals(member.gid, 0)
+ self.assertEqual(member.uid, 0)
+ self.assertEqual(member.gid, 0)
finally:
archive.close()
@@ -592,7 +663,7 @@
make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
except Exception:
pass
- self.assertEquals(os.getcwd(), current_dir)
+ self.assertEqual(os.getcwd(), current_dir)
finally:
unregister_archive_format('xxx')
@@ -639,16 +710,24 @@
# let's try to unpack it now
unpack_archive(filename, tmpdir2)
diff = self._compare_dirs(tmpdir, tmpdir2)
- self.assertEquals(diff, [])
+ self.assertEqual(diff, [])
+
+ # and again, this time with the format specified
+ tmpdir3 = self.mkdtemp()
+ unpack_archive(filename, tmpdir3, format=format)
+ diff = self._compare_dirs(tmpdir, tmpdir3)
+ self.assertEqual(diff, [])
+ self.assertRaises(shutil.ReadError, unpack_archive, TESTFN)
+ self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx')
def test_unpack_registery(self):
formats = get_unpack_formats()
def _boo(filename, extract_dir, extra):
- self.assertEquals(extra, 1)
- self.assertEquals(filename, 'stuff.boo')
- self.assertEquals(extract_dir, 'xx')
+ self.assertEqual(extra, 1)
+ self.assertEqual(filename, 'stuff.boo')
+ self.assertEqual(extract_dir, 'xx')
register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
unpack_archive('stuff.boo', 'xx')
@@ -665,7 +744,7 @@
# let's leave a clean state
unregister_unpack_format('Boo2')
- self.assertEquals(get_unpack_formats(), formats)
+ self.assertEqual(get_unpack_formats(), formats)
class TestMove(unittest.TestCase):
@@ -676,15 +755,6 @@
self.dst_dir = tempfile.mkdtemp()
self.src_file = os.path.join(self.src_dir, filename)
self.dst_file = os.path.join(self.dst_dir, filename)
- # Try to create a dir in the current directory, hoping that it is
- # not located on the same filesystem as the system tmp dir.
- try:
- self.dir_other_fs = tempfile.mkdtemp(
- dir=os.path.dirname(__file__))
- self.file_other_fs = os.path.join(self.dir_other_fs,
- filename)
- except OSError:
- self.dir_other_fs = None
f = open(self.src_file, "wb")
try:
f.write("spam")
@@ -692,7 +762,7 @@
f.close()
def tearDown(self):
- for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
+ for d in (self.src_dir, self.dst_dir):
try:
if d:
shutil.rmtree(d)
@@ -729,21 +799,15 @@
# Move a file inside an existing dir on the same filesystem.
self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
+ @mock_rename
def test_move_file_other_fs(self):
# Move a file to an existing dir on another filesystem.
- if not self.dir_other_fs:
- # skip
- return
- self._check_move_file(self.src_file, self.file_other_fs,
- self.file_other_fs)
+ self.test_move_file()
+ @mock_rename
def test_move_file_to_dir_other_fs(self):
# Move a file to another location on another filesystem.
- if not self.dir_other_fs:
- # skip
- return
- self._check_move_file(self.src_file, self.dir_other_fs,
- self.file_other_fs)
+ self.test_move_file_to_dir()
def test_move_dir(self):
# Move a dir to another location on the same filesystem.
@@ -756,32 +820,20 @@
except:
pass
+ @mock_rename
def test_move_dir_other_fs(self):
# Move a dir to another location on another filesystem.
- if not self.dir_other_fs:
- # skip
- return
- dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
- try:
- self._check_move_dir(self.src_dir, dst_dir, dst_dir)
- finally:
- try:
- shutil.rmtree(dst_dir)
- except:
- pass
+ self.test_move_dir()
def test_move_dir_to_dir(self):
# Move a dir inside an existing dir on the same filesystem.
self._check_move_dir(self.src_dir, self.dst_dir,
os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
+ @mock_rename
def test_move_dir_to_dir_other_fs(self):
# Move a dir inside an existing dir on another filesystem.
- if not self.dir_other_fs:
- # skip
- return
- self._check_move_dir(self.src_dir, self.dir_other_fs,
- os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))
+ self.test_move_dir_to_dir()
def test_existing_file_inside_dest_dir(self):
# A file with the same name inside the destination dir already exists.
@@ -932,6 +984,23 @@
self.assertTrue(srcfile._exited_with[0] is None)
self.assertTrue(srcfile._raised)
+ def test_move_dir_caseinsensitive(self):
+ # Renames a folder to the same name
+ # but a different case.
+
+ self.src_dir = tempfile.mkdtemp()
+ dst_dir = os.path.join(
+ os.path.dirname(self.src_dir),
+ os.path.basename(self.src_dir).upper())
+ self.assertNotEqual(self.src_dir, dst_dir)
+
+ try:
+ shutil.move(self.src_dir, dst_dir)
+ self.assertTrue(os.path.isdir(dst_dir))
+ finally:
+ if os.path.exists(dst_dir):
+ os.rmdir(dst_dir)
+
def test_suite():
suite = unittest.TestSuite()
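The mock_rename decorator added at the top of test_shutil.py replaces os.rename with a stub that always raises OSError, so shutil.move has to fall back to its copy-and-delete code path; that is how the rewritten *_other_fs tests in TestMove exercise cross-filesystem moves without needing a second filesystem. A minimal standalone sketch of the same idea (file and directory names are illustrative):

    import os
    import shutil
    import tempfile

    def _fake_rename(*args, **kwargs):
        # Pretend src and dst live on different filesystems.
        raise OSError("simulated cross-device rename")

    src_dir = tempfile.mkdtemp()
    dst_dir = tempfile.mkdtemp()
    src_file = os.path.join(src_dir, 'spam.txt')
    f = open(src_file, 'w')
    try:
        f.write('spam')
    finally:
        f.close()

    real_rename = os.rename
    os.rename = _fake_rename
    try:
        # os.rename fails, so shutil.move copies the file and removes the source.
        shutil.move(src_file, dst_dir)
    finally:
        os.rename = real_rename

    print os.listdir(dst_dir)   # ['spam.txt']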
diff --git a/distutils2/_backport/tests/test_sysconfig.py b/distutils2/_backport/tests/test_sysconfig.py
--- a/distutils2/_backport/tests/test_sysconfig.py
+++ b/distutils2/_backport/tests/test_sysconfig.py
@@ -1,5 +1,3 @@
-"""Tests for sysconfig."""
-
import os
import sys
import subprocess
@@ -10,29 +8,21 @@
from distutils2._backport import sysconfig
from distutils2._backport.sysconfig import (
- _expand_globals, _expand_vars, _get_default_scheme, _subst_vars,
- get_config_var, get_config_vars, get_path, get_paths, get_platform,
- get_scheme_names, _main, _SCHEMES)
+ get_paths, get_platform, get_config_vars, get_path, get_path_names,
+ _SCHEMES, _get_default_scheme, _expand_vars, get_scheme_names,
+ get_config_var, _main)
from distutils2.tests import unittest
-from distutils2.tests.support import EnvironRestorer
+from distutils2.tests.support import skip_unless_symlink
+
from test.test_support import TESTFN, unlink
-try:
- from test.test_support import skip_unless_symlink
-except ImportError:
- skip_unless_symlink = unittest.skip(
- 'requires test.test_support.skip_unless_symlink')
-
-class TestSysConfig(EnvironRestorer, unittest.TestCase):
-
- restore_environ = ['MACOSX_DEPLOYMENT_TARGET', 'PATH']
+class TestSysConfig(unittest.TestCase):
def setUp(self):
super(TestSysConfig, self).setUp()
self.sys_path = sys.path[:]
- self.makefile = None
# patching os.uname
if hasattr(os, 'uname'):
self.uname = os.uname
@@ -45,17 +35,21 @@
self.name = os.name
self.platform = sys.platform
self.version = sys.version
- self.maxint = sys.maxint
self.sep = os.sep
self.join = os.path.join
self.isabs = os.path.isabs
self.splitdrive = os.path.splitdrive
self._config_vars = copy(sysconfig._CONFIG_VARS)
+ self._added_envvars = []
+ self._changed_envvars = []
+ for var in ('MACOSX_DEPLOYMENT_TARGET', 'PATH'):
+ if var in os.environ:
+ self._changed_envvars.append((var, os.environ[var]))
+ else:
+ self._added_envvars.append(var)
def tearDown(self):
sys.path[:] = self.sys_path
- if self.makefile is not None:
- os.unlink(self.makefile)
self._cleanup_testfn()
if self.uname is not None:
os.uname = self.uname
@@ -64,12 +58,16 @@
os.name = self.name
sys.platform = self.platform
sys.version = self.version
- sys.maxint = self.maxint
os.sep = self.sep
os.path.join = self.join
os.path.isabs = self.isabs
os.path.splitdrive = self.splitdrive
sysconfig._CONFIG_VARS = copy(self._config_vars)
+ for var, value in self._changed_envvars:
+ os.environ[var] = value
+ for var in self._added_envvars:
+ os.environ.pop(var, None)
+
super(TestSysConfig, self).tearDown()
def _set_uname(self, uname):
@@ -85,19 +83,8 @@
elif os.path.isdir(path):
shutil.rmtree(path)
- # TODO use a static list or remove the test
- #def test_get_path_names(self):
- # self.assertEqual(get_path_names(), sysconfig._SCHEME_KEYS)
-
- def test_nested_var_substitution(self):
- # Assert that the {curly brace token} expansion pattern will replace
- # only the inner {something} on nested expressions like {py{something}} on
- # the first pass.
-
- # We have no plans to make use of this, but it keeps the option open for
- # the future, at the cost only of disallowing { itself as a piece of a
- # substitution key (which would be weird).
- self.assertEqual(_subst_vars('{py{version}}', {'version': '31'}), '{py31}')
+ def test_get_path_names(self):
+ self.assertEqual(get_path_names(), _SCHEMES.options('posix_prefix'))
def test_get_paths(self):
scheme = get_paths()
@@ -108,10 +95,10 @@
self.assertEqual(scheme, wanted)
def test_get_path(self):
- # xxx make real tests here
+ # XXX make real tests here
for scheme in _SCHEMES.sections():
for name, _ in _SCHEMES.items(scheme):
- get_path(name, scheme)
+ res = get_path(name, scheme)
def test_get_config_vars(self):
cvars = get_config_vars()
@@ -146,37 +133,43 @@
'\n[GCC 4.0.1 (Apple Computer, Inc. build 5341)]')
sys.platform = 'darwin'
self._set_uname(('Darwin', 'macziade', '8.11.1',
- ('Darwin Kernel Version 8.11.1: '
- 'Wed Oct 10 18:23:28 PDT 2007; '
- 'root:xnu-792.25.20~1/RELEASE_I386'), 'PowerPC'))
- os.environ['MACOSX_DEPLOYMENT_TARGET'] = '10.3'
+ ('Darwin Kernel Version 8.11.1: '
+ 'Wed Oct 10 18:23:28 PDT 2007; '
+ 'root:xnu-792.25.20~1/RELEASE_I386'), 'PowerPC'))
+ get_config_vars()['MACOSX_DEPLOYMENT_TARGET'] = '10.3'
get_config_vars()['CFLAGS'] = ('-fno-strict-aliasing -DNDEBUG -g '
'-fwrapv -O3 -Wall -Wstrict-prototypes')
- sys.maxint = 2147483647
- self.assertEqual(get_platform(), 'macosx-10.3-ppc')
- sys.maxint = 9223372036854775807
- self.assertEqual(get_platform(), 'macosx-10.3-ppc64')
-
+ maxint = sys.maxint
+ try:
+ sys.maxint = 2147483647
+ self.assertEqual(get_platform(), 'macosx-10.3-ppc')
+ sys.maxint = 9223372036854775807
+ self.assertEqual(get_platform(), 'macosx-10.3-ppc64')
+ finally:
+ sys.maxint = maxint
self._set_uname(('Darwin', 'macziade', '8.11.1',
- ('Darwin Kernel Version 8.11.1: '
- 'Wed Oct 10 18:23:28 PDT 2007; '
- 'root:xnu-792.25.20~1/RELEASE_I386'), 'i386'))
+ ('Darwin Kernel Version 8.11.1: '
+ 'Wed Oct 10 18:23:28 PDT 2007; '
+ 'root:xnu-792.25.20~1/RELEASE_I386'), 'i386'))
get_config_vars()['MACOSX_DEPLOYMENT_TARGET'] = '10.3'
- os.environ['MACOSX_DEPLOYMENT_TARGET'] = '10.3'
+ get_config_vars()['MACOSX_DEPLOYMENT_TARGET'] = '10.3'
get_config_vars()['CFLAGS'] = ('-fno-strict-aliasing -DNDEBUG -g '
'-fwrapv -O3 -Wall -Wstrict-prototypes')
-
- sys.maxint = 2147483647
- self.assertEqual(get_platform(), 'macosx-10.3-i386')
- sys.maxint = 9223372036854775807
- self.assertEqual(get_platform(), 'macosx-10.3-x86_64')
+ maxint = sys.maxint
+ try:
+ sys.maxint = 2147483647
+ self.assertEqual(get_platform(), 'macosx-10.3-i386')
+ sys.maxint = 9223372036854775807
+ self.assertEqual(get_platform(), 'macosx-10.3-x86_64')
+ finally:
+ sys.maxint = maxint
# macbook with fat binaries (fat, universal or fat64)
- os.environ['MACOSX_DEPLOYMENT_TARGET'] = '10.4'
+ get_config_vars()['MACOSX_DEPLOYMENT_TARGET'] = '10.4'
get_config_vars()['CFLAGS'] = ('-arch ppc -arch i386 -isysroot '
'/Developer/SDKs/MacOSX10.4u.sdk '
'-fno-strict-aliasing -fno-common '
@@ -214,9 +207,9 @@
get_config_vars()['CFLAGS'] = ('-arch %s -isysroot '
'/Developer/SDKs/MacOSX10.4u.sdk '
'-fno-strict-aliasing -fno-common '
- '-dynamic -DNDEBUG -g -O3'%(arch,))
+ '-dynamic -DNDEBUG -g -O3' % arch)
- self.assertEqual(get_platform(), 'macosx-10.4-%s'%(arch,))
+ self.assertEqual(get_platform(), 'macosx-10.4-%s' % arch)
# linux debian sarge
os.name = 'posix'
@@ -234,10 +227,6 @@
config_h = sysconfig.get_config_h_filename()
self.assertTrue(os.path.isfile(config_h), config_h)
- def test_get_makefile_filename(self):
- makefile = sysconfig.get_makefile_filename()
- self.assertTrue(os.path.isfile(makefile), makefile)
-
def test_get_scheme_names(self):
wanted = ('nt', 'nt_user', 'os2', 'os2_home', 'osx_framework_user',
'posix_home', 'posix_prefix', 'posix_user')
@@ -268,14 +257,14 @@
@unittest.skipIf(sys.version < '2.6', 'requires Python 2.6 or higher')
def test_user_similar(self):
- # Issue 8759 : make sure the posix scheme for the users
+ # Issue #8759: make sure the posix scheme for the users
# is similar to the global posix_prefix one
base = get_config_var('base')
user = get_config_var('userbase')
for name in ('stdlib', 'platstdlib', 'purelib', 'platlib'):
global_path = get_path(name, 'posix_prefix')
user_path = get_path(name, 'posix_user')
- self.assertEqual(user_path, global_path.replace(base, user))
+ self.assertEqual(user_path, global_path.replace(base, user, 1))
def test_main(self):
# just making sure _main() runs and returns things in the stdout
@@ -291,25 +280,96 @@
self.assertIn(ldflags, ldshared)
- def test_expand_globals(self):
- config = RawConfigParser()
- config.add_section('globals')
- config.set('globals', 'foo', 'ok')
- config.add_section('posix')
- config.set('posix', 'config', '/etc')
- config.set('posix', 'more', '{config}/ok')
+ @unittest.skipUnless(sys.platform == "darwin", "test only relevant on MacOSX")
+ def test_platform_in_subprocess(self):
+ my_platform = sysconfig.get_platform()
- _expand_globals(config)
+ # Test without MACOSX_DEPLOYMENT_TARGET in the environment
- self.assertEqual(config.get('posix', 'foo'), 'ok')
- self.assertEqual(config.get('posix', 'more'), '/etc/ok')
+ env = os.environ.copy()
+ if 'MACOSX_DEPLOYMENT_TARGET' in env:
+ del env['MACOSX_DEPLOYMENT_TARGET']
- # we might not have globals after all
- # extending again (==no more globals section)
- _expand_globals(config)
+ devnull_fp = open('/dev/null', 'w')
+ try:
+ p = subprocess.Popen([
+ sys.executable, '-c',
+ 'from distutils2._backport import sysconfig; '
+ 'print sysconfig.get_platform()',
+ ],
+ stdout=subprocess.PIPE,
+ stderr=devnull_fp,
+ env=env)
+ finally:
+ devnull_fp.close()
+ test_platform = p.communicate()[0].strip()
+ test_platform = test_platform.decode('utf-8')
+ status = p.wait()
+
+ self.assertEqual(status, 0)
+ self.assertEqual(my_platform, test_platform)
+
+ # Test with MACOSX_DEPLOYMENT_TARGET in the environment, and
+ # using a value that is unlikely to be the default one.
+ env = os.environ.copy()
+ env['MACOSX_DEPLOYMENT_TARGET'] = '10.1'
+
+ dev_null = open('/dev/null', 'w')
+ try:
+ p = subprocess.Popen([
+ sys.executable, '-c',
+ 'from distutils2._backport import sysconfig; '
+ 'print sysconfig.get_platform()',
+ ],
+ stdout=subprocess.PIPE,
+ stderr=dev_null,
+ env=env)
+ test_platform = p.communicate()[0].strip()
+ test_platform = test_platform.decode('utf-8')
+ status = p.wait()
+
+ self.assertEqual(status, 0)
+ self.assertEqual(my_platform, test_platform)
+ finally:
+ dev_null.close()
+
+
+class MakefileTests(unittest.TestCase):
+
+ @unittest.skipIf(sys.platform.startswith('win'),
+ 'Test is not Windows compatible')
+ def test_get_makefile_filename(self):
+ makefile = sysconfig.get_makefile_filename()
+ self.assertTrue(os.path.isfile(makefile), makefile)
+
+ def test_parse_makefile(self):
+ self.addCleanup(unlink, TESTFN)
+ makefile = open(TESTFN, "w")
+ try:
+ print >> makefile, "var1=a$(VAR2)"
+ print >> makefile, "VAR2=b$(var3)"
+ print >> makefile, "var3=42"
+ print >> makefile, "var4=$/invalid"
+ print >> makefile, "var5=dollar$$5"
+ finally:
+ makefile.close()
+ vars = sysconfig._parse_makefile(TESTFN)
+ self.assertEqual(vars, {
+ 'var1': 'ab42',
+ 'VAR2': 'b42',
+ 'var3': 42,
+ 'var4': '$/invalid',
+ 'var5': 'dollar$5',
+ })
+
def test_suite():
- return unittest.makeSuite(TestSysConfig)
+ suite = unittest.TestSuite()
+ load = unittest.defaultTestLoader.loadTestsFromTestCase
+ suite.addTest(load(TestSysConfig))
+ suite.addTest(load(MakefileTests))
+ return suite
+
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
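The TestSysConfig changes drop the EnvironRestorer mixin in favour of explicit bookkeeping in setUp/tearDown: variables that were already set are remembered with their old values and restored, and variables that were absent are popped again afterwards. The same pattern outside unittest, as a small sketch (the tracked names are just the two the test cares about):

    import os

    TRACKED = ('MACOSX_DEPLOYMENT_TARGET', 'PATH')

    def save_environ(names=TRACKED):
        changed, added = [], []
        for var in names:
            if var in os.environ:
                changed.append((var, os.environ[var]))
            else:
                added.append(var)
        return changed, added

    def restore_environ(state):
        changed, added = state
        for var, value in changed:
            os.environ[var] = value
        for var in added:
            os.environ.pop(var, None)

    state = save_environ()
    try:
        os.environ['MACOSX_DEPLOYMENT_TARGET'] = '10.3'
    finally:
        restore_environ(state)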
diff --git a/distutils2/command/build_scripts.py b/distutils2/command/build_scripts.py
--- a/distutils2/command/build_scripts.py
+++ b/distutils2/command/build_scripts.py
@@ -3,11 +3,12 @@
import os
import re
-from distutils2._backport import sysconfig
from distutils2.command.cmd import Command
-from distutils2.util import convert_path, newer, detect_encoding, fsencode
+from distutils2.util import convert_path, newer
from distutils2 import logger
from distutils2.compat import Mixin2to3
+from distutils2.compat import detect_encoding, fsencode
+from distutils2._backport import sysconfig
# check if Python is called on the first line with this expression
diff --git a/distutils2/command/cmd.py b/distutils2/command/cmd.py
--- a/distutils2/command/cmd.py
+++ b/distutils2/command/cmd.py
@@ -2,11 +2,10 @@
import os
import re
-from shutil import copyfile, move
from distutils2 import util
from distutils2 import logger
-from distutils2.util import make_archive
from distutils2.errors import PackagingOptionError
+from distutils2._backport.shutil import copyfile, move, make_archive
class Command(object):
diff --git a/distutils2/command/sdist.py b/distutils2/command/sdist.py
--- a/distutils2/command/sdist.py
+++ b/distutils2/command/sdist.py
@@ -4,15 +4,15 @@
import re
import sys
from StringIO import StringIO
-from shutil import rmtree
from distutils2 import logger
-from distutils2.util import resolve_name, get_archive_formats
+from distutils2.util import resolve_name
from distutils2.errors import (PackagingPlatformError, PackagingOptionError,
PackagingModuleError, PackagingFileError)
from distutils2.command import get_command_names
from distutils2.command.cmd import Command
from distutils2.manifest import Manifest
+from distutils2._backport.shutil import get_archive_formats, rmtree
def show_formats():
diff --git a/distutils2/compat.py b/distutils2/compat.py
--- a/distutils2/compat.py
+++ b/distutils2/compat.py
@@ -1,17 +1,18 @@
"""Compatibility helpers.
-This module provides classes, variables and imports which are used to
-support distutils2 across Python 2.x and 3.x.
+This module provides individual classes or objects backported from
+Python 3.2, for internal use only. Whole modules are in _backport.
"""
+import re
+import sys
+import codecs
from distutils2 import logger
# XXX Having two classes with the same name is not a good thing.
# XXX 2to3-related code should move from util to this module
-# TODO Move common code here: PY3 (bool indicating if we're on 3.x), any, etc.
-
try:
from distutils2.util import Mixin2to3 as _Mixin2to3
_CONVERT = True
@@ -55,3 +56,125 @@
def _run_2to3(self, files, doctests=[], fixers=[]):
pass
+
+
+# The rest of this file does not exist in packaging
+# functions are sorted alphabetically and are not included in __all__
+
+try:
+ any
+except NameError:
+ def any(seq):
+ for elem in seq:
+ if elem:
+ return True
+ return False
+
+
+_cookie_re = re.compile("coding[:=]\s*([-\w.]+)")
+
+def _get_normal_name(orig_enc):
+ """Imitates get_normal_name in tokenizer.c."""
+ # Only care about the first 12 characters.
+ enc = orig_enc[:12].lower().replace("_", "-")
+ if enc == "utf-8" or enc.startswith("utf-8-"):
+ return "utf-8"
+ if enc in ("latin-1", "iso-8859-1", "iso-latin-1") or \
+ enc.startswith(("latin-1-", "iso-8859-1-", "iso-latin-1-")):
+ return "iso-8859-1"
+ return orig_enc
+
+def detect_encoding(readline):
+ """
+ The detect_encoding() function is used to detect the encoding that should
+ be used to decode a Python source file. It requires one argument, readline,
+ in the same way as the tokenize() generator.
+
+ It will call readline a maximum of twice, and return the encoding used
+ (as a string) and a list of any lines (left as bytes) it has read in.
+
+ It detects the encoding from the presence of a utf-8 bom or an encoding
+ cookie as specified in pep-0263. If both a bom and a cookie are present,
+ but disagree, a SyntaxError will be raised. If the encoding cookie is an
+ invalid charset, raise a SyntaxError. Note that if a utf-8 bom is found,
+ 'utf-8-sig' is returned.
+
+ If no encoding is specified, then the default of 'utf-8' will be returned.
+ """
+ bom_found = False
+ encoding = None
+ default = 'utf-8'
+ def read_or_stop():
+ try:
+ return readline()
+ except StopIteration:
+ return ''
+
+ def find_cookie(line):
+ try:
+ line_string = line.decode('ascii')
+ except UnicodeDecodeError:
+ return None
+
+ matches = _cookie_re.findall(line_string)
+ if not matches:
+ return None
+ encoding = _get_normal_name(matches[0])
+ try:
+ codec = codecs.lookup(encoding)
+ except LookupError:
+ # This behaviour mimics the Python interpreter
+ raise SyntaxError("unknown encoding: " + encoding)
+
+ if bom_found:
+ if codec.name != 'utf-8':
+ # This behaviour mimics the Python interpreter
+ raise SyntaxError('encoding problem: utf-8')
+ encoding += '-sig'
+ return encoding
+
+ first = read_or_stop()
+ if first.startswith(codecs.BOM_UTF8):
+ bom_found = True
+ first = first[3:]
+ default = 'utf-8-sig'
+ if not first:
+ return default, []
+
+ encoding = find_cookie(first)
+ if encoding:
+ return encoding, [first]
+
+ second = read_or_stop()
+ if not second:
+ return default, [first]
+
+ encoding = find_cookie(second)
+ if encoding:
+ return encoding, [first, second]
+
+ return default, [first, second]
+
+
+def fsencode(filename):
+ """
+ Encode filename to the filesystem encoding with 'surrogateescape' error
+ handler, return bytes unchanged. On Windows, use 'strict' error handler if
+ the file system encoding is 'mbcs' (which is the default encoding).
+ """
+ if isinstance(filename, str):
+ return filename
+ elif isinstance(filename, unicode):
+ return filename.encode(sys.getfilesystemencoding())
+ else:
+ raise TypeError("expect bytes or str, not %s" % type(filename).__name__)
+
+
+try:
+ from functools import wraps
+except ImportError:
+ def wraps(func=None):
+ """No-op replacement for functools.wraps"""
+ def wrapped(func):
+ return func
+ return wrapped
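detect_encoding, now hosted in distutils2.compat, takes a readline callable, reads at most two lines, and returns the encoding name plus the raw lines it consumed so the caller can re-feed them. A minimal usage sketch (the helper function is made up; only the import path comes from this changeset):

    from distutils2.compat import detect_encoding

    def sniff_source_encoding(path):
        # Returns e.g. 'iso-8859-1' for a "# -*- coding: iso-8859-1 -*-" cookie,
        # 'utf-8-sig' for a UTF-8 BOM, and the 'utf-8' default otherwise.
        f = open(path, 'rb')
        try:
            encoding, consumed_lines = detect_encoding(f.readline)
        finally:
            f.close()
        return encoding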
diff --git a/distutils2/create.py b/distutils2/create.py
--- a/distutils2/create.py
+++ b/distutils2/create.py
@@ -27,16 +27,17 @@
import shutil
from textwrap import dedent
from ConfigParser import RawConfigParser
-from distutils2.util import cmp_to_key, detect_encoding
+
# importing this with an underscore as it should be replaced by the
# dict form or another structures for all purposes
from distutils2._trove import all_classifiers as _CLASSIFIERS_LIST
+from distutils2.compat import detect_encoding
from distutils2.version import is_valid_version
from distutils2._backport import sysconfig
try:
any
except NameError:
- from distutils2._backport import any
+ from distutils2.compat import any
try:
from hashlib import md5
except ImportError:
@@ -367,8 +368,9 @@
('description', 'summary'),
('long_description', 'description'),
('url', 'home_page'),
- ('platforms', 'platform'),
- # backport only for 2.5+
+ ('platforms', 'platform'))
+ if sys.version >= '2.5':
+ labels += (
('provides', 'provides-dist'),
('obsoletes', 'obsoletes-dist'),
('requires', 'requires-dist'))
@@ -388,21 +390,9 @@
dist.data_files = [('', dist.data_files)]
# add tokens in the destination paths
vars = {'distribution.name': data['name']}
- path_tokens = list(sysconfig.get_paths(vars=vars).items())
-
- # TODO replace this with a key function
- def length_comparison(x, y):
- len_x = len(x[1])
- len_y = len(y[1])
- if len_x == len_y:
- return 0
- elif len_x < len_y:
- return -1
- else:
- return 1
-
+ path_tokens = sysconfig.get_paths(vars=vars).items()
# sort tokens to use the longest one first
- path_tokens.sort(key=cmp_to_key(length_comparison))
+ path_tokens = sorted(path_tokens, key=lambda x: len(x[1]))
for dest, srcs in (dist.data_files or []):
dest = os.path.join(sys.prefix, dest)
dest = dest.replace(os.path.sep, '/')
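The create.py hunk replaces a cmp-style comparison routed through cmp_to_key with a plain key function, which is simpler and available on every Python version distutils2 supports. A small sketch of the equivalence with made-up path data:

    paths = [('purelib', '/usr/lib/python2.7/site-packages'),
             ('scripts', '/usr/bin'),
             ('data', '/usr')]

    # Before: sorted(paths, key=cmp_to_key(length_comparison))
    # After: the key function extracts the sort criterion directly.
    by_length = sorted(paths, key=lambda item: len(item[1]))
    print [name for name, path in by_length]   # ['data', 'scripts', 'purelib']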
diff --git a/distutils2/install.py b/distutils2/install.py
--- a/distutils2/install.py
+++ b/distutils2/install.py
@@ -17,7 +17,7 @@
from distutils2 import logger
from distutils2.dist import Distribution
from distutils2.util import (_is_archive_file, ask, get_install_method,
- egginfo_to_distinfo, unpack_archive)
+ egginfo_to_distinfo)
from distutils2.pypi import wrapper
from distutils2.version import get_version_predicate
from distutils2.database import get_distributions, get_distribution
@@ -27,6 +27,7 @@
InstallationConflict, CCompilerError)
from distutils2.pypi.errors import ProjectNotFound, ReleaseNotFound
from distutils2 import database
+from distutils2._backport.shutil import unpack_archive
from distutils2._backport.sysconfig import (get_config_var, get_path,
is_python_build)
diff --git a/distutils2/pypi/dist.py b/distutils2/pypi/dist.py
--- a/distutils2/pypi/dist.py
+++ b/distutils2/pypi/dist.py
@@ -8,20 +8,20 @@
"""
import re
-try:
- import hashlib
-except ImportError:
- from distutils2._backport import hashlib
import tempfile
import urllib
import urlparse
from distutils2.errors import IrrationalVersionError
from distutils2.version import (suggest_normalized_version, NormalizedVersion,
- get_version_predicate)
+ get_version_predicate)
from distutils2.metadata import Metadata
from distutils2.pypi.errors import (HashDoesNotMatch, UnsupportedHashName,
- CantParseArchiveName)
-from distutils2.util import unpack_archive
+ CantParseArchiveName)
+from distutils2._backport.shutil import unpack_archive
+try:
+ import hashlib
+except ImportError:
+ from distutils2._backport import hashlib
__all__ = ['ReleaseInfo', 'DistInfo', 'ReleasesList', 'get_infos_from_url']
diff --git a/distutils2/pypi/simple.py b/distutils2/pypi/simple.py
--- a/distutils2/pypi/simple.py
+++ b/distutils2/pypi/simple.py
@@ -15,11 +15,8 @@
import os
from fnmatch import translate
-try:
- from functools import wraps
-except ImportError:
- from distutils2._backport.functools import wraps
from distutils2 import logger
+from distutils2.compat import wraps
from distutils2.metadata import Metadata
from distutils2.version import get_version_predicate
from distutils2 import __version__ as distutils2_version
diff --git a/distutils2/tests/pypi_server.py b/distutils2/tests/pypi_server.py
--- a/distutils2/tests/pypi_server.py
+++ b/distutils2/tests/pypi_server.py
@@ -39,11 +39,8 @@
from SimpleXMLRPCServer import SimpleXMLRPCServer
from distutils2.tests import unittest
+from distutils2.compat import wraps
-try:
- from functools import wraps
-except ImportError:
- from distutils2._backport.functools import wraps
PYPI_DEFAULT_STATIC_PATH = os.path.join(
diff --git a/distutils2/tests/support.py b/distutils2/tests/support.py
--- a/distutils2/tests/support.py
+++ b/distutils2/tests/support.py
@@ -384,7 +384,7 @@
if not _thread:
return func
- @functools.wraps(func)
+ @wraps(func)
def decorator(*args):
key = threading_setup()
try:
diff --git a/distutils2/tests/test_command_sdist.py b/distutils2/tests/test_command_sdist.py
--- a/distutils2/tests/test_command_sdist.py
+++ b/distutils2/tests/test_command_sdist.py
@@ -20,8 +20,9 @@
from distutils2.dist import Distribution
from distutils2.tests import unittest
from distutils2.errors import PackagingOptionError
-from distutils2.util import find_executable, get_archive_formats
+from distutils2.util import find_executable
from distutils2.tests import support
+from distutils2._backport.shutil import get_archive_formats
MANIFEST = """\
diff --git a/distutils2/tests/test_util.py b/distutils2/tests/test_util.py
--- a/distutils2/tests/test_util.py
+++ b/distutils2/tests/test_util.py
@@ -15,7 +15,7 @@
from distutils2 import util
from distutils2.dist import Distribution
from distutils2.util import (
- convert_path, change_root, split_quoted, strtobool, rfc822_escape, run_2to3,
+ convert_path, change_root, split_quoted, strtobool, run_2to3,
get_compiler_versions, _MAC_OS_X_LD_VERSION, byte_compile, find_packages,
spawn, get_pypirc_path, generate_pypirc, read_pypirc, resolve_name, iglob,
RICH_GLOB, egginfo_to_distinfo, is_setuptools, is_distutils, is_packaging,
@@ -255,13 +255,6 @@
for n in no:
self.assertFalse(strtobool(n))
- def test_rfc822_escape(self):
- header = 'I am a\npoor\nlonesome\nheader\n'
- res = rfc822_escape(header)
- wanted = ('I am a%(8s)spoor%(8s)slonesome%(8s)s'
- 'header%(8s)s') % {'8s': '\n' + 8 * ' '}
- self.assertEqual(res, wanted)
-
def test_find_exe_version(self):
# the ld version scheme under MAC OS is:
# ^@(#)PROGRAM:ld PROJECT:ld64-VERSION
diff --git a/distutils2/util.py b/distutils2/util.py
--- a/distutils2/util.py
+++ b/distutils2/util.py
@@ -8,8 +8,6 @@
import codecs
import shutil
import string
-import tarfile
-import zipfile
import posixpath
import subprocess
from fnmatch import fnmatchcase
@@ -30,6 +28,30 @@
InstallationException, PackagingInternalError)
from distutils2._backport import sysconfig
+__all__ = [
+ # file dependencies
+ 'newer', 'newer_group',
+ # helpers for commands (dry-run system)
+ 'execute', 'write_file',
+ # spawning programs
+ 'find_executable', 'spawn',
+ # path manipulation
+ 'convert_path', 'change_root',
+ # 2to3 conversion
+ 'Mixin2to3', 'run_2to3',
+ # packaging compatibility helpers
+ 'cfg_to_args', 'generate_setup_py',
+ 'egginfo_to_distinfo',
+ 'get_install_method',
+ # misc
+ 'ask', 'check_environ', 'encode_multipart', 'resolve_name',
+ # querying for information TODO move to sysconfig
+ 'get_compiler_versions', 'get_platform', 'set_platform',
+ # configuration TODO move to packaging.config
+ 'get_pypirc_path', 'read_pypirc', 'generate_pypirc',
+ 'strtobool', 'split_multiline',
+]
+
_PLATFORM = None
_DEFAULT_INSTALLER = 'distutils2'
@@ -159,31 +181,6 @@
_environ_checked = True
-def subst_vars(s, local_vars):
- """Perform shell/Perl-style variable substitution on 'string'.
-
- Every occurrence of '$' followed by a name is considered a variable, and
- variable is substituted by the value found in the 'local_vars'
- dictionary, or in 'os.environ' if it's not in 'local_vars'.
- 'os.environ' is first checked/augmented to guarantee that it contains
- certain values: see 'check_environ()'. Raise ValueError for any
- variables not found in either 'local_vars' or 'os.environ'.
- """
- check_environ()
-
- def _subst(match, local_vars=local_vars):
- var_name = match.group(1)
- if var_name in local_vars:
- return str(local_vars[var_name])
- else:
- return os.environ[var_name]
-
- try:
- return re.sub(r'\$([a-zA-Z_][a-zA-Z_0-9]*)', _subst, s)
- except KeyError, e:
- raise ValueError("invalid variable '$%s'" % e)
-
-
# Needed by 'split_quoted()'
_wordchars_re = _squote_re = _dquote_re = None
@@ -195,6 +192,8 @@
_dquote_re = re.compile(r'"(?:[^"\\]|\\.)*"')
+# TODO replace with shlex.split after testing
+
def split_quoted(s):
"""Split a string up according to Unix shell-like rules for quotes and
backslashes.
@@ -446,15 +445,6 @@
file, cfile_base)
-def rfc822_escape(header):
- """Return a form of *header* suitable for inclusion in an RFC 822-header.
-
- This function ensures there are 8 spaces after each newline.
- """
- lines = header.split('\n')
- sep = '\n' + 8 * ' '
- return sep.join(lines)
-
_RE_VERSION = re.compile('(\d+\.\d+(\.\d+)*)')
_MAC_OS_X_LD_VERSION = re.compile('^@\(#\)PROGRAM:ld '
'PROJECT:ld64-((\d+)(\.\d+)*)')
@@ -554,6 +544,10 @@
"""Create *filename* and write *contents* to it.
*contents* is a sequence of strings without line terminators.
+
+ This function is not intended to replace the usual open + write idiom
+ in all cases; it is meant for use with Command.execute, which honors the
+ dry_run argument and also logs its arguments.
"""
f = open(filename, "w")
try:
@@ -575,6 +569,7 @@
def _under(path, root):
+ # XXX use os.path
path = path.split(os.sep)
root = root.split(os.sep)
if len(root) > len(path):
@@ -677,105 +672,6 @@
return base, ext
-def unzip_file(filename, location, flatten=True):
- """Unzip the file *filename* into the *location* directory."""
- if not os.path.exists(location):
- os.makedirs(location)
- zipfp = open(filename, 'rb')
-
- zip = zipfile.ZipFile(zipfp)
- leading = has_leading_dir(zip.namelist()) and flatten
- for name in zip.namelist():
- data = zip.read(name)
- fn = name
- if leading:
- fn = split_leading_dir(name)[1]
- fn = os.path.join(location, fn)
- dir = os.path.dirname(fn)
- if not os.path.exists(dir):
- os.makedirs(dir)
- if fn.endswith('/') or fn.endswith('\\'):
- # A directory
- if not os.path.exists(fn):
- os.makedirs(fn)
- else:
- fp = open(fn, 'wb')
- fp.write(data)
- fp.close()
- zipfp.close()
-
-
-def untar_file(filename, location):
- """Untar the file *filename* into the *location* directory."""
- if not os.path.exists(location):
- os.makedirs(location)
- if filename.lower().endswith('.gz') or filename.lower().endswith('.tgz'):
- mode = 'r:gz'
- elif (filename.lower().endswith('.bz2')
- or filename.lower().endswith('.tbz')):
- mode = 'r:bz2'
- elif filename.lower().endswith('.tar'):
- mode = 'r'
- else:
- mode = 'r:*'
-
- tar = tarfile.open(filename, mode)
- leading = has_leading_dir(member.name for member in tar.getmembers())
- for member in tar.getmembers():
- fn = member.name
- if leading:
- fn = split_leading_dir(fn)[1]
- path = os.path.join(location, fn)
- if member.isdir():
- if not os.path.exists(path):
- os.makedirs(path)
- else:
- try:
- fp = tar.extractfile(member)
- except (KeyError, AttributeError):
- # Some corrupt tar files seem to produce this
- # (specifically bad symlinks)
- continue
- try:
- if not os.path.exists(os.path.dirname(path)):
- os.makedirs(os.path.dirname(path))
- destfp = open(path, 'wb')
- shutil.copyfileobj(fp, destfp)
- destfp.close()
- except:
- fp.close()
- raise
- fp.close()
-
-def has_leading_dir(paths):
- """Return true if all the paths have the same leading path name.
-
- In other words, check that everything is in one subdirectory in an
- archive.
- """
- common_prefix = None
- for path in paths:
- prefix, rest = split_leading_dir(path)
- if not prefix:
- return False
- elif common_prefix is None:
- common_prefix = prefix
- elif prefix != common_prefix:
- return False
- return True
-
-
-def split_leading_dir(path):
- path = str(path)
- path = path.lstrip('/').lstrip('\\')
- if '/' in path and (('\\' in path and path.find('/') < path.find('\\'))
- or '\\' not in path):
- return path.split('/', 1)
- elif '\\' in path:
- return path.split('\\', 1)
- else:
- return path, ''
-
if sys.platform == 'darwin':
_cfg_target = None
_cfg_target_split = None
@@ -1563,11 +1459,12 @@
for key, values in fields:
# handle multiple entries for the same name
if not isinstance(values, (tuple, list)):
- values=[values]
+ values = [values]
for value in values:
l.extend((
'--' + boundary,
+ # XXX should encode to match packaging but it causes bugs
('Content-Disposition: form-data; name="%s"' % key), '', value))
for key, filename, value in files:
@@ -1584,561 +1481,3 @@
body = '\r\n'.join(l)
content_type = 'multipart/form-data; boundary=' + boundary
return content_type, body
-
-# shutil stuff
-
-try:
- import bz2
- _BZ2_SUPPORTED = True
-except ImportError:
- _BZ2_SUPPORTED = False
-
-try:
- from pwd import getpwnam
-except ImportError:
- getpwnam = None
-
-try:
- from grp import getgrnam
-except ImportError:
- getgrnam = None
-
-def _get_gid(name):
- """Returns a gid, given a group name."""
- if getgrnam is None or name is None:
- return None
- try:
- result = getgrnam(name)
- except KeyError:
- result = None
- if result is not None:
- return result[2]
- return None
-
-def _get_uid(name):
- """Returns an uid, given a user name."""
- if getpwnam is None or name is None:
- return None
- try:
- result = getpwnam(name)
- except KeyError:
- result = None
- if result is not None:
- return result[2]
- return None
-
-def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
- owner=None, group=None, logger=None):
- """Create a (possibly compressed) tar file from all the files under
- 'base_dir'.
-
- 'compress' must be "gzip" (the default), "bzip2", or None.
-
- 'owner' and 'group' can be used to define an owner and a group for the
- archive that is being built. If not provided, the current owner and group
- will be used.
-
- The output tar file will be named 'base_name' + ".tar", possibly plus
- the appropriate compression extension (".gz", or ".bz2").
-
- Returns the output filename.
- """
- tar_compression = {'gzip': 'gz', None: ''}
- compress_ext = {'gzip': '.gz'}
-
- if _BZ2_SUPPORTED:
- tar_compression['bzip2'] = 'bz2'
- compress_ext['bzip2'] = '.bz2'
-
- # flags for compression program, each element of list will be an argument
- if compress is not None and compress not in compress_ext.keys():
- raise ValueError("bad value for 'compress', or compression format not "
- "supported : %s" % compress)
-
- archive_name = base_name + '.tar' + compress_ext.get(compress, '')
- archive_dir = os.path.dirname(archive_name)
-
- if not os.path.exists(archive_dir):
- if logger is not None:
- logger.info("creating %s" % archive_dir)
- if not dry_run:
- os.makedirs(archive_dir)
-
- # creating the tarball
- if logger is not None:
- logger.info('Creating tar archive')
-
- uid = _get_uid(owner)
- gid = _get_gid(group)
-
- def _set_uid_gid(tarinfo):
- if gid is not None:
- tarinfo.gid = gid
- tarinfo.gname = group
- if uid is not None:
- tarinfo.uid = uid
- tarinfo.uname = owner
- return tarinfo
-
- if not dry_run:
- tar = tarfile.open(archive_name, 'w|%s' % tar_compression[compress])
- try:
- #tar.add(base_dir, filter=_set_uid_gid)
- tar.add(base_dir)
- finally:
- tar.close()
-
- return archive_name
-
-def _call_external_zip(base_dir, zip_filename, verbose=False, dry_run=False):
- # XXX see if we want to keep an external call here
- if verbose:
- zipoptions = "-r"
- else:
- zipoptions = "-rq"
- from distutils.errors import DistutilsExecError
- from distutils.spawn import spawn
- try:
- spawn(["zip", zipoptions, zip_filename, base_dir], dry_run=dry_run)
- except DistutilsExecError:
- # XXX really should distinguish between "couldn't find
- # external 'zip' command" and "zip failed".
- raise ExecError("unable to create zip file '%s': "
- "could neither import the 'zipfile' module nor "
- "find a standalone zip utility") % zip_filename
-
-def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
- """Create a zip file from all the files under 'base_dir'.
-
- The output zip file will be named 'base_name' + ".zip". Uses either the
- "zipfile" Python module (if available) or the InfoZIP "zip" utility
- (if installed and found on the default search path). If neither tool is
- available, raises ExecError. Returns the name of the output zip
- file.
- """
- zip_filename = base_name + ".zip"
- archive_dir = os.path.dirname(base_name)
-
- if not os.path.exists(archive_dir):
- if logger is not None:
- logger.info("creating %s", archive_dir)
- if not dry_run:
- os.makedirs(archive_dir)
-
- # If zipfile module is not available, try spawning an external 'zip'
- # command.
- try:
- import zipfile
- except ImportError:
- zipfile = None
-
- if zipfile is None:
- _call_external_zip(base_dir, zip_filename, verbose, dry_run)
- else:
- if logger is not None:
- logger.info("creating '%s' and adding '%s' to it",
- zip_filename, base_dir)
-
- if not dry_run:
- zip = zipfile.ZipFile(zip_filename, "w",
- compression=zipfile.ZIP_DEFLATED)
-
- for dirpath, dirnames, filenames in os.walk(base_dir):
- for name in filenames:
- path = os.path.normpath(os.path.join(dirpath, name))
- if os.path.isfile(path):
- zip.write(path, path)
- if logger is not None:
- logger.info("adding '%s'", path)
- zip.close()
-
- return zip_filename
-
-_ARCHIVE_FORMATS = {
- 'gztar': (_make_tarball, [('compress', 'gzip')], "gzip'ed tar-file"),
- 'bztar': (_make_tarball, [('compress', 'bzip2')], "bzip2'ed tar-file"),
- 'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"),
- 'zip': (_make_zipfile, [],"ZIP file")
- }
-
-if _BZ2_SUPPORTED:
- _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
- "bzip2'ed tar-file")
-
-def get_archive_formats():
- """Returns a list of supported formats for archiving and unarchiving.
-
- Each element of the returned sequence is a tuple (name, description)
- """
- formats = [(name, registry[2]) for name, registry in
- _ARCHIVE_FORMATS.items()]
- formats.sort()
- return formats
-
-def register_archive_format(name, function, extra_args=None, description=''):
- """Registers an archive format.
-
- name is the name of the format. function is the callable that will be
- used to create archives. If provided, extra_args is a sequence of
- (name, value) tuples that will be passed as arguments to the callable.
- description can be provided to describe the format, and will be returned
- by the get_archive_formats() function.
- """
- if extra_args is None:
- extra_args = []
- if not isinstance(function, collections.Callable):
- raise TypeError('The %s object is not callable' % function)
- if not isinstance(extra_args, (tuple, list)):
- raise TypeError('extra_args needs to be a sequence')
- for element in extra_args:
- if not isinstance(element, (tuple, list)) or len(element) !=2 :
- raise TypeError('extra_args elements are : (arg_name, value)')
-
- _ARCHIVE_FORMATS[name] = (function, extra_args, description)
-
-def unregister_archive_format(name):
- del _ARCHIVE_FORMATS[name]
-
-def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
- dry_run=0, owner=None, group=None, logger=None):
- """Create an archive file (eg. zip or tar).
-
- 'base_name' is the name of the file to create, minus any format-specific
- extension; 'format' is the archive format: one of "zip", "tar", "bztar"
- or "gztar".
-
- 'root_dir' is a directory that will be the root directory of the
- archive; ie. we typically chdir into 'root_dir' before creating the
- archive. 'base_dir' is the directory where we start archiving from;
- ie. 'base_dir' will be the common prefix of all files and
- directories in the archive. 'root_dir' and 'base_dir' both default
- to the current directory. Returns the name of the archive file.
-
- 'owner' and 'group' are used when creating a tar archive. By default,
- uses the current owner and group.
- """
- save_cwd = os.getcwd()
- base_name = fsencode(base_name)
- if root_dir is not None:
- if logger is not None:
- logger.debug("changing into '%s'", root_dir)
- base_name = os.path.abspath(base_name)
- if not dry_run:
- os.chdir(root_dir)
-
- if base_dir is None:
- base_dir = os.curdir
-
- kwargs = {'dry_run': dry_run, 'logger': logger}
-
- try:
- format_info = _ARCHIVE_FORMATS[format]
- except KeyError:
- raise ValueError("unknown archive format '%s'" % format)
-
- func = format_info[0]
- for arg, val in format_info[1]:
- kwargs[arg] = val
-
- if format != 'zip':
- kwargs['owner'] = owner
- kwargs['group'] = group
-
- try:
- filename = func(base_name, base_dir, **kwargs)
- finally:
- if root_dir is not None:
- if logger is not None:
- logger.debug("changing back to '%s'", save_cwd)
- os.chdir(save_cwd)
-
- return filename
-
-
-def get_unpack_formats():
- """Returns a list of supported formats for unpacking.
-
- Each element of the returned sequence is a tuple
- (name, extensions, description)
- """
- formats = [(name, info[0], info[3]) for name, info in
- _UNPACK_FORMATS.items()]
- formats.sort()
- return formats
-
-def _check_unpack_options(extensions, function, extra_args):
- """Checks what gets registered as an unpacker."""
- # first make sure no other unpacker is registered for this extension
- existing_extensions = {}
- for name, info in _UNPACK_FORMATS.items():
- for ext in info[0]:
- existing_extensions[ext] = name
-
- for extension in extensions:
- if extension in existing_extensions:
- msg = '%s is already registered for "%s"'
- raise RegistryError(msg % (extension,
- existing_extensions[extension]))
-
- if not isinstance(function, collections.Callable):
- raise TypeError('The registered function must be a callable')
-
-
-def register_unpack_format(name, extensions, function, extra_args=None,
- description=''):
- """Registers an unpack format.
-
- `name` is the name of the format. `extensions` is a list of extensions
- corresponding to the format.
-
- `function` is the callable that will be
- used to unpack archives. The callable will receive archives to unpack.
- If it's unable to handle an archive, it needs to raise a ReadError
- exception.
-
- If provided, `extra_args` is a sequence of
- (name, value) tuples that will be passed as arguments to the callable.
- description can be provided to describe the format, and will be returned
- by the get_unpack_formats() function.
- """
- if extra_args is None:
- extra_args = []
- _check_unpack_options(extensions, function, extra_args)
- _UNPACK_FORMATS[name] = extensions, function, extra_args, description
-
-def unregister_unpack_format(name):
- """Removes the pack format from the registery."""
- del _UNPACK_FORMATS[name]
-
-def _ensure_directory(path):
- """Ensure that the parent directory of `path` exists"""
- dirname = os.path.dirname(path)
- if not os.path.isdir(dirname):
- os.makedirs(dirname)
-
-def _unpack_zipfile(filename, extract_dir):
- """Unpack zip `filename` to `extract_dir`
- """
- try:
- import zipfile
- except ImportError:
- raise ReadError('zlib not supported, cannot unpack this archive.')
-
- if not zipfile.is_zipfile(filename):
- raise ReadError("%s is not a zip file" % filename)
-
- zip = zipfile.ZipFile(filename)
- try:
- for info in zip.infolist():
- name = info.filename
-
- # don't extract absolute paths or ones with .. in them
- if name.startswith('/') or '..' in name:
- continue
-
- target = os.path.join(extract_dir, *name.split('/'))
- if not target:
- continue
-
- _ensure_directory(target)
- if not name.endswith('/'):
- # file
- data = zip.read(info.filename)
- f = open(target,'wb')
- try:
- f.write(data)
- finally:
- f.close()
- del data
- finally:
- zip.close()
-
-def _unpack_tarfile(filename, extract_dir):
- """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
- """
- try:
- tarobj = tarfile.open(filename)
- except tarfile.TarError:
- raise ReadError(
- "%s is not a compressed or uncompressed tar file" % filename)
- try:
- tarobj.extractall(extract_dir)
- finally:
- tarobj.close()
-
-_UNPACK_FORMATS = {
- 'gztar': (['.tar.gz', '.tgz'], _unpack_tarfile, [], "gzip'ed tar-file"),
- 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
- 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file")
- }
-
-if _BZ2_SUPPORTED:
- _UNPACK_FORMATS['bztar'] = (['.bz2'], _unpack_tarfile, [],
- "bzip2'ed tar-file")
-
-def _find_unpack_format(filename):
- for name, info in _UNPACK_FORMATS.items():
- for extension in info[0]:
- if filename.endswith(extension):
- return name
- return None
-
-def unpack_archive(filename, extract_dir=None, format=None):
- """Unpack an archive.
-
- `filename` is the name of the archive.
-
- `extract_dir` is the name of the target directory, where the archive
- is unpacked. If not provided, the current working directory is used.
-
- `format` is the archive format: one of "zip", "tar", or "gztar". Or any
- other registered format. If not provided, unpack_archive will use the
- filename extension and see if an unpacker was registered for that
- extension.
-
- In case none is found, a ValueError is raised.
- """
- if extract_dir is None:
- extract_dir = os.getcwd()
-
- if format is not None:
- try:
- format_info = _UNPACK_FORMATS[format]
- except KeyError:
- raise ValueError("Unknown unpack format '%s'" % format)
-
- func = format_info[1]
- func(filename, extract_dir, **dict(format_info[2]))
- else:
- # we need to look at the registered unpackers supported extensions
- format = _find_unpack_format(filename)
- if format is None:
- raise ReadError("Unknown archive format '%s'" % filename)
-
- func = _UNPACK_FORMATS[format][1]
- kwargs = dict(_UNPACK_FORMATS[format][2])
- func(filename, extract_dir, **kwargs)
-
-# tokenize stuff
-
-cookie_re = re.compile("coding[:=]\s*([-\w.]+)")
-
-def _get_normal_name(orig_enc):
- """Imitates get_normal_name in tokenizer.c."""
- # Only care about the first 12 characters.
- enc = orig_enc[:12].lower().replace("_", "-")
- if enc == "utf-8" or enc.startswith("utf-8-"):
- return "utf-8"
- if enc in ("latin-1", "iso-8859-1", "iso-latin-1") or \
- enc.startswith(("latin-1-", "iso-8859-1-", "iso-latin-1-")):
- return "iso-8859-1"
- return orig_enc
-
-def detect_encoding(readline):
- """
- The detect_encoding() function is used to detect the encoding that should
- be used to decode a Python source file. It requires one argment, readline,
- in the same way as the tokenize() generator.
-
- It will call readline a maximum of twice, and return the encoding used
- (as a string) and a list of any lines (left as bytes) it has read in.
-
- It detects the encoding from the presence of a utf-8 bom or an encoding
- cookie as specified in pep-0263. If both a bom and a cookie are present,
- but disagree, a SyntaxError will be raised. If the encoding cookie is an
- invalid charset, raise a SyntaxError. Note that if a utf-8 bom is found,
- 'utf-8-sig' is returned.
-
- If no encoding is specified, then the default of 'utf-8' will be returned.
- """
- bom_found = False
- encoding = None
- default = 'utf-8'
- def read_or_stop():
- try:
- return readline()
- except StopIteration:
- return ''
-
- def find_cookie(line):
- try:
- line_string = line.decode('ascii')
- except UnicodeDecodeError:
- return None
-
- matches = cookie_re.findall(line_string)
- if not matches:
- return None
- encoding = _get_normal_name(matches[0])
- try:
- codec = codecs.lookup(encoding)
- except LookupError:
- # This behaviour mimics the Python interpreter
- raise SyntaxError("unknown encoding: " + encoding)
-
- if bom_found:
- if codec.name != 'utf-8':
- # This behaviour mimics the Python interpreter
- raise SyntaxError('encoding problem: utf-8')
- encoding += '-sig'
- return encoding
-
- first = read_or_stop()
- if first.startswith(codecs.BOM_UTF8):
- bom_found = True
- first = first[3:]
- default = 'utf-8-sig'
- if not first:
- return default, []
-
- encoding = find_cookie(first)
- if encoding:
- return encoding, [first]
-
- second = read_or_stop()
- if not second:
- return default, [first]
-
- encoding = find_cookie(second)
- if encoding:
- return encoding, [first, second]
-
- return default, [first, second]
-
-# functools stuff
-
-def cmp_to_key(mycmp):
- """Convert a cmp= function into a key= function"""
- class K(object):
- __slots__ = ['obj']
- def __init__(self, obj):
- self.obj = obj
- def __lt__(self, other):
- return mycmp(self.obj, other.obj) < 0
- def __gt__(self, other):
- return mycmp(self.obj, other.obj) > 0
- def __eq__(self, other):
- return mycmp(self.obj, other.obj) == 0
- def __le__(self, other):
- return mycmp(self.obj, other.obj) <= 0
- def __ge__(self, other):
- return mycmp(self.obj, other.obj) >= 0
- def __ne__(self, other):
- return mycmp(self.obj, other.obj) != 0
- __hash__ = None
- return K
-
-# os stuff
-
-def fsencode(filename):
- """
- Encode filename to the filesystem encoding with 'surrogateescape' error
- handler, return bytes unchanged. On Windows, use 'strict' error handler if
- the file system encoding is 'mbcs' (which is the default encoding).
- """
- if isinstance(filename, str):
- return filename
- elif isinstance(filename, unicode):
- return filename.encode(sys.getfilesystemencoding())
- else:
- raise TypeError("expect bytes or str, not %s" % type(filename).__name__)
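Taken together, the deletions from util.py and the import changes in cmd.py, sdist.py, install.py, pypi/dist.py and test_command_sdist.py above mean that code built on distutils2 should now pick up these helpers from their new homes, roughly:

    # Archive and file-tree helpers moved to the backported shutil:
    from distutils2._backport.shutil import (make_archive, unpack_archive,
                                             get_archive_formats, rmtree)
    # Small single-object backports moved to compat:
    from distutils2.compat import detect_encoding, fsencode, wraps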
--
Repository URL: http://hg.python.org/distutils2