[Python-Dev] futures API

Brian Quinlan brian at sweetapp.com
Sat Dec 11 21:53:15 CET 2010


On Dec 11, 2010, at 6:44 AM, Thomas Nagy wrote:

> --- El vie, 10/12/10, Brian Quinlan escribió:
>> On Dec 10, 2010, at 10:51 AM, Thomas Nagy wrote:
>>> --- El vie, 10/12/10, Brian Quinlan escribió:
>>>> On Dec 10, 2010, at 5:36 AM, Thomas Nagy wrote:
>>>>> I have a process running for a long time, and
>> which
>>>> may use futures of different max_workers count. I
>> think it
>>>> is not too far-fetched to create a new futures
>> object each
>>>> time. Yet, the execution becomes slower after each
>> call, for
>>>> example with http://freehackers.org/~tnagy/futures_test.py:
>>>>>
>>>>> """
>>>>> import concurrent.futures
>>>>> from queue import Queue
>>>>> import datetime
>>>>>
>>>>> class counter(object):
>>>>>       def __init__(self, fut):
>>>>>           self.fut =
>> fut
>>>>>
>>>>>       def run(self):
>>>>>           def
>>>> look_busy(num, obj):
>>>>>
>>>>     tot = 0
>>>>>
>>>>     for x in range(num):
>>>>>
>>>>     tot += x
>>>>>
>>>>     obj.out_q.put(tot)
>>>>>
>>>>>           start =
>>>> datetime.datetime.utcnow()
>>>>>           self.count =
>> 0
>>>>>           self.out_q
>> =
>>>> Queue(0)
>>>>>           for x in
>>>> range(1000):
>>>>>
>>>>     self.count += 1
>>>>>
>>>>     self.fut.submit(look_busy,
>> self.count,
>>>> self)
>>>>>
>>>>>           while
>>>> self.count:
>>>>>
>>>>     self.count -= 1
>>>>>
>>>>     self.out_q.get()
>>>>>
>>>>>           delta =
>>>> datetime.datetime.utcnow() - start
>>>>>
>>>>     print(delta.total_seconds())
>>>>>
>>>>> fut =
>>>>
>> concurrent.futures.ThreadPoolExecutor(max_workers=20)
>>>>> for x in range(100):
>>>>>       # comment the following
>> line
>>>>>       fut =
>>>>
>> concurrent.futures.ThreadPoolExecutor(max_workers=20)
>>>>>       c = counter(fut)
>>>>>       c.run()
>>>>> """
>>>>>
>>>>> The runtime grows after each step:
>>>>> 0.216451
>>>>> 0.225186
>>>>> 0.223725
>>>>> 0.222274
>>>>> 0.230964
>>>>> 0.240531
>>>>> 0.24137
>>>>> 0.252393
>>>>> 0.249948
>>>>> 0.257153
>>>>> ...
>>>>>
>>>>> Is there a mistake in this piece of code?
>>>>
>>>> There is no mistake that I can see but I suspect
>> that the
>>>> circular references that you are building are
>> causing the
>>>> ThreadPoolExecutor to take a long time to be
>> collected. Try
>>>> adding:
>>>>
>>>>      c = counter(fut)
>>>>      c.run()
>>>> +    fut.shutdown()
>>>>
>>>> Even if that fixes your problem, I still don't
>> fully
>>>> understand this because I would expect the runtime
>> to fall
>>>> after a while as ThreadPoolExecutors are
>> collected.
>>>
>>> The shutdown call is indeed a good fix :-) Here is the
>> time response
>>> of the calls to counter() when shutdown is not
>> called:
>>> http://www.freehackers.org/~tnagy/runtime_futures.png
>>
>> FWIW, I think that you are confusion the term "future"
>> with
>> "executor". A future represents a single work item. An
>> executor
>> creates futures and schedules their underlying work.
>
> Ah yes, sorry. I have also realized that the executor is not the  
> killer feature I was expecting, it can only replace a little part of  
> the code I have: controlling the exceptions and the workflow is the  
> most complicated part.
>
> I have also observed a minor performance degradation with the  
> executor replacement (3 seconds for 5000 work items). The amount of  
> work items processed by unit of time does not seem to be a straight  
> line: http://www.freehackers.org/~tnagy/runtime_futures_2.png .

That looks pretty linear to me.

> Out of curiosity, what is the "_thread_references" for?

There is a big comment above it in the code:

# Workers are created as daemon threads. This is done to allow the  
interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's  
thread
# pool (i.e. shutdown() was not called). However, allowing workers to  
die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpretor shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which  
could
#     be bad if the callable being evaluated has external side-effects  
e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which  
tells the
# workers to exit when their work queues are empty and then waits  
until the
# threads finish.

_thread_references = set()
_shutdown = False

def _python_exit():
     global _shutdown
     _shutdown = True
     for thread_reference in _thread_references:
         thread = thread_reference()
         if thread is not None:
             thread.join()

Is it still unclear why it is there? Maybe you could propose some  
additional documentation.

Cheers,
Brian

> The source file for the example is in:
> http://www.freehackers.org/~tnagy/futures_test3.py
>
> The diagram was created by:
> http://www.freehackers.org/~tnagy/futures_test3.plot
>
> Thomas
>
>
>
>
> _______________________________________________
> Python-Dev mailing list
> Python-Dev at python.org
> http://mail.python.org/mailman/listinfo/python-dev
> Unsubscribe: http://mail.python.org/mailman/options/python-dev/brian%40sweetapp.com



More information about the Python-Dev mailing list