Easier database queries
Steve Holden
sholden at holdenweb.com
Thu Aug 2 13:32:15 EDT 2001
I've been working for some time to come up with a handy way for the
relatively new Python database user to get stuff out of a database. For
other reasons I also wanted to implement data caching. The cachequery.py
module below has the following characteristics:
1. You create a query by providing:
the name of the table you want to query
the column names you want returned
the column names to use as key values
a database connection
an optional refresh period which specifies how old a
cache entry (0 by default, meaning no caching
will take place) may become before being refreshed
2. Once you have created your query, you call it with a tuple of arguments.
This tuple must be the same length as the tuple of key names that
you provided when you created the query.
This call returns a sequence of database tuples (as defined in Greg
Stein's
dtuple module, which is required), each member of which corresponds to
a row in the set for which the defined key fields have the values given
in
the call.
This means that you can access the columns of each row by name.
Example:
>>> import cachequery
>>> import mx.ODBC.Windows as odbc
>>> conn = odbc.connect("prom2000")
>>> query = query = cachequery.CacheQuery("members",
"Name Address Expiry".split(),
["MemberId"], conn)
Descriptor: (['Name'], ['Address'], ['Expiry'])
SQL: SELECT Name, Address, Expiry FROM members WHERE MemberId=?
>>> guido = query((1, ))[0]
>>> guido.Name, guido.Address, guido.Expiry
('Guido', 'Reston VA', <DateTime object for '2099-12-31 00:00:00.00' at
16eaf10>)
The code is shown below, and is also published at
http://www.holdenweb.com/Python/PDCode/pyindex.py
Comments will be welcome.
regards
Steve
--
http://www.holdenweb.com/
import mx.DateTime
import dtuple
#################################################################
# $Revision: 4 $
# $Date: 8/02/01 1:14p $
#################################################################
class CacheQuery:
"""Defines a database query that caches database row sets.
This object is initialized with
tbl table name in the database
colnames list of field names retrieved
keynames list of keys used for retrieval
conn database connection
refresh caching refresh interval
Individual results are read by calling the object with a
tuple of key values as an argument. If the row set associated
with this particular set of keys is not present, or was read
longer than the refresh interval ago, it is re-read from the
database and stored in the content table as a list of database
tuples, which allow columns to be accessed by name.
Otherwise the already-cached database tuple set is returned.
Refinements might be added, such as registering with an
observer that might clear down all cache entries periodically
to force a global database refresh, and using a second SQL query
on record modified timestamps to determine whether a refresh is
really required (which may or may not be a win for a given set
of columns).
"""
def __init__(self, tbl, colnames, keynames, conn, refresh=0):
"""Create a caching data set for the given table, columns and
keys."""
self._flush()
self.tbl = tbl
self.refresh = refresh
self.cursor = conn.cursor()
condition = " AND ".join(["%s=?" % f for f in keynames])
self.sql = "SELECT %s FROM %s WHERE %s" % \
(", ".join(colnames), tbl, condition)
self.desc = dtuple.TupleDescriptor([[n, ] for n in colnames])
print "Descriptor:", self.desc
print "SQL:", self.sql
def _flush(self):
"""Remove all trace of previous caching."""
self.recs = {}
self.when = {}
def __call__(self, keyvals, debug=0):
"""Return the data set associated with given key values."""
now = mx.DateTime.now()
if self.recs.has_key(keyvals) and self.refresh and (now -
self.when[keyvals] < self.refresh):
if debug: print "Interval:", now - self.when[keyvals]
return self.recs[keyvals]
else:
self.cursor.execute(self.sql, keyvals)
rows = self.cursor.fetchall()
result = [dtuple.DatabaseTuple(self.desc, row) for row in rows]
if self.refresh:
if debug: print "Caching", self.tbl, keyvals, " at", now
self.recs[keyvals] = result
self.when[keyvals] = now
return result
def close(self):
self.recs = None
self.when = None
self.cursor.close()
if __name__ == "__main__":
#
# Sorry, you'll need you own database details in here
#
import mx.ODBC.Windows as odbc
conn = odbc.connect("prom2000")
s1 = CacheQuery("department", # table
"DptName DptWelcome DptLnksTxt".split(), # columns
("DptCode",), # key
columns
conn, refresh=0) # other
stuff
while 1:
dc = raw_input("Department Code: ")
if not dc:
break
rows = s1((dc, ), debug=1)
if len(rows) == 0:
print "No such department"
else:
for row in rows:
print """
Department: %s Full Name: %s
Welcome Text:
%s
Links Text:
%s
""" % (dc, row.DptName, row.DptWelcome, row.DptLnksTxt)
s1.close()
conn.close()
More information about the Python-list
mailing list