ThreadPoolingMixIn

pavel.uvarov at gmail.com pavel.uvarov at gmail.com
Fri May 30 16:40:13 EDT 2008


Hi, everybody!

I wrote a useful class ThreadPoolingMixIn which can be used to create
fast thread-based servers. This mix-in works much faster than
ThreadingMixIn because it doesn't create a new thread on each request.

Is it worth including in SocketServer.py?


from __future__ import with_statement
from SocketServer import ThreadingMixIn
import threading
import Queue
class ThreadPoolingMixIn(ThreadingMixIn):
    """Mix-in class to handle requests in a thread
pool.

    The pool grows and thrinks depending on
load.

    For instance, a threading UDP server class is created as
follows:

    class ThreadPoolingUDPServer(ThreadPoolingMixIn, UDPServer):
pass

    """
    __author__ = 'Pavel Uvarov <pavel.uvarov at gmail.com>'

    def init_thread_pool(self, min_workers = 5,
                         max_workers = 100, min_spare_workers = 5):
        """Initialize thread pool."""
        self.q = Queue.Queue()
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.min_spare_workers = min_spare_workers
        self.num_workers = 0
        self.num_busy_workers = 0
        self.workers_mutex = threading.Lock()
        self.start_workers(self.min_workers)

    def start_workers(self, n):
        """Start n workers."""
        for i in xrange(n):
            t = threading.Thread(target = self.worker)
            t.setDaemon(True)
            t.start()

    def worker(self):
        """A function of a working
thread.

        It gets a request from queue (blocking if
there
        are no requests) and processes
it.

        After processing it checks how many spare
workers
        are there now and if this value is greater
than
        self.min_spare_workers then the worker
exits.
        Otherwise it loops
infinitely.

        """
        with self.workers_mutex:
            self.num_workers += 1
        while True:
            (request, client_address) = self.q.get()
            with self.workers_mutex:
                self.num_busy_workers += 1
            self.process_request_thread(request, client_address)
            self.q.task_done()
            with self.workers_mutex:
                self.num_busy_workers -= 1
                if self.num_workers - self.num_busy_workers > \
                        self.min_spare_workers:
                    self.num_workers -= 1
                    return

    def process_request(self, request, client_address):
        """Puts a request into
queue.

        If the queue size is too large, it adds extra
worker.

        """
        self.q.put((request, client_address))
        with self.workers_mutex:
            if self.q.qsize() > 3 and self.num_workers <
self.max_workers:
                self.start_workers(1)

    def join(self):
        """Wait for all busy threads"""
        self.q.join()



More information about the Python-list mailing list