[Tutor] oo design/interaction quandary

Brian Jones bkjones at gmail.com
Sat Dec 26 03:03:10 CET 2009


Hi all,

I'm having a design issue that's really bothering me. The code I'm writing
is fairly large by now, but I've written what I think is a decent example
that illustrates my problem.

My app launches threads that each consume messages from a queue and send
them to a processor object. If the processing was successful, the processor
needs to return the message to the main thread, which then puts the message
into a different queue.

Here's a shortened version of my code.


from amqplib import client_0_8 as amqp  # py-amqplib


class Processor(object):
    def __init__(self):
        self.qhost = 'localhost'
        self.qport = '5672'
        self.uname = 'guest'
        self.pwd = 'guest'
        self.ssl = False
        self.vhost = '/'
        self.exch_name = 'fooX'
        self.exch_type = 'direct'
        self.qname = 'fooQ'
        self.conn = amqp.Connection(userid=self.uname, password=self.pwd,
                                    host=self.qhost, virtual_host=self.vhost,
                                    ssl=self.ssl)
        self.chan = self.conn.channel()
        self.chan.exchange_declare(self.exch_name, type=self.exch_type)
        self.chan.queue_declare(self.qname)
        self.chan.queue_bind(self.qname, self.exch_name)
    def consume(self, callback):
        self.chan.basic_consume(self.qname, callback=callback)
        while True:
            self.chan.wait()


class Munger(object):
    def munge(self, msg):
        if msg % 2 == 0:
            yield msg

class Sender(object):
    def run(self):
        p = Processor()
        m = Munger()
        for msg in p.consume(m.munge):
            # I know this doesn't work right now. This piece of the
            # code should send 'msg' to another queue.
            pass



if __name__ == '__main__':
    s = Sender()
    s.run()


The problem might be obvious to you, but I'll quickly explain:

The Sender object (a thread in the real code) calls p.consume,
which just wraps the basic_consume method in py-amqplib. The
basic_consume method registers a consumer and a callback with the amqp
server. The chan.wait() call blocks and waits for messages. When
messages arrive, they are passed to the Python callable we passed to
the basic_consume method. Therein lies the problem.
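
To show what I mean without needing a broker, here's a small stand-in
sketch. FakeChannel is made up for illustration (it is not part of
py-amqplib); it just calls the registered callback once per message and
throws away whatever the callback returns, which is how I understand
chan.wait() to dispatch deliveries. The simplified munge and record
functions are hypothetical too.

```python
class FakeChannel(object):
    """Hypothetical stand-in for an AMQP channel; not part of py-amqplib."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.callback = None

    def basic_consume(self, callback):
        # Only registers the callback; nothing is delivered yet.
        self.callback = callback

    def wait(self):
        # Deliver one message to the callback and discard its return value.
        self.callback(self.messages.pop(0))


def munge(msg):
    # A generator function: calling it only builds a generator object.
    if msg % 2 == 0:
        yield msg


seen = []


def record(msg):
    # A plain function: its body runs as soon as the callback is invoked.
    seen.append(msg)


chan = FakeChannel([1, 2, 3, 4])

chan.basic_consume(munge)
chan.wait()  # munge(1) is called, but its body never runs
chan.wait()  # same for munge(2): the generator is created and dropped

chan.basic_consume(record)
chan.wait()  # record(3) runs
chan.wait()  # record(4) runs

print(seen)  # [3, 4]
```

So with a generator as the callback nothing is ever processed, and since
consume() never returns anything, Sender has nothing to iterate over
either.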

Messages go from my Processor object, to the Munger object, and this
is all initiated by the Sender object, but I can't find a clean way to
get messages successfully processed by Munger back to Sender, so that
Sender can requeue the message to the new queue.

I've thought about having a Queue object in Sender, or maybe
registering Sender as an observer of (at different times) the Munger
or Processor objects, but I'd like this to be as simple and
understandable as possible for other developers, because I want to
make the Processor and Munger objects pluggable (so the Processor can
support different queuing protocols, Munger can do literally
anything... etc).
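
Roughly what I have in mind for the Queue variant, as a sketch only.
FakeProcessor is a made-up stand-in that feeds canned messages to a
callback instead of talking to a broker, the _DONE sentinel exists only
so the example terminates, and 'sent' stands in for publishing to the
second queue. I've also assumed munge returns the message (or None on
failure) rather than yielding it. The module is spelled Queue on
Python 2 and queue on Python 3.

```python
import queue  # named Queue on Python 2
import threading


class FakeProcessor(object):
    """Hypothetical stand-in for Processor: feeds canned messages to a callback."""

    def __init__(self, messages):
        self.messages = messages

    def consume(self, callback):
        for msg in self.messages:
            callback(msg)


class Munger(object):
    def munge(self, msg):
        # Return the message if processing succeeded, otherwise None.
        return msg if msg % 2 == 0 else None


class Sender(object):
    _DONE = object()  # sentinel marking the end of the canned messages

    def __init__(self, processor, munger):
        self.processor = processor
        self.munger = munger
        self.results = queue.Queue()
        self.sent = []  # stands in for publishing to the second queue

    def _on_message(self, msg):
        # Runs in the consumer thread; hands successful results to run().
        munged = self.munger.munge(msg)
        if munged is not None:
            self.results.put(munged)

    def _consume(self):
        self.processor.consume(self._on_message)
        self.results.put(self._DONE)

    def run(self):
        worker = threading.Thread(target=self._consume)
        worker.start()
        while True:
            msg = self.results.get()  # blocks until a result arrives
            if msg is self._DONE:
                break
            self.sent.append(msg)
        worker.join()


s = Sender(FakeProcessor([1, 2, 3, 4]), Munger())
s.run()
print(s.sent)  # [2, 4]
```

Here Processor and Munger know nothing about Sender, which is what I'm
after for pluggability, but it costs an extra thread and a queue per
Sender just to move a return value around.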

Ideas welcome. If I've failed to explain some aspect of my issue, give
me a poke and I'll expand as best I can.


brian


Brian K. Jones

Python Magazine  http://www.pythonmagazine.com
My Blog          http://www.protocolostomy.com