[Python-checkins] r68684 - python/branches/io-c/Lib/test/test_io.py
antoine.pitrou
python-checkins at python.org
Sun Jan 18 00:17:27 CET 2009
Author: antoine.pitrou
Date: Sun Jan 18 00:17:26 2009
New Revision: 68684
Log:
Fixes and additions to test_io.py
Modified:
python/branches/io-c/Lib/test/test_io.py
Modified: python/branches/io-c/Lib/test/test_io.py
==============================================================================
--- python/branches/io-c/Lib/test/test_io.py (original)
+++ python/branches/io-c/Lib/test/test_io.py Sun Jan 18 00:17:26 2009
@@ -7,27 +7,36 @@
import threading
import random
import unittest
-from itertools import chain, cycle
+from itertools import chain, cycle, count
+from collections import deque
from test import support
import codecs
import io # The module under test
+def _default_chunk_size():
+ """Get the default TextIOWrapper chunk size"""
+ with open(__file__, "r", encoding="latin1") as f:
+ return f._CHUNK_SIZE
+
+
class MockRawIO(io.RawIOBase):
def __init__(self, read_stack=()):
self._read_stack = list(read_stack)
self._write_stack = []
+ self._reads = 0
def read(self, n=None):
+ self._reads += 1
try:
return self._read_stack.pop(0)
except:
return b""
def write(self, b):
- self._write_stack.append(b[:])
+ self._write_stack.append(bytes(b))
return len(b)
def writable(self):
@@ -43,10 +52,57 @@
return True
def seek(self, pos, whence):
- pass
+ return 0 # wrong but we gotta return something
def tell(self):
- return 42
+ return 0 # same comment as above
+
+ def readinto(self, buf):
+ self._reads += 1
+ max_len = len(buf)
+ try:
+ data = self._read_stack[0]
+ except IndexError:
+ return 0
+ if data is None:
+ del self._read_stack[0]
+ return None
+ n = len(data)
+ if len(data) <= max_len:
+ del self._read_stack[0]
+ buf[:n] = data
+ return n
+ else:
+ buf[:] = data[:max_len]
+ self._read_stack[0] = data[max_len:]
+ return max_len
+
+ def truncate(self, pos=None):
+ return pos
+
+
+class MisbehavedRawIO(MockRawIO):
+ def write(self, b):
+ return MockRawIO.write(self, b) * 2
+
+ def read(self, n=None):
+ return MockRawIO.read(self, n) * 2
+
+ def seek(self, pos, whence):
+ return -123
+
+ def tell(self):
+ return -456
+
+ def readinto(self, buf):
+ MockRawIO.readinto(self, buf)
+ return len(buf) * 5
+
+
+class CloseFailureIO(MockRawIO):
+
+ def close(self):
+ raise IOError
class MockFileIO(io.BytesIO):
@@ -60,24 +116,50 @@
self.read_history.append(None if res is None else len(res))
return res
+ def readinto(self, b):
+ res = io.BytesIO.readinto(self, b)
+ self.read_history.append(res)
+ return res
class MockNonBlockWriterIO(io.RawIOBase):
- def __init__(self, blocking_script):
- self._blocking_script = list(blocking_script)
+ def __init__(self):
self._write_stack = []
+ self._blocker_char = None
- def write(self, b):
- self._write_stack.append(b[:])
- n = self._blocking_script.pop(0)
- if (n < 0):
- raise io.BlockingIOError(0, "test blocking", -n)
- else:
- return n
+ def pop_written(self):
+ s = b"".join(self._write_stack)
+ self._write_stack[:] = []
+ return s
+
+ def block_on(self, char):
+ """Block when a given char is encountered."""
+ self._blocker_char = char
+
+ def readable(self):
+ return True
+
+ def seekable(self):
+ return True
def writable(self):
return True
+ def write(self, b):
+ b = bytes(b)
+ n = -1
+ if self._blocker_char:
+ try:
+ n = b.index(self._blocker_char)
+ except ValueError:
+ pass
+ else:
+ self._blocker_char = None
+ self._write_stack.append(b[:n])
+ raise io.BlockingIOError(0, "test blocking", n)
+ self._write_stack.append(b)
+ return len(b)
+
class IOTest(unittest.TestCase):
@@ -180,14 +262,15 @@
def test_readline(self):
f = io.open(support.TESTFN, "wb")
- f.write(b"abc\ndef\nxyzzy\nfoo")
+ f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
f.close()
f = io.open(support.TESTFN, "rb")
self.assertEqual(f.readline(), b"abc\n")
self.assertEqual(f.readline(10), b"def\n")
self.assertEqual(f.readline(2), b"xy")
self.assertEqual(f.readline(4), b"zzy\n")
- self.assertEqual(f.readline(), b"foo")
+ self.assertEqual(f.readline(), b"foo\x00bar\n")
+ self.assertEqual(f.readline(), b"another line")
f.close()
def test_raw_bytes_io(self):
@@ -238,15 +321,20 @@
class MyFileIO(io.FileIO):
def __del__(self):
record.append(1)
- io.FileIO.__del__(self)
+ try:
+ f = io.FileIO.__del__
+ except AttributeError:
+ pass
+ else:
+ f(self)
def close(self):
record.append(2)
io.FileIO.close(self)
def flush(self):
record.append(3)
io.FileIO.flush(self)
- f = MyFileIO(support.TESTFN, "w")
- f.write("xxx")
+ f = MyFileIO(support.TESTFN, "wb")
+ f.write(b"xxx")
del f
self.assertEqual(record, [1, 2, 3])
@@ -354,13 +442,138 @@
EOF = ""
-class BufferedReaderTest(unittest.TestCase):
+class CommonBufferedTests:
+ # Tests common to BufferedReader, BufferedWriter and BufferedRandom
+
+ def testFileno(self):
+ rawio = MockRawIO()
+ bufio = self.tp(rawio)
+
+ self.assertEquals(42, bufio.fileno())
+
+ def testFilenoNoFileno(self):
+ # XXX will we always have fileno() function? If so, kill
+ # this test. Else, write it.
+ pass
+
+ def testInvalidArgs(self):
+ rawio = MockRawIO()
+ bufio = self.tp(rawio)
+ # Invalid whence
+ self.assertRaises(ValueError, bufio.seek, 0, -1)
+ self.assertRaises(ValueError, bufio.seek, 0, 3)
+
+ def testOverrideDestructor(self):
+ tp = self.tp
+ record = []
+ class MyBufferedIO(tp):
+ def __del__(self):
+ record.append(1)
+ try:
+ f = tp.__del__
+ except AttributeError:
+ pass
+ else:
+ f(self)
+ def close(self):
+ record.append(2)
+ tp.close(self)
+ def flush(self):
+ record.append(3)
+ tp.flush(self)
+ rawio = MockRawIO()
+ bufio = MyBufferedIO(rawio)
+ writable = bufio.writable()
+ del bufio
+ if writable:
+ self.assertEqual(record, [1, 2, 3])
+ else:
+ self.assertEqual(record, [1, 2])
+
+ def testContext(self):
+ # Test usability as a context manager
+ rawio = MockRawIO()
+ bufio = self.tp(rawio)
+ def _with():
+ with bufio:
+ pass
+ _with()
+ # bufio should now be closed, and using it a second time should raise
+ # a ValueError.
+ self.assertRaises(ValueError, _with)
+
+ def testErrorThroughDestructor(self):
+ # Test that the exception state is not modified by a destructor,
+ # even if close() fails.
+ rawio = CloseFailureIO()
+ def f():
+ self.tp(rawio).xyzzy
+ self.assertRaises(AttributeError, f)
+
+
+class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
+ tp = io.BufferedReader
+ read_mode = "rb"
+
+ def testConstructor(self):
+ rawio = MockRawIO([b"abc"])
+ bufio = self.tp(rawio)
+ bufio.__init__(rawio)
+ bufio.__init__(rawio, buffer_size=1024)
+ bufio.__init__(rawio, buffer_size=16)
+ self.assertEquals(b"abc", bufio.read())
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+ self.assertRaises(ValueError, bufio.read)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+ self.assertRaises(ValueError, bufio.read)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+ self.assertRaises(ValueError, bufio.read)
+ if sys.maxsize > 0x7FFFFFFF:
+ # The allocation can succeed on 32-bit builds, e.g. with more
+ # than 2GB RAM and a 64-bit kernel.
+ self.assertRaises((OverflowError, MemoryError, ValueError),
+ bufio.__init__, rawio, sys.maxsize)
+ rawio = MockRawIO([b"abc"])
+ bufio.__init__(rawio)
+ self.assertEquals(b"abc", bufio.read())
def testRead(self):
rawio = MockRawIO((b"abc", b"d", b"efg"))
- bufio = io.BufferedReader(rawio)
-
+ bufio = self.tp(rawio)
self.assertEquals(b"abcdef", bufio.read(6))
+ # Invalid args
+ self.assertRaises(ValueError, bufio.read, -2)
+
+ def testRead1(self):
+ rawio = MockRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+ self.assertEquals(b"a", bufio.read(1))
+ self.assertEquals(b"b", bufio.read1(1))
+ self.assertEquals(rawio._reads, 1)
+ self.assertEquals(b"c", bufio.read1(100))
+ self.assertEquals(rawio._reads, 1)
+ self.assertEquals(b"d", bufio.read1(100))
+ self.assertEquals(rawio._reads, 2)
+ self.assertEquals(b"efg", bufio.read1(100))
+ self.assertEquals(rawio._reads, 3)
+ self.assertEquals(b"", bufio.read1(100))
+ # Invalid args
+ self.assertRaises(ValueError, bufio.read1, -1)
+
+ def testReadinto(self):
+ rawio = MockRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+ b = bytearray(2)
+ self.assertEquals(bufio.readinto(b), 2)
+ self.assertEquals(b, b"ab")
+ self.assertEquals(bufio.readinto(b), 2)
+ self.assertEquals(b, b"cd")
+ self.assertEquals(bufio.readinto(b), 2)
+ self.assertEquals(b, b"ef")
+ self.assertEquals(bufio.readinto(b), 1)
+ self.assertEquals(b, b"gf")
+ self.assertEquals(bufio.readinto(b), 0)
+ self.assertEquals(b, b"gf")
def testBuffering(self):
data = b"abcdefghi"
@@ -374,47 +587,38 @@
for bufsize, buf_read_sizes, raw_read_sizes in tests:
rawio = MockFileIO(data)
- bufio = io.BufferedReader(rawio, buffer_size=bufsize)
+ bufio = self.tp(rawio, buffer_size=bufsize)
pos = 0
for nbytes in buf_read_sizes:
self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
pos += nbytes
+ # this is mildly implementation-dependent
self.assertEquals(rawio.read_history, raw_read_sizes)
def testReadNonBlocking(self):
# Inject some None's in there to simulate EWOULDBLOCK
- rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
- bufio = io.BufferedReader(rawio)
+ rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
+ bufio = self.tp(rawio)
self.assertEquals(b"abcd", bufio.read(6))
self.assertEquals(b"e", bufio.read(1))
self.assertEquals(b"fg", bufio.read())
+ self.assertEquals(b"", bufio.peek(1))
self.assert_(None is bufio.read())
self.assertEquals(b"", bufio.read())
def testReadToEof(self):
rawio = MockRawIO((b"abc", b"d", b"efg"))
- bufio = io.BufferedReader(rawio)
+ bufio = self.tp(rawio)
self.assertEquals(b"abcdefg", bufio.read(9000))
def testReadNoArgs(self):
rawio = MockRawIO((b"abc", b"d", b"efg"))
- bufio = io.BufferedReader(rawio)
+ bufio = self.tp(rawio)
self.assertEquals(b"abcdefg", bufio.read())
- def testFileno(self):
- rawio = MockRawIO((b"abc", b"d", b"efg"))
- bufio = io.BufferedReader(rawio)
-
- self.assertEquals(42, bufio.fileno())
-
- def testFilenoNoFileno(self):
- # XXX will we always have fileno() function? If so, kill
- # this test. Else, write it.
- pass
-
def testThreads(self):
try:
# Write out many bytes with exactly the same number of 0's,
@@ -426,8 +630,8 @@
s = bytes(bytearray(l))
with io.open(support.TESTFN, "wb") as f:
f.write(s)
- with io.open(support.TESTFN, "rb", buffering=0) as raw:
- bufio = io.BufferedReader(raw, 8)
+ with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
+ bufio = self.tp(raw, 8)
errors = []
results = []
def f():
@@ -457,80 +661,196 @@
finally:
support.unlink(support.TESTFN)
-
-
-class BufferedWriterTest(unittest.TestCase):
+ def testMisbehavedRawIO(self):
+ rawio = MisbehavedRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+ self.assertRaises(IOError, bufio.seek, 0)
+ self.assertRaises(IOError, bufio.tell)
+ self.assertRaises(IOError, bufio.read, 10)
+
+
+class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
+ tp = io.BufferedWriter
+ write_mode = "wb"
+
+ def testConstructor(self):
+ rawio = MockRawIO()
+ bufio = self.tp(rawio)
+ bufio.__init__(rawio)
+ bufio.__init__(rawio, buffer_size=1024)
+ bufio.__init__(rawio, buffer_size=16)
+ self.assertEquals(3, bufio.write(b"abc"))
+ bufio.flush()
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+ self.assertRaises(ValueError, bufio.write, b"def")
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+ self.assertRaises(ValueError, bufio.write, b"def")
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+ self.assertRaises(ValueError, bufio.write, b"def")
+ if sys.maxsize > 0x7FFFFFFF:
+ # The allocation can succeed on 32-bit builds, e.g. with more
+ # than 2GB RAM and a 64-bit kernel.
+ self.assertRaises((OverflowError, MemoryError, ValueError),
+ bufio.__init__, rawio, sys.maxsize)
+ bufio.__init__(rawio)
+ self.assertEquals(3, bufio.write(b"ghi"))
+ bufio.flush()
+ self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
def testWrite(self):
# Write to the buffered IO but don't overflow the buffer.
writer = MockRawIO()
- bufio = io.BufferedWriter(writer, 8)
-
+ bufio = self.tp(writer, 8)
bufio.write(b"abc")
-
self.assertFalse(writer._write_stack)
def testWriteOverflow(self):
writer = MockRawIO()
- bufio = io.BufferedWriter(writer, 8)
+ bufio = self.tp(writer, 8)
+ contents = b"abcdefghijklmnop"
+ for n in range(0, len(contents), 3):
+ bufio.write(contents[n:n+3])
+ flushed = b"".join(writer._write_stack)
+ # At least (total - 8) bytes were implicitly flushed, perhaps more
+ # depending on the implementation.
+ self.assert_(flushed.startswith(contents[:-8]), flushed)
+
+ def check_writes(self, intermediate_func):
+ # Lots of writes, test the flushed output is as expected.
+ contents = bytes(range(256)) * 1000
+ n = 0
+ writer = MockRawIO()
+ bufio = self.tp(writer, 13)
+ # Generator of write sizes: repeat each N 15 times then proceed to N+1
+ def gen_sizes():
+ for size in count(1):
+ for i in range(15):
+ yield size
+ sizes = gen_sizes()
+ while n < len(contents):
+ size = min(next(sizes), len(contents) - n)
+ self.assertEquals(bufio.write(contents[n:n+size]), size)
+ intermediate_func(bufio)
+ n += size
+ bufio.flush()
+ self.assertEquals(contents, b"".join(writer._write_stack))
- bufio.write(b"abc")
- bufio.write(b"defghijkl")
+ def testWrites(self):
+ self.check_writes(lambda bufio: None)
- self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
+ def testWritesAndFlushes(self):
+ self.check_writes(lambda bufio: bufio.flush())
- def testWriteNonBlocking(self):
- raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
- bufio = io.BufferedWriter(raw, 8, 16)
+ def testWritesAndSeeks(self):
+ def _seekabs(bufio):
+ pos = bufio.tell()
+ bufio.seek(pos + 1, 0)
+ bufio.seek(pos - 1, 0)
+ bufio.seek(pos, 0)
+ self.check_writes(_seekabs)
+ def _seekrel(bufio):
+ pos = bufio.seek(0, 1)
+ bufio.seek(+1, 1)
+ bufio.seek(-1, 1)
+ bufio.seek(pos, 0)
+ self.check_writes(_seekrel)
- bufio.write(b"asdf")
- bufio.write(b"asdfa")
- self.assertEquals(b"asdfasdfa", raw._write_stack[0])
-
- bufio.write(b"asdfasdfasdf")
- self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
- bufio.write(b"asdfasdfasdf")
- self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
- self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
-
- bufio.write(b"asdfasdfasdf")
-
- # XXX I don't like this test. It relies too heavily on how the
- # algorithm actually works, which we might change. Refactor
- # later.
+ def testWritesAndTruncates(self):
+ self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
- def testFileno(self):
- rawio = MockRawIO((b"abc", b"d", b"efg"))
- bufio = io.BufferedWriter(rawio)
+ def testWriteNonBlocking(self):
+ raw = MockNonBlockWriterIO()
+ bufio = self.tp(raw, 8, 8)
- self.assertEquals(42, bufio.fileno())
+ self.assertEquals(bufio.write(b"abcd"), 4)
+ self.assertEquals(bufio.write(b"efghi"), 5)
+ # 1 byte will be written, the rest will be buffered
+ raw.block_on(b"k")
+ self.assertEquals(bufio.write(b"jklmn"), 5)
+
+ # 8 bytes will be written, 8 will be buffered and the rest will be lost
+ raw.block_on(b"0")
+ try:
+ bufio.write(b"opqrwxyz0123456789")
+ except io.BlockingIOError as e:
+ written = e.characters_written
+ else:
+ self.fail("BlockingIOError should have been raised")
+ self.assertEquals(written, 16)
+ self.assertEquals(raw.pop_written(),
+ b"abcdefghijklmnopqrwxyz")
+
+ self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
+ s = raw.pop_written()
+ # Previously buffered bytes were flushed
+ self.assertTrue(s.startswith(b"01234567A"), s)
+
+ def testWriteAndRewind(self):
+ raw = io.BytesIO()
+ bufio = self.tp(raw, 4)
+ self.assertEqual(bufio.write(b"abcdef"), 6)
+ self.assertEqual(bufio.tell(), 6)
+ bufio.seek(0, 0)
+ self.assertEqual(bufio.write(b"XY"), 2)
+ bufio.seek(6, 0)
+ self.assertEqual(raw.getvalue(), b"XYcdef")
+ self.assertEqual(bufio.write(b"123456"), 6)
+ bufio.flush()
+ self.assertEqual(raw.getvalue(), b"XYcdef123456")
def testFlush(self):
writer = MockRawIO()
- bufio = io.BufferedWriter(writer, 8)
-
+ bufio = self.tp(writer, 8)
bufio.write(b"abc")
bufio.flush()
+ self.assertEquals(b"abc", writer._write_stack[0])
+ def testDestructor(self):
+ writer = MockRawIO()
+ bufio = self.tp(writer, 8)
+ bufio.write(b"abc")
+ del bufio
self.assertEquals(b"abc", writer._write_stack[0])
+ def testTruncate(self):
+ # Truncate implicitly flushes the buffer.
+ with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
+ bufio = self.tp(raw, 8)
+ bufio.write(b"abcdef")
+ self.assertEqual(bufio.truncate(3), 3)
+ self.assertEqual(bufio.tell(), 3)
+ with io.open(support.TESTFN, "rb", buffering=0) as f:
+ self.assertEqual(f.read(), b"abc")
+
def testThreads(self):
- # BufferedWriter should not raise exceptions or crash
- # when called from multiple threads.
try:
+ # Write out many bytes from many threads and test they were
+ # all flushed.
+ N = 1000
+ contents = bytes(range(256)) * N
+ sizes = cycle([1, 19])
+ n = 0
+ queue = deque()
+ while n < len(contents):
+ size = next(sizes)
+ queue.append(contents[n:n+size])
+ n += size
+ del contents
# We use a real file object because it allows us to
# exercise situations where the GIL is released before
# writing the buffer to the raw streams. This is in addition
# to concurrency issues due to switching threads in the middle
# of Python code.
- with io.open(support.TESTFN, "wb", buffering=0) as raw:
- bufio = io.BufferedWriter(raw, 8)
+ with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
+ bufio = self.tp(raw, 8)
errors = []
def f():
try:
- # Write enough bytes to flush the buffer
- s = b"a" * 19
- for i in range(50):
+ while True:
+ try:
+ s = queue.popleft()
+ except IndexError:
+ return
bufio.write(s)
except Exception as e:
errors.append(e)
@@ -543,9 +863,21 @@
t.join()
self.assertFalse(errors,
"the following exceptions were caught: %r" % errors)
+ bufio.close()
+ with io.open(support.TESTFN, "rb") as f:
+ s = f.read()
+ for i in range(256):
+ self.assertEquals(s.count(bytes([i])), N)
finally:
support.unlink(support.TESTFN)
+ def testMisbehavedRawIO(self):
+ rawio = MisbehavedRawIO()
+ bufio = self.tp(rawio, 5)
+ self.assertRaises(IOError, bufio.seek, 0)
+ self.assertRaises(IOError, bufio.tell)
+ self.assertRaises(IOError, bufio.write, b"abcdef")
+
class BufferedRWPairTest(unittest.TestCase):
@@ -558,22 +890,29 @@
# XXX More Tests
-class BufferedRandomTest(unittest.TestCase):
+class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
+ tp = io.BufferedRandom
+ read_mode = "rb+"
+ write_mode = "wb+"
+
+ def testConstructor(self):
+ BufferedReaderTest.testConstructor(self)
+ BufferedWriterTest.testConstructor(self)
def testReadAndWrite(self):
raw = MockRawIO((b"asdf", b"ghjk"))
- rw = io.BufferedRandom(raw, 8, 12)
+ rw = self.tp(raw, 8, 12)
self.assertEqual(b"as", rw.read(2))
rw.write(b"ddd")
rw.write(b"eee")
self.assertFalse(raw._write_stack) # Buffer writes
- self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
+ self.assertEqual(b"ghjk", rw.read())
self.assertEquals(b"dddeee", raw._write_stack[0])
def testSeekAndTell(self):
raw = io.BytesIO(b"asdfghjkl")
- rw = io.BufferedRandom(raw)
+ rw = self.tp(raw)
self.assertEquals(b"as", rw.read(2))
self.assertEquals(2, rw.tell())
@@ -591,6 +930,95 @@
self.assertEquals(b"fl", rw.read(11))
self.assertRaises(TypeError, rw.seek, 0.0)
+ def check_flush_and_read(self, read_func):
+ raw = io.BytesIO(b"abcdefghi")
+ bufio = self.tp(raw)
+
+ self.assertEquals(b"ab", read_func(bufio, 2))
+ bufio.write(b"12")
+ self.assertEquals(b"ef", read_func(bufio, 2))
+ self.assertEquals(6, bufio.tell())
+ bufio.flush()
+ self.assertEquals(6, bufio.tell())
+ self.assertEquals(b"ghi", read_func(bufio))
+ raw.seek(0, 0)
+ raw.write(b"XYZ")
+ # flush() resets the read buffer
+ bufio.flush()
+ bufio.seek(0, 0)
+ self.assertEquals(b"XYZ", read_func(bufio, 3))
+
+ def testFlushAndRead(self):
+ self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
+
+ def testFlushAndReadinto(self):
+ def _readinto(bufio, n=-1):
+ b = bytearray(n if n >= 0 else 9999)
+ n = bufio.readinto(b)
+ return bytes(b[:n])
+ self.check_flush_and_read(_readinto)
+
+ def testFlushAndPeek(self):
+ def _peek(bufio, n=-1):
+ # This relies on the fact that the buffer can contain the whole
+ # raw stream, otherwise peek() can return less.
+ b = bufio.peek(n)
+ if n != -1:
+ b = b[:n]
+ bufio.seek(len(b), 1)
+ return b
+ self.check_flush_and_read(_peek)
+
+ def testFlushAndWrite(self):
+ raw = io.BytesIO(b"abcdefghi")
+ bufio = self.tp(raw)
+
+ bufio.write(b"123")
+ bufio.flush()
+ bufio.write(b"45")
+ bufio.flush()
+ bufio.seek(0, 0)
+ self.assertEquals(b"12345fghi", raw.getvalue())
+ self.assertEquals(b"12345fghi", bufio.read())
+
+ def testThreads(self):
+ BufferedReaderTest.testThreads(self)
+ BufferedWriterTest.testThreads(self)
+
+ def testWritesAndPeeks(self):
+ def _peek(bufio):
+ bufio.peek(1)
+ self.check_writes(_peek)
+ def _peek(bufio):
+ pos = bufio.tell()
+ bufio.seek(-1, 1)
+ bufio.peek(1)
+ bufio.seek(pos, 0)
+ self.check_writes(_peek)
+
+ def testWritesAndReads(self):
+ def _read(bufio):
+ bufio.seek(-1, 1)
+ bufio.read(1)
+ self.check_writes(_read)
+
+ def testWritesAndRead1s(self):
+ def _read1(bufio):
+ bufio.seek(-1, 1)
+ bufio.read1(1)
+ self.check_writes(_read1)
+
+ def testWritesAndReadintos(self):
+ def _read(bufio):
+ bufio.seek(-1, 1)
+ bufio.readinto(bytearray(1))
+ self.check_writes(_read)
+
+ def testMisbehavedRawIO(self):
+ BufferedReaderTest.testMisbehavedRawIO(self)
+ BufferedWriterTest.testMisbehavedRawIO(self)
+
+
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
# - A single output character can correspond to many bytes of input.
@@ -745,6 +1173,18 @@
def tearDown(self):
support.unlink(support.TESTFN)
+ def testConstructor(self):
+ r = io.BytesIO(b"\xc3\xa9\n\n")
+ b = io.BufferedReader(r, 1000)
+ t = io.TextIOWrapper(b)
+ t.__init__(b, encoding="latin1", newline="\r\n")
+ t.__init__(b, encoding="utf8")
+ self.assertEquals("\xe9\n", t.readline())
+ self.assertRaises(TypeError, t.__init__, b, newline=42)
+ self.assertRaises(ValueError, t.read)
+ self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
+ self.assertRaises(ValueError, t.read)
+
def testLineBuffering(self):
r = io.BytesIO()
b = io.BufferedWriter(r, 1000)
@@ -756,6 +1196,15 @@
t.write("A\rB")
self.assertEquals(r.getvalue(), b"XY\nZA\rB")
+ def testEncoding(self):
+ # Check the encoding attribute is always set, and valid
+ b = io.BytesIO()
+ t = io.TextIOWrapper(b, encoding="utf8")
+ self.assertEqual(t.encoding, "utf8")
+ t = io.TextIOWrapper(b)
+ self.assert_(t.encoding is not None)
+ codecs.lookup(t.encoding)
+
def testEncodingErrorsReading(self):
# (1) default
b = io.BytesIO(b"abc\n\xff\n")
@@ -798,40 +1247,6 @@
t.flush()
self.assertEquals(b.getvalue(), b"abc?def\n")
- def testNewlinesInput(self):
- testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
- normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
- for newline, expected in [
- (None, normalized.decode("ascii").splitlines(True)),
- ("", testdata.decode("ascii").splitlines(True)),
- ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
- ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
- ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
- ]:
- buf = io.BytesIO(testdata)
- txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
- self.assertEquals(txt.readlines(), expected)
- txt.seek(0)
- self.assertEquals(txt.read(), "".join(expected))
-
- def testNewlinesOutput(self):
- testdict = {
- "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
- "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
- "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
- "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
- }
- tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
- for newline, expected in tests:
- buf = io.BytesIO()
- txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
- txt.write("AAA\nB")
- txt.write("BB\nCCC\n")
- txt.write("X\rY\r\nZ")
- txt.flush()
- self.assertEquals(buf.closed, False)
- self.assertEquals(buf.getvalue(), expected)
-
def testNewlines(self):
input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
@@ -875,14 +1290,14 @@
self.assertEquals(len(got_lines), len(exp_lines))
def testNewlinesInput(self):
- testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
+ testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
for newline, expected in [
(None, normalized.decode("ascii").splitlines(True)),
("", testdata.decode("ascii").splitlines(True)),
- ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
- ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
- ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
+ ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+ ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+ ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
]:
buf = io.BytesIO(testdata)
txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
@@ -891,32 +1306,64 @@
self.assertEquals(txt.read(), "".join(expected))
def testNewlinesOutput(self):
- data = "AAA\nBBB\rCCC\n"
- data_lf = b"AAA\nBBB\rCCC\n"
- data_cr = b"AAA\rBBB\rCCC\r"
- data_crlf = b"AAA\r\nBBB\rCCC\r\n"
- save_linesep = os.linesep
- try:
- for os.linesep, newline, expected in [
- ("\n", None, data_lf),
- ("\r\n", None, data_crlf),
- ("\n", "", data_lf),
- ("\r\n", "", data_lf),
- ("\n", "\n", data_lf),
- ("\r\n", "\n", data_lf),
- ("\n", "\r", data_cr),
- ("\r\n", "\r", data_cr),
- ("\n", "\r\n", data_crlf),
- ("\r\n", "\r\n", data_crlf),
- ]:
- buf = io.BytesIO()
- txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
- txt.write(data)
- txt.close()
- self.assertEquals(buf.closed, True)
- self.assertRaises(ValueError, buf.getvalue)
- finally:
- os.linesep = save_linesep
+ testdict = {
+ "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+ "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+ "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
+ "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
+ }
+ tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
+ for newline, expected in tests:
+ buf = io.BytesIO()
+ txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
+ txt.write("AAA\nB")
+ txt.write("BB\nCCC\n")
+ txt.write("X\rY\r\nZ")
+ txt.flush()
+ self.assertEquals(buf.closed, False)
+ self.assertEquals(buf.getvalue(), expected)
+
+ def testDestructor(self):
+ l = []
+ class MyBytesIO(io.BytesIO):
+ def close(self):
+ l.append(self.getvalue())
+ io.BytesIO.close(self)
+ b = MyBytesIO()
+ t = io.TextIOWrapper(b, encoding="ascii")
+ t.write("abc")
+ del t
+ self.assertEquals([b"abc"], l)
+
+ def testOverrideDestructor(self):
+ record = []
+ class MyTextIO(io.TextIOWrapper):
+ def __del__(self):
+ record.append(1)
+ try:
+ f = io.TextIOWrapper.__del__
+ except AttributeError:
+ pass
+ else:
+ f(self)
+ def close(self):
+ record.append(2)
+ io.TextIOWrapper.close(self)
+ def flush(self):
+ record.append(3)
+ io.TextIOWrapper.flush(self)
+ b = io.BytesIO()
+ t = MyTextIO(b, encoding="ascii")
+ del t
+ self.assertEqual(record, [1, 2, 3])
+
+ def testErrorThroughDestructor(self):
+ # Test that the exception state is not modified by a destructor,
+ # even if close() fails.
+ rawio = CloseFailureIO()
+ def f():
+ io.TextIOWrapper(rawio).xyzzy
+ self.assertRaises(AttributeError, f)
# Systematic tests of the text I/O API
@@ -990,7 +1437,7 @@
f.close()
def testSeeking(self):
- chunk_size = io.TextIOWrapper._CHUNK_SIZE
+ chunk_size = _default_chunk_size()
prefix_size = chunk_size - 2
u_prefix = "a" * prefix_size
prefix = bytes(u_prefix.encode("utf-8"))
@@ -1021,9 +1468,7 @@
def testSeekAndTell(self):
"""Test seek/tell using the StatefulIncrementalDecoder."""
- # Make this test faster by forcing a smaller (but large enough)
- # chunk size. The bigger the chunker size, the slower seek() is,
- # as it tries to replay character decoding one byte at a time.
+ # Make test faster by doing smaller seeks
CHUNK_SIZE = 256
def testSeekAndTellWithData(data, min_pos=0):
@@ -1033,13 +1478,13 @@
f.write(data)
f.close()
f = io.open(support.TESTFN, encoding='test_decoder')
+ f._CHUNK_SIZE = CHUNK_SIZE
decoded = f.read()
f.close()
for i in range(min_pos, len(decoded) + 1): # seek positions
for j in [1, 5, len(decoded) - i]: # read lengths
f = io.open(support.TESTFN, encoding='test_decoder')
- f._CHUNK_SIZE = CHUNK_SIZE
self.assertEquals(f.read(i), decoded[:i])
cookie = f.tell()
self.assertEquals(f.read(j), decoded[i:i + j])
@@ -1057,6 +1502,7 @@
testSeekAndTellWithData(input)
# Position each test case so that it crosses a chunk boundary.
+ #CHUNK_SIZE = _default_chunk_size()
for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
offset = CHUNK_SIZE - len(input)//2
prefix = b'.'*offset
@@ -1328,52 +1774,16 @@
f.close()
g.close()
- def test_io_after_close(self):
- for kwargs in [
- {"mode": "w"},
- {"mode": "wb"},
- {"mode": "w", "buffering": 1},
- {"mode": "w", "buffering": 2},
- {"mode": "wb", "buffering": 0},
- {"mode": "r"},
- {"mode": "rb"},
- {"mode": "r", "buffering": 1},
- {"mode": "r", "buffering": 2},
- {"mode": "rb", "buffering": 0},
- {"mode": "w+"},
- {"mode": "w+b"},
- {"mode": "w+", "buffering": 1},
- {"mode": "w+", "buffering": 2},
- {"mode": "w+b", "buffering": 0},
- ]:
- f = io.open(support.TESTFN, **kwargs)
- f.close()
- self.assertRaises(ValueError, f.flush)
- self.assertRaises(ValueError, f.fileno)
- self.assertRaises(ValueError, f.isatty)
- self.assertRaises(ValueError, f.__iter__)
- if hasattr(f, "peek"):
- self.assertRaises(ValueError, f.peek, 1)
- self.assertRaises(ValueError, f.read)
- if hasattr(f, "read1"):
- self.assertRaises(ValueError, f.read1, 1024)
- if hasattr(f, "readinto"):
- self.assertRaises(ValueError, f.readinto, bytearray(1024))
- self.assertRaises(ValueError, f.readline)
- self.assertRaises(ValueError, f.readlines)
- self.assertRaises(ValueError, f.seek, 0)
- self.assertRaises(ValueError, f.tell)
- self.assertRaises(ValueError, f.truncate)
- self.assertRaises(ValueError, f.write, "")
- self.assertRaises(ValueError, f.writelines, [])
-
def test_main():
- support.run_unittest(IOTest, BytesIOTest, StringIOTest,
+ support.run_unittest(
+ IOTest, BytesIOTest, StringIOTest,
BufferedReaderTest, BufferedWriterTest,
BufferedRWPairTest, BufferedRandomTest,
StatefulIncrementalDecoderTest,
- TextIOWrapperTest, MiscIOTest)
+ TextIOWrapperTest, MiscIOTest
+ )
if __name__ == "__main__":
- unittest.main()
+ test_main()
+ #unittest.main()
More information about the Python-checkins mailing list