[pypy-svn] r64513 - in pypy/trunk/pypy/rlib: . test

iko at codespeak.net iko at codespeak.net
Tue Apr 21 15:49:39 CEST 2009


Author: iko
Date: Tue Apr 21 15:49:38 2009
New Revision: 64513

Modified:
   pypy/trunk/pypy/rlib/streamio.py
   pypy/trunk/pypy/rlib/test/test_streamio.py
Log:
(iko, pedronis) Implement text mode for windows in streamio



Modified: pypy/trunk/pypy/rlib/streamio.py
==============================================================================
--- pypy/trunk/pypy/rlib/streamio.py	(original)
+++ pypy/trunk/pypy/rlib/streamio.py	Tue Apr 21 15:49:38 2009
@@ -72,18 +72,18 @@
 
 
 def open_file_as_stream(path, mode="r", buffering=-1):
-    os_flags, universal, reading, writing, basemode = decode_mode(mode)
+    os_flags, universal, reading, writing, basemode, binary = decode_mode(mode)
     stream = open_path_helper(path, os_flags, basemode == "a")
     return construct_stream_tower(stream, buffering, universal, reading,
-                                  writing)
+                                  writing, binary)
 
 def fdopen_as_stream(fd, mode, buffering):
     # XXX XXX XXX you want do check whether the modes are compatible
     # otherwise you get funny results
-    os_flags, universal, reading, writing, basemode = decode_mode(mode)
+    os_flags, universal, reading, writing, basemode, binary = decode_mode(mode)
     stream = DiskFile(fd)
     return construct_stream_tower(stream, buffering, universal, reading,
-                                  writing)
+                                  writing, binary)
 
 def open_path_helper(path, os_flags, append):
     # XXX for now always return DiskFile
@@ -116,16 +116,16 @@
             break
 
     flag = OS_MODE[basemode, plus]
-    if binary or universal:
-        flag |= O_BINARY
+    flag |= O_BINARY
 
     reading = basemode == 'r' or plus
     writing = basemode != 'r' or plus
 
-    return flag, universal, reading, writing, basemode
+    return flag, universal, reading, writing, basemode, binary
 
 
-def construct_stream_tower(stream, buffering, universal, reading, writing):
+def construct_stream_tower(stream, buffering, universal, reading, writing,
+                           binary):
     if buffering == 0:   # no buffering
         pass
     elif buffering == 1:   # line-buffering
@@ -147,6 +147,8 @@
             stream = TextOutputFilter(stream)
         if reading:
             stream = TextInputFilter(stream)
+    elif not binary and os.linesep == '\r\n':
+        stream = TextCRLFFilter(stream)
     return stream
 
 
@@ -234,6 +236,9 @@
     def truncate(self, size):
         raise NotImplementedError
 
+    def flush_buffers(self):
+        pass
+
     def flush(self):
         pass
 
@@ -814,6 +819,72 @@
     try_to_find_file_descriptor = PassThrough("try_to_find_file_descriptor",
                                               flush_buffers=False)
 
+class TextCRLFFilter(Stream):
+
+    """Filtering stream implementing text mode where os.linesep is CRLF.
+
+    Translates CRLF to LF on read and LF to CRLF on write; a lone CR is
+    left untouched.  tell/seek account for the read-ahead in lfbuffer.
+    """
+
+    def __init__(self, base):
+        self.base = base
+        self.do_read = base.read
+        self.do_write = base.write
+        self.do_flush = base.flush_buffers
+        self.lfbuffer = ""
+
+    def read(self, n):
+        data = self.lfbuffer + self.do_read(n)
+        self.lfbuffer = ""
+        if data.endswith("\r"):
+            c = self.do_read(1)
+            if c and c[0] == '\n':
+                data = data + '\n'
+                self.lfbuffer = c[1:]
+            else:
+                self.lfbuffer = c
+
+        result = []
+        offset = 0
+        while True:
+            newoffset = data.find('\r\n', offset)
+            if newoffset < 0:
+                result.append(data[offset:])
+                break
+            result.append(data[offset:newoffset])
+            offset = newoffset + 2
+
+        return '\n'.join(result)
+
+    def tell(self):
+        pos = self.base.tell()
+        return pos - len(self.lfbuffer)
+
+    def seek(self, offset, whence):
+        if whence == 1:
+            offset -= len(self.lfbuffer)   # correct for already-read-ahead character
+        self.base.seek(offset, whence)
+        self.lfbuffer = ""
+
+    def flush_buffers(self):
+        if self.lfbuffer:
+            self.base.seek(-len(self.lfbuffer), 1)
+            self.lfbuffer = ""
+        self.do_flush()
+
+    def write(self, data):
+        data = replace_char_with_str(data, '\n', '\r\n')
+        self.flush_buffers()
+        self.do_write(data)
+
+    truncate = PassThrough("truncate", flush_buffers=True)
+    flush    = PassThrough("flush", flush_buffers=False)
+    flushable= PassThrough("flushable", flush_buffers=False)
+    close    = PassThrough("close", flush_buffers=False)
+    try_to_find_file_descriptor = PassThrough("try_to_find_file_descriptor",
+                                              flush_buffers=False)
+    
 class TextInputFilter(Stream):
 
     """Filtering input stream for universal newline translation."""

Modified: pypy/trunk/pypy/rlib/test/test_streamio.py
==============================================================================
--- pypy/trunk/pypy/rlib/test/test_streamio.py	(original)
+++ pypy/trunk/pypy/rlib/test/test_streamio.py	Tue Apr 21 15:49:38 2009
@@ -590,6 +590,83 @@
 class TestCRLFFilterOOinterp(BaseTestCRLFFilter, OORtypeMixin):
     pass
 
+class BaseTestTextCRLFFilter(BaseRtypingTest):
+    def test_simple(self):
+        packets = ["abc\r\n", "abc\r", "\nd\r\nef\r\ngh", "a\rbc\r", "def\n",
+                   "\r", "\n\r"]
+        expected = ["abc\n", "abc\n", "d\nef\ngh", "a\rbc\r", "def\n", "\n",
+                    "\r"]
+        crlf = streamio.TextCRLFFilter(TSource(packets))
+        def f():
+            blocks = []
+            while True:
+                block = crlf.read(100)
+                if not block:
+                    break
+                blocks.append(block)
+            assert blocks == expected
+        self.interpret(f, [])
+
+    def test_readline_and_seek(self):
+        packets = ["abc\r\n", "abc\r", "\nd\r\nef\r\ngh", "a\rbc\r", "def\n",
+                   "\r", "\n\r"]
+        expected = ["abc\n", "abc\n", "d\n","ef\n", "gha\rbc\rdef\n", "\n",
+                    "\r"]
+        crlf = streamio.TextCRLFFilter(TSource(packets))
+        def f():
+            lines = []
+            while True:
+                pos = crlf.tell()
+                line = crlf.readline()
+                if not line:
+                    break
+                crlf.seek(pos, 0)
+                line2 = crlf.readline()
+                assert line2 == line                         
+                lines.append(line)
+            assert lines == expected
+        self.interpret(f, [])
+
+    def test_seek_relative(self):
+        packets = ["abc\r\n", "abc\r", "\nd\r\nef\r"]
+        expected = ["abc\n", "abc\n", "d\n","ef\r"]
+
+        crlf = streamio.TextCRLFFilter(TSource(packets))
+        def f():
+            lines = []
+            while True:
+                pos = crlf.tell()
+                line = crlf.readline()
+                if not line:
+                    break
+                crlf.seek(0, 1)
+                lines.append(line)
+            assert lines == expected
+        self.interpret(f, [])
+
+    def test_write(self):
+        data = "line1\r\nline2\rline3\r\n"
+        crlf = streamio.TextCRLFFilter(TReaderWriter(data))
+        def f():
+            line = crlf.readline()
+            assert line == 'line1\n'
+            line = crlf.read(6)
+            assert line == 'line2\r'
+            pos = crlf.tell()
+            crlf.write('line3\n')
+            crlf.seek(pos,0)
+            line = crlf.readline()
+            assert line == 'line3\n'
+            line = crlf.readline()
+            assert line == ''
+        self.interpret(f, [])
+
+class TestTextCRLFFilterLLInterp(BaseTestTextCRLFFilter, LLRtypeMixin):
+    pass
+        
+class TestTextCRLFFilterOOInterp(BaseTestTextCRLFFilter, OORtypeMixin):
+    pass
+        
 class TestMMapFile(BaseTestBufferingInputStreamTests):
     tfn = None
     fd = None



More information about the Pypy-commit mailing list