[Python-checkins] cpython: Issue #14082: shutil.copy2() now copies extended attributes, if possible.

antoine.pitrou python-checkins at python.org
Sat May 12 19:05:03 CEST 2012


http://hg.python.org/cpython/rev/85824b819bcb
changeset:   76883:85824b819bcb
parent:      76880:d4b559faefc4
user:        Antoine Pitrou <solipsis at pitrou.net>
date:        Sat May 12 19:02:01 2012 +0200
summary:
  Issue #14082: shutil.copy2() now copies extended attributes, if possible.
Patch by Hynek Schlawack.

files:
  Doc/library/shutil.rst  |   6 +-
  Lib/shutil.py           |  31 ++++++++++++
  Lib/test/support.py     |  29 +++++++++++
  Lib/test/test_os.py     |  20 +-------
  Lib/test/test_shutil.py |  74 +++++++++++++++++++++++++++++
  Misc/NEWS               |   3 +
  6 files changed, 141 insertions(+), 22 deletions(-)


diff --git a/Doc/library/shutil.rst b/Doc/library/shutil.rst
--- a/Doc/library/shutil.rst
+++ b/Doc/library/shutil.rst
@@ -102,14 +102,14 @@
 
 .. function:: copy2(src, dst[, symlinks=False])
 
-   Similar to :func:`shutil.copy`, but metadata is copied as well -- in fact,
-   this is just :func:`shutil.copy` followed by :func:`copystat`.  This is
+   Similar to :func:`shutil.copy`, but metadata is copied as well. This is
    similar to the Unix command :program:`cp -p`.  If *symlinks* is true,
    symbolic links won't be followed but recreated instead -- this resembles
    GNU's :program:`cp -P`.
 
    .. versionchanged:: 3.3
-      Added *symlinks* argument.
+      Added *symlinks* argument, try to copy extended file system attributes
+      too (currently Linux only).
 
 .. function:: ignore_patterns(\*patterns)
 
diff --git a/Lib/shutil.py b/Lib/shutil.py
--- a/Lib/shutil.py
+++ b/Lib/shutil.py
@@ -166,6 +166,36 @@
             else:
                 raise
 
+if hasattr(os, 'listxattr'):
+    def _copyxattr(src, dst, symlinks=False):
+        """Copy extended filesystem attributes from `src` to `dst`.
+
+        Overwrite existing attributes.
+
+        If the optional flag `symlinks` is set, symlinks won't be followed.
+
+        """
+        if symlinks:
+            listxattr = os.llistxattr
+            removexattr = os.lremovexattr
+            setxattr = os.lsetxattr
+            getxattr = os.lgetxattr
+        else:
+            listxattr = os.listxattr
+            removexattr = os.removexattr
+            setxattr = os.setxattr
+            getxattr = os.getxattr
+
+        for attr in listxattr(src):
+            try:
+                setxattr(dst, attr, getxattr(src, attr))
+            except OSError as e:
+                if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA):
+                    raise
+else:
+    def _copyxattr(*args, **kwargs):
+        pass
+
 def copy(src, dst, symlinks=False):
     """Copy data and mode bits ("cp src dst").
 
@@ -193,6 +223,7 @@
         dst = os.path.join(dst, os.path.basename(src))
     copyfile(src, dst, symlinks=symlinks)
     copystat(src, dst, symlinks=symlinks)
+    _copyxattr(src, dst, symlinks=symlinks)
 
 def ignore_patterns(*patterns):
     """Function that can be used as copytree() ignore parameter.
diff --git a/Lib/test/support.py b/Lib/test/support.py
--- a/Lib/test/support.py
+++ b/Lib/test/support.py
@@ -1696,6 +1696,35 @@
     msg = "Requires functional symlink implementation"
     return test if ok else unittest.skip(msg)(test)
 
+_can_xattr = None
+def can_xattr():
+    global _can_xattr
+    if _can_xattr is not None:
+        return _can_xattr
+    if not hasattr(os, "setxattr"):
+        can = False
+    else:
+        try:
+            with open(TESTFN, "wb") as fp:
+                try:
+                    os.fsetxattr(fp.fileno(), b"user.test", b"")
+                    # Kernels < 2.6.39 don't respect setxattr flags.
+                    kernel_version = platform.release()
+                    m = re.match("2.6.(\d{1,2})", kernel_version)
+                    can = m is None or int(m.group(1)) >= 39
+                except OSError:
+                    can = False
+        finally:
+            unlink(TESTFN)
+    _can_xattr = can
+    return can
+
+def skip_unless_xattr(test):
+    """Skip decorator for tests that require functional extended attributes"""
+    ok = can_xattr()
+    msg = "no non-broken extended attribute support"
+    return test if ok else unittest.skip(msg)(test)
+
 def patch(test_instance, object_to_patch, attr_name, new_value):
     """Override 'object_to_patch'.'attr_name' with 'new_value'.
 
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -1810,25 +1810,7 @@
                         raise
 
 
-def supports_extended_attributes():
-    if not hasattr(os, "setxattr"):
-        return False
-    try:
-        with open(support.TESTFN, "wb") as fp:
-            try:
-                os.fsetxattr(fp.fileno(), b"user.test", b"")
-            except OSError:
-                return False
-    finally:
-        support.unlink(support.TESTFN)
-    # Kernels < 2.6.39 don't respect setxattr flags.
-    kernel_version = platform.release()
-    m = re.match("2.6.(\d{1,2})", kernel_version)
-    return m is None or int(m.group(1)) >= 39
-
-
-@unittest.skipUnless(supports_extended_attributes(),
-                     "no non-broken extended attribute support")
+@support.skip_unless_xattr
 class ExtendedAttributeTests(unittest.TestCase):
 
     def tearDown(self):
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -311,6 +311,67 @@
         finally:
             os.chflags = old_chflags
 
+    @support.skip_unless_xattr
+    def test_copyxattr(self):
+        tmp_dir = self.mkdtemp()
+        src = os.path.join(tmp_dir, 'foo')
+        write_file(src, 'foo')
+        dst = os.path.join(tmp_dir, 'bar')
+        write_file(dst, 'bar')
+
+        # no xattr == no problem
+        shutil._copyxattr(src, dst)
+        # common case
+        os.setxattr(src, 'user.foo', b'42')
+        os.setxattr(src, 'user.bar', b'43')
+        shutil._copyxattr(src, dst)
+        self.assertEqual(os.listxattr(src), os.listxattr(dst))
+        self.assertEqual(
+                os.getxattr(src, 'user.foo'),
+                os.getxattr(dst, 'user.foo'))
+        # check errors don't affect other attrs
+        os.remove(dst)
+        write_file(dst, 'bar')
+        os_error = OSError(errno.EPERM, 'EPERM')
+
+        def _raise_on_user_foo(fname, attr, val):
+            if attr == 'user.foo':
+                raise os_error
+            else:
+                orig_setxattr(fname, attr, val)
+        try:
+            orig_setxattr = os.setxattr
+            os.setxattr = _raise_on_user_foo
+            shutil._copyxattr(src, dst)
+            self.assertEqual(['user.bar'], os.listxattr(dst))
+        finally:
+            os.setxattr = orig_setxattr
+
+    @support.skip_unless_symlink
+    @support.skip_unless_xattr
+    @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
+                         'root privileges required')
+    def test_copyxattr_symlinks(self):
+        # On Linux, it's only possible to access non-user xattr for symlinks;
+        # which in turn require root privileges. This test should be expanded
+        # as soon as other platforms gain support for extended attributes.
+        tmp_dir = self.mkdtemp()
+        src = os.path.join(tmp_dir, 'foo')
+        src_link = os.path.join(tmp_dir, 'baz')
+        write_file(src, 'foo')
+        os.symlink(src, src_link)
+        os.setxattr(src, 'trusted.foo', b'42')
+        os.lsetxattr(src_link, 'trusted.foo', b'43')
+        dst = os.path.join(tmp_dir, 'bar')
+        dst_link = os.path.join(tmp_dir, 'qux')
+        write_file(dst, 'bar')
+        os.symlink(dst, dst_link)
+        shutil._copyxattr(src_link, dst_link, symlinks=True)
+        self.assertEqual(os.lgetxattr(dst_link, 'trusted.foo'), b'43')
+        self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
+        shutil._copyxattr(src_link, dst, symlinks=True)
+        self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
+
     @support.skip_unless_symlink
     def test_copy_symlinks(self):
         tmp_dir = self.mkdtemp()
@@ -369,6 +430,19 @@
         if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
             self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
 
+    @support.skip_unless_xattr
+    def test_copy2_xattr(self):
+        tmp_dir = self.mkdtemp()
+        src = os.path.join(tmp_dir, 'foo')
+        dst = os.path.join(tmp_dir, 'bar')
+        write_file(src, 'foo')
+        os.setxattr(src, 'user.foo', b'42')
+        shutil.copy2(src, dst)
+        self.assertEqual(
+                os.getxattr(src, 'user.foo'),
+                os.getxattr(dst, 'user.foo'))
+        os.remove(dst)
+
     @support.skip_unless_symlink
     def test_copyfile_symlinks(self):
         tmp_dir = self.mkdtemp()
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -23,6 +23,9 @@
 Library
 -------
 
+- Issue #14082: shutil.copy2() now copies extended attributes, if possible.
+  Patch by Hynek Schlawack.
+
 - Issue #13959: Make importlib.abc.FileLoader.load_module()/get_filename() and
   importlib.machinery.ExtensionFileLoader.load_module() have their single
   argument be optional. Allows for the replacement (and thus deprecation) of

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list