[pypy-commit] pypy stdlib-2.7.4-fixed-io: fixed support for io in stdlib 2.7.4
andrewsmedina
noreply at buildbot.pypy.org
Tue Jul 30 00:33:35 CEST 2013
Author: Andrews Medina <andrewsmedina at gmail.com>
Branch: stdlib-2.7.4-fixed-io
Changeset: r65804:43221a2e8365
Date: 2013-07-28 18:46 -0300
http://bitbucket.org/pypy/pypy/changeset/43221a2e8365/
Log: fixed support for io in stdlib 2.7.4
diff --git a/lib-python/2.7/test/test_io.py b/lib-python/2.7/test/test_io.py
--- a/lib-python/2.7/test/test_io.py
+++ b/lib-python/2.7/test/test_io.py
@@ -1004,6 +1004,7 @@
support.gc_collect()
self.assertTrue(wr() is None, wr)
+ @support.impl_detail(cpython=True)
def test_args_error(self):
# Issue #17275
with self.assertRaisesRegexp(TypeError, "BufferedReader"):
@@ -1302,6 +1303,7 @@
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"123xxx")
+ @support.impl_detail(cpython=True)
def test_args_error(self):
# Issue #17275
with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
@@ -1676,6 +1678,7 @@
CBufferedReaderTest.test_garbage_collection(self)
CBufferedWriterTest.test_garbage_collection(self)
+ @support.impl_detail(cpython=True)
def test_args_error(self):
# Issue #17275
with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
diff --git a/pypy/module/_io/interp_textio.py b/pypy/module/_io/interp_textio.py
--- a/pypy/module/_io/interp_textio.py
+++ b/pypy/module/_io/interp_textio.py
@@ -1,7 +1,7 @@
import sys
from pypy.interpreter.baseobjspace import W_Root
-from pypy.interpreter.error import OperationError
+from pypy.interpreter.error import OperationError, operationerrfmt
from pypy.interpreter.gateway import WrappedDefault, interp2app, unwrap_spec
from pypy.interpreter.typedef import (
GetSetProperty, TypeDef, generic_new_descr, interp_attrproperty,
@@ -327,6 +327,13 @@
self.flags = flags
self.input = input
+
+def check_decoded(space, w_decoded):
+ # Shared guard for decoder results: raise TypeError (space.w_TypeError)
+ # unless w_decoded is an instance of space.w_unicode. Returns nothing
+ # when the check passes.
+ if not space.isinstance_w(w_decoded, space.w_unicode):
+ msg = "decoder should return a string result, not '%T'"
+ raise operationerrfmt(space.w_TypeError, msg, w_decoded)
+
+
class W_TextIOWrapper(W_TextIOBase):
def __init__(self, space):
W_TextIOBase.__init__(self, space)
@@ -546,9 +553,15 @@
# Read a chunk, decode it, and put the result in self._decoded_chars
w_input = space.call_method(self.w_buffer, "read1",
space.wrap(self.chunk_size))
+
+ if not space.isinstance_w(w_input, space.w_str):
+ msg = "decoder getstate() should have returned a bytes object not '%T'"
+ raise operationerrfmt(space.w_TypeError, msg, w_input)
+
eof = space.len_w(w_input) == 0
w_decoded = space.call_method(self.w_decoder, "decode",
w_input, space.wrap(eof))
+ check_decoded(space, w_decoded)
self._set_decoded_chars(space.unicode_w(w_decoded))
if space.len_w(w_decoded) > 0:
eof = False
@@ -577,10 +590,12 @@
size = convert_size(space, w_size)
self._writeflush(space)
+
if size < 0:
# Read everything
w_bytes = space.call_method(self.w_buffer, "read")
w_decoded = space.call_method(self.w_decoder, "decode", w_bytes, space.w_True)
+ check_decoded(space, w_decoded)
w_result = space.wrap(self._get_decoded_chars(-1))
w_final = space.add(w_result, w_decoded)
self.snapshot = None
@@ -701,6 +716,10 @@
if not self.w_encoder:
raise OperationError(space.w_IOError, space.wrap("not writable"))
+ if not space.isinstance_w(w_text, space.w_unicode):
+ msg = "unicode argument expected, got '%T'"
+ raise operationerrfmt(space.w_TypeError, msg, w_text)
+
text = space.unicode_w(w_text)
textlen = len(text)
@@ -845,11 +864,16 @@
# Just like _read_chunk, feed the decoder and save a snapshot.
w_chunk = space.call_method(self.w_buffer, "read",
space.wrap(cookie.bytes_to_feed))
+ if not space.isinstance_w(w_chunk, space.w_str):
+ msg = "underlying read() should have returned a bytes object, not '%T'"
+ raise operationerrfmt(space.w_TypeError, msg, w_chunk)
+
self.snapshot = PositionSnapshot(cookie.dec_flags,
space.str_w(w_chunk))
w_decoded = space.call_method(self.w_decoder, "decode",
w_chunk, space.wrap(cookie.need_eof))
+ check_decoded(space, w_decoded)
self._set_decoded_chars(space.unicode_w(w_decoded))
# Skip chars_to_skip of the decoded characters
@@ -918,6 +942,7 @@
while i < len(input):
w_decoded = space.call_method(self.w_decoder, "decode",
space.wrap(input[i]))
+ check_decoded(space, w_decoded)
chars_decoded += len(space.unicode_w(w_decoded))
cookie.bytes_to_feed += 1
@@ -942,6 +967,7 @@
w_decoded = space.call_method(self.w_decoder, "decode",
space.wrap(""),
space.wrap(1)) # final=1
+ check_decoded(space, w_decoded)
chars_decoded += len(space.unicode_w(w_decoded))
cookie.need_eof = 1
diff --git a/pypy/module/_io/test/test_fileio.py b/pypy/module/_io/test/test_fileio.py
--- a/pypy/module/_io/test/test_fileio.py
+++ b/pypy/module/_io/test/test_fileio.py
@@ -98,6 +98,13 @@
f.close()
f2.close()
+ def test_writelines_error(self):
+ # writelines() on a TextIOWrapper must raise TypeError for a list of
+ # ints, for None, and for a bytes object.
+ import _io
+ txt = _io.TextIOWrapper(_io.BytesIO())
+ raises(TypeError, txt.writelines, [1, 2, 3])
+ raises(TypeError, txt.writelines, None)
+ raises(TypeError, txt.writelines, b'abc')
+
def test_seek(self):
import _io
f = _io.FileIO(self.tmpfile, 'rb')
@@ -195,7 +202,7 @@
space.appexec([space.wrap(str(tmpfile))], """(tmpfile):
import io
f = io.open(tmpfile, 'w', encoding='ascii')
- f.write('42')
+ f.write(u'42')
# no flush() and no close()
import sys; sys._keepalivesomewhereobscure = f
""")
diff --git a/pypy/module/_io/test/test_textio.py b/pypy/module/_io/test/test_textio.py
--- a/pypy/module/_io/test/test_textio.py
+++ b/pypy/module/_io/test/test_textio.py
@@ -216,6 +216,29 @@
raises(IOError, txt.close) # exception not swallowed
assert txt.closed
+ def test_illegal_decoder(self):
+ # read(1), readline() and read() must each raise TypeError when the
+ # wrapper is built with encoding='quopri_codec'. A fresh wrapper is
+ # created for every call so the cases do not share state.
+ # NOTE(review): presumably quopri_codec is chosen because its decoder
+ # does not return unicode -- confirm.
+ import _io
+ t = _io.TextIOWrapper(_io.BytesIO(b'aaaaaa'), newline='\n',
+ encoding='quopri_codec')
+ raises(TypeError, t.read, 1)
+ t = _io.TextIOWrapper(_io.BytesIO(b'aaaaaa'), newline='\n',
+ encoding='quopri_codec')
+ raises(TypeError, t.readline)
+ t = _io.TextIOWrapper(_io.BytesIO(b'aaaaaa'), newline='\n',
+ encoding='quopri_codec')
+ raises(TypeError, t.read)
+
+ def test_read_nonbytes(self):
+ # The wrapped stream is a StringIO whose read1 is aliased to
+ # StringIO.read, so the buffer hands unicode (not bytes) back to the
+ # TextIOWrapper. read(1) and readline() must raise TypeError; a full
+ # read() is expected to return the text.
+ import _io
+ class NonbytesStream(_io.StringIO):
+ read1 = _io.StringIO.read
+ t = _io.TextIOWrapper(NonbytesStream(u'a'))
+ raises(TypeError, t.read, 1)
+ t = _io.TextIOWrapper(NonbytesStream(u'a'))
+ raises(TypeError, t.readline)
+ t = _io.TextIOWrapper(NonbytesStream(u'a'))
+ # Must be an assert: a bare comparison is evaluated and thrown away,
+ # which would leave this case unchecked.
+ assert t.read() == u'a'
+
class AppTestIncrementalNewlineDecoder:
def test_newline_decoder(self):
More information about the pypy-commit mailing list