RecordSet (ADO) with stored procs
Syver Enstad
syver-en+usenet at online.no
Thu Oct 31 11:23:52 EST 2002
Robert_NJ_Sykes at msn.co.uk (Rob Sykes) writes:
> Paul Moore <gustav at morpheus.demon.co.uk> wrote in
> news:k7k1jc4x.fsf at morpheus.demon.co.uk:
>
> > Syver Enstad <syver-en+usenet at online.no> writes:
> >
> >> By the way:
> >> I have a pretty complete pure python implementation of the
> >> v.2.0 db-api that is based on ADO. If you are interested, I
> >> can fish it out from somewhere. It has a pretty complete unit
> >> testing suite, so that you can see how it is supposed to be
> >> used.
> >
> > I'm interested, even if the OP isn't!
> >
> > Paul.
> >
>
> You can add me in as well !
>
> --
> Rob Sykes
>
> Born again Bairn | 'The despair I can live with.
> rob at john-richard dot co dot uk | It's the hope I can't stand'
> (header email for spambots) | (Anon.)
-------------- next part --------------
# AdoDb.py
import pythoncom
import winerror
from utilities import Indexed
# force ADO 2.5 type library import
from win32com.client import gencache
gencache.EnsureModule('{00000205-0000-0010-8000-00AA006D2EA4}', 0, 2, 5)
import win32com.client
from win32com.client import constants
import time
import exceptions
# for binding and contructors
from types import *
from mx import DateTime # for binding and contructors
DateTimeType = type(DateTime.DateTime(0))
apilevel = "2.0"
paramstyle = 'qmark'
threadsafety = 2 # I guess the pythoncom implementation is threadsafe so.
# Constructors
Date = DateTime.Date
Time = DateTime.Time
Timestamp = DateTime.Timestamp
DateFromTicks = DateTime.DateFromTicks
TimeFromTicks = DateTime.TimeFromTicks
TimestampFromTicks = DateTime.TimestampFromTicks
Binary = buffer
class Warning(exceptions.StandardError):
pass
class Error(exceptions.StandardError):
pass
class InterfaceError(Error):
pass
class DatabaseError(Error):
pass
class DataError(DatabaseError):
pass
class OperationalError(DatabaseError):
pass
class IntegrityError(DatabaseError):
pass
class InternalError(DatabaseError):
pass
class ProgrammingError(DatabaseError):
pass
class NotSupportedError(DatabaseError):
pass
class DBAPITypeObject:
def __init__(self,*values):
self.values = values
def __cmp__(self,other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
STRING = DBAPITypeObject(constants.adChar,
constants.adLongVarChar,
constants.adBSTR,
constants.adGUID,
constants.adError,
constants.adVarChar,
constants.adLongVarWChar,
constants.adWChar
)
BINARY = DBAPITypeObject(constants.adBinary,
constants.adVarBinary,
constants.adLongVarBinary,
)
NUMBER = DBAPITypeObject(constants.adDecimal,
constants.adDouble,
constants.adInteger,
constants.adBoolean,
constants.adCurrency,
constants.adNumeric,
constants.adSingle,
constants.adSmallInt,
constants.adTinyInt,
constants.adUnsignedBigInt,
constants.adUnsignedInt,
constants.adUnsignedSmallInt,
constants.adUnsignedTinyInt,
)
DATETIME = DBAPITypeObject(constants.adDate,
constants.adDBDate,
constants.adDBTime,
constants.adDBTimeStamp,
constants.adFileTime,
)
ROWID = DBAPITypeObject(constants.adChapter, # not so sure about this one..
)
class _DateFieldDecorator(object):
def __init__(self, anAdoField):
self._adoField = anAdoField
def get_Value(self):
value = self._adoField.Value
if value:
return DateTime.DateTimeFromCOMDate(value)
else:
# we encountered a NULL value, which is returned as
# None in Python
return value
Value = property(get_Value, None, None, 'returns a DateTime object instead of a COM date')
def connect(connString, disableAutoTransaction=1):
return Connection(connString, disableAutoTransaction)
class Connection(object):
""" ADO Connection, defaults to using transactions """
def __init__(self, connString, disableAutoTransaction):
self._adoConnection = win32com.client.Dispatch('ADODB.Connection')
self.open(connString)
if disableAutoTransaction:
self._adoConnection.BeginTrans()
def open(self, connString):
try:
self._adoConnection.Open(connString)
except pythoncom.error, (hr, msg, excepInfo, arg):
if hr == winerror.DISP_E_EXCEPTION: # execInfo is not None
assert excepInfo
(wCode, errorSource, errorDescription, helpFile,
helpContext, sCode) = excepInfo
raise OperationalError(errorSource, errorDescription, sCode)
else:
raise DatabaseError(hr, msg, arg)
def close(self):
self._adoConnection.RollbackTrans()
self._adoConnection.Close()
def commit(self):
self._adoConnection.CommitTrans()
self._adoConnection.BeginTrans()
def rollback(self):
self._adoConnection.RollbackTrans()
self._adoConnection.BeginTrans()
def cursor(self):
return Cursor(self)
class _Command(object):
""" module internal class
"""
def __init__(self, anAdoConnection, anOperationAsString, commandTypeEnum):
self._adoCmd = win32com.client.Dispatch('ADODB.Command')
self._adoCmd.ActiveConnection = anAdoConnection
self._adoCmd.CommandText = anOperationAsString
self._adoCmd.CommandType = commandTypeEnum
self._areParametersRefreshed = False
self._parameters = []
## def compile(self):
## self._adoCmd.Prepared = True
def setParameters(self, parameters):
self._parameters = parameters
if not self._areParametersRefreshed:
# expensive operation needed for typesafety
# should maybe be possible to override by setInputsizes
self._adoCmd.Parameters.Refresh()
self._areParametersRefreshed = True
if len(self._adoCmd.Parameters) != len(self._parameters):
errMessage = 'Wrong number of parameters, '
'required: %i received: %i' % (len(self._adoCmd.Parameters),
len(self._parameters))
raise ProgrammingError, errMessage
for adoParam, pythonParam in zip(self._adoCmd.Parameters,
self._parameters):
if adoParam.Type == DATETIME:
adoParam.Value = pythonParam.COMDate()
else:
# there maybe elif's needed to correctly
# massage additinal types here,
# string, unicode, buffer seems to work fine
adoParam.Value = pythonParam
def _updateParameters(self):
""" Update self._parameters with output values if present
"""
for adoParam, index in Indexed(self._adoCmd.Parameters):
if adoParam.Direction != constants.adParamInput:
self._parameters[index] = adoParam.Value
def execute(self):
try:
result = self._adoCmd.Execute()
self._updateParameters()
return result
except pythoncom.error, (hr, msg, excepInfo, arg):
if hr == winerror.DISP_E_EXCEPTION: # execInfo is not None
assert excepInfo
(wCode, errorSource, errorDescription, helpFile, helpContext,
sCode) = excepInfo
raise ProgrammingError(errorSource, errorDescription, sCode)
else:
raise DatabaseError(hr, msg, arg)
class Cursor(object):
def __init__(self, connection):
self.connection = connection
self._rs = win32com.client.Dispatch('ADODB.Recordset')
self._fields = []
def executemany(self, operation, seq_parameters):
# should prepare statement here to optimize for many executions
for each in seq_parameters:
self.execute(operation, each)
def callproc(self, procname, parameters=[]):
""" identical to execute, """
return self.execute(procname, parameters)
def _cleanFieldCache(self):
self._fields = []
def execute(self, operation, parameters=[],
commandType=constants.adCmdText):
command = _Command(self.connection._adoConnection, operation,
commandType)
command.setParameters(parameters)
#command.compile()
self._rs, result = command.execute()
self._cleanFieldCache()
def fields(self):
if not self._fields:
for each in self._rs.Fields:
if each.Type == DATETIME:
each = _DateFieldDecorator(each)
self._fields.append(each)
return self._fields
def fetchone(self):
if not self._isOpen():
raise InterfaceError, 'No resultset present'
if self._rs.EOF:
return None
ret = []
for field in self.fields():
try:
ret.append(field.Value)
except pythoncom.error, err:
raise InternalError, err
self._rs.MoveNext() # advance the recordset
return tuple(ret)
def fetchmany(self, count=None):
if not count:
count = self.arraysize
resList = []
for count in xrange(count):
row = self.fetchone()
if row:
resList.append(row)
else:
break
return resList
def fetchall(self):
return self.fetchmany(0xffFFFF)
def setinputsizes(self):
raise NotImplementedError
def setoutputsizes(self):
raise NotImplementedError
def close(self):
if self._isOpen():
self._rs.Close()
self._rs = None
self.connection = None
def nextset(self):
self._rs, success = self._rs.NextRecordset()
return success
def _isOpen(self):
return self._rs.State
def get_rowcount(self):
return self._rs.RecordCount
rowcount = property(get_rowcount, None, None,
"rowcount in recordset")
def get_description(self):
""" helper for the description property
"""
if len(self._rs.Fields) == 0:
return None
ret = []
for field in self._rs.Fields:
tup = (field.Name,
field.Type, # to compare with DBApiTypeObject
field.DefinedSize, # DisplaySizes
field.ActualSize, # internalSize
field.Precision,
field.NumericScale,
(field.Attributes & constants.adFldIsNullable) != 0)
ret.append(tup)
return ret
description = property(get_description, None, None,
"Describes the types of the record set")
def get_arraysize(self):
return self._rs.CacheSize
def set_arraysize(self, value):
self._rs.CacheSize = value
arraysize = property(get_arraysize, set_arraysize,
None, "arraysize property")
import unittest
import pdb
_ConnectionTemplate = """Provider=sqloledb;
Data Source=%s;Initial Catalog=syvertest;
Integrated Security=SSPI;
"""
class TestBlob(unittest.TestCase):
def setUp(self):
self.conn = connect(_ConnectionTemplate % ('localhost'))
cursor = self.conn.cursor()
cursor.execute("""
create table blobTest (
col1 IMAGE,
col2 varbinary (255),
col3 varchar(255)
);
""")
self.conn.commit()
def tearDown(self):
cursor = self.conn.cursor()
try:
cursor.execute('drop table blobtest')
cursor.close()
self.conn.commit()
except DatabaseError, err:
print err
self.conn.close()
def testWrongNumberOfParameters(self):
cursor = self.conn.cursor()
self.assertRaises(ProgrammingError,
cursor.execute,
'select col1 from blobtest where col2 = ?',
['1', '2'])
self.conn.rollback()
cursor.close()
def testInsertion(self):
import os
import codecs
data1 = codecs.open(
os.path.expandvars('$USERPROFILE/desktop/datamover.reg'),
'r', 'utf-16').read() # read a unicode string from disk
data2 = '\0\1\3'
buf1 = Binary(data1)
buf2 = Binary(data2)
self.data1size = len(buf1)
cursor = self.conn.cursor()
cursor.execute(
'insert into blobtest (col1, col2, col3) values (?, ?, ?)',
[buf1, buf2, u'n?kkel'])
cursor.close()
self.conn.commit()
def testBlobReturn(self):
self.testInsertion()
cursor = self.conn.cursor()
cursor.execute('select col3, col2, col1 from blobtest where col3 = ?', [u'n?kkel'])
resultSet = cursor.fetchone()
assert resultSet[0] == u'n?kkel'
self.assertEquals(str(resultSet[1]), '\0\1\3')
latin1data = str(resultSet[2]).decode('utf-16').encode('latin-1')
import codecs
from cStringIO import StringIO
ReaderClass = codecs.getreader('utf-16')
stringStream = StringIO(resultSet[2])
reader = ReaderClass(stringStream)
self.assertEquals('Windows Registry Editor Version 5.00\r\n',
reader.readlines()[0]) # cannot use readline
# as this messes up unicode translation
self.assertEquals(len(resultSet[2]), self.data1size)
cursor.execute("select col1 from blobtest where col3 = 'n?kkel'")
resultSet = cursor.fetchone()
self.conn.rollback()
cursor.execute('select col3 from blobtest where col3 = ?',
[u'n?kkel'])
resultSet = cursor.fetchall()
self.assertEquals(resultSet[0][0], u'n?kkel')
def testError(self):
cursor = self.conn.cursor()
try:
cursor.execute("""
create table blobtest (
col1 BINARY,
col2 BINARY
);
""")
self.fail()
except ProgrammingError, err:
pass
class OtherBlobTest(unittest.TestCase):
def setUp(self):
self.conn = connect(_ConnectionTemplate % ('localhost'))
cursor = self.conn.cursor()
cursor.execute("""
create table blobTest (
string1 varchar(255),
string2 varchar(255),
string3 varchar(255)
);
""")
self.conn.commit()
def tearDown(self):
cursor = self.conn.cursor()
try:
cursor.execute('drop table blobtest')
cursor.close()
self.conn.commit()
except DatabaseError, err:
print err
self.conn.close()
def testInsertion(self):
import os
cursor = self.conn.cursor()
cursor.execute(
'insert into blobtest (string1, string2, string3) values (?, ?, ?)',
['hei', 'hopp', u'n?kkel'])
self.conn.commit()
cursor.close()
def testSelectAll(self):
self.testInsertion()
cursor = self.conn.cursor()
cursor.execute("""
select string3, string2, string3 from blobtest where string3 = ?""",
[u'n?kkel'])
resultSet = cursor.fetchone()
assert resultSet[0] == u'n?kkel'
self.conn.rollback()
cursor.execute("""select string1, string2, string3 from blobtest
where string3 = 'n?kkel'""")
self.conn.rollback()
cursor.execute('select string3 from blobtest where string3 = ?',
[u'n?kkel'])
self.conn.commit()
class _TestCustomer(unittest.TestCase):
def setUp(self):
self.conn = connect(_ConnectionTemplate % ('localhost'))
cursor = self.conn.cursor()
cursor.execute("""
create table customer (
customerId integer,
name varchar(255),
nick varchar(255),
date datetime
);""")
self.conn.commit()
def tearDown(self):
cursor = self.conn.cursor()
cursor.execute("drop table customer")
self.conn.commit()
self.conn.close()
def testFieldCache(self):
cursor = self.conn.cursor()
assert cursor._fields == []
cursor.executemany('insert into customer values (?, ?, ?, ?)',
[(1, 'syver', 'borger', DateFromTicks(time.time())),
(1, 'syver', 'borger', DateFromTicks(time.time()))])
assert cursor._fields == []
cursor.execute('select * from customer')
assert cursor._fields == []
cursor.fetchone()
assert cursor._fields != []
cursor.fetchone()
assert cursor._fields != []
cursor.execute('select * from customer')
assert cursor._fields == []
def testSelect(self):
cursor = self.conn.cursor()
cursor.execute('insert into customer values (1, ?, ?, ?)',
['Syver Enstad', 'stalin', DateFromTicks(time.time())])
cursor.execute('select name from customer where customerID = ?',
[1])
resultSet = cursor.fetchone()
assert resultSet[0] == 'Syver Enstad', resultSet[0]
def testTwoStatements(self):
cursor = self.conn.cursor()
cursor.execute("""
insert into customer (customerId, name, nick, date) values
(?, ?, ?, ?);
select * from customer;""",
(1, 'syver', 'stalin', (DateFromTicks(time.time()))))
self.assertRaises(InterfaceError, cursor.fetchone) # no record set is reurned
cursor.close()
def test_date(self):
cursor = self.conn.cursor()
now = DateFromTicks(time.time())
cursor.execute("""
insert into customer (customerId, name, nick, date) values
(?, ?, ?, ?)""",
(1, 'syver', 'stalin', (now)))
cursor.close()
self.conn.commit()
cursor = self.conn.cursor()
cursor.execute('select date from customer where nick=?', ['stalin'])
resultSet = cursor.fetchone()
result = resultSet[0]
assert result == now, result
def test_datetime(self):
cursor = self.conn.cursor()
now = TimestampFromTicks(time.time())
cursor.execute("""
insert into customer (customerId, name, nick, date) values
(?, ?, ?, ?)""",
(1, 'syver', 'stalin',now))
cursor.execute('select date from customer')
resultSet = cursor.fetchone()
result = resultSet[0]
assert DateTime.cmp(result, now, 0.000001) == 0 # a roundoff error in
# the roundtrip
self.assertEquals(result.COMDate(), now.COMDate())
cursor.close()
def testDateRoundtrip(self):
now = TimestampFromTicks(time.time())
comDate = now.COMDate()
newNow = DateTime.DateTimeFromCOMDate(comDate)
# roundoff error when converting to COM date
assert newNow != now, 'newNow: %s now: %s' % (newNow.absvalues(),
now.absValues())
def test_description(self):
cursor = self.conn.cursor()
cursor.close()
self.test_executemany()
cursor = self.conn.cursor()
cursor.execute('select * from customer')
desc = cursor.description
print desc
assert NUMBER == desc[0][1]
assert desc[0][1] == NUMBER
cursor.close()
def test_emptyRowset(self):
cursor = self.conn.cursor()
cursor.execute('select * from customer')
assert cursor.rowcount == -1
resultSet = cursor.fetchall()
cursor.close()
def test_rowcount(self):
cursor = self.conn.cursor()
cursor.execute('select * from customer')
assert cursor.rowcount == -1
names = [('Per B?rge',), ('Geir',), ('Ove',), ('Bjarte',), ('Jens',), ('Dag',)]
cursor.executemany("insert into customer (name) values (?)", names)
cursor.execute("select * from customer")
#assert cursor.rowcount == len(names), cursor.rowcount
cursor.close()
def test_executemany(self):
cursor = self.conn.cursor()
names = [(u'Per B?rge', u'p?rge'), (u'Geir',u'gigga'), (u'Ove', u'b?nna'), (u'Bjarte', u'tassen'), (u'Jens', u'j?lle'), (u'Dag', u'laffen')]
cursor.executemany("insert into customer (name, nick) values (?, ?)",
names)
self.conn.commit()
cursor.execute("select name, nick from customer")
result = cursor.fetchall()
for each in result:
assert each in names
cursor.close()
def test_close(self):
self.test_executemany()
cursor = self.conn.cursor()
cursor.execute('select * from Customer')
cursor.fetchall()
cursor.close()
self.conn.commit()
cursor = self.conn.cursor()
cursor.execute('delete from Customer')
cursor.execute('select * from customer')
assert cursor.rowcount == -1
cursor.close()
def testQmarkOperation(self):
cursor = self.conn.cursor()
cursor.execute(
u"insert into customer (name, nick) values ('Ove', 'B?nna')")
cursor.execute('select * from Customer where name=?', ['Ove'])
resultSet = cursor.fetchall()
self.assertEquals(resultSet[0][2], u'B?nna')
def test_Transactions(self):
cursor = self.conn.cursor()
cursor.execute('select * from Customer')
resultSet = cursor.fetchall()
assert len(resultSet) == 0
cursor.execute("insert into customer (name) values ('fredrik')")
cursor.execute('select * from Customer')
resultSet = cursor.fetchall()
cursor.close()
self.conn.commit()
cursor = self.conn.cursor()
cursor.execute('delete from Customer')
cursor.execute('select * from customer')
assert cursor.rowcount == -1
self.conn.rollback()
assert cursor.execute('select * from customer') != 0
resultSet = cursor.fetchall()
assert len(resultSet) == 1
class TestStoredProc(unittest.TestCase):
def setUp(self):
self.conn = connect(_ConnectionTemplate % ('localhost'))
cursor = self.conn.cursor()
cursor.execute("""
create proc min_proc (@param1 int, @param2 varchar(255),
@out_param varchar(255) OUTPUT)
as
set @out_param = 'out value'
select * from TestTable
""")
self.conn.commit()
cursor.close()
def tearDown(self):
cursor = self.conn.cursor()
cursor.execute('drop proc min_proc')
self.conn.commit()
cursor.close()
def testRecordsetReturn(self):
""" Jeg f?r ikke outparametere ut av en proc
som jeg bruker recordset return i
"""
cursor = self.conn.cursor()
#cursor.execute("{? = call min_proc(7, 'syver')}")
cmd = _Command(self.conn._adoConnection,
"{call min_proc(?, ?, ?)}", constants.adCmdText)
input = [7, 'syver', None]
cmd.setParameters(input)
rs, code = cmd.execute()
self.assertEquals(input, [7, 'syver', None]) # ingen outvalue
cursor._rs = rs
self.assertEquals(cursor.fetchall(),
[(u'syver', u'enstad ', u'1'),
(u'hei', u'p\xe5 ', u'2')])
class TestStoredOutput(unittest.TestCase):
def setUp(self):
self.conn = connect(_ConnectionTemplate % ('localhost'))
cursor = self.conn.cursor()
cursor.execute("""
create procedure min_proc(@param1 int, @param2 varchar(255),
@out_param varchar(255) OUTPUT)
as
set @out_param = @param2
return @param1
""")
self.conn.commit()
cursor.close()
def tearDown(self):
cursor = self.conn.cursor()
cursor.execute('drop proc min_proc')
self.conn.commit()
cursor.close()
def testParameterReturn(self):
""" Her bruker vi ikke noe recordset return, s? her kommer
utparameterene ut
"""
cursor = self.conn.cursor()
cmd = _Command(self.conn._adoConnection,
"{? = call min_proc(?, ?, ?)}", constants.adCmdText)
input = [None, 7, 'syver', None]
cmd.setParameters(input)
rs, code = cmd.execute()
self.assertEquals(input, [7, 7, 'syver', 'syver'])
assert rs.State == 0 # means no results present
class TestCallProc(unittest.TestCase):
def setUp(self):
self.conn = connect(_ConnectionTemplate % ('localhost'))
cursor = self.conn.cursor()
try:
cursor.execute("""
create procedure min_proc (@outparam int OUTPUT)
as
set @outparam = 10
""")
except ProgrammingError, (source, text, hresult):
# something went wrong in the last, that's okay
if (source == "There is already an object named 'min_proc' "
"in the database."):
print 'Caught exception, continuing', (source, text, hresult)
self.conn.commit()
cursor.close()
def tearDown(self):
cursor = self.conn.cursor()
cursor.execute('drop proc min_proc')
self.conn.commit()
cursor.close()
def testParameterReturn(self):
""" Her bruker vi ikke noe recordset return, s? her kommer
utparameterene ut
"""
cursor = self.conn.cursor()
cmd = _Command(self.conn._adoConnection,
"min_proc(0)", constants.adCmdStoredProc)
#input = [None]
#cmd.setParameters(input)
rs, code = cmd.execute()
#self.assertEquals(input, [10])
assert rs.State == 0 # means no results present
if __name__ == '__main__':
unittest.main()
-------------- next part --------------
--
Vennlig hilsen
Syver Enstad
More information about the Python-list
mailing list