[Python-checkins] r76769 - in sandbox/trunk/dbm_sqlite-3783/Lib: dbm/sqlite.py test/test_dbm_sqlite.py

antoine.pitrou python-checkins at python.org
Sat Dec 12 22:08:11 CET 2009


Author: antoine.pitrou
Date: Sat Dec 12 22:08:11 2009
New Revision: 76769

Log:
Commit a modified version of Raymond's implementation + Skip's test



Added:
   sandbox/trunk/dbm_sqlite-3783/Lib/dbm/sqlite.py   (contents, props changed)
   sandbox/trunk/dbm_sqlite-3783/Lib/test/test_dbm_sqlite.py   (contents, props changed)

Added: sandbox/trunk/dbm_sqlite-3783/Lib/dbm/sqlite.py
==============================================================================
--- (empty file)
+++ sandbox/trunk/dbm_sqlite-3783/Lib/dbm/sqlite.py	Sat Dec 12 22:08:11 2009
@@ -0,0 +1,146 @@
+''' Dbm based on sqlite -- Needed to support shelves
+
+Keys and values are always stored as bytes. This means that when strings are
+used they are implicitly converted to the default encoding before being
+stored.
+
+Issues:
+
+    # ??? how to coordinate with whichdb
+    # ??? Any difference between blobs and text
+    # ??? does default encoding affect str-->bytes or PySqlite3 always use UTF-8
+    # ??? what is the correct isolation mode
+
+'''
+
+__all__ = ['error', 'open']
+
+import sqlite3
+import collections
+from operator import itemgetter
+
+error = sqlite3.DatabaseError
+
+# Disabled: the INSERT trigger gives erroneous results when REPLACE
+# is used for __setitem__.
+
+#MAKE_SHELF = '''
+#CREATE TABLE IF NOT EXISTS info
+    #(key TEXT PRIMARY KEY,
+     #value INFO NOT NULL);
+
+#INSERT OR IGNORE INTO info (key, value) VALUES ('size', 0);
+
+#CREATE TABLE IF NOT EXISTS shelf
+    #(key TEXT PRIMARY KEY,
+     #value TEXT NOT NULL);
+
+#CREATE TRIGGER IF NOT EXISTS insert_shelf
+    #AFTER INSERT ON shelf
+    #BEGIN
+        #UPDATE info SET value = value + 1 WHERE key = 'size';
+    #END;
+
+#CREATE TRIGGER IF NOT EXISTS delete_shelf
+    #AFTER DELETE ON shelf
+    #BEGIN
+        #UPDATE info SET value = value - 1 WHERE key = 'size';
+    #END;'''
+
+# Schema: one two-column table, created only when it does not exist yet.
+MAKE_SHELF = '''CREATE TABLE IF NOT EXISTS shelf
+    (key TEXT PRIMARY KEY,
+     value TEXT NOT NULL);'''
+
+# Size queries.  GET_LEN counts the rows on every call; the commented-out
+# variant read a counter from the disabled ``info`` table instead.
+GET_LEN =  'SELECT COUNT(*) FROM shelf'
+#GET_LEN =  "SELECT value FROM info WHERE key = 'size'"
+# Returns at most one row; __bool__ only checks whether a row came back.
+GET_BOOL =  'SELECT 1 FROM shelf LIMIT 1'
+
+# Single-key statements; the ``?`` placeholders are bound by the caller.
+HAS_ITEM = 'SELECT 1 FROM shelf WHERE key = ?'
+GET_ITEM = 'SELECT value FROM shelf WHERE key = ?'
+SET_ITEM = 'REPLACE INTO shelf (key, value) VALUES (?,?)'
+DEL_ITEM = 'DELETE FROM shelf WHERE key = ?'
+
+# Whole-table scans used by keys(), values() and items().
+ITER_KEYS = 'SELECT key from shelf'
+ITER_VALUES = 'SELECT value from shelf'
+ITER_ITEMS = 'SELECT key, value from shelf'
+
+# Same statement as SET_ITEM (apart from whitespace); update() runs it
+# with executemany().
+UPDATE_ITEMS = 'REPLACE INTO shelf (key, value) VALUES (?, ?)'
+
+# Two statements in one string; clear() runs this with executescript().
+CLEAR_ALL = 'DELETE FROM shelf; VACUUM;'
+
+
+class SQLHash(collections.MutableMapping):
+    '''Mutable mapping stored in the ``shelf`` table of a sqlite3 database.
+
+    The connection's text_factory is set to bytes.  __setitem__() and
+    __delitem__() do not call commit(); update(), clear(), sync() and
+    close() do.
+    '''
+
+    def __init__(self, filename=':memory:', flags='r', mode=None):
+        '''Connect to *filename* and create the shelf table if it is missing.
+
+        *flags* and *mode* are accepted but currently ignored.
+        '''
+        # XXX add flag/mode handling
+        #   c -- create if it doesn't exist
+        #   n -- new empty
+        #   w -- open existing
+        #   r -- readonly
+        # Bind the attribute before connecting so that close() and __del__()
+        # still work when sqlite3.connect() raises.
+        self.conn = None
+        self.conn = sqlite3.connect(filename)
+        self.conn.text_factory = bytes
+        self.conn.executescript(MAKE_SHELF)
+        self.conn.commit()
+
+    def __len__(self):
+        '''Return the number of rows in the shelf table.'''
+        return self.conn.execute(GET_LEN).fetchone()[0]
+
+    def __bool__(self):
+        '''Return True if the shelf table has at least one row.'''
+        return self.conn.execute(GET_BOOL).fetchone() is not None
+
+    def keys(self):
+        '''Return an iterator over the stored keys.'''
+        return map(itemgetter(0), self.conn.execute(ITER_KEYS))
+
+    __iter__ = keys
+
+    def values(self):
+        '''Return an iterator over the stored values.'''
+        return map(itemgetter(0), self.conn.execute(ITER_VALUES))
+
+    def items(self):
+        '''Return an iterator over the stored (key, value) rows.'''
+        return iter(self.conn.execute(ITER_ITEMS))
+
+    def __contains__(self, key):
+        '''Return True if a row with this key exists.'''
+        return self.conn.execute(HAS_ITEM, (key,)).fetchone() is not None
+
+    def __getitem__(self, key):
+        '''Return the value stored under *key*; raise KeyError if absent.'''
+        item = self.conn.execute(GET_ITEM, (key,)).fetchone()
+        if item is None:
+            raise KeyError(key)
+        return item[0]
+
+    def __setitem__(self, key, value):
+        '''Insert or replace the row for *key* (no commit).'''
+        self.conn.execute(SET_ITEM, (key, value))
+
+    def __delitem__(self, key):
+        '''Delete the row for *key* (no commit); raise KeyError if absent.'''
+        if key not in self:
+            raise KeyError(key)
+        self.conn.execute(DEL_ITEM, (key,))
+
+    def update(self, items=(), **kwds):
+        '''Store every pair from *items* and *kwds*, then commit.
+
+        *items* may be a mapping or an iterable of (key, value) pairs.
+        '''
+        if isinstance(items, collections.Mapping):
+            items = items.items()
+        self.conn.executemany(UPDATE_ITEMS, items)
+        self.conn.commit()
+        if kwds:
+            # Keyword arguments go through the mapping branch above.
+            self.update(kwds)
+
+    def clear(self):
+        '''Delete all rows, vacuum the database and commit.'''
+        self.conn.executescript(CLEAR_ALL)
+        self.conn.commit()
+
+    def sync(self):
+        '''Commit pending changes if the connection is still open.'''
+        if self.conn is not None:
+            self.conn.commit()
+
+    def close(self):
+        '''Commit and close the connection; later calls do nothing.'''
+        # getattr() also covers instances whose __init__() never ran, so
+        # that __del__() cannot fail with AttributeError.
+        conn = getattr(self, 'conn', None)
+        if conn is not None:
+            conn.commit()
+            conn.close()
+            self.conn = None
+
+    def __del__(self):
+        '''Close the connection when the object is finalized.'''
+        self.close()
+
+
+def open(filename, _flag=None, mode=0o666):
+    '''Return a SQLHash for the database file *filename*.
+
+    *_flag* and *mode* are forwarded to SQLHash by keyword, so that *mode*
+    is no longer passed in the position of the flags argument.  When
+    *_flag* is None, SQLHash's own default flag applies.
+    '''
+    if _flag is None:
+        return SQLHash(filename, mode=mode)
+    return SQLHash(filename, flags=_flag, mode=mode)

Added: sandbox/trunk/dbm_sqlite-3783/Lib/test/test_dbm_sqlite.py
==============================================================================
--- (empty file)
+++ sandbox/trunk/dbm_sqlite-3783/Lib/test/test_dbm_sqlite.py	Sat Dec 12 22:08:11 2009
@@ -0,0 +1,177 @@
+#! /usr/bin/env python
+"""Test script for the dbm.sqlite module
+   by Skip Montanaro, based on the 3.0 dumbdbm test module.
+"""
+
+import io
+import os
+import unittest
+import dbm.sqlite
+from test import support
+
+_fname = support.TESTFN
+
+def _delete_files():
+    """Remove the test database file _fname, ignoring any OSError."""
+    try:
+        support.unlink(_fname)
+    except OSError:
+        pass
+
+class DbmSQLiteTestCase(unittest.TestCase):
+    # Tests for dbm.sqlite using a database file named by _fname.
+    # setUp() and tearDown() delete that file, so each test starts
+    # without an existing database.
+
+    # Reference contents.  setUp() gives every test its own copy.
+    _dict = {b'0': b'',
+             b'a': b'Python:',
+             b'b': b'Programming',
+             b'c': b'the',
+             b'd': b'way',
+             b'f': b'Guido',
+             b'g': b'intended',
+             }
+
+    def init_db(self):
+        # Create the database file and fill it with the reference contents.
+        f = dbm.sqlite.open(_fname, 'w')
+        for k in self._dict:
+            f[k] = self._dict[k]
+        f.close()
+
+    def keys_helper(self, f):
+        # Check that f has exactly the reference keys; return them sorted.
+        keys = sorted(f.keys())
+        dkeys = sorted(self._dict.keys())
+        self.assertEqual(keys, dkeys)
+        return keys
+
+    def read_helper(self, f):
+        # Check both the key set and every stored value.
+        self.keys_helper(f)
+        for key in self._dict:
+            self.assertEqual(self._dict[key], f[key])
+
+    def test_creation(self):
+        f = dbm.sqlite.open(_fname, 'c')
+        self.assertEqual(list(f.keys()), [])
+        for key in self._dict:
+            f[key] = self._dict[key]
+        self.read_helper(f)
+        f.close()
+
+    def test_close_twice(self):
+        f = dbm.sqlite.open(_fname)
+        f[b'a'] = b'b'
+        f.close()
+        f.close()
+
+    def test_modification(self):
+        self.init_db()
+        f = dbm.sqlite.open(_fname, 'w')
+        # Changes only this test's copy of _dict (see setUp).
+        self._dict[b'g'] = f[b'g'] = b"indented"
+        self.read_helper(f)
+        f.close()
+
+    def test_read(self):
+        self.init_db()
+        f = dbm.sqlite.open(_fname, 'r')
+        self.read_helper(f)
+        f.close()
+
+    def test_keys(self):
+        self.init_db()
+        f = dbm.sqlite.open(_fname)
+        self.keys_helper(f)
+        f.close()
+
+    def test_len_bool(self):
+        f = dbm.sqlite.open(_fname)
+        self.assertEqual(len(f), 0)
+        self.assertFalse(f)
+        f[b'1'] = b'hello'
+        self.assertEqual(len(f), 1)
+        self.assertTrue(f)
+        # Overwriting an existing key must not change the length.
+        f[b'1'] = b'goodbye'
+        self.assertEqual(len(f), 1)
+        self.assertTrue(f)
+        f[b'a'] = b'goodbye'
+        self.assertEqual(len(f), 2)
+        self.assertTrue(f)
+        del f[b'1']
+        self.assertEqual(len(f), 1)
+        self.assertTrue(f)
+        f.clear()
+        self.assertEqual(len(f), 0)
+        self.assertFalse(f)
+        f.close()
+
+    def test_write_contains(self):
+        f = dbm.sqlite.open(_fname)
+        f[b'1'] = b'hello'
+        self.assertTrue(b'1' in f)
+        f.close()
+
+    def test_write_write_read(self):
+        # test for bug #482460
+        f = dbm.sqlite.open(_fname)
+        f[b'1'] = b'hello'
+        f[b'1'] = b'hello2'
+        f.close()
+        f = dbm.sqlite.open(_fname)
+        self.assertEqual(f[b'1'], b'hello2')
+        f.close()
+
+    def test_write_then_read(self):
+        # test for bug #482460
+        f = dbm.sqlite.open(_fname)
+        f[b'1'] = b'hello'
+        self.assertEqual(f[b'1'], b'hello')
+        f.close()
+
+    # Perform randomized operations.  This doesn't make assumptions about
+    # what *might* fail.
+    def test_random(self):
+        import random
+        d = {}  # mirror the database
+        for _round in range(5):
+            f = dbm.sqlite.open(_fname)
+            for _step in range(100):
+                k = random.choice('abcdefghijklm').encode('ascii')
+                if random.random() < 0.2:
+                    if k in d:
+                        del d[k]
+                        del f[k]
+                else:
+                    v = (random.choice((b'a', b'b', b'c')) *
+                         random.randrange(100))
+                    d[k] = v
+                    f[k] = v
+                    # Print the three values before the assertion fails.
+                    # The condition used to test a non-empty tuple, which
+                    # is always true, so this branch never ran.
+                    if not (v == f[k] == d[k]):
+                        print("v:", v, "f[k]:", f[k],
+                              "d[k]:", d[k])
+                    self.assertEqual(f[k], v)
+            f.close()
+
+            # Reopen to check that the contents survived close().
+            f = dbm.sqlite.open(_fname)
+            expected = sorted(d.items())
+            got = sorted(f.items())
+            self.assertEqual(expected, got)
+            f.close()
+
+    # XXX: not a useful invariant, and enforcing it might slow things down
+    #def test_keys_values_items(self):
+        #f = dbm.sqlite.open(_fname, 'c')
+        #self.assertEqual(list(f.keys()), [])
+        #for key in self._dict:
+            #f[key.encode("ascii")] = self._dict[key]
+        #self.assertEqual(list(zip(f.keys(), f.values())),
+                         #list(f.items()))
+        #f.close()
+
+    def tearDown(self):
+        _delete_files()
+
+    def setUp(self):
+        _delete_files()
+        # Shadow the class-level dict with a per-test copy so that
+        # test_modification cannot change what other tests see.
+        self._dict = dict(type(self)._dict)
+
+def test_main():
+    """Run DbmSQLiteTestCase, then remove the test database file."""
+    try:
+        support.run_unittest(DbmSQLiteTestCase)
+    finally:
+        _delete_files()
+
+if __name__ == "__main__":
+    test_main()


More information about the Python-checkins mailing list