167 lines
5.3 KiB

"""Utilities for with-statement contexts. See PEP 343.
Original source code: https://hg.python.org/cpython/file/3.4/Lib/contextlib.py
Not implemented:
- redirect_stdout;
import sys
from collections import deque
from ucontextlib import *
class closing(object):
"""Context to automatically close something at the end of a block.
Code like this:
with closing(<module>.open(<arguments>)) as f:
is equivalent to this:
f = <module>.open(<arguments>)
def __init__(self, thing):
self.thing = thing
def __enter__(self):
return self.thing
def __exit__(self, *exc_info):
class suppress:
"""Context manager to suppress specified exceptions
After the exception is suppressed, execution proceeds with the next
statement following the with statement.
with suppress(FileNotFoundError):
# Execution still resumes here if the file was already removed
def __init__(self, *exceptions):
self._exceptions = exceptions
def __enter__(self):
def __exit__(self, exctype, excinst, exctb):
# Unlike isinstance and issubclass, CPython exception handling
# currently only looks at the concrete type hierarchy (ignoring
# the instance and subclass checking hooks). While Guido considers
# that a bug rather than a feature, it's a fairly hard one to fix
# due to various internal implementation details. suppress provides
# the simpler issubclass based semantics, rather than trying to
# exactly reproduce the limitations of the CPython interpreter.
# See http://bugs.python.org/issue12029 for more details
return exctype is not None and issubclass(exctype, self._exceptions)
# Inspired by discussions on http://bugs.python.org/issue13585
class ExitStack(object):
"""Context manager for dynamic management of a stack of exit callbacks
For example:
with ExitStack() as stack:
files = [stack.enter_context(open(fname)) for fname in filenames]
# All opened files will automatically be closed at the end of
# the with statement, even if attempts to open files later
# in the list raise an exception
def __init__(self):
self._exit_callbacks = deque()
def pop_all(self):
"""Preserve the context stack by transferring it to a new instance"""
new_stack = type(self)()
new_stack._exit_callbacks = self._exit_callbacks
self._exit_callbacks = deque()
return new_stack
def _push_cm_exit(self, cm, cm_exit):
"""Helper to correctly register callbacks to __exit__ methods"""
def _exit_wrapper(*exc_details):
return cm_exit(cm, *exc_details)
def push(self, exit):
"""Registers a callback with the standard __exit__ method signature
Can suppress exceptions the same way __exit__ methods can.
Also accepts any object with an __exit__ method (registering a call
to the method instead of the object itself)
# We use an unbound method rather than a bound method to follow
# the standard lookup behaviour for special methods
_cb_type = type(exit)
exit_method = _cb_type.__exit__
except AttributeError:
# Not a context manager, so assume its a callable
self._push_cm_exit(exit, exit_method)
return exit # Allow use as a decorator
def callback(self, callback, *args, **kwds):
"""Registers an arbitrary callback and arguments.
Cannot suppress exceptions.
def _exit_wrapper(exc_type, exc, tb):
callback(*args, **kwds)
return callback # Allow use as a decorator
def enter_context(self, cm):
"""Enters the supplied context manager
If successful, also pushes its __exit__ method as a callback and
returns the result of the __enter__ method.
# We look up the special methods on the type to match the with statement
_cm_type = type(cm)
_exit = _cm_type.__exit__
result = _cm_type.__enter__(cm)
self._push_cm_exit(cm, _exit)
return result
def close(self):
"""Immediately unwind the context stack"""
self.__exit__(None, None, None)
def __enter__(self):
return self
def __exit__(self, *exc_details):
received_exc = exc_details[0] is not None
# Callbacks are invoked in LIFO order to match the behaviour of
# nested context managers
suppressed_exc = False
pending_raise = False
while self._exit_callbacks:
cb = self._exit_callbacks.pop()
if cb(*exc_details):
suppressed_exc = True
pending_raise = False
exc_details = (None, None, None)
exc_details = sys.exc_info()
pending_raise = True
if pending_raise:
raise exc_details[1]
return received_exc and suppressed_exc