Multi thread reading a file

Stefan Behnel stefan_ml at behnel.de
Thu Jul 2 00:50:54 EDT 2009


Mag Gam wrote:
> On Wed, Jul 1, 2009 at 12:54 AM, Gabriel Genellina wrote:
>> Does the format conversion involve a significant processing time? If not,
>> the total time is dominated by the I/O time (reading and writing the file)
>> so it's doubtful you gain anything from multiple threads.
> 
> The format does inolve significant time processing each line.
>
>>> Here is the pseudo code I was thinking about:
>>>
>>> Let T  = Total number of lines in a file, Example 1000000 (1 million
>>> files)
>>> Let B = Total number of lines in a buffer, for example 10000 lines
>>>
>>>
>>> Create a thread to read until buffer
>>> Create another thread to read buffer+buffer  ( So we have 2 threads
>>> now. But since the file is zipped I have to wait until the first
>>> thread is completed. Unless someone knows of a clever technique.
>>> Write the content of thread 1 into a numpy array
>>> Write the content of thread 2 into a numpy array
>> Can you process each line independently? Is the record order important? If
>> not (or at least, some local dis-ordering is acceptable) you may use a few
>> worker threads (doing the conversion), feed them thru a Queue object, put
>> the converted lines into another Queue, and let another thread write the
>> results onto the destination file.
> 
> Yes, each line can be independent. The original file is a time series
> file which I am placing it into a Numpy array therefore I don't think
> the record order is important.  The writing is actually done when I
> place a value into a "dset" object.
> 
> Let me show you what I mean.
> reader=csv.reader(open("100000.csv"))

Have you tried if using the csv reader here is actually fast enough for
you? Maybe you can just .split(',') each line or something (no idea what
else the csv reader may do, but anyway...)


> for s,row in enumerate(reader):
>  if s!=0 and s%bufsize==0:
>       dset[s-bufsize:s] = t  #here is where I am writing the data to
> the data structure. Using a range or broadcasting.

Erm, what's "t"? And where do you think the "significant" processing time
comes from in your example? From your code, that's totally non-obvious to me.


>   #15 columns
>  if len(row) != 15:
>    break
> 
>  t[s%bufsize] = tuple(row)
> 
> #Do this all the way at the end for flushing.
> if (s%bufsize != 0):
>    dset[(s//bufsize)*bufsize:s]=t[0:s%bufsize]

If you *really* think your code is not I/O bound, but that the code for
writing the data into the NumPy array is the slow part (which I actually
doubt, but anyway), you can try writing your code in Cython, which has
direct support for writing to NumPy arrays through their buffer interface.

http://docs.cython.org/docs/numpy_tutorial.html

Stefan



More information about the Python-list mailing list