with timeout(...):

Nick Craig-Wood nick at craig-wood.com
Tue Mar 27 04:30:05 EDT 2007


Klaas <mike.klaas at gmail.com> wrote:
>  On Mar 26, 3:30 am, Nick Craig-Wood <n... at craig-wood.com> wrote:
> > Did anyone write a contextmanager implementing a timeout for
> > python2.5?
> >
> > I'd love to be able to write something like
> >
> >     with timeout(5.0) as exceeded:
> >         some_long_running_stuff()
> >     if exceeded:
> >         print "Oops - took too long!"
> >
> > And have it work reliably and in a cross platform way!
> 
>  Doubt it.  But you could try:
> 
>  class TimeoutException(BaseException):
>     pass
> 
>  class timeout(object):
>     def __init__(self, limit_t):
>         self.limit_t = limit
>         self.timer = None
>         self.timed_out = False
>     def __nonzero__(self):
>         return self.timed_out
>     def __enter__(self):
>         self.timer = threading.Timer(self.limit_t, ...)
>         self.timer.start()
>         return self
>     def __exit__(self, exc_c, exc, tb):
>         if exc_c is TimeoutException:
>            self.timed_out = True
>            return True # suppress exception
>         return False # raise exception (maybe)
> 
>  where '...' is a ctypes call to raise the given exception in the
>  current thread (the capi call PyThreadState_SetAsyncExc)
> 
>  Definitely not fool-proof, as it relies on thread switching.  Also,
>  lock acquisition can't be interrupted, anyway.  Also, this style of
>  programming is rather unsafe.
> 
>  But I bet it would work frequently.

Here is my effort...  You'll note from the comments that there are
lots of tricky bits.

It isn't perfect though as it sometimes leaves behind threads (see the
FIXME).  I don't think it crashes any more though!

------------------------------------------------------------

"""
General purpose timeout mechanism not using alarm(), ie cross platform

Eg

from timeout import Timeout, TimeoutError

def might_infinite_loop(arg):
    while 1:
        pass

try:
    Timeout(10, might_infinite_loop, "some arg")
except TimeoutError:
    print "Oops took too long"
else:
    print "Ran just fine"

"""

import threading
import time
import sys
import ctypes
import os

class TimeoutError(Exception):
    """Thrown on a timeout"""
PyThreadState_SetAsyncExc = ctypes.pythonapi.PyThreadState_SetAsyncExc
_c_TimeoutError = ctypes.py_object(TimeoutError)

class Timeout(threading.Thread):
    """
    A General purpose timeout class
    timeout is int/float in seconds
    action is a callable
    *args, **kwargs are passed to the callable
    """
    def __init__(self, timeout, action, *args, **kwargs):
        threading.Thread.__init__(self)
        self.action = action
        self.args = args
        self.kwargs = kwargs
        self.stopped = False
        self.exc_value = None
        self.end_lock = threading.Lock()
        # start subtask
        self.setDaemon(True)                # FIXME this shouldn't be needed but is, indicating sub tasks aren't ending
        self.start()
        # Wait for subtask to end naturally
        self.join(timeout)
        # Use end_lock to kill the thread in a non-racy
        # fashion. (Using isAlive is racy).  Poking exceptions into
        # the Thread cleanup code isn't a good idea either
        if self.end_lock.acquire(False):
            # gained end_lock => sub thread is still running
            # sub thread is still running so kill it with a TimeoutError
            self.exc_value = TimeoutError()
            PyThreadState_SetAsyncExc(self.id, _c_TimeoutError)
            # release the lock so it can progress into thread cleanup
            self.end_lock.release()
        # shouldn't block since we've killed the thread
        self.join()
        # re-raise any exception
        if self.exc_value:
            raise self.exc_value
    def run(self):
        self.id = threading._get_ident()
        try:
            self.action(*self.args, **self.kwargs)
        except:
            self.exc_value = sys.exc_value
        # only end if we can acquire the end_lock
        self.end_lock.acquire()

if __name__ == "__main__":
    
    def _spin(t):
        """Spins for t seconds"""
        start = time.time()
        end = start + t
        while time.time() < end:
            pass

    def _test_time_limit(name, expecting_time_out, t_limit, fn, *args, **kwargs):
        """Test Timeout"""
        start = time.time()
        
        if expecting_time_out:
            print "Test",name,"should timeout"
        else:
            print "Test",name,"shouldn't timeout"

        try:
            Timeout(t_limit, fn, *args, **kwargs)
        except TimeoutError, e:
            if expecting_time_out:
                print "Timeout generated OK"
            else:
                raise RuntimeError("Wasn't expecting TimeoutError Here")
        else:
            if expecting_time_out:
                raise RuntimeError("Was expecting TimeoutError Here")
            else:
                print "No TimeoutError generated OK"

        elapsed = time.time() - start
        print "That took",elapsed,"seconds for timeout of",t_limit

    def test():
        """Test code"""

        # no nesting
        _test_time_limit("simple #1", True, 5, _spin, 10)
        _test_time_limit("simple #2", False, 10, _spin, 5)

        # 1 level of nesting
        _test_time_limit("nested #1",  True, 4, _test_time_limit,
                         "nested #1a", True, 5, _spin, 10)
        _test_time_limit("nested #2",  False, 6, _test_time_limit,
                         "nested #2a", True, 5, _spin, 10)
        _test_time_limit("nested #4",  False, 6, _test_time_limit,
                         "nested #4a", False, 10, _spin, 5)

        # 2 level of nesting
        _test_time_limit("nested #5",  True, 3, _test_time_limit,
                         "nested #5a", True, 4, _test_time_limit,
                         "nested #5b", True, 5, _spin, 10)
        _test_time_limit("nested #9",  False, 7, _test_time_limit,
                         "nested #9a", True, 4, _test_time_limit,
                         "nested #9b", True, 5, _spin, 10)
        _test_time_limit("nested #10", False, 7, _test_time_limit,
                         "nested #10a",False, 6, _test_time_limit,
                         "nested #10b",True, 5, _spin, 10)
        _test_time_limit("nested #12", False, 7, _test_time_limit,
                         "nested #12a",False, 6, _test_time_limit,
                         "nested #12b",False, 10, _spin, 5)

        print "All tests OK"

    test()


-- 
Nick Craig-Wood <nick at craig-wood.com> -- http://www.craig-wood.com/nick



More information about the Python-list mailing list