[pypy-svn] r78300 - in pypy/branch/fast-forward/pypy/module/_io: . test
afa at codespeak.net
afa at codespeak.net
Tue Oct 26 16:49:50 CEST 2010
Author: afa
Date: Tue Oct 26 16:49:43 2010
New Revision: 78300
Modified:
pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py
pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py
Log:
Implement BufferedWriter: write(), flush().
Modified: pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py Tue Oct 26 16:49:43 2010
@@ -8,7 +8,7 @@
from pypy.rlib.rstring import StringBuilder
from pypy.rlib.rarithmetic import r_longlong
from pypy.module._io.interp_iobase import W_IOBase, convert_size
-from pypy.module._io.interp_io import DEFAULT_BUFFER_SIZE
+from pypy.module._io.interp_io import DEFAULT_BUFFER_SIZE, W_BlockingIOError
from pypy.module.thread.os_lock import Lock
class BlockingIOError(Exception):
@@ -17,14 +17,53 @@
class W_BufferedIOBase(W_IOBase):
    """Interp-level base class of the buffered stream types.

    It owns the buffer bookkeeping shared by readers and writers.  Its own
    read() and write() only raise _io.UnsupportedOperation; concrete
    subclasses override them.
    """

    def __init__(self, space):
        W_IOBase.__init__(self, space)

        # Buffer storage; a null pointer until it is allocated elsewhere.
        self.buffer = lltype.nullptr(rffi.CCHARP.TO)
        # Current logical position in the buffer.
        self.pos = 0
        # Position of the raw stream in the buffer.
        self.raw_pos = 0

        # Just after the last buffered byte in the buffer, or -1 when the
        # buffer isn't ready for reading.
        self.read_end = -1

        # Just after the last byte actually written.
        self.write_pos = 0
        # Just after the last byte waiting to be written, or -1 when the
        # buffer isn't ready for writing.
        self.write_end = -1

        self.lock = None
        self.readable = False
        self.writable = False

    def _unsupportedoperation(self, space, message):
        """Raise the app-level _io.UnsupportedOperation carrying *message*."""
        w_io_module = space.getbuiltinmodule('_io')
        w_exc_type = space.getattr(w_io_module,
                                   space.wrap('UnsupportedOperation'))
        raise OperationError(w_exc_type, space.wrap(message))

    @unwrap_spec('self', ObjSpace, W_Root)
    def read_w(self, space, w_size=None):
        """Default read(): always raises UnsupportedOperation."""
        self._unsupportedoperation(space, "read")

    @unwrap_spec('self', ObjSpace, W_Root)
    def write_w(self, space, w_size=None):
        """Default write(): always raises UnsupportedOperation."""
        self._unsupportedoperation(space, "write")

    def _reader_reset_buf(self):
        """Mark the buffer as holding no readable data."""
        self.read_end = -1

    def _writer_reset_buf(self):
        """Mark the buffer as holding no pending write data."""
        self.write_pos = 0
        self.write_end = -1
+
# App-level type '_BufferedIOBase', deriving from the W_IOBase type.
W_BufferedIOBase.typedef = TypeDef(
    '_BufferedIOBase',
    W_IOBase.typedef,
    __new__=generic_new_descr(W_BufferedIOBase),
    read=interp2app(W_BufferedIOBase.read_w),
    write=interp2app(W_BufferedIOBase.write_w),
)
+
+class BufferedMixin:
+ _mixin_ = True
+
def _init(self, space):
if self.buffer_size <= 0:
raise OperationError(space.w_ValueError, space.wrap(
@@ -61,21 +100,12 @@
return 0
def _raw_offset(self):
    """Return raw_pos - pos: how far the raw stream is from the logical
    position.

    The result is 0 unless raw_pos is known (>= 0) and there is a valid
    read buffer (readable and read_end != -1) or a valid write buffer
    (writable and write_end != -1).
    """
    if self.raw_pos < 0:
        return 0
    read_buffer_valid = self.readable and self.read_end != -1
    if read_buffer_valid or (self.writable and self.write_end != -1):
        return self.raw_pos - self.pos
    return 0
- def _unsupportedoperation(self, space, message):
- w_exc = space.getattr(space.getbuiltinmodule('_io'),
- space.wrap('UnsupportedOperation'))
- raise OperationError(w_exc, space.wrap(message))
-
- @unwrap_spec('self', ObjSpace, W_Root)
- def read_w(self, space, w_size=None):
- self._unsupportedoperation(space, "read")
-
@unwrap_spec('self', ObjSpace, r_longlong, int)
def seek_w(self, space, pos, whence=0):
if whence not in (0, 1, 2):
@@ -100,7 +130,7 @@
# Fallback: invoke raw seek() method and clear buffer
with self.lock:
if self.writable:
- self._writer_flush_unlocked(restore_pos=False)
+ self._writer_flush_unlocked(space)
self._writer_reset_buf()
if whence == 1:
@@ -120,14 +150,80 @@
"Raw stream returned invalid position"))
return pos
-W_BufferedIOBase.typedef = TypeDef(
- '_BufferedIOBase', W_IOBase.typedef,
- __new__ = generic_new_descr(W_BufferedIOBase),
- read = interp2app(W_BufferedIOBase.read_w),
- seek = interp2app(W_BufferedIOBase.seek_w),
- )
def _closed(self, space):
    """Tell whether the wrapped raw stream reports itself closed."""
    raw_stream = self.raw
    return raw_stream._closed(space)
+
@unwrap_spec('self', ObjSpace)
def close_w(self, space):
    """close(): flush pending data, then close the raw stream.

    Closing an already-closed stream does nothing.
    """
    with self.lock:
        already_closed = self._closed(space)
    if already_closed:
        return
    # The lock is not held while flushing: at least the writer's flush()
    # acquires self.lock itself.
    space.call_method(self, "flush")
    with self.lock:
        space.call_method(self.raw, "close")
+
@unwrap_spec('self', ObjSpace)
def flush_w(self, space):
    """flush(): forward to the raw stream's flush() and return its result."""
    w_raw = self.raw
    return space.call_method(w_raw, "flush")
+
+ def _writer_flush_unlocked(self, space, restore_pos=False):
+ if self.write_end == -1 or self.write_pos == self.write_end:
+ return
+ # First, rewind
+ rewind = self._raw_offset() + (self.pos - self.write_pos)
+ if rewind != 0:
+ self._raw_seek(space, -rewind, 1)
+ self.raw_pos -= rewind
-class W_BufferedReader(W_BufferedIOBase):
+ written = 0
+ while self.write_pos < self.write_end:
+ try:
+ n = self._raw_write(space, self.write_pos, self.write_end)
+ except OperationError, e:
+ if not e.match(space, space.gettypeobject(
+ W_BlockingIOError.typedef)):
+ raise
+ self.write_pos += e.written
+ self.raw_pos = self.write_pos
+ written += e.written
+ # re-raise the error
+ e.written = written
+ raise
+ self.write_pos += n
+ self.raw_pos = self.write_pos
+ written += n
+ # Partial writes can return successfully when interrupted by a
+ # signal (see write(2)). We must run signal handlers before
+ # blocking another time, possibly indefinitely.
+ # XXX PyErr_CheckSignals()
+
+ if restore_pos:
+ forward = rewind - written
+ if forward:
+ self._raw_seek(space, forward, 1)
+ self.raw_pos += forward
+
+ self._writer_reset_buf()
+
def _write(self, space, data):
    """Hand *data* to raw.write() and return the byte count it reports.

    Raises IOError when the reported count is outside [0, len(data)].
    When self.abs_pos is not -1 it is advanced by the count.
    """
    w_count = space.call_method(self.raw, "write", space.wrap(data))
    count = space.getindex_w(w_count, space.w_IOError)
    if count < 0 or count > len(data):
        raise OperationError(space.w_IOError, space.wrap(
            "raw write() returned invalid length"))
    if self.abs_pos != -1:
        self.abs_pos += count
    return count
+
def _raw_write(self, space, start, end):
    """Send buffer[start:end] to the raw stream through self._write()."""
    # XXX inefficient: the bytes are copied out one at a time.
    chunk = ''.join([self.buffer[i] for i in range(start, end)])
    return self._write(space, chunk)
+
+class W_BufferedReader(W_BufferedIOBase, BufferedMixin):
def __init__(self, space):
W_BufferedIOBase.__init__(self, space)
self.ok = False
@@ -145,12 +241,6 @@
self._init(space)
self._reader_reset_buf()
- def _reader_reset_buf(self):
- self.read_end = -1
-
- def _closed(self, space):
- return self.raw._closed(space)
-
@unwrap_spec('self', ObjSpace, W_Root)
def read_w(self, space, w_size=None):
self._check_closed(space, "read of closed file")
@@ -180,7 +270,7 @@
self._reader_reset_buf()
# We're going past the buffer's bounds, flush it
if self.writable:
- self._writer_flush_unlocked(restore_pos=True)
+ self._writer_flush_unlocked(space, restore_pos=True)
while True:
# Read until EOF or until read() would block
@@ -239,7 +329,7 @@
# XXX potential bug in CPython? The following is not enabled.
# We're going past the buffer's bounds, flush it
## if self.writable:
- ## self._writer_flush_unlocked(restore_pos=True)
+ ## self._writer_flush_unlocked(space, restore_pos=True)
# Read whole blocks, and don't buffer them
while remaining > 0:
@@ -302,13 +392,159 @@
__init__ = interp2app(W_BufferedReader.descr_init),
read = interp2app(W_BufferedReader.read_w),
+
+ # from the mixin class
+ seek = interp2app(W_BufferedReader.seek_w),
+ close = interp2app(W_BufferedReader.close_w),
+ flush = interp2app(W_BufferedReader.flush_w),
)
-class W_BufferedWriter(W_BufferedIOBase):
- pass
+class W_BufferedWriter(W_BufferedIOBase, BufferedMixin):
+ @unwrap_spec('self', ObjSpace, W_Root, int)
+ def descr_init(self, space, w_raw, buffer_size=DEFAULT_BUFFER_SIZE):
+ raw = space.interp_w(W_IOBase, w_raw)
+ raw.check_writable_w(space)
+
+ self.raw = raw
+ self.buffer_size = buffer_size
+ self.writable = True
+
+ self._init(space)
+ self._writer_reset_buf()
+
+ def _adjust_position(self, new_pos):
+ self.pos = new_pos
+ if self.readable and self.read_end != -1 and self.read_end < new_pos:
+ self.read_end = self.pos
+
+ @unwrap_spec('self', ObjSpace, W_Root)
+ def write_w(self, space, w_data):
+ self._check_closed(space, "write to closed file")
+ data = space.str_w(w_data)
+ size = len(data)
+
+ with self.lock:
+
+ if (not (self.readable and self.read_end == -1) and
+ not (self.writable and self.write_end == -1)):
+ self.pos = 0
+ self.raw_pos = 0
+ available = self.buffer_size - self.pos
+ # Fast path: the data to write can be fully buffered
+ if size <= available:
+ for i in range(size):
+ self.buffer[self.pos + i] = data[i]
+ if self.write_end == -1:
+ self.write_pos = self.pos
+ self._adjust_position(self.pos + size)
+ if self.pos > self.write_end:
+ self.write_end = self.pos
+ return space.wrap(size)
+
+ # First write the current buffer
+ try:
+ self._writer_flush_unlocked(space)
+ except OperationError, e:
+ if not e.match(space, space.gettypeobject(
+ W_BlockingIOError.typedef)):
+ raise
+ if self.readable:
+ self._reader_reset_buf()
+ # Make some place by shifting the buffer
+ for i in range(self.write_pos, self.write_end):
+ self.buffer[i - self.write_pos] = self.buffer[i]
+ self.write_end -= self.write_pos
+ self.raw_pos -= self.write_pos
+ self.pos -= self.write_pos
+ self.write_pos = 0
+ available = self.buffer_size - self.write_end
+ if size <= available:
+ # Everything can be buffered
+ for i in range(size):
+ self.buffer[self.write_end + i] = data[i]
+ self.write_end += size
+ return space.wrap(size)
+ # Buffer as much as possible
+ for i in range(available):
+ self.buffer[self.write_end + i] = data[i]
+ self.write_end += available
+ # Raise previous exception
+ e.written = available
+ raise
+
+ # Adjust the raw stream position if it is away from the logical
+ # stream position. This happens if the read buffer has been filled
+ # but not modified (and therefore _bufferedwriter_flush_unlocked()
+ # didn't rewind the raw stream by itself).
+ offset = self._raw_offset()
+ if offset:
+ self._raw_seek(space, -offset, 1)
+ self.raw_pos -= offset
+
+ # Then write buf itself. At this point the buffer has been emptied
+ remaining = size
+ written = 0
+ while remaining > self.buffer_size:
+ try:
+ n = self._write(space, data[written:size - written])
+ except OperationError, e:
+ if not e.match(space, space.gettypeobject(
+ W_BlockingIOError.typedef)):
+ raise
+ written += e.written
+ remaining -= e.written
+ if remaining > self.buffer_size:
+ # Can't buffer everything, still buffer as much as
+ # possible
+ for i in range(self.buffer_size):
+ self.buffer[i] = data[written + i]
+ self.raw_pos = 0
+ self._adjust_position(self.buffer_size)
+ self.write_end = self.buffer_size
+ e.written = written + self.buffer_size
+ raise
+ break
+ written += n
+ remaining -= n
+ # Partial writes can return successfully when interrupted by a
+ # signal (see write(2)). We must run signal handlers before
+ # blocking another time, possibly indefinitely.
+ # XXX PyErr_CheckSignals()
+
+ if self.readable:
+ self._reader_reset_buf()
+ if remaining > 0:
+ for i in range(remaining):
+ self.buffer[i] = data[written + i]
+ written += remaining
+ self.write_pos = 0
+ self.write_end = remaining
+ self._adjust_position(remaining)
+ self.raw_pos = 0
+ return space.wrap(written)
+
+ @unwrap_spec('self', ObjSpace)
+ def flush_w(self, space):
+ self._check_closed(space, "flush of closed file")
+ with self.lock:
+ self._writer_flush_unlocked(space)
+ if self.readable:
+ # Rewind the raw stream so that its position corresponds to
+ # the current logical position.
+ self._raw_seek(space, -self._raw_offset(), 1)
+ self._reader_reset_buf()
+
# App-level type 'BufferedWriter', deriving from the _BufferedIOBase type.
W_BufferedWriter.typedef = TypeDef(
    'BufferedWriter',
    W_BufferedIOBase.typedef,
    __new__=generic_new_descr(W_BufferedWriter),
    __init__=interp2app(W_BufferedWriter.descr_init),

    write=interp2app(W_BufferedWriter.write_w),
    flush=interp2app(W_BufferedWriter.flush_w),

    # implemented in BufferedMixin
    seek=interp2app(W_BufferedWriter.seek_w),
    close=interp2app(W_BufferedWriter.close_w),
)
class W_BufferedRWPair(W_BufferedIOBase):
Modified: pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py Tue Oct 26 16:49:43 2010
@@ -1,7 +1,8 @@
from pypy.conftest import gettestobjspace
+from pypy.interpreter.gateway import interp2app
from pypy.tool.udir import udir
-class AppTestBufferedIO:
+class AppTestBufferedReader:
def setup_class(cls):
cls.space = gettestobjspace(usemodules=['_io'])
tmpfile = udir.join('tmpfile')
@@ -15,7 +16,7 @@
assert f.read() == "a\nb\nc"
f.close()
#
- raw.seek(0)
+ raw = _io.FileIO(self.tmpfile)
f = _io.BufferedReader(raw)
r = f.read(4)
assert r == "a\nb\n"
@@ -31,3 +32,28 @@
f.seek(-2, 2)
assert f.read() == "\nc"
f.close()
+
class AppTestBufferedWriter:
    """App-level tests for _io.BufferedWriter writing to a FileIO."""

    def setup_class(cls):
        cls.space = gettestobjspace(usemodules=['_io'])
        tmpfile = udir.join('tmpfile')
        cls.w_tmpfile = cls.space.wrap(str(tmpfile))
        # Interp-level helper exposed to app-level: current file contents.
        def readfile(space):
            return space.wrap(tmpfile.read())
        cls.w_readfile = cls.space.wrap(interp2app(readfile))

    def test_write(self):
        import _io
        raw = _io.FileIO(self.tmpfile, 'w')
        f = _io.BufferedWriter(raw)
        f.write("abcd")
        f.close()
        assert self.readfile() == "abcd"

    def test_largewrite(self):
        import _io
        raw = _io.FileIO(self.tmpfile, 'w')
        f = _io.BufferedWriter(raw)
        f.write("abcd" * 5000)
        f.close()
        assert self.readfile() == "abcd" * 5000

    def test_write_twice(self):
        # Consecutive small writes must be appended to the pending data,
        # not overwrite it.
        import _io
        raw = _io.FileIO(self.tmpfile, 'w')
        f = _io.BufferedWriter(raw)
        f.write("abcd")
        f.write("efgh")
        f.close()
        assert self.readfile() == "abcdefgh"

    def test_flush(self):
        # flush() must push buffered data to the file before close().
        import _io
        raw = _io.FileIO(self.tmpfile, 'w')
        f = _io.BufferedWriter(raw)
        f.write("abcd")
        f.flush()
        assert self.readfile() == "abcd"
        f.close()
More information about the Pypy-commit mailing list