How to add a timeout to a function in Python How to add a timeout to a function in Python python python

How to add a timeout to a function in Python


The principal problem with your code is the overuse of the double underscore namespace conflict prevention in a class that isn't intended to be subclassed at all.

In general, self.__foo is a code smell that should be accompanied by a comment along the lines of # This is a mixin and we don't want arbitrary subclasses to have a namespace conflict.

Further, the client API of this method would look like this:

def mymethod(): passmymethod = add_timeout(mymethod, 15)# start the processing    timeout_obj = mymethod()try:    # access the property, which is really a function call    ret = timeout_obj.valueexcept TimeoutError:    # handle a timeout here    ret = None

This is not very pythonic at all and a better client api would be:

# Preferred client API: decorate the function once, then simply call it.
# `timeout` is the decorator factory being proposed; it is not defined in
# this snippet.
@timeout(15)
def mymethod():
    pass


try:
    # Call the function under the name it was defined with; the call itself
    # either returns the value or raises.
    mymethod()
except TimeoutError:
    # The example ignores the timeout; real code would handle it here.
    pass

You are using @property in your class for a state-mutating accessor; this is not a good idea. For instance, what would happen when .value is accessed twice? It looks like it would fail, because queue.get() would return trash once the queue is already empty.

Remove @property entirely. Don't use it in this context; it's not suitable for your use-case. Make __call__ block when called and return the value or raise the exception itself. If you really must have the value accessed later, make it a method like .get() or .value().

This code for the _target should be rewritten a little:

# Worker entry point: call `function(*args, **kwargs)` and report the outcome
# on `queue` as a two-item tuple whose first item says whether the call
# succeeded.
# `exc_info` is presumably `sys.exc_info`, imported elsewhere — confirm.
def _target(queue, function, *args, **kwargs):
    try:
        # Success: the second item is the function's return value.
        queue.put((True, function(*args, **kwargs)))
    except:
        # Failure of any kind (the bare except catches everything): the
        # second item is the full exc_info() result, not just the exception.
        # NOTE(review): traceback objects cannot be pickled, so if `queue`
        # crosses a process boundary this payload may not transfer — confirm.
        queue.put((False, exc_info())) # get *all* the exc info, don't do exc_info[1]
# then later:
# (This statement belongs wherever the queue is read, not inside _target;
# here `exc_info` is presumably the tuple taken from the queue — confirm.)
# It is the Python 2 three-expression raise, which re-raises using the saved
# traceback; it is a syntax error on Python 3.
raise exc_info[0], exc_info[1], exc_info[2]

That way the stack trace will be preserved correctly and visible to the programmer.

I think you've made a reasonable first crack at writing a useful library, I like the usage of the processing module to achieve the goals.


This is how to get the decorator syntax Jerub mentioned

def timeout(limit=None):
    """Return a decorator that wraps a function in ``_Timeout`` with *limit*.

    When *limit* is ``None`` the module's ``DEFAULT_TIMEOUT`` is used.
    A limit that is zero or negative raises ``TimeoutError``.
    """
    limit = DEFAULT_TIMEOUT if limit is None else limit
    if limit <= 0:
        raise TimeoutError() # why not ValueError here?

    def wrap(function):
        # Hand the function and the resolved limit to the wrapper class.
        return _Timeout(function, limit)

    return wrap


# Usage: the decorator syntax mentioned in the earlier answer.
@timeout(15)
def mymethod():
    pass


This question was asked over 9 years ago, and Python has changed a decent amount since then, as has my repertoire of experience. After reviewing other APIs in the standard library and wanting to partially replicate one in particular, the following module was written to serve a similar purpose as the one posted in the question.

asynchronous.py

#! /usr/bin/env python3
"""Run callables in separate processes, subject to a time limit.

Each submitted call gets its own daemon process and a future-like handle.
A call that has not delivered a result within the timeout is terminated,
and asking for its result then raises ``TimeoutError``.

The module-level functions ``get_timeout``, ``set_timeout``, ``submit``,
``map_`` and ``shutdown`` are the bound methods of one shared default
``Executor``.
"""
import _thread
import abc as _abc
import collections as _collections
import enum as _enum
import math as _math
import multiprocessing as _multiprocessing
import operator as _operator
import queue as _queue
import signal as _signal
import sys as _sys
import time as _time

__all__ = (
    'Executor',
    'get_timeout',
    'set_timeout',
    'submit',
    'map_',
    'shutdown'
)


class _Base(metaclass=_abc.ABCMeta):
    """Common base that stores a validated, strictly positive timeout."""

    __slots__ = (
        '__timeout',
    )

    @_abc.abstractmethod
    def __init__(self, timeout):
        """Store *timeout*; ``None`` means no limit (``math.inf``)."""
        self.timeout = _math.inf if timeout is None else timeout

    def get_timeout(self):
        """Return the current timeout."""
        return self.__timeout

    def set_timeout(self, value):
        """Validate and store a new timeout.

        Raises:
            TypeError: *value* is not a ``float`` or ``int``.
            ValueError: *value* is not greater than zero (this includes NaN).
        """
        if not isinstance(value, (float, int)):
            raise TypeError('value must be of type float or int')
        # ``not value > 0`` rather than ``value <= 0``: NaN compares false
        # against everything, so the latter would let it through and every
        # later deadline comparison would silently be false.
        if not value > 0:
            raise ValueError('value must be greater than zero')
        self.__timeout = value

    timeout = property(get_timeout, set_timeout)


def _run_and_catch(fn, args, kwargs):
    """Call ``fn(*args, **kwargs)`` and return ``(error, value)``.

    *error* is ``False`` with the return value on success, or ``True`` with
    the raised exception instance on failure.
    """
    # Deliberately catches everything: the outcome, whatever it is, is
    # reported to the caller instead of propagating.
    # noinspection PyBroadException
    try:
        return False, fn(*args, **kwargs)
    except BaseException as error:
        return True, error


def _run(fn, args, kwargs, queue):
    """Process target: run the call and put its outcome pair on *queue*."""
    queue.put_nowait(_run_and_catch(fn, args, kwargs))


class _State(_enum.IntEnum):
    """Lifecycle of a future; the numeric order is relied on for comparisons."""

    PENDING = _enum.auto()
    RUNNING = _enum.auto()
    CANCELLED = _enum.auto()
    FINISHED = _enum.auto()
    ERROR = _enum.auto()


def _run_and_catch_loop(iterable, *args, **kwargs):
    """Call every callable in *iterable* with the given arguments.

    A failing callable does not stop the loop. Once all have run, the most
    recently raised exception (if any) is re-raised.
    """
    exception = None
    for fn in iterable:
        error, value = _run_and_catch(fn, args, kwargs)
        if error:
            exception = value
    # Identity test, not truthiness: an exception type may define
    # ``__bool__``/``__len__`` and evaluate as false.
    if exception is not None:
        raise exception


class _Future(_Base):
    """Handle for one call executing in its own daemon process."""

    __slots__ = (
        '__queue',
        '__process',
        '__start_time',
        '__callbacks',
        '__result',
        '__mutex'
    )

    def __init__(self, timeout, fn, args, kwargs):
        """Prepare (but do not start) the process that will run the call."""
        super().__init__(timeout)
        # One-slot queue: the child puts exactly one outcome pair on it.
        self.__queue = _multiprocessing.Queue(1)
        self.__process = _multiprocessing.Process(
            target=_run,
            args=(fn, args, kwargs, self.__queue),
            daemon=True
        )
        # Infinite start time makes the elapsed time negative infinity until
        # the process is really started, so nothing times out early.
        self.__start_time = _math.inf
        self.__callbacks = _collections.deque()
        # Outcome reported when no result is ever received from the child.
        self.__result = True, TimeoutError()
        self.__mutex = _thread.allocate_lock()

    @property
    def __state(self):
        """Derive the current ``_State`` from the process pid and exit code."""
        pid, exitcode = self.__process.pid, self.__process.exitcode
        return (_State.PENDING if pid is None else
                _State.RUNNING if exitcode is None else
                _State.CANCELLED if exitcode == -_signal.SIGTERM else
                _State.FINISHED if exitcode == 0 else
                _State.ERROR)

    def __repr__(self):
        """Describe the future's state and, once over, its kind of outcome."""
        # Read the state once so the text and the branch below agree even if
        # the process changes state in between.
        state = self.__state
        root = f'{type(self).__name__} at {id(self)} state={state.name}'
        if state < _State.CANCELLED:
            return f'<{root}>'
        error, value = self.__result
        suffix = f'{"raised" if error else "returned"} {type(value).__name__}'
        return f'<{root} {suffix}>'

    def __consume_callbacks(self):
        """Yield the registered callbacks, removing each as it is yielded."""
        while self.__callbacks:
            yield self.__callbacks.popleft()

    def __invoke_callbacks(self):
        """Wait for the process to exit, then run each pending callback once."""
        self.__process.join()
        _run_and_catch_loop(self.__consume_callbacks(), self)

    def cancel(self):
        """Terminate the process and run the pending done-callbacks."""
        self.__process.terminate()
        self.__invoke_callbacks()

    def __auto_cancel(self):
        """Cancel if the timeout has been exceeded; return the elapsed time."""
        elapsed_time = _time.perf_counter() - self.__start_time
        if elapsed_time > self.timeout:
            self.cancel()
        return elapsed_time

    def cancelled(self):
        """Return whether the process was terminated (timeout is applied first)."""
        self.__auto_cancel()
        return self.__state is _State.CANCELLED

    def running(self):
        """Return whether the process is still running (timeout is applied first)."""
        self.__auto_cancel()
        return self.__state is _State.RUNNING

    def done(self):
        """Return whether the process is past running (timeout is applied first)."""
        self.__auto_cancel()
        return self.__state > _State.RUNNING

    def __handle_result(self, error, value):
        """Record the outcome pair, then run the pending done-callbacks."""
        self.__result = error, value
        self.__invoke_callbacks()

    def __ensure_termination(self):
        """Collect the outcome, waiting at most for the remaining timeout."""
        with self.__mutex:
            elapsed_time = self.__auto_cancel()
            if not self.__queue.empty():
                self.__handle_result(*self.__queue.get_nowait())
            elif self.__state < _State.CANCELLED:
                remaining_time = self.timeout - elapsed_time
                # Queue.get spells "wait without limit" as None.
                if remaining_time == _math.inf:
                    remaining_time = None
                try:
                    result = self.__queue.get(True, remaining_time)
                except _queue.Empty:
                    # Nothing arrived in time: stop the process; the stored
                    # default outcome (TimeoutError) stays in place.
                    self.cancel()
                else:
                    self.__handle_result(*result)

    def result(self):
        """Return the call's value, or raise the exception recorded for it."""
        self.__ensure_termination()
        error, value = self.__result
        if error:
            raise value
        return value

    def exception(self):
        """Return the recorded exception, or ``None`` if the call succeeded."""
        self.__ensure_termination()
        error, value = self.__result
        if error:
            return value

    def add_done_callback(self, fn):
        """Call ``fn(self)`` now if already done, else when callbacks run."""
        if self.done():
            fn(self)
        else:
            self.__callbacks.append(fn)

    def _set_running_or_notify_cancel(self):
        """Start the process if it has not started yet; otherwise cancel."""
        if self.__state is _State.PENDING:
            self.__process.start()
            self.__start_time = _time.perf_counter()
        else:
            self.cancel()


class Executor(_Base):
    """Starts one process per submitted call and tracks the live futures."""

    __slots__ = (
        '__futures',
    )

    def __init__(self, timeout=None):
        """Create an executor; ``timeout=None`` means calls have no limit."""
        super().__init__(timeout)
        self.__futures = set()

    def submit(self, fn, *args, **kwargs):
        """Start ``fn(*args, **kwargs)`` in a new process; return its future.

        The future uses the executor's timeout as it is at submission time.
        """
        future = _Future(self.timeout, fn, args, kwargs)
        self.__futures.add(future)
        # The future drops itself from the tracking set when its callbacks run.
        future.add_done_callback(self.__futures.remove)
        # noinspection PyProtectedMember
        future._set_running_or_notify_cancel()
        return future

    @staticmethod
    def __cancel_futures(iterable):
        """Cancel every future in *iterable*, continuing past failures."""
        _run_and_catch_loop(map(_operator.attrgetter('cancel'), iterable))

    def map(self, fn, *iterables):
        """Submit ``fn`` for each zipped argument tuple; iterate the results.

        All calls are submitted immediately. The returned iterator yields
        results in submission order and raises when a call's result raises.
        When the iterator finishes or is abandoned, the futures it has not
        reached yet are cancelled.
        """
        futures = tuple(self.submit(fn, *args) for args in zip(*iterables))

        def result_iterator():
            future_iterator = iter(futures)
            try:
                for future in future_iterator:
                    yield future.result()
            finally:
                # Whatever is left in the iterator was never consumed.
                self.__cancel_futures(future_iterator)

        return result_iterator()

    def shutdown(self):
        """Cancel every future that is still tracked."""
        # Iterate over a snapshot: cancelling runs the callback that removes
        # the future from the live set.
        self.__cancel_futures(frozenset(self.__futures))

    def __enter__(self):
        """Return the executor itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Shut down on leaving the ``with`` block; never suppress exceptions."""
        self.shutdown()
        return False


# Module-level convenience API backed by one shared executor. The bound
# methods keep that executor alive after its private name is deleted.
_executor = Executor()
get_timeout = _executor.get_timeout
set_timeout = _executor.set_timeout
submit = _executor.submit
map_ = _executor.map
shutdown = _executor.shutdown
del _executor