[pypy-commit] pypy default: ffi.from_buffer("type *")

arigo pypy.commits at gmail.com
Mon Jun 3 14:50:23 EDT 2019


Author: Armin Rigo <arigo at tunes.org>
Branch: 
Changeset: r96729:d9c0166080e4
Date: 2019-06-03 20:39 +0200
http://bitbucket.org/pypy/pypy/changeset/d9c0166080e4/

Log:	ffi.from_buffer("type *")

diff --git a/pypy/module/_cffi_backend/cdataobj.py b/pypy/module/_cffi_backend/cdataobj.py
--- a/pypy/module/_cffi_backend/cdataobj.py
+++ b/pypy/module/_cffi_backend/cdataobj.py
@@ -677,11 +677,14 @@
         return self.length
 
     def _repr_extra(self):
-        if self.w_keepalive is not None:
-            name = self.space.type(self.w_keepalive).name
+        from pypy.module._cffi_backend import ctypearray
+        if self.w_keepalive is None:
+            return "buffer RELEASED"
+        obj_tp_name = self.space.type(self.w_keepalive).name
+        if isinstance(self.ctype, ctypearray.W_CTypeArray):
+            return "buffer len %d from '%s' object" % (self.length, obj_tp_name)
         else:
-            name = "(released)"
-        return "buffer len %d from '%s' object" % (self.length, name)
+            return "buffer from '%s' object" % (obj_tp_name,)
 
     def enter_exit(self, exit_now):
         # for now, limited effect on PyPy
diff --git a/pypy/module/_cffi_backend/func.py b/pypy/module/_cffi_backend/func.py
--- a/pypy/module/_cffi_backend/func.py
+++ b/pypy/module/_cffi_backend/func.py
@@ -112,9 +112,10 @@
 
 @unwrap_spec(w_ctype=ctypeobj.W_CType, require_writable=int)
 def from_buffer(space, w_ctype, w_x, require_writable=0):
-    from pypy.module._cffi_backend import ctypearray
-    if not isinstance(w_ctype, ctypearray.W_CTypeArray):
-        raise oefmt(space.w_TypeError, "expected an array ctype, got '%s'",
+    from pypy.module._cffi_backend import ctypeptr, ctypearray
+    if not isinstance(w_ctype, ctypeptr.W_CTypePtrOrArray):
+        raise oefmt(space.w_TypeError,
+                    "expected a pointer or array ctype, got '%s'",
                     w_ctype.name)
     if space.isinstance_w(w_x, space.w_unicode):
         raise oefmt(space.w_TypeError,
@@ -135,33 +136,36 @@
                         "raw address on PyPy", w_x)
     #
     buffersize = buf.getlength()
-    arraylength = w_ctype.length
-    if arraylength >= 0:
-        # it's an array with a fixed length; make sure that the
-        # buffer contains enough bytes.
-        if buffersize < w_ctype.size:
-            raise oefmt(space.w_ValueError,
-                "buffer is too small (%d bytes) for '%s' (%d bytes)",
-                buffersize, w_ctype.name, w_ctype.size)
+    if not isinstance(w_ctype, ctypearray.W_CTypeArray):
+        arraylength = buffersize   # number of bytes, not used so far
     else:
-        # it's an open 'array[]'
-        itemsize = w_ctype.ctitem.size
-        if itemsize == 1:
-            # fast path, performance only
-            arraylength = buffersize
-        elif itemsize > 0:
-            # give it as many items as fit the buffer.  Ignore a
-            # partial last element.
-            arraylength = buffersize / itemsize
+        arraylength = w_ctype.length
+        if arraylength >= 0:
+            # it's an array with a fixed length; make sure that the
+            # buffer contains enough bytes.
+            if buffersize < w_ctype.size:
+                raise oefmt(space.w_ValueError,
+                    "buffer is too small (%d bytes) for '%s' (%d bytes)",
+                    buffersize, w_ctype.name, w_ctype.size)
         else:
-            # it's an array 'empty[]'.  Unsupported obscure case:
-            # the problem is that setting the length of the result
-            # to anything large (like SSIZE_T_MAX) is dangerous,
-            # because if someone tries to loop over it, it will
-            # turn effectively into an infinite loop.
-            raise oefmt(space.w_ZeroDivisionError,
-                "from_buffer('%s', ..): the actual length of the array "
-                "cannot be computed", w_ctype.name)
+            # it's an open 'array[]'
+            itemsize = w_ctype.ctitem.size
+            if itemsize == 1:
+                # fast path, performance only
+                arraylength = buffersize
+            elif itemsize > 0:
+                # give it as many items as fit the buffer.  Ignore a
+                # partial last element.
+                arraylength = buffersize / itemsize
+            else:
+                # it's an array 'empty[]'.  Unsupported obscure case:
+                # the problem is that setting the length of the result
+                # to anything large (like SSIZE_T_MAX) is dangerous,
+                # because if someone tries to loop over it, it will
+                # turn effectively into an infinite loop.
+                raise oefmt(space.w_ZeroDivisionError,
+                    "from_buffer('%s', ..): the actual length of the array "
+                    "cannot be computed", w_ctype.name)
     #
     return cdataobj.W_CDataFromBuffer(space, _cdata, arraylength,
                                       w_ctype, buf, w_x)
diff --git a/pypy/module/_cffi_backend/test/_backend_test_c.py b/pypy/module/_cffi_backend/test/_backend_test_c.py
--- a/pypy/module/_cffi_backend/test/_backend_test_c.py
+++ b/pypy/module/_cffi_backend/test/_backend_test_c.py
@@ -3830,7 +3830,9 @@
     BIntP = new_pointer_type(BInt)
     BIntA = new_array_type(BIntP, None)
     lst = [-12345678, 87654321, 489148]
-    bytestring = buffer(newp(BIntA, lst))[:] + b'XYZ'
+    bytestring = bytearray(buffer(newp(BIntA, lst))[:] + b'XYZ')
+    lst2 = lst + [42, -999999999]
+    bytestring2 = bytearray(buffer(newp(BIntA, lst2))[:] + b'XYZ')
     #
     p1 = from_buffer(BIntA, bytestring)      # int[]
     assert typeof(p1) is BIntA
@@ -3844,7 +3846,19 @@
         p1[-1]
     #
     py.test.raises(TypeError, from_buffer, BInt, bytestring)
-    py.test.raises(TypeError, from_buffer, BIntP, bytestring)
+    #
+    p2 = from_buffer(BIntP, bytestring)      # int *
+    assert p2 == p1 or 'PY_DOT_PY' in globals()
+    # note: on py.py ^^^, bytearray buffers are not emulated well enough
+    assert typeof(p2) is BIntP
+    assert p2[0] == lst[0]
+    assert p2[1] == lst[1]
+    assert p2[2] == lst[2]
+    # hopefully does not crash, but doesn't raise an exception:
+    p2[3]
+    p2[-1]
+    # not enough data even for one, but this is not enforced:
+    from_buffer(BIntP, b"")
     #
     BIntA2 = new_array_type(BIntP, 2)
     p2 = from_buffer(BIntA2, bytestring)     # int[2]
@@ -3856,7 +3870,7 @@
         p2[2]
     with pytest.raises(IndexError):
         p2[-1]
-    assert p2 == p1
+    assert p2 == p1 or 'PY_DOT_PY' in globals()
     #
     BIntA4 = new_array_type(BIntP, 4)        # int[4]: too big
     py.test.raises(ValueError, from_buffer, BIntA4, bytestring)
@@ -3866,13 +3880,37 @@
                                        ('a2', BInt, -1)])
     BStructP = new_pointer_type(BStruct)
     BStructA = new_array_type(BStructP, None)
-    p1 = from_buffer(BStructA, bytestring)   # struct[]
-    assert len(p1) == 1
+    p1 = from_buffer(BStructA, bytestring2)   # struct[]
+    assert len(p1) == 2
     assert typeof(p1) is BStructA
-    assert p1[0].a1 == lst[0]
-    assert p1[0].a2 == lst[1]
+    assert p1[0].a1 == lst2[0]
+    assert p1[0].a2 == lst2[1]
+    assert p1[1].a1 == lst2[2]
+    assert p1[1].a2 == lst2[3]
     with pytest.raises(IndexError):
-        p1[1]
+        p1[2]
+    with pytest.raises(IndexError):
+        p1[-1]
+    assert repr(p1) == "<cdata 'foo[]' buffer len 2 from 'bytearray' object>"
+    #
+    p2 = from_buffer(BStructP, bytestring2)    # 'struct *'
+    assert p2 == p1 or 'PY_DOT_PY' in globals()
+    assert typeof(p2) is BStructP
+    assert p2.a1 == lst2[0]
+    assert p2.a2 == lst2[1]
+    assert p2[0].a1 == lst2[0]
+    assert p2[0].a2 == lst2[1]
+    assert p2[1].a1 == lst2[2]
+    assert p2[1].a2 == lst2[3]
+    # does not crash:
+    p2[2]
+    p2[-1]
+    # not enough data even for one, but this is not enforced:
+    from_buffer(BStructP, b"")
+    from_buffer(BStructP, b"1234567")
+    #
+    release(p1)
+    assert repr(p1) == "<cdata 'foo[]' buffer RELEASED>"
     #
     BEmptyStruct = new_struct_type("empty")
     complete_struct_or_union(BEmptyStruct, [], Ellipsis, 0)
@@ -3886,7 +3924,37 @@
     p1 = from_buffer(BEmptyStructA5, bytestring)   # struct empty[5]
     assert typeof(p1) is BEmptyStructA5
     assert len(p1) == 5
-    assert cast(BIntP, p1) == from_buffer(BIntA, bytestring)
+    assert (cast(BIntP, p1) == from_buffer(BIntA, bytestring)
+            or 'PY_DOT_PY' in globals())
+    #
+    BVarStruct = new_struct_type("varfoo")
+    BVarStructP = new_pointer_type(BVarStruct)
+    complete_struct_or_union(BVarStruct, [('a1', BInt, -1),
+                                          ('va', BIntA, -1)])
+    with pytest.raises(TypeError):
+        from_buffer(BVarStruct, bytestring)
+    pv = from_buffer(BVarStructP, bytestring)    # varfoo *
+    assert pv.a1 == lst[0]
+    assert pv.va[0] == lst[1]
+    assert pv.va[1] == lst[2]
+    assert sizeof(pv[0]) == 1 * size_of_int()
+    with pytest.raises(TypeError):
+        len(pv.va)
+    # hopefully does not crash, but doesn't raise an exception:
+    pv.va[2]
+    pv.va[-1]
+    # not enough data even for one, but this is not enforced:
+    from_buffer(BVarStructP, b"")
+    assert repr(pv) == "<cdata 'varfoo *' buffer from 'bytearray' object>"
+    assert repr(pv[0]).startswith("<cdata 'varfoo &' ")
+    #
+    release(pv)
+    assert repr(pv) == "<cdata 'varfoo *' buffer RELEASED>"
+    assert repr(pv[0]).startswith("<cdata 'varfoo &' ")
+    #
+    pv = from_buffer(BVarStructP, bytestring)    # make a fresh one
+    with pytest.raises(ValueError):
+        release(pv[0])
 
 def test_memmove():
     Short = new_primitive_type("short")
@@ -4312,8 +4380,10 @@
     BCharA = new_array_type(BCharP, None)
     p = from_buffer(BCharA, a)
     assert p[2] == b"z"
+    assert repr(p) == "<cdata 'char[]' buffer len 3 from 'bytearray' object>"
     release(p)
     assert p[2] == b"z"  # true so far, but might change to raise RuntimeError
+    assert repr(p) == "<cdata 'char[]' buffer RELEASED>"
     release(p)   # no effect
 
 def test_explicit_release_from_buffer_contextmgr():
@@ -4325,6 +4395,7 @@
     with p:
         assert p[2] == b"z"
     assert p[2] == b"z"  # true so far, but might change to raise RuntimeError
+    assert repr(p) == "<cdata 'char[]' buffer RELEASED>"
     release(p)   # no effect
 
 def test_explicit_release_bytearray_on_cpython():


More information about the pypy-commit mailing list