Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_channel.py: 41%

185 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-06 06:03 +0000

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Asyncio Python.""" 

15 

16import asyncio 

17import sys 

18from typing import Any, Iterable, List, Optional, Sequence 

19 

20import grpc 

21from grpc import _common 

22from grpc import _compression 

23from grpc import _grpcio_metadata 

24from grpc._cython import cygrpc 

25 

26from . import _base_call 

27from . import _base_channel 

28from ._call import StreamStreamCall 

29from ._call import StreamUnaryCall 

30from ._call import UnaryStreamCall 

31from ._call import UnaryUnaryCall 

32from ._interceptor import ClientInterceptor 

33from ._interceptor import InterceptedStreamStreamCall 

34from ._interceptor import InterceptedStreamUnaryCall 

35from ._interceptor import InterceptedUnaryStreamCall 

36from ._interceptor import InterceptedUnaryUnaryCall 

37from ._interceptor import StreamStreamClientInterceptor 

38from ._interceptor import StreamUnaryClientInterceptor 

39from ._interceptor import UnaryStreamClientInterceptor 

40from ._interceptor import UnaryUnaryClientInterceptor 

41from ._metadata import Metadata 

42from ._typing import ChannelArgumentType 

43from ._typing import DeserializingFunction 

44from ._typing import RequestIterableType 

45from ._typing import SerializingFunction 

46from ._utils import _timeout_to_deadline 

47 

# User-agent string sent as the channel's primary user-agent argument.
_USER_AGENT = f'grpc-python-asyncio/{_grpcio_metadata.__version__}'

49 

# Compare the whole version tuple rather than only the minor component, so
# that the legacy branch is selected exclusively on interpreters older than
# Python 3.7 (and never on a later major release with a small minor number).
if sys.version_info < (3, 7):

    def _all_tasks() -> Iterable[asyncio.Task]:
        """Returns the asyncio tasks through the legacy `Task.all_tasks` API."""
        return asyncio.Task.all_tasks()
else:

    def _all_tasks() -> Iterable[asyncio.Task]:
        """Returns the asyncio tasks through `asyncio.all_tasks`."""
        return asyncio.all_tasks()

58 

59 

def _augment_channel_arguments(base_options: ChannelArgumentType,
                               compression: Optional[grpc.Compression]):
    """Extends the caller's channel options with library-provided arguments.

    Args:
      base_options: The channel arguments supplied by the caller.
      compression: The optional channel-wide compression setting.

    Returns:
      A tuple made of the base options, followed by the compression channel
      option(s) and finally the primary user-agent argument.
    """
    compression_options = _compression.create_channel_option(compression)
    user_agent_pair = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    augmented = tuple(base_options)
    augmented += compression_options
    augmented += (user_agent_pair,)
    return augmented

70 

71 

class _BaseMultiCallable:
    """Base class of all multi callable objects.

    Handles the initialization logic and stores common attributes.
    """
    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    _method: bytes
    _request_serializer: SerializingFunction
    _response_deserializer: DeserializingFunction
    _interceptors: Optional[Sequence[ClientInterceptor]]
    _references: List[Any]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        interceptors: Optional[Sequence[ClientInterceptor]],
        references: List[Any],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Stores everything needed to create calls later on.

        Args:
          channel: The Cython channel the calls are created on.
          method: The encoded name of the RPC method.
          request_serializer: The function used to serialize requests.
          response_deserializer: The function used to deserialize responses.
          interceptors: The optional interceptors applied to every call.
          references: Objects stored alongside the multi callable.
            NOTE(review): presumably held to keep them (e.g. the owning
            channel) alive -- confirm.
          loop: The event loop handed to the created calls.
        """
        self._loop = loop
        self._channel = channel
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._interceptors = interceptors
        self._references = references

    @staticmethod
    def _init_metadata(
            metadata: Optional[Metadata] = None,
            compression: Optional[grpc.Compression] = None) -> Metadata:
        """Builds the final metadata that should be used for the current call.

        Args:
          metadata: The optional metadata supplied by the caller; a falsy
            value is replaced by a new, empty `Metadata`.
          compression: The optional per-call compression setting; when truthy
            the metadata is augmented accordingly.

        Returns:
          The `Metadata` instance to send with the call.
        """
        metadata = metadata or Metadata()
        if compression:
            metadata = Metadata(
                *_compression.augment_metadata(metadata, compression))
        return metadata

117 

118 

class UnaryUnaryMultiCallable(_BaseMultiCallable,
                              _base_channel.UnaryUnaryMultiCallable):
    """Multi callable creating unary-request, unary-response calls."""

    def __call__(
        self,
        request: Any,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[Metadata] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None
    ) -> _base_call.UnaryUnaryCall:
        """Creates the call object for a single request.

        Returns:
          An `InterceptedUnaryUnaryCall` when interceptors are configured,
          otherwise a plain `UnaryUnaryCall`.
        """
        call_metadata = self._init_metadata(metadata, compression)

        # Arguments shared, in this order, by both call flavours.
        trailing_args = (
            call_metadata,
            credentials,
            wait_for_ready,
            self._channel,
            self._method,
            self._request_serializer,
            self._response_deserializer,
            self._loop,
        )

        if self._interceptors:
            # NOTE(review): the intercepted call is given the raw timeout
            # while the other multi callables in this file hand over a
            # deadline -- confirm against the InterceptedUnaryUnaryCall
            # signature.
            return InterceptedUnaryUnaryCall(self._interceptors, request,
                                             timeout, *trailing_args)

        return UnaryUnaryCall(request, _timeout_to_deadline(timeout),
                              *trailing_args)

148 

149 

class UnaryStreamMultiCallable(_BaseMultiCallable,
                               _base_channel.UnaryStreamMultiCallable):
    """Multi callable creating unary-request, streaming-response calls."""

    def __call__(
        self,
        request: Any,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[Metadata] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None
    ) -> _base_call.UnaryStreamCall:
        """Creates the call object for a single request.

        Returns:
          An `InterceptedUnaryStreamCall` when interceptors are configured,
          otherwise a plain `UnaryStreamCall`.
        """
        call_metadata = self._init_metadata(metadata, compression)
        deadline = _timeout_to_deadline(timeout)

        # Both call flavours take the very same arguments; the intercepted
        # one is merely prefixed with the interceptors.
        # NOTE(review): the intercepted call receives a deadline here whereas
        # UnaryUnaryMultiCallable passes the raw timeout -- confirm which one
        # the intercepted call classes expect.
        call_args = (
            request,
            deadline,
            call_metadata,
            credentials,
            wait_for_ready,
            self._channel,
            self._method,
            self._request_serializer,
            self._response_deserializer,
            self._loop,
        )

        if self._interceptors:
            return InterceptedUnaryStreamCall(self._interceptors, *call_args)

        return UnaryStreamCall(*call_args)

180 

181 

class StreamUnaryMultiCallable(_BaseMultiCallable,
                               _base_channel.StreamUnaryMultiCallable):
    """Multi callable creating streaming-request, unary-response calls."""

    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[Metadata] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None
    ) -> _base_call.StreamUnaryCall:
        """Creates the call object for an optional stream of requests.

        Returns:
          An `InterceptedStreamUnaryCall` when interceptors are configured,
          otherwise a plain `StreamUnaryCall`.
        """
        call_metadata = self._init_metadata(metadata, compression)
        deadline = _timeout_to_deadline(timeout)

        # Both call flavours take the very same arguments; the intercepted
        # one is merely prefixed with the interceptors.
        # NOTE(review): the intercepted call receives a deadline here whereas
        # UnaryUnaryMultiCallable passes the raw timeout -- confirm which one
        # the intercepted call classes expect.
        call_args = (
            request_iterator,
            deadline,
            call_metadata,
            credentials,
            wait_for_ready,
            self._channel,
            self._method,
            self._request_serializer,
            self._response_deserializer,
            self._loop,
        )

        if self._interceptors:
            return InterceptedStreamUnaryCall(self._interceptors, *call_args)

        return StreamUnaryCall(*call_args)

211 

212 

class StreamStreamMultiCallable(_BaseMultiCallable,
                                _base_channel.StreamStreamMultiCallable):
    """Multi callable creating streaming-request, streaming-response calls."""

    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[Metadata] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None
    ) -> _base_call.StreamStreamCall:
        """Creates the call object for an optional stream of requests.

        Returns:
          An `InterceptedStreamStreamCall` when interceptors are configured,
          otherwise a plain `StreamStreamCall`.
        """
        call_metadata = self._init_metadata(metadata, compression)
        deadline = _timeout_to_deadline(timeout)

        # Both call flavours take the very same arguments; the intercepted
        # one is merely prefixed with the interceptors.
        # NOTE(review): the intercepted call receives a deadline here whereas
        # UnaryUnaryMultiCallable passes the raw timeout -- confirm which one
        # the intercepted call classes expect.
        call_args = (
            request_iterator,
            deadline,
            call_metadata,
            credentials,
            wait_for_ready,
            self._channel,
            self._method,
            self._request_serializer,
            self._response_deserializer,
            self._loop,
        )

        if self._interceptors:
            return InterceptedStreamStreamCall(self._interceptors, *call_args)

        return StreamStreamCall(*call_args)

242 

243 

class Channel(_base_channel.Channel):
    """Asynchronous client channel wrapping a Cython `AioChannel`."""
    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    _unary_unary_interceptors: List[UnaryUnaryClientInterceptor]
    _unary_stream_interceptors: List[UnaryStreamClientInterceptor]
    _stream_unary_interceptors: List[StreamUnaryClientInterceptor]
    _stream_stream_interceptors: List[StreamStreamClientInterceptor]

    def __init__(self, target: str, options: ChannelArgumentType,
                 credentials: Optional[grpc.ChannelCredentials],
                 compression: Optional[grpc.Compression],
                 interceptors: Optional[Sequence[ClientInterceptor]]):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
          interceptors: An optional list of interceptors that would be used for
            intercepting any RPC executed with that channel.

        Raises:
          ValueError: If an interceptor is not an instance of any of the four
            supported client interceptor classes.
        """
        self._unary_unary_interceptors = []
        self._unary_stream_interceptors = []
        self._stream_unary_interceptors = []
        self._stream_stream_interceptors = []

        if interceptors is not None:
            # Each interceptor is filed under the first arity it matches.
            for interceptor in interceptors:
                if isinstance(interceptor, UnaryUnaryClientInterceptor):
                    self._unary_unary_interceptors.append(interceptor)
                elif isinstance(interceptor, UnaryStreamClientInterceptor):
                    self._unary_stream_interceptors.append(interceptor)
                elif isinstance(interceptor, StreamUnaryClientInterceptor):
                    self._stream_unary_interceptors.append(interceptor)
                elif isinstance(interceptor, StreamStreamClientInterceptor):
                    self._stream_stream_interceptors.append(interceptor)
                else:
                    raise ValueError(
                        "Interceptor {} must be ".format(interceptor) +
                        "{} or ".format(UnaryUnaryClientInterceptor.__name__) +
                        "{} or ".format(UnaryStreamClientInterceptor.__name__) +
                        "{} or ".format(StreamUnaryClientInterceptor.__name__) +
                        "{}. ".format(StreamStreamClientInterceptor.__name__))

        self._loop = cygrpc.get_working_loop()
        self._channel = cygrpc.AioChannel(
            _common.encode(target),
            _augment_channel_arguments(options, compression), credentials,
            self._loop)

    async def __aenter__(self):
        """Enters the async context manager and returns the channel itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Closes the channel, without grace period, on context exit."""
        await self._close(None)

    async def _close(self, grace):  # pylint: disable=too-many-branches
        """Cancels the calls bound to this channel and closes it.

        Args:
          grace: An optional duration, in seconds, given to the tasks of the
            ongoing calls to finish before the calls get cancelled.
        """
        if self._channel.closed():
            return

        # No new calls will be accepted by the Cython channel.
        self._channel.closing()

        # Iterate through running tasks
        tasks = _all_tasks()
        calls = []
        call_tasks = []
        for task in tasks:
            try:
                stack = task.get_stack(limit=1)
            except AttributeError as attribute_error:
                # NOTE(lidiz) tl;dr: If the Task is created with a CPython
                # object, it will trigger AttributeError.
                #
                # In the global finalizer, the event loop schedules
                # a CPython PyAsyncGenAThrow object.
                # https://github.com/python/cpython/blob/00e45877e33d32bb61aa13a2033e3bba370bda4d/Lib/asyncio/base_events.py#L484
                #
                # However, the PyAsyncGenAThrow object is written in C and
                # failed to include the normal Python frame objects. Hence,
                # this exception is a false negative, and it is safe to ignore
                # the failure. It is fixed by https://github.com/python/cpython/pull/18669,
                # but not available until 3.9 or 3.8.3. So, we have to keep it
                # for a while.
                # TODO(lidiz) drop this hack after 3.8 deprecation
                if 'frame' in str(attribute_error):
                    continue
                else:
                    raise

            # If the Task is created by a C-extension, the stack will be empty.
            if not stack:
                continue

            # Locate ones created by `aio.Call`.
            frame = stack[0]
            candidate = frame.f_locals.get('self')
            if candidate:
                if isinstance(candidate, _base_call.Call):
                    if hasattr(candidate, '_channel'):
                        # For intercepted Call object
                        if candidate._channel is not self._channel:
                            continue
                    elif hasattr(candidate, '_cython_call'):
                        # For normal Call object
                        if candidate._cython_call._channel is not self._channel:
                            continue
                    else:
                        # Unidentified Call object
                        raise cygrpc.InternalError(
                            f'Unrecognized call object: {candidate}')

                    calls.append(candidate)
                    call_tasks.append(task)

        # If needed, try to wait for them to finish.
        # Call objects are not always awaitables.
        if grace and call_tasks:
            await asyncio.wait(call_tasks, timeout=grace)

        # Time to cancel existing calls.
        for call in calls:
            call.cancel()

        # Destroy the channel
        self._channel.close()

    async def close(self, grace: Optional[float] = None):
        """Closes the channel.

        Args:
          grace: An optional duration, in seconds, given to the tasks of the
            ongoing calls to finish before the calls get cancelled.
        """
        await self._close(grace)

    def __del__(self):
        """Closes the Cython channel if it exists and is still open."""
        # `_channel` is missing when `__init__` raised before creating it.
        if hasattr(self, '_channel'):
            if not self._channel.closed():
                self._channel.close()

    def get_state(self,
                  try_to_connect: bool = False) -> grpc.ChannelConnectivity:
        """Returns the current connectivity state of the channel.

        Args:
          try_to_connect: Forwarded to the Cython channel's connectivity
            check.

        Returns:
          The `grpc.ChannelConnectivity` mapped from the Cython state.
        """
        result = self._channel.check_connectivity_state(try_to_connect)
        return _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[result]

    async def wait_for_state_change(
        self,
        last_observed_state: grpc.ChannelConnectivity,
    ) -> None:
        """Waits for the connectivity state to differ from the given one.

        Args:
          last_observed_state: The state the caller has last observed.
        """
        # The await must live outside of the `assert` statement: asserts are
        # stripped under `python -O`, which would otherwise remove the wait
        # itself and turn `channel_ready` into a loop that never yields.
        state_changed = await self._channel.watch_connectivity_state(
            last_observed_state.value[0], None)
        assert state_changed

    async def channel_ready(self) -> None:
        """Returns once the channel reports the READY connectivity state."""
        state = self.get_state(try_to_connect=True)
        while state != grpc.ChannelConnectivity.READY:
            await self.wait_for_state_change(state)
            state = self.get_state(try_to_connect=True)

    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None
    ) -> UnaryUnaryMultiCallable:
        """Creates a UnaryUnaryMultiCallable for the given method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional function serializing the request.
          response_deserializer: Optional function deserializing the response.

        Returns:
          A UnaryUnaryMultiCallable bound to this channel.
        """
        return UnaryUnaryMultiCallable(self._channel, _common.encode(method),
                                       request_serializer,
                                       response_deserializer,
                                       self._unary_unary_interceptors, [self],
                                       self._loop)

    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None
    ) -> UnaryStreamMultiCallable:
        """Creates a UnaryStreamMultiCallable for the given method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional function serializing the request.
          response_deserializer: Optional function deserializing the response.

        Returns:
          A UnaryStreamMultiCallable bound to this channel.
        """
        return UnaryStreamMultiCallable(self._channel, _common.encode(method),
                                        request_serializer,
                                        response_deserializer,
                                        self._unary_stream_interceptors, [self],
                                        self._loop)

    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None
    ) -> StreamUnaryMultiCallable:
        """Creates a StreamUnaryMultiCallable for the given method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional function serializing the request.
          response_deserializer: Optional function deserializing the response.

        Returns:
          A StreamUnaryMultiCallable bound to this channel.
        """
        return StreamUnaryMultiCallable(self._channel, _common.encode(method),
                                        request_serializer,
                                        response_deserializer,
                                        self._stream_unary_interceptors, [self],
                                        self._loop)

    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None
    ) -> StreamStreamMultiCallable:
        """Creates a StreamStreamMultiCallable for the given method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional function serializing the request.
          response_deserializer: Optional function deserializing the response.

        Returns:
          A StreamStreamMultiCallable bound to this channel.
        """
        return StreamStreamMultiCallable(self._channel, _common.encode(method),
                                         request_serializer,
                                         response_deserializer,
                                         self._stream_stream_interceptors,
                                         [self], self._loop)

446 

447 

def insecure_channel(
        target: str,
        options: Optional[ChannelArgumentType] = None,
        compression: Optional[grpc.Compression] = None,
        interceptors: Optional[Sequence[ClientInterceptor]] = None):
    """Creates an insecure asynchronous Channel to a server.

    Args:
      target: The server address.
      options: An optional list of key-value pairs (:term:`channel_arguments`
        in gRPC Core runtime) to configure the channel; `None` is treated as
        no options.
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel.
      interceptors: An optional sequence of interceptors that will be executed
        for any call executed with this channel.

    Returns:
      A Channel.
    """
    channel_options = options if options is not None else ()
    # No channel credentials: the channel is created without them.
    return Channel(target, channel_options, None, compression, interceptors)

469 

470 

471def secure_channel(target: str, 

472 credentials: grpc.ChannelCredentials, 

473 options: Optional[ChannelArgumentType] = None, 

474 compression: Optional[grpc.Compression] = None, 

475 interceptors: Optional[Sequence[ClientInterceptor]] = None): 

476 """Creates a secure asynchronous Channel to a server. 

477 

478 Args: 

479 target: The server address. 

480 credentials: A ChannelCredentials instance. 

481 options: An optional list of key-value pairs (:term:`channel_arguments` 

482 in gRPC Core runtime) to configure the channel. 

483 compression: An optional value indicating the compression method to be 

484 used over the lifetime of the channel. 

485 interceptors: An optional sequence of interceptors that will be executed for 

486 any call executed with this channel. 

487 

488 Returns: 

489 An aio.Channel. 

490 """ 

491 return Channel(target, () if options is None else options, 

492 credentials._credentials, compression, interceptors)