[Python-3000-checkins] r54754 - in python/branches/p3yk/Lib: io.py test/test_io.py

guido.van.rossum python-3000-checkins at python.org
Wed Apr 11 18:07:53 CEST 2007


Author: guido.van.rossum
Date: Wed Apr 11 18:07:50 2007
New Revision: 54754

Modified:
   python/branches/p3yk/Lib/io.py
   python/branches/p3yk/Lib/test/test_io.py
Log:
Speed up next() by disabling snapshot updating (and therefore tell()) during iteration.


Modified: python/branches/p3yk/Lib/io.py
==============================================================================
--- python/branches/p3yk/Lib/io.py	(original)
+++ python/branches/p3yk/Lib/io.py	Wed Apr 11 18:07:50 2007
@@ -1,7 +1,7 @@
 """New I/O library conforming to PEP 3116.
 
-This is an early prototype; eventually some of this will be
-reimplemented in C and the rest may be turned into a package.
+This is a prototype; hopefully eventually some of this will be
+reimplemented in C.
 
 Conformance of alternative implementations: all arguments are intended
 to be positional-only except the arguments of the open() function.
@@ -11,6 +11,7 @@
 names like __iter__).  Only the top-level names listed in the __all__
 variable are part of the specification.
 
+XXX edge cases when switching between reading/writing
 XXX need to default buffer size to 1 if isatty()
 XXX need to support 1 meaning line-buffered
 XXX don't use assert to validate input requirements
@@ -877,7 +878,7 @@
     Character and line based layer over a BufferedIOBase object.
     """
 
-    _CHUNK_SIZE = 64
+    _CHUNK_SIZE = 128
 
     def __init__(self, buffer, encoding=None, newline=None):
         if newline not in (None, "\n", "\r\n"):
@@ -894,7 +895,7 @@
         self._decoder_in_rest_pickle = None
         self._pending = ""
         self._snapshot = None
-        self._seekable = self.buffer.seekable()
+        self._seekable = self._telling = self.buffer.seekable()
 
     # A word about _snapshot.  This attribute is either None, or a
     # tuple (decoder_pickle, readahead, pending) where decoder_pickle
@@ -908,6 +909,7 @@
 
     def flush(self):
         self.buffer.flush()
+        self._telling = self._seekable
 
     def close(self):
         self.flush()
@@ -945,7 +947,7 @@
 
     def _read_chunk(self):
         assert self._decoder is not None
-        if not self._seekable:
+        if not self._telling:
             readahead = self.buffer.read(self._CHUNK_SIZE)
             pending = self._decoder.decode(readahead, not readahead)
             return readahead, pending
@@ -976,6 +978,8 @@
     def tell(self):
         if not self._seekable:
             raise IOError("Underlying stream is not seekable")
+        if not self._telling:
+            raise IOError("Telling position disabled by next() call")
         self.flush()
         position = self.buffer.tell()
         if self._decoder is None or self._snapshot is None:
@@ -1016,6 +1020,7 @@
                              (whence,))
         if pos < 0:
             raise ValueError("Negative seek position %r" % (pos,))
+        self.flush()
         orig_pos = pos
         ds, pos = self._decode_decoder_state(pos)
         if not ds:
@@ -1050,6 +1055,15 @@
             self._pending = res[n:]
             return res[:n]
 
+    def next(self) -> str:
+        self._telling = False
+        line = self.readline()
+        if not line:
+            self._snapshot = None
+            self._telling = self._seekable
+            raise StopIteration
+        return line
+
     def readline(self, limit=None):
         if limit is not None:
             # XXX Hack to support limit argument, for backwards compatibility

Modified: python/branches/p3yk/Lib/test/test_io.py
==============================================================================
--- python/branches/p3yk/Lib/test/test_io.py	(original)
+++ python/branches/p3yk/Lib/test/test_io.py	Wed Apr 11 18:07:50 2007
@@ -1,6 +1,7 @@
 """Unit tests for io.py."""
 
 import sys
+import time
 import unittest
 from itertools import chain
 from test import test_support
@@ -549,6 +550,63 @@
             rlines.append((pos, line))
         self.assertEquals(rlines, wlines)
 
+    def testTelling(self):
+        f = io.open(test_support.TESTFN, "w+", encoding="utf8")
+        p0 = f.tell()
+        f.write(u"\xff\n")
+        p1 = f.tell()
+        f.write(u"\xff\n")
+        p2 = f.tell()
+        f.seek(0)
+        self.assertEquals(f.tell(), p0)
+        self.assertEquals(f.readline(), u"\xff\n")
+        self.assertEquals(f.tell(), p1)
+        self.assertEquals(f.readline(), u"\xff\n")
+        self.assertEquals(f.tell(), p2)
+        f.seek(0)
+        for line in f:
+            self.assertEquals(line, u"\xff\n")
+            self.assertRaises(IOError, f.tell)
+        self.assertEquals(f.tell(), p2)
+        f.close()
+
+    def timingTest(self):
+        timer = time.time
+        enc = "utf8"
+        line = u"\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
+        nlines = 10000
+        nchars = len(line)
+        nbytes = len(line.encode(enc))
+        for chunk_size in (32, 64, 128, 256):
+            f = io.open(test_support.TESTFN, "w+", encoding=enc)
+            f._CHUNK_SIZE = chunk_size
+            t0 = timer()
+            for i in range(nlines):
+                f.write(line)
+            f.flush()
+            t1 = timer()
+            f.seek(0)
+            for line in f:
+                pass
+            t2 = timer()
+            f.seek(0)
+            while f.readline():
+                pass
+            t3 = timer()
+            f.seek(0)
+            while f.readline():
+                f.tell()
+            t4 = timer()
+            f.close()
+            if test_support.verbose:
+                print("\nTiming test: %d lines of %d characters (%d bytes)" %
+                      (nlines, nchars, nbytes))
+                print("File chunk size:          %6s" % f._CHUNK_SIZE)
+                print("Writing:                  %6.3f seconds" % (t1-t0))
+                print("Reading using iteration:  %6.3f seconds" % (t2-t1))
+                print("Reading using readline(): %6.3f seconds" % (t3-t2))
+                print("Using readline()+tell():  %6.3f seconds" % (t4-t3))
+
 
 # XXX Tests for open()
 


More information about the Python-3000-checkins mailing list