[pypy-svn] r71960 - in pypy/trunk/pypy: doc/config doc/discussion lib module/_collections module/_collections/test module/imp module/imp/test module/oracle module/oracle/test translator/benchmark
benjamin at codespeak.net
benjamin at codespeak.net
Tue Mar 9 15:46:16 CET 2010
Author: benjamin
Date: Tue Mar 9 15:46:14 2010
New Revision: 71960
Modified:
pypy/trunk/pypy/doc/config/objspace.usemodules.imp.txt (contents, props changed)
pypy/trunk/pypy/doc/config/translation.jit_profiler.txt (props changed)
pypy/trunk/pypy/doc/discussion/improve-rpython.txt (contents, props changed)
pypy/trunk/pypy/lib/PyQt4.py (props changed)
pypy/trunk/pypy/lib/_rpyc_support.py (props changed)
pypy/trunk/pypy/lib/sip.py (props changed)
pypy/trunk/pypy/module/_collections/__init__.py (contents, props changed)
pypy/trunk/pypy/module/_collections/interp_collection.py (contents, props changed)
pypy/trunk/pypy/module/_collections/test/__init__.py (props changed)
pypy/trunk/pypy/module/_collections/test/test_collection.py (contents, props changed)
pypy/trunk/pypy/module/imp/interp_imp.py (contents, props changed)
pypy/trunk/pypy/module/imp/test/test_app.py (contents, props changed)
pypy/trunk/pypy/module/oracle/interp_lob.py (contents, props changed)
pypy/trunk/pypy/module/oracle/interp_object.py (contents, props changed)
pypy/trunk/pypy/module/oracle/interp_pool.py (contents, props changed)
pypy/trunk/pypy/module/oracle/test/test_cursorvar.py (contents, props changed)
pypy/trunk/pypy/module/oracle/test/test_lobvar.py (contents, props changed)
pypy/trunk/pypy/module/oracle/test/test_objectvar.py (contents, props changed)
pypy/trunk/pypy/translator/benchmark/jitbench.py (props changed)
Log:
set svn:eol-style
Modified: pypy/trunk/pypy/doc/config/objspace.usemodules.imp.txt
==============================================================================
--- pypy/trunk/pypy/doc/config/objspace.usemodules.imp.txt (original)
+++ pypy/trunk/pypy/doc/config/objspace.usemodules.imp.txt Tue Mar 9 15:46:14 2010
@@ -1,2 +1,2 @@
-Use the 'imp' module.
-This module is included by default.
+Use the 'imp' module.
+This module is included by default.
Modified: pypy/trunk/pypy/doc/discussion/improve-rpython.txt
==============================================================================
--- pypy/trunk/pypy/doc/discussion/improve-rpython.txt (original)
+++ pypy/trunk/pypy/doc/discussion/improve-rpython.txt Tue Mar 9 15:46:14 2010
@@ -1,93 +1,93 @@
-Possible improvements of the rpython language
-=============================================
-
-Improve the interpreter API
----------------------------
-
-- Rationalize the modules, and the names, of the differents functions needed to
- implement a pypy module. A typical rpython file is likely to contain many
- `import` statements::
-
- from pypy.interpreter.baseobjspace import Wrappable
- from pypy.interpreter.gateway import ObjSpace, W_Root, NoneNotWrapped
- from pypy.interpreter.argument import Arguments
- from pypy.interpreter.typedef import TypeDef, GetSetProperty
- from pypy.interpreter.typedef import interp_attrproperty, interp_attrproperty_w
- from pypy.interpreter.gateway import interp2app
- from pypy.interpreter.error import OperationError
- from pypy.rpython.lltypesystem import rffi, lltype
-
-- A more direct declarative way to write Typedef::
-
- class W_Socket(Wrappable):
- _typedef_name_ = 'socket'
- _typedef_base_ = W_EventualBaseClass
-
- @interp2app_method("connect", ['self', ObjSpace, W_Root])
- def connect_w(self, space, w_addr):
- ...
-
-- Support for metaclasses written in rpython. For a sample, see the skipped test
- `pypy.objspace.std.test.TestTypeObject.test_metaclass_typedef`
-
-RPython language
-----------------
-
-- Arithmetic with unsigned integer, and between integer of different signedness,
- when this is not ambiguous. At least, comparison and assignement with
- constants should be allowed.
-
-- Allocate variables on the stack, and pass their address ("by reference") to
- llexternal functions. For a typical usage, see
- `pypy.rlib.rsocket.RSocket.getsockopt_int`.
-
-- Support context managers and the `with` statement. This could be a workaround
- before the previous point is available.
-
-Extensible type system for llexternal
--------------------------------------
-
-llexternal allows the description of a C function, and conveys the same
-information about the arguments as a C header. But this is often not enough.
-For example, a parameter of type `int*` is converted to
-`rffi.CArrayPtr(rffi.INT)`, but this information is not enough to use the
-function. The parameter could be an array of int, a reference to a single value,
-for input or output...
-
-A "type system" could hold this additional information, and automatically
-generate some conversion code to ease the usage of the function from
-rpython. For example::
-
- # double frexp(double x, int *exp);
- frexp = llexternal("frexp", [rffi.DOUBLE, OutPtr(rffi.int)], rffi.DOUBLE)
-
-`OutPtr` indicates that the parameter is output-only, which need not to be
-initialized, and which *value* is returned to the caller. In rpython the call
-becomes::
-
- fraction, exponent = frexp(value)
-
-Also, we could imagine that one item in the llexternal argument list corresponds
-to two parameters in C. Here, OutCharBufferN indicates that the caller will pass
-a rpython string; the framework will pass buffer and length to the function::
-
- # ssize_t write(int fd, const void *buf, size_t count);
- write = llexternal("write", [rffi.INT, CharBufferAndSize], rffi.SSIZE_T)
-
-The rpython code that calls this function is very simple::
-
- written = write(fd, data)
-
-compared with the present::
-
- count = len(data)
- buf = rffi.get_nonmovingbuffer(data)
- try:
- written = rffi.cast(lltype.Signed, os_write(
- rffi.cast(rffi.INT, fd),
- buf, rffi.cast(rffi.SIZE_T, count)))
- finally:
- rffi.free_nonmovingbuffer(data, buf)
-
-Typemaps are very useful for large APIs where the same conversions are needed in
-many places. XXX example
+Possible improvements of the rpython language
+=============================================
+
+Improve the interpreter API
+---------------------------
+
+- Rationalize the modules, and the names, of the differents functions needed to
+ implement a pypy module. A typical rpython file is likely to contain many
+ `import` statements::
+
+ from pypy.interpreter.baseobjspace import Wrappable
+ from pypy.interpreter.gateway import ObjSpace, W_Root, NoneNotWrapped
+ from pypy.interpreter.argument import Arguments
+ from pypy.interpreter.typedef import TypeDef, GetSetProperty
+ from pypy.interpreter.typedef import interp_attrproperty, interp_attrproperty_w
+ from pypy.interpreter.gateway import interp2app
+ from pypy.interpreter.error import OperationError
+ from pypy.rpython.lltypesystem import rffi, lltype
+
+- A more direct declarative way to write Typedef::
+
+ class W_Socket(Wrappable):
+ _typedef_name_ = 'socket'
+ _typedef_base_ = W_EventualBaseClass
+
+ @interp2app_method("connect", ['self', ObjSpace, W_Root])
+ def connect_w(self, space, w_addr):
+ ...
+
+- Support for metaclasses written in rpython. For a sample, see the skipped test
+ `pypy.objspace.std.test.TestTypeObject.test_metaclass_typedef`
+
+RPython language
+----------------
+
+- Arithmetic with unsigned integer, and between integer of different signedness,
+ when this is not ambiguous. At least, comparison and assignement with
+ constants should be allowed.
+
+- Allocate variables on the stack, and pass their address ("by reference") to
+ llexternal functions. For a typical usage, see
+ `pypy.rlib.rsocket.RSocket.getsockopt_int`.
+
+- Support context managers and the `with` statement. This could be a workaround
+ before the previous point is available.
+
+Extensible type system for llexternal
+-------------------------------------
+
+llexternal allows the description of a C function, and conveys the same
+information about the arguments as a C header. But this is often not enough.
+For example, a parameter of type `int*` is converted to
+`rffi.CArrayPtr(rffi.INT)`, but this information is not enough to use the
+function. The parameter could be an array of int, a reference to a single value,
+for input or output...
+
+A "type system" could hold this additional information, and automatically
+generate some conversion code to ease the usage of the function from
+rpython. For example::
+
+ # double frexp(double x, int *exp);
+ frexp = llexternal("frexp", [rffi.DOUBLE, OutPtr(rffi.int)], rffi.DOUBLE)
+
+`OutPtr` indicates that the parameter is output-only, which need not to be
+initialized, and which *value* is returned to the caller. In rpython the call
+becomes::
+
+ fraction, exponent = frexp(value)
+
+Also, we could imagine that one item in the llexternal argument list corresponds
+to two parameters in C. Here, OutCharBufferN indicates that the caller will pass
+a rpython string; the framework will pass buffer and length to the function::
+
+ # ssize_t write(int fd, const void *buf, size_t count);
+ write = llexternal("write", [rffi.INT, CharBufferAndSize], rffi.SSIZE_T)
+
+The rpython code that calls this function is very simple::
+
+ written = write(fd, data)
+
+compared with the present::
+
+ count = len(data)
+ buf = rffi.get_nonmovingbuffer(data)
+ try:
+ written = rffi.cast(lltype.Signed, os_write(
+ rffi.cast(rffi.INT, fd),
+ buf, rffi.cast(rffi.SIZE_T, count)))
+ finally:
+ rffi.free_nonmovingbuffer(data, buf)
+
+Typemaps are very useful for large APIs where the same conversions are needed in
+many places. XXX example
Modified: pypy/trunk/pypy/module/_collections/__init__.py
==============================================================================
--- pypy/trunk/pypy/module/_collections/__init__.py (original)
+++ pypy/trunk/pypy/module/_collections/__init__.py Tue Mar 9 15:46:14 2010
@@ -1,11 +1,11 @@
-import py
-
-from pypy.interpreter.mixedmodule import MixedModule
-
-class Module(MixedModule):
- appleveldefs = {}
-
- interpleveldefs = {
- 'identity_dict' : 'interp_collection.W_IdentityDict',
- }
-
+import py
+
+from pypy.interpreter.mixedmodule import MixedModule
+
+class Module(MixedModule):
+ appleveldefs = {}
+
+ interpleveldefs = {
+ 'identity_dict' : 'interp_collection.W_IdentityDict',
+ }
+
Modified: pypy/trunk/pypy/module/_collections/interp_collection.py
==============================================================================
--- pypy/trunk/pypy/module/_collections/interp_collection.py (original)
+++ pypy/trunk/pypy/module/_collections/interp_collection.py Tue Mar 9 15:46:14 2010
@@ -1,68 +1,68 @@
-from pypy.interpreter.error import OperationError
-from pypy.interpreter.typedef import TypeDef
-from pypy.interpreter.gateway import ObjSpace, W_Root, NoneNotWrapped, interp2app
-from pypy.interpreter.gateway import Arguments, unwrap_spec
-from pypy.interpreter.baseobjspace import Wrappable
-
-class W_IdentityDict(Wrappable):
- def __init__(self, space):
- self.dict = {}
- __init__.unwrap_spec = ['self', ObjSpace]
-
- @unwrap_spec(ObjSpace, W_Root)
- def descr_new(space, w_subtype):
- self = space.allocate_instance(W_IdentityDict, w_subtype)
- W_IdentityDict.__init__(self, space)
- return space.wrap(self)
-
- @unwrap_spec('self', ObjSpace)
- def descr_len(self, space):
- return space.wrap(len(self.dict))
-
- @unwrap_spec('self', ObjSpace, W_Root)
- def descr_contains(self, space, w_key):
- return space.wrap(w_key in self.dict)
-
- @unwrap_spec('self', ObjSpace, W_Root, W_Root)
- def descr_setitem(self, space, w_key, w_value):
- self.dict[w_key] = w_value
-
- @unwrap_spec('self', ObjSpace, W_Root)
- def descr_getitem(self, space, w_key):
- try:
- return self.dict[w_key]
- except KeyError:
- raise OperationError(space.w_KeyError, w_key)
-
- @unwrap_spec('self', ObjSpace, W_Root, W_Root)
- def get(self, space, w_key, w_default=None):
- return self.dict.get(w_key, w_default)
-
- @unwrap_spec('self', ObjSpace)
- def keys(self, space):
- return space.newlist(self.dict.keys())
-
- @unwrap_spec('self', ObjSpace)
- def values(self, space):
- return space.newlist(self.dict.values())
-
- @unwrap_spec('self', ObjSpace)
- def clear(self, space):
- self.dict.clear()
-
-W_IdentityDict.typedef = TypeDef("identity_dict",
- __doc__="""\
-A dictionary that considers keys by object identity.
-Distinct objects that compare equal will have separate entries.
-All objects can be used as keys, even non-hashable ones.
-""",
- __new__ = interp2app(W_IdentityDict.descr_new.im_func),
- __len__ = interp2app(W_IdentityDict.descr_len),
- __contains__ = interp2app(W_IdentityDict.descr_contains),
- __setitem__ = interp2app(W_IdentityDict.descr_setitem),
- __getitem__ = interp2app(W_IdentityDict.descr_getitem),
- get = interp2app(W_IdentityDict.get),
- keys = interp2app(W_IdentityDict.keys),
- values = interp2app(W_IdentityDict.values),
- clear = interp2app(W_IdentityDict.clear),
-)
+from pypy.interpreter.error import OperationError
+from pypy.interpreter.typedef import TypeDef
+from pypy.interpreter.gateway import ObjSpace, W_Root, NoneNotWrapped, interp2app
+from pypy.interpreter.gateway import Arguments, unwrap_spec
+from pypy.interpreter.baseobjspace import Wrappable
+
+class W_IdentityDict(Wrappable):
+ def __init__(self, space):
+ self.dict = {}
+ __init__.unwrap_spec = ['self', ObjSpace]
+
+ @unwrap_spec(ObjSpace, W_Root)
+ def descr_new(space, w_subtype):
+ self = space.allocate_instance(W_IdentityDict, w_subtype)
+ W_IdentityDict.__init__(self, space)
+ return space.wrap(self)
+
+ @unwrap_spec('self', ObjSpace)
+ def descr_len(self, space):
+ return space.wrap(len(self.dict))
+
+ @unwrap_spec('self', ObjSpace, W_Root)
+ def descr_contains(self, space, w_key):
+ return space.wrap(w_key in self.dict)
+
+ @unwrap_spec('self', ObjSpace, W_Root, W_Root)
+ def descr_setitem(self, space, w_key, w_value):
+ self.dict[w_key] = w_value
+
+ @unwrap_spec('self', ObjSpace, W_Root)
+ def descr_getitem(self, space, w_key):
+ try:
+ return self.dict[w_key]
+ except KeyError:
+ raise OperationError(space.w_KeyError, w_key)
+
+ @unwrap_spec('self', ObjSpace, W_Root, W_Root)
+ def get(self, space, w_key, w_default=None):
+ return self.dict.get(w_key, w_default)
+
+ @unwrap_spec('self', ObjSpace)
+ def keys(self, space):
+ return space.newlist(self.dict.keys())
+
+ @unwrap_spec('self', ObjSpace)
+ def values(self, space):
+ return space.newlist(self.dict.values())
+
+ @unwrap_spec('self', ObjSpace)
+ def clear(self, space):
+ self.dict.clear()
+
+W_IdentityDict.typedef = TypeDef("identity_dict",
+ __doc__="""\
+A dictionary that considers keys by object identity.
+Distinct objects that compare equal will have separate entries.
+All objects can be used as keys, even non-hashable ones.
+""",
+ __new__ = interp2app(W_IdentityDict.descr_new.im_func),
+ __len__ = interp2app(W_IdentityDict.descr_len),
+ __contains__ = interp2app(W_IdentityDict.descr_contains),
+ __setitem__ = interp2app(W_IdentityDict.descr_setitem),
+ __getitem__ = interp2app(W_IdentityDict.descr_getitem),
+ get = interp2app(W_IdentityDict.get),
+ keys = interp2app(W_IdentityDict.keys),
+ values = interp2app(W_IdentityDict.values),
+ clear = interp2app(W_IdentityDict.clear),
+)
Modified: pypy/trunk/pypy/module/_collections/test/test_collection.py
==============================================================================
--- pypy/trunk/pypy/module/_collections/test/test_collection.py (original)
+++ pypy/trunk/pypy/module/_collections/test/test_collection.py Tue Mar 9 15:46:14 2010
@@ -1,61 +1,61 @@
-import py
-from pypy.conftest import gettestobjspace
-
-class AppTestIdentityDict:
- def setup_class(cls):
- cls.space = gettestobjspace(usemodules=['_collections'])
-
- def test_numbers(self):
- from _collections import identity_dict
- d = identity_dict()
- d[0] = 1
- d[0.0] = 2
- d[long(0)] = 3
-
- assert d
- assert len(d) == 3
- d.clear()
- assert not d
-
- def test_get(self):
- from _collections import identity_dict
- d = identity_dict()
- d[None] = 1
-
- assert d.get(None, 42) == 1
- assert d.get(None) == 1
- assert d.get(1) is None
- assert d.get(1, 42) == 42
-
- def test_unhashable(self):
- from _collections import identity_dict
-
- d = identity_dict()
- d[[]] = 1
- d[[]] = 2
- a = []
- d[a] = 3
- assert len(d) == 3
- d[a] = 4
- assert len(d) == 3
- assert d[a] == 4
-
- raises(KeyError, d.__getitem__, [])
-
- def test_keys(self):
- from _collections import identity_dict
- d = identity_dict()
- d[[]] = 1
- d[[]] = 2
- d[[]] = 3
-
- assert d.keys() == [[], [], []]
- assert sorted(d.values()) == [1, 2, 3]
-
- def test_in(self):
- from _collections import identity_dict
- d = identity_dict()
- d[None] = 1
-
- assert None in d
- assert [] not in d
+import py
+from pypy.conftest import gettestobjspace
+
+class AppTestIdentityDict:
+ def setup_class(cls):
+ cls.space = gettestobjspace(usemodules=['_collections'])
+
+ def test_numbers(self):
+ from _collections import identity_dict
+ d = identity_dict()
+ d[0] = 1
+ d[0.0] = 2
+ d[long(0)] = 3
+
+ assert d
+ assert len(d) == 3
+ d.clear()
+ assert not d
+
+ def test_get(self):
+ from _collections import identity_dict
+ d = identity_dict()
+ d[None] = 1
+
+ assert d.get(None, 42) == 1
+ assert d.get(None) == 1
+ assert d.get(1) is None
+ assert d.get(1, 42) == 42
+
+ def test_unhashable(self):
+ from _collections import identity_dict
+
+ d = identity_dict()
+ d[[]] = 1
+ d[[]] = 2
+ a = []
+ d[a] = 3
+ assert len(d) == 3
+ d[a] = 4
+ assert len(d) == 3
+ assert d[a] == 4
+
+ raises(KeyError, d.__getitem__, [])
+
+ def test_keys(self):
+ from _collections import identity_dict
+ d = identity_dict()
+ d[[]] = 1
+ d[[]] = 2
+ d[[]] = 3
+
+ assert d.keys() == [[], [], []]
+ assert sorted(d.values()) == [1, 2, 3]
+
+ def test_in(self):
+ from _collections import identity_dict
+ d = identity_dict()
+ d[None] = 1
+
+ assert None in d
+ assert [] not in d
Modified: pypy/trunk/pypy/module/imp/interp_imp.py
==============================================================================
--- pypy/trunk/pypy/module/imp/interp_imp.py (original)
+++ pypy/trunk/pypy/module/imp/interp_imp.py Tue Mar 9 15:46:14 2010
@@ -1,154 +1,154 @@
-from pypy.module.imp import importing
-from pypy.module._file.interp_file import W_File
-from pypy.rlib import streamio
-from pypy.interpreter.error import OperationError, operationerrfmt
-from pypy.interpreter.module import Module
-from pypy.interpreter.gateway import NoneNotWrapped
-import struct
-
-def get_suffixes(space):
- w = space.wrap
- return space.newlist([
- space.newtuple([w('.py'), w('U'), w(importing.PY_SOURCE)]),
- space.newtuple([w('.pyc'), w('rb'), w(importing.PY_COMPILED)]),
- ])
-
-def get_magic(space):
- x = importing.get_pyc_magic(space)
- a = x & 0xff
- x >>= 8
- b = x & 0xff
- x >>= 8
- c = x & 0xff
- x >>= 8
- d = x & 0xff
- return space.wrap(chr(a) + chr(b) + chr(c) + chr(d))
-
-def get_file(space, w_file, filename, filemode):
- if w_file is None or space.is_w(w_file, space.w_None):
- return streamio.open_file_as_stream(filename, filemode)
- else:
- return space.interp_w(W_File, w_file).stream
-
-def find_module(space, w_name, w_path=None):
- name = space.str_w(w_name)
- if space.is_w(w_path, space.w_None):
- w_path = None
-
- find_info = importing.find_module(
- space, name, w_name, name, w_path, use_loader=False)
- if not find_info:
- raise operationerrfmt(
- space.w_ImportError,
- "No module named %s", name)
-
- w_filename = space.wrap(find_info.filename)
- stream = find_info.stream
-
- if stream is not None:
- fileobj = W_File(space)
- fileobj.fdopenstream(
- stream, stream.try_to_find_file_descriptor(),
- find_info.filemode, w_filename)
- w_fileobj = space.wrap(fileobj)
- else:
- w_fileobj = space.w_None
- w_import_info = space.newtuple(
- [space.wrap(find_info.suffix),
- space.wrap(find_info.filemode),
- space.wrap(find_info.modtype)])
- return space.newtuple([w_fileobj, w_filename, w_import_info])
-
-def load_module(space, w_name, w_file, w_filename, w_info):
- w_suffix, w_filemode, w_modtype = space.unpackiterable(w_info)
-
- filename = space.str_w(w_filename)
- filemode = space.str_w(w_filemode)
- if space.is_w(w_file, space.w_None):
- stream = None
- else:
- stream = get_file(space, w_file, filename, filemode)
-
- find_info = importing.FindInfo(
- space.int_w(w_modtype),
- filename,
- stream,
- space.str_w(w_suffix),
- filemode)
- return importing.load_module(
- space, w_name, find_info)
-
-def load_source(space, w_modulename, w_filename, w_file=None):
- filename = space.str_w(w_filename)
-
- stream = get_file(space, w_file, filename, 'U')
-
- w_mod = space.wrap(Module(space, w_modulename))
- importing._prepare_module(space, w_mod, filename, None)
-
- importing.load_source_module(
- space, w_modulename, w_mod, filename, stream.readall())
- if space.is_w(w_file, space.w_None):
- stream.close()
- return w_mod
-
-def load_compiled(space, w_modulename, w_filename, w_file=None):
- filename = space.str_w(w_filename)
-
- stream = get_file(space, w_file, filename, 'rb')
-
- w_mod = space.wrap(Module(space, w_modulename))
- importing._prepare_module(space, w_mod, filename, None)
-
- magic = importing._r_long(stream)
- timestamp = importing._r_long(stream)
-
- importing.load_compiled_module(
- space, w_modulename, w_mod, filename, magic, timestamp,
- stream.readall())
- if space.is_w(w_file, space.w_None):
- stream.close()
- return w_mod
-
-def new_module(space, w_name):
- return space.wrap(Module(space, w_name))
-
-def init_builtin(space, w_name):
- name = space.str_w(w_name)
- if name not in space.builtin_modules:
- return
- if space.finditem(space.sys.get('modules'), w_name) is not None:
- raise OperationError(
- space.w_ImportError,
- space.wrap("cannot initialize a built-in module twice in PyPy"))
- return space.getbuiltinmodule(name)
-
-def init_frozen(space, w_name):
- return None
-
-def is_builtin(space, w_name):
- name = space.str_w(w_name)
- if name not in space.builtin_modules:
- return space.wrap(0)
- if space.finditem(space.sys.get('modules'), w_name) is not None:
- return space.wrap(-1) # cannot be initialized again
- return space.wrap(1)
-
-def is_frozen(space, w_name):
- return space.w_False
-
-#__________________________________________________________________
-
-def lock_held(space):
- if space.config.objspace.usemodules.thread:
- return space.wrap(importing.getimportlock(space).lock_held())
- else:
- return space.w_False
-
-def acquire_lock(space):
- if space.config.objspace.usemodules.thread:
- importing.getimportlock(space).acquire_lock()
-
-def release_lock(space):
- if space.config.objspace.usemodules.thread:
- importing.getimportlock(space).release_lock()
+from pypy.module.imp import importing
+from pypy.module._file.interp_file import W_File
+from pypy.rlib import streamio
+from pypy.interpreter.error import OperationError, operationerrfmt
+from pypy.interpreter.module import Module
+from pypy.interpreter.gateway import NoneNotWrapped
+import struct
+
+def get_suffixes(space):
+ w = space.wrap
+ return space.newlist([
+ space.newtuple([w('.py'), w('U'), w(importing.PY_SOURCE)]),
+ space.newtuple([w('.pyc'), w('rb'), w(importing.PY_COMPILED)]),
+ ])
+
+def get_magic(space):
+ x = importing.get_pyc_magic(space)
+ a = x & 0xff
+ x >>= 8
+ b = x & 0xff
+ x >>= 8
+ c = x & 0xff
+ x >>= 8
+ d = x & 0xff
+ return space.wrap(chr(a) + chr(b) + chr(c) + chr(d))
+
+def get_file(space, w_file, filename, filemode):
+ if w_file is None or space.is_w(w_file, space.w_None):
+ return streamio.open_file_as_stream(filename, filemode)
+ else:
+ return space.interp_w(W_File, w_file).stream
+
+def find_module(space, w_name, w_path=None):
+ name = space.str_w(w_name)
+ if space.is_w(w_path, space.w_None):
+ w_path = None
+
+ find_info = importing.find_module(
+ space, name, w_name, name, w_path, use_loader=False)
+ if not find_info:
+ raise operationerrfmt(
+ space.w_ImportError,
+ "No module named %s", name)
+
+ w_filename = space.wrap(find_info.filename)
+ stream = find_info.stream
+
+ if stream is not None:
+ fileobj = W_File(space)
+ fileobj.fdopenstream(
+ stream, stream.try_to_find_file_descriptor(),
+ find_info.filemode, w_filename)
+ w_fileobj = space.wrap(fileobj)
+ else:
+ w_fileobj = space.w_None
+ w_import_info = space.newtuple(
+ [space.wrap(find_info.suffix),
+ space.wrap(find_info.filemode),
+ space.wrap(find_info.modtype)])
+ return space.newtuple([w_fileobj, w_filename, w_import_info])
+
+def load_module(space, w_name, w_file, w_filename, w_info):
+ w_suffix, w_filemode, w_modtype = space.unpackiterable(w_info)
+
+ filename = space.str_w(w_filename)
+ filemode = space.str_w(w_filemode)
+ if space.is_w(w_file, space.w_None):
+ stream = None
+ else:
+ stream = get_file(space, w_file, filename, filemode)
+
+ find_info = importing.FindInfo(
+ space.int_w(w_modtype),
+ filename,
+ stream,
+ space.str_w(w_suffix),
+ filemode)
+ return importing.load_module(
+ space, w_name, find_info)
+
+def load_source(space, w_modulename, w_filename, w_file=None):
+ filename = space.str_w(w_filename)
+
+ stream = get_file(space, w_file, filename, 'U')
+
+ w_mod = space.wrap(Module(space, w_modulename))
+ importing._prepare_module(space, w_mod, filename, None)
+
+ importing.load_source_module(
+ space, w_modulename, w_mod, filename, stream.readall())
+ if space.is_w(w_file, space.w_None):
+ stream.close()
+ return w_mod
+
+def load_compiled(space, w_modulename, w_filename, w_file=None):
+ filename = space.str_w(w_filename)
+
+ stream = get_file(space, w_file, filename, 'rb')
+
+ w_mod = space.wrap(Module(space, w_modulename))
+ importing._prepare_module(space, w_mod, filename, None)
+
+ magic = importing._r_long(stream)
+ timestamp = importing._r_long(stream)
+
+ importing.load_compiled_module(
+ space, w_modulename, w_mod, filename, magic, timestamp,
+ stream.readall())
+ if space.is_w(w_file, space.w_None):
+ stream.close()
+ return w_mod
+
+def new_module(space, w_name):
+ return space.wrap(Module(space, w_name))
+
+def init_builtin(space, w_name):
+ name = space.str_w(w_name)
+ if name not in space.builtin_modules:
+ return
+ if space.finditem(space.sys.get('modules'), w_name) is not None:
+ raise OperationError(
+ space.w_ImportError,
+ space.wrap("cannot initialize a built-in module twice in PyPy"))
+ return space.getbuiltinmodule(name)
+
+def init_frozen(space, w_name):
+ return None
+
+def is_builtin(space, w_name):
+ name = space.str_w(w_name)
+ if name not in space.builtin_modules:
+ return space.wrap(0)
+ if space.finditem(space.sys.get('modules'), w_name) is not None:
+ return space.wrap(-1) # cannot be initialized again
+ return space.wrap(1)
+
+def is_frozen(space, w_name):
+ return space.w_False
+
+#__________________________________________________________________
+
+def lock_held(space):
+ if space.config.objspace.usemodules.thread:
+ return space.wrap(importing.getimportlock(space).lock_held())
+ else:
+ return space.w_False
+
+def acquire_lock(space):
+ if space.config.objspace.usemodules.thread:
+ importing.getimportlock(space).acquire_lock()
+
+def release_lock(space):
+ if space.config.objspace.usemodules.thread:
+ importing.getimportlock(space).release_lock()
Modified: pypy/trunk/pypy/module/imp/test/test_app.py
==============================================================================
--- pypy/trunk/pypy/module/imp/test/test_app.py (original)
+++ pypy/trunk/pypy/module/imp/test/test_app.py Tue Mar 9 15:46:14 2010
@@ -1,112 +1,112 @@
-MARKER = 42
-
-class AppTestImpModule:
- def setup_class(cls):
- cls.w_imp = cls.space.getbuiltinmodule('imp')
-
- cls.w__py_file = cls.space.appexec(
- [cls.space.wrap(__file__)], r"""(__file__):
- def _py_file():
- fn = __file__
- if fn.lower().endswith('c') or fn.lower().endswith('o'):
- fn = fn[:-1]
- assert fn.lower().endswith('.py')
- return fn
- return _py_file""")
-
- cls.w__pyc_file = cls.space.appexec([], r"""():
- def _pyc_file():
- import marshal, imp
- co = compile("marker=42", "x.py", "exec")
- f = open('@TEST.pyc', 'wb')
- f.write(imp.get_magic())
- f.write('\x00\x00\x00\x00')
- marshal.dump(co, f)
- f.close()
- return '@TEST.pyc'
- return _pyc_file""")
-
-
-
- def test_find_module(self):
- import os
- file, pathname, description = self.imp.find_module('StringIO')
- assert file is not None
- file.close()
- assert os.path.exists(pathname)
- pathname = pathname.lower()
- assert pathname.endswith('.py') # even if .pyc is up-to-date
- assert description in self.imp.get_suffixes()
-
-
- def test_suffixes(self):
- for suffix, mode, type in self.imp.get_suffixes():
- if mode == self.imp.PY_SOURCE:
- assert suffix == '.py'
- assert type == 'r'
- elif mode == self.imp.PY_COMPILED:
- assert suffix in ('.pyc', '.pyo')
- assert type == 'rb'
-
-
- def test_obscure_functions(self):
- mod = self.imp.new_module('hi')
- assert mod.__name__ == 'hi'
- mod = self.imp.init_builtin('hello.world.this.is.never.a.builtin.module.name')
- assert mod is None
- mod = self.imp.init_frozen('hello.world.this.is.never.a.frozen.module.name')
- assert mod is None
- assert self.imp.is_builtin('sys')
- assert not self.imp.is_builtin('hello.world.this.is.never.a.builtin.module.name')
- assert not self.imp.is_frozen('hello.world.this.is.never.a.frozen.module.name')
-
-
- def test_load_module_py(self):
- fn = self._py_file()
- descr = ('.py', 'U', self.imp.PY_SOURCE)
- f = open(fn, 'U')
- mod = self.imp.load_module('test_imp_extra_AUTO1', f, fn, descr)
- f.close()
- assert mod.MARKER == 42
- import test_imp_extra_AUTO1
- assert mod is test_imp_extra_AUTO1
-
- def test_load_module_pyc(self):
- fn = self._pyc_file()
- try:
- descr = ('.pyc', 'rb', self.imp.PY_COMPILED)
- f = open(fn, 'rb')
- mod = self.imp.load_module('test_imp_extra_AUTO2', f, fn, descr)
- f.close()
- assert mod.marker == 42
- import test_imp_extra_AUTO2
- assert mod is test_imp_extra_AUTO2
- finally:
- os.unlink(fn)
-
- def test_load_source(self):
- fn = self._py_file()
- mod = self.imp.load_source('test_imp_extra_AUTO3', fn)
- assert mod.MARKER == 42
- import test_imp_extra_AUTO3
- assert mod is test_imp_extra_AUTO3
-
- def test_load_module_pyc(self):
- import os
- fn = self._pyc_file()
- try:
- mod = self.imp.load_compiled('test_imp_extra_AUTO4', fn)
- assert mod.marker == 42
- import test_imp_extra_AUTO4
- assert mod is test_imp_extra_AUTO4
- finally:
- os.unlink(fn)
-
- def test_load_broken_pyc(self):
- fn = self._py_file()
- try:
- self.imp.load_compiled('test_imp_extra_AUTO5', fn)
- except ImportError:
- pass
- else:
- raise Exception("expected an ImportError")
+MARKER = 42
+
+class AppTestImpModule:
+ def setup_class(cls):
+ cls.w_imp = cls.space.getbuiltinmodule('imp')
+
+ cls.w__py_file = cls.space.appexec(
+ [cls.space.wrap(__file__)], r"""(__file__):
+ def _py_file():
+ fn = __file__
+ if fn.lower().endswith('c') or fn.lower().endswith('o'):
+ fn = fn[:-1]
+ assert fn.lower().endswith('.py')
+ return fn
+ return _py_file""")
+
+ cls.w__pyc_file = cls.space.appexec([], r"""():
+ def _pyc_file():
+ import marshal, imp
+ co = compile("marker=42", "x.py", "exec")
+ f = open('@TEST.pyc', 'wb')
+ f.write(imp.get_magic())
+ f.write('\x00\x00\x00\x00')
+ marshal.dump(co, f)
+ f.close()
+ return '@TEST.pyc'
+ return _pyc_file""")
+
+
+
+ def test_find_module(self):
+ import os
+ file, pathname, description = self.imp.find_module('StringIO')
+ assert file is not None
+ file.close()
+ assert os.path.exists(pathname)
+ pathname = pathname.lower()
+ assert pathname.endswith('.py') # even if .pyc is up-to-date
+ assert description in self.imp.get_suffixes()
+
+
+ def test_suffixes(self):
+ for suffix, mode, type in self.imp.get_suffixes():
+ if mode == self.imp.PY_SOURCE:
+ assert suffix == '.py'
+ assert type == 'r'
+ elif mode == self.imp.PY_COMPILED:
+ assert suffix in ('.pyc', '.pyo')
+ assert type == 'rb'
+
+
+ def test_obscure_functions(self):
+ mod = self.imp.new_module('hi')
+ assert mod.__name__ == 'hi'
+ mod = self.imp.init_builtin('hello.world.this.is.never.a.builtin.module.name')
+ assert mod is None
+ mod = self.imp.init_frozen('hello.world.this.is.never.a.frozen.module.name')
+ assert mod is None
+ assert self.imp.is_builtin('sys')
+ assert not self.imp.is_builtin('hello.world.this.is.never.a.builtin.module.name')
+ assert not self.imp.is_frozen('hello.world.this.is.never.a.frozen.module.name')
+
+
+ def test_load_module_py(self):
+ fn = self._py_file()
+ descr = ('.py', 'U', self.imp.PY_SOURCE)
+ f = open(fn, 'U')
+ mod = self.imp.load_module('test_imp_extra_AUTO1', f, fn, descr)
+ f.close()
+ assert mod.MARKER == 42
+ import test_imp_extra_AUTO1
+ assert mod is test_imp_extra_AUTO1
+
+ def test_load_module_pyc(self):
+ fn = self._pyc_file()
+ try:
+ descr = ('.pyc', 'rb', self.imp.PY_COMPILED)
+ f = open(fn, 'rb')
+ mod = self.imp.load_module('test_imp_extra_AUTO2', f, fn, descr)
+ f.close()
+ assert mod.marker == 42
+ import test_imp_extra_AUTO2
+ assert mod is test_imp_extra_AUTO2
+ finally:
+ os.unlink(fn)
+
+ def test_load_source(self):
+ fn = self._py_file()
+ mod = self.imp.load_source('test_imp_extra_AUTO3', fn)
+ assert mod.MARKER == 42
+ import test_imp_extra_AUTO3
+ assert mod is test_imp_extra_AUTO3
+
+ def test_load_module_pyc(self):
+ import os
+ fn = self._pyc_file()
+ try:
+ mod = self.imp.load_compiled('test_imp_extra_AUTO4', fn)
+ assert mod.marker == 42
+ import test_imp_extra_AUTO4
+ assert mod is test_imp_extra_AUTO4
+ finally:
+ os.unlink(fn)
+
+ def test_load_broken_pyc(self):
+ fn = self._py_file()
+ try:
+ self.imp.load_compiled('test_imp_extra_AUTO5', fn)
+ except ImportError:
+ pass
+ else:
+ raise Exception("expected an ImportError")
Modified: pypy/trunk/pypy/module/oracle/interp_lob.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_lob.py (original)
+++ pypy/trunk/pypy/module/oracle/interp_lob.py Tue Mar 9 15:46:14 2010
@@ -1,47 +1,47 @@
-from pypy.interpreter.baseobjspace import Wrappable
-from pypy.interpreter.typedef import TypeDef
-from pypy.interpreter.gateway import ObjSpace
-from pypy.interpreter.gateway import interp2app
-from pypy.interpreter.error import OperationError
-
-from pypy.module.oracle.interp_error import get
-
-class W_ExternalLob(Wrappable):
- def __init__(self, var, pos):
- self.lobVar = var
- self.pos = pos
- self.internalFetchNum = var.internalFetchNum
-
- def _verify(self, space):
- if self.internalFetchNum != self.lobVar.internalFetchNum:
- raise OperationError(
- get(space).w_ProgrammingError,
- space.wrap(
- "LOB variable no longer valid after subsequent fetch"))
-
- def read(self, space, offset=-1, amount=-1):
- self._verify(space)
- return self.lobVar.read(space, self.pos, offset, amount)
- read.unwrap_spec = ['self', ObjSpace, int, int]
-
- def size(self, space):
- self._verify(space)
- return space.wrap(self.lobVar.getLength(space, self.pos))
- size.unwrap_spec = ['self', ObjSpace]
-
- def trim(self, space, newSize=0):
- self._verify(space)
- self.lobVar.trim(space, self.pos, newSize)
- trim.unwrap_spec = ['self', ObjSpace, int]
-
- def desc_str(self, space):
- return self.read(space, offset=1, amount=-1)
- desc_str.unwrap_spec = ['self', ObjSpace]
-
-W_ExternalLob.typedef = TypeDef(
- 'ExternalLob',
- read = interp2app(W_ExternalLob.read),
- size = interp2app(W_ExternalLob.size),
- trim = interp2app(W_ExternalLob.trim),
- __str__ = interp2app(W_ExternalLob.desc_str),
- )
+from pypy.interpreter.baseobjspace import Wrappable
+from pypy.interpreter.typedef import TypeDef
+from pypy.interpreter.gateway import ObjSpace
+from pypy.interpreter.gateway import interp2app
+from pypy.interpreter.error import OperationError
+
+from pypy.module.oracle.interp_error import get
+
+class W_ExternalLob(Wrappable):
+ def __init__(self, var, pos):
+ self.lobVar = var
+ self.pos = pos
+ self.internalFetchNum = var.internalFetchNum
+
+ def _verify(self, space):
+ if self.internalFetchNum != self.lobVar.internalFetchNum:
+ raise OperationError(
+ get(space).w_ProgrammingError,
+ space.wrap(
+ "LOB variable no longer valid after subsequent fetch"))
+
+ def read(self, space, offset=-1, amount=-1):
+ self._verify(space)
+ return self.lobVar.read(space, self.pos, offset, amount)
+ read.unwrap_spec = ['self', ObjSpace, int, int]
+
+ def size(self, space):
+ self._verify(space)
+ return space.wrap(self.lobVar.getLength(space, self.pos))
+ size.unwrap_spec = ['self', ObjSpace]
+
+ def trim(self, space, newSize=0):
+ self._verify(space)
+ self.lobVar.trim(space, self.pos, newSize)
+ trim.unwrap_spec = ['self', ObjSpace, int]
+
+ def desc_str(self, space):
+ return self.read(space, offset=1, amount=-1)
+ desc_str.unwrap_spec = ['self', ObjSpace]
+
+W_ExternalLob.typedef = TypeDef(
+ 'ExternalLob',
+ read = interp2app(W_ExternalLob.read),
+ size = interp2app(W_ExternalLob.size),
+ trim = interp2app(W_ExternalLob.trim),
+ __str__ = interp2app(W_ExternalLob.desc_str),
+ )
Modified: pypy/trunk/pypy/module/oracle/interp_object.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_object.py (original)
+++ pypy/trunk/pypy/module/oracle/interp_object.py Tue Mar 9 15:46:14 2010
@@ -1,516 +1,516 @@
-from pypy.interpreter.baseobjspace import Wrappable
-from pypy.interpreter.typedef import TypeDef, GetSetProperty
-from pypy.interpreter.typedef import interp_attrproperty
-from pypy.interpreter.gateway import ObjSpace
-from pypy.interpreter.gateway import interp2app
-from pypy.interpreter.error import OperationError
-from pypy.rpython.lltypesystem import rffi, lltype
-
-from pypy.module.oracle import roci, config, transform
-from pypy.module.oracle.interp_error import get
-
-class W_ObjectType(Wrappable):
- def __init__(self, connection, param):
- self.tdo = lltype.nullptr(roci.dvoidp.TO)
- self.environment = connection.environment
- self.isCollection = False
- self.initialize(connection, param)
-
- def __del__(self):
- if self.tdo:
- roci.OCIObjectUnpin(
- self.environment.handle,
- self.environment.errorHandle,
- self.tdo)
-
- def initialize(self, connection, param):
- nameptr = lltype.malloc(rffi.CArrayPtr(roci.oratext).TO, 1,
- flavor='raw')
- lenptr = lltype.malloc(rffi.CArrayPtr(roci.ub4).TO, 1,
- flavor='raw')
- try:
- # determine the schema of the type
- status = roci.OCIAttrGet(
- param, roci.OCI_HTYPE_DESCRIBE,
- rffi.cast(roci.dvoidp, nameptr),
- lenptr,
- roci.OCI_ATTR_SCHEMA_NAME,
- self.environment.errorHandle)
- self.environment.checkForError(
- status,
- "ObjectType_Initialize(): get schema name")
- self.schema = rffi.charpsize2str(nameptr[0], lenptr[0])
-
- # determine the name of the type
- status = roci.OCIAttrGet(
- param, roci.OCI_HTYPE_DESCRIBE,
- rffi.cast(roci.dvoidp, nameptr),
- lenptr,
- roci.OCI_ATTR_TYPE_NAME,
- self.environment.errorHandle)
- self.environment.checkForError(
- status,
- "ObjectType_Initialize(): get schema name")
- self.name = rffi.charpsize2str(nameptr[0], lenptr[0])
- finally:
- lltype.free(nameptr, flavor='raw')
- lltype.free(lenptr, flavor='raw')
-
- # retrieve TDO (type descriptor object) of the parameter
- tdorefptr = lltype.malloc(rffi.CArrayPtr(roci.OCIRef).TO, 1,
- flavor='raw')
- try:
- status = roci.OCIAttrGet(
- param, roci.OCI_HTYPE_DESCRIBE,
- rffi.cast(roci.dvoidp, tdorefptr),
- lltype.nullptr(roci.Ptr(roci.ub4).TO),
- roci.OCI_ATTR_REF_TDO,
- self.environment.errorHandle)
- self.environment.checkForError(
- status,
- "ObjectType_Initialize(): get TDO reference")
- tdoref = tdorefptr[0]
- finally:
- lltype.free(tdorefptr, flavor='raw')
-
- tdoptr = lltype.malloc(rffi.CArrayPtr(roci.dvoidp).TO, 1,
- flavor='raw')
- try:
- status = roci.OCIObjectPin(
- self.environment.handle,
- self.environment.errorHandle,
- tdoref,
- None, roci.OCI_PIN_ANY,
- roci.OCI_DURATION_SESSION, roci.OCI_LOCK_NONE,
- tdoptr)
- self.environment.checkForError(
- status,
- "ObjectType_Initialize(): pin TDO reference")
- self.tdo = tdoptr[0]
- finally:
- lltype.free(tdoptr, flavor='raw')
-
- # acquire a describe handle
- handleptr = lltype.malloc(rffi.CArrayPtr(roci.OCIDescribe).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIHandleAlloc(
- self.environment.handle,
- handleptr, roci.OCI_HTYPE_DESCRIBE, 0,
- lltype.nullptr(rffi.CArray(roci.dvoidp)))
- self.environment.checkForError(
- status, "ObjectType_Initialize(): allocate describe handle")
- describeHandle = handleptr[0]
- finally:
- lltype.free(handleptr, flavor='raw')
-
- # describe the type
- try:
- self.describe(connection, describeHandle)
- finally:
- roci.OCIHandleFree(describeHandle, roci.OCI_HTYPE_DESCRIBE)
-
- def describe(self, connection, describeHandle):
- "Describe the type and store information about it as needed"
-
- # describe the type
- status = roci.OCIDescribeAny(
- connection.handle,
- self.environment.errorHandle,
- self.tdo, 0,
- roci.OCI_OTYPE_PTR,
- roci.OCI_DEFAULT,
- roci.OCI_PTYPE_TYPE,
- describeHandle)
- self.environment.checkForError(
- status, "ObjectType_Describe(): describe type")
-
- # get top level parameter descriptor
- paramptr = lltype.malloc(roci.Ptr(roci.OCIParam).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIAttrGet(
- describeHandle, roci.OCI_HTYPE_DESCRIBE,
- rffi.cast(roci.dvoidp, paramptr),
- lltype.nullptr(roci.Ptr(roci.ub4).TO),
- roci.OCI_ATTR_PARAM,
- self.environment.errorHandle)
- self.environment.checkForError(
- status,
- "ObjectType_Describe(): get top level parameter descriptor")
- toplevelParam = paramptr[0]
- finally:
- lltype.free(paramptr, flavor='raw')
-
- # determine type of type
- typecodeptr = lltype.malloc(roci.Ptr(roci.OCITypeCode).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIAttrGet(
- toplevelParam, roci.OCI_DTYPE_PARAM,
- rffi.cast(roci.dvoidp, typecodeptr),
- lltype.nullptr(roci.Ptr(roci.ub4).TO),
- roci.OCI_ATTR_TYPECODE,
- self.environment.errorHandle)
- self.environment.checkForError(
- status, "ObjectType_Describe(): get type code")
- typeCode = rffi.cast(lltype.Signed, typecodeptr[0])
- finally:
- lltype.free(typecodeptr, flavor='raw')
-
- # if a collection, need to determine the sub type
- if typeCode == roci.OCI_TYPECODE_NAMEDCOLLECTION:
- self.isCollection = 1
-
- # determine type of collection
- typecodeptr = lltype.malloc(roci.Ptr(roci.OCITypeCode).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIAttrGet(
- toplevelParam, roci.OCI_DTYPE_PARAM,
- rffi.cast(roci.dvoidp, typecodeptr),
- lltype.nullptr(roci.Ptr(roci.ub4).TO),
- roci.OCI_ATTR_TYPECODE,
- self.environment.errorHandle)
- self.environment.checkForError(
- status, "ObjectType_Describe(): get collection type code")
- self.collectionTypeCode = typecodeptr[0]
- finally:
- lltype.free(typecodeptr, flavor='raw')
-
- # acquire collection parameter descriptor
- paramptr = lltype.malloc(roci.Ptr(roci.OCIParam).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIAttrGet(
- toplevelParam, roci.OCI_DTYPE_PARAM,
- rffi.cast(roci.dvoidp, paramptr),
- lltype.nullptr(roci.Ptr(roci.ub4).TO),
- roci.OCI_ATTR_COLLECTION_ELEMENT,
- self.environment.errorHandle)
- self.environment.checkForError(
- status,
- "ObjectType_Describe(): get collection descriptor")
- collectionParam = paramptr[0]
- finally:
- lltype.free(paramptr, flavor='raw')
-
- # determine type of element
- typecodeptr = lltype.malloc(roci.Ptr(roci.OCITypeCode).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIAttrGet(
- collectionParam, roci.OCI_DTYPE_PARAM,
- rffi.cast(roci.dvoidp, typecodeptr),
- lltype.nullptr(roci.Ptr(roci.ub4).TO),
- roci.OCI_ATTR_TYPECODE,
- self.environment.errorHandle)
- self.environment.checkForError(
- status, "ObjectType_Describe(): get element type code")
- self.elementTypeCode = rffi.cast(lltype.Signed, typecodeptr[0])
- finally:
- lltype.free(typecodeptr, flavor='raw')
-
- # if element type is an object type get its type
- if self.elementTypeCode == roci.OCI_TYPECODE_OBJECT:
- self.elementType = W_ObjectType(connection, collectionParam)
- else:
- self.elementType = None
-
- # determine the number of attributes
- numptr = lltype.malloc(roci.Ptr(roci.ub2).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIAttrGet(
- toplevelParam, roci.OCI_DTYPE_PARAM,
- rffi.cast(roci.dvoidp, numptr),
- lltype.nullptr(roci.Ptr(roci.ub4).TO),
- roci.OCI_ATTR_NUM_TYPE_ATTRS,
- self.environment.errorHandle)
- self.environment.checkForError(
- status, "ObjectType_Describe(): get number of attributes")
- numAttributes = numptr[0]
- finally:
- lltype.free(numptr, flavor='raw')
-
- # allocate the attribute list and dictionary
- self.attributes = []
- self.attributesByName = {}
-
- # acquire the list parameter descriptor
- listptr = lltype.malloc(roci.Ptr(roci.OCIParam).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIAttrGet(
- toplevelParam, roci.OCI_DTYPE_PARAM,
- rffi.cast(roci.dvoidp, listptr),
- lltype.nullptr(roci.Ptr(roci.ub4).TO),
- roci.OCI_ATTR_LIST_TYPE_ATTRS,
- self.environment.errorHandle)
- self.environment.checkForError(
- status, "ObjectType_Describe(): get list parameter descriptor")
- attributeListParam = listptr[0]
- finally:
- lltype.free(listptr, flavor='raw')
-
- # create attribute information for each attribute
- for i in range(numAttributes):
- paramptr = lltype.malloc(roci.Ptr(roci.OCIParam).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIParamGet(
- attributeListParam, roci.OCI_DTYPE_PARAM,
- self.environment.errorHandle,
- paramptr, i + 1)
- self.environment.checkForError(
- status,
- "ObjectType_Describe(): get attribute param descriptor")
- attribute = W_ObjectAttribute(connection, paramptr[0])
- finally:
- lltype.free(paramptr, flavor='raw')
-
- self.attributes.append(attribute)
- self.attributesByName[attribute.name] = attribute
-
- def get_attributes(space, self):
- return space.newlist([space.wrap(attr) for attr in self.attributes])
-
-W_ObjectType.typedef = TypeDef(
- 'ObjectType',
- schema = interp_attrproperty('schema', W_ObjectType),
- name = interp_attrproperty('name', W_ObjectType),
- attributes = GetSetProperty(W_ObjectType.get_attributes),
- )
-
-class W_ObjectAttribute(Wrappable):
- def __init__(self, connection, param):
- self.initialize(connection, param)
-
- def initialize(self, connection, param):
- # determine the name of the attribute
- nameptr = lltype.malloc(rffi.CArrayPtr(roci.oratext).TO, 1,
- flavor='raw')
- lenptr = lltype.malloc(rffi.CArrayPtr(roci.ub4).TO, 1,
- flavor='raw')
- try:
- status = roci.OCIAttrGet(
- param, roci.OCI_DTYPE_PARAM,
- rffi.cast(roci.dvoidp, nameptr),
- lenptr,
- roci.OCI_ATTR_NAME,
- connection.environment.errorHandle)
- connection.environment.checkForError(
- status,
- "ObjectAttribute_Initialize(): get name")
- self.name = rffi.charpsize2str(nameptr[0], lenptr[0])
- finally:
- lltype.free(nameptr, flavor='raw')
- lltype.free(lenptr, flavor='raw')
-
- # determine the type of the attribute
- typecodeptr = lltype.malloc(roci.Ptr(roci.OCITypeCode).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIAttrGet(
- param, roci.OCI_DTYPE_PARAM,
- rffi.cast(roci.dvoidp, typecodeptr),
- lltype.nullptr(roci.Ptr(roci.ub4).TO),
- roci.OCI_ATTR_TYPECODE,
- connection.environment.errorHandle)
- connection.environment.checkForError(
- status, "ObjectType_Describe(): get type code")
- self.typeCode = rffi.cast(lltype.Signed, typecodeptr[0])
- finally:
- lltype.free(typecodeptr, flavor='raw')
-
- # if the type of the attribute is object, recurse
- if self.typeCode in (roci.OCI_TYPECODE_NAMEDCOLLECTION,
- roci.OCI_TYPECODE_OBJECT):
- self.subType = W_ObjectType(connection, param)
- else:
- self.subType = None
-
-W_ObjectAttribute.typedef = TypeDef(
- 'ObjectAttribute',
- name = interp_attrproperty('name', W_ObjectAttribute),
- )
-
-class W_ExternalObject(Wrappable):
- def __init__(self, var, objectType, instance, indicator,
- isIndependent=True):
- self.var = var # keepalive
- self.objectType = objectType
- self.instance = instance
- self.indicator = indicator
- self.isIndependent = isIndependent
-
- def getattr(self, space, attr):
- try:
- attribute = self.objectType.attributesByName[attr]
- except KeyError:
- msg = "ExternalObject has no attribute '%s'" %(attr,)
- raise OperationError(space.w_AttributeError, space.wrap(msg))
-
- environment = self.objectType.environment
-
- scalarvalueindicatorptr = lltype.malloc(rffi.CArrayPtr(roci.OCIInd).TO,
- 1, flavor='raw')
- valueindicatorptr = lltype.malloc(rffi.CArrayPtr(roci.dvoidp).TO,
- 1, flavor='raw')
- valueptr = lltype.malloc(rffi.CArrayPtr(roci.dvoidp).TO,
- 1, flavor='raw')
- tdoptr = lltype.malloc(rffi.CArrayPtr(roci.OCIType).TO,
- 1, flavor='raw')
- nameptr = lltype.malloc(rffi.CArrayPtr(roci.oratext).TO,
- 1, flavor='raw')
- nameptr[0] = rffi.str2charp(attr)
- namelenptr = lltype.malloc(rffi.CArrayPtr(roci.ub4).TO,
- 1, flavor='raw')
- namelenptr[0] = rffi.cast(roci.ub4, len(attr))
-
- try:
- status = roci.OCIObjectGetAttr(
- environment.handle,
- environment.errorHandle,
- self.instance,
- self.indicator,
- self.objectType.tdo,
- nameptr, namelenptr, 1,
- lltype.nullptr(roci.Ptr(roci.ub4).TO), 0,
- scalarvalueindicatorptr,
- valueindicatorptr,
- valueptr,
- tdoptr)
- environment.checkForError(
- status, "ExternalObject_GetAttributeValue(): getting value")
-
- # determine the proper null indicator
- valueIndicator = valueindicatorptr[0]
- if not valueIndicator:
- valueIndicator = rffi.cast(roci.dvoidp,
- scalarvalueindicatorptr)
- value = valueptr[0]
-
- return convertObject(
- space, environment,
- attribute.typeCode,
- value, valueIndicator,
- self, attribute.subType)
- finally:
- lltype.free(scalarvalueindicatorptr, flavor='raw')
- lltype.free(valueindicatorptr, flavor='raw')
- lltype.free(valueptr, flavor='raw')
- lltype.free(tdoptr, flavor='raw')
- rffi.free_charp(nameptr[0])
- lltype.free(nameptr, flavor='raw')
- lltype.free(namelenptr, flavor='raw')
-
- getattr.unwrap_spec = ['self', ObjSpace, str]
-
-W_ExternalObject.typedef = TypeDef(
- 'ExternalObject',
- type = interp_attrproperty('objectType', W_ExternalObject),
- __getattr__ = interp2app(W_ExternalObject.getattr),
- )
-
-def convertObject(space, environment, typeCode,
- value, indicator, var, subtype):
-
- # null values returned as None
- if (rffi.cast(lltype.Signed,
- rffi.cast(roci.Ptr(roci.OCIInd),
- indicator)[0])
- ==
- rffi.cast(lltype.Signed, roci.OCI_IND_NULL)):
- return space.w_None
-
- if typeCode in (roci.OCI_TYPECODE_CHAR,
- roci.OCI_TYPECODE_VARCHAR,
- roci.OCI_TYPECODE_VARCHAR2):
- strValue = rffi.cast(roci.Ptr(roci.OCIString), value)[0]
- ptr = roci.OCIStringPtr(environment.handle, strValue)
- size = roci.OCIStringSize(environment.handle, strValue)
- return config.w_string(space, ptr, size)
- elif typeCode == roci.OCI_TYPECODE_NUMBER:
- return transform.OracleNumberToPythonFloat(
- environment,
- rffi.cast(roci.Ptr(roci.OCINumber), value))
- elif typeCode == roci.OCI_TYPECODE_DATE:
- dateValue = rffi.cast(roci.Ptr(roci.OCIDate), value)
- return transform.OracleDateToPythonDateTime(environment, dateValue)
- elif typeCode == roci.OCI_TYPECODE_TIMESTAMP:
- dateValue = rffi.cast(roci.Ptr(roci.OCIDateTime), value)
- return transform.OracleTimestampToPythonDate(environment, dateValue)
- elif typeCode == roci.OCI_TYPECODE_OBJECT:
- return space.wrap(W_ExternalObject(var, subtype, value, indicator,
- isIndependent=False))
- elif typeCode == roci.OCI_TYPECODE_NAMEDCOLLECTION:
- return convertCollection(space, environment, value, var, subtype)
-
- raise OperationError(
- get(space).w_NotSupportedError,
- space.wrap(
- "ExternalObjectVar_GetAttributeValue(): unhandled data type %d" % (
- typeCode,)))
-
-
-def convertCollection(space, environment, value, var, objectType):
- "Convert a collection to a Python list"
-
- result_w = []
-
- iterptr = lltype.malloc(rffi.CArrayPtr(roci.OCIIter).TO, 1, flavor='raw')
- try:
- # create the iterator
- status = roci.OCIIterCreate(
- environment.handle,
- environment.errorHandle,
- value,
- iterptr)
- environment.checkForError(
- status, "ExternalObjectVar_ConvertCollection(): creating iterator")
-
- try:
- # create the result list
- valueptr = lltype.malloc(rffi.CArrayPtr(roci.dvoidp).TO,
- 1, flavor='raw')
- indicatorptr = lltype.malloc(rffi.CArrayPtr(roci.dvoidp).TO,
- 1, flavor='raw')
- eofptr = lltype.malloc(rffi.CArrayPtr(roci.boolean).TO,
- 1, flavor='raw')
- try:
- while True:
- status = roci.OCIIterNext(
- environment.handle,
- environment.errorHandle,
- iterptr[0],
- valueptr,
- indicatorptr,
- eofptr)
- environment.checkForError(
- status,
- "ExternalObjectVar_ConvertCollection(): get next")
-
- if rffi.cast(lltype.Signed, eofptr[0]):
- break
- element = convertObject(
- space, environment,
- objectType.elementTypeCode,
- valueptr[0], indicatorptr[0],
- var, objectType.elementType)
- result_w.append(element)
- finally:
- lltype.free(valueptr, flavor='raw')
- lltype.free(indicatorptr, flavor='raw')
- lltype.free(eofptr, flavor='raw')
-
- finally:
- roci.OCIIterDelete(
- environment.handle,
- environment.errorHandle,
- iterptr)
- finally:
- lltype.free(iterptr, flavor='raw')
-
- return space.newlist(result_w)
-
+from pypy.interpreter.baseobjspace import Wrappable
+from pypy.interpreter.typedef import TypeDef, GetSetProperty
+from pypy.interpreter.typedef import interp_attrproperty
+from pypy.interpreter.gateway import ObjSpace
+from pypy.interpreter.gateway import interp2app
+from pypy.interpreter.error import OperationError
+from pypy.rpython.lltypesystem import rffi, lltype
+
+from pypy.module.oracle import roci, config, transform
+from pypy.module.oracle.interp_error import get
+
+class W_ObjectType(Wrappable):
+ def __init__(self, connection, param):
+ self.tdo = lltype.nullptr(roci.dvoidp.TO)
+ self.environment = connection.environment
+ self.isCollection = False
+ self.initialize(connection, param)
+
+ def __del__(self):
+ if self.tdo:
+ roci.OCIObjectUnpin(
+ self.environment.handle,
+ self.environment.errorHandle,
+ self.tdo)
+
+ def initialize(self, connection, param):
+ nameptr = lltype.malloc(rffi.CArrayPtr(roci.oratext).TO, 1,
+ flavor='raw')
+ lenptr = lltype.malloc(rffi.CArrayPtr(roci.ub4).TO, 1,
+ flavor='raw')
+ try:
+ # determine the schema of the type
+ status = roci.OCIAttrGet(
+ param, roci.OCI_HTYPE_DESCRIBE,
+ rffi.cast(roci.dvoidp, nameptr),
+ lenptr,
+ roci.OCI_ATTR_SCHEMA_NAME,
+ self.environment.errorHandle)
+ self.environment.checkForError(
+ status,
+ "ObjectType_Initialize(): get schema name")
+ self.schema = rffi.charpsize2str(nameptr[0], lenptr[0])
+
+ # determine the name of the type
+ status = roci.OCIAttrGet(
+ param, roci.OCI_HTYPE_DESCRIBE,
+ rffi.cast(roci.dvoidp, nameptr),
+ lenptr,
+ roci.OCI_ATTR_TYPE_NAME,
+ self.environment.errorHandle)
+ self.environment.checkForError(
+ status,
+ "ObjectType_Initialize(): get schema name")
+ self.name = rffi.charpsize2str(nameptr[0], lenptr[0])
+ finally:
+ lltype.free(nameptr, flavor='raw')
+ lltype.free(lenptr, flavor='raw')
+
+ # retrieve TDO (type descriptor object) of the parameter
+ tdorefptr = lltype.malloc(rffi.CArrayPtr(roci.OCIRef).TO, 1,
+ flavor='raw')
+ try:
+ status = roci.OCIAttrGet(
+ param, roci.OCI_HTYPE_DESCRIBE,
+ rffi.cast(roci.dvoidp, tdorefptr),
+ lltype.nullptr(roci.Ptr(roci.ub4).TO),
+ roci.OCI_ATTR_REF_TDO,
+ self.environment.errorHandle)
+ self.environment.checkForError(
+ status,
+ "ObjectType_Initialize(): get TDO reference")
+ tdoref = tdorefptr[0]
+ finally:
+ lltype.free(tdorefptr, flavor='raw')
+
+ tdoptr = lltype.malloc(rffi.CArrayPtr(roci.dvoidp).TO, 1,
+ flavor='raw')
+ try:
+ status = roci.OCIObjectPin(
+ self.environment.handle,
+ self.environment.errorHandle,
+ tdoref,
+ None, roci.OCI_PIN_ANY,
+ roci.OCI_DURATION_SESSION, roci.OCI_LOCK_NONE,
+ tdoptr)
+ self.environment.checkForError(
+ status,
+ "ObjectType_Initialize(): pin TDO reference")
+ self.tdo = tdoptr[0]
+ finally:
+ lltype.free(tdoptr, flavor='raw')
+
+ # acquire a describe handle
+ handleptr = lltype.malloc(rffi.CArrayPtr(roci.OCIDescribe).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIHandleAlloc(
+ self.environment.handle,
+ handleptr, roci.OCI_HTYPE_DESCRIBE, 0,
+ lltype.nullptr(rffi.CArray(roci.dvoidp)))
+ self.environment.checkForError(
+ status, "ObjectType_Initialize(): allocate describe handle")
+ describeHandle = handleptr[0]
+ finally:
+ lltype.free(handleptr, flavor='raw')
+
+ # describe the type
+ try:
+ self.describe(connection, describeHandle)
+ finally:
+ roci.OCIHandleFree(describeHandle, roci.OCI_HTYPE_DESCRIBE)
+
+ def describe(self, connection, describeHandle):
+ "Describe the type and store information about it as needed"
+
+ # describe the type
+ status = roci.OCIDescribeAny(
+ connection.handle,
+ self.environment.errorHandle,
+ self.tdo, 0,
+ roci.OCI_OTYPE_PTR,
+ roci.OCI_DEFAULT,
+ roci.OCI_PTYPE_TYPE,
+ describeHandle)
+ self.environment.checkForError(
+ status, "ObjectType_Describe(): describe type")
+
+ # get top level parameter descriptor
+ paramptr = lltype.malloc(roci.Ptr(roci.OCIParam).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIAttrGet(
+ describeHandle, roci.OCI_HTYPE_DESCRIBE,
+ rffi.cast(roci.dvoidp, paramptr),
+ lltype.nullptr(roci.Ptr(roci.ub4).TO),
+ roci.OCI_ATTR_PARAM,
+ self.environment.errorHandle)
+ self.environment.checkForError(
+ status,
+ "ObjectType_Describe(): get top level parameter descriptor")
+ toplevelParam = paramptr[0]
+ finally:
+ lltype.free(paramptr, flavor='raw')
+
+ # determine type of type
+ typecodeptr = lltype.malloc(roci.Ptr(roci.OCITypeCode).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIAttrGet(
+ toplevelParam, roci.OCI_DTYPE_PARAM,
+ rffi.cast(roci.dvoidp, typecodeptr),
+ lltype.nullptr(roci.Ptr(roci.ub4).TO),
+ roci.OCI_ATTR_TYPECODE,
+ self.environment.errorHandle)
+ self.environment.checkForError(
+ status, "ObjectType_Describe(): get type code")
+ typeCode = rffi.cast(lltype.Signed, typecodeptr[0])
+ finally:
+ lltype.free(typecodeptr, flavor='raw')
+
+ # if a collection, need to determine the sub type
+ if typeCode == roci.OCI_TYPECODE_NAMEDCOLLECTION:
+ self.isCollection = 1
+
+ # determine type of collection
+ typecodeptr = lltype.malloc(roci.Ptr(roci.OCITypeCode).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIAttrGet(
+ toplevelParam, roci.OCI_DTYPE_PARAM,
+ rffi.cast(roci.dvoidp, typecodeptr),
+ lltype.nullptr(roci.Ptr(roci.ub4).TO),
+ roci.OCI_ATTR_TYPECODE,
+ self.environment.errorHandle)
+ self.environment.checkForError(
+ status, "ObjectType_Describe(): get collection type code")
+ self.collectionTypeCode = typecodeptr[0]
+ finally:
+ lltype.free(typecodeptr, flavor='raw')
+
+ # acquire collection parameter descriptor
+ paramptr = lltype.malloc(roci.Ptr(roci.OCIParam).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIAttrGet(
+ toplevelParam, roci.OCI_DTYPE_PARAM,
+ rffi.cast(roci.dvoidp, paramptr),
+ lltype.nullptr(roci.Ptr(roci.ub4).TO),
+ roci.OCI_ATTR_COLLECTION_ELEMENT,
+ self.environment.errorHandle)
+ self.environment.checkForError(
+ status,
+ "ObjectType_Describe(): get collection descriptor")
+ collectionParam = paramptr[0]
+ finally:
+ lltype.free(paramptr, flavor='raw')
+
+ # determine type of element
+ typecodeptr = lltype.malloc(roci.Ptr(roci.OCITypeCode).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIAttrGet(
+ collectionParam, roci.OCI_DTYPE_PARAM,
+ rffi.cast(roci.dvoidp, typecodeptr),
+ lltype.nullptr(roci.Ptr(roci.ub4).TO),
+ roci.OCI_ATTR_TYPECODE,
+ self.environment.errorHandle)
+ self.environment.checkForError(
+ status, "ObjectType_Describe(): get element type code")
+ self.elementTypeCode = rffi.cast(lltype.Signed, typecodeptr[0])
+ finally:
+ lltype.free(typecodeptr, flavor='raw')
+
+ # if element type is an object type get its type
+ if self.elementTypeCode == roci.OCI_TYPECODE_OBJECT:
+ self.elementType = W_ObjectType(connection, collectionParam)
+ else:
+ self.elementType = None
+
+ # determine the number of attributes
+ numptr = lltype.malloc(roci.Ptr(roci.ub2).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIAttrGet(
+ toplevelParam, roci.OCI_DTYPE_PARAM,
+ rffi.cast(roci.dvoidp, numptr),
+ lltype.nullptr(roci.Ptr(roci.ub4).TO),
+ roci.OCI_ATTR_NUM_TYPE_ATTRS,
+ self.environment.errorHandle)
+ self.environment.checkForError(
+ status, "ObjectType_Describe(): get number of attributes")
+ numAttributes = numptr[0]
+ finally:
+ lltype.free(numptr, flavor='raw')
+
+ # allocate the attribute list and dictionary
+ self.attributes = []
+ self.attributesByName = {}
+
+ # acquire the list parameter descriptor
+ listptr = lltype.malloc(roci.Ptr(roci.OCIParam).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIAttrGet(
+ toplevelParam, roci.OCI_DTYPE_PARAM,
+ rffi.cast(roci.dvoidp, listptr),
+ lltype.nullptr(roci.Ptr(roci.ub4).TO),
+ roci.OCI_ATTR_LIST_TYPE_ATTRS,
+ self.environment.errorHandle)
+ self.environment.checkForError(
+ status, "ObjectType_Describe(): get list parameter descriptor")
+ attributeListParam = listptr[0]
+ finally:
+ lltype.free(listptr, flavor='raw')
+
+ # create attribute information for each attribute
+ for i in range(numAttributes):
+ paramptr = lltype.malloc(roci.Ptr(roci.OCIParam).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIParamGet(
+ attributeListParam, roci.OCI_DTYPE_PARAM,
+ self.environment.errorHandle,
+ paramptr, i + 1)
+ self.environment.checkForError(
+ status,
+ "ObjectType_Describe(): get attribute param descriptor")
+ attribute = W_ObjectAttribute(connection, paramptr[0])
+ finally:
+ lltype.free(paramptr, flavor='raw')
+
+ self.attributes.append(attribute)
+ self.attributesByName[attribute.name] = attribute
+
+ def get_attributes(space, self):
+ return space.newlist([space.wrap(attr) for attr in self.attributes])
+
+W_ObjectType.typedef = TypeDef(
+ 'ObjectType',
+ schema = interp_attrproperty('schema', W_ObjectType),
+ name = interp_attrproperty('name', W_ObjectType),
+ attributes = GetSetProperty(W_ObjectType.get_attributes),
+ )
+
+class W_ObjectAttribute(Wrappable):
+ def __init__(self, connection, param):
+ self.initialize(connection, param)
+
+ def initialize(self, connection, param):
+ # determine the name of the attribute
+ nameptr = lltype.malloc(rffi.CArrayPtr(roci.oratext).TO, 1,
+ flavor='raw')
+ lenptr = lltype.malloc(rffi.CArrayPtr(roci.ub4).TO, 1,
+ flavor='raw')
+ try:
+ status = roci.OCIAttrGet(
+ param, roci.OCI_DTYPE_PARAM,
+ rffi.cast(roci.dvoidp, nameptr),
+ lenptr,
+ roci.OCI_ATTR_NAME,
+ connection.environment.errorHandle)
+ connection.environment.checkForError(
+ status,
+ "ObjectAttribute_Initialize(): get name")
+ self.name = rffi.charpsize2str(nameptr[0], lenptr[0])
+ finally:
+ lltype.free(nameptr, flavor='raw')
+ lltype.free(lenptr, flavor='raw')
+
+ # determine the type of the attribute
+ typecodeptr = lltype.malloc(roci.Ptr(roci.OCITypeCode).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIAttrGet(
+ param, roci.OCI_DTYPE_PARAM,
+ rffi.cast(roci.dvoidp, typecodeptr),
+ lltype.nullptr(roci.Ptr(roci.ub4).TO),
+ roci.OCI_ATTR_TYPECODE,
+ connection.environment.errorHandle)
+ connection.environment.checkForError(
+ status, "ObjectType_Describe(): get type code")
+ self.typeCode = rffi.cast(lltype.Signed, typecodeptr[0])
+ finally:
+ lltype.free(typecodeptr, flavor='raw')
+
+ # if the type of the attribute is object, recurse
+ if self.typeCode in (roci.OCI_TYPECODE_NAMEDCOLLECTION,
+ roci.OCI_TYPECODE_OBJECT):
+ self.subType = W_ObjectType(connection, param)
+ else:
+ self.subType = None
+
+W_ObjectAttribute.typedef = TypeDef(
+ 'ObjectAttribute',
+ name = interp_attrproperty('name', W_ObjectAttribute),
+ )
+
+class W_ExternalObject(Wrappable):
+ def __init__(self, var, objectType, instance, indicator,
+ isIndependent=True):
+ self.var = var # keepalive
+ self.objectType = objectType
+ self.instance = instance
+ self.indicator = indicator
+ self.isIndependent = isIndependent
+
+ def getattr(self, space, attr):
+ try:
+ attribute = self.objectType.attributesByName[attr]
+ except KeyError:
+ msg = "ExternalObject has no attribute '%s'" %(attr,)
+ raise OperationError(space.w_AttributeError, space.wrap(msg))
+
+ environment = self.objectType.environment
+
+ scalarvalueindicatorptr = lltype.malloc(rffi.CArrayPtr(roci.OCIInd).TO,
+ 1, flavor='raw')
+ valueindicatorptr = lltype.malloc(rffi.CArrayPtr(roci.dvoidp).TO,
+ 1, flavor='raw')
+ valueptr = lltype.malloc(rffi.CArrayPtr(roci.dvoidp).TO,
+ 1, flavor='raw')
+ tdoptr = lltype.malloc(rffi.CArrayPtr(roci.OCIType).TO,
+ 1, flavor='raw')
+ nameptr = lltype.malloc(rffi.CArrayPtr(roci.oratext).TO,
+ 1, flavor='raw')
+ nameptr[0] = rffi.str2charp(attr)
+ namelenptr = lltype.malloc(rffi.CArrayPtr(roci.ub4).TO,
+ 1, flavor='raw')
+ namelenptr[0] = rffi.cast(roci.ub4, len(attr))
+
+ try:
+ status = roci.OCIObjectGetAttr(
+ environment.handle,
+ environment.errorHandle,
+ self.instance,
+ self.indicator,
+ self.objectType.tdo,
+ nameptr, namelenptr, 1,
+ lltype.nullptr(roci.Ptr(roci.ub4).TO), 0,
+ scalarvalueindicatorptr,
+ valueindicatorptr,
+ valueptr,
+ tdoptr)
+ environment.checkForError(
+ status, "ExternalObject_GetAttributeValue(): getting value")
+
+ # determine the proper null indicator
+ valueIndicator = valueindicatorptr[0]
+ if not valueIndicator:
+ valueIndicator = rffi.cast(roci.dvoidp,
+ scalarvalueindicatorptr)
+ value = valueptr[0]
+
+ return convertObject(
+ space, environment,
+ attribute.typeCode,
+ value, valueIndicator,
+ self, attribute.subType)
+ finally:
+ lltype.free(scalarvalueindicatorptr, flavor='raw')
+ lltype.free(valueindicatorptr, flavor='raw')
+ lltype.free(valueptr, flavor='raw')
+ lltype.free(tdoptr, flavor='raw')
+ rffi.free_charp(nameptr[0])
+ lltype.free(nameptr, flavor='raw')
+ lltype.free(namelenptr, flavor='raw')
+
+ getattr.unwrap_spec = ['self', ObjSpace, str]
+
+W_ExternalObject.typedef = TypeDef(
+ 'ExternalObject',
+ type = interp_attrproperty('objectType', W_ExternalObject),
+ __getattr__ = interp2app(W_ExternalObject.getattr),
+ )
+
+def convertObject(space, environment, typeCode,
+ value, indicator, var, subtype):
+
+ # null values returned as None
+ if (rffi.cast(lltype.Signed,
+ rffi.cast(roci.Ptr(roci.OCIInd),
+ indicator)[0])
+ ==
+ rffi.cast(lltype.Signed, roci.OCI_IND_NULL)):
+ return space.w_None
+
+ if typeCode in (roci.OCI_TYPECODE_CHAR,
+ roci.OCI_TYPECODE_VARCHAR,
+ roci.OCI_TYPECODE_VARCHAR2):
+ strValue = rffi.cast(roci.Ptr(roci.OCIString), value)[0]
+ ptr = roci.OCIStringPtr(environment.handle, strValue)
+ size = roci.OCIStringSize(environment.handle, strValue)
+ return config.w_string(space, ptr, size)
+ elif typeCode == roci.OCI_TYPECODE_NUMBER:
+ return transform.OracleNumberToPythonFloat(
+ environment,
+ rffi.cast(roci.Ptr(roci.OCINumber), value))
+ elif typeCode == roci.OCI_TYPECODE_DATE:
+ dateValue = rffi.cast(roci.Ptr(roci.OCIDate), value)
+ return transform.OracleDateToPythonDateTime(environment, dateValue)
+ elif typeCode == roci.OCI_TYPECODE_TIMESTAMP:
+ dateValue = rffi.cast(roci.Ptr(roci.OCIDateTime), value)
+ return transform.OracleTimestampToPythonDate(environment, dateValue)
+ elif typeCode == roci.OCI_TYPECODE_OBJECT:
+ return space.wrap(W_ExternalObject(var, subtype, value, indicator,
+ isIndependent=False))
+ elif typeCode == roci.OCI_TYPECODE_NAMEDCOLLECTION:
+ return convertCollection(space, environment, value, var, subtype)
+
+ raise OperationError(
+ get(space).w_NotSupportedError,
+ space.wrap(
+ "ExternalObjectVar_GetAttributeValue(): unhandled data type %d" % (
+ typeCode,)))
+
+
+def convertCollection(space, environment, value, var, objectType):
+ "Convert a collection to a Python list"
+
+ result_w = []
+
+ iterptr = lltype.malloc(rffi.CArrayPtr(roci.OCIIter).TO, 1, flavor='raw')
+ try:
+ # create the iterator
+ status = roci.OCIIterCreate(
+ environment.handle,
+ environment.errorHandle,
+ value,
+ iterptr)
+ environment.checkForError(
+ status, "ExternalObjectVar_ConvertCollection(): creating iterator")
+
+ try:
+ # create the result list
+ valueptr = lltype.malloc(rffi.CArrayPtr(roci.dvoidp).TO,
+ 1, flavor='raw')
+ indicatorptr = lltype.malloc(rffi.CArrayPtr(roci.dvoidp).TO,
+ 1, flavor='raw')
+ eofptr = lltype.malloc(rffi.CArrayPtr(roci.boolean).TO,
+ 1, flavor='raw')
+ try:
+ while True:
+ status = roci.OCIIterNext(
+ environment.handle,
+ environment.errorHandle,
+ iterptr[0],
+ valueptr,
+ indicatorptr,
+ eofptr)
+ environment.checkForError(
+ status,
+ "ExternalObjectVar_ConvertCollection(): get next")
+
+ if rffi.cast(lltype.Signed, eofptr[0]):
+ break
+ element = convertObject(
+ space, environment,
+ objectType.elementTypeCode,
+ valueptr[0], indicatorptr[0],
+ var, objectType.elementType)
+ result_w.append(element)
+ finally:
+ lltype.free(valueptr, flavor='raw')
+ lltype.free(indicatorptr, flavor='raw')
+ lltype.free(eofptr, flavor='raw')
+
+ finally:
+ roci.OCIIterDelete(
+ environment.handle,
+ environment.errorHandle,
+ iterptr)
+ finally:
+ lltype.free(iterptr, flavor='raw')
+
+ return space.newlist(result_w)
+
Modified: pypy/trunk/pypy/module/oracle/interp_pool.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_pool.py (original)
+++ pypy/trunk/pypy/module/oracle/interp_pool.py Tue Mar 9 15:46:14 2010
@@ -1,216 +1,216 @@
-from pypy.interpreter.baseobjspace import Wrappable
-from pypy.interpreter.argument import Arguments, Signature
-from pypy.interpreter.gateway import ObjSpace, W_Root, NoneNotWrapped
-from pypy.interpreter.gateway import interp2app
-from pypy.interpreter.typedef import TypeDef, GetSetProperty
-from pypy.interpreter.typedef import interp_attrproperty, interp_attrproperty_w
-from pypy.interpreter.error import OperationError
-from pypy.rpython.lltypesystem import rffi, lltype
-
-Null = NoneNotWrapped
-
-from pypy.module.oracle import roci, config
-from pypy.module.oracle import interp_error, interp_environ
-from pypy.module.oracle.interp_error import get
-
-class W_SessionPool(Wrappable):
- def __init__(self):
- self.environment = None
-
- def descr_new(space, w_subtype,
- w_user, w_password, w_dsn,
- min, max, increment,
- w_connectiontype=Null,
- threaded=False,
- getmode=roci.OCI_SPOOL_ATTRVAL_NOWAIT,
- events=False,
- homogeneous=True):
- self = space.allocate_instance(W_SessionPool, w_subtype)
- W_SessionPool.__init__(self)
-
- if w_connectiontype is not None:
- if not space.is_true(space.issubtype(w_connectiontype,
- get(space).w_Connection)):
- raise OperationError(
- interp_error.get(space).w_ProgrammingError,
- space.wrap(
- "connectiontype must be a subclass of Connection"))
- self.w_connectionType = w_connectiontype
- else:
- self.w_connectionType = get(space).w_Connection
-
- self.w_username = w_user
- self.w_password = w_password
- self.w_tnsentry = w_dsn
-
- self.minSessions = min
- self.maxSessions = max
- self.sessionIncrement = increment
- self.homogeneous = homogeneous
-
- # set up the environment
- self.environment = interp_environ.Environment.create(
- space, threaded, events)
-
- # create the session pool handle
- handleptr = lltype.malloc(rffi.CArrayPtr(roci.OCIServer).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIHandleAlloc(
- self.environment.handle,
- handleptr, roci.OCI_HTYPE_SPOOL, 0,
- lltype.nullptr(rffi.CArray(roci.dvoidp)))
- self.environment.checkForError(
- status, "SessionPool_New(): allocate handle")
- self.handle = handleptr[0]
- finally:
- lltype.free(handleptr, flavor='raw')
-
- # prepare pool mode
- poolMode = roci.OCI_SPC_STMTCACHE
- if self.homogeneous:
- poolMode |= roci.OCI_SPC_HOMOGENEOUS
-
- # create the session pool
- user_buf = config.StringBuffer()
- user_buf.fill(space, self.w_username)
- password_buf = config.StringBuffer()
- password_buf.fill(space, self.w_password)
- dsn_buf = config.StringBuffer()
- dsn_buf.fill(space, self.w_tnsentry)
- poolnameptr = lltype.malloc(rffi.CArrayPtr(roci.oratext).TO, 1,
- flavor='raw')
- poolnamelenptr = lltype.malloc(rffi.CArrayPtr(roci.ub4).TO, 1,
- flavor='raw')
-
- try:
- status = roci.OCISessionPoolCreate(
- self.environment.handle,
- self.environment.errorHandle,
- self.handle,
- poolnameptr, poolnamelenptr,
- dsn_buf.ptr, dsn_buf.size,
- min, max, increment,
- user_buf.ptr, user_buf.size,
- password_buf.ptr, password_buf.size,
- poolMode)
- self.environment.checkForError(
- status, "SessionPool_New(): create pool")
-
- self.w_name = config.w_string(space, poolnameptr[0],
- poolnamelenptr[0])
- finally:
- user_buf.clear()
- password_buf.clear()
- dsn_buf.clear()
-
- return space.wrap(self)
- descr_new.unwrap_spec = [ObjSpace, W_Root,
- W_Root, W_Root, W_Root,
- int, int, int,
- W_Root,
- bool, int, bool, bool]
-
- def checkConnected(self, space):
- if not self.handle:
- raise OperationError(
- get(space).w_InterfaceError,
- space.wrap("not connected"))
-
- def acquire(self, space, __args__):
- (w_user, w_password, w_cclass, w_purity
- ) = __args__.parse_obj(
- None, "acquire",
- Signature(["user", "password", "cclass", "purity"]),
- defaults_w=[None, None, None, space.w_False])
- if self.homogeneous and (w_user or w_password):
- raise OperationError(
- get(space).w_ProgrammingError,
- space.wrap("pool is homogeneous. "
- "Proxy authentication is not possible."))
-
- self.checkConnected(space)
-
- newargs = Arguments(space,
- __args__.arguments_w,
- __args__.keywords + ["pool"],
- __args__.keywords_w + [space.wrap(self)])
- return space.call_args(self.w_connectionType, newargs)
- acquire.unwrap_spec = ['self', ObjSpace, Arguments]
-
- def release(self, space, w_connection):
- self._release(space, w_connection, roci.OCI_DEFAULT)
- release.unwrap_spec = ['self', ObjSpace, W_Root]
-
- def drop(self, space, w_connection):
- self._release(space, w_connection, roci.OCI_SESSRLS_DROPSESS)
- drop.unwrap_spec = ['self', ObjSpace, W_Root]
-
- def _release(self, space, w_connection, mode):
- from pypy.module.oracle.interp_connect import W_Connection
- connection = space.interp_w(W_Connection, w_connection)
-
- self.checkConnected(space)
-
- if connection.sessionPool is not self:
- raise OperationError(
- get(space).w_ProgrammingError,
- space.wrap("connection not acquired with this session pool"))
-
- # attempt a rollback
- status = roci.OCITransRollback(
- connection.handle, connection.environment.errorHandle,
- roci.OCI_DEFAULT)
- # if dropping the connection from the pool, ignore the error
- if mode != roci.OCI_SESSRLS_DROPSESS:
- self.environment.checkForError(
- status, "SessionPool_Release(): rollback")
-
- # release the connection
- status = roci.OCISessionRelease(
- connection.handle, connection.environment.errorHandle,
- None, 0, mode)
- self.environment.checkForError(
- status, "SessionPool_Release(): release session")
-
- # ensure that the connection behaves as closed
- connection.sessionPool = None
- connection.handle = lltype.nullptr(roci.OCISvcCtx.TO)
-
-def computedProperty(oci_attr_code, oci_value_type):
- def fget(space, self):
- self.checkConnected(space)
-
- valueptr = lltype.malloc(rffi.CArrayPtr(oci_value_type).TO,
- 1, flavor='raw')
- try:
- status = roci.OCIAttrGet(
- self.handle, roci.OCI_HTYPE_SPOOL,
- rffi.cast(roci.dvoidp, valueptr),
- lltype.nullptr(roci.Ptr(roci.ub4).TO),
- oci_attr_code,
- self.environment.errorHandle)
- return space.wrap(valueptr[0])
- finally:
- lltype.free(valueptr, flavor='raw')
- return GetSetProperty(fget, cls=W_SessionPool)
-
-W_SessionPool.typedef = TypeDef(
- "SessionPool",
- __new__ = interp2app(W_SessionPool.descr_new.im_func),
- acquire = interp2app(W_SessionPool.acquire),
- release = interp2app(W_SessionPool.release),
- drop = interp2app(W_SessionPool.drop),
-
- username = interp_attrproperty_w('w_username', W_SessionPool),
- password = interp_attrproperty_w('w_password', W_SessionPool),
- tnsentry = interp_attrproperty_w('w_tnsentry', W_SessionPool),
- min = interp_attrproperty('minSessions', W_SessionPool),
- max = interp_attrproperty('maxSessions', W_SessionPool),
- increment = interp_attrproperty('sessionIncrement', W_SessionPool),
- homogeneous = interp_attrproperty('homogeneous', W_SessionPool),
- opened = computedProperty(roci.OCI_ATTR_SPOOL_OPEN_COUNT, roci.ub4),
- busy = computedProperty(roci.OCI_ATTR_SPOOL_BUSY_COUNT, roci.ub4),
- timeout = computedProperty(roci.OCI_ATTR_SPOOL_TIMEOUT, roci.ub4),
- getmode = computedProperty(roci.OCI_ATTR_SPOOL_GETMODE, roci.ub1),
- )
+from pypy.interpreter.baseobjspace import Wrappable
+from pypy.interpreter.argument import Arguments, Signature
+from pypy.interpreter.gateway import ObjSpace, W_Root, NoneNotWrapped
+from pypy.interpreter.gateway import interp2app
+from pypy.interpreter.typedef import TypeDef, GetSetProperty
+from pypy.interpreter.typedef import interp_attrproperty, interp_attrproperty_w
+from pypy.interpreter.error import OperationError
+from pypy.rpython.lltypesystem import rffi, lltype
+
+Null = NoneNotWrapped
+
+from pypy.module.oracle import roci, config
+from pypy.module.oracle import interp_error, interp_environ
+from pypy.module.oracle.interp_error import get
+
+class W_SessionPool(Wrappable):
+ def __init__(self):
+ self.environment = None
+
+ def descr_new(space, w_subtype,
+ w_user, w_password, w_dsn,
+ min, max, increment,
+ w_connectiontype=Null,
+ threaded=False,
+ getmode=roci.OCI_SPOOL_ATTRVAL_NOWAIT,
+ events=False,
+ homogeneous=True):
+ self = space.allocate_instance(W_SessionPool, w_subtype)
+ W_SessionPool.__init__(self)
+
+ if w_connectiontype is not None:
+ if not space.is_true(space.issubtype(w_connectiontype,
+ get(space).w_Connection)):
+ raise OperationError(
+ interp_error.get(space).w_ProgrammingError,
+ space.wrap(
+ "connectiontype must be a subclass of Connection"))
+ self.w_connectionType = w_connectiontype
+ else:
+ self.w_connectionType = get(space).w_Connection
+
+ self.w_username = w_user
+ self.w_password = w_password
+ self.w_tnsentry = w_dsn
+
+ self.minSessions = min
+ self.maxSessions = max
+ self.sessionIncrement = increment
+ self.homogeneous = homogeneous
+
+ # set up the environment
+ self.environment = interp_environ.Environment.create(
+ space, threaded, events)
+
+ # create the session pool handle
+ handleptr = lltype.malloc(rffi.CArrayPtr(roci.OCIServer).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIHandleAlloc(
+ self.environment.handle,
+ handleptr, roci.OCI_HTYPE_SPOOL, 0,
+ lltype.nullptr(rffi.CArray(roci.dvoidp)))
+ self.environment.checkForError(
+ status, "SessionPool_New(): allocate handle")
+ self.handle = handleptr[0]
+ finally:
+ lltype.free(handleptr, flavor='raw')
+
+ # prepare pool mode
+ poolMode = roci.OCI_SPC_STMTCACHE
+ if self.homogeneous:
+ poolMode |= roci.OCI_SPC_HOMOGENEOUS
+
+ # create the session pool
+ user_buf = config.StringBuffer()
+ user_buf.fill(space, self.w_username)
+ password_buf = config.StringBuffer()
+ password_buf.fill(space, self.w_password)
+ dsn_buf = config.StringBuffer()
+ dsn_buf.fill(space, self.w_tnsentry)
+ poolnameptr = lltype.malloc(rffi.CArrayPtr(roci.oratext).TO, 1,
+ flavor='raw')
+ poolnamelenptr = lltype.malloc(rffi.CArrayPtr(roci.ub4).TO, 1,
+ flavor='raw')
+
+ try:
+ status = roci.OCISessionPoolCreate(
+ self.environment.handle,
+ self.environment.errorHandle,
+ self.handle,
+ poolnameptr, poolnamelenptr,
+ dsn_buf.ptr, dsn_buf.size,
+ min, max, increment,
+ user_buf.ptr, user_buf.size,
+ password_buf.ptr, password_buf.size,
+ poolMode)
+ self.environment.checkForError(
+ status, "SessionPool_New(): create pool")
+
+ self.w_name = config.w_string(space, poolnameptr[0],
+ poolnamelenptr[0])
+ finally:
+ user_buf.clear()
+ password_buf.clear()
+ dsn_buf.clear()
+
+ return space.wrap(self)
+ descr_new.unwrap_spec = [ObjSpace, W_Root,
+ W_Root, W_Root, W_Root,
+ int, int, int,
+ W_Root,
+ bool, int, bool, bool]
+
+ def checkConnected(self, space):
+ if not self.handle:
+ raise OperationError(
+ get(space).w_InterfaceError,
+ space.wrap("not connected"))
+
+ def acquire(self, space, __args__):
+ (w_user, w_password, w_cclass, w_purity
+ ) = __args__.parse_obj(
+ None, "acquire",
+ Signature(["user", "password", "cclass", "purity"]),
+ defaults_w=[None, None, None, space.w_False])
+ if self.homogeneous and (w_user or w_password):
+ raise OperationError(
+ get(space).w_ProgrammingError,
+ space.wrap("pool is homogeneous. "
+ "Proxy authentication is not possible."))
+
+ self.checkConnected(space)
+
+ newargs = Arguments(space,
+ __args__.arguments_w,
+ __args__.keywords + ["pool"],
+ __args__.keywords_w + [space.wrap(self)])
+ return space.call_args(self.w_connectionType, newargs)
+ acquire.unwrap_spec = ['self', ObjSpace, Arguments]
+
+ def release(self, space, w_connection):
+ self._release(space, w_connection, roci.OCI_DEFAULT)
+ release.unwrap_spec = ['self', ObjSpace, W_Root]
+
+ def drop(self, space, w_connection):
+ self._release(space, w_connection, roci.OCI_SESSRLS_DROPSESS)
+ drop.unwrap_spec = ['self', ObjSpace, W_Root]
+
+ def _release(self, space, w_connection, mode):
+ from pypy.module.oracle.interp_connect import W_Connection
+ connection = space.interp_w(W_Connection, w_connection)
+
+ self.checkConnected(space)
+
+ if connection.sessionPool is not self:
+ raise OperationError(
+ get(space).w_ProgrammingError,
+ space.wrap("connection not acquired with this session pool"))
+
+ # attempt a rollback
+ status = roci.OCITransRollback(
+ connection.handle, connection.environment.errorHandle,
+ roci.OCI_DEFAULT)
+ # if dropping the connection from the pool, ignore the error
+ if mode != roci.OCI_SESSRLS_DROPSESS:
+ self.environment.checkForError(
+ status, "SessionPool_Release(): rollback")
+
+ # release the connection
+ status = roci.OCISessionRelease(
+ connection.handle, connection.environment.errorHandle,
+ None, 0, mode)
+ self.environment.checkForError(
+ status, "SessionPool_Release(): release session")
+
+ # ensure that the connection behaves as closed
+ connection.sessionPool = None
+ connection.handle = lltype.nullptr(roci.OCISvcCtx.TO)
+
+def computedProperty(oci_attr_code, oci_value_type):
+ def fget(space, self):
+ self.checkConnected(space)
+
+ valueptr = lltype.malloc(rffi.CArrayPtr(oci_value_type).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIAttrGet(
+ self.handle, roci.OCI_HTYPE_SPOOL,
+ rffi.cast(roci.dvoidp, valueptr),
+ lltype.nullptr(roci.Ptr(roci.ub4).TO),
+ oci_attr_code,
+ self.environment.errorHandle)
+ return space.wrap(valueptr[0])
+ finally:
+ lltype.free(valueptr, flavor='raw')
+ return GetSetProperty(fget, cls=W_SessionPool)
+
+W_SessionPool.typedef = TypeDef(
+ "SessionPool",
+ __new__ = interp2app(W_SessionPool.descr_new.im_func),
+ acquire = interp2app(W_SessionPool.acquire),
+ release = interp2app(W_SessionPool.release),
+ drop = interp2app(W_SessionPool.drop),
+
+ username = interp_attrproperty_w('w_username', W_SessionPool),
+ password = interp_attrproperty_w('w_password', W_SessionPool),
+ tnsentry = interp_attrproperty_w('w_tnsentry', W_SessionPool),
+ min = interp_attrproperty('minSessions', W_SessionPool),
+ max = interp_attrproperty('maxSessions', W_SessionPool),
+ increment = interp_attrproperty('sessionIncrement', W_SessionPool),
+ homogeneous = interp_attrproperty('homogeneous', W_SessionPool),
+ opened = computedProperty(roci.OCI_ATTR_SPOOL_OPEN_COUNT, roci.ub4),
+ busy = computedProperty(roci.OCI_ATTR_SPOOL_BUSY_COUNT, roci.ub4),
+ timeout = computedProperty(roci.OCI_ATTR_SPOOL_TIMEOUT, roci.ub4),
+ getmode = computedProperty(roci.OCI_ATTR_SPOOL_GETMODE, roci.ub1),
+ )
Modified: pypy/trunk/pypy/module/oracle/test/test_cursorvar.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/test/test_cursorvar.py (original)
+++ pypy/trunk/pypy/module/oracle/test/test_cursorvar.py Tue Mar 9 15:46:14 2010
@@ -1,45 +1,45 @@
-from pypy.module.oracle.test.test_connect import OracleTestBase
-
-class AppTestCursorVar(OracleTestBase):
-
- def test_bind_inout(self):
- cur = self.cnx.cursor()
- cursor = self.cnx.cursor()
- assert cursor.description is None
- cur.execute("""
- begin
- open :cursor for select 1 numbercol from dual;
- end;""",
- cursor=cursor)
- assert (cursor.description ==
- [('NUMBERCOL', oracle.NUMBER, 127, 2, 0, 0, 1)]
- or cursor.description ==
- [('NUMBERCOL', oracle.NUMBER, 127, 2, 0, -127, 1)])
- data = cursor.fetchall()
- assert data == [(1,)]
-
- def test_bind_frompackage(self):
- cur = self.cnx.cursor()
- # create package
- try:
- cur.execute("drop package pypy_temp_cursorpkg")
- except oracle.DatabaseError:
- pass
- cur.execute("""
- create package pypy_temp_cursorpkg as
- type refcur is ref cursor;
- procedure test_cursor(cur out refcur);
- end;""")
- cur.execute("""
- create package body pypy_temp_cursorpkg as
- procedure test_cursor(cur out refcur) is
- begin
- open cur for
- select level-1 intcol
- from dual connect by level-1<42;
- end;
- end;""")
- cursor = self.cnx.cursor()
- cur.callproc("pypy_temp_cursorpkg.test_cursor", (cursor,))
- data = cursor.fetchall()
- assert data == [(x,) for x in range(42)]
+from pypy.module.oracle.test.test_connect import OracleTestBase
+
+class AppTestCursorVar(OracleTestBase):
+
+ def test_bind_inout(self):
+ cur = self.cnx.cursor()
+ cursor = self.cnx.cursor()
+ assert cursor.description is None
+ cur.execute("""
+ begin
+ open :cursor for select 1 numbercol from dual;
+ end;""",
+ cursor=cursor)
+ assert (cursor.description ==
+ [('NUMBERCOL', oracle.NUMBER, 127, 2, 0, 0, 1)]
+ or cursor.description ==
+ [('NUMBERCOL', oracle.NUMBER, 127, 2, 0, -127, 1)])
+ data = cursor.fetchall()
+ assert data == [(1,)]
+
+ def test_bind_frompackage(self):
+ cur = self.cnx.cursor()
+ # create package
+ try:
+ cur.execute("drop package pypy_temp_cursorpkg")
+ except oracle.DatabaseError:
+ pass
+ cur.execute("""
+ create package pypy_temp_cursorpkg as
+ type refcur is ref cursor;
+ procedure test_cursor(cur out refcur);
+ end;""")
+ cur.execute("""
+ create package body pypy_temp_cursorpkg as
+ procedure test_cursor(cur out refcur) is
+ begin
+ open cur for
+ select level-1 intcol
+ from dual connect by level-1<42;
+ end;
+ end;""")
+ cursor = self.cnx.cursor()
+ cur.callproc("pypy_temp_cursorpkg.test_cursor", (cursor,))
+ data = cursor.fetchall()
+ assert data == [(x,) for x in range(42)]
Modified: pypy/trunk/pypy/module/oracle/test/test_lobvar.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/test/test_lobvar.py (original)
+++ pypy/trunk/pypy/module/oracle/test/test_lobvar.py Tue Mar 9 15:46:14 2010
@@ -1,61 +1,61 @@
-from pypy.module.oracle.test.test_connect import OracleTestBase
-
-class LobTests(object):
- @classmethod
- def setup_class(cls):
- super(LobTests, cls).setup_class()
- cls.w_lobType = cls.space.wrap(cls.lobType)
-
- def test_bind(self):
- inputType = getattr(oracle, self.lobType)
-
- cur = self.cnx.cursor()
- try:
- cur.execute("drop table pypy_temp_lobtable")
- except oracle.DatabaseError:
- pass
- cur.execute("create table pypy_temp_lobtable "
- "(lobcol %s)" % self.lobType)
-
- longString = ""
- for i in range(2):
- cur.execute("truncate table pypy_temp_lobtable")
- if i > 0:
- longString += chr(65+i) * 25000
-
- cur.setinputsizes(lob=inputType)
- cur.execute("insert into pypy_temp_lobtable values (:lob)",
- lob=longString)
- cur.execute("select lobcol from pypy_temp_lobtable")
- lob, = cur.fetchone()
- assert lob.size() == len(longString)
- assert lob.read() == longString
-
- def test_trim(self):
- inputType = getattr(oracle, self.lobType)
-
- cur = self.cnx.cursor()
- try:
- cur.execute("drop table pypy_temp_lobtable")
- except oracle.DatabaseError:
- pass
- cur.execute("create table pypy_temp_lobtable "
- "(lobcol %s)" % self.lobType)
-
- longString = "X" * 75000
- cur.setinputsizes(lob=inputType)
- cur.execute("insert into pypy_temp_lobtable values (:lob)",
- lob=longString)
- cur.execute("select lobcol from pypy_temp_lobtable")
- lob, = cur.fetchone()
- assert lob.size() == 75000
- lob.trim(25000)
- assert lob.size() == 25000
- lob.trim()
- assert lob.size() == 0
-
-class AppTestBlob(LobTests, OracleTestBase):
- lobType = "BLOB"
-
-class AppTestClob(LobTests, OracleTestBase):
- lobType = "CLOB"
+from pypy.module.oracle.test.test_connect import OracleTestBase
+
+class LobTests(object):
+ @classmethod
+ def setup_class(cls):
+ super(LobTests, cls).setup_class()
+ cls.w_lobType = cls.space.wrap(cls.lobType)
+
+ def test_bind(self):
+ inputType = getattr(oracle, self.lobType)
+
+ cur = self.cnx.cursor()
+ try:
+ cur.execute("drop table pypy_temp_lobtable")
+ except oracle.DatabaseError:
+ pass
+ cur.execute("create table pypy_temp_lobtable "
+ "(lobcol %s)" % self.lobType)
+
+ longString = ""
+ for i in range(2):
+ cur.execute("truncate table pypy_temp_lobtable")
+ if i > 0:
+ longString += chr(65+i) * 25000
+
+ cur.setinputsizes(lob=inputType)
+ cur.execute("insert into pypy_temp_lobtable values (:lob)",
+ lob=longString)
+ cur.execute("select lobcol from pypy_temp_lobtable")
+ lob, = cur.fetchone()
+ assert lob.size() == len(longString)
+ assert lob.read() == longString
+
+ def test_trim(self):
+ inputType = getattr(oracle, self.lobType)
+
+ cur = self.cnx.cursor()
+ try:
+ cur.execute("drop table pypy_temp_lobtable")
+ except oracle.DatabaseError:
+ pass
+ cur.execute("create table pypy_temp_lobtable "
+ "(lobcol %s)" % self.lobType)
+
+ longString = "X" * 75000
+ cur.setinputsizes(lob=inputType)
+ cur.execute("insert into pypy_temp_lobtable values (:lob)",
+ lob=longString)
+ cur.execute("select lobcol from pypy_temp_lobtable")
+ lob, = cur.fetchone()
+ assert lob.size() == 75000
+ lob.trim(25000)
+ assert lob.size() == 25000
+ lob.trim()
+ assert lob.size() == 0
+
+class AppTestBlob(LobTests, OracleTestBase):
+ lobType = "BLOB"
+
+class AppTestClob(LobTests, OracleTestBase):
+ lobType = "CLOB"
Modified: pypy/trunk/pypy/module/oracle/test/test_objectvar.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/test/test_objectvar.py (original)
+++ pypy/trunk/pypy/module/oracle/test/test_objectvar.py Tue Mar 9 15:46:14 2010
@@ -1,50 +1,50 @@
-from pypy.module.oracle.test.test_connect import OracleTestBase
-
-class AppTestObjectVar(OracleTestBase):
- def test_fetch_object(self):
- import datetime
- cur = self.cnx.cursor()
- try:
- cur.execute("drop table pypy_test_objtable")
- except oracle.DatabaseError:
- pass
- try:
- cur.execute("drop type pypy_test_objtype")
- except oracle.DatabaseError:
- pass
- try:
- cur.execute("drop type pypy_test_arraytype")
- except oracle.DatabaseError:
- pass
- cur.execute("""\
- create type pypy_test_objtype as object (
- numbercol number,
- stringcol varchar2(60),
- datecol date);
- """)
- cur.execute("""\
- create type pypy_test_arraytype as varray(10) of number;
- """)
- cur.execute("""\
- create table pypy_test_objtable (
- objcol pypy_test_objtype,
- arraycol pypy_test_arraytype)
- """)
- cur.execute("""\
- insert into pypy_test_objtable values (
- pypy_test_objtype(1, 'someText',
- to_date(20070306, 'YYYYMMDD')),
- pypy_test_arraytype(5, 10, null, 20))
- """)
-
- cur.execute("select objcol, arraycol from pypy_test_objtable")
- objValue, arrayValue = cur.fetchone()
- assert objValue.type.schema == self.cnx.username.upper()
- assert objValue.type.name == "PYPY_TEST_OBJTYPE"
- assert objValue.type.attributes[0].name == "NUMBERCOL"
- assert isinstance(arrayValue, list)
- assert arrayValue == [5, 10, None, 20]
- assert objValue.NUMBERCOL == 1
- assert objValue.STRINGCOL == "someText"
- assert objValue.DATECOL == datetime.datetime(2007, 03, 06)
- raises(AttributeError, getattr, objValue, 'OTHER')
+from pypy.module.oracle.test.test_connect import OracleTestBase
+
+class AppTestObjectVar(OracleTestBase):
+ def test_fetch_object(self):
+ import datetime
+ cur = self.cnx.cursor()
+ try:
+ cur.execute("drop table pypy_test_objtable")
+ except oracle.DatabaseError:
+ pass
+ try:
+ cur.execute("drop type pypy_test_objtype")
+ except oracle.DatabaseError:
+ pass
+ try:
+ cur.execute("drop type pypy_test_arraytype")
+ except oracle.DatabaseError:
+ pass
+ cur.execute("""\
+ create type pypy_test_objtype as object (
+ numbercol number,
+ stringcol varchar2(60),
+ datecol date);
+ """)
+ cur.execute("""\
+ create type pypy_test_arraytype as varray(10) of number;
+ """)
+ cur.execute("""\
+ create table pypy_test_objtable (
+ objcol pypy_test_objtype,
+ arraycol pypy_test_arraytype)
+ """)
+ cur.execute("""\
+ insert into pypy_test_objtable values (
+ pypy_test_objtype(1, 'someText',
+ to_date(20070306, 'YYYYMMDD')),
+ pypy_test_arraytype(5, 10, null, 20))
+ """)
+
+ cur.execute("select objcol, arraycol from pypy_test_objtable")
+ objValue, arrayValue = cur.fetchone()
+ assert objValue.type.schema == self.cnx.username.upper()
+ assert objValue.type.name == "PYPY_TEST_OBJTYPE"
+ assert objValue.type.attributes[0].name == "NUMBERCOL"
+ assert isinstance(arrayValue, list)
+ assert arrayValue == [5, 10, None, 20]
+ assert objValue.NUMBERCOL == 1
+ assert objValue.STRINGCOL == "someText"
+ assert objValue.DATECOL == datetime.datetime(2007, 03, 06)
+ raises(AttributeError, getattr, objValue, 'OTHER')
More information about the Pypy-commit
mailing list