Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/aio/_call.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

354 statements  

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Asyncio Python.""" 

15 

16import asyncio 

17import enum 

18from functools import partial 

19import inspect 

20import logging 

21import traceback 

22from typing import ( 

23 Any, 

24 AsyncIterator, 

25 Generator, 

26 Generic, 

27 Optional, 

28 Tuple, 

29 Union, 

30) 

31 

32import grpc 

33from grpc import _common 

34from grpc._cython import cygrpc 

35 

36from . import _base_call 

37from ._metadata import Metadata 

38from ._typing import DeserializingFunction 

39from ._typing import DoneCallbackType 

40from ._typing import EOFType 

41from ._typing import MetadataType 

42from ._typing import MetadatumType 

43from ._typing import RequestIterableType 

44from ._typing import RequestType 

45from ._typing import ResponseType 

46from ._typing import SerializingFunction 

47 

48__all__ = "AioRpcError", "Call", "UnaryUnaryCall", "UnaryStreamCall" 

49 

50_LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!" 

51_GC_CANCELLATION_DETAILS = "Cancelled upon garbage collection!" 

52_RPC_ALREADY_FINISHED_DETAILS = "RPC already finished." 

53_RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".' 

54_API_STYLE_ERROR = ( 

55 "The iterator and read/write APIs may not be mixed on a single RPC." 

56) 

57 

58_OK_CALL_REPRESENTATION = ( 

59 '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>' 

60) 

61 

62_NON_OK_CALL_REPRESENTATION = ( 

63 "<{} of RPC that terminated with:\n" 

64 "\tstatus = {}\n" 

65 '\tdetails = "{}"\n' 

66 '\tdebug_error_string = "{}"\n' 

67 ">" 

68) 

69 

70_LOGGER = logging.getLogger(__name__) 

71 

72 

73class AioRpcError(grpc.RpcError): 

74 """An implementation of RpcError to be used by the asynchronous API. 

75 

76 Raised RpcError is a snapshot of the final status of the RPC, values are 

77 determined. Hence, its methods no longer needs to be coroutines. 

78 """ 

79 

80 _code: grpc.StatusCode 

81 _details: Optional[str] 

82 _initial_metadata: Optional[Metadata] 

83 _trailing_metadata: Optional[Metadata] 

84 _debug_error_string: Optional[str] 

85 

86 def __init__( 

87 self, 

88 code: grpc.StatusCode, 

89 initial_metadata: Metadata, 

90 trailing_metadata: Metadata, 

91 details: Optional[str] = None, 

92 debug_error_string: Optional[str] = None, 

93 ) -> None: 

94 """Constructor. 

95 

96 Args: 

97 code: The status code with which the RPC has been finalized. 

98 initial_metadata: Optional initial metadata that could be sent by the 

99 Server. 

100 trailing_metadata: Optional metadata that could be sent by the Server. 

101 details: Optional details explaining the reason of the error. 

102 debug_error_string: Optional string 

103 """ 

104 super().__init__() 

105 self._code = code 

106 self._details = details 

107 self._initial_metadata = initial_metadata 

108 self._trailing_metadata = trailing_metadata 

109 self._debug_error_string = debug_error_string 

110 

111 def code(self) -> grpc.StatusCode: 

112 """Accesses the status code sent by the server. 

113 

114 Returns: 

115 The `grpc.StatusCode` status code. 

116 """ 

117 return self._code 

118 

119 def details(self) -> Optional[str]: 

120 """Accesses the details sent by the server. 

121 

122 Returns: 

123 The description of the error. 

124 """ 

125 return self._details 

126 

127 def initial_metadata(self) -> Metadata: 

128 """Accesses the initial metadata sent by the server. 

129 

130 Returns: 

131 The initial metadata received. 

132 """ 

133 return self._initial_metadata 

134 

135 def trailing_metadata(self) -> Metadata: 

136 """Accesses the trailing metadata sent by the server. 

137 

138 Returns: 

139 The trailing metadata received. 

140 """ 

141 return self._trailing_metadata 

142 

143 def debug_error_string(self) -> str: 

144 """Accesses the debug error string sent by the server. 

145 

146 Returns: 

147 The debug error string received. 

148 """ 

149 return self._debug_error_string 

150 

151 def _repr(self) -> str: 

152 """Assembles the error string for the RPC error.""" 

153 return _NON_OK_CALL_REPRESENTATION.format( 

154 self.__class__.__name__, 

155 self._code, 

156 self._details, 

157 self._debug_error_string, 

158 ) 

159 

160 def __repr__(self) -> str: 

161 return self._repr() 

162 

163 def __str__(self) -> str: 

164 return self._repr() 

165 

166 def __reduce__(self): 

167 return ( 

168 type(self), 

169 ( 

170 self._code, 

171 self._initial_metadata, 

172 self._trailing_metadata, 

173 self._details, 

174 self._debug_error_string, 

175 ), 

176 ) 

177 

178 

179def _create_rpc_error( 

180 initial_metadata: MetadataType, 

181 status: cygrpc.AioRpcStatus, 

182) -> AioRpcError: 

183 return AioRpcError( 

184 _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()], 

185 Metadata.from_tuple(initial_metadata), 

186 Metadata.from_tuple(status.trailing_metadata()), 

187 details=status.details(), 

188 debug_error_string=status.debug_error_string(), 

189 ) 

190 

191 

192class Call: 

193 """Base implementation of client RPC Call object. 

194 

195 Implements logic around final status, metadata and cancellation. 

196 """ 

197 

198 _loop: asyncio.AbstractEventLoop 

199 _code: grpc.StatusCode 

200 _cython_call: cygrpc._AioCall 

201 _metadata: Tuple[MetadatumType, ...] 

202 _request_serializer: Optional[SerializingFunction] 

203 _response_deserializer: Optional[DeserializingFunction] 

204 

205 def __init__( 

206 self, 

207 cython_call: cygrpc._AioCall, 

208 metadata: Metadata, 

209 request_serializer: Optional[SerializingFunction], 

210 response_deserializer: Optional[DeserializingFunction], 

211 loop: asyncio.AbstractEventLoop, 

212 ) -> None: 

213 self._loop = loop 

214 self._cython_call = cython_call 

215 self._metadata = tuple(metadata) 

216 self._request_serializer = request_serializer 

217 self._response_deserializer = response_deserializer 

218 

219 def __del__(self) -> None: 

220 # The '_cython_call' object might be destructed before Call object 

221 if hasattr(self, "_cython_call"): 

222 if not self._cython_call.done(): 

223 self._cancel(_GC_CANCELLATION_DETAILS) 

224 

225 def cancelled(self) -> bool: 

226 return self._cython_call.cancelled() 

227 

228 def _cancel(self, details: str) -> bool: 

229 """Forwards the application cancellation reasoning.""" 

230 if not self._cython_call.done(): 

231 self._cython_call.cancel(details) 

232 return True 

233 else: 

234 return False 

235 

236 def cancel(self) -> bool: 

237 return self._cancel(_LOCAL_CANCELLATION_DETAILS) 

238 

239 def done(self) -> bool: 

240 return self._cython_call.done() 

241 

242 def add_done_callback(self, callback: DoneCallbackType) -> None: 

243 cb = partial(callback, self) 

244 self._cython_call.add_done_callback(cb) 

245 

246 def time_remaining(self) -> Optional[float]: 

247 return self._cython_call.time_remaining() 

248 

249 async def initial_metadata(self) -> Metadata: 

250 raw_metadata_tuple = await self._cython_call.initial_metadata() 

251 return Metadata.from_tuple(raw_metadata_tuple) 

252 

253 async def trailing_metadata(self) -> Metadata: 

254 raw_metadata_tuple = ( 

255 await self._cython_call.status() 

256 ).trailing_metadata() 

257 return Metadata.from_tuple(raw_metadata_tuple) 

258 

259 async def code(self) -> grpc.StatusCode: 

260 cygrpc_code = (await self._cython_call.status()).code() 

261 return _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[cygrpc_code] 

262 

263 async def details(self) -> str: 

264 return (await self._cython_call.status()).details() 

265 

266 async def debug_error_string(self) -> str: 

267 return (await self._cython_call.status()).debug_error_string() 

268 

269 async def _raise_for_status(self) -> None: 

270 if self._cython_call.is_locally_cancelled(): 

271 raise asyncio.CancelledError() 

272 code = await self.code() 

273 if code != grpc.StatusCode.OK: 

274 raise _create_rpc_error( 

275 await self.initial_metadata(), 

276 await self._cython_call.status(), 

277 ) 

278 

279 def _repr(self) -> str: 

280 return repr(self._cython_call) 

281 

282 def __repr__(self) -> str: 

283 return self._repr() 

284 

285 def __str__(self) -> str: 

286 return self._repr() 

287 

288 

289class _APIStyle(enum.IntEnum): 

290 UNKNOWN = 0 

291 ASYNC_GENERATOR = 1 

292 READER_WRITER = 2 

293 

294 

295class _UnaryResponseMixin(Call, Generic[ResponseType]): 

296 _call_response: asyncio.Task 

297 

298 def _init_unary_response_mixin(self, response_task: asyncio.Task): 

299 self._call_response = response_task 

300 

301 def cancel(self) -> bool: 

302 if super().cancel(): 

303 self._call_response.cancel() 

304 return True 

305 else: 

306 return False 

307 

308 def __await__(self) -> Generator[Any, None, ResponseType]: 

309 """Wait till the ongoing RPC request finishes.""" 

310 try: 

311 response = yield from self._call_response 

312 except asyncio.CancelledError: 

313 # Even if we caught all other CancelledError, there is still 

314 # this corner case. If the application cancels immediately after 

315 # the Call object is created, we will observe this 

316 # `CancelledError`. 

317 if not self.cancelled(): 

318 self.cancel() 

319 raise 

320 

321 # NOTE(lidiz) If we raise RpcError in the task, and users doesn't 

322 # 'await' on it. AsyncIO will log 'Task exception was never retrieved'. 

323 # Instead, if we move the exception raising here, the spam stops. 

324 # Unfortunately, there can only be one 'yield from' in '__await__'. So, 

325 # we need to access the private instance variable. 

326 if response is cygrpc.EOF: 

327 if self._cython_call.is_locally_cancelled(): 

328 raise asyncio.CancelledError() 

329 else: 

330 raise _create_rpc_error( 

331 self._cython_call._initial_metadata, 

332 self._cython_call._status, 

333 ) 

334 else: 

335 return response 

336 

337 

338class _StreamResponseMixin(Call): 

339 _message_aiter: AsyncIterator[ResponseType] 

340 _preparation: asyncio.Task 

341 _response_style: _APIStyle 

342 

343 def _init_stream_response_mixin(self, preparation: asyncio.Task): 

344 self._message_aiter = None 

345 self._preparation = preparation 

346 self._response_style = _APIStyle.UNKNOWN 

347 

348 def _update_response_style(self, style: _APIStyle): 

349 if self._response_style is _APIStyle.UNKNOWN: 

350 self._response_style = style 

351 elif self._response_style is not style: 

352 raise cygrpc.UsageError(_API_STYLE_ERROR) 

353 

354 def cancel(self) -> bool: 

355 if super().cancel(): 

356 self._preparation.cancel() 

357 return True 

358 else: 

359 return False 

360 

361 async def _fetch_stream_responses(self) -> ResponseType: 

362 message = await self._read() 

363 while message is not cygrpc.EOF: 

364 yield message 

365 message = await self._read() 

366 

367 # If the read operation failed, Core should explain why. 

368 await self._raise_for_status() 

369 

370 def __aiter__(self) -> AsyncIterator[ResponseType]: 

371 self._update_response_style(_APIStyle.ASYNC_GENERATOR) 

372 if self._message_aiter is None: 

373 self._message_aiter = self._fetch_stream_responses() 

374 return self._message_aiter 

375 

376 async def _read(self) -> ResponseType: 

377 # Wait for the request being sent 

378 await self._preparation 

379 

380 # Reads response message from Core 

381 try: 

382 raw_response = await self._cython_call.receive_serialized_message() 

383 except asyncio.CancelledError: 

384 if not self.cancelled(): 

385 self.cancel() 

386 raise 

387 

388 if raw_response is cygrpc.EOF: 

389 return cygrpc.EOF 

390 else: 

391 return _common.deserialize( 

392 raw_response, self._response_deserializer 

393 ) 

394 

395 async def read(self) -> Union[EOFType, ResponseType]: 

396 if self.done(): 

397 await self._raise_for_status() 

398 return cygrpc.EOF 

399 self._update_response_style(_APIStyle.READER_WRITER) 

400 

401 response_message = await self._read() 

402 

403 if response_message is cygrpc.EOF: 

404 # If the read operation failed, Core should explain why. 

405 await self._raise_for_status() 

406 return response_message 

407 

408 

409class _StreamRequestMixin(Call): 

410 _metadata_sent: asyncio.Event 

411 _done_writing_flag: bool 

412 _async_request_poller: Optional[asyncio.Task] 

413 _request_style: _APIStyle 

414 

415 def _init_stream_request_mixin( 

416 self, request_iterator: Optional[RequestIterableType] 

417 ): 

418 self._metadata_sent = asyncio.Event() 

419 self._done_writing_flag = False 

420 

421 # If user passes in an async iterator, create a consumer Task. 

422 if request_iterator is not None: 

423 self._async_request_poller = self._loop.create_task( 

424 self._consume_request_iterator(request_iterator) 

425 ) 

426 self._request_style = _APIStyle.ASYNC_GENERATOR 

427 else: 

428 self._async_request_poller = None 

429 self._request_style = _APIStyle.READER_WRITER 

430 

431 def _raise_for_different_style(self, style: _APIStyle): 

432 if self._request_style is not style: 

433 raise cygrpc.UsageError(_API_STYLE_ERROR) 

434 

435 def cancel(self) -> bool: 

436 if super().cancel(): 

437 if self._async_request_poller is not None: 

438 self._async_request_poller.cancel() 

439 return True 

440 else: 

441 return False 

442 

443 def _metadata_sent_observer(self): 

444 self._metadata_sent.set() 

445 

446 async def _consume_request_iterator( 

447 self, request_iterator: RequestIterableType 

448 ) -> None: 

449 try: 

450 if inspect.isasyncgen(request_iterator) or hasattr( 

451 request_iterator, "__aiter__" 

452 ): 

453 async for request in request_iterator: 

454 try: 

455 await self._write(request) 

456 except AioRpcError as rpc_error: 

457 _LOGGER.debug( 

458 ( 

459 "Exception while consuming the" 

460 " request_iterator: %s" 

461 ), 

462 rpc_error, 

463 ) 

464 return 

465 else: 

466 for request in request_iterator: 

467 try: 

468 await self._write(request) 

469 except AioRpcError as rpc_error: 

470 _LOGGER.debug( 

471 ( 

472 "Exception while consuming the" 

473 " request_iterator: %s" 

474 ), 

475 rpc_error, 

476 ) 

477 return 

478 

479 await self._done_writing() 

480 except: # pylint: disable=bare-except # noqa: E722 

481 # Client iterators can raise exceptions, which we should handle by 

482 # cancelling the RPC and logging the client's error. No exceptions 

483 # should escape this function. 

484 _LOGGER.debug( 

485 "Client request_iterator raised exception:\n%s", 

486 traceback.format_exc(), 

487 ) 

488 self.cancel() 

489 

490 async def _write(self, request: RequestType) -> None: 

491 if self.done(): 

492 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) 

493 if self._done_writing_flag: 

494 raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS) 

495 if not self._metadata_sent.is_set(): 

496 await self._metadata_sent.wait() 

497 if self.done(): 

498 await self._raise_for_status() 

499 

500 serialized_request = _common.serialize( 

501 request, self._request_serializer 

502 ) 

503 try: 

504 await self._cython_call.send_serialized_message(serialized_request) 

505 except cygrpc.InternalError as err: 

506 self._cython_call.set_internal_error(str(err)) 

507 await self._raise_for_status() 

508 except asyncio.CancelledError: 

509 if not self.cancelled(): 

510 self.cancel() 

511 raise 

512 

513 async def _done_writing(self) -> None: 

514 if self.done(): 

515 # If the RPC is finished, do nothing. 

516 return 

517 if not self._done_writing_flag: 

518 # If the done writing is not sent before, try to send it. 

519 self._done_writing_flag = True 

520 try: 

521 await self._cython_call.send_receive_close() 

522 except asyncio.CancelledError: 

523 if not self.cancelled(): 

524 self.cancel() 

525 raise 

526 

527 async def write(self, request: RequestType) -> None: 

528 self._raise_for_different_style(_APIStyle.READER_WRITER) 

529 await self._write(request) 

530 

531 async def done_writing(self) -> None: 

532 """Signal peer that client is done writing. 

533 

534 This method is idempotent. 

535 """ 

536 self._raise_for_different_style(_APIStyle.READER_WRITER) 

537 await self._done_writing() 

538 

539 async def wait_for_connection(self) -> None: 

540 await self._metadata_sent.wait() 

541 if self.done(): 

542 await self._raise_for_status() 

543 

544 

545class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall): 

546 """Object for managing unary-unary RPC calls. 

547 

548 Returned when an instance of `UnaryUnaryMultiCallable` object is called. 

549 """ 

550 

551 _request: RequestType 

552 _invocation_task: asyncio.Task 

553 

554 # pylint: disable=too-many-arguments 

555 def __init__( 

556 self, 

557 request: RequestType, 

558 deadline: Optional[float], 

559 metadata: Metadata, 

560 credentials: Optional[grpc.CallCredentials], 

561 wait_for_ready: Optional[bool], 

562 channel: cygrpc.AioChannel, 

563 method: bytes, 

564 request_serializer: Optional[SerializingFunction], 

565 response_deserializer: Optional[DeserializingFunction], 

566 loop: asyncio.AbstractEventLoop, 

567 ) -> None: 

568 super().__init__( 

569 channel.call(method, deadline, credentials, wait_for_ready), 

570 metadata, 

571 request_serializer, 

572 response_deserializer, 

573 loop, 

574 ) 

575 self._request = request 

576 self._context = cygrpc.build_census_context() 

577 self._invocation_task = loop.create_task(self._invoke()) 

578 self._init_unary_response_mixin(self._invocation_task) 

579 

580 async def _invoke(self) -> ResponseType: 

581 serialized_request = _common.serialize( 

582 self._request, self._request_serializer 

583 ) 

584 

585 # NOTE(lidiz) asyncio.CancelledError is not a good transport for status, 

586 # because the asyncio.Task class do not cache the exception object. 

587 # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785 

588 try: 

589 serialized_response = await self._cython_call.unary_unary( 

590 serialized_request, self._metadata, self._context 

591 ) 

592 except asyncio.CancelledError: 

593 if not self.cancelled(): 

594 self.cancel() 

595 

596 if self._cython_call.is_ok(): 

597 return _common.deserialize( 

598 serialized_response, self._response_deserializer 

599 ) 

600 else: 

601 return cygrpc.EOF 

602 

603 async def wait_for_connection(self) -> None: 

604 await self._invocation_task 

605 if self.done(): 

606 await self._raise_for_status() 

607 

608 

609class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall): 

610 """Object for managing unary-stream RPC calls. 

611 

612 Returned when an instance of `UnaryStreamMultiCallable` object is called. 

613 """ 

614 

615 _request: RequestType 

616 _send_unary_request_task: asyncio.Task 

617 

618 # pylint: disable=too-many-arguments 

619 def __init__( 

620 self, 

621 request: RequestType, 

622 deadline: Optional[float], 

623 metadata: Metadata, 

624 credentials: Optional[grpc.CallCredentials], 

625 wait_for_ready: Optional[bool], 

626 channel: cygrpc.AioChannel, 

627 method: bytes, 

628 request_serializer: Optional[SerializingFunction], 

629 response_deserializer: Optional[DeserializingFunction], 

630 loop: asyncio.AbstractEventLoop, 

631 ) -> None: 

632 super().__init__( 

633 channel.call(method, deadline, credentials, wait_for_ready), 

634 metadata, 

635 request_serializer, 

636 response_deserializer, 

637 loop, 

638 ) 

639 self._request = request 

640 self._context = cygrpc.build_census_context() 

641 self._send_unary_request_task = loop.create_task( 

642 self._send_unary_request() 

643 ) 

644 self._init_stream_response_mixin(self._send_unary_request_task) 

645 

646 async def _send_unary_request(self) -> ResponseType: 

647 serialized_request = _common.serialize( 

648 self._request, self._request_serializer 

649 ) 

650 try: 

651 await self._cython_call.initiate_unary_stream( 

652 serialized_request, self._metadata, self._context 

653 ) 

654 except asyncio.CancelledError: 

655 if not self.cancelled(): 

656 self.cancel() 

657 raise 

658 

659 async def wait_for_connection(self) -> None: 

660 await self._send_unary_request_task 

661 if self.done(): 

662 await self._raise_for_status() 

663 

664 

665# pylint: disable=too-many-ancestors 

666class StreamUnaryCall( 

667 _StreamRequestMixin, _UnaryResponseMixin, Call, _base_call.StreamUnaryCall 

668): 

669 """Object for managing stream-unary RPC calls. 

670 

671 Returned when an instance of `StreamUnaryMultiCallable` object is called. 

672 """ 

673 

674 # pylint: disable=too-many-arguments 

675 def __init__( 

676 self, 

677 request_iterator: Optional[RequestIterableType], 

678 deadline: Optional[float], 

679 metadata: Metadata, 

680 credentials: Optional[grpc.CallCredentials], 

681 wait_for_ready: Optional[bool], 

682 channel: cygrpc.AioChannel, 

683 method: bytes, 

684 request_serializer: Optional[SerializingFunction], 

685 response_deserializer: Optional[DeserializingFunction], 

686 loop: asyncio.AbstractEventLoop, 

687 ) -> None: 

688 super().__init__( 

689 channel.call(method, deadline, credentials, wait_for_ready), 

690 metadata, 

691 request_serializer, 

692 response_deserializer, 

693 loop, 

694 ) 

695 

696 self._context = cygrpc.build_census_context() 

697 self._init_stream_request_mixin(request_iterator) 

698 self._init_unary_response_mixin(loop.create_task(self._conduct_rpc())) 

699 

700 async def _conduct_rpc(self) -> ResponseType: 

701 try: 

702 serialized_response = await self._cython_call.stream_unary( 

703 self._metadata, self._metadata_sent_observer, self._context 

704 ) 

705 except asyncio.CancelledError: 

706 if not self.cancelled(): 

707 self.cancel() 

708 raise 

709 

710 if self._cython_call.is_ok(): 

711 return _common.deserialize( 

712 serialized_response, self._response_deserializer 

713 ) 

714 else: 

715 return cygrpc.EOF 

716 

717 

718class StreamStreamCall( 

719 _StreamRequestMixin, _StreamResponseMixin, Call, _base_call.StreamStreamCall 

720): 

721 """Object for managing stream-stream RPC calls. 

722 

723 Returned when an instance of `StreamStreamMultiCallable` object is called. 

724 """ 

725 

726 _initializer: asyncio.Task 

727 

728 # pylint: disable=too-many-arguments 

729 def __init__( 

730 self, 

731 request_iterator: Optional[RequestIterableType], 

732 deadline: Optional[float], 

733 metadata: Metadata, 

734 credentials: Optional[grpc.CallCredentials], 

735 wait_for_ready: Optional[bool], 

736 channel: cygrpc.AioChannel, 

737 method: bytes, 

738 request_serializer: Optional[SerializingFunction], 

739 response_deserializer: Optional[DeserializingFunction], 

740 loop: asyncio.AbstractEventLoop, 

741 ) -> None: 

742 super().__init__( 

743 channel.call(method, deadline, credentials, wait_for_ready), 

744 metadata, 

745 request_serializer, 

746 response_deserializer, 

747 loop, 

748 ) 

749 self._context = cygrpc.build_census_context() 

750 self._initializer = self._loop.create_task(self._prepare_rpc()) 

751 self._init_stream_request_mixin(request_iterator) 

752 self._init_stream_response_mixin(self._initializer) 

753 

754 async def _prepare_rpc(self): 

755 """Prepares the RPC for receiving/sending messages. 

756 

757 All other operations around the stream should only happen after the 

758 completion of this method. 

759 """ 

760 try: 

761 await self._cython_call.initiate_stream_stream( 

762 self._metadata, self._metadata_sent_observer, self._context 

763 ) 

764 except asyncio.CancelledError: 

765 if not self.cancelled(): 

766 self.cancel() 

767 # No need to raise RpcError here, because no one will `await` this task.