[pypy-commit] pypy unicode-utf8: get back to the point of passing objspace tests with utf8 turnaround removed from codecs
fijal
pypy.commits at gmail.com
Fri Nov 17 08:05:13 EST 2017
Author: fijal
Branch: unicode-utf8
Changeset: r93068:1e43261d5fd9
Date: 2017-11-17 14:04 +0100
http://bitbucket.org/pypy/pypy/changeset/1e43261d5fd9/
Log: get back to the point of passing objspace tests with utf8 turnaround
removed from codecs
diff --git a/pypy/interpreter/unicodehelper.py b/pypy/interpreter/unicodehelper.py
--- a/pypy/interpreter/unicodehelper.py
+++ b/pypy/interpreter/unicodehelper.py
@@ -68,6 +68,308 @@
flag = rutf8.FLAG_REGULAR
return flag
+# These functions take and return unwrapped rpython strings
+def decode_unicode_escape(space, string):
+ state = space.fromcache(interp_codecs.CodecState)
+ unicodedata_handler = state.get_unicodedata_handler(space)
+ result_utf8, consumed, length, flag = str_decode_unicode_escape(
+ string, "strict",
+ final=True,
+ errorhandler=decode_error_handler(space),
+ ud_handler=unicodedata_handler)
+ return result_utf8, length, flag
+
+def decode_raw_unicode_escape(space, string):
+ result_utf8, consumed, lgt, flag = str_decode_raw_unicode_escape(
+ string, "strict",
+ final=True, errorhandler=decode_error_handler(space))
+ return result_utf8, lgt, flag
+
+def check_ascii_or_raise(space, string):
+ try:
+ rutf8.check_ascii(string)
+ except rutf8.CheckError as e:
+ decode_error_handler(space)('strict', 'ascii',
+ 'ordinal not in range(128)', string,
+ e.pos, e.pos + 1)
+ assert False, "unreachable"
+
+def check_utf8_or_raise(space, string):
+ # Surrogates are accepted and not treated specially at all.
+ # If there happen to be two 3-bytes encoding a pair of surrogates,
+ # you still get two surrogate unicode characters in the result.
+ # These are the Python2 rules; Python3 differs.
+ try:
+ length, flag = rutf8.check_utf8(string, allow_surrogates=True)
+ except rutf8.CheckError as e:
+ # convert position into unicode position
+ lgt, flags = rutf8.check_utf8(string, True, stop=e.pos)
+ decode_error_handler(space)('strict', 'utf8', 'invalid utf-8', string,
+ lgt, lgt + 1)
+ assert False, "unreachable"
+ return length, flag
+
+def decode_utf8(space, s):
+ # DEPRECATED
+ return (s, check_utf8_or_raise(space, s))
+
+def str_decode_ascii(s, errors, final, errorhandler):
+ try:
+ rutf8.check_ascii(s)
+ return s, len(s), len(s), rutf8.FLAG_ASCII
+ except rutf8.CheckError:
+ return _str_decode_ascii_slowpath(s, errors, final, errorhandler)
+
+def _str_decode_ascii_slowpath(s, errors, final, errorhandler):
+ i = 0
+ res = StringBuilder()
+ while i < len(s):
+ ch = s[i]
+ if ord(ch) > 0x7F:
+ r, i = errorhandler(errors, 'ascii', 'ordinal not in range(128)',
+ s, i, i + 1)
+ res.append(r)
+ else:
+ res.append(ch)
+ i += 1
+ ress = res.build()
+ lgt, flag = rutf8.check_utf8(ress, True)
+ return ress, len(s), lgt, flag
+
+def str_decode_latin_1(s, errors, final, errorhandler):
+ xxx
+
+def utf8_encode_latin_1(s, errors, errorhandler):
+ try:
+ rutf8.check_ascii(s)
+ return s
+ except rutf8.CheckError:
+ return _utf8_encode_latin_1_slowpath(s, errors, errorhandler)
+
+def _utf8_encode_latin_1_slowpath(s, errors, errorhandler):
+ res = StringBuilder(len(s))
+ size = len(s)
+ cur = 0
+ i = 0
+ while i < size:
+ if ord(s[i]) <= 0x7F:
+ res.append(s[i])
+ else:
+ oc = rutf8.codepoint_at_pos(s, i)
+ if oc <= 0xFF:
+ res.append(chr(oc))
+ i += 1
+ else:
+ r, pos = errorhandler(errors, 'latin1',
+ 'ordinal not in range(256)', s, cur,
+ cur + 1)
+ res.append(r)
+ for j in range(pos - cur):
+ i = rutf8.next_codepoint_pos(s, i)
+ cur = pos
+ cur += 1
+ i += 1
+ r = res.build()
+ return r
+
+class DecodeWrapper(object):
+ def __init__(self, handler):
+ self.orig = handler
+
+ def handle(self, errors, encoding, msg, s, pos, endpos):
+ return self.orig(errors, encoding, msg, s, pos, endpos)
+
+class EncodeWrapper(object):
+ def __init__(self, handler):
+ self.orig = handler
+
+ def handle(self, errors, encoding, msg, s, pos, endpos):
+ return self.orig(errors, encoding, msg, s.encode("utf8"), pos, endpos)
+
+#def str_decode_unicode_escape(s, slen, errors, final, errorhandler, ud_handler):
+# w = DecodeWrapper(errorhandler)
+# u, pos = runicode.str_decode_unicode_escape(s, slen, errors, final,
+# w.handle,
+# ud_handler)
+# return u.encode('utf8'), pos, len(u), _get_flag(u)
+
+def setup_new_encoders_legacy(encoding):
+ encoder_name = 'utf8_encode_' + encoding
+ encoder_call_name = 'unicode_encode_' + encoding
+ decoder_name = 'str_decode_' + encoding
+ def encoder(utf8, errors, errorhandler):
+ u = utf8.decode("utf8")
+ w = EncodeWrapper(errorhandler)
+ return getattr(runicode, encoder_call_name)(u, len(u), errors,
+ w.handle)
+ def decoder(s, slen, errors, final, errorhandler):
+ w = DecodeWrapper((errorhandler))
+ u, pos = getattr(runicode, decoder_name)(s, slen, errors, final, w.handle)
+ return u.encode('utf8'), pos, len(u), _get_flag(u)
+ encoder.__name__ = encoder_name
+ decoder.__name__ = decoder_name
+ if encoder_name not in globals():
+ globals()[encoder_name] = encoder
+ if decoder_name not in globals():
+ globals()[decoder_name] = decoder
+
+def setup():
+ for encoding in ['utf_16', 'utf_16_le', 'utf_16_be', 'utf_32_le', 'utf_32',
+ 'utf_32_be', 'unicode_internal']:
+ setup_new_encoders_legacy(encoding)
+
+setup()
+
+def utf8_encode_ascii(utf8, errors, errorhandler):
+ """ Don't be confused - this is a slowpath for errors e.g. "ignore"
+ or an obscure errorhandler
+ """
+ res = StringBuilder()
+ i = 0
+ pos = 0
+ while i < len(utf8):
+ ch = rutf8.codepoint_at_pos(utf8, i)
+ if ch >= 0x7F:
+ msg = "ordinal not in range(128)"
+ r, newpos = errorhandler(errors, 'ascii', msg, utf8,
+ pos, pos + 1)
+ for _ in range(newpos - pos):
+ i = rutf8.next_codepoint_pos(utf8, i)
+ pos = newpos
+ res.append(r)
+ else:
+ res.append(chr(ch))
+ i = rutf8.next_codepoint_pos(utf8, i)
+ pos += 1
+
+ s = res.build()
+ return s
+
+def str_decode_utf8(s, errors, final, errorhandler):
+ """ Same as checking for the valid utf8, but we know the utf8 is not
+ valid, so we either raise or let the error handler supply replacements.
+ The key difference is that this is call_may_force
+ """
+ slen = len(s)
+ res = StringBuilder(slen)
+ pos = 0
+ continuation_bytes = 0
+ end = len(s)
+ while pos < end:
+ ordch1 = ord(s[pos])
+ # fast path for ASCII
+ if ordch1 <= 0x7F:
+ pos += 1
+ res.append(chr(ordch1))
+ continue
+
+ if ordch1 <= 0xC1:
+ r, pos = errorhandler(errors, "utf8", "invalid start byte",
+ s, pos, pos + 1)
+ res.append(r)
+ continue
+
+ pos += 1
+
+ if ordch1 <= 0xDF:
+ if pos >= end:
+ if not final:
+ break
+ r, pos = errorhandler(errors, "utf8", "unexpected end of data",
+ s, pos - 1, pos)
+ res.append(r)
+ continue
+ ordch2 = ord(s[pos])
+
+ if rutf8._invalid_byte_2_of_2(ordch2):
+ r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
+ s, pos - 1, pos)
+ res.append(r)
+ continue
+ # 110yyyyy 10zzzzzz -> 00000000 00000yyy yyzzzzzz
+ pos += 1
+ continuation_bytes += 1
+ res.append(chr(ordch1))
+ res.append(chr(ordch2))
+ continue
+
+ if ordch1 <= 0xEF:
+ if (pos + 2) > end:
+ if not final:
+ break
+ r, pos = errorhandler(errors, "utf8", "unexpected end of data",
+ s, pos - 1, pos + 1)
+ res.append(r)
+ continue
+ ordch2 = ord(s[pos])
+ ordch3 = ord(s[pos + 1])
+
+ if rutf8._invalid_byte_2_of_3(ordch1, ordch2, True):
+ r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
+ s, pos - 1, pos)
+ res.append(r)
+ continue
+ elif rutf8._invalid_byte_3_of_3(ordch3):
+ r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
+ s, pos - 1, pos + 1)
+ res.append(r)
+ continue
+ pos += 2
+
+ # 1110xxxx 10yyyyyy 10zzzzzz -> 00000000 xxxxyyyy yyzzzzzz
+ continuation_bytes += 2
+ res.append(chr(ordch1))
+ res.append(chr(ordch2))
+ res.append(chr(ordch3))
+ continue
+
+ if ordch1 <= 0xF4:
+ if (pos + 3) > end:
+ if not final:
+ break
+ r, pos = errorhandler(errors, "utf8", "unexpected end of data",
+ s, pos - 1, pos)
+ res.append(r)
+ continue
+ ordch2 = ord(s[pos])
+ ordch3 = ord(s[pos + 1])
+ ordch4 = ord(s[pos + 2])
+
+ if rutf8._invalid_byte_2_of_4(ordch1, ordch2):
+ r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
+ s, pos - 1, pos)
+ res.append(r)
+ continue
+ elif rutf8._invalid_byte_3_of_4(ordch3):
+ r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
+ s, pos - 1, pos + 1)
+ res.append(r)
+ continue
+ elif rutf8._invalid_byte_4_of_4(ordch4):
+ r, pos = errorhandler(errors, "utf8", "invalid continuation byte",
+ s, pos - 1, pos + 2)
+ res.append(r)
+ continue
+
+ pos += 3
+ # 11110www 10xxxxxx 10yyyyyy 10zzzzzz -> 000wwwxx xxxxyyyy yyzzzzzz
+ res.append(chr(ordch1))
+ res.append(chr(ordch2))
+ res.append(chr(ordch3))
+ res.append(chr(ordch4))
+ continuation_bytes += 3
+ continue
+
+ r, pos = errorhandler(errors, "utf8", "invalid start byte",
+ s, pos - 1, pos)
+ res.append(r)
+
+ assert pos == end
+ assert pos - continuation_bytes >= 0
+ r = res.build()
+ lgt, flag = rutf8.check_utf8(r, True)
+ return r, pos - continuation_bytes, lgt, flag
+
def hexescape(builder, s, pos, digits,
encoding, errorhandler, message, errors):
chr = 0
@@ -273,178 +575,57 @@
return builder.build(), pos, outsize, flag
-# These functions take and return unwrapped rpython strings and unicodes
-def decode_unicode_escape(space, string):
- state = space.fromcache(interp_codecs.CodecState)
- unicodedata_handler = state.get_unicodedata_handler(space)
- result_utf8, consumed, length, flag = str_decode_unicode_escape(
- string, "strict",
- final=True,
- errorhandler=decode_error_handler(space),
- ud_handler=unicodedata_handler)
- return result_utf8, length, flag
+# ____________________________________________________________
+# Raw unicode escape
-def decode_raw_unicode_escape(space, string):
- # XXX pick better length, maybe
- # XXX that guy does not belong in runicode (nor in rutf8)
- result_u, consumed = runicode.str_decode_raw_unicode_escape(
- string, len(string), "strict",
- final=True, errorhandler=DecodeWrapper(decode_error_handler(space)).handle)
- # XXX argh. we want each surrogate to be encoded separately
- utf8 = ''.join([u.encode('utf8') for u in result_u])
- if rutf8.first_non_ascii_char(utf8) == -1:
- flag = rutf8.FLAG_ASCII
- elif _has_surrogate(result_u):
- flag = rutf8.FLAG_HAS_SURROGATES
- else:
- flag = rutf8.FLAG_REGULAR
- return utf8, len(result_u), flag
+def str_decode_raw_unicode_escape(s, errors, final=False,
+ errorhandler=None):
+ size = len(s)
+ if size == 0:
+ return '', 0, 0, rutf8.FLAG_ASCII
-def check_ascii_or_raise(space, string):
- try:
- rutf8.check_ascii(string)
- except rutf8.CheckError as e:
- decode_error_handler(space)('strict', 'ascii',
- 'ordinal not in range(128)', string,
- e.pos, e.pos + 1)
- assert False, "unreachable"
+ result = StringBuilder(size)
+ pos = 0
+ while pos < size:
+ ch = s[pos]
-def check_utf8_or_raise(space, string):
- # Surrogates are accepted and not treated specially at all.
- # If there happen to be two 3-bytes encoding a pair of surrogates,
- # you still get two surrogate unicode characters in the result.
- # These are the Python2 rules; Python3 differs.
- try:
- length, flag = rutf8.check_utf8(string, allow_surrogates=True)
- except rutf8.CheckError as e:
- # convert position into unicode position
- lgt, flags = rutf8.check_utf8(string, True, stop=e.pos)
- decode_error_handler(space)('strict', 'utf8', 'invalid utf-8', string,
- lgt, lgt + 1)
- assert False, "unreachable"
- return length, flag
+ # Non-escape characters are interpreted as Unicode ordinals
+ if ch != '\\':
+ rutf8.unichr_as_utf8_append(result, ord(ch), True)
+ pos += 1
+ continue
-def encode_utf8(space, uni):
- # DEPRECATED
- # Note that this function never raises UnicodeEncodeError,
- # since surrogates are allowed, either paired or lone.
- # A paired surrogate is considered like the non-BMP character
- # it stands for. These are the Python2 rules; Python3 differs.
- return runicode.unicode_encode_utf_8(
- uni, len(uni), "strict",
- errorhandler=None,
- allow_surrogates=True)
+ # \u-escapes are interpreted only if the number of leading
+ # backslashes is odd
+ bs = pos
+ while pos < size:
+ pos += 1
+ if pos == size or s[pos] != '\\':
+ break
+ result.append('\\')
-def decode_utf8(space, s):
- # DEPRECATED
- return (s, check_utf8_or_raise(space, s))
+ # we have a backslash at the end of the string, stop here
+ if pos >= size:
+ result.append('\\')
+ break
-def str_decode_ascii(s, errors, final, errorhandler):
- try:
- rutf8.check_ascii(s)
- return s, len(s), len(s), rutf8.FLAG_ASCII
- except rutf8.CheckError:
- return _str_decode_ascii_slowpath(s, errors, final, errorhandler)
+ if ((pos - bs) & 1 == 0 or
+ pos >= size or
+ (s[pos] != 'u' and s[pos] != 'U')):
+ result.append('\\')
+ rutf8.unichr_as_utf8_append(result, ord(s[pos]), True)
+ pos += 1
+ continue
-def _str_decode_ascii_slowpath(s, errors, final, errorhandler):
- i = 0
- res = StringBuilder()
- while i < len(s):
- ch = s[i]
- if ord(ch) > 0x7F:
- r, i = errorhandler(errors, 'ascii', 'ordinal not in range(128)',
- s, i, i + 1)
- res.append(r)
- else:
- res.append(ch)
- i += 1
- ress = res.build()
- lgt, flag = rutf8.check_utf8(ress, True)
- return ress, len(s), lgt, flag
+ digits = 4 if s[pos] == 'u' else 8
+ message = "truncated \\uXXXX"
+ pos += 1
+ pos = hexescape(result, s, pos, digits,
+ "rawunicodeescape", errorhandler, message, errors)
-# XXX wrappers, think about speed
-
-class DecodeWrapper(object):
- def __init__(self, handler):
- self.orig = handler
-
- def handle(self, errors, encoding, msg, s, pos, endpos):
- return self.orig(errors, encoding, msg, s, pos, endpos)
-
-class EncodeWrapper(object):
- def __init__(self, handler):
- self.orig = handler
-
- def handle(self, errors, encoding, msg, s, pos, endpos):
- return self.orig(errors, encoding, msg, s.encode("utf8"), pos, endpos)
-
-#def str_decode_unicode_escape(s, slen, errors, final, errorhandler, ud_handler):
-# w = DecodeWrapper(errorhandler)
-# u, pos = runicode.str_decode_unicode_escape(s, slen, errors, final,
-# w.handle,
-# ud_handler)
-# return u.encode('utf8'), pos, len(u), _get_flag(u)
-
-def setup_new_encoders_legacy(encoding):
- encoder_name = 'utf8_encode_' + encoding
- encoder_call_name = 'unicode_encode_' + encoding
- decoder_name = 'str_decode_' + encoding
- def encoder(utf8, utf8len, errors, errorhandler):
- u = utf8.decode("utf8")
- w = EncodeWrapper(errorhandler)
- return getattr(runicode, encoder_call_name)(u, len(u), errors,
- w.handle)
- def decoder(s, slen, errors, final, errorhandler):
- w = DecodeWrapper((errorhandler))
- u, pos = getattr(runicode, decoder_name)(s, slen, errors, final, w.handle)
- return u.encode('utf8'), pos, len(u), _get_flag(u)
- encoder.__name__ = encoder_name
- decoder.__name__ = decoder_name
- if encoder_name not in globals():
- globals()[encoder_name] = encoder
- if decoder_name not in globals():
- globals()[decoder_name] = decoder
-
-def setup():
- for encoding in ['raw_unicode_escape',
- 'utf_16', 'utf_16_le', 'utf_16_be', 'utf_32_le', 'utf_32',
- 'utf_32_be', 'latin_1', 'unicode_internal']:
- setup_new_encoders_legacy(encoding)
-
-setup()
-
-def utf8_encode_ascii(utf8, errors, errorhandler):
- """ Don't be confused - this is a slowpath for errors e.g. "ignore"
- or an obscure errorhandler
- """
- res = StringBuilder()
- i = 0
- pos = 0
- while i < len(utf8):
- ch = rutf8.codepoint_at_pos(utf8, i)
- if ch >= 0x7F:
- msg = "ordinal not in range(128)"
- r, newpos = errorhandler(errors, 'ascii', msg, utf8,
- pos, pos + 1)
- for _ in range(newpos - pos):
- i = rutf8.next_codepoint_pos(utf8, i)
- pos = newpos
- res.append(r)
- else:
- res.append(chr(ch))
- i = rutf8.next_codepoint_pos(utf8, i)
- pos += 1
-
- s = res.build()
- return s
-
-# some irregular interfaces
-def str_decode_utf8(s, slen, errors, final, errorhandler):
- xxxx
-
- u, pos = runicode.str_decode_utf_8_impl(s, slen, errors, final, w.handle,
- runicode.allow_surrogate_by_default)
- return u.encode('utf8'), pos, len(u), _get_flag(u)
+ r = result.build()
+ lgt, flag = rutf8.check_utf8(r, True)
+ return r, pos, lgt, flag
# ____________________________________________________________
# utf-7
@@ -660,7 +841,6 @@
base64bits >= 6 or
(base64bits > 0 and base64buffer != 0)):
msg = "unterminated shift sequence"
- xxxx
res, pos = errorhandler(errors, 'utf7', msg, s, shiftOutStartPos, pos)
reslen, resflags = rutf8.check_utf8(res, True)
outsize += reslen
diff --git a/pypy/module/_codecs/interp_codecs.py b/pypy/module/_codecs/interp_codecs.py
--- a/pypy/module/_codecs/interp_codecs.py
+++ b/pypy/module/_codecs/interp_codecs.py
@@ -473,7 +473,7 @@
lgt, flag = rutf8.check_utf8(string, allow_surrogates=True)
except rutf8.CheckError:
res, consumed, lgt, flag = unicodehelper.str_decode_utf8(string,
- len(string), errors, final, state.decode_error_handler)
+ errors, final, state.decode_error_handler)
return space.newtuple([space.newutf8(res, lgt, flag),
space.newint(consumed)])
else:
diff --git a/rpython/rlib/rutf8.py b/rpython/rlib/rutf8.py
--- a/rpython/rlib/rutf8.py
+++ b/rpython/rlib/rutf8.py
@@ -199,7 +199,6 @@
return
raise CheckError(res)
-
@jit.elidable
def first_non_ascii_char(s):
for i in range(len(s)):
More information about the pypy-commit
mailing list