[pypy-commit] pypy stdlib-2.7.4-fixed-io: fixed support for io in stdlib 2.7.4

andrewsmedina noreply at buildbot.pypy.org
Tue Jul 30 00:33:35 CEST 2013


Author: Andrews Medina <andrewsmedina at gmail.com>
Branch: stdlib-2.7.4-fixed-io
Changeset: r65804:43221a2e8365
Date: 2013-07-28 18:46 -0300
http://bitbucket.org/pypy/pypy/changeset/43221a2e8365/

Log:	fixed support for io in stdlib 2.7.4

diff --git a/lib-python/2.7/test/test_io.py b/lib-python/2.7/test/test_io.py
--- a/lib-python/2.7/test/test_io.py
+++ b/lib-python/2.7/test/test_io.py
@@ -1004,6 +1004,7 @@
         support.gc_collect()
         self.assertTrue(wr() is None, wr)
 
+    @support.impl_detail(cpython=True)
     def test_args_error(self):
         # Issue #17275
         with self.assertRaisesRegexp(TypeError, "BufferedReader"):
@@ -1302,6 +1303,7 @@
         with self.open(support.TESTFN, "rb") as f:
             self.assertEqual(f.read(), b"123xxx")
 
+    @support.impl_detail(cpython=True)
     def test_args_error(self):
         # Issue #17275
         with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
@@ -1676,6 +1678,7 @@
         CBufferedReaderTest.test_garbage_collection(self)
         CBufferedWriterTest.test_garbage_collection(self)
 
+    @support.impl_detail(cpython=True)
     def test_args_error(self):
         # Issue #17275
         with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
diff --git a/pypy/module/_io/interp_textio.py b/pypy/module/_io/interp_textio.py
--- a/pypy/module/_io/interp_textio.py
+++ b/pypy/module/_io/interp_textio.py
@@ -1,7 +1,7 @@
 import sys
 
 from pypy.interpreter.baseobjspace import W_Root
-from pypy.interpreter.error import OperationError
+from pypy.interpreter.error import OperationError, operationerrfmt
 from pypy.interpreter.gateway import WrappedDefault, interp2app, unwrap_spec
 from pypy.interpreter.typedef import (
     GetSetProperty, TypeDef, generic_new_descr, interp_attrproperty,
@@ -327,6 +327,13 @@
         self.flags = flags
         self.input = input
 
+
+def check_decoded(space, w_decoded):  # TypeError unless w_decoded is unicode
+    if not space.isinstance_w(w_decoded, space.w_unicode):
+        msg = "decoder should return a string result, not '%T'"
+        raise operationerrfmt(space.w_TypeError, msg, w_decoded)  # app-level
+
+
 class W_TextIOWrapper(W_TextIOBase):
     def __init__(self, space):
         W_TextIOBase.__init__(self, space)
@@ -546,9 +553,15 @@
         # Read a chunk, decode it, and put the result in self._decoded_chars
         w_input = space.call_method(self.w_buffer, "read1",
                                     space.wrap(self.chunk_size))
+
+        if not space.isinstance_w(w_input, space.w_str):
+            msg = "decoder getstate() should have returned a bytes object not '%T'"
+            raise operationerrfmt(space.w_TypeError, msg, w_input)
+
         eof = space.len_w(w_input) == 0
         w_decoded = space.call_method(self.w_decoder, "decode",
                                       w_input, space.wrap(eof))
+        check_decoded(space, w_decoded)
         self._set_decoded_chars(space.unicode_w(w_decoded))
         if space.len_w(w_decoded) > 0:
             eof = False
@@ -577,10 +590,12 @@
 
         size = convert_size(space, w_size)
         self._writeflush(space)
+
         if size < 0:
             # Read everything
             w_bytes = space.call_method(self.w_buffer, "read")
             w_decoded = space.call_method(self.w_decoder, "decode", w_bytes, space.w_True)
+            check_decoded(space, w_decoded)
             w_result = space.wrap(self._get_decoded_chars(-1))
             w_final = space.add(w_result, w_decoded)
             self.snapshot = None
@@ -701,6 +716,10 @@
         if not self.w_encoder:
             raise OperationError(space.w_IOError, space.wrap("not writable"))
 
+        if not space.isinstance_w(w_text, space.w_unicode):
+            msg = "unicode argument expected, got '%T'"
+            raise operationerrfmt(space.w_TypeError, msg, w_text)
+
         text = space.unicode_w(w_text)
         textlen = len(text)
 
@@ -845,11 +864,16 @@
             # Just like _read_chunk, feed the decoder and save a snapshot.
             w_chunk = space.call_method(self.w_buffer, "read",
                                         space.wrap(cookie.bytes_to_feed))
+            if not space.isinstance_w(w_chunk, space.w_str):
+                msg = "underlying read() should have returned a bytes object, not '%T'"
+                raise operationerrfmt(space.w_TypeError, msg, w_chunk)
+
             self.snapshot = PositionSnapshot(cookie.dec_flags,
                                              space.str_w(w_chunk))
 
             w_decoded = space.call_method(self.w_decoder, "decode",
                                           w_chunk, space.wrap(cookie.need_eof))
+            check_decoded(space, w_decoded)
             self._set_decoded_chars(space.unicode_w(w_decoded))
 
             # Skip chars_to_skip of the decoded characters
@@ -918,6 +942,7 @@
             while i < len(input):
                 w_decoded = space.call_method(self.w_decoder, "decode",
                                               space.wrap(input[i]))
+                check_decoded(space, w_decoded)
                 chars_decoded += len(space.unicode_w(w_decoded))
 
                 cookie.bytes_to_feed += 1
@@ -942,6 +967,7 @@
                 w_decoded = space.call_method(self.w_decoder, "decode",
                                               space.wrap(""),
                                               space.wrap(1)) # final=1
+                check_decoded(space, w_decoded)
                 chars_decoded += len(space.unicode_w(w_decoded))
                 cookie.need_eof = 1
 
diff --git a/pypy/module/_io/test/test_fileio.py b/pypy/module/_io/test/test_fileio.py
--- a/pypy/module/_io/test/test_fileio.py
+++ b/pypy/module/_io/test/test_fileio.py
@@ -98,6 +98,13 @@
         f.close()
         f2.close()
 
+    def test_writelines_error(self):
+        # Every one of these arguments must make writelines() raise TypeError.
+        import _io
+        wrapper = _io.TextIOWrapper(_io.BytesIO())
+        for bad_lines in ([1, 2, 3], None, b'abc'):
+            raises(TypeError, wrapper.writelines, bad_lines)
+
     def test_seek(self):
         import _io
         f = _io.FileIO(self.tmpfile, 'rb')
@@ -195,7 +202,7 @@
     space.appexec([space.wrap(str(tmpfile))], """(tmpfile):
         import io
         f = io.open(tmpfile, 'w', encoding='ascii')
-        f.write('42')
+        f.write(u'42')
         # no flush() and no close()
         import sys; sys._keepalivesomewhereobscure = f
     """)
diff --git a/pypy/module/_io/test/test_textio.py b/pypy/module/_io/test/test_textio.py
--- a/pypy/module/_io/test/test_textio.py
+++ b/pypy/module/_io/test/test_textio.py
@@ -216,6 +216,29 @@
         raises(IOError, txt.close)  # exception not swallowed
         assert txt.closed
 
+    def test_illegal_decoder(self):
+        # read(n), readline() and read() must each raise TypeError when the
+        # wrapper is built with the 'quopri_codec' encoding.
+        import _io
+        def make_wrapper():
+            # A fresh wrapper per check, so no state leaks between calls.
+            return _io.TextIOWrapper(_io.BytesIO(b'aaaaaa'), newline='\n',
+                                     encoding='quopri_codec')
+        raises(TypeError, make_wrapper().read, 1)
+        raises(TypeError, make_wrapper().readline)
+        raises(TypeError, make_wrapper().read)
+
+    def test_read_nonbytes(self):  # buffer whose read()/read1() give unicode
+        import _io
+        class NonbytesStream(_io.StringIO):
+            read1 = _io.StringIO.read  # so read1() also returns unicode
+        t = _io.TextIOWrapper(NonbytesStream(u'a'))
+        raises(TypeError, t.read, 1)
+        t = _io.TextIOWrapper(NonbytesStream(u'a'))
+        raises(TypeError, t.readline)
+        t = _io.TextIOWrapper(NonbytesStream(u'a'))
+        assert t.read() == u'a'  # must be asserted; a bare == checks nothing
+
 
 class AppTestIncrementalNewlineDecoder:
     def test_newline_decoder(self):


More information about the pypy-commit mailing list