[pypy-commit] pypy default: Extract UnicodeIO object from W_StringIO

rlamy pypy.commits at gmail.com
Fri Nov 24 21:31:05 EST 2017


Author: Ronan Lamy <ronan.lamy at gmail.com>
Branch: 
Changeset: r93173:1d90f3200c9c
Date: 2017-11-25 02:29 +0000
http://bitbucket.org/pypy/pypy/changeset/1d90f3200c9c/

Log:	Extract UnicodeIO object from W_StringIO

diff --git a/pypy/module/_io/interp_stringio.py b/pypy/module/_io/interp_stringio.py
--- a/pypy/module/_io/interp_stringio.py
+++ b/pypy/module/_io/interp_stringio.py
@@ -2,21 +2,65 @@
 from pypy.interpreter.typedef import (
     TypeDef, generic_new_descr, GetSetProperty)
 from pypy.interpreter.gateway import interp2app, unwrap_spec, WrappedDefault
-from pypy.module._io.interp_textio import W_TextIOBase, W_IncrementalNewlineDecoder
+from pypy.module._io.interp_textio import (
+        W_TextIOBase, W_IncrementalNewlineDecoder)
 from pypy.module._io.interp_iobase import convert_size
 
+class UnicodeIO(object):
+    def __init__(self, data=None, pos=0):
+        if data is None:
+            data = []
+        self.data = data
+        self.pos = pos
+
+    def resize(self, newlength):
+        if len(self.data) > newlength:
+            self.data = self.data[:newlength]
+        if len(self.data) < newlength:
+            self.data.extend([u'\0'] * (newlength - len(self.data)))
+
+    def read(self, size):
+        start = self.pos
+        available = len(self.data) - start
+        if available <= 0:
+            return u''
+        if size >= 0 and size <= available:
+            end = start + size
+        else:
+            end = len(self.data)
+        assert 0 <= start <= end
+        self.pos = end
+        return u''.join(self.data[start:end])
+
+    def write(self, string):
+        length = len(string)
+        if self.pos + length > len(self.data):
+            self.resize(self.pos + length)
+
+        for i in range(length):
+            self.data[self.pos + i] = string[i]
+        self.pos += length
+
+    def seek(self, pos):
+        self.pos = pos
+
+    def truncate(self, size):
+        if size < len(self.data):
+            self.resize(size)
+
+    def getvalue(self):
+        return u''.join(self.data)
+
 
 class W_StringIO(W_TextIOBase):
     def __init__(self, space):
         W_TextIOBase.__init__(self, space)
-        self.buf = []
-        self.pos = 0
+        self.buf = UnicodeIO()
 
-    @unwrap_spec(w_newline = WrappedDefault("\n"))
+    @unwrap_spec(w_newline=WrappedDefault("\n"))
     def descr_init(self, space, w_initvalue=None, w_newline=None):
         # In case __init__ is called multiple times
-        self.buf = []
-        self.pos = 0
+        self.buf = UnicodeIO()
         self.w_decoder = None
         self.readnl = None
         self.writenl = None
@@ -27,7 +71,7 @@
             newline = space.unicode_w(w_newline)
 
         if (newline is not None and newline != u"" and newline != u"\n" and
-            newline != u"\r" and newline != u"\r\n"):
+                newline != u"\r" and newline != u"\r\n"):
             # Not using oefmt() because I don't know how to use it
             # with unicode
             raise OperationError(space.w_ValueError,
@@ -50,7 +94,7 @@
 
         if not space.is_none(w_initvalue):
             self.write_w(space, w_initvalue)
-            self.pos = 0
+            self.buf.pos = 0
 
     def descr_getstate(self, space):
         w_initialval = self.getvalue_w(space)
@@ -58,9 +102,9 @@
         if self.readnl is None:
             w_readnl = space.w_None
         else:
-            w_readnl = space.str(space.newunicode(self.readnl)) # YYY
+            w_readnl = space.str(space.newunicode(self.readnl))  # YYY
         return space.newtuple([
-            w_initialval, w_readnl, space.newint(self.pos), w_dict
+            w_initialval, w_readnl, space.newint(self.buf.pos), w_dict
         ])
 
     def descr_setstate(self, space, w_state):
@@ -69,34 +113,33 @@
         # We allow the state tuple to be longer than 4, because we may need
         # someday to extend the object's state without breaking
         # backwards-compatibility
-        if not space.isinstance_w(w_state, space.w_tuple) or space.len_w(w_state) < 4:
+        if (not space.isinstance_w(w_state, space.w_tuple)
+                or space.len_w(w_state) < 4):
             raise oefmt(space.w_TypeError,
                         "%T.__setstate__ argument should be a 4-tuple, got %T",
                         self, w_state)
         w_initval, w_readnl, w_pos, w_dict = space.unpackiterable(w_state, 4)
+        if not space.isinstance_w(w_initval, space.w_unicode):
+            raise oefmt(space.w_TypeError,
+                        "unicode argument expected, got '%T'", w_initval)
         # Initialize state
-        self.descr_init(space, w_initval, w_readnl)
+        self.descr_init(space, None, w_readnl)
 
-        # Restore the buffer state. Even if __init__ did initialize the buffer,
-        # we have to initialize it again since __init__ may translates the
-        # newlines in the inital_value string. We clearly do not want that
+        # Restore the buffer state. We're not doing it via __init__
         # because the string value in the state tuple has already been
         # translated once by __init__. So we do not take any chance and replace
         # object's buffer completely
         initval = space.unicode_w(w_initval)
-        size = len(initval)
-        self.resize_buffer(size)
-        self.buf = list(initval)
         pos = space.getindex_w(w_pos, space.w_TypeError)
         if pos < 0:
             raise oefmt(space.w_ValueError,
                         "position value cannot be negative")
-        self.pos = pos
+        self.buf = UnicodeIO(list(initval), pos)
         if not space.is_w(w_dict, space.w_None):
             if not space.isinstance_w(w_dict, space.w_dict):
-                raise oefmt(space.w_TypeError,
-                            "fourth item of state should be a dict, got a %T",
-                            w_dict)
+                raise oefmt(
+                    space.w_TypeError,
+                    "fourth item of state should be a dict, got a %T", w_dict)
             # Alternatively, we could replace the internal dictionary
             # completely. However, it seems more practical to just update it.
             space.call_method(self.w_dict, "update", w_dict)
@@ -107,86 +150,56 @@
                 message = "I/O operation on closed file"
             raise OperationError(space.w_ValueError, space.newtext(message))
 
-    def resize_buffer(self, newlength):
-        if len(self.buf) > newlength:
-            self.buf = self.buf[:newlength]
-        if len(self.buf) < newlength:
-            self.buf.extend([u'\0'] * (newlength - len(self.buf)))
-
-    def write(self, string):
-        length = len(string)
-        if self.pos + length > len(self.buf):
-            self.resize_buffer(self.pos + length)
-
-        for i in range(length):
-            self.buf[self.pos + i] = string[i]
-        self.pos += length
-
     def write_w(self, space, w_obj):
         if not space.isinstance_w(w_obj, space.w_unicode):
             raise oefmt(space.w_TypeError,
                         "unicode argument expected, got '%T'", w_obj)
         self._check_closed(space)
-
         orig_size = space.len_w(w_obj)
 
         if self.w_decoder is not None:
             w_decoded = space.call_method(
-                self.w_decoder, "decode", w_obj, space.w_True
-            )
+                self.w_decoder, "decode", w_obj, space.w_True)
         else:
             w_decoded = w_obj
-
         if self.writenl:
             w_decoded = space.call_method(
-                w_decoded, "replace", space.newtext("\n"), space.newunicode(self.writenl)
-            )
+                w_decoded, "replace",
+                space.newtext("\n"), space.newunicode(self.writenl))
+        string = space.unicode_w(w_decoded)
+        if string:
+            self.buf.write(string)
 
-        string = space.unicode_w(w_decoded)
-        size = len(string)
-
-        if size:
-            self.write(string)
         return space.newint(orig_size)
 
     def read_w(self, space, w_size=None):
         self._check_closed(space)
         size = convert_size(space, w_size)
-        start = self.pos
-        available = len(self.buf) - start
-        if available <= 0:
-            return space.newunicode(u"")
-        if size >= 0 and size <= available:
-            end = start + size
-        else:
-            end = len(self.buf)
-        assert 0 <= start <= end
-        self.pos = end
-        return space.newunicode(u''.join(self.buf[start:end]))
+        return space.newunicode(self.buf.read(size))
 
     def readline_w(self, space, w_limit=None):
         self._check_closed(space)
         limit = convert_size(space, w_limit)
 
-        if self.pos >= len(self.buf):
+        if self.buf.pos >= len(self.buf.data):
             return space.newunicode(u"")
 
-        start = self.pos
-        if limit < 0 or limit > len(self.buf) - self.pos:
-            limit = len(self.buf) - self.pos
+        start = self.buf.pos
+        if limit < 0 or limit > len(self.buf.data) - self.buf.pos:
+            limit = len(self.buf.data) - self.buf.pos
         assert limit >= 0
 
         endpos, found = self._find_line_ending(
             # XXX: super inefficient, makes a copy of the entire contents.
-            u"".join(self.buf),
+            u"".join(self.buf.data),
             start,
             limit
         )
         if not found:
             endpos = start + limit
         assert endpos >= 0
-        self.pos = endpos
-        return space.newunicode(u"".join(self.buf[start:endpos]))
+        self.buf.pos = endpos
+        return space.newunicode(u"".join(self.buf.data[start:endpos]))
 
     @unwrap_spec(pos=int, mode=int)
     def seek_w(self, space, pos, mode=0):
@@ -202,32 +215,27 @@
 
         # XXX: this makes almost no sense, but its how CPython does it.
         if mode == 1:
-            pos = self.pos
+            pos = self.buf.pos
         elif mode == 2:
-            pos = len(self.buf)
-
+            pos = len(self.buf.data)
         assert pos >= 0
-        self.pos = pos
+        self.buf.seek(pos)
         return space.newint(pos)
 
     def truncate_w(self, space, w_size=None):
         self._check_closed(space)
         if space.is_none(w_size):
-            size = self.pos
+            size = self.buf.pos
         else:
             size = space.int_w(w_size)
-
         if size < 0:
             raise oefmt(space.w_ValueError, "Negative size value %d", size)
-
-        if size < len(self.buf):
-            self.resize_buffer(size)
-
+        self.buf.truncate(size)
         return space.newint(size)
 
     def getvalue_w(self, space):
         self._check_closed(space)
-        return space.newunicode(u''.join(self.buf))
+        return space.newunicode(self.buf.getvalue())
 
     def readable_w(self, space):
         self._check_closed(space)


More information about the pypy-commit mailing list