Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpclib/utils.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

97 statements  

1import sys 

2import signal 

3import asyncio 

4import warnings 

5 

6from types import TracebackType 

7from typing import TYPE_CHECKING, Optional, Set, Type, ContextManager, List 

8from typing import Iterator, Collection, Callable, Any, cast 

9from functools import wraps 

10from contextlib import contextmanager 

11 

12 

13if sys.version_info > (3, 7): 

14 _current_task = asyncio.current_task 

15else: 

16 _current_task = asyncio.Task.current_task 

17 

18 

19if TYPE_CHECKING: 

20 from .metadata import Deadline # noqa 

21 from ._typing import IServable, IClosable # noqa 

22 

23 

class Wrapper(ContextManager[None]):
    """Context manager that allows waking up awaiting tasks with an error.

    Every task which enters the wrapper is remembered until it leaves the
    ``with`` block. Calling :py:meth:`cancel` stores the given error and
    cancels all remembered tasks; the stored error is then raised when those
    tasks leave the block, and on every later attempt to enter the wrapper.

    Example:

    .. code-block:: python3

        w = Wrapper()

        async def blocking_call():
            with w:
                await asyncio.sleep(10)

        # and somewhere else:
        w.cancel(NoNeedToWaitError('With explanation'))

    """
    # Error passed to ``cancel()``; stays ``None`` until ``cancel()`` is called
    _error: Optional[Exception] = None

    # Set to ``True`` by ``cancel()``
    cancelled: Optional[bool] = None
    # Set on exit after ``cancel()``: ``True`` when the block was left with
    # anything other than exactly ``asyncio.CancelledError``
    cancel_failed: Optional[bool] = None

    def __init__(self) -> None:
        # Tasks which are currently inside a ``with`` block of this wrapper
        self._tasks: Set['asyncio.Task[Any]'] = set()

    def __enter__(self) -> None:
        """Register the current task.

        :raises Exception: the error given to :py:meth:`cancel`, if it was
            already called
        :raises RuntimeError: if there is no current task
        """
        stored_error = self._error
        if stored_error is not None:
            raise stored_error

        current = _current_task()
        if current is None:
            raise RuntimeError('Called not inside a task')

        self._tasks.add(current)

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Unregister the current task and raise the stored error, if any."""
        current = _current_task()
        assert current
        self._tasks.discard(current)

        stored_error = self._error
        if stored_error is None:
            return

        # Record whether the block was actually interrupted by cancellation
        # before replacing whatever is propagating with the stored error.
        self.cancel_failed = exc_type is not asyncio.CancelledError
        raise stored_error

    def cancel(self, error: Exception) -> None:
        """Store ``error`` and cancel every task currently registered.

        :param error: exception to raise in place of the cancellation
        """
        self._error = error
        for registered in self._tasks:
            registered.cancel()
        self.cancelled = True

77 

78 

class DeadlineWrapper(Wrapper):
    """Wrapper which applies a single deadline to any number of awaiting
    calls made while that deadline is active.

    Example:

    .. code-block:: python3

        dw = DeadlineWrapper()

        with dw.start(deadline):
            await handle_request()

        # somewhere during request handling:

        async def blocking_call():
            with dw:
                await asyncio.sleep(10)

    """
    @contextmanager
    def start(self, deadline: 'Deadline') -> Iterator[None]:
        """Arm a timer which cancels this wrapper when ``deadline`` expires.

        The timer is disarmed when the ``with`` block is left.

        :param deadline: object whose ``time_remaining()`` gives the delay
            passed to ``loop.call_later``
        :raises asyncio.TimeoutError: immediately, if ``time_remaining()``
            returns a falsy value
        """
        remaining = deadline.time_remaining()
        if not remaining:
            raise asyncio.TimeoutError('Deadline exceeded')

        def on_deadline() -> None:
            self.cancel(asyncio.TimeoutError('Deadline exceeded'))

        handle = asyncio.get_event_loop().call_later(remaining, on_deadline)
        try:
            yield
        finally:
            # Disarm the timer regardless of how the block was left
            handle.cancel()

114 

115 

116def _service_name(service: 'IServable') -> str: 

117 methods = service.__mapping__() 

118 method_name = next(iter(methods), None) 

119 assert method_name is not None 

120 _, service_name, _ = method_name.split('/') 

121 return service_name 

122 

123 

124def _first_stage( 

125 sig_num: 'signal.Signals', 

126 servers: Collection['IClosable'], 

127) -> None: 

128 fail = False 

129 for server in servers: 

130 try: 

131 server.close() 

132 except RuntimeError: 

133 # probably server wasn't started yet 

134 fail = True 

135 if fail: 

136 # using second stage in case of error will ensure that non-closed 

137 # server wont start later 

138 _second_stage(sig_num) 

139 

140 

141def _second_stage(sig_num: 'signal.Signals') -> None: 

142 raise SystemExit(128 + sig_num) 

143 

144 

def _exit_handler(
    sig_num: int,
    servers: Collection['IClosable'],
    flag: List[bool],
) -> None:
    """Dispatch a received signal to the first or the second exit stage.

    :param sig_num: number of the received signal
    :param servers: servers to close during the first stage
    :param flag: shared mutable marker; empty until the first stage has
        completed, non-empty afterwards
    """
    signal_id = cast('signal.Signals', sig_num)
    if not flag:
        _first_stage(signal_id, servers)
        # Reached only when the first stage did not raise: mark it as done
        # so that the next signal goes straight to the second stage.
        flag.append(True)
        return
    _second_stage(signal_id)

155 

156 

@contextmanager
def graceful_exit(
    servers: Collection['IClosable'],
    *,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    signals: Collection[int] = (signal.SIGINT, signal.SIGTERM),
) -> Iterator[None]:
    """Utility context-manager to help properly shutdown server in response to
    the OS signals

    By default this context-manager handles ``SIGINT`` and ``SIGTERM`` signals.

    There are two stages:

    1. first received signal closes servers
    2. subsequent signals raise ``SystemExit`` exception

    Example:

    .. code-block:: python3

        async def main(...):
            ...
            with graceful_exit([server]):
                await server.start(host, port)
                print('Serving on {}:{}'.format(host, port))
                await server.wait_closed()
                print('Server closed')

    First stage calls ``server.close()`` and ``await server.wait_closed()``
    should complete successfully without errors. If server wasn't started yet,
    second stage runs to prevent server start.

    Second stage raises ``SystemExit`` exception, but you will receive
    ``asyncio.CancelledError`` in your ``async def main()`` coroutine. You
    can use ``try..finally`` constructs and context-managers to properly handle
    this error.

    This context-manager is designed to work in cooperation with
    :py:func:`python:asyncio.run` function:

    .. code-block:: python3

        if __name__ == '__main__':
            asyncio.run(main())

    Signal handlers are removed when the context-manager exits. If installing
    one of the handlers fails, handlers which were already installed are
    removed before the error is propagated.

    :param servers: list of servers
    :param loop: (deprecated) asyncio-compatible event loop
    :param signals: set of the OS signals to handle

    .. note:: Not supported in Windows
    """
    if loop:
        warnings.warn("The loop argument is deprecated and scheduled "
                      "for removal in grpclib 0.5",
                      DeprecationWarning, stacklevel=2)

    loop = loop or asyncio.get_event_loop()
    signals = set(signals)
    # Shared between all handlers: empty until the first stage has completed
    flag: 'List[bool]' = []
    # Only signals whose handler was actually installed are recorded here, so
    # a failure part-way through registration does not leave earlier handlers
    # attached to the loop.
    installed: 'List[int]' = []
    try:
        for sig_num in signals:
            loop.add_signal_handler(
                sig_num, _exit_handler, sig_num, servers, flag,
            )
            installed.append(sig_num)
        yield
    finally:
        for sig_num in installed:
            loop.remove_signal_handler(sig_num)

224 

225 

226def _cached(func: Callable[[], Any]) -> Callable[[], Any]: 

227 @wraps(func) 

228 def wrapper() -> Any: 

229 try: 

230 return func.__result__ # type: ignore 

231 except AttributeError: 

232 func.__result__ = func() # type: ignore 

233 return func.__result__ # type: ignore 

234 return wrapper