Problem in Multiprocessing module

Terry Reedy tjreedy at udel.edu
Fri Oct 11 18:58:44 EDT 2013


On 10/11/2013 10:53 AM, William Ray Wing wrote:
> I'm running into a problem in the multiprocessing module.
>
> My code is running four parallel processes which are doing network access completely independently of each other (gathering data from different remote sources).  In rare circumstances, the code blows up when one of my processes has to start doing some error recovery.  I strongly suspect it is because there is a time-out that isn't being caught in the multiprocessing lib, and that in turn is exposing the TypeError.  Note that the error is "cannot concatenate 'str' and 'NoneType' objects" and it is occurring way down in the multiprocessing library.
>
> I'd really appreciate it if someone more knowledgeable about multiprocessing could confirm (or refute) my suspicion and then tell me how to fix things up.
>
> I'm running python 2.7.5 on a Mac OS-X 10.8.5

The version is important, see below.

> The traceback I get is:

You moved the last line of the traceback to the top. It is better to cut
and paste a traceback as is.

> TypeError: cannot concatenate 'str' and 'NoneType' objects

To understand an exception, you must know what sort of expression could 
cause it. In 2.7, this arises from something like
 >>> 'a'+None

Traceback (most recent call last):
   File "<pyshell#0>", line 1, in <module>
     'a'+None
TypeError: cannot concatenate 'str' and 'NoneType' objects

In 3.x, the same expression generates
TypeError: Can't convert 'NoneType' object to str implicitly

The equivalent join expression gives a different message in 2.7 (and 
nearly the same message in 3.3):

 >>> ''.join(('a', None))

Traceback (most recent call last):
   File "<pyshell#1>", line 1, in <module>
     ''.join(('a', None))
TypeError: sequence item 1: expected string, NoneType found
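
A minimal sketch of the same comparison, for anyone who wants to check 
which message their own interpreter produces. The helper name 
type_error_message is made up for illustration; the exact wording of 
the messages varies between Python versions, so the sketch only prints 
them.

```python
def type_error_message(operation):
    """Run a zero-argument callable and return the TypeError text, if any."""
    try:
        operation()
    except TypeError as exc:
        return str(exc)
    return None

# Concatenation and join fail with differently worded messages, so the
# wording in a traceback hints at which kind of expression raised it.
concat_message = type_error_message(lambda: 'a' + None)
join_message = type_error_message(lambda: ''.join(('a', None)))
print(concat_message)
print(join_message)
```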

> File "/Users/wrw/Dev/Python/Connection_Monitor/Version3.0/CM_Harness.py", line 20, in <module>
>   my_pool = pool.map(monitor, targets)    # and hands off to four targets
> File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 250, in map
>   return self.map_async(func, iterable, chunksize).get()
> File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 554, in get
>   raise self._value
>
> To save you-all some time:
>
> The "get" function at line 554 in pool.py (which is in the multiprocessing lib) is:

class ApplyResult(object):

>     def get(self, timeout=None):
>         self.wait(timeout)

This must set self._ready, self._success, and self._value

>         if not self._ready:
>             raise TimeoutError

This did not happen, so self._ready must be True

>         if self._success:
>             return self._value

This did not happen, so self._success must be False

>         else:
>             raise self._value

This did, and self._value is the TypeError reported.
Let us look into self.wait and see if we can find where there is a 
string1 + string2 expression and then figure out how string2 might be None.

class ApplyResult(object):
     def __init__(self, cache, callback):
         self._cond = threading.Condition(threading.Lock())
         ...

In 2.7, threading.Condition is a (useless) factory function that returns 
an instance of class _Condition.

     def wait(self, timeout=None):
         self._cond.acquire()
         try:
             if not self._ready:
                 self._cond.wait(timeout)
         finally:
             self._cond.release()

So it seems that we need to look at the _Condition methods acquire, 
release, and wait. The first two are just the lock's methods:
         self.acquire = lock.acquire
         self.release = lock.release
However, this seems wrong because self._cond has no reference to the 
ApplyResult instance and hence cannot set its attributes.  The problem 
must be in some callback that is called while waiting. Async code is 
terrible to debug because the call stack in the traceback ends in get, 
which merely waits, and does not tell us what function was called 
during the wait.

In the class body, the .get method is followed by ._set, which starts

     def _set(self, i, obj):
         self._success, self._value = obj
         # and goes on to set other attributes

This is the only place where self._value is set, so it must have been 
called during the wait.
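
The handoff between get and _set can be sketched with a toy class. This 
is not the library's code: ResultSketch and demo are made-up names, and 
the sketch only assumes what the excerpts above show, namely that one 
thread stores a (success, value) pair while another waits on the 
condition and then returns or raises the value.

```python
import threading

class ResultSketch(object):
    """Toy stand-in for ApplyResult (hypothetical, heavily simplified)."""

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._ready = False

    def _set(self, obj):
        # Called from another thread: store the outcome, wake the waiter.
        self._success, self._value = obj
        with self._cond:
            self._ready = True
            self._cond.notify()

    def get(self, timeout=None):
        with self._cond:
            if not self._ready:
                self._cond.wait(timeout)
        if not self._ready:
            raise RuntimeError("timed out")
        if self._success:
            return self._value
        # The exception object was created elsewhere; it is only raised here.
        raise self._value

def demo():
    result = ResultSketch()
    failure = (False, TypeError("boom"))
    handler = threading.Thread(target=result._set, args=(failure,))
    handler.start()
    try:
        result.get(timeout=5)
    except TypeError as exc:
        caught = exc
    else:
        caught = None
    handler.join()
    return caught
```

The traceback from get only shows where the stored exception was 
re-raised, not where it was originally created.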

_set is only called in Pool._handle_results, where the relevant lines are
     @staticmethod
     def _handle_results(outqueue, get, cache):
...
                 task = get()
...
             job, i, obj = task
...
                 cache[job]._set(i, obj)

We need to find out what get is. _handle_results is only used in 
Pool.__init__:

         self._result_handler = threading.Thread(
             target=Pool._handle_results,
             args=(self._outqueue, self._quick_get, self._cache)
             )

So when _handle_results is called, get = self._quick_get. What is 
that? Pool.__init__ starts with self._setup_queues(), and that has
         from .queues import SimpleQueue  # relative import
         self._outqueue = SimpleQueue()
         self._quick_get = self._outqueue._reader.recv

SimpleQueue._reader is the first member of the pair returned by 
multiprocessing.Pipe(duplex=False), which calls 
multiprocessing.connection.Pipe(duplex). For duplex=False, that in turn 
returns two _multiprocessing.Connection objects (not Windows) or 
PipeConnection objects (Windows). So it seems that get = _quick_get = 
_multiprocessing.Connection.recv (or PipeConnection.recv on Windows).

I tried looking at the C code for conn_recv_string() in
Modules/_multiprocessing/pipe_connection and conn_recv_bytes in 
.../socket_connection, but did not get very far. I do not see how a 
triple could be returned, nor any string concatenation. If I am on the 
right track, the concatenation would have to be deeper in one of the 
called functions.
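
One thing that may help here: the documented behavior of Connection.send 
is to pickle the object it is given, and recv returns the unpickled 
object, so a (job, i, obj) triple can travel through the pipe as one 
message. A small sketch, with a made-up roundtrip helper and made-up 
task values, run inside a single process:

```python
import multiprocessing

def roundtrip(task):
    """Send one picklable object through a one-way pipe and read it back."""
    reader, writer = multiprocessing.Pipe(duplex=False)
    try:
        writer.send(task)      # pickles the whole object
        return reader.recv()   # unpickles and returns an equal object
    finally:
        reader.close()
        writer.close()

# Hypothetical task shaped like the (job, i, obj) triple unpacked above,
# where obj is a (success, value) pair carrying an exception instance.
job, i, obj = roundtrip((7, 0, (False, TypeError("boom"))))
success, value = obj
print(job, i, success, type(value).__name__)
```

If that is what happens here, the exception instance itself arrives as 
data, and no concatenation needs to take place inside recv.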

But maybe I am on the wrong track and the problem is in your monitor 
program, or whatever part gets called as part of the callback.
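
A sketch of that second possibility. Everything here is hypothetical: 
flaky_monitor is a stand-in for your monitor function, the target names 
are invented, and ThreadPool is used instead of Pool only to keep the 
example self-contained. The point is just that an exception raised 
inside the mapped function resurfaces at the pool.map call.

```python
from multiprocessing.pool import ThreadPool

def flaky_monitor(target):
    # Hypothetical failure mode: error recovery leaves status as None.
    status = None if target == "bad" else "ok"
    return target + ": " + status   # TypeError when status is None

def run_targets(targets):
    """Map flaky_monitor over targets; return results or the TypeError."""
    pool = ThreadPool(2)
    try:
        return pool.map(flaky_monitor, targets)
    except TypeError as exc:
        # Raised in a worker, re-raised by map() in the calling thread.
        return exc
    finally:
        pool.close()
        pool.join()

print(run_targets(["a", "b"]))
print(repr(run_targets(["a", "bad"])))
```

Wrapping the body of monitor in try/except and logging the traceback 
there would show the real line, if this is what is going on.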

-- 
Terry Jan Reedy



