[Python-checkins] bpo-33078 - Fix queue size on pickling error (GH-6119) (GH-6178)

Antoine Pitrou webhook-mailer at python.org
Wed Mar 21 12:21:20 EDT 2018


https://github.com/python/cpython/commit/bb5b5291971c104ea773db1a30e46d410b6b3e1e
commit: bb5b5291971c104ea773db1a30e46d410b6b3e1e
branch: 3.7
author: Miss Islington (bot) <31488909+miss-islington at users.noreply.github.com>
committer: Antoine Pitrou <pitrou at free.fr>
date: 2018-03-21T17:21:15+01:00
summary:

bpo-33078 - Fix queue size on pickling error (GH-6119) (GH-6178)

(cherry picked from commit e2f33add635df4fde81be9960bab367e010c19bf)

Co-authored-by: Thomas Moreau <thomas.moreau.2010 at gmail.com>

files:
A Misc/NEWS.d/next/Library/2018-03-15-07-38-00.bpo-33078.RmjUF5.rst
M Lib/multiprocessing/queues.py
M Lib/test/_test_multiprocessing.py

diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index d66d37a5c3e2..715a9b0e1290 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -161,7 +161,7 @@ def _start_thread(self):
             target=Queue._feed,
             args=(self._buffer, self._notempty, self._send_bytes,
                   self._wlock, self._writer.close, self._ignore_epipe,
-                  self._on_queue_feeder_error),
+                  self._on_queue_feeder_error, self._sem),
             name='QueueFeederThread'
         )
         self._thread.daemon = True
@@ -203,7 +203,7 @@ def _finalize_close(buffer, notempty):
 
     @staticmethod
     def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
-              onerror):
+              onerror, queue_sem):
         debug('starting thread to feed data to pipe')
         nacquire = notempty.acquire
         nrelease = notempty.release
@@ -255,6 +255,12 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
                     info('error in queue thread: %s', e)
                     return
                 else:
+                    # Since the object has not been sent in the queue, we need
+                    # to decrease the size of the queue. The error acts as
+                    # if the object had been silently removed from the queue
+                    # and this step is necessary to have a properly working
+                    # queue.
+                    queue_sem.release()
                     onerror(e, obj)
 
     @staticmethod
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 940fe584e752..c787702f1d4c 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1056,6 +1056,19 @@ def __reduce__(self):
             self.assertTrue(q.get(timeout=1.0))
             close_queue(q)
 
+        with test.support.captured_stderr():
+            # bpo-33078: verify that the queue size is correctly handled
+            # on errors.
+            q = self.Queue(maxsize=1)
+            q.put(NotSerializable())
+            q.put(True)
+            self.assertEqual(q.qsize(), 1)
+            # bpo-30595: use a timeout of 1 second for slow buildbots
+            self.assertTrue(q.get(timeout=1.0))
+            # Check that the size of the queue is correct
+            self.assertEqual(q.qsize(), 0)
+            close_queue(q)
+
     def test_queue_feeder_on_queue_feeder_error(self):
         # bpo-30006: verify feeder handles exceptions using the
         # _on_queue_feeder_error hook.
diff --git a/Misc/NEWS.d/next/Library/2018-03-15-07-38-00.bpo-33078.RmjUF5.rst b/Misc/NEWS.d/next/Library/2018-03-15-07-38-00.bpo-33078.RmjUF5.rst
new file mode 100644
index 000000000000..55c2b1de8668
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-03-15-07-38-00.bpo-33078.RmjUF5.rst
@@ -0,0 +1,2 @@
+Fix the size handling in multiprocessing.Queue when a pickling error
+occurs.



More information about the Python-checkins mailing list