[Jython-checkins] jython (merge default -> default): Merge v2.7 test_io fixes
jeff.allen
jython-checkins at python.org
Fri Mar 22 23:59:31 CET 2013
http://hg.python.org/jython/rev/efbbc2b78186
changeset: 7104:efbbc2b78186
parent: 7098:98a58fb284f1
parent: 7103:72cfa7952565
user: Jeff Allen <ja...py at farowl.co.uk>
date: Fri Mar 22 22:52:22 2013 +0000
summary:
Merge v2.7 test_io fixes
files:
Lib/_jyio.py | 43 +++-
Lib/test/test_io.py | 173 ++++++++++++++-
src/org/python/core/PyByteArray.java | 2 +
3 files changed, 206 insertions(+), 12 deletions(-)
diff --git a/Lib/_jyio.py b/Lib/_jyio.py
--- a/Lib/_jyio.py
+++ b/Lib/_jyio.py
@@ -227,7 +227,9 @@
def raw(self):
return self._raw
- # Jython difference: @property closed(self) inherited from _IOBase.__closed
+ @property
+ def closed(self):
+ return self.raw.closed
# Jython difference: emulate C implementation CHECK_INITIALIZED. This is for
# compatibility, to pass test.test_io.CTextIOWrapperTest.test_initialization.
@@ -969,6 +971,20 @@
)[self.seennl]
+def _check_decoded_chars(chars):
+ """Check decoder output is unicode"""
+ if not isinstance(chars, unicode):
+ raise TypeError("decoder should return a string result, not '%s'" %
+ type(chars))
+
+def _check_buffered_bytes(b, context="read"):
+ """Check buffer has returned bytes"""
+ if not isinstance(b, str):
+ raise TypeError("underlying %s() should have returned a bytes object, not '%s'" %
+ (context, type(b)))
+
+
+
class TextIOWrapper(_TextIOBase):
r"""Character and line based layer over a _BufferedIOBase object, buffer.
@@ -1108,7 +1124,9 @@
finally:
self.buffer.close()
- # Jython difference: @property closed(self) inherited from _IOBase.__closed
+ @property
+ def closed(self):
+ return self.buffer.closed
# Jython difference: emulate C implementation CHECK_INITIALIZED. This is for
# compatibility, to pass test.test_io.CTextIOWrapperTest.test_initialization.
@@ -1212,8 +1230,12 @@
# Read a chunk, decode it, and put the result in self._decoded_chars.
input_chunk = self.buffer.read1(self._CHUNK_SIZE)
+ _check_buffered_bytes(input_chunk, "read1")
+
eof = not input_chunk
- self._set_decoded_chars(self._decoder.decode(input_chunk, eof))
+ decoded_chunk = self._decoder.decode(input_chunk, eof)
+ _check_decoded_chars(decoded_chunk)
+ self._set_decoded_chars(decoded_chunk)
if self._telling:
# At the snapshot point, len(dec_buffer) bytes before the read,
@@ -1366,8 +1388,11 @@
if chars_to_skip:
# Just like _read_chunk, feed the decoder and save a snapshot.
input_chunk = self.buffer.read(bytes_to_feed)
- self._set_decoded_chars(
- self._decoder.decode(input_chunk, need_eof))
+ _check_buffered_bytes(input_chunk)
+ decoded_chunk = self._decoder.decode(input_chunk, need_eof)
+ _check_decoded_chars(decoded_chunk)
+ self._set_decoded_chars(decoded_chunk)
+
self._snapshot = (dec_flags, input_chunk)
# Skip chars_to_skip of the decoded characters.
@@ -1399,8 +1424,12 @@
raise TypeError("an integer is required")
if n < 0:
# Read everything.
- result = (self._get_decoded_chars() +
- decoder.decode(self.buffer.read(), final=True))
+ input_chunk = self.buffer.read()
+ # Jython difference: CPython textio.c omits:
+ _check_buffered_bytes(input_chunk)
+ decoded_chunk = decoder.decode(input_chunk, final=True)
+ _check_decoded_chars(decoded_chunk)
+ result = self._get_decoded_chars() + decoded_chunk
self._set_decoded_chars('')
self._snapshot = None
return result
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -34,7 +34,9 @@
import errno
from itertools import cycle, count
from collections import deque
+from UserList import UserList
from test import test_support as support
+import contextlib
import codecs
import io # subject of test
@@ -587,6 +589,7 @@
raise IOError()
f.flush = bad_flush
self.assertRaises(IOError, f.close) # exception not swallowed
+ self.assertTrue(f.closed)
def test_multi_close(self):
f = self.open(support.TESTFN, "wb", buffering=0)
@@ -608,6 +611,19 @@
self.assertEqual(rawio.read(2), None)
self.assertEqual(rawio.read(2), b"")
+ def test_fileio_closefd(self):
+ # Issue #4841
+ with self.open(__file__, 'rb') as f1, \
+ self.open(__file__, 'rb') as f2:
+ fileio = self.FileIO(f1.fileno(), closefd=False)
+ # .__init__() must not close f1
+ fileio.__init__(f2.fileno(), closefd=False)
+ f1.readline()
+ # .close() must not close f2
+ fileio.close()
+ f2.readline()
+
+
class CIOTest(IOTest):
@unittest.skipIf(support.is_jython, "GC nondeterministic in Jython")
@@ -752,6 +768,21 @@
raw.flush = bad_flush
b = self.tp(raw)
self.assertRaises(IOError, b.close) # exception not swallowed
+ self.assertTrue(b.closed)
+
+ def test_close_error_on_close(self):
+ raw = self.MockRawIO()
+ def bad_flush():
+ raise IOError('flush')
+ def bad_close():
+ raise IOError('close')
+ raw.close = bad_close
+ b = self.tp(raw)
+ b.flush = bad_flush
+ with self.assertRaises(IOError) as err: # exception not swallowed
+ b.close()
+ self.assertEqual(err.exception.args, ('close',))
+ self.assertFalse(b.closed)
def test_multi_close(self):
raw = self.MockRawIO()
@@ -769,6 +800,20 @@
buf.raw = x
+class SizeofTest:
+
+ @support.cpython_only
+ def test_sizeof(self):
+ bufsize1 = 4096
+ bufsize2 = 8192
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio, buffer_size=bufsize1)
+ size = sys.getsizeof(bufio) - bufsize1
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio, buffer_size=bufsize2)
+ self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
+
+
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
read_mode = "rb"
@@ -952,7 +997,7 @@
"failed for {}: {} != 0".format(n, rawio._extraneous_reads))
-class CBufferedReaderTest(BufferedReaderTest):
+class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
tp = io.BufferedReader
def test_constructor(self):
@@ -998,6 +1043,14 @@
support.gc_collect()
self.assertTrue(wr() is None, wr)
+ @unittest.skipIf(support.is_jython, "FIXME: in the Java version with ArgParser")
+ # When implemented in Python, the error message is about __init__, even on CPython
+ def test_args_error(self):
+ # Issue #17275
+ with self.assertRaisesRegexp(TypeError, "BufferedReader"):
+ self.tp(io.BytesIO(), 1024, 1024, 1024)
+
+
class PyBufferedReaderTest(BufferedReaderTest):
tp = pyio.BufferedReader
@@ -1138,6 +1191,28 @@
bufio.flush()
self.assertEqual(b"abc", writer._write_stack[0])
+ def test_writelines(self):
+ l = [b'ab', b'cd', b'ef']
+ writer = self.MockRawIO()
+ bufio = self.tp(writer, 8)
+ bufio.writelines(l)
+ bufio.flush()
+ self.assertEqual(b''.join(writer._write_stack), b'abcdef')
+
+ def test_writelines_userlist(self):
+ l = UserList([b'ab', b'cd', b'ef'])
+ writer = self.MockRawIO()
+ bufio = self.tp(writer, 8)
+ bufio.writelines(l)
+ bufio.flush()
+ self.assertEqual(b''.join(writer._write_stack), b'abcdef')
+
+ def test_writelines_error(self):
+ writer = self.MockRawIO()
+ bufio = self.tp(writer, 8)
+ self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
+ self.assertRaises(TypeError, bufio.writelines, None)
+
@unittest.skipIf(support.is_jython, "GC nondeterministic in Jython")
def test_destructor(self):
writer = self.MockRawIO()
@@ -1220,8 +1295,18 @@
DeprecationWarning)):
self.tp(self.MockRawIO(), 8, 12)
-
-class CBufferedWriterTest(BufferedWriterTest):
+ def test_write_error_on_close(self):
+ raw = self.MockRawIO()
+ def bad_write(b):
+ raise IOError()
+ raw.write = bad_write
+ b = self.tp(raw)
+ b.write(b'spam')
+ self.assertRaises(IOError, b.close) # exception not swallowed
+ self.assertTrue(b.closed)
+
+
+class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
tp = io.BufferedWriter
def test_constructor(self):
@@ -1260,6 +1345,13 @@
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"123xxx")
+ @unittest.skipIf(support.is_jython, "FIXME: in the Java version with ArgParser")
+ # When implemented in Python, the error message is about __init__, even on CPython
+ def test_args_error(self):
+ # Issue #17275
+ with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
+ self.tp(io.BytesIO(), 1024, 1024, 1024)
+
class PyBufferedWriterTest(BufferedWriterTest):
tp = pyio.BufferedWriter
@@ -1618,7 +1710,8 @@
self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
-class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
+class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
+ BufferedRandomTest, SizeofTest):
tp = io.BufferedRandom
def test_constructor(self):
@@ -1635,6 +1728,14 @@
CBufferedReaderTest.test_garbage_collection(self)
CBufferedWriterTest.test_garbage_collection(self)
+ @unittest.skipIf(support.is_jython, "FIXME: in the Java version with ArgParser")
+ # When implemented in Python, the error message is about __init__, even on CPython
+ def test_args_error(self):
+ # Issue #17275
+ with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
+ self.tp(io.BytesIO(), 1024, 1024, 1024)
+
+
class PyBufferedRandomTest(BufferedRandomTest):
tp = pyio.BufferedRandom
@@ -2233,6 +2334,28 @@
reads += c
self.assertEqual(reads, "A"*127+"\nB")
+ def test_writelines(self):
+ l = ['ab', 'cd', 'ef']
+ buf = self.BytesIO()
+ txt = self.TextIOWrapper(buf)
+ txt.writelines(l)
+ txt.flush()
+ self.assertEqual(buf.getvalue(), b'abcdef')
+
+ def test_writelines_userlist(self):
+ l = UserList(['ab', 'cd', 'ef'])
+ buf = self.BytesIO()
+ txt = self.TextIOWrapper(buf)
+ txt.writelines(l)
+ txt.flush()
+ self.assertEqual(buf.getvalue(), b'abcdef')
+
+ def test_writelines_error(self):
+ txt = self.TextIOWrapper(self.BytesIO())
+ self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
+ self.assertRaises(TypeError, txt.writelines, None)
+ self.assertRaises(TypeError, txt.writelines, b'abc')
+
def test_issue1395_1(self):
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
@@ -2356,6 +2479,7 @@
raise IOError()
txt.flush = bad_flush
self.assertRaises(IOError, txt.close) # exception not swallowed
+ self.assertTrue(txt.closed)
def test_multi_close(self):
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
@@ -2370,6 +2494,41 @@
with self.assertRaises((AttributeError, TypeError)):
txt.buffer = buf
+ def test_read_nonbytes(self):
+ # Issue #17106
+ # Crash when underlying read() returns non-bytes
+ class NonbytesStream(self.StringIO):
+ read1 = self.StringIO.read
+ class NonbytesStream(self.StringIO):
+ read1 = self.StringIO.read
+ t = self.TextIOWrapper(NonbytesStream('a'))
+ with self.maybeRaises(TypeError):
+ t.read(1)
+ t = self.TextIOWrapper(NonbytesStream('a'))
+ with self.maybeRaises(TypeError):
+ t.readline()
+ t = self.TextIOWrapper(NonbytesStream('a'))
+ with self.maybeRaises(TypeError):
+ # Jython difference: also detect the error in
+ t.read()
+
+ def test_illegal_decoder(self):
+ # Issue #17106
+ # Crash when decoder returns non-string
+ t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
+ encoding='quopri_codec')
+ with self.maybeRaises(TypeError):
+ t.read(1)
+ t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
+ encoding='quopri_codec')
+ with self.maybeRaises(TypeError):
+ t.readline()
+ t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
+ encoding='quopri_codec')
+ with self.maybeRaises(TypeError):
+ t.read()
+
+
class CTextIOWrapperTest(TextIOWrapperTest):
def test_initialization(self):
@@ -2416,9 +2575,13 @@
t2.buddy = t1
support.gc_collect()
+ maybeRaises = unittest.TestCase.assertRaises
+
class PyTextIOWrapperTest(TextIOWrapperTest):
- pass
+ @contextlib.contextmanager
+ def maybeRaises(self, *args, **kwds):
+ yield
class IncrementalNewlineDecoderTest(unittest.TestCase):
diff --git a/src/org/python/core/PyByteArray.java b/src/org/python/core/PyByteArray.java
--- a/src/org/python/core/PyByteArray.java
+++ b/src/org/python/core/PyByteArray.java
@@ -1191,6 +1191,8 @@
@ExposedMethod(doc = BuiltinDocs.bytearray_extend_doc)
final synchronized void bytearray_extend(PyObject o) {
+ // Raise TypeError if the argument is not iterable
+ o.__iter__();
// Use the general method, assigning to the crack at the end of the array.
// Note this deals with all legitimate PyObject types including the case o==this.
setslice(size, size, 1, o);
--
Repository URL: http://hg.python.org/jython
More information about the Jython-checkins
mailing list