[Python-checkins] cpython: Enhance and modernize test_os

victor.stinner python-checkins at python.org
Thu Mar 24 12:13:58 EDT 2016


https://hg.python.org/cpython/rev/a83a786c31b5
changeset:   100718:a83a786c31b5
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Thu Mar 24 17:12:55 2016 +0100
summary:
  Enhance and modernize test_os

* add create_file() helper function
* create files using "x" mode instead of "w" to detect when a previous test
  forgets to remove a file
* open file for writing in unbuffered mode (buffering=0)
* replace "try/finally: unlink" with self.addCleanup(support.unlink)
* register unlink cleanup function *before* creating new files

files:
  Lib/test/test_os.py |  157 +++++++++++++++----------------
  1 files changed, 78 insertions(+), 79 deletions(-)


diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -99,6 +99,11 @@
         yield
 
 
+def create_file(filename, content=b'content'):
+    with open(filename, "xb", 0) as fp:
+        fp.write(content)
+
+
 # Tests creating TESTFN
 class FileTests(unittest.TestCase):
     def setUp(self):
@@ -157,9 +162,8 @@
                          "needs INT_MAX < PY_SSIZE_T_MAX")
     @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
     def test_large_read(self, size):
-        with open(support.TESTFN, "wb") as fp:
-            fp.write(b'test')
         self.addCleanup(support.unlink, support.TESTFN)
+        create_file(support.TESTFN, b'test')
 
         # Issue #21932: Make sure that os.read() does not raise an
         # OverflowError for size larger than INT_MAX
@@ -216,11 +220,12 @@
 
     def test_replace(self):
         TESTFN2 = support.TESTFN + ".2"
-        with open(support.TESTFN, 'w') as f:
-            f.write("1")
-        with open(TESTFN2, 'w') as f:
-            f.write("2")
-        self.addCleanup(os.unlink, TESTFN2)
+        self.addCleanup(support.unlink, support.TESTFN)
+        self.addCleanup(support.unlink, TESTFN2)
+
+        create_file(support.TESTFN, b"1")
+        create_file(TESTFN2, b"2")
+
         os.replace(support.TESTFN, TESTFN2)
         self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
         with open(TESTFN2, 'r') as f:
@@ -245,8 +250,7 @@
     def setUp(self):
         self.fname = support.TESTFN
         self.addCleanup(support.unlink, self.fname)
-        with open(self.fname, 'wb') as fp:
-            fp.write(b"ABC")
+        create_file(self.fname, b"ABC")
 
     @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
     def check_stat_attributes(self, fname):
@@ -455,8 +459,7 @@
 
         self.addCleanup(support.rmtree, self.dirname)
         os.mkdir(self.dirname)
-        with open(self.fname, 'wb') as fp:
-            fp.write(b"ABC")
+        create_file(self.fname)
 
         def restore_float_times(state):
             with ignore_deprecation_warnings('stat_float_times'):
@@ -555,7 +558,7 @@
                          "fd support for utime required for this test.")
     def test_utime_fd(self):
         def set_time(filename, ns):
-            with open(filename, 'wb') as fp:
+            with open(filename, 'wb', 0) as fp:
                 # use a file descriptor to test futimens(timespec)
                 # or futimes(timeval)
                 os.utime(fp.fileno(), ns=ns)
@@ -1213,8 +1216,7 @@
         os.mkdir(dira)
         dirb = os.path.join(dira, 'dirb')
         os.mkdir(dirb)
-        with open(os.path.join(dira, 'file.txt'), 'w') as f:
-            f.write('text')
+        create_file(os.path.join(dira, 'file.txt'))
         os.removedirs(dirb)
         self.assertFalse(os.path.exists(dirb))
         self.assertTrue(os.path.exists(dira))
@@ -1225,8 +1227,7 @@
         os.mkdir(dira)
         dirb = os.path.join(dira, 'dirb')
         os.mkdir(dirb)
-        with open(os.path.join(dirb, 'file.txt'), 'w') as f:
-            f.write('text')
+        create_file(os.path.join(dirb, 'file.txt'))
         with self.assertRaises(OSError):
             os.removedirs(dirb)
         self.assertTrue(os.path.exists(dirb))
@@ -1236,7 +1237,7 @@
 
 class DevNullTests(unittest.TestCase):
     def test_devnull(self):
-        with open(os.devnull, 'wb') as f:
+        with open(os.devnull, 'wb', 0) as f:
             f.write(b'hello')
             f.close()
         with open(os.devnull, 'rb') as f:
@@ -1323,9 +1324,9 @@
     def test_urandom_fd_reopened(self):
         # Issue #21207: urandom() should detect its fd to /dev/urandom
         # changed to something else, and reopen it.
-        with open(support.TESTFN, 'wb') as f:
-            f.write(b"x" * 256)
-        self.addCleanup(os.unlink, support.TESTFN)
+        self.addCleanup(support.unlink, support.TESTFN)
+        create_file(support.TESTFN, b"x" * 256)
+
         code = """if 1:
             import os
             import sys
@@ -1464,12 +1465,10 @@
         self.assertRaises(OSError, os.chdir, support.TESTFN)
 
     def test_mkdir(self):
-        f = open(support.TESTFN, "w")
-        try:
+        self.addCleanup(support.unlink, support.TESTFN)
+
+        with open(support.TESTFN, "w") as f:
             self.assertRaises(OSError, os.mkdir, support.TESTFN)
-        finally:
-            f.close()
-            os.unlink(support.TESTFN)
 
     def test_utime(self):
         self.assertRaises(OSError, os.utime, support.TESTFN, None)
@@ -1988,42 +1987,36 @@
         level1 = os.path.abspath(support.TESTFN)
         level2 = os.path.join(level1, "level2")
         level3 = os.path.join(level2, "level3")
+        self.addCleanup(support.rmtree, level1)
+
+        os.mkdir(level1)
+        os.mkdir(level2)
+        os.mkdir(level3)
+
+        file1 = os.path.abspath(os.path.join(level1, "file1"))
+        create_file(file1)
+
+        orig_dir = os.getcwd()
         try:
-            os.mkdir(level1)
-            os.mkdir(level2)
-            os.mkdir(level3)
-
-            file1 = os.path.abspath(os.path.join(level1, "file1"))
-
-            with open(file1, "w") as f:
-                f.write("file1")
-
-            orig_dir = os.getcwd()
-            try:
-                os.chdir(level2)
-                link = os.path.join(level2, "link")
-                os.symlink(os.path.relpath(file1), "link")
-                self.assertIn("link", os.listdir(os.getcwd()))
-
-                # Check os.stat calls from the same dir as the link
-                self.assertEqual(os.stat(file1), os.stat("link"))
-
-                # Check os.stat calls from a dir below the link
-                os.chdir(level1)
-                self.assertEqual(os.stat(file1),
-                                 os.stat(os.path.relpath(link)))
-
-                # Check os.stat calls from a dir above the link
-                os.chdir(level3)
-                self.assertEqual(os.stat(file1),
-                                 os.stat(os.path.relpath(link)))
-            finally:
-                os.chdir(orig_dir)
-        except OSError as err:
-            self.fail(err)
+            os.chdir(level2)
+            link = os.path.join(level2, "link")
+            os.symlink(os.path.relpath(file1), "link")
+            self.assertIn("link", os.listdir(os.getcwd()))
+
+            # Check os.stat calls from the same dir as the link
+            self.assertEqual(os.stat(file1), os.stat("link"))
+
+            # Check os.stat calls from a dir below the link
+            os.chdir(level1)
+            self.assertEqual(os.stat(file1),
+                             os.stat(os.path.relpath(link)))
+
+            # Check os.stat calls from a dir above the link
+            os.chdir(level3)
+            self.assertEqual(os.stat(file1),
+                             os.stat(os.path.relpath(link)))
         finally:
-            os.remove(file1)
-            shutil.rmtree(level1)
+            os.chdir(orig_dir)
 
 
 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@@ -2159,8 +2152,8 @@
         try:
             new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
             if base >= 19 and new_prio <= 19:
-                raise unittest.SkipTest(
-      "unable to reliably test setpriority at current nice level of %s" % base)
+                raise unittest.SkipTest("unable to reliably test setpriority "
+                                        "at current nice level of %s" % base)
             else:
                 self.assertEqual(new_prio, base + 1)
         finally:
@@ -2270,8 +2263,7 @@
     @classmethod
     def setUpClass(cls):
         cls.key = support.threading_setup()
-        with open(support.TESTFN, "wb") as f:
-            f.write(cls.DATA)
+        create_file(support.TESTFN, cls.DATA)
 
     @classmethod
     def tearDownClass(cls):
@@ -2421,10 +2413,11 @@
     def test_trailers(self):
         TESTFN2 = support.TESTFN + "2"
         file_data = b"abcdef"
-        with open(TESTFN2, 'wb') as f:
-            f.write(file_data)
-        with open(TESTFN2, 'rb')as f:
-            self.addCleanup(os.remove, TESTFN2)
+
+        self.addCleanup(support.unlink, TESTFN2)
+        create_file(TESTFN2, file_data)
+
+        with open(TESTFN2, 'rb') as f:
             os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                         trailers=[b"1234"])
             self.client.close()
@@ -2447,8 +2440,9 @@
 def supports_extended_attributes():
     if not hasattr(os, "setxattr"):
         return False
+
     try:
-        with open(support.TESTFN, "wb") as fp:
+        with open(support.TESTFN, "xb", 0) as fp:
             try:
                 os.setxattr(fp.fileno(), b"user.test", b"")
             except OSError:
@@ -2465,17 +2459,18 @@
 @support.requires_linux_version(2, 6, 39)
 class ExtendedAttributeTests(unittest.TestCase):
 
-    def tearDown(self):
-        support.unlink(support.TESTFN)
-
     def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
         fn = support.TESTFN
-        open(fn, "wb").close()
+        self.addCleanup(support.unlink, fn)
+        create_file(fn)
+
         with self.assertRaises(OSError) as cm:
             getxattr(fn, s("user.test"), **kwargs)
         self.assertEqual(cm.exception.errno, errno.ENODATA)
+
         init_xattr = listxattr(fn)
         self.assertIsInstance(init_xattr, list)
+
         setxattr(fn, s("user.test"), b"", **kwargs)
         xattr = set(init_xattr)
         xattr.add("user.test")
@@ -2483,19 +2478,24 @@
         self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
         setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
         self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
+
         with self.assertRaises(OSError) as cm:
             setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
         self.assertEqual(cm.exception.errno, errno.EEXIST)
+
         with self.assertRaises(OSError) as cm:
             setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
         self.assertEqual(cm.exception.errno, errno.ENODATA)
+
         setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
         xattr.add("user.test2")
         self.assertEqual(set(listxattr(fn)), xattr)
         removexattr(fn, s("user.test"), **kwargs)
+
         with self.assertRaises(OSError) as cm:
             getxattr(fn, s("user.test"), **kwargs)
         self.assertEqual(cm.exception.errno, errno.ENODATA)
+
         xattr.remove("user.test")
         self.assertEqual(set(listxattr(fn)), xattr)
         self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
@@ -2508,11 +2508,11 @@
         self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
 
     def _check_xattrs(self, *args, **kwargs):
-        def make_bytes(s):
-            return bytes(s, "ascii")
         self._check_xattrs_str(str, *args, **kwargs)
         support.unlink(support.TESTFN)
-        self._check_xattrs_str(make_bytes, *args, **kwargs)
+
+        self._check_xattrs_str(os.fsencode, *args, **kwargs)
+        support.unlink(support.TESTFN)
 
     def test_simple(self):
         self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
@@ -2527,10 +2527,10 @@
             with open(path, "rb") as fp:
                 return os.getxattr(fp.fileno(), *args)
         def setxattr(path, *args):
-            with open(path, "wb") as fp:
+            with open(path, "wb", 0) as fp:
                 os.setxattr(fp.fileno(), *args)
         def removexattr(path, *args):
-            with open(path, "wb") as fp:
+            with open(path, "wb", 0) as fp:
                 os.removexattr(fp.fileno(), *args)
         def listxattr(path, *args):
             with open(path, "rb") as fp:
@@ -2844,8 +2844,7 @@
 
     def create_file(self, name="file.txt"):
         filename = os.path.join(self.path, name)
-        with open(filename, "wb") as fp:
-            fp.write(b'python')
+        create_file(filename, b'python')
         return filename
 
     def get_entries(self, names):

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list