Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_channel.py: 42%

187 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-08-16 06:17 +0000

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Asyncio Python.""" 

15 

16import asyncio 

17import sys 

18from typing import Any, Iterable, List, Optional, Sequence 

19 

20import grpc 

21from grpc import _common 

22from grpc import _compression 

23from grpc import _grpcio_metadata 

24from grpc._cython import cygrpc 

25 

26from . import _base_call 

27from . import _base_channel 

28from ._call import StreamStreamCall 

29from ._call import StreamUnaryCall 

30from ._call import UnaryStreamCall 

31from ._call import UnaryUnaryCall 

32from ._interceptor import ClientInterceptor 

33from ._interceptor import InterceptedStreamStreamCall 

34from ._interceptor import InterceptedStreamUnaryCall 

35from ._interceptor import InterceptedUnaryStreamCall 

36from ._interceptor import InterceptedUnaryUnaryCall 

37from ._interceptor import StreamStreamClientInterceptor 

38from ._interceptor import StreamUnaryClientInterceptor 

39from ._interceptor import UnaryStreamClientInterceptor 

40from ._interceptor import UnaryUnaryClientInterceptor 

41from ._metadata import Metadata 

42from ._typing import ChannelArgumentType 

43from ._typing import DeserializingFunction 

44from ._typing import RequestIterableType 

45from ._typing import RequestType 

46from ._typing import ResponseType 

47from ._typing import SerializingFunction 

48from ._utils import _timeout_to_deadline 

49 

50_USER_AGENT = "grpc-python-asyncio/{}".format(_grpcio_metadata.__version__) 

51 

52if sys.version_info[1] < 7: 

53 

54 def _all_tasks() -> Iterable[asyncio.Task]: 

55 return asyncio.Task.all_tasks() # pylint: disable=no-member 

56 

57else: 

58 

59 def _all_tasks() -> Iterable[asyncio.Task]: 

60 return asyncio.all_tasks() 

61 

62 

63def _augment_channel_arguments( 

64 base_options: ChannelArgumentType, compression: Optional[grpc.Compression] 

65): 

66 compression_channel_argument = _compression.create_channel_option( 

67 compression 

68 ) 

69 user_agent_channel_argument = ( 

70 ( 

71 cygrpc.ChannelArgKey.primary_user_agent_string, 

72 _USER_AGENT, 

73 ), 

74 ) 

75 return ( 

76 tuple(base_options) 

77 + compression_channel_argument 

78 + user_agent_channel_argument 

79 ) 

80 

81 

82class _BaseMultiCallable: 

83 """Base class of all multi callable objects. 

84 

85 Handles the initialization logic and stores common attributes. 

86 """ 

87 

88 _loop: asyncio.AbstractEventLoop 

89 _channel: cygrpc.AioChannel 

90 _method: bytes 

91 _request_serializer: SerializingFunction 

92 _response_deserializer: DeserializingFunction 

93 _interceptors: Optional[Sequence[ClientInterceptor]] 

94 _references: List[Any] 

95 _loop: asyncio.AbstractEventLoop 

96 

97 # pylint: disable=too-many-arguments 

98 def __init__( 

99 self, 

100 channel: cygrpc.AioChannel, 

101 method: bytes, 

102 request_serializer: SerializingFunction, 

103 response_deserializer: DeserializingFunction, 

104 interceptors: Optional[Sequence[ClientInterceptor]], 

105 references: List[Any], 

106 loop: asyncio.AbstractEventLoop, 

107 ) -> None: 

108 self._loop = loop 

109 self._channel = channel 

110 self._method = method 

111 self._request_serializer = request_serializer 

112 self._response_deserializer = response_deserializer 

113 self._interceptors = interceptors 

114 self._references = references 

115 

116 @staticmethod 

117 def _init_metadata( 

118 metadata: Optional[Metadata] = None, 

119 compression: Optional[grpc.Compression] = None, 

120 ) -> Metadata: 

121 """Based on the provided values for <metadata> or <compression> initialise the final 

122 metadata, as it should be used for the current call. 

123 """ 

124 metadata = metadata or Metadata() 

125 if compression: 

126 metadata = Metadata( 

127 *_compression.augment_metadata(metadata, compression) 

128 ) 

129 return metadata 

130 

131 

132class UnaryUnaryMultiCallable( 

133 _BaseMultiCallable, _base_channel.UnaryUnaryMultiCallable 

134): 

135 def __call__( 

136 self, 

137 request: RequestType, 

138 *, 

139 timeout: Optional[float] = None, 

140 metadata: Optional[Metadata] = None, 

141 credentials: Optional[grpc.CallCredentials] = None, 

142 wait_for_ready: Optional[bool] = None, 

143 compression: Optional[grpc.Compression] = None, 

144 ) -> _base_call.UnaryUnaryCall[RequestType, ResponseType]: 

145 metadata = self._init_metadata(metadata, compression) 

146 if not self._interceptors: 

147 call = UnaryUnaryCall( 

148 request, 

149 _timeout_to_deadline(timeout), 

150 metadata, 

151 credentials, 

152 wait_for_ready, 

153 self._channel, 

154 self._method, 

155 self._request_serializer, 

156 self._response_deserializer, 

157 self._loop, 

158 ) 

159 else: 

160 call = InterceptedUnaryUnaryCall( 

161 self._interceptors, 

162 request, 

163 timeout, 

164 metadata, 

165 credentials, 

166 wait_for_ready, 

167 self._channel, 

168 self._method, 

169 self._request_serializer, 

170 self._response_deserializer, 

171 self._loop, 

172 ) 

173 

174 return call 

175 

176 

177class UnaryStreamMultiCallable( 

178 _BaseMultiCallable, _base_channel.UnaryStreamMultiCallable 

179): 

180 def __call__( 

181 self, 

182 request: RequestType, 

183 *, 

184 timeout: Optional[float] = None, 

185 metadata: Optional[Metadata] = None, 

186 credentials: Optional[grpc.CallCredentials] = None, 

187 wait_for_ready: Optional[bool] = None, 

188 compression: Optional[grpc.Compression] = None, 

189 ) -> _base_call.UnaryStreamCall[RequestType, ResponseType]: 

190 metadata = self._init_metadata(metadata, compression) 

191 deadline = _timeout_to_deadline(timeout) 

192 

193 if not self._interceptors: 

194 call = UnaryStreamCall( 

195 request, 

196 deadline, 

197 metadata, 

198 credentials, 

199 wait_for_ready, 

200 self._channel, 

201 self._method, 

202 self._request_serializer, 

203 self._response_deserializer, 

204 self._loop, 

205 ) 

206 else: 

207 call = InterceptedUnaryStreamCall( 

208 self._interceptors, 

209 request, 

210 deadline, 

211 metadata, 

212 credentials, 

213 wait_for_ready, 

214 self._channel, 

215 self._method, 

216 self._request_serializer, 

217 self._response_deserializer, 

218 self._loop, 

219 ) 

220 

221 return call 

222 

223 

224class StreamUnaryMultiCallable( 

225 _BaseMultiCallable, _base_channel.StreamUnaryMultiCallable 

226): 

227 def __call__( 

228 self, 

229 request_iterator: Optional[RequestIterableType] = None, 

230 timeout: Optional[float] = None, 

231 metadata: Optional[Metadata] = None, 

232 credentials: Optional[grpc.CallCredentials] = None, 

233 wait_for_ready: Optional[bool] = None, 

234 compression: Optional[grpc.Compression] = None, 

235 ) -> _base_call.StreamUnaryCall: 

236 metadata = self._init_metadata(metadata, compression) 

237 deadline = _timeout_to_deadline(timeout) 

238 

239 if not self._interceptors: 

240 call = StreamUnaryCall( 

241 request_iterator, 

242 deadline, 

243 metadata, 

244 credentials, 

245 wait_for_ready, 

246 self._channel, 

247 self._method, 

248 self._request_serializer, 

249 self._response_deserializer, 

250 self._loop, 

251 ) 

252 else: 

253 call = InterceptedStreamUnaryCall( 

254 self._interceptors, 

255 request_iterator, 

256 deadline, 

257 metadata, 

258 credentials, 

259 wait_for_ready, 

260 self._channel, 

261 self._method, 

262 self._request_serializer, 

263 self._response_deserializer, 

264 self._loop, 

265 ) 

266 

267 return call 

268 

269 

270class StreamStreamMultiCallable( 

271 _BaseMultiCallable, _base_channel.StreamStreamMultiCallable 

272): 

273 def __call__( 

274 self, 

275 request_iterator: Optional[RequestIterableType] = None, 

276 timeout: Optional[float] = None, 

277 metadata: Optional[Metadata] = None, 

278 credentials: Optional[grpc.CallCredentials] = None, 

279 wait_for_ready: Optional[bool] = None, 

280 compression: Optional[grpc.Compression] = None, 

281 ) -> _base_call.StreamStreamCall: 

282 metadata = self._init_metadata(metadata, compression) 

283 deadline = _timeout_to_deadline(timeout) 

284 

285 if not self._interceptors: 

286 call = StreamStreamCall( 

287 request_iterator, 

288 deadline, 

289 metadata, 

290 credentials, 

291 wait_for_ready, 

292 self._channel, 

293 self._method, 

294 self._request_serializer, 

295 self._response_deserializer, 

296 self._loop, 

297 ) 

298 else: 

299 call = InterceptedStreamStreamCall( 

300 self._interceptors, 

301 request_iterator, 

302 deadline, 

303 metadata, 

304 credentials, 

305 wait_for_ready, 

306 self._channel, 

307 self._method, 

308 self._request_serializer, 

309 self._response_deserializer, 

310 self._loop, 

311 ) 

312 

313 return call 

314 

315 

316class Channel(_base_channel.Channel): 

317 _loop: asyncio.AbstractEventLoop 

318 _channel: cygrpc.AioChannel 

319 _unary_unary_interceptors: List[UnaryUnaryClientInterceptor] 

320 _unary_stream_interceptors: List[UnaryStreamClientInterceptor] 

321 _stream_unary_interceptors: List[StreamUnaryClientInterceptor] 

322 _stream_stream_interceptors: List[StreamStreamClientInterceptor] 

323 

324 def __init__( 

325 self, 

326 target: str, 

327 options: ChannelArgumentType, 

328 credentials: Optional[grpc.ChannelCredentials], 

329 compression: Optional[grpc.Compression], 

330 interceptors: Optional[Sequence[ClientInterceptor]], 

331 ): 

332 """Constructor. 

333 

334 Args: 

335 target: The target to which to connect. 

336 options: Configuration options for the channel. 

337 credentials: A cygrpc.ChannelCredentials or None. 

338 compression: An optional value indicating the compression method to be 

339 used over the lifetime of the channel. 

340 interceptors: An optional list of interceptors that would be used for 

341 intercepting any RPC executed with that channel. 

342 """ 

343 self._unary_unary_interceptors = [] 

344 self._unary_stream_interceptors = [] 

345 self._stream_unary_interceptors = [] 

346 self._stream_stream_interceptors = [] 

347 

348 if interceptors is not None: 

349 for interceptor in interceptors: 

350 if isinstance(interceptor, UnaryUnaryClientInterceptor): 

351 self._unary_unary_interceptors.append(interceptor) 

352 elif isinstance(interceptor, UnaryStreamClientInterceptor): 

353 self._unary_stream_interceptors.append(interceptor) 

354 elif isinstance(interceptor, StreamUnaryClientInterceptor): 

355 self._stream_unary_interceptors.append(interceptor) 

356 elif isinstance(interceptor, StreamStreamClientInterceptor): 

357 self._stream_stream_interceptors.append(interceptor) 

358 else: 

359 raise ValueError( 

360 "Interceptor {} must be ".format(interceptor) 

361 + "{} or ".format(UnaryUnaryClientInterceptor.__name__) 

362 + "{} or ".format(UnaryStreamClientInterceptor.__name__) 

363 + "{} or ".format(StreamUnaryClientInterceptor.__name__) 

364 + "{}. ".format(StreamStreamClientInterceptor.__name__) 

365 ) 

366 

367 self._loop = cygrpc.get_working_loop() 

368 self._channel = cygrpc.AioChannel( 

369 _common.encode(target), 

370 _augment_channel_arguments(options, compression), 

371 credentials, 

372 self._loop, 

373 ) 

374 

375 async def __aenter__(self): 

376 return self 

377 

378 async def __aexit__(self, exc_type, exc_val, exc_tb): 

379 await self._close(None) 

380 

381 async def _close(self, grace): # pylint: disable=too-many-branches 

382 if self._channel.closed(): 

383 return 

384 

385 # No new calls will be accepted by the Cython channel. 

386 self._channel.closing() 

387 

388 # Iterate through running tasks 

389 tasks = _all_tasks() 

390 calls = [] 

391 call_tasks = [] 

392 for task in tasks: 

393 try: 

394 stack = task.get_stack(limit=1) 

395 except AttributeError as attribute_error: 

396 # NOTE(lidiz) tl;dr: If the Task is created with a CPython 

397 # object, it will trigger AttributeError. 

398 # 

399 # In the global finalizer, the event loop schedules 

400 # a CPython PyAsyncGenAThrow object. 

401 # https://github.com/python/cpython/blob/00e45877e33d32bb61aa13a2033e3bba370bda4d/Lib/asyncio/base_events.py#L484 

402 # 

403 # However, the PyAsyncGenAThrow object is written in C and 

404 # failed to include the normal Python frame objects. Hence, 

405 # this exception is a false negative, and it is safe to ignore 

406 # the failure. It is fixed by https://github.com/python/cpython/pull/18669, 

407 # but not available until 3.9 or 3.8.3. So, we have to keep it 

408 # for a while. 

409 # TODO(lidiz) drop this hack after 3.8 deprecation 

410 if "frame" in str(attribute_error): 

411 continue 

412 else: 

413 raise 

414 

415 # If the Task is created by a C-extension, the stack will be empty. 

416 if not stack: 

417 continue 

418 

419 # Locate ones created by `aio.Call`. 

420 frame = stack[0] 

421 candidate = frame.f_locals.get("self") 

422 if candidate: 

423 if isinstance(candidate, _base_call.Call): 

424 if hasattr(candidate, "_channel"): 

425 # For intercepted Call object 

426 if candidate._channel is not self._channel: 

427 continue 

428 elif hasattr(candidate, "_cython_call"): 

429 # For normal Call object 

430 if candidate._cython_call._channel is not self._channel: 

431 continue 

432 else: 

433 # Unidentified Call object 

434 raise cygrpc.InternalError( 

435 f"Unrecognized call object: {candidate}" 

436 ) 

437 

438 calls.append(candidate) 

439 call_tasks.append(task) 

440 

441 # If needed, try to wait for them to finish. 

442 # Call objects are not always awaitables. 

443 if grace and call_tasks: 

444 await asyncio.wait(call_tasks, timeout=grace) 

445 

446 # Time to cancel existing calls. 

447 for call in calls: 

448 call.cancel() 

449 

450 # Destroy the channel 

451 self._channel.close() 

452 

453 async def close(self, grace: Optional[float] = None): 

454 await self._close(grace) 

455 

456 def __del__(self): 

457 if hasattr(self, "_channel"): 

458 if not self._channel.closed(): 

459 self._channel.close() 

460 

461 def get_state( 

462 self, try_to_connect: bool = False 

463 ) -> grpc.ChannelConnectivity: 

464 result = self._channel.check_connectivity_state(try_to_connect) 

465 return _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[result] 

466 

467 async def wait_for_state_change( 

468 self, 

469 last_observed_state: grpc.ChannelConnectivity, 

470 ) -> None: 

471 assert await self._channel.watch_connectivity_state( 

472 last_observed_state.value[0], None 

473 ) 

474 

475 async def channel_ready(self) -> None: 

476 state = self.get_state(try_to_connect=True) 

477 while state != grpc.ChannelConnectivity.READY: 

478 await self.wait_for_state_change(state) 

479 state = self.get_state(try_to_connect=True) 

480 

481 def unary_unary( 

482 self, 

483 method: str, 

484 request_serializer: Optional[SerializingFunction] = None, 

485 response_deserializer: Optional[DeserializingFunction] = None, 

486 ) -> UnaryUnaryMultiCallable: 

487 return UnaryUnaryMultiCallable( 

488 self._channel, 

489 _common.encode(method), 

490 request_serializer, 

491 response_deserializer, 

492 self._unary_unary_interceptors, 

493 [self], 

494 self._loop, 

495 ) 

496 

497 def unary_stream( 

498 self, 

499 method: str, 

500 request_serializer: Optional[SerializingFunction] = None, 

501 response_deserializer: Optional[DeserializingFunction] = None, 

502 ) -> UnaryStreamMultiCallable: 

503 return UnaryStreamMultiCallable( 

504 self._channel, 

505 _common.encode(method), 

506 request_serializer, 

507 response_deserializer, 

508 self._unary_stream_interceptors, 

509 [self], 

510 self._loop, 

511 ) 

512 

513 def stream_unary( 

514 self, 

515 method: str, 

516 request_serializer: Optional[SerializingFunction] = None, 

517 response_deserializer: Optional[DeserializingFunction] = None, 

518 ) -> StreamUnaryMultiCallable: 

519 return StreamUnaryMultiCallable( 

520 self._channel, 

521 _common.encode(method), 

522 request_serializer, 

523 response_deserializer, 

524 self._stream_unary_interceptors, 

525 [self], 

526 self._loop, 

527 ) 

528 

529 def stream_stream( 

530 self, 

531 method: str, 

532 request_serializer: Optional[SerializingFunction] = None, 

533 response_deserializer: Optional[DeserializingFunction] = None, 

534 ) -> StreamStreamMultiCallable: 

535 return StreamStreamMultiCallable( 

536 self._channel, 

537 _common.encode(method), 

538 request_serializer, 

539 response_deserializer, 

540 self._stream_stream_interceptors, 

541 [self], 

542 self._loop, 

543 ) 

544 

545 

546def insecure_channel( 

547 target: str, 

548 options: Optional[ChannelArgumentType] = None, 

549 compression: Optional[grpc.Compression] = None, 

550 interceptors: Optional[Sequence[ClientInterceptor]] = None, 

551): 

552 """Creates an insecure asynchronous Channel to a server. 

553 

554 Args: 

555 target: The server address 

556 options: An optional list of key-value pairs (:term:`channel_arguments` 

557 in gRPC Core runtime) to configure the channel. 

558 compression: An optional value indicating the compression method to be 

559 used over the lifetime of the channel. 

560 interceptors: An optional sequence of interceptors that will be executed for 

561 any call executed with this channel. 

562 

563 Returns: 

564 A Channel. 

565 """ 

566 return Channel( 

567 target, 

568 () if options is None else options, 

569 None, 

570 compression, 

571 interceptors, 

572 ) 

573 

574 

575def secure_channel( 

576 target: str, 

577 credentials: grpc.ChannelCredentials, 

578 options: Optional[ChannelArgumentType] = None, 

579 compression: Optional[grpc.Compression] = None, 

580 interceptors: Optional[Sequence[ClientInterceptor]] = None, 

581): 

582 """Creates a secure asynchronous Channel to a server. 

583 

584 Args: 

585 target: The server address. 

586 credentials: A ChannelCredentials instance. 

587 options: An optional list of key-value pairs (:term:`channel_arguments` 

588 in gRPC Core runtime) to configure the channel. 

589 compression: An optional value indicating the compression method to be 

590 used over the lifetime of the channel. 

591 interceptors: An optional sequence of interceptors that will be executed for 

592 any call executed with this channel. 

593 

594 Returns: 

595 An aio.Channel. 

596 """ 

597 return Channel( 

598 target, 

599 () if options is None else options, 

600 credentials._credentials, 

601 compression, 

602 interceptors, 

603 )