[Python-checkins] bpo-43553: Improve `sqlite3` test coverage (GH-26886)

pablogsal webhook-mailer at python.org
Thu Jun 24 07:57:22 EDT 2021


https://github.com/python/cpython/commit/2c1ae09764446beda5248759fb99c859e14f1b25
commit: 2c1ae09764446beda5248759fb99c859e14f1b25
branch: main
author: Erlend Egeberg Aasland <erlend.aasland at innova.no>
committer: pablogsal <Pablogsal at gmail.com>
date: 2021-06-24T12:56:56+01:00
summary:

bpo-43553: Improve `sqlite3` test coverage (GH-26886)

files:
M Lib/sqlite3/test/dbapi.py
M Lib/sqlite3/test/factory.py
M Lib/sqlite3/test/types.py
M Lib/sqlite3/test/userfunctions.py

diff --git a/Lib/sqlite3/test/dbapi.py b/Lib/sqlite3/test/dbapi.py
index 7e44cac76f5b3..1a4b44188bd41 100644
--- a/Lib/sqlite3/test/dbapi.py
+++ b/Lib/sqlite3/test/dbapi.py
@@ -26,9 +26,8 @@
 import threading
 import unittest
 
-from test.support import check_disallow_instantiation
+from test.support import check_disallow_instantiation, threading_helper
 from test.support.os_helper import TESTFN, unlink
-from test.support import threading_helper
 
 
 # Helper for tests using TESTFN
@@ -110,6 +109,10 @@ def test_disallow_instantiation(self):
         cx = sqlite.connect(":memory:")
         check_disallow_instantiation(self, type(cx("select 1")))
 
+    def test_complete_statement(self):
+        self.assertFalse(sqlite.complete_statement("select t"))
+        self.assertTrue(sqlite.complete_statement("create table t(t);"))
+
 
 class ConnectionTests(unittest.TestCase):
 
@@ -225,6 +228,20 @@ def test_connection_exceptions(self):
                 self.assertTrue(hasattr(self.cx, exc))
                 self.assertIs(getattr(sqlite, exc), getattr(self.cx, exc))
 
+    def test_interrupt_on_closed_db(self):
+        cx = sqlite.connect(":memory:")
+        cx.close()
+        with self.assertRaises(sqlite.ProgrammingError):
+            cx.interrupt()
+
+    def test_interrupt(self):
+        self.assertIsNone(self.cx.interrupt())
+
+    def test_drop_unused_refs(self):
+        for n in range(500):
+            cu = self.cx.execute(f"select {n}")
+            self.assertEqual(cu.fetchone()[0], n)
+
 
 class OpenTests(unittest.TestCase):
     _sql = "create table test(id integer)"
@@ -594,6 +611,11 @@ def test_column_count(self):
         new_count = len(res.description)
         self.assertEqual(new_count - old_count, 1)
 
+    def test_same_query_in_multiple_cursors(self):
+        cursors = [self.cx.execute("select 1") for _ in range(3)]
+        for cu in cursors:
+            self.assertEqual(cu.fetchall(), [(1,)])
+
 
 class ThreadTests(unittest.TestCase):
     def setUp(self):
diff --git a/Lib/sqlite3/test/factory.py b/Lib/sqlite3/test/factory.py
index 876428497542f..7faa9ac8c1fc2 100644
--- a/Lib/sqlite3/test/factory.py
+++ b/Lib/sqlite3/test/factory.py
@@ -123,6 +123,8 @@ def test_sqlite_row_index(self):
             row[-3]
         with self.assertRaises(IndexError):
             row[2**1000]
+        with self.assertRaises(IndexError):
+            row[complex()]  # index must be int or string
 
     def test_sqlite_row_index_unicode(self):
         self.con.row_factory = sqlite.Row
diff --git a/Lib/sqlite3/test/types.py b/Lib/sqlite3/test/types.py
index 4bb1de80887b8..4f0e4f6d26839 100644
--- a/Lib/sqlite3/test/types.py
+++ b/Lib/sqlite3/test/types.py
@@ -381,6 +381,43 @@ def test_caster_is_used(self):
         val = self.cur.fetchone()[0]
         self.assertEqual(type(val), float)
 
+    def test_missing_adapter(self):
+        with self.assertRaises(sqlite.ProgrammingError):
+            sqlite.adapt(1.)  # No float adapter registered
+
+    def test_missing_protocol(self):
+        with self.assertRaises(sqlite.ProgrammingError):
+            sqlite.adapt(1, None)
+
+    def test_defect_proto(self):
+        class DefectProto():
+            def __adapt__(self):
+                return None
+        with self.assertRaises(sqlite.ProgrammingError):
+            sqlite.adapt(1., DefectProto)
+
+    def test_defect_self_adapt(self):
+        class DefectSelfAdapt(float):
+            def __conform__(self, _):
+                return None
+        with self.assertRaises(sqlite.ProgrammingError):
+            sqlite.adapt(DefectSelfAdapt(1.))
+
+    def test_custom_proto(self):
+        class CustomProto():
+            def __adapt__(self):
+                return "adapted"
+        self.assertEqual(sqlite.adapt(1., CustomProto), "adapted")
+
+    def test_adapt(self):
+        val = 42
+        self.assertEqual(float(val), sqlite.adapt(val))
+
+    def test_adapt_alt(self):
+        alt = "other"
+        self.assertEqual(alt, sqlite.adapt(1., None, alt))
+
+
 @unittest.skipUnless(zlib, "requires zlib")
 class BinaryConverterTests(unittest.TestCase):
     def convert(s):
diff --git a/Lib/sqlite3/test/userfunctions.py b/Lib/sqlite3/test/userfunctions.py
index 429089072496e..dc900f6486f49 100644
--- a/Lib/sqlite3/test/userfunctions.py
+++ b/Lib/sqlite3/test/userfunctions.py
@@ -21,11 +21,36 @@
 #    misrepresented as being the original software.
 # 3. This notice may not be removed or altered from any source distribution.
 
+import contextlib
+import functools
+import io
 import unittest
 import unittest.mock
 import gc
 import sqlite3 as sqlite
 
+def with_tracebacks(strings):
+    """Convenience decorator for testing callback tracebacks."""
+    strings.append('Traceback')
+
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(self, *args, **kwargs):
+            # First, run the test with traceback enabled.
+            sqlite.enable_callback_tracebacks(True)
+            buf = io.StringIO()
+            with contextlib.redirect_stderr(buf):
+                func(self, *args, **kwargs)
+            tb = buf.getvalue()
+            for s in strings:
+                self.assertIn(s, tb)
+
+            # Then run the test with traceback disabled.
+            sqlite.enable_callback_tracebacks(False)
+            func(self, *args, **kwargs)
+        return wrapper
+    return decorator
+
 def func_returntext():
     return "foo"
 def func_returnunicode():
@@ -228,6 +253,7 @@ def test_func_return_long_long(self):
         val = cur.fetchone()[0]
         self.assertEqual(val, 1<<31)
 
+    @with_tracebacks(['func_raiseexception', '5/0', 'ZeroDivisionError'])
     def test_func_exception(self):
         cur = self.con.cursor()
         with self.assertRaises(sqlite.OperationalError) as cm:
@@ -387,6 +413,7 @@ def test_aggr_no_finalize(self):
             val = cur.fetchone()[0]
         self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error")
 
+    @with_tracebacks(['__init__', '5/0', 'ZeroDivisionError'])
     def test_aggr_exception_in_init(self):
         cur = self.con.cursor()
         with self.assertRaises(sqlite.OperationalError) as cm:
@@ -394,6 +421,7 @@ def test_aggr_exception_in_init(self):
             val = cur.fetchone()[0]
         self.assertEqual(str(cm.exception), "user-defined aggregate's '__init__' method raised error")
 
+    @with_tracebacks(['step', '5/0', 'ZeroDivisionError'])
     def test_aggr_exception_in_step(self):
         cur = self.con.cursor()
         with self.assertRaises(sqlite.OperationalError) as cm:
@@ -401,6 +429,7 @@ def test_aggr_exception_in_step(self):
             val = cur.fetchone()[0]
         self.assertEqual(str(cm.exception), "user-defined aggregate's 'step' method raised error")
 
+    @with_tracebacks(['finalize', '5/0', 'ZeroDivisionError'])
     def test_aggr_exception_in_finalize(self):
         cur = self.con.cursor()
         with self.assertRaises(sqlite.OperationalError) as cm:
@@ -502,6 +531,14 @@ def authorizer_cb(action, arg1, arg2, dbname, source):
             raise ValueError
         return sqlite.SQLITE_OK
 
+    @with_tracebacks(['authorizer_cb', 'ValueError'])
+    def test_table_access(self):
+        super().test_table_access()
+
+    @with_tracebacks(['authorizer_cb', 'ValueError'])
+    def test_column_access(self):
+        super().test_table_access()
+
 class AuthorizerIllegalTypeTests(AuthorizerTests):
     @staticmethod
     def authorizer_cb(action, arg1, arg2, dbname, source):



More information about the Python-checkins mailing list