[pypy-svn] r79167 - in pypy/branch/fast-forward/pypy/module/_io: . test

afa at codespeak.net afa at codespeak.net
Wed Nov 17 00:12:17 CET 2010


Author: afa
Date: Wed Nov 17 00:12:14 2010
New Revision: 79167

Modified:
   pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py
   pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py
Log:
Implement BufferedRandom


Modified: pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py	Wed Nov 17 00:12:14 2010
@@ -9,7 +9,8 @@
 from pypy.rlib.rarithmetic import r_longlong
 from pypy.tool.sourcetools import func_renamer
 from pypy.module._io.interp_iobase import (
-    W_IOBase, convert_size, check_readable_w, check_writable_w)
+    W_IOBase, convert_size,
+    check_readable_w, check_writable_w, check_seekable_w)
 from pypy.module._io.interp_io import DEFAULT_BUFFER_SIZE, W_BlockingIOError
 from pypy.module.thread.os_lock import Lock
 
@@ -171,6 +172,10 @@
         return space.call_method(self.w_raw, "seekable")
 
     @unwrap_spec('self', ObjSpace)
+    def isatty_w(self, space):
+        return space.call_method(self.w_raw, "isatty")
+
+    @unwrap_spec('self', ObjSpace)
     def repr_w(self, space):
         typename = space.type(self).getname(space, '?')
         try:
@@ -358,19 +363,8 @@
 
             return space.call_method(self.w_raw, "truncate", w_size)
 
-class W_BufferedReader(BufferedMixin, W_BufferedIOBase):
-    @unwrap_spec('self', ObjSpace, W_Root, int)
-    def descr_init(self, space, w_raw, buffer_size=DEFAULT_BUFFER_SIZE):
-        self.state = STATE_ZERO
-        check_readable_w(space, w_raw)
-
-        self.w_raw = w_raw
-        self.buffer_size = buffer_size
-        self.readable = True
-
-        self._init(space)
-        self._reader_reset_buf()
-        self.state = STATE_OK
+    # ________________________________________________________________
+    # Read methods
 
     @unwrap_spec('self', ObjSpace, W_Root)
     def read_w(self, space, w_size=None):
@@ -590,49 +584,8 @@
             return res
         return None
 
-W_BufferedReader.typedef = TypeDef(
-    '_io.BufferedReader', W_BufferedIOBase.typedef,
-    __new__ = generic_new_descr(W_BufferedReader),
-    __init__  = interp2app(W_BufferedReader.descr_init),
-
-    read = interp2app(W_BufferedReader.read_w),
-    peek = interp2app(W_BufferedReader.peek_w),
-    read1 = interp2app(W_BufferedReader.read1_w),
-
-    # from the mixin class
-    __repr__ = interp2app(W_BufferedReader.repr_w),
-    readable = interp2app(W_BufferedReader.readable_w),
-    writable = interp2app(W_BufferedReader.writable_w),
-    seekable = interp2app(W_BufferedReader.seekable_w),
-    seek = interp2app(W_BufferedReader.seek_w),
-    tell = interp2app(W_BufferedReader.tell_w),
-    close = interp2app(W_BufferedReader.close_w),
-    flush = interp2app(W_BufferedReader.flush_w),
-    detach = interp2app(W_BufferedReader.detach_w),
-    truncate = interp2app(W_BufferedReader.truncate_w),
-    fileno = interp2app(W_BufferedReader.fileno_w),
-    closed = GetSetProperty(W_BufferedReader.closed_get_w),
-    name = GetSetProperty(W_BufferedReader.name_get_w),
-    mode = GetSetProperty(W_BufferedReader.mode_get_w),
-    )
-
-class W_BufferedWriter(BufferedMixin, W_BufferedIOBase):
-    @unwrap_spec('self', ObjSpace, W_Root, int, int)
-    def descr_init(self, space, w_raw, buffer_size=DEFAULT_BUFFER_SIZE,
-                   max_buffer_size=-234):
-        if max_buffer_size != -234:
-            self._deprecated_max_buffer_size(space)
-
-        self.state = STATE_ZERO
-        check_writable_w(space, w_raw)
-
-        self.w_raw = w_raw
-        self.buffer_size = buffer_size
-        self.writable = True
-
-        self._init(space)
-        self._writer_reset_buf()
-        self.state = STATE_OK
+    # ____________________________________________________
+    # Write methods
 
     def _adjust_position(self, new_pos):
         self.pos = new_pos
@@ -762,6 +715,66 @@
                 self._raw_seek(space, -self._raw_offset(), 1)
                 self._reader_reset_buf()
 
+
+class W_BufferedReader(BufferedMixin, W_BufferedIOBase):
+    @unwrap_spec('self', ObjSpace, W_Root, int)
+    def descr_init(self, space, w_raw, buffer_size=DEFAULT_BUFFER_SIZE):
+        self.state = STATE_ZERO
+        check_readable_w(space, w_raw)
+
+        self.w_raw = w_raw
+        self.buffer_size = buffer_size
+        self.readable = True
+
+        self._init(space)
+        self._reader_reset_buf()
+        self.state = STATE_OK
+
+W_BufferedReader.typedef = TypeDef(
+    '_io.BufferedReader', W_BufferedIOBase.typedef,
+    __new__ = generic_new_descr(W_BufferedReader),
+    __init__  = interp2app(W_BufferedReader.descr_init),
+
+    read = interp2app(W_BufferedReader.read_w),
+    peek = interp2app(W_BufferedReader.peek_w),
+    read1 = interp2app(W_BufferedReader.read1_w),
+
+    # from the mixin class
+    __repr__ = interp2app(W_BufferedReader.repr_w),
+    readable = interp2app(W_BufferedReader.readable_w),
+    writable = interp2app(W_BufferedReader.writable_w),
+    seekable = interp2app(W_BufferedReader.seekable_w),
+    seek = interp2app(W_BufferedReader.seek_w),
+    tell = interp2app(W_BufferedReader.tell_w),
+    close = interp2app(W_BufferedReader.close_w),
+    flush = interp2app(W_BufferedReader.flush_w),
+    detach = interp2app(W_BufferedReader.detach_w),
+    truncate = interp2app(W_BufferedReader.truncate_w),
+    fileno = interp2app(W_BufferedReader.fileno_w),
+    isatty = interp2app(W_BufferedReader.isatty_w),
+    closed = GetSetProperty(W_BufferedReader.closed_get_w),
+    name = GetSetProperty(W_BufferedReader.name_get_w),
+    mode = GetSetProperty(W_BufferedReader.mode_get_w),
+    )
+
+class W_BufferedWriter(BufferedMixin, W_BufferedIOBase):
+    @unwrap_spec('self', ObjSpace, W_Root, int, int)
+    def descr_init(self, space, w_raw, buffer_size=DEFAULT_BUFFER_SIZE,
+                   max_buffer_size=-234):
+        if max_buffer_size != -234:
+            self._deprecated_max_buffer_size(space)
+
+        self.state = STATE_ZERO
+        check_writable_w(space, w_raw)
+
+        self.w_raw = w_raw
+        self.buffer_size = buffer_size
+        self.writable = True
+
+        self._init(space)
+        self._writer_reset_buf()
+        self.state = STATE_OK
+
 W_BufferedWriter.typedef = TypeDef(
     '_io.BufferedWriter', W_BufferedIOBase.typedef,
     __new__ = generic_new_descr(W_BufferedWriter),
@@ -779,11 +792,12 @@
     tell = interp2app(W_BufferedWriter.tell_w),
     close = interp2app(W_BufferedWriter.close_w),
     fileno = interp2app(W_BufferedWriter.fileno_w),
+    isatty = interp2app(W_BufferedWriter.fileno_w),
     detach = interp2app(W_BufferedWriter.detach_w),
     truncate = interp2app(W_BufferedWriter.truncate_w),
     closed = GetSetProperty(W_BufferedWriter.closed_get_w),
-    name = GetSetProperty(W_BufferedReader.name_get_w),
-    mode = GetSetProperty(W_BufferedReader.mode_get_w),
+    name = GetSetProperty(W_BufferedWriter.name_get_w),
+    mode = GetSetProperty(W_BufferedWriter.mode_get_w),
     )
 
 def _forward_call(space, w_obj, method, __args__):
@@ -864,10 +878,55 @@
     **methods
     )
 
-class W_BufferedRandom(W_BufferedIOBase):
-    pass
+class W_BufferedRandom(BufferedMixin, W_BufferedIOBase):
+    @unwrap_spec('self', ObjSpace, W_Root, int, int)
+    def descr_init(self, space, w_raw, buffer_size=DEFAULT_BUFFER_SIZE,
+                   max_buffer_size = -234):
+        if max_buffer_size != -234:
+            self._deprecated_buffer_size(space)
+
+        self.state = STATE_ZERO
+
+        check_readable_w(space, w_raw)
+        check_writable_w(space, w_raw)
+        check_seekable_w(space, w_raw)
+
+        self.w_raw = w_raw
+        self.buffer_size = buffer_size
+        self.readable = self.writable = True
+
+        self._init(space)
+        self._reader_reset_buf()
+        self._writer_reset_buf()
+        self.pos = 0
+        self.state = STATE_OK
+
 W_BufferedRandom.typedef = TypeDef(
     '_io.BufferedRandom', W_BufferedIOBase.typedef,
     __new__ = generic_new_descr(W_BufferedRandom),
+    __init__ = interp2app(W_BufferedRandom.descr_init),
+
+    read = interp2app(W_BufferedRandom.read_w),
+    peek = interp2app(W_BufferedRandom.peek_w),
+    read1 = interp2app(W_BufferedRandom.read1_w),
+
+    write = interp2app(W_BufferedRandom.write_w),
+    flush = interp2app(W_BufferedRandom.flush_w),
+
+    # from the mixin class
+    __repr__ = interp2app(W_BufferedRandom.repr_w),
+    readable = interp2app(W_BufferedRandom.readable_w),
+    writable = interp2app(W_BufferedRandom.writable_w),
+    seekable = interp2app(W_BufferedRandom.seekable_w),
+    seek = interp2app(W_BufferedRandom.seek_w),
+    tell = interp2app(W_BufferedRandom.tell_w),
+    close = interp2app(W_BufferedRandom.close_w),
+    detach = interp2app(W_BufferedRandom.detach_w),
+    truncate = interp2app(W_BufferedRandom.truncate_w),
+    fileno = interp2app(W_BufferedRandom.fileno_w),
+    isatty = interp2app(W_BufferedRandom.isatty_w),
+    closed = GetSetProperty(W_BufferedRandom.closed_get_w),
+    name = GetSetProperty(W_BufferedRandom.name_get_w),
+    mode = GetSetProperty(W_BufferedRandom.mode_get_w),
     )
 

Modified: pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_io/test/test_bufferedio.py	Wed Nov 17 00:12:14 2010
@@ -299,3 +299,19 @@
                 return False
 
         raises(IOError, _io.BufferedRWPair, _io.BytesIO(), NotWritable())
+
+class AppTestBufferedRandom:
+    def setup_class(cls):
+        cls.space = gettestobjspace(usemodules=['_io'])
+        tmpfile = udir.join('tmpfile')
+        tmpfile.write("a\nb\nc", mode='wb')
+        cls.w_tmpfile = cls.space.wrap(str(tmpfile))
+
+    def test_simple_read(self):
+        import _io
+        raw = _io.FileIO(self.tmpfile, 'rb+')
+        f = _io.BufferedRandom(raw)
+        assert f.read(3) == 'a\nb'
+        f.write('xxxx')
+        f.seek(0)
+        assert f.read() == 'a\nbxxxx'



More information about the Pypy-commit mailing list