[Chicago] threads and xmlrpc?
kirby urner
kirby.urner at gmail.com
Fri Jan 30 21:33:05 CET 2009
Not sure if relevant in any way but I've advised at least one client
to consider Pyro over XMLRPC when it comes to interprocess
communication. It's not like you necessarily need both or anything
and you'll find testimony on the web that Pyro is faster, if you're
willing to live within the Pythonic domain (and many are).
Kirby
[ wrote this scaffolding (skeletal) for Synovate, bouncing of
Patrick's production code version ]
"""
Yo, I'm a Pyro Client, meaning I talk to some
Pyro Server, perhaps on a different machine.
I'm actually something of a "hive mind" in that
we're actually a suite of separate processes,
called Bees, each with independent access to a
backend Content table. As we complete our tasks,
our Queen sends notifications back to Mother.
Bees come in two flavors, Visitor and Scorer, each
defined in separate modules, and inheriting from Bee
"""
import Pyro.core
import time
import random
from processing import currentProcess, Process, freezeSupport
from processing import Queue
class Queen:
def __init__(self):
self._open_channel()
self.need_content = []
def _open_channel(self):
"""
establish a connection to Mother and
instance her remote objects
"""
self.mom = Pyro.core.getProxyForURI("PYROLOC://localhost:7766/Mother")
def need_content_for(self):
"""
urls needing stuff in Content
"""
self.need_content = self.mom.need_visitors()
return self.need_content
def need_scores_for(self):
"""
stuff in Content needing scoring
"""
self.need_scores = self.mom.need_scores()
return self.need_scores
def content_found(self, completed):
"""
tell Mother what I've downloaded
"""
self.mom.been_visited(completed)
def score_found(self, completed):
"""
tell Mother what I've scored
"""
self.mom.been_scored(completed)
def kill_daemon(self):
self.mom.kill_daemon()
class Bee:
"""
I talk directly to sql's Content, report to Queen
when I'm done (she talks to Mother)
"""
# busy work for testing
def __init__(self):
pass
def busy_work(self, input, output):
for item in iter(input.get, 'STOP'):
func, args = item
result = calculate(func, args)
output.put(result)
def start(self):
pass
def stop(self):
pass
#========== <start> for testing purposes ========
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(currentProcess().getName(), func.__name__, args, result)
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
def test1():
# finds object automatically if you're running the Name Server.
jokes = Pyro.core.getProxyForURI("PYROLOC://localhost:7766/Mother")
print jokes.joke("Irmen")
def test2():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
task_queue.putmany(TASKS1)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
thebee = Bee()
Process(target=thebee.busy_work, args=(task_queue, done_queue)).start()
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS1)):
print '\t', done_queue.get()
# Add more tasks using `put()` instead of `putmany()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
#========== <end> for testing purposes ========
if __name__ == '__main__':
# we'll wait to test the Queen with one of the subclasses of Bee
# since Queen talks to the daemon Mother
# test1()
test2()
More information about the Chicago
mailing list