[Python-checkins] r68684 - python/branches/io-c/Lib/test/test_io.py

antoine.pitrou python-checkins at python.org
Sun Jan 18 00:17:27 CET 2009


Author: antoine.pitrou
Date: Sun Jan 18 00:17:26 2009
New Revision: 68684

Log:
Fixes and additions to test_io.py



Modified:
   python/branches/io-c/Lib/test/test_io.py

Modified: python/branches/io-c/Lib/test/test_io.py
==============================================================================
--- python/branches/io-c/Lib/test/test_io.py	(original)
+++ python/branches/io-c/Lib/test/test_io.py	Sun Jan 18 00:17:26 2009
@@ -7,27 +7,36 @@
 import threading
 import random
 import unittest
-from itertools import chain, cycle
+from itertools import chain, cycle, count
+from collections import deque
 from test import support
 
 import codecs
 import io  # The module under test
 
 
+def _default_chunk_size():
+    """Get the default TextIOWrapper chunk size"""
+    with open(__file__, "r", encoding="latin1") as f:
+        return f._CHUNK_SIZE
+
+
 class MockRawIO(io.RawIOBase):
 
     def __init__(self, read_stack=()):
         self._read_stack = list(read_stack)
         self._write_stack = []
+        self._reads = 0
 
     def read(self, n=None):
+        self._reads += 1
         try:
             return self._read_stack.pop(0)
         except:
             return b""
 
     def write(self, b):
-        self._write_stack.append(b[:])
+        self._write_stack.append(bytes(b))
         return len(b)
 
     def writable(self):
@@ -43,10 +52,57 @@
         return True
 
     def seek(self, pos, whence):
-        pass
+        return 0   # wrong, but seek() has to return something
 
     def tell(self):
-        return 42
+        return 0   # wrong, but tell() has to return something
+
+    def readinto(self, buf):
+        self._reads += 1
+        max_len = len(buf)
+        try:
+            data = self._read_stack[0]
+        except IndexError:
+            return 0
+        if data is None:
+            del self._read_stack[0]
+            return None
+        n = len(data)
+        if len(data) <= max_len:
+            del self._read_stack[0]
+            buf[:n] = data
+            return n
+        else:
+            buf[:] = data[:max_len]
+            self._read_stack[0] = data[max_len:]
+            return max_len
+
+    def truncate(self, pos=None):
+        return pos
+
+
+class MisbehavedRawIO(MockRawIO):
+    def write(self, b):
+        return MockRawIO.write(self, b) * 2
+
+    def read(self, n=None):
+        return MockRawIO.read(self, n) * 2
+
+    def seek(self, pos, whence):
+        return -123
+
+    def tell(self):
+        return -456
+
+    def readinto(self, buf):
+        MockRawIO.readinto(self, buf)
+        return len(buf) * 5
+
+
+class CloseFailureIO(MockRawIO):
+
+    def close(self):
+        raise IOError
 
 
 class MockFileIO(io.BytesIO):
@@ -60,24 +116,50 @@
         self.read_history.append(None if res is None else len(res))
         return res
 
+    def readinto(self, b):
+        res = io.BytesIO.readinto(self, b)
+        self.read_history.append(res)
+        return res
 
 class MockNonBlockWriterIO(io.RawIOBase):
 
-    def __init__(self, blocking_script):
-        self._blocking_script = list(blocking_script)
+    def __init__(self):
         self._write_stack = []
+        self._blocker_char = None
 
-    def write(self, b):
-        self._write_stack.append(b[:])
-        n = self._blocking_script.pop(0)
-        if (n < 0):
-            raise io.BlockingIOError(0, "test blocking", -n)
-        else:
-            return n
+    def pop_written(self):
+        s = b"".join(self._write_stack)
+        self._write_stack[:] = []
+        return s
+
+    def block_on(self, char):
+        """Block when a given char is encountered."""
+        self._blocker_char = char
+
+    def readable(self):
+        return True
+
+    def seekable(self):
+        return True
 
     def writable(self):
         return True
 
+    def write(self, b):
+        b = bytes(b)
+        n = -1
+        if self._blocker_char:
+            try:
+                n = b.index(self._blocker_char)
+            except ValueError:
+                pass
+            else:
+                self._blocker_char = None
+                self._write_stack.append(b[:n])
+                raise io.BlockingIOError(0, "test blocking", n)
+        self._write_stack.append(b)
+        return len(b)
+
 
 class IOTest(unittest.TestCase):
 
@@ -180,14 +262,15 @@
 
     def test_readline(self):
         f = io.open(support.TESTFN, "wb")
-        f.write(b"abc\ndef\nxyzzy\nfoo")
+        f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
         f.close()
         f = io.open(support.TESTFN, "rb")
         self.assertEqual(f.readline(), b"abc\n")
         self.assertEqual(f.readline(10), b"def\n")
         self.assertEqual(f.readline(2), b"xy")
         self.assertEqual(f.readline(4), b"zzy\n")
-        self.assertEqual(f.readline(), b"foo")
+        self.assertEqual(f.readline(), b"foo\x00bar\n")
+        self.assertEqual(f.readline(), b"another line")
         f.close()
 
     def test_raw_bytes_io(self):
@@ -238,15 +321,20 @@
         class MyFileIO(io.FileIO):
             def __del__(self):
                 record.append(1)
-                io.FileIO.__del__(self)
+                try:
+                    f = io.FileIO.__del__
+                except AttributeError:
+                    pass
+                else:
+                    f(self)
             def close(self):
                 record.append(2)
                 io.FileIO.close(self)
             def flush(self):
                 record.append(3)
                 io.FileIO.flush(self)
-        f = MyFileIO(support.TESTFN, "w")
-        f.write("xxx")
+        f = MyFileIO(support.TESTFN, "wb")
+        f.write(b"xxx")
         del f
         self.assertEqual(record, [1, 2, 3])
 
@@ -354,13 +442,138 @@
     EOF = ""
 
 
-class BufferedReaderTest(unittest.TestCase):
+class CommonBufferedTests:
+    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
+
+    def testFileno(self):
+        rawio = MockRawIO()
+        bufio = self.tp(rawio)
+
+        self.assertEquals(42, bufio.fileno())
+
+    def testFilenoNoFileno(self):
+        # XXX will we always have fileno() function? If so, kill
+        # this test. Else, write it.
+        pass
+
+    def testInvalidArgs(self):
+        rawio = MockRawIO()
+        bufio = self.tp(rawio)
+        # Invalid whence
+        self.assertRaises(ValueError, bufio.seek, 0, -1)
+        self.assertRaises(ValueError, bufio.seek, 0, 3)
+
+    def testOverrideDestructor(self):
+        tp = self.tp
+        record = []
+        class MyBufferedIO(tp):
+            def __del__(self):
+                record.append(1)
+                try:
+                    f = tp.__del__
+                except AttributeError:
+                    pass
+                else:
+                    f(self)
+            def close(self):
+                record.append(2)
+                tp.close(self)
+            def flush(self):
+                record.append(3)
+                tp.flush(self)
+        rawio = MockRawIO()
+        bufio = MyBufferedIO(rawio)
+        writable = bufio.writable()
+        del bufio
+        if writable:
+            self.assertEqual(record, [1, 2, 3])
+        else:
+            self.assertEqual(record, [1, 2])
+
+    def testContext(self):
+        # Test usability as a context manager
+        rawio = MockRawIO()
+        bufio = self.tp(rawio)
+        def _with():
+            with bufio:
+                pass
+        _with()
+        # bufio should now be closed, and using it a second time should raise
+        # a ValueError.
+        self.assertRaises(ValueError, _with)
+
+    def testErrorThroughDestructor(self):
+        # Test that the exception state is not modified by a destructor,
+        # even if close() fails.
+        rawio = CloseFailureIO()
+        def f():
+            self.tp(rawio).xyzzy
+        self.assertRaises(AttributeError, f)
+
+
+class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
+    tp = io.BufferedReader
+    read_mode = "rb"
+
+    def testConstructor(self):
+        rawio = MockRawIO([b"abc"])
+        bufio = self.tp(rawio)
+        bufio.__init__(rawio)
+        bufio.__init__(rawio, buffer_size=1024)
+        bufio.__init__(rawio, buffer_size=16)
+        self.assertEquals(b"abc", bufio.read())
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+        self.assertRaises(ValueError, bufio.read)
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+        self.assertRaises(ValueError, bufio.read)
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+        self.assertRaises(ValueError, bufio.read)
+        if sys.maxsize > 0x7FFFFFFF:
+            # The allocation can succeed on 32-bit builds, e.g. with more
+            # than 2GB RAM and a 64-bit kernel.
+            self.assertRaises((OverflowError, MemoryError, ValueError),
+                bufio.__init__, rawio, sys.maxsize)
+        rawio = MockRawIO([b"abc"])
+        bufio.__init__(rawio)
+        self.assertEquals(b"abc", bufio.read())
 
     def testRead(self):
         rawio = MockRawIO((b"abc", b"d", b"efg"))
-        bufio = io.BufferedReader(rawio)
-
+        bufio = self.tp(rawio)
         self.assertEquals(b"abcdef", bufio.read(6))
+        # Invalid args
+        self.assertRaises(ValueError, bufio.read, -2)
+
+    def testRead1(self):
+        rawio = MockRawIO((b"abc", b"d", b"efg"))
+        bufio = self.tp(rawio)
+        self.assertEquals(b"a", bufio.read(1))
+        self.assertEquals(b"b", bufio.read1(1))
+        self.assertEquals(rawio._reads, 1)
+        self.assertEquals(b"c", bufio.read1(100))
+        self.assertEquals(rawio._reads, 1)
+        self.assertEquals(b"d", bufio.read1(100))
+        self.assertEquals(rawio._reads, 2)
+        self.assertEquals(b"efg", bufio.read1(100))
+        self.assertEquals(rawio._reads, 3)
+        self.assertEquals(b"", bufio.read1(100))
+        # Invalid args
+        self.assertRaises(ValueError, bufio.read1, -1)
+
+    def testReadinto(self):
+        rawio = MockRawIO((b"abc", b"d", b"efg"))
+        bufio = self.tp(rawio)
+        b = bytearray(2)
+        self.assertEquals(bufio.readinto(b), 2)
+        self.assertEquals(b, b"ab")
+        self.assertEquals(bufio.readinto(b), 2)
+        self.assertEquals(b, b"cd")
+        self.assertEquals(bufio.readinto(b), 2)
+        self.assertEquals(b, b"ef")
+        self.assertEquals(bufio.readinto(b), 1)
+        self.assertEquals(b, b"gf")
+        self.assertEquals(bufio.readinto(b), 0)
+        self.assertEquals(b, b"gf")
 
     def testBuffering(self):
         data = b"abcdefghi"
@@ -374,47 +587,38 @@
 
         for bufsize, buf_read_sizes, raw_read_sizes in tests:
             rawio = MockFileIO(data)
-            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
+            bufio = self.tp(rawio, buffer_size=bufsize)
             pos = 0
             for nbytes in buf_read_sizes:
                 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
                 pos += nbytes
+            # this is mildly implementation-dependent
             self.assertEquals(rawio.read_history, raw_read_sizes)
 
     def testReadNonBlocking(self):
         # Inject some None's in there to simulate EWOULDBLOCK
-        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
-        bufio = io.BufferedReader(rawio)
+        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
+        bufio = self.tp(rawio)
 
         self.assertEquals(b"abcd", bufio.read(6))
         self.assertEquals(b"e", bufio.read(1))
         self.assertEquals(b"fg", bufio.read())
+        self.assertEquals(b"", bufio.peek(1))
         self.assert_(None is bufio.read())
         self.assertEquals(b"", bufio.read())
 
     def testReadToEof(self):
         rawio = MockRawIO((b"abc", b"d", b"efg"))
-        bufio = io.BufferedReader(rawio)
+        bufio = self.tp(rawio)
 
         self.assertEquals(b"abcdefg", bufio.read(9000))
 
     def testReadNoArgs(self):
         rawio = MockRawIO((b"abc", b"d", b"efg"))
-        bufio = io.BufferedReader(rawio)
+        bufio = self.tp(rawio)
 
         self.assertEquals(b"abcdefg", bufio.read())
 
-    def testFileno(self):
-        rawio = MockRawIO((b"abc", b"d", b"efg"))
-        bufio = io.BufferedReader(rawio)
-
-        self.assertEquals(42, bufio.fileno())
-
-    def testFilenoNoFileno(self):
-        # XXX will we always have fileno() function? If so, kill
-        # this test. Else, write it.
-        pass
-
     def testThreads(self):
         try:
             # Write out many bytes with exactly the same number of 0's,
@@ -426,8 +630,8 @@
             s = bytes(bytearray(l))
             with io.open(support.TESTFN, "wb") as f:
                 f.write(s)
-            with io.open(support.TESTFN, "rb", buffering=0) as raw:
-                bufio = io.BufferedReader(raw, 8)
+            with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
+                bufio = self.tp(raw, 8)
                 errors = []
                 results = []
                 def f():
@@ -457,80 +661,196 @@
         finally:
             support.unlink(support.TESTFN)
 
-
-
-class BufferedWriterTest(unittest.TestCase):
+    def testMisbehavedRawIO(self):
+        rawio = MisbehavedRawIO((b"abc", b"d", b"efg"))
+        bufio = self.tp(rawio)
+        self.assertRaises(IOError, bufio.seek, 0)
+        self.assertRaises(IOError, bufio.tell)
+        self.assertRaises(IOError, bufio.read, 10)
+
+
+class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
+    tp = io.BufferedWriter
+    write_mode = "wb"
+
+    def testConstructor(self):
+        rawio = MockRawIO()
+        bufio = self.tp(rawio)
+        bufio.__init__(rawio)
+        bufio.__init__(rawio, buffer_size=1024)
+        bufio.__init__(rawio, buffer_size=16)
+        self.assertEquals(3, bufio.write(b"abc"))
+        bufio.flush()
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+        self.assertRaises(ValueError, bufio.write, b"def")
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+        self.assertRaises(ValueError, bufio.write, b"def")
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+        self.assertRaises(ValueError, bufio.write, b"def")
+        if sys.maxsize > 0x7FFFFFFF:
+            # The allocation can succeed on 32-bit builds, e.g. with more
+            # than 2GB RAM and a 64-bit kernel.
+            self.assertRaises((OverflowError, MemoryError, ValueError),
+                bufio.__init__, rawio, sys.maxsize)
+        bufio.__init__(rawio)
+        self.assertEquals(3, bufio.write(b"ghi"))
+        bufio.flush()
+        self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
 
     def testWrite(self):
         # Write to the buffered IO but don't overflow the buffer.
         writer = MockRawIO()
-        bufio = io.BufferedWriter(writer, 8)
-
+        bufio = self.tp(writer, 8)
         bufio.write(b"abc")
-
         self.assertFalse(writer._write_stack)
 
     def testWriteOverflow(self):
         writer = MockRawIO()
-        bufio = io.BufferedWriter(writer, 8)
+        bufio = self.tp(writer, 8)
+        contents = b"abcdefghijklmnop"
+        for n in range(0, len(contents), 3):
+            bufio.write(contents[n:n+3])
+        flushed = b"".join(writer._write_stack)
+        # At least (total - 8) bytes were implicitly flushed, perhaps more
+        # depending on the implementation.
+        self.assert_(flushed.startswith(contents[:-8]), flushed)
+
+    def check_writes(self, intermediate_func):
+        # Lots of writes, test the flushed output is as expected.
+        contents = bytes(range(256)) * 1000
+        n = 0
+        writer = MockRawIO()
+        bufio = self.tp(writer, 13)
+        # Generator of write sizes: repeat each N 15 times then proceed to N+1
+        def gen_sizes():
+            for size in count(1):
+                for i in range(15):
+                    yield size
+        sizes = gen_sizes()
+        while n < len(contents):
+            size = min(next(sizes), len(contents) - n)
+            self.assertEquals(bufio.write(contents[n:n+size]), size)
+            intermediate_func(bufio)
+            n += size
+        bufio.flush()
+        self.assertEquals(contents, b"".join(writer._write_stack))
 
-        bufio.write(b"abc")
-        bufio.write(b"defghijkl")
+    def testWrites(self):
+        self.check_writes(lambda bufio: None)
 
-        self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
+    def testWritesAndFlushes(self):
+        self.check_writes(lambda bufio: bufio.flush())
 
-    def testWriteNonBlocking(self):
-        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
-        bufio = io.BufferedWriter(raw, 8, 16)
+    def testWritesAndSeeks(self):
+        def _seekabs(bufio):
+            pos = bufio.tell()
+            bufio.seek(pos + 1, 0)
+            bufio.seek(pos - 1, 0)
+            bufio.seek(pos, 0)
+        self.check_writes(_seekabs)
+        def _seekrel(bufio):
+            pos = bufio.seek(0, 1)
+            bufio.seek(+1, 1)
+            bufio.seek(-1, 1)
+            bufio.seek(pos, 0)
+        self.check_writes(_seekrel)
 
-        bufio.write(b"asdf")
-        bufio.write(b"asdfa")
-        self.assertEquals(b"asdfasdfa", raw._write_stack[0])
-
-        bufio.write(b"asdfasdfasdf")
-        self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
-        bufio.write(b"asdfasdfasdf")
-        self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
-        self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
-
-        bufio.write(b"asdfasdfasdf")
-
-        # XXX I don't like this test. It relies too heavily on how the
-        # algorithm actually works, which we might change. Refactor
-        # later.
+    def testWritesAndTruncates(self):
+        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
 
-    def testFileno(self):
-        rawio = MockRawIO((b"abc", b"d", b"efg"))
-        bufio = io.BufferedWriter(rawio)
+    def testWriteNonBlocking(self):
+        raw = MockNonBlockWriterIO()
+        bufio = self.tp(raw, 8, 8)
 
-        self.assertEquals(42, bufio.fileno())
+        self.assertEquals(bufio.write(b"abcd"), 4)
+        self.assertEquals(bufio.write(b"efghi"), 5)
+        # 1 byte will be written, the rest will be buffered
+        raw.block_on(b"k")
+        self.assertEquals(bufio.write(b"jklmn"), 5)
+
+        # 8 bytes will be written, 8 will be buffered and the rest will be lost
+        raw.block_on(b"0")
+        try:
+            bufio.write(b"opqrwxyz0123456789")
+        except io.BlockingIOError as e:
+            written = e.characters_written
+        else:
+            self.fail("BlockingIOError should have been raised")
+        self.assertEquals(written, 16)
+        self.assertEquals(raw.pop_written(),
+            b"abcdefghijklmnopqrwxyz")
+
+        self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
+        s = raw.pop_written()
+        # Previously buffered bytes were flushed
+        self.assertTrue(s.startswith(b"01234567A"), s)
+
+    def testWriteAndRewind(self):
+        raw = io.BytesIO()
+        bufio = self.tp(raw, 4)
+        self.assertEqual(bufio.write(b"abcdef"), 6)
+        self.assertEqual(bufio.tell(), 6)
+        bufio.seek(0, 0)
+        self.assertEqual(bufio.write(b"XY"), 2)
+        bufio.seek(6, 0)
+        self.assertEqual(raw.getvalue(), b"XYcdef")
+        self.assertEqual(bufio.write(b"123456"), 6)
+        bufio.flush()
+        self.assertEqual(raw.getvalue(), b"XYcdef123456")
 
     def testFlush(self):
         writer = MockRawIO()
-        bufio = io.BufferedWriter(writer, 8)
-
+        bufio = self.tp(writer, 8)
         bufio.write(b"abc")
         bufio.flush()
+        self.assertEquals(b"abc", writer._write_stack[0])
 
+    def testDestructor(self):
+        writer = MockRawIO()
+        bufio = self.tp(writer, 8)
+        bufio.write(b"abc")
+        del bufio
         self.assertEquals(b"abc", writer._write_stack[0])
 
+    def testTruncate(self):
+        # Truncate implicitly flushes the buffer.
+        with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
+            bufio = self.tp(raw, 8)
+            bufio.write(b"abcdef")
+            self.assertEqual(bufio.truncate(3), 3)
+            self.assertEqual(bufio.tell(), 3)
+        with io.open(support.TESTFN, "rb", buffering=0) as f:
+            self.assertEqual(f.read(), b"abc")
+
     def testThreads(self):
-        # BufferedWriter should not raise exceptions or crash
-        # when called from multiple threads.
         try:
+            # Write out many bytes from many threads and test they were
+            # all flushed.
+            N = 1000
+            contents = bytes(range(256)) * N
+            sizes = cycle([1, 19])
+            n = 0
+            queue = deque()
+            while n < len(contents):
+                size = next(sizes)
+                queue.append(contents[n:n+size])
+                n += size
+            del contents
             # We use a real file object because it allows us to
             # exercise situations where the GIL is released before
             # writing the buffer to the raw streams. This is in addition
             # to concurrency issues due to switching threads in the middle
             # of Python code.
-            with io.open(support.TESTFN, "wb", buffering=0) as raw:
-                bufio = io.BufferedWriter(raw, 8)
+            with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
+                bufio = self.tp(raw, 8)
                 errors = []
                 def f():
                     try:
-                        # Write enough bytes to flush the buffer
-                        s = b"a" * 19
-                        for i in range(50):
+                        while True:
+                            try:
+                                s = queue.popleft()
+                            except IndexError:
+                                return
                             bufio.write(s)
                     except Exception as e:
                         errors.append(e)
@@ -543,9 +863,21 @@
                     t.join()
                 self.assertFalse(errors,
                     "the following exceptions were caught: %r" % errors)
+                bufio.close()
+            with io.open(support.TESTFN, "rb") as f:
+                s = f.read()
+            for i in range(256):
+                self.assertEquals(s.count(bytes([i])), N)
         finally:
             support.unlink(support.TESTFN)
 
+    def testMisbehavedRawIO(self):
+        rawio = MisbehavedRawIO()
+        bufio = self.tp(rawio, 5)
+        self.assertRaises(IOError, bufio.seek, 0)
+        self.assertRaises(IOError, bufio.tell)
+        self.assertRaises(IOError, bufio.write, b"abcdef")
+
 
 class BufferedRWPairTest(unittest.TestCase):
 
@@ -558,22 +890,29 @@
         # XXX More Tests
 
 
-class BufferedRandomTest(unittest.TestCase):
+class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
+    tp = io.BufferedRandom
+    read_mode = "rb+"
+    write_mode = "wb+"
+
+    def testConstructor(self):
+        BufferedReaderTest.testConstructor(self)
+        BufferedWriterTest.testConstructor(self)
 
     def testReadAndWrite(self):
         raw = MockRawIO((b"asdf", b"ghjk"))
-        rw = io.BufferedRandom(raw, 8, 12)
+        rw = self.tp(raw, 8, 12)
 
         self.assertEqual(b"as", rw.read(2))
         rw.write(b"ddd")
         rw.write(b"eee")
         self.assertFalse(raw._write_stack) # Buffer writes
-        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
+        self.assertEqual(b"ghjk", rw.read())
         self.assertEquals(b"dddeee", raw._write_stack[0])
 
     def testSeekAndTell(self):
         raw = io.BytesIO(b"asdfghjkl")
-        rw = io.BufferedRandom(raw)
+        rw = self.tp(raw)
 
         self.assertEquals(b"as", rw.read(2))
         self.assertEquals(2, rw.tell())
@@ -591,6 +930,95 @@
         self.assertEquals(b"fl", rw.read(11))
         self.assertRaises(TypeError, rw.seek, 0.0)
 
+    def check_flush_and_read(self, read_func):
+        raw = io.BytesIO(b"abcdefghi")
+        bufio = self.tp(raw)
+
+        self.assertEquals(b"ab", read_func(bufio, 2))
+        bufio.write(b"12")
+        self.assertEquals(b"ef", read_func(bufio, 2))
+        self.assertEquals(6, bufio.tell())
+        bufio.flush()
+        self.assertEquals(6, bufio.tell())
+        self.assertEquals(b"ghi", read_func(bufio))
+        raw.seek(0, 0)
+        raw.write(b"XYZ")
+        # flush() resets the read buffer
+        bufio.flush()
+        bufio.seek(0, 0)
+        self.assertEquals(b"XYZ", read_func(bufio, 3))
+
+    def testFlushAndRead(self):
+        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
+
+    def testFlushAndReadinto(self):
+        def _readinto(bufio, n=-1):
+            b = bytearray(n if n >= 0 else 9999)
+            n = bufio.readinto(b)
+            return bytes(b[:n])
+        self.check_flush_and_read(_readinto)
+
+    def testFlushAndPeek(self):
+        def _peek(bufio, n=-1):
+            # This relies on the fact that the buffer can contain the whole
+            # raw stream, otherwise peek() can return less.
+            b = bufio.peek(n)
+            if n != -1:
+                b = b[:n]
+            bufio.seek(len(b), 1)
+            return b
+        self.check_flush_and_read(_peek)
+
+    def testFlushAndWrite(self):
+        raw = io.BytesIO(b"abcdefghi")
+        bufio = self.tp(raw)
+
+        bufio.write(b"123")
+        bufio.flush()
+        bufio.write(b"45")
+        bufio.flush()
+        bufio.seek(0, 0)
+        self.assertEquals(b"12345fghi", raw.getvalue())
+        self.assertEquals(b"12345fghi", bufio.read())
+
+    def testThreads(self):
+        BufferedReaderTest.testThreads(self)
+        BufferedWriterTest.testThreads(self)
+
+    def testWritesAndPeeks(self):
+        def _peek(bufio):
+            bufio.peek(1)
+        self.check_writes(_peek)
+        def _peek(bufio):
+            pos = bufio.tell()
+            bufio.seek(-1, 1)
+            bufio.peek(1)
+            bufio.seek(pos, 0)
+        self.check_writes(_peek)
+
+    def testWritesAndReads(self):
+        def _read(bufio):
+            bufio.seek(-1, 1)
+            bufio.read(1)
+        self.check_writes(_read)
+
+    def testWritesAndRead1s(self):
+        def _read1(bufio):
+            bufio.seek(-1, 1)
+            bufio.read1(1)
+        self.check_writes(_read1)
+
+    def testWritesAndReadintos(self):
+        def _read(bufio):
+            bufio.seek(-1, 1)
+            bufio.readinto(bytearray(1))
+        self.check_writes(_read)
+
+    def testMisbehavedRawIO(self):
+        BufferedReaderTest.testMisbehavedRawIO(self)
+        BufferedWriterTest.testMisbehavedRawIO(self)
+
+
 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
 # properties:
 #   - A single output character can correspond to many bytes of input.
@@ -745,6 +1173,18 @@
     def tearDown(self):
         support.unlink(support.TESTFN)
 
+    def testConstructor(self):
+        r = io.BytesIO(b"\xc3\xa9\n\n")
+        b = io.BufferedReader(r, 1000)
+        t = io.TextIOWrapper(b)
+        t.__init__(b, encoding="latin1", newline="\r\n")
+        t.__init__(b, encoding="utf8")
+        self.assertEquals("\xe9\n", t.readline())
+        self.assertRaises(TypeError, t.__init__, b, newline=42)
+        self.assertRaises(ValueError, t.read)
+        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
+        self.assertRaises(ValueError, t.read)
+
     def testLineBuffering(self):
         r = io.BytesIO()
         b = io.BufferedWriter(r, 1000)
@@ -756,6 +1196,15 @@
         t.write("A\rB")
         self.assertEquals(r.getvalue(), b"XY\nZA\rB")
 
+    def testEncoding(self):
+        # Check the encoding attribute is always set, and valid
+        b = io.BytesIO()
+        t = io.TextIOWrapper(b, encoding="utf8")
+        self.assertEqual(t.encoding, "utf8")
+        t = io.TextIOWrapper(b)
+        self.assert_(t.encoding is not None)
+        codecs.lookup(t.encoding)
+
     def testEncodingErrorsReading(self):
         # (1) default
         b = io.BytesIO(b"abc\n\xff\n")
@@ -798,40 +1247,6 @@
         t.flush()
         self.assertEquals(b.getvalue(), b"abc?def\n")
 
-    def testNewlinesInput(self):
-        testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
-        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
-        for newline, expected in [
-            (None, normalized.decode("ascii").splitlines(True)),
-            ("", testdata.decode("ascii").splitlines(True)),
-            ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
-            ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
-            ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
-            ]:
-            buf = io.BytesIO(testdata)
-            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
-            self.assertEquals(txt.readlines(), expected)
-            txt.seek(0)
-            self.assertEquals(txt.read(), "".join(expected))
-
-    def testNewlinesOutput(self):
-        testdict = {
-            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
-            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
-            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
-            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
-            }
-        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
-        for newline, expected in tests:
-            buf = io.BytesIO()
-            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
-            txt.write("AAA\nB")
-            txt.write("BB\nCCC\n")
-            txt.write("X\rY\r\nZ")
-            txt.flush()
-            self.assertEquals(buf.closed, False)
-            self.assertEquals(buf.getvalue(), expected)
-
     def testNewlines(self):
         input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
 
@@ -875,14 +1290,14 @@
                         self.assertEquals(len(got_lines), len(exp_lines))
 
     def testNewlinesInput(self):
-        testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
+        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
         for newline, expected in [
             (None, normalized.decode("ascii").splitlines(True)),
             ("", testdata.decode("ascii").splitlines(True)),
-            ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
-            ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
-            ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
+            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
             ]:
             buf = io.BytesIO(testdata)
             txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
@@ -891,32 +1306,64 @@
             self.assertEquals(txt.read(), "".join(expected))
 
     def testNewlinesOutput(self):
-        data = "AAA\nBBB\rCCC\n"
-        data_lf = b"AAA\nBBB\rCCC\n"
-        data_cr = b"AAA\rBBB\rCCC\r"
-        data_crlf = b"AAA\r\nBBB\rCCC\r\n"
-        save_linesep = os.linesep
-        try:
-            for os.linesep, newline, expected in [
-                ("\n", None, data_lf),
-                ("\r\n", None, data_crlf),
-                ("\n", "", data_lf),
-                ("\r\n", "", data_lf),
-                ("\n", "\n", data_lf),
-                ("\r\n", "\n", data_lf),
-                ("\n", "\r", data_cr),
-                ("\r\n", "\r", data_cr),
-                ("\n", "\r\n", data_crlf),
-                ("\r\n", "\r\n", data_crlf),
-                ]:
-                buf = io.BytesIO()
-                txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
-                txt.write(data)
-                txt.close()
-                self.assertEquals(buf.closed, True)
-                self.assertRaises(ValueError, buf.getvalue)
-        finally:
-            os.linesep = save_linesep
+        testdict = {
+            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
+            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
+            }
+        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
+        for newline, expected in tests:
+            buf = io.BytesIO()
+            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
+            txt.write("AAA\nB")
+            txt.write("BB\nCCC\n")
+            txt.write("X\rY\r\nZ")
+            txt.flush()
+            self.assertEquals(buf.closed, False)
+            self.assertEquals(buf.getvalue(), expected)
+
+    def testDestructor(self):
+        l = []
+        class MyBytesIO(io.BytesIO):
+            def close(self):
+                l.append(self.getvalue())
+                io.BytesIO.close(self)
+        b = MyBytesIO()
+        t = io.TextIOWrapper(b, encoding="ascii")
+        t.write("abc")
+        del t
+        self.assertEquals([b"abc"], l)
+
+    def testOverrideDestructor(self):
+        record = []
+        class MyTextIO(io.TextIOWrapper):
+            def __del__(self):
+                record.append(1)
+                try:
+                    f = io.TextIOWrapper.__del__
+                except AttributeError:
+                    pass
+                else:
+                    f(self)
+            def close(self):
+                record.append(2)
+                io.TextIOWrapper.close(self)
+            def flush(self):
+                record.append(3)
+                io.TextIOWrapper.flush(self)
+        b = io.BytesIO()
+        t = MyTextIO(b, encoding="ascii")
+        del t
+        self.assertEqual(record, [1, 2, 3])
+
+    def testErrorThroughDestructor(self):
+        # Test that the exception state is not modified by a destructor,
+        # even if close() fails.
+        rawio = CloseFailureIO()
+        def f():
+            io.TextIOWrapper(rawio).xyzzy
+        self.assertRaises(AttributeError, f)
 
     # Systematic tests of the text I/O API
 
@@ -990,7 +1437,7 @@
         f.close()
 
     def testSeeking(self):
-        chunk_size = io.TextIOWrapper._CHUNK_SIZE
+        chunk_size = _default_chunk_size()
         prefix_size = chunk_size - 2
         u_prefix = "a" * prefix_size
         prefix = bytes(u_prefix.encode("utf-8"))
@@ -1021,9 +1468,7 @@
 
     def testSeekAndTell(self):
         """Test seek/tell using the StatefulIncrementalDecoder."""
-        # Make this test faster by forcing a smaller (but large enough)
-        # chunk size. The bigger the chunker size, the slower seek() is,
-        # as it tries to replay character decoding one byte at a time.
+        # Make test faster by doing smaller seeks
         CHUNK_SIZE = 256
 
         def testSeekAndTellWithData(data, min_pos=0):
@@ -1033,13 +1478,13 @@
             f.write(data)
             f.close()
             f = io.open(support.TESTFN, encoding='test_decoder')
+            f._CHUNK_SIZE = CHUNK_SIZE
             decoded = f.read()
             f.close()
 
             for i in range(min_pos, len(decoded) + 1): # seek positions
                 for j in [1, 5, len(decoded) - i]: # read lengths
                     f = io.open(support.TESTFN, encoding='test_decoder')
-                    f._CHUNK_SIZE = CHUNK_SIZE
                     self.assertEquals(f.read(i), decoded[:i])
                     cookie = f.tell()
                     self.assertEquals(f.read(j), decoded[i:i + j])
@@ -1057,6 +1502,7 @@
                 testSeekAndTellWithData(input)
 
             # Position each test case so that it crosses a chunk boundary.
+            #CHUNK_SIZE = _default_chunk_size()
             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                 offset = CHUNK_SIZE - len(input)//2
                 prefix = b'.'*offset
@@ -1328,52 +1774,16 @@
         f.close()
         g.close()
 
-    def test_io_after_close(self):
-        for kwargs in [
-                {"mode": "w"},
-                {"mode": "wb"},
-                {"mode": "w", "buffering": 1},
-                {"mode": "w", "buffering": 2},
-                {"mode": "wb", "buffering": 0},
-                {"mode": "r"},
-                {"mode": "rb"},
-                {"mode": "r", "buffering": 1},
-                {"mode": "r", "buffering": 2},
-                {"mode": "rb", "buffering": 0},
-                {"mode": "w+"},
-                {"mode": "w+b"},
-                {"mode": "w+", "buffering": 1},
-                {"mode": "w+", "buffering": 2},
-                {"mode": "w+b", "buffering": 0},
-            ]:
-            f = io.open(support.TESTFN, **kwargs)
-            f.close()
-            self.assertRaises(ValueError, f.flush)
-            self.assertRaises(ValueError, f.fileno)
-            self.assertRaises(ValueError, f.isatty)
-            self.assertRaises(ValueError, f.__iter__)
-            if hasattr(f, "peek"):
-                self.assertRaises(ValueError, f.peek, 1)
-            self.assertRaises(ValueError, f.read)
-            if hasattr(f, "read1"):
-                self.assertRaises(ValueError, f.read1, 1024)
-            if hasattr(f, "readinto"):
-                self.assertRaises(ValueError, f.readinto, bytearray(1024))
-            self.assertRaises(ValueError, f.readline)
-            self.assertRaises(ValueError, f.readlines)
-            self.assertRaises(ValueError, f.seek, 0)
-            self.assertRaises(ValueError, f.tell)
-            self.assertRaises(ValueError, f.truncate)
-            self.assertRaises(ValueError, f.write, "")
-            self.assertRaises(ValueError, f.writelines, [])
-
 
 def test_main():
-    support.run_unittest(IOTest, BytesIOTest, StringIOTest,
+    support.run_unittest(
+        IOTest, BytesIOTest, StringIOTest,
                               BufferedReaderTest, BufferedWriterTest,
                               BufferedRWPairTest, BufferedRandomTest,
                               StatefulIncrementalDecoderTest,
-                              TextIOWrapperTest, MiscIOTest)
+                              TextIOWrapperTest, MiscIOTest
+                              )
 
 if __name__ == "__main__":
-    unittest.main()
+    test_main()
+    #unittest.main()


More information about the Python-checkins mailing list