[pypy-svn] r79348 - in pypy/branch/fast-forward/pypy/module/_io: . test
afa at codespeak.net
afa at codespeak.net
Mon Nov 22 17:29:44 CET 2010
Author: afa
Date: Mon Nov 22 17:29:41 2010
New Revision: 79348
Modified:
pypy/branch/fast-forward/pypy/module/_io/interp_textio.py
pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py
Log:
Looong code to implement TextIOWrapper.readline()
Modified: pypy/branch/fast-forward/pypy/module/_io/interp_textio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_textio.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_textio.py Mon Nov 22 17:29:41 2010
@@ -235,6 +235,10 @@
self.decoded_chars_used = 0 # offset into _decoded_chars for read()
self.chunk_size = 8192
+ self.readuniversal = False
+ self.readtranslate = False
+ self.readnl = None
+
@unwrap_spec('self', ObjSpace, W_Root, W_Root, W_Root, W_Root, int)
def descr_init(self, space, w_buffer, w_encoding=None,
w_errors=None, w_newline=None, line_buffering=0):
@@ -269,16 +273,18 @@
if space.is_w(w_newline, space.w_None):
newline = None
else:
- newline = space.str_w(w_newline)
- if newline and newline not in ('\n', '\r\n', '\r'):
+ newline = space.unicode_w(w_newline)
+ if newline and newline not in (u'\n', u'\r\n', u'\r'):
+ r = space.str_w(space.repr(w_newline))
raise OperationError(space.w_ValueError, space.wrap(
- "illegal newline value: %s" % (newline,)))
+ "illegal newline value: %s" % (r,)))
self.line_buffering = line_buffering
+ self.readuniversal = not newline # null or empty
+ self.readtranslate = newline is None
+ self.readnl = newline
# XXX self.writenl
- readuniversal = not newline # null or empty
- readtranslate = newline is None
# build the decoder object
if space.is_true(space.call_method(w_buffer, "readable")):
@@ -286,10 +292,10 @@
space.str_w(self.w_encoding))
self.w_decoder = space.call_method(w_codec,
"incrementaldecoder", w_errors)
- if readuniversal:
+ if self.readuniversal:
self.w_decoder = space.call_function(
space.gettypeobject(W_IncrementalNewlineDecoder.typedef),
- self.w_decoder, space.wrap(readtranslate))
+ self.w_decoder, space.wrap(self.readtranslate))
self.state = STATE_OK
@@ -352,6 +358,7 @@
available = len(self.decoded_chars) - self.decoded_chars_used
if size < 0 or size > available:
size = available
+ assert size >= 0
if self.decoded_chars_used > 0 or size < available:
start = self.decoded_chars_used
@@ -427,12 +434,137 @@
return space.wrap(builder.build())
+ def _find_line_ending(self, line, start):
+ end = len(line)
+ size = end - start
+ if self.readtranslate:
+
+ # Newlines are already translated, only search for \n
+ pos = line.find(u'\n', start, end)
+ if pos >= 0:
+ return pos - start + 1, 0
+ else:
+ return -1, size
+ elif self.readuniversal:
+ # Universal newline search. Find any of \r, \r\n, \n
+ # The decoder ensures that \r\n are not split in two pieces
+ i = 0
+ while True:
+ # Fast path for non-control chars. The loop always ends
+ # since the Py_UNICODE storage is NUL-terminated.
+ while line[start + i] > '\r':
+ i += 1
+ if i >= size:
+ return -1, size
+ ch = line[start + i]
+ i += 1
+ if ch == '\n':
+ return i, 0
+ if ch == '\r':
+ if line[start + i] == '\n':
+ return i + 1, 0
+ else:
+ return i, 0
+ else:
+ # Non-universal mode.
+ pos = line.find(self.readnl, start, end)
+ if pos >= 0:
+ return pos - start + len(self.readnl), 0
+ else:
+ restart = end - len(self.readnl)
+ if restart >= 0:
+ pos = line.find(self.readnl[0], restart, end)
+ if pos >= 0:
+ return -1, pos - start
+ return -1, size
+
@unwrap_spec('self', ObjSpace, W_Root)
def readline_w(self, space, w_limit=None):
- self._check_init(space)
- # XXX w_limit?
- w_bytes = space.call_method(self.w_buffer, "readline")
- return space.call_method(self.w_decoder, "decode", w_bytes)
+ self._check_closed(space)
+ # XXX self._writeflush(space)
+
+ limit = convert_size(space, w_limit)
+ chunked = 0
+
+ line = None
+ remaining = None
+ chunks = []
+
+ while True:
+ # First, get some data if necessary
+ has_data = True
+ while not self.decoded_chars:
+ if not self._read_chunk(space):
+ has_data = False
+ break
+ if not has_data:
+ # end of file
+ self._set_decoded_chars(None)
+ self.snapshot = None
+ start = endpos = offset_to_buffer = 0
+ break
+
+ if not remaining:
+ line = self.decoded_chars
+ start = self.decoded_chars_used
+ offset_to_buffer = 0
+ else:
+ assert self.decoded_chars_used == 0
+ line = remaining + self.decoded_chars
+ start = 0
+ offset_to_buffer = len(remaining)
+ remaining = None
+
+ line_len = len(line)
+ endpos, consumed = self._find_line_ending(line, start)
+ if endpos >= 0:
+ endpos += start
+ if limit >= 0 and endpos >= start + limit - chunked:
+ endpos = start + limit - chunked
+ assert endpos >= 0
+ break
+ assert consumed >= 0
+
+ # We can put aside up to `endpos`
+ endpos = consumed + start
+ if limit >= 0 and endpos >= start + limit - chunked:
+ # Didn't find line ending, but reached length limit
+ endpos = start + limit - chunked
+ assert endpos >= 0
+ break
+
+ # No line ending seen yet - put aside current data
+ if endpos > start:
+ s = line[start:endpos]
+ chunks.append(s)
+ chunked += len(s)
+ # There may be some remaining bytes we'll have to prepend to the
+ # next chunk of data
+ if endpos < line_len:
+ remaining = line[endpos:]
+ line = None
+ # We have consumed the buffer
+ self._set_decoded_chars(None)
+
+ if line:
+ # Our line ends in the current buffer
+ decoded_chars_used = endpos - offset_to_buffer
+ assert decoded_chars_used >= 0
+ self.decoded_chars_used = decoded_chars_used
+ if start > 0 or endpos < len(line):
+ line = line[start:endpos]
+ if remaining:
+ chunks.append(remaining)
+ remaining = None
+ if chunks:
+ if line:
+ chunks.append(line)
+ line = u''.join(chunks)
+
+ if line:
+ return space.wrap(line)
+ else:
+ return space.wrap(u'')
@unwrap_spec('self', ObjSpace)
def detach_w(self, space):
Modified: pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py Mon Nov 22 17:29:41 2010
@@ -77,6 +77,14 @@
reads += t.read()
assert reads == u"abc\ndef\n"
+ def test_read_some_then_readline(self):
+ import _io
+ r = _io.BytesIO("abc\ndef\n")
+ t = _io.TextIOWrapper(r)
+ reads = t.read(4)
+ reads += t.readline()
+ assert reads == u"abc\ndef\n"
+
class AppTestIncrementalNewlineDecoder:
def test_newline_decoder(self):
More information about the Pypy-commit
mailing list