1from __future__ import annotations
2
3from abc import abstractmethod
4from contextlib import AbstractAsyncContextManager, AbstractContextManager
5from inspect import isasyncgen, iscoroutine, isgenerator
6from types import TracebackType
7from typing import Protocol, TypeVar, cast, final
8
# Type of the value produced when the context manager is entered
_T_co = TypeVar(
    "_T_co",
    covariant=True,
)
# Return type of the exit method; the bound is kept as a string forward reference
_ExitT_co = TypeVar(
    "_ExitT_co",
    covariant=True,
    bound="bool | None",
)
11
12
class _SupportsCtxMgr(Protocol[_T_co, _ExitT_co]):
    """Structural type for objects that provide ``__contextmanager__()``."""

    def __contextmanager__(
        self,
    ) -> AbstractContextManager[_T_co, _ExitT_co]:
        """Return a context manager object."""
        ...
15
16
class _SupportsAsyncCtxMgr(Protocol[_T_co, _ExitT_co]):
    """Structural type for objects that provide ``__asynccontextmanager__()``."""

    def __asynccontextmanager__(
        self,
    ) -> AbstractAsyncContextManager[_T_co, _ExitT_co]:
        """Return an async context manager object."""
        ...
21
22
class ContextManagerMixin:
    """
    Mixin class that makes its subclasses usable as context managers by delegating
    to the context manager object returned from :meth:`__contextmanager__`.

    Subclasses implement :meth:`__contextmanager__` (normally a generator function
    decorated with :func:`@contextmanager <contextlib.contextmanager>`) and inherit
    final ``__enter__()`` and ``__exit__()`` methods that drive it.

    .. note:: Classes using this mix-in are not reentrant as context managers: once
        an instance has been entered, it must be exited before it can be entered
        again.

    .. seealso:: :doc:`contextmanagers`
    """

    # Delegate context manager while the instance is entered, ``None`` otherwise
    __cm: AbstractContextManager[object, bool | None] | None = None

    @final
    def __enter__(self: _SupportsCtxMgr[_T_co, bool | None]) -> _T_co:
        """
        Enter the context manager returned by :meth:`__contextmanager__`.

        :return: the value returned by the delegate's ``__enter__()``
        :raises RuntimeError: if this instance has already been entered
        :raises TypeError: if :meth:`__contextmanager__` returned something other
            than a context manager, or returned ``self``
        """
        # ``self`` is annotated as a protocol; narrow it so mypy sees ``__cm``
        assert isinstance(self, ContextManagerMixin)
        if self.__cm is not None:
            raise RuntimeError(
                f"this {self.__class__.__qualname__} has already been entered"
            )

        delegate = self.__contextmanager__()
        if not isinstance(delegate, AbstractContextManager):
            # Pick the most helpful explanation, then raise from a single place
            if isgenerator(delegate):
                problem = (
                    "__contextmanager__() returned a generator object instead of "
                    "a context manager. Did you forget to add the @contextmanager "
                    "decorator?"
                )
            else:
                problem = (
                    "__contextmanager__() did not return a context manager object, "
                    f"but {delegate.__class__!r}"
                )

            raise TypeError(problem)

        if delegate is self:
            raise TypeError(
                f"{self.__class__.__qualname__}.__contextmanager__() returned "
                f"self. Did you forget to add the @contextmanager decorator and a "
                f"'yield' statement?"
            )

        # Only remember the delegate once it has been entered successfully
        entered = delegate.__enter__()
        self.__cm = delegate
        return entered

    @final
    def __exit__(
        self: _SupportsCtxMgr[object, _ExitT_co],
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> _ExitT_co:
        """
        Exit the context manager that was entered by ``__enter__()``.

        :param exc_type: type of the exception raised in the context block, if any
        :param exc_val: the exception raised in the context block, if any
        :param exc_tb: traceback of that exception, if any
        :return: whatever the delegate's ``__exit__()`` returns
        :raises RuntimeError: if this instance has not been entered
        """
        # ``self`` is annotated as a protocol; narrow it so mypy sees ``__cm``
        assert isinstance(self, ContextManagerMixin)
        delegate = self.__cm
        if delegate is None:
            raise RuntimeError(
                f"this {self.__class__.__qualname__} has not been entered yet"
            )

        # Drop the stored delegate before exiting to prevent circular references
        del self.__cm
        return cast(_ExitT_co, delegate.__exit__(exc_type, exc_val, exc_tb))

    @abstractmethod
    def __contextmanager__(self) -> AbstractContextManager[object, bool | None]:
        """
        Provide the actual context manager logic.

        Implementations **must** be decorated with
        :func:`@contextmanager <contextlib.contextmanager>`.

        .. note:: Any exception raised in the enclosed context block is raised
            from the ``yield``, so put resource cleanup in a ``finally:`` block!

        :return: a context manager object
        """
107
108
109class AsyncContextManagerMixin:
110 """
111 Mixin class providing async context manager functionality via a generator-based
112 implementation.
113
114 This class allows you to implement a context manager via
115 :meth:`__asynccontextmanager__`. The mechanics are meant to mirror those of
116 :func:`@asynccontextmanager <contextlib.asynccontextmanager>`.
117
118 .. note:: Classes using this mix-in are not reentrant as context managers, meaning
119 that once you enter it, you can't re-enter before first exiting it.
120
121 .. seealso:: :doc:`contextmanagers`
122 """
123
124 __cm: AbstractAsyncContextManager[object, bool | None] | None = None
125
126 @final
127 async def __aenter__(self: _SupportsAsyncCtxMgr[_T_co, bool | None]) -> _T_co:
128 # Needed for mypy to assume self still has the __cm member
129 assert isinstance(self, AsyncContextManagerMixin)
130 if self.__cm is not None:
131 raise RuntimeError(
132 f"this {self.__class__.__qualname__} has already been entered"
133 )
134
135 cm = self.__asynccontextmanager__()
136 if not isinstance(cm, AbstractAsyncContextManager):
137 if isasyncgen(cm):
138 raise TypeError(
139 "__asynccontextmanager__() returned an async generator instead of "
140 "an async context manager. Did you forget to add the "
141 "@asynccontextmanager decorator?"
142 )
143 elif iscoroutine(cm):
144 cm.close()
145 raise TypeError(
146 "__asynccontextmanager__() returned a coroutine object instead of "
147 "an async context manager. Did you forget to add the "
148 "@asynccontextmanager decorator and a 'yield' statement?"
149 )
150
151 raise TypeError(
152 f"__asynccontextmanager__() did not return an async context manager, "
153 f"but {cm.__class__!r}"
154 )
155
156 if cm is self:
157 raise TypeError(
158 f"{self.__class__.__qualname__}.__asynccontextmanager__() returned "
159 f"self. Did you forget to add the @asynccontextmanager decorator and a "
160 f"'yield' statement?"
161 )
162
163 value = await cm.__aenter__()
164 self.__cm = cm
165 return value
166
167 @final
168 async def __aexit__(
169 self: _SupportsAsyncCtxMgr[object, _ExitT_co],
170 exc_type: type[BaseException] | None,
171 exc_val: BaseException | None,
172 exc_tb: TracebackType | None,
173 ) -> _ExitT_co:
174 assert isinstance(self, AsyncContextManagerMixin)
175 if self.__cm is None:
176 raise RuntimeError(
177 f"this {self.__class__.__qualname__} has not been entered yet"
178 )
179
180 # Prevent circular references
181 cm = self.__cm
182 del self.__cm
183
184 return cast(_ExitT_co, await cm.__aexit__(exc_type, exc_val, exc_tb))
185
186 @abstractmethod
187 def __asynccontextmanager__(
188 self,
189 ) -> AbstractAsyncContextManager[object, bool | None]:
190 """
191 Implement your async context manager logic here.
192
193 This method **must** be decorated with
194 :func:`@asynccontextmanager <contextlib.asynccontextmanager>`.
195
196 .. note:: Remember that the ``yield`` will raise any exception raised in the
197 enclosed context block, so use a ``finally:`` block to clean up resources!
198
199 :return: an async context manager object
200 """