[pypy-svn] r53758 - in pypy/dist/pypy: module/cStringIO module/cStringIO/test rlib rlib/test

arigo at codespeak.net arigo at codespeak.net
Mon Apr 14 16:07:14 CEST 2008


Author: arigo
Date: Mon Apr 14 16:07:14 2008
New Revision: 53758

Modified:
   pypy/dist/pypy/module/cStringIO/interp_stringio.py
   pypy/dist/pypy/module/cStringIO/test/test_interp_stringio.py
   pypy/dist/pypy/rlib/rStringIO.py
   pypy/dist/pypy/rlib/test/test_rStringIO.py
Log:
Roughly finished the methods of StringO.


Modified: pypy/dist/pypy/module/cStringIO/interp_stringio.py
==============================================================================
--- pypy/dist/pypy/module/cStringIO/interp_stringio.py	(original)
+++ pypy/dist/pypy/module/cStringIO/interp_stringio.py	Mon Apr 14 16:07:14 2008
@@ -1,7 +1,8 @@
+import sys
 from pypy.interpreter.error import OperationError
 from pypy.interpreter.baseobjspace import Wrappable
 from pypy.interpreter.typedef import TypeDef, GetSetProperty
-from pypy.interpreter.gateway import interp2app
+from pypy.interpreter.gateway import interp2app, W_Root
 from pypy.rlib.rStringIO import RStringIO
 
 
@@ -15,6 +16,11 @@
         self.space = space
         self.softspace = 0    # part of the file object API
 
+    def descr___iter__(self):
+        self.check_closed()
+        return self
+    descr___iter__.unwrap_spec = ['self']
+
     def descr_close(self):
         self.close()
     descr_close.unwrap_spec = ['self']
@@ -25,16 +31,69 @@
             raise OperationError(space.w_ValueError,
                                  space.wrap("I/O operation on closed file"))
 
+    def descr_flush(self):
+        self.check_closed()
+    descr_flush.unwrap_spec = ['self']
+
     def descr_getvalue(self):
         self.check_closed()
         return self.space.wrap(self.getvalue())
     descr_getvalue.unwrap_spec = ['self']
 
+    def descr_isatty(self):
+        self.check_closed()
+        return self.space.w_False
+    descr_isatty.unwrap_spec = ['self']
+
+    def descr_next(self):
+        space = self.space
+        self.check_closed()
+        line = self.readline()
+        if len(line) == 0:
+            raise OperationError(space.w_StopIteration, space.w_None)
+        return space.wrap(line)
+
     def descr_read(self, n=-1):
         self.check_closed()
         return self.space.wrap(self.read(n))
     descr_read.unwrap_spec = ['self', int]
 
+    def readline(self, size=-1):
+        p = self.tell()
+        bigbuffer = self.copy_into_bigbuffer()
+        end = len(bigbuffer)
+        if size >= 0:
+            end = min(end, p + size)
+        assert p >= 0
+        i = p
+        while i < end:
+            finished = bigbuffer[i] == '\n'
+            i += 1
+            if finished:
+                break
+        self.seek(i)
+        return ''.join(bigbuffer[p:i])
+
+    def descr_readline(self, size=-1):
+        self.check_closed()
+        return self.space.wrap(self.readline(size))
+    descr_readline.unwrap_spec = ['self', int]
+
+    def descr_readlines(self, size=0):
+        self.check_closed()
+        lines_w = []
+        while True:
+            line = self.readline()
+            if len(line) == 0:
+                break
+            lines_w.append(self.space.wrap(line))
+            if size > 0:
+                size -= len(line)
+                if size <= 0:
+                    break
+        return self.space.newlist(lines_w)
+    descr_readlines.unwrap_spec = ['self', int]
+
     def descr_reset(self):
         self.check_closed()
         self.seek(0)
@@ -50,11 +109,36 @@
         return self.space.wrap(self.tell())
     descr_tell.unwrap_spec = ['self']
 
+    def descr_truncate(self, w_size=None):  # note: a wrapped size!
+        self.check_closed()
+        space = self.space
+        if w_size is None or space.is_w(w_size, space.w_None):
+            size = self.tell()
+        else:
+            size = space.int_w(w_size)
+        if size < 0:
+            raise OperationError(space.w_IOError, space.wrap("negative size"))
+        self.truncate(size)
+    descr_truncate.unwrap_spec = ['self', W_Root]
+
     def descr_write(self, buffer):
         self.check_closed()
         self.write(buffer)
     descr_write.unwrap_spec = ['self', 'bufferstr']
 
+    def descr_writelines(self, w_lines):
+        space = self.space
+        w_iterator = space.iter(w_lines)
+        while True:
+            try:
+                w_line = space.next(w_iterator)
+            except OperationError, e:
+                if not e.match(space, space.w_StopIteration):
+                    raise
+                break  # done
+            self.write(space.str_w(w_line))
+    descr_writelines.unwrap_spec = ['self', W_Root]
+
 # ____________________________________________________________
 
 def descr_closed(space, self):
@@ -68,17 +152,26 @@
 
 W_OutputType.typedef = TypeDef(
     "cStringIO.StringO",
+    __doc__      = "Simple type for output to strings.",
+    __iter__     = interp2app(W_OutputType.descr___iter__),
     close        = interp2app(W_OutputType.descr_close),
     closed       = GetSetProperty(descr_closed, cls=W_OutputType),
+    flush        = interp2app(W_OutputType.descr_flush),
     getvalue     = interp2app(W_OutputType.descr_getvalue),
+    isatty       = interp2app(W_OutputType.descr_isatty),
+    next         = interp2app(W_OutputType.descr_next),
     read         = interp2app(W_OutputType.descr_read),
+    readline     = interp2app(W_OutputType.descr_readline),
+    readlines    = interp2app(W_OutputType.descr_readlines),
     reset        = interp2app(W_OutputType.descr_reset),
     seek         = interp2app(W_OutputType.descr_seek),
     softspace    = GetSetProperty(descr_softspace,
                                   descr_setsoftspace,
                                   cls=W_OutputType),
     tell         = interp2app(W_OutputType.descr_tell),
+    truncate     = interp2app(W_OutputType.descr_truncate),
     write        = interp2app(W_OutputType.descr_write),
+    writelines   = interp2app(W_OutputType.descr_writelines),
     )
 
 # ____________________________________________________________

Modified: pypy/dist/pypy/module/cStringIO/test/test_interp_stringio.py
==============================================================================
--- pypy/dist/pypy/module/cStringIO/test/test_interp_stringio.py	(original)
+++ pypy/dist/pypy/module/cStringIO/test/test_interp_stringio.py	Mon Apr 14 16:07:14 2008
@@ -92,16 +92,14 @@
         assert f.tell() == 15
 
     def test_reset(self):
-        from cStringIO import StringIO
-        f = StringIO()
+        f = self.StringIO()
         f.write('foobar')
         f.reset()
         res = f.read()
         assert res == 'foobar'
 
     def test_close(self):
-        from cStringIO import StringIO
-        f = StringIO()
+        f = self.StringIO()
         assert not f.closed
         f.close()
         raises(ValueError, f.write, 'hello')
@@ -111,3 +109,52 @@
         assert f.closed
         f.close()
         assert f.closed
+
+    def test_readline(self):
+        f = self.StringIO()
+        f.write('foo\nbar\nbaz')
+        f.seek(0)
+        assert f.readline() == 'foo\n'
+        assert f.readline(2) == 'ba'
+        assert f.readline() == 'r\n'
+        assert f.readline() == 'baz'
+        assert f.readline() == ''
+        f.seek(0)
+        assert iter(f) is f
+        assert list(f) == ['foo\n', 'bar\n', 'baz']
+        f.write('\n')
+        f.seek(0)
+        assert iter(f) is f
+        assert list(f) == ['foo\n', 'bar\n', 'baz\n']
+        f.seek(0)
+        assert f.readlines() == ['foo\n', 'bar\n', 'baz\n']
+        f.seek(0)
+        assert f.readlines(2) == ['foo\n']
+
+    def test_misc(self):
+        f = self.StringIO()
+        f.flush()
+        assert f.isatty() is False
+
+    def test_truncate(self):
+        f = self.StringIO()
+        f.truncate(20)
+        assert f.getvalue() == '\x00' * 20
+        assert f.tell() == 0
+        f.seek(0, 2)
+        f.write('hello')
+        f.write(' world')
+        f.truncate(30)
+        assert f.getvalue() == '\x00' * 20 + 'hello worl'
+        f.truncate(25)
+        assert f.getvalue() == '\x00' * 20 + 'hello'
+        f.write('baz')
+        f.write('egg')
+        f.truncate(3)
+        assert f.getvalue() == '\x00' * 3
+        raises(IOError, f.truncate, -1)
+
+    def test_writelines(self):
+        f = self.StringIO()
+        f.writelines(['foo', 'bar', 'baz'])
+        assert f.getvalue() == 'foobarbaz'

Modified: pypy/dist/pypy/rlib/rStringIO.py
==============================================================================
--- pypy/dist/pypy/rlib/rStringIO.py	(original)
+++ pypy/dist/pypy/rlib/rStringIO.py	Mon Apr 14 16:07:14 2008
@@ -64,6 +64,7 @@
             self.strings[i] = ''
         self.numstrings = 0
         self.numbigstrings = 0
+        return self.bigbuffer
 
     def reduce(self):
         """Reduce the number of (non-empty) strings in self.strings."""
@@ -106,22 +107,22 @@
             else:
                 # slow path: collect all data into self.bigbuffer and
                 # handle the various cases
-                self.copy_into_bigbuffer()
-                fitting = len(self.bigbuffer) - p
+                bigbuffer = self.copy_into_bigbuffer()
+                fitting = len(bigbuffer) - p
                 if fitting > 0:
                     # the write starts before the end of the data
                     fitting = min(len(buffer), fitting)
                     for i in range(fitting):
-                        self.bigbuffer[p+i] = buffer[i]
+                        bigbuffer[p+i] = buffer[i]
                     if len(buffer) > fitting:
                         # the write extends beyond the end of the data
-                        self.bigbuffer += buffer[fitting:]
+                        bigbuffer += buffer[fitting:]
                         endp = AT_END
                     self.pos = endp
                     return
                 else:
                     # the write starts at or beyond the end of the data
-                    self.bigbuffer += '\x00' * (-fitting)
+                    bigbuffer += '\x00' * (-fitting)
                     self.pos = AT_END      # fall-through to the fast path
         # Fast path.
         # See comments in reduce().
@@ -158,8 +159,8 @@
             return self.getvalue()     # reading everything
         if p == AT_END:
             return ''
-        self.copy_into_bigbuffer()
-        mysize = len(self.bigbuffer)
+        bigbuffer = self.copy_into_bigbuffer()
+        mysize = len(bigbuffer)
         count = mysize - p
         if n >= 0:
             count = min(n, count)
@@ -167,7 +168,23 @@
             return ''
         if p == 0 and count == mysize:
             self.pos = AT_END
-            return ''.join(self.bigbuffer)
+            return ''.join(bigbuffer)
         else:
             self.pos = p + count
-            return ''.join(self.bigbuffer[p:p+count])
+            return ''.join(bigbuffer[p:p+count])
+
+    def truncate(self, size):
+        assert size >= 0
+        self.pos = self.tell()     # in case it was AT_END
+        if size > len(self.bigbuffer):
+            self.copy_into_bigbuffer()
+        else:
+            # we can drop all extra strings
+            for i in range(0, self.numstrings):
+                self.strings[i] = ''
+            self.numstrings = 0
+            self.numbigstrings = 0
+        if size <= len(self.bigbuffer):
+            del self.bigbuffer[size:]
+        else:
+            self.bigbuffer += '\x00' * (size - len(self.bigbuffer))

Modified: pypy/dist/pypy/rlib/test/test_rStringIO.py
==============================================================================
--- pypy/dist/pypy/rlib/test/test_rStringIO.py	(original)
+++ pypy/dist/pypy/rlib/test/test_rStringIO.py	Mon Apr 14 16:07:14 2008
@@ -77,6 +77,23 @@
     assert f.read(2) == ''
     assert f.tell() == 15
 
+def test_truncate():
+    f = RStringIO()
+    f.truncate(20)
+    assert f.getvalue() == '\x00' * 20
+    assert f.tell() == 0
+    f.seek(0, 2)
+    f.write('hello')
+    f.write(' world')
+    f.truncate(30)
+    assert f.getvalue() == '\x00' * 20 + 'hello worl'
+    f.truncate(25)
+    assert f.getvalue() == '\x00' * 20 + 'hello'
+    f.write('baz')
+    f.write('egg')
+    f.truncate(3)
+    assert f.getvalue() == '\x00' * 3
+
 def test_stress():
     import cStringIO, random
     f = RStringIO()



More information about the Pypy-commit mailing list