[pypy-commit] pypy ffi-backend: Start writing a compatibility layer for _rawffi on top of CFFI.
arigo
noreply at buildbot.pypy.org
Mon Aug 6 10:52:33 CEST 2012
Author: Armin Rigo <arigo at tunes.org>
Branch: ffi-backend
Changeset: r56593:c0fb051a56a3
Date: 2012-08-06 10:52 +0200
http://bitbucket.org/pypy/pypy/changeset/c0fb051a56a3/
Log: Start writing a compatibility layer for _rawffi on top of CFFI.
That's wrong, but it should offer the fastest transition layer.
diff --git a/lib_pypy/_rawffi.py b/lib_pypy/_rawffi.py
new file mode 100644
--- /dev/null
+++ b/lib_pypy/_rawffi.py
@@ -0,0 +1,78 @@
+import _cffi_backend
+
+cffi_type_void = _cffi_backend.new_void_type()
+cffi_type_pointer = _cffi_backend.new_pointer_type(cffi_type_void)
+
+cffi_type_char = _cffi_backend.new_primitive_type("char")
+cffi_type_schar = _cffi_backend.new_primitive_type("signed char")
+cffi_type_uchar = _cffi_backend.new_primitive_type("unsigned char")
+cffi_type_short = _cffi_backend.new_primitive_type("short")
+cffi_type_ushort = _cffi_backend.new_primitive_type("unsigned short")
+cffi_type_long = _cffi_backend.new_primitive_type("long")
+cffi_type_ulong = _cffi_backend.new_primitive_type("unsigned long")
+cffi_type_longlong = _cffi_backend.new_primitive_type("long long")
+cffi_type_ulonglong = _cffi_backend.new_primitive_type("unsigned long long")
+cffi_type_float = _cffi_backend.new_primitive_type("float")
+cffi_type_double = _cffi_backend.new_primitive_type("double")
+cffi_type_longdouble = _cffi_backend.new_primitive_type("long double")
+
+cffi_type_short_p = _cffi_backend.new_pointer_type(cffi_type_short)
+cffi_type_ushort_p = _cffi_backend.new_pointer_type(cffi_type_ushort)
+cffi_type_long_p = _cffi_backend.new_pointer_type(cffi_type_long)
+cffi_type_ulong_p = _cffi_backend.new_pointer_type(cffi_type_ulong)
+
+cffi_types = {
+ 'c': cffi_type_char,
+ 'b': cffi_type_schar,
+ 'B': cffi_type_uchar,
+ 'h': cffi_type_short,
+ 'H': cffi_type_ushort,
+ 'l': cffi_type_long,
+ 'L': cffi_type_ulong,
+ 'q': cffi_type_longlong,
+ 'Q': cffi_type_ulonglong,
+ 'f': cffi_type_float,
+ 'd': cffi_type_double,
+ 'g': cffi_type_longdouble,
+ 'z': cffi_type_pointer,
+ 'P': cffi_type_pointer,
+ 'O': cffi_type_pointer,
+ }
+
+
+def sizeof(tp_letter):
+ return _cffi_backend.sizeof(cffi_types[tp_letter])
+
+def alignment(tp_letter):
+ return _cffi_backend.alignof(cffi_types[tp_letter])
+
+class CDLL(object):
+ def __init__(self, libname):
+ if libname is None:
+ from ctypes.util import find_library
+ libname = find_library('c')
+ self._cffi_library = _cffi_backend.load_library(libname)
+ self.libname = libname
+
+ def getaddressindll(self, name):
+ return self._cffi_library.read_variable(cffi_type_pointer, name)
+
+def get_libc():
+ return CDLL(None)
+
+FUNCFLAG_STDCALL = 0 # on Windows: for WINAPI calls
+FUNCFLAG_CDECL = 1 # on Windows: for __cdecl calls
+FUNCFLAG_PYTHONAPI = 4
+FUNCFLAG_USE_ERRNO = 8
+FUNCFLAG_USE_LASTERROR = 16
+
+class DataInstance(object):
+ pass
+
+class Array(DataInstance):
+ def __init__(self, shape):
+ pass
+
+class CallbackPtr(DataInstance):
+ def __init__(self, *stuff):
+ pass
diff --git a/lib_pypy/pypy_test/test__rawffi.py b/lib_pypy/pypy_test/test__rawffi.py
new file mode 100644
--- /dev/null
+++ b/lib_pypy/pypy_test/test__rawffi.py
@@ -0,0 +1,1058 @@
+import os, sys, py
+
+class TestFfi:
+ def prepare_c_example():
+ from pypy.tool.udir import udir
+ c_file = udir.ensure("test__rawffi", dir=1).join("xlib.c")
+ c_file.write(py.code.Source('''
+ #include <stdlib.h>
+ #include <stdio.h>
+
+ struct x
+ {
+ int x1;
+ short x2;
+ char x3;
+ struct x* next;
+ };
+
+ void nothing()
+ {
+ }
+
+ char inner_struct_elem(struct x *x1)
+ {
+ return x1->next->x3;
+ }
+
+ struct x* create_double_struct()
+ {
+ struct x* x1, *x2;
+
+ x1 = (struct x*)malloc(sizeof(struct x));
+ x2 = (struct x*)malloc(sizeof(struct x));
+ x1->next = x2;
+ x2->x2 = 3;
+ return x1;
+ }
+
+ void free_double_struct(struct x* x1)
+ {
+ free(x1->next);
+ free(x1);
+ }
+
+ const char *static_str = "xxxxxx";
+ long static_int = 42;
+ double static_double = 42.42;
+ long double static_longdouble = 42.42;
+
+ unsigned short add_shorts(short one, short two)
+ {
+ return one + two;
+ }
+
+ void* get_raw_pointer()
+ {
+ return (void*)add_shorts;
+ }
+
+ char get_char(char* s, unsigned short num)
+ {
+ return s[num];
+ }
+
+ const char *char_check(char x, char y)
+ {
+ if (y == static_str[0])
+ return static_str;
+ return NULL;
+ }
+
+ int get_array_elem(int* stuff, int num)
+ {
+ return stuff[num];
+ }
+
+ struct x* get_array_elem_s(struct x** array, int num)
+ {
+ return array[num];
+ }
+
+ long long some_huge_value()
+ {
+ return 1LL<<42;
+ }
+
+ unsigned long long some_huge_uvalue()
+ {
+ return 1LL<<42;
+ }
+
+ long long pass_ll(long long x)
+ {
+ return x;
+ }
+
+ static int prebuilt_array1[] = {3};
+
+ int* allocate_array()
+ {
+ return prebuilt_array1;
+ }
+
+ long long runcallback(long long(*callback)())
+ {
+ return callback();
+ }
+
+ struct x_y {
+ long x;
+ long y;
+ };
+
+ long sum_x_y(struct x_y s) {
+ return s.x + s.y;
+ }
+
+ long op_x_y(struct x_y s, long(*callback)(struct x_y))
+ {
+ return callback(s);
+ }
+
+ struct s2h {
+ short x;
+ short y;
+ };
+
+ struct s2h give(short x, short y) {
+ struct s2h out;
+ out.x = x;
+ out.y = y;
+ return out;
+ }
+
+ struct s2h perturb(struct s2h inp) {
+ inp.x *= 2;
+ inp.y *= 3;
+ return inp;
+ }
+
+ struct s2a {
+ int bah[2];
+ };
+
+ struct s2a get_s2a(void) {
+ struct s2a outp;
+ outp.bah[0] = 4;
+ outp.bah[1] = 5;
+ return outp;
+ }
+
+ int check_s2a(struct s2a inp) {
+ return (inp.bah[0] == 4 && inp.bah[1] == 5);
+ }
+
+ int AAA_first_ordinal_function()
+ {
+ return 42;
+ }
+
+ typedef union {
+ short x;
+ long y;
+ } UN;
+
+ UN ret_un_func(UN inp)
+ {
+ inp.y = inp.x * 100;
+ return inp;
+ }
+
+ '''))
+ symbols = """get_char char_check get_raw_pointer
+ add_shorts
+ inner_struct_elem create_double_struct free_double_struct
+ get_array_elem get_array_elem_s
+ nothing
+ some_huge_value some_huge_uvalue pass_ll
+ runcallback
+ allocate_array
+ static_int static_double static_longdouble
+ sum_x_y op_x_y
+ give perturb get_s2a check_s2a
+ AAA_first_ordinal_function
+ ret_un_func
+ """.split()
+ eci = ExternalCompilationInfo(export_symbols=symbols)
+ return str(platform.compile([c_file], eci, 'x', standalone=False))
+ prepare_c_example = staticmethod(prepare_c_example)
+
+## def setup_class(cls):
+## from pypy.rlib.clibffi import get_libc_name
+## space = gettestobjspace(usemodules=('_rawffi', 'struct'))
+## cls.space = space
+## cls.w_lib_name = space.wrap(cls.prepare_c_example())
+## cls.w_libc_name = space.wrap(get_libc_name())
+## if sys.platform == 'win32':
+## cls.w_iswin32 = space.wrap(True)
+## cls.w_libm_name = space.wrap('msvcrt')
+## else:
+## cls.w_iswin32 = space.wrap(False)
+## cls.w_libm_name = space.wrap('libm.so')
+## if sys.platform == "darwin":
+## cls.w_libm_name = space.wrap('libm.dylib')
+## cls.w_platform = space.wrap(platform.name)
+## cls.w_sizes_and_alignments = space.wrap(dict(
+## [(k, (v.c_size, v.c_alignment)) for k,v in TYPEMAP.iteritems()]))
+
+ libc_name = 'libc.so.6' # XXX
+
+ def test_libload(self):
+ import _rawffi
+ _rawffi.CDLL(self.libc_name)
+
+ def test_libload_fail(self):
+ import _rawffi
+ try:
+ _rawffi.CDLL("xxxxx_this_name_does_not_exist_xxxxx")
+ except OSError, e:
+ print e
+ assert str(e).startswith("xxxxx_this_name_does_not_exist_xxxxx: ")
+ else:
+ raise AssertionError("did not fail??")
+
+ def test_libload_None(self):
+ if self.iswin32:
+ skip("unix specific")
+ import _rawffi
+ # this should return *all* loaded libs, dlopen(NULL)
+ dll = _rawffi.CDLL(None)
+ # Assume CPython, or PyPy compiled with cpyext
+ res = dll.ptr('Py_IsInitialized', [], 'l')()
+ assert res[0] == 1
+
+ def test_libc_load(self):
+ import _rawffi
+ _rawffi.get_libc()
+
+ def test_getattr(self):
+ import _rawffi
+ libc = _rawffi.get_libc()
+ func = libc.ptr('rand', [], 'i')
+ assert libc.ptr('rand', [], 'i') is func # caching
+ assert libc.ptr('rand', [], 'l') is not func
+ assert isinstance(func, _rawffi.FuncPtr)
+ raises(AttributeError, "libc.xxxxxxxxxxxxxx")
+
+ def test_byordinal(self):
+ if not self.iswin32:
+ skip("win32 specific")
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ # This will call the ordinal function numbered 1
+ # my compiler seems to order them alphabetically:
+ # AAA_first_ordinal_function
+ assert lib.ptr(1, [], 'i')()[0] == 42
+
+ def test_getchar(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ get_char = lib.ptr('get_char', ['P', 'H'], 'c')
+ A = _rawffi.Array('c')
+ B = _rawffi.Array('H')
+ dupa = A(5, 'dupa')
+ dupaptr = dupa.byptr()
+ for i in range(4):
+ intptr = B(1)
+ intptr[0] = i
+ res = get_char(dupaptr, intptr)
+ assert res[0] == 'dupa'[i]
+ intptr.free()
+ dupaptr.free()
+ dupa.free()
+
+ def test_chararray_as_bytebuffer(self):
+ # a useful extension to arrays of shape 'c': buffer-like slicing
+ import _rawffi
+ A = _rawffi.Array('c')
+ buf = A(10, autofree=True)
+ buf[0] = '*'
+ assert buf[1:5] == '\x00' * 4
+ buf[7:] = 'abc'
+ assert buf[9] == 'c'
+ assert buf[:8] == '*' + '\x00'*6 + 'a'
+
+ def test_returning_str(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ char_check = lib.ptr('char_check', ['c', 'c'], 's')
+ A = _rawffi.Array('c')
+ arg1 = A(1)
+ arg2 = A(1)
+ arg1[0] = 'y'
+ arg2[0] = 'x'
+ res = char_check(arg1, arg2)
+ assert _rawffi.charp2string(res[0]) == 'xxxxxx'
+ assert _rawffi.charp2rawstring(res[0]) == 'xxxxxx'
+ assert _rawffi.charp2rawstring(res[0], 3) == 'xxx'
+ a = A(6, 'xx\x00\x00xx')
+ assert _rawffi.charp2string(a.buffer) == 'xx'
+ assert _rawffi.charp2rawstring(a.buffer, 4) == 'xx\x00\x00'
+ arg1[0] = 'x'
+ arg2[0] = 'y'
+ res = char_check(arg1, arg2)
+ assert res[0] == 0
+ assert _rawffi.charp2string(res[0]) is None
+ arg1.free()
+ arg2.free()
+ a.free()
+
+ def test_returning_unicode(self):
+ import _rawffi
+ A = _rawffi.Array('u')
+ a = A(6, u'xx\x00\x00xx')
+ res = _rawffi.wcharp2unicode(a.buffer)
+ assert isinstance(res, unicode)
+ assert res == u'xx'
+ a.free()
+
+ def test_raw_callable(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ get_raw_pointer = lib.ptr('get_raw_pointer', [], 'P')
+ ptr = get_raw_pointer()
+ rawcall = _rawffi.FuncPtr(ptr[0], ['h', 'h'], 'H')
+ A = _rawffi.Array('h')
+ arg1 = A(1)
+ arg2 = A(1)
+ arg1[0] = 1
+ arg2[0] = 2
+ res = rawcall(arg1, arg2)
+ assert res[0] == 3
+ arg1.free()
+ arg2.free()
+ assert rawcall.buffer == ptr[0]
+ ptr = rawcall.byptr()
+ assert ptr[0] == rawcall.buffer
+ ptr.free()
+
+ def test_short_addition(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ short_add = lib.ptr('add_shorts', ['h', 'h'], 'H')
+ A = _rawffi.Array('h')
+ arg1 = A(1)
+ arg2 = A(1)
+ arg1[0] = 1
+ arg2[0] = 2
+ res = short_add(arg1, arg2)
+ assert res[0] == 3
+ arg1.free()
+ arg2.free()
+
+ def test_pow(self):
+ import _rawffi
+ libm = _rawffi.CDLL(self.libm_name)
+ pow = libm.ptr('pow', ['d', 'd'], 'd')
+ A = _rawffi.Array('d')
+ arg1 = A(1)
+ arg2 = A(1)
+ raises(TypeError, "arg1[0] = 'x'")
+ arg1[0] = 3
+ arg2[0] = 2.0
+ res = pow(arg1, arg2)
+ assert res[0] == 9.0
+ arg1.free()
+ arg2.free()
+
+ def test_time(self):
+ import _rawffi
+ libc = _rawffi.get_libc()
+ try:
+ time = libc.ptr('time', ['z'], 'l') # 'z' instead of 'P' just for test
+ except AttributeError:
+ # Since msvcr80, this function is named differently
+ time = libc.ptr('_time32', ['z'], 'l')
+ arg = _rawffi.Array('P')(1)
+ arg[0] = 0
+ res = time(arg)
+ assert res[0] != 0
+ arg.free()
+
+ def test_gettimeofday(self):
+ if self.iswin32:
+ skip("No gettimeofday on win32")
+ import _rawffi
+ struct_type = _rawffi.Structure([('tv_sec', 'l'), ('tv_usec', 'l')])
+ structure = struct_type()
+ libc = _rawffi.get_libc()
+ gettimeofday = libc.ptr('gettimeofday', ['P', 'P'], 'i')
+
+ arg1 = structure.byptr()
+ arg2 = _rawffi.Array('P')(1)
+ res = gettimeofday(arg1, arg2)
+ assert res[0] == 0
+
+ struct2 = struct_type()
+ arg1[0] = struct2
+ res = gettimeofday(arg1, arg2)
+ assert res[0] == 0
+
+ assert structure.tv_usec != struct2.tv_usec
+ assert (structure.tv_sec == struct2.tv_sec) or (structure.tv_sec == struct2.tv_sec - 1)
+ raises(AttributeError, "structure.xxx")
+ structure.free()
+ struct2.free()
+ arg1.free()
+ arg2.free()
+
+ def test_structreturn(self):
+ import _rawffi
+ X = _rawffi.Structure([('x', 'l')])
+ x = X()
+ x.x = 121
+ Tm = _rawffi.Structure([('tm_sec', 'i'),
+ ('tm_min', 'i'),
+ ('tm_hour', 'i'),
+ ("tm_mday", 'i'),
+ ("tm_mon", 'i'),
+ ("tm_year", 'i'),
+ ("tm_wday", 'i'),
+ ("tm_yday", 'i'),
+ ("tm_isdst", 'i')])
+ libc = _rawffi.get_libc()
+ try:
+ gmtime = libc.ptr('gmtime', ['P'], 'P')
+ except AttributeError:
+ # Since msvcr80, this function is named differently
+ gmtime = libc.ptr('_gmtime32', ['P'], 'P')
+
+ arg = x.byptr()
+ res = gmtime(arg)
+ t = Tm.fromaddress(res[0])
+ arg.free()
+ assert t.tm_year == 70
+ assert t.tm_sec == 1
+ assert t.tm_min == 2
+ x.free()
+
+ def test_nested_structures(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ inner = lib.ptr("inner_struct_elem", ['P'], 'c')
+ X = _rawffi.Structure([('x1', 'i'), ('x2', 'h'), ('x3', 'c'), ('next', 'P')])
+ next = X()
+ next.next = 0
+ next.x3 = 'x'
+ x = X()
+ x.next = next
+ x.x1 = 1
+ x.x2 = 2
+ x.x3 = 'x'
+ assert X.fromaddress(x.next).x3 == 'x'
+ x.free()
+ next.free()
+ create_double_struct = lib.ptr("create_double_struct", [], 'P')
+ res = create_double_struct()
+ x = X.fromaddress(res[0])
+ assert X.fromaddress(x.next).x2 == 3
+ free_double_struct = lib.ptr("free_double_struct", ['P'], None)
+ free_double_struct(res)
+
+ def test_structure_bitfields(self):
+ import _rawffi
+ X = _rawffi.Structure([('A', 'I', 1),
+ ('B', 'I', 2),
+ ('C', 'i', 2)])
+ x = X()
+ x.A = 0xf
+ x.B = 0xf
+ x.C = 0xf
+ assert x.A == 1
+ assert x.B == 3
+ assert x.C == -1
+ x.free()
+
+ Y = _rawffi.Structure([('a', 'i', 1),
+ ('b', 'i', 30),
+ ('c', 'i', 1)])
+ y = Y()
+ y.a, y.b, y.c = -1, -7, 0
+ assert (y.a, y.b, y.c) == (-1, -7, 0)
+ y.free()
+
+ def test_invalid_bitfields(self):
+ import _rawffi
+ raises(TypeError, _rawffi.Structure, [('A', 'c', 1)])
+ raises(ValueError, _rawffi.Structure, [('A', 'I', 129)])
+ raises(ValueError, _rawffi.Structure, [('A', 'I', -1)])
+ raises(ValueError, _rawffi.Structure, [('A', 'I', 0)])
+
+ def test_packed_structure(self):
+ import _rawffi
+ Y = _rawffi.Structure([('a', 'c'),
+ ('b', 'i')], pack=1)
+ assert Y.size == 5
+
+ def test_array(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ A = _rawffi.Array('i')
+ get_array_elem = lib.ptr('get_array_elem', ['P', 'i'], 'i')
+ a = A(10)
+ a[8] = 3
+ a[7] = 1
+ a[6] = 2
+ arg1 = a.byptr()
+ arg2 = A(1)
+ for i, expected in enumerate([0, 0, 0, 0, 0, 0, 2, 1, 3, 0]):
+ arg2[0] = i
+ res = get_array_elem(arg1, arg2)
+ assert res[0] == expected
+ arg1.free()
+ arg2.free()
+ assert a[3] == 0
+ a.free()
+
+ def test_array_of_structure(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ A = _rawffi.Array('P')
+ X = _rawffi.Structure([('x1', 'i'), ('x2', 'h'), ('x3', 'c'), ('next', 'P')])
+ x = X()
+ x.x2 = 3
+ a = A(3)
+ a[1] = x
+ get_array_elem_s = lib.ptr('get_array_elem_s', ['P', 'i'], 'P')
+ arg1 = a.byptr()
+ arg2 = _rawffi.Array('i')(1)
+ res = get_array_elem_s(arg1, arg2)
+ assert res[0] == 0
+ arg2[0] = 1
+ res = get_array_elem_s(arg1, arg2)
+ assert X.fromaddress(res[0]).x2 == 3
+ assert res[0] == x.buffer
+ arg1.free()
+ arg2.free()
+ x.free()
+ a.free()
+
+ def test_bad_parameters(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ nothing = lib.ptr('nothing', [], None)
+ assert nothing() is None
+ raises(AttributeError, "lib.ptr('get_charx', [], None)")
+ raises(ValueError, "lib.ptr('get_char', ['xx'], None)")
+ raises(ValueError, "lib.ptr('get_char', ['x'], None)")
+ raises(ValueError, "lib.ptr('get_char', [], 'x')")
+ raises(ValueError, "_rawffi.Structure(['x1', 'xx'])")
+ raises(ValueError, _rawffi.Structure, [('x1', 'xx')])
+ raises(ValueError, "_rawffi.Array('xx')")
+
+ def test_longs_ulongs(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ some_huge_value = lib.ptr('some_huge_value', [], 'q')
+ res = some_huge_value()
+ assert res[0] == 1<<42
+ some_huge_uvalue = lib.ptr('some_huge_uvalue', [], 'Q')
+ res = some_huge_uvalue()
+ assert res[0] == 1<<42
+ pass_ll = lib.ptr('pass_ll', ['q'], 'q')
+ arg1 = _rawffi.Array('q')(1)
+ arg1[0] = 1<<42
+ res = pass_ll(arg1)
+ assert res[0] == 1<<42
+ arg1.free()
+
+ def test_callback(self):
+ import _rawffi
+ import struct
+ libc = _rawffi.get_libc()
+ ll_to_sort = _rawffi.Array('i')(4)
+ for i in range(4):
+ ll_to_sort[i] = 4-i
+ qsort = libc.ptr('qsort', ['P', 'l', 'l', 'P'], None)
+ bogus_args = []
+ def compare(a, b):
+ a1 = _rawffi.Array('i').fromaddress(_rawffi.Array('P').fromaddress(a, 1)[0], 1)
+ a2 = _rawffi.Array('i').fromaddress(_rawffi.Array('P').fromaddress(b, 1)[0], 1)
+ print "comparing", a1[0], "with", a2[0]
+ if a1[0] not in [1,2,3,4] or a2[0] not in [1,2,3,4]:
+ bogus_args.append((a1[0], a2[0]))
+ if a1[0] > a2[0]:
+ return 1
+ return -1
+ a1 = ll_to_sort.byptr()
+ a2 = _rawffi.Array('l')(1)
+ a2[0] = len(ll_to_sort)
+ a3 = _rawffi.Array('l')(1)
+ a3[0] = struct.calcsize('i')
+ cb = _rawffi.CallbackPtr(compare, ['P', 'P'], 'i')
+ a4 = cb.byptr()
+ qsort(a1, a2, a3, a4)
+ res = [ll_to_sort[i] for i in range(len(ll_to_sort))]
+ assert res == [1,2,3,4]
+ assert not bogus_args
+ a1.free()
+ a2.free()
+ a3.free()
+ a4.free()
+ ll_to_sort.free()
+ cb.free()
+
+ def test_another_callback(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ runcallback = lib.ptr('runcallback', ['P'], 'q')
+ def callback():
+ return 1<<42
+
+ cb = _rawffi.CallbackPtr(callback, [], 'q')
+ a1 = cb.byptr()
+ res = runcallback(a1)
+ assert res[0] == 1<<42
+ a1.free()
+ cb.free()
+
+ def test_void_returning_callback(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ runcallback = lib.ptr('runcallback', ['P'], None)
+ called = []
+ def callback():
+ called.append(True)
+
+ cb = _rawffi.CallbackPtr(callback, [], None)
+ a1 = cb.byptr()
+ res = runcallback(a1)
+ assert res is None
+ assert called == [True]
+ a1.free()
+ cb.free()
+
+ def test_raising_callback(self):
+ import _rawffi, sys
+ import StringIO
+ lib = _rawffi.CDLL(self.lib_name)
+ err = StringIO.StringIO()
+ orig = sys.stderr
+ sys.stderr = err
+ try:
+ runcallback = lib.ptr('runcallback', ['P'], 'q')
+ def callback():
+ 1/0
+
+ cb = _rawffi.CallbackPtr(callback, [], 'q')
+ a1 = cb.byptr()
+ res = runcallback(a1)
+ a1.free()
+ cb.free()
+ val = err.getvalue()
+ assert 'ZeroDivisionError' in val
+ assert 'callback' in val
+ assert res[0] == 0L
+ finally:
+ sys.stderr = orig
+
+
+ def test_setattr_struct(self):
+ import _rawffi
+ X = _rawffi.Structure([('value1', 'i'), ('value2', 'i')])
+ x = X()
+ x.value1 = 1
+ x.value2 = 2
+ assert x.value1 == 1
+ assert x.value2 == 2
+ x.value1 = 3
+ assert x.value1 == 3
+ raises(AttributeError, "x.foo")
+ raises(AttributeError, "x.foo = 1")
+ x.free()
+
+ def test_sizes_and_alignments(self):
+ import _rawffi
+ for k, (s, a) in self.sizes_and_alignments.iteritems():
+ assert _rawffi.sizeof(k) == s
+ assert _rawffi.alignment(k) == a
+
+ def test_array_addressof(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ alloc = lib.ptr('allocate_array', [], 'P')
+ A = _rawffi.Array('i')
+ res = alloc()
+ a = A.fromaddress(res[0], 1)
+ assert a[0] == 3
+ assert A.fromaddress(a.buffer, 1)[0] == 3
+
+ def test_shape(self):
+ import _rawffi
+ A = _rawffi.Array('i')
+ a = A(1)
+ assert a.shape is A
+ a.free()
+ S = _rawffi.Structure([('v1', 'i')])
+ s = S()
+ s.v1 = 3
+ assert s.shape is S
+ s.free()
+
+ def test_negative_pointers(self):
+ import _rawffi
+ A = _rawffi.Array('P')
+ a = A(1)
+ a[0] = -1234
+ a.free()
+
+ def test_long_with_fromaddress(self):
+ import _rawffi
+ addr = -1
+ raises(ValueError, _rawffi.Array('u').fromaddress, addr, 100)
+
+ def test_passing_raw_pointers(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ A = _rawffi.Array('i')
+ get_array_elem = lib.ptr('get_array_elem', ['P', 'i'], 'i')
+ a = A(1)
+ a[0] = 3
+ arg1 = _rawffi.Array('P')(1)
+ arg1[0] = a.buffer
+ arg2 = _rawffi.Array('i')(1)
+ res = get_array_elem(arg1, arg2)
+ assert res[0] == 3
+ arg1.free()
+ arg2.free()
+ a.free()
+
+ def test_repr(self):
+ import _rawffi, struct
+ isize = struct.calcsize("i")
+ lsize = struct.calcsize("l")
+ assert (repr(_rawffi.Array('i')) ==
+ "<_rawffi.Array 'i' (%d, %d)>" % (isize, isize))
+
+ # fragile
+ S = _rawffi.Structure([('x', 'c'), ('y', 'l')])
+ assert (repr(_rawffi.Array((S, 2))) ==
+ "<_rawffi.Array '\0' (%d, %d)>" % (4*lsize, lsize))
+
+ assert (repr(_rawffi.Structure([('x', 'i'), ('yz', 'i')])) ==
+ "<_rawffi.Structure 'x' 'yz' (%d, %d)>" % (2*isize, isize))
+
+ s = _rawffi.Structure([('x', 'i'), ('yz', 'i')])()
+ assert repr(s) == "<_rawffi struct %x>" % (s.buffer,)
+ s.free()
+ a = _rawffi.Array('i')(5)
+ assert repr(a) == "<_rawffi array %x of length %d>" % (a.buffer,
+ len(a))
+ a.free()
+
+ def test_wide_char(self):
+ import _rawffi, sys
+ A = _rawffi.Array('u')
+ a = A(3)
+ a[0] = u'x'
+ a[1] = u'y'
+ a[2] = u'z'
+ assert a[0] == u'x'
+ b = _rawffi.Array('c').fromaddress(a.buffer, 38)
+ if sys.maxunicode > 65535:
+ # UCS4 build
+ assert b[0] == 'x'
+ assert b[1] == '\x00'
+ assert b[2] == '\x00'
+ assert b[3] == '\x00'
+ assert b[4] == 'y'
+ else:
+ # UCS2 build
+ assert b[0] == 'x'
+ assert b[1] == '\x00'
+ assert b[2] == 'y'
+ a.free()
+
+ def test_truncate(self):
+ import _rawffi, struct
+ a = _rawffi.Array('b')(1)
+ a[0] = -5
+ assert a[0] == -5
+ a[0] = 123L
+ assert a[0] == 123
+ a[0] = 0x97817182ab128111111111111171817d042
+ assert a[0] == 0x42
+ a[0] = 255
+ assert a[0] == -1
+ a[0] = -2
+ assert a[0] == -2
+ a[0] = -255
+ assert a[0] == 1
+ a.free()
+
+ a = _rawffi.Array('B')(1)
+ a[0] = 123L
+ assert a[0] == 123
+ a[0] = 0x18329b1718b97d89b7198db817d042
+ assert a[0] == 0x42
+ a[0] = 255
+ assert a[0] == 255
+ a[0] = -2
+ assert a[0] == 254
+ a[0] = -255
+ assert a[0] == 1
+ a.free()
+
+ a = _rawffi.Array('h')(1)
+ a[0] = 123L
+ assert a[0] == 123
+ a[0] = 0x9112cbc91bd91db19aaaaaaaaaaaaaa8170d42
+ assert a[0] == 0x0d42
+ a[0] = 65535
+ assert a[0] == -1
+ a[0] = -2
+ assert a[0] == -2
+ a[0] = -65535
+ assert a[0] == 1
+ a.free()
+
+ a = _rawffi.Array('H')(1)
+ a[0] = 123L
+ assert a[0] == 123
+ a[0] = 0xeeeeeeeeeeeeeeeeeeeeeeeeeeeee817d042
+ assert a[0] == 0xd042
+ a[0] = -2
+ assert a[0] == 65534
+ a.free()
+
+ maxptr = (256 ** struct.calcsize("P")) - 1
+ a = _rawffi.Array('P')(1)
+ a[0] = 123L
+ assert a[0] == 123
+ a[0] = 0xeeeeeeeeeeeeeeeeeeeeeeeeeeeee817d042
+ assert a[0] == 0xeeeeeeeeeeeeeeeeeeeeeeeeeeeee817d042 & maxptr
+ a[0] = -2
+ assert a[0] == maxptr - 1
+ a.free()
+
+ def test_getaddressindll(self):
+ import _rawffi
+ lib = _rawffi.CDLL(self.lib_name)
+ def getprimitive(typecode, name):
+ addr = lib.getaddressindll(name)
+ return _rawffi.Array(typecode).fromaddress(addr, 1)
+ a = getprimitive("l", "static_int")
+ assert a[0] == 42
+ a[0] = 43
+ assert a[0] == 43
+ a = getprimitive("d", "static_double")
+ assert a[0] == 42.42
+ a[0] = 43.43
+ assert a[0] == 43.43
+ a = getprimitive("g", "static_longdouble")
+ assert a[0] == 42.42
+ a[0] = 43.43
+ assert a[0] == 43.43
+ raises(ValueError, getprimitive, 'z', 'ddddddd')
+ raises(ValueError, getprimitive, 'zzz', 'static_int')
+
+ def test_segfault_exception(self):
+ import _rawffi
+ S = _rawffi.Structure([('x', 'i')])
+ s = S()
+ s.x = 3
+ s.free()
+ raises(_rawffi.SegfaultException, s.__getattr__, 'x')
+ raises(_rawffi.SegfaultException, s.__setattr__, 'x', 3)
+ A = _rawffi.Array('c')
+ a = A(13)
+ a.free()
+ raises(_rawffi.SegfaultException, a.__getitem__, 3)
+ raises(_rawffi.SegfaultException, a.__setitem__, 3, 3)
+
+ def test_stackcheck(self):
+ if self.platform != "msvc":
+ skip("win32 msvc specific")
+
+ # Even if the call corresponds to the specified signature,
+ # the STDCALL calling convention may detect some errors
+ import _rawffi
+ lib = _rawffi.CDLL('kernel32')
+
+ f = lib.ptr('SetLastError', [], 'i')
+ try:
+ f()
+ except ValueError, e:
+ assert "Procedure called with not enough arguments" in e.message
+ else:
+ assert 0, "Did not raise"
+
+ f = lib.ptr('GetLastError', ['i'], None,
+ flags=_rawffi.FUNCFLAG_STDCALL)
+ arg = _rawffi.Array('i')(1)
+ arg[0] = 1
+ try:
+ f(arg)
+ except ValueError, e:
+ assert "Procedure called with too many arguments" in e.message
+ else:
+ assert 0, "Did not raise"
+ arg.free()
+
+ def test_struct_byvalue(self):
+ import _rawffi, sys
+ X_Y = _rawffi.Structure([('x', 'l'), ('y', 'l')])
+ x_y = X_Y()
+ lib = _rawffi.CDLL(self.lib_name)
+ print >> sys.stderr, "getting..."
+ sum_x_y = lib.ptr('sum_x_y', [(X_Y, 1)], 'l')
+ x_y.x = 200
+ x_y.y = 220
+ print >> sys.stderr, "calling..."
+ res = sum_x_y(x_y)
+ print >> sys.stderr, "done"
+ assert res[0] == 420
+ x_y.free()
+
+ def test_callback_struct_byvalue(self):
+ import _rawffi, sys
+ X_Y = _rawffi.Structure([('x', 'l'), ('y', 'l')])
+ lib = _rawffi.CDLL(self.lib_name)
+ op_x_y = lib.ptr('op_x_y', [(X_Y, 1), 'P'], 'l')
+
+ def callback(x_y):
+ return x_y.x + x_y.y
+ cb = _rawffi.CallbackPtr(callback, [(X_Y, 1)], 'l')
+
+ x_y = X_Y()
+ x_y.x = 200
+ x_y.y = 220
+
+ a1 = cb.byptr()
+ res = op_x_y(x_y, a1)
+ a1.free()
+ x_y.free()
+ cb.free()
+
+ assert res[0] == 420
+
+ def test_ret_struct(self):
+ import _rawffi
+ S2H = _rawffi.Structure([('x', 'h'), ('y', 'h')])
+ s2h = S2H()
+ lib = _rawffi.CDLL(self.lib_name)
+ give = lib.ptr('give', ['h', 'h'], (S2H, 1))
+ a1 = _rawffi.Array('h')(1)
+ a2 = _rawffi.Array('h')(1)
+ a1[0] = 13
+ a2[0] = 17
+ res = give(a1, a2)
+ assert isinstance(res, _rawffi.StructureInstanceAutoFree)
+ assert res.shape is S2H
+ assert res.x == 13
+ assert res.y == 17
+ a1.free()
+ a2.free()
+
+ s2h.x = 7
+ s2h.y = 11
+ perturb = lib.ptr('perturb', [(S2H, 1)], (S2H, 1))
+ res = perturb(s2h)
+ assert isinstance(res, _rawffi.StructureInstanceAutoFree)
+ assert res.shape is S2H
+ assert res.x == 14
+ assert res.y == 33
+ assert s2h.x == 7
+ assert s2h.y == 11
+
+ s2h.free()
+
+ def test_ret_struct_containing_array(self):
+ import _rawffi
+ AoI = _rawffi.Array('i')
+ S2A = _rawffi.Structure([('bah', (AoI, 2))])
+ lib = _rawffi.CDLL(self.lib_name)
+ get_s2a = lib.ptr('get_s2a', [], (S2A, 1))
+ check_s2a = lib.ptr('check_s2a', [(S2A, 1)], 'i')
+
+ res = get_s2a()
+ assert isinstance(res, _rawffi.StructureInstanceAutoFree)
+ assert res.shape is S2A
+ ok = check_s2a(res)
+ assert ok[0] == 1
+
+ def test_buffer(self):
+ import _rawffi
+ S = _rawffi.Structure((40, 1))
+ s = S(autofree=True)
+ b = buffer(s)
+ assert len(b) == 40
+ b[4] = 'X'
+ b[:3] = 'ABC'
+ assert b[:6] == 'ABC\x00X\x00'
+
+ A = _rawffi.Array('c')
+ a = A(10, autofree=True)
+ a[3] = 'x'
+ b = buffer(a)
+ assert len(b) == 10
+ assert b[3] == 'x'
+ b[6] = 'y'
+ assert a[6] == 'y'
+ b[3:5] = 'zt'
+ assert a[3] == 'z'
+ assert a[4] == 't'
+
+ def test_union(self):
+ import _rawffi
+ longsize = _rawffi.sizeof('l')
+ S = _rawffi.Structure([('x', 'h'), ('y', 'l')], union=True)
+ s = S(autofree=False)
+ s.x = 12345
+ lib = _rawffi.CDLL(self.lib_name)
+ f = lib.ptr('ret_un_func', [(S, 1)], (S, 1))
+ ret = f(s)
+ assert ret.y == 1234500, "ret.y == %d" % (ret.y,)
+ s.free()
+
+ def test_ffi_type(self):
+ import _rawffi
+ EMPTY = _rawffi.Structure([])
+ S2E = _rawffi.Structure([('bah', (EMPTY, 1))])
+ S2E.get_ffi_type() # does not hang
+
+class TestAutoFree:
+ def setup_class(cls):
+ space = gettestobjspace(usemodules=('_rawffi', 'struct'))
+ cls.space = space
+ cls.w_sizes_and_alignments = space.wrap(dict(
+ [(k, (v.c_size, v.c_alignment)) for k,v in TYPEMAP.iteritems()]))
+ Tracker.DO_TRACING = True
+
+ def test_structure_autofree(self):
+ import gc, _rawffi
+ gc.collect()
+ gc.collect()
+ S = _rawffi.Structure([('x', 'i')])
+ oldnum = _rawffi._num_of_allocated_objects()
+ s = S(autofree=True)
+ s.x = 3
+ s = None
+ gc.collect()
+ assert oldnum == _rawffi._num_of_allocated_objects()
+
+ def test_array_autofree(self):
+ import gc, _rawffi
+ gc.collect()
+ oldnum = _rawffi._num_of_allocated_objects()
+
+ A = _rawffi.Array('c')
+ a = A(6, 'xxyxx\x00', autofree=True)
+ assert _rawffi.charp2string(a.buffer) == 'xxyxx'
+ a = None
+ gc.collect()
+ assert oldnum == _rawffi._num_of_allocated_objects()
+
+ def teardown_class(cls):
+ Tracker.DO_TRACING = False
More information about the pypy-commit mailing list