[Python-checkins] cpython: Issue #8796: codecs.open() calls the builtin open() function instead of using

victor.stinner python-checkins at python.org
Fri May 27 01:53:59 CEST 2011


http://hg.python.org/cpython/rev/3555cf6f9c98
changeset:   70430:3555cf6f9c98
user:        Victor Stinner <victor.stinner at haypocalc.com>
date:        Fri May 27 01:51:18 2011 +0200
summary:
  Issue #8796: codecs.open() calls the builtin open() function instead of using
StreamReaderWriter. Deprecate StreamReader, StreamWriter, StreamReaderWriter,
StreamRecoder and EncodedFile() of the codecs module. Use the builtin open()
function or io.TextIOWrapper instead.

files:
  Doc/library/codecs.rst  |   25 ++++
  Lib/codecs.py           |   25 ++--
  Lib/test/test_codecs.py |  152 +++++++++++++++++++--------
  Misc/NEWS               |    5 +
  4 files changed, 148 insertions(+), 59 deletions(-)


diff --git a/Doc/library/codecs.rst b/Doc/library/codecs.rst
--- a/Doc/library/codecs.rst
+++ b/Doc/library/codecs.rst
@@ -85,6 +85,9 @@
    In case a search function cannot find a given encoding, it should return
    ``None``.
 
+   .. deprecated:: 3.3
+      *streamreader* and *streamwriter* attributes are now deprecated.
+
 
 .. function:: lookup(encoding)
 
@@ -139,6 +142,8 @@
 
    Raises a :exc:`LookupError` in case the encoding cannot be found.
 
+   .. deprecated:: 3.3
+
 
 .. function:: getwriter(encoding)
 
@@ -147,6 +152,8 @@
 
    Raises a :exc:`LookupError` in case the encoding cannot be found.
 
+   .. deprecated:: 3.3
+
 
 .. function:: register_error(name, error_handler)
 
@@ -217,6 +224,11 @@
 
    .. note::
 
+      This function is kept for backward compatibility with Python 2; the
+      builtin :func:`open` function should be used instead.
+
+   .. note::
+
       The wrapped version's methods will accept and return strings only.  Bytes
       arguments will be rejected.
 
@@ -251,6 +263,8 @@
    ``'strict'``, which causes :exc:`ValueError` to be raised in case an encoding
    error occurs.
 
+   .. deprecated:: 3.3
+
 
 .. function:: iterencode(iterator, encoding, errors='strict', **kwargs)
 
@@ -563,6 +577,9 @@
 following methods which every stream writer must define in order to be
 compatible with the Python codec registry.
 
+.. deprecated:: 3.3
+   Use the :class:`io.TextIOWrapper` class.
+
 
 .. class:: StreamWriter(stream[, errors])
 
@@ -628,6 +645,9 @@
 following methods which every stream reader must define in order to be
 compatible with the Python codec registry.
 
+.. deprecated:: 3.3
+   Use the :class:`io.TextIOWrapper` class.
+
 
 .. class:: StreamReader(stream[, errors])
 
@@ -728,6 +748,9 @@
 The design is such that one can use the factory functions returned by the
 :func:`lookup` function to construct the instance.
 
+.. deprecated:: 3.3
+   Use the :class:`io.TextIOWrapper` class.
+
 
 .. class:: StreamReaderWriter(stream, Reader, Writer, errors)
 
@@ -752,6 +775,8 @@
 The design is such that one can use the factory functions returned by the
 :func:`lookup` function to construct the instance.
 
+.. deprecated:: 3.3
+
 
 .. class:: StreamRecoder(stream, encode, decode, Reader, Writer, errors)
 
diff --git a/Lib/codecs.py b/Lib/codecs.py
--- a/Lib/codecs.py
+++ b/Lib/codecs.py
@@ -345,6 +345,8 @@
             The set of allowed parameter values can be extended via
             register_error.
         """
+        import warnings
+        warnings.warn('use io.TextIOWrapper', DeprecationWarning, stacklevel=2)
         self.stream = stream
         self.errors = errors
 
@@ -416,6 +418,8 @@
             The set of allowed parameter values can be extended via
             register_error.
         """
+        import warnings
+        warnings.warn('use io.TextIOWrapper', DeprecationWarning, stacklevel=2)
         self.stream = stream
         self.errors = errors
         self.bytebuffer = b""
@@ -846,7 +850,7 @@
 
 ### Shortcuts
 
-def open(filename, mode='rb', encoding=None, errors='strict', buffering=1):
+def open(filename, mode='r', encoding=None, errors=None, buffering=1):
 
     """ Open an encoded file using the given mode and return
         a wrapped version providing transparent encoding/decoding.
@@ -877,18 +881,13 @@
         parameter.
 
     """
-    if encoding is not None and \
-       'b' not in mode:
-        # Force opening of the file in binary mode
-        mode = mode + 'b'
-    file = builtins.open(filename, mode, buffering)
-    if encoding is None:
-        return file
-    info = lookup(encoding)
-    srw = StreamReaderWriter(file, info.streamreader, info.streamwriter, errors)
-    # Add attributes to simplify introspection
-    srw.encoding = encoding
-    return srw
+    if encoding is not None:
+        return builtins.open(filename, mode, buffering,
+                             encoding, errors, newline='')
+    else:
+        if 'b' not in mode:
+            mode = mode + 'b'
+        return builtins.open(filename, mode, buffering, encoding, errors)
 
 def EncodedFile(file, data_encoding, file_encoding=None, errors='strict'):
 
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -1,7 +1,10 @@
 from test import support
+import _testcapi
+import codecs
+import io
+import sys
 import unittest
-import codecs
-import sys, _testcapi, io
+import warnings
 
 class Queue(object):
     """
@@ -63,7 +66,9 @@
         # the StreamReader and check that the results equal the appropriate
         # entries from partialresults.
         q = Queue(b"")
-        r = codecs.getreader(self.encoding)(q)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            r = codecs.getreader(self.encoding)(q)
         result = ""
         for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
             q.write(bytes([c]))
@@ -106,7 +111,9 @@
             return codecs.getreader(self.encoding)(stream)
 
         def readalllines(input, keepends=True, size=None):
-            reader = getreader(input)
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore", DeprecationWarning)
+                reader = getreader(input)
             lines = []
             while True:
                 line = reader.readline(size=size, keepends=keepends)
@@ -215,14 +222,18 @@
             '                \r\n',
         ]
         stream = io.BytesIO("".join(s).encode(self.encoding))
-        reader = codecs.getreader(self.encoding)(stream)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            reader = codecs.getreader(self.encoding)(stream)
         for (i, line) in enumerate(reader):
             self.assertEqual(line, s[i])
 
     def test_readlinequeue(self):
         q = Queue(b"")
-        writer = codecs.getwriter(self.encoding)(q)
-        reader = codecs.getreader(self.encoding)(q)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            writer = codecs.getwriter(self.encoding)(q)
+            reader = codecs.getreader(self.encoding)(q)
 
         # No lineends
         writer.write("foo\r")
@@ -253,7 +264,9 @@
 
         s = (s1+s2+s3).encode(self.encoding)
         stream = io.BytesIO(s)
-        reader = codecs.getreader(self.encoding)(stream)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            reader = codecs.getreader(self.encoding)(stream)
         self.assertEqual(reader.readline(), s1)
         self.assertEqual(reader.readline(), s2)
         self.assertEqual(reader.readline(), s3)
@@ -268,7 +281,9 @@
 
         s = (s1+s2+s3+s4+s5).encode(self.encoding)
         stream = io.BytesIO(s)
-        reader = codecs.getreader(self.encoding)(stream)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            reader = codecs.getreader(self.encoding)(stream)
         self.assertEqual(reader.readline(), s1)
         self.assertEqual(reader.readline(), s2)
         self.assertEqual(reader.readline(), s3)
@@ -290,7 +305,9 @@
         _,_,reader,writer = codecs.lookup(self.encoding)
         # encode some stream
         s = io.BytesIO()
-        f = writer(s)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            f = writer(s)
         f.write("spam")
         f.write("spam")
         d = s.getvalue()
@@ -298,16 +315,22 @@
         self.assertTrue(d == self.spamle or d == self.spambe)
         # try to read it back
         s = io.BytesIO(d)
-        f = reader(s)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            f = reader(s)
         self.assertEqual(f.read(), "spamspam")
 
     def test_badbom(self):
         s = io.BytesIO(4*b"\xff")
-        f = codecs.getreader(self.encoding)(s)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            f = codecs.getreader(self.encoding)(s)
         self.assertRaises(UnicodeError, f.read)
 
         s = io.BytesIO(8*b"\xff")
-        f = codecs.getreader(self.encoding)(s)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            f = codecs.getreader(self.encoding)(s)
         self.assertRaises(UnicodeError, f.read)
 
     def test_partial(self):
@@ -454,7 +477,9 @@
         _,_,reader,writer = codecs.lookup(self.encoding)
         # encode some stream
         s = io.BytesIO()
-        f = writer(s)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            f = writer(s)
         f.write("spam")
         f.write("spam")
         d = s.getvalue()
@@ -462,16 +487,22 @@
         self.assertTrue(d == self.spamle or d == self.spambe)
         # try to read it back
         s = io.BytesIO(d)
-        f = reader(s)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            f = reader(s)
         self.assertEqual(f.read(), "spamspam")
 
     def test_badbom(self):
         s = io.BytesIO(b"\xff\xff")
-        f = codecs.getreader(self.encoding)(s)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            f = codecs.getreader(self.encoding)(s)
         self.assertRaises(UnicodeError, f.read)
 
         s = io.BytesIO(b"\xff\xff\xff\xff")
-        f = codecs.getreader(self.encoding)(s)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            f = codecs.getreader(self.encoding)(s)
         self.assertRaises(UnicodeError, f.read)
 
     def test_partial(self):
@@ -517,7 +548,8 @@
         self.addCleanup(support.unlink, support.TESTFN)
         with open(support.TESTFN, 'wb') as fp:
             fp.write(s)
-        with codecs.open(support.TESTFN, 'U', encoding=self.encoding) as reader:
+        with codecs.open(support.TESTFN, 'U',
+                         encoding=self.encoding) as reader:
             self.assertEqual(reader.read(), s1)
 
 class UTF16LETest(ReadTest):
@@ -705,7 +737,9 @@
         reader = codecs.getreader("utf-8-sig")
         for sizehint in [None] + list(range(1, 11)) + \
                         [64, 128, 256, 512, 1024]:
-            istream = reader(io.BytesIO(bytestring))
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore", DeprecationWarning)
+                istream = reader(io.BytesIO(bytestring))
             ostream = io.StringIO()
             while 1:
                 if sizehint is not None:
@@ -727,7 +761,9 @@
         reader = codecs.getreader("utf-8-sig")
         for sizehint in [None] + list(range(1, 11)) + \
                         [64, 128, 256, 512, 1024]:
-            istream = reader(io.BytesIO(bytestring))
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore", DeprecationWarning)
+                istream = reader(io.BytesIO(bytestring))
             ostream = io.StringIO()
             while 1:
                 if sizehint is not None:
@@ -749,7 +785,9 @@
 class RecodingTest(unittest.TestCase):
     def test_recoding(self):
         f = io.BytesIO()
-        f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
         f2.write("a")
         f2.close()
         # Python used to crash on this at exit because of a refcount
@@ -1126,7 +1164,9 @@
         self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.")
 
     def test_stream(self):
-        r = codecs.getreader("idna")(io.BytesIO(b"abc"))
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            r = codecs.getreader("idna")(io.BytesIO(b"abc"))
         r.read(3)
         self.assertEqual(r.read(), "")
 
@@ -1233,18 +1273,24 @@
 class StreamReaderTest(unittest.TestCase):
 
     def setUp(self):
-        self.reader = codecs.getreader('utf-8')
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            self.reader = codecs.getreader('utf-8')
         self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
 
     def test_readlines(self):
-        f = self.reader(self.stream)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            f = self.reader(self.stream)
         self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00'])
 
 class EncodedFileTest(unittest.TestCase):
 
     def test_basic(self):
         f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
-        ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
         self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae')
 
         f = io.BytesIO()
@@ -1388,7 +1434,9 @@
             if encoding not in broken_unicode_with_streams:
                 # check stream reader/writer
                 q = Queue(b"")
-                writer = codecs.getwriter(encoding)(q)
+                with warnings.catch_warnings():
+                    warnings.simplefilter("ignore", DeprecationWarning)
+                    writer = codecs.getwriter(encoding)(q)
                 encodedresult = b""
                 for c in s:
                     writer.write(c)
@@ -1396,7 +1444,9 @@
                     self.assertTrue(type(chunk) is bytes, type(chunk))
                     encodedresult += chunk
                 q = Queue(b"")
-                reader = codecs.getreader(encoding)(q)
+                with warnings.catch_warnings():
+                    warnings.simplefilter("ignore", DeprecationWarning)
+                    reader = codecs.getreader(encoding)(q)
                 decodedresult = ""
                 for c in encodedresult:
                     q.write(bytes([c]))
@@ -1470,7 +1520,9 @@
                 continue
             if encoding in broken_unicode_with_streams:
                 continue
-            reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore", DeprecationWarning)
+                reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
             for t in range(5):
                 # Test that calling seek resets the internal codec state and buffers
                 reader.seek(0, 0)
@@ -1539,15 +1591,19 @@
 class WithStmtTest(unittest.TestCase):
     def test_encodedfile(self):
         f = io.BytesIO(b"\xc3\xbc")
-        with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
-            self.assertEqual(ef.read(), b"\xfc")
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
+                self.assertEqual(ef.read(), b"\xfc")
 
     def test_streamreaderwriter(self):
         f = io.BytesIO(b"\xc3\xbc")
         info = codecs.lookup("utf-8")
-        with codecs.StreamReaderWriter(f, info.streamreader,
-                                       info.streamwriter, 'strict') as srw:
-            self.assertEqual(srw.read(), "\xfc")
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", DeprecationWarning)
+            with codecs.StreamReaderWriter(f, info.streamreader,
+                                           info.streamwriter, 'strict') as srw:
+                self.assertEqual(srw.read(), "\xfc")
 
 class TypesTest(unittest.TestCase):
     def test_decode_unicode(self):
@@ -1644,15 +1700,15 @@
 
             # (StreamWriter) Check that the BOM is written after a seek(0)
             with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
-                f.writer.write(data[0])
-                self.assertNotEqual(f.writer.tell(), 0)
-                f.writer.seek(0)
-                f.writer.write(data)
+                f.write(data[0])
+                self.assertNotEqual(f.tell(), 0)
+                f.seek(0)
+                f.write(data)
                 f.seek(0)
                 self.assertEqual(f.read(), data)
 
-            # Check that the BOM is not written after a seek() at a position
-            # different than the start
+            # Check that the BOM is not written after a seek() at a
+            # position different than the start
             with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
                 f.write(data)
                 f.seek(f.tell())
@@ -1660,12 +1716,12 @@
                 f.seek(0)
                 self.assertEqual(f.read(), data * 2)
 
-            # (StreamWriter) Check that the BOM is not written after a seek()
-            # at a position different than the start
+            # (StreamWriter) Check that the BOM is not written after a
+            # seek() at a position different than the start
             with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
-                f.writer.write(data)
-                f.writer.seek(f.writer.tell())
-                f.writer.write(data)
+                f.write(data)
+                f.seek(f.tell())
+                f.write(data)
                 f.seek(0)
                 self.assertEqual(f.read(), data * 2)
 
@@ -1704,7 +1760,9 @@
     def test_read(self):
         for encoding in bytes_transform_encodings:
             sin = codecs.encode(b"\x80", encoding)
-            reader = codecs.getreader(encoding)(io.BytesIO(sin))
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore", DeprecationWarning)
+                reader = codecs.getreader(encoding)(io.BytesIO(sin))
             sout = reader.read()
             self.assertEqual(sout, b"\x80")
 
@@ -1713,7 +1771,9 @@
             if encoding in ['uu_codec', 'zlib_codec']:
                 continue
             sin = codecs.encode(b"\x80", encoding)
-            reader = codecs.getreader(encoding)(io.BytesIO(sin))
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore", DeprecationWarning)
+                reader = codecs.getreader(encoding)(io.BytesIO(sin))
             sout = reader.readline()
             self.assertEqual(sout, b"\x80")
 
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -161,6 +161,11 @@
 Library
 -------
 
+- Issue #8796: codecs.open() calls the builtin open() function instead of using
+  StreamReaderWriter. Deprecate StreamReader, StreamWriter, StreamReaderWriter,
+  StreamRecoder and EncodedFile() of the codecs module. Use the builtin open()
+  function or io.TextIOWrapper instead.
+
 - Issue #12175: BufferedReader.read(-1) now calls raw.readall() if available.
 
 - Issue #12175: FileIO.readall() now only reads the file position and size

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list