Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_channel.py: 42%

184 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:37 +0000

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Asyncio Python.""" 

15 

16import asyncio 

17import sys 

18from typing import Any, Iterable, List, Optional, Sequence 

19 

20import grpc 

21from grpc import _common 

22from grpc import _compression 

23from grpc import _grpcio_metadata 

24from grpc._cython import cygrpc 

25 

26from . import _base_call 

27from . import _base_channel 

28from ._call import StreamStreamCall 

29from ._call import StreamUnaryCall 

30from ._call import UnaryStreamCall 

31from ._call import UnaryUnaryCall 

32from ._interceptor import ClientInterceptor 

33from ._interceptor import InterceptedStreamStreamCall 

34from ._interceptor import InterceptedStreamUnaryCall 

35from ._interceptor import InterceptedUnaryStreamCall 

36from ._interceptor import InterceptedUnaryUnaryCall 

37from ._interceptor import StreamStreamClientInterceptor 

38from ._interceptor import StreamUnaryClientInterceptor 

39from ._interceptor import UnaryStreamClientInterceptor 

40from ._interceptor import UnaryUnaryClientInterceptor 

41from ._metadata import Metadata 

42from ._typing import ChannelArgumentType 

43from ._typing import DeserializingFunction 

44from ._typing import RequestIterableType 

45from ._typing import RequestType 

46from ._typing import ResponseType 

47from ._typing import SerializingFunction 

48from ._utils import _timeout_to_deadline 

49 

50_USER_AGENT = "grpc-python-asyncio/{}".format(_grpcio_metadata.__version__) 

51 

52if sys.version_info[1] < 7: 

53 

54 def _all_tasks() -> Iterable[asyncio.Task]: 

55 return asyncio.Task.all_tasks() # pylint: disable=no-member 

56 

57else: 

58 

59 def _all_tasks() -> Iterable[asyncio.Task]: 

60 return asyncio.all_tasks() 

61 

62 

63def _augment_channel_arguments( 

64 base_options: ChannelArgumentType, compression: Optional[grpc.Compression] 

65): 

66 compression_channel_argument = _compression.create_channel_option( 

67 compression 

68 ) 

69 user_agent_channel_argument = ( 

70 ( 

71 cygrpc.ChannelArgKey.primary_user_agent_string, 

72 _USER_AGENT, 

73 ), 

74 ) 

75 return ( 

76 tuple(base_options) 

77 + compression_channel_argument 

78 + user_agent_channel_argument 

79 ) 

80 

81 

82class _BaseMultiCallable: 

83 """Base class of all multi callable objects. 

84 

85 Handles the initialization logic and stores common attributes. 

86 """ 

87 

88 _loop: asyncio.AbstractEventLoop 

89 _channel: cygrpc.AioChannel 

90 _method: bytes 

91 _request_serializer: SerializingFunction 

92 _response_deserializer: DeserializingFunction 

93 _interceptors: Optional[Sequence[ClientInterceptor]] 

94 _references: List[Any] 

95 _loop: asyncio.AbstractEventLoop 

96 

97 # pylint: disable=too-many-arguments 

98 def __init__( 

99 self, 

100 channel: cygrpc.AioChannel, 

101 method: bytes, 

102 request_serializer: SerializingFunction, 

103 response_deserializer: DeserializingFunction, 

104 interceptors: Optional[Sequence[ClientInterceptor]], 

105 references: List[Any], 

106 loop: asyncio.AbstractEventLoop, 

107 ) -> None: 

108 self._loop = loop 

109 self._channel = channel 

110 self._method = method 

111 self._request_serializer = request_serializer 

112 self._response_deserializer = response_deserializer 

113 self._interceptors = interceptors 

114 self._references = references 

115 

116 @staticmethod 

117 def _init_metadata( 

118 metadata: Optional[Metadata] = None, 

119 compression: Optional[grpc.Compression] = None, 

120 ) -> Metadata: 

121 """Based on the provided values for <metadata> or <compression> initialise the final 

122 metadata, as it should be used for the current call. 

123 """ 

124 metadata = metadata or Metadata() 

125 if compression: 

126 metadata = Metadata( 

127 *_compression.augment_metadata(metadata, compression) 

128 ) 

129 return metadata 

130 

131 

132class UnaryUnaryMultiCallable( 

133 _BaseMultiCallable, _base_channel.UnaryUnaryMultiCallable 

134): 

135 def __call__( 

136 self, 

137 request: RequestType, 

138 *, 

139 timeout: Optional[float] = None, 

140 metadata: Optional[Metadata] = None, 

141 credentials: Optional[grpc.CallCredentials] = None, 

142 wait_for_ready: Optional[bool] = None, 

143 compression: Optional[grpc.Compression] = None, 

144 ) -> _base_call.UnaryUnaryCall[RequestType, ResponseType]: 

145 metadata = self._init_metadata(metadata, compression) 

146 if not self._interceptors: 

147 call = UnaryUnaryCall( 

148 request, 

149 _timeout_to_deadline(timeout), 

150 metadata, 

151 credentials, 

152 wait_for_ready, 

153 self._channel, 

154 self._method, 

155 self._request_serializer, 

156 self._response_deserializer, 

157 self._loop, 

158 ) 

159 else: 

160 call = InterceptedUnaryUnaryCall( 

161 self._interceptors, 

162 request, 

163 timeout, 

164 metadata, 

165 credentials, 

166 wait_for_ready, 

167 self._channel, 

168 self._method, 

169 self._request_serializer, 

170 self._response_deserializer, 

171 self._loop, 

172 ) 

173 

174 return call 

175 

176 

177class UnaryStreamMultiCallable( 

178 _BaseMultiCallable, _base_channel.UnaryStreamMultiCallable 

179): 

180 def __call__( 

181 self, 

182 request: RequestType, 

183 *, 

184 timeout: Optional[float] = None, 

185 metadata: Optional[Metadata] = None, 

186 credentials: Optional[grpc.CallCredentials] = None, 

187 wait_for_ready: Optional[bool] = None, 

188 compression: Optional[grpc.Compression] = None, 

189 ) -> _base_call.UnaryStreamCall[RequestType, ResponseType]: 

190 metadata = self._init_metadata(metadata, compression) 

191 

192 if not self._interceptors: 

193 call = UnaryStreamCall( 

194 request, 

195 _timeout_to_deadline(timeout), 

196 metadata, 

197 credentials, 

198 wait_for_ready, 

199 self._channel, 

200 self._method, 

201 self._request_serializer, 

202 self._response_deserializer, 

203 self._loop, 

204 ) 

205 else: 

206 call = InterceptedUnaryStreamCall( 

207 self._interceptors, 

208 request, 

209 timeout, 

210 metadata, 

211 credentials, 

212 wait_for_ready, 

213 self._channel, 

214 self._method, 

215 self._request_serializer, 

216 self._response_deserializer, 

217 self._loop, 

218 ) 

219 

220 return call 

221 

222 

223class StreamUnaryMultiCallable( 

224 _BaseMultiCallable, _base_channel.StreamUnaryMultiCallable 

225): 

226 def __call__( 

227 self, 

228 request_iterator: Optional[RequestIterableType] = None, 

229 timeout: Optional[float] = None, 

230 metadata: Optional[Metadata] = None, 

231 credentials: Optional[grpc.CallCredentials] = None, 

232 wait_for_ready: Optional[bool] = None, 

233 compression: Optional[grpc.Compression] = None, 

234 ) -> _base_call.StreamUnaryCall: 

235 metadata = self._init_metadata(metadata, compression) 

236 

237 if not self._interceptors: 

238 call = StreamUnaryCall( 

239 request_iterator, 

240 _timeout_to_deadline(timeout), 

241 metadata, 

242 credentials, 

243 wait_for_ready, 

244 self._channel, 

245 self._method, 

246 self._request_serializer, 

247 self._response_deserializer, 

248 self._loop, 

249 ) 

250 else: 

251 call = InterceptedStreamUnaryCall( 

252 self._interceptors, 

253 request_iterator, 

254 timeout, 

255 metadata, 

256 credentials, 

257 wait_for_ready, 

258 self._channel, 

259 self._method, 

260 self._request_serializer, 

261 self._response_deserializer, 

262 self._loop, 

263 ) 

264 

265 return call 

266 

267 

268class StreamStreamMultiCallable( 

269 _BaseMultiCallable, _base_channel.StreamStreamMultiCallable 

270): 

271 def __call__( 

272 self, 

273 request_iterator: Optional[RequestIterableType] = None, 

274 timeout: Optional[float] = None, 

275 metadata: Optional[Metadata] = None, 

276 credentials: Optional[grpc.CallCredentials] = None, 

277 wait_for_ready: Optional[bool] = None, 

278 compression: Optional[grpc.Compression] = None, 

279 ) -> _base_call.StreamStreamCall: 

280 metadata = self._init_metadata(metadata, compression) 

281 

282 if not self._interceptors: 

283 call = StreamStreamCall( 

284 request_iterator, 

285 _timeout_to_deadline(timeout), 

286 metadata, 

287 credentials, 

288 wait_for_ready, 

289 self._channel, 

290 self._method, 

291 self._request_serializer, 

292 self._response_deserializer, 

293 self._loop, 

294 ) 

295 else: 

296 call = InterceptedStreamStreamCall( 

297 self._interceptors, 

298 request_iterator, 

299 timeout, 

300 metadata, 

301 credentials, 

302 wait_for_ready, 

303 self._channel, 

304 self._method, 

305 self._request_serializer, 

306 self._response_deserializer, 

307 self._loop, 

308 ) 

309 

310 return call 

311 

312 

313class Channel(_base_channel.Channel): 

314 _loop: asyncio.AbstractEventLoop 

315 _channel: cygrpc.AioChannel 

316 _unary_unary_interceptors: List[UnaryUnaryClientInterceptor] 

317 _unary_stream_interceptors: List[UnaryStreamClientInterceptor] 

318 _stream_unary_interceptors: List[StreamUnaryClientInterceptor] 

319 _stream_stream_interceptors: List[StreamStreamClientInterceptor] 

320 

321 def __init__( 

322 self, 

323 target: str, 

324 options: ChannelArgumentType, 

325 credentials: Optional[grpc.ChannelCredentials], 

326 compression: Optional[grpc.Compression], 

327 interceptors: Optional[Sequence[ClientInterceptor]], 

328 ): 

329 """Constructor. 

330 

331 Args: 

332 target: The target to which to connect. 

333 options: Configuration options for the channel. 

334 credentials: A cygrpc.ChannelCredentials or None. 

335 compression: An optional value indicating the compression method to be 

336 used over the lifetime of the channel. 

337 interceptors: An optional list of interceptors that would be used for 

338 intercepting any RPC executed with that channel. 

339 """ 

340 self._unary_unary_interceptors = [] 

341 self._unary_stream_interceptors = [] 

342 self._stream_unary_interceptors = [] 

343 self._stream_stream_interceptors = [] 

344 

345 if interceptors is not None: 

346 for interceptor in interceptors: 

347 if isinstance(interceptor, UnaryUnaryClientInterceptor): 

348 self._unary_unary_interceptors.append(interceptor) 

349 elif isinstance(interceptor, UnaryStreamClientInterceptor): 

350 self._unary_stream_interceptors.append(interceptor) 

351 elif isinstance(interceptor, StreamUnaryClientInterceptor): 

352 self._stream_unary_interceptors.append(interceptor) 

353 elif isinstance(interceptor, StreamStreamClientInterceptor): 

354 self._stream_stream_interceptors.append(interceptor) 

355 else: 

356 raise ValueError( 

357 "Interceptor {} must be ".format(interceptor) 

358 + "{} or ".format(UnaryUnaryClientInterceptor.__name__) 

359 + "{} or ".format(UnaryStreamClientInterceptor.__name__) 

360 + "{} or ".format(StreamUnaryClientInterceptor.__name__) 

361 + "{}. ".format(StreamStreamClientInterceptor.__name__) 

362 ) 

363 

364 self._loop = cygrpc.get_working_loop() 

365 self._channel = cygrpc.AioChannel( 

366 _common.encode(target), 

367 _augment_channel_arguments(options, compression), 

368 credentials, 

369 self._loop, 

370 ) 

371 

372 async def __aenter__(self): 

373 return self 

374 

375 async def __aexit__(self, exc_type, exc_val, exc_tb): 

376 await self._close(None) 

377 

378 async def _close(self, grace): # pylint: disable=too-many-branches 

379 if self._channel.closed(): 

380 return 

381 

382 # No new calls will be accepted by the Cython channel. 

383 self._channel.closing() 

384 

385 # Iterate through running tasks 

386 tasks = _all_tasks() 

387 calls = [] 

388 call_tasks = [] 

389 for task in tasks: 

390 try: 

391 stack = task.get_stack(limit=1) 

392 except AttributeError as attribute_error: 

393 # NOTE(lidiz) tl;dr: If the Task is created with a CPython 

394 # object, it will trigger AttributeError. 

395 # 

396 # In the global finalizer, the event loop schedules 

397 # a CPython PyAsyncGenAThrow object. 

398 # https://github.com/python/cpython/blob/00e45877e33d32bb61aa13a2033e3bba370bda4d/Lib/asyncio/base_events.py#L484 

399 # 

400 # However, the PyAsyncGenAThrow object is written in C and 

401 # failed to include the normal Python frame objects. Hence, 

402 # this exception is a false negative, and it is safe to ignore 

403 # the failure. It is fixed by https://github.com/python/cpython/pull/18669, 

404 # but not available until 3.9 or 3.8.3. So, we have to keep it 

405 # for a while. 

406 # TODO(lidiz) drop this hack after 3.8 deprecation 

407 if "frame" in str(attribute_error): 

408 continue 

409 else: 

410 raise 

411 

412 # If the Task is created by a C-extension, the stack will be empty. 

413 if not stack: 

414 continue 

415 

416 # Locate ones created by `aio.Call`. 

417 frame = stack[0] 

418 candidate = frame.f_locals.get("self") 

419 if candidate: 

420 if isinstance(candidate, _base_call.Call): 

421 if hasattr(candidate, "_channel"): 

422 # For intercepted Call object 

423 if candidate._channel is not self._channel: 

424 continue 

425 elif hasattr(candidate, "_cython_call"): 

426 # For normal Call object 

427 if candidate._cython_call._channel is not self._channel: 

428 continue 

429 else: 

430 # Unidentified Call object 

431 raise cygrpc.InternalError( 

432 f"Unrecognized call object: {candidate}" 

433 ) 

434 

435 calls.append(candidate) 

436 call_tasks.append(task) 

437 

438 # If needed, try to wait for them to finish. 

439 # Call objects are not always awaitables. 

440 if grace and call_tasks: 

441 await asyncio.wait(call_tasks, timeout=grace) 

442 

443 # Time to cancel existing calls. 

444 for call in calls: 

445 call.cancel() 

446 

447 # Destroy the channel 

448 self._channel.close() 

449 

450 async def close(self, grace: Optional[float] = None): 

451 await self._close(grace) 

452 

453 def __del__(self): 

454 if hasattr(self, "_channel"): 

455 if not self._channel.closed(): 

456 self._channel.close() 

457 

458 def get_state( 

459 self, try_to_connect: bool = False 

460 ) -> grpc.ChannelConnectivity: 

461 result = self._channel.check_connectivity_state(try_to_connect) 

462 return _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[result] 

463 

464 async def wait_for_state_change( 

465 self, 

466 last_observed_state: grpc.ChannelConnectivity, 

467 ) -> None: 

468 assert await self._channel.watch_connectivity_state( 

469 last_observed_state.value[0], None 

470 ) 

471 

472 async def channel_ready(self) -> None: 

473 state = self.get_state(try_to_connect=True) 

474 while state != grpc.ChannelConnectivity.READY: 

475 await self.wait_for_state_change(state) 

476 state = self.get_state(try_to_connect=True) 

477 

478 def unary_unary( 

479 self, 

480 method: str, 

481 request_serializer: Optional[SerializingFunction] = None, 

482 response_deserializer: Optional[DeserializingFunction] = None, 

483 ) -> UnaryUnaryMultiCallable: 

484 return UnaryUnaryMultiCallable( 

485 self._channel, 

486 _common.encode(method), 

487 request_serializer, 

488 response_deserializer, 

489 self._unary_unary_interceptors, 

490 [self], 

491 self._loop, 

492 ) 

493 

494 def unary_stream( 

495 self, 

496 method: str, 

497 request_serializer: Optional[SerializingFunction] = None, 

498 response_deserializer: Optional[DeserializingFunction] = None, 

499 ) -> UnaryStreamMultiCallable: 

500 return UnaryStreamMultiCallable( 

501 self._channel, 

502 _common.encode(method), 

503 request_serializer, 

504 response_deserializer, 

505 self._unary_stream_interceptors, 

506 [self], 

507 self._loop, 

508 ) 

509 

510 def stream_unary( 

511 self, 

512 method: str, 

513 request_serializer: Optional[SerializingFunction] = None, 

514 response_deserializer: Optional[DeserializingFunction] = None, 

515 ) -> StreamUnaryMultiCallable: 

516 return StreamUnaryMultiCallable( 

517 self._channel, 

518 _common.encode(method), 

519 request_serializer, 

520 response_deserializer, 

521 self._stream_unary_interceptors, 

522 [self], 

523 self._loop, 

524 ) 

525 

526 def stream_stream( 

527 self, 

528 method: str, 

529 request_serializer: Optional[SerializingFunction] = None, 

530 response_deserializer: Optional[DeserializingFunction] = None, 

531 ) -> StreamStreamMultiCallable: 

532 return StreamStreamMultiCallable( 

533 self._channel, 

534 _common.encode(method), 

535 request_serializer, 

536 response_deserializer, 

537 self._stream_stream_interceptors, 

538 [self], 

539 self._loop, 

540 ) 

541 

542 

543def insecure_channel( 

544 target: str, 

545 options: Optional[ChannelArgumentType] = None, 

546 compression: Optional[grpc.Compression] = None, 

547 interceptors: Optional[Sequence[ClientInterceptor]] = None, 

548): 

549 """Creates an insecure asynchronous Channel to a server. 

550 

551 Args: 

552 target: The server address 

553 options: An optional list of key-value pairs (:term:`channel_arguments` 

554 in gRPC Core runtime) to configure the channel. 

555 compression: An optional value indicating the compression method to be 

556 used over the lifetime of the channel. 

557 interceptors: An optional sequence of interceptors that will be executed for 

558 any call executed with this channel. 

559 

560 Returns: 

561 A Channel. 

562 """ 

563 return Channel( 

564 target, 

565 () if options is None else options, 

566 None, 

567 compression, 

568 interceptors, 

569 ) 

570 

571 

572def secure_channel( 

573 target: str, 

574 credentials: grpc.ChannelCredentials, 

575 options: Optional[ChannelArgumentType] = None, 

576 compression: Optional[grpc.Compression] = None, 

577 interceptors: Optional[Sequence[ClientInterceptor]] = None, 

578): 

579 """Creates a secure asynchronous Channel to a server. 

580 

581 Args: 

582 target: The server address. 

583 credentials: A ChannelCredentials instance. 

584 options: An optional list of key-value pairs (:term:`channel_arguments` 

585 in gRPC Core runtime) to configure the channel. 

586 compression: An optional value indicating the compression method to be 

587 used over the lifetime of the channel. 

588 interceptors: An optional sequence of interceptors that will be executed for 

589 any call executed with this channel. 

590 

591 Returns: 

592 An aio.Channel. 

593 """ 

594 return Channel( 

595 target, 

596 () if options is None else options, 

597 credentials._credentials, 

598 compression, 

599 interceptors, 

600 )