Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_interceptor.py: 38%

394 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-06 06:03 +0000

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Interceptors implementation of gRPC Asyncio Python.""" 

15from abc import ABCMeta 

16from abc import abstractmethod 

17import asyncio 

18import collections 

19import functools 

20from typing import (AsyncIterable, Awaitable, Callable, Iterator, List, 

21 Optional, Sequence, Union) 

22 

23import grpc 

24from grpc._cython import cygrpc 

25 

26from . import _base_call 

27from ._call import AioRpcError 

28from ._call import StreamStreamCall 

29from ._call import StreamUnaryCall 

30from ._call import UnaryStreamCall 

31from ._call import UnaryUnaryCall 

32from ._call import _API_STYLE_ERROR 

33from ._call import _RPC_ALREADY_FINISHED_DETAILS 

34from ._call import _RPC_HALF_CLOSED_DETAILS 

35from ._metadata import Metadata 

36from ._typing import DeserializingFunction 

37from ._typing import DoneCallbackType 

38from ._typing import RequestIterableType 

39from ._typing import RequestType 

40from ._typing import ResponseIterableType 

41from ._typing import ResponseType 

42from ._typing import SerializingFunction 

43from ._utils import _timeout_to_deadline 

44 

45_LOCAL_CANCELLATION_DETAILS = 'Locally cancelled by application!' 

46 

47 

class ServerInterceptor(metaclass=ABCMeta):
    """Interface for intercepting incoming RPCs on the service side.

    This is an EXPERIMENTAL API.
    """

    @abstractmethod
    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails],
                               Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        """Intercept an incoming RPC before it reaches a handler.

        Args:
          continuation: Callable accepting a HandlerCallDetails. Calling
            it moves on to the next interceptor in the chain, if there is
            one, or otherwise to the RPC handler lookup logic. Its result
            is an RpcMethodHandler when the RPC is considered serviced,
            or None when it is not.
          handler_call_details: A HandlerCallDetails describing the RPC.

        Returns:
          The RpcMethodHandler with which the RPC may be serviced when
          this interceptor chooses to service it, or None otherwise.
        """

74 

75 

class ClientCallDetails(
        collections.namedtuple(
            'ClientCallDetails',
            (
                'method',
                'timeout',
                'metadata',
                'credentials',
                'wait_for_ready',
            ),
        ),
        grpc.ClientCallDetails,
):
    """Immutable description of an RPC that is about to be invoked.

    This is an EXPERIMENTAL API.

    Args:
      method: The method name of the RPC.
      timeout: An optional duration of time in seconds to allow for the RPC.
      metadata: Optional metadata to be transmitted to the service-side of
        the RPC.
      credentials: An optional CallCredentials for the RPC.
      wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism.
    """

    # Annotations only; the values live in the namedtuple fields above.
    method: str
    timeout: Optional[float]
    metadata: Optional[Metadata]
    credentials: Optional[grpc.CallCredentials]
    wait_for_ready: Optional[bool]

99 

100 

class ClientInterceptor(metaclass=ABCMeta):
    """Common base class shared by every Aio client interceptor class."""

103 

104 

class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting unary-unary invocations."""

    @abstractmethod
    async def intercept_unary_unary(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               UnaryUnaryCall],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[UnaryUnaryCall, ResponseType]:
        """Intercept a unary-unary invocation asynchronously.

        Args:
          continuation: Coroutine that carries the invocation forward,
            either by running the next interceptor in the chain or by
            invoking the actual RPC on the underlying Channel. Calling it
            is the interceptor's responsibility whenever it wants the RPC
            to proceed, e.g.
            `call = await continuation(client_call_details, request)`.
            `continuation` returns the call to the RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request: The request value for the RPC.

        Returns:
          An object with the RPC response.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

136 

137 

class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting unary-stream invocations."""

    @abstractmethod
    async def intercept_unary_stream(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               UnaryStreamCall],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[ResponseIterableType, UnaryStreamCall]:
        """Intercept a unary-stream invocation asynchronously.

        The implementation may return either the call object or an
        asynchronous iterator. When an asynchronous iterator is returned
        it becomes the source of the reads performed by the caller.

        Args:
          continuation: Coroutine that carries the invocation forward,
            either by running the next interceptor in the chain or by
            invoking the actual RPC on the underlying Channel. Calling it
            is the interceptor's responsibility whenever it wants the RPC
            to proceed, e.g.
            `call = await continuation(client_call_details, request)`.
            `continuation` returns the call to the RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request: The request value for the RPC.

        Returns:
          The RPC Call or an asynchronous iterator.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

173 

174 

class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting stream-unary invocations."""

    @abstractmethod
    async def intercept_stream_unary(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               StreamUnaryCall],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> StreamUnaryCall:
        """Intercept a stream-unary invocation asynchronously.

        Be careful when using call methods such as `write`, or when
        awaiting the call, from inside the interceptor: the caller may
        expect to receive an untouched call, for example so that it can
        start writing messages to it.

        Args:
          continuation: Coroutine that carries the invocation forward,
            either by running the next interceptor in the chain or by
            invoking the actual RPC on the underlying Channel. Calling it
            is the interceptor's responsibility whenever it wants the RPC
            to proceed, e.g.
            `call = await continuation(client_call_details, request_iterator)`.
            `continuation` returns the call to the RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request_iterator: The request iterator that will produce requests
            for the RPC.

        Returns:
          The RPC Call.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

214 

215 

class StreamStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting stream-stream invocations."""

    @abstractmethod
    async def intercept_stream_stream(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               StreamStreamCall],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> Union[ResponseIterableType, StreamStreamCall]:
        """Intercept a stream-stream invocation asynchronously.

        Be careful when using call methods such as `write`, or when
        awaiting the call, from inside the interceptor: the caller may
        expect to receive an untouched call, for example so that it can
        start writing messages to it.

        The implementation may return either the call object or an
        asynchronous iterator. When an asynchronous iterator is returned
        it becomes the source of the reads performed by the caller.

        Args:
          continuation: Coroutine that carries the invocation forward,
            either by running the next interceptor in the chain or by
            invoking the actual RPC on the underlying Channel. Calling it
            is the interceptor's responsibility whenever it wants the RPC
            to proceed, e.g.
            `call = await continuation(client_call_details, request_iterator)`.
            `continuation` returns the call to the RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request_iterator: The request iterator that will produce requests
            for the RPC.

        Returns:
          The RPC Call or an asynchronous iterator.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

259 

260 

class InterceptedCall:
    """Shared implementation for intercepted calls of every arity.

    The interceptor chain runs inside an asyncio task
    (`_interceptors_task`) whose result is the call object returned by
    the interceptors. Until that task finishes there is no call to talk
    to, so each method here first inspects (or awaits) the task and then
    either answers from the task's outcome (an `AioRpcError` or a
    cancellation) or delegates to the resulting call.

    This covers cancellation requested before the RPC has started, while
    execution is still held by the interceptors, as well as the final
    status, metadata and done-callback handling common to all arities.
    """

    _interceptors_task: asyncio.Task
    _pending_add_done_callbacks: Sequence[DoneCallbackType]

    def __init__(self, interceptors_task: asyncio.Task) -> None:
        """Store the interceptors task and hook its completion."""
        self._pending_add_done_callbacks = []
        self._interceptors_task = interceptors_task
        interceptors_task.add_done_callback(
            self._fire_or_add_pending_done_callbacks)

    def __del__(self):
        # Cancel the call when this wrapper is finalized.
        self.cancel()

    def _fire_or_add_pending_done_callbacks(
            self, interceptors_task: asyncio.Task) -> None:
        """Flush callbacks registered before the interceptors task ended.

        Callbacks are invoked immediately when the task failed with
        `AioRpcError`, was cancelled, or produced a call that is already
        done; otherwise they are re-registered on the produced call.
        """
        pending = self._pending_add_done_callbacks
        if not pending:
            return

        try:
            call = interceptors_task.result()
            finished = call.done()
        except (AioRpcError, asyncio.CancelledError):
            finished = True

        for pending_callback in pending:
            if finished:
                pending_callback(self)
            else:
                # Wrap so the user callback receives this object rather
                # than the underlying call.
                call.add_done_callback(
                    functools.partial(self._wrap_add_done_callback,
                                      pending_callback))

        self._pending_add_done_callbacks = []

    def _wrap_add_done_callback(self, callback: DoneCallbackType,
                                unused_call: _base_call.Call) -> None:
        """Invoke `callback` with this object, ignoring the call argument."""
        callback(self)

    def cancel(self) -> bool:
        """Cancel the interceptors task or, once it is done, its call.

        Returns False when the task already ended with `AioRpcError` or
        was cancelled.
        """
        task = self._interceptors_task
        if not task.done():
            # The intercepted call does not exist yet, so fall back to
            # the generic asyncio cancellation of the task itself.
            return task.cancel()

        try:
            call = task.result()
        except (AioRpcError, asyncio.CancelledError):
            return False

        return call.cancel()

    def cancelled(self) -> bool:
        """Report whether the task or the resulting call was cancelled."""
        task = self._interceptors_task
        if not task.done():
            return False

        try:
            call = task.result()
        except AioRpcError as rpc_error:
            return rpc_error.code() == grpc.StatusCode.CANCELLED
        except asyncio.CancelledError:
            return True

        return call.cancelled()

    def done(self) -> bool:
        """Report whether the task ended and its call, if any, is done."""
        task = self._interceptors_task
        if not task.done():
            return False

        try:
            call = task.result()
        except (AioRpcError, asyncio.CancelledError):
            return True

        return call.done()

    def add_done_callback(self, callback: DoneCallbackType) -> None:
        """Register `callback`, to be invoked with this object when done.

        While the interceptors task is still running the callback is
        queued; it is flushed when the task completes.
        """
        task = self._interceptors_task
        if not task.done():
            self._pending_add_done_callbacks.append(callback)
            return

        try:
            call = task.result()
        except (AioRpcError, asyncio.CancelledError):
            callback(self)
            return

        if not call.done():
            call.add_done_callback(
                functools.partial(self._wrap_add_done_callback, callback))
            return

        callback(self)

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

    async def initial_metadata(self) -> Optional[Metadata]:
        """Return initial metadata from the call or from the task's error.

        Returns None when the interceptors task was cancelled.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as rpc_error:
            return rpc_error.initial_metadata()
        except asyncio.CancelledError:
            return None
        else:
            return await call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Return trailing metadata from the call or from the task's error.

        Returns None when the interceptors task was cancelled.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as rpc_error:
            return rpc_error.trailing_metadata()
        except asyncio.CancelledError:
            return None
        else:
            return await call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        """Return the status code; CANCELLED when the task was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as rpc_error:
            return rpc_error.code()
        except asyncio.CancelledError:
            return grpc.StatusCode.CANCELLED
        else:
            return await call.code()

    async def details(self) -> str:
        """Return status details; a fixed message when locally cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as rpc_error:
            return rpc_error.details()
        except asyncio.CancelledError:
            return _LOCAL_CANCELLATION_DETAILS
        else:
            return await call.details()

    async def debug_error_string(self) -> Optional[str]:
        """Return the debug error string; empty when the task was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as rpc_error:
            return rpc_error.debug_error_string()
        except asyncio.CancelledError:
            return ''
        else:
            return await call.debug_error_string()

    async def wait_for_connection(self) -> None:
        """Await the interceptors task, then the call's connection wait."""
        intercepted_call = await self._interceptors_task
        return await intercepted_call.wait_for_connection()

436 

437 

438class _InterceptedUnaryResponseMixin: 

439 

440 def __await__(self): 

441 call = yield from self._interceptors_task.__await__() 

442 response = yield from call.__await__() 

443 return response 

444 

445 

class _InterceptedStreamResponseMixin:
    """Response-side helpers for intercepted calls that stream responses.

    Responses are pulled from a single lazily created async generator
    that first awaits the interceptors task and then iterates whatever
    call object the interceptors returned.
    """

    _response_aiter: Optional[AsyncIterable[ResponseType]]

    def _init_stream_response_mixin(self) -> None:
        # Initialized later: if the iterator were created eagerly and
        # never consumed, asyncio would emit a logging warning.
        self._response_aiter = None

    async def _wait_for_interceptor_task_response_iterator(
            self) -> AsyncIterable[ResponseType]:
        """Yield responses from the call produced by the interceptors."""
        call = await self._interceptors_task
        async for response in call:
            yield response

    def __aiter__(self) -> AsyncIterable[ResponseType]:
        """Return the shared response iterator, creating it on first use."""
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator())
        return self._response_aiter

    async def read(self) -> ResponseType:
        """Read one response message.

        Returns:
          The next response, or `cygrpc.EOF` once the response stream is
          exhausted. Previously exhaustion leaked the generator's
          `StopAsyncIteration` to the caller.
        """
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator())
        try:
            return await self._response_aiter.asend(None)
        except StopAsyncIteration:
            # End of stream is reported with the EOF sentinel, as the
            # non-intercepted streaming calls do.
            return cygrpc.EOF

471 

472 

class _InterceptedStreamRequestMixin:
    """Request-side helpers for intercepted calls that stream requests.

    When the caller supplies no request iterator, the mixin provides its
    own: an async generator fed through a one-slot queue by `write` and
    closed by `done_writing`.
    """

    _write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]]
    _write_to_iterator_queue: Optional[asyncio.Queue]
    _status_code_task: Optional[asyncio.Task]

    # Queue item that tells the proxy request iterator to stop.
    _FINISH_ITERATOR_SENTINEL = object()

    def _init_stream_request_mixin(
        self, request_iterator: Optional[RequestIterableType]
    ) -> RequestIterableType:
        """Set up the write proxy when no request iterator was supplied.

        Returns:
          The caller's iterator when one was given, otherwise the
          internal proxy generator fed by `write`.
        """
        # Give every attribute a defined value on both paths.
        self._write_to_iterator_queue = None
        self._write_to_iterator_async_gen = None
        self._status_code_task = None

        if request_iterator is None:
            # We provide our own request iterator which is a proxy
            # of the future writes that will be done by the caller.
            self._write_to_iterator_queue = asyncio.Queue(maxsize=1)
            self._write_to_iterator_async_gen = (
                self._proxy_writes_as_request_iterator())
            request_iterator = self._write_to_iterator_async_gen

        return request_iterator

    async def _proxy_writes_as_request_iterator(self):
        """Yield queued requests until the finish sentinel is dequeued."""
        await self._interceptors_task

        while True:
            value = await self._write_to_iterator_queue.get()
            if value is _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL:
                break
            yield value

    async def _write_to_iterator_queue_interruptible(self, request: RequestType,
                                                     call: InterceptedCall):
        """Enqueue `request`, giving up if `call` terminates first.

        The queue put races against a task awaiting `call.code()`, so an
        abrupt termination of the call interrupts a blocked write.
        """
        if self._status_code_task is None:
            self._status_code_task = self._loop.create_task(call.code())

        put_task = self._loop.create_task(
            self._write_to_iterator_queue.put(request))
        await asyncio.wait((put_task, self._status_code_task),
                           return_when=asyncio.FIRST_COMPLETED)

        # If the call ended first, the put would otherwise stay pending
        # forever and be reported by asyncio when it is destroyed.
        if not put_task.done():
            put_task.cancel()

    async def write(self, request: RequestType) -> None:
        """Write one request message through the proxy iterator.

        Raises:
          cygrpc.UsageError: If the caller supplied its own request
            iterator at construction.
          asyncio.InvalidStateError: If the RPC already finished or the
            call's done-writing flag is set.
        """
        # If no queue was created it means that requests
        # should be expected through an iterator provided
        # by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        try:
            call = await self._interceptors_task
        except (asyncio.CancelledError, AioRpcError):
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
        elif call._done_writing_flag:
            raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)

        await self._write_to_iterator_queue_interruptible(request, call)

        # The wait above also returns when the call terminated.
        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

    async def done_writing(self) -> None:
        """Signal peer that client is done writing.

        This method is idempotent.
        NOTE(review): each invocation enqueues the finish sentinel again;
        confirm the idempotence claim against the underlying call.

        Raises:
          cygrpc.UsageError: If the caller supplied its own request
            iterator at construction.
          asyncio.InvalidStateError: If the interceptors task was
            cancelled. Other errors from that task propagate unchanged.
        """
        # If no queue was created it means that requests
        # should be expected through an iterator provided
        # by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        try:
            call = await self._interceptors_task
        except asyncio.CancelledError:
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        await self._write_to_iterator_queue_interruptible(
            _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL, call)

560 

561 

class InterceptedUnaryUnaryCall(_InterceptedUnaryResponseMixin, InterceptedCall,
                                _base_call.UnaryUnaryCall):
    """Runs a `UnaryUnaryCall` wrapped by interceptors.

    Awaiting this object is proxied to the intercepted call only after
    the interceptors task has finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[UnaryUnaryClientInterceptor],
                 request: RequestType, timeout: Optional[float],
                 metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain as a task on `loop`."""
        self._loop = loop
        self._channel = channel
        invocation = self._invoke(interceptors, method, timeout, metadata,
                                  credentials, wait_for_ready, request,
                                  request_serializer, response_deserializer)
        super().__init__(loop.create_task(invocation))

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[UnaryUnaryClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool], request: RequestType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> UnaryUnaryCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
                interceptors: List[UnaryUnaryClientInterceptor],
                client_call_details: ClientCallDetails,
                request: RequestType) -> _base_call.UnaryUnaryCall:
            # End of the chain: start the real RPC.
            if not interceptors:
                return UnaryUnaryCall(
                    request, _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

            # The continuation handed to the head interceptor runs the
            # remainder of the chain.
            continuation = functools.partial(_run_interceptor,
                                             interceptors[1:])
            call_or_response = await interceptors[0].intercept_unary_unary(
                continuation, client_call_details, request)

            if isinstance(call_or_response, _base_call.UnaryUnaryCall):
                return call_or_response
            # A bare response was returned; wrap it in a call object.
            return UnaryUnaryCallResponse(call_or_response)

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(list(interceptors), client_call_details,
                                      request)

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

633 

634 

class InterceptedUnaryStreamCall(_InterceptedStreamResponseMixin,
                                 InterceptedCall, _base_call.UnaryStreamCall):
    """Used for running a `UnaryStreamCall` wrapped by interceptors."""

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Most recent call object produced along the interceptor chain. This
    # is a real annotation with a None default; it used to be an
    # assignment of the typing object itself.
    _last_returned_call_from_interceptors: Optional[
        _base_call.UnaryStreamCall] = None

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[UnaryStreamClientInterceptor],
                 request: RequestType, timeout: Optional[float],
                 metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain as a task on `loop`."""
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[UnaryStreamClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool], request: RequestType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> UnaryStreamCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: List[UnaryStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request: RequestType,
        ) -> _base_call.UnaryStreamCall:

            if interceptors:
                # The continuation handed to the head interceptor runs
                # the remainder of the chain.
                continuation = functools.partial(_run_interceptor,
                                                 interceptors[1:])

                call_or_response_iterator = await interceptors[
                    0].intercept_unary_stream(continuation, client_call_details,
                                              request)

                if isinstance(call_or_response_iterator,
                              _base_call.UnaryStreamCall):
                    self._last_returned_call_from_interceptors = call_or_response_iterator
                else:
                    # The interceptor returned a plain async iterator:
                    # pair it with the last known call so that the
                    # result still offers the call API.
                    self._last_returned_call_from_interceptors = UnaryStreamCallResponseIterator(
                        self._last_returned_call_from_interceptors,
                        call_or_response_iterator)
                return self._last_returned_call_from_interceptors
            else:
                # End of the chain: start the real RPC.
                self._last_returned_call_from_interceptors = UnaryStreamCall(
                    request, _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

                return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(list(interceptors), client_call_details,
                                      request)

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

713 

714 

class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin,
                                 _InterceptedStreamRequestMixin,
                                 InterceptedCall, _base_call.StreamUnaryCall):
    """Used for running a `StreamUnaryCall` wrapped by interceptors.

    Awaiting this object is proxied to the intercepted call only after
    the interceptors task has finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[StreamUnaryClientInterceptor],
                 request_iterator: Optional[RequestIterableType],
                 timeout: Optional[float], metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain as a task on `loop`."""
        self._loop = loop
        self._channel = channel
        # May swap a missing iterator for the write-proxy generator.
        request_iterator = self._init_stream_request_mixin(request_iterator)
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request_iterator, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[StreamUnaryClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool],
            request_iterator: RequestIterableType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> StreamUnaryCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: List[StreamUnaryClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType
        ) -> _base_call.StreamUnaryCall:

            if interceptors:
                # The continuation handed to the head interceptor runs
                # the remainder of the chain.
                continuation = functools.partial(_run_interceptor,
                                                 interceptors[1:])

                return await interceptors[0].intercept_stream_unary(
                    continuation, client_call_details, request_iterator)
            else:
                # End of the chain: start the real RPC.
                return StreamUnaryCall(
                    request_iterator,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(list(interceptors), client_call_details,
                                      request_iterator)

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

786 

787 

class InterceptedStreamStreamCall(_InterceptedStreamResponseMixin,
                                  _InterceptedStreamRequestMixin,
                                  InterceptedCall, _base_call.StreamStreamCall):
    """Used for running a `StreamStreamCall` wrapped by interceptors."""

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Most recent call object produced along the interceptor chain. This
    # is a real annotation with a None default; it used to be an
    # assignment of the typing object itself and named the unary-stream
    # call type.
    _last_returned_call_from_interceptors: Optional[
        _base_call.StreamStreamCall] = None

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[StreamStreamClientInterceptor],
                 request_iterator: Optional[RequestIterableType],
                 timeout: Optional[float], metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain as a task on `loop`."""
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        # May swap a missing iterator for the write-proxy generator.
        request_iterator = self._init_stream_request_mixin(request_iterator)
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request_iterator, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[StreamStreamClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool],
            request_iterator: RequestIterableType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> StreamStreamCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: List[StreamStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType
        ) -> _base_call.StreamStreamCall:

            if interceptors:
                # The continuation handed to the head interceptor runs
                # the remainder of the chain.
                continuation = functools.partial(_run_interceptor,
                                                 interceptors[1:])

                call_or_response_iterator = await interceptors[
                    0].intercept_stream_stream(continuation,
                                               client_call_details,
                                               request_iterator)

                if isinstance(call_or_response_iterator,
                              _base_call.StreamStreamCall):
                    self._last_returned_call_from_interceptors = call_or_response_iterator
                else:
                    # The interceptor returned a plain async iterator:
                    # pair it with the last known call so that the
                    # result still offers the call API.
                    self._last_returned_call_from_interceptors = StreamStreamCallResponseIterator(
                        self._last_returned_call_from_interceptors,
                        call_or_response_iterator)
                return self._last_returned_call_from_interceptors
            else:
                # End of the chain: start the real RPC.
                self._last_returned_call_from_interceptors = StreamStreamCall(
                    request_iterator,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)
                return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(list(interceptors), client_call_details,
                                      request_iterator)

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

870 

871 

class UnaryUnaryCallResponse(_base_call.UnaryUnaryCall):
    """Unary-unary call that is already finished and only carries a response.

    Every status accessor reports a completed, successful, non-cancelled
    call, and awaiting the object produces the stored response immediately.
    """
    _response: ResponseType

    def __init__(self, response: ResponseType) -> None:
        """Keep ``response`` as the value produced when the call is awaited."""
        self._response = response

    def cancel(self) -> bool:
        """Return False unconditionally."""
        return False

    def cancelled(self) -> bool:
        """Return False unconditionally."""
        return False

    def done(self) -> bool:
        """Return True unconditionally."""
        return True

    def add_done_callback(self, unused_callback) -> None:
        """Unsupported; always raises NotImplementedError."""
        raise NotImplementedError()

    def time_remaining(self) -> Optional[float]:
        """Unsupported; always raises NotImplementedError."""
        raise NotImplementedError()

    async def initial_metadata(self) -> Optional[Metadata]:
        """Return None: no initial metadata is stored on this object."""
        return None

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Return None: no trailing metadata is stored on this object."""
        return None

    async def code(self) -> grpc.StatusCode:
        """Return ``grpc.StatusCode.OK``."""
        return grpc.StatusCode.OK

    async def details(self) -> str:
        """Return an empty details string."""
        return ''

    async def debug_error_string(self) -> Optional[str]:
        """Return None: there is no debug error string."""
        return None

    def __await__(self):
        """Finish immediately with the stored response, without suspending."""
        return self._response
        # Never executed. The mere presence of a yield makes __await__ a
        # generator function, which the await protocol requires.
        yield None  # pylint: disable=unreachable

    async def wait_for_connection(self) -> None:
        """Do nothing and return None."""

918 

919 

class _StreamCallResponseIterator:
    """Pair a call object with a separate async iterable of responses.

    Status, metadata, cancellation and connection queries are forwarded to
    the wrapped call; asynchronous iteration is served by the supplied
    response iterable instead.
    """

    _call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall]
    _response_iterator: AsyncIterable[ResponseType]

    def __init__(self, call: Union[_base_call.UnaryStreamCall,
                                   _base_call.StreamStreamCall],
                 response_iterator: AsyncIterable[ResponseType]) -> None:
        """Remember the wrapped ``call`` and the ``response_iterator``."""
        self._call = call
        self._response_iterator = response_iterator

    def cancel(self) -> bool:
        """Forward to the wrapped call's ``cancel``."""
        return self._call.cancel()

    def cancelled(self) -> bool:
        """Forward to the wrapped call's ``cancelled``."""
        return self._call.cancelled()

    def done(self) -> bool:
        """Forward to the wrapped call's ``done``."""
        return self._call.done()

    def add_done_callback(self, callback) -> None:
        """Register ``callback`` on the wrapped call."""
        self._call.add_done_callback(callback)

    def time_remaining(self) -> Optional[float]:
        """Forward to the wrapped call's ``time_remaining``."""
        return self._call.time_remaining()

    async def initial_metadata(self) -> Optional[Metadata]:
        """Await and return the wrapped call's initial metadata."""
        return await self._call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Await and return the wrapped call's trailing metadata."""
        return await self._call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        """Await and return the wrapped call's status code."""
        return await self._call.code()

    async def details(self) -> str:
        """Await and return the wrapped call's status details."""
        return await self._call.details()

    async def debug_error_string(self) -> Optional[str]:
        """Await and return the wrapped call's debug error string."""
        return await self._call.debug_error_string()

    def __aiter__(self):
        """Return the async iterator of the supplied response iterable."""
        return self._response_iterator.__aiter__()

    async def wait_for_connection(self) -> None:
        """Await the wrapped call's ``wait_for_connection``."""
        return await self._call.wait_for_connection()

966 

967 

class UnaryStreamCallResponseIterator(_StreamCallResponseIterator,
                                      _base_call.UnaryStreamCall):
    """UnaryStreamCall class which uses an alternative response iterator."""

    async def read(self) -> ResponseType:
        """Unsupported; always raises NotImplementedError."""
        # Responses are expected to be consumed through the async iterator,
        # so this method should never be reached.
        raise NotImplementedError()

976 

977 

class StreamStreamCallResponseIterator(_StreamCallResponseIterator,
                                       _base_call.StreamStreamCall):
    """StreamStreamCall class which uses an alternative response iterator."""

    async def read(self) -> ResponseType:
        """Unsupported; always raises NotImplementedError."""
        # Responses are expected to be consumed through the async iterator,
        # so this method should never be reached.
        raise NotImplementedError()

    async def write(self, request: RequestType) -> None:
        """Unsupported; always raises NotImplementedError."""
        # Requests are expected to flow through the async iterator provided
        # by the InterceptedStreamStreamCall, so this method should never be
        # reached.
        raise NotImplementedError()

    async def done_writing(self) -> None:
        """Unsupported; always raises NotImplementedError."""
        # Requests are expected to flow through the async iterator provided
        # by the InterceptedStreamStreamCall, so this method should never be
        # reached.
        raise NotImplementedError()

    @property
    def _done_writing_flag(self) -> bool:
        """Expose the wrapped call's ``_done_writing_flag``."""
        return self._call._done_writing_flag