[pypy-svn] r53758 - in pypy/dist/pypy: module/cStringIO module/cStringIO/test rlib rlib/test
arigo at codespeak.net
arigo at codespeak.net
Mon Apr 14 16:07:14 CEST 2008
Author: arigo
Date: Mon Apr 14 16:07:14 2008
New Revision: 53758
Modified:
pypy/dist/pypy/module/cStringIO/interp_stringio.py
pypy/dist/pypy/module/cStringIO/test/test_interp_stringio.py
pypy/dist/pypy/rlib/rStringIO.py
pypy/dist/pypy/rlib/test/test_rStringIO.py
Log:
Roughly finished the methods of StringO.
Modified: pypy/dist/pypy/module/cStringIO/interp_stringio.py
==============================================================================
--- pypy/dist/pypy/module/cStringIO/interp_stringio.py (original)
+++ pypy/dist/pypy/module/cStringIO/interp_stringio.py Mon Apr 14 16:07:14 2008
@@ -1,7 +1,8 @@
+import sys
from pypy.interpreter.error import OperationError
from pypy.interpreter.baseobjspace import Wrappable
from pypy.interpreter.typedef import TypeDef, GetSetProperty
-from pypy.interpreter.gateway import interp2app
+from pypy.interpreter.gateway import interp2app, W_Root
from pypy.rlib.rStringIO import RStringIO
@@ -15,6 +16,11 @@
self.space = space
self.softspace = 0 # part of the file object API
+ def descr___iter__(self):
+ self.check_closed()
+ return self
+ descr___iter__.unwrap_spec = ['self']
+
def descr_close(self):
self.close()
descr_close.unwrap_spec = ['self']
@@ -25,16 +31,69 @@
raise OperationError(space.w_ValueError,
space.wrap("I/O operation on closed file"))
+ def descr_flush(self):
+ self.check_closed()
+ descr_flush.unwrap_spec = ['self']
+
def descr_getvalue(self):
self.check_closed()
return self.space.wrap(self.getvalue())
descr_getvalue.unwrap_spec = ['self']
+ def descr_isatty(self):
+ self.check_closed()
+ return self.space.w_False
+ descr_isatty.unwrap_spec = ['self']
+
+ def descr_next(self):
+ space = self.space
+ self.check_closed()
+ line = self.readline()
+ if len(line) == 0:
+ raise OperationError(space.w_StopIteration, space.w_None)
+ return space.wrap(line)
+
def descr_read(self, n=-1):
self.check_closed()
return self.space.wrap(self.read(n))
descr_read.unwrap_spec = ['self', int]
+ def readline(self, size=-1):
+ p = self.tell()
+ bigbuffer = self.copy_into_bigbuffer()
+ end = len(bigbuffer)
+ if size >= 0:
+ end = min(end, p + size)
+ assert p >= 0
+ i = p
+ while i < end:
+ finished = bigbuffer[i] == '\n'
+ i += 1
+ if finished:
+ break
+ self.seek(i)
+ return ''.join(bigbuffer[p:i])
+
+ def descr_readline(self, size=-1):
+ self.check_closed()
+ return self.space.wrap(self.readline(size))
+ descr_readline.unwrap_spec = ['self', int]
+
+ def descr_readlines(self, size=0):
+ self.check_closed()
+ lines_w = []
+ while True:
+ line = self.readline()
+ if len(line) == 0:
+ break
+ lines_w.append(self.space.wrap(line))
+ if size > 0:
+ size -= len(line)
+ if size <= 0:
+ break
+ return self.space.newlist(lines_w)
+ descr_readlines.unwrap_spec = ['self', int]
+
def descr_reset(self):
self.check_closed()
self.seek(0)
@@ -50,11 +109,36 @@
return self.space.wrap(self.tell())
descr_tell.unwrap_spec = ['self']
+ def descr_truncate(self, w_size=None): # note: a wrapped size!
+ self.check_closed()
+ space = self.space
+ if w_size is None or space.is_w(w_size, space.w_None):
+ size = self.tell()
+ else:
+ size = space.int_w(w_size)
+ if size < 0:
+ raise OperationError(space.w_IOError, space.wrap("negative size"))
+ self.truncate(size)
+ descr_truncate.unwrap_spec = ['self', W_Root]
+
def descr_write(self, buffer):
self.check_closed()
self.write(buffer)
descr_write.unwrap_spec = ['self', 'bufferstr']
+ def descr_writelines(self, w_lines):
+ space = self.space
+ w_iterator = space.iter(w_lines)
+ while True:
+ try:
+ w_line = space.next(w_iterator)
+ except OperationError, e:
+ if not e.match(space, space.w_StopIteration):
+ raise
+ break # done
+ self.write(space.str_w(w_line))
+ descr_writelines.unwrap_spec = ['self', W_Root]
+
# ____________________________________________________________
def descr_closed(space, self):
@@ -68,17 +152,26 @@
W_OutputType.typedef = TypeDef(
"cStringIO.StringO",
+ __doc__ = "Simple type for output to strings.",
+ __iter__ = interp2app(W_OutputType.descr___iter__),
close = interp2app(W_OutputType.descr_close),
closed = GetSetProperty(descr_closed, cls=W_OutputType),
+ flush = interp2app(W_OutputType.descr_flush),
getvalue = interp2app(W_OutputType.descr_getvalue),
+ isatty = interp2app(W_OutputType.descr_isatty),
+ next = interp2app(W_OutputType.descr_next),
read = interp2app(W_OutputType.descr_read),
+ readline = interp2app(W_OutputType.descr_readline),
+ readlines = interp2app(W_OutputType.descr_readlines),
reset = interp2app(W_OutputType.descr_reset),
seek = interp2app(W_OutputType.descr_seek),
softspace = GetSetProperty(descr_softspace,
descr_setsoftspace,
cls=W_OutputType),
tell = interp2app(W_OutputType.descr_tell),
+ truncate = interp2app(W_OutputType.descr_truncate),
write = interp2app(W_OutputType.descr_write),
+ writelines = interp2app(W_OutputType.descr_writelines),
)
# ____________________________________________________________
Modified: pypy/dist/pypy/module/cStringIO/test/test_interp_stringio.py
==============================================================================
--- pypy/dist/pypy/module/cStringIO/test/test_interp_stringio.py (original)
+++ pypy/dist/pypy/module/cStringIO/test/test_interp_stringio.py Mon Apr 14 16:07:14 2008
@@ -92,16 +92,14 @@
assert f.tell() == 15
def test_reset(self):
- from cStringIO import StringIO
- f = StringIO()
+ f = self.StringIO()
f.write('foobar')
f.reset()
res = f.read()
assert res == 'foobar'
def test_close(self):
- from cStringIO import StringIO
- f = StringIO()
+ f = self.StringIO()
assert not f.closed
f.close()
raises(ValueError, f.write, 'hello')
@@ -111,3 +109,52 @@
assert f.closed
f.close()
assert f.closed
+
+ def test_readline(self):
+ f = self.StringIO()
+ f.write('foo\nbar\nbaz')
+ f.seek(0)
+ assert f.readline() == 'foo\n'
+ assert f.readline(2) == 'ba'
+ assert f.readline() == 'r\n'
+ assert f.readline() == 'baz'
+ assert f.readline() == ''
+ f.seek(0)
+ assert iter(f) is f
+ assert list(f) == ['foo\n', 'bar\n', 'baz']
+ f.write('\n')
+ f.seek(0)
+ assert iter(f) is f
+ assert list(f) == ['foo\n', 'bar\n', 'baz\n']
+ f.seek(0)
+ assert f.readlines() == ['foo\n', 'bar\n', 'baz\n']
+ f.seek(0)
+ assert f.readlines(2) == ['foo\n']
+
+ def test_misc(self):
+ f = self.StringIO()
+ f.flush()
+ assert f.isatty() is False
+
+ def test_truncate(self):
+ f = self.StringIO()
+ f.truncate(20)
+ assert f.getvalue() == '\x00' * 20
+ assert f.tell() == 0
+ f.seek(0, 2)
+ f.write('hello')
+ f.write(' world')
+ f.truncate(30)
+ assert f.getvalue() == '\x00' * 20 + 'hello worl'
+ f.truncate(25)
+ assert f.getvalue() == '\x00' * 20 + 'hello'
+ f.write('baz')
+ f.write('egg')
+ f.truncate(3)
+ assert f.getvalue() == '\x00' * 3
+ raises(IOError, f.truncate, -1)
+
+ def test_writelines(self):
+ f = self.StringIO()
+ f.writelines(['foo', 'bar', 'baz'])
+ assert f.getvalue() == 'foobarbaz'
Modified: pypy/dist/pypy/rlib/rStringIO.py
==============================================================================
--- pypy/dist/pypy/rlib/rStringIO.py (original)
+++ pypy/dist/pypy/rlib/rStringIO.py Mon Apr 14 16:07:14 2008
@@ -64,6 +64,7 @@
self.strings[i] = ''
self.numstrings = 0
self.numbigstrings = 0
+ return self.bigbuffer
def reduce(self):
"""Reduce the number of (non-empty) strings in self.strings."""
@@ -106,22 +107,22 @@
else:
# slow path: collect all data into self.bigbuffer and
# handle the various cases
- self.copy_into_bigbuffer()
- fitting = len(self.bigbuffer) - p
+ bigbuffer = self.copy_into_bigbuffer()
+ fitting = len(bigbuffer) - p
if fitting > 0:
# the write starts before the end of the data
fitting = min(len(buffer), fitting)
for i in range(fitting):
- self.bigbuffer[p+i] = buffer[i]
+ bigbuffer[p+i] = buffer[i]
if len(buffer) > fitting:
# the write extends beyond the end of the data
- self.bigbuffer += buffer[fitting:]
+ bigbuffer += buffer[fitting:]
endp = AT_END
self.pos = endp
return
else:
# the write starts at or beyond the end of the data
- self.bigbuffer += '\x00' * (-fitting)
+ bigbuffer += '\x00' * (-fitting)
self.pos = AT_END # fall-through to the fast path
# Fast path.
# See comments in reduce().
@@ -158,8 +159,8 @@
return self.getvalue() # reading everything
if p == AT_END:
return ''
- self.copy_into_bigbuffer()
- mysize = len(self.bigbuffer)
+ bigbuffer = self.copy_into_bigbuffer()
+ mysize = len(bigbuffer)
count = mysize - p
if n >= 0:
count = min(n, count)
@@ -167,7 +168,23 @@
return ''
if p == 0 and count == mysize:
self.pos = AT_END
- return ''.join(self.bigbuffer)
+ return ''.join(bigbuffer)
else:
self.pos = p + count
- return ''.join(self.bigbuffer[p:p+count])
+ return ''.join(bigbuffer[p:p+count])
+
+ def truncate(self, size):
+ assert size >= 0
+ self.pos = self.tell() # in case it was AT_END
+ if size > len(self.bigbuffer):
+ self.copy_into_bigbuffer()
+ else:
+ # we can drop all extra strings
+ for i in range(0, self.numstrings):
+ self.strings[i] = ''
+ self.numstrings = 0
+ self.numbigstrings = 0
+ if size <= len(self.bigbuffer):
+ del self.bigbuffer[size:]
+ else:
+ self.bigbuffer += '\x00' * (size - len(self.bigbuffer))
Modified: pypy/dist/pypy/rlib/test/test_rStringIO.py
==============================================================================
--- pypy/dist/pypy/rlib/test/test_rStringIO.py (original)
+++ pypy/dist/pypy/rlib/test/test_rStringIO.py Mon Apr 14 16:07:14 2008
@@ -77,6 +77,23 @@
assert f.read(2) == ''
assert f.tell() == 15
+def test_truncate():
+ f = RStringIO()
+ f.truncate(20)
+ assert f.getvalue() == '\x00' * 20
+ assert f.tell() == 0
+ f.seek(0, 2)
+ f.write('hello')
+ f.write(' world')
+ f.truncate(30)
+ assert f.getvalue() == '\x00' * 20 + 'hello worl'
+ f.truncate(25)
+ assert f.getvalue() == '\x00' * 20 + 'hello'
+ f.write('baz')
+ f.write('egg')
+ f.truncate(3)
+ assert f.getvalue() == '\x00' * 3
+
def test_stress():
import cStringIO, random
f = RStringIO()
More information about the Pypy-commit mailing list