[New-bugs-announce] [issue20527] multiprocessing.Queue deadlocks after “reader” process death

OscaTutenchamon report at bugs.python.org
Thu Feb 6 10:19:32 CET 2014


New submission from OscaTutenchamon:

I've experienced that queue can be deadlocked for reading when:
1. The "reader" process is using `get` with timeout > 0:
  self.queue.get(timeout=3)
2. "reader" dies while `get` is blocking due to timeout.

After that queue's `_rlock` is never being released so reading becomes impossible.

Application demonstrating the problem.
I create two child processes "Worker" (putting into queue) and "Receiver" (getting from queue). Also parent process periodically checks if his children are alive and starts new child if needed.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import procname
import time

class Receiver(multiprocessing.Process):
    ''' Reads from queue with 3 secs timeout '''

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Receiver')
        while True:
            try:
                msg = self.queue.get(timeout=3)
                print '<<< `{}`, queue rlock: {}'.format(
                    msg, self.queue._rlock)
            except multiprocessing.queues.Empty:
                print '<<< EMPTY, Queue rlock: {}'.format(
                    self.queue._rlock)
                pass


class Worker(multiprocessing.Process):
    ''' Puts into queue with 1 sec sleep '''

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Worker')
        while True:
            time.sleep(1)
            print 'Worker: putting msg, Queue size: ~{}'.format(
                self.queue.qsize())
            self.queue.put('msg from Worker')


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    worker = Worker(queue)
    worker.start()

    receiver = Receiver(queue)
    receiver.start()

    while True:
        time.sleep(1)
        if not worker.is_alive():
            print 'Restarting worker'
            worker = Worker(queue)
            worker.start()
        if not receiver.is_alive():
            print 'Restarting receiver'
            receiver = Receiver(queue)
            receiver.start()

Colored version here: http://pastebin.com/YifY63vE

Console output.
$ python queuetest.py
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Restarting receiver                        <-- killed Receiver with SIGTERM
Worker: putting msg, Queue size: ~0
Worker: putting msg, Queue size: ~1
Worker: putting msg, Queue size: ~2
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~3
Worker: putting msg, Queue size: ~4
Worker: putting msg, Queue size: ~5
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~6
Worker: putting msg, Queue size: ~7 

System information.
$ uname -sr
Linux 3.11.8-200.fc19.x86_64
$ python -V
Python 2.7.5
In [3]: multiprocessing.__version__
Out[3]: '0.70a1'

----------
components: Library (Lib)
messages: 210369
nosy: OscaTutenchamon
priority: normal
severity: normal
status: open
title: multiprocessing.Queue deadlocks after “reader” process death
type: behavior
versions: Python 2.7

_______________________________________
Python tracker <report at bugs.python.org>
<http://bugs.python.org/issue20527>
_______________________________________


More information about the New-bugs-announce mailing list