ThreadPoolingMixIn

Giampaolo Rodola' gnewsg at gmail.com
Sat May 31 15:40:15 EDT 2008


On 30 May, 22:40, pavel.uva... at gmail.com wrote:
> Hi, everybody!
>
> I wrote a useful class ThreadPoolingMixIn which can be used to create
> fast thread-based servers. This mix-in works much faster than
> ThreadingMixIn because it doesn't create a new thread on each request.
>
> Is it worth including in SocketServer.py?
>
> from __future__ import with_statement
> from SocketServer import ThreadingMixIn
> import threading
> import Queue
> class ThreadPoolingMixIn(ThreadingMixIn):
>     """Mix-in class to handle requests in a thread pool.
>
>     The pool grows and shrinks depending on load.
>
>     For instance, a threading UDP server class is created as follows:
>
>     class ThreadPoolingUDPServer(ThreadPoolingMixIn, UDPServer): pass
>
>     """
>     __author__ = 'Pavel Uvarov <pavel.uva... at gmail.com>'
>
>     def init_thread_pool(self, min_workers = 5,
>                          max_workers = 100, min_spare_workers = 5):
>         """Initialize thread pool."""
>         self.q = Queue.Queue()
>         self.min_workers = min_workers
>         self.max_workers = max_workers
>         self.min_spare_workers = min_spare_workers
>         self.num_workers = 0
>         self.num_busy_workers = 0
>         self.workers_mutex = threading.Lock()
>         self.start_workers(self.min_workers)
>
>     def start_workers(self, n):
>         """Start n workers."""
>         for i in xrange(n):
>             t = threading.Thread(target = self.worker)
>             t.setDaemon(True)
>             t.start()
>
>     def worker(self):
>         """A function of a working thread.
>
>         It gets a request from queue (blocking if there
>         are no requests) and processes it.
>
>         After processing it checks how many spare workers
>         are there now and if this value is greater than
>         self.min_spare_workers then the worker exits.
>         Otherwise it loops infinitely.
>
>         """
>         with self.workers_mutex:
>             self.num_workers += 1
>         while True:
>             (request, client_address) = self.q.get()
>             with self.workers_mutex:
>                 self.num_busy_workers += 1
>             self.process_request_thread(request, client_address)
>             self.q.task_done()
>             with self.workers_mutex:
>                 self.num_busy_workers -= 1
>                 if self.num_workers - self.num_busy_workers > \
>                         self.min_spare_workers:
>                     self.num_workers -= 1
>                     return
>
>     def process_request(self, request, client_address):
>         """Puts a request into the queue.
>
>         If the queue size is too large, it adds an extra worker.
>
>         """
>         self.q.put((request, client_address))
>         with self.workers_mutex:
>             if (self.q.qsize() > 3 and
>                     self.num_workers < self.max_workers):
>                 self.start_workers(1)
>
>     def join(self):
>         """Wait for all busy threads"""
>         self.q.join()

This is not the right place to discuss this.
Post the same message to the python-dev mailing list or, even better, open a
new ticket on the bug tracker, attaching the patch and, most importantly, a
benchmark demonstrating the speed improvement.
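
A benchmark along these lines might look like the sketch below. It is only
an illustration under stated assumptions: it does not use SocketServer or the
posted mix-in at all, it is written with Python 3 module names (queue rather
than Queue), and handle, thread_per_task and fixed_pool are hypothetical
names. It times the two strategies being compared, one new thread per task
versus a queue feeding a fixed set of long-lived workers, and makes no claim
about which one wins on a given machine.

```python
import queue
import threading
import time


def handle(task, results, lock):
    """Stand-in for a request handler: just record the task id."""
    with lock:
        results.append(task)


def thread_per_task(n_tasks):
    """Start one new thread per task (the thread-per-request strategy)."""
    results, lock = [], threading.Lock()
    start = time.perf_counter()
    threads = []
    for task in range(n_tasks):
        t = threading.Thread(target=handle, args=(task, results, lock))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return time.perf_counter() - start, len(results)


def fixed_pool(n_tasks, n_workers=5):
    """Feed tasks through a queue to a fixed set of long-lived workers."""
    results, lock = [], threading.Lock()
    q = queue.Queue()

    def worker():
        while True:
            task = q.get()
            if task is None:  # sentinel: shut this worker down
                q.task_done()
                return
            handle(task, results, lock)
            q.task_done()

    workers = [threading.Thread(target=worker, daemon=True)
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    # Worker start-up is deliberately left out of the timing: the pool is
    # assumed to be already running when requests arrive.
    start = time.perf_counter()
    for task in range(n_tasks):
        q.put(task)
    q.join()  # wait until every queued task has been handled
    elapsed = time.perf_counter() - start
    for _ in workers:
        q.put(None)
    for w in workers:
        w.join()
    return elapsed, len(results)


if __name__ == "__main__":
    n = 2000
    t_spawn, done_spawn = thread_per_task(n)
    t_pool, done_pool = fixed_pool(n)
    print("thread per task: %.4fs (%d handled)" % (t_spawn, done_spawn))
    print("fixed pool:      %.4fs (%d handled)" % (t_pool, done_pool))
```

A convincing benchmark for the tracker would replace handle with real
requests sent to two running servers, one built on ThreadingMixIn and one on
the proposed mix-in, and report timings over several runs.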

--- Giampaolo
http://code.google.com/p/pyftpdlib/


