multiprocessing queue hangs up on the Amazon cloud

jgrant at smith.edu
Wed Jan 14 16:55:03 EST 2015


Hello!

I searched and found posts similar to mine, but either I couldn't understand the answer or the problem was different enough that the answers weren't helpful. Please excuse me if this repeats a problem that has already been answered.

I am trying to run a series of scripts on the Amazon cloud, multiprocessing across the 32 cores of our AWS instance.  The scripts run well and the queuing seems to work, BUT although the worker processes run to completion, the script below that manages the queue never ends.  I have tried both Queue and JoinableQueue, and the same thing happens with each.

I've simplified the example below a bit, just having it print, but the problem is the same.  Can anyone see what I have done wrong here?

Thanks!

Jessica

----------------

#!/usr/bin/env python
from multiprocessing import Process, Queue, JoinableQueue, current_process
# from Pipeline import Pipeline   # the real pipeline normally called in worker(); not needed for this simplified test

def worker(done_queue, work_queue):
	# Pull tasks off work_queue until the 'STOP' sentinel is seen,
	# report each result on done_queue, then signal that this worker is done.
	try:
		for f in iter(work_queue.get, 'STOP'):
			###########################################
			# Normally I call a complicated series of scripts here, but to test, just print
			print f
			###########################################
			done_queue.put("%s - %s got %s." % (current_process().name, f, f))
			log("%s - %s got %s." % (current_process().name, f, f), f)
			work_queue.task_done()
	except Exception, e:
		done_queue.put("%s failed on %s with: %s" % (current_process().name, f, e.message))
		log("%s failed on %s with: %s" % (current_process().name, f, e.message), f)
		work_queue.task_done()
	done_queue.put('STOP')

	return


def log(string, f):
	# Append a line to a per-task log file.
	outfile = open('queue_taxon_logfile' + str(f), 'a')
	outfile.write(str(string) + '\n')
	outfile.close()

def main():

	workers = 32                    # change if fewer processors are available
	work_queue = JoinableQueue(810) # change if more than 810 taxa
	done_queue = JoinableQueue(810)
	processes = []

	print "1 it gets here"
	# Queue up the tasks, followed by a single 'STOP' sentinel.
	for f in range(64):
		work_queue.put(f)
	work_queue.put('STOP')

	print "2 it gets here"
	
	# Start one worker process per core.
	for w in xrange(workers):
		p = Process(target=worker, args=(done_queue, work_queue))
		p.start()
		processes.append(p)

	print "3  it  gets here"
	for p in processes:
		print p                     # it only prints once - <Process(Process-1, started)>
		p.join()
		
	print "it never gets here"
	for status in iter(done_queue.get, 'STOP'):
		print status


if __name__ == '__main__':
    main()
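
One difference I did notice between my script and the worker/queue example in the multiprocessing docs: the docs example puts one 'STOP' on the task queue for every worker process, whereas I only put a single 'STOP' in total.  I haven't confirmed whether that is actually the problem on the AWS instance, but in case it helps the discussion, here is a stripped-down sketch of that variant (same placeholder names as above; the logging and the JoinableQueue/task_done bookkeeping are left out, and the results are drained before the join):

----------------

#!/usr/bin/env python
from multiprocessing import Process, Queue, current_process

def worker(done_queue, work_queue):
    # Consume tasks until this worker pulls its own 'STOP' sentinel.
    for f in iter(work_queue.get, 'STOP'):
        print f
        done_queue.put("%s got %s." % (current_process().name, f))
    # Tell the parent this worker has finished.
    done_queue.put('STOP')

def main():
    workers = 32
    work_queue = Queue()
    done_queue = Queue()

    for f in range(64):
        work_queue.put(f)
    # One sentinel per worker, so every process can leave its get() loop.
    for w in xrange(workers):
        work_queue.put('STOP')

    processes = []
    for w in xrange(workers):
        p = Process(target=worker, args=(done_queue, work_queue))
        p.start()
        processes.append(p)

    # Drain the results before joining; expect one 'STOP' back per worker.
    stops = 0
    while stops < workers:
        status = done_queue.get()
        if status == 'STOP':
            stops += 1
        else:
            print status

    for p in processes:
        p.join()

if __name__ == '__main__':
    main()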
    
   


