[pypy-commit] pypy default: issue 2389: (take 2) redo faulty logic by copying runicode.str_decode_utf_8_impl
mattip
pypy.commits at gmail.com
Tue Oct 22 14:37:45 EDT 2019
Author: Matti Picus <matti.picus at gmail.com>
Branch:
Changeset: r97836:c4b55d31320d
Date: 2019-10-22 20:20 +0300
http://bitbucket.org/pypy/pypy/changeset/c4b55d31320d/
Log: issue 2389: (take 2) redo faulty logic by copying
runicode.str_decode_utf_8_impl
diff --git a/pypy/interpreter/unicodehelper.py b/pypy/interpreter/unicodehelper.py
--- a/pypy/interpreter/unicodehelper.py
+++ b/pypy/interpreter/unicodehelper.py
@@ -7,6 +7,7 @@
from rpython.rlib.rarithmetic import r_uint, intmask
from rpython.rtyper.lltypesystem import rffi
from pypy.module.unicodedata.interp_ucd import unicodedb
+from rpython.rlib import runicode
@specialize.memo()
def decode_error_handler(space):
@@ -222,14 +223,12 @@
if sys.platform == 'win32':
def utf8_encode_mbcs(s, errors, errorhandler):
- from rpython.rlib import runicode
s = s.decode('utf-8')
slen = len(s)
res = runicode.unicode_encode_mbcs(s, slen, errors, errorhandler)
return res
def str_decode_mbcs(s, errors, final, errorhandler):
- from rpython.rlib import runicode
slen = len(s)
res, size = runicode.str_decode_mbcs(s, slen, final=final, errors=errors,
errorhandler=errorhandler)
@@ -240,134 +239,156 @@
valid so we're trying to either raise or pack stuff with error handler.
The key difference is that this is call_may_force
"""
- slen = len(s)
- res = StringBuilder(slen)
+
+ # Adapted from str_decode_utf_8_impl but with a StringBuilder, not a
+ # UnicodeBuilder
+ size = len(s)
+ result = StringBuilder(size)
pos = 0
- end = len(s)
- suppressing = False # we are in a chain of "bad" unicode, only emit one fix
- while pos < end:
+ while pos < size:
ordch1 = ord(s[pos])
# fast path for ASCII
- if ordch1 <= 0x7F:
+ if ordch1 < 0x80:
+ result.append(chr(ordch1))
pos += 1
- res.append(chr(ordch1))
- suppressing = False
continue
- if ordch1 <= 0xC1:
- r, pos = errorhandler(errors, "utf8", "invalid start byte",
- s, pos, pos + 1)
- if not suppressing:
- res.append(r)
- continue
+ n = ord(runicode._utf8_code_length[ordch1 - 0x80])
+ if pos + n > size:
+ if not final:
+ break
+ # argh, this obscure block of code is mostly a copy of
+ # what follows :-(
+ charsleft = size - pos - 1 # either 0, 1, 2
+ # note: when we get the 'unexpected end of data' we need
+ # to care about the pos returned; it can be lower than size,
+ # in case we need to continue running this loop
+ if not charsleft:
+ # there's only the start byte and nothing else
+ r, pos = errorhandler(errors, 'utf8',
+ 'unexpected end of data',
+ s, pos, pos+1)
+ result.append(r)
+ continue
+ ordch2 = ord(s[pos+1])
+ if n == 3:
+ # 3-bytes seq with only a continuation byte
+ if rutf8._invalid_byte_2_of_3(ordch1, ordch2, True):
+ # second byte invalid, take the first and continue
+ r, pos = errorhandler(errors, 'utf8',
+ 'invalid continuation byte',
+ s, pos, pos+1)
+ result.append(r)
+ continue
+ else:
+ # second byte valid, but third byte missing
+ r, pos = errorhandler(errors, 'utf8',
+ 'unexpected end of data',
+ s, pos, pos+2)
+ result.append(r)
+ continue
+ elif n == 4:
+ # 4-bytes seq with 1 or 2 continuation bytes
+ if rutf8._invalid_byte_2_of_4(ordch1, ordch2):
+ # second byte invalid, take the first and continue
+ r, pos = errorhandler(errors, 'utf8',
+ 'invalid continuation byte',
+ s, pos, pos+1)
+ result.append(r)
+ continue
+ elif charsleft == 2 and rutf8._invalid_byte_3_of_4(ord(s[pos+2])):
+ # third byte invalid, take the first two and continue
+ r, pos = errorhandler(errors, 'utf8',
+ 'invalid continuation byte',
+ s, pos, pos+2)
+ result.append(r)
+ continue
+ else:
+ # there's only 1 or 2 valid cb, but the others are missing
+ r, pos = errorhandler(errors, 'utf8',
+ 'unexpected end of data',
+ s, pos, pos+charsleft+1)
+ result.append(r)
+ continue
+ raise AssertionError("unreachable")
- pos += 1
+ if n == 0:
+ r, pos = errorhandler(errors, 'utf8',
+ 'invalid start byte',
+ s, pos, pos+1)
+ result.append(r)
- if ordch1 <= 0xDF:
- if pos >= end:
- if not final:
- pos -= 1
- break
- r, pos = errorhandler(errors, "utf8", "unexpected end of data",
- s, pos - 1, pos)
- if not suppressing:
- res.append(r)
- continue
- ordch2 = ord(s[pos])
+ elif n == 1:
+ assert 0, "ascii should have gone through the fast path"
+ elif n == 2:
+ ordch2 = ord(s[pos+1])
if rutf8._invalid_byte_2_of_2(ordch2):
- r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
- s, pos - 1, pos)
- if not suppressing:
- res.append(r)
+ r, pos = errorhandler(errors, 'utf8',
+ 'invalid continuation byte',
+ s, pos, pos+1)
+ result.append(r)
continue
# 110yyyyy 10zzzzzz -> 00000000 00000yyy yyzzzzzz
- pos += 1
- res.append(chr(ordch1))
- res.append(chr(ordch2))
- continue
+ result.append(chr(ordch1))
+ result.append(chr(ordch2))
+ pos += 2
- if ordch1 <= 0xEF:
- if (pos + 2) > end:
- if not final:
- pos -= 1
- break
- r, pos = errorhandler(errors, "utf8", "unexpected end of data",
- s, pos - 1, pos)
- res.append(r)
- suppressing = True
- continue
- ordch2 = ord(s[pos])
- ordch3 = ord(s[pos + 1])
+ elif n == 3:
+ ordch2 = ord(s[pos + 1])
+ ordch3 = ord(s[pos + 2])
if rutf8._invalid_byte_2_of_3(ordch1, ordch2, True):
- r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
- s, pos - 1, pos)
- if not suppressing:
- res.append(r)
+ r, pos = errorhandler(errors, 'utf8',
+ 'invalid continuation byte',
+ s, pos, pos + 1)
+ result.append(r)
continue
elif rutf8._invalid_byte_3_of_3(ordch3):
- r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
- s, pos - 1, pos + 1)
- if not suppressing:
- res.append(r)
+ r, pos = errorhandler(errors, 'utf8',
+ "invalid continuation byte",
+ s, pos, pos + 2)
+ result.append(r)
continue
- pos += 2
# 1110xxxx 10yyyyyy 10zzzzzz -> 00000000 xxxxyyyy yyzzzzzz
- res.append(chr(ordch1))
- res.append(chr(ordch2))
- res.append(chr(ordch3))
- suppressing = False
- continue
+ result.append(chr(ordch1))
+ result.append(chr(ordch2))
+ result.append(chr(ordch3))
+ pos += 3
- if ordch1 <= 0xF4:
- if (pos + 3) > end:
- if not final:
- pos -= 1
- break
- r, pos = errorhandler(errors, "utf8", "unexpected end of data",
- s, pos - 1, pos)
- res.append(r)
- suppressing = True
- continue
- ordch2 = ord(s[pos])
- ordch3 = ord(s[pos + 1])
- ordch4 = ord(s[pos + 2])
+ elif n == 4:
+ ordch2 = ord(s[pos + 1])
+ ordch3 = ord(s[pos + 2])
+ ordch4 = ord(s[pos + 3])
if rutf8._invalid_byte_2_of_4(ordch1, ordch2):
- r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
- s, pos - 1, pos)
- if not suppressing:
- res.append(r)
+ r, pos = errorhandler(errors, 'utf8',
+ 'invalid continuation byte',
+ s, pos, pos + 1)
+ result.append(r)
continue
elif rutf8._invalid_byte_3_of_4(ordch3):
- r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
- s, pos - 1, pos + 1)
- res.append(r)
+ r, pos = errorhandler(errors, 'utf8',
+ 'invalid continuation byte',
+ s, pos, pos + 2)
+ result.append(r)
continue
elif rutf8._invalid_byte_4_of_4(ordch4):
- r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
- s, pos - 1, pos + 2)
- if not suppressing:
- res.append(r)
+ r, pos = errorhandler(errors, 'utf8',
+ 'invalid continuation byte',
+ s, pos, pos + 3)
+ result.append(r)
continue
- pos += 3
# 11110www 10xxxxxx 10yyyyyy 10zzzzzz -> 000wwwxx xxxxyyyy yyzzzzzz
- res.append(chr(ordch1))
- res.append(chr(ordch2))
- res.append(chr(ordch3))
- res.append(chr(ordch4))
- suppressing = False
- continue
+ result.append(chr(ordch1))
+ result.append(chr(ordch2))
+ result.append(chr(ordch3))
+ result.append(chr(ordch4))
+ pos += 4
- r, pos = errorhandler(errors, "utf8", "invalid start byte",
- s, pos - 1, pos)
- if not suppressing:
- res.append(r)
-
- r = res.build()
+ r = result.build()
return r, pos, rutf8.check_utf8(r, True)
hexdigits = "0123456789ABCDEFabcdef"
diff --git a/pypy/module/_codecs/test/test_codecs.py b/pypy/module/_codecs/test/test_codecs.py
--- a/pypy/module/_codecs/test/test_codecs.py
+++ b/pypy/module/_codecs/test/test_codecs.py
@@ -938,3 +938,17 @@
errors = []
assert sin.encode("iso-8859-15", "test.record") == "\xac\xa4"
assert errors == [u'\u1234\u1234', u'\u8000']
+
+ def test_last_byte_handler(self):
+ # issue bb-2389
+ import _codecs
+ _codecs.register_error('custom_replace', lambda exc: (u'\ufffd', exc.start+1))
+ for s, res in ((b"WORD\xe3\xab",
+ (u'WORD\ufffd\ufffd', u'WORD\ufffd')),
+ (b"\xef\xbb\xbfWORD\xe3\xabWORD2",
+ (u'\ufeffWORD\ufffd\ufffdWORD2',
+ u'\ufeffWORD\ufffdWORD2'))):
+ r = s.decode('utf8', 'replace')
+ assert r == res[1]
+ r = s.decode('utf8', 'custom_replace')
+ assert r == res[0]
More information about the pypy-commit mailing list