[pypy-commit] pypy default: Extract UnicodeIO object from W_StringIO
rlamy
pypy.commits at gmail.com
Fri Nov 24 21:31:05 EST 2017
Author: Ronan Lamy <ronan.lamy at gmail.com>
Branch:
Changeset: r93173:1d90f3200c9c
Date: 2017-11-25 02:29 +0000
http://bitbucket.org/pypy/pypy/changeset/1d90f3200c9c/
Log: Extract UnicodeIO object from W_StringIO
diff --git a/pypy/module/_io/interp_stringio.py b/pypy/module/_io/interp_stringio.py
--- a/pypy/module/_io/interp_stringio.py
+++ b/pypy/module/_io/interp_stringio.py
@@ -2,21 +2,65 @@
from pypy.interpreter.typedef import (
TypeDef, generic_new_descr, GetSetProperty)
from pypy.interpreter.gateway import interp2app, unwrap_spec, WrappedDefault
-from pypy.module._io.interp_textio import W_TextIOBase, W_IncrementalNewlineDecoder
+from pypy.module._io.interp_textio import (
+ W_TextIOBase, W_IncrementalNewlineDecoder)
from pypy.module._io.interp_iobase import convert_size
+class UnicodeIO(object):
+ def __init__(self, data=None, pos=0):
+ if data is None:
+ data = []
+ self.data = data
+ self.pos = pos
+
+ def resize(self, newlength):
+ if len(self.data) > newlength:
+ self.data = self.data[:newlength]
+ if len(self.data) < newlength:
+ self.data.extend([u'\0'] * (newlength - len(self.data)))
+
+ def read(self, size):
+ start = self.pos
+ available = len(self.data) - start
+ if available <= 0:
+ return u''
+ if size >= 0 and size <= available:
+ end = start + size
+ else:
+ end = len(self.data)
+ assert 0 <= start <= end
+ self.pos = end
+ return u''.join(self.data[start:end])
+
+ def write(self, string):
+ length = len(string)
+ if self.pos + length > len(self.data):
+ self.resize(self.pos + length)
+
+ for i in range(length):
+ self.data[self.pos + i] = string[i]
+ self.pos += length
+
+ def seek(self, pos):
+ self.pos = pos
+
+ def truncate(self, size):
+ if size < len(self.data):
+ self.resize(size)
+
+ def getvalue(self):
+ return u''.join(self.data)
+
class W_StringIO(W_TextIOBase):
def __init__(self, space):
W_TextIOBase.__init__(self, space)
- self.buf = []
- self.pos = 0
+ self.buf = UnicodeIO()
- @unwrap_spec(w_newline = WrappedDefault("\n"))
+ @unwrap_spec(w_newline=WrappedDefault("\n"))
def descr_init(self, space, w_initvalue=None, w_newline=None):
# In case __init__ is called multiple times
- self.buf = []
- self.pos = 0
+ self.buf = UnicodeIO()
self.w_decoder = None
self.readnl = None
self.writenl = None
@@ -27,7 +71,7 @@
newline = space.unicode_w(w_newline)
if (newline is not None and newline != u"" and newline != u"\n" and
- newline != u"\r" and newline != u"\r\n"):
+ newline != u"\r" and newline != u"\r\n"):
# Not using oefmt() because I don't know how to use it
# with unicode
raise OperationError(space.w_ValueError,
@@ -50,7 +94,7 @@
if not space.is_none(w_initvalue):
self.write_w(space, w_initvalue)
- self.pos = 0
+ self.buf.pos = 0
def descr_getstate(self, space):
w_initialval = self.getvalue_w(space)
@@ -58,9 +102,9 @@
if self.readnl is None:
w_readnl = space.w_None
else:
- w_readnl = space.str(space.newunicode(self.readnl)) # YYY
+ w_readnl = space.str(space.newunicode(self.readnl)) # YYY
return space.newtuple([
- w_initialval, w_readnl, space.newint(self.pos), w_dict
+ w_initialval, w_readnl, space.newint(self.buf.pos), w_dict
])
def descr_setstate(self, space, w_state):
@@ -69,34 +113,33 @@
# We allow the state tuple to be longer than 4, because we may need
# someday to extend the object's state without breaking
# backwards-compatibility
- if not space.isinstance_w(w_state, space.w_tuple) or space.len_w(w_state) < 4:
+ if (not space.isinstance_w(w_state, space.w_tuple)
+ or space.len_w(w_state) < 4):
raise oefmt(space.w_TypeError,
"%T.__setstate__ argument should be a 4-tuple, got %T",
self, w_state)
w_initval, w_readnl, w_pos, w_dict = space.unpackiterable(w_state, 4)
+ if not space.isinstance_w(w_initval, space.w_unicode):
+ raise oefmt(space.w_TypeError,
+ "unicode argument expected, got '%T'", w_initval)
# Initialize state
- self.descr_init(space, w_initval, w_readnl)
+ self.descr_init(space, None, w_readnl)
- # Restore the buffer state. Even if __init__ did initialize the buffer,
- # we have to initialize it again since __init__ may translates the
- # newlines in the inital_value string. We clearly do not want that
+ # Restore the buffer state. We're not doing it via __init__
# because the string value in the state tuple has already been
# translated once by __init__. So we do not take any chance and replace
# object's buffer completely
initval = space.unicode_w(w_initval)
- size = len(initval)
- self.resize_buffer(size)
- self.buf = list(initval)
pos = space.getindex_w(w_pos, space.w_TypeError)
if pos < 0:
raise oefmt(space.w_ValueError,
"position value cannot be negative")
- self.pos = pos
+ self.buf = UnicodeIO(list(initval), pos)
if not space.is_w(w_dict, space.w_None):
if not space.isinstance_w(w_dict, space.w_dict):
- raise oefmt(space.w_TypeError,
- "fourth item of state should be a dict, got a %T",
- w_dict)
+ raise oefmt(
+ space.w_TypeError,
+ "fourth item of state should be a dict, got a %T", w_dict)
# Alternatively, we could replace the internal dictionary
# completely. However, it seems more practical to just update it.
space.call_method(self.w_dict, "update", w_dict)
@@ -107,86 +150,56 @@
message = "I/O operation on closed file"
raise OperationError(space.w_ValueError, space.newtext(message))
- def resize_buffer(self, newlength):
- if len(self.buf) > newlength:
- self.buf = self.buf[:newlength]
- if len(self.buf) < newlength:
- self.buf.extend([u'\0'] * (newlength - len(self.buf)))
-
- def write(self, string):
- length = len(string)
- if self.pos + length > len(self.buf):
- self.resize_buffer(self.pos + length)
-
- for i in range(length):
- self.buf[self.pos + i] = string[i]
- self.pos += length
-
def write_w(self, space, w_obj):
if not space.isinstance_w(w_obj, space.w_unicode):
raise oefmt(space.w_TypeError,
"unicode argument expected, got '%T'", w_obj)
self._check_closed(space)
-
orig_size = space.len_w(w_obj)
if self.w_decoder is not None:
w_decoded = space.call_method(
- self.w_decoder, "decode", w_obj, space.w_True
- )
+ self.w_decoder, "decode", w_obj, space.w_True)
else:
w_decoded = w_obj
-
if self.writenl:
w_decoded = space.call_method(
- w_decoded, "replace", space.newtext("\n"), space.newunicode(self.writenl)
- )
+ w_decoded, "replace",
+ space.newtext("\n"), space.newunicode(self.writenl))
+ string = space.unicode_w(w_decoded)
+ if string:
+ self.buf.write(string)
- string = space.unicode_w(w_decoded)
- size = len(string)
-
- if size:
- self.write(string)
return space.newint(orig_size)
def read_w(self, space, w_size=None):
self._check_closed(space)
size = convert_size(space, w_size)
- start = self.pos
- available = len(self.buf) - start
- if available <= 0:
- return space.newunicode(u"")
- if size >= 0 and size <= available:
- end = start + size
- else:
- end = len(self.buf)
- assert 0 <= start <= end
- self.pos = end
- return space.newunicode(u''.join(self.buf[start:end]))
+ return space.newunicode(self.buf.read(size))
def readline_w(self, space, w_limit=None):
self._check_closed(space)
limit = convert_size(space, w_limit)
- if self.pos >= len(self.buf):
+ if self.buf.pos >= len(self.buf.data):
return space.newunicode(u"")
- start = self.pos
- if limit < 0 or limit > len(self.buf) - self.pos:
- limit = len(self.buf) - self.pos
+ start = self.buf.pos
+ if limit < 0 or limit > len(self.buf.data) - self.buf.pos:
+ limit = len(self.buf.data) - self.buf.pos
assert limit >= 0
endpos, found = self._find_line_ending(
# XXX: super inefficient, makes a copy of the entire contents.
- u"".join(self.buf),
+ u"".join(self.buf.data),
start,
limit
)
if not found:
endpos = start + limit
assert endpos >= 0
- self.pos = endpos
- return space.newunicode(u"".join(self.buf[start:endpos]))
+ self.buf.pos = endpos
+ return space.newunicode(u"".join(self.buf.data[start:endpos]))
@unwrap_spec(pos=int, mode=int)
def seek_w(self, space, pos, mode=0):
@@ -202,32 +215,27 @@
# XXX: this makes almost no sense, but its how CPython does it.
if mode == 1:
- pos = self.pos
+ pos = self.buf.pos
elif mode == 2:
- pos = len(self.buf)
-
+ pos = len(self.buf.data)
assert pos >= 0
- self.pos = pos
+ self.buf.seek(pos)
return space.newint(pos)
def truncate_w(self, space, w_size=None):
self._check_closed(space)
if space.is_none(w_size):
- size = self.pos
+ size = self.buf.pos
else:
size = space.int_w(w_size)
-
if size < 0:
raise oefmt(space.w_ValueError, "Negative size value %d", size)
-
- if size < len(self.buf):
- self.resize_buffer(size)
-
+ self.buf.truncate(size)
return space.newint(size)
def getvalue_w(self, space):
self._check_closed(space)
- return space.newunicode(u''.join(self.buf))
+ return space.newunicode(self.buf.getvalue())
def readable_w(self, space):
self._check_closed(space)
More information about the pypy-commit
mailing list