Graceful exit from Python + multiprocessing daemons
deva.seetharam at gmail.com
Fri May 20 21:32:18 EDT 2016
Hello,
Greetings!
I would like to get your advice on the following situation:
I have a Linux daemon written in Python (version 2.7) using the python-daemon module (https://pypi.python.org/pypi/python-daemon). The point of using python-daemon is so the program can be managed as an init.d service on Linux. The daemon is instantiated and started from a main function.
The main function also creates a Driver object, which in turn creates two daemon processes (multiprocessing.Process with the daemon flag set to True): a producer and a consumer. The producer puts data into a multiprocessing.Queue, and the consumer reads those items and processes them.
I have two challenges. Please advise how these can be handled:
1. When the program is stopped, I would like to preserve all the elements in the Queue so they can be processed later (so that no data in the Queue is lost). However, when I call the finalize function, it reports that the Queue is empty even though elements are still in it. One thing I noticed is that when the finalize function is called, the parent PID has changed from 1 to that of the calling process.
2. When the daemon is stopped (python App stop), I get the following assertion error:
Terminating on signal 15
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
func(*targs, **kargs)
File "/usr/lib/python2.7/multiprocessing/util.py", line 325, in _exit_function
p.join()
File "/usr/lib/python2.7/multiprocessing/process.py", line 143, in join
assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process
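As far as I can tell, this fires because the daemonisation forks after the worker processes have been created, so atexit's p.join() runs in a process that is not the workers' parent. A minimal standalone reproduction of that situation (no python-daemon involved; a bare os.fork stands in for the detach step):

```python
import multiprocessing
import os
import time

def reproduce():
    # The real parent creates a child process...
    p = multiprocessing.Process(target=time.sleep, args=(0.2,))
    p.start()

    # ...and then the process forks, as python-daemon does when detaching.
    pid = os.fork()
    if pid == 0:
        # In the forked copy, the recorded parent PID of p no longer
        # matches os.getpid(), so join() raises the same AssertionError.
        try:
            p.join()
            os._exit(1)   # join unexpectedly succeeded
        except AssertionError:
            os._exit(0)   # reproduced "can only join a child process"
    _, status = os.waitpid(pid, 0)
    p.join()              # the real parent can still join normally
    return os.WEXITSTATUS(status) == 0
```

If that diagnosis is right, the fix would be to create the worker processes only after daemonisation has happened.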
I am including the skeletal structure of the code for your review and advice.
---------------------------------------------------------------
class Producer(object):
    def run(self):
        self.data_queue.put(new_item)

    def finalize(self):
        dump_file = open('/tmp/dumper', 'wb')
        while not self.data_queue.empty():
            record = self.data_queue.get()
            pickle.dump(record, dump_file)
        dump_file.close()


class Consumer(object):
    def run(self):
        next_item = self.data_queue.get()
        process_item(next_item)

    def finalize(self):
        dump_file = open('/tmp/dumper', 'wb')
        while not self.data_queue.empty():
            record = self.data_queue.get()
            pickle.dump(record, dump_file)
        dump_file.close()


class Driver(object):
    def run_task(self, task, dummy):
        while not self.stop_event.is_set():
            task.run()
        task.finalize()
        self.logger.debug('Exiting task')

    def __create_workers__(self):
        task1 = Producer(data_queue)
        self.task_list.append(task1)
        p1 = multiprocessing.Process(target=self.run_task, name='T1', args=(task1, 1))
        self.worker_list.append(p1)
        task2 = Consumer(data_queue)
        self.task_list.append(task2)
        p2 = multiprocessing.Process(target=self.run_task, name='T2', args=(task2, 2))
        self.worker_list.append(p2)

    def run(self):
        self.__create_workers__()
        for worker in self.worker_list:
            worker.daemon = True
            worker.start()

    def finalize(self):
        for task in self.task_list:
            self.logger.debug('finalizing ')
            task.finalize()


if __name__ == "__main__":
    the_driver = Driver(app_logger=logger)
    the_driver.run()
    # the following is a python-daemon object
    the_daemon_object = Damien(logger)
    daemon_runner.do_action()
---------------------------------------------------------------
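For what it's worth, the direction I am considering is to drop daemon=True, signal the workers via the stop event, and join them explicitly, so that each finalize() runs inside the child that actually owns the queue data. A rough standalone sketch of that shutdown pattern (all names are placeholders; python-daemon and the signal handler are omitted):

```python
import multiprocessing
import time

def worker_loop(stop_event, data_queue):
    # Stand-in for run_task(): produce until asked to stop. Any
    # finalize() logic would run here, inside the child, after the loop.
    while not stop_event.is_set():
        data_queue.put('item')
        time.sleep(0.01)

def main():
    stop_event = multiprocessing.Event()
    data_queue = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker_loop,
                                       args=(stop_event, data_queue))
               for _ in range(2)]
    for w in workers:
        w.start()          # daemon left False so a clean join() is possible
    time.sleep(0.1)
    stop_event.set()       # a SIGTERM handler would do this in the daemon
    for w in workers:
        w.join(timeout=5)
    return all(not w.is_alive() for w in workers), data_queue
```

Does this look like a sensible shutdown approach, or is there a better-established pattern for combining python-daemon with multiprocessing workers?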