[pypy-commit] pypy default: issue 2389: (take 2) redo faulty logic by copying runicode.str_decode_utf8_impl

mattip pypy.commits at gmail.com
Tue Oct 22 14:37:45 EDT 2019


Author: Matti Picus <matti.picus at gmail.com>
Branch: 
Changeset: r97836:c4b55d31320d
Date: 2019-10-22 20:20 +0300
http://bitbucket.org/pypy/pypy/changeset/c4b55d31320d/

Log:	issue 2389: (take 2) redo faulty logic by copying
	runicode.str_decode_utf8_impl

diff --git a/pypy/interpreter/unicodehelper.py b/pypy/interpreter/unicodehelper.py
--- a/pypy/interpreter/unicodehelper.py
+++ b/pypy/interpreter/unicodehelper.py
@@ -7,6 +7,7 @@
 from rpython.rlib.rarithmetic import r_uint, intmask
 from rpython.rtyper.lltypesystem import rffi
 from pypy.module.unicodedata.interp_ucd import unicodedb
+from rpython.rlib import runicode
 
 @specialize.memo()
 def decode_error_handler(space):
@@ -222,14 +223,12 @@
 
 if sys.platform == 'win32':
     def utf8_encode_mbcs(s, errors, errorhandler):
-        from rpython.rlib import runicode
         s = s.decode('utf-8')
         slen = len(s)
         res = runicode.unicode_encode_mbcs(s, slen, errors, errorhandler)
         return res
 
     def str_decode_mbcs(s, errors, final, errorhandler):
-        from rpython.rlib import runicode
         slen = len(s)
         res, size = runicode.str_decode_mbcs(s, slen, final=final, errors=errors,
                                            errorhandler=errorhandler)
@@ -240,134 +239,156 @@
     valid so we're trying to either raise or pack stuff with error handler.
     The key difference is that this is call_may_force
     """
-    slen = len(s)
-    res = StringBuilder(slen)
+
+    # Adapted from str_decode_utf_8_impl but with a StringBuilder, not a
+    # UnicodeBuilder
+    size = len(s)
+    result = StringBuilder(size)
     pos = 0
-    end = len(s)
-    suppressing = False # we are in a chain of "bad" unicode, only emit one fix
-    while pos < end:
+    while pos < size:
         ordch1 = ord(s[pos])
         # fast path for ASCII
-        if ordch1 <= 0x7F:
+        if ordch1 < 0x80:
+            result.append(chr(ordch1))
             pos += 1
-            res.append(chr(ordch1))
-            suppressing = False
             continue
 
-        if ordch1 <= 0xC1:
-            r, pos = errorhandler(errors, "utf8", "invalid start byte",
-                    s, pos, pos + 1)
-            if not suppressing:
-                res.append(r)
-            continue
+        n = ord(runicode._utf8_code_length[ordch1 - 0x80])
+        if pos + n > size:
+            if not final:
+                break
+            # argh, this obscure block of code is mostly a copy of
+            # what follows :-(
+            charsleft = size - pos - 1 # either 0, 1, 2
+            # note: when we get the 'unexpected end of data' we need
+            # to care about the pos returned; it can be lower than size,
+            # in case we need to continue running this loop
+            if not charsleft:
+                # there's only the start byte and nothing else
+                r, pos = errorhandler(errors, 'utf8',
+                                      'unexpected end of data',
+                                      s, pos, pos+1)
+                result.append(r)
+                continue
+            ordch2 = ord(s[pos+1])
+            if n == 3:
+                # 3-byte sequence with only one continuation byte present
+                if rutf8._invalid_byte_2_of_3(ordch1, ordch2, True):
+                    # second byte invalid, take the first and continue
+                    r, pos = errorhandler(errors, 'utf8',
+                                          'invalid continuation byte',
+                                          s, pos, pos+1)
+                    result.append(r)
+                    continue
+                else:
+                    # second byte valid, but third byte missing
+                    r, pos = errorhandler(errors, 'utf8',
+                                      'unexpected end of data',
+                                      s, pos, pos+2)
+                    result.append(r)
+                    continue
+            elif n == 4:
+                # 4-bytes seq with 1 or 2 continuation bytes
+                if rutf8._invalid_byte_2_of_4(ordch1, ordch2):
+                    # second byte invalid, take the first and continue
+                    r, pos = errorhandler(errors, 'utf8',
+                                          'invalid continuation byte',
+                                          s, pos, pos+1)
+                    result.append(r)
+                    continue
+                elif charsleft == 2 and rutf8._invalid_byte_3_of_4(ord(s[pos+2])):
+                    # third byte invalid, take the first two and continue
+                    r, pos = errorhandler(errors, 'utf8',
+                                          'invalid continuation byte',
+                                          s, pos, pos+2)
+                    result.append(r)
+                    continue
+                else:
+                    # 1 or 2 valid continuation bytes, but the rest are missing
+                    r, pos = errorhandler(errors, 'utf8',
+                                      'unexpected end of data',
+                                      s, pos, pos+charsleft+1)
+                    result.append(r)
+                    continue
+            raise AssertionError("unreachable")
 
-        pos += 1
+        if n == 0:
+            r, pos = errorhandler(errors, 'utf8',
+                                  'invalid start byte',
+                                  s, pos, pos+1)
+            result.append(r)
 
-        if ordch1 <= 0xDF:
-            if pos >= end:
-                if not final:
-                    pos -= 1
-                    break
-                r, pos = errorhandler(errors, "utf8", "unexpected end of data",
-                    s, pos - 1, pos)
-                if not suppressing:
-                    res.append(r)
-                continue
-            ordch2 = ord(s[pos])
+        elif n == 1:
+            assert 0, "ascii should have gone through the fast path"
 
+        elif n == 2:
+            ordch2 = ord(s[pos+1])
             if rutf8._invalid_byte_2_of_2(ordch2):
-                r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
-                    s, pos - 1, pos)
-                if not suppressing:
-                    res.append(r)
+                r, pos = errorhandler(errors, 'utf8',
+                                      'invalid continuation byte',
+                                      s, pos, pos+1)
+                result.append(r)
                 continue
             # 110yyyyy 10zzzzzz -> 00000000 00000yyy yyzzzzzz
-            pos += 1
-            res.append(chr(ordch1))
-            res.append(chr(ordch2))
-            continue
+            result.append(chr(ordch1))
+            result.append(chr(ordch2))
+            pos += 2
 
-        if ordch1 <= 0xEF:
-            if (pos + 2) > end:
-                if not final:
-                    pos -= 1
-                    break
-                r, pos = errorhandler(errors, "utf8", "unexpected end of data",
-                    s, pos - 1, pos)
-                res.append(r)
-                suppressing = True
-                continue
-            ordch2 = ord(s[pos])
-            ordch3 = ord(s[pos + 1])
+        elif n == 3:
+            ordch2 = ord(s[pos + 1])
+            ordch3 = ord(s[pos + 2])
 
             if rutf8._invalid_byte_2_of_3(ordch1, ordch2, True):
-                r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
-                    s, pos - 1, pos)
-                if not suppressing:
-                    res.append(r)
+                r, pos = errorhandler(errors, 'utf8',
+                                      'invalid continuation byte',
+                                      s, pos, pos + 1)
+                result.append(r)
                 continue
             elif rutf8._invalid_byte_3_of_3(ordch3):
-                r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
-                    s, pos - 1, pos + 1)
-                if not suppressing:
-                    res.append(r)
+                r, pos = errorhandler(errors, 'utf8',
+                                      "invalid continuation byte",
+                                      s, pos, pos + 2)
+                result.append(r)
                 continue
-            pos += 2
 
             # 1110xxxx 10yyyyyy 10zzzzzz -> 00000000 xxxxyyyy yyzzzzzz
-            res.append(chr(ordch1))
-            res.append(chr(ordch2))
-            res.append(chr(ordch3))
-            suppressing = False
-            continue
+            result.append(chr(ordch1))
+            result.append(chr(ordch2))
+            result.append(chr(ordch3))
+            pos += 3
 
-        if ordch1 <= 0xF4:
-            if (pos + 3) > end:
-                if not final:
-                    pos -= 1
-                    break
-                r, pos = errorhandler(errors, "utf8", "unexpected end of data",
-                    s, pos - 1, pos)
-                res.append(r)
-                suppressing = True
-                continue
-            ordch2 = ord(s[pos])
-            ordch3 = ord(s[pos + 1])
-            ordch4 = ord(s[pos + 2])
+        elif n == 4:
+            ordch2 = ord(s[pos + 1])
+            ordch3 = ord(s[pos + 2])
+            ordch4 = ord(s[pos + 3])
 
             if rutf8._invalid_byte_2_of_4(ordch1, ordch2):
-                r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
-                    s, pos - 1, pos)
-                if not suppressing:
-                    res.append(r)
+                r, pos = errorhandler(errors, 'utf8',
+                                      'invalid continuation byte',
+                                      s, pos, pos + 1)
+                result.append(r)
                 continue
             elif rutf8._invalid_byte_3_of_4(ordch3):
-                r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
-                    s, pos - 1, pos + 1)
-                res.append(r)
+                r, pos = errorhandler(errors, 'utf8',
+                                     'invalid continuation byte',
+                                     s, pos, pos + 2)
+                result.append(r)
                 continue
             elif rutf8._invalid_byte_4_of_4(ordch4):
-                r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
-                    s, pos - 1, pos + 2)
-                if not suppressing:
-                    res.append(r)
+                r, pos = errorhandler(errors, 'utf8',
+                                      'invalid continuation byte',
+                                      s, pos, pos + 3)
+                result.append(r)
                 continue
 
-            pos += 3
             # 11110www 10xxxxxx 10yyyyyy 10zzzzzz -> 000wwwxx xxxxyyyy yyzzzzzz
-            res.append(chr(ordch1))
-            res.append(chr(ordch2))
-            res.append(chr(ordch3))
-            res.append(chr(ordch4))
-            suppressing = False
-            continue
+            result.append(chr(ordch1))
+            result.append(chr(ordch2))
+            result.append(chr(ordch3))
+            result.append(chr(ordch4))
+            pos += 4
 
-        r, pos = errorhandler(errors, "utf8", "invalid start byte",
-                s, pos - 1, pos)
-        if not suppressing:
-            res.append(r)
-
-    r = res.build()
+    r = result.build()
     return r, pos, rutf8.check_utf8(r, True)
 
 hexdigits = "0123456789ABCDEFabcdef"
diff --git a/pypy/module/_codecs/test/test_codecs.py b/pypy/module/_codecs/test/test_codecs.py
--- a/pypy/module/_codecs/test/test_codecs.py
+++ b/pypy/module/_codecs/test/test_codecs.py
@@ -938,3 +938,17 @@
         errors = []
         assert sin.encode("iso-8859-15", "test.record") == "\xac\xa4"
         assert errors == [u'\u1234\u1234', u'\u8000']
+
+    def test_last_byte_handler(self):
+        # issue bb-2389
+        import _codecs
+        _codecs.register_error('custom_replace', lambda exc: (u'\ufffd', exc.start+1))
+        for s, res in ((b"WORD\xe3\xab",
+                            (u'WORD\ufffd\ufffd', u'WORD\ufffd')),
+                       (b"\xef\xbb\xbfWORD\xe3\xabWORD2",
+                            (u'\ufeffWORD\ufffd\ufffdWORD2',
+                             u'\ufeffWORD\ufffdWORD2'))):
+            r = s.decode('utf8', 'replace')
+            assert r == res[1]
+            r = s.decode('utf8', 'custom_replace')
+            assert r == res[0]


More information about the pypy-commit mailing list