Alternative to contextlib.nested with variable number of context managers (tags: python)

Alternative to contextlib.nested with variable number of context managers


The new Python 3 contextlib.ExitStack class was added as a replacement for contextlib.nested() (see issue 13585).

It is written in such a way that you can use it directly in Python 2:

import sys
from collections import deque


class ExitStack(object):
    """Context manager for dynamic management of a stack of exit callbacks

    For example:

        with ExitStack() as stack:
            files = [stack.enter_context(open(fname)) for fname in filenames]
            # All opened files will automatically be closed at the end of
            # the with statement, even if attempts to open files later
            # in the list raise an exception
    """

    def __init__(self):
        # Callbacks are appended on registration and popped from the right
        # on exit, which yields last-in, first-out unwinding.
        self._exit_callbacks = deque()

    def pop_all(self):
        """Preserve the context stack by transferring it to a new instance

        Returns a new stack of the same type that owns every callback
        registered so far; this instance is left empty.
        """
        new_stack = type(self)()
        new_stack._exit_callbacks = self._exit_callbacks
        self._exit_callbacks = deque()
        return new_stack

    def _push_cm_exit(self, cm, cm_exit):
        """Helper to correctly register callbacks to __exit__ methods

        ``cm_exit`` is the unbound ``__exit__`` looked up on ``type(cm)``.
        """
        def _exit_wrapper(*exc_details):
            return cm_exit(cm, *exc_details)
        # Expose the wrapped manager for introspection
        _exit_wrapper.__self__ = cm
        self.push(_exit_wrapper)

    def push(self, exit):
        """Registers a callback with the standard __exit__ method signature

        Can suppress exceptions the same way __exit__ methods can.

        Also accepts any object with an __exit__ method (registering a call
        to the method instead of the object itself)

        Returns ``exit`` unchanged so the method can be used as a decorator.
        """
        # We use an unbound method rather than a bound method to follow
        # the standard lookup behaviour for special methods
        _cb_type = type(exit)
        try:
            exit_method = _cb_type.__exit__
        except AttributeError:
            # Not a context manager, so assume it's a callable
            self._exit_callbacks.append(exit)
        else:
            self._push_cm_exit(exit, exit_method)
        return exit  # Allow use as a decorator

    def callback(self, callback, *args, **kwds):
        """Registers an arbitrary callback and arguments.

        Cannot suppress exceptions.

        Returns ``callback`` unchanged so the method can be used as a
        decorator.
        """
        def _exit_wrapper(exc_type, exc, tb):
            callback(*args, **kwds)
        # We changed the signature, so using @wraps is not appropriate, but
        # setting __wrapped__ may still help with introspection
        _exit_wrapper.__wrapped__ = callback
        self.push(_exit_wrapper)
        return callback  # Allow use as a decorator

    def enter_context(self, cm):
        """Enters the supplied context manager

        If successful, also pushes its __exit__ method as a callback and
        returns the result of the __enter__ method.
        """
        # We look up the special methods on the type to match the with
        # statement. __exit__ is fetched before __enter__ runs so that an
        # object lacking __exit__ fails before anything has been entered.
        _cm_type = type(cm)
        _exit = _cm_type.__exit__
        result = _cm_type.__enter__(cm)
        self._push_cm_exit(cm, _exit)
        return result

    def close(self):
        """Immediately unwind the context stack"""
        self.__exit__(None, None, None)

    def __enter__(self):
        return self

    def __exit__(self, *exc_details):
        """Run every registered callback in LIFO order.

        Each callback receives the exception state that a real nested
        ``with`` statement would hand to it: the incoming exception, the
        exception raised by a more deeply nested callback, or
        ``(None, None, None)`` once something has suppressed it.

        Returns True only when an exception was passed in and a callback
        suppressed it.  If a callback raised and nothing further out
        suppressed that exception, it is re-raised after all callbacks ran.
        """
        received_exc = exc_details[0] is not None

        # We manipulate the exception state so it behaves as though
        # we were actually nesting multiple with statements
        frame_exc = sys.exc_info()[1]

        def _fix_exception_context(new_exc, old_exc):
            # An exception must never become its own context.
            if new_exc is old_exc:
                return
            # The implicit context may be wrong, so find the end of the chain
            while True:
                # getattr keeps this usable on interpreters whose exceptions
                # have no __context__ attribute (Python 2).
                exc_context = getattr(new_exc, '__context__', None)
                if exc_context is old_exc:
                    # Already linked correctly; re-linking here could build
                    # a cycle in the chain.
                    return
                if exc_context is None or exc_context is frame_exc:
                    break
                new_exc = exc_context
            # Point the end of the chain at the exception a nested with
            # statement would have been handling at that moment
            new_exc.__context__ = old_exc

        # Callbacks are invoked in LIFO order to match the behaviour of
        # nested context managers
        suppressed_exc = False
        pending_raise = False
        while self._exit_callbacks:
            cb = self._exit_callbacks.pop()
            try:
                if cb(*exc_details):
                    suppressed_exc = True
                    pending_raise = False
                    exc_details = (None, None, None)
            except:  # deliberately bare: any exception replaces the state
                new_exc_details = sys.exc_info()
                # simulate the stack of exceptions by setting the context
                _fix_exception_context(new_exc_details[1], exc_details[1])
                # Remember it instead of raising right away so the remaining
                # (outer) callbacks still run and may still suppress it.
                pending_raise = True
                exc_details = new_exc_details

        if pending_raise:
            exc = exc_details[1]
            # A plain "raise exc" replaces the context we set up above, so
            # save it and restore it before letting the exception escape.
            # NOTE: on Python 2 this re-raise starts a new traceback.
            fixed_ctx = getattr(exc, '__context__', None)
            try:
                raise exc
            except BaseException:
                exc.__context__ = fixed_ctx
                raise
        return received_exc and suppressed_exc

Use this as your context manager, then add nested context managers at will:

with ExitStack() as stack:
    # Enter one context manager per item; the stack unwinds them in
    # reverse order when the with block ends.
    managers = []
    for arg in items:
        entered = stack.enter_context(my_context(arg))
        managers.append(entered)
    print("processing under", managers)

For your example context manager, this prints:

>>> my_fn(range(3))('entering', 0)('entering', 1)('entering', 2)('processing under', [0, 1, 2])('exiting', 2)('exiting', 1)('exiting', 0)

You can also install the contextlib2 module; it includes ExitStack as a backport.


It's a little vexing that the Python 3 maintainers chose to break backwards compatibility, since implementing nested in terms of ExitStack is pretty straightforward:

try:
    from contextlib import nested  # Python 2
except ImportError:
    from contextlib import ExitStack, contextmanager

    @contextmanager
    def nested(*contexts):
        """
        Reimplementation of nested in python 3.

        Enters every manager in ``contexts`` from left to right and yields
        a tuple holding the value each ``__enter__`` returned, so that
        ``with nested(a, b) as (x, y):`` binds the same objects as
        ``with a as x, b as y:``.  The managers are exited in reverse
        order when the block ends.
        """
        with ExitStack() as stack:
            # Yield the __enter__ results, not the managers themselves: the
            # two differ whenever __enter__ returns something other than self.
            yield tuple(stack.enter_context(ctx) for ctx in contexts)


import sys
import contextlib


class nodeA(object):
    """Demo context manager that prints each lifecycle step for node A."""

    def __init__(self):
        print('__init__ nodeA')

    def __enter__(self):
        # Only announces entry; nothing is returned to the with statement.
        print('__enter__ nodeA')

    def __exit__(self, a, b, c):
        # a, b, c receive the exception type, value and traceback; unused.
        print('__exit__ nodeA')


class nodeB(object):
    """Demo context manager that prints each lifecycle step for node B."""

    def __init__(self):
        print('__init__ nodeB')

    def __enter__(self):
        print('__enter__ nodeB')

    def __exit__(self, a, b, c):
        print('__exit__ nodeB')


class nodeC(object):
    """Demo context manager that prints each lifecycle step for node C."""

    def __init__(self):
        print('__init__ nodeC')

    def __enter__(self):
        print('__enter__ nodeC')

    def __exit__(self, a, b, c):
        print('__exit__ nodeC')


print('Start...')
# Construction order matters: each constructor prints as it runs.
a, b, c = nodeA(), nodeB(), nodeC()
print('Python version: %s' % (sys.version))

if sys.version.startswith('2'):
    print('Use python 2!')
    with contextlib.nested(a, b, c):
        print('hallo?')

if sys.version.startswith('3'):
    print('Use python 3!')
    with contextlib.ExitStack() as stack:
        # Enter the managers one after another; the stack exits them in
        # reverse order when the block ends.
        for _node in (a, b, c):
            stack.enter_context(_node)

print('...end!')