`DummyExecutor` for Python's `futures` `DummyExecutor` for Python's `futures` python python

`DummyExecutor` for Python's `futures`


Something like this should do it:

from concurrent.futures import Future, Executorfrom threading import Lockclass DummyExecutor(Executor):    def __init__(self):        self._shutdown = False        self._shutdownLock = Lock()    def submit(self, fn, *args, **kwargs):        with self._shutdownLock:            if self._shutdown:                raise RuntimeError('cannot schedule new futures after shutdown')            f = Future()            try:                result = fn(*args, **kwargs)            except BaseException as e:                f.set_exception(e)            else:                f.set_result(result)            return f    def shutdown(self, wait=True):        with self._shutdownLock:            self._shutdown = Trueif __name__ == '__main__':    def fnc(err):        if err:            raise Exception("test")        else:            return "ok"    ex = DummyExecutor()    print(ex.submit(fnc, True))    print(ex.submit(fnc, False))    ex.shutdown()    ex.submit(fnc, True) # raises exception

locking is probably not needed in this case, but can't hurt to have it.


Use this to mock your ThreadPoolExecutor

class MockThreadPoolExecutor():    def __init__(self, **kwargs):        pass    def __enter__(self):        return self    def __exit__(self, exc_type, exc_value, exc_traceback):        pass    def submit(self, fn, *args, **kwargs):        # execute functions in series without creating threads        # for easier unit testing        result = fn(*args, **kwargs)        return result    def shutdown(self, wait=True):        passif __name__ == "__main__":    def sum(a, b):        return a + b    with MockThreadPoolExecutor(max_workers=3) as executor:        future_result = list()        for i in range(5):            future_result.append(executor.submit(sum, i + 1, i + 2))