[pypy-commit] pypy unicode-utf8: (arigo, fijal climbing)
arigo
pypy.commits at gmail.com
Thu Aug 24 08:40:00 EDT 2017
Author: Armin Rigo <arigo at tunes.org>
Branch: unicode-utf8
Changeset: r92248:bf0d9ddd4a6e
Date: 2017-08-24 14:39 +0200
http://bitbucket.org/pypy/pypy/changeset/bf0d9ddd4a6e/
Log: (arigo, fijal climbing)
Clean up rutf8.py
diff --git a/rpython/rlib/rutf8.py b/rpython/rlib/rutf8.py
--- a/rpython/rlib/rutf8.py
+++ b/rpython/rlib/rutf8.py
@@ -1,60 +1,70 @@
+""" This file is about supporting unicode strings in RPython,
+represented by a byte string that is exactly the UTF-8 version
+(for some definition of UTF-8).
+This doesn't support Python 2's unicode characters beyond 0x10ffff,
+which are theoretically possible to obtain using strange tricks like
+the array or ctypes modules.
+
+Fun comes from surrogates. Various functions don't normally accept
+any unicode character between 0xd800 and 0xdfff, but do if you give
+the 'allow_surrogates = True' flag.
+"""
+
+from rpython.rlib.objectmodel import enforceargs
from rpython.rlib.rstring import StringBuilder
-from rpython.rlib import runicode, jit
+from rpython.rlib import jit
+from rpython.rlib.rarithmetic import r_uint
-def unichr_as_utf8(code):
- """ Encode code (numeric value) as utf8 encoded string
+
+def unichr_as_utf8(code, allow_surrogates=False):
+ """Encode code (numeric value) as utf8 encoded string
"""
- if code < 0:
- raise ValueError
- lgt = 1
- if code >= runicode.MAXUNICODE:
- lgt = 2
- if code < 0x80:
+ code = r_uint(code)
+ if code <= r_uint(0x7F):
# Encode ASCII
- return chr(code), 1
- if code < 0x0800:
- # Encode Latin-1
- return chr((0xc0 | (code >> 6))) + chr((0x80 | (code & 0x3f))), lgt
- if code < 0x10000:
+ return chr(code)
+ if code <= r_uint(0x07FF):
+ return chr((0xc0 | (code >> 6))) + chr((0x80 | (code & 0x3f)))
+ if code <= r_uint(0xFFFF):
+ if not allow_surrogates and 0xD800 <= code <= 0xDfff:
+ raise ValueError
return (chr((0xe0 | (code >> 12))) +
chr((0x80 | ((code >> 6) & 0x3f))) +
- chr((0x80 | (code & 0x3f)))), lgt
- if code < 0x10ffff:
+ chr((0x80 | (code & 0x3f))))
+ if code <= r_uint(0x10FFFF):
return (chr((0xf0 | (code >> 18))) +
chr((0x80 | ((code >> 12) & 0x3f))) +
chr((0x80 | ((code >> 6) & 0x3f))) +
- chr((0x80 | (code & 0x3f)))), lgt
+ chr((0x80 | (code & 0x3f))))
raise ValueError
-def unichr_as_utf8_append(builder, code):
- """ Encode code (numeric value) as utf8 encoded string
+def unichr_as_utf8_append(builder, code, allow_surrogates=False):
+ """Encode code (numeric value) as utf8 encoded string
+ and emit the result into the given StringBuilder.
"""
- if code < 0:
- raise ValueError
- lgt = 1
- if code >= runicode.MAXUNICODE:
- lgt = 2
- if code < 0x80:
+ code = r_uint(code)
+ if code <= r_uint(0x7F):
# Encode ASCII
builder.append(chr(code))
- return 1
- if code < 0x0800:
- # Encode Latin-1
+ return
+ if code <= r_uint(0x07FF):
builder.append(chr((0xc0 | (code >> 6))))
builder.append(chr((0x80 | (code & 0x3f))))
- return lgt
- if code < 0x10000:
+ return
+ if code <= r_uint(0xFFFF):
+ if not allow_surrogates and 0xd800 <= code <= 0xdfff:
+ raise ValueError
builder.append(chr((0xe0 | (code >> 12))))
builder.append(chr((0x80 | ((code >> 6) & 0x3f))))
builder.append(chr((0x80 | (code & 0x3f))))
- return lgt
- if code < 0x10ffff:
+ return
+ if code <= r_uint(0x10FFFF):
builder.append(chr((0xf0 | (code >> 18))))
builder.append(chr((0x80 | ((code >> 12) & 0x3f))))
builder.append(chr((0x80 | ((code >> 6) & 0x3f))))
builder.append(chr((0x80 | (code & 0x3f))))
- return lgt
+ return
raise ValueError
# note - table lookups are really slow. Measured on various elements of obama
@@ -62,61 +72,64 @@
# In extreme cases (small, only chinese text), they're 40% slower
def next_codepoint_pos(code, pos):
- """ Gives the position of the next codepoint after pos, -1
- if it's the last one (assumes valid utf8)
+ """Gives the position of the next codepoint after pos.
+ Assumes valid utf8. 'pos' must be before the end of the string.
"""
chr1 = ord(code[pos])
- if chr1 < 0x80:
+ if chr1 <= 0x7F:
return pos + 1
- if 0xC2 <= chr1 <= 0xDF:
+ if chr1 <= 0xDF:
return pos + 2
- if chr1 >= 0xE0 and chr1 <= 0xEF:
+ if chr1 <= 0xEF:
return pos + 3
return pos + 4
def prev_codepoint_pos(code, pos):
- """ Gives the position of the previous codepoint
+ """Gives the position of the previous codepoint.
+ 'pos' must not be zero.
"""
pos -= 1
chr1 = ord(code[pos])
- if chr1 < 0x80:
+ if chr1 <= 0x7F:
return pos
- while ord(code[pos]) & 0xC0 == 0x80:
- pos -= 1
+ pos -= 1
+ if ord(code[pos]) >= 0xC0:
+ return pos
+ pos -= 1
+ if ord(code[pos]) >= 0xC0:
+ return pos
+ pos -= 1
return pos
def compute_length_utf8(s):
- pos = 0
- lgt = 0
- while pos < len(s):
- pos = next_codepoint_pos(s, pos)
- lgt += 1
- return lgt
+ continuation_bytes = 0
+ for i in range(len(s)):
+ if 0x80 <= ord(s[i]) <= 0xBF: # count the continuation bytes
+ continuation_bytes += 1
+ return len(s) - continuation_bytes
def codepoint_at_pos(code, pos):
""" Give a codepoint in code at pos - assumes valid utf8, no checking!
"""
ordch1 = ord(code[pos])
- if ordch1 < 0x80:
+ if ordch1 <= 0x7F:
return ordch1
- n = ord(runicode._utf8_code_length[ordch1 - 0x80])
- if n == 2:
- ordch2 = ord(code[pos+1])
+ ordch2 = ord(code[pos+1])
+ if ordch1 <= 0xDF:
# 110yyyyy 10zzzzzz -> 00000000 00000yyy yyzzzzzz
return (((ordch1 & 0x1F) << 6) + # 0b00011111
(ordch2 & 0x3F)) # 0b00111111
- elif n == 3:
- ordch2 = ord(code[pos+1])
- ordch3 = ord(code[pos+2])
+
+ ordch3 = ord(code[pos+2])
+ if ordch1 <= 0xEF:
# 1110xxxx 10yyyyyy 10zzzzzz -> 00000000 xxxxyyyy yyzzzzzz
return (((ordch1 & 0x0F) << 12) + # 0b00001111
((ordch2 & 0x3F) << 6) + # 0b00111111
(ordch3 & 0x3F)) # 0b00111111
- elif n == 4:
- ordch2 = ord(code[pos+1])
- ordch3 = ord(code[pos+2])
- ordch4 = ord(code[pos+3])
+
+ ordch4 = ord(code[pos+3])
+ if True:
# 11110www 10xxxxxx 10yyyyyy 10zzzzzz -> 000wwwxx xxxxyyyy yyzzzzzz
return (((ordch1 & 0x07) << 18) + # 0b00000111
((ordch2 & 0x3F) << 12) + # 0b00111111
@@ -124,46 +137,44 @@
(ordch4 & 0x3F)) # 0b00111111
assert False, "unreachable"
-class AsciiCheckError(Exception):
- def __init__(self, pos):
- self.pos = pos
+class CheckError(Exception):
+ pass
-def check_ascii(s, size=-1):
- if size == -1:
- size = len(s)
- for i in range(0, size):
- if ord(s[i]) & 0x80:
- raise AsciiCheckError(i)
+@jit.elidable
+def check_ascii(s):
+ for i in range(len(s)):
+ if ord(s[i]) > 0x7F:
+ raise CheckError
-def utf8_encode_ascii(s, errors, encoding, msg, errorhandler):
- res = StringBuilder(len(s))
- u_pos = 0
- pos = 0
- while pos < len(s):
- chr1 = s[pos]
- if ord(chr1) < 0x80:
- res.append(chr1)
- else:
- repl, _, _, _ = errorhandler(errors, encoding, msg, s, u_pos, u_pos + 1)
- res.append(repl)
- u_pos += 1
- pos = next_codepoint_pos(s, pos)
- return res.build()
+#def utf8_encode_ascii(s, errors, encoding, msg, errorhandler):
+# res = StringBuilder(len(s))
+# u_pos = 0
+# pos = 0
+# while pos < len(s):
+# chr1 = s[pos]
+# if ord(chr1) < 0x80:
+# res.append(chr1)
+# else:
+# repl, _, _, _ = errorhandler(errors, encoding, msg, s, u_pos, u_pos + 1)
+# res.append(repl)
+# u_pos += 1
+# pos = next_codepoint_pos(s, pos)
+# return res.build()
-def str_decode_ascii(s, size, errors, errorhandler):
- # ASCII is equivalent to the first 128 ordinals in Unicode.
- result = StringBuilder(size)
- pos = 0
- while pos < size:
- c = s[pos]
- if ord(c) < 128:
- result.append(c)
- else:
- r, _, _ = errorhandler(errors, "ascii", "ordinal not in range(128)",
- s, pos, pos + 1)
- result.append(r)
- pos += 1
- return result.build(), pos, -1
+#def str_decode_ascii(s, size, errors, errorhandler):
+# # ASCII is equivalent to the first 128 ordinals in Unicode.
+# result = StringBuilder(size)
+# pos = 0
+# while pos < size:
+# c = s[pos]
+# if ord(c) < 128:
+# result.append(c)
+# else:
+# r, _, _ = errorhandler(errors, "ascii", "ordinal not in range(128)",
+# s, pos, pos + 1)
+# result.append(r)
+# pos += 1
+# return result.build(), pos, -1
def islinebreak(s, pos):
chr1 = ord(s[pos])
@@ -217,149 +228,92 @@
return True
return False
-def utf8_in_chars(value, pos, chars):
- """ equivalent of u'x' in u'xyz', just done in utf8
- """
- lgt = next_codepoint_pos(value, pos) - pos
- i = 0
- while i < len(chars):
- j = next_codepoint_pos(chars, i)
- if j - i != lgt:
- i = j
- continue
- for k in range(lgt):
- if value[k + pos] != chars[i + k]:
- break
- else:
- return True
- i = j
- return False
-class Utf8CheckError(Exception):
- def __init__(self, msg, startpos, endpos):
- self.msg = msg
- self.startpos = startpos
- self.endpos = endpos
+def _invalid_cont_byte(ordch):
+ return ordch>>6 != 0x2 # 0b10
+
+_invalid_byte_2_of_2 = _invalid_cont_byte
+_invalid_byte_3_of_3 = _invalid_cont_byte
+_invalid_byte_3_of_4 = _invalid_cont_byte
+_invalid_byte_4_of_4 = _invalid_cont_byte
+
+@enforceargs(allow_surrogates=bool)
+def _invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates):
+ return (ordch2>>6 != 0x2 or # 0b10
+ (ordch1 == 0xe0 and ordch2 < 0xa0)
+ # surrogates shouldn't be valid UTF-8!
+ or (ordch1 == 0xed and ordch2 > 0x9f and not allow_surrogates))
+
+def _invalid_byte_2_of_4(ordch1, ordch2):
+ return (ordch2>>6 != 0x2 or # 0b10
+ (ordch1 == 0xf0 and ordch2 < 0x90) or
+ (ordch1 == 0xf4 and ordch2 > 0x8f))
+
@jit.elidable
-def str_check_utf8(s, size, final=False,
- allow_surrogates=runicode.allow_surrogate_by_default):
- """ A simplified version of utf8 encoder - it only works with 'strict'
- error handling.
+def check_utf8(s, allow_surrogates=False):
+ """Check that 's' is a utf-8-encoded byte string.
+ Returns the length (number of chars) or raises CheckError.
+ Note that surrogates are rejected unless 'allow_surrogates' is true, in which case each one counts as one char.
"""
- # XXX do the following in a cleaner way, e.g. via signature
- # NB. a bit messy because rtyper/rstr.py also calls the same
- # function. Make sure we annotate for the args it passes, too
- #if NonConstant(False):
- # s = NonConstant('?????')
- # size = NonConstant(12345)
- # errors = NonConstant('strict')
- # final = NonConstant(True)
- # errorhandler = ll_unicode_error_decode
- # allow_surrogates = NonConstant(True)
- if size == 0:
- return 0, 0
-
pos = 0
- lgt = 0
- while pos < size:
+ continuation_bytes = 0
+ while pos < len(s):
ordch1 = ord(s[pos])
+ pos += 1
# fast path for ASCII
- # XXX maybe use a while loop here
- if ordch1 < 0x80:
- lgt += 1
- pos += 1
+ if ordch1 <= 0x7F:
continue
- n = ord(runicode._utf8_code_length[ordch1 - 0x80])
- if pos + n > size:
- if not final:
- break
- # argh, this obscure block of code is mostly a copy of
- # what follows :-(
- charsleft = size - pos - 1 # either 0, 1, 2
- # note: when we get the 'unexpected end of data' we need
- # to care about the pos returned; it can be lower than size,
- # in case we need to continue running this loop
- if not charsleft:
- # there's only the start byte and nothing else
- raise Utf8CheckError('unexpected end of data', pos, pos + 1)
- ordch2 = ord(s[pos+1])
- if n == 3:
- # 3-bytes seq with only a continuation byte
- if runicode._invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates):
- # second byte invalid, take the first and continue
- raise Utf8CheckError('invalid continuation byte', pos,
- pos + 1)
- else:
- # second byte valid, but third byte missing
- raise Utf8CheckError('unexpected end of data', pos, pos + 2)
- elif n == 4:
- # 4-bytes seq with 1 or 2 continuation bytes
- if runicode._invalid_byte_2_of_4(ordch1, ordch2):
- # second byte invalid, take the first and continue
- raise Utf8CheckError('invalid continuation byte', pos,
- pos + 1)
- elif charsleft == 2 and runicode._invalid_byte_3_of_4(ord(s[pos+2])):
- # third byte invalid, take the first two and continue
- raise Utf8CheckError('invalid continuation byte', pos,
- pos + 2)
- else:
- # there's only 1 or 2 valid cb, but the others are missing
- raise Utf8CheckError('unexpected end of data', pos,
- pos + charsleft + 1)
- raise AssertionError("unreachable")
+ if ordch1 <= 0xC1:
+ raise CheckError
- if n == 0:
- raise Utf8CheckError('invalid start byte', pos, pos + 1)
- elif n == 1:
- assert 0, "ascii should have gone through the fast path"
+ if ordch1 <= 0xDF:
+ continuation_bytes += 1
+ if pos >= len(s):
+ raise CheckError
+ ordch2 = ord(s[pos])
+ pos += 1
- elif n == 2:
- ordch2 = ord(s[pos+1])
- if runicode._invalid_byte_2_of_2(ordch2):
- raise Utf8CheckError('invalid continuation byte', pos,
- pos + 2)
+ if _invalid_byte_2_of_2(ordch2):
+ raise CheckError
# 110yyyyy 10zzzzzz -> 00000000 00000yyy yyzzzzzz
- lgt += 1
+ continue
+
+ if ordch1 <= 0xEF:
+ continuation_bytes += 2
+ if (pos + 2) > len(s):
+ raise CheckError
+ ordch2 = ord(s[pos])
+ ordch3 = ord(s[pos + 1])
pos += 2
- elif n == 3:
- ordch2 = ord(s[pos+1])
- ordch3 = ord(s[pos+2])
- if runicode._invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates):
- raise Utf8CheckError('invalid continuation byte', pos,
- pos + 1)
- elif runicode._invalid_byte_3_of_3(ordch3):
- raise Utf8CheckError('invalid continuation byte', pos,
- pos + 2)
+ if _invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates):
+ raise CheckError
+ elif _invalid_byte_3_of_3(ordch3):
+ raise CheckError
# 1110xxxx 10yyyyyy 10zzzzzz -> 00000000 xxxxyyyy yyzzzzzz
- lgt += 1
+ continue
+
+ if ordch1 <= 0xF4:
+ continuation_bytes += 3
+ if (pos + 3) > len(s):
+ raise CheckError
+ ordch2 = ord(s[pos])
+ ordch3 = ord(s[pos + 1])
+ ordch4 = ord(s[pos + 2])
pos += 3
- elif n == 4:
- ordch2 = ord(s[pos+1])
- ordch3 = ord(s[pos+2])
- ordch4 = ord(s[pos+3])
- if runicode._invalid_byte_2_of_4(ordch1, ordch2):
- raise Utf8CheckError('invalid continuation byte', pos,
- pos + 1)
- elif runicode._invalid_byte_3_of_4(ordch3):
- raise Utf8CheckError('invalid continuation byte', pos,
- pos + 2)
- elif runicode._invalid_byte_4_of_4(ordch4):
- raise Utf8CheckError('invalid continuation byte', pos,
- pos + 3)
+ if _invalid_byte_2_of_4(ordch1, ordch2):
+ raise CheckError
+ elif _invalid_byte_3_of_4(ordch3):
+ raise CheckError
+ elif _invalid_byte_4_of_4(ordch4):
+ raise CheckError
# 11110www 10xxxxxx 10yyyyyy 10zzzzzz -> 000wwwxx xxxxyyyy yyzzzzzz
- c = (((ordch1 & 0x07) << 18) + # 0b00000111
- ((ordch2 & 0x3F) << 12) + # 0b00111111
- ((ordch3 & 0x3F) << 6) + # 0b00111111
- (ordch4 & 0x3F)) # 0b00111111
- if c <= runicode.MAXUNICODE:
- lgt += 1
- else:
- # append the two surrogates:
- lgt += 2
- pos += 4
+ continue
- return pos, lgt
+ raise CheckError
+
+ assert pos == len(s)
+ return pos - continuation_bytes
diff --git a/rpython/rlib/test/test_rutf8.py b/rpython/rlib/test/test_rutf8.py
--- a/rpython/rlib/test/test_rutf8.py
+++ b/rpython/rlib/test/test_rutf8.py
@@ -1,14 +1,18 @@
-
+import py
import sys
from hypothesis import given, strategies, settings, example
from rpython.rlib import rutf8, runicode
-@given(strategies.integers(min_value=0, max_value=runicode.MAXUNICODE))
-def test_unichr_as_utf8(i):
- u, lgt = rutf8.unichr_as_utf8(i)
- r = runicode.UNICHR(i)
- assert u == r.encode('utf8')
+
+@given(strategies.characters(), strategies.booleans())
+def test_unichr_as_utf8(c, allow_surrogates):
+ i = ord(c)
+ if not allow_surrogates and 0xD800 <= i <= 0xDFFF:
+ py.test.raises(ValueError, rutf8.unichr_as_utf8, i, allow_surrogates)
+ else:
+ u = rutf8.unichr_as_utf8(i, allow_surrogates)
+ assert u == c.encode('utf8')
@given(strategies.binary())
def test_check_ascii(s):
@@ -19,28 +23,32 @@
raised = True
try:
rutf8.check_ascii(s)
- except rutf8.AsciiCheckError as a:
+ except rutf8.CheckError:
assert raised
- assert a.pos == e.start
else:
assert not raised
-@given(strategies.binary())
-def test_str_check_utf8(s):
+@given(strategies.binary(), strategies.booleans())
+def test_check_utf8(s, allow_surrogates):
+ _test_check_utf8(s, allow_surrogates)
+
+@given(strategies.text(), strategies.booleans())
+def test_check_utf8_valid(u, allow_surrogates):
+ _test_check_utf8(u.encode('utf-8'), allow_surrogates)
+
+def _test_check_utf8(s, allow_surrogates):
try:
- u, _ = runicode.str_decode_utf_8(s, len(s), None, final=True)
+ u, _ = runicode.str_decode_utf_8(s, len(s), None, final=True,
+ allow_surrogates=allow_surrogates)
valid = True
except UnicodeDecodeError as e:
valid = False
try:
- consumed, length = rutf8.str_check_utf8(s, len(s), final=True)
- except rutf8.Utf8CheckError as a:
+ length = rutf8.check_utf8(s, allow_surrogates)
+ except rutf8.CheckError:
assert not valid
- assert a.startpos == e.start
- # assert a.end == e.end, ideally
else:
assert valid
- assert consumed == len(s)
assert length == len(u)
@given(strategies.characters())
@@ -80,5 +88,5 @@
response = True
else:
response = False
- r = rutf8.utf8_in_chars(unichr(i).encode('utf8'), 0, uni.encode('utf8'))
+ r = unichr(i).encode('utf8') in uni.encode('utf8')
assert r == response
More information about the pypy-commit
mailing list