[pypy-svn] r78300 - in pypy/branch/fast-forward/pypy/module/_io: . test

afa at codespeak.net afa at codespeak.net
Tue Oct 26 16:49:50 CEST 2010


Author: afa
Date: Tue Oct 26 16:49:43 2010
New Revision: 78300

Modified:
   pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py
   pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py
Log:
Implement BufferedWriter: write(), flush().


Modified: pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py	Tue Oct 26 16:49:43 2010
@@ -8,7 +8,7 @@
 from pypy.rlib.rstring import StringBuilder
 from pypy.rlib.rarithmetic import r_longlong
 from pypy.module._io.interp_iobase import W_IOBase, convert_size
-from pypy.module._io.interp_io import DEFAULT_BUFFER_SIZE
+from pypy.module._io.interp_io import DEFAULT_BUFFER_SIZE, W_BlockingIOError
 from pypy.module.thread.os_lock import Lock
 
 class BlockingIOError(Exception):
@@ -17,14 +17,53 @@
 class W_BufferedIOBase(W_IOBase):
     def __init__(self, space):
         W_IOBase.__init__(self, space)
+        # Buffer bookkeeping, shared with BufferedMixin and the subclasses.
         self.buffer = lltype.nullptr(rffi.CCHARP.TO)
-        self.pos = 0     # Current logical position in the buffer
-        self.raw_pos = 0 # Position of the raw stream in the buffer.
+        self.pos = 0        # Current logical position in the buffer
+        self.raw_pos = 0    # Position of the raw stream in the buffer.
+
+        self.read_end = -1  # Just after the last buffered byte in the buffer,
+                            # or -1 if the buffer isn't ready for reading
+
+        self.write_pos = 0  # Just after the last byte actually written
+        self.write_end = -1 # Just after the last byte waiting to be written,
+                            # or -1 if the buffer isn't ready for writing.
+
         self.lock = None
 
         self.readable = False
         self.writable = False
+
+    def _unsupportedoperation(self, space, message):  # always raises
+        w_exc = space.getattr(space.getbuiltinmodule('_io'),
+                              space.wrap('UnsupportedOperation'))
+        raise OperationError(w_exc, space.wrap(message))
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def read_w(self, space, w_size=None):  # W_BufferedReader overrides this
+        self._unsupportedoperation(space, "read")
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def write_w(self, space, w_size=None):  # NOTE(review): w_size is the data
+        self._unsupportedoperation(space, "write")
+
+    def _reader_reset_buf(self):  # read_end == -1: not ready for reading
+        self.read_end = -1
+
+    def _writer_reset_buf(self):  # drop pending data, not ready for writing
+        self.write_pos = 0
+        self.write_end = -1
+
+W_BufferedIOBase.typedef = TypeDef(
+    '_BufferedIOBase', W_IOBase.typedef,
+    __new__ = generic_new_descr(W_BufferedIOBase),
+    read = interp2app(W_BufferedIOBase.read_w),
+    write = interp2app(W_BufferedIOBase.write_w),
+    )
+
+class BufferedMixin:
+    _mixin_ = True
+
     def _init(self, space):
         if self.buffer_size <= 0:
             raise OperationError(space.w_ValueError, space.wrap(
@@ -61,21 +100,12 @@
         return 0
 
     def _raw_offset(self):
-        if self.raw_pos and (
+        if self.raw_pos >= 0 and (  # raw_pos == 0 is a valid position
             (self.readable and self.read_end != -1) or
             (self.writable and self.write_end != -1)):
             return self.raw_pos - self.pos
         return 0
 
-    def _unsupportedoperation(self, space, message):
-        w_exc = space.getattr(space.getbuiltinmodule('_io'),
-                              space.wrap('UnsupportedOperation'))
-        raise OperationError(w_exc, space.wrap(message))
-
-    @unwrap_spec('self', ObjSpace, W_Root)
-    def read_w(self, space, w_size=None):
-        self._unsupportedoperation(space, "read")
-
     @unwrap_spec('self', ObjSpace, r_longlong, int)
     def seek_w(self, space, pos, whence=0):
         if whence not in (0, 1, 2):
@@ -100,7 +130,7 @@
         # Fallback: invoke raw seek() method and clear buffer
         with self.lock:
             if self.writable:
-                self._writer_flush_unlocked(restore_pos=False)
+                self._writer_flush_unlocked(space)
                 self._writer_reset_buf()
 
             if whence == 1:
@@ -120,14 +150,80 @@
                 "Raw stream returned invalid position"))
         return pos
 
-W_BufferedIOBase.typedef = TypeDef(
-    '_BufferedIOBase', W_IOBase.typedef,
-    __new__ = generic_new_descr(W_BufferedIOBase),
-    read = interp2app(W_BufferedIOBase.read_w),
-    seek = interp2app(W_BufferedIOBase.seek_w),
-    )
+    def _closed(self, space):
+        return self.raw._closed(space)  # delegate to the raw stream
+
+    @unwrap_spec('self', ObjSpace)
+    def close_w(self, space):
+        with self.lock:
+            if self._closed(space):
+                return  # already closed: nothing to flush or close
+        space.call_method(self, "flush")  # lock released: flush may take it
+        with self.lock:
+            space.call_method(self.raw, "close")
+
+    @unwrap_spec('self', ObjSpace)
+    def flush_w(self, space):
+        return space.call_method(self.raw, "flush")  # default: forward to raw
+
+    def _writer_flush_unlocked(self, space, restore_pos=False):
+        if self.write_end == -1 or self.write_pos == self.write_end:
+            return  # no pending write data
+        # First, rewind the raw stream to the start of the pending bytes
+        rewind = self._raw_offset() + (self.pos - self.write_pos)
+        if rewind != 0:
+            self._raw_seek(space, -rewind, 1)
+            self.raw_pos -= rewind
 
-class W_BufferedReader(W_BufferedIOBase):
+        written = 0  # bytes written by this call
+        while self.write_pos < self.write_end:
+            try:
+                n = self._raw_write(space, self.write_pos, self.write_end)
+            except OperationError, e:
+                if not e.match(space, space.gettypeobject(
+                    W_BlockingIOError.typedef)):
+                    raise
+                self.write_pos += e.written
+                self.raw_pos = self.write_pos
+                written += e.written
+                # re-raise with the total count (NOTE(review): assumes e.written)
+                e.written = written
+                raise
+            self.write_pos += n
+            self.raw_pos = self.write_pos
+            written += n
+            # Partial writes can return successfully when interrupted by a
+            # signal (see write(2)).  We must run signal handlers before
+            # blocking another time, possibly indefinitely.
+            # XXX PyErr_CheckSignals()
+
+        if restore_pos:  # seek raw back to where it was before the flush
+            forward = rewind - written
+            if forward:
+                self._raw_seek(space, forward, 1)
+                self.raw_pos += forward
+
+        self._writer_reset_buf()
+
+    def _write(self, space, data):  # returns the count accepted by raw.write()
+        w_data = space.wrap(data)
+        w_written = space.call_method(self.raw, "write", w_data)
+        written = space.getindex_w(w_written, space.w_IOError)
+        if not 0 <= written <= len(data):
+            raise OperationError(space.w_IOError, space.wrap(
+                "raw write() returned invalid length"))
+        if self.abs_pos != -1:  # presumably -1 means "position unknown"
+            self.abs_pos += written
+        return written
+
+    def _raw_write(self, space, start, end):
+        # Hand the pending bytes buffer[start:end] to the raw stream.
+        # XXX inefficient: the bytes are copied out one at a time
+        pending = [self.buffer[i] for i in range(start, end)]
+        data = ''.join(pending)
+        return self._write(space, data)
+
+class W_BufferedReader(W_BufferedIOBase, BufferedMixin):
     def __init__(self, space):
         W_BufferedIOBase.__init__(self, space)
         self.ok = False
@@ -145,12 +241,6 @@
         self._init(space)
         self._reader_reset_buf()
 
-    def _reader_reset_buf(self):
-        self.read_end = -1
-
-    def _closed(self, space):
-        return self.raw._closed(space)
-
     @unwrap_spec('self', ObjSpace, W_Root)
     def read_w(self, space, w_size=None):
         self._check_closed(space, "read of closed file")
@@ -180,7 +270,7 @@
         self._reader_reset_buf()
         # We're going past the buffer's bounds, flush it
         if self.writable:
-            self._writer_flush_unlocked(restore_pos=True)
+            self._writer_flush_unlocked(space, restore_pos=True)
 
         while True:
             # Read until EOF or until read() would block
@@ -239,7 +329,7 @@
         # XXX potential bug in CPython? The following is not enabled.
         # We're going past the buffer's bounds, flush it
         ## if self.writable:
-        ##     self._writer_flush_unlocked(restore_pos=True)
+        ##     self._writer_flush_unlocked(space, restore_pos=True)
 
         # Read whole blocks, and don't buffer them
         while remaining > 0:
@@ -302,13 +392,182 @@
     __init__  = interp2app(W_BufferedReader.descr_init),
 
     read = interp2app(W_BufferedReader.read_w),
+
+    # from the mixin class
+    seek = interp2app(W_BufferedReader.seek_w),
+    close = interp2app(W_BufferedReader.close_w),
+    flush = interp2app(W_BufferedReader.flush_w),
     )
 
-class W_BufferedWriter(W_BufferedIOBase):
-    pass
+class W_BufferedWriter(W_BufferedIOBase, BufferedMixin):
+    """Interp-level implementation of _io.BufferedWriter.
+
+    Data passed to write() is kept in self.buffer and sent to the raw
+    stream when the buffer cannot hold it, or when flush() is called.
+    """
+
+    @unwrap_spec('self', ObjSpace, W_Root, int)
+    def descr_init(self, space, w_raw, buffer_size=DEFAULT_BUFFER_SIZE):
+        # 'raw' must be a writable W_IOBase instance.
+        raw = space.interp_w(W_IOBase, w_raw)
+        raw.check_writable_w(space)
+
+        self.raw = raw
+        self.buffer_size = buffer_size
+        self.writable = True
+
+        self._init(space)
+        self._writer_reset_buf()
+
+    def _adjust_position(self, new_pos):
+        # Move the logical position; an active read buffer is extended
+        # so that it still covers the new position.
+        self.pos = new_pos
+        if self.readable and self.read_end != -1 and self.read_end < new_pos:
+            self.read_end = self.pos
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def write_w(self, space, w_data):
+        """Buffer the string w_data, writing through to the raw stream
+        when it does not fit.  Returns the number of bytes accepted.
+
+        If the raw stream raises BlockingIOError, as much data as
+        possible is kept in the buffer and the error is re-raised.
+        """
+        self._check_closed(space, "write to closed file")
+        data = space.str_w(w_data)
+        size = len(data)
+
+        with self.lock:
+            # Neither a valid read buffer nor a valid write buffer:
+            # start again at the beginning of the buffer.
+            if (not (self.readable and self.read_end != -1) and
+                not (self.writable and self.write_end != -1)):
+                self.pos = 0
+                self.raw_pos = 0
+            available = self.buffer_size - self.pos
+            # Fast path: the data to write can be fully buffered
+            if size <= available:
+                for i in range(size):
+                    self.buffer[self.pos + i] = data[i]
+                if self.write_end == -1:
+                    self.write_pos = self.pos
+                self._adjust_position(self.pos + size)
+                if self.pos > self.write_end:
+                    self.write_end = self.pos
+                return space.wrap(size)
+
+            # First write the current buffer
+            try:
+                self._writer_flush_unlocked(space)
+            except OperationError, e:
+                if not e.match(space, space.gettypeobject(
+                    W_BlockingIOError.typedef)):
+                    raise
+                if self.readable:
+                    self._reader_reset_buf()
+                # Make some place by shifting the buffer
+                for i in range(self.write_pos, self.write_end):
+                    self.buffer[i - self.write_pos] = self.buffer[i]
+                self.write_end -= self.write_pos
+                self.raw_pos -= self.write_pos
+                self.pos -= self.write_pos
+                self.write_pos = 0
+                available = self.buffer_size - self.write_end
+                if size <= available:
+                    # Everything can be buffered
+                    for i in range(size):
+                        self.buffer[self.write_end + i] = data[i]
+                    self.write_end += size
+                    self.pos += size
+                    return space.wrap(size)
+                # Buffer as much as possible
+                for i in range(available):
+                    self.buffer[self.write_end + i] = data[i]
+                self.write_end += available
+                self.pos += available
+                # Raise previous exception.  NOTE(review): relies on the
+                # OperationError carrying a 'written' count; confirm.
+                e.written = available
+                raise
+
+            # Adjust the raw stream position if it is away from the logical
+            # stream position. This happens if the read buffer has been filled
+            # but not modified (and therefore _writer_flush_unlocked()
+            # didn't rewind the raw stream by itself).
+            offset = self._raw_offset()
+            if offset:
+                self._raw_seek(space, -offset, 1)
+                self.raw_pos -= offset
+
+            # Then write the data itself. At this point the buffer has been
+            # emptied.
+            remaining = size
+            written = 0
+            while remaining > self.buffer_size:
+                try:
+                    # pass everything not yet accepted by the raw stream
+                    n = self._write(space, data[written:])
+                except OperationError, e:
+                    if not e.match(space, space.gettypeobject(
+                        W_BlockingIOError.typedef)):
+                        raise
+                    written += e.written
+                    remaining -= e.written
+                    if remaining > self.buffer_size:
+                        # Can't buffer everything, still buffer as much as
+                        # possible
+                        for i in range(self.buffer_size):
+                            self.buffer[i] = data[written + i]
+                        self.raw_pos = 0
+                        self._adjust_position(self.buffer_size)
+                        self.write_end = self.buffer_size
+                        e.written = written + self.buffer_size
+                        raise
+                    break
+                written += n
+                remaining -= n
+                # Partial writes can return successfully when interrupted by a
+                # signal (see write(2)).  We must run signal handlers before
+                # blocking another time, possibly indefinitely.
+                # XXX PyErr_CheckSignals()
+
+            # Buffer the unwritten tail (at most buffer_size bytes).
+            if self.readable:
+                self._reader_reset_buf()
+            if remaining > 0:
+                for i in range(remaining):
+                    self.buffer[i] = data[written + i]
+                written += remaining
+            self.write_pos = 0
+            self.write_end = remaining
+            self._adjust_position(remaining)
+            self.raw_pos = 0
+        return space.wrap(written)
+
+    @unwrap_spec('self', ObjSpace)
+    def flush_w(self, space):
+        """Write all buffered bytes to the raw stream."""
+        self._check_closed(space, "flush of closed file")
+        with self.lock:
+            self._writer_flush_unlocked(space)
+            if self.readable:
+                # Rewind the raw stream so that its position corresponds to
+                # the current logical position.
+                self._raw_seek(space, -self._raw_offset(), 1)
+                self._reader_reset_buf()
+
 W_BufferedWriter.typedef = TypeDef(
     'BufferedWriter', W_BufferedIOBase.typedef,
     __new__ = generic_new_descr(W_BufferedWriter),
+    __init__  = interp2app(W_BufferedWriter.descr_init),
+
+    write = interp2app(W_BufferedWriter.write_w),
+    flush = interp2app(W_BufferedWriter.flush_w),  # overrides mixin flush_w
+
+    # from the BufferedMixin mixin class
+    seek = interp2app(W_BufferedWriter.seek_w),
+    close = interp2app(W_BufferedWriter.close_w),
     )
 
 class W_BufferedRWPair(W_BufferedIOBase):

Modified: pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py	Tue Oct 26 16:49:43 2010
@@ -1,7 +1,8 @@
 from pypy.conftest import gettestobjspace
+from pypy.interpreter.gateway import interp2app
 from pypy.tool.udir import udir
 
-class AppTestBufferedIO:
+class AppTestBufferedReader:
     def setup_class(cls):
         cls.space = gettestobjspace(usemodules=['_io'])
         tmpfile = udir.join('tmpfile')
@@ -15,7 +16,7 @@
         assert f.read() == "a\nb\nc"
         f.close()
         #
-        raw.seek(0)
+        raw = _io.FileIO(self.tmpfile)
         f = _io.BufferedReader(raw)
         r = f.read(4)
         assert r == "a\nb\n"
@@ -31,3 +32,28 @@
         f.seek(-2, 2)
         assert f.read() == "\nc"
         f.close()
+
+class AppTestBufferedWriter:
+    def setup_class(cls):
+        cls.space = gettestobjspace(usemodules=['_io'])
+        tmpfile = udir.join('tmpfile')
+        cls.w_tmpfile = cls.space.wrap(str(tmpfile))
+        def readfile(space):  # exposed to app-level tests as self.readfile()
+            return space.wrap(tmpfile.read())
+        cls.w_readfile = cls.space.wrap(interp2app(readfile))
+
+    def test_write(self):
+        import _io
+        raw = _io.FileIO(self.tmpfile, 'w')
+        f = _io.BufferedWriter(raw)
+        f.write("abcd")
+        f.close()  # close() flushes the buffered bytes
+        assert self.readfile() == "abcd"
+
+    def test_largewrite(self):
+        import _io
+        raw = _io.FileIO(self.tmpfile, 'w')
+        f = _io.BufferedWriter(raw)
+        f.write("abcd" * 5000)  # presumably larger than DEFAULT_BUFFER_SIZE
+        f.close()
+        assert self.readfile() == "abcd" * 5000



More information about the Pypy-commit mailing list