[Jython-checkins] jython: Update tempfile module and its test to lib-python/2.7.

jeff.allen jython-checkins at python.org
Thu Apr 9 09:28:33 CEST 2015


https://hg.python.org/jython/rev/f1af0477d30a
changeset:   7652:f1af0477d30a
user:        Jeff Allen <ja.py at farowl.co.uk>
date:        Tue Apr 07 20:37:18 2015 +0100
summary:
  Update tempfile module and its test to lib-python/2.7.

Selectively merge tempfile and test.test_tempfile from the lib-python
directory to Jython Lib, preserving Jython specialisations.

files:
  Lib/tempfile.py           |   41 ++-
  Lib/test/test_tempfile.py |  270 +++++++++++++++++++------
  2 files changed, 225 insertions(+), 86 deletions(-)


diff --git a/Lib/tempfile.py b/Lib/tempfile.py
--- a/Lib/tempfile.py
+++ b/Lib/tempfile.py
@@ -29,6 +29,7 @@
 
 # Imports.
 
+import io as _io
 import os as _os
 import errno as _errno
 from random import Random as _Random
@@ -170,7 +171,7 @@
     if os_name == 'riscos':
         dirname = _os.getenv('Wimp$ScrapDir')
         if dirname: dirlist.append(dirname)
-    elif os_name == "nt":
+    elif os_name == 'nt':
         dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
     else:
         dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
@@ -204,15 +205,18 @@
             name = namer.next()
             filename = _os.path.join(dir, name)
             try:
-                fd = _os.open(filename, flags, 0600)
-                fp = _os.fdopen(fd, 'w')
-                fp.write('blat')
-                fp.close()
-                _os.unlink(filename)
-                del fp, fd
+                fd = _os.open(filename, flags, 0o600)
+                try:
+                    try:
+                        with _io.open(fd, 'wb', closefd=False) as fp:
+                            fp.write(b'blat')
+                    finally:
+                        _os.close(fd)
+                finally:
+                    _os.unlink(filename)
                 return dir
-            except (OSError, IOError), e:
-                if e[0] != _errno.EEXIST:
+            except (OSError, IOError) as e:
+                if e.args[0] != _errno.EEXIST:
                     break # no point trying more names in this directory
                 pass
     raise IOError, (_errno.ENOENT,
@@ -558,7 +562,7 @@
         return self._file.closed
 
     @property
-    def encoding(self):
+    def encoding(self): # Jython not CPython
         return self._file.encoding
 
     def fileno(self):
@@ -573,14 +577,20 @@
 
     @property
     def mode(self):
-        return self._file.mode
+        try:
+            return self._file.mode
+        except AttributeError:
+            return self._TemporaryFileArgs[0]
 
     @property
     def name(self):
-        return self._file.name
+        try:
+            return self._file.name
+        except AttributeError:
+            return None
 
     @property
-    def newlines(self):
+    def newlines(self): # Jython not CPython
         return self._file.newlines
 
     def next(self):
@@ -621,4 +631,7 @@
         return rv
 
     def xreadlines(self, *args):
-        return self._file.xreadlines(*args)
+        try:
+            return self._file.xreadlines(*args)
+        except AttributeError:
+            return iter(self._file.readlines(*args))
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -1,14 +1,16 @@
-# From Python 2.5.1
 # tempfile.py unit tests.
 import tempfile
+import errno
+import io
 import os
+import signal
+import shutil
 import sys
 import re
-import errno
 import warnings
 
 import unittest
-from test import test_support
+from test import test_support as support
 
 warnings.filterwarnings("ignore",
                         category=RuntimeWarning,
@@ -62,7 +64,7 @@
                          "file '%s' does not end with '%s'" % (nbase, suf))
 
         nbase = nbase[len(pre):len(nbase)-len(suf)]
-        self.assert_(self.str_check.match(nbase),
+        self.assertTrue(self.str_check.match(nbase),
                      "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
                      % nbase)
 
@@ -91,7 +93,7 @@
         for key in dict:
             if key[0] != '_' and key not in expected:
                 unexp.append(key)
-        self.failUnless(len(unexp) == 0,
+        self.assertTrue(len(unexp) == 0,
                         "unexpected keys: %s" % unexp)
 
 test_classes.append(test_exports)
@@ -116,7 +118,7 @@
         for i in xrange(TEST_FILES):
             s = r.next()
             self.nameCheck(s, '', '', '')
-            self.failIf(s in dict)
+            self.assertNotIn(s, dict)
             dict[s] = 1
 
     def test_supports_iter(self):
@@ -132,6 +134,37 @@
         except:
             self.failOnException("iteration")
 
+    @unittest.skipUnless(hasattr(os, 'fork'),
+        "os.fork is required for this test")
+    def test_process_awareness(self):
+        # ensure that the random source differs between
+        # child and parent.
+        read_fd, write_fd = os.pipe()
+        pid = None
+        try:
+            pid = os.fork()
+            if not pid:
+                os.close(read_fd)
+                os.write(write_fd, next(self.r).encode("ascii"))
+                os.close(write_fd)
+                # bypass the normal exit handlers- leave those to
+                # the parent.
+                os._exit(0)
+            parent_value = next(self.r)
+            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
+        finally:
+            if pid:
+                # best effort to ensure the process can't bleed out
+                # via any bugs above
+                try:
+                    os.kill(pid, signal.SIGKILL)
+                except EnvironmentError:
+                    pass
+            os.close(read_fd)
+            os.close(write_fd)
+        self.assertNotEqual(child_value, parent_value)
+
+
 test_classes.append(test__RandomNameSequence)
 
 
@@ -143,42 +176,84 @@
 
         cand = tempfile._candidate_tempdir_list()
 
-        self.failIf(len(cand) == 0)
+        self.assertFalse(len(cand) == 0)
         for c in cand:
-            self.assert_(isinstance(c, basestring),
-                         "%s is not a string" % c)
+            self.assertIsInstance(c, basestring)
 
     def test_wanted_dirs(self):
         # _candidate_tempdir_list contains the expected directories
 
         # Make sure the interesting environment variables are all set.
-        with test_support.EnvironmentVarGuard() as env:
+        with support.EnvironmentVarGuard() as env:
             for envname in 'TMPDIR', 'TEMP', 'TMP':
                 dirname = os.getenv(envname)
                 if not dirname:
-                    env.set(envname, os.path.abspath(envname))
+                    env[envname] = os.path.abspath(envname)
 
             cand = tempfile._candidate_tempdir_list()
 
             for envname in 'TMPDIR', 'TEMP', 'TMP':
                 dirname = os.getenv(envname)
                 if not dirname: raise ValueError
-                self.assert_(dirname in cand)
+                self.assertIn(dirname, cand)
 
             try:
                 dirname = os.getcwd()
             except (AttributeError, os.error):
                 dirname = os.curdir
 
-            self.assert_(dirname in cand)
+            self.assertIn(dirname, cand)
 
             # Not practical to try to verify the presence of OS-specific
             # paths in this list.
 
 test_classes.append(test__candidate_tempdir_list)
 
+# We test _get_default_tempdir some more by testing gettempdir.
 
-# We test _get_default_tempdir by testing gettempdir.
+class TestGetDefaultTempdir(TC):
+    """Test _get_default_tempdir()."""
+
+    def test_no_files_left_behind(self):
+        # use a private empty directory
+        our_temp_directory = tempfile.mkdtemp()
+        try:
+            # force _get_default_tempdir() to consider our empty directory
+            def our_candidate_list():
+                return [our_temp_directory]
+
+            with support.swap_attr(tempfile, "_candidate_tempdir_list",
+                                   our_candidate_list):
+                # verify our directory is empty after _get_default_tempdir()
+                tempfile._get_default_tempdir()
+                self.assertEqual(os.listdir(our_temp_directory), [])
+
+                def raise_OSError(*args, **kwargs):
+                    raise OSError(-1)
+
+                with support.swap_attr(io, "open", raise_OSError):
+                    # test again with failing io.open()
+                    with self.assertRaises(IOError) as cm:
+                        tempfile._get_default_tempdir()
+                    self.assertEqual(cm.exception.errno, errno.ENOENT)
+                    self.assertEqual(os.listdir(our_temp_directory), [])
+
+                open = io.open
+                def bad_writer(*args, **kwargs):
+                    fp = open(*args, **kwargs)
+                    fp.write = raise_OSError
+                    return fp
+
+                with support.swap_attr(io, "open", bad_writer):
+                    # test again with failing write()
+                    with self.assertRaises(IOError) as cm:
+                        tempfile._get_default_tempdir()
+                    self.assertEqual(cm.exception.errno, errno.ENOENT)
+                    self.assertEqual(os.listdir(our_temp_directory), [])
+        finally:
+            shutil.rmtree(our_temp_directory)
+
+test_classes.append(TestGetDefaultTempdir)
 
 
 class test__get_candidate_names(TC):
@@ -187,14 +262,14 @@
     def test_retval(self):
         # _get_candidate_names returns a _RandomNameSequence object
         obj = tempfile._get_candidate_names()
-        self.assert_(isinstance(obj, tempfile._RandomNameSequence))
+        self.assertIsInstance(obj, tempfile._RandomNameSequence)
 
     def test_same_thing(self):
         # _get_candidate_names always returns the same object
         a = tempfile._get_candidate_names()
         b = tempfile._get_candidate_names()
 
-        self.assert_(a is b)
+        self.assertTrue(a is b)
 
 test_classes.append(test__get_candidate_names)
 
@@ -268,7 +343,7 @@
     def _test_file_mode(self):
         # _mkstemp_inner creates files with the proper mode
         if not has_stat:
-            return            # ugh, can't use TestSkipped.
+            return            # ugh, can't use SkipTest.
 
         file = self.do_create()
         mode = stat.S_IMODE(os.stat(file.name).st_mode)
@@ -283,9 +358,9 @@
     def test_noinherit(self):
         # _mkstemp_inner file handles are not inherited by child processes
         if not has_spawnl:
-            return            # ugh, can't use TestSkipped.
+            return            # ugh, can't use SkipTest.
 
-        if test_support.verbose:
+        if support.verbose:
             v="v"
         else:
             v="q"
@@ -314,14 +389,14 @@
             decorated = sys.executable
 
         retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
-        self.failIf(retval < 0,
+        self.assertFalse(retval < 0,
                     "child process caught fatal signal %d" % -retval)
-        self.failIf(retval > 0, "child process reports failure %d"%retval)
+        self.assertFalse(retval > 0, "child process reports failure %d"%retval)
 
     def test_textmode(self):
         # _mkstemp_inner can create files in text mode
         if not has_textmode:
-            return            # ugh, can't use TestSkipped.
+            return            # ugh, can't use SkipTest.
 
         self.do_create(bin=0).write("blat\n")
         # XXX should test that the file really is a text file
@@ -336,8 +411,8 @@
         # gettempprefix returns a nonempty prefix string
         p = tempfile.gettempprefix()
 
-        self.assert_(isinstance(p, basestring))
-        self.assert_(len(p) > 0)
+        self.assertIsInstance(p, basestring)
+        self.assertTrue(len(p) > 0)
 
     def test_usable_template(self):
         # gettempprefix returns a usable prefix string
@@ -368,9 +443,9 @@
         # gettempdir returns a directory which exists
 
         dir = tempfile.gettempdir()
-        self.assert_(os.path.isabs(dir) or dir == os.curdir,
+        self.assertTrue(os.path.isabs(dir) or dir == os.curdir,
                      "%s is not an absolute path" % dir)
-        self.assert_(os.path.isdir(dir),
+        self.assertTrue(os.path.isdir(dir),
                      "%s is not a directory" % dir)
 
     def test_directory_writable(self):
@@ -391,7 +466,7 @@
         a = tempfile.gettempdir()
         b = tempfile.gettempdir()
 
-        self.assert_(a is b)
+        self.assertTrue(a is b)
 
     def test_case_sensitive(self):
         # gettempdir should not flatten its case
@@ -400,13 +475,13 @@
         case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
         _tempdir, tempfile.tempdir = tempfile.tempdir, None
         try:
-            with test_support.EnvironmentVarGuard() as env:
+            with support.EnvironmentVarGuard() as env:
                 # Fake the first env var which is checked as a candidate
                 env["TMPDIR"] = case_sensitive_tempdir
                 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
         finally:
             tempfile.tempdir = _tempdir
-            test_support.rmdir(case_sensitive_tempdir)
+            support.rmdir(case_sensitive_tempdir)
 
 
 test_classes.append(test_gettempdir)
@@ -502,7 +577,7 @@
         # mkdtemp creates directories with the proper mode
         if not has_stat:
             return            # ugh, can't use TestSkipped.
-        if test_support.is_jython and not os._native_posix:
+        if support.is_jython and not os._native_posix:
             # Java doesn't support stating files for permissions
             return
 
@@ -512,7 +587,7 @@
             mode &= 0777 # Mask off sticky bits inherited from /tmp
             expected = 0700
             if (sys.platform in ('win32', 'os2emx', 'mac') or
-                test_support.is_jython and os._name == 'nt'):
+                support.is_jython and os._name == 'nt'):
                 # There's no distinction among 'user', 'group' and 'world';
                 # replicate the 'user' bits.
                 user = expected >> 6
@@ -618,7 +693,7 @@
     def test_creates_named(self):
         # NamedTemporaryFile creates files with names
         f = tempfile.NamedTemporaryFile()
-        self.failUnless(os.path.exists(f.name),
+        self.assertTrue(os.path.exists(f.name),
                         "NamedTemporaryFile %s does not exist" % f.name)
 
     def test_del_on_close(self):
@@ -628,7 +703,7 @@
             f = tempfile.NamedTemporaryFile(dir=dir)
             f.write('blat')
             f.close()
-            self.failIf(os.path.exists(f.name),
+            self.assertFalse(os.path.exists(f.name),
                         "NamedTemporaryFile %s exists after close" % f.name)
         finally:
             os.rmdir(dir)
@@ -642,7 +717,7 @@
             tmp = f.name
             f.write('blat')
             f.close()
-            self.failUnless(os.path.exists(f.name),
+            self.assertTrue(os.path.exists(f.name),
                         "NamedTemporaryFile %s missing after close" % f.name)
         finally:
             if tmp is not None:
@@ -663,12 +738,12 @@
     def test_context_manager(self):
         # A NamedTemporaryFile can be used as a context manager
         with tempfile.NamedTemporaryFile() as f:
-            self.failUnless(os.path.exists(f.name))
-        self.failIf(os.path.exists(f.name))
+            self.assertTrue(os.path.exists(f.name))
+        self.assertFalse(os.path.exists(f.name))
         def use_closed():
             with f:
                 pass
-        self.failUnlessRaises(ValueError, use_closed)
+        self.assertRaises(ValueError, use_closed)
 
     # How to test the mode and bufsize parameters?
 
@@ -691,21 +766,21 @@
     def test_basic(self):
         # SpooledTemporaryFile can create files
         f = self.do_create()
-        self.failIf(f._rolled)
+        self.assertFalse(f._rolled)
         f = self.do_create(max_size=100, pre="a", suf=".txt")
-        self.failIf(f._rolled)
+        self.assertFalse(f._rolled)
 
     def test_del_on_close(self):
         # A SpooledTemporaryFile is deleted when closed
         dir = tempfile.mkdtemp()
         try:
             f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
-            self.failIf(f._rolled)
+            self.assertFalse(f._rolled)
             f.write('blat ' * 5)
-            self.failUnless(f._rolled)
+            self.assertTrue(f._rolled)
             filename = f.name
             f.close()
-            self.failIf(os.path.exists(filename),
+            self.assertFalse(os.path.exists(filename),
                         "SpooledTemporaryFile %s exists after close" % filename)
         finally:
             os.rmdir(dir)
@@ -713,46 +788,74 @@
     def test_rewrite_small(self):
         # A SpooledTemporaryFile can be written to multiple within the max_size
         f = self.do_create(max_size=30)
-        self.failIf(f._rolled)
+        self.assertFalse(f._rolled)
         for i in range(5):
             f.seek(0, 0)
             f.write('x' * 20)
-        self.failIf(f._rolled)
+        self.assertFalse(f._rolled)
 
     def test_write_sequential(self):
         # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
         # over afterward
         f = self.do_create(max_size=30)
-        self.failIf(f._rolled)
+        self.assertFalse(f._rolled)
         f.write('x' * 20)
-        self.failIf(f._rolled)
+        self.assertFalse(f._rolled)
         f.write('x' * 10)
-        self.failIf(f._rolled)
+        self.assertFalse(f._rolled)
         f.write('x')
-        self.failUnless(f._rolled)
+        self.assertTrue(f._rolled)
+
+    def test_writelines(self):
+        # Verify writelines with a SpooledTemporaryFile
+        f = self.do_create()
+        f.writelines((b'x', b'y', b'z'))
+        f.seek(0)
+        buf = f.read()
+        self.assertEqual(buf, b'xyz')
+
+    def test_writelines_sequential(self):
+        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
+        # over afterward
+        f = self.do_create(max_size=35)
+        f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
+        self.assertFalse(f._rolled)
+        f.write(b'x')
+        self.assertTrue(f._rolled)
+
+    def test_xreadlines(self):
+        f = self.do_create(max_size=20)
+        f.write(b'abc\n' * 5)
+        f.seek(0)
+        self.assertFalse(f._rolled)
+        self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5)
+        f.write(b'x\ny')
+        self.assertTrue(f._rolled)
+        f.seek(0)
+        self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5 + [b'x\n', b'y'])
 
     def test_sparse(self):
         # A SpooledTemporaryFile that is written late in the file will extend
         # when that occurs
         f = self.do_create(max_size=30)
-        self.failIf(f._rolled)
+        self.assertFalse(f._rolled)
         f.seek(100, 0)
-        self.failIf(f._rolled)
+        self.assertFalse(f._rolled)
         f.write('x')
-        self.failUnless(f._rolled)
+        self.assertTrue(f._rolled)
 
     def test_fileno(self):
         # A SpooledTemporaryFile should roll over to a real file on fileno()
         f = self.do_create(max_size=30)
-        self.failIf(f._rolled)
-        self.failUnless(f.fileno() > 0)
-        self.failUnless(f._rolled)
+        self.assertFalse(f._rolled)
+        self.assertTrue(f.fileno() > 0)
+        self.assertTrue(f._rolled)
 
     def test_multiple_close_before_rollover(self):
         # A SpooledTemporaryFile can be closed many times without error
         f = tempfile.SpooledTemporaryFile()
         f.write('abc\n')
-        self.failIf(f._rolled)
+        self.assertFalse(f._rolled)
         f.close()
         try:
             f.close()
@@ -764,7 +867,7 @@
         # A SpooledTemporaryFile can be closed many times without error
         f = tempfile.SpooledTemporaryFile(max_size=1)
         f.write('abc\n')
-        self.failUnless(f._rolled)
+        self.assertTrue(f._rolled)
         f.close()
         try:
             f.close()
@@ -784,46 +887,69 @@
         write("a" * 35)
         write("b" * 35)
         seek(0, 0)
-        self.failUnless(read(70) == 'a'*35 + 'b'*35)
+        self.assertTrue(read(70) == 'a'*35 + 'b'*35)
+
+    def test_properties(self):
+        f = tempfile.SpooledTemporaryFile(max_size=10)
+        f.write(b'x' * 10)
+        self.assertFalse(f._rolled)
+        self.assertEqual(f.mode, 'w+b')
+        self.assertIsNone(f.name)
+        # Jython SpooledTemporaryFile has these properties:
+        if not support.is_jython:
+            with self.assertRaises(AttributeError):
+                f.newlines
+            with self.assertRaises(AttributeError):
+                f.encoding
+
+        f.write(b'x')
+        self.assertTrue(f._rolled)
+        self.assertEqual(f.mode, 'w+b')
+        self.assertIsNotNone(f.name)
+        if not support.is_jython:
+            with self.assertRaises(AttributeError):
+                f.newlines
+            with self.assertRaises(AttributeError):
+                f.encoding
 
     def test_context_manager_before_rollover(self):
         # A SpooledTemporaryFile can be used as a context manager
         with tempfile.SpooledTemporaryFile(max_size=1) as f:
-            self.failIf(f._rolled)
-            self.failIf(f.closed)
-        self.failUnless(f.closed)
+            self.assertFalse(f._rolled)
+            self.assertFalse(f.closed)
+        self.assertTrue(f.closed)
         def use_closed():
             with f:
                 pass
-        self.failUnlessRaises(ValueError, use_closed)
+        self.assertRaises(ValueError, use_closed)
 
     def test_context_manager_during_rollover(self):
         # A SpooledTemporaryFile can be used as a context manager
         with tempfile.SpooledTemporaryFile(max_size=1) as f:
-            self.failIf(f._rolled)
+            self.assertFalse(f._rolled)
             f.write('abc\n')
             f.flush()
-            self.failUnless(f._rolled)
-            self.failIf(f.closed)
-        self.failUnless(f.closed)
+            self.assertTrue(f._rolled)
+            self.assertFalse(f.closed)
+        self.assertTrue(f.closed)
         def use_closed():
             with f:
                 pass
-        self.failUnlessRaises(ValueError, use_closed)
+        self.assertRaises(ValueError, use_closed)
 
     def test_context_manager_after_rollover(self):
         # A SpooledTemporaryFile can be used as a context manager
         f = tempfile.SpooledTemporaryFile(max_size=1)
         f.write('abc\n')
         f.flush()
-        self.failUnless(f._rolled)
+        self.assertTrue(f._rolled)
         with f:
-            self.failIf(f.closed)
-        self.failUnless(f.closed)
+            self.assertFalse(f.closed)
+        self.assertTrue(f.closed)
         def use_closed():
             with f:
                 pass
-        self.failUnlessRaises(ValueError, use_closed)
+        self.assertRaises(ValueError, use_closed)
 
 
 test_classes.append(test_SpooledTemporaryFile)
@@ -875,11 +1001,11 @@
     test_classes.append(test_TemporaryFile)
 
 def test_main():
-    test_support.run_unittest(*test_classes)
+    support.run_unittest(*test_classes)
 
 if __name__ == "__main__":
     test_main()
-    if test_support.is_jython:
+    if support.is_jython:
         # XXX: Nudge Java's GC in an attempt to trigger any temp file's
         # __del__ (cause them to be deleted) that hasn't been called
         import gc

-- 
Repository URL: https://hg.python.org/jython


More information about the Jython-checkins mailing list