problem with multiprocessing and defaultdict

Robert Kern robert.kern at gmail.com
Mon Jan 11 18:31:52 EST 2010


On 2010-01-11 17:15 PM, wiso wrote:
> I'm using a class to read some data from files:
>
> import multiprocessing
> from collections import defaultdict
>
> def SingleContainer():
>      return list()
>
>
> class Container(defaultdict):
>      """
>      This class stores odd lines in self["odd"] and even lines in self["even"].
>      It is stupid, but it's only an example.
>      """
>      def __init__(self,file_name):
>          if type(file_name) != str:
>              raise AttributeError, "%s is not a string" % file_name
>          defaultdict.__init__(self,SingleContainer)
>          self.file_name = file_name
>          self.readen_lines = 0
>      def read(self):
>          f = open(self.file_name)
>          print "start reading file %s" % self.file_name
>          for line in f:
>              self.readen_lines += 1
>              values = line.split()
>              key = {0: "even", 1: "odd"}[self.readen_lines %2]
>              self[key].append(values)
>          print "readen %d lines from file %s" % (self.readen_lines,
> self.file_name)
>
> """
> Now I want to read more than one file at a time
> """
>
> def do(file_name):
>      container = Container(file_name)
>      container.read()
>      return container
>
> if __name__ == "__main__":
>      file_names = ["prova_200909.log", "prova_200910.log"]
>      pool = multiprocessing.Pool(len(file_names))
>      result = pool.map(do,file_names)
>      pool.close()
>      pool.join()
>      print "Finish"
>
>
>
> but I got:
> start reading file prova_200909.log
> start reading file prova_200910.log
> readen 142 lines from file prova_200909.log
> readen 160 lines from file prova_200910.log
> Exception in thread Thread-2:
> Traceback (most recent call last):
>    File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
>      self.run()
>    File "/usr/lib64/python2.6/threading.py", line 477, in run
>      self.__target(*self.__args, **self.__kwargs)
>    File "/usr/lib64/python2.6/multiprocessing/pool.py", line 259, in
> _handle_results
>      task = get()
>    File "main2.py", line 11, in __init__
>      raise AttributeError, "%s is not a string" % file_name
> AttributeError: (AttributeError('<function SingleContainer at
> 0x7f08b253d938>  is not a string',),<class '__main__.Container'>, (<function
> SingleContainer at 0x7f08b253d938>,))
>
>
> The problem appears when the pool shares objects, but why is it calling
> Container.__init__ with the SingleContainer function as its parameter?

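When you return the container from do() in the worker process, it must be
pickled in order to be sent back to the parent process. You did not override
the pickling hooks (.__reduce__() or .__reduce_ex__()), so pickle used
defaultdict's .__reduce__(), which passes the factory function as the first
argument to the constructor. When the parent unpickles the result, it therefore
calls Container(SingleContainer), and your __init__ raises AttributeError
because that argument is not a string.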
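
To see this without involving a pool at all, you can inspect what
.__reduce__() hands to pickle. The following is only a minimal sketch: it is
written for Python 3 (your code is Python 2), it trims your class down to its
constructor, and the names single_container and "prova.log" are illustrative
stand-ins rather than anything from your program.

```python
from collections import defaultdict


def single_container():
    # Illustrative stand-in for the SingleContainer factory in the question.
    return list()


class Container(defaultdict):
    # Trimmed-down version of the class from the question (constructor only).
    def __init__(self, file_name):
        if not isinstance(file_name, str):
            raise AttributeError("%r is not a string" % (file_name,))
        defaultdict.__init__(self, single_container)
        self.file_name = file_name


c = Container("prova.log")
c["odd"].append(["a", "b"])

# pickle asks the object how to rebuild itself; the first two items of the
# returned tuple are the callable and the arguments it will be called with.
reduced = c.__reduce__()
cls, args = reduced[0], reduced[1]
print(cls)   # the Container class
print(args)  # a 1-tuple holding the factory function, not the file name

# Unpickling effectively performs this call, which is what fails in the pool's
# result-handling thread in the parent process.
error = None
try:
    cls(*args)
except AttributeError as exc:
    error = exc
    print("rebuild failed:", exc)
```

The arguments come from defaultdict's own constructor signature
(default_factory first), which no longer matches the signature of your
subclass.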

I would recommend passing back the container.items() list instead of a Container 
instance as the easiest path forward.
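
As a rough sketch of that suggestion (Python 3, with a hypothetical
read_lines() helper standing in for Container.read(), and an explicit pickle
round trip standing in for what the pool does when it moves a result between
processes):

```python
import pickle
from collections import defaultdict


def read_lines(lines):
    # Hypothetical stand-in for Container.read(): odd-numbered lines go under
    # "odd", even-numbered lines under "even", counting from 1.
    container = defaultdict(list)
    for number, line in enumerate(lines, 1):
        key = "odd" if number % 2 else "even"
        container[key].append(line.split())
    # Return plain built-in data instead of the defaultdict subclass.
    return list(container.items())


# What the worker would return from do().
payload = read_lines(["a b", "c d", "e f"])

# Simulate the transfer between processes: a list of (key, list) tuples
# pickles and unpickles without calling any custom constructor.
received = pickle.loads(pickle.dumps(payload))

# In the parent, rebuild a defaultdict if the default-list behaviour is
# still wanted there.
rebuilt = defaultdict(list, received)
print(rebuilt["odd"])
print(rebuilt["even"])
```

With the pool from your script, do() would return the items list and the
rebuilding step would run on each element of the list returned by pool.map().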

-- 
Robert Kern

"I have come to believe that the whole world is an enigma, a harmless enigma
  that is made terrible by our own mad attempt to interpret it as though it had
  an underlying truth."
   -- Umberto Eco



