[Python-checkins] r80589 - in python/branches/py3k: Doc/library/shutil.rst Lib/shutil.py Lib/test/test_shutil.py Misc/NEWS

tarek.ziade python-checkins at python.org
Wed Apr 28 19:51:36 CEST 2010


Author: tarek.ziade
Date: Wed Apr 28 19:51:36 2010
New Revision: 80589

Log:
#8295 : Added shutil.unpack_archive and related APIs

Modified:
   python/branches/py3k/Doc/library/shutil.rst
   python/branches/py3k/Lib/shutil.py
   python/branches/py3k/Lib/test/test_shutil.py
   python/branches/py3k/Misc/NEWS

Modified: python/branches/py3k/Doc/library/shutil.rst
==============================================================================
--- python/branches/py3k/Doc/library/shutil.rst	(original)
+++ python/branches/py3k/Doc/library/shutil.rst	Wed Apr 28 19:51:36 2010
@@ -288,13 +288,75 @@
    .. versionadded:: 3.2
 
 
-.. function::  unregister_archive_format(name)
+.. function:: unregister_archive_format(name)
 
    Remove the archive format *name* from the list of supported formats.
 
    .. versionadded:: 3.2
 
 
+.. function:: unpack_archive(filename[, extract_dir[, format]])
+
+   Unpack an archive. *filename* is the full path of the archive.
+
+   *extract_dir* is the name of the target directory where the archive is
+   unpacked. If not provided, the current working directory is used.
+
+   *format* is the archive format: one of "zip", "tar", "gztar" or "bztar",
+   or any other format registered with :func:`register_unpack_format`. If not
+   provided, :func:`unpack_archive` looks for an unpacker registered for the
+   archive file name extension and raises ``ReadError`` if none is found.
+   An unregistered *format* raises :exc:`ValueError`.
+
+   .. versionadded:: 3.2
+
+
+.. function:: register_unpack_format(name, extensions, function[, extra_args[, description]])
+
+   Registers an unpack format. *name* is the name of the format and
+   *extensions* is a list of extensions corresponding to the format, like
+   ``.zip`` for Zip files.
+
+   *function* is the callable that will be used to unpack archives. The
+   callable will receive the path of the archive, followed by the directory
+   the archive must be extracted to.
+
+   When provided, *extra_args* is a sequence of ``(name, value)`` tuples that
+   will be passed as keyword arguments to the callable.
+
+   *description* can be provided to describe the format, and will be returned
+   by the :func:`get_unpack_formats` function.
+
+   .. versionadded:: 3.2
+
+
+.. function:: unregister_unpack_format(name)
+
+   Unregister an unpack format. *name* is the name of the format.
+
+   .. versionadded:: 3.2
+
+
+.. function:: get_unpack_formats()
+
+   Return a list of all registered formats for unpacking.
+   Each element of the returned sequence is a tuple
+   ``(name, extensions, description)``.
+
+   By default :mod:`shutil` provides these formats:
+
+   - *gztar*: gzip'ed tar-file
+   - *bztar*: bzip2'ed tar-file
+   - *tar*: uncompressed tar file
+   - *zip*: ZIP file
+
+   You can register new formats or provide your own unpacker for any existing
+   formats, by using :func:`register_unpack_format`.
+
+   .. versionadded:: 3.2
+
+
+
 Archiving example
 :::::::::::::::::
 

Modified: python/branches/py3k/Lib/shutil.py
==============================================================================
--- python/branches/py3k/Lib/shutil.py	(original)
+++ python/branches/py3k/Lib/shutil.py	Wed Apr 28 19:51:36 2010
@@ -11,6 +11,7 @@
 import fnmatch
 import collections
 import errno
+import tarfile
 
 try:
     from pwd import getpwnam
@@ -25,7 +26,9 @@
 __all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
            "copytree", "move", "rmtree", "Error", "SpecialFileError",
            "ExecError", "make_archive", "get_archive_formats",
-           "register_archive_format", "unregister_archive_format"]
+           "register_archive_format", "unregister_archive_format",
+           "get_unpack_formats", "register_unpack_format",
+           "unregister_unpack_format", "unpack_archive"]
 
 class Error(EnvironmentError):
     pass
@@ -37,6 +40,14 @@
 class ExecError(EnvironmentError):
     """Raised when a command could not be executed"""
 
+class ReadError(EnvironmentError):
+    """Raised when an archive cannot be read"""
+
+class RegistryError(Exception):
+    """Raised when a registry operation with the archiving
+    and unpacking registries fails"""
+
+
 try:
     WindowsError
 except NameError:
@@ -381,10 +392,7 @@
         if not dry_run:
             os.makedirs(archive_dir)
 
-
     # creating the tarball
-    import tarfile  # late import so Python build itself doesn't break
-
     if logger is not None:
         logger.info('Creating tar archive')
 
@@ -567,3 +575,165 @@
             os.chdir(save_cwd)
 
     return filename
+
+
+def get_unpack_formats():
+    """Returns a list of supported formats for unpacking.
+
+    Each element of the returned sequence is a tuple
+    (name, extensions, description)
+    """
+    formats = [(name, info[0], info[3]) for name, info in
+               _UNPACK_FORMATS.items()]
+    formats.sort()
+    return formats
+
+def _check_unpack_options(extensions, function, extra_args):
+    """Checks what gets registered as an unpacker."""
+    # first make sure no other unpacker is registered for this extension
+    existing_extensions = {}
+    for name, info in _UNPACK_FORMATS.items():
+        for ext in info[0]:
+            existing_extensions[ext] = name
+
+    for extension in extensions:
+        if extension in existing_extensions:
+            msg = '%s is already registered for "%s"'
+            raise RegistryError(msg % (extension,
+                                       existing_extensions[extension]))
+
+    if not isinstance(function, collections.Callable):
+        raise TypeError('The registered function must be a callable')
+
+
+def register_unpack_format(name, extensions, function, extra_args=None,
+                           description=''):
+    """Registers an unpack format.
+
+    `name` is the name of the format. `extensions` is a list of extensions
+    corresponding to the format.
+
+    `function` is the callable that will be
+    used to unpack archives. The callable will receive archives to unpack.
+    If it's unable to handle an archive, it needs to raise a ReadError
+    exception.
+
+    If provided, `extra_args` is a sequence of
+    (name, value) tuples that will be passed as arguments to the callable.
+    description can be provided to describe the format, and will be returned
+    by the get_unpack_formats() function.
+    """
+    if extra_args is None:
+        extra_args = []
+    _check_unpack_options(extensions, function, extra_args)
+    _UNPACK_FORMATS[name] = extensions, function, extra_args, description
+
+def unregister_unpack_format(name):
+    """Removes the unpack format from the registry."""
+    del _UNPACK_FORMATS[name]
+
+def _ensure_directory(path):
+    """Ensure that the parent directory of `path` exists"""
+    dirname = os.path.dirname(path)
+    if not os.path.isdir(dirname):
+        os.makedirs(dirname)
+
+def _unpack_zipfile(filename, extract_dir):
+    """Unpack zip `filename` to `extract_dir`
+    """
+    try:
+        import zipfile
+    except ImportError:
+        raise ReadError('zlib not supported, cannot unpack this archive.')
+
+    if not zipfile.is_zipfile(filename):
+        raise ReadError("%s is not a zip file" % filename)
+
+    zip = zipfile.ZipFile(filename)
+    try:
+        for info in zip.infolist():
+            name = info.filename
+
+            # don't extract absolute paths or ones with .. in them
+            if name.startswith('/') or '..' in name:
+                continue
+
+            target = os.path.join(extract_dir, *name.split('/'))
+            if not target:
+                continue
+
+            _ensure_directory(target)
+            if not name.endswith('/'):
+                # file
+                data = zip.read(info.filename)
+                f = open(target,'wb')
+                try:
+                    f.write(data)
+                finally:
+                    f.close()
+                    del data
+    finally:
+        zip.close()
+
+def _unpack_tarfile(filename, extract_dir):
+    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
+    """
+    try:
+        tarobj = tarfile.open(filename)
+    except tarfile.TarError:
+        raise ReadError(
+            "%s is not a compressed or uncompressed tar file" % filename)
+    try:
+        tarobj.extractall(extract_dir)
+    finally:
+        tarobj.close()
+
+_UNPACK_FORMATS = {
+    'gztar': (['.tar.gz', '.tgz'], _unpack_tarfile, [], "gzip'ed tar-file"),
+    'bztar': (['.bz2'], _unpack_tarfile, [], "bzip2'ed tar-file"),
+    'tar':   (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
+    'zip':   (['.zip'], _unpack_zipfile, [], "ZIP file")
+    }
+
+def _find_unpack_format(filename):
+    for name, info in _UNPACK_FORMATS.items():
+        for extension in info[0]:
+            if filename.endswith(extension):
+                return name
+    return None
+
+def unpack_archive(filename, extract_dir=None, format=None):
+    """Unpack an archive.
+
+    `filename` is the name of the archive.
+
+    `extract_dir` is the name of the target directory, where the archive
+    is unpacked. If not provided, the current working directory is used.
+
+    `format` is the archive format: one of "zip", "tar", "gztar", "bztar"
+    or any other registered format. If not provided, unpack_archive will
+    use the filename extension to look for a registered unpacker, and
+    raise a ReadError if none is found.
+
+    If `format` is provided but not registered, a ValueError is raised.
+    """
+    if extract_dir is None:
+        extract_dir = os.getcwd()
+
+    if format is not None:
+        try:
+            format_info = _UNPACK_FORMATS[format]
+        except KeyError:
+            raise ValueError("Unknown unpack format '{0}'".format(format))
+        # registry entries are (extensions, function, extra_args, description)
+        func = format_info[1]
+        func(filename, extract_dir, **dict(format_info[2]))
+    else:
+        # we need to look at the registered unpackers supported extensions
+        format = _find_unpack_format(filename)
+        if format is None:
+            raise ReadError("Unknown archive format '{0}'".format(filename))
+
+        func = _UNPACK_FORMATS[format][1]
+        kwargs = dict(_UNPACK_FORMATS[format][2])
+        func(filename, extract_dir, **kwargs)

Modified: python/branches/py3k/Lib/test/test_shutil.py
==============================================================================
--- python/branches/py3k/Lib/test/test_shutil.py	(original)
+++ python/branches/py3k/Lib/test/test_shutil.py	Wed Apr 28 19:51:36 2010
@@ -13,7 +13,9 @@
 from distutils.spawn import find_executable, spawn
 from shutil import (_make_tarball, _make_zipfile, make_archive,
                     register_archive_format, unregister_archive_format,
-                    get_archive_formats, Error)
+                    get_archive_formats, Error, unpack_archive,
+                    register_unpack_format, RegistryError,
+                    unregister_unpack_format, get_unpack_formats)
 import tarfile
 import warnings
 
@@ -538,6 +540,7 @@
                            owner='kjhkjhkjg', group='oihohoh')
         self.assertTrue(os.path.exists(res))
 
+
     @unittest.skipUnless(zlib, "Requires zlib")
     @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
     def test_tarfile_root_owner(self):
@@ -595,6 +598,58 @@
         formats = [name for name, params in get_archive_formats()]
         self.assertNotIn('xxx', formats)
 
+    def _compare_dirs(self, dir1, dir2):
+        # check that dir1 and dir2 are equivalent,
+        # return the diff
+        diff = []
+        for root, dirs, files in os.walk(dir1):
+            for file_ in files:
+                path = os.path.join(root, file_)
+                target_path = os.path.join(dir2, os.path.split(path)[-1])
+                if not os.path.exists(target_path):
+                    diff.append(file_)
+        return diff
+
+    @unittest.skipUnless(zlib, "Requires zlib")
+    def test_unpack_archive(self):
+
+        for format in ('tar', 'gztar', 'bztar', 'zip'):
+            tmpdir = self.mkdtemp()
+            base_dir, root_dir, base_name =  self._create_files()
+            tmpdir2 = self.mkdtemp()
+            filename = make_archive(base_name, format, root_dir, base_dir)
+
+            # let's try to unpack it now
+            unpack_archive(filename, tmpdir2)
+            diff = self._compare_dirs(tmpdir, tmpdir2)
+            self.assertEquals(diff, [])
+
+    def test_unpack_registery(self):
+        # register, use and unregister a custom unpack format
+        formats = get_unpack_formats()
+
+        def _boo(filename, extract_dir, extra):
+            self.assertEquals(extra, 1)
+            self.assertEquals(filename, 'stuff.boo')
+            self.assertEquals(extract_dir, 'xx')
+
+        register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
+        unpack_archive('stuff.boo', 'xx')
+
+        # trying to register a .boo unpacker again
+        self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
+                          ['.boo'], _boo)
+
+        # should work now
+        unregister_unpack_format('Boo')
+        register_unpack_format('Boo2', ['.boo'], _boo)
+        self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
+        self.assertNotIn(('Boo', ['.boo', '.b2'], ''), get_unpack_formats())
+
+        # let's leave a clean state
+        unregister_unpack_format('Boo2')
+        self.assertEquals(get_unpack_formats(), formats)
+
 
 class TestMove(unittest.TestCase):
 

Modified: python/branches/py3k/Misc/NEWS
==============================================================================
--- python/branches/py3k/Misc/NEWS	(original)
+++ python/branches/py3k/Misc/NEWS	Wed Apr 28 19:51:36 2010
@@ -339,6 +339,8 @@
 Library
 -------
 
+- Issue #8295: Added shutil.unpack_archive.
+
 - Issue #6312: Fixed http HEAD request when the transfer encoding is chunked.
   It should correctly return an empty response now.
 


More information about the Python-checkins mailing list