Passing data across callbacks in ThreadPoolExecutor

Ian Kelly ian.g.kelly at gmail.com
Fri Feb 19 13:32:27 EST 2016


On Fri, Feb 19, 2016 at 10:18 AM, Joseph L. Casale
<jcasale at activenetwerx.com> wrote:
>> It's still not clear to me specifically what you're trying to do. It
>> would really help if you would describe the problem in more detail.
>> Here's what I think you're trying to do:
>>
>> 1) Submit a task to a ThreadPoolExecutor and get back a future.
>>
>> 2) When the task is complete, submit another task that needs the
>> result of the first task to do its work.
>>
>> 3) When the chained task is complete, return the final result of the
>> chained task back to whoever submitted the original task via the
>> original task's future.
>>
>> The problem arises in that the original task's future already
>> completed when the original task did. The chained task sets its result
>> in a different future that the submitter didn't know about.
>
> Yes, I may have 2 or more tasks that depend on the previous.
> They are each distinct tasks, or continuations, so they each want the result
> of the previous task however each one can cancel the set.
>
> The callback model doesn't apply, its based on the result of _one_ task.
>
> What I need I are continuations, much like the .NET TPL.
>
> def task_a():
>     return "a"
>
> def task_b():
>     return "b"
>
> def task_c():
>     return "c"
>
> So I submit task_a, if it completes successfully, task_b runs on its result.
> If task_b completes successfully, task_c runs on its result. They "look"
> like callbacks, but they are continuations.
>
> Task_b must be able to cancel the continuation of task_c  if it see's task_a
> has failed.

Thanks for the clarification. The first suggestion that I gave in my
initial reply will handle cancellations implicitly. If task_b() is
waiting on future_a.result() and task_a() raises an uncaught
exception, then future_a.result() will reraise the exception. If it
then propagates uncaught out of task_b() then future_b.result() will
again reraise the exception, and task_c() that is waiting on it will
reraise it as well.  See this in action below.

You might also be interested in the promise decorator described at
https://bloerg.net/2013/04/05/chaining-python-futures.html. It does
essentially the same as what I've described, but it abstracts it into
a decorator and allows the caller to determine which arguments to pass
as futures and which to pass as concrete values, instead of
hard-coding the resolution of futures of dependencies into the task
function itself.

In the course of digging that up, I also found this:
https://github.com/dvdotsenko/python-future-then which is brand-new
and looks promising, but I can't see how one would use it with a
ThreadPoolExecutor.


py> import concurrent.futures
py> def task_a():
...     raise ValueError(42)
...
py> def task_b(future_a):
...     return future_a.result()
...
py> def task_c(future_b):
...     return future_b.result()
...
py> executor = concurrent.futures.ThreadPoolExecutor()
py> future_a = executor.submit(task_a)
py> future_b = executor.submit(task_b, future_a)
py> future_c = executor.submit(task_c, future_b)
py> future_c.result()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line
398, in result
    return self.__get_result()
  File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line
357, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.5/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "<stdin>", line 2, in task_c
  File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line
398, in result
    return self.__get_result()
  File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line
357, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.5/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "<stdin>", line 2, in task_b
  File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line
398, in result
    return self.__get_result()
  File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line
357, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.5/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "<stdin>", line 2, in task_a
ValueError: 42



More information about the Python-list mailing list