Passing data across callbacks in ThreadPoolExecutor

Ian Kelly ian.g.kelly at gmail.com
Thu Feb 18 22:35:19 EST 2016


On Thu, Feb 18, 2016 at 12:06 PM, Joseph L. Casale
<jcasale at activenetwerx.com> wrote:
> On Thur, Feb 17, 2016 at 9:24 AM, Ian Kelly <ian.g.kelly at gmail.com> wrote:
>>> What is the pattern for chaining execution of tasks with ThreadPoolExecutor?
>>> Callbacks is not an adequate facility as each task I have will generate new output.
>>
>> Can you specify in more detail what your use case is?

It's still not clear to me specifically what you're trying to do. It
would really help if you would describe the problem in more detail.
Here's what I think you're trying to do:

1) Submit a task to a ThreadPoolExecutor and get back a future.

2) When the task is complete, submit another task that needs the
result of the first task to do its work.

3) When the chained task is complete, return the final result of the
chained task back to whoever submitted the original task via the
original task's future.

The problem arises in that the original task's future already
completed when the original task did. The chained task sets its result
in a different future that the submitter didn't know about.

>> If you don't mind having threads sitting around waiting, you can just
>> submit each chained task at the start and have each task wait on the
>> futures of its dependencies. The advantage of this is that it's easy
>> to conceptualize the dependency graph of the tasks. The disadvantage
>> is that it eats up extra threads. You'll probably want to increase the
>> size of the thread pool to handle the waiting tasks (or use a separate
>> ThreadPoolExecutor for each chained task).
>
> The thing with callbacks is each one gets the original tasks result. So the
> callback can't pass a new task result up and/or cancel the task "set".

If you start each individual task at time of submission, then you'll
get separate futures for them, and you can add callbacks to each of
the futures individually. Each callback will get the result of the
task it was added to, not the original task. If you only care about
the final result, then you would only add a callback to the future for
the final task.

>> Otherwise, is there some reason you can't use multiple callbacks, one
>> to handle the task's output and one to submit the chained task?
>>
>> E.g.:
>>
>> def chain_task2(input, f2):
>>     f2 = executor.submit(task2, input, f2.result())
>>     f2.add_done_callback(handle_task2_done)
>>
>> f1 = executor.submit(task1, input)
>> f1.add_done_callback(handle_task1_done)
>> f1.add_done_callback(functools.partial(chain_task2, input))
>
> Closing over the executer instance seems potentially race'y. What happens when
> the last task finishes and the executer checks (locking the queue) and the task also
> wants to add more work. I'll check the source, but I had hoped for a more succinct way.

Again, it's not clear to me what scenario you're talking about here.
The executor never "checks" the queue; it's only used to submit tasks.
The only things pulling from the queue are the worker threads
themselves.

Besides, if something is waiting on a queue, it's not going to hold
the lock on the queue while it's waiting; that would make it
impossible to put anything on the queue and it would just end up
waiting forever.



More information about the Python-list mailing list