[pypy-commit] pypy unicode-utf8: (arigo, fijal climbing)

arigo pypy.commits at gmail.com
Thu Aug 24 08:40:00 EDT 2017


Author: Armin Rigo <arigo at tunes.org>
Branch: unicode-utf8
Changeset: r92248:bf0d9ddd4a6e
Date: 2017-08-24 14:39 +0200
http://bitbucket.org/pypy/pypy/changeset/bf0d9ddd4a6e/

Log:	(arigo, fijal climbing)

	Clean up rutf8.py

diff --git a/rpython/rlib/rutf8.py b/rpython/rlib/rutf8.py
--- a/rpython/rlib/rutf8.py
+++ b/rpython/rlib/rutf8.py
@@ -1,60 +1,70 @@
+""" This file is about supporting unicode strings in RPython,
+represented by a byte string that is exactly the UTF-8 version
+(for some definition of UTF-8).
 
+This doesn't support Python 2's unicode characters beyond 0x10ffff,
+which are theoretically possible to obtain using strange tricks like
+the array or ctypes modules.
+
+Fun comes from surrogates.  Various functions don't normally accept
+any unicode character between 0xd800 and 0xdfff, but do if you give
+the 'allow_surrogates = True' flag.
+"""
+
+from rpython.rlib.objectmodel import enforceargs
 from rpython.rlib.rstring import StringBuilder
-from rpython.rlib import runicode, jit
+from rpython.rlib import jit
+from rpython.rlib.rarithmetic import r_uint
 
-def unichr_as_utf8(code):
-    """ Encode code (numeric value) as utf8 encoded string
+
+def unichr_as_utf8(code, allow_surrogates=False):
+    """Encode code (numeric value) as utf8 encoded string
     """
-    if code < 0:
-        raise ValueError
-    lgt = 1
-    if code >= runicode.MAXUNICODE:
-        lgt = 2
-    if code < 0x80:
+    code = r_uint(code)
+    if code <= r_uint(0x7F):
         # Encode ASCII
-        return chr(code), 1
-    if code < 0x0800:
-        # Encode Latin-1
-        return chr((0xc0 | (code >> 6))) + chr((0x80 | (code & 0x3f))), lgt
-    if code < 0x10000:
+        return chr(code)
+    if code <= r_uint(0x07FF):
+        return chr((0xc0 | (code >> 6))) + chr((0x80 | (code & 0x3f)))
+    if code <= r_uint(0xFFFF):
+        if not allow_surrogates and 0xD800 <= code <= 0xDfff:
+            raise ValueError
         return (chr((0xe0 | (code >> 12))) +
                 chr((0x80 | ((code >> 6) & 0x3f))) +
-                chr((0x80 | (code & 0x3f)))), lgt
-    if code < 0x10ffff:
+                chr((0x80 | (code & 0x3f))))
+    if code <= r_uint(0x10FFFF):
         return (chr((0xf0 | (code >> 18))) +
                 chr((0x80 | ((code >> 12) & 0x3f))) +
                 chr((0x80 | ((code >> 6) & 0x3f))) +
-                chr((0x80 | (code & 0x3f)))), lgt
+                chr((0x80 | (code & 0x3f))))
     raise ValueError
 
-def unichr_as_utf8_append(builder, code):
-    """ Encode code (numeric value) as utf8 encoded string
+def unichr_as_utf8_append(builder, code, allow_surrogates=False):
+    """Encode code (numeric value) as utf8 encoded string
+    and emit the result into the given StringBuilder.
     """
-    if code < 0:
-        raise ValueError
-    lgt = 1
-    if code >= runicode.MAXUNICODE:
-        lgt = 2
-    if code < 0x80:
+    code = r_uint(code)
+    if code <= r_uint(0x7F):
         # Encode ASCII
         builder.append(chr(code))
-        return 1
-    if code < 0x0800:
-        # Encode Latin-1
+        return
+    if code <= r_uint(0x07FF):
         builder.append(chr((0xc0 | (code >> 6))))
         builder.append(chr((0x80 | (code & 0x3f))))
-        return lgt
-    if code < 0x10000:
+        return
+    if code <= r_uint(0xFFFF):
+        if not allow_surrogates and 0xd800 <= code <= 0xdfff:
+            raise ValueError
         builder.append(chr((0xe0 | (code >> 12))))
         builder.append(chr((0x80 | ((code >> 6) & 0x3f))))
         builder.append(chr((0x80 | (code & 0x3f))))
-        return lgt
-    if code < 0x10ffff:
+        return
+    if code <= r_uint(0x10FFFF):
         builder.append(chr((0xf0 | (code >> 18))))
         builder.append(chr((0x80 | ((code >> 12) & 0x3f))))
         builder.append(chr((0x80 | ((code >> 6) & 0x3f))))
         builder.append(chr((0x80 | (code & 0x3f))))
-        return lgt
+        return
     raise ValueError
 
 # note - table lookups are really slow. Measured on various elements of obama
@@ -62,61 +72,64 @@
 #        In extreme cases (small, only chinese text), they're 40% slower
 
 def next_codepoint_pos(code, pos):
-    """ Gives the position of the next codepoint after pos, -1
-    if it's the last one (assumes valid utf8)
+    """Gives the position of the next codepoint after pos.
+    Assumes valid utf8.  'pos' must be before the end of the string.
     """
     chr1 = ord(code[pos])
-    if chr1 < 0x80:
+    if chr1 <= 0x7F:
         return pos + 1
-    if 0xC2 <= chr1 <= 0xDF:
+    if chr1 <= 0xDF:
         return pos + 2
-    if chr1 >= 0xE0 and chr1 <= 0xEF:
+    if chr1 <= 0xEF:
         return pos + 3
     return pos + 4
 
 def prev_codepoint_pos(code, pos):
-    """ Gives the position of the previous codepoint
+    """Gives the position of the previous codepoint.
+    'pos' must not be zero.
     """
     pos -= 1
     chr1 = ord(code[pos])
-    if chr1 < 0x80:
+    if chr1 <= 0x7F:
         return pos
-    while ord(code[pos]) & 0xC0 == 0x80:
-        pos -= 1
+    pos -= 1
+    if ord(code[pos]) >= 0xC0:
+        return pos
+    pos -= 1
+    if ord(code[pos]) >= 0xC0:
+        return pos
+    pos -= 1
     return pos
 
 def compute_length_utf8(s):
-    pos = 0
-    lgt = 0
-    while pos < len(s):
-        pos = next_codepoint_pos(s, pos)
-        lgt += 1
-    return lgt
+    continuation_bytes = 0
+    for i in range(len(s)):
+        if 0x80 <= ord(s[i]) <= 0xBF:    # count the continuation bytes
+            continuation_bytes += 1
+    return len(s) - continuation_bytes
 
 def codepoint_at_pos(code, pos):
     """ Give a codepoint in code at pos - assumes valid utf8, no checking!
     """
     ordch1 = ord(code[pos])
-    if ordch1 < 0x80:
+    if ordch1 <= 0x7F:
         return ordch1
 
-    n = ord(runicode._utf8_code_length[ordch1 - 0x80])
-    if n == 2:
-        ordch2 = ord(code[pos+1])
+    ordch2 = ord(code[pos+1])
+    if ordch1 <= 0xDF:
         # 110yyyyy 10zzzzzz -> 00000000 00000yyy yyzzzzzz
         return (((ordch1 & 0x1F) << 6) +    # 0b00011111
                  (ordch2 & 0x3F))           # 0b00111111
-    elif n == 3:
-        ordch2 = ord(code[pos+1])
-        ordch3 = ord(code[pos+2])
+
+    ordch3 = ord(code[pos+2])
+    if ordch1 <= 0xEF:
         # 1110xxxx 10yyyyyy 10zzzzzz -> 00000000 xxxxyyyy yyzzzzzz
         return (((ordch1 & 0x0F) << 12) +     # 0b00001111
                 ((ordch2 & 0x3F) << 6) +      # 0b00111111
                 (ordch3 & 0x3F))              # 0b00111111
-    elif n == 4:
-        ordch2 = ord(code[pos+1])
-        ordch3 = ord(code[pos+2])
-        ordch4 = ord(code[pos+3])
+
+    ordch4 = ord(code[pos+3])
+    if True:
         # 11110www 10xxxxxx 10yyyyyy 10zzzzzz -> 000wwwxx xxxxyyyy yyzzzzzz
         return (((ordch1 & 0x07) << 18) +      # 0b00000111
                 ((ordch2 & 0x3F) << 12) +      # 0b00111111
@@ -124,46 +137,44 @@
                 (ordch4 & 0x3F))               # 0b00111111
     assert False, "unreachable"
 
-class AsciiCheckError(Exception):
-    def __init__(self, pos):
-        self.pos = pos
+class CheckError(Exception):
+    pass
 
-def check_ascii(s, size=-1):
-    if size == -1:
-        size = len(s)
-    for i in range(0, size):
-        if ord(s[i]) & 0x80:
-            raise AsciiCheckError(i)
+@jit.elidable
+def check_ascii(s):
+    for i in range(len(s)):
+        if ord(s[i]) > 0x7F:
+            raise CheckError
 
-def utf8_encode_ascii(s, errors, encoding, msg, errorhandler):
-    res = StringBuilder(len(s))
-    u_pos = 0
-    pos = 0
-    while pos < len(s):
-        chr1 = s[pos]
-        if ord(chr1) < 0x80:
-            res.append(chr1)
-        else:
-            repl, _, _, _ = errorhandler(errors, encoding, msg, s, u_pos, u_pos + 1)
-            res.append(repl)
-        u_pos += 1
-        pos = next_codepoint_pos(s, pos)
-    return res.build()
+#def utf8_encode_ascii(s, errors, encoding, msg, errorhandler):
+#    res = StringBuilder(len(s))
+#    u_pos = 0
+#    pos = 0
+#    while pos < len(s):
+#        chr1 = s[pos]
+#        if ord(chr1) < 0x80:
+#            res.append(chr1)
+#        else:
+#            repl, _, _, _ = errorhandler(errors, encoding, msg, s, u_pos, u_pos + 1)
+#            res.append(repl)
+#        u_pos += 1
+#        pos = next_codepoint_pos(s, pos)
+#    return res.build()
 
-def str_decode_ascii(s, size, errors, errorhandler):
-    # ASCII is equivalent to the first 128 ordinals in Unicode.
-    result = StringBuilder(size)
-    pos = 0
-    while pos < size:
-        c = s[pos]
-        if ord(c) < 128:
-            result.append(c)
-        else:
-            r, _, _ = errorhandler(errors, "ascii", "ordinal not in range(128)",
-                                   s,  pos, pos + 1)
-            result.append(r)
-        pos += 1
-    return result.build(), pos, -1
+#def str_decode_ascii(s, size, errors, errorhandler):
+#    # ASCII is equivalent to the first 128 ordinals in Unicode.
+#    result = StringBuilder(size)
+#    pos = 0
+#    while pos < size:
+#        c = s[pos]
+#        if ord(c) < 128:
+#            result.append(c)
+#        else:
+#            r, _, _ = errorhandler(errors, "ascii", "ordinal not in range(128)",
+#                                   s,  pos, pos + 1)
+#            result.append(r)
+#        pos += 1
+#    return result.build(), pos, -1
 
 def islinebreak(s, pos):
     chr1 = ord(s[pos])
@@ -217,149 +228,92 @@
         return True
     return False
 
-def utf8_in_chars(value, pos, chars):
-    """ equivalent of u'x' in u'xyz', just done in utf8
-    """
-    lgt = next_codepoint_pos(value, pos) - pos
-    i = 0
-    while i < len(chars):
-        j = next_codepoint_pos(chars, i)
-        if j - i != lgt:
-            i = j
-            continue
-        for k in range(lgt):
-            if value[k + pos] != chars[i + k]:
-                break
-        else:
-            return True
-        i = j
-    return False
 
-class Utf8CheckError(Exception):
-    def __init__(self, msg, startpos, endpos):
-        self.msg = msg
-        self.startpos = startpos
-        self.endpos = endpos
+def _invalid_cont_byte(ordch):
+    return ordch>>6 != 0x2    # 0b10
+
+_invalid_byte_2_of_2 = _invalid_cont_byte
+_invalid_byte_3_of_3 = _invalid_cont_byte
+_invalid_byte_3_of_4 = _invalid_cont_byte
+_invalid_byte_4_of_4 = _invalid_cont_byte
+
+@enforceargs(allow_surrogates=bool)
+def _invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates):
+    return (ordch2>>6 != 0x2 or    # 0b10
+            (ordch1 == 0xe0 and ordch2 < 0xa0)
+            # surrogates shouldn't be valid UTF-8!
+            or (ordch1 == 0xed and ordch2 > 0x9f and not allow_surrogates))
+
+def _invalid_byte_2_of_4(ordch1, ordch2):
+    return (ordch2>>6 != 0x2 or    # 0b10
+            (ordch1 == 0xf0 and ordch2 < 0x90) or
+            (ordch1 == 0xf4 and ordch2 > 0x8f))
+
 
 @jit.elidable
-def str_check_utf8(s, size, final=False,
-                   allow_surrogates=runicode.allow_surrogate_by_default):
-    """ A simplified version of utf8 encoder - it only works with 'strict'
-    error handling.
+def check_utf8(s, allow_surrogates=False):
+    """Check that 's' is a utf-8-encoded byte string.
+    Returns the length (number of chars) or raises CheckError.
+    Encoded surrogates are rejected unless 'allow_surrogates' is true.
     """
-    # XXX do the following in a cleaner way, e.g. via signature
-    # NB. a bit messy because rtyper/rstr.py also calls the same
-    # function.  Make sure we annotate for the args it passes, too
-    #if NonConstant(False):
-    #    s = NonConstant('?????')
-    #    size = NonConstant(12345)
-    #    errors = NonConstant('strict')
-    #    final = NonConstant(True)
-    #    errorhandler = ll_unicode_error_decode
-    #    allow_surrogates = NonConstant(True)
-    if size == 0:
-        return 0, 0
-
     pos = 0
-    lgt = 0
-    while pos < size:
+    continuation_bytes = 0
+    while pos < len(s):
         ordch1 = ord(s[pos])
+        pos += 1
         # fast path for ASCII
-        # XXX maybe use a while loop here
-        if ordch1 < 0x80:
-            lgt += 1
-            pos += 1
+        if ordch1 <= 0x7F:
             continue
 
-        n = ord(runicode._utf8_code_length[ordch1 - 0x80])
-        if pos + n > size:
-            if not final:
-                break
-            # argh, this obscure block of code is mostly a copy of
-            # what follows :-(
-            charsleft = size - pos - 1 # either 0, 1, 2
-            # note: when we get the 'unexpected end of data' we need
-            # to care about the pos returned; it can be lower than size,
-            # in case we need to continue running this loop
-            if not charsleft:
-                # there's only the start byte and nothing else
-                raise Utf8CheckError('unexpected end of data', pos, pos + 1)
-            ordch2 = ord(s[pos+1])
-            if n == 3:
-                # 3-bytes seq with only a continuation byte
-                if runicode._invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates):
-                    # second byte invalid, take the first and continue
-                    raise Utf8CheckError('invalid continuation byte', pos,
-                                         pos + 1)
-                else:
-                    # second byte valid, but third byte missing
-                    raise Utf8CheckError('unexpected end of data', pos, pos + 2)
-            elif n == 4:
-                # 4-bytes seq with 1 or 2 continuation bytes
-                if runicode._invalid_byte_2_of_4(ordch1, ordch2):
-                    # second byte invalid, take the first and continue
-                    raise Utf8CheckError('invalid continuation byte', pos,
-                                         pos + 1)
-                elif charsleft == 2 and runicode._invalid_byte_3_of_4(ord(s[pos+2])):
-                    # third byte invalid, take the first two and continue
-                    raise Utf8CheckError('invalid continuation byte', pos,
-                                         pos + 2)
-                else:
-                    # there's only 1 or 2 valid cb, but the others are missing
-                    raise Utf8CheckError('unexpected end of data', pos,
-                                         pos + charsleft + 1)
-            raise AssertionError("unreachable")
+        if ordch1 <= 0xC1:
+            raise CheckError
 
-        if n == 0:
-            raise Utf8CheckError('invalid start byte', pos, pos + 1)
-        elif n == 1:
-            assert 0, "ascii should have gone through the fast path"
+        if ordch1 <= 0xDF:
+            continuation_bytes += 1
+            if pos >= len(s):
+                raise CheckError
+            ordch2 = ord(s[pos])
+            pos += 1
 
-        elif n == 2:
-            ordch2 = ord(s[pos+1])
-            if runicode._invalid_byte_2_of_2(ordch2):
-                raise Utf8CheckError('invalid continuation byte', pos,
-                                     pos + 2)
+            if _invalid_byte_2_of_2(ordch2):
+                raise CheckError
             # 110yyyyy 10zzzzzz -> 00000000 00000yyy yyzzzzzz
-            lgt += 1
+            continue
+
+        if ordch1 <= 0xEF:
+            continuation_bytes += 2
+            if (pos + 2) > len(s):
+                raise CheckError
+            ordch2 = ord(s[pos])
+            ordch3 = ord(s[pos + 1])
             pos += 2
 
-        elif n == 3:
-            ordch2 = ord(s[pos+1])
-            ordch3 = ord(s[pos+2])
-            if runicode._invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates):
-                raise Utf8CheckError('invalid continuation byte', pos,
-                                     pos + 1)
-            elif runicode._invalid_byte_3_of_3(ordch3):
-                raise Utf8CheckError('invalid continuation byte', pos,
-                                     pos + 2)
+            if _invalid_byte_2_of_3(ordch1, ordch2, allow_surrogates):
+                raise CheckError
+            elif _invalid_byte_3_of_3(ordch3):
+                raise CheckError
             # 1110xxxx 10yyyyyy 10zzzzzz -> 00000000 xxxxyyyy yyzzzzzz
-            lgt += 1
+            continue
+
+        if ordch1 <= 0xF4:
+            continuation_bytes += 3
+            if (pos + 3) > len(s):
+                raise CheckError
+            ordch2 = ord(s[pos])
+            ordch3 = ord(s[pos + 1])
+            ordch4 = ord(s[pos + 2])
             pos += 3
 
-        elif n == 4:
-            ordch2 = ord(s[pos+1])
-            ordch3 = ord(s[pos+2])
-            ordch4 = ord(s[pos+3])
-            if runicode._invalid_byte_2_of_4(ordch1, ordch2):
-                raise Utf8CheckError('invalid continuation byte', pos,
-                                     pos + 1)
-            elif runicode._invalid_byte_3_of_4(ordch3):
-                raise Utf8CheckError('invalid continuation byte', pos,
-                                     pos + 2)
-            elif runicode._invalid_byte_4_of_4(ordch4):
-                raise Utf8CheckError('invalid continuation byte', pos,
-                                     pos + 3)
+            if _invalid_byte_2_of_4(ordch1, ordch2):
+                raise CheckError
+            elif _invalid_byte_3_of_4(ordch3):
+                raise CheckError
+            elif _invalid_byte_4_of_4(ordch4):
+                raise CheckError
             # 11110www 10xxxxxx 10yyyyyy 10zzzzzz -> 000wwwxx xxxxyyyy yyzzzzzz
-            c = (((ordch1 & 0x07) << 18) +      # 0b00000111
-                 ((ordch2 & 0x3F) << 12) +      # 0b00111111
-                 ((ordch3 & 0x3F) << 6) +       # 0b00111111
-                 (ordch4 & 0x3F))               # 0b00111111
-            if c <= runicode.MAXUNICODE:
-                lgt += 1
-            else:
-                # append the two surrogates:
-                lgt += 2
-            pos += 4
+            continue
 
-    return pos, lgt
+        raise CheckError
+
+    assert pos == len(s)
+    return pos - continuation_bytes
diff --git a/rpython/rlib/test/test_rutf8.py b/rpython/rlib/test/test_rutf8.py
--- a/rpython/rlib/test/test_rutf8.py
+++ b/rpython/rlib/test/test_rutf8.py
@@ -1,14 +1,18 @@
-
+import py
 import sys
 from hypothesis import given, strategies, settings, example
 
 from rpython.rlib import rutf8, runicode
 
-@given(strategies.integers(min_value=0, max_value=runicode.MAXUNICODE))
-def test_unichr_as_utf8(i):
-    u, lgt = rutf8.unichr_as_utf8(i)
-    r = runicode.UNICHR(i)
-    assert u == r.encode('utf8')
+
+@given(strategies.characters(), strategies.booleans())
+def test_unichr_as_utf8(c, allow_surrogates):
+    i = ord(c)
+    if not allow_surrogates and 0xD800 <= i <= 0xDFFF:
+        py.test.raises(ValueError, rutf8.unichr_as_utf8, i, allow_surrogates)
+    else:
+        u = rutf8.unichr_as_utf8(i, allow_surrogates)
+        assert u == c.encode('utf8')
 
 @given(strategies.binary())
 def test_check_ascii(s):
@@ -19,28 +23,32 @@
         raised = True
     try:
         rutf8.check_ascii(s)
-    except rutf8.AsciiCheckError as a:
+    except rutf8.CheckError:
         assert raised
-        assert a.pos == e.start
     else:
         assert not raised
 
-@given(strategies.binary())
-def test_str_check_utf8(s):
+@given(strategies.binary(), strategies.booleans())
+def test_check_utf8(s, allow_surrogates):
+    _test_check_utf8(s, allow_surrogates)
+
+@given(strategies.text(), strategies.booleans())
+def test_check_utf8_valid(u, allow_surrogates):
+    _test_check_utf8(u.encode('utf-8'), allow_surrogates)
+
+def _test_check_utf8(s, allow_surrogates):
     try:
-        u, _ = runicode.str_decode_utf_8(s, len(s), None, final=True)
+        u, _ = runicode.str_decode_utf_8(s, len(s), None, final=True,
+                                         allow_surrogates=allow_surrogates)
         valid = True
     except UnicodeDecodeError as e:
         valid = False
     try:
-        consumed, length = rutf8.str_check_utf8(s, len(s), final=True)
-    except rutf8.Utf8CheckError as a:
+        length = rutf8.check_utf8(s, allow_surrogates)
+    except rutf8.CheckError:
         assert not valid
-        assert a.startpos == e.start
-        # assert a.end == e.end, ideally
     else:
         assert valid
-        assert consumed == len(s)
         assert length == len(u)
 
 @given(strategies.characters())
@@ -80,5 +88,5 @@
         response = True
     else:
         response = False
-    r = rutf8.utf8_in_chars(unichr(i).encode('utf8'), 0, uni.encode('utf8'))
+    r = unichr(i).encode('utf8') in uni.encode('utf8')
     assert r == response


More information about the pypy-commit mailing list