ThreadPoolingMixIn
Giampaolo Rodola'
gnewsg at gmail.com
Sat May 31 15:40:15 EDT 2008
On 30 Mag, 22:40, pavel.uva... at gmail.com wrote:
> Hi, everybody!
>
> I wrote a useful class ThreadPoolingMixIn which can be used to create
> fast thread-based servers. This mix-in works much faster than
> ThreadingMixIn because it doesn't create a new thread on each request.
>
> Is it worth including in SocketServer.py?
>
> from __future__ import with_statement
> from SocketServer import ThreadingMixIn
> import threading
> import Queue
> class ThreadPoolingMixIn(ThreadingMixIn):
> """Mix-in class to handle requests in a thread
> pool.
>
> The pool grows and thrinks depending on
> load.
>
> For instance, a threading UDP server class is created as
> follows:
>
> class ThreadPoolingUDPServer(ThreadPoolingMixIn, UDPServer):
> pass
>
> """
> __author__ = 'Pavel Uvarov <pavel.uva... at gmail.com>'
>
> def init_thread_pool(self, min_workers = 5,
> max_workers = 100, min_spare_workers = 5):
> """Initialize thread pool."""
> self.q = Queue.Queue()
> self.min_workers = min_workers
> self.max_workers = max_workers
> self.min_spare_workers = min_spare_workers
> self.num_workers = 0
> self.num_busy_workers = 0
> self.workers_mutex = threading.Lock()
> self.start_workers(self.min_workers)
>
> def start_workers(self, n):
> """Start n workers."""
> for i in xrange(n):
> t = threading.Thread(target = self.worker)
> t.setDaemon(True)
> t.start()
>
> def worker(self):
> """A function of a working
> thread.
>
> It gets a request from queue (blocking if
> there
> are no requests) and processes
> it.
>
> After processing it checks how many spare
> workers
> are there now and if this value is greater
> than
> self.min_spare_workers then the worker
> exits.
> Otherwise it loops
> infinitely.
>
> """
> with self.workers_mutex:
> self.num_workers += 1
> while True:
> (request, client_address) = self.q.get()
> with self.workers_mutex:
> self.num_busy_workers += 1
> self.process_request_thread(request, client_address)
> self.q.task_done()
> with self.workers_mutex:
> self.num_busy_workers -= 1
> if self.num_workers - self.num_busy_workers > \
> self.min_spare_workers:
> self.num_workers -= 1
> return
>
> def process_request(self, request, client_address):
> """Puts a request into
> queue.
>
> If the queue size is too large, it adds extra
> worker.
>
> """
> self.q.put((request, client_address))
> with self.workers_mutex:
> if self.q.qsize() > 3 and self.num_workers <
> self.max_workers:
> self.start_workers(1)
>
> def join(self):
> """Wait for all busy threads"""
> self.q.join()
This is not the right place to discuss about such a thing.
Post this same message on python-dev ml or, even better, open a new
ticket on the bug tracker attaching the patch and, most important, a
benchmark demonstrating the speed improvement.
--- Giampaolo
http://code.google.com/p/pyftpdlib/
More information about the Python-list
mailing list