ThreadPoolingMixIn

pavel.uvarov at gmail.com pavel.uvarov at gmail.com
Mon Jun 2 11:09:35 EDT 2008


On May 31, 9:13 pm, Rhamphoryncus <rha... at gmail.com> wrote:
> On May 30, 2:40 pm, pavel.uva... at gmail.com wrote:
>
> > Hi, everybody!
>
> > I wrote a useful class ThreadPoolingMixIn which can be used to create
> > fast thread-based servers. This mix-in works much faster than
> > ThreadingMixIn because it doesn't create a new thread on each request.
>
> Do you have any benchmarks demonstrating the performance difference?
>

To benchmark this I used a simple TCP server which writes a small (16k)
string to the client and closes the connection.

I started 100 remote clients and got 500 replies/s for ThreadingMixIn
and more than 1500 replies/s for ThreadPoolingMixIn. I tested it on
FreeBSD 6.2 amd64.

I'm very curious about the exactness of the number 500 for
ThreadingMixIn. It seems to be the same for various packet sizes.
I suspect there is some OS limit on the thread creation rate.

Below I include a bugfixed ThreadPoolingMixIn and the benchmarking
utility. The utility can be used to start clients on localhost, though
the reply rate will be slower (around 1000 replies/s).

To start the benchmarking server with localhost clients, use:
python ./TestServer.py --server=threading --n-clients=100
or
python ./TestServer.py --server=threadpooling --n-clients=100


#------- ThreadPoolingMixIn.py
from __future__ import with_statement
from SocketServer import ThreadingMixIn
import threading
import Queue
class ThreadPoolingMixIn(ThreadingMixIn):
    """Mix-in class to handle requests in a thread pool.

    The pool grows and shrinks depending on load.

    For instance, a threadpooling TCP server class is created as follows:

    class ThreadPoolingTCPServer(ThreadPoolingMixIn, TCPServer): pass

    Call init_thread_pool() before serving to choose the pool limits.
    If it was never called, the first process_request() call creates
    the pool with the default limits.

    """
    __author__ = 'Pavel Uvarov <pavel.uvarov at gmail.com>'

    def init_thread_pool(self, min_workers=5,
                         max_workers=100, min_spare_workers=5):
        """Initialize thread pool.

        min_workers -- number of workers started right away; a worker
            never retires while the pool is at or below this size.
        max_workers -- process_request() does not grow the pool
            beyond this size.
        min_spare_workers -- a worker retires only while more than
            this many workers are idle.

        """
        self.q = Queue.Queue()
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.min_spare_workers = min_spare_workers
        self.num_workers = 0
        self.num_busy_workers = 0
        self.workers_mutex = threading.Lock()
        self.start_workers(self.min_workers)

    def start_workers(self, n):
        """Start n workers.

        Must not be called while holding workers_mutex (the lock is
        not reentrant).

        """
        for i in range(n):
            # Count the worker before its thread runs so that
            # num_workers never lags behind the number of threads
            # that have actually been started.
            with self.workers_mutex:
                self.num_workers += 1
            self._launch_worker()

    def _launch_worker(self):
        """Start one daemon worker already counted in num_workers."""
        t = threading.Thread(target=self.worker)
        t.setDaemon(True)
        try:
            t.start()
        except Exception:
            # The thread never ran: give its reserved slot back.
            with self.workers_mutex:
                self.num_workers -= 1
            raise

    def worker(self):
        """A function of a working thread.

        It gets a request from queue (blocking if there
        are no requests) and processes it.

        After processing it checks how many spare workers
        are there now and if this value is greater than
        self.min_spare_workers (and the pool is larger than
        self.min_workers) then the worker exits.
        Otherwise it loops infinitely.

        The worker is expected to be counted in self.num_workers by
        whoever started it; it removes itself from that count when
        it exits.

        """
        while True:
            (request, client_address) = self.q.get()
            with self.workers_mutex:
                self.num_busy_workers += 1
            # Stays True if processing raises: the exception then ends
            # this thread, so it must leave the worker count as well.
            retire = True
            try:
                self.process_request_thread(request, client_address)
                retire = False
            finally:
                # Always balance q.get() so that join() cannot hang.
                self.q.task_done()
                with self.workers_mutex:
                    self.num_busy_workers -= 1
                    if not retire:
                        retire = (
                            self.num_workers > self.min_workers and
                            self.num_workers - self.num_busy_workers >
                            self.min_spare_workers)
                    if retire:
                        self.num_workers -= 1
            if retire:
                return

    def process_request(self, request, client_address):
        """Puts a request into queue.

        If the queue size is too large (more than 3 waiting requests),
        or the pool has fallen below min_workers, it adds an extra
        worker as long as max_workers is not exceeded.

        """
        if getattr(self, 'q', None) is None:
            # init_thread_pool() was never called: use the defaults.
            self.init_thread_pool()
        self.q.put((request, client_address))
        with self.workers_mutex:
            grow = (self.num_workers < self.max_workers and
                    (self.q.qsize() > 3 or
                     self.num_workers < self.min_workers))
            if grow:
                # Reserve the slot while holding the lock so repeated
                # calls cannot overshoot max_workers.
                self.num_workers += 1
        if grow:
            # Started outside the lock; the lock is not reentrant.
            self._launch_worker()

    def join(self):
        """Wait until every queued request has been processed."""
        if getattr(self, 'q', None) is not None:
            self.q.join()

#------- TestServer.py
from __future__ import with_statement
from SocketServer import *
import socket
import sys
import threading
import time
import os
from ThreadPoolingMixIn import *

class ThreadPoolingTCPServer(ThreadPoolingMixIn, TCPServer):
    """TCPServer combined with ThreadPoolingMixIn; adds nothing of its own."""

class TestServer(ThreadingTCPServer):
    """Benchmark TCP server that keeps request-timing statistics.

    Statistics are collected per measurement window: update_stats()
    adds one request to the current window and reset_stats() starts
    a new one. All counters are guarded by self.lock.

    """

    allow_reuse_address = True
    request_queue_size = 128

    def __init__(self, server_address, RequestHandlerClass,
                 packet_size):
        """Create the server.

        packet_size is only stored here; the request handler reads it
        to decide how many bytes to send.

        """
        TCPServer.__init__(self, server_address, RequestHandlerClass)
        self.packet_size = packet_size
        self.sum_t = 0                  # summed request durations, this window
        self.total_num_requests = 0     # requests in all finished windows
        self.num_requests = 0           # requests in the current window
        self.t0 = time.time()           # start time of the current window
        self.lock = threading.Lock()

    def reset_stats(self):
        """Fold the current window into the total and start a new one."""
        with self.lock:
            self.total_num_requests += self.num_requests
            self.num_requests = 0
            self.sum_t = 0
            self.t0 = time.time()

    def update_stats(self, t0, t1):
        """Record one request that started at t0 and finished at t1.

        Returns (n, avg_t, rate): the number of requests in the
        current window, their average duration, and requests per
        second since the window started. rate is 0.0 when no time
        has elapsed yet (for example when the window was reset after
        t1 was taken).

        """
        with self.lock:
            self.num_requests += 1
            self.sum_t += t1 - t0
            n = self.num_requests
            sum_t = self.sum_t
            # Read under the lock: reset_stats() rewrites self.t0.
            elapsed = t1 - self.t0
        avg_t = float(sum_t) / n
        if elapsed > 0:
            rate = n / elapsed
        else:
            rate = 0.0
        return (n, avg_t, rate)

    def handle_request(self):
        """Handle one request, possibly blocking.

        A socket.error raised while getting the request is ignored.
        An exception raised while processing it is reported through
        handle_error() and the request is closed. KeyboardInterrupt
        and other non-Exception exits propagate to the caller.

        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.close_request(request)

class TestServerThreadPool(ThreadPoolingMixIn, TestServer):
    """TestServer variant that hands requests to a thread pool."""

    def __init__(self, server_address, RequestHandlerClass, packet_size):
        """Set up the TestServer part, then start the worker pool."""
        TestServer.__init__(
            self, server_address, RequestHandlerClass, packet_size)
        # In init_thread_pool() argument order:
        # min_workers, max_workers, min_spare_workers.
        pool_limits = (2, 200, 20)
        self.init_thread_pool(*pool_limits)

class TestRequestHandler(StreamRequestHandler):
    """Handler that sends a fixed-size payload and reports timing."""

    def __init__(self, request, client_address, server):
        """Note the start time, then delegate to the base constructor."""
        # Set first so the timestamp exists before the base class
        # does any work with this handler.
        self.t0 = time.time()
        StreamRequestHandler.__init__(
            self, request, client_address, server)

    def handle(self):
        """Write server.packet_size bytes and update server statistics.

        Whenever the server's request count for the current window is
        a multiple of 10000, print the rate and reset the statistics.

        """
        srv = self.server
        payload = 'a' * (srv.packet_size)
        self.wfile.write(payload)

        finished = time.time()
        count, _avg_t, rate = srv.update_stats(self.t0, finished)
        if count % 10000 != 0:
            return
        print('rate=%.2f ' % rate)
        srv.reset_stats()

from optparse import OptionParser

def server(o):
    """Run the benchmark server selected by o.server.

    o is the parsed options object; o.server, o.port and
    o.packet_size are read. "threading" selects TestServer and
    "threadpooling" selects TestServerThreadPool. Any other value
    makes the function return without starting a server (presumably
    so the script can be run as clients only -- confirm with caller).

    An exception raised while creating or running the server is
    printed and then re-raised.

    """
    server_classes = {
        "threading": TestServer,
        "threadpooling": TestServerThreadPool,
    }
    ServerClass = server_classes.get(o.server)
    if ServerClass is None:
        return

    server_address = ('', o.port)
    try:
        srv = ServerClass(server_address, TestRequestHandler,
                          o.packet_size)

        sa = srv.socket.getsockname()
        # Single formatted string: same output as the old print
        # statement, and valid on both Python 2 and Python 3.
        print("Serving on %s port %s ..." % (sa[0], sa[1]))
        srv.serve_forever()
    except Exception:
        # sys.exc_info() avoids the version-specific
        # "except X, e" / "except X as e" spellings.
        print("Exception: %s" % str(sys.exc_info()[1]))
        raise

def client(o):
    """Fork o.n_clients client processes that hammer localhost:o.port.

    For each client, os.fork() is called. The process in which it
    returns a true value (the parent, per os.fork semantics) becomes
    a client and never returns; the other process goes on to fork the
    next client. So only the last forked process returns from this
    function, or the caller itself when o.n_clients is 0.

    A client repeatedly connects, reads until the server closes the
    connection, and closes its socket. On any error it prints the
    exception and sleeps one second before retrying.

    """
    for _ in range(o.n_clients):
        if not os.fork():
            continue
        while True:
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    sock.connect(("localhost", o.port))
                    # Drain the reply; an empty read means the
                    # connection was closed.
                    while sock.recv(4096):
                        pass
                finally:
                    # Close on failure too, so a refused or broken
                    # connection does not leak a descriptor per retry.
                    sock.close()
            except Exception:
                print(sys.exc_info()[1])
                time.sleep(1)

if __name__ == '__main__':
    # Parse the command line, then split into two processes: the
    # forked child runs the server and the parent runs the clients.
    args = sys.argv[1:]
    usage = "usage: %prog [options]"
    parser = OptionParser(usage)
    parser.add_option("-p", "--port", help="Server port",
                      type="int", default=8123)
    parser.add_option("--n-clients", help="Number of client forks",
                      type="int", default=0)
    parser.add_option("--server",
                      help="Type of the server (threading or threadpooling)",
                      type="string", default="")
    parser.add_option("--packet-size", help="Packet size",
                      type="int", default=16*1024)
    (o, a) = parser.parse_args(args)

    if os.fork() == 0:
        server(o)
    else:
        client(o)



More information about the Python-list mailing list