[Python-Dev] futures API

Thomas Nagy tnagyemail-mail at yahoo.fr
Sun Dec 12 03:32:18 CET 2010


--- El sáb, 11/12/10, Brian Quinlan escribió:
> 
> On Dec 11, 2010, at 6:44 AM, Thomas Nagy wrote:
> 
> > --- El vie, 10/12/10, Brian Quinlan escribió:
> >> On Dec 10, 2010, at 10:51 AM, Thomas Nagy wrote:
> >>> --- El vie, 10/12/10, Brian Quinlan
> escribió:
> >>>> On Dec 10, 2010, at 5:36 AM, Thomas Nagy
> wrote:
> >>>>> I have a process running for a long
> time, and
> >> which
> >>>> may use futures of different max_workers
> count. I
> >> think it
> >>>> is not too far-fetched to create a new
> futures
> >> object each
> >>>> time. Yet, the execution becomes slower
> after each
> >> call, for
> >>>> example with http://freehackers.org/~tnagy/futures_test.py:
> >>>>> 
> >>>>> """
> >>>>> import concurrent.futures
> >>>>> from queue import Queue
> >>>>> import datetime
> >>>>> 
> >>>>> class counter(object):
> >>>>>       def
> __init__(self, fut):
> >>>>>       
>    self.fut =
> >> fut
> >>>>> 
> >>>>>       def
> run(self):
> >>>>>       
>    def
> >>>> look_busy(num, obj):
> >>>>> 
> >>>>     tot = 0
> >>>>> 
> >>>>     for x in
> range(num):
> >>>>> 
> >>>>     tot += x
> >>>>> 
> >>>> 
>    obj.out_q.put(tot)
> >>>>> 
> >>>>>       
>    start =
> >>>> datetime.datetime.utcnow()
> >>>>>       
>    self.count =
> >> 0
> >>>>>       
>    self.out_q
> >> =
> >>>> Queue(0)
> >>>>>       
>    for x in
> >>>> range(1000):
> >>>>> 
> >>>>     self.count += 1
> >>>>> 
> >>>> 
>    self.fut.submit(look_busy,
> >> self.count,
> >>>> self)
> >>>>> 
> >>>>>       
>    while
> >>>> self.count:
> >>>>> 
> >>>>     self.count -= 1
> >>>>> 
> >>>>     self.out_q.get()
> >>>>> 
> >>>>>       
>    delta =
> >>>> datetime.datetime.utcnow() - start
> >>>>> 
> >>>> 
>    print(delta.total_seconds())
> >>>>> 
> >>>>> fut =
> >>>> 
> >>
> concurrent.futures.ThreadPoolExecutor(max_workers=20)
> >>>>> for x in range(100):
> >>>>>       #
> comment the following
> >> line
> >>>>>       fut =
> >>>> 
> >>
> concurrent.futures.ThreadPoolExecutor(max_workers=20)
> >>>>>       c =
> counter(fut)
> >>>>>   
>    c.run()
> >>>>> """
> >>>>> 
> >>>>> The runtime grows after each step:
> >>>>> 0.216451
> >>>>> 0.225186
> >>>>> 0.223725
> >>>>> 0.222274
> >>>>> 0.230964
> >>>>> 0.240531
> >>>>> 0.24137
> >>>>> 0.252393
> >>>>> 0.249948
> >>>>> 0.257153
> >>>>> ...
> >>>>> 
> >>>>> Is there a mistake in this piece of
> code?
> >>>> 
> >>>> There is no mistake that I can see but I
> suspect
> >> that the
> >>>> circular references that you are building
> are
> >> causing the
> >>>> ThreadPoolExecutor to take a long time to
> be
> >> collected. Try
> >>>> adding:
> >>>> 
> >>>>      c = counter(fut)
> >>>>      c.run()
> >>>> +    fut.shutdown()
> >>>> 
> >>>> Even if that fixes your problem, I still
> don't
> >> fully
> >>>> understand this because I would expect the
> runtime
> >> to fall
> >>>> after a while as ThreadPoolExecutors are
> >> collected.
> >>> 
> >>> The shutdown call is indeed a good fix :-)
> Here is the
> >> time response
> >>> of the calls to counter() when shutdown is
> not
> >> called:
> >>> http://www.freehackers.org/~tnagy/runtime_futures.png
> >> 
> >> FWIW, I think that you are confusion the term
> "future"
> >> with
> >> "executor". A future represents a single work
> item. An
> >> executor
> >> creates futures and schedules their underlying
> work.
> > 
> > Ah yes, sorry. I have also realized that the executor
> is not the killer feature I was expecting, it can only
> replace a little part of the code I have: controlling the
> exceptions and the workflow is the most complicated part.
> > 
> > I have also observed a minor performance degradation
> with the executor replacement (3 seconds for 5000 work
> items). The amount of work items processed by unit of time
> does not seem to be a straight line: http://www.freehackers.org/~tnagy/runtime_futures_2.png
> .
> 
> That looks pretty linear to me.

Ok.
 
> > Out of curiosity, what is the "_thread_references"
> for?
> 
> There is a big comment above it in the code:
> 
> # Workers are created as daemon threads. This is done to
> allow the interpreter
> # to exit when there are still idle threads in a
> ThreadPoolExecutor's thread
> # pool (i.e. shutdown() was not called). However, allowing
> workers to die with
> # the interpreter has two undesirable properties:
> #   - The workers would still be running
> during interpretor shutdown,
> #     meaning that they would fail in
> unpredictable ways.
> #   - The workers could be killed while
> evaluating a work item, which could
> #     be bad if the callable being
> evaluated has external side-effects e.g.
> #     writing to a file.
> #
> # To work around this problem, an exit handler is installed
> which tells the
> # workers to exit when their work queues are empty and then
> waits until the
> # threads finish.
> 
> _thread_references = set()
> _shutdown = False
> 
> def _python_exit():
>     global _shutdown
>     _shutdown = True
>     for thread_reference in _thread_references:
>         thread = thread_reference()
>         if thread is not None:
>             thread.join()
> 
> Is it still unclear why it is there? Maybe you could
> propose some additional documentation.

I was thinking that if exceptions have to be caught - and it is likely to be the case in general - then this scheme is redundant. Now I see that the threads are getting their work items from a queue, so it is clear now.

Thanks for all the information,

Thomas



      


More information about the Python-Dev mailing list