Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/aio/_interceptor.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

402 statements  

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Interceptors implementation of gRPC Asyncio Python.""" 

15from __future__ import annotations 

16 

17from abc import ABCMeta 

18from abc import abstractmethod 

19import asyncio 

20import collections 

21import functools 

22from typing import ( 

23 AsyncIterable, 

24 AsyncIterator, 

25 Awaitable, 

26 Callable, 

27 List, 

28 Optional, 

29 Sequence, 

30 Union, 

31) 

32 

33import grpc 

34from grpc._cython import cygrpc 

35 

36from . import _base_call 

37from ._call import AioRpcError 

38from ._call import StreamStreamCall 

39from ._call import StreamUnaryCall 

40from ._call import UnaryStreamCall 

41from ._call import UnaryUnaryCall 

42from ._call import _API_STYLE_ERROR 

43from ._call import _RPC_ALREADY_FINISHED_DETAILS 

44from ._call import _RPC_HALF_CLOSED_DETAILS 

45from ._metadata import Metadata 

46from ._typing import DeserializingFunction 

47from ._typing import DoneCallbackType 

48from ._typing import EOFType 

49from ._typing import RequestIterableType 

50from ._typing import RequestType 

51from ._typing import ResponseIterableType 

52from ._typing import ResponseType 

53from ._typing import SerializingFunction 

54from ._utils import _timeout_to_deadline 

55 

56_LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!" 

57 

58 

59class ServerInterceptor(metaclass=ABCMeta): 

60 """Affords intercepting incoming RPCs on the service-side. 

61 

62 This is an EXPERIMENTAL API. 

63 """ 

64 

65 @abstractmethod 

66 async def intercept_service( 

67 self, 

68 continuation: Callable[ 

69 [grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler] 

70 ], 

71 handler_call_details: grpc.HandlerCallDetails, 

72 ) -> grpc.RpcMethodHandler: 

73 """Intercepts incoming RPCs before handing them over to a handler. 

74 

75 State can be passed from an interceptor to downstream interceptors 

76 via contextvars. The first interceptor is called from an empty 

77 contextvars.Context, and the same Context is used for downstream 

78 interceptors and for the final handler call. Note that there are no 

79 guarantees that interceptors and handlers will be called from the 

80 same thread. 

81 

82 Args: 

83 continuation: A function that takes a HandlerCallDetails and 

84 proceeds to invoke the next interceptor in the chain, if any, 

85 or the RPC handler lookup logic, with the call details passed 

86 as an argument, and returns an RpcMethodHandler instance if 

87 the RPC is considered serviced, or None otherwise. 

88 handler_call_details: A HandlerCallDetails describing the RPC. 

89 

90 Returns: 

91 An RpcMethodHandler with which the RPC may be serviced if the 

92 interceptor chooses to service this RPC, or None otherwise. 

93 """ 

94 

95 

96class ClientCallDetails( 

97 collections.namedtuple( 

98 "ClientCallDetails", 

99 ("method", "timeout", "metadata", "credentials", "wait_for_ready"), 

100 ), 

101 grpc.ClientCallDetails, 

102): 

103 """Describes an RPC to be invoked. 

104 

105 This is an EXPERIMENTAL API. 

106 

107 Args: 

108 method: The method name of the RPC. 

109 timeout: An optional duration of time in seconds to allow for the RPC. 

110 metadata: Optional metadata to be transmitted to the service-side of 

111 the RPC. 

112 credentials: An optional CallCredentials for the RPC. 

113 wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism. 

114 """ 

115 

116 method: bytes 

117 timeout: Optional[float] 

118 metadata: Optional[Metadata] 

119 credentials: Optional[grpc.CallCredentials] 

120 wait_for_ready: Optional[bool] 

121 

122 

123class ClientInterceptor(metaclass=ABCMeta): 

124 """Base class used for all Aio Client Interceptor classes""" 

125 

126 

127class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta): 

128 """Affords intercepting unary-unary invocations.""" 

129 

130 @abstractmethod 

131 async def intercept_unary_unary( 

132 self, 

133 continuation: Callable[ 

134 [ClientCallDetails, RequestType], UnaryUnaryCall 

135 ], 

136 client_call_details: ClientCallDetails, 

137 request: RequestType, 

138 ) -> Union[UnaryUnaryCall, ResponseType]: 

139 """Intercepts a unary-unary invocation asynchronously. 

140 

141 Args: 

142 continuation: A coroutine that proceeds with the invocation by 

143 executing the next interceptor in the chain or invoking the 

144 actual RPC on the underlying Channel. It is the interceptor's 

145 responsibility to call it if it decides to move the RPC forward. 

146 The interceptor can use 

147 `call = await continuation(client_call_details, request)` 

148 to continue with the RPC. `continuation` returns the call to the 

149 RPC. 

150 client_call_details: A ClientCallDetails object describing the 

151 outgoing RPC. 

152 request: The request value for the RPC. 

153 

154 Returns: 

155 An object with the RPC response. 

156 

157 Raises: 

158 AioRpcError: Indicating that the RPC terminated with non-OK status. 

159 asyncio.CancelledError: Indicating that the RPC was canceled. 

160 """ 

161 

162 

163class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta): 

164 """Affords intercepting unary-stream invocations.""" 

165 

166 @abstractmethod 

167 async def intercept_unary_stream( 

168 self, 

169 continuation: Callable[ 

170 [ClientCallDetails, RequestType], UnaryStreamCall 

171 ], 

172 client_call_details: ClientCallDetails, 

173 request: RequestType, 

174 ) -> Union[ResponseIterableType, UnaryStreamCall]: 

175 """Intercepts a unary-stream invocation asynchronously. 

176 

177 The function could return the call object or an asynchronous 

178 iterator, in case of being an asyncrhonous iterator this will 

179 become the source of the reads done by the caller. 

180 

181 Args: 

182 continuation: A coroutine that proceeds with the invocation by 

183 executing the next interceptor in the chain or invoking the 

184 actual RPC on the underlying Channel. It is the interceptor's 

185 responsibility to call it if it decides to move the RPC forward. 

186 The interceptor can use 

187 `call = await continuation(client_call_details, request)` 

188 to continue with the RPC. `continuation` returns the call to the 

189 RPC. 

190 client_call_details: A ClientCallDetails object describing the 

191 outgoing RPC. 

192 request: The request value for the RPC. 

193 

194 Returns: 

195 The RPC Call or an asynchronous iterator. 

196 

197 Raises: 

198 AioRpcError: Indicating that the RPC terminated with non-OK status. 

199 asyncio.CancelledError: Indicating that the RPC was canceled. 

200 """ 

201 

202 

203class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta): 

204 """Affords intercepting stream-unary invocations.""" 

205 

206 @abstractmethod 

207 async def intercept_stream_unary( 

208 self, 

209 continuation: Callable[ 

210 [ClientCallDetails, RequestType], StreamUnaryCall 

211 ], 

212 client_call_details: ClientCallDetails, 

213 request_iterator: RequestIterableType, 

214 ) -> StreamUnaryCall: 

215 """Intercepts a stream-unary invocation asynchronously. 

216 

217 Within the interceptor the usage of the call methods like `write` or 

218 even awaiting the call should be done carefully, since the caller 

219 could be expecting an untouched call, for example for start writing 

220 messages to it. 

221 

222 Args: 

223 continuation: A coroutine that proceeds with the invocation by 

224 executing the next interceptor in the chain or invoking the 

225 actual RPC on the underlying Channel. It is the interceptor's 

226 responsibility to call it if it decides to move the RPC forward. 

227 The interceptor can use 

228 `call = await continuation(client_call_details, request_iterator)` 

229 to continue with the RPC. `continuation` returns the call to the 

230 RPC. 

231 client_call_details: A ClientCallDetails object describing the 

232 outgoing RPC. 

233 request_iterator: The request iterator that will produce requests 

234 for the RPC. 

235 

236 Returns: 

237 The RPC Call. 

238 

239 Raises: 

240 AioRpcError: Indicating that the RPC terminated with non-OK status. 

241 asyncio.CancelledError: Indicating that the RPC was canceled. 

242 """ 

243 

244 

245class StreamStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta): 

246 """Affords intercepting stream-stream invocations.""" 

247 

248 @abstractmethod 

249 async def intercept_stream_stream( 

250 self, 

251 continuation: Callable[ 

252 [ClientCallDetails, RequestType], StreamStreamCall 

253 ], 

254 client_call_details: ClientCallDetails, 

255 request_iterator: RequestIterableType, 

256 ) -> Union[ResponseIterableType, StreamStreamCall]: 

257 """Intercepts a stream-stream invocation asynchronously. 

258 

259 Within the interceptor the usage of the call methods like `write` or 

260 even awaiting the call should be done carefully, since the caller 

261 could be expecting an untouched call, for example for start writing 

262 messages to it. 

263 

264 The function could return the call object or an asynchronous 

265 iterator, in case of being an asyncrhonous iterator this will 

266 become the source of the reads done by the caller. 

267 

268 Args: 

269 continuation: A coroutine that proceeds with the invocation by 

270 executing the next interceptor in the chain or invoking the 

271 actual RPC on the underlying Channel. It is the interceptor's 

272 responsibility to call it if it decides to move the RPC forward. 

273 The interceptor can use 

274 `call = await continuation(client_call_details, request_iterator)` 

275 to continue with the RPC. `continuation` returns the call to the 

276 RPC. 

277 client_call_details: A ClientCallDetails object describing the 

278 outgoing RPC. 

279 request_iterator: The request iterator that will produce requests 

280 for the RPC. 

281 

282 Returns: 

283 The RPC Call or an asynchronous iterator. 

284 

285 Raises: 

286 AioRpcError: Indicating that the RPC terminated with non-OK status. 

287 asyncio.CancelledError: Indicating that the RPC was canceled. 

288 """ 

289 

290 

291class InterceptedCall: 

292 """Base implementation for all intercepted call arities. 

293 

294 Interceptors might have some work to do before the RPC invocation with 

295 the capacity of changing the invocation parameters, and some work to do 

296 after the RPC invocation with the capacity for accessing to the wrapped 

297 `UnaryUnaryCall`. 

298 

299 It handles also early and later cancellations, when the RPC has not even 

300 started and the execution is still held by the interceptors or when the 

301 RPC has finished but again the execution is still held by the interceptors. 

302 

303 Once the RPC is finally executed, all methods are finally done against the 

304 intercepted call, being at the same time the same call returned to the 

305 interceptors. 

306 

307 As a base class for all of the interceptors implements the logic around 

308 final status, metadata and cancellation. 

309 """ 

310 

311 _interceptors_task: asyncio.Task 

312 _pending_add_done_callbacks: Sequence[DoneCallbackType] 

313 

314 def __init__(self, interceptors_task: asyncio.Task) -> None: 

315 self._interceptors_task = interceptors_task 

316 self._pending_add_done_callbacks = [] 

317 self._interceptors_task.add_done_callback( 

318 self._fire_or_add_pending_done_callbacks 

319 ) 

320 

321 def __del__(self): 

322 self.cancel() 

323 

324 def _fire_or_add_pending_done_callbacks( 

325 self, interceptors_task: asyncio.Task 

326 ) -> None: 

327 if not self._pending_add_done_callbacks: 

328 return 

329 

330 call_completed = False 

331 

332 try: 

333 call = interceptors_task.result() 

334 if call.done(): 

335 call_completed = True 

336 except (AioRpcError, asyncio.CancelledError): 

337 call_completed = True 

338 

339 if call_completed: 

340 for callback in self._pending_add_done_callbacks: 

341 callback(self) 

342 else: 

343 for callback in self._pending_add_done_callbacks: 

344 callback = functools.partial( 

345 self._wrap_add_done_callback, callback 

346 ) 

347 call.add_done_callback(callback) 

348 

349 self._pending_add_done_callbacks = [] 

350 

351 def _wrap_add_done_callback( 

352 self, callback: DoneCallbackType, unused_call: _base_call.Call 

353 ) -> None: 

354 callback(self) 

355 

356 def cancel(self) -> bool: 

357 if not self._interceptors_task.done(): 

358 # There is no yet the intercepted call available, 

359 # Trying to cancel it by using the generic Asyncio 

360 # cancellation method. 

361 return self._interceptors_task.cancel() 

362 

363 try: 

364 call = self._interceptors_task.result() 

365 except AioRpcError: 

366 return False 

367 except asyncio.CancelledError: 

368 return False 

369 

370 return call.cancel() 

371 

372 def cancelled(self) -> bool: 

373 if not self._interceptors_task.done(): 

374 return False 

375 

376 try: 

377 call = self._interceptors_task.result() 

378 except AioRpcError as err: 

379 return err.code() == grpc.StatusCode.CANCELLED 

380 except asyncio.CancelledError: 

381 return True 

382 

383 return call.cancelled() 

384 

385 def done(self) -> bool: 

386 if not self._interceptors_task.done(): 

387 return False 

388 

389 try: 

390 call = self._interceptors_task.result() 

391 except (AioRpcError, asyncio.CancelledError): 

392 return True 

393 

394 return call.done() 

395 

396 def add_done_callback(self, callback: DoneCallbackType) -> None: 

397 if not self._interceptors_task.done(): 

398 self._pending_add_done_callbacks.append(callback) 

399 return 

400 

401 try: 

402 call = self._interceptors_task.result() 

403 except (AioRpcError, asyncio.CancelledError): 

404 callback(self) 

405 return 

406 

407 if call.done(): 

408 callback(self) 

409 else: 

410 callback = functools.partial(self._wrap_add_done_callback, callback) 

411 call.add_done_callback(callback) 

412 

413 def time_remaining(self) -> Optional[float]: 

414 raise NotImplementedError() 

415 

416 async def initial_metadata(self) -> Optional[Metadata]: 

417 try: 

418 call = await self._interceptors_task 

419 except AioRpcError as err: 

420 return err.initial_metadata() 

421 except asyncio.CancelledError: 

422 return None 

423 

424 return await call.initial_metadata() 

425 

426 async def trailing_metadata(self) -> Optional[Metadata]: 

427 try: 

428 call = await self._interceptors_task 

429 except AioRpcError as err: 

430 return err.trailing_metadata() 

431 except asyncio.CancelledError: 

432 return None 

433 

434 return await call.trailing_metadata() 

435 

436 async def code(self) -> grpc.StatusCode: 

437 try: 

438 call = await self._interceptors_task 

439 except AioRpcError as err: 

440 return err.code() 

441 except asyncio.CancelledError: 

442 return grpc.StatusCode.CANCELLED 

443 

444 return await call.code() 

445 

446 async def details(self) -> str: 

447 try: 

448 call = await self._interceptors_task 

449 except AioRpcError as err: 

450 return err.details() 

451 except asyncio.CancelledError: 

452 return _LOCAL_CANCELLATION_DETAILS 

453 

454 return await call.details() 

455 

456 async def debug_error_string(self) -> Optional[str]: 

457 try: 

458 call = await self._interceptors_task 

459 except AioRpcError as err: 

460 return err.debug_error_string() 

461 except asyncio.CancelledError: 

462 return "" 

463 

464 return await call.debug_error_string() 

465 

466 async def wait_for_connection(self) -> None: 

467 call = await self._interceptors_task 

468 return await call.wait_for_connection() 

469 

470 

471class _InterceptedUnaryResponseMixin: 

472 def __await__(self): 

473 call = yield from self._interceptors_task.__await__() 

474 response = yield from call.__await__() 

475 return response 

476 

477 

478class _InterceptedStreamResponseMixin: 

479 _response_aiter: Optional[AsyncIterable[ResponseType]] 

480 

481 def _init_stream_response_mixin(self) -> None: 

482 # Is initialized later, otherwise if the iterator is not finally 

483 # consumed a logging warning is emitted by Asyncio. 

484 self._response_aiter = None 

485 

486 async def _wait_for_interceptor_task_response_iterator( 

487 self, 

488 ) -> ResponseType: 

489 call = await self._interceptors_task 

490 async for response in call: 

491 yield response 

492 

493 def __aiter__(self) -> AsyncIterator[ResponseType]: 

494 if self._response_aiter is None: 

495 self._response_aiter = ( 

496 self._wait_for_interceptor_task_response_iterator() 

497 ) 

498 return self._response_aiter 

499 

500 async def read(self) -> Union[EOFType, ResponseType]: 

501 if self._response_aiter is None: 

502 self._response_aiter = ( 

503 self._wait_for_interceptor_task_response_iterator() 

504 ) 

505 try: 

506 return await self._response_aiter.asend(None) 

507 except StopAsyncIteration: 

508 return cygrpc.EOF 

509 

510 

511class _InterceptedStreamRequestMixin: 

512 _write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]] 

513 _write_to_iterator_queue: Optional[asyncio.Queue] 

514 _status_code_task: Optional[asyncio.Task] 

515 

516 _FINISH_ITERATOR_SENTINEL = object() 

517 

518 def _init_stream_request_mixin( 

519 self, request_iterator: Optional[RequestIterableType] 

520 ) -> RequestIterableType: 

521 if request_iterator is None: 

522 # We provide our own request iterator which is a proxy 

523 # of the futures writes that will be done by the caller. 

524 self._write_to_iterator_queue = asyncio.Queue(maxsize=1) 

525 self._write_to_iterator_async_gen = ( 

526 self._proxy_writes_as_request_iterator() 

527 ) 

528 self._status_code_task = None 

529 request_iterator = self._write_to_iterator_async_gen 

530 else: 

531 self._write_to_iterator_queue = None 

532 

533 return request_iterator 

534 

535 async def _proxy_writes_as_request_iterator(self): 

536 await self._interceptors_task 

537 

538 while True: 

539 value = await self._write_to_iterator_queue.get() 

540 if ( 

541 value 

542 is _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL 

543 ): 

544 break 

545 yield value 

546 

547 async def _write_to_iterator_queue_interruptible( 

548 self, 

549 request: RequestType, 

550 call: _base_call.Call, 

551 ): 

552 # Write the specified 'request' to the request iterator queue using the 

553 # specified 'call' to allow for interruption of the write in the case 

554 # of abrupt termination of the call. 

555 if self._status_code_task is None: 

556 self._status_code_task = self._loop.create_task(call.code()) 

557 

558 await asyncio.wait( 

559 ( 

560 self._loop.create_task( 

561 self._write_to_iterator_queue.put(request) 

562 ), 

563 self._status_code_task, 

564 ), 

565 return_when=asyncio.FIRST_COMPLETED, 

566 ) 

567 

568 async def write(self, request: RequestType) -> None: 

569 # If no queue was created it means that requests 

570 # should be expected through an iterators provided 

571 # by the caller. 

572 if self._write_to_iterator_queue is None: 

573 raise cygrpc.UsageError(_API_STYLE_ERROR) 

574 

575 try: 

576 call = await self._interceptors_task 

577 except (asyncio.CancelledError, AioRpcError): 

578 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) 

579 

580 if call.done(): 

581 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) 

582 elif call._done_writing_flag: 

583 raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS) 

584 

585 await self._write_to_iterator_queue_interruptible(request, call) 

586 

587 if call.done(): 

588 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) 

589 

590 async def done_writing(self) -> None: 

591 """Signal peer that client is done writing. 

592 

593 This method is idempotent. 

594 """ 

595 # If no queue was created it means that requests 

596 # should be expected through an iterators provided 

597 # by the caller. 

598 if self._write_to_iterator_queue is None: 

599 raise cygrpc.UsageError(_API_STYLE_ERROR) 

600 

601 try: 

602 call = await self._interceptors_task 

603 except asyncio.CancelledError: 

604 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) 

605 

606 await self._write_to_iterator_queue_interruptible( 

607 _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL, call 

608 ) 

609 

610 

611class InterceptedUnaryUnaryCall( 

612 _InterceptedUnaryResponseMixin, InterceptedCall, _base_call.UnaryUnaryCall 

613): 

614 """Used for running a `UnaryUnaryCall` wrapped by interceptors. 

615 

616 For the `__await__` method is it is proxied to the intercepted call only when 

617 the interceptor task is finished. 

618 """ 

619 

620 _loop: asyncio.AbstractEventLoop 

621 _channel: cygrpc.AioChannel 

622 

623 # pylint: disable=too-many-arguments 

624 def __init__( 

625 self, 

626 interceptors: Sequence[UnaryUnaryClientInterceptor], 

627 request: RequestType, 

628 timeout: Optional[float], 

629 metadata: Metadata, 

630 credentials: Optional[grpc.CallCredentials], 

631 wait_for_ready: Optional[bool], 

632 channel: cygrpc.AioChannel, 

633 method: bytes, 

634 request_serializer: Optional[SerializingFunction], 

635 response_deserializer: Optional[DeserializingFunction], 

636 loop: asyncio.AbstractEventLoop, 

637 ) -> None: 

638 self._loop = loop 

639 self._channel = channel 

640 interceptors_task = loop.create_task( 

641 self._invoke( 

642 interceptors, 

643 method, 

644 timeout, 

645 metadata, 

646 credentials, 

647 wait_for_ready, 

648 request, 

649 request_serializer, 

650 response_deserializer, 

651 ) 

652 ) 

653 super().__init__(interceptors_task) 

654 

655 # pylint: disable=too-many-arguments 

656 async def _invoke( 

657 self, 

658 interceptors: Sequence[UnaryUnaryClientInterceptor], 

659 method: bytes, 

660 timeout: Optional[float], 

661 metadata: Optional[Metadata], 

662 credentials: Optional[grpc.CallCredentials], 

663 wait_for_ready: Optional[bool], 

664 request: RequestType, 

665 request_serializer: Optional[SerializingFunction], 

666 response_deserializer: Optional[DeserializingFunction], 

667 ) -> Union[UnaryUnaryCall, UnaryUnaryCallResponse]: 

668 """Run the RPC call wrapped in interceptors""" 

669 

670 async def _run_interceptor( 

671 interceptors: List[UnaryUnaryClientInterceptor], 

672 client_call_details: ClientCallDetails, 

673 request: RequestType, 

674 ) -> Union[UnaryUnaryCall, UnaryUnaryCallResponse]: 

675 if interceptors: 

676 continuation = functools.partial( 

677 _run_interceptor, interceptors[1:] 

678 ) 

679 call_or_response = await interceptors[0].intercept_unary_unary( 

680 continuation, client_call_details, request 

681 ) 

682 

683 if isinstance(call_or_response, _base_call.UnaryUnaryCall): 

684 return call_or_response 

685 return UnaryUnaryCallResponse(call_or_response) 

686 

687 return UnaryUnaryCall( 

688 request, 

689 _timeout_to_deadline(client_call_details.timeout), 

690 client_call_details.metadata, 

691 client_call_details.credentials, 

692 client_call_details.wait_for_ready, 

693 self._channel, 

694 client_call_details.method, 

695 request_serializer, 

696 response_deserializer, 

697 self._loop, 

698 ) 

699 

700 client_call_details = ClientCallDetails( 

701 method, timeout, metadata, credentials, wait_for_ready 

702 ) 

703 return await _run_interceptor( 

704 list(interceptors), client_call_details, request 

705 ) 

706 

707 def time_remaining(self) -> Optional[float]: 

708 raise NotImplementedError() 

709 

710 

711class InterceptedUnaryStreamCall( 

712 _InterceptedStreamResponseMixin, InterceptedCall, _base_call.UnaryStreamCall 

713): 

714 """Used for running a `UnaryStreamCall` wrapped by interceptors.""" 

715 

716 _loop: asyncio.AbstractEventLoop 

717 _channel: cygrpc.AioChannel 

718 _last_returned_call_from_interceptors = Optional[_base_call.UnaryStreamCall] 

719 

720 # pylint: disable=too-many-arguments 

721 def __init__( 

722 self, 

723 interceptors: Sequence[UnaryStreamClientInterceptor], 

724 request: RequestType, 

725 timeout: Optional[float], 

726 metadata: Metadata, 

727 credentials: Optional[grpc.CallCredentials], 

728 wait_for_ready: Optional[bool], 

729 channel: cygrpc.AioChannel, 

730 method: bytes, 

731 request_serializer: Optional[SerializingFunction], 

732 response_deserializer: Optional[DeserializingFunction], 

733 loop: asyncio.AbstractEventLoop, 

734 ) -> None: 

735 self._loop = loop 

736 self._channel = channel 

737 self._init_stream_response_mixin() 

738 self._last_returned_call_from_interceptors = None 

739 interceptors_task = loop.create_task( 

740 self._invoke( 

741 interceptors, 

742 method, 

743 timeout, 

744 metadata, 

745 credentials, 

746 wait_for_ready, 

747 request, 

748 request_serializer, 

749 response_deserializer, 

750 ) 

751 ) 

752 super().__init__(interceptors_task) 

753 

754 # pylint: disable=too-many-arguments 

755 async def _invoke( 

756 self, 

757 interceptors: Sequence[UnaryStreamClientInterceptor], 

758 method: bytes, 

759 timeout: Optional[float], 

760 metadata: Optional[Metadata], 

761 credentials: Optional[grpc.CallCredentials], 

762 wait_for_ready: Optional[bool], 

763 request: RequestType, 

764 request_serializer: Optional[SerializingFunction], 

765 response_deserializer: Optional[DeserializingFunction], 

766 ) -> Union[UnaryStreamCall, UnaryStreamCallResponseIterator]: 

767 """Run the RPC call wrapped in interceptors""" 

768 

769 async def _run_interceptor( 

770 interceptors: List[UnaryStreamClientInterceptor], 

771 client_call_details: ClientCallDetails, 

772 request: RequestType, 

773 ) -> Union[UnaryStreamCall, UnaryStreamCallResponseIterator]: 

774 if interceptors: 

775 continuation = functools.partial( 

776 _run_interceptor, interceptors[1:] 

777 ) 

778 

779 call_or_response_iterator = await interceptors[ 

780 0 

781 ].intercept_unary_stream( 

782 continuation, client_call_details, request 

783 ) 

784 

785 if isinstance( 

786 call_or_response_iterator, _base_call.UnaryStreamCall 

787 ): 

788 self._last_returned_call_from_interceptors = ( 

789 call_or_response_iterator 

790 ) 

791 else: 

792 self._last_returned_call_from_interceptors = ( 

793 UnaryStreamCallResponseIterator( 

794 self._last_returned_call_from_interceptors, 

795 call_or_response_iterator, 

796 ) 

797 ) 

798 return self._last_returned_call_from_interceptors 

799 self._last_returned_call_from_interceptors = UnaryStreamCall( 

800 request, 

801 _timeout_to_deadline(client_call_details.timeout), 

802 client_call_details.metadata, 

803 client_call_details.credentials, 

804 client_call_details.wait_for_ready, 

805 self._channel, 

806 client_call_details.method, 

807 request_serializer, 

808 response_deserializer, 

809 self._loop, 

810 ) 

811 

812 return self._last_returned_call_from_interceptors 

813 

814 client_call_details = ClientCallDetails( 

815 method, timeout, metadata, credentials, wait_for_ready 

816 ) 

817 return await _run_interceptor( 

818 list(interceptors), client_call_details, request 

819 ) 

820 

821 def time_remaining(self) -> Optional[float]: 

822 raise NotImplementedError() 

823 

824 

825class InterceptedStreamUnaryCall( 

826 _InterceptedUnaryResponseMixin, 

827 _InterceptedStreamRequestMixin, 

828 InterceptedCall, 

829 _base_call.StreamUnaryCall, 

830): 

831 """Used for running a `StreamUnaryCall` wrapped by interceptors. 

832 

833 For the `__await__` method is it is proxied to the intercepted call only when 

834 the interceptor task is finished. 

835 """ 

836 

837 _loop: asyncio.AbstractEventLoop 

838 _channel: cygrpc.AioChannel 

839 

840 # pylint: disable=too-many-arguments 

841 def __init__( 

842 self, 

843 interceptors: Sequence[StreamUnaryClientInterceptor], 

844 request_iterator: Optional[RequestIterableType], 

845 timeout: Optional[float], 

846 metadata: Metadata, 

847 credentials: Optional[grpc.CallCredentials], 

848 wait_for_ready: Optional[bool], 

849 channel: cygrpc.AioChannel, 

850 method: bytes, 

851 request_serializer: Optional[SerializingFunction], 

852 response_deserializer: Optional[DeserializingFunction], 

853 loop: asyncio.AbstractEventLoop, 

854 ) -> None: 

855 self._loop = loop 

856 self._channel = channel 

857 request_iterator = self._init_stream_request_mixin(request_iterator) 

858 interceptors_task = loop.create_task( 

859 self._invoke( 

860 interceptors, 

861 method, 

862 timeout, 

863 metadata, 

864 credentials, 

865 wait_for_ready, 

866 request_iterator, 

867 request_serializer, 

868 response_deserializer, 

869 ) 

870 ) 

871 super().__init__(interceptors_task) 

872 

873 # pylint: disable=too-many-arguments 

874 async def _invoke( 

875 self, 

876 interceptors: Sequence[StreamUnaryClientInterceptor], 

877 method: bytes, 

878 timeout: Optional[float], 

879 metadata: Optional[Metadata], 

880 credentials: Optional[grpc.CallCredentials], 

881 wait_for_ready: Optional[bool], 

882 request_iterator: RequestIterableType, 

883 request_serializer: Optional[SerializingFunction], 

884 response_deserializer: Optional[DeserializingFunction], 

885 ) -> StreamUnaryCall: 

886 """Run the RPC call wrapped in interceptors""" 

887 

888 async def _run_interceptor( 

889 interceptors: Sequence[StreamUnaryClientInterceptor], 

890 client_call_details: ClientCallDetails, 

891 request_iterator: RequestIterableType, 

892 ) -> _base_call.StreamUnaryCall: 

893 if interceptors: 

894 continuation = functools.partial( 

895 _run_interceptor, interceptors[1:] 

896 ) 

897 

898 return await interceptors[0].intercept_stream_unary( 

899 continuation, client_call_details, request_iterator 

900 ) 

901 return StreamUnaryCall( 

902 request_iterator, 

903 _timeout_to_deadline(client_call_details.timeout), 

904 client_call_details.metadata, 

905 client_call_details.credentials, 

906 client_call_details.wait_for_ready, 

907 self._channel, 

908 client_call_details.method, 

909 request_serializer, 

910 response_deserializer, 

911 self._loop, 

912 ) 

913 

914 client_call_details = ClientCallDetails( 

915 method, timeout, metadata, credentials, wait_for_ready 

916 ) 

917 return await _run_interceptor( 

918 list(interceptors), client_call_details, request_iterator 

919 ) 

920 

921 def time_remaining(self) -> Optional[float]: 

922 raise NotImplementedError() 

923 

924 

925class InterceptedStreamStreamCall( 

926 _InterceptedStreamResponseMixin, 

927 _InterceptedStreamRequestMixin, 

928 InterceptedCall, 

929 _base_call.StreamStreamCall, 

930): 

931 """Used for running a `StreamStreamCall` wrapped by interceptors.""" 

932 

933 _loop: asyncio.AbstractEventLoop 

934 _channel: cygrpc.AioChannel 

935 _last_returned_call_from_interceptors = Optional[ 

936 _base_call.StreamStreamCall 

937 ] 

938 

939 # pylint: disable=too-many-arguments 

940 def __init__( 

941 self, 

942 interceptors: Sequence[StreamStreamClientInterceptor], 

943 request_iterator: Optional[RequestIterableType], 

944 timeout: Optional[float], 

945 metadata: Metadata, 

946 credentials: Optional[grpc.CallCredentials], 

947 wait_for_ready: Optional[bool], 

948 channel: cygrpc.AioChannel, 

949 method: bytes, 

950 request_serializer: Optional[SerializingFunction], 

951 response_deserializer: Optional[DeserializingFunction], 

952 loop: asyncio.AbstractEventLoop, 

953 ) -> None: 

954 self._loop = loop 

955 self._channel = channel 

956 self._init_stream_response_mixin() 

957 request_iterator = self._init_stream_request_mixin(request_iterator) 

958 self._last_returned_call_from_interceptors = None 

959 interceptors_task = loop.create_task( 

960 self._invoke( 

961 interceptors, 

962 method, 

963 timeout, 

964 metadata, 

965 credentials, 

966 wait_for_ready, 

967 request_iterator, 

968 request_serializer, 

969 response_deserializer, 

970 ) 

971 ) 

972 super().__init__(interceptors_task) 

973 

974 # pylint: disable=too-many-arguments 

975 async def _invoke( 

976 self, 

977 interceptors: Sequence[StreamStreamClientInterceptor], 

978 method: bytes, 

979 timeout: Optional[float], 

980 metadata: Optional[Metadata], 

981 credentials: Optional[grpc.CallCredentials], 

982 wait_for_ready: Optional[bool], 

983 request_iterator: RequestIterableType, 

984 request_serializer: Optional[SerializingFunction], 

985 response_deserializer: Optional[DeserializingFunction], 

986 ) -> Union[StreamStreamCall, StreamStreamCallResponseIterator]: 

987 """Run the RPC call wrapped in interceptors""" 

988 

989 async def _run_interceptor( 

990 interceptors: List[StreamStreamClientInterceptor], 

991 client_call_details: ClientCallDetails, 

992 request_iterator: RequestIterableType, 

993 ) -> Union[StreamStreamCall, StreamStreamCallResponseIterator]: 

994 if interceptors: 

995 continuation = functools.partial( 

996 _run_interceptor, interceptors[1:] 

997 ) 

998 

999 call_or_response_iterator = await interceptors[ 

1000 0 

1001 ].intercept_stream_stream( 

1002 continuation, client_call_details, request_iterator 

1003 ) 

1004 

1005 if isinstance( 

1006 call_or_response_iterator, _base_call.StreamStreamCall 

1007 ): 

1008 self._last_returned_call_from_interceptors = ( 

1009 call_or_response_iterator 

1010 ) 

1011 else: 

1012 self._last_returned_call_from_interceptors = ( 

1013 StreamStreamCallResponseIterator( 

1014 self._last_returned_call_from_interceptors, 

1015 call_or_response_iterator, 

1016 ) 

1017 ) 

1018 return self._last_returned_call_from_interceptors 

1019 self._last_returned_call_from_interceptors = StreamStreamCall( 

1020 request_iterator, 

1021 _timeout_to_deadline(client_call_details.timeout), 

1022 client_call_details.metadata, 

1023 client_call_details.credentials, 

1024 client_call_details.wait_for_ready, 

1025 self._channel, 

1026 client_call_details.method, 

1027 request_serializer, 

1028 response_deserializer, 

1029 self._loop, 

1030 ) 

1031 return self._last_returned_call_from_interceptors 

1032 

1033 client_call_details = ClientCallDetails( 

1034 method, timeout, metadata, credentials, wait_for_ready 

1035 ) 

1036 return await _run_interceptor( 

1037 list(interceptors), client_call_details, request_iterator 

1038 ) 

1039 

1040 def time_remaining(self) -> Optional[float]: 

1041 raise NotImplementedError() 

1042 

1043 

1044class UnaryUnaryCallResponse(_base_call.UnaryUnaryCall): 

1045 """Final UnaryUnaryCall class finished with a response.""" 

1046 

1047 _response: ResponseType 

1048 

1049 def __init__(self, response: ResponseType) -> None: 

1050 self._response = response 

1051 

1052 def cancel(self) -> bool: 

1053 return False 

1054 

1055 def cancelled(self) -> bool: 

1056 return False 

1057 

1058 def done(self) -> bool: 

1059 return True 

1060 

1061 def add_done_callback(self, unused_callback) -> None: 

1062 raise NotImplementedError() 

1063 

1064 def time_remaining(self) -> Optional[float]: 

1065 raise NotImplementedError() 

1066 

1067 async def initial_metadata(self) -> Optional[Metadata]: 

1068 return None 

1069 

1070 async def trailing_metadata(self) -> Optional[Metadata]: 

1071 return None 

1072 

1073 async def code(self) -> grpc.StatusCode: 

1074 return grpc.StatusCode.OK 

1075 

1076 async def details(self) -> str: 

1077 return "" 

1078 

1079 async def debug_error_string(self) -> Optional[str]: 

1080 return None 

1081 

1082 def __await__(self): 

1083 if False: # pylint: disable=using-constant-test 

1084 # This code path is never used, but a yield statement is needed 

1085 # for telling the interpreter that __await__ is a generator. 

1086 yield None 

1087 return self._response 

1088 

1089 async def wait_for_connection(self) -> None: 

1090 pass 

1091 

1092 

1093class _StreamCallResponseIterator: 

1094 _call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall] 

1095 _response_iterator: AsyncIterable[ResponseType] 

1096 

1097 def __init__( 

1098 self, 

1099 call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall], 

1100 response_iterator: AsyncIterable[ResponseType], 

1101 ) -> None: 

1102 self._response_iterator = response_iterator 

1103 self._call = call 

1104 

1105 def cancel(self) -> bool: 

1106 return self._call.cancel() 

1107 

1108 def cancelled(self) -> bool: 

1109 return self._call.cancelled() 

1110 

1111 def done(self) -> bool: 

1112 return self._call.done() 

1113 

1114 def add_done_callback(self, callback) -> None: 

1115 self._call.add_done_callback(callback) 

1116 

1117 def time_remaining(self) -> Optional[float]: 

1118 return self._call.time_remaining() 

1119 

1120 async def initial_metadata(self) -> Optional[Metadata]: 

1121 return await self._call.initial_metadata() 

1122 

1123 async def trailing_metadata(self) -> Optional[Metadata]: 

1124 return await self._call.trailing_metadata() 

1125 

1126 async def code(self) -> grpc.StatusCode: 

1127 return await self._call.code() 

1128 

1129 async def details(self) -> str: 

1130 return await self._call.details() 

1131 

1132 async def debug_error_string(self) -> Optional[str]: 

1133 return await self._call.debug_error_string() 

1134 

1135 def __aiter__(self): 

1136 return self._response_iterator.__aiter__() 

1137 

1138 async def wait_for_connection(self) -> None: 

1139 return await self._call.wait_for_connection() 

1140 

1141 

1142class UnaryStreamCallResponseIterator( 

1143 _StreamCallResponseIterator, _base_call.UnaryStreamCall 

1144): 

1145 """UnaryStreamCall class which uses an alternative response iterator.""" 

1146 

1147 async def read(self) -> Union[EOFType, ResponseType]: 

1148 # Behind the scenes everything goes through the 

1149 # async iterator. So this path should not be reached. 

1150 raise NotImplementedError() 

1151 

1152 

1153class StreamStreamCallResponseIterator( 

1154 _StreamCallResponseIterator, _base_call.StreamStreamCall 

1155): 

1156 """StreamStreamCall class which uses an alternative response iterator.""" 

1157 

1158 async def read(self) -> Union[EOFType, ResponseType]: 

1159 # Behind the scenes everything goes through the 

1160 # async iterator. So this path should not be reached. 

1161 raise NotImplementedError() 

1162 

1163 async def write(self, request: RequestType) -> None: 

1164 # Behind the scenes everything goes through the 

1165 # async iterator provided by the InterceptedStreamStreamCall. 

1166 # So this path should not be reached. 

1167 raise NotImplementedError() 

1168 

1169 async def done_writing(self) -> None: 

1170 # Behind the scenes everything goes through the 

1171 # async iterator provided by the InterceptedStreamStreamCall. 

1172 # So this path should not be reached. 

1173 raise NotImplementedError() 

1174 

1175 @property 

1176 def _done_writing_flag(self) -> bool: 

1177 return self._call._done_writing_flag