Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/aio/_call.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

353 statements  

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Asyncio Python.""" 

15 

16import asyncio 

17import enum 

18from functools import partial 

19import inspect 

20import logging 

21import traceback 

22from typing import ( 

23 Any, 

24 AsyncIterator, 

25 Generator, 

26 Generic, 

27 Optional, 

28 Tuple, 

29 Union, 

30) 

31 

32import grpc 

33from grpc import _common 

34from grpc._cython import cygrpc 

35 

36from . import _base_call 

37from ._metadata import Metadata 

38from ._typing import DeserializingFunction 

39from ._typing import DoneCallbackType 

40from ._typing import EOFType 

41from ._typing import MetadataType 

42from ._typing import MetadatumType 

43from ._typing import RequestIterableType 

44from ._typing import RequestType 

45from ._typing import ResponseType 

46from ._typing import SerializingFunction 

47 

48__all__ = "AioRpcError", "Call", "UnaryStreamCall", "UnaryUnaryCall" 

49 

50_LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!" 

51_GC_CANCELLATION_DETAILS = "Cancelled upon garbage collection!" 

52_RPC_ALREADY_FINISHED_DETAILS = "RPC already finished." 

53_RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".' 

54_API_STYLE_ERROR = ( 

55 "The iterator and read/write APIs may not be mixed on a single RPC." 

56) 

57 

58_OK_CALL_REPRESENTATION = ( 

59 '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>' 

60) 

61 

62_NON_OK_CALL_REPRESENTATION = ( 

63 "<{} of RPC that terminated with:\n" 

64 "\tstatus = {}\n" 

65 '\tdetails = "{}"\n' 

66 '\tdebug_error_string = "{}"\n' 

67 ">" 

68) 

69 

70_LOGGER = logging.getLogger(__name__) 

71 

72 

73class AioRpcError(grpc.RpcError): 

74 """An implementation of RpcError to be used by the asynchronous API. 

75 

76 Raised RpcError is a snapshot of the final status of the RPC, values are 

77 determined. Hence, its methods no longer needs to be coroutines. 

78 """ 

79 

80 _code: grpc.StatusCode 

81 _details: Optional[str] 

82 _initial_metadata: Optional[Metadata] 

83 _trailing_metadata: Optional[Metadata] 

84 _debug_error_string: Optional[str] 

85 

86 def __init__( 

87 self, 

88 code: grpc.StatusCode, 

89 initial_metadata: Metadata, 

90 trailing_metadata: Metadata, 

91 details: Optional[str] = None, 

92 debug_error_string: Optional[str] = None, 

93 ) -> None: 

94 """Constructor. 

95 

96 Args: 

97 code: The status code with which the RPC has been finalized. 

98 initial_metadata: Optional initial metadata that could be sent by the 

99 Server. 

100 trailing_metadata: Optional metadata that could be sent by the Server. 

101 details: Optional details explaining the reason of the error. 

102 debug_error_string: Optional string 

103 """ 

104 super().__init__() 

105 self._code = code 

106 self._details = details 

107 self._initial_metadata = initial_metadata 

108 self._trailing_metadata = trailing_metadata 

109 self._debug_error_string = debug_error_string 

110 

111 def code(self) -> grpc.StatusCode: 

112 """Accesses the status code sent by the server. 

113 

114 Returns: 

115 The `grpc.StatusCode` status code. 

116 """ 

117 return self._code 

118 

119 def details(self) -> Optional[str]: 

120 """Accesses the details sent by the server. 

121 

122 Returns: 

123 The description of the error. 

124 """ 

125 return self._details 

126 

127 def initial_metadata(self) -> Metadata: 

128 """Accesses the initial metadata sent by the server. 

129 

130 Returns: 

131 The initial metadata received. 

132 """ 

133 return self._initial_metadata 

134 

135 def trailing_metadata(self) -> Metadata: 

136 """Accesses the trailing metadata sent by the server. 

137 

138 Returns: 

139 The trailing metadata received. 

140 """ 

141 return self._trailing_metadata 

142 

143 def debug_error_string(self) -> str: 

144 """Accesses the debug error string sent by the server. 

145 

146 Returns: 

147 The debug error string received. 

148 """ 

149 return self._debug_error_string 

150 

151 def _repr(self) -> str: 

152 """Assembles the error string for the RPC error.""" 

153 return _NON_OK_CALL_REPRESENTATION.format( 

154 self.__class__.__name__, 

155 self._code, 

156 self._details, 

157 self._debug_error_string, 

158 ) 

159 

160 def __repr__(self) -> str: 

161 return self._repr() 

162 

163 def __str__(self) -> str: 

164 return self._repr() 

165 

166 def __reduce__(self): 

167 return ( 

168 type(self), 

169 ( 

170 self._code, 

171 self._initial_metadata, 

172 self._trailing_metadata, 

173 self._details, 

174 self._debug_error_string, 

175 ), 

176 ) 

177 

178 

179def _create_rpc_error( 

180 initial_metadata: MetadataType, 

181 status: cygrpc.AioRpcStatus, 

182) -> AioRpcError: 

183 return AioRpcError( 

184 _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()], 

185 Metadata.from_tuple(initial_metadata), 

186 Metadata.from_tuple(status.trailing_metadata()), 

187 details=status.details(), 

188 debug_error_string=status.debug_error_string(), 

189 ) 

190 

191 

192class Call: 

193 """Base implementation of client RPC Call object. 

194 

195 Implements logic around final status, metadata and cancellation. 

196 """ 

197 

198 _loop: asyncio.AbstractEventLoop 

199 _code: grpc.StatusCode 

200 _cython_call: cygrpc._AioCall 

201 _metadata: Tuple[MetadatumType, ...] 

202 _request_serializer: Optional[SerializingFunction] 

203 _response_deserializer: Optional[DeserializingFunction] 

204 

205 def __init__( 

206 self, 

207 cython_call: cygrpc._AioCall, 

208 metadata: Metadata, 

209 request_serializer: Optional[SerializingFunction], 

210 response_deserializer: Optional[DeserializingFunction], 

211 loop: asyncio.AbstractEventLoop, 

212 ) -> None: 

213 self._loop = loop 

214 self._cython_call = cython_call 

215 self._metadata = tuple(metadata) 

216 self._request_serializer = request_serializer 

217 self._response_deserializer = response_deserializer 

218 

219 def __del__(self) -> None: 

220 # The '_cython_call' object might be destructed before Call object 

221 if hasattr(self, "_cython_call") and not self._cython_call.done(): 

222 self._cancel(_GC_CANCELLATION_DETAILS) 

223 

224 def cancelled(self) -> bool: 

225 return self._cython_call.cancelled() 

226 

227 def _cancel(self, details: str) -> bool: 

228 """Forwards the application cancellation reasoning.""" 

229 if not self._cython_call.done(): 

230 self._cython_call.cancel(details) 

231 return True 

232 return False 

233 

234 def cancel(self) -> bool: 

235 return self._cancel(_LOCAL_CANCELLATION_DETAILS) 

236 

237 def done(self) -> bool: 

238 return self._cython_call.done() 

239 

240 def add_done_callback(self, callback: DoneCallbackType) -> None: 

241 cb = partial(callback, self) 

242 self._cython_call.add_done_callback(cb) 

243 

244 def time_remaining(self) -> Optional[float]: 

245 return self._cython_call.time_remaining() 

246 

247 async def initial_metadata(self) -> Metadata: 

248 raw_metadata_tuple = await self._cython_call.initial_metadata() 

249 return Metadata.from_tuple(raw_metadata_tuple) 

250 

251 async def trailing_metadata(self) -> Metadata: 

252 raw_metadata_tuple = ( 

253 await self._cython_call.status() 

254 ).trailing_metadata() 

255 return Metadata.from_tuple(raw_metadata_tuple) 

256 

257 async def code(self) -> grpc.StatusCode: 

258 cygrpc_code = (await self._cython_call.status()).code() 

259 return _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[cygrpc_code] 

260 

261 async def details(self) -> str: 

262 return (await self._cython_call.status()).details() 

263 

264 async def debug_error_string(self) -> str: 

265 return (await self._cython_call.status()).debug_error_string() 

266 

267 async def _raise_for_status(self) -> None: 

268 if self._cython_call.is_locally_cancelled(): 

269 raise asyncio.CancelledError() 

270 code = await self.code() 

271 if code != grpc.StatusCode.OK: 

272 raise _create_rpc_error( 

273 await self.initial_metadata(), 

274 await self._cython_call.status(), 

275 ) 

276 

277 def _repr(self) -> str: 

278 return repr(self._cython_call) 

279 

280 def __repr__(self) -> str: 

281 return self._repr() 

282 

283 def __str__(self) -> str: 

284 return self._repr() 

285 

286 

287class _APIStyle(enum.IntEnum): 

288 UNKNOWN = 0 

289 ASYNC_GENERATOR = 1 

290 READER_WRITER = 2 

291 

292 

293class _UnaryResponseMixin(Call, Generic[ResponseType]): 

294 _call_response: asyncio.Task 

295 

296 def _init_unary_response_mixin(self, response_task: asyncio.Task): 

297 self._call_response = response_task 

298 

299 def cancel(self) -> bool: 

300 if super().cancel(): 

301 self._call_response.cancel() 

302 return True 

303 return False 

304 

305 def __await__(self) -> Generator[Any, None, ResponseType]: 

306 """Wait till the ongoing RPC request finishes.""" 

307 try: 

308 response = yield from self._call_response 

309 except asyncio.CancelledError: 

310 # Even if we caught all other CancelledError, there is still 

311 # this corner case. If the application cancels immediately after 

312 # the Call object is created, we will observe this 

313 # `CancelledError`. 

314 if not self.cancelled(): 

315 self.cancel() 

316 raise 

317 

318 # NOTE(lidiz) If we raise RpcError in the task, and users doesn't 

319 # 'await' on it. AsyncIO will log 'Task exception was never retrieved'. 

320 # Instead, if we move the exception raising here, the spam stops. 

321 # Unfortunately, there can only be one 'yield from' in '__await__'. So, 

322 # we need to access the private instance variable. 

323 if response is cygrpc.EOF: 

324 if self._cython_call.is_locally_cancelled(): 

325 raise asyncio.CancelledError() 

326 else: 

327 raise _create_rpc_error( 

328 self._cython_call._initial_metadata, 

329 self._cython_call._status, 

330 ) 

331 else: 

332 return response 

333 

334 

335class _StreamResponseMixin(Call): 

336 _message_aiter: AsyncIterator[ResponseType] 

337 _preparation: asyncio.Task 

338 _response_style: _APIStyle 

339 

340 def _init_stream_response_mixin(self, preparation: asyncio.Task): 

341 self._message_aiter = None 

342 self._preparation = preparation 

343 self._response_style = _APIStyle.UNKNOWN 

344 

345 def _update_response_style(self, style: _APIStyle): 

346 if self._response_style is _APIStyle.UNKNOWN: 

347 self._response_style = style 

348 elif self._response_style is not style: 

349 raise cygrpc.UsageError(_API_STYLE_ERROR) 

350 

351 def cancel(self) -> bool: 

352 if super().cancel(): 

353 self._preparation.cancel() 

354 return True 

355 return False 

356 

357 async def _fetch_stream_responses(self) -> ResponseType: 

358 message = await self._read() 

359 while message is not cygrpc.EOF: 

360 yield message 

361 message = await self._read() 

362 

363 # If the read operation failed, Core should explain why. 

364 await self._raise_for_status() 

365 

366 def __aiter__(self) -> AsyncIterator[ResponseType]: 

367 self._update_response_style(_APIStyle.ASYNC_GENERATOR) 

368 if self._message_aiter is None: 

369 self._message_aiter = self._fetch_stream_responses() 

370 return self._message_aiter 

371 

372 async def _read(self) -> ResponseType: 

373 # Wait for the request being sent 

374 await self._preparation 

375 

376 # Reads response message from Core 

377 try: 

378 raw_response = await self._cython_call.receive_serialized_message() 

379 except asyncio.CancelledError: 

380 if not self.cancelled(): 

381 self.cancel() 

382 raise 

383 

384 if raw_response is cygrpc.EOF: 

385 return cygrpc.EOF 

386 return _common.deserialize(raw_response, self._response_deserializer) 

387 

388 async def read(self) -> Union[EOFType, ResponseType]: 

389 if self.done(): 

390 await self._raise_for_status() 

391 return cygrpc.EOF 

392 self._update_response_style(_APIStyle.READER_WRITER) 

393 

394 response_message = await self._read() 

395 

396 if response_message is cygrpc.EOF: 

397 # If the read operation failed, Core should explain why. 

398 await self._raise_for_status() 

399 return response_message 

400 

401 

402class _StreamRequestMixin(Call): 

403 _metadata_sent: asyncio.Event 

404 _done_writing_flag: bool 

405 _async_request_poller: Optional[asyncio.Task] 

406 _request_style: _APIStyle 

407 

408 def _init_stream_request_mixin( 

409 self, request_iterator: Optional[RequestIterableType] 

410 ): 

411 self._metadata_sent = asyncio.Event() 

412 self._done_writing_flag = False 

413 

414 # If user passes in an async iterator, create a consumer Task. 

415 if request_iterator is not None: 

416 self._async_request_poller = self._loop.create_task( 

417 self._consume_request_iterator(request_iterator) 

418 ) 

419 self._request_style = _APIStyle.ASYNC_GENERATOR 

420 else: 

421 self._async_request_poller = None 

422 self._request_style = _APIStyle.READER_WRITER 

423 

424 def _raise_for_different_style(self, style: _APIStyle): 

425 if self._request_style is not style: 

426 raise cygrpc.UsageError(_API_STYLE_ERROR) 

427 

428 def cancel(self) -> bool: 

429 if super().cancel(): 

430 if self._async_request_poller is not None: 

431 self._async_request_poller.cancel() 

432 return True 

433 return False 

434 

435 def _metadata_sent_observer(self): 

436 self._metadata_sent.set() 

437 

438 async def _consume_request_iterator( 

439 self, request_iterator: RequestIterableType 

440 ) -> None: 

441 try: 

442 if inspect.isasyncgen(request_iterator) or hasattr( 

443 request_iterator, "__aiter__" 

444 ): 

445 async for request in request_iterator: 

446 try: 

447 await self._write(request) 

448 except AioRpcError as rpc_error: 

449 _LOGGER.debug( 

450 ( 

451 "Exception while consuming the" 

452 " request_iterator: %s" 

453 ), 

454 rpc_error, 

455 ) 

456 return 

457 else: 

458 for request in request_iterator: 

459 try: 

460 await self._write(request) 

461 except AioRpcError as rpc_error: 

462 _LOGGER.debug( 

463 ( 

464 "Exception while consuming the" 

465 " request_iterator: %s" 

466 ), 

467 rpc_error, 

468 ) 

469 return 

470 

471 await self._done_writing() 

472 except: # pylint: disable=bare-except # noqa: E722 

473 # Client iterators can raise exceptions, which we should handle by 

474 # cancelling the RPC and logging the client's error. No exceptions 

475 # should escape this function. 

476 _LOGGER.debug( 

477 "Client request_iterator raised exception:\n%s", 

478 traceback.format_exc(), 

479 ) 

480 self.cancel() 

481 

482 async def _write(self, request: RequestType) -> None: 

483 if self.done(): 

484 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) 

485 if self._done_writing_flag: 

486 raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS) 

487 if not self._metadata_sent.is_set(): 

488 await self._metadata_sent.wait() 

489 if self.done(): 

490 await self._raise_for_status() 

491 

492 serialized_request = _common.serialize( 

493 request, self._request_serializer 

494 ) 

495 try: 

496 await self._cython_call.send_serialized_message(serialized_request) 

497 except cygrpc.InternalError as err: 

498 self._cython_call.set_internal_error(str(err)) 

499 await self._raise_for_status() 

500 except asyncio.CancelledError: 

501 if not self.cancelled(): 

502 self.cancel() 

503 raise 

504 

505 async def _done_writing(self) -> None: 

506 if self.done(): 

507 # If the RPC is finished, do nothing. 

508 return 

509 if not self._done_writing_flag: 

510 # If the done writing is not sent before, try to send it. 

511 self._done_writing_flag = True 

512 try: 

513 await self._cython_call.send_receive_close() 

514 except asyncio.CancelledError: 

515 if not self.cancelled(): 

516 self.cancel() 

517 raise 

518 

519 async def write(self, request: RequestType) -> None: 

520 self._raise_for_different_style(_APIStyle.READER_WRITER) 

521 await self._write(request) 

522 

523 async def done_writing(self) -> None: 

524 """Signal peer that client is done writing. 

525 

526 This method is idempotent. 

527 """ 

528 self._raise_for_different_style(_APIStyle.READER_WRITER) 

529 await self._done_writing() 

530 

531 async def wait_for_connection(self) -> None: 

532 await self._metadata_sent.wait() 

533 if self.done(): 

534 await self._raise_for_status() 

535 

536 

537class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall): 

538 """Object for managing unary-unary RPC calls. 

539 

540 Returned when an instance of `UnaryUnaryMultiCallable` object is called. 

541 """ 

542 

543 _request: RequestType 

544 _invocation_task: asyncio.Task 

545 

546 # pylint: disable=too-many-arguments 

547 def __init__( 

548 self, 

549 request: RequestType, 

550 deadline: Optional[float], 

551 metadata: Metadata, 

552 credentials: Optional[grpc.CallCredentials], 

553 wait_for_ready: Optional[bool], 

554 channel: cygrpc.AioChannel, 

555 method: bytes, 

556 request_serializer: Optional[SerializingFunction], 

557 response_deserializer: Optional[DeserializingFunction], 

558 loop: asyncio.AbstractEventLoop, 

559 ) -> None: 

560 super().__init__( 

561 channel.call(method, deadline, credentials, wait_for_ready), 

562 metadata, 

563 request_serializer, 

564 response_deserializer, 

565 loop, 

566 ) 

567 self._request = request 

568 self._context = cygrpc.build_census_context() 

569 self._invocation_task = loop.create_task(self._invoke()) 

570 self._init_unary_response_mixin(self._invocation_task) 

571 

572 async def _invoke(self) -> ResponseType: 

573 serialized_request = _common.serialize( 

574 self._request, self._request_serializer 

575 ) 

576 

577 # NOTE(lidiz) asyncio.CancelledError is not a good transport for status, 

578 # because the asyncio.Task class do not cache the exception object. 

579 # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785 

580 try: 

581 serialized_response = await self._cython_call.unary_unary( 

582 serialized_request, self._metadata, self._context 

583 ) 

584 except asyncio.CancelledError: 

585 if not self.cancelled(): 

586 self.cancel() 

587 

588 if self._cython_call.is_ok(): 

589 return _common.deserialize( 

590 serialized_response, self._response_deserializer 

591 ) 

592 return cygrpc.EOF 

593 

594 async def wait_for_connection(self) -> None: 

595 await self._invocation_task 

596 if self.done(): 

597 await self._raise_for_status() 

598 

599 

600class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall): 

601 """Object for managing unary-stream RPC calls. 

602 

603 Returned when an instance of `UnaryStreamMultiCallable` object is called. 

604 """ 

605 

606 _request: RequestType 

607 _send_unary_request_task: asyncio.Task 

608 

609 # pylint: disable=too-many-arguments 

610 def __init__( 

611 self, 

612 request: RequestType, 

613 deadline: Optional[float], 

614 metadata: Metadata, 

615 credentials: Optional[grpc.CallCredentials], 

616 wait_for_ready: Optional[bool], 

617 channel: cygrpc.AioChannel, 

618 method: bytes, 

619 request_serializer: Optional[SerializingFunction], 

620 response_deserializer: Optional[DeserializingFunction], 

621 loop: asyncio.AbstractEventLoop, 

622 ) -> None: 

623 super().__init__( 

624 channel.call(method, deadline, credentials, wait_for_ready), 

625 metadata, 

626 request_serializer, 

627 response_deserializer, 

628 loop, 

629 ) 

630 self._request = request 

631 self._context = cygrpc.build_census_context() 

632 self._send_unary_request_task = loop.create_task( 

633 self._send_unary_request() 

634 ) 

635 self._init_stream_response_mixin(self._send_unary_request_task) 

636 

637 async def _send_unary_request(self) -> ResponseType: 

638 serialized_request = _common.serialize( 

639 self._request, self._request_serializer 

640 ) 

641 try: 

642 await self._cython_call.initiate_unary_stream( 

643 serialized_request, self._metadata, self._context 

644 ) 

645 except asyncio.CancelledError: 

646 if not self.cancelled(): 

647 self.cancel() 

648 raise 

649 

650 async def wait_for_connection(self) -> None: 

651 await self._send_unary_request_task 

652 if self.done(): 

653 await self._raise_for_status() 

654 

655 

656# pylint: disable=too-many-ancestors 

657class StreamUnaryCall( 

658 _StreamRequestMixin, _UnaryResponseMixin, Call, _base_call.StreamUnaryCall 

659): 

660 """Object for managing stream-unary RPC calls. 

661 

662 Returned when an instance of `StreamUnaryMultiCallable` object is called. 

663 """ 

664 

665 # pylint: disable=too-many-arguments 

666 def __init__( 

667 self, 

668 request_iterator: Optional[RequestIterableType], 

669 deadline: Optional[float], 

670 metadata: Metadata, 

671 credentials: Optional[grpc.CallCredentials], 

672 wait_for_ready: Optional[bool], 

673 channel: cygrpc.AioChannel, 

674 method: bytes, 

675 request_serializer: Optional[SerializingFunction], 

676 response_deserializer: Optional[DeserializingFunction], 

677 loop: asyncio.AbstractEventLoop, 

678 ) -> None: 

679 super().__init__( 

680 channel.call(method, deadline, credentials, wait_for_ready), 

681 metadata, 

682 request_serializer, 

683 response_deserializer, 

684 loop, 

685 ) 

686 

687 self._context = cygrpc.build_census_context() 

688 self._init_stream_request_mixin(request_iterator) 

689 self._init_unary_response_mixin(loop.create_task(self._conduct_rpc())) 

690 

691 async def _conduct_rpc(self) -> ResponseType: 

692 try: 

693 serialized_response = await self._cython_call.stream_unary( 

694 self._metadata, self._metadata_sent_observer, self._context 

695 ) 

696 except asyncio.CancelledError: 

697 if not self.cancelled(): 

698 self.cancel() 

699 raise 

700 

701 if self._cython_call.is_ok(): 

702 return _common.deserialize( 

703 serialized_response, self._response_deserializer 

704 ) 

705 return cygrpc.EOF 

706 

707 

708class StreamStreamCall( 

709 _StreamRequestMixin, _StreamResponseMixin, Call, _base_call.StreamStreamCall 

710): 

711 """Object for managing stream-stream RPC calls. 

712 

713 Returned when an instance of `StreamStreamMultiCallable` object is called. 

714 """ 

715 

716 _initializer: asyncio.Task 

717 

718 # pylint: disable=too-many-arguments 

719 def __init__( 

720 self, 

721 request_iterator: Optional[RequestIterableType], 

722 deadline: Optional[float], 

723 metadata: Metadata, 

724 credentials: Optional[grpc.CallCredentials], 

725 wait_for_ready: Optional[bool], 

726 channel: cygrpc.AioChannel, 

727 method: bytes, 

728 request_serializer: Optional[SerializingFunction], 

729 response_deserializer: Optional[DeserializingFunction], 

730 loop: asyncio.AbstractEventLoop, 

731 ) -> None: 

732 super().__init__( 

733 channel.call(method, deadline, credentials, wait_for_ready), 

734 metadata, 

735 request_serializer, 

736 response_deserializer, 

737 loop, 

738 ) 

739 self._context = cygrpc.build_census_context() 

740 self._initializer = self._loop.create_task(self._prepare_rpc()) 

741 self._init_stream_request_mixin(request_iterator) 

742 self._init_stream_response_mixin(self._initializer) 

743 

744 async def _prepare_rpc(self): 

745 """Prepares the RPC for receiving/sending messages. 

746 

747 All other operations around the stream should only happen after the 

748 completion of this method. 

749 """ 

750 try: 

751 await self._cython_call.initiate_stream_stream( 

752 self._metadata, self._metadata_sent_observer, self._context 

753 ) 

754 except asyncio.CancelledError: 

755 if not self.cancelled(): 

756 self.cancel() 

757 # No need to raise RpcError here, because no one will `await` this task.