Graceful exit from Python + multiprocessing daemons

deva.seetharam at gmail.com
Fri May 20 21:32:18 EDT 2016


Hello,
Greetings!

I would like to get your advice with respect to the following situation:


I have a Linux daemon written in Python (version 2.7) using the python-daemon module (https://pypi.python.org/pypi/python-daemon). The point of using python-daemon is to run the program as an init.d service on Linux. The daemon gets instantiated and started from a main function.

The main function also creates a Driver object. That object in turn creates two daemon processes (multiprocessing.Process with the daemon flag set to True): a producer and a consumer. The producer puts data into a multiprocessing.Queue, and the consumer reads those items off the queue and processes them.
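For clarity, here is a minimal, self-contained sketch of the producer/consumer pattern I described (simplified and with illustrative names: here the parent starts, stops, and joins the workers directly, without python-daemon; my real code is the skeleton further below):
---------------------------------------------------------------
import multiprocessing
import time
from Queue import Empty  # on Python 3: from queue import Empty

def produce(data_queue, stop_event):
    # Keep feeding the shared queue until asked to stop.
    item = 0
    while not stop_event.is_set():
        data_queue.put(item)
        item += 1
        time.sleep(0.1)

def consume(data_queue, stop_event):
    # A timed get() lets the loop notice stop_event regularly.
    while not stop_event.is_set():
        try:
            item = data_queue.get(timeout=0.5)
        except Empty:
            continue
        print(item)  # stand-in for real processing

if __name__ == '__main__':
    q = multiprocessing.Queue()
    stop = multiprocessing.Event()
    workers = [multiprocessing.Process(target=produce, args=(q, stop)),
               multiprocessing.Process(target=consume, args=(q, stop))]
    for w in workers:
        w.start()
    time.sleep(2)
    stop.set()    # ask both loops to finish
    for w in workers:
        w.join()  # works here because this process started the children
---------------------------------------------------------------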

I have two challenges. Please advise how these can be handled:
1. When the program is stopped, I would like to preserve all the elements in the Queue so they can be processed later (i.e., so no data in the Queue is lost). However, when I call the finalize function, it reports that the Queue is empty even though elements are still in it. One thing I noticed is that by the time the finalize function is called, the parent PID of the process has changed from 1 to that of the calling process. (A drain loop that avoids Queue.empty() is sketched after the traceback below.)
2. When the daemon is stopped (python App stop), I get an AssertionError as follows (a possible restructuring is sketched after the code at the end):
Terminating on signal 15
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 325, in _exit_function
    p.join()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 143, in join
    assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process
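
Regarding challenge 1, I wonder whether part of the problem is that multiprocessing.Queue.empty() is documented as unreliable across processes. A drain loop built on a timed get(), roughly as below, might be more robust (just a sketch, not yet tested inside the daemon):
---------------------------------------------------------------
import pickle
from Queue import Empty  # on Python 3: from queue import Empty

def drain_to_disk(data_queue, path='/tmp/dumper'):
    # empty() is only approximate across processes, so keep calling
    # get() with a timeout and stop once nothing arrives in time.
    dump_file = open(path, 'wb')
    try:
        while True:
            try:
                record = data_queue.get(timeout=1)
            except Empty:
                break
            pickle.dump(record, dump_file)
    finally:
        dump_file.close()
---------------------------------------------------------------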

I am including the skeletal structure of the code for your review and advice. 
---------------------------------------------------------------
import multiprocessing
import pickle

class Producer(object):

    def run(self):
        self.data_queue.put(new_item)

    def finalize(self):
        # Dump whatever is left in the queue so it can be reloaded later.
        dump_file = open('/tmp/dumper', 'wb')
        while not self.data_queue.empty():
            record = self.data_queue.get()
            pickle.dump(record, dump_file)
        dump_file.close()

class Consumer(object):

    def run(self):
        next_item = self.data_queue.get()
        process_item(next_item)

    def finalize(self):
        dump_file = open('/tmp/dumper', 'wb')
        while not self.data_queue.empty():
            record = self.data_queue.get()
            pickle.dump(record, dump_file)
        dump_file.close()

class Driver(object):

    def run_task(self, task, dummy):
        while not self.stop_event.is_set():
            task.run()
        task.finalize()
        self.logger.debug('Exiting task')

    def __create_workers__(self):
        task1 = Producer(data_queue)
        self.task_list.append(task1)
        p1 = multiprocessing.Process(target=self.run_task, name='T1', args=(task1, 1))
        self.worker_list.append(p1)
        

        task2 = Consumer(data_queue)
        self.task_list.append(task2)
        p2 = multiprocessing.Process(target=self.run_task, name='T2', args=(task2, 2))
        self.worker_list.append(p2)

    def run(self):
        self.__create_workers__()
        for worker in self.worker_list:
            worker.daemon = True
            worker.start()

    def finalize(self):
        for task in self.task_list:
            self.logger.debug('finalizing ')
            task.finalize()

if __name__ == "__main__":

    the_driver = Driver(app_logger=logger)
    the_driver.run()

    # the following is a python-daemon object
    the_daemon_object = Damien(logger)
    daemon_runner.do_action()
---------------------------------------------------------------
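
Regarding challenge 2, my reading of the AssertionError is that python-daemon forks after the workers have been started, so the final daemonized process is no longer the parent of the producer/consumer children, and the multiprocessing atexit cleanup refuses to join processes it did not start. Would restructuring the main function so the workers are started only after daemonization, roughly as below, be the right fix? (A sketch assuming python-daemon's DaemonContext; stop_event is the same event the Driver already uses.)
---------------------------------------------------------------
import daemon  # the python-daemon package

if __name__ == "__main__":
    # Daemonize first, then start the workers, so that the daemonized
    # process is the real parent of the children and join() succeeds.
    with daemon.DaemonContext():
        the_driver = Driver(app_logger=logger)
        the_driver.run()
        the_driver.stop_event.wait()  # block until a signal handler sets it
        the_driver.finalize()
---------------------------------------------------------------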


