[pypy-svn] r64513 - in pypy/trunk/pypy/rlib: . test
iko at codespeak.net
iko at codespeak.net
Tue Apr 21 15:49:39 CEST 2009
Author: iko
Date: Tue Apr 21 15:49:38 2009
New Revision: 64513
Modified:
pypy/trunk/pypy/rlib/streamio.py
pypy/trunk/pypy/rlib/test/test_streamio.py
Log:
(iko, pedronis) Implement text mode for windows in streamio
Modified: pypy/trunk/pypy/rlib/streamio.py
==============================================================================
--- pypy/trunk/pypy/rlib/streamio.py (original)
+++ pypy/trunk/pypy/rlib/streamio.py Tue Apr 21 15:49:38 2009
@@ -72,18 +72,18 @@
def open_file_as_stream(path, mode="r", buffering=-1):
- os_flags, universal, reading, writing, basemode = decode_mode(mode)
+ os_flags, universal, reading, writing, basemode, binary = decode_mode(mode)
stream = open_path_helper(path, os_flags, basemode == "a")
return construct_stream_tower(stream, buffering, universal, reading,
- writing)
+ writing, binary)
def fdopen_as_stream(fd, mode, buffering):
# XXX XXX XXX you want to check whether the modes are compatible
# otherwise you get funny results
- os_flags, universal, reading, writing, basemode = decode_mode(mode)
+ os_flags, universal, reading, writing, basemode, binary = decode_mode(mode)
stream = DiskFile(fd)
return construct_stream_tower(stream, buffering, universal, reading,
- writing)
+ writing, binary)
def open_path_helper(path, os_flags, append):
# XXX for now always return DiskFile
@@ -116,16 +116,16 @@
break
flag = OS_MODE[basemode, plus]
- if binary or universal:
- flag |= O_BINARY
+ flag |= O_BINARY
reading = basemode == 'r' or plus
writing = basemode != 'r' or plus
- return flag, universal, reading, writing, basemode
+ return flag, universal, reading, writing, basemode, binary
-def construct_stream_tower(stream, buffering, universal, reading, writing):
+def construct_stream_tower(stream, buffering, universal, reading, writing,
+ binary):
if buffering == 0: # no buffering
pass
elif buffering == 1: # line-buffering
@@ -147,6 +147,8 @@
stream = TextOutputFilter(stream)
if reading:
stream = TextInputFilter(stream)
+ elif not binary and os.linesep == '\r\n':
+ stream = TextCRLFFilter(stream)
return stream
@@ -234,6 +236,9 @@
def truncate(self, size):
raise NotImplementedError
+ def flush_buffers(self):
+ pass
+
def flush(self):
pass
@@ -814,6 +819,72 @@
try_to_find_file_descriptor = PassThrough("try_to_find_file_descriptor",
flush_buffers=False)
+class TextCRLFFilter(Stream):
+
+ """Filtering stream for text mode on platforms whose os.linesep is CRLF.
+
+ Reading converts each CR LF pair to a single LF and writing replaces
+ each LF with CR LF; tell, seek and write are all supported.
+ """
+
+ def __init__(self, base):
+ self.base = base
+ self.do_read = base.read
+ self.do_write = base.write
+ self.do_flush = base.flush_buffers
+ self.lfbuffer = ""
+
+ def read(self, n):
+ data = self.lfbuffer + self.do_read(n)
+ self.lfbuffer = ""
+ if data.endswith("\r"):
+ c = self.do_read(1)
+ if c and c[0] == '\n':
+ data = data + '\n'
+ self.lfbuffer = c[1:]
+ else:
+ self.lfbuffer = c
+
+ result = []
+ offset = 0
+ while True:
+ newoffset = data.find('\r\n', offset)
+ if newoffset < 0:
+ result.append(data[offset:])
+ break
+ result.append(data[offset:newoffset])
+ offset = newoffset + 2
+
+ return '\n'.join(result)
+
+ def tell(self):
+ pos = self.base.tell()
+ return pos - len(self.lfbuffer)
+
+ def seek(self, offset, whence):
+ if whence == 1:
+ offset -= len(self.lfbuffer) # correct for already-read-ahead character
+ self.base.seek(offset, whence)
+ self.lfbuffer = ""
+
+ def flush_buffers(self):
+ if self.lfbuffer:
+ self.base.seek(-len(self.lfbuffer), 1)
+ self.lfbuffer = ""
+ self.do_flush()
+
+ def write(self, data):
+ data = replace_char_with_str(data, '\n', '\r\n')
+ self.flush_buffers()
+ self.do_write(data)
+
+ truncate = PassThrough("truncate", flush_buffers=True)
+ flush = PassThrough("flush", flush_buffers=False)
+ flushable= PassThrough("flushable", flush_buffers=False)
+ close = PassThrough("close", flush_buffers=False)
+ try_to_find_file_descriptor = PassThrough("try_to_find_file_descriptor",
+ flush_buffers=False)
+
class TextInputFilter(Stream):
"""Filtering input stream for universal newline translation."""
Modified: pypy/trunk/pypy/rlib/test/test_streamio.py
==============================================================================
--- pypy/trunk/pypy/rlib/test/test_streamio.py (original)
+++ pypy/trunk/pypy/rlib/test/test_streamio.py Tue Apr 21 15:49:38 2009
@@ -590,6 +590,83 @@
class TestCRLFFilterOOinterp(BaseTestCRLFFilter, OORtypeMixin):
pass
+class BaseTestTextCRLFFilter(BaseRtypingTest):
+ def test_simple(self):
+ packets = ["abc\r\n", "abc\r", "\nd\r\nef\r\ngh", "a\rbc\r", "def\n",
+ "\r", "\n\r"]
+ expected = ["abc\n", "abc\n", "d\nef\ngh", "a\rbc\r", "def\n", "\n",
+ "\r"]
+ crlf = streamio.TextCRLFFilter(TSource(packets))
+ def f():
+ blocks = []
+ while True:
+ block = crlf.read(100)
+ if not block:
+ break
+ blocks.append(block)
+ assert blocks == expected
+ self.interpret(f, [])
+
+ def test_readline_and_seek(self):
+ packets = ["abc\r\n", "abc\r", "\nd\r\nef\r\ngh", "a\rbc\r", "def\n",
+ "\r", "\n\r"]
+ expected = ["abc\n", "abc\n", "d\n","ef\n", "gha\rbc\rdef\n", "\n",
+ "\r"]
+ crlf = streamio.TextCRLFFilter(TSource(packets))
+ def f():
+ lines = []
+ while True:
+ pos = crlf.tell()
+ line = crlf.readline()
+ if not line:
+ break
+ crlf.seek(pos, 0)
+ line2 = crlf.readline()
+ assert line2 == line
+ lines.append(line)
+ assert lines == expected
+ self.interpret(f, [])
+
+ def test_seek_relative(self):
+ packets = ["abc\r\n", "abc\r", "\nd\r\nef\r"]
+ expected = ["abc\n", "abc\n", "d\n","ef\r"]
+
+ crlf = streamio.TextCRLFFilter(TSource(packets))
+ def f():
+ lines = []
+ while True:
+ pos = crlf.tell()
+ line = crlf.readline()
+ if not line:
+ break
+ crlf.seek(0, 1)
+ lines.append(line)
+ assert lines == expected
+ self.interpret(f, [])
+
+ def test_write(self):
+ data = "line1\r\nline2\rline3\r\n"
+ crlf = streamio.TextCRLFFilter(TReaderWriter(data))
+ def f():
+ line = crlf.readline()
+ assert line == 'line1\n'
+ line = crlf.read(6)
+ assert line == 'line2\r'
+ pos = crlf.tell()
+ crlf.write('line3\n')
+ crlf.seek(pos,0)
+ line = crlf.readline()
+ assert line == 'line3\n'
+ line = crlf.readline()
+ assert line == ''
+ self.interpret(f, [])
+
+class TestTextCRLFFilterLLInterp(BaseTestTextCRLFFilter, LLRtypeMixin):
+ pass
+
+class TestTextCRLFFilterOOInterp(BaseTestTextCRLFFilter, OORtypeMixin):
+ pass
+
class TestMMapFile(BaseTestBufferingInputStreamTests):
tfn = None
fd = None
More information about the Pypy-commit
mailing list