[issue9244] multiprocessing.pool: Worker crashes if result can't be encoded

Greg Brockman report at bugs.python.org
Tue Jul 13 17:24:47 CEST 2010


Greg Brockman <gdb at ksplice.com> added the comment:

This looks pretty reasonable to my untrained eye.  I successfully applied and ran the test suite.

To be clear, the errback change and the unpickleable result change are actually orthogonal, right?  Anyway, I'm not really familiar with the protocol here, but assuming that you're open to code review:

> -    def apply_async(self, func, args=(), kwds={}, callback=None):
> +    def apply_async(self, func, args=(), kwds={}, callback=None,
> +            error_callback=None):
>          '''
>          Asynchronous equivalent of `apply()` builtin
>          '''
>          assert self._state == RUN
> -        result = ApplyResult(self._cache, callback)
> +        result = ApplyResult(self._cache, callback, error_callback)
>          self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
>          return result
Sure.  Why not add an error_callback for map_async as well?
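
For illustration, here is a minimal sketch of what an error_callback on map_async could look like from the caller's side. It is written against the map_async signature found in current Python 3 releases, not against the patch under review. It uses ThreadPool so it runs without any process setup, and `raising` is a hypothetical stand-in for the test helper of the same name, whose body is not shown in the quoted diff.

```python
from multiprocessing.pool import ThreadPool


def raising(x):
    """Hypothetical stand-in task: always fails with a KeyError."""
    raise KeyError(x)


def run_map_with_errback(items):
    """Map `raising` over items; return (errback captures, exception from get())."""
    captured = []
    raised = None
    with ThreadPool(2) as pool:
        # error_callback receives the exception instance once the map has failed.
        result = pool.map_async(raising, items, error_callback=captured.append)
        try:
            result.get(timeout=10)
        except KeyError as exc:
            # get() re-raises the same failure in the caller.
            raised = exc
    return captured, raised


if __name__ == "__main__":
    captured, raised = run_map_with_errback([1, 2, 3])
    print(len(captured), type(raised).__name__)
```

The callback and get() report the same failure, so a caller can rely on either one; the callback is useful mainly when nobody is blocked on get().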

> -    def __init__(self, cache, callback):
> +    def __init__(self, cache, callback, error_callback=None):
>          self._cond = threading.Condition(threading.Lock())
>          self._job = job_counter.next()
>          self._cache = cache
>          self._ready = False
>          self._callback = callback
> +        self._errback = error_callback
>          cache[self._job] = self
Any reason you chose a different internal name (_errback versus error_callback)? It seems cleaner to me to use the same name throughout.

>  def sqr(x, wait=0.0):
>      time.sleep(wait)
>      return x*x
> +
>  class _TestPool(BaseTestCase):
>      def test_apply(self):
> @@ -1020,6 +1021,7 @@ class _TestPool(BaseTestCase):
>          self.assertEqual(get(), 49)
>          self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
>  
> +
>      def test_async_timeout(self):
In general, I'm wary of nonessential whitespace changes... did you mean to include these?

> +        scratchpad = [None]
> +        def errback(exc):
> +            scratchpad[0] = exc
> +
> +        res = p.apply_async(raising, error_callback=errback)
> +        self.assertRaises(KeyError, res.get)
> +        self.assertTrue(scratchpad[0])
> +        self.assertIsInstance(scratchpad[0], KeyError)
> +
> +        p.close()
Using "assertTrue" seems misleading.  "assertIsNotNone" is what you really mean, right?  Either way, I believe the check is redundant, since presumably self.assertIsInstance(None, KeyError) will fail anyway (I haven't verified this).
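
The redundancy claim is easy to check in isolation. The sketch below assumes Python 3's unittest, where TestCase can be instantiated without naming a test method; `none_fails_isinstance_check` is a hypothetical helper name.

```python
import unittest


def none_fails_isinstance_check():
    """Return True if assertIsInstance(None, KeyError) raises an assertion failure."""
    case = unittest.TestCase()
    try:
        case.assertIsInstance(None, KeyError)
    except AssertionError:
        # None is not a KeyError instance, so the assertion fails by itself.
        return True
    return False


if __name__ == "__main__":
    print(none_fails_isinstance_check())
```

If the helper returns True, a preceding assertTrue or assertIsNotNone on the same value adds nothing beyond a slightly different failure message.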


> +    def test_unpickleable_result(self):
> +        from multiprocessing.pool import MaybeEncodingError
> +        p = multiprocessing.Pool(2)
> +
> +        # Make sure we don't lose pool processes because of encoding errors.
> +        for iteration in xrange(20):
> +
> +            scratchpad = [None]
> +            def errback(exc):
> +                scratchpad[0] = exc
> +
> +            res = p.apply_async(unpickleable_result, error_callback=errback)
> +            self.assertRaises(MaybeEncodingError, res.get)
> +            wrapped = scratchpad[0]
> +            self.assertTrue(wrapped)
Again, assertTrue is probably not what you want, and is probably redundant.
> +            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
Why use scratchpad[0] rather than wrapped?
> +            self.assertIsNotNone(wrapped.exc)
> +            self.assertIsNotNone(wrapped.value)
Under what circumstances would these be None?  (Perhaps you want wrapped.exc != 'None'?)  The initializer for MaybeEncodingError enforces the invariant that exc/value are strings, right?
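
A small sketch of why assertIsNotNone cannot fail here. It assumes the MaybeEncodingError shipped in current Python 3 releases, which stores string representations of both arguments; the class in the patch under review may differ in detail. `fields_are_strings` is a hypothetical helper name.

```python
from multiprocessing.pool import MaybeEncodingError


def fields_are_strings(wrapped):
    """Check the invariant that both stored fields are strings."""
    return isinstance(wrapped.exc, str) and isinstance(wrapped.value, str)


if __name__ == "__main__":
    # Even when both inputs are None, the stored fields are the string 'None'.
    wrapped = MaybeEncodingError(None, None)
    print(fields_are_strings(wrapped), wrapped.exc is None, wrapped.exc == 'None')
```

Under that assumption the fields are never None, so a test that wants to detect a missing exception or value would have to compare against the string 'None' instead.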


> +
>  class _TestPoolWorkerLifetime(BaseTestCase):
>  
>      ALLOWED_TYPES = ('processes', )
Three line breaks there seem excessive.

----------

_______________________________________
Python tracker <report at bugs.python.org>
<http://bugs.python.org/issue9244>
_______________________________________

