Best way to report progress at fixed intervals

Slaunger Slaunger at gmail.com
Wed Dec 10 06:37:47 EST 2008


On 10 Dec., 12:08, eric <e... at ericaro.net> wrote:
> Don't mind if I give my shot ?
>
> def work(i):
>     """
>     Dummy process function, which takes a random time in the interval
>     0.0-0.5 secs to execute
>     """
>     print "Work step %d" % i
>     time.sleep(0.5 * random.random())
>
> def workAll(work, verbose=True, max_iter=20, progress_interval=1.0):
>     '''
>     pass the real job as a callable
>     '''
>     progress = time.time()
>     for i in range(max_iter): # do the requested loop
>         work(i)
>         if verbose:
>             print "Work through all %d steps reporting progress every
> %3.1f secs..." %(max_iter, progress_interval)
>         interval = time.time()-progress
>         if interval>progress_interval:
>             print "Processed %d of %d at pace %s" % (i, max_iter,
> interval)
>             progress +=interval
>
> if __name__=="__main__":
>     workAll(work, False)
>
> It's works fine, and the "pace" is 'almost' the required one. You earn
> a no-thread-mess, and cleaner alg.
>
> But the loop is controlled by the caller (the WorkAll function) this
> is also called ass-backward algorithm, and you cannot expect
> algorithms to be assbackward (even if it's the best way to implement
> them).
>
> You can use the yield statement, to turn  easilly your alg into a
> nice, stopable assbackward algo:
>
> def work():
>     """
>     Dummy process function, which takes a random time in the interval
>     0.0-0.5 secs to execute
>     """
>     for i in range(50):
>         print "Work step %d" % i
>         time.sleep(0.5 * random.random())
>         yield i # kind-of "publish it and let the caller do whatever
> it want s (good practice anyway)
>
> def workAll(work, verbose=True, max_iter=20, progress_interval=1.0):
>     '''
>     pass the real job as a generator
>     '''
>     progress = time.time()
>     i = 0
>     for w in work: # do the requested loop
>         if verbose:
>             print "Work through all %d steps reporting progress every
> %3.1f secs..." %(max_iter, progress_interval)
>         interval = time.time()-progress
>         if interval>progress_interval:
>             print "Processed %d at pace %s" % (w, interval)
>             progress +=interval
>         if i>=max_iter:
>             work.close()
>         i+=1
>
> if __name__=="__main__":
>     workAll(work(), False)     # note the calling difference
>
> hope it helps.

Hi eric,

No, I certainly don't mind you giving a try ;-)

I actually started out doing something like your first version here,
but I am a little annoyed by the fact that the progress report
interval is not a sure thing. For instance in my real applications, I
have seldomly occuring work steps, which may take significantly longer
than the progress_interval, and I'd like to let it keep reporting
that, oh, I am still woking, albeit on the same work step, to maintain
a sense of the script being alive.

I like you generator approach though.

Anyway, I have now given my own proposal another iteration based on
what I have seen here (and my personal preferences), and I have come
up with this:

============ src =======================
"""
Test module for testing generic ways of displaying progress
information
at regular intervals.
"""
import random
import threading
import time

def work(i):
    """
    Dummy process function, which takes a random time in the interval
    0.0-0.5 secs to execute
    """
    print "Work step %d" % i
    time.sleep(0.5 * random.random())


def workAll(verbose=True, max_iter=20, progress_interval=1.0):

    class ProgressReporter(threading.Thread):

        def __init__(self):
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self.i = 0
            self.max = max_iter
            self.start_timer = verbose
            self.progress_interval = progress_interval

        def run(self):
            while self.start_timer:
                print "Processed %d of %d." % (self.i + 1, self.max)
                time.sleep(self.progress_interval)

    p = ProgressReporter()

    if verbose:
        print "Work through all %d steps reporting every %3.1f
secs..." % \
            (max_iter, progress_interval)
        p.start()

    for i in xrange(max_iter):
        work(i)
        p.i = i

    if verbose:
        print "Finished working through %d steps" % max_iter

if __name__ == "__main__":
    workAll()

========= end src ================================

I like this much better than my own first attempt in my initial post
on this thread.

-- Slaunger




More information about the Python-list mailing list