Threading with Socket Server

baloan baloand at googlemail.com
Thu Mar 24 08:37:15 EDT 2011


If you don't mind using the coroutine library eventlet, you can
implement a single-threaded solution. See the example below. For your use
case you need to change controller to reload the shelve after every
eventlet.sleep(n), i.e. every n seconds.

Regards, Andreas

# eventlet single thread demo

""" prc_publish.eventlet

Price Publisher
"""

# imports

from argparse import ArgumentParser
import eventlet
import logging
import os
import random
import sys
import cPickle as pickle

# Root logger (getLogger() is called without a name); the functions below
# send all of their log output through it.
LOG = logging.getLogger()

# definitions

def main(argv=None):
    """Parse the command line, start the pricing coroutines and run forever.

    Ten ``Pricer`` products named ``AB0000`` .. ``AB0009`` are created, a
    ``controller`` coroutine is spawned to update and publish their prices,
    and a ``listener`` coroutine is spawned to accept clients on
    ``localhost`` at the port given by ``--port``.

    argv -- full argument vector including the program name; defaults to
            ``sys.argv`` when None.

    The ``--file`` option is accepted but its value is not used here.
    This function never returns: it keeps the eventlet loop alive.
    """
    if argv is None:
        argv = sys.argv
    LOG.info("starting '%s %s'", os.path.basename(argv[0]),
             " ".join(argv[1:]))
    # parse options and arguments
    parser = ArgumentParser(description="Price Publisher")
    parser.add_argument("-f", "--file", dest="filename",
                        help="read configuration from %(dest)s")
    # --port selects the listening port; 8010 is used when it is not given.
    parser.add_argument("-p", "--port", default=8010, type=int,
                        help="server port [default: %(default)s]")
    # Parse the vector we were handed (minus the program name) so that a
    # caller-supplied argv is honoured instead of silently using sys.argv.
    args = parser.parse_args(argv[1:])
    print(args)
    # create product dict, keyed by the same name each Pricer carries
    prds = {}
    pubqs = []
    for n in range(10):
        key = "AB" + "{:04}".format(n)
        prds[key] = Pricer(key)
    # start one coroutine for price changes
    eventlet.spawn(controller, prds, pubqs)
    address = ('localhost', args.port)
    # and one coroutine accepting client connections
    eventlet.spawn(listener, address, pubqs)
    # main coroutine keeps the eventlet loop running
    while True:
        eventlet.sleep(10)


def listener(address, pubqs):
    """Accept client connections forever, starting two coroutines per client.

    For each accepted connection a new queue is created and appended to
    ``pubqs``. A ``receiver`` coroutine is spawned on the connection, and a
    ``publisher`` coroutine is spawned with the queue list, the new queue
    and the connection.

    address -- address passed to ``eventlet.listen``
    pubqs   -- shared list of per-client queues; grows by one per client
    """
    server = eventlet.listen(address)
    while True:
        LOG.info('waiting for connection on %s', address)
        conn, peer = server.accept()
        LOG.info("accepting connection from %s", peer)
        # Register the client's queue before its publisher starts.
        client_queue = eventlet.queue.Queue()
        pubqs.append(client_queue)
        eventlet.spawn(receiver, conn)
        eventlet.spawn(publisher, pubqs, client_queue, conn)


def publisher(pubqs, inq, cx):
    """Send every item arriving on ``inq`` to the client socket ``cx``.

    Each item is pickled with the highest protocol and written in full to
    the socket. While the queue is idle the socket is probed once per
    second so the loop can end when the connection has gone away.

    pubqs -- shared list of client queues; ``inq`` is removed from it on exit
    inq   -- queue this publisher reads items from
    cx    -- connected client socket; closed on exit

    IOError raised inside the loop is logged and ends the publisher.
    """
    LOG.info("Publisher running")
    try:
        while True:
            # what happens if client does not pick up
            # what happens if client dies during queue wait
            try:
                # Only the wait for the next item is time-limited. Keeping
                # the send outside means a timeout can never interrupt a
                # half-written pickle and corrupt the byte stream.
                with eventlet.Timeout(1):
                    item = inq.get()
            except eventlet.Timeout:
                # Idle: probe the socket. Per the original author's note
                # this raises IOError if the connection is lost
                # (NOTE(review): confirm for the socket type in use).
                cx.fileno()
                continue
            s = pickle.dumps(item, pickle.HIGHEST_PROTOCOL)
            # plain-text alternative: s = "{0[0]} {0[1]}\n\r".format(item)
            # sendall, not send: send may transmit only part of the buffer.
            # A client that stops reading will block here.
            cx.sendall(s)
    # if connection closes
    except IOError as e:
        LOG.info(e)
    # make sure to close the socket
    finally:
        cx.close()
        pubqs.remove(inq)
    LOG.info("Publisher terminated")


def receiver(cx):
    """Log whatever the client sends until it disconnects, then close ``cx``.

    cx -- connected client socket; always closed before returning

    Data is read in chunks of up to 4096 bytes and logged at INFO level.
    IOError raised while reading is logged and ends the receiver.
    """
    LOG.info("Receiver running")
    try:
        while True:
            # what happens if client does not pick up
            s = cx.recv(4096)
            if not s:
                # empty read: stop receiving
                break
            LOG.info(s)
    # if connection closes
    # ("as" form: valid on Python 2.6+ and 3, unlike "except IOError, e")
    except IOError as e:
        LOG.info(e)
    # make sure to close the socket
    finally:
        cx.close()
    LOG.info("Receiver terminated")

def controller(prds, pubqs):
    """Run the price update cycle forever, once every five seconds.

    Each cycle replaces ``Pricer.VOLA`` with ``update_vola`` of its current
    value, then calls ``run()`` on every product and puts the product's
    ``(name, prc)`` pair on every queue in ``pubqs``.

    prds  -- dict whose values are the products to update
    pubqs -- list of queues that receive the updates
    """
    while True:
        LOG.info("controller: price update cycle, %i pubqs", len(pubqs))
        Pricer.VOLA = update_vola(Pricer.VOLA)
        for product in prds.values():
            product.run()
            # One (name, price) pair per product, fanned out to all queues.
            quote = (product.name, product.prc)
            for subscriber in pubqs:
                subscriber.put(quote)
        eventlet.sleep(5)

def update_vola(old_vola):
    """Return ``old_vola`` moved one 0.01 step up or down, never below 0.01.

    The direction is drawn with a single ``random.choice((-1, +1))``.
    """
    floor = 0.01
    step = random.choice((-1, +1)) * 0.01
    candidate = old_vola + step
    # Same selection rule as max(candidate, floor).
    return floor if floor > candidate else candidate

class Pricer(object):
    """A named product whose price takes a random step on every ``run()``."""

    # Relative size of one price step; a class attribute, so it applies to
    # every instance that does not override it.
    VOLA = 0.01

    def __init__(self, name):
        """Store ``name`` and draw a starting price from [0, 100)."""
        self.name = name
        self.prc = 100.0 * random.random()

    def run(self):
        """Move the price up or down by ``VOLA`` times its current value."""
        direction = random.choice((-1, +1))
        change = direction * self.prc * self.VOLA
        self.prc += change


if __name__ == '__main__':
    # Log everything from DEBUG up. Each record shows the time with
    # milliseconds, the level name cut to four characters, the calling
    # function's name right-aligned in ten columns, and the message.
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s.%(msecs)03i %(levelname).4s '
                               '%(funcName)10s: %(message)s',
                        datefmt='%H:%M:%S')
    main()



More information about the Python-list mailing list