Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_interceptor.py: 38%

394 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:37 +0000

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Interceptors implementation of gRPC Asyncio Python.""" 

15from abc import ABCMeta 

16from abc import abstractmethod 

17import asyncio 

18import collections 

19import functools 

20from typing import ( 

21 AsyncIterable, 

22 Awaitable, 

23 Callable, 

24 Iterator, 

25 List, 

26 Optional, 

27 Sequence, 

28 Union, 

29) 

30 

31import grpc 

32from grpc._cython import cygrpc 

33 

34from . import _base_call 

35from ._call import AioRpcError 

36from ._call import StreamStreamCall 

37from ._call import StreamUnaryCall 

38from ._call import UnaryStreamCall 

39from ._call import UnaryUnaryCall 

40from ._call import _API_STYLE_ERROR 

41from ._call import _RPC_ALREADY_FINISHED_DETAILS 

42from ._call import _RPC_HALF_CLOSED_DETAILS 

43from ._metadata import Metadata 

44from ._typing import DeserializingFunction 

45from ._typing import DoneCallbackType 

46from ._typing import RequestIterableType 

47from ._typing import RequestType 

48from ._typing import ResponseIterableType 

49from ._typing import ResponseType 

50from ._typing import SerializingFunction 

51from ._utils import _timeout_to_deadline 

52 

53_LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!" 

54 

55 

class ServerInterceptor(metaclass=ABCMeta):
    """Hook for intercepting incoming RPCs on the service side.

    This is an EXPERIMENTAL API.
    """

    @abstractmethod
    async def intercept_service(
        self,
        continuation: Callable[
            [grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]
        ],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        """Intercept an incoming RPC before it is handed to a handler.

        Args:
          continuation: A callable that accepts a HandlerCallDetails and
            moves on to the next interceptor in the chain, if there is
            one, or otherwise to the RPC handler lookup logic. Awaiting
            its result yields an RpcMethodHandler when the RPC is
            considered serviced, or None when it is not.
          handler_call_details: A HandlerCallDetails describing the RPC.

        Returns:
          The RpcMethodHandler with which the RPC may be serviced when
          this interceptor chooses to service it, or None otherwise.
        """

84 

85 

class ClientCallDetails(
    collections.namedtuple(
        "ClientCallDetails",
        ("method", "timeout", "metadata", "credentials", "wait_for_ready"),
    ),
    grpc.ClientCallDetails,
):
    """Immutable description of an RPC that is about to be invoked.

    This is an EXPERIMENTAL API.

    Args:
      method: The method name of the RPC.
      timeout: Optional number of seconds the RPC is allowed to take.
      metadata: Optional metadata to send to the service side of the RPC.
      credentials: Optional CallCredentials for the RPC.
      wait_for_ready: Optional flag enabling the :term:`wait_for_ready`
        mechanism.
    """

    # Annotations only: the values themselves live in the namedtuple fields
    # declared above.
    method: str
    timeout: Optional[float]
    metadata: Optional[Metadata]
    credentials: Optional[grpc.CallCredentials]
    wait_for_ready: Optional[bool]

111 

112 

class ClientInterceptor(metaclass=ABCMeta):
    """Common base class of every Aio client interceptor class."""

115 

116 

class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting unary-unary invocations."""

    @abstractmethod
    async def intercept_unary_unary(
        self,
        continuation: Callable[
            [ClientCallDetails, RequestType], UnaryUnaryCall
        ],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[UnaryUnaryCall, ResponseType]:
        """Intercept a unary-unary invocation asynchronously.

        Args:
          continuation: A coroutine function that moves the invocation
            forward, either by running the next interceptor in the chain
            or by invoking the actual RPC on the underlying Channel. The
            interceptor is responsible for calling it when it wants the
            RPC to proceed, typically as
            `call = await continuation(client_call_details, request)`.
            The value obtained is the call to the RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request: The request value for the RPC.

        Returns:
          An object with the RPC response.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

151 

152 

class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting unary-stream invocations."""

    @abstractmethod
    async def intercept_unary_stream(
        self,
        continuation: Callable[
            [ClientCallDetails, RequestType], UnaryStreamCall
        ],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[ResponseIterableType, UnaryStreamCall]:
        """Intercept a unary-stream invocation asynchronously.

        The implementation may return either the call object or an
        asynchronous iterator. When an asynchronous iterator is returned,
        it becomes the source of the reads performed by the caller.

        Args:
          continuation: A coroutine function that moves the invocation
            forward, either by running the next interceptor in the chain
            or by invoking the actual RPC on the underlying Channel. The
            interceptor is responsible for calling it when it wants the
            RPC to proceed, typically as
            `call = await continuation(client_call_details, request)`.
            The value obtained is the call to the RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request: The request value for the RPC.

        Returns:
          The RPC Call or an asynchronous iterator.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

191 

192 

class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting stream-unary invocations."""

    @abstractmethod
    async def intercept_stream_unary(
        self,
        # The continuation is called with the request iterator, not with a
        # single request, hence RequestIterableType.
        continuation: Callable[
            [ClientCallDetails, RequestIterableType], StreamUnaryCall
        ],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> StreamUnaryCall:
        """Intercept a stream-unary invocation asynchronously.

        Inside the interceptor, call methods such as `write`, or awaiting
        the call itself, should be used with care: the caller may expect
        an untouched call, for example in order to start writing messages
        to it.

        Args:
          continuation: A coroutine function that moves the invocation
            forward, either by running the next interceptor in the chain
            or by invoking the actual RPC on the underlying Channel. The
            interceptor is responsible for calling it when it wants the
            RPC to proceed, typically as
            `call = await continuation(client_call_details, request_iterator)`.
            The value obtained is the call to the RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request_iterator: The request iterator that will produce requests
            for the RPC.

        Returns:
          The RPC Call.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

233 

234 

class StreamStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting stream-stream invocations."""

    @abstractmethod
    async def intercept_stream_stream(
        self,
        # The continuation is called with the request iterator, not with a
        # single request, hence RequestIterableType.
        continuation: Callable[
            [ClientCallDetails, RequestIterableType], StreamStreamCall
        ],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> Union[ResponseIterableType, StreamStreamCall]:
        """Intercept a stream-stream invocation asynchronously.

        Inside the interceptor, call methods such as `write`, or awaiting
        the call itself, should be used with care: the caller may expect
        an untouched call, for example in order to start writing messages
        to it.

        The implementation may return either the call object or an
        asynchronous iterator. When an asynchronous iterator is returned,
        it becomes the source of the reads performed by the caller.

        Args:
          continuation: A coroutine function that moves the invocation
            forward, either by running the next interceptor in the chain
            or by invoking the actual RPC on the underlying Channel. The
            interceptor is responsible for calling it when it wants the
            RPC to proceed, typically as
            `call = await continuation(client_call_details, request_iterator)`.
            The value obtained is the call to the RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request_iterator: The request iterator that will produce requests
            for the RPC.

        Returns:
          The RPC Call or an asynchronous iterator.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

279 

280 

class InterceptedCall:
    """Base implementation for all intercepted call arities.

    Interceptors may have work to do before the RPC invocation, with the
    ability to change the invocation parameters, and work to do after the
    RPC invocation, with access to the wrapped call.

    The class also deals with early and late cancellations: when the RPC
    has not started yet and execution is still held by the interceptors,
    and when the RPC has finished but execution is again still held by the
    interceptors.

    Once the interceptors task has produced the intercepted call, every
    method is forwarded to that call, which is the same call that was
    returned to the interceptors.

    As the base class of all intercepted calls it implements the logic
    around final status, metadata and cancellation.
    """

    _interceptors_task: asyncio.Task
    # A list (not a read-only sequence): `add_done_callback` appends to it.
    _pending_add_done_callbacks: List[DoneCallbackType]

    def __init__(self, interceptors_task: asyncio.Task) -> None:
        """Store the interceptors task and hook its completion.

        Args:
          interceptors_task: The task running the interceptor chain; its
            result is the intercepted call.
        """
        self._interceptors_task = interceptors_task
        self._pending_add_done_callbacks = []
        self._interceptors_task.add_done_callback(
            self._fire_or_add_pending_done_callbacks
        )

    def __del__(self):
        # A subclass constructor can fail before `__init__` above has run,
        # in which case the task attribute does not exist and `cancel()`
        # would raise AttributeError inside the finalizer.
        if getattr(self, "_interceptors_task", None) is not None:
            self.cancel()

    def _fire_or_add_pending_done_callbacks(
        self, interceptors_task: asyncio.Task
    ) -> None:
        """Flush callbacks registered while the interceptors task ran.

        Runs as the done callback of the interceptors task. Callbacks are
        fired immediately when the task ended with AioRpcError or
        CancelledError, or when the intercepted call is already done;
        otherwise they are re-registered on the intercepted call.
        """
        if not self._pending_add_done_callbacks:
            return

        call_completed = False

        try:
            call = interceptors_task.result()
            if call.done():
                call_completed = True
        except (AioRpcError, asyncio.CancelledError):
            call_completed = True

        if call_completed:
            for callback in self._pending_add_done_callbacks:
                callback(self)
        else:
            for callback in self._pending_add_done_callbacks:
                # The wrapper makes the callback receive this object
                # instead of the underlying call.
                callback = functools.partial(
                    self._wrap_add_done_callback, callback
                )
                call.add_done_callback(callback)

        self._pending_add_done_callbacks = []

    def _wrap_add_done_callback(
        self, callback: DoneCallbackType, unused_call: _base_call.Call
    ) -> None:
        """Invoke `callback` with this object rather than the inner call."""
        callback(self)

    def cancel(self) -> bool:
        """Cancel the RPC.

        Returns:
          The result of cancelling the interceptors task when it is still
          running; False when that task ended with AioRpcError or
          CancelledError; otherwise the result of cancelling the
          intercepted call.
        """
        if not self._interceptors_task.done():
            # The intercepted call is not available yet, so fall back to
            # the generic asyncio cancellation of the task.
            return self._interceptors_task.cancel()

        try:
            call = self._interceptors_task.result()
        except AioRpcError:
            return False
        except asyncio.CancelledError:
            return False

        return call.cancel()

    def cancelled(self) -> bool:
        """Return whether the RPC was cancelled.

        False while the interceptors task is running. If the task ended
        with AioRpcError, the answer depends on its status code being
        CANCELLED; if it was cancelled, True; otherwise the intercepted
        call is asked.
        """
        if not self._interceptors_task.done():
            return False

        try:
            call = self._interceptors_task.result()
        except AioRpcError as err:
            return err.code() == grpc.StatusCode.CANCELLED
        except asyncio.CancelledError:
            return True

        return call.cancelled()

    def done(self) -> bool:
        """Return whether the RPC has finished.

        False while the interceptors task is running; True when that task
        ended with AioRpcError or CancelledError; otherwise the intercepted
        call is asked.
        """
        if not self._interceptors_task.done():
            return False

        try:
            call = self._interceptors_task.result()
        except (AioRpcError, asyncio.CancelledError):
            return True

        return call.done()

    def add_done_callback(self, callback: DoneCallbackType) -> None:
        """Register `callback` to be called with this object when done.

        While the interceptors task is running the callback is queued.
        Afterwards it is fired immediately if the RPC is already finished,
        or registered on the intercepted call otherwise.
        """
        if not self._interceptors_task.done():
            self._pending_add_done_callbacks.append(callback)
            return

        try:
            call = self._interceptors_task.result()
        except (AioRpcError, asyncio.CancelledError):
            callback(self)
            return

        if call.done():
            callback(self)
        else:
            callback = functools.partial(self._wrap_add_done_callback, callback)
            call.add_done_callback(callback)

    def time_remaining(self) -> Optional[float]:
        """Not implemented for intercepted calls."""
        raise NotImplementedError()

    async def initial_metadata(self) -> Optional[Metadata]:
        """Return the initial metadata.

        Taken from the AioRpcError when the interceptors task failed with
        one, None when the task was cancelled, otherwise from the
        intercepted call.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.initial_metadata()
        except asyncio.CancelledError:
            return None

        return await call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Return the trailing metadata.

        Taken from the AioRpcError when the interceptors task failed with
        one, None when the task was cancelled, otherwise from the
        intercepted call.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.trailing_metadata()
        except asyncio.CancelledError:
            return None

        return await call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        """Return the status code.

        Taken from the AioRpcError when the interceptors task failed with
        one, CANCELLED when the task was cancelled, otherwise from the
        intercepted call.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.code()
        except asyncio.CancelledError:
            return grpc.StatusCode.CANCELLED

        return await call.code()

    async def details(self) -> str:
        """Return the status details.

        Taken from the AioRpcError when the interceptors task failed with
        one, the local-cancellation message when the task was cancelled,
        otherwise from the intercepted call.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.details()
        except asyncio.CancelledError:
            return _LOCAL_CANCELLATION_DETAILS

        return await call.details()

    async def debug_error_string(self) -> Optional[str]:
        """Return the debug error string.

        Taken from the AioRpcError when the interceptors task failed with
        one, an empty string when the task was cancelled, otherwise from
        the intercepted call.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.debug_error_string()
        except asyncio.CancelledError:
            return ""

        return await call.debug_error_string()

    async def wait_for_connection(self) -> None:
        """Wait for the interceptors task, then for the call's connection.

        Unlike the status accessors, an exception raised by the
        interceptors task is not handled here and propagates to the caller.
        """
        call = await self._interceptors_task
        return await call.wait_for_connection()

459 

460 

class _InterceptedUnaryResponseMixin:
    """Makes an intercepted call awaitable for its single response."""

    def __await__(self):
        # Wait for the interceptor chain to produce the call first, then
        # delegate to that call to obtain the response.
        intercepted_call = yield from self._interceptors_task.__await__()
        return (yield from intercepted_call.__await__())

466 

467 

class _InterceptedStreamResponseMixin:
    """Exposes the response stream of an intercepted call.

    Both `__aiter__` and `read` are served by one shared async generator
    that first waits for the interceptors task and then iterates the call
    it produced.
    """

    _response_aiter: Optional[AsyncIterable[ResponseType]]

    def _init_stream_response_mixin(self) -> None:
        # Initialized lazily: if the iterator were created here and never
        # finally consumed, asyncio would emit a logging warning.
        self._response_aiter = None

    async def _wait_for_interceptor_task_response_iterator(
        self,
    ) -> AsyncIterable[ResponseType]:
        """Yield every response of the call produced by the interceptors."""
        call = await self._interceptors_task
        async for response in call:
            yield response

    def _ensure_response_aiter(self) -> AsyncIterable[ResponseType]:
        """Return the shared response generator, creating it on first use."""
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator()
            )
        return self._response_aiter

    def __aiter__(self) -> AsyncIterable[ResponseType]:
        return self._ensure_response_aiter()

    async def read(self) -> ResponseType:
        """Return the next response from the shared response generator."""
        return await self._ensure_response_aiter().asend(None)

496 

497 

class _InterceptedStreamRequestMixin:
    """Adds the `write` / `done_writing` API to an intercepted call.

    When the caller did not supply a request iterator, the mixin builds
    one itself: an async generator fed through a one-slot queue by the
    caller's `write` and `done_writing` calls.
    """

    _write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]]
    _write_to_iterator_queue: Optional[asyncio.Queue]
    _status_code_task: Optional[asyncio.Task]

    # Enqueued by `done_writing` to make the proxy iterator stop.
    _FINISH_ITERATOR_SENTINEL = object()

    def _init_stream_request_mixin(
        self, request_iterator: Optional[RequestIterableType]
    ) -> RequestIterableType:
        """Set up request proxying and return the iterator to use.

        Args:
          request_iterator: The iterator supplied by the caller, or None
            when the caller intends to use `write` / `done_writing`.

        Returns:
          The caller's iterator when one was supplied, otherwise the proxy
          iterator created here.
        """
        if request_iterator is None:
            # We provide our own request iterator, which is a proxy of the
            # future writes that will be done by the caller.
            self._write_to_iterator_queue = asyncio.Queue(maxsize=1)
            self._write_to_iterator_async_gen = (
                self._proxy_writes_as_request_iterator()
            )
            self._status_code_task = None
            request_iterator = self._write_to_iterator_async_gen
        else:
            # No proxying: leave every declared attribute defined so that
            # none of them raises AttributeError if inspected.
            self._write_to_iterator_queue = None
            self._write_to_iterator_async_gen = None
            self._status_code_task = None

        return request_iterator

    async def _proxy_writes_as_request_iterator(self):
        """Yield queued requests until the finish sentinel is received."""
        await self._interceptors_task

        while True:
            value = await self._write_to_iterator_queue.get()
            if (
                value
                is _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL
            ):
                break
            yield value

    async def _write_to_iterator_queue_interruptible(
        self, request: RequestType, call: InterceptedCall
    ):
        """Enqueue `request`, giving up if `call` terminates first.

        The queue write races against a task awaiting `call.code()`, so an
        abrupt termination of the call interrupts a write that would
        otherwise block on the full queue.
        """
        if self._status_code_task is None:
            self._status_code_task = self._loop.create_task(call.code())

        put_task = self._loop.create_task(
            self._write_to_iterator_queue.put(request)
        )
        await asyncio.wait(
            (put_task, self._status_code_task),
            return_when=asyncio.FIRST_COMPLETED,
        )

        # The call terminated before the request could be enqueued. Cancel
        # the write so that it is not left pending forever.
        if not put_task.done():
            put_task.cancel()

    async def write(self, request: RequestType) -> None:
        """Write one request to the RPC.

        Raises:
          cygrpc.UsageError: If the call was created with a request
            iterator.
          asyncio.InvalidStateError: If the RPC already finished or the
            client side is already half-closed.
        """
        # If no queue was created, requests are expected to come through
        # an iterator provided by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        try:
            call = await self._interceptors_task
        except (asyncio.CancelledError, AioRpcError):
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
        elif call._done_writing_flag:
            raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)

        await self._write_to_iterator_queue_interruptible(request, call)

        # The write may have been interrupted by the call terminating.
        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

    async def done_writing(self) -> None:
        """Signal peer that client is done writing.

        This method is idempotent.
        """
        # If no queue was created, requests are expected to come through
        # an iterator provided by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        # NOTE(review): unlike `write`, an AioRpcError raised by the
        # interceptors task is not translated here and propagates as is.
        try:
            call = await self._interceptors_task
        except asyncio.CancelledError:
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        await self._write_to_iterator_queue_interruptible(
            _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL, call
        )

594 

595 

class InterceptedUnaryUnaryCall(
    _InterceptedUnaryResponseMixin, InterceptedCall, _base_call.UnaryUnaryCall
):
    """Runs a `UnaryUnaryCall` wrapped by interceptors.

    `__await__` is proxied to the intercepted call, and only once the
    interceptors task has finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        interceptors: Sequence[UnaryUnaryClientInterceptor],
        request: RequestType,
        timeout: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._loop = loop
        self._channel = channel
        # Schedule the interceptor chain straight away; the base class
        # tracks the resulting task.
        chain = self._invoke(
            interceptors,
            method,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            request,
            request_serializer,
            response_deserializer,
        )
        super().__init__(loop.create_task(chain))

    # pylint: disable=too-many-arguments
    async def _invoke(
        self,
        interceptors: Sequence[UnaryUnaryClientInterceptor],
        method: bytes,
        timeout: Optional[float],
        metadata: Optional[Metadata],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        request: RequestType,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
    ) -> UnaryUnaryCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: List[UnaryUnaryClientInterceptor],
            client_call_details: ClientCallDetails,
            request: RequestType,
        ) -> _base_call.UnaryUnaryCall:
            if not interceptors:
                # End of the chain: build the actual call from the,
                # possibly modified, call details.
                return UnaryUnaryCall(
                    request,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready,
                    self._channel,
                    client_call_details.method,
                    request_serializer,
                    response_deserializer,
                    self._loop,
                )

            # The continuation handed to the interceptor runs the rest of
            # the chain.
            continuation = functools.partial(_run_interceptor, interceptors[1:])
            call_or_response = await interceptors[0].intercept_unary_unary(
                continuation, client_call_details, request
            )

            if isinstance(call_or_response, _base_call.UnaryUnaryCall):
                return call_or_response
            # Anything else is wrapped so that callers still get a call
            # object.
            return UnaryUnaryCallResponse(call_or_response)

        client_call_details = ClientCallDetails(
            method, timeout, metadata, credentials, wait_for_ready
        )
        return await _run_interceptor(
            list(interceptors), client_call_details, request
        )

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

696 

697 

class InterceptedUnaryStreamCall(
    _InterceptedStreamResponseMixin, InterceptedCall, _base_call.UnaryStreamCall
):
    """Used for running a `UnaryStreamCall` wrapped by interceptors."""

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Declared as an annotation with a None default. Previously the typing
    # construct itself was assigned as the class attribute value.
    _last_returned_call_from_interceptors: Optional[
        _base_call.UnaryStreamCall
    ] = None

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        interceptors: Sequence[UnaryStreamClientInterceptor],
        request: RequestType,
        timeout: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(
                interceptors,
                method,
                timeout,
                metadata,
                credentials,
                wait_for_ready,
                request,
                request_serializer,
                response_deserializer,
            )
        )
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
        self,
        interceptors: Sequence[UnaryStreamClientInterceptor],
        method: bytes,
        timeout: Optional[float],
        metadata: Optional[Metadata],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        request: RequestType,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
    ) -> UnaryStreamCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: List[UnaryStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request: RequestType,
        ) -> _base_call.UnaryStreamCall:
            if interceptors:
                # The continuation handed to the interceptor runs the rest
                # of the chain.
                continuation = functools.partial(
                    _run_interceptor, interceptors[1:]
                )

                call_or_response_iterator = await interceptors[
                    0
                ].intercept_unary_stream(
                    continuation, client_call_details, request
                )

                if isinstance(
                    call_or_response_iterator, _base_call.UnaryStreamCall
                ):
                    self._last_returned_call_from_interceptors = (
                        call_or_response_iterator
                    )
                else:
                    # The interceptor returned a plain response iterator:
                    # pair it with the most recent call seen further down
                    # the chain so that a call object is still returned.
                    self._last_returned_call_from_interceptors = (
                        UnaryStreamCallResponseIterator(
                            self._last_returned_call_from_interceptors,
                            call_or_response_iterator,
                        )
                    )
                return self._last_returned_call_from_interceptors
            else:
                # End of the chain: build the actual call from the,
                # possibly modified, call details.
                self._last_returned_call_from_interceptors = UnaryStreamCall(
                    request,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready,
                    self._channel,
                    client_call_details.method,
                    request_serializer,
                    response_deserializer,
                    self._loop,
                )

                return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(
            method, timeout, metadata, credentials, wait_for_ready
        )
        return await _run_interceptor(
            list(interceptors), client_call_details, request
        )

    def time_remaining(self) -> Optional[float]:
        """Not implemented for intercepted calls."""
        raise NotImplementedError()

811 

812 

class InterceptedStreamUnaryCall(
    _InterceptedUnaryResponseMixin,
    _InterceptedStreamRequestMixin,
    InterceptedCall,
    _base_call.StreamUnaryCall,
):
    """Used for running a `StreamUnaryCall` wrapped by interceptors.

    `__await__` is proxied to the intercepted call, and only once the
    interceptors task has finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        interceptors: Sequence[StreamUnaryClientInterceptor],
        request_iterator: Optional[RequestIterableType],
        timeout: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._loop = loop
        self._channel = channel
        # Yields the caller's iterator, or a proxy iterator fed by
        # `write` / `done_writing` when none was supplied.
        request_iterator = self._init_stream_request_mixin(request_iterator)
        interceptors_task = loop.create_task(
            self._invoke(
                interceptors,
                method,
                timeout,
                metadata,
                credentials,
                wait_for_ready,
                request_iterator,
                request_serializer,
                response_deserializer,
            )
        )
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
        self,
        interceptors: Sequence[StreamUnaryClientInterceptor],
        method: bytes,
        timeout: Optional[float],
        metadata: Optional[Metadata],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        request_iterator: RequestIterableType,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
    ) -> StreamUnaryCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            # A list, not an iterator: it is indexed and sliced below.
            interceptors: List[StreamUnaryClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType,
        ) -> _base_call.StreamUnaryCall:
            if interceptors:
                # The continuation handed to the interceptor runs the rest
                # of the chain.
                continuation = functools.partial(
                    _run_interceptor, interceptors[1:]
                )

                return await interceptors[0].intercept_stream_unary(
                    continuation, client_call_details, request_iterator
                )
            else:
                # End of the chain: build the actual call from the,
                # possibly modified, call details.
                return StreamUnaryCall(
                    request_iterator,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready,
                    self._channel,
                    client_call_details.method,
                    request_serializer,
                    response_deserializer,
                    self._loop,
                )

        client_call_details = ClientCallDetails(
            method, timeout, metadata, credentials, wait_for_ready
        )
        return await _run_interceptor(
            list(interceptors), client_call_details, request_iterator
        )

    def time_remaining(self) -> Optional[float]:
        """Not implemented for intercepted calls."""
        raise NotImplementedError()

912 

913 

class InterceptedStreamStreamCall(
    _InterceptedStreamResponseMixin,
    _InterceptedStreamRequestMixin,
    InterceptedCall,
    _base_call.StreamStreamCall,
):
    """Used for running a `StreamStreamCall` wrapped by interceptors.

    Construction schedules `_invoke` as a task on the supplied event loop
    and hands that task to the parent initializer.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Declared as an annotation with a `None` default. Assigning the
    # `Optional[...]` alias itself would leave a typing construct sitting in
    # the class attribute instead of a "no call yet" value.
    _last_returned_call_from_interceptors: Optional[
        _base_call.StreamStreamCall
    ] = None

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        interceptors: Sequence[StreamStreamClientInterceptor],
        request_iterator: Optional[RequestIterableType],
        timeout: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Store the loop and channel and start the interceptor chain.

        Args:
          interceptors: Interceptors to run, first one outermost.
          request_iterator: Caller supplied request iterable, or None.
          timeout: Forwarded into the `ClientCallDetails`.
          metadata: Forwarded into the `ClientCallDetails`.
          credentials: Forwarded into the `ClientCallDetails`.
          wait_for_ready: Forwarded into the `ClientCallDetails`.
          channel: Channel given to the underlying `StreamStreamCall`.
          method: Forwarded into the `ClientCallDetails`.
          request_serializer: Given to the underlying `StreamStreamCall`.
          response_deserializer: Given to the underlying `StreamStreamCall`.
          loop: Event loop on which the interceptor task is created.
        """
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        # The request mixin hands back the iterator that is forwarded to
        # the interceptors from here on.
        request_iterator = self._init_stream_request_mixin(request_iterator)
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(
                interceptors,
                method,
                timeout,
                metadata,
                credentials,
                wait_for_ready,
                request_iterator,
                request_serializer,
                response_deserializer,
            )
        )
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
        self,
        interceptors: Sequence[StreamStreamClientInterceptor],
        method: bytes,
        timeout: Optional[float],
        metadata: Optional[Metadata],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        request_iterator: RequestIterableType,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
    ) -> StreamStreamCall:
        """Run the RPC call wrapped in interceptors.

        Builds a `ClientCallDetails` from the call parameters and walks the
        interceptor list recursively; once the list is exhausted the real
        `StreamStreamCall` is created.

        Returns:
          The call object returned by the outermost interceptor, or the
          `StreamStreamCall` itself when there are no interceptors.
        """

        async def _run_interceptor(
            interceptors: List[StreamStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType,
        ) -> _base_call.StreamStreamCall:
            if interceptors:
                # The continuation given to the current interceptor runs
                # the remainder of the chain.
                continuation = functools.partial(
                    _run_interceptor, interceptors[1:]
                )

                call_or_response_iterator = await interceptors[
                    0
                ].intercept_stream_stream(
                    continuation, client_call_details, request_iterator
                )

                if isinstance(
                    call_or_response_iterator, _base_call.StreamStreamCall
                ):
                    self._last_returned_call_from_interceptors = (
                        call_or_response_iterator
                    )
                else:
                    # The interceptor returned something that is not a
                    # call: wrap it together with the last known call so
                    # the result still exposes the call interface.
                    self._last_returned_call_from_interceptors = (
                        StreamStreamCallResponseIterator(
                            self._last_returned_call_from_interceptors,
                            call_or_response_iterator,
                        )
                    )
                return self._last_returned_call_from_interceptors
            else:
                # End of the chain: create the actual call.
                self._last_returned_call_from_interceptors = StreamStreamCall(
                    request_iterator,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready,
                    self._channel,
                    client_call_details.method,
                    request_serializer,
                    response_deserializer,
                    self._loop,
                )
                return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(
            method, timeout, metadata, credentials, wait_for_ready
        )
        return await _run_interceptor(
            list(interceptors), client_call_details, request_iterator
        )

    def time_remaining(self) -> Optional[float]:
        """Not supported by this wrapper; always raises NotImplementedError."""
        raise NotImplementedError()

1032 

1033 

class UnaryUnaryCallResponse(_base_call.UnaryUnaryCall):
    """Final UnaryUnaryCall class finished with a response.

    Wraps an already available response: the call reports itself as done,
    never cancelled, with an OK status, and awaiting it yields the stored
    response straight away.
    """

    _response: ResponseType

    def __init__(self, response: ResponseType) -> None:
        """Keep the response that awaiting this object will return."""
        self._response = response

    def __await__(self):
        """Return the stored response without ever suspending."""
        return self._response
        # Never executed; the mere presence of a yield is what tells the
        # interpreter that __await__ is a generator.
        yield  # pylint: disable=unreachable

    def cancel(self) -> bool:
        """Always False."""
        return False

    def cancelled(self) -> bool:
        """Always False."""
        return False

    def done(self) -> bool:
        """Always True."""
        return True

    def add_done_callback(self, unused_callback) -> None:
        """Not supported; always raises NotImplementedError."""
        raise NotImplementedError

    def time_remaining(self) -> Optional[float]:
        """Not supported; always raises NotImplementedError."""
        raise NotImplementedError

    async def initial_metadata(self) -> Optional[Metadata]:
        """Always None."""
        return None

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Always None."""
        return None

    async def code(self) -> grpc.StatusCode:
        """Always `grpc.StatusCode.OK`."""
        return grpc.StatusCode.OK

    async def details(self) -> str:
        """Always the empty string."""
        return ""

    async def debug_error_string(self) -> Optional[str]:
        """Always None."""
        return None

    async def wait_for_connection(self) -> None:
        """Return immediately without doing anything."""
        return None

1081 

1082 

class _StreamCallResponseIterator:
    """Pairs a call object with a separate response iterator.

    Every status and lifecycle query is delegated to the wrapped call,
    while asynchronous iteration is delegated to the response iterator.
    """

    _call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall]
    _response_iterator: AsyncIterable[ResponseType]

    def __init__(
        self,
        call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall],
        response_iterator: AsyncIterable[ResponseType],
    ) -> None:
        """Remember the call to delegate to and the iterator to expose."""
        self._call = call
        self._response_iterator = response_iterator

    def __aiter__(self):
        """Return the async iterator of the alternative response iterator."""
        return self._response_iterator.__aiter__()

    # Synchronous queries, forwarded to the wrapped call.

    def cancel(self) -> bool:
        """Forward to the wrapped call's `cancel`."""
        return self._call.cancel()

    def cancelled(self) -> bool:
        """Forward to the wrapped call's `cancelled`."""
        return self._call.cancelled()

    def done(self) -> bool:
        """Forward to the wrapped call's `done`."""
        return self._call.done()

    def add_done_callback(self, callback) -> None:
        """Register `callback` on the wrapped call."""
        self._call.add_done_callback(callback)

    def time_remaining(self) -> Optional[float]:
        """Forward to the wrapped call's `time_remaining`."""
        return self._call.time_remaining()

    # Asynchronous queries, forwarded to the wrapped call.

    async def initial_metadata(self) -> Optional[Metadata]:
        """Await and return the wrapped call's initial metadata."""
        return await self._call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Await and return the wrapped call's trailing metadata."""
        return await self._call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        """Await and return the wrapped call's status code."""
        return await self._call.code()

    async def details(self) -> str:
        """Await and return the wrapped call's details."""
        return await self._call.details()

    async def debug_error_string(self) -> Optional[str]:
        """Await and return the wrapped call's debug error string."""
        return await self._call.debug_error_string()

    async def wait_for_connection(self) -> None:
        """Await the wrapped call's `wait_for_connection`."""
        return await self._call.wait_for_connection()

1130 

1131 

class UnaryStreamCallResponseIterator(
    _StreamCallResponseIterator, _base_call.UnaryStreamCall
):
    """UnaryStreamCall class which uses an alternative response iterator."""

    async def read(self) -> ResponseType:
        """Not supported; always raises NotImplementedError.

        Behind the scenes everything goes through the async iterator, so
        this path should not be reached.
        """
        raise NotImplementedError

1141 

1142 

class StreamStreamCallResponseIterator(
    _StreamCallResponseIterator, _base_call.StreamStreamCall
):
    """StreamStreamCall class which uses an alternative response iterator."""

    @property
    def _done_writing_flag(self) -> bool:
        """Expose the wrapped call's `_done_writing_flag`."""
        return self._call._done_writing_flag

    async def read(self) -> ResponseType:
        """Not supported; always raises NotImplementedError.

        Behind the scenes everything goes through the async iterator, so
        this path should not be reached.
        """
        raise NotImplementedError

    async def write(self, request: RequestType) -> None:
        """Not supported; always raises NotImplementedError.

        Behind the scenes everything goes through the async iterator
        provided by the InterceptedStreamStreamCall, so this path should
        not be reached.
        """
        raise NotImplementedError

    async def done_writing(self) -> None:
        """Not supported; always raises NotImplementedError.

        Behind the scenes everything goes through the async iterator
        provided by the InterceptedStreamStreamCall, so this path should
        not be reached.
        """
        raise NotImplementedError