help with multiprocessing pool

Craig Yoshioka craigyk at me.com
Tue Jan 25 20:19:00 EST 2011


Hi all, 

I could really use some help with a problem I'm having.
I wrote a function that takes a pattern of actions and applies it to the filesystem.
It takes a list of starting paths, and a pattern like this:

pattern = {
    InGlob('Test/**'): {
        MatchRemove('DS_Store'): [],
        NoMatchAdd('(alphaID_)|(DS_Store)', 'warnings'): [],
        MatchAdd('alphaID_', 'alpha_found'): [],
        InDir('alphaID_'): {
            NoMatchAdd('(betaID_)|(DS_Store)', 'warnings'): [],
            InDir('betaID_'): {
                NoMatchAdd('(gammaID_)|(DS_Store)', 'warnings'): [],
                MatchAdd('gammaID_', 'gamma_found'): [],
            },
        },
    },
}

So if you run evalFSPattern(['/Volumes/**'], pattern), it returns a dictionary where:

result['gamma_found'] = [list of paths that matched] (e.g. '/Volumes/HD1/Test/alphaID_3382/betaID_38824/gammaID_848384')
result['warnings'] = [list of paths that failed to match] (e.g. '/Volumes/HD1/Test/alphaID_3382/gammaID_47383')

Since some of these volumes are on network shares, I also wanted to parallelize this so that it would not block on I/O.  I started the parallelization by using multiprocessing.Pool and got it to work when I ran the fsparser from the interpreter.  It ran in *much* less time and produced correct output that matched the non-parallelized version.  The problem begins when I try to use the parallelized function from within other code.

For example, I wrote a class whose instances are created around valid FS paths, which are cached to reduce expensive FS lookups.

class Experiment(object):

    SlidePaths = None

    @classmethod
    def getSlidePaths(cls):
        # Scan the filesystem only once and cache the result on the class.
        if cls.SlidePaths is None:
            cls.SlidePaths = fsparser(['/Volumes/**'], pattern)
        return cls.SlidePaths

    @classmethod
    def lookupPathWithGammaID(cls, id):
        paths = cls.getSlidePaths()
        ...
        return paths[selected]

    @classmethod
    def fromGammaID(cls, id):
        path = cls.lookupPathWithGammaID(id)
        return cls(path)

    def __init__(self, path):
        self.Path = path
        ...

    ...

If I do the following from the interpreter it works:

>>> from experiment import Experiment
>>> expt = Experiment.fromGammaID(10102)

but if I write a script called test.py:

from experiment import Experiment
expt1 = Experiment.fromGammaID(10102)
expt2 = Experiment.fromGammaID(10103)
comparison = expt1.compareTo(expt2)

it hangs, whether I import it or run it from the bash prompt:

>>> from test import comparison (hangs forever)
$ python test.py (hangs forever)

I would really like some help figuring this out.  I thought it should work easily, since none of the spawned processes share data or state (their results are merged in the main process).  The classes used in the pattern are also simple Python objects (they use only Python primitives).
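
One thing that may be worth ruling out is work that runs at import time: test.py starts the pool as a side effect of being imported. The multiprocessing programming guidelines recommend that a main module be importable without such side effects, which is usually done with an `if __name__ == '__main__':` guard. The sketch below shows only that layout. main() and the abs call are stand-ins for the Experiment calls, and whether this has any bearing on the hang is an assumption, not a diagnosis.

```python
from multiprocessing import Pool


def main():
    # Stand-in for the Experiment.fromGammaID(...) calls in test.py.
    # abs is used only because it is a built-in that every worker can find.
    pool = Pool(2)
    try:
        return pool.map(abs, [-10102, -10103])
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    # Runs only when executed as a script, never as a side effect of import.
    print(main())
```

With this layout, `import test` only defines main(); the pool is created when main() is called explicitly.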


These are the main functions:

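from multiprocessing import Pool


def mapAction(pool, paths, action):
    # Run the action over all paths in the pool and merge the results.
    merge = {'next': []}
    for result in pool.map(action, paths):
        if result is None:
            continue
        merge = mergeDicts(merge, result)
    return merge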


def mergeDicts(d1,d2):
    for key in d2:
        if key not in d1:
            d1[key] = d2[key]
        else:
            d1[key] += d2[key]
    return d1
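
A side note on mergeDicts: when a key is new, it stores d2's list object itself rather than a copy, so a later += extends that shared list in place. This is probably unrelated to the hang, but the sketch below shows the effect next to a copying variant. merge_dicts_copying is a hypothetical name, not part of the original code.

```python
def mergeDicts(d1, d2):
    # Same logic as the function above.
    for key in d2:
        if key not in d1:
            d1[key] = d2[key]      # stores d2's list itself, not a copy
        else:
            d1[key] += d2[key]     # extends that list in place
    return d1


def merge_dicts_copying(d1, d2):
    # Hypothetical variant: always build a new list, so inputs are untouched.
    for key in d2:
        d1[key] = d1.get(key, []) + list(d2[key])
    return d1


first = {'warnings': ['/x']}
merged = mergeDicts({}, first)
mergeDicts(merged, {'warnings': ['/y']})
# True here: merged and first hold the very same list object.
shared = merged['warnings'] is first['warnings']

safe_first = {'warnings': ['/x']}
safe = merge_dicts_copying({}, safe_first)
merge_dicts_copying(safe, {'warnings': ['/y']})
```

After the first sequence, first['warnings'] has grown as well; after the second, safe_first is unchanged.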


def evalFSPattern(paths, pattern):
    # Note: every call, including each recursive one, creates a new
    # pool of 10 workers.
    pool = Pool(10)
    results = {}
    for action in pattern:
        tomerge1 = mapAction(pool, paths, action)
        tomerge2 = evalFSPattern(tomerge1['next'], pattern[action])
        del tomerge1['next']
        results = mergeDicts(results, tomerge1)
        results = mergeDicts(results, tomerge2)
    return results

The classes used in the pattern (InGlob, NoMatchAdd, etc.) are callable classes that take a single parameter (a path) and return either a dict result or None, which makes them trivial to adapt to Pool.map.
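
For illustration, one of these callable classes might look roughly like the sketch below. The matching rule (a regex search on the last path component) and the exact shape of the returned dict are assumptions made for the example; only the interface, a path in and a dict or None out, comes from the description above.

```python
import os
import re


class MatchAdd(object):
    """Hypothetical action: record paths whose last component matches a regex."""

    def __init__(self, regex, key):
        # Only plain strings are stored, so instances stay simple objects.
        self.regex = regex
        self.key = key

    def __call__(self, path):
        # Return a one-entry dict on a match, or None otherwise.
        if re.search(self.regex, os.path.basename(path)):
            return {self.key: [path]}
        return None


action = MatchAdd('gammaID_', 'gamma_found')
hit = action('/Volumes/HD1/Test/alphaID_3382/betaID_38824/gammaID_848384')
miss = action('/Volumes/HD1/Test/alphaID_3382/betaID_38824/notes.txt')
```

Here hit is a dict keyed by 'gamma_found' and miss is None, which is the form mapAction expects from each call.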

Note that if I change the mapAction function to:

def mapAction(pool, paths, action):
    # Serial version: the pool argument is ignored.
    merge = {'next': []}
    for path in paths:
        result = action(path)
        if result is None:
            continue
        merge = mergeDicts(merge, result)
    return merge

everything works just fine.
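
Since the serial version works, one variation that might be worth trying is to create a single pool and pass it down through the recursion, because evalFSPattern as written builds a new Pool(10) on every recursive call and never closes it. The sketch below is not the original code: it swaps in multiprocessing.pool.ThreadPool (same map interface, no pickling of the actions, and threads are usually sufficient for I/O-bound work), and the names eval_pattern, eval_fs_pattern, descend and record are hypothetical. Whether this avoids the hang is untested against the real fsparser.

```python
from multiprocessing.pool import ThreadPool


def merge_dicts(d1, d2):
    # Copying merge: d1 never shares list objects with d2.
    for key in d2:
        d1[key] = d1.get(key, []) + list(d2[key])
    return d1


def map_action(pool, paths, action):
    merged = {'next': []}
    for result in pool.map(action, paths):
        if result is not None:
            merge_dicts(merged, result)
    return merged


def eval_pattern(pool, paths, pattern):
    # pattern maps action -> nested pattern; an empty value ends the recursion.
    results = {}
    if not pattern:
        return results
    for action in pattern:
        found = map_action(pool, paths, action)
        next_paths = found.pop('next')
        merge_dicts(results, found)
        merge_dicts(results, eval_pattern(pool, next_paths, pattern[action]))
    return results


def eval_fs_pattern(paths, pattern, workers=10):
    # One pool for the whole traversal, always closed and joined at the end.
    pool = ThreadPool(workers)
    try:
        return eval_pattern(pool, paths, pattern)
    finally:
        pool.close()
        pool.join()


def descend(path):
    # Toy stand-in for InDir: hand a child path to the nested pattern.
    return {'next': [path + '/child']}


def record(path):
    # Toy stand-in for MatchAdd: record every path it is given.
    return {'seen': [path]}


toy_pattern = {descend: {record: []}}
result = eval_fs_pattern(['/a', '/b'], toy_pattern, workers=2)
```

The toy pattern only exercises the recursion; the real action classes would take the place of descend and record.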


Thanks.




