`DummyExecutor` for Python's `futures`
Something like this should do it:
from concurrent.futures import Future, Executor
from threading import Lock


class DummyExecutor(Executor):
    """Executor that runs each submitted callable synchronously.

    The callable is executed in the calling thread during ``submit`` and
    the returned ``Future`` is already completed, which makes the class a
    drop-in stand-in for a real executor in tests.
    """

    def __init__(self):
        self._shutdown = False
        # Guards reads and writes of ``_shutdown`` only.
        self._shutdownLock = Lock()

    def submit(self, fn, *args, **kwargs):
        """Run ``fn(*args, **kwargs)`` immediately and wrap the outcome.

        Returns:
            A finished ``Future`` holding either the return value or the
            exception raised by ``fn``.

        Raises:
            RuntimeError: if ``shutdown`` has already been called.
        """
        # Hold the lock only for the shutdown check. ``Lock`` is not
        # reentrant, so running ``fn`` while holding it would deadlock as
        # soon as ``fn`` submitted to this executor or shut it down.
        with self._shutdownLock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

        f = Future()
        try:
            result = fn(*args, **kwargs)
        except BaseException as e:
            # Every exception, including BaseException subclasses, is
            # stored on the future instead of propagating to the caller.
            f.set_exception(e)
        else:
            f.set_result(result)
        return f

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Mark the executor as shut down; later ``submit`` calls raise.

        ``wait`` and ``cancel_futures`` are accepted for signature
        compatibility with ``Executor.shutdown`` and are otherwise ignored:
        there is never any pending work to wait for or cancel.
        """
        with self._shutdownLock:
            self._shutdown = True


if __name__ == '__main__':
    def fnc(err):
        if err:
            raise Exception("test")
        else:
            return "ok"

    ex = DummyExecutor()
    print(ex.submit(fnc, True))
    print(ex.submit(fnc, False))
    ex.shutdown()
    ex.submit(fnc, True)  # raises exception
Locking is probably not needed in this case, but it can't hurt to have it.
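Use this to mock your `ThreadPoolExecutor`. Note that `submit` runs the function immediately and returns its result directly rather than a `Future`: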
class MockThreadPoolExecutor:
    """Thread-free stand-in for ``ThreadPoolExecutor`` in unit tests.

    Submitted callables run immediately in the calling thread. Unlike the
    real executor, ``submit`` returns the callable's result directly and
    not a ``Future``.
    """

    def __init__(self, *args, **kwargs):
        """Accept and ignore any constructor arguments.

        Positional arguments are accepted as well as keywords so that
        call sites such as ``ThreadPoolExecutor(4)`` can be mocked
        without raising ``TypeError``.
        """

    def __enter__(self):
        """Return the executor itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Do nothing; exceptions from the ``with`` body propagate."""

    def submit(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` synchronously and return its result.

        Functions run in series without creating threads, for easier unit
        testing. Any exception raised by ``fn`` propagates to the caller.
        """
        return fn(*args, **kwargs)

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Do nothing; both arguments exist only for signature parity."""


if __name__ == "__main__":
    # Named ``add`` so the builtin ``sum`` is not shadowed.
    def add(a, b):
        return a + b

    with MockThreadPoolExecutor(max_workers=3) as executor:
        # Holds plain return values, since this mock does not create futures.
        future_result = [executor.submit(add, i + 1, i + 2) for i in range(5)]