help with multiprocessing pool

Philip Semanchuk philip at semanchuk.com
Thu Jan 27 09:39:41 EST 2011


On Jan 25, 2011, at 8:19 PM, Craig Yoshioka wrote:

> Hi all, 
> 
> I could really use some help with a problem I'm having.


Hiya Craig,
I don't know if I can help, but it's really difficult to do without a full working example. 

Also, your code has several OS X-isms in it, so I guess that's the platform you're on. But in case you're on Windows, note that it requires some extra care when using multiprocessing:
http://docs.python.org/library/multiprocessing.html#windows
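
For reference, here is a minimal sketch of the kind of guard that section describes. The function names and paths are made up for illustration and are not taken from your code:

```python
from multiprocessing import Pool


def path_length(path):
    # Hypothetical stand-in for a real per-path action.
    return len(path)


def run(paths):
    # Create the pool only when called, never as a side effect of import.
    pool = Pool(2)
    try:
        return pool.map(path_length, paths)
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    # Only the process that runs this file as a script starts the workers.
    print(run(['/Volumes/HD1', '/Volumes/HD2']))
```

The point is that importing the module does nothing except define functions; all the process-starting work sits behind the __name__ check.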


Good luck
Philip


> I wrote a function that can take a pattern of actions and apply it to the filesystem.
> It takes a list of starting paths, and a pattern like this:
> 
> pattern = {
>     InGlob('Test/**'): {
>         MatchRemove('DS_Store'): [],
>         NoMatchAdd('(alphaID_)|(DS_Store)', 'warnings'): [],
>         MatchAdd('alphaID_', 'alpha_found'): [],
>         InDir('alphaID_'): {
>             NoMatchAdd('(betaID_)|(DS_Store)', 'warnings'): [],
>             InDir('betaID_'): {
>                 NoMatchAdd('(gammaID_)|(DS_Store)', 'warnings'): [],
>                 MatchAdd('gammaID_', 'gamma_found'): [] }}}}
> 
> so if you run evalFSPattern(['Volumes/**'],pattern) it'll return a dictionary where:
> 
> dict['gamma_found'] = [list of paths that matched] (e.g. '/Volumes/HD1/Test/alphaID_3382/betaID_38824/gammaID_848384')
> dict['warnings'] = [list of paths that failed to match] (e.g. '/Volumes/HD1/Test/alphaID_3382/gammaID_47383')
> 
> Since some of these volumes are on network shares I also wanted to parallelize this so that it would not block on IO.  I started the parallelization by using multiprocessing.Pool and got it to work if I ran the fsparser from the interpreter.  It ran in *much* less time and produced correct output that matched the non-parallelized version.  The problem begins if I then try to use the parallelized function from within the code.
> 
> For example, I wrote a class whose instances are created around valid FS paths, which are cached to reduce expensive FS lookups.
> 
> class Experiment(object):
>
>     SlidePaths = None
>
>     @classmethod
>     def getSlidePaths(cls):
>         if cls.SlidePaths is None:
>             cls.SlidePaths = fsparser(['/Volumes/**'], pattern)
>         return cls.SlidePaths
>
>     @classmethod
>     def lookupPathWithGammaID(cls, id):
>         paths = cls.getSlidePaths()
>         ...
>         return paths[selected]
>
>     @classmethod
>     def fromGammaID(cls, id):
>         path = cls.lookupPathWithGammaID(id)
>         return cls(path)
>
>     def __init__(self, path):
>         self.Path = path
>         ...
>
>     ...
> 
> If I do the following from the interpreter it works:
> 
> >>> from experiment import Experiment
> >>> expt = Experiment.fromGammaID(10102)
> 
> but if I write a script called test.py:
> 
> from experiment import Experiment
> expt1 = Experiment.fromGammaID(10102)
> expt2 = Experiment.fromGammaID(10103)
> comparison = expt1.compareTo(expt2)
> 
> it fails if I try to import it or run it from a bash prompt:
> 
> >>> from test import comparison   (hangs forever)
> $ python test.py   (hangs forever)
> 
> I would really like some help trying to figure this out...  I thought it should work easily since none of the spawned processes share data or state (their results are merged in the main process).  The classes used in the pattern are also simple Python objects (they use only Python primitives).
> 
> 
> These are the main functions:
> 
> def mapAction(pool,paths,action):
>    merge = {'next':[]}
>    for result in pool.map(action,paths):
>        if result is None:
>            continue
>        merge = mergeDicts(merge,result)
>    return merge
> 
> 
> def mergeDicts(d1,d2):
>    for key in d2:
>        if key not in d1:
>            d1[key] = d2[key]
>        else:
>            d1[key] += d2[key]
>    return d1
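
One detail of mergeDicts that is easy to miss: when a key is new, d1 ends up holding the very same list object as d2, and a later += extends that list in place. The sketch below repeats the quoted function unchanged and shows the effect on made-up data. It is probably harmless for results coming back from pool.map, since each of those arrives as a freshly unpickled object, but that depends on how the function is used elsewhere in your code, which I can't see.

```python
def mergeDicts(d1, d2):
    # Same logic as the quoted function: d1 is modified in place and returned.
    for key in d2:
        if key not in d1:
            d1[key] = d2[key]      # d1 now shares this list with d2
        else:
            d1[key] += d2[key]     # extends the existing list in place
    return d1


# Made-up data, only to show the aliasing.
first = {'warnings': ['/a']}
merged = mergeDicts({}, first)
mergeDicts(merged, {'warnings': ['/b']})

# 'first' was never passed as d1, yet its list has grown too.
shared = first['warnings'] is merged['warnings']
```

If that sharing is ever unwanted, copying with list(d2[key]) on first insertion would avoid it.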
> 
> 
> def evalFSPattern(paths,pattern):
>    pool = Pool(10)
>    results = {}
>    for action in pattern:
>        tomerge1 = mapAction(pool,paths,action)
>        tomerge2 = evalFSPattern(tomerge1['next'],pattern[action])
>        del tomerge1['next']
>        results = mergeDicts(results,tomerge1)
>        results = mergeDicts(results,tomerge2)
>    return results
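
As quoted, evalFSPattern calls Pool(10) on every invocation, including each recursive one, and never closes those pools. I can't say whether that is related to the hang, but one way to avoid it is to create a single pool and pass it down. A minimal sketch follows; Keep and run are hypothetical names invented for the example, and mapAction and mergeDicts are repeated from your message so that the sketch runs on its own:

```python
from multiprocessing import Pool


class Keep(object):
    """Hypothetical action: file every path under `key` and pass it on."""

    def __init__(self, key):
        self.key = key

    def __call__(self, path):
        return {self.key: [path], 'next': [path]}


def mergeDicts(d1, d2):
    for key in d2:
        if key not in d1:
            d1[key] = d2[key]
        else:
            d1[key] += d2[key]
    return d1


def mapAction(pool, paths, action):
    merge = {'next': []}
    for result in pool.map(action, paths):
        if result is None:
            continue
        merge = mergeDicts(merge, result)
    return merge


def evalFSPattern(pool, paths, pattern):
    # The pool is an argument, so recursive calls reuse it
    # instead of each creating ten more worker processes.
    results = {}
    for action in pattern:
        tomerge1 = mapAction(pool, paths, action)
        tomerge2 = evalFSPattern(pool, tomerge1['next'], pattern[action])
        del tomerge1['next']
        results = mergeDicts(results, tomerge1)
        results = mergeDicts(results, tomerge2)
    return results


def run(paths, pattern):
    # One pool for the whole traversal, shut down when finished.
    pool = Pool(10)
    try:
        return evalFSPattern(pool, paths, pattern)
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    print(run(['/a', '/b'], {Keep('outer'): {Keep('inner'): []}}))
```

Because evalFSPattern only needs an object with a map method, you can also hand it a trivial serial stand-in while debugging, which makes it easier to tell pool problems apart from pattern problems.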
> 
> The classes used in the pattern (InGlob, NoMatchAdd, etc.) are callable classes that take a single parameter (a path) and return a dict result or None, which makes them trivial to adapt to Pool.map.
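
Pool.map has to pickle the callable and each path in order to hand them to the worker processes, so these classes need to survive a pickle round trip. Since the real classes aren't shown, here is a guess at what one might look like, with a quick check you could run against the real ones. MatchAdd's constructor arguments follow the pattern above; everything else (the attribute names, the use of re.search, the survives_pickling helper) is an assumption:

```python
import pickle
import re


class MatchAdd(object):
    """Hypothetical reconstruction; the real class is not shown in the thread."""

    def __init__(self, regex, key):
        self.regex = regex  # plain string, which pickles without trouble
        self.key = key

    def __call__(self, path):
        # Return a one-entry dict on a match, None otherwise.
        if re.search(self.regex, path):
            return {self.key: [path]}
        return None


def survives_pickling(action, path):
    # Round-trip the callable through pickle and compare behaviour.
    clone = pickle.loads(pickle.dumps(action))
    return clone(path) == action(path)


if __name__ == '__main__':
    action = MatchAdd('gammaID_', 'gamma_found')
    print(survives_pickling(action, '/Volumes/HD1/Test/gammaID_1'))
```

Note that pickle looks classes up by module and name, so the class has to live at the top level of an importable module for a check like this to work.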
> 
> Note if I change the mapAction function to:
> 
> def mapAction(pool,paths,action):
>    merge = {'next':[]}
>    for path in paths:
>        result = action(path)
>        if result is None:
>            continue
>        merge = mergeDicts(merge,result)
>    return merge
> 
> everything works just fine.
> 
> 
> Thanks.
> 
> 



