How to implement a Lock with a timeout in Python 2.7


To elaborate on Steven's comment suggestion:

    import threading
    import time

    lock = threading.Lock()
    cond = threading.Condition(threading.Lock())

    def waitLock(timeout):
        with cond:
            current_time = start_time = time.time()
            while current_time < start_time + timeout:
                if lock.acquire(False):
                    return True
                else:
                    cond.wait(timeout - current_time + start_time)
                    current_time = time.time()
        return False

Things to notice:

  • There are two threading.Lock() objects; one is internal to the threading.Condition().
  • When manipulating cond, its lock is acquired; the wait() operation releases that lock while it blocks, though, so any number of threads can wait on the condition at once.
  • The wait is embedded inside a while loop that keeps track of the time. cond.wait() can return for reasons other than its timeout expiring (for example, a notify()), so you still need to track the time yourself if you really want the attempt to expire.
  • Even with the condition, you still 'poll' the real lock, because it's possible for more than one thread to wake and race for the lock. If lock.acquire fails, the loop returns to waiting.
  • Callers of this waitLock function should follow a lock.release() with a cond.notify() (called while holding cond, e.g. inside a with cond: block) so that other threads waiting on it are notified that they should retry acquiring the lock. This is not shown in the example.
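
The release-then-notify step from the last point could look roughly like the sketch below. The helper name releaseLock, the worker function, and the results list are hypothetical and only there for illustration; waitLock repeats the logic of the function above so that the block runs on its own.

```python
import threading
import time

lock = threading.Lock()
cond = threading.Condition(threading.Lock())


def waitLock(timeout):
    """Same logic as above: try to take `lock` within `timeout` seconds."""
    with cond:
        current_time = start_time = time.time()
        while current_time < start_time + timeout:
            if lock.acquire(False):
                return True
            cond.wait(timeout - current_time + start_time)
            current_time = time.time()
    return False


def releaseLock():
    """Hypothetical helper: release `lock`, then wake one waiter so it retries."""
    lock.release()
    with cond:  # notify() must be called with the condition's lock held
        cond.notify()


results = []  # hypothetical: records what the worker's waitLock() returned


def worker():
    results.append(waitLock(2.0))


waitLock(1.0)                # the main thread takes the lock first
t = threading.Thread(target=worker)
t.start()
time.sleep(0.1)              # give the worker time to start waiting
releaseLock()                # the worker is notified and takes the lock
t.join()
```

Because the waiter checks the lock and calls wait() while holding cond, and the notify also needs cond, the notification cannot slip in between the failed acquire and the wait.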


My version uses thread-safe queues (http://docs.python.org/2/library/queue.html) and their put/get methods, which support timeouts.

So far it is working fine, but I'd be grateful if someone could peer-review it.

    """Thread-safe lock mechanism with timeout support module."""

    from threading import ThreadError, current_thread
    from Queue import Queue, Full, Empty


    class TimeoutLock(object):
        """
        Thread-safe lock mechanism with timeout support.
        """

        def __init__(self, mutex=True):
            """
            Constructor.
            Mutex parameter specifies if the lock should behave like a Mutex, and
            thus use the concept of thread ownership.
            """
            self._queue = Queue(maxsize=1)
            self._owner = None
            self._mutex = mutex

        def acquire(self, timeout=0):
            """
            Acquire the lock.
            Returns True if the lock was successfully acquired, False otherwise.

            Timeout:
            - < 0 : Wait forever.
            -   0 : No wait.
            - > 0 : Wait x seconds.
            """
            th = current_thread()
            try:
                self._queue.put(
                    th, block=(timeout != 0),
                    timeout=(None if timeout < 0 else timeout)
                )
            except Full:
                return False

            self._owner = th
            return True

        def release(self):
            """
            Release the lock.
            If the lock is configured as a Mutex, only the owner thread can release
            the lock. If another thread attempts to release the lock a
            ThreadError is raised.
            """
            th = current_thread()
            if self._mutex and th != self._owner:
                raise ThreadError('This lock isn\'t owned by this thread.')

            self._owner = None
            try:
                self._queue.get(False)
                return True
            except Empty:
                raise ThreadError('This lock was released already.')

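The idea the class relies on, a queue with a single slot where "full" means "locked", can be shown in isolation. This is only a minimal sketch: try_lock, unlock, and slot are hypothetical names, ownership tracking is left out, and the import fallback is an assumption added so that the snippet also runs on Python 3.

```python
try:
    from Queue import Queue, Full, Empty   # Python 2
except ImportError:
    from queue import Queue, Full, Empty   # Python 3

slot = Queue(maxsize=1)  # one slot: a full queue means "locked"


def try_lock(timeout):
    """Hypothetical helper: True if the slot was claimed within `timeout` seconds."""
    try:
        slot.put(object(), block=True, timeout=timeout)
        return True
    except Full:
        return False


def unlock():
    """Hypothetical helper: free the slot; raises Empty if it was not claimed."""
    slot.get(block=False)


first = try_lock(0.1)    # the slot is free, so this succeeds immediately
second = try_lock(0.1)   # the slot is taken, so this gives up after the timeout
unlock()
third = try_lock(0.1)    # the slot is free again after unlock()
```

The blocking and the timeout handling are delegated entirely to Queue.put(), which is why the class itself needs no condition variable or polling loop.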

If somebody needs the Python >= 3.2 API (acquire(blocking=True, timeout=-1)) on Python 2.7:

    import threading
    import time


    class Lock(object):
        _lock_class = threading.Lock

        def __init__(self):
            self._lock = self._lock_class()
            self._cond = threading.Condition(threading.Lock())

        def acquire(self, blocking=True, timeout=-1):
            if not blocking or timeout == 0:
                return self._lock.acquire(False)
            cond = self._cond
            lock = self._lock
            if timeout < 0:
                with cond:
                    while True:
                        if lock.acquire(False):
                            return True
                        else:
                            cond.wait()
            else:
                with cond:
                    current_time = time.time()
                    stop_time = current_time + timeout
                    while current_time < stop_time:
                        if lock.acquire(False):
                            return True
                        else:
                            cond.wait(stop_time - current_time)
                            current_time = time.time()
                    return False

        def release(self):
            with self._cond:
                self._lock.release()
                self._cond.notify()

        __enter__ = acquire

        def __exit__(self, t, v, tb):
            self.release()


    class RLock(Lock):
        _lock_class = threading.RLock
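
For comparison, here is what the same call shape looks like with the standard library's own lock. This sketch assumes Python 3.2 or later, where threading.Lock.acquire() accepts a timeout; the variable names are only illustrative.

```python
import threading

lock = threading.Lock()

got_first = lock.acquire(timeout=1.0)     # free lock: acquired without waiting
got_second = lock.acquire(timeout=0.1)    # already held: gives up after the timeout
got_third = lock.acquire(blocking=False)  # non-blocking attempt on a held lock
lock.release()
```

The class above offers the same acquire(blocking, timeout) signature on Python 2.7, where threading.Lock.acquire() has no timeout argument.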