help with multiprocessing pool

Philip Semanchuk philip at semanchuk.com
Thu Jan 27 13:38:00 EST 2011


On Jan 27, 2011, at 1:12 PM, Craig Yoshioka wrote:

> The code will be multi-platform.  The OSXisms are there as an example, though I am developing on OS X machine.
> 
> I've distilled my problem down to a simpler case, so hopefully that'll help troubleshoot.  
> 
> I have 2 files:
> 
> test.py:
> --------------------------------------------------------------
> from multiprocessing import Pool
> 
> def square(x):
>    return x*x
> 
> def squares(numbers):
>     pool = Pool(12)
>     return pool.map(square,numbers)
> 
> 
> test2.py:
> --------------------------------------------------------------
> from test import squares
> 
> maxvalues = squares(range(3))
> print maxvalues
> 
> 
> 
> Now if I import squares into the interactive interpreter:
> 
> from test import squares
> print squares(range(3))
> 
> I get the correct result, but if I try to import maxvalues from test2 the interactive interpreter python hangs.
> if I run the script from bash, though, it seems to run fine. 

The short, complete example is much more useful, but it sounds like it demonstrates a different problem than you first described. Your first posting said that your code worked in the interpreter but failed when run from the command line. This code has the opposite problem. Correct?

> I think it might have something to do with this note in the docs, though I am not sure how to use this information to fix my problem:
> 
> Note: Functionality within this package requires that the __main__ method be importable by the children. This is covered inProgramming guidelines however it is worth pointing out here. This means that some examples, such as themultiprocessing.Pool examples will not work in the interactive interpreter.

I suspect this is the problem with the demo above. Your original code ran fine in the interpreter, though, correct?

bye
Philip


> 
> On Jan 27, 2011, at 6:39 AM, Philip Semanchuk wrote:
> 
>> 
>> On Jan 25, 2011, at 8:19 PM, Craig Yoshioka wrote:
>> 
>>> Hi all, 
>>> 
>>> I could really use some help with a problem I'm having.
>> 
>> 
>> Hiya Craig,
>> I don't know if I can help, but it's really difficult to do without a full working example. 
>> 
>> Also, your code has several OS X-isms in it so I guess that's the platform you're on. But in case you're on Windows, note that that platform requires some extra care when using multiprocessing:
>> http://docs.python.org/library/multiprocessing.html#windows
>> 
>> 
>> Good luck
>> Philip
>> 
>> 
>>> I wrote a function that can take a pattern of actions and it apply it to the filesystem.
>>> It takes a list of starting paths, and a pattern like this:
>>> 
>>> pattern = {
>>>          InGlob('Test/**'):{
>>> 		MatchRemove('DS_Store'):[],
>>>              NoMatchAdd('(alhpaID_)|(DS_Store)','warnings'):[],
>>>              MatchAdd('alphaID_','alpha_found'):[],
>>> 		InDir('alphaID_'):{
>>>                  NoMatchAdd('(betaID_)|(DS_Store)','warnings'):[],
>>>                  InDir('betaID_'):{
>>>                      NoMatchAdd('(gammaID_)|(DS_Store)','warnings'):[],
>>>                      MatchAdd('gammaID_','gamma_found'):[] }}}}
>>> 
>>> so if you run evalFSPattern(['Volumes/**'],pattern) it'll return a dictionary where:
>>> 
>>> dict['gamma_found'] = [list of paths that matched] (i.e. '/Volumes/HD1/Test/alphaID_3382/betaID_38824/gammaID_848384')
>>> dict['warning'] = [list of paths that failed to match] (ie. '/Volumes/HD1/Test/alphaID_3382/gammaID_47383') 
>>> 
>>> Since some of these volumes are on network shares I also wanted to parallelize this so that it would not block on IO.  I started the parallelization by using multiprocessing.Pool and got it to work if I ran the fsparser from the interpreter.  It ran in *much* less time and produced correct output that matched the non-parallelized version.  The problem begins if I then try to use the parallelized function from within the code.
>>> 
>>> For example I wrote a class whose instances are created around valid FS paths, that are cached to reduce expensive FS lookups.
>>> 
>>> class Experiment(object):
>>> 	
>>> 	SlidePaths = None
>>> 
>>> 	@classmethod
>>>      def getSlidePaths(cls):
>>>            if cls.SlidePaths == None:
>>>                 cls.SlidePaths = fsparser(['/Volumes/**'],pattern)
>>> 	      return cls.SlidePaths
>>> 	
>>> 	@classmethod
>>> 	def lookupPathWithGammaID(cls,id):
>>>              paths = cls.getSlidePaths()
>>>              ...
>>>              return paths[selected]
>>> 
>>> 	@classmethod
>>>      def fromGamaID(cls,id):
>>>      	path = cls.lookupPathWithGammaID(id)
>>>              return cls(path)
>>> 	
>>> 	def __init__(self,path)
>>> 		self.Path = path
>>> 		...
>>> 	
>>> 	...	
>>> 
>>> If I do the following from the interpreter it works:
>>> 
>>>>>> from experiment import Experiment
>>>>>> expt = Experiment.fromGammaID(10102)
>>> 
>>> but if I write a script called test.py:
>>> 
>>> from experiment import Experiment
>>> expt1 = Experiment.fromGammaID(10102)
>>> expt2 = Experiment.fromGammaID(10103)
>>> comparison = expt1.compareTo(expt2)
>>> 
>>> it fails, if I try to import it or run it from bash prompt:
>>> 
>>>>>> from test import comparison (hangs forever)
>>> $ python test.py (hangs forever)
>>> 
>>> I would really like some help trying to figure this out...  I thought it should work easily since all the spawned processes don't share data or state (their results are merged in the main thread).  The classes used in the pattern are also simple python objects (use python primitives).
>>> 
>>> 
>>> These are the main functions:
>>> 
>>> def mapAction(pool,paths,action):
>>>  merge = {'next':[]}
>>>  for result in pool.map(action,paths):
>>>      if result == None:
>>>          continue
>>>      merge = mergeDicts(merge,result)
>>>  return merge
>>> 
>>> 
>>> def mergeDicts(d1,d2):
>>>  for key in d2:
>>>      if key not in d1:
>>>          d1[key] = d2[key]
>>>      else:
>>>          d1[key] += d2[key]
>>>  return d1
>>> 
>>> 
>>> def evalFSPattern(paths,pattern):
>>>  pool = Pool(10)
>>>  results = {}
>>>  for action in pattern:
>>>      tomerge1 = mapAction(pool,paths,action)
>>>      tomerge2 = evalFSPattern(tomerge1['next'],pattern[action])
>>>      del tomerge1['next']
>>>      results = mergeDicts(results,tomerge1)
>>>      results = mergeDicts(results,tomerge2)
>>>  return results
>>> 
>>> the classes used in the pattern (InGlob,NoMatchAdd,etc.) are callable classes that take a single parameter (a path) and return a dict result or None which makes them trivial to adapt to Pool.map.
>>> 
>>> Note if I change the mapAction function to:
>>> 
>>> def mapAction(pool,paths,action):
>>>  merge = {'next':[]}
>>>  for path in paths:
>>>       result = action(path)
>>>       if result == None:
>>>          continue
>>>      merge = mergeDicts(merge,result)
>>>  return merge
>>> 
>>> everything works just fine.
>>> 
>>> 
>>> Thanks.
>>> 
>>> 
>>> -- 
>>> http://mail.python.org/mailman/listinfo/python-list
>> 
>> -- 
>> http://mail.python.org/mailman/listinfo/python-list
> 




More information about the Python-list mailing list