RecordSet (ADO) with stored procs

Syver Enstad syver-en+usenet at online.no
Thu Oct 31 11:23:52 EST 2002


Robert_NJ_Sykes at msn.co.uk (Rob Sykes) writes:

> Paul Moore <gustav at morpheus.demon.co.uk> wrote in
> news:k7k1jc4x.fsf at morpheus.demon.co.uk: 
> 
> > Syver Enstad <syver-en+usenet at online.no> writes:
> > 
> >> By the way:
> >> I have a pretty complete pure python implementation of the
> >> v.2.0 db-api that is based on ADO. If you are interested, I
> >> can fish it out from somewhere. It has a pretty complete unit
> >> testing suite, so that you can see how it is supposed to be
> >> used. 
> > 
> > I'm interested, even if the OP isn't!
> > 
> > Paul.
> > 
> 
> You can add me in as well !
> 
> -- 
> Rob Sykes 
> 
> Born again Bairn                  | 'The despair I can live with.
> rob at john-richard dot co dot uk |  It's the hope I can't stand'
> (header email for spambots)       |  (Anon.)


-------------- next part --------------
# AdoDb.py

import pythoncom
import winerror
from utilities import Indexed

# force ADO 2.5 type library import
from win32com.client import gencache
gencache.EnsureModule('{00000205-0000-0010-8000-00AA006D2EA4}', 0, 2, 5)

import win32com.client
from win32com.client import constants
import time
import exceptions


# for binding and constructors
from types import *
from mx import DateTime # for binding and contructors
DateTimeType = type(DateTime.DateTime(0))

# Module-level DB-API globals.
apilevel = "2.0"
# 'qmark' style: statements use '?' as the parameter placeholder, as the
# execute() calls in the test suite below do.
paramstyle = 'qmark'
threadsafety = 2 # I guess the pythoncom implementation is threadsafe so.



# Constructors
# Date and time values are mx.DateTime objects; the DB-API constructor
# names are plain aliases for the corresponding mx.DateTime factories.
Date = DateTime.Date
Time = DateTime.Time
Timestamp = DateTime.Timestamp
DateFromTicks = DateTime.DateFromTicks
TimeFromTicks = DateTime.TimeFromTicks
TimestampFromTicks = DateTime.TimestampFromTicks
# Binary values are wrapped in the builtin buffer type.
Binary = buffer

class Warning(exceptions.StandardError):
    """Warning exception of this module; not raised by the code in this file."""
    pass
class Error(exceptions.StandardError):
    """Base class of every other error exception defined in this module."""
    pass
class InterfaceError(Error):
    """Raised by Cursor.fetchone when the cursor has no open result set."""
    pass
class DatabaseError(Error):
    """Base class of the database-related errors below.

    Raised directly by Connection.open and _Command.execute when a COM
    error carries no exception info (hresult is not DISP_E_EXCEPTION).
    """
    pass
class DataError(DatabaseError):
    """Declared for the exception hierarchy; not raised in this file."""
    pass
class OperationalError(DatabaseError):
    """Raised by Connection.open when opening the ADO connection fails."""
    pass
class IntegrityError(DatabaseError):
    """Declared for the exception hierarchy; not raised in this file."""
    pass
class InternalError(DatabaseError):
    """Raised by Cursor.fetchone when reading a field value fails in COM."""
    pass
class ProgrammingError(DatabaseError):
    """Raised by _Command for a wrong parameter count or a failed execute."""
    pass
class NotSupportedError(DatabaseError):
    """Declared for the exception hierarchy; not raised in this file."""
    pass


class DBAPITypeObject:
    """Groups several type codes under a single comparable type object.

    Comparing an instance with a type code reports equality when the code
    is one of the values the instance was created with.
    """

    def __init__(self, *values):
        # The codes are kept as the tuple they were passed in as.
        self.values = values

    def __cmp__(self, other):
        # Anything that is not a member is ordered by comparing it against
        # the tuple of values itself; members compare equal.
        if other not in self.values:
            if other < self.values:
                return 1
            return -1
        return 0
        

# DB-API type objects: each one groups the ADO type constants that are
# reported for columns of that kind, so a field's Type can be compared
# directly against STRING, BINARY, NUMBER, DATETIME or ROWID.

STRING = DBAPITypeObject(
    constants.adChar,
    constants.adLongVarChar,
    constants.adBSTR,
    constants.adGUID,
    constants.adError,
    constants.adVarChar,
    constants.adLongVarWChar,
    constants.adWChar
)

BINARY = DBAPITypeObject(
    constants.adBinary,
    constants.adVarBinary,
    constants.adLongVarBinary,
)

NUMBER = DBAPITypeObject(
    constants.adDecimal,
    constants.adDouble,
    constants.adInteger,
    constants.adBoolean,
    constants.adCurrency,
    constants.adNumeric,
    constants.adSingle,
    constants.adSmallInt,
    constants.adTinyInt,
    constants.adUnsignedBigInt,
    constants.adUnsignedInt,
    constants.adUnsignedSmallInt,
    constants.adUnsignedTinyInt,
)

DATETIME = DBAPITypeObject(
    constants.adDate,
    constants.adDBDate,
    constants.adDBTime,
    constants.adDBTimeStamp,
    constants.adFileTime,
)

# not so sure about this one..
ROWID = DBAPITypeObject(
    constants.adChapter,
)



class _DateFieldDecorator(object):
    def __init__(self, anAdoField):
        self._adoField = anAdoField

    def get_Value(self):
        value = self._adoField.Value
        if value:
            return DateTime.DateTimeFromCOMDate(value)
        else:
            # we encountered a NULL value, which is returned as
            # None in Python
            return value
    Value = property(get_Value, None, None, 'returns a DateTime object instead of a COM date')


def connect(connString, disableAutoTransaction=1):
    """Create and return a Connection for the given ADO connection string.

    disableAutoTransaction is passed on unchanged; when true the
    Connection begins a transaction right after opening.
    """
    connection = Connection(connString, disableAutoTransaction)
    return connection

class Connection(object):
    """ ADO Connection, defaults to using transactions

    When disableAutoTransaction is true a transaction is begun right after
    the connection is opened, and commit()/rollback() each end the current
    transaction and immediately begin a new one.  When it is false no
    transaction is ever begun, and commit()/rollback() do nothing.
    """
    def __init__(self, connString, disableAutoTransaction):
        self._adoConnection = win32com.client.Dispatch('ADODB.Connection')
        # Remembers whether this object has begun a transaction, so that
        # CommitTrans/RollbackTrans are never issued without one.
        self._inTransactionMode = False
        self.open(connString)
        if disableAutoTransaction:
            self._adoConnection.BeginTrans()
            self._inTransactionMode = True

    def open(self, connString):
        """Open the underlying ADO connection.

        Raises OperationalError when the COM error carries exception info
        (hresult DISP_E_EXCEPTION), DatabaseError for any other COM error.
        """
        try:
            self._adoConnection.Open(connString)
        except pythoncom.error, (hr, msg, excepInfo, arg):
            if hr == winerror.DISP_E_EXCEPTION: # execInfo is not None
                assert excepInfo
                (wCode, errorSource, errorDescription, helpFile,
                 helpContext, sCode) = excepInfo
                raise OperationalError(errorSource, errorDescription, sCode)
            else:
                raise DatabaseError(hr, msg, arg)

    def close(self):
        """Roll back the pending transaction, if any, and close."""
        try:
            if self._inTransactionMode:
                self._adoConnection.RollbackTrans()
        finally:
            # Close even if the rollback failed, so the ADO connection is
            # not left open.
            self._adoConnection.Close()

    def commit(self):
        """Commit the current transaction and begin the next one."""
        if not self._inTransactionMode:
            # No transaction was begun, so there is nothing to commit.
            return
        self._adoConnection.CommitTrans()
        self._adoConnection.BeginTrans()

    def rollback(self):
        """Roll back the current transaction and begin the next one."""
        if not self._inTransactionMode:
            # No transaction was begun, so there is nothing to roll back.
            return
        self._adoConnection.RollbackTrans()
        self._adoConnection.BeginTrans()

    def cursor(self):
        """Return a new Cursor bound to this connection."""
        return Cursor(self)


class _Command(object):
    """ module internal class
    """
    def __init__(self, anAdoConnection, anOperationAsString, commandTypeEnum):
        self._adoCmd = win32com.client.Dispatch('ADODB.Command')
        self._adoCmd.ActiveConnection = anAdoConnection
        self._adoCmd.CommandText = anOperationAsString
        self._adoCmd.CommandType = commandTypeEnum
        self._areParametersRefreshed = False
        self._parameters = []

##     def compile(self):
##         self._adoCmd.Prepared = True

    def setParameters(self, parameters):
        self._parameters = parameters
        if not self._areParametersRefreshed:
            # expensive operation needed for typesafety
            # should maybe be possible to override by setInputsizes
            self._adoCmd.Parameters.Refresh()
            self._areParametersRefreshed = True
        if len(self._adoCmd.Parameters) != len(self._parameters):
            errMessage = 'Wrong number of parameters, '
            'required: %i received: %i' % (len(self._adoCmd.Parameters),
                                           len(self._parameters))
            raise ProgrammingError, errMessage
        for adoParam, pythonParam in zip(self._adoCmd.Parameters,
                                         self._parameters):
            if adoParam.Type == DATETIME:
                adoParam.Value = pythonParam.COMDate()
            else:
                # there maybe elif's needed to correctly
                # massage additinal types here,
                # string, unicode, buffer seems to work fine
                adoParam.Value = pythonParam

    def _updateParameters(self):
        """ Update self._parameters with output values if present
        """
        for adoParam, index in Indexed(self._adoCmd.Parameters):
            if adoParam.Direction != constants.adParamInput:
                self._parameters[index] = adoParam.Value
        
    def execute(self):
        try:
            result = self._adoCmd.Execute()
            self._updateParameters()
            return result
        except pythoncom.error, (hr, msg, excepInfo, arg):
	    if hr == winerror.DISP_E_EXCEPTION: # execInfo is not None
		assert excepInfo
		(wCode, errorSource, errorDescription, helpFile, helpContext,
		 sCode) = excepInfo
		raise ProgrammingError(errorSource, errorDescription, sCode)
	    else:
		raise DatabaseError(hr, msg, arg)                     
        

class Cursor(object):
    def __init__(self, connection):
        self.connection = connection
        self._rs = win32com.client.Dispatch('ADODB.Recordset')
        self._fields = []
        
    def executemany(self, operation, seq_parameters):
        # should prepare statement here to optimize for many executions
        for each in seq_parameters:
            self.execute(operation, each)
        
    def callproc(self, procname, parameters=[]):
        """ identical to execute, """
        return self.execute(procname, parameters)

    def _cleanFieldCache(self):
        self._fields = [] 

    def execute(self, operation, parameters=[],
                commandType=constants.adCmdText):
        command = _Command(self.connection._adoConnection, operation,
                           commandType)
        command.setParameters(parameters)
        #command.compile()
        self._rs, result = command.execute()
        self._cleanFieldCache()
                
    def fields(self):
        if not self._fields:
            for each in self._rs.Fields:
                if each.Type == DATETIME:
                    each = _DateFieldDecorator(each)
                self._fields.append(each)
        return self._fields

    def fetchone(self):
        if not self._isOpen():
            raise InterfaceError, 'No resultset present'
        if self._rs.EOF:
            return None
        ret = []
        for field in self.fields():
            try:
                ret.append(field.Value)
            except pythoncom.error, err:
                raise InternalError, err
        self._rs.MoveNext() # advance the recordset
        return tuple(ret)

    def fetchmany(self, count=None):
        if not count:
            count = self.arraysize
        resList = []
        for count in xrange(count):
            row = self.fetchone()
            if row:
                resList.append(row)
            else:
                break
	return resList
        
    def fetchall(self):
        return self.fetchmany(0xffFFFF)

    def setinputsizes(self):
        raise NotImplementedError

    def setoutputsizes(self):
        raise NotImplementedError

    def close(self):
        if self._isOpen():
            self._rs.Close()
        self._rs = None
        self.connection = None

    def nextset(self):
        self._rs, success = self._rs.NextRecordset()
        return success

    def _isOpen(self):
        return self._rs.State

    def get_rowcount(self):
        return self._rs.RecordCount
    rowcount = property(get_rowcount, None, None,
                        "rowcount in recordset")

    def get_description(self):
        """ helper for the description property
        """
        if len(self._rs.Fields) == 0:
            return None
        ret = []
        for field in self._rs.Fields:
            tup = (field.Name,
                   field.Type,       # to compare with DBApiTypeObject
                   field.DefinedSize, # DisplaySizes
                   field.ActualSize, # internalSize
                   field.Precision, 
                   field.NumericScale,
                   (field.Attributes & constants.adFldIsNullable) != 0)
            ret.append(tup)
        return ret
    description = property(get_description, None, None,
                           "Describes the types of the record set")

    def get_arraysize(self):
        return self._rs.CacheSize

    def set_arraysize(self, value):
        self._rs.CacheSize = value
    arraysize = property(get_arraysize, set_arraysize,
                         None, "arraysize property")
        


import unittest
import pdb


# Connection string used by every test case below; the single %s is the
# data source (the tests pass 'localhost').
_ConnectionTemplate = """Provider=sqloledb;
Data Source=%s;Initial Catalog=syvertest;
Integrated Security=SSPI;
"""

class TestBlob(unittest.TestCase):
    """Round-trips binary and text values through a scratch blobTest table.

    Needs the database named in _ConnectionTemplate on 'localhost'.
    """
    def setUp(self):
        # Create the scratch table and commit so every test starts with it.
        self.conn = connect(_ConnectionTemplate % ('localhost'))
        cursor = self.conn.cursor()
        cursor.execute("""
create table blobTest (
  col1 IMAGE,
  col2 varbinary (255),
  col3 varchar(255)
);
""")
        self.conn.commit()

    def tearDown(self):
        # Drop the scratch table; a failure to drop is only printed so the
        # connection is still closed afterwards.
        cursor = self.conn.cursor()
        try:
            cursor.execute('drop table blobtest')
            cursor.close()
            self.conn.commit()
        except DatabaseError, err:
            print err
        self.conn.close()

    def testWrongNumberOfParameters(self):
        # Two values for a statement with one placeholder must be rejected.
        cursor = self.conn.cursor()
        self.assertRaises(ProgrammingError,
                          cursor.execute,
                          'select col1 from blobtest where col2 = ?',
                          ['1', '2'])
        self.conn.rollback()
        cursor.close()

    def testInsertion(self):
        # Also used as a fixture by testBlobReturn, which relies on
        # self.data1size being set here.
        import os
        import codecs
        
        data1 = codecs.open(
            os.path.expandvars('$USERPROFILE/desktop/datamover.reg'),
            'r', 'utf-16').read() # read a unicode string from disk
        data2 = '\0\1\3'
        buf1 = Binary(data1)
        buf2 = Binary(data2)
        self.data1size = len(buf1)
        cursor = self.conn.cursor()
        cursor.execute(
            'insert into blobtest (col1, col2, col3) values (?, ?, ?)',
            [buf1, buf2, u'n?kkel'])
        cursor.close()
        self.conn.commit()

    def testBlobReturn(self):
        # Insert one row, then read the binary columns back and compare
        # them with what was written.
        self.testInsertion()
        cursor = self.conn.cursor()
        cursor.execute('select col3, col2, col1 from blobtest where col3 = ?', [u'n?kkel'])
        resultSet = cursor.fetchone()
        assert resultSet[0] == u'n?kkel'
        self.assertEquals(str(resultSet[1]), '\0\1\3')
        # The value is not used afterwards; the conversion itself fails if
        # the returned bytes are not valid utf-16 / latin-1 text.
        latin1data = str(resultSet[2]).decode('utf-16').encode('latin-1')
        import codecs
        from cStringIO import StringIO
        ReaderClass = codecs.getreader('utf-16')
        stringStream = StringIO(resultSet[2])
        reader = ReaderClass(stringStream)
        self.assertEquals('Windows Registry Editor Version 5.00\r\n',
                          reader.readlines()[0]) # cannot use readline
                                        # as this messes up unicode translation
        self.assertEquals(len(resultSet[2]), self.data1size)
        cursor.execute("select col1 from blobtest where col3 = 'n?kkel'")
        resultSet = cursor.fetchone()
        self.conn.rollback()
        # The cursor must still be usable after the rollback.
        cursor.execute('select col3 from blobtest where col3 = ?',
                       [u'n?kkel'])
        resultSet = cursor.fetchall()
        self.assertEquals(resultSet[0][0], u'n?kkel')

        

    def testError(self):
        # Creating the table a second time must raise ProgrammingError.
        cursor = self.conn.cursor()
        try:
            cursor.execute("""
create table blobtest (
     col1 BINARY,
     col2 BINARY
);
""")
	    self.fail()
        except ProgrammingError, err:
            pass



class OtherBlobTest(unittest.TestCase):
    """Same table name as TestBlob, but with three varchar columns.

    Needs the database named in _ConnectionTemplate on 'localhost'.
    """
    def setUp(self):
        # Create the scratch table and commit so every test starts with it.
        self.conn = connect(_ConnectionTemplate % ('localhost'))
        cursor = self.conn.cursor()
        cursor.execute("""
create table blobTest (
  string1 varchar(255),
  string2 varchar(255),
  string3 varchar(255)
);
""")
        self.conn.commit()

    def tearDown(self):
        # Drop the scratch table; a failure to drop is only printed so the
        # connection is still closed afterwards.
        cursor = self.conn.cursor()
        try:
            cursor.execute('drop table blobtest')
            cursor.close()
            self.conn.commit()
        except DatabaseError, err:
            print err
        self.conn.close()

    def testInsertion(self):
        # Also used as a fixture by testSelectAll.
        import os
        cursor = self.conn.cursor()
        cursor.execute(
            'insert into blobtest (string1, string2, string3) values (?, ?, ?)',
            ['hei', 'hopp', u'n?kkel'])
        self.conn.commit()
        cursor.close()

    def testSelectAll(self):
        # Runs parameterized and literal selects on one cursor, with
        # rollbacks in between, and finally commits.
        self.testInsertion()
        cursor = self.conn.cursor()
        cursor.execute("""
        select string3, string2, string3 from blobtest where string3 = ?""",
        [u'n?kkel'])
        resultSet = cursor.fetchone()
        assert resultSet[0] == u'n?kkel'
        self.conn.rollback()
        cursor.execute("""select string1, string2, string3 from blobtest
        where string3 = 'n?kkel'""")
        self.conn.rollback()
        cursor.execute('select string3 from blobtest where string3 = ?',
                       [u'n?kkel'])
        self.conn.commit()


class _TestCustomer(unittest.TestCase):
    def setUp(self):
        self.conn = connect(_ConnectionTemplate % ('localhost'))
        cursor = self.conn.cursor()
        cursor.execute("""
create table customer (
customerId integer,
name varchar(255),
nick varchar(255),
date datetime
);""")
        self.conn.commit()
    
    def tearDown(self):
        cursor = self.conn.cursor()
        cursor.execute("drop table customer")
        self.conn.commit()
        self.conn.close()

    def testFieldCache(self):
        cursor = self.conn.cursor()
        assert cursor._fields == []
        cursor.executemany('insert into customer values (?, ?, ?, ?)',
                           [(1, 'syver', 'borger', DateFromTicks(time.time())),
                            (1, 'syver', 'borger', DateFromTicks(time.time()))])
        assert cursor._fields == []
        cursor.execute('select * from customer')
        assert cursor._fields == []
        cursor.fetchone()
        assert cursor._fields != []
        cursor.fetchone()
        assert cursor._fields != []
        cursor.execute('select * from customer')
        assert cursor._fields == []

    def testSelect(self):
        cursor = self.conn.cursor()
        cursor.execute('insert into customer values (1, ?, ?, ?)',
                       ['Syver Enstad', 'stalin', DateFromTicks(time.time())])
        cursor.execute('select name from customer where customerID = ?',
                       [1])
        resultSet = cursor.fetchone()
        assert resultSet[0] == 'Syver Enstad', resultSet[0]

    def testTwoStatements(self):
        cursor = self.conn.cursor()
        cursor.execute("""
    insert into customer (customerId, name, nick, date) values 
    (?, ?, ?, ?);
    select * from customer;""",
               (1, 'syver', 'stalin', (DateFromTicks(time.time()))))
        self.assertRaises(InterfaceError, cursor.fetchone) # no record set is reurned
        cursor.close()


    def test_date(self):
        cursor = self.conn.cursor()
        now = DateFromTicks(time.time())
        cursor.execute("""
    insert into customer (customerId, name, nick, date) values 
    (?, ?, ?, ?)""",
               (1, 'syver', 'stalin', (now)))
        cursor.close()
        self.conn.commit()
        cursor = self.conn.cursor()
        cursor.execute('select date from customer where nick=?', ['stalin'])
        resultSet = cursor.fetchone()
        result = resultSet[0]
        assert result == now, result

    def test_datetime(self):
        cursor = self.conn.cursor()
        now = TimestampFromTicks(time.time())
        cursor.execute("""
    insert into customer (customerId, name, nick, date) values 
    (?, ?, ?, ?)""",
               (1, 'syver', 'stalin',now))
	cursor.execute('select date from customer')
	resultSet = cursor.fetchone()
        result = resultSet[0]
        assert DateTime.cmp(result, now, 0.000001) == 0 # a roundoff error in
                                        # the roundtrip
        self.assertEquals(result.COMDate(), now.COMDate())
        cursor.close()

    def testDateRoundtrip(self):
        now = TimestampFromTicks(time.time())
        comDate = now.COMDate()
        newNow = DateTime.DateTimeFromCOMDate(comDate)
        # roundoff error when converting to COM date
        assert newNow != now, 'newNow: %s now: %s' % (newNow.absvalues(),
                                                      now.absValues()) 
                       
    def test_description(self):
        cursor = self.conn.cursor()
        cursor.close()
        self.test_executemany()
        cursor = self.conn.cursor()
        cursor.execute('select * from customer')
        desc = cursor.description
        print desc
        assert NUMBER == desc[0][1]
        assert desc[0][1] == NUMBER
        cursor.close()
        
    def test_emptyRowset(self):
        cursor = self.conn.cursor()
        cursor.execute('select * from customer')
        assert cursor.rowcount == -1
        resultSet = cursor.fetchall()
        cursor.close()
        
    def test_rowcount(self):
        cursor = self.conn.cursor()
        cursor.execute('select * from customer')
        assert cursor.rowcount == -1
        names = [('Per B?rge',), ('Geir',), ('Ove',), ('Bjarte',), ('Jens',), ('Dag',)]
        cursor.executemany("insert into customer (name) values (?)", names)
        cursor.execute("select * from customer")
        #assert cursor.rowcount == len(names), cursor.rowcount
        cursor.close()
        
    def test_executemany(self):
        cursor = self.conn.cursor()
        names = [(u'Per B?rge', u'p?rge'), (u'Geir',u'gigga'), (u'Ove', u'b?nna'), (u'Bjarte', u'tassen'), (u'Jens', u'j?lle'), (u'Dag', u'laffen')]
        cursor.executemany("insert into customer (name, nick) values (?, ?)",
                           names)
        self.conn.commit()
        cursor.execute("select name, nick from customer")
        result =  cursor.fetchall()
        for each in result:
            assert each in names
        cursor.close()

    def test_close(self):
        self.test_executemany()
        cursor = self.conn.cursor()
        cursor.execute('select * from Customer')
        cursor.fetchall()
        cursor.close()
        self.conn.commit()    
        cursor = self.conn.cursor()
        cursor.execute('delete from Customer')
        cursor.execute('select * from customer')
        assert cursor.rowcount == -1
        cursor.close()

    def testQmarkOperation(self):
        cursor = self.conn.cursor()
        cursor.execute(
            u"insert into customer (name, nick) values ('Ove', 'B?nna')")
        cursor.execute('select * from Customer where name=?', ['Ove'])
        resultSet = cursor.fetchall()
        self.assertEquals(resultSet[0][2], u'B?nna')
        
    def test_Transactions(self):
        cursor = self.conn.cursor()
        cursor.execute('select * from Customer')
        resultSet = cursor.fetchall()
        assert len(resultSet) == 0
        cursor.execute("insert into customer (name) values ('fredrik')")
        cursor.execute('select * from Customer')
        resultSet = cursor.fetchall()
        cursor.close()
        self.conn.commit()
        cursor = self.conn.cursor()
        cursor.execute('delete from Customer')
        cursor.execute('select * from customer')
        assert cursor.rowcount == -1
        self.conn.rollback()
        assert cursor.execute('select * from customer') != 0
        resultSet = cursor.fetchall()
        assert len(resultSet) == 1

class TestStoredProc(unittest.TestCase):
    """Stored procedure that returns a recordset and has an output parameter.

    Needs the database named in _ConnectionTemplate on 'localhost' and an
    existing table TestTable, which the procedure selects from.
    """
    def setUp(self):
        self.conn = connect(_ConnectionTemplate % ('localhost'))
        cursor = self.conn.cursor()
        cursor.execute("""
        create proc min_proc (@param1 int, @param2 varchar(255),
             @out_param varchar(255) OUTPUT)
        as
        set @out_param = 'out value'
        select * from TestTable
        """)
        self.conn.commit()
        cursor.close()

    def tearDown(self):
        try:
            cursor = self.conn.cursor()
            cursor.execute('drop proc min_proc')
            self.conn.commit()
            cursor.close()
        finally:
            # Close the connection opened in setUp, even if the drop fails.
            self.conn.close()


    def testRecordsetReturn(self):
        """ I do not get the output parameters out of a proc
        in which I use a recordset return
        """
        cursor = self.conn.cursor()
        #cursor.execute("{? = call min_proc(7, 'syver')}")
        cmd = _Command(self.conn._adoConnection,
                      "{call min_proc(?, ?, ?)}", constants.adCmdText)
        params = [7, 'syver', None]
        cmd.setParameters(params)
        rs, code = cmd.execute()
        self.assertEquals(params, [7, 'syver', None]) # no out value
        # Hand the recordset to a cursor to reuse its fetch logic.
        cursor._rs = rs
        self.assertEquals(cursor.fetchall(),
                          [(u'syver', u'enstad    ', u'1'),
                           (u'hei', u'p\xe5        ', u'2')])


class TestStoredOutput(unittest.TestCase):
    """Stored procedure with a return value and an output parameter.

    Needs the database named in _ConnectionTemplate on 'localhost'.
    """
    def setUp(self):
        self.conn = connect(_ConnectionTemplate % ('localhost'))
        cursor = self.conn.cursor()
        cursor.execute("""
        create procedure min_proc(@param1 int, @param2 varchar(255),
            @out_param varchar(255) OUTPUT)
        as
        set @out_param = @param2
        return @param1
        """)
        self.conn.commit()
        cursor.close()

    def tearDown(self):
        try:
            cursor = self.conn.cursor()
            cursor.execute('drop proc min_proc')
            self.conn.commit()
            cursor.close()
        finally:
            # Close the connection opened in setUp, even if the drop fails.
            self.conn.close()

    def testParameterReturn(self):
        """ Here we do not use any recordset return, so here the
        output parameters do come out
        """
        cursor = self.conn.cursor()
        cmd = _Command(self.conn._adoConnection,
                      "{? = call min_proc(?, ?, ?)}", constants.adCmdText)
        params = [None, 7, 'syver', None]
        cmd.setParameters(params)
        rs, code = cmd.execute()
        # The return value and the output parameter are written back into
        # the list that was passed to setParameters.
        self.assertEquals(params, [7, 7, 'syver', 'syver'])
        assert rs.State == 0 # means no results present


class TestCallProc(unittest.TestCase):
    def setUp(self):
        self.conn = connect(_ConnectionTemplate % ('localhost'))        
        cursor = self.conn.cursor()
        try:
            cursor.execute("""
            create procedure min_proc (@outparam int OUTPUT)
            as
            set @outparam = 10
            """)
        except ProgrammingError, (source, text, hresult):
            # something went wrong in the last, that's okay
            if (source == "There is already an object named 'min_proc' "
                "in the database."):
                print 'Caught exception, continuing', (source, text, hresult)
        self.conn.commit()
        cursor.close()

    def tearDown(self):
        cursor = self.conn.cursor()
        cursor.execute('drop proc min_proc')
        self.conn.commit()
        cursor.close()        

    def testParameterReturn(self):
        """ Her bruker vi ikke noe recordset return, s? her kommer
        utparameterene ut
        """
        cursor = self.conn.cursor()
        cmd = _Command(self.conn._adoConnection,
                      "min_proc(0)", constants.adCmdStoredProc)
        #input = [None]
        #cmd.setParameters(input)
        rs, code = cmd.execute()
        #self.assertEquals(input, [10])
        assert rs.State == 0 # means no results present

        
# Running the module as a script runs the test cases defined above.
if __name__ == '__main__':
    unittest.main()
-------------- next part --------------


-- 

Vennlig hilsen 

Syver Enstad


More information about the Python-list mailing list