[Python-checkins] bpo-45512: Use Argument Clinic to set sqlite3 isolation level (GH-29593)

corona10 webhook-mailer at python.org
Thu Nov 18 04:18:17 EST 2021


https://github.com/python/cpython/commit/0920b61a0cb30128287ebafab1df8cad3a3dffdb
commit: 0920b61a0cb30128287ebafab1df8cad3a3dffdb
branch: main
author: Erlend Egeberg Aasland <erlend.aasland at innova.no>
committer: corona10 <donghee.na92 at gmail.com>
date: 2021-11-18T18:18:09+09:00
summary:

bpo-45512: Use Argument Clinic to set sqlite3 isolation level (GH-29593)

files:
M Modules/_sqlite/clinic/connection.c.h
M Modules/_sqlite/connection.c

diff --git a/Modules/_sqlite/clinic/connection.c.h b/Modules/_sqlite/clinic/connection.c.h
index 3a3ae04e8a193..16ad2ee254eca 100644
--- a/Modules/_sqlite/clinic/connection.c.h
+++ b/Modules/_sqlite/clinic/connection.c.h
@@ -63,22 +63,7 @@ pysqlite_connection_init(PyObject *self, PyObject *args, PyObject *kwargs)
         }
     }
     if (fastargs[3]) {
-        if (fastargs[3] == Py_None) {
-            isolation_level = NULL;
-        }
-        else if (PyUnicode_Check(fastargs[3])) {
-            Py_ssize_t isolation_level_length;
-            isolation_level = PyUnicode_AsUTF8AndSize(fastargs[3], &isolation_level_length);
-            if (isolation_level == NULL) {
-                goto exit;
-            }
-            if (strlen(isolation_level) != (size_t)isolation_level_length) {
-                PyErr_SetString(PyExc_ValueError, "embedded null character");
-                goto exit;
-            }
-        }
-        else {
-            _PyArg_BadArgument("Connection", "argument 'isolation_level'", "str or None", fastargs[3]);
+        if (!isolation_level_converter(fastargs[3], &isolation_level)) {
             goto exit;
         }
         if (!--noptargs) {
@@ -851,4 +836,4 @@ getlimit(pysqlite_Connection *self, PyObject *arg)
 #ifndef PYSQLITE_CONNECTION_LOAD_EXTENSION_METHODDEF
     #define PYSQLITE_CONNECTION_LOAD_EXTENSION_METHODDEF
 #endif /* !defined(PYSQLITE_CONNECTION_LOAD_EXTENSION_METHODDEF) */
-/*[clinic end generated code: output=6f267f20e77f92d0 input=a9049054013a1b77]*/
+/*[clinic end generated code: output=c2faf6563397091b input=a9049054013a1b77]*/
diff --git a/Modules/_sqlite/connection.c b/Modules/_sqlite/connection.c
index f632e3359cfb9..0bc9d1d0eeda5 100644
--- a/Modules/_sqlite/connection.c
+++ b/Modules/_sqlite/connection.c
@@ -33,6 +33,60 @@
 #define HAVE_TRACE_V2
 #endif
 
+static const char *
+get_isolation_level(const char *level)
+{
+    assert(level != NULL);
+    static const char *const allowed_levels[] = {
+        "",
+        "DEFERRED",
+        "IMMEDIATE",
+        "EXCLUSIVE",
+        NULL
+    };
+    for (int i = 0; allowed_levels[i] != NULL; i++) {
+        const char *candidate = allowed_levels[i];
+        if (sqlite3_stricmp(level, candidate) == 0) {
+            return candidate;
+        }
+    }
+    PyErr_SetString(PyExc_ValueError,
+                    "isolation_level string must be '', 'DEFERRED', "
+                    "'IMMEDIATE', or 'EXCLUSIVE'");
+    return NULL;
+}
+
+static int
+isolation_level_converter(PyObject *str_or_none, const char **result)
+{
+    if (Py_IsNone(str_or_none)) {
+        *result = NULL;
+    }
+    else if (PyUnicode_Check(str_or_none)) {
+        Py_ssize_t sz;
+        const char *str = PyUnicode_AsUTF8AndSize(str_or_none, &sz);
+        if (str == NULL) {
+            return 0;
+        }
+        if (strlen(str) != (size_t)sz) {
+            PyErr_SetString(PyExc_ValueError, "embedded null character");
+            return 0;
+        }
+
+        const char *level = get_isolation_level(str);
+        if (level == NULL) {
+            return 0;
+        }
+        *result = level;
+    }
+    else {
+        PyErr_SetString(PyExc_TypeError,
+                        "isolation_level must be str or None");
+        return 0;
+    }
+    return 1;
+}
+
 static int
 clinic_fsconverter(PyObject *pathlike, const char **result)
 {
@@ -100,29 +154,6 @@ new_statement_cache(pysqlite_Connection *self, pysqlite_state *state,
     return res;
 }
 
-static const char *
-get_isolation_level(const char *level)
-{
-    assert(level != NULL);
-    static const char *const allowed_levels[] = {
-        "",
-        "DEFERRED",
-        "IMMEDIATE",
-        "EXCLUSIVE",
-        NULL
-    };
-    for (int i = 0; allowed_levels[i] != NULL; i++) {
-        const char *candidate = allowed_levels[i];
-        if (sqlite3_stricmp(level, candidate) == 0) {
-            return candidate;
-        }
-    }
-    PyErr_SetString(PyExc_ValueError,
-                    "isolation_level string must be '', 'DEFERRED', "
-                    "'IMMEDIATE', or 'EXCLUSIVE'");
-    return NULL;
-}
-
 /*[python input]
 class FSConverter_converter(CConverter):
     type = "const char *"
@@ -131,8 +162,13 @@ class FSConverter_converter(CConverter):
         self.c_default = "NULL"
     def cleanup(self):
         return f"PyMem_Free((void *){self.name});\n"
+
+class IsolationLevel_converter(CConverter):
+    type = "const char *"
+    converter = "isolation_level_converter"
+
 [python start generated code]*/
-/*[python end generated code: output=da39a3ee5e6b4b0d input=7b3be538bc4058c0]*/
+/*[python end generated code: output=da39a3ee5e6b4b0d input=be142323885672ab]*/
 
 /*[clinic input]
 _sqlite3.Connection.__init__ as pysqlite_connection_init
@@ -140,7 +176,7 @@ _sqlite3.Connection.__init__ as pysqlite_connection_init
     database: FSConverter
     timeout: double = 5.0
     detect_types: int = 0
-    isolation_level: str(accept={str, NoneType}) = ""
+    isolation_level: IsolationLevel = ""
     check_same_thread: bool(accept={int}) = True
     factory: object(c_default='(PyObject*)clinic_state()->ConnectionType') = ConnectionType
     cached_statements as cache_size: int = 128
@@ -153,7 +189,7 @@ pysqlite_connection_init_impl(pysqlite_Connection *self,
                               int detect_types, const char *isolation_level,
                               int check_same_thread, PyObject *factory,
                               int cache_size, int uri)
-/*[clinic end generated code: output=7d640ae1d83abfd4 input=35e316f66d9f70fd]*/
+/*[clinic end generated code: output=7d640ae1d83abfd4 input=342173993434ba1e]*/
 {
     if (PySys_Audit("sqlite3.connect", "s", database) < 0) {
         return -1;
@@ -189,15 +225,6 @@ pysqlite_connection_init_impl(pysqlite_Connection *self,
         return -1;
     }
 
-    // Convert isolation level to begin statement.
-    const char *level = NULL;
-    if (isolation_level != NULL) {
-        level = get_isolation_level(isolation_level);
-        if (level == NULL) {
-            return -1;
-        }
-    }
-
     // Create LRU statement cache; returns a new reference.
     PyObject *statement_cache = new_statement_cache(self, state, cache_size);
     if (statement_cache == NULL) {
@@ -215,7 +242,7 @@ pysqlite_connection_init_impl(pysqlite_Connection *self,
     self->db = db;
     self->state = state;
     self->detect_types = detect_types;
-    self->isolation_level = level;
+    self->isolation_level = isolation_level;
     self->check_same_thread = check_same_thread;
     self->thread_ident = PyThread_get_thread_ident();
     self->statement_cache = statement_cache;
@@ -1375,26 +1402,9 @@ pysqlite_connection_set_isolation_level(pysqlite_Connection* self, PyObject* iso
             return -1;
         }
         Py_DECREF(res);
+        return 0;
     }
-    else if (PyUnicode_Check(isolation_level)) {
-        Py_ssize_t len;
-        const char *cstr_level = PyUnicode_AsUTF8AndSize(isolation_level, &len);
-        if (cstr_level == NULL) {
-            return -1;
-        }
-        if (strlen(cstr_level) != (size_t)len) {
-            PyErr_SetString(PyExc_ValueError, "embedded null character");
-            return -1;
-        }
-        const char *level = get_isolation_level(cstr_level);
-        if (level == NULL) {
-            return -1;
-        }
-        self->isolation_level = level;
-    }
-    else {
-        PyErr_SetString(PyExc_TypeError,
-                        "isolation_level must be str or None");
+    if (!isolation_level_converter(isolation_level, &self->isolation_level)) {
         return -1;
     }
     return 0;



More information about the Python-checkins mailing list