[pypy-commit] pypy default: ffi.from_buffer("type *")
arigo
pypy.commits at gmail.com
Mon Jun 3 14:50:23 EDT 2019
Author: Armin Rigo <arigo at tunes.org>
Branch:
Changeset: r96729:d9c0166080e4
Date: 2019-06-03 20:39 +0200
http://bitbucket.org/pypy/pypy/changeset/d9c0166080e4/
Log: ffi.from_buffer("type *")
diff --git a/pypy/module/_cffi_backend/cdataobj.py b/pypy/module/_cffi_backend/cdataobj.py
--- a/pypy/module/_cffi_backend/cdataobj.py
+++ b/pypy/module/_cffi_backend/cdataobj.py
@@ -677,11 +677,14 @@
return self.length
def _repr_extra(self):
- if self.w_keepalive is not None:
- name = self.space.type(self.w_keepalive).name
+ from pypy.module._cffi_backend import ctypearray
+ if self.w_keepalive is None:
+ return "buffer RELEASED"
+ obj_tp_name = self.space.type(self.w_keepalive).name
+ if isinstance(self.ctype, ctypearray.W_CTypeArray):
+ return "buffer len %d from '%s' object" % (self.length, obj_tp_name)
else:
- name = "(released)"
- return "buffer len %d from '%s' object" % (self.length, name)
+ return "buffer from '%s' object" % (obj_tp_name,)
def enter_exit(self, exit_now):
# for now, limited effect on PyPy
diff --git a/pypy/module/_cffi_backend/func.py b/pypy/module/_cffi_backend/func.py
--- a/pypy/module/_cffi_backend/func.py
+++ b/pypy/module/_cffi_backend/func.py
@@ -112,9 +112,10 @@
@unwrap_spec(w_ctype=ctypeobj.W_CType, require_writable=int)
def from_buffer(space, w_ctype, w_x, require_writable=0):
- from pypy.module._cffi_backend import ctypearray
- if not isinstance(w_ctype, ctypearray.W_CTypeArray):
- raise oefmt(space.w_TypeError, "expected an array ctype, got '%s'",
+ from pypy.module._cffi_backend import ctypeptr, ctypearray
+ if not isinstance(w_ctype, ctypeptr.W_CTypePtrOrArray):
+ raise oefmt(space.w_TypeError,
+ "expected a poiunter or array ctype, got '%s'",
w_ctype.name)
if space.isinstance_w(w_x, space.w_unicode):
raise oefmt(space.w_TypeError,
@@ -135,33 +136,36 @@
"raw address on PyPy", w_x)
#
buffersize = buf.getlength()
- arraylength = w_ctype.length
- if arraylength >= 0:
- # it's an array with a fixed length; make sure that the
- # buffer contains enough bytes.
- if buffersize < w_ctype.size:
- raise oefmt(space.w_ValueError,
- "buffer is too small (%d bytes) for '%s' (%d bytes)",
- buffersize, w_ctype.name, w_ctype.size)
+ if not isinstance(w_ctype, ctypearray.W_CTypeArray):
+ arraylength = buffersize # number of bytes, not used so far
else:
- # it's an open 'array[]'
- itemsize = w_ctype.ctitem.size
- if itemsize == 1:
- # fast path, performance only
- arraylength = buffersize
- elif itemsize > 0:
- # give it as many items as fit the buffer. Ignore a
- # partial last element.
- arraylength = buffersize / itemsize
+ arraylength = w_ctype.length
+ if arraylength >= 0:
+ # it's an array with a fixed length; make sure that the
+ # buffer contains enough bytes.
+ if buffersize < w_ctype.size:
+ raise oefmt(space.w_ValueError,
+ "buffer is too small (%d bytes) for '%s' (%d bytes)",
+ buffersize, w_ctype.name, w_ctype.size)
else:
- # it's an array 'empty[]'. Unsupported obscure case:
- # the problem is that setting the length of the result
- # to anything large (like SSIZE_T_MAX) is dangerous,
- # because if someone tries to loop over it, it will
- # turn effectively into an infinite loop.
- raise oefmt(space.w_ZeroDivisionError,
- "from_buffer('%s', ..): the actual length of the array "
- "cannot be computed", w_ctype.name)
+ # it's an open 'array[]'
+ itemsize = w_ctype.ctitem.size
+ if itemsize == 1:
+ # fast path, performance only
+ arraylength = buffersize
+ elif itemsize > 0:
+ # give it as many items as fit the buffer. Ignore a
+ # partial last element.
+ arraylength = buffersize / itemsize
+ else:
+ # it's an array 'empty[]'. Unsupported obscure case:
+ # the problem is that setting the length of the result
+ # to anything large (like SSIZE_T_MAX) is dangerous,
+ # because if someone tries to loop over it, it will
+ # turn effectively into an infinite loop.
+ raise oefmt(space.w_ZeroDivisionError,
+ "from_buffer('%s', ..): the actual length of the array "
+ "cannot be computed", w_ctype.name)
#
return cdataobj.W_CDataFromBuffer(space, _cdata, arraylength,
w_ctype, buf, w_x)
diff --git a/pypy/module/_cffi_backend/test/_backend_test_c.py b/pypy/module/_cffi_backend/test/_backend_test_c.py
--- a/pypy/module/_cffi_backend/test/_backend_test_c.py
+++ b/pypy/module/_cffi_backend/test/_backend_test_c.py
@@ -3830,7 +3830,9 @@
BIntP = new_pointer_type(BInt)
BIntA = new_array_type(BIntP, None)
lst = [-12345678, 87654321, 489148]
- bytestring = buffer(newp(BIntA, lst))[:] + b'XYZ'
+ bytestring = bytearray(buffer(newp(BIntA, lst))[:] + b'XYZ')
+ lst2 = lst + [42, -999999999]
+ bytestring2 = bytearray(buffer(newp(BIntA, lst2))[:] + b'XYZ')
#
p1 = from_buffer(BIntA, bytestring) # int[]
assert typeof(p1) is BIntA
@@ -3844,7 +3846,19 @@
p1[-1]
#
py.test.raises(TypeError, from_buffer, BInt, bytestring)
- py.test.raises(TypeError, from_buffer, BIntP, bytestring)
+ #
+ p2 = from_buffer(BIntP, bytestring) # int *
+ assert p2 == p1 or 'PY_DOT_PY' in globals()
+ # note: on py.py ^^^, bytearray buffers are not emulated well enough
+ assert typeof(p2) is BIntP
+ assert p2[0] == lst[0]
+ assert p2[1] == lst[1]
+ assert p2[2] == lst[2]
+ # hopefully does not crash, but doesn't raise an exception:
+ p2[3]
+ p2[-1]
+ # not enough data even for one, but this is not enforced:
+ from_buffer(BIntP, b"")
#
BIntA2 = new_array_type(BIntP, 2)
p2 = from_buffer(BIntA2, bytestring) # int[2]
@@ -3856,7 +3870,7 @@
p2[2]
with pytest.raises(IndexError):
p2[-1]
- assert p2 == p1
+ assert p2 == p1 or 'PY_DOT_PY' in globals()
#
BIntA4 = new_array_type(BIntP, 4) # int[4]: too big
py.test.raises(ValueError, from_buffer, BIntA4, bytestring)
@@ -3866,13 +3880,37 @@
('a2', BInt, -1)])
BStructP = new_pointer_type(BStruct)
BStructA = new_array_type(BStructP, None)
- p1 = from_buffer(BStructA, bytestring) # struct[]
- assert len(p1) == 1
+ p1 = from_buffer(BStructA, bytestring2) # struct[]
+ assert len(p1) == 2
assert typeof(p1) is BStructA
- assert p1[0].a1 == lst[0]
- assert p1[0].a2 == lst[1]
+ assert p1[0].a1 == lst2[0]
+ assert p1[0].a2 == lst2[1]
+ assert p1[1].a1 == lst2[2]
+ assert p1[1].a2 == lst2[3]
with pytest.raises(IndexError):
- p1[1]
+ p1[2]
+ with pytest.raises(IndexError):
+ p1[-1]
+ assert repr(p1) == "<cdata 'foo[]' buffer len 2 from 'bytearray' object>"
+ #
+ p2 = from_buffer(BStructP, bytestring2) # 'struct *'
+ assert p2 == p1 or 'PY_DOT_PY' in globals()
+ assert typeof(p2) is BStructP
+ assert p2.a1 == lst2[0]
+ assert p2.a2 == lst2[1]
+ assert p2[0].a1 == lst2[0]
+ assert p2[0].a2 == lst2[1]
+ assert p2[1].a1 == lst2[2]
+ assert p2[1].a2 == lst2[3]
+ # does not crash:
+ p2[2]
+ p2[-1]
+ # not enough data even for one, but this is not enforced:
+ from_buffer(BStructP, b"")
+ from_buffer(BStructP, b"1234567")
+ #
+ release(p1)
+ assert repr(p1) == "<cdata 'foo[]' buffer RELEASED>"
#
BEmptyStruct = new_struct_type("empty")
complete_struct_or_union(BEmptyStruct, [], Ellipsis, 0)
@@ -3886,7 +3924,37 @@
p1 = from_buffer(BEmptyStructA5, bytestring) # struct empty[5]
assert typeof(p1) is BEmptyStructA5
assert len(p1) == 5
- assert cast(BIntP, p1) == from_buffer(BIntA, bytestring)
+ assert (cast(BIntP, p1) == from_buffer(BIntA, bytestring)
+ or 'PY_DOT_PY' in globals())
+ #
+ BVarStruct = new_struct_type("varfoo")
+ BVarStructP = new_pointer_type(BVarStruct)
+ complete_struct_or_union(BVarStruct, [('a1', BInt, -1),
+ ('va', BIntA, -1)])
+ with pytest.raises(TypeError):
+ from_buffer(BVarStruct, bytestring)
+ pv = from_buffer(BVarStructP, bytestring) # varfoo *
+ assert pv.a1 == lst[0]
+ assert pv.va[0] == lst[1]
+ assert pv.va[1] == lst[2]
+ assert sizeof(pv[0]) == 1 * size_of_int()
+ with pytest.raises(TypeError):
+ len(pv.va)
+ # hopefully does not crash, but doesn't raise an exception:
+ pv.va[2]
+ pv.va[-1]
+ # not enough data even for one, but this is not enforced:
+ from_buffer(BVarStructP, b"")
+ assert repr(pv) == "<cdata 'varfoo *' buffer from 'bytearray' object>"
+ assert repr(pv[0]).startswith("<cdata 'varfoo &' ")
+ #
+ release(pv)
+ assert repr(pv) == "<cdata 'varfoo *' buffer RELEASED>"
+ assert repr(pv[0]).startswith("<cdata 'varfoo &' ")
+ #
+ pv = from_buffer(BVarStructP, bytestring) # make a fresh one
+ with pytest.raises(ValueError):
+ release(pv[0])
def test_memmove():
Short = new_primitive_type("short")
@@ -4312,8 +4380,10 @@
BCharA = new_array_type(BCharP, None)
p = from_buffer(BCharA, a)
assert p[2] == b"z"
+ assert repr(p) == "<cdata 'char[]' buffer len 3 from 'bytearray' object>"
release(p)
assert p[2] == b"z" # true so far, but might change to raise RuntimeError
+ assert repr(p) == "<cdata 'char[]' buffer RELEASED>"
release(p) # no effect
def test_explicit_release_from_buffer_contextmgr():
@@ -4325,6 +4395,7 @@
with p:
assert p[2] == b"z"
assert p[2] == b"z" # true so far, but might change to raise RuntimeError
+ assert repr(p) == "<cdata 'char[]' buffer RELEASED>"
release(p) # no effect
def test_explicit_release_bytearray_on_cpython():
More information about the pypy-commit
mailing list