Time out question

Nick Craig-Wood nick at craig-wood.com
Tue Jul 4 05:30:03 EDT 2006


Grant Edwards <grante at visi.com> wrote:
>  On 2006-07-03, Nick Craig-Wood <nick at craig-wood.com> wrote:
> > Alex Martelli <aleax at mac.com> wrote:
> >>  DarkBlue <nomail at nixmail.com> wrote:
> >> >  try for 10 seconds
> >> >    if database.connected :
> >> >     do your remote thing
> >> >  except raise after 10 seconds
> >> >    abort any connection attempt
> >> >    do something else 
> >> 
> >>  Sure, see function setdefaulttimeout in module socket.  Just call it as
> >>  you wish before trying the DB connection and catch the resulting
> >>  exception.
> >
> > It would be nice to have language support for the general case, ie a
> > general purpose timeout.
> 
>  I just use signal.alarm():

There are quite a lot of problems with alarm()

1) doesn't work on windows
2) there is only one alarm so nested alarms are tricky
3) alarm() only has 1 second resolution, however you can use the POSIX
   setitimer() which has better resolution
4) signals make havoc with some things (signals are basically
   interrupts passed to user space under unix)
5) alarm() doesn't play nice with threads

All that said it does work under unix for a lot of programs.  Here is
a module which fixes point 2) anyway...

It is a lot easier to follow than the threads example I posted
earlier.  There are still potential race conditions though even with
something as simple as alarm()!

............................................................

"""
Implement time limited function calls.  Only works on platforms which
support the signal module and signal.alarm()

Eg

from time_limit import time_limit, TimeOut

def might_infinite_loop(arg):
    while 1:
        pass

try:
    time_limit(10, might_infinite_loop, "some arg")
except TimeOut:
    print "Oops took too long"
else:
    print "Ran just fine"

"""

import signal
import time
import inspect

class TimeOut(Exception):
    """Thrown on alarm"""
    pass

class _NestedTimeOut(Exception):
    """Thrown on TimeOut detected for next layer"""
    pass

def _sig_alarm(signum, frame):
    """Alarm handler for this module"""
    raise TimeOut()

def time_limit(t_limit, fn, *args, **kwargs):
    """
    Calls fn with the *args and **kwargs returning its result or
    raising a TimeOut exception if it doesn't complete within t
    seconds
    """

    # Turn alarm off and read old value
    old_t_limit = signal.alarm(0)
    start_time = time.time()
    # Install new handler remembering old
    old_handler = signal.signal(signal.SIGALRM, _sig_alarm)
    # Set the timer going
    signal.alarm(t_limit)

    try:
        try:
            try:
                rc = fn(*args, **kwargs)
            except _NestedTimeOut:
                raise TimeOut()
        finally:
            # Disable alarm
            signal.alarm(0)
    finally:
        # Alarm will be disabled or will have gone off when we get here
        # Restore the old handler
        signal.signal(signal.SIGALRM, old_handler)
        # If there was an alarm running restore it
        if old_t_limit > 0:
            # Calculate how much of the old timeout is left
            elapsed_time = time.time() - start_time
            old_t_limit = int(round(old_t_limit - elapsed_time, 0))
            if old_t_limit <= 0:
                # Alarm should have gone off so call old signal handler
                if old_handler == _sig_alarm:
                    # If handler is our handler then...
                    raise _NestedTimeOut()
                else:
                    # Call old handler
                    old_handler(signal.SIGALRM, inspect.currentframe())
            else:
                # Re-enable alarm
                signal.alarm(old_t_limit)

    return rc

if __name__ == "__main__":
    
    def _spin(t):
        """Spins for t seconds"""
        start = time.time()
        end = start + t
        while time.time() < end:
            pass

    def _test_time_limit(name, expecting_time_out, t_limit, fn, *args, **kwargs):
        """Test time_limit"""
        start = time.time()
        
        if expecting_time_out:
            print "Test",name,"should timeout"
        else:
            print "Test",name,"shouldn't timeout"

        try:
            time_limit(t_limit, fn, *args, **kwargs)
        except TimeOut, e:
            if expecting_time_out:
                print "Timeout generated OK"
            else:
                raise RuntimeError("Wasn't expecting TimeOut Here")
        else:
            if expecting_time_out:
                raise RuntimeError("Was expecting TimeOut Here")
            else:
                print "No timeout generated OK"

        elapsed = time.time() - start
        print "That took",elapsed,"seconds for timeout of",t_limit

    def test():
        """Test code"""

        # no nesting
        _test_time_limit("simple #1", True, 5, _spin, 10)
        _test_time_limit("simple #2", False, 10, _spin, 5)

        # 1 level of nesting
        _test_time_limit("nested #1", True, 4, _test_time_limit, "nested #1a", True, 5, _spin, 10)
        _test_time_limit("nested #2", False, 6, _test_time_limit, "nested #2a", True, 5, _spin, 10)
        _test_time_limit("nested #3", True, 4, _test_time_limit, "nested #3a", False, 10, _spin, 5)
        _test_time_limit("nested #4", False, 6, _test_time_limit, "nested #4a", False, 10, _spin, 5)

        # 2 level of nesting
        _test_time_limit("nested #5", True, 3, _test_time_limit, "nested #5a", True, 4, _test_time_limit, "nested #5b", True, 5, _spin, 10)
        _test_time_limit("nested #6", True, 3, _test_time_limit, "nested #6a", False, 6, _test_time_limit, "nested #6b", True, 5, _spin, 10)
        _test_time_limit("nested #7", True, 3, _test_time_limit, "nested #7a", True, 4, _test_time_limit, "nested #7b", False, 10, _spin, 5)
        _test_time_limit("nested #8", True, 3, _test_time_limit, "nested #8a", False, 6, _test_time_limit, "nested #8b", False, 10, _spin, 5)
        _test_time_limit("nested #9", False, 7, _test_time_limit, "nested #9a", True, 4, _test_time_limit, "nested #9b", True, 5, _spin, 10)
        _test_time_limit("nested #10", False, 7, _test_time_limit, "nested #10a", False, 6, _test_time_limit, "nested #10b", True, 5, _spin, 10)
        _test_time_limit("nested #11", False, 7, _test_time_limit, "nested #11a", True, 4, _test_time_limit, "nested #11b", False, 10, _spin, 5)
        _test_time_limit("nested #12", False, 7, _test_time_limit, "nested #12a", False, 6, _test_time_limit, "nested #12b", False, 10, _spin, 5)

        print "All tests OK"

    test()

-- 
Nick Craig-Wood <nick at craig-wood.com> -- http://www.craig-wood.com/nick



More information about the Python-list mailing list