A multi-threaded file searcher for processing large text files

Mahmoud Abdelkader mabdelkader at gmail.com
Fri Jul 17 12:32:12 EDT 2009


I'm building a threaded file searcher that uses some of Fredrik Lundh's (
http://effbot.org/zone/wide-finder.htm) suggestions for parsing text very
quickly in pure python, as I have about a 10GB log file to parse every day.
A naiive approach would be to just parse the 1MB chunks, add the results
into a list, and just traverse that list.

I want to take this idea a bit further. I want to process results as they're
being found. A great way to implement this is to use the Queue class that
python provides. My idea is to exploit the iterator protocol to have it
block until a result is found, if any, and return the result until we're
finished parsing the file then we can raise StopIteration.

My idea is sort of similar to a producer / consumer, but it follows
something of this idiom:
  producer produces the file chunks
  consumer consumes the file chunks
     -> consumer parsers the file chunks and produces results
  class waits on the production of the original consumer and processes it as
they come.

I am having a bit of trouble with the concurrency, but I'm using this as an
exercise to understand how concurrency works from a broader scale. I am not
trying to get into a debate of whether this is really needed or a
python-concurrency debate:)

Without further ado, my class is as follows:

class ThreadedFileSearcher(object):
    def __init__(self, filename, rx_pat, blocking_timeout = 10):
        self.name = filename
        self.pattern = rx_pat
        self.blocking_timeout = blocking_timeout

        #need to find a better way to do this with more threads that can
return
        #stable results (aka chunks are in order)
        self._thread_count = 1

        #the queues
        self._results = Queue.Queue()
        self._queue = Queue.Queue()

        #returns the get_chunk() implementation
        self._engine = LogParsingEngine(filename)

        #start acquiring file offsets for the file
        #as daemon threads
        self._initialize_worker_threads(self._prime_queue)

        #experimental...should probably be some type of conditional variable
        self._stop_processing = False

    def __iter__(self):
        #start the worker threads
        self._initialize_worker_threads(self._target_worker)
        return self.next()

    def _initialize_worker_threads(self, callback):
        #should really use just one thread
        for i in xrange(self._thread_count):
            t = threading.Thread(target=callback)
            t.setDaemon(True)
            t.start()

    def _prime_queue(self):
        """put code chunk offsets on the queue"""
        #get_chunks() just returns 1MB offsets in the file
        for chunk in self._engine.get_chunks():
            self._queue.put(chunk)

    def _target_worker(self):
        """code chunk to parse queue"""
        #loop infinitely
        while True:
            try:
                #get next chunk offset from the queue
                start_pos, bytes_to_read = self._queue.get(
                                              timeout=self.blocking_timeout
                                           )
            except (TypeError, Queue.Empty):
                #None was returned from the .get()
                #this will throw a TypeError as it tries to unpack None
                #or the Queue was empty
                self._stop_processing = True
                #exit loop
                break

            #process the cunk here
            f = open(self.name, 'r')
            f.seek(start_pos)
            #find all matching lines in the chunk
            for chunk in self.pattern.findall(f.read(bytes_to_read)):
                #an non-overlapping matches of self.pattern
                #placed on the queue as a string
                self._results.put(chunk)
            f.close()

            #done!
            self._queue.task_done()

    def next(self):
        while True:
            try:
                #wait for the results to be put on
                matchedlist =
self._results.get(timeout=self.blocking_timeout)
            except Queue.Empty:
                #if the worker thread finished
                if self._stop_processing:
                    raise StopIteration
            else:
                self._results.task_done()
                yield matchedlist

To use the following class, I wanted to have some kind of interface like
this:

regex = re.compile("-{3}Processing..-{3}") #---Processing..---
f = ThreadedFileSearcher("LogFile.log", regex)
for line in f:
    #guaranteed to be a line that matches regex
    #process something...
    print line


I am looking for some suggestions, comments, and better ways to modify this.


One thing someone will realize when using this class is that the initial
parsing will be incredibly quick, but if the blocking_timeout is set to 10,
then there will be a 10second wait at the end to test if the worker threads
should set the stop conditions. A glaring hole leading from this is if the
blocking_timeout is set to something like 1second and by the time a user
attempts to iterate over the results, the worker threads will prematurely
stop processing.

Speaking of stop processing, should self._stop_processing be a conditional
variable. Right now it's a boolean, and I think that's pretty hacky. I don't
like the StopIteration stop condition, maybe someone has a better
suggestion?

A future modification I'm looking for is to launch multiple threads that
process different parts of the file (different chunks) and return their
results, probably indexed by their chunk offset. Then I can iterate over
that sequentially. I think that would be a trivial parallel optimization.

Thoughts? Comments?

Thanks very much,

Mahmoud Abdelkader
mahmoud at linux.com
http://blog.mahmoudimus.com/
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.python.org/pipermail/python-list/attachments/20090717/75ba6329/attachment.html>


More information about the Python-list mailing list