Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpclib/utils.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import sys
2import signal
3import asyncio
4import warnings
6from types import TracebackType
7from typing import TYPE_CHECKING, Optional, Set, Type, ContextManager, List
8from typing import Iterator, Collection, Callable, Any, cast
9from functools import wraps
10from contextlib import contextmanager
13if sys.version_info > (3, 7):
14 _current_task = asyncio.current_task
15else:
16 _current_task = asyncio.Task.current_task
19if TYPE_CHECKING:
20 from .metadata import Deadline # noqa
21 from ._typing import IServable, IClosable # noqa
24class Wrapper(ContextManager[None]):
25 """Special wrapper for coroutines to wake them up in case of some error.
27 Example:
29 .. code-block:: python3
31 w = Wrapper()
33 async def blocking_call():
34 with w:
35 await asyncio.sleep(10)
37 # and somewhere else:
38 w.cancel(NoNeedToWaitError('With explanation'))
40 """
41 _error: Optional[Exception] = None
43 cancelled: Optional[bool] = None
44 cancel_failed: Optional[bool] = None
46 def __init__(self) -> None:
47 self._tasks: Set['asyncio.Task[Any]'] = set()
49 def __enter__(self) -> None:
50 if self._error is not None:
51 raise self._error
53 task = _current_task()
54 if task is None:
55 raise RuntimeError('Called not inside a task')
57 self._tasks.add(task)
59 def __exit__(
60 self,
61 exc_type: Optional[Type[BaseException]],
62 exc_val: Optional[BaseException],
63 exc_tb: Optional[TracebackType],
64 ) -> None:
65 task = _current_task()
66 assert task
67 self._tasks.discard(task)
68 if self._error is not None:
69 self.cancel_failed = exc_type is not asyncio.CancelledError
70 raise self._error
72 def cancel(self, error: Exception) -> None:
73 self._error = error
74 for task in self._tasks:
75 task.cancel()
76 self.cancelled = True
79class DeadlineWrapper(Wrapper):
80 """Deadline wrapper to specify deadline once for any number of awaiting
81 method calls.
83 Example:
85 .. code-block:: python3
87 dw = DeadlineWrapper()
89 with dw.start(deadline):
90 await handle_request()
92 # somewhere during request handling:
94 async def blocking_call():
95 with dw:
96 await asyncio.sleep(10)
98 """
99 @contextmanager
100 def start(self, deadline: 'Deadline') -> Iterator[None]:
101 timeout = deadline.time_remaining()
102 if not timeout:
103 raise asyncio.TimeoutError('Deadline exceeded')
105 def callback() -> None:
106 self.cancel(asyncio.TimeoutError('Deadline exceeded'))
108 loop = asyncio.get_event_loop()
109 timer = loop.call_later(timeout, callback)
110 try:
111 yield
112 finally:
113 timer.cancel()
116def _service_name(service: 'IServable') -> str:
117 methods = service.__mapping__()
118 method_name = next(iter(methods), None)
119 assert method_name is not None
120 _, service_name, _ = method_name.split('/')
121 return service_name
124def _first_stage(
125 sig_num: 'signal.Signals',
126 servers: Collection['IClosable'],
127) -> None:
128 fail = False
129 for server in servers:
130 try:
131 server.close()
132 except RuntimeError:
133 # probably server wasn't started yet
134 fail = True
135 if fail:
136 # using second stage in case of error will ensure that non-closed
137 # server wont start later
138 _second_stage(sig_num)
141def _second_stage(sig_num: 'signal.Signals') -> None:
142 raise SystemExit(128 + sig_num)
145def _exit_handler(
146 sig_num: int,
147 servers: Collection['IClosable'],
148 flag: List[bool],
149) -> None:
150 if flag:
151 _second_stage(cast('signal.Signals', sig_num))
152 else:
153 _first_stage(cast('signal.Signals', sig_num), servers)
154 flag.append(True)
157@contextmanager
158def graceful_exit(
159 servers: Collection['IClosable'],
160 *,
161 loop: Optional[asyncio.AbstractEventLoop] = None,
162 signals: Collection[int] = (signal.SIGINT, signal.SIGTERM),
163) -> Iterator[None]:
164 """Utility context-manager to help properly shutdown server in response to
165 the OS signals
167 By default this context-manager handles ``SIGINT`` and ``SIGTERM`` signals.
169 There are two stages:
171 1. first received signal closes servers
172 2. subsequent signals raise ``SystemExit`` exception
174 Example:
176 .. code-block:: python3
178 async def main(...):
179 ...
180 with graceful_exit([server]):
181 await server.start(host, port)
182 print('Serving on {}:{}'.format(host, port))
183 await server.wait_closed()
184 print('Server closed')
186 First stage calls ``server.close()`` and ``await server.wait_closed()``
187 should complete successfully without errors. If server wasn't started yet,
188 second stage runs to prevent server start.
190 Second stage raises ``SystemExit`` exception, but you will receive
191 ``asyncio.CancelledError`` in your ``async def main()`` coroutine. You
192 can use ``try..finally`` constructs and context-managers to properly handle
193 this error.
195 This context-manager is designed to work in cooperation with
196 :py:func:`python:asyncio.run` function:
198 .. code-block:: python3
200 if __name__ == '__main__':
201 asyncio.run(main())
203 :param servers: list of servers
204 :param loop: (deprecated) asyncio-compatible event loop
205 :param signals: set of the OS signals to handle
207 .. note:: Not supported in Windows
208 """
209 if loop:
210 warnings.warn("The loop argument is deprecated and scheduled "
211 "for removal in grpclib 0.5",
212 DeprecationWarning, stacklevel=2)
214 loop = loop or asyncio.get_event_loop()
215 signals = set(signals)
216 flag: 'List[bool]' = []
217 for sig_num in signals:
218 loop.add_signal_handler(sig_num, _exit_handler, sig_num, servers, flag)
219 try:
220 yield
221 finally:
222 for sig_num in signals:
223 loop.remove_signal_handler(sig_num)
226def _cached(func: Callable[[], Any]) -> Callable[[], Any]:
227 @wraps(func)
228 def wrapper() -> Any:
229 try:
230 return func.__result__ # type: ignore
231 except AttributeError:
232 func.__result__ = func() # type: ignore
233 return func.__result__ # type: ignore
234 return wrapper