How to implement a Lock with a timeout in Python 2.7
To elaborate on Steven's comment suggestion:
import threading
import time

# Prefer a monotonic clock where one exists (Python 3.3+) so wall-clock
# adjustments cannot stretch or shorten the timeout; Python 2.7 falls back
# to time.time().
_now = getattr(time, "monotonic", time.time)

# The lock callers actually want to own.
lock = threading.Lock()
# Condition (with its own, separate internal lock) on which waiters park
# between acquisition attempts.
cond = threading.Condition(threading.Lock())


def waitLock(timeout):
    """Try to acquire the module-level ``lock``, waiting up to ``timeout`` seconds.

    The lock is always attempted at least once, so ``timeout <= 0`` behaves
    as a plain non-blocking acquire. After every wake-up (notification or
    timeout) the lock is tried again *before* the deadline is checked, so a
    waiter that was notified right at its deadline still takes a free lock
    instead of giving up.

    Whoever releases ``lock`` should afterwards call ``cond.notify()`` while
    holding ``cond`` so that parked waiters retry; otherwise they only retry
    when their own timeout expires.

    :param timeout: maximum time to wait, in seconds.
    :returns: True if ``lock`` was acquired, False if the timeout elapsed.
    """
    deadline = _now() + timeout
    with cond:
        while True:
            # Non-blocking poll: several waiters may wake and race for it.
            if lock.acquire(False):
                return True
            remaining = deadline - _now()
            if remaining <= 0:
                return False
            # wait() drops cond's internal lock while sleeping, so other
            # threads can enter this function or notify concurrently.
            cond.wait(remaining)
Things to notice:
- there are two
threading.Lock()
objects; one is internal to the threading.Condition()
. - when manipulating
cond
, its lock is acquired; the wait()
operation unlocks it, though, so any number of threads can watch it. - the wait is embedded inside a while loop that keeps track of the time.
threading.Condition
can be notified for reasons other than timeouts, so you still need to track the time if you really want it to expire. - even with the condition, you still 'poll' the real lock, because it's possible for more than one thread to wake and race for the lock. If the lock.acquire fails, the loop returns to waiting.
- callers of this
waitLock
function should follow a lock.release()
with a cond.notify() (called while holding cond, i.e. inside a "with cond:" block)
so that other threads waiting on it are notified that they should retry acquiring the lock. This is not shown in the example.
My version uses thread-safe queues (http://docs.python.org/2/library/queue.html) and their put/get methods, which support timeouts.
So far it is working fine, but I'd be grateful if someone could peer-review it.
"""Thread-safe lock mechanism with timeout support module."""
from threading import ThreadError, current_thread

try:
    from Queue import Queue, Full, Empty  # Python 2
except ImportError:
    from queue import Queue, Full, Empty  # Python 3


class TimeoutLock(object):
    """
    Thread-safe lock mechanism with timeout support.

    The lock is a one-slot queue: acquiring puts an item into the slot
    (which blocks or fails while the slot is occupied) and releasing takes
    the item out again. The timeout support comes from ``Queue.put``.

    The lock is not reentrant: a thread that already holds it and acquires
    again will wait like any other thread.
    """

    def __init__(self, mutex=True):
        """
        Constructor.

        Mutex parameter specifies if the lock should behave like a Mutex,
        and thus use the concept of thread ownership.
        """
        self._queue = Queue(maxsize=1)
        self._owner = None
        self._mutex = mutex

    def acquire(self, timeout=0):
        """
        Acquire the lock.

        Returns True if the lock was successfully acquired, False otherwise.

        Timeout:
            - None or < 0 : Wait forever.
            - 0           : No wait.
            - > 0         : Wait x seconds.
        """
        me = current_thread()
        # None is handled explicitly: Python 2 treated it as "wait forever"
        # only by accident (None < 0), and Python 3 raises TypeError on that
        # comparison.
        wait_forever = timeout is None or timeout < 0
        try:
            self._queue.put(
                me,
                block=(wait_forever or timeout != 0),
                timeout=(None if wait_forever else timeout),
            )
        except Full:
            return False
        # Recorded only after the slot is taken, i.e. after the previous
        # holder has finished its release().
        self._owner = me
        return True

    def release(self):
        """
        Release the lock.

        Returns True on success.

        If the lock is configured as a Mutex, only the owner thread can
        release the lock. If another thread attempts to release the lock a
        ThreadError is raised. A ThreadError is also raised when the lock is
        not currently held.
        """
        me = current_thread()
        if self._mutex and me != self._owner:
            raise ThreadError("This lock isn't owned by this thread.")
        # Clear ownership before freeing the slot so that the next
        # acquirer's owner assignment cannot be overwritten by this one.
        self._owner = None
        try:
            self._queue.get(False)
        except Empty:
            raise ThreadError('This lock was released already.')
        return True

    def __enter__(self):
        """Acquire the lock, waiting as long as necessary; return the lock."""
        self.acquire(timeout=-1)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the lock on leaving the ``with`` block."""
        self.release()
If somebody needs Python >= 3.2 API:
import threading
import time


class Lock(object):
    """Lock offering the Python >= 3.2 ``acquire(blocking, timeout)`` API.

    Wraps a plain ``threading`` lock together with a condition variable.
    Blocked acquirers park on the condition and retry a non-blocking acquire
    each time they wake; ``release()`` notifies one parked waiter.
    """

    # Factory for the wrapped lock; subclasses override it.
    _lock_class = threading.Lock

    def __init__(self):
        """Create the wrapped lock and the condition waiters park on."""
        self._lock = self._lock_class()
        self._cond = threading.Condition(threading.Lock())

    def acquire(self, blocking=True, timeout=-1):
        """Acquire the lock.

        :param blocking: when false, make a single non-blocking attempt.
        :param timeout: seconds to wait; a negative value waits forever and
            ``0`` is treated like a non-blocking call.
        :returns: True if the lock was acquired, False otherwise.
        """
        lock = self._lock
        if not blocking or timeout == 0:
            return lock.acquire(False)

        # Monotonic clock where available (Python 3.3+) so wall-clock jumps
        # cannot distort the timeout; time.time() on Python 2.7.
        now = getattr(time, "monotonic", time.time)
        deadline = None if timeout < 0 else now() + timeout
        cond = self._cond
        with cond:
            while True:
                # Always retry after a wake-up *before* looking at the
                # deadline. A timed waiter that consumed release()'s single
                # notify() must take the lock if it is free; otherwise the
                # lock sits free while other waiters are never woken.
                if lock.acquire(False):
                    return True
                if deadline is None:
                    cond.wait()
                    continue
                remaining = deadline - now()
                if remaining <= 0:
                    return False
                cond.wait(remaining)

    def release(self):
        """Release the wrapped lock and wake one parked waiter."""
        with self._cond:
            self._lock.release()
            self._cond.notify()

    # ``with lock:`` performs a blocking acquire with no timeout.
    __enter__ = acquire

    def __exit__(self, t, v, tb):
        """Release the lock on leaving the ``with`` block."""
        self.release()


class RLock(Lock):
    """Same API as ``Lock`` but wrapping a reentrant ``threading.RLock``."""

    _lock_class = threading.RLock