[Python-checkins] r76769 - in sandbox/trunk/dbm_sqlite-3783/Lib: dbm/sqlite.py test/test_dbm_sqlite.py
antoine.pitrou
python-checkins at python.org
Sat Dec 12 22:08:11 CET 2009
Author: antoine.pitrou
Date: Sat Dec 12 22:08:11 2009
New Revision: 76769
Log:
Commit a modified version of Raymond's implementation + Skip's test
Added:
sandbox/trunk/dbm_sqlite-3783/Lib/dbm/sqlite.py (contents, props changed)
sandbox/trunk/dbm_sqlite-3783/Lib/test/test_dbm_sqlite.py (contents, props changed)
Added: sandbox/trunk/dbm_sqlite-3783/Lib/dbm/sqlite.py
==============================================================================
--- (empty file)
+++ sandbox/trunk/dbm_sqlite-3783/Lib/dbm/sqlite.py Sat Dec 12 22:08:11 2009
@@ -0,0 +1,146 @@
+''' Dbm based on sqlite -- Needed to support shelves
+
+Key and values are always stored as bytes. This means that when strings are
+used they are implicitly converted to the default encoding before being
+stored.
+
+Issues:
+
+ # ??? how to coordinate with whichdb
+ # ??? Any difference between blobs and text
+ # ??? does default encoding affect str-->bytes or PySqlite3 always use UTF-8
+ # ??? what is the correct isolation mode
+
+'''
+
+__all__ = ['error', 'open']
+
+import sqlite3
+import collections
+from operator import itemgetter
+
+error = sqlite3.DatabaseError
+
+# Disabled: the INSERT trigger gives erroneours results when REPLACE
+# is used for __setitem__.
+
+#MAKE_SHELF = '''
+#CREATE TABLE IF NOT EXISTS info
+ #(key TEXT PRIMARY KEY,
+ #value INFO NOT NULL);
+
+#INSERT OR IGNORE INTO info (key, value) VALUES ('size', 0);
+
+#CREATE TABLE IF NOT EXISTS shelf
+ #(key TEXT PRIMARY KEY,
+ #value TEXT NOT NULL);
+
+#CREATE TRIGGER IF NOT EXISTS insert_shelf
+ #AFTER INSERT ON shelf
+ #BEGIN
+ #UPDATE info SET value = value + 1 WHERE key = 'size';
+ #END;
+
+#CREATE TRIGGER IF NOT EXISTS delete_shelf
+ #AFTER DELETE ON shelf
+ #BEGIN
+ #UPDATE info SET value = value - 1 WHERE key = 'size';
+ #END;'''
+
+MAKE_SHELF = '''CREATE TABLE IF NOT EXISTS shelf
+ (key TEXT PRIMARY KEY,
+ value TEXT NOT NULL);'''
+
+GET_LEN = 'SELECT COUNT(*) FROM shelf'
+#GET_LEN = "SELECT value FROM info WHERE key = 'size'"
+GET_BOOL = 'SELECT 1 FROM shelf LIMIT 1'
+
+HAS_ITEM = 'SELECT 1 FROM shelf WHERE key = ?'
+GET_ITEM = 'SELECT value FROM shelf WHERE key = ?'
+SET_ITEM = 'REPLACE INTO shelf (key, value) VALUES (?,?)'
+DEL_ITEM = 'DELETE FROM shelf WHERE key = ?'
+
+ITER_KEYS = 'SELECT key from shelf'
+ITER_VALUES = 'SELECT value from shelf'
+ITER_ITEMS = 'SELECT key, value from shelf'
+
+UPDATE_ITEMS = 'REPLACE INTO shelf (key, value) VALUES (?, ?)'
+
+CLEAR_ALL = 'DELETE FROM shelf; VACUUM;'
+
+
+class SQLHash(collections.MutableMapping):
+
+ def __init__(self, filename=':memory:', flags='r', mode=None):
+ # XXX add flag/mode handling
+ # c -- create if it doesn't exist
+ # n -- new empty
+ # w -- open existing
+ # r -- readonly
+ self.conn = sqlite3.connect(filename)
+ self.conn.text_factory = bytes
+ self.conn.executescript(MAKE_SHELF)
+ self.conn.commit()
+
+ def __len__(self):
+ return self.conn.execute(GET_LEN).fetchone()[0]
+
+ def __bool__(self):
+ return self.conn.execute(GET_BOOL).fetchone() is not None
+
+ def keys(self):
+ return map(itemgetter(0), self.conn.execute(ITER_KEYS))
+
+ __iter__ = keys
+
+ def values(self):
+ return map(itemgetter(0), self.conn.execute(ITER_VALUES))
+
+ def items(self):
+ return iter(self.conn.execute(ITER_ITEMS))
+
+ def __contains__(self, key):
+ return self.conn.execute(HAS_ITEM, (key,)).fetchone() is not None
+
+ def __getitem__(self, key):
+ item = self.conn.execute(GET_ITEM, (key,)).fetchone()
+ if item is None:
+ raise KeyError(key)
+ return item[0]
+
+ def __setitem__(self, key, value):
+ self.conn.execute(SET_ITEM, (key, value))
+
+ def __delitem__(self, key):
+ if key not in self:
+ raise KeyError(key)
+ self.conn.execute(DEL_ITEM, (key,))
+
+ def update(self, items=(), **kwds):
+ if isinstance(items, collections.Mapping):
+ items = items.items()
+ self.conn.executemany(UPDATE_ITEMS, items)
+ self.conn.commit()
+ if kwds:
+ self.update(kwds)
+
+ def clear(self):
+ self.conn.executescript(CLEAR_ALL)
+ self.conn.commit()
+
+ def sync(self):
+ if self.conn is not None:
+ self.conn.commit()
+
+ def close(self):
+ if self.conn is not None:
+ self.conn.commit()
+ self.conn.close()
+ self.conn = None
+
+ def __del__(self):
+ self.close()
+
+
+def open(filename, _flag=None, mode=0o666):
+ return SQLHash(filename, mode)
Added: sandbox/trunk/dbm_sqlite-3783/Lib/test/test_dbm_sqlite.py
==============================================================================
--- (empty file)
+++ sandbox/trunk/dbm_sqlite-3783/Lib/test/test_dbm_sqlite.py Sat Dec 12 22:08:11 2009
@@ -0,0 +1,177 @@
+#! /usr/bin/env python
+"""Test script for the dbm.sqlite module
+ by Skip Montanaro, based on the 3.0 dumbdbm test module.
+"""
+
+import io
+import os
+import unittest
+import dbm.sqlite
+from test import support
+
+_fname = support.TESTFN
+
+def _delete_files():
+ try:
+ support.unlink(_fname)
+ except OSError:
+ pass
+
+class DbmSQLiteTestCase(unittest.TestCase):
+ _dict = {b'0': b'',
+ b'a': b'Python:',
+ b'b': b'Programming',
+ b'c': b'the',
+ b'd': b'way',
+ b'f': b'Guido',
+ b'g': b'intended',
+ }
+
+ def init_db(self):
+ f = dbm.sqlite.open(_fname, 'w')
+ for k in self._dict:
+ f[k] = self._dict[k]
+ f.close()
+
+ def keys_helper(self, f):
+ keys = sorted(f.keys())
+ dkeys = sorted(self._dict.keys())
+ self.assertEqual(keys, dkeys)
+ return keys
+
+ def read_helper(self, f):
+ keys = self.keys_helper(f)
+ for key in self._dict:
+ self.assertEqual(self._dict[key], f[key])
+
+ def test_creation(self):
+ f = dbm.sqlite.open(_fname, 'c')
+ self.assertEqual(list(f.keys()), [])
+ for key in self._dict:
+ f[key] = self._dict[key]
+ self.read_helper(f)
+ f.close()
+
+ def test_close_twice(self):
+ f = dbm.sqlite.open(_fname)
+ f[b'a'] = b'b'
+ f.close()
+ f.close()
+
+ def test_modification(self):
+ self.init_db()
+ f = dbm.sqlite.open(_fname, 'w')
+ self._dict[b'g'] = f[b'g'] = b"indented"
+ self.read_helper(f)
+ f.close()
+
+ def test_read(self):
+ self.init_db()
+ f = dbm.sqlite.open(_fname, 'r')
+ self.read_helper(f)
+ f.close()
+
+ def test_keys(self):
+ self.init_db()
+ f = dbm.sqlite.open(_fname)
+ keys = self.keys_helper(f)
+ f.close()
+
+ def test_len_bool(self):
+ f = dbm.sqlite.open(_fname)
+ self.assertEqual(len(f), 0)
+ self.assertFalse(f)
+ f[b'1'] = b'hello'
+ self.assertEqual(len(f), 1)
+ self.assertTrue(f)
+ f[b'1'] = b'goodbye'
+ self.assertEqual(len(f), 1)
+ self.assertTrue(f)
+ f[b'a'] = b'goodbye'
+ self.assertEqual(len(f), 2)
+ self.assertTrue(f)
+ del f[b'1']
+ self.assertEqual(len(f), 1)
+ self.assertTrue(f)
+ f.clear()
+ self.assertEqual(len(f), 0)
+ self.assertFalse(f)
+ f.close()
+
+ def test_write_contains(self):
+ f = dbm.sqlite.open(_fname)
+ f[b'1'] = b'hello'
+ self.assertTrue(b'1' in f)
+ f.close()
+
+ def test_write_write_read(self):
+ # test for bug #482460
+ f = dbm.sqlite.open(_fname)
+ f[b'1'] = b'hello'
+ f[b'1'] = b'hello2'
+ f.close()
+ f = dbm.sqlite.open(_fname)
+ self.assertEqual(f[b'1'], b'hello2')
+ f.close()
+
+ def test_write_then_read(self):
+ # test for bug #482460
+ f = dbm.sqlite.open(_fname)
+ f[b'1'] = b'hello'
+ self.assertEqual(f[b'1'], b'hello')
+ f.close()
+
+ # Perform randomized operations. This doesn't make assumptions about
+ # what *might* fail.
+ def test_random(self):
+ import random
+ d = {} # mirror the database
+ for dummy in range(5):
+ f = dbm.sqlite.open(_fname)
+ for dummy in range(100):
+ k = random.choice('abcdefghijklm').encode('ascii')
+ if random.random() < 0.2:
+ if k in d:
+ del d[k]
+ del f[k]
+ else:
+ v = (random.choice((b'a', b'b', b'c')) *
+ random.randrange(100))
+ d[k] = v
+ f[k] = v
+ if not (d[k], v == f[k] == d[k]):
+ print("v:", v, "f[k]:", f[k],
+ "d[k]:")
+ self.assertEqual(f[k], v)
+ f.close()
+
+ f = dbm.sqlite.open(_fname)
+ expected = sorted(d.items())
+ got = sorted(f.items())
+ self.assertEqual(expected, got)
+ f.close()
+
+ # XXX: not an useful invariant, and enforcing it might slow things down
+ #def test_keys_values_items(self):
+ #f = dbm.sqlite.open(_fname, 'c')
+ #self.assertEqual(list(f.keys()), [])
+ #for key in self._dict:
+ #f[key.encode("ascii")] = self._dict[key]
+ #self.assertEqual(list(zip(f.keys(), f.values())),
+ #list(f.items()))
+ #f.close()
+
+ def tearDown(self):
+ _delete_files()
+
+ def setUp(self):
+ _delete_files()
+
+def test_main():
+ try:
+ support.run_unittest(DbmSQLiteTestCase)
+ finally:
+ _delete_files()
+
+if __name__ == "__main__":
+ test_main()
More information about the Python-checkins
mailing list