[Jython-checkins] jython: Update tempfile module and its test to lib-python/2.7.
jeff.allen
jython-checkins at python.org
Thu Apr 9 09:28:33 CEST 2015
https://hg.python.org/jython/rev/f1af0477d30a
changeset: 7652:f1af0477d30a
user: Jeff Allen <ja.py at farowl.co.uk>
date: Tue Apr 07 20:37:18 2015 +0100
summary:
Update tempfile module and its test to lib-python/2.7.
Selectively merge tempfile and test.test_tempfile from the lib-python
directory to Jython Lib, preserving Jython specialisations.
files:
Lib/tempfile.py | 41 ++-
Lib/test/test_tempfile.py | 270 +++++++++++++++++++------
2 files changed, 225 insertions(+), 86 deletions(-)
diff --git a/Lib/tempfile.py b/Lib/tempfile.py
--- a/Lib/tempfile.py
+++ b/Lib/tempfile.py
@@ -29,6 +29,7 @@
# Imports.
+import io as _io
import os as _os
import errno as _errno
from random import Random as _Random
@@ -170,7 +171,7 @@
if os_name == 'riscos':
dirname = _os.getenv('Wimp$ScrapDir')
if dirname: dirlist.append(dirname)
- elif os_name == "nt":
+ elif os_name == 'nt':
dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
else:
dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
@@ -204,15 +205,18 @@
name = namer.next()
filename = _os.path.join(dir, name)
try:
- fd = _os.open(filename, flags, 0600)
- fp = _os.fdopen(fd, 'w')
- fp.write('blat')
- fp.close()
- _os.unlink(filename)
- del fp, fd
+ fd = _os.open(filename, flags, 0o600)
+ try:
+ try:
+ with _io.open(fd, 'wb', closefd=False) as fp:
+ fp.write(b'blat')
+ finally:
+ _os.close(fd)
+ finally:
+ _os.unlink(filename)
return dir
- except (OSError, IOError), e:
- if e[0] != _errno.EEXIST:
+ except (OSError, IOError) as e:
+ if e.args[0] != _errno.EEXIST:
break # no point trying more names in this directory
pass
raise IOError, (_errno.ENOENT,
@@ -558,7 +562,7 @@
return self._file.closed
@property
- def encoding(self):
+ def encoding(self): # Jython not CPython
return self._file.encoding
def fileno(self):
@@ -573,14 +577,20 @@
@property
def mode(self):
- return self._file.mode
+ try:
+ return self._file.mode
+ except AttributeError:
+ return self._TemporaryFileArgs[0]
@property
def name(self):
- return self._file.name
+ try:
+ return self._file.name
+ except AttributeError:
+ return None
@property
- def newlines(self):
+ def newlines(self): # Jython not CPython
return self._file.newlines
def next(self):
@@ -621,4 +631,7 @@
return rv
def xreadlines(self, *args):
- return self._file.xreadlines(*args)
+ try:
+ return self._file.xreadlines(*args)
+ except AttributeError:
+ return iter(self._file.readlines(*args))
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -1,14 +1,16 @@
-# From Python 2.5.1
# tempfile.py unit tests.
import tempfile
+import errno
+import io
import os
+import signal
+import shutil
import sys
import re
-import errno
import warnings
import unittest
-from test import test_support
+from test import test_support as support
warnings.filterwarnings("ignore",
category=RuntimeWarning,
@@ -62,7 +64,7 @@
"file '%s' does not end with '%s'" % (nbase, suf))
nbase = nbase[len(pre):len(nbase)-len(suf)]
- self.assert_(self.str_check.match(nbase),
+ self.assertTrue(self.str_check.match(nbase),
"random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
% nbase)
@@ -91,7 +93,7 @@
for key in dict:
if key[0] != '_' and key not in expected:
unexp.append(key)
- self.failUnless(len(unexp) == 0,
+ self.assertTrue(len(unexp) == 0,
"unexpected keys: %s" % unexp)
test_classes.append(test_exports)
@@ -116,7 +118,7 @@
for i in xrange(TEST_FILES):
s = r.next()
self.nameCheck(s, '', '', '')
- self.failIf(s in dict)
+ self.assertNotIn(s, dict)
dict[s] = 1
def test_supports_iter(self):
@@ -132,6 +134,37 @@
except:
self.failOnException("iteration")
+ @unittest.skipUnless(hasattr(os, 'fork'),
+ "os.fork is required for this test")
+ def test_process_awareness(self):
+ # ensure that the random source differs between
+ # child and parent.
+ read_fd, write_fd = os.pipe()
+ pid = None
+ try:
+ pid = os.fork()
+ if not pid:
+ os.close(read_fd)
+ os.write(write_fd, next(self.r).encode("ascii"))
+ os.close(write_fd)
+ # bypass the normal exit handlers- leave those to
+ # the parent.
+ os._exit(0)
+ parent_value = next(self.r)
+ child_value = os.read(read_fd, len(parent_value)).decode("ascii")
+ finally:
+ if pid:
+ # best effort to ensure the process can't bleed out
+ # via any bugs above
+ try:
+ os.kill(pid, signal.SIGKILL)
+ except EnvironmentError:
+ pass
+ os.close(read_fd)
+ os.close(write_fd)
+ self.assertNotEqual(child_value, parent_value)
+
+
test_classes.append(test__RandomNameSequence)
@@ -143,42 +176,84 @@
cand = tempfile._candidate_tempdir_list()
- self.failIf(len(cand) == 0)
+ self.assertFalse(len(cand) == 0)
for c in cand:
- self.assert_(isinstance(c, basestring),
- "%s is not a string" % c)
+ self.assertIsInstance(c, basestring)
def test_wanted_dirs(self):
# _candidate_tempdir_list contains the expected directories
# Make sure the interesting environment variables are all set.
- with test_support.EnvironmentVarGuard() as env:
+ with support.EnvironmentVarGuard() as env:
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname:
- env.set(envname, os.path.abspath(envname))
+ env[envname] = os.path.abspath(envname)
cand = tempfile._candidate_tempdir_list()
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname: raise ValueError
- self.assert_(dirname in cand)
+ self.assertIn(dirname, cand)
try:
dirname = os.getcwd()
except (AttributeError, os.error):
dirname = os.curdir
- self.assert_(dirname in cand)
+ self.assertIn(dirname, cand)
# Not practical to try to verify the presence of OS-specific
# paths in this list.
test_classes.append(test__candidate_tempdir_list)
+# We test _get_default_tempdir some more by testing gettempdir.
-# We test _get_default_tempdir by testing gettempdir.
+class TestGetDefaultTempdir(TC):
+ """Test _get_default_tempdir()."""
+
+ def test_no_files_left_behind(self):
+ # use a private empty directory
+ our_temp_directory = tempfile.mkdtemp()
+ try:
+ # force _get_default_tempdir() to consider our empty directory
+ def our_candidate_list():
+ return [our_temp_directory]
+
+ with support.swap_attr(tempfile, "_candidate_tempdir_list",
+ our_candidate_list):
+ # verify our directory is empty after _get_default_tempdir()
+ tempfile._get_default_tempdir()
+ self.assertEqual(os.listdir(our_temp_directory), [])
+
+ def raise_OSError(*args, **kwargs):
+ raise OSError(-1)
+
+ with support.swap_attr(io, "open", raise_OSError):
+ # test again with failing io.open()
+ with self.assertRaises(IOError) as cm:
+ tempfile._get_default_tempdir()
+ self.assertEqual(cm.exception.errno, errno.ENOENT)
+ self.assertEqual(os.listdir(our_temp_directory), [])
+
+ open = io.open
+ def bad_writer(*args, **kwargs):
+ fp = open(*args, **kwargs)
+ fp.write = raise_OSError
+ return fp
+
+ with support.swap_attr(io, "open", bad_writer):
+ # test again with failing write()
+ with self.assertRaises(IOError) as cm:
+ tempfile._get_default_tempdir()
+ self.assertEqual(cm.exception.errno, errno.ENOENT)
+ self.assertEqual(os.listdir(our_temp_directory), [])
+ finally:
+ shutil.rmtree(our_temp_directory)
+
+test_classes.append(TestGetDefaultTempdir)
class test__get_candidate_names(TC):
@@ -187,14 +262,14 @@
def test_retval(self):
# _get_candidate_names returns a _RandomNameSequence object
obj = tempfile._get_candidate_names()
- self.assert_(isinstance(obj, tempfile._RandomNameSequence))
+ self.assertIsInstance(obj, tempfile._RandomNameSequence)
def test_same_thing(self):
# _get_candidate_names always returns the same object
a = tempfile._get_candidate_names()
b = tempfile._get_candidate_names()
- self.assert_(a is b)
+ self.assertTrue(a is b)
test_classes.append(test__get_candidate_names)
@@ -268,7 +343,7 @@
def _test_file_mode(self):
# _mkstemp_inner creates files with the proper mode
if not has_stat:
- return # ugh, can't use TestSkipped.
+ return # ugh, can't use SkipTest.
file = self.do_create()
mode = stat.S_IMODE(os.stat(file.name).st_mode)
@@ -283,9 +358,9 @@
def test_noinherit(self):
# _mkstemp_inner file handles are not inherited by child processes
if not has_spawnl:
- return # ugh, can't use TestSkipped.
+ return # ugh, can't use SkipTest.
- if test_support.verbose:
+ if support.verbose:
v="v"
else:
v="q"
@@ -314,14 +389,14 @@
decorated = sys.executable
retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
- self.failIf(retval < 0,
+ self.assertFalse(retval < 0,
"child process caught fatal signal %d" % -retval)
- self.failIf(retval > 0, "child process reports failure %d"%retval)
+ self.assertFalse(retval > 0, "child process reports failure %d"%retval)
def test_textmode(self):
# _mkstemp_inner can create files in text mode
if not has_textmode:
- return # ugh, can't use TestSkipped.
+ return # ugh, can't use SkipTest.
self.do_create(bin=0).write("blat\n")
# XXX should test that the file really is a text file
@@ -336,8 +411,8 @@
# gettempprefix returns a nonempty prefix string
p = tempfile.gettempprefix()
- self.assert_(isinstance(p, basestring))
- self.assert_(len(p) > 0)
+ self.assertIsInstance(p, basestring)
+ self.assertTrue(len(p) > 0)
def test_usable_template(self):
# gettempprefix returns a usable prefix string
@@ -368,9 +443,9 @@
# gettempdir returns a directory which exists
dir = tempfile.gettempdir()
- self.assert_(os.path.isabs(dir) or dir == os.curdir,
+ self.assertTrue(os.path.isabs(dir) or dir == os.curdir,
"%s is not an absolute path" % dir)
- self.assert_(os.path.isdir(dir),
+ self.assertTrue(os.path.isdir(dir),
"%s is not a directory" % dir)
def test_directory_writable(self):
@@ -391,7 +466,7 @@
a = tempfile.gettempdir()
b = tempfile.gettempdir()
- self.assert_(a is b)
+ self.assertTrue(a is b)
def test_case_sensitive(self):
# gettempdir should not flatten its case
@@ -400,13 +475,13 @@
case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
_tempdir, tempfile.tempdir = tempfile.tempdir, None
try:
- with test_support.EnvironmentVarGuard() as env:
+ with support.EnvironmentVarGuard() as env:
# Fake the first env var which is checked as a candidate
env["TMPDIR"] = case_sensitive_tempdir
self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
finally:
tempfile.tempdir = _tempdir
- test_support.rmdir(case_sensitive_tempdir)
+ support.rmdir(case_sensitive_tempdir)
test_classes.append(test_gettempdir)
@@ -502,7 +577,7 @@
# mkdtemp creates directories with the proper mode
if not has_stat:
return # ugh, can't use TestSkipped.
- if test_support.is_jython and not os._native_posix:
+ if support.is_jython and not os._native_posix:
# Java doesn't support stating files for permissions
return
@@ -512,7 +587,7 @@
mode &= 0777 # Mask off sticky bits inherited from /tmp
expected = 0700
if (sys.platform in ('win32', 'os2emx', 'mac') or
- test_support.is_jython and os._name == 'nt'):
+ support.is_jython and os._name == 'nt'):
# There's no distinction among 'user', 'group' and 'world';
# replicate the 'user' bits.
user = expected >> 6
@@ -618,7 +693,7 @@
def test_creates_named(self):
# NamedTemporaryFile creates files with names
f = tempfile.NamedTemporaryFile()
- self.failUnless(os.path.exists(f.name),
+ self.assertTrue(os.path.exists(f.name),
"NamedTemporaryFile %s does not exist" % f.name)
def test_del_on_close(self):
@@ -628,7 +703,7 @@
f = tempfile.NamedTemporaryFile(dir=dir)
f.write('blat')
f.close()
- self.failIf(os.path.exists(f.name),
+ self.assertFalse(os.path.exists(f.name),
"NamedTemporaryFile %s exists after close" % f.name)
finally:
os.rmdir(dir)
@@ -642,7 +717,7 @@
tmp = f.name
f.write('blat')
f.close()
- self.failUnless(os.path.exists(f.name),
+ self.assertTrue(os.path.exists(f.name),
"NamedTemporaryFile %s missing after close" % f.name)
finally:
if tmp is not None:
@@ -663,12 +738,12 @@
def test_context_manager(self):
# A NamedTemporaryFile can be used as a context manager
with tempfile.NamedTemporaryFile() as f:
- self.failUnless(os.path.exists(f.name))
- self.failIf(os.path.exists(f.name))
+ self.assertTrue(os.path.exists(f.name))
+ self.assertFalse(os.path.exists(f.name))
def use_closed():
with f:
pass
- self.failUnlessRaises(ValueError, use_closed)
+ self.assertRaises(ValueError, use_closed)
# How to test the mode and bufsize parameters?
@@ -691,21 +766,21 @@
def test_basic(self):
# SpooledTemporaryFile can create files
f = self.do_create()
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
f = self.do_create(max_size=100, pre="a", suf=".txt")
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
def test_del_on_close(self):
# A SpooledTemporaryFile is deleted when closed
dir = tempfile.mkdtemp()
try:
f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
f.write('blat ' * 5)
- self.failUnless(f._rolled)
+ self.assertTrue(f._rolled)
filename = f.name
f.close()
- self.failIf(os.path.exists(filename),
+ self.assertFalse(os.path.exists(filename),
"SpooledTemporaryFile %s exists after close" % filename)
finally:
os.rmdir(dir)
@@ -713,46 +788,74 @@
def test_rewrite_small(self):
# A SpooledTemporaryFile can be written to multiple within the max_size
f = self.do_create(max_size=30)
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
for i in range(5):
f.seek(0, 0)
f.write('x' * 20)
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
def test_write_sequential(self):
# A SpooledTemporaryFile should hold exactly max_size bytes, and roll
# over afterward
f = self.do_create(max_size=30)
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
f.write('x' * 20)
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
f.write('x' * 10)
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
f.write('x')
- self.failUnless(f._rolled)
+ self.assertTrue(f._rolled)
+
+ def test_writelines(self):
+ # Verify writelines with a SpooledTemporaryFile
+ f = self.do_create()
+ f.writelines((b'x', b'y', b'z'))
+ f.seek(0)
+ buf = f.read()
+ self.assertEqual(buf, b'xyz')
+
+ def test_writelines_sequential(self):
+ # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
+ # over afterward
+ f = self.do_create(max_size=35)
+ f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
+ self.assertFalse(f._rolled)
+ f.write(b'x')
+ self.assertTrue(f._rolled)
+
+ def test_xreadlines(self):
+ f = self.do_create(max_size=20)
+ f.write(b'abc\n' * 5)
+ f.seek(0)
+ self.assertFalse(f._rolled)
+ self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5)
+ f.write(b'x\ny')
+ self.assertTrue(f._rolled)
+ f.seek(0)
+ self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5 + [b'x\n', b'y'])
def test_sparse(self):
# A SpooledTemporaryFile that is written late in the file will extend
# when that occurs
f = self.do_create(max_size=30)
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
f.seek(100, 0)
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
f.write('x')
- self.failUnless(f._rolled)
+ self.assertTrue(f._rolled)
def test_fileno(self):
# A SpooledTemporaryFile should roll over to a real file on fileno()
f = self.do_create(max_size=30)
- self.failIf(f._rolled)
- self.failUnless(f.fileno() > 0)
- self.failUnless(f._rolled)
+ self.assertFalse(f._rolled)
+ self.assertTrue(f.fileno() > 0)
+ self.assertTrue(f._rolled)
def test_multiple_close_before_rollover(self):
# A SpooledTemporaryFile can be closed many times without error
f = tempfile.SpooledTemporaryFile()
f.write('abc\n')
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
f.close()
try:
f.close()
@@ -764,7 +867,7 @@
# A SpooledTemporaryFile can be closed many times without error
f = tempfile.SpooledTemporaryFile(max_size=1)
f.write('abc\n')
- self.failUnless(f._rolled)
+ self.assertTrue(f._rolled)
f.close()
try:
f.close()
@@ -784,46 +887,69 @@
write("a" * 35)
write("b" * 35)
seek(0, 0)
- self.failUnless(read(70) == 'a'*35 + 'b'*35)
+ self.assertTrue(read(70) == 'a'*35 + 'b'*35)
+
+ def test_properties(self):
+ f = tempfile.SpooledTemporaryFile(max_size=10)
+ f.write(b'x' * 10)
+ self.assertFalse(f._rolled)
+ self.assertEqual(f.mode, 'w+b')
+ self.assertIsNone(f.name)
+ # Jython SpooledTemporaryFile has these properties:
+ if not support.is_jython:
+ with self.assertRaises(AttributeError):
+ f.newlines
+ with self.assertRaises(AttributeError):
+ f.encoding
+
+ f.write(b'x')
+ self.assertTrue(f._rolled)
+ self.assertEqual(f.mode, 'w+b')
+ self.assertIsNotNone(f.name)
+ if not support.is_jython:
+ with self.assertRaises(AttributeError):
+ f.newlines
+ with self.assertRaises(AttributeError):
+ f.encoding
def test_context_manager_before_rollover(self):
# A SpooledTemporaryFile can be used as a context manager
with tempfile.SpooledTemporaryFile(max_size=1) as f:
- self.failIf(f._rolled)
- self.failIf(f.closed)
- self.failUnless(f.closed)
+ self.assertFalse(f._rolled)
+ self.assertFalse(f.closed)
+ self.assertTrue(f.closed)
def use_closed():
with f:
pass
- self.failUnlessRaises(ValueError, use_closed)
+ self.assertRaises(ValueError, use_closed)
def test_context_manager_during_rollover(self):
# A SpooledTemporaryFile can be used as a context manager
with tempfile.SpooledTemporaryFile(max_size=1) as f:
- self.failIf(f._rolled)
+ self.assertFalse(f._rolled)
f.write('abc\n')
f.flush()
- self.failUnless(f._rolled)
- self.failIf(f.closed)
- self.failUnless(f.closed)
+ self.assertTrue(f._rolled)
+ self.assertFalse(f.closed)
+ self.assertTrue(f.closed)
def use_closed():
with f:
pass
- self.failUnlessRaises(ValueError, use_closed)
+ self.assertRaises(ValueError, use_closed)
def test_context_manager_after_rollover(self):
# A SpooledTemporaryFile can be used as a context manager
f = tempfile.SpooledTemporaryFile(max_size=1)
f.write('abc\n')
f.flush()
- self.failUnless(f._rolled)
+ self.assertTrue(f._rolled)
with f:
- self.failIf(f.closed)
- self.failUnless(f.closed)
+ self.assertFalse(f.closed)
+ self.assertTrue(f.closed)
def use_closed():
with f:
pass
- self.failUnlessRaises(ValueError, use_closed)
+ self.assertRaises(ValueError, use_closed)
test_classes.append(test_SpooledTemporaryFile)
@@ -875,11 +1001,11 @@
test_classes.append(test_TemporaryFile)
def test_main():
- test_support.run_unittest(*test_classes)
+ support.run_unittest(*test_classes)
if __name__ == "__main__":
test_main()
- if test_support.is_jython:
+ if support.is_jython:
# XXX: Nudge Java's GC in an attempt to trigger any temp file's
# __del__ (cause them to be deleted) that hasn't been called
import gc
--
Repository URL: https://hg.python.org/jython
More information about the Jython-checkins
mailing list