[Python-checkins] cpython (2.7): sqlite3: Handle strings with embedded zeros correctly

petri.lehtinen python-checkins at python.org
Wed Feb 1 21:45:08 CET 2012


http://hg.python.org/cpython/rev/93ac4b12a750
changeset:   74709:93ac4b12a750
branch:      2.7
parent:      74705:94b69bae61eb
user:        Petri Lehtinen <petri at digip.org>
date:        Wed Feb 01 22:20:12 2012 +0200
summary:
  sqlite3: Handle strings with embedded zeros correctly

Closes #13676.

files:
  Lib/sqlite3/test/dbapi.py   |   7 ++++
  Lib/sqlite3/test/factory.py |  41 ++++++++++++++++++++++++-
  Misc/NEWS                   |   2 +
  Modules/_sqlite/cursor.c    |  16 +++++----
  Modules/_sqlite/statement.c |   8 ++--
  5 files changed, 62 insertions(+), 12 deletions(-)


diff --git a/Lib/sqlite3/test/dbapi.py b/Lib/sqlite3/test/dbapi.py
--- a/Lib/sqlite3/test/dbapi.py
+++ b/Lib/sqlite3/test/dbapi.py
@@ -203,6 +203,13 @@
     def CheckExecuteArgString(self):
         self.cu.execute("insert into test(name) values (?)", ("Hugo",))
 
+    def CheckExecuteArgStringWithZeroByte(self):
+        self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
+
+        self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
+        row = self.cu.fetchone()
+        self.assertEqual(row[0], "Hu\x00go")
+
     def CheckExecuteWrongNoOfArgs1(self):
         # too many parameters
         try:
diff --git a/Lib/sqlite3/test/factory.py b/Lib/sqlite3/test/factory.py
--- a/Lib/sqlite3/test/factory.py
+++ b/Lib/sqlite3/test/factory.py
@@ -189,13 +189,52 @@
     def tearDown(self):
         self.con.close()
 
+class TextFactoryTestsWithEmbeddedZeroBytes(unittest.TestCase):
+    def setUp(self):
+        self.con = sqlite.connect(":memory:")
+        self.con.execute("create table test (value text)")
+        self.con.execute("insert into test (value) values (?)", ("a\x00b",))
+
+    def CheckString(self):
+        # text_factory defaults to unicode
+        row = self.con.execute("select value from test").fetchone()
+        self.assertIs(type(row[0]), unicode)
+        self.assertEqual(row[0], "a\x00b")
+
+    def CheckCustom(self):
+        # A custom factory should receive an str argument
+        self.con.text_factory = lambda x: x
+        row = self.con.execute("select value from test").fetchone()
+        self.assertIs(type(row[0]), str)
+        self.assertEqual(row[0], "a\x00b")
+
+    def CheckOptimizedUnicodeAsString(self):
+        # ASCII -> str argument
+        self.con.text_factory = sqlite.OptimizedUnicode
+        row = self.con.execute("select value from test").fetchone()
+        self.assertIs(type(row[0]), str)
+        self.assertEqual(row[0], "a\x00b")
+
+    def CheckOptimizedUnicodeAsUnicode(self):
+        # Non-ASCII -> unicode argument
+        self.con.text_factory = sqlite.OptimizedUnicode
+        self.con.execute("delete from test")
+        self.con.execute("insert into test (value) values (?)", (u'ä\0ö',))
+        row = self.con.execute("select value from test").fetchone()
+        self.assertIs(type(row[0]), unicode)
+        self.assertEqual(row[0], u"ä\x00ö")
+
+    def tearDown(self):
+        self.con.close()
+
 def suite():
     connection_suite = unittest.makeSuite(ConnectionFactoryTests, "Check")
     cursor_suite = unittest.makeSuite(CursorFactoryTests, "Check")
     row_suite_compat = unittest.makeSuite(RowFactoryTestsBackwardsCompat, "Check")
     row_suite = unittest.makeSuite(RowFactoryTests, "Check")
     text_suite = unittest.makeSuite(TextFactoryTests, "Check")
-    return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite))
+    text_zero_bytes_suite = unittest.makeSuite(TextFactoryTestsWithEmbeddedZeroBytes, "Check")
+    return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite, text_zero_bytes_suite))
 
 def test():
     runner = unittest.TextTestRunner()
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -90,6 +90,8 @@
 Library
 -------
 
+- Issue #13676: Handle strings with embedded zeros correctly in sqlite3.
+
 - Issue #13506: Add '' to path for IDLE Shell when started and restarted with Restart Shell.
   Original patches by Marco Scataglini and Roger Serwy.
 
diff --git a/Modules/_sqlite/cursor.c b/Modules/_sqlite/cursor.c
--- a/Modules/_sqlite/cursor.c
+++ b/Modules/_sqlite/cursor.c
@@ -268,16 +268,17 @@
     }
 }
 
-PyObject* pysqlite_unicode_from_string(const char* val_str, int optimize)
+PyObject* pysqlite_unicode_from_string(const char* val_str, Py_ssize_t size, int optimize)
 {
     const char* check;
+    Py_ssize_t pos;
     int is_ascii = 0;
 
     if (optimize) {
         is_ascii = 1;
 
         check = val_str;
-        while (*check) {
+        for (pos = 0; pos < size; pos++) {
             if (*check & 0x80) {
                 is_ascii = 0;
                 break;
@@ -288,9 +289,9 @@
     }
 
     if (is_ascii) {
-        return PyString_FromString(val_str);
+        return PyString_FromStringAndSize(val_str, size);
     } else {
-        return PyUnicode_DecodeUTF8(val_str, strlen(val_str), NULL);
+        return PyUnicode_DecodeUTF8(val_str, size, NULL);
     }
 }
 
@@ -375,10 +376,11 @@
                 converted = PyFloat_FromDouble(sqlite3_column_double(self->statement->st, i));
             } else if (coltype == SQLITE_TEXT) {
                 val_str = (const char*)sqlite3_column_text(self->statement->st, i);
+                nbytes = sqlite3_column_bytes(self->statement->st, i);
                 if ((self->connection->text_factory == (PyObject*)&PyUnicode_Type)
                     || (self->connection->text_factory == pysqlite_OptimizedUnicode)) {
 
-                    converted = pysqlite_unicode_from_string(val_str,
+                    converted = pysqlite_unicode_from_string(val_str, nbytes,
                         self->connection->text_factory == pysqlite_OptimizedUnicode ? 1 : 0);
 
                     if (!converted) {
@@ -391,9 +393,9 @@
                         PyErr_SetString(pysqlite_OperationalError, buf);
                     }
                 } else if (self->connection->text_factory == (PyObject*)&PyString_Type) {
-                    converted = PyString_FromString(val_str);
+                    converted = PyString_FromStringAndSize(val_str, nbytes);
                 } else {
-                    converted = PyObject_CallFunction(self->connection->text_factory, "s", val_str);
+                    converted = PyObject_CallFunction(self->connection->text_factory, "s#", val_str, nbytes);
                 }
             } else {
                 /* coltype == SQLITE_BLOB */
diff --git a/Modules/_sqlite/statement.c b/Modules/_sqlite/statement.c
--- a/Modules/_sqlite/statement.c
+++ b/Modules/_sqlite/statement.c
@@ -166,13 +166,13 @@
             rc = sqlite3_bind_double(self->st, pos, PyFloat_AsDouble(parameter));
             break;
         case TYPE_STRING:
-            string = PyString_AS_STRING(parameter);
-            rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT);
+            PyString_AsStringAndSize(parameter, &string, &buflen);
+            rc = sqlite3_bind_text(self->st, pos, string, buflen, SQLITE_TRANSIENT);
             break;
         case TYPE_UNICODE:
             stringval = PyUnicode_AsUTF8String(parameter);
-            string = PyString_AsString(stringval);
-            rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT);
+            PyString_AsStringAndSize(stringval, &string, &buflen);
+            rc = sqlite3_bind_text(self->st, pos, string, buflen, SQLITE_TRANSIENT);
             Py_DECREF(stringval);
             break;
         case TYPE_BUFFER:

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list