Job queue using xmlrpc and threading

psaffrey at googlemail.com psaffrey at googlemail.com
Mon Sep 22 14:47:03 EDT 2008


I'm trying to implement an application that will listen for requests,
run them when they are present but also be able to add new requests
even while it's running. I've tried to do this using the thread and
xmlrpc modules - the idea is that an XML-RPC exposed object tell the
queue thread object to add a job. If there are no jobs running, it
creates a file, adds the new job to the end and then another
consumption thread starts working through the jobs in the file. New
jobs coming in are just added to the end of the file by the queue
thread.

Unfortunately, I can't get it to work. The problem is that the
consumption thread seems to read the job queue before it gets written,
even though I've used a lock. I've also had the application get to the
stage where it ignores ctrl-c, which is a little worrying - I fear it
doesn't bode well for future stability... I don't have a lot of
experience with multi-threaded applications, so I may well have chosen
a poor approach.

I've posted the code below. It's in three parts, the job queue, the
manager that listens for new requests and an application to add jobs
to the queue. Sorry for the long listings...

Any guidance gratefully received,

Peter

===
testqueue.py:

import thread
import time
import shutil
import os

class JobQueue:

	def __init__(self, filename):
		self.queuefile = filename
		self.jobthread = 0
		# lock for the jobfile queue file
		self.jfqlock = thread.allocate_lock()

	def addJob(self, jobfileuri, email):
		self.jfqlock.acquire()
		if not self.jobthread:
			print "starting jobfile consumption thread"
			if os.access(self.queuefile, os.R_OK):
				print "cleaning stale jobfile queue file"
				try:
					os.remove(self.queuefile)
				except:
					print "problem removing jobfile queue file"
					raise
			self.jobthread = thread.start_new_thread(self.main, ())
		else:
			print "using existing jobfile consumption thread in file",
self.queuefile
		fh = open(self.queuefile, "a")
		# choose "::::" as a delimiter
		print >> fh, jobfileuri + "::::" + email
		self.jfqlock.release()
		return 1

	def main(self):
		while 1:
			self.jfqlock.acquire()
			rfh = open(self.queuefile, "r")
#			breakpoint()
			finput = rfh.readline()
			print "found:", finput
			if not finput:
				print "finished jobfiles. Closing thread"
				rfh.close()
				self.jobthread = 0
				self.jfqlock.release()
				return
			else:
				print "found jobfile in queue: attempting to run"
				# most of this is to shift up the lines in the file
				tmpname = self.queuefile + ".tmp"
				wfh = open(tmpname, "w")
				line = rfh.readline()
				while line:
					wfh.write(line)
					line = rfh.readline()
				wfh.close()
				rfh.close()
				shutil.move(tmpname, self.queuefile)
				self.jfqlock.release()
				# lop off the trailing line break
				print
				print "***run Starting***"
				try:
					self.runJob(finput[:-1])
					print "***run finished***"
				except:
					print "***run failed***"
				print

	def runJob(self, job):
		time.sleep(2.0)
		print "running job", job
		if not report:
			print "some problem with run. Cannot mail out report"
			return


===
queuemanager.py

from testqueue import JobQueue
from SimpleXMLRPCServer import *


class QM:
	def __init__(self, filename):
		self.jq = JobQueue("queue.txt")

	def addJob(self, jobname):
		self.jq.addJob(jobname, "tester at testdomain")

if __name__=="__main__":
	qm = QM("jobqueue.txt")
	rpcserver = SimpleXMLRPCServer(("localhost", 8000))
	rpcserver.register_instance(qm)
	rpcserver.serve_forever()

===
addjob.py:

import xmlrpclib
import sys

server = "localhost"
port = 8000

serveradd = "http://%s:%s" % (server, port)
manager = xmlrpclib.ServerProxy(serveradd)

jobname = sys.argv[1]

manager.addJob(jobname)



More information about the Python-list mailing list