[Python-checkins] distutils2: move the unpack utilities to _backport
tarek.ziade
python-checkins at python.org
Sun Jan 30 10:43:57 CET 2011
tarek.ziade pushed e26de21a7b4e to distutils2:
http://hg.python.org/distutils2/rev/e26de21a7b4e
changeset: 923:e26de21a7b4e
user: Alexis Metaireau <alexis at notmyidea.org>
date: Mon Jan 03 22:04:17 2011 +0100
summary:
move the unpack utilities to _backport
files:
distutils2/_backport/shutil.py
distutils2/index/dist.py
distutils2/install.py
distutils2/util.py
diff --git a/distutils2/_backport/shutil.py b/distutils2/_backport/shutil.py
--- a/distutils2/_backport/shutil.py
+++ b/distutils2/_backport/shutil.py
@@ -568,3 +568,72 @@
os.chdir(save_cwd)
return filename
+
+
+def _ensure_directory(path):
+ """Ensure that the parent directory of `path` exists"""
+ dirname = os.path.dirname(path)
+ if not os.path.isdir(dirname):
+ os.makedirs(dirname)
+
+def _unpack_zipfile(filename, extract_dir):
+ try:
+ import zipfile
+ except ImportError:
+ raise ReadError('zlib not supported, cannot unpack this archive.')
+
+ if not zipfile.is_zipfile(filename):
+ raise ReadError("%s is not a zip file" % filename)
+
+ zip = zipfile.ZipFile(filename)
+ try:
+ for info in zip.infolist():
+ name = info.filename
+ if name.startswith('/') or '..' in name:
+ continue
+
+ target = os.path.join(extract_dir, *name.split('/'))
+ if not target:
+ continue
+
+ _ensure_directory(target)
+ if not name.endswith('/'):
+ # file
+ data = zip.read(info.filename)
+ f = open(target,'wb')
+ try:
+ f.write(data)
+ finally:
+ f.close()
+ del data
+ finally:
+ zip.close()
+
+def _unpack_tarfile(filename, extract_dir):
+ try:
+ tarobj = tarfile.open(filename)
+ except tarfile.TarError:
+ raise ReadError(
+ "%s is not a compressed or uncompressed tar file" % filename)
+ try:
+ tarobj.extractall(extract_dir)
+ finally:
+ tarobj.close()
+
+
+_UNPACKERS = (
+ (['.tar.gz', '.tgz', '.tar'], _unpack_tarfile),
+ (['.zip', '.egg'], _unpack_zipfile))
+
+
+def unpack_archive(filename, extract_dir=None):
+ if extract_dir is None:
+ extract_dir = os.path.dirname(filename)
+
+ for formats, func in _UNPACKERS:
+ for format in formats:
+ if filename.endswith(format):
+ func(filename, extract_dir)
+ return extract_dir
+
+ raise ValueError('Unknown archive format: %s' % filename)
diff --git a/distutils2/index/dist.py b/distutils2/index/dist.py
--- a/distutils2/index/dist.py
+++ b/distutils2/index/dist.py
@@ -22,13 +22,14 @@
except ImportError:
from distutils2._backport import hashlib
+from distutils2._backport.shutil import unpack_archive
from distutils2.errors import IrrationalVersionError
from distutils2.index.errors import (HashDoesNotMatch, UnsupportedHashName,
CantParseArchiveName)
from distutils2.version import (suggest_normalized_version, NormalizedVersion,
get_version_predicate)
from distutils2.metadata import DistributionMetadata
-from distutils2.util import untar_file, unzip_file, splitext
+from distutils2.util import splitext
__all__ = ['ReleaseInfo', 'DistInfo', 'ReleasesList', 'get_infos_from_url']
@@ -313,17 +314,8 @@
filename = self.download()
content_type = mimetypes.guess_type(filename)[0]
+ self._unpacked_dir = unpack_archive(filename)
- if (content_type == 'application/zip'
- or filename.endswith('.zip')
- or filename.endswith('.pybundle')
- or zipfile.is_zipfile(filename)):
- unzip_file(filename, path, flatten=not filename.endswith('.pybundle'))
- elif (content_type == 'application/x-gzip'
- or tarfile.is_tarfile(filename)
- or splitext(filename)[1].lower() in ('.tar', '.tar.gz', '.tar.bz2', '.tgz', '.tbz')):
- untar_file(filename, path)
- self._unpacked_dir = path
return self._unpacked_dir
def _check_md5(self, filename):
diff --git a/distutils2/install.py b/distutils2/install.py
--- a/distutils2/install.py
+++ b/distutils2/install.py
@@ -60,77 +60,6 @@
yield old, new
-
-# ripped from shutil
-def _ensure_directory(path):
- """Ensure that the parent directory of `path` exists"""
- dirname = os.path.dirname(path)
- if not os.path.isdir(dirname):
- os.makedirs(dirname)
-
-def _unpack_zipfile(filename, extract_dir):
- try:
- import zipfile
- except ImportError:
- raise ReadError('zlib not supported, cannot unpack this archive.')
-
- if not zipfile.is_zipfile(filename):
- raise ReadError("%s is not a zip file" % filename)
-
- zip = zipfile.ZipFile(filename)
- try:
- for info in zip.infolist():
- name = info.filename
- if name.startswith('/') or '..' in name:
- continue
-
- target = os.path.join(extract_dir, *name.split('/'))
- if not target:
- continue
-
- _ensure_directory(target)
- if not name.endswith('/'):
- # file
- data = zip.read(info.filename)
- f = open(target,'wb')
- try:
- f.write(data)
- finally:
- f.close()
- del data
- finally:
- zip.close()
-
-def _unpack_tarfile(filename, extract_dir):
- try:
- tarobj = tarfile.open(filename)
- except tarfile.TarError:
- raise ReadError(
- "%s is not a compressed or uncompressed tar file" % filename)
- try:
- tarobj.extractall(extract_dir)
- finally:
- tarobj.close()
-
-
-_UNPACKERS = (
- (['.tar.gz', '.tgz', '.tar'], _unpack_tarfile),
- (['.zip', '.egg'], _unpack_zipfile))
-
-
-def _unpack(filename, extract_dir=None):
- if extract_dir is None:
- extract_dir = os.path.dirname(filename)
-
- for formats, func in _UNPACKERS:
- for format in formats:
- if filename.endswith(format):
- func(filename, extract_dir)
- return extract_dir
-
- raise ValueError('Unknown archive format: %s' % filename)
-
-
def _install_dist(dist, path):
"""Install a distribution into a path"""
def _run_d1_install(archive_dir, path):
@@ -152,11 +81,7 @@
# using our own install command
raise NotImplementedError()
- # download
- archive = dist.download()
-
- # unarchive
- where = _unpack(archive)
+ where = dist.unpack(archive)
# get into the dir
archive_dir = None
diff --git a/distutils2/util.py b/distutils2/util.py
--- a/distutils2/util.py
+++ b/distutils2/util.py
@@ -674,83 +674,6 @@
return base, ext
-def unzip_file(filename, location, flatten=True):
- """Unzip the file (zip file located at filename) to the destination
- location"""
- if not os.path.exists(location):
- os.makedirs(location)
- zipfp = open(filename, 'rb')
- try:
- zip = zipfile.ZipFile(zipfp)
- leading = has_leading_dir(zip.namelist()) and flatten
- for name in zip.namelist():
- data = zip.read(name)
- fn = name
- if leading:
- fn = split_leading_dir(name)[1]
- fn = os.path.join(location, fn)
- dir = os.path.dirname(fn)
- if not os.path.exists(dir):
- os.makedirs(dir)
- if fn.endswith('/') or fn.endswith('\\'):
- # A directory
- if not os.path.exists(fn):
- os.makedirs(fn)
- else:
- fp = open(fn, 'wb')
- try:
- fp.write(data)
- finally:
- fp.close()
- finally:
- zipfp.close()
-
-
-def untar_file(filename, location):
- """Untar the file (tar file located at filename) to the destination
- location
- """
- if not os.path.exists(location):
- os.makedirs(location)
- if filename.lower().endswith('.gz') or filename.lower().endswith('.tgz'):
- mode = 'r:gz'
- elif (filename.lower().endswith('.bz2')
- or filename.lower().endswith('.tbz')):
- mode = 'r:bz2'
- elif filename.lower().endswith('.tar'):
- mode = 'r'
- else:
- mode = 'r:*'
- tar = tarfile.open(filename, mode)
- try:
- leading = has_leading_dir([member.name for member in tar.getmembers()])
- for member in tar.getmembers():
- fn = member.name
- if leading:
- fn = split_leading_dir(fn)[1]
- path = os.path.join(location, fn)
- if member.isdir():
- if not os.path.exists(path):
- os.makedirs(path)
- else:
- try:
- fp = tar.extractfile(member)
- except (KeyError, AttributeError):
- # Some corrupt tar files seem to produce this
- # (specifically bad symlinks)
- continue
- if not os.path.exists(os.path.dirname(path)):
- os.makedirs(os.path.dirname(path))
- destfp = open(path, 'wb')
- try:
- shutil.copyfileobj(fp, destfp)
- finally:
- destfp.close()
- fp.close()
- finally:
- tar.close()
-
-
def has_leading_dir(paths):
"""Returns true if all the paths have the same leading path name
(i.e., everything is in one subdirectory in an archive)"""
--
Repository URL: http://hg.python.org/distutils2
More information about the Python-checkins mailing list