[Jython-checkins] jython (merge default -> default): Merge v2.7 test_io fixes

jeff.allen jython-checkins at python.org
Fri Mar 22 23:59:31 CET 2013


http://hg.python.org/jython/rev/efbbc2b78186
changeset:   7104:efbbc2b78186
parent:      7098:98a58fb284f1
parent:      7103:72cfa7952565
user:        Jeff Allen <ja...py at farowl.co.uk>
date:        Fri Mar 22 22:52:22 2013 +0000
summary:
  Merge v2.7 test_io fixes

files:
  Lib/_jyio.py                         |   43 +++-
  Lib/test/test_io.py                  |  173 ++++++++++++++-
  src/org/python/core/PyByteArray.java |    2 +
  3 files changed, 206 insertions(+), 12 deletions(-)


diff --git a/Lib/_jyio.py b/Lib/_jyio.py
--- a/Lib/_jyio.py
+++ b/Lib/_jyio.py
@@ -227,7 +227,9 @@
     def raw(self):
         return self._raw
 
-    # Jython difference: @property closed(self) inherited from _IOBase.__closed
+    @property
+    def closed(self):
+        return self.raw.closed
 
     # Jython difference: emulate C implementation CHECK_INITIALIZED. This is for
     # compatibility, to pass test.test_io.CTextIOWrapperTest.test_initialization.
@@ -969,6 +971,20 @@
                )[self.seennl]
 
 
+def _check_decoded_chars(chars):
+    """Check decoder output is unicode"""
+    if not isinstance(chars, unicode):
+        raise TypeError("decoder should return a string result, not '%s'" %
+                        type(chars))
+
+def _check_buffered_bytes(b, context="read"):
+    """Check buffer has returned bytes"""
+    if not isinstance(b, str):
+        raise TypeError("underlying %s() should have returned a bytes object, not '%s'" %
+                        (context, type(b)))
+
+
+
 class TextIOWrapper(_TextIOBase):
 
     r"""Character and line based layer over a _BufferedIOBase object, buffer.
@@ -1108,7 +1124,9 @@
             finally:
                 self.buffer.close()
 
-    # Jython difference: @property closed(self) inherited from _IOBase.__closed
+    @property
+    def closed(self):
+        return self.buffer.closed
 
     # Jython difference: emulate C implementation CHECK_INITIALIZED. This is for
     # compatibility, to pass test.test_io.CTextIOWrapperTest.test_initialization.
@@ -1212,8 +1230,12 @@
 
         # Read a chunk, decode it, and put the result in self._decoded_chars.
         input_chunk = self.buffer.read1(self._CHUNK_SIZE)
+        _check_buffered_bytes(input_chunk, "read1")
+
         eof = not input_chunk
-        self._set_decoded_chars(self._decoder.decode(input_chunk, eof))
+        decoded_chunk = self._decoder.decode(input_chunk, eof)
+        _check_decoded_chars(decoded_chunk)
+        self._set_decoded_chars(decoded_chunk)
 
         if self._telling:
             # At the snapshot point, len(dec_buffer) bytes before the read,
@@ -1366,8 +1388,11 @@
         if chars_to_skip:
             # Just like _read_chunk, feed the decoder and save a snapshot.
             input_chunk = self.buffer.read(bytes_to_feed)
-            self._set_decoded_chars(
-                self._decoder.decode(input_chunk, need_eof))
+            _check_buffered_bytes(input_chunk)
+            decoded_chunk = self._decoder.decode(input_chunk, need_eof)
+            _check_decoded_chars(decoded_chunk)
+            self._set_decoded_chars(decoded_chunk)
+
             self._snapshot = (dec_flags, input_chunk)
 
             # Skip chars_to_skip of the decoded characters.
@@ -1399,8 +1424,12 @@
             raise TypeError("an integer is required")
         if n < 0:
             # Read everything.
-            result = (self._get_decoded_chars() +
-                      decoder.decode(self.buffer.read(), final=True))
+            input_chunk = self.buffer.read()
+            # Jython difference: CPython textio.c omits this check on the read-all path:
+            _check_buffered_bytes(input_chunk)
+            decoded_chunk = decoder.decode(input_chunk, final=True)
+            _check_decoded_chars(decoded_chunk)
+            result = self._get_decoded_chars() + decoded_chunk
             self._set_decoded_chars('')
             self._snapshot = None
             return result
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -34,7 +34,9 @@
 import errno
 from itertools import cycle, count
 from collections import deque
+from UserList import UserList
 from test import test_support as support
+import contextlib
 
 import codecs
 import io               # subject of test
@@ -587,6 +589,7 @@
             raise IOError()
         f.flush = bad_flush
         self.assertRaises(IOError, f.close) # exception not swallowed
+        self.assertTrue(f.closed)
 
     def test_multi_close(self):
         f = self.open(support.TESTFN, "wb", buffering=0)
@@ -608,6 +611,19 @@
         self.assertEqual(rawio.read(2), None)
         self.assertEqual(rawio.read(2), b"")
 
+    def test_fileio_closefd(self):
+        # Issue #4841
+        with self.open(__file__, 'rb') as f1, \
+             self.open(__file__, 'rb') as f2:
+            fileio = self.FileIO(f1.fileno(), closefd=False)
+            # .__init__() must not close f1
+            fileio.__init__(f2.fileno(), closefd=False)
+            f1.readline()
+            # .close() must not close f2
+            fileio.close()
+            f2.readline()
+
+
 class CIOTest(IOTest):
 
     @unittest.skipIf(support.is_jython, "GC nondeterministic in Jython")
@@ -752,6 +768,21 @@
         raw.flush = bad_flush
         b = self.tp(raw)
         self.assertRaises(IOError, b.close) # exception not swallowed
+        self.assertTrue(b.closed)
+
+    def test_close_error_on_close(self):
+        raw = self.MockRawIO()
+        def bad_flush():
+            raise IOError('flush')
+        def bad_close():
+            raise IOError('close')
+        raw.close = bad_close
+        b = self.tp(raw)
+        b.flush = bad_flush
+        with self.assertRaises(IOError) as err: # exception not swallowed
+            b.close()
+        self.assertEqual(err.exception.args, ('close',))
+        self.assertFalse(b.closed)
 
     def test_multi_close(self):
         raw = self.MockRawIO()
@@ -769,6 +800,20 @@
             buf.raw = x
 
 
+class SizeofTest:
+
+    @support.cpython_only
+    def test_sizeof(self):
+        bufsize1 = 4096
+        bufsize2 = 8192
+        rawio = self.MockRawIO()
+        bufio = self.tp(rawio, buffer_size=bufsize1)
+        size = sys.getsizeof(bufio) - bufsize1
+        rawio = self.MockRawIO()
+        bufio = self.tp(rawio, buffer_size=bufsize2)
+        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
+
+
 class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
     read_mode = "rb"
 
@@ -952,7 +997,7 @@
                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
 
 
-class CBufferedReaderTest(BufferedReaderTest):
+class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
     tp = io.BufferedReader
 
     def test_constructor(self):
@@ -998,6 +1043,14 @@
         support.gc_collect()
         self.assertTrue(wr() is None, wr)
 
+    @unittest.skipIf(support.is_jython, "FIXME: in the Java version with ArgParser")
+    # When implemented in Python, the error message is about __init__, even on CPython
+    def test_args_error(self):
+        # Issue #17275
+        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
+            self.tp(io.BytesIO(), 1024, 1024, 1024)
+
+
 class PyBufferedReaderTest(BufferedReaderTest):
     tp = pyio.BufferedReader
 
@@ -1138,6 +1191,28 @@
         bufio.flush()
         self.assertEqual(b"abc", writer._write_stack[0])
 
+    def test_writelines(self):
+        l = [b'ab', b'cd', b'ef']
+        writer = self.MockRawIO()
+        bufio = self.tp(writer, 8)
+        bufio.writelines(l)
+        bufio.flush()
+        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
+
+    def test_writelines_userlist(self):
+        l = UserList([b'ab', b'cd', b'ef'])
+        writer = self.MockRawIO()
+        bufio = self.tp(writer, 8)
+        bufio.writelines(l)
+        bufio.flush()
+        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
+
+    def test_writelines_error(self):
+        writer = self.MockRawIO()
+        bufio = self.tp(writer, 8)
+        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
+        self.assertRaises(TypeError, bufio.writelines, None)
+
     @unittest.skipIf(support.is_jython, "GC nondeterministic in Jython")
     def test_destructor(self):
         writer = self.MockRawIO()
@@ -1220,8 +1295,18 @@
                                      DeprecationWarning)):
             self.tp(self.MockRawIO(), 8, 12)
 
-
-class CBufferedWriterTest(BufferedWriterTest):
+    def test_write_error_on_close(self):
+        raw = self.MockRawIO()
+        def bad_write(b):
+            raise IOError()
+        raw.write = bad_write
+        b = self.tp(raw)
+        b.write(b'spam')
+        self.assertRaises(IOError, b.close) # exception not swallowed
+        self.assertTrue(b.closed)
+
+
+class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
     tp = io.BufferedWriter
 
     def test_constructor(self):
@@ -1260,6 +1345,13 @@
         with self.open(support.TESTFN, "rb") as f:
             self.assertEqual(f.read(), b"123xxx")
 
+    @unittest.skipIf(support.is_jython, "FIXME: in the Java version with ArgParser")
+    # When implemented in Python, the error message is about __init__, even on CPython
+    def test_args_error(self):
+        # Issue #17275
+        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
+            self.tp(io.BytesIO(), 1024, 1024, 1024)
+
 
 class PyBufferedWriterTest(BufferedWriterTest):
     tp = pyio.BufferedWriter
@@ -1618,7 +1710,8 @@
                 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
 
 
-class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
+class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
+                          BufferedRandomTest, SizeofTest):
     tp = io.BufferedRandom
 
     def test_constructor(self):
@@ -1635,6 +1728,14 @@
         CBufferedReaderTest.test_garbage_collection(self)
         CBufferedWriterTest.test_garbage_collection(self)
 
+    @unittest.skipIf(support.is_jython, "FIXME: in the Java version with ArgParser")
+    # When implemented in Python, the error message is about __init__, even on CPython
+    def test_args_error(self):
+        # Issue #17275
+        with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
+            self.tp(io.BytesIO(), 1024, 1024, 1024)
+
+
 class PyBufferedRandomTest(BufferedRandomTest):
     tp = pyio.BufferedRandom
 
@@ -2233,6 +2334,28 @@
             reads += c
         self.assertEqual(reads, "A"*127+"\nB")
 
+    def test_writelines(self):
+        l = ['ab', 'cd', 'ef']
+        buf = self.BytesIO()
+        txt = self.TextIOWrapper(buf)
+        txt.writelines(l)
+        txt.flush()
+        self.assertEqual(buf.getvalue(), b'abcdef')
+
+    def test_writelines_userlist(self):
+        l = UserList(['ab', 'cd', 'ef'])
+        buf = self.BytesIO()
+        txt = self.TextIOWrapper(buf)
+        txt.writelines(l)
+        txt.flush()
+        self.assertEqual(buf.getvalue(), b'abcdef')
+
+    def test_writelines_error(self):
+        txt = self.TextIOWrapper(self.BytesIO())
+        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
+        self.assertRaises(TypeError, txt.writelines, None)
+        self.assertRaises(TypeError, txt.writelines, b'abc')
+
     def test_issue1395_1(self):
         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
 
@@ -2356,6 +2479,7 @@
             raise IOError()
         txt.flush = bad_flush
         self.assertRaises(IOError, txt.close) # exception not swallowed
+        self.assertTrue(txt.closed)
 
     def test_multi_close(self):
         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
@@ -2370,6 +2494,41 @@
         with self.assertRaises((AttributeError, TypeError)):
             txt.buffer = buf
 
+    def test_read_nonbytes(self):
+        # Issue #17106
+        # Crash when underlying read() returns non-bytes
+        class NonbytesStream(self.StringIO):
+            read1 = self.StringIO.read
+        class NonbytesStream(self.StringIO):
+            read1 = self.StringIO.read
+        t = self.TextIOWrapper(NonbytesStream('a'))
+        with self.maybeRaises(TypeError):
+            t.read(1)
+        t = self.TextIOWrapper(NonbytesStream('a'))
+        with self.maybeRaises(TypeError):
+            t.readline()
+        t = self.TextIOWrapper(NonbytesStream('a'))
+        with self.maybeRaises(TypeError):
+            # Jython difference: also detect the error in read() with no size argument:
+            t.read()
+
+    def test_illegal_decoder(self):
+        # Issue #17106
+        # Crash when decoder returns non-string
+        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
+                               encoding='quopri_codec')
+        with self.maybeRaises(TypeError):
+            t.read(1)
+        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
+                               encoding='quopri_codec')
+        with self.maybeRaises(TypeError):
+            t.readline()
+        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
+                               encoding='quopri_codec')
+        with self.maybeRaises(TypeError):
+            t.read()
+
+
 class CTextIOWrapperTest(TextIOWrapperTest):
 
     def test_initialization(self):
@@ -2416,9 +2575,13 @@
             t2.buddy = t1
         support.gc_collect()
 
+    maybeRaises = unittest.TestCase.assertRaises
+
 
 class PyTextIOWrapperTest(TextIOWrapperTest):
-    pass
+    @contextlib.contextmanager
+    def maybeRaises(self, *args, **kwds):
+        yield
 
 
 class IncrementalNewlineDecoderTest(unittest.TestCase):
diff --git a/src/org/python/core/PyByteArray.java b/src/org/python/core/PyByteArray.java
--- a/src/org/python/core/PyByteArray.java
+++ b/src/org/python/core/PyByteArray.java
@@ -1191,6 +1191,8 @@
 
     @ExposedMethod(doc = BuiltinDocs.bytearray_extend_doc)
     final synchronized void bytearray_extend(PyObject o) {
+        // Raise TypeError if the argument is not iterable
+        o.__iter__();
         // Use the general method, assigning to the crack at the end of the array.
         // Note this deals with all legitimate PyObject types including the case o==this.
         setslice(size, size, 1, o);

-- 
Repository URL: http://hg.python.org/jython


More information about the Jython-checkins mailing list