Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_interceptor.py: 38%

398 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 07:30 +0000

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Interceptors implementation of gRPC Asyncio Python.""" 

15from abc import ABCMeta 

16from abc import abstractmethod 

17import asyncio 

18import collections 

19import functools 

20from typing import (AsyncIterable, Awaitable, Callable, Iterator, Optional, 

21 Sequence, Union) 

22 

23import grpc 

24from grpc._cython import cygrpc 

25 

26from . import _base_call 

27from ._call import AioRpcError 

28from ._call import StreamStreamCall 

29from ._call import StreamUnaryCall 

30from ._call import UnaryStreamCall 

31from ._call import UnaryUnaryCall 

32from ._call import _API_STYLE_ERROR 

33from ._call import _RPC_ALREADY_FINISHED_DETAILS 

34from ._call import _RPC_HALF_CLOSED_DETAILS 

35from ._metadata import Metadata 

36from ._typing import DeserializingFunction 

37from ._typing import DoneCallbackType 

38from ._typing import RequestIterableType 

39from ._typing import RequestType 

40from ._typing import ResponseIterableType 

41from ._typing import ResponseType 

42from ._typing import SerializingFunction 

43from ._utils import _timeout_to_deadline 

44 

45_LOCAL_CANCELLATION_DETAILS = 'Locally cancelled by application!' 

46 

47 

class ServerInterceptor(metaclass=ABCMeta):
    """Hook for intercepting incoming RPCs on the service side.

    This is an EXPERIMENTAL API.
    """

    @abstractmethod
    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails],
                               Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        """Intercept an incoming RPC before it is handed to a handler.

        Args:
          continuation: A function that accepts a HandlerCallDetails and
            moves on to the next interceptor in the chain, if there is one,
            or otherwise to the RPC handler lookup logic. It is given the
            call details as its argument and yields an RpcMethodHandler
            when the RPC is considered serviced, or None when it is not.
          handler_call_details: The HandlerCallDetails describing the RPC.

        Returns:
          The RpcMethodHandler that should service the RPC when this
          interceptor decides to service it, or None otherwise.
        """

74 

75 

class ClientCallDetails(
        collections.namedtuple('ClientCallDetails', (
            'method',
            'timeout',
            'metadata',
            'credentials',
            'wait_for_ready',
        )), grpc.ClientCallDetails):
    """Immutable description of an RPC that is about to be invoked.

    This is an EXPERIMENTAL API.

    Args:
        method: The method name of the RPC.
        timeout: An optional duration, in seconds, allowed for the RPC.
        metadata: Optional metadata sent to the service-side of the RPC.
        credentials: An optional CallCredentials for the RPC.
        wait_for_ready: This is an EXPERIMENTAL argument. An optional
            flag to enable :term:`wait_for_ready` mechanism.
    """

    # Annotations only: the values themselves live in the namedtuple fields.
    method: str
    timeout: Optional[float]
    metadata: Optional[Metadata]
    credentials: Optional[grpc.CallCredentials]
    wait_for_ready: Optional[bool]

100 

101 

class ClientInterceptor(metaclass=ABCMeta):
    """Common ancestor of every asyncio client interceptor class."""

104 

105 

class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interceptor interface for unary-unary invocations."""

    @abstractmethod
    async def intercept_unary_unary(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               UnaryUnaryCall],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[UnaryUnaryCall, ResponseType]:
        """Intercept a unary-unary invocation asynchronously.

        Args:
          continuation: A coroutine that carries the invocation forward,
            either by running the next interceptor in the chain or by
            invoking the actual RPC on the underlying Channel. Calling it
            is the interceptor's responsibility whenever it decides the RPC
            should proceed, typically as
            `call = await continuation(client_call_details, request)`.
            `continuation` returns the call to the RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request: The request value for the RPC.

        Returns:
          An object with the RPC response.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

137 

138 

class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interceptor interface for unary-stream invocations."""

    @abstractmethod
    async def intercept_unary_stream(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               UnaryStreamCall],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[ResponseIterableType, UnaryStreamCall]:
        """Intercept a unary-stream invocation asynchronously.

        The implementation may hand back either the call object or an
        asynchronous iterator. When an asynchronous iterator is returned it
        becomes the source of the reads performed by the caller.

        Args:
          continuation: A coroutine that carries the invocation forward,
            either by running the next interceptor in the chain or by
            invoking the actual RPC on the underlying Channel. Calling it
            is the interceptor's responsibility whenever it decides the RPC
            should proceed, typically as
            `call = await continuation(client_call_details, request)`.
            `continuation` returns the call to the RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request: The request value for the RPC.

        Returns:
          The RPC Call or an asynchronous iterator.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

174 

175 

class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Affords intercepting stream-unary invocations."""

    @abstractmethod
    async def intercept_stream_unary(
        self,
        # The continuation is invoked with the request *iterator*, not with
        # a single request message, so it is typed accordingly.
        continuation: Callable[[ClientCallDetails, RequestIterableType],
                               StreamUnaryCall],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> StreamUnaryCall:
        """Intercepts a stream-unary invocation asynchronously.

        Within the interceptor the usage of the call methods like `write` or
        even awaiting the call should be done carefully, since the caller
        could be expecting an untouched call, for example to start writing
        messages to it.

        Args:
          continuation: A coroutine that proceeds with the invocation by
            executing the next interceptor in the chain or invoking the
            actual RPC on the underlying Channel. It is the interceptor's
            responsibility to call it if it decides to move the RPC forward.
            The interceptor can use
            `call = await continuation(client_call_details, request_iterator)`
            to continue with the RPC. `continuation` returns the call to the
            RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request_iterator: The request iterator that will produce requests
            for the RPC.

        Returns:
          The RPC Call.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

215 

216 

class StreamStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Affords intercepting stream-stream invocations."""

    @abstractmethod
    async def intercept_stream_stream(
        self,
        # The continuation is invoked with the request *iterator*, not with
        # a single request message, so it is typed accordingly.
        continuation: Callable[[ClientCallDetails, RequestIterableType],
                               StreamStreamCall],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> Union[ResponseIterableType, StreamStreamCall]:
        """Intercepts a stream-stream invocation asynchronously.

        Within the interceptor the usage of the call methods like `write` or
        even awaiting the call should be done carefully, since the caller
        could be expecting an untouched call, for example to start writing
        messages to it.

        The function could return the call object or an asynchronous
        iterator; when an asynchronous iterator is returned it becomes the
        source of the reads done by the caller.

        Args:
          continuation: A coroutine that proceeds with the invocation by
            executing the next interceptor in the chain or invoking the
            actual RPC on the underlying Channel. It is the interceptor's
            responsibility to call it if it decides to move the RPC forward.
            The interceptor can use
            `call = await continuation(client_call_details, request_iterator)`
            to continue with the RPC. `continuation` returns the call to the
            RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request_iterator: The request iterator that will produce requests
            for the RPC.

        Returns:
          The RPC Call or an asynchronous iterator.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """

260 

261 

class InterceptedCall:
    """Base implementation for all intercepted call arities.

    Interceptors might have some work to do before the RPC invocation, with
    the capacity of changing the invocation parameters, and some work to do
    after the RPC invocation, with access to the wrapped call.

    It also handles early and late cancellations: when the RPC has not even
    started and the execution is still held by the interceptors, or when the
    RPC has finished but the execution is again held by the interceptors.

    Once the RPC is finally executed, all methods are forwarded to the
    intercepted call, which is the same call returned to the interceptors.

    As the base class for all of the intercepted calls it implements the
    logic around final status, metadata and cancellation.
    """

    _interceptors_task: asyncio.Task
    _pending_add_done_callbacks: Sequence[DoneCallbackType]

    def __init__(self, interceptors_task: asyncio.Task) -> None:
        """Store the interceptors task and hook into its completion.

        Args:
          interceptors_task: The task running the interceptor chain; its
            result is the intercepted call.
        """
        self._interceptors_task = interceptors_task
        self._pending_add_done_callbacks = []
        self._interceptors_task.add_done_callback(
            self._fire_or_add_pending_done_callbacks)

    def __del__(self):
        # A subclass constructor can fail before `__init__` above has stored
        # the task. There is nothing to cancel then, and touching the missing
        # attribute would only raise inside the finalizer.
        if getattr(self, '_interceptors_task', None) is None:
            return
        self.cancel()

    def _fire_or_add_pending_done_callbacks(
            self, interceptors_task: asyncio.Task) -> None:
        """Flush callbacks registered while the interceptors were running.

        Runs as the done-callback of the interceptors task. If the
        intercepted call is already finished, or the task ended with
        `AioRpcError` or `asyncio.CancelledError`, every pending callback is
        invoked right away; otherwise each one is attached to the
        intercepted call.
        """

        if not self._pending_add_done_callbacks:
            return

        call_completed = False

        try:
            call = interceptors_task.result()
            if call.done():
                call_completed = True
        except (AioRpcError, asyncio.CancelledError):
            call_completed = True

        if call_completed:
            for callback in self._pending_add_done_callbacks:
                callback(self)
        else:
            for callback in self._pending_add_done_callbacks:
                # Wrapped so the callback receives this object rather than
                # the underlying intercepted call.
                callback = functools.partial(self._wrap_add_done_callback,
                                             callback)
                call.add_done_callback(callback)

        self._pending_add_done_callbacks = []

    def _wrap_add_done_callback(self, callback: DoneCallbackType,
                                unused_call: _base_call.Call) -> None:
        """Invoke `callback` with this object instead of the inner call."""
        callback(self)

    def cancel(self) -> bool:
        """Cancel the interceptors task or, once it is done, the call.

        Returns:
          The result of cancelling the pending interceptors task, or of
          cancelling the intercepted call; False when the interceptors task
          ended with `AioRpcError` or `asyncio.CancelledError`.
        """
        if not self._interceptors_task.done():
            # The intercepted call is not available yet, so fall back to
            # the generic asyncio cancellation of the task.
            return self._interceptors_task.cancel()

        try:
            call = self._interceptors_task.result()
        except AioRpcError:
            return False
        except asyncio.CancelledError:
            return False

        return call.cancel()

    def cancelled(self) -> bool:
        """Report whether the RPC ended up cancelled.

        Returns:
          False while the interceptors task is still running; True when the
          task was cancelled or failed with a CANCELLED `AioRpcError`;
          otherwise whatever the intercepted call reports.
        """
        if not self._interceptors_task.done():
            return False

        try:
            call = self._interceptors_task.result()
        except AioRpcError as err:
            return err.code() == grpc.StatusCode.CANCELLED
        except asyncio.CancelledError:
            return True

        return call.cancelled()

    def done(self) -> bool:
        """Report whether the RPC has finished.

        Returns:
          False while the interceptors task is still running; True when the
          task ended with `AioRpcError` or `asyncio.CancelledError`;
          otherwise whatever the intercepted call reports.
        """
        if not self._interceptors_task.done():
            return False

        try:
            call = self._interceptors_task.result()
        except (AioRpcError, asyncio.CancelledError):
            return True

        return call.done()

    def add_done_callback(self, callback: DoneCallbackType) -> None:
        """Register `callback` to be invoked with this object when done.

        While the interceptors are still running the callback is queued.
        If the RPC is already finished it is invoked immediately; otherwise
        it is attached to the intercepted call.
        """
        if not self._interceptors_task.done():
            self._pending_add_done_callbacks.append(callback)
            return

        try:
            call = self._interceptors_task.result()
        except (AioRpcError, asyncio.CancelledError):
            callback(self)
            return

        if call.done():
            callback(self)
        else:
            callback = functools.partial(self._wrap_add_done_callback, callback)
            call.add_done_callback(callback)

    def time_remaining(self) -> Optional[float]:
        """Not supported on intercepted calls."""
        raise NotImplementedError()

    async def initial_metadata(self) -> Optional[Metadata]:
        """Return the initial metadata of the RPC.

        Taken from the `AioRpcError` when the interceptors task failed with
        one; None when the task was cancelled.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.initial_metadata()
        except asyncio.CancelledError:
            return None

        return await call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Return the trailing metadata of the RPC.

        Taken from the `AioRpcError` when the interceptors task failed with
        one; None when the task was cancelled.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.trailing_metadata()
        except asyncio.CancelledError:
            return None

        return await call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        """Return the status code of the RPC.

        Taken from the `AioRpcError` when the interceptors task failed with
        one; `grpc.StatusCode.CANCELLED` when the task was cancelled.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.code()
        except asyncio.CancelledError:
            return grpc.StatusCode.CANCELLED

        return await call.code()

    async def details(self) -> str:
        """Return the status details of the RPC.

        Taken from the `AioRpcError` when the interceptors task failed with
        one; the local-cancellation message when the task was cancelled.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.details()
        except asyncio.CancelledError:
            return _LOCAL_CANCELLATION_DETAILS

        return await call.details()

    async def debug_error_string(self) -> Optional[str]:
        """Return the debug error string of the RPC.

        Taken from the `AioRpcError` when the interceptors task failed with
        one; an empty string when the task was cancelled.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.debug_error_string()
        except asyncio.CancelledError:
            return ''

        return await call.debug_error_string()

    async def wait_for_connection(self) -> None:
        """Wait for the interceptors, then delegate to the intercepted call.

        Exceptions raised by the interceptors task are not translated here
        and propagate to the caller.
        """
        call = await self._interceptors_task
        return await call.wait_for_connection()

437 

438 

class _InterceptedUnaryResponseMixin:
    """Makes an intercepted call awaitable for its single response."""

    def __await__(self):
        # First wait for the interceptors to produce the call, then wait on
        # that call for the response itself.
        intercepted_call = yield from self._interceptors_task.__await__()
        return (yield from intercepted_call.__await__())

445 

446 

class _InterceptedStreamResponseMixin:
    """Response-side plumbing for intercepted calls that stream responses.

    Responses are pulled through a single async generator that first waits
    for the interceptors task and then iterates the call it produced.
    `__aiter__` and `read` share that generator.
    """

    _response_aiter: Optional[AsyncIterable[ResponseType]]

    def _init_stream_response_mixin(self) -> None:
        # Initialized lazily: if the generator were created here and never
        # consumed, asyncio would emit a logging warning.
        self._response_aiter = None

    async def _wait_for_interceptor_task_response_iterator(
            self) -> AsyncIterable[ResponseType]:
        """Yield every response of the call produced by the interceptors."""
        call = await self._interceptors_task
        async for response in call:
            yield response

    def _ensure_response_aiter(self) -> AsyncIterable[ResponseType]:
        """Create the shared response generator on first use and return it."""
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator())
        return self._response_aiter

    def __aiter__(self) -> AsyncIterable[ResponseType]:
        return self._ensure_response_aiter()

    async def read(self) -> ResponseType:
        """Read one response from the stream.

        Returns:
          The next response message, or `cygrpc.EOF` once the stream is
          exhausted.
        """
        response_aiter = self._ensure_response_aiter()
        try:
            return await response_aiter.asend(None)
        except StopAsyncIteration:
            # Exhaustion of the generator is an implementation detail; the
            # end of the stream is signalled with the EOF sentinel instead.
            return cygrpc.EOF

472 

473 

class _InterceptedStreamRequestMixin:
    """Request-side plumbing for intercepted calls that stream requests.

    When the caller does not supply a request iterator, the mixin provides
    its own: an async generator fed by a one-slot queue that `write` and
    `done_writing` push into.
    """

    _write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]]
    _write_to_iterator_queue: Optional[asyncio.Queue]
    _status_code_task: Optional[asyncio.Task]

    # Pushed through the queue by `done_writing` to end the request stream.
    _FINISH_ITERATOR_SENTINEL = object()

    def _init_stream_request_mixin(
        self, request_iterator: Optional[RequestIterableType]
    ) -> RequestIterableType:
        """Pick the request iterator that will feed the RPC.

        Args:
          request_iterator: The iterator supplied by the caller, or None
            when the caller intends to use `write` / `done_writing`.

        Returns:
          The caller's iterator when one was given, otherwise the internal
          generator that proxies future writes.
        """

        if request_iterator is None:
            # We provide our own request iterator, which is a proxy of the
            # future writes that will be done by the caller.
            self._write_to_iterator_queue = asyncio.Queue(maxsize=1)
            self._write_to_iterator_async_gen = (
                self._proxy_writes_as_request_iterator())
            self._status_code_task = None
            request_iterator = self._write_to_iterator_async_gen
        else:
            self._write_to_iterator_queue = None

        return request_iterator

    async def _proxy_writes_as_request_iterator(self):
        """Yield the values written by the caller until the sentinel."""
        await self._interceptors_task

        while True:
            value = await self._write_to_iterator_queue.get()
            if value is _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL:
                break
            yield value

    async def _write_to_iterator_queue_interruptible(self, request: RequestType,
                                                     call: InterceptedCall):
        """Enqueue `request`, giving up if `call` terminates first.

        The write races against a task awaiting `call.code()`, so an abrupt
        termination of the call interrupts a write that would otherwise wait
        on the full queue forever.
        """
        if self._status_code_task is None:
            self._status_code_task = self._loop.create_task(call.code())

        write_task = self._loop.create_task(
            self._write_to_iterator_queue.put(request))

        await asyncio.wait((write_task, self._status_code_task),
                           return_when=asyncio.FIRST_COMPLETED)

        if not write_task.done():
            # The call finished first, so nothing will ever drain the queue.
            # Cancel the write instead of leaving a pending task behind.
            write_task.cancel()

    async def write(self, request: RequestType) -> None:
        """Write one request message to the RPC.

        Raises:
          cygrpc.UsageError: If the caller supplied a request iterator.
          asyncio.InvalidStateError: If the RPC is already finished or has
            been half-closed.
        """
        # If no queue was created it means that requests are expected
        # through an iterator provided by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        try:
            call = await self._interceptors_task
        except (asyncio.CancelledError, AioRpcError):
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
        elif call._done_writing_flag:
            raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)

        await self._write_to_iterator_queue_interruptible(request, call)

        # The write may have been interrupted by the call terminating.
        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

    async def done_writing(self) -> None:
        """Signal peer that client is done writing.

        This method is idempotent.

        Raises:
          cygrpc.UsageError: If the caller supplied a request iterator.
          asyncio.InvalidStateError: If the interceptors task was cancelled.
        """
        # If no queue was created it means that requests are expected
        # through an iterator provided by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        try:
            call = await self._interceptors_task
        except asyncio.CancelledError:
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        await self._write_to_iterator_queue_interruptible(
            _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL, call)

561 

562 

class InterceptedUnaryUnaryCall(_InterceptedUnaryResponseMixin, InterceptedCall,
                                _base_call.UnaryUnaryCall):
    """Runs a `UnaryUnaryCall` wrapped by interceptors.

    Awaiting this object is proxied to the intercepted call only once the
    interceptors task has finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[UnaryUnaryClientInterceptor],
                 request: RequestType, timeout: Optional[float],
                 metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._channel = channel
        # The interceptor chain runs as its own task; the base class tracks
        # that task and exposes the call it eventually produces.
        invocation = self._invoke(interceptors, method, timeout, metadata,
                                  credentials, wait_for_ready, request,
                                  request_serializer, response_deserializer)
        super().__init__(loop.create_task(invocation))

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[UnaryUnaryClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool], request: RequestType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> UnaryUnaryCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
                interceptors: Iterator[UnaryUnaryClientInterceptor],
                client_call_details: ClientCallDetails,
                request: RequestType) -> _base_call.UnaryUnaryCall:
            interceptor = next(interceptors, None)

            if not interceptor:
                # End of the chain: start the actual RPC on the channel.
                return UnaryUnaryCall(
                    request, _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

            continuation = functools.partial(_run_interceptor, interceptors)
            outcome = await interceptor.intercept_unary_unary(
                continuation, client_call_details, request)

            if isinstance(outcome, _base_call.UnaryUnaryCall):
                return outcome
            # The interceptor handed back a bare response; wrap it so the
            # rest of the chain always deals with a call object.
            return UnaryUnaryCallResponse(outcome)

        initial_details = ClientCallDetails(method, timeout, metadata,
                                            credentials, wait_for_ready)
        return await _run_interceptor(iter(interceptors), initial_details,
                                      request)

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

636 

637 

class InterceptedUnaryStreamCall(_InterceptedStreamResponseMixin,
                                 InterceptedCall, _base_call.UnaryStreamCall):
    """Used for running a `UnaryStreamCall` wrapped by interceptors."""

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Declared as an annotation; the value is set per instance in __init__.
    _last_returned_call_from_interceptors: Optional[_base_call.UnaryStreamCall]

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[UnaryStreamClientInterceptor],
                 request: RequestType, timeout: Optional[float],
                 metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[UnaryStreamClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool], request: RequestType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> UnaryStreamCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: Iterator[UnaryStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request: RequestType,
        ) -> _base_call.UnaryStreamCall:

            interceptor = next(interceptors, None)

            if interceptor:
                continuation = functools.partial(_run_interceptor, interceptors)

                call_or_response_iterator = await interceptor.intercept_unary_stream(
                    continuation, client_call_details, request)

                if isinstance(call_or_response_iterator,
                              _base_call.UnaryStreamCall):
                    self._last_returned_call_from_interceptors = call_or_response_iterator
                else:
                    # The interceptor returned an async iterator: pair it
                    # with the last call seen so far so the result still
                    # behaves as a call.
                    self._last_returned_call_from_interceptors = UnaryStreamCallResponseIterator(
                        self._last_returned_call_from_interceptors,
                        call_or_response_iterator)
                return self._last_returned_call_from_interceptors
            else:
                # End of the chain: start the actual RPC on the channel.
                self._last_returned_call_from_interceptors = UnaryStreamCall(
                    request, _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

                return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(iter(interceptors), client_call_details,
                                      request)

    def time_remaining(self) -> Optional[float]:
        """Not supported on intercepted calls."""
        raise NotImplementedError()

716 

717 

class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin,
                                 _InterceptedStreamRequestMixin,
                                 InterceptedCall, _base_call.StreamUnaryCall):
    """Used for running a `StreamUnaryCall` wrapped by interceptors.

    The `__await__` method is proxied to the intercepted call only when the
    interceptor task is finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[StreamUnaryClientInterceptor],
                 request_iterator: Optional[RequestIterableType],
                 timeout: Optional[float], metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._channel = channel
        # Falls back to the internal write-proxy iterator when the caller
        # did not supply one.
        request_iterator = self._init_stream_request_mixin(request_iterator)
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request_iterator, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[StreamUnaryClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool],
            request_iterator: RequestIterableType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> StreamUnaryCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: Iterator[StreamUnaryClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType
        ) -> _base_call.StreamUnaryCall:

            interceptor = next(interceptors, None)

            if interceptor:
                continuation = functools.partial(_run_interceptor, interceptors)

                return await interceptor.intercept_stream_unary(
                    continuation, client_call_details, request_iterator)
            else:
                # End of the chain: start the actual RPC on the channel.
                return StreamUnaryCall(
                    request_iterator,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(iter(interceptors), client_call_details,
                                      request_iterator)

    def time_remaining(self) -> Optional[float]:
        """Not supported on intercepted calls."""
        raise NotImplementedError()

790 

791 

792class InterceptedStreamStreamCall(_InterceptedStreamResponseMixin, 

793 _InterceptedStreamRequestMixin, 

794 InterceptedCall, _base_call.StreamStreamCall): 

795 """Used for running a `StreamStreamCall` wrapped by interceptors.""" 

796 

797 _loop: asyncio.AbstractEventLoop 

798 _channel: cygrpc.AioChannel 

799 _last_returned_call_from_interceptors = Optional[_base_call.UnaryStreamCall] 

800 

801 # pylint: disable=too-many-arguments 

802 def __init__(self, interceptors: Sequence[StreamStreamClientInterceptor], 

803 request_iterator: Optional[RequestIterableType], 

804 timeout: Optional[float], metadata: Metadata, 

805 credentials: Optional[grpc.CallCredentials], 

806 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel, 

807 method: bytes, request_serializer: SerializingFunction, 

808 response_deserializer: DeserializingFunction, 

809 loop: asyncio.AbstractEventLoop) -> None: 

810 self._loop = loop 

811 self._channel = channel 

812 self._init_stream_response_mixin() 

813 request_iterator = self._init_stream_request_mixin(request_iterator) 

814 self._last_returned_call_from_interceptors = None 

815 interceptors_task = loop.create_task( 

816 self._invoke(interceptors, method, timeout, metadata, credentials, 

817 wait_for_ready, request_iterator, request_serializer, 

818 response_deserializer)) 

819 super().__init__(interceptors_task) 

820 

821 # pylint: disable=too-many-arguments 

async def _invoke(
        self, interceptors: Sequence[StreamStreamClientInterceptor],
        method: bytes, timeout: Optional[float],
        metadata: Optional[Metadata],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        request_iterator: RequestIterableType,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction) -> StreamStreamCall:
    """Run the RPC call wrapped in interceptors.

    The interceptors are consumed one by one. Each receives a continuation
    that runs the remainder of the chain; when none are left, a
    `StreamStreamCall` is created from the (possibly modified) call details.
    Whatever each step produces is recorded in
    `self._last_returned_call_from_interceptors`.

    Returns:
      The call object produced by the outermost step of the chain.
    """

    async def _run_interceptor(
            interceptors: Iterator[StreamStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType
    ) -> _base_call.StreamStreamCall:
        interceptor = next(interceptors, None)

        if not interceptor:
            # Chain exhausted: build the underlying call from the details
            # that reached this point.
            underlying_call = StreamStreamCall(
                request_iterator,
                _timeout_to_deadline(client_call_details.timeout),
                client_call_details.metadata,
                client_call_details.credentials,
                client_call_details.wait_for_ready, self._channel,
                client_call_details.method, request_serializer,
                response_deserializer, self._loop)
            self._last_returned_call_from_interceptors = underlying_call
            return self._last_returned_call_from_interceptors

        # The continuation shares the same iterator, so calling it resumes
        # the chain at the next interceptor.
        continuation = functools.partial(_run_interceptor, interceptors)
        outcome = await interceptor.intercept_stream_stream(
            continuation, client_call_details, request_iterator)

        if not isinstance(outcome, _base_call.StreamStreamCall):
            # The interceptor handed back something other than a call
            # object; pair it with the last known call so the result still
            # exposes the call interface.
            outcome = StreamStreamCallResponseIterator(
                self._last_returned_call_from_interceptors, outcome)

        self._last_returned_call_from_interceptors = outcome
        return self._last_returned_call_from_interceptors

    initial_details = ClientCallDetails(method, timeout, metadata,
                                        credentials, wait_for_ready)
    return await _run_interceptor(iter(interceptors), initial_details,
                                  request_iterator)

870 

def time_remaining(self) -> Optional[float]:
    """Not supported on this call type.

    Raises:
      NotImplementedError: Always.
    """
    raise NotImplementedError()

873 

874 

class UnaryUnaryCallResponse(_base_call.UnaryUnaryCall):
    """Final UnaryUnaryCall class finished with a response.

    Wraps a response value that is already available: the object reports
    itself as done and not cancelled, answers status queries with fixed
    values, and awaiting it returns the stored response without suspending.
    """

    # Response value returned when the instance is awaited.
    _response: ResponseType

    def __init__(self, response: ResponseType) -> None:
        """Store the response to be returned on await."""
        self._response = response

    # --- Synchronous call-state queries ---------------------------------

    def cancel(self) -> bool:
        """Return False; nothing is cancelled."""
        return False

    def cancelled(self) -> bool:
        """Return False."""
        return False

    def done(self) -> bool:
        """Return True."""
        return True

    def add_done_callback(self, unused_callback) -> None:
        """Not supported; always raises NotImplementedError."""
        raise NotImplementedError()

    def time_remaining(self) -> Optional[float]:
        """Not supported; always raises NotImplementedError."""
        raise NotImplementedError()

    # --- Asynchronous status accessors ----------------------------------

    async def initial_metadata(self) -> Optional[Metadata]:
        """Return None."""
        return None

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Return None."""
        return None

    async def code(self) -> grpc.StatusCode:
        """Return `grpc.StatusCode.OK`."""
        return grpc.StatusCode.OK

    async def details(self) -> str:
        """Return an empty string."""
        return ''

    async def debug_error_string(self) -> Optional[str]:
        """Return None."""
        return None

    async def wait_for_connection(self) -> None:
        """Do nothing."""

    def __await__(self):
        """Return the stored response without yielding anything."""
        # Delegating to an empty iterable yields nothing; its only purpose
        # is to make the interpreter compile __await__ as a generator.
        yield from ()
        return self._response

921 

922 

class _StreamCallResponseIterator:
    """Pairs a call object with a separate response iterator.

    Every call-level method is forwarded to the wrapped call, while
    asynchronous iteration is served by the supplied response iterator.
    """

    # Call object that answers all state and status queries.
    _call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall]
    # Iterable whose async iterator is exposed through __aiter__.
    _response_iterator: AsyncIterable[ResponseType]

    def __init__(self, call: Union[_base_call.UnaryStreamCall,
                                   _base_call.StreamStreamCall],
                 response_iterator: AsyncIterable[ResponseType]) -> None:
        """Remember the call to delegate to and the iterator to expose."""
        self._call = call
        self._response_iterator = response_iterator

    def __aiter__(self):
        """Return the async iterator of the stored response iterator."""
        return self._response_iterator.__aiter__()

    # --- Synchronous delegation -----------------------------------------

    def cancel(self) -> bool:
        """Forward to the wrapped call's `cancel`."""
        return self._call.cancel()

    def cancelled(self) -> bool:
        """Forward to the wrapped call's `cancelled`."""
        return self._call.cancelled()

    def done(self) -> bool:
        """Forward to the wrapped call's `done`."""
        return self._call.done()

    def add_done_callback(self, callback) -> None:
        """Register `callback` on the wrapped call."""
        self._call.add_done_callback(callback)

    def time_remaining(self) -> Optional[float]:
        """Forward to the wrapped call's `time_remaining`."""
        return self._call.time_remaining()

    # --- Asynchronous delegation ----------------------------------------

    async def initial_metadata(self) -> Optional[Metadata]:
        """Await and return the wrapped call's initial metadata."""
        return await self._call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Await and return the wrapped call's trailing metadata."""
        return await self._call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        """Await and return the wrapped call's status code."""
        return await self._call.code()

    async def details(self) -> str:
        """Await and return the wrapped call's details."""
        return await self._call.details()

    async def debug_error_string(self) -> Optional[str]:
        """Await and return the wrapped call's debug error string."""
        return await self._call.debug_error_string()

    async def wait_for_connection(self) -> None:
        """Await the wrapped call's `wait_for_connection`."""
        return await self._call.wait_for_connection()

969 

970 

class UnaryStreamCallResponseIterator(_StreamCallResponseIterator,
                                      _base_call.UnaryStreamCall):
    """UnaryStreamCall class which uses an alternative response iterator."""

    async def read(self) -> ResponseType:
        """Not supported; always raises NotImplementedError.

        Responses are consumed through the async iterator, so this path
        should not be reached.
        """
        raise NotImplementedError()

979 

980 

class StreamStreamCallResponseIterator(_StreamCallResponseIterator,
                                       _base_call.StreamStreamCall):
    """StreamStreamCall class which uses an alternative response iterator."""

    @property
    def _done_writing_flag(self) -> bool:
        """Mirror the `_done_writing_flag` of the wrapped call."""
        return self._call._done_writing_flag

    async def read(self) -> ResponseType:
        """Not supported; always raises NotImplementedError.

        Responses are consumed through the async iterator, so this path
        should not be reached.
        """
        raise NotImplementedError()

    async def write(self, request: RequestType) -> None:
        """Not supported; always raises NotImplementedError.

        Requests go through the async iterator provided by the
        InterceptedStreamStreamCall, so this path should not be reached.
        """
        raise NotImplementedError()

    async def done_writing(self) -> None:
        """Not supported; always raises NotImplementedError.

        Requests go through the async iterator provided by the
        InterceptedStreamStreamCall, so this path should not be reached.
        """
        raise NotImplementedError()