Multi thread reading a file

Mag Gam magawake at gmail.com
Wed Jul 1 07:41:29 EDT 2009


Thanks for the response, Gabriel.



On Wed, Jul 1, 2009 at 12:54 AM, Gabriel
Genellina <gagsl-py2 at yahoo.com.ar> wrote:
> On Tue, 30 Jun 2009 22:52:18 -0300, Mag Gam <magawake at gmail.com> wrote:
>
>> I am very new to Python and I am in the process of loading a very
>> large compressed CSV file into another format. I was wondering if I
>> can do this with a multithreaded approach.
>
> Does the format conversion involve significant processing time? If not,
> the total time is dominated by I/O (reading and writing the file), so
> it's doubtful you'd gain anything from multiple threads.

The format does involve significant processing time for each line.
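
Since the per-line work is CPU-bound, I gather the GIL may keep
pure-Python threads from converting in parallel, so I may also try the
multiprocessing module (new in Python 2.6). A rough, untested sketch of
what I have in mind; convert_row is a stand-in for my real conversion:

import csv
from multiprocessing import Pool

def convert_row(row):
    # stand-in for the real per-line conversion
    return tuple(row)

if __name__ == "__main__":
    pool = Pool(processes=4)
    reader = csv.reader(open("100000.csv"))
    # imap yields results in input order while the pool runs
    # convert_row on several cores at once
    for converted in pool.imap(convert_row, reader, 1000):
        pass   # here I would place converted into t / dset
    pool.close()
    pool.join()
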
>
>> Here is the pseudo code I was thinking about:
>>
>> Let T = the total number of lines in the file, for example 1000000
>> (1 million lines)
>> Let B = the number of lines in a buffer, for example 10000 lines
>>
>>
>> Create a thread to read the first buffer's worth of lines
>> Create another thread to read the next buffer's worth (so we have 2
>> threads now; but since the file is zipped I have to wait until the
>> first thread is completed, unless someone knows of a clever technique)
>> Write the content of thread 1 into a numpy array
>> Write the content of thread 2 into a numpy array
>
> Can you process each line independently? Is the record order important? If
> not (or at least, if some local disordering is acceptable) you may use a few
> worker threads (doing the conversion), feed them through a Queue object, put
> the converted lines into another Queue, and let another thread write the
> results onto the destination file.

Yes, each line can be processed independently. The original file is a
time series file which I am placing into a NumPy array, so I don't
think the record order is important. The writing is actually done when
I place a value into a "dset" object.


Let me show you what I mean:

import csv

# bufsize, the buffer array t (bufsize rows long) and the destination
# dset are set up elsewhere; only the conversion loop is shown
reader = csv.reader(open("100000.csv"))
for s, row in enumerate(reader):
    if s != 0 and s % bufsize == 0:
        # here is where I write the data to the data structure,
        # using a range or broadcasting
        dset[s-bufsize:s] = t

    # expect 15 columns
    if len(row) != 15:
        break

    t[s % bufsize] = tuple(row)

# do this all the way at the end, to flush the partial buffer
# (note the s+1, otherwise the last row read would be dropped)
dset[(s//bufsize)*bufsize:s+1] = t[:s % bufsize + 1]
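
Since the real file is compressed, the snippet above reads an
uncompressed copy. Assuming gzip compression, I suppose the reader
could consume the stream directly; a gzip stream can only be
decompressed front to back, which is one reason a single reader would
have to feed any worker threads (the filename here is made up):

import csv
import gzip

reader = csv.reader(gzip.open("100000.csv.gz", "rb"))
for s, row in enumerate(reader):
    print s, len(row)   # hand the row to a worker queue instead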


>
> import Queue, threading, csv
>
> def convert(in_queue, out_queue):
>  while True:
>    row = in_queue.get()
>    if row is None: break
>    # ... convert row
>    out_queue.put(converted_line)
>
> def write_output(out_queue):
>  while True:
>    line = out_queue.get()
>    if line is None: break
>    # ... write line to output file
>
> in_queue = Queue.Queue()
> out_queue = Queue.Queue()
> tlist = []
> for i in range(4):
>  t = threading.Thread(target=convert, args=(in_queue, out_queue))
>  t.start()
>  tlist.append(t)
> output_thread = threading.Thread(target=write_output, args=(out_queue,))
> output_thread.start()
>
> with open("...") as csvfile:
>  reader = csv.reader(csvfile, ...)
>  for row in reader:
>    in_queue.put(row)
>
> for t in tlist: in_queue.put(None) # indicate end-of-work
> for t in tlist: t.join() # wait until finished
> out_queue.put(None)
> output_thread.join() # wait until finished
>
> --
> Gabriel Genellina
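
Thanks, that skeleton helps. Here is roughly how I think it maps onto
my case; this is untested, and I am guessing at the details: each row
is tagged with its line number on the way in, so the single writer can
place it at the right slot even if the workers finish out of order (I
dropped my buffering for clarity, and a dict stands in for the real
dset):

import Queue
import threading
import csv

in_queue = Queue.Queue(maxsize=10000)   # bounded, so the reader
out_queue = Queue.Queue(maxsize=10000)  # cannot run far ahead

def convert(in_queue, out_queue):
    # worker: pull (index, row) pairs, convert, pass them on
    while True:
        item = in_queue.get()
        if item is None:
            break
        s, row = item
        out_queue.put((s, tuple(row)))  # real conversion goes here

def write_output(out_queue, dset):
    # single writer: rows carry their index, so they may arrive
    # in any order and still land in the right slot
    while True:
        item = out_queue.get()
        if item is None:
            break
        s, converted = item
        dset[s] = converted

dset = {}   # stand-in for my real dset object
tlist = [threading.Thread(target=convert, args=(in_queue, out_queue))
         for i in range(4)]
output_thread = threading.Thread(target=write_output,
                                 args=(out_queue, dset))
for t in tlist:
    t.start()
output_thread.start()

reader = csv.reader(open("100000.csv"))
for s, row in enumerate(reader):
    in_queue.put((s, row))

for t in tlist:
    in_queue.put(None)    # one end-of-work marker per worker
for t in tlist:
    t.join()
out_queue.put(None)       # now the writer can stop too
output_thread.join()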


