[Python-checkins] bpo-47029: Fix BrokenPipeError in multiprocessing.Queue at garbage collection and explicit close (GH-31913)
miss-islington
webhook-mailer at python.org
Tue May 3 20:16:42 EDT 2022
https://github.com/python/cpython/commit/28eea73e7c5405ec41dda0cddae2a3ebaac908f5
commit: 28eea73e7c5405ec41dda0cddae2a3ebaac908f5
branch: 3.10
author: Miss Islington (bot) <31488909+miss-islington at users.noreply.github.com>
committer: miss-islington <31488909+miss-islington at users.noreply.github.com>
date: 2022-05-03T17:16:21-07:00
summary:
bpo-47029: Fix BrokenPipeError in multiprocessing.Queue at garbage collection and explicit close (GH-31913)
(cherry picked from commit dfb1b9da8a4becaeaed3d9cffcaac41bcaf746f4)
Co-authored-by: Géry Ogam <gery.ogam at gmail.com>
files:
A Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst
M Lib/multiprocessing/queues.py
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index a2901814876d6..f37f114a96887 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -139,13 +139,10 @@ def put_nowait(self, obj):
def close(self):
self._closed = True
- try:
- self._reader.close()
- finally:
- close = self._close
- if close:
- self._close = None
- close()
+ close = self._close
+ if close:
+ self._close = None
+ close()
def join_thread(self):
debug('Queue.join_thread()')
@@ -169,8 +166,9 @@ def _start_thread(self):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send_bytes,
- self._wlock, self._writer.close, self._ignore_epipe,
- self._on_queue_feeder_error, self._sem),
+ self._wlock, self._reader.close, self._writer.close,
+ self._ignore_epipe, self._on_queue_feeder_error,
+ self._sem),
name='QueueFeederThread'
)
self._thread.daemon = True
@@ -211,8 +209,8 @@ def _finalize_close(buffer, notempty):
notempty.notify()
@staticmethod
- def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
- onerror, queue_sem):
+ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
+ writer_close, ignore_epipe, onerror, queue_sem):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
@@ -238,7 +236,8 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
obj = bpopleft()
if obj is sentinel:
debug('feeder thread got sentinel -- exiting')
- close()
+ reader_close()
+ writer_close()
return
# serialize the data before acquiring the lock
diff --git a/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst
new file mode 100644
index 0000000000000..cc054673338f0
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst
@@ -0,0 +1,4 @@
+Always close the read end of the pipe used by :class:`multiprocessing.Queue`
+*after* the last write of buffered data to the write end of the pipe to avoid
+:exc:`BrokenPipeError` at garbage collection and at
+:meth:`multiprocessing.Queue.close` calls. Patch by Géry Ogam.
More information about the Python-checkins mailing list