[Python-checkins] cpython: Issue #21859: Added Python implementation of io.FileIO.

serhiy.storchaka python-checkins at python.org
Fri Apr 10 15:23:16 CEST 2015


https://hg.python.org/cpython/rev/9ef5765d56b7
changeset:   95527:9ef5765d56b7
user:        Serhiy Storchaka <storchaka at gmail.com>
date:        Fri Apr 10 16:16:16 2015 +0300
summary:
  Issue #21859: Added Python implementation of io.FileIO.

files:
  Lib/_pyio.py                |  344 ++++++++++++++++++++++++
  Lib/test/test_file_eintr.py |   40 ++-
  Lib/test/test_fileio.py     |  180 ++++++++----
  Misc/NEWS                   |    2 +
  4 files changed, 500 insertions(+), 66 deletions(-)


diff --git a/Lib/_pyio.py b/Lib/_pyio.py
--- a/Lib/_pyio.py
+++ b/Lib/_pyio.py
@@ -7,11 +7,16 @@
 import codecs
 import errno
 import array
+import stat
 # Import _thread instead of threading to reduce startup cost
 try:
     from _thread import allocate_lock as Lock
 except ImportError:
     from _dummy_thread import allocate_lock as Lock
+if os.name == 'win32':
+    from msvcrt import setmode as _setmode
+else:
+    _setmode = None
 
 import io
 from io import (__all__, SEEK_SET, SEEK_CUR, SEEK_END)
@@ -1378,6 +1383,345 @@
         return BufferedWriter.write(self, b)
 
 
+class FileIO(RawIOBase):
+    _fd = -1
+    _created = False
+    _readable = False
+    _writable = False
+    _appending = False
+    _seekable = None
+    _closefd = True
+
+    def __init__(self, file, mode='r', closefd=True, opener=None):
+        """Open a file.  The mode can be 'r' (default), 'w', 'x' or 'a' for reading,
+        writing, exclusive creation or appending.  The file will be created if it
+        doesn't exist when opened for writing or appending; it will be truncated
+        when opened for writing.  A FileExistsError will be raised if it already
+        exists when opened for creating. Opening a file for creating implies
+        writing so this mode behaves in a similar way to 'w'. Add a '+' to the mode
+        to allow simultaneous reading and writing. A custom opener can be used by
+        passing a callable as *opener*. The underlying file descriptor for the file
+        object is then obtained by calling opener with (*name*, *flags*).
+        *opener* must return an open file descriptor (passing os.open as *opener*
+        results in functionality similar to passing None).
+        """
+        if self._fd >= 0:
+            # Have to close the existing file first.
+            try:
+                if self._closefd:
+                    os.close(self._fd)
+            finally:
+                self._fd = -1
+
+        if isinstance(file, float):
+            raise TypeError('integer argument expected, got float')
+        if isinstance(file, int):
+            fd = file
+            if fd < 0:
+                raise ValueError('negative file descriptor')
+        else:
+            fd = -1
+
+        if not isinstance(mode, str):
+            raise TypeError('invalid mode: %s' % (mode,))
+        if not set(mode) <= set('xrwab+'):
+            raise ValueError('invalid mode: %s' % (mode,))
+        if sum(c in 'rwax' for c in mode) != 1 or mode.count('+') > 1:
+            raise ValueError('Must have exactly one of create/read/write/append '
+                             'mode and at most one plus')
+
+        if 'x' in mode:
+            self._created = True
+            self._writable = True
+            flags = os.O_EXCL | os.O_CREAT
+        elif 'r' in mode:
+            self._readable = True
+            flags = 0
+        elif 'w' in mode:
+            self._writable = True
+            flags = os.O_CREAT | os.O_TRUNC
+        elif 'a' in mode:
+            self._writable = True
+            self._appending = True
+            flags = os.O_APPEND | os.O_CREAT
+
+        if '+' in mode:
+            self._readable = True
+            self._writable = True
+
+        if self._readable and self._writable:
+            flags |= os.O_RDWR
+        elif self._readable:
+            flags |= os.O_RDONLY
+        else:
+            flags |= os.O_WRONLY
+
+        flags |= getattr(os, 'O_BINARY', 0)
+
+        noinherit_flag = (getattr(os, 'O_NOINHERIT', 0) or
+                          getattr(os, 'O_CLOEXEC', 0))
+        flags |= noinherit_flag
+
+        owned_fd = None
+        try:
+            if fd < 0:
+                if not closefd:
+                    raise ValueError('Cannot use closefd=False with file name')
+                if opener is None:
+                    fd = os.open(file, flags, 0o666)
+                else:
+                    fd = opener(file, flags)
+                    if not isinstance(fd, int):
+                        raise TypeError('expected integer from opener')
+                    if fd < 0:
+                        raise OSError('Negative file descriptor')
+                owned_fd = fd
+                if not noinherit_flag:
+                    os.set_inheritable(fd, False)
+
+            self._closefd = closefd
+            fdfstat = os.fstat(fd)
+            try:
+                if stat.S_ISDIR(fdfstat.st_mode):
+                    raise IsADirectoryError(errno.EISDIR,
+                                            os.strerror(errno.EISDIR), file)
+            except AttributeError:
+                # Ignore the AttribueError if stat.S_ISDIR or errno.EISDIR
+                # don't exist.
+                pass
+            self._blksize = getattr(fdfstat, 'st_blksize', 0)
+            if self._blksize <= 1:
+                self._blksize = DEFAULT_BUFFER_SIZE
+
+            if _setmode:
+                # don't translate newlines (\r\n <=> \n)
+                _setmode(fd, os.O_BINARY)
+
+            self.name = file
+            if self._appending:
+                # For consistent behaviour, we explicitly seek to the
+                # end of file (otherwise, it might be done only on the
+                # first write()).
+                os.lseek(fd, 0, SEEK_END)
+        except:
+            if owned_fd is not None:
+                os.close(owned_fd)
+            raise
+        self._fd = fd
+
+    def __del__(self):
+        if self._fd >= 0 and self._closefd and not self.closed:
+            import warnings
+            warnings.warn('unclosed file %r' % (self,), ResourceWarning,
+                          stacklevel=2)
+            self.close()
+
+    def __getstate__(self):
+        raise TypeError("cannot serialize '%s' object", self.__class__.__name__)
+
+    def __repr__(self):
+        class_name = '%s.%s' % (self.__class__.__module__,
+                                self.__class__.__qualname__)
+        if self.closed:
+            return '<%s [closed]>' % class_name
+        try:
+            name = self.name
+        except AttributeError:
+            return ('<%s fd=%d mode=%r closefd=%r>' %
+                    (class_name, self._fd, self.mode, self._closefd))
+        else:
+            return ('<%s name=%r mode=%r closefd=%r>' %
+                    (class_name, name, self.mode, self._closefd))
+
+    def _checkReadable(self):
+        if not self._readable:
+            raise UnsupportedOperation('File not open for reading')
+
+    def _checkWritable(self, msg=None):
+        if not self._writable:
+            raise UnsupportedOperation('File not open for writing')
+
+    def read(self, size=None):
+        """Read at most size bytes, returned as bytes.
+
+        Only makes one system call, so less data may be returned than requested
+        In non-blocking mode, returns None if no data is available.
+        Return an empty bytes object at EOF.
+        """
+        self._checkClosed()
+        self._checkReadable()
+        if size is None or size < 0:
+            return self.readall()
+        try:
+            return os.read(self._fd, size)
+        except BlockingIOError:
+            return None
+
+    def readall(self):
+        """Read all data from the file, returned as bytes.
+
+        In non-blocking mode, returns as much as is immediately available,
+        or None if no data is available.  Return an empty bytes object at EOF.
+        """
+        self._checkClosed()
+        self._checkReadable()
+        bufsize = DEFAULT_BUFFER_SIZE
+        try:
+            pos = os.lseek(self._fd, 0, SEEK_CUR)
+            end = os.fstat(self._fd).st_size
+            if end >= pos:
+                bufsize = end - pos + 1
+        except OSError:
+            pass
+
+        result = bytearray()
+        while True:
+            if len(result) >= bufsize:
+                bufsize = len(result)
+                bufsize += max(bufsize, DEFAULT_BUFFER_SIZE)
+            n = bufsize - len(result)
+            try:
+                chunk = os.read(self._fd, n)
+            except BlockingIOError:
+                if result:
+                    break
+                return None
+            if not chunk: # reached the end of the file
+                break
+            result += chunk
+
+        return bytes(result)
+
+    def readinto(self, b):
+        """Same as RawIOBase.readinto()."""
+        m = memoryview(b).cast('B')
+        data = self.read(len(m))
+        n = len(data)
+        m[:n] = data
+        return n
+
+    def write(self, b):
+        """Write bytes b to file, return number written.
+
+        Only makes one system call, so not all of the data may be written.
+        The number of bytes actually written is returned.  In non-blocking mode,
+        returns None if the write would block.
+        """
+        self._checkClosed()
+        self._checkWritable()
+        try:
+            return os.write(self._fd, b)
+        except BlockingIOError:
+            return None
+
+    def seek(self, pos, whence=SEEK_SET):
+        """Move to new file position.
+
+        Argument offset is a byte count.  Optional argument whence defaults to
+        SEEK_SET or 0 (offset from start of file, offset should be >= 0); other values
+        are SEEK_CUR or 1 (move relative to current position, positive or negative),
+        and SEEK_END or 2 (move relative to end of file, usually negative, although
+        many platforms allow seeking beyond the end of a file).
+
+        Note that not all file objects are seekable.
+        """
+        if isinstance(pos, float):
+            raise TypeError('an integer is required')
+        self._checkClosed()
+        return os.lseek(self._fd, pos, whence)
+
+    def tell(self):
+        """tell() -> int.  Current file position.
+
+        Can raise OSError for non seekable files."""
+        self._checkClosed()
+        return os.lseek(self._fd, 0, SEEK_CUR)
+
+    def truncate(self, size=None):
+        """Truncate the file to at most size bytes.
+
+        Size defaults to the current file position, as returned by tell().
+        The current file position is changed to the value of size.
+        """
+        self._checkClosed()
+        self._checkWritable()
+        if size is None:
+            size = self.tell()
+        os.ftruncate(self._fd, size)
+        return size
+
+    def close(self):
+        """Close the file.
+
+        A closed file cannot be used for further I/O operations.  close() may be
+        called more than once without error.
+        """
+        if not self.closed:
+            try:
+                if self._closefd:
+                    os.close(self._fd)
+            finally:
+                super().close()
+
+    def seekable(self):
+        """True if file supports random-access."""
+        self._checkClosed()
+        if self._seekable is None:
+            try:
+                self.tell()
+            except OSError:
+                self._seekable = False
+            else:
+                self._seekable = True
+        return self._seekable
+
+    def readable(self):
+        """True if file was opened in a read mode."""
+        self._checkClosed()
+        return self._readable
+
+    def writable(self):
+        """True if file was opened in a write mode."""
+        self._checkClosed()
+        return self._writable
+
+    def fileno(self):
+        """Return the underlying file descriptor (an integer)."""
+        self._checkClosed()
+        return self._fd
+
+    def isatty(self):
+        """True if the file is connected to a TTY device."""
+        self._checkClosed()
+        return os.isatty(self._fd)
+
+    @property
+    def closefd(self):
+        """True if the file descriptor will be closed by close()."""
+        return self._closefd
+
+    @property
+    def mode(self):
+        """String giving the file mode"""
+        if self._created:
+            if self._readable:
+                return 'xb+'
+            else:
+                return 'xb'
+        elif self._appending:
+            if self._readable:
+                return 'ab+'
+            else:
+                return 'ab'
+        elif self._readable:
+            if self._writable:
+                return 'rb+'
+            else:
+                return 'rb'
+        else:
+            return 'wb'
+
+
 class TextIOBase(IOBase):
 
     """Base class for text I/O.
diff --git a/Lib/test/test_file_eintr.py b/Lib/test/test_file_eintr.py
--- a/Lib/test/test_file_eintr.py
+++ b/Lib/test/test_file_eintr.py
@@ -18,11 +18,12 @@
 import unittest
 
 # Test import all of the things we're about to try testing up front.
-from _io import FileIO
+import _io
+import _pyio
 
 
 @unittest.skipUnless(os.name == 'posix', 'tests requires a posix system.')
-class TestFileIOSignalInterrupt(unittest.TestCase):
+class TestFileIOSignalInterrupt:
     def setUp(self):
         self._process = None
 
@@ -38,8 +39,9 @@
 
         subclasseses should override this to test different IO objects.
         """
-        return ('import _io ;'
-                'infile = _io.FileIO(sys.stdin.fileno(), "rb")')
+        return ('import %s as io ;'
+                'infile = io.FileIO(sys.stdin.fileno(), "rb")' %
+                self.modname)
 
     def fail_with_process_info(self, why, stdout=b'', stderr=b'',
                                communicate=True):
@@ -179,11 +181,19 @@
                         expected=b'hello\nworld!\n'))
 
 
+class CTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase):
+    modname = '_io'
+
+class PyTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase):
+    modname = '_pyio'
+
+
 class TestBufferedIOSignalInterrupt(TestFileIOSignalInterrupt):
     def _generate_infile_setup_code(self):
         """Returns the infile = ... line of code to make a BufferedReader."""
-        return ('infile = open(sys.stdin.fileno(), "rb") ;'
-                'import _io ;assert isinstance(infile, _io.BufferedReader)')
+        return ('import %s as io ;infile = io.open(sys.stdin.fileno(), "rb") ;'
+                'assert isinstance(infile, io.BufferedReader)' %
+                self.modname)
 
     def test_readall(self):
         """BufferedReader.read() must handle signals and not lose data."""
@@ -193,12 +203,20 @@
                         read_method_name='read',
                         expected=b'hello\nworld!\n'))
 
+class CTestBufferedIOSignalInterrupt(TestBufferedIOSignalInterrupt, unittest.TestCase):
+    modname = '_io'
+
+class PyTestBufferedIOSignalInterrupt(TestBufferedIOSignalInterrupt, unittest.TestCase):
+    modname = '_pyio'
+
 
 class TestTextIOSignalInterrupt(TestFileIOSignalInterrupt):
     def _generate_infile_setup_code(self):
         """Returns the infile = ... line of code to make a TextIOWrapper."""
-        return ('infile = open(sys.stdin.fileno(), "rt", newline=None) ;'
-                'import _io ;assert isinstance(infile, _io.TextIOWrapper)')
+        return ('import %s as io ;'
+                'infile = io.open(sys.stdin.fileno(), "rt", newline=None) ;'
+                'assert isinstance(infile, io.TextIOWrapper)' %
+                self.modname)
 
     def test_readline(self):
         """readline() must handle signals and not lose data."""
@@ -224,6 +242,12 @@
                         read_method_name='read',
                         expected="hello\nworld!\n"))
 
+class CTestTextIOSignalInterrupt(TestTextIOSignalInterrupt, unittest.TestCase):
+    modname = '_io'
+
+class PyTestTextIOSignalInterrupt(TestTextIOSignalInterrupt, unittest.TestCase):
+    modname = '_pyio'
+
 
 def test_main():
     test_cases = [
diff --git a/Lib/test/test_fileio.py b/Lib/test/test_fileio.py
--- a/Lib/test/test_fileio.py
+++ b/Lib/test/test_fileio.py
@@ -12,13 +12,15 @@
 from test.support import TESTFN, check_warnings, run_unittest, make_bad_fd, cpython_only
 from collections import UserList
 
-from _io import FileIO as _FileIO
+import _io  # C implementation of io
+import _pyio # Python implementation of io
 
-class AutoFileTests(unittest.TestCase):
+
+class AutoFileTests:
     # file tests for which a test file is automatically set up
 
     def setUp(self):
-        self.f = _FileIO(TESTFN, 'w')
+        self.f = self.FileIO(TESTFN, 'w')
 
     def tearDown(self):
         if self.f:
@@ -69,20 +71,60 @@
             blksize = getattr(fst, 'st_blksize', blksize)
         self.assertEqual(self.f._blksize, blksize)
 
-    def testReadinto(self):
-        # verify readinto
-        self.f.write(bytes([1, 2]))
+    # verify readinto
+    def testReadintoByteArray(self):
+        self.f.write(bytes([1, 2, 0, 255]))
         self.f.close()
-        a = array('b', b'x'*10)
-        self.f = _FileIO(TESTFN, 'r')
-        n = self.f.readinto(a)
-        self.assertEqual(array('b', [1, 2]), a[:n])
+
+        ba = bytearray(b'abcdefgh')
+        with self.FileIO(TESTFN, 'r') as f:
+            n = f.readinto(ba)
+        self.assertEqual(ba, b'\x01\x02\x00\xffefgh')
+        self.assertEqual(n, 4)
+
+    def _testReadintoMemoryview(self):
+        self.f.write(bytes([1, 2, 0, 255]))
+        self.f.close()
+
+        m = memoryview(bytearray(b'abcdefgh'))
+        with self.FileIO(TESTFN, 'r') as f:
+            n = f.readinto(m)
+        self.assertEqual(m, b'\x01\x02\x00\xffefgh')
+        self.assertEqual(n, 4)
+
+        m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2])
+        with self.FileIO(TESTFN, 'r') as f:
+            n = f.readinto(m)
+        self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh')
+        self.assertEqual(n, 4)
+
+    def _testReadintoArray(self):
+        self.f.write(bytes([1, 2, 0, 255]))
+        self.f.close()
+
+        a = array('B', b'abcdefgh')
+        with self.FileIO(TESTFN, 'r') as f:
+            n = f.readinto(a)
+        self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104]))
+        self.assertEqual(n, 4)
+
+        a = array('b', b'abcdefgh')
+        with self.FileIO(TESTFN, 'r') as f:
+            n = f.readinto(a)
+        self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104]))
+        self.assertEqual(n, 4)
+
+        a = array('I', b'abcdefgh')
+        with self.FileIO(TESTFN, 'r') as f:
+            n = f.readinto(a)
+        self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh'))
+        self.assertEqual(n, 4)
 
     def testWritelinesList(self):
         l = [b'123', b'456']
         self.f.writelines(l)
         self.f.close()
-        self.f = _FileIO(TESTFN, 'rb')
+        self.f = self.FileIO(TESTFN, 'rb')
         buf = self.f.read()
         self.assertEqual(buf, b'123456')
 
@@ -90,7 +132,7 @@
         l = UserList([b'123', b'456'])
         self.f.writelines(l)
         self.f.close()
-        self.f = _FileIO(TESTFN, 'rb')
+        self.f = self.FileIO(TESTFN, 'rb')
         buf = self.f.read()
         self.assertEqual(buf, b'123456')
 
@@ -102,7 +144,7 @@
     def test_none_args(self):
         self.f.write(b"hi\nbye\nabc")
         self.f.close()
-        self.f = _FileIO(TESTFN, 'r')
+        self.f = self.FileIO(TESTFN, 'r')
         self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
         self.f.seek(0)
         self.assertEqual(self.f.readline(None), b"hi\n")
@@ -112,23 +154,24 @@
         self.assertRaises(TypeError, self.f.write, "Hello!")
 
     def testRepr(self):
-        self.assertEqual(
-            repr(self.f), "<_io.FileIO name=%r mode=%r closefd=True>"
-                          % (self.f.name, self.f.mode))
+        self.assertEqual(repr(self.f),
+                         "<%s.FileIO name=%r mode=%r closefd=True>" %
+                         (self.modulename, self.f.name, self.f.mode))
         del self.f.name
-        self.assertEqual(
-            repr(self.f), "<_io.FileIO fd=%r mode=%r closefd=True>"
-                          % (self.f.fileno(), self.f.mode))
+        self.assertEqual(repr(self.f),
+                         "<%s.FileIO fd=%r mode=%r closefd=True>" %
+                         (self.modulename, self.f.fileno(), self.f.mode))
         self.f.close()
-        self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")
+        self.assertEqual(repr(self.f),
+                         "<%s.FileIO [closed]>" % (self.modulename,))
 
     def testReprNoCloseFD(self):
         fd = os.open(TESTFN, os.O_RDONLY)
         try:
-            with _FileIO(fd, 'r', closefd=False) as f:
+            with self.FileIO(fd, 'r', closefd=False) as f:
                 self.assertEqual(repr(f),
-                                 "<_io.FileIO name=%r mode=%r closefd=False>"
-                                 % (f.name, f.mode))
+                                 "<%s.FileIO name=%r mode=%r closefd=False>" %
+                                 (self.modulename, f.name, f.mode))
         finally:
             os.close(fd)
 
@@ -140,15 +183,15 @@
         self.assertRaises(ValueError, f.read, 10) # Open for reading
         f.close()
         self.assertTrue(f.closed)
-        f = _FileIO(TESTFN, 'r')
+        f = self.FileIO(TESTFN, 'r')
         self.assertRaises(TypeError, f.readinto, "")
         self.assertTrue(not f.closed)
         f.close()
         self.assertTrue(f.closed)
 
     def testMethods(self):
-        methods = ['fileno', 'isatty', 'read', 'readinto',
-                   'seek', 'tell', 'truncate', 'write', 'seekable',
+        methods = ['fileno', 'isatty', 'read',
+                   'tell', 'truncate', 'seekable',
                    'readable', 'writable']
 
         self.f.close()
@@ -158,13 +201,16 @@
             method = getattr(self.f, methodname)
             # should raise on closed file
             self.assertRaises(ValueError, method)
+        self.assertRaises(ValueError, self.f.readinto, bytearray())
+        self.assertRaises(ValueError, self.f.seek, 0, os.SEEK_CUR)
+        self.assertRaises(ValueError, self.f.write, b'')
 
     def testOpendir(self):
         # Issue 3703: opening a directory should fill the errno
         # Windows always returns "[Errno 13]: Permission denied
         # Unix uses fstat and returns "[Errno 21]: Is a directory"
         try:
-            _FileIO('.', 'r')
+            self.FileIO('.', 'r')
         except OSError as e:
             self.assertNotEqual(e.errno, 0)
             self.assertEqual(e.filename, ".")
@@ -175,7 +221,7 @@
     def testOpenDirFD(self):
         fd = os.open('.', os.O_RDONLY)
         with self.assertRaises(OSError) as cm:
-            _FileIO(fd, 'r')
+            self.FileIO(fd, 'r')
         os.close(fd)
         self.assertEqual(cm.exception.errno, errno.EISDIR)
 
@@ -260,7 +306,7 @@
             self.f.close()
         except OSError:
             pass
-        self.f = _FileIO(TESTFN, 'r')
+        self.f = self.FileIO(TESTFN, 'r')
         os.close(self.f.fileno())
         return self.f
 
@@ -280,23 +326,32 @@
         a = array('b', b'x'*10)
         f.readinto(a)
 
-class OtherFileTests(unittest.TestCase):
+class CAutoFileTests(AutoFileTests, unittest.TestCase):
+    FileIO = _io.FileIO
+    modulename = '_io'
+
+class PyAutoFileTests(AutoFileTests, unittest.TestCase):
+    FileIO = _pyio.FileIO
+    modulename = '_pyio'
+
+
+class OtherFileTests:
 
     def testAbles(self):
         try:
-            f = _FileIO(TESTFN, "w")
+            f = self.FileIO(TESTFN, "w")
             self.assertEqual(f.readable(), False)
             self.assertEqual(f.writable(), True)
             self.assertEqual(f.seekable(), True)
             f.close()
 
-            f = _FileIO(TESTFN, "r")
+            f = self.FileIO(TESTFN, "r")
             self.assertEqual(f.readable(), True)
             self.assertEqual(f.writable(), False)
             self.assertEqual(f.seekable(), True)
             f.close()
 
-            f = _FileIO(TESTFN, "a+")
+            f = self.FileIO(TESTFN, "a+")
             self.assertEqual(f.readable(), True)
             self.assertEqual(f.writable(), True)
             self.assertEqual(f.seekable(), True)
@@ -305,7 +360,7 @@
 
             if sys.platform != "win32":
                 try:
-                    f = _FileIO("/dev/tty", "a")
+                    f = self.FileIO("/dev/tty", "a")
                 except OSError:
                     # When run in a cron job there just aren't any
                     # ttys, so skip the test.  This also handles other
@@ -328,7 +383,7 @@
         # check invalid mode strings
         for mode in ("", "aU", "wU+", "rw", "rt"):
             try:
-                f = _FileIO(TESTFN, mode)
+                f = self.FileIO(TESTFN, mode)
             except ValueError:
                 pass
             else:
@@ -344,7 +399,7 @@
                           ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
                           ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
                 # read modes are last so that TESTFN will exist first
-                with _FileIO(TESTFN, modes[0]) as f:
+                with self.FileIO(TESTFN, modes[0]) as f:
                     self.assertEqual(f.mode, modes[1])
         finally:
             if os.path.exists(TESTFN):
@@ -352,7 +407,7 @@
 
     def testUnicodeOpen(self):
         # verify repr works for unicode too
-        f = _FileIO(str(TESTFN), "w")
+        f = self.FileIO(str(TESTFN), "w")
         f.close()
         os.unlink(TESTFN)
 
@@ -362,7 +417,7 @@
             fn = TESTFN.encode("ascii")
         except UnicodeEncodeError:
             self.skipTest('could not encode %r to ascii' % TESTFN)
-        f = _FileIO(fn, "w")
+        f = self.FileIO(fn, "w")
         try:
             f.write(b"abc")
             f.close()
@@ -373,28 +428,21 @@
 
     def testConstructorHandlesNULChars(self):
         fn_with_NUL = 'foo\0bar'
-        self.assertRaises(ValueError, _FileIO, fn_with_NUL, 'w')
-        self.assertRaises(ValueError, _FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
+        self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w')
+        self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
 
     def testInvalidFd(self):
-        self.assertRaises(ValueError, _FileIO, -10)
-        self.assertRaises(OSError, _FileIO, make_bad_fd())
+        self.assertRaises(ValueError, self.FileIO, -10)
+        self.assertRaises(OSError, self.FileIO, make_bad_fd())
         if sys.platform == 'win32':
             import msvcrt
             self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd())
 
-    @cpython_only
-    def testInvalidFd_overflow(self):
-        # Issue 15989
-        import _testcapi
-        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
-        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)
-
     def testBadModeArgument(self):
         # verify that we get a sensible error message for bad mode argument
         bad_mode = "qwerty"
         try:
-            f = _FileIO(TESTFN, bad_mode)
+            f = self.FileIO(TESTFN, bad_mode)
         except ValueError as msg:
             if msg.args[0] != 0:
                 s = str(msg)
@@ -407,7 +455,7 @@
             self.fail("no error for invalid mode: %s" % bad_mode)
 
     def testTruncate(self):
-        f = _FileIO(TESTFN, 'w')
+        f = self.FileIO(TESTFN, 'w')
         f.write(bytes(bytearray(range(10))))
         self.assertEqual(f.tell(), 10)
         f.truncate(5)
@@ -422,11 +470,11 @@
         def bug801631():
             # SF bug <http://www.python.org/sf/801631>
             # "file.truncate fault on windows"
-            f = _FileIO(TESTFN, 'w')
+            f = self.FileIO(TESTFN, 'w')
             f.write(bytes(range(11)))
             f.close()
 
-            f = _FileIO(TESTFN,'r+')
+            f = self.FileIO(TESTFN,'r+')
             data = f.read(5)
             if data != bytes(range(5)):
                 self.fail("Read on file opened for update failed %r" % data)
@@ -466,19 +514,19 @@
                 pass
 
     def testInvalidInit(self):
-        self.assertRaises(TypeError, _FileIO, "1", 0, 0)
+        self.assertRaises(TypeError, self.FileIO, "1", 0, 0)
 
     def testWarnings(self):
         with check_warnings(quiet=True) as w:
             self.assertEqual(w.warnings, [])
-            self.assertRaises(TypeError, _FileIO, [])
+            self.assertRaises(TypeError, self.FileIO, [])
             self.assertEqual(w.warnings, [])
-            self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
+            self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt")
             self.assertEqual(w.warnings, [])
 
     def testUnclosedFDOnException(self):
         class MyException(Exception): pass
-        class MyFileIO(_FileIO):
+        class MyFileIO(self.FileIO):
             def __setattr__(self, name, value):
                 if name == "name":
                     raise MyException("blocked setting name")
@@ -487,12 +535,28 @@
         self.assertRaises(MyException, MyFileIO, fd)
         os.close(fd)  # should not raise OSError(EBADF)
 
+class COtherFileTests(OtherFileTests, unittest.TestCase):
+    FileIO = _io.FileIO
+    modulename = '_io'
+
+    @cpython_only
+    def testInvalidFd_overflow(self):
+        # Issue 15989
+        import _testcapi
+        self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1)
+        self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1)
+
+class PyOtherFileTests(OtherFileTests, unittest.TestCase):
+    FileIO = _pyio.FileIO
+    modulename = '_pyio'
+
 
 def test_main():
     # Historically, these tests have been sloppy about removing TESTFN.
     # So get rid of it no matter what.
     try:
-        run_unittest(AutoFileTests, OtherFileTests)
+        run_unittest(CAutoFileTests, PyAutoFileTests,
+                     COtherFileTests, PyOtherFileTests)
     finally:
         if os.path.exists(TESTFN):
             os.unlink(TESTFN)
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -19,6 +19,8 @@
 Library
 -------
 
+- Issue #21859: Added Python implementation of io.FileIO.
+
 - Issue #23865: close() methods in multiple modules now are idempotent and more
   robust at shutdown. If needs to release multiple resources, they are released
   even if errors are occured.

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list