Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/aio/_call.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

355 statements  

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Asyncio Python.""" 

15 

16import asyncio 

17import enum 

18from functools import partial 

19import inspect 

20import logging 

21import traceback 

22from typing import ( 

23 Any, 

24 AsyncIterator, 

25 Generator, 

26 Generic, 

27 Optional, 

28 Tuple, 

29 Union, 

30) 

31 

32import grpc 

33from grpc import _common 

34from grpc._cython import cygrpc 

35 

36from . import _base_call 

37from ._metadata import Metadata 

38from ._typing import DeserializingFunction 

39from ._typing import DoneCallbackType 

40from ._typing import EOFType 

41from ._typing import MetadataType 

42from ._typing import MetadatumType 

43from ._typing import RequestIterableType 

44from ._typing import RequestType 

45from ._typing import ResponseType 

46from ._typing import SerializingFunction 

47 

# Names exported by ``from ._call import *``.
__all__ = ("AioRpcError", "Call", "UnaryStreamCall", "UnaryUnaryCall")

# Module-wide logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Details strings passed along when a call is cancelled from this side.
_LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!"
_GC_CANCELLATION_DETAILS = "Cancelled upon garbage collection!"

# Messages raised when the streaming write APIs are used in an invalid state.
_RPC_ALREADY_FINISHED_DETAILS = "RPC already finished."
_RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".'

# Message raised when the iterator API and read()/write() are combined.
_API_STYLE_ERROR = (
    "The iterator and read/write APIs may not be mixed on a single RPC."
)

# Template with three positional fields: class name, status, details.
_OK_CALL_REPRESENTATION = (
    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
)

# Template with four positional fields: class name, status, details and
# debug error string.
_NON_OK_CALL_REPRESENTATION = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)

71 

72 

class AioRpcError(grpc.RpcError):
    """An implementation of RpcError to be used by the asynchronous API.

    A raised AioRpcError is a snapshot of the final status of the RPC: every
    value is already determined, so its accessors are plain methods and no
    longer need to be coroutines.
    """

    _code: grpc.StatusCode
    _details: Optional[str]
    _initial_metadata: Optional[Metadata]
    _trailing_metadata: Optional[Metadata]
    _debug_error_string: Optional[str]

    def __init__(
        self,
        code: grpc.StatusCode,
        initial_metadata: Metadata,
        trailing_metadata: Metadata,
        details: Optional[str] = None,
        debug_error_string: Optional[str] = None,
    ) -> None:
        """Constructor.

        Args:
          code: The status code with which the RPC has been finalized.
          initial_metadata: Optional initial metadata that could be sent by the
            Server.
          trailing_metadata: Optional metadata that could be sent by the Server.
          details: Optional details explaining the reason of the error.
          debug_error_string: Optional string
        """
        super().__init__()
        self._code = code
        self._details = details
        self._initial_metadata = initial_metadata
        self._trailing_metadata = trailing_metadata
        self._debug_error_string = debug_error_string

    def code(self) -> grpc.StatusCode:
        """Accesses the status code sent by the server.

        Returns:
          The `grpc.StatusCode` status code.
        """
        return self._code

    def details(self) -> Optional[str]:
        """Accesses the details sent by the server.

        Returns:
          The description of the error.
        """
        return self._details

    def initial_metadata(self) -> Metadata:
        """Accesses the initial metadata sent by the server.

        Returns:
          The initial metadata received.
        """
        return self._initial_metadata

    def trailing_metadata(self) -> Metadata:
        """Accesses the trailing metadata sent by the server.

        Returns:
          The trailing metadata received.
        """
        return self._trailing_metadata

    def debug_error_string(self) -> Optional[str]:
        """Accesses the debug error string sent by the server.

        Returns:
          The debug error string received, or None when the error was
          constructed without one (the constructor default).
        """
        return self._debug_error_string

    def _repr(self) -> str:
        """Assembles the error string for the RPC error."""
        return _NON_OK_CALL_REPRESENTATION.format(
            self.__class__.__name__,
            self._code,
            self._details,
            self._debug_error_string,
        )

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __reduce__(self):
        """Describe how to rebuild this error for pickling.

        Returns:
          A `(callable, args)` pair: the concrete class and the positional
          arguments, in constructor order, taken from the stored fields.
        """
        return (
            type(self),
            (
                self._code,
                self._initial_metadata,
                self._trailing_metadata,
                self._details,
                self._debug_error_string,
            ),
        )

177 

178 

def _create_rpc_error(
    initial_metadata: MetadataType,
    status: cygrpc.AioRpcStatus,
) -> AioRpcError:
    """Build an `AioRpcError` from initial metadata and a final status.

    Args:
      initial_metadata: Initial metadata, wrapped via `Metadata._create`.
      status: Final status object supplying the code, trailing metadata,
        details and debug error string.

    Returns:
      The assembled `AioRpcError`.
    """
    status_code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()]
    leading_metadata = Metadata._create(initial_metadata)
    trailing_metadata = Metadata.from_tuple(status.trailing_metadata())
    return AioRpcError(
        status_code,
        leading_metadata,
        trailing_metadata,
        details=status.details(),
        debug_error_string=status.debug_error_string(),
    )

190 

191 

class Call:
    """Base implementation of client RPC Call object.

    Implements logic around final status, metadata and cancellation by
    delegating to the wrapped `cygrpc._AioCall`.
    """

    _loop: asyncio.AbstractEventLoop
    _code: grpc.StatusCode
    _cython_call: cygrpc._AioCall
    _metadata: Tuple[MetadatumType, ...]
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]

    def __init__(
        self,
        cython_call: cygrpc._AioCall,
        metadata: Metadata,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Store the underlying call, the (de)serializers and the loop.

        The metadata is copied into a tuple.
        """
        self._loop = loop
        self._cython_call = cython_call
        self._metadata = tuple(metadata)
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer

    def __del__(self) -> None:
        # The '_cython_call' attribute may be missing by the time this
        # object is finalized, so probe for it before touching it.
        if not hasattr(self, "_cython_call"):
            return
        if self._cython_call.done():
            return
        self._cancel(_GC_CANCELLATION_DETAILS)

    def cancelled(self) -> bool:
        """Report whether the underlying call was cancelled."""
        return self._cython_call.cancelled()

    def _cancel(self, details: str) -> bool:
        """Forwards the application cancellation reasoning.

        Returns:
          True if a cancellation was issued, False if the call was already
          done.
        """
        if self._cython_call.done():
            return False
        self._cython_call.cancel(details)
        return True

    def cancel(self) -> bool:
        """Cancel the RPC with the local-cancellation details string."""
        return self._cancel(_LOCAL_CANCELLATION_DETAILS)

    def done(self) -> bool:
        """Report whether the underlying call has finished."""
        return self._cython_call.done()

    def add_done_callback(self, callback: DoneCallbackType) -> None:
        """Register `callback`; it is bound so it receives this Call object."""
        self._cython_call.add_done_callback(partial(callback, self))

    def time_remaining(self) -> Optional[float]:
        """Return the time remaining as reported by the underlying call."""
        return self._cython_call.time_remaining()

    async def initial_metadata(self) -> Metadata:
        """Wait for the initial metadata and wrap it in `Metadata`."""
        raw_metadata = await self._cython_call.initial_metadata()
        return Metadata.from_tuple(raw_metadata)

    async def trailing_metadata(self) -> Metadata:
        """Wait for the final status and return its trailing metadata.

        An empty `Metadata` is returned when the status carries none.
        """
        final_status = await self._cython_call.status()
        raw_metadata = final_status.trailing_metadata()
        if raw_metadata:
            return Metadata.from_tuple(raw_metadata)
        return Metadata()

    async def code(self) -> grpc.StatusCode:
        """Wait for the final status and return it as a `grpc.StatusCode`."""
        final_status = await self._cython_call.status()
        return _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[final_status.code()]

    async def details(self) -> str:
        """Wait for the final status and return its details."""
        final_status = await self._cython_call.status()
        return final_status.details()

    async def debug_error_string(self) -> str:
        """Wait for the final status and return its debug error string."""
        final_status = await self._cython_call.status()
        return final_status.debug_error_string()

    async def _raise_for_status(self) -> None:
        """Raise if the RPC did not finish with an OK status.

        Raises:
          asyncio.CancelledError: The call was cancelled locally.
          AioRpcError: The final status code is anything other than OK.
        """
        if self._cython_call.is_locally_cancelled():
            raise asyncio.CancelledError()
        if await self.code() == grpc.StatusCode.OK:
            return
        initial_metadata = await self.initial_metadata()
        final_status = await self._cython_call.status()
        raise _create_rpc_error(initial_metadata, final_status)

    def _repr(self) -> str:
        """Return the representation of the underlying call."""
        return repr(self._cython_call)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

287 

288 

@enum.unique
class _APIStyle(enum.IntEnum):
    """Which streaming API flavour an RPC is being driven with."""

    # No API has been used yet, so the style is still undecided.
    UNKNOWN = 0
    # The call is driven through (async) iteration.
    ASYNC_GENERATOR = 1
    # The call is driven through explicit read()/write() calls.
    READER_WRITER = 2

293 

294 

class _UnaryResponseMixin(Call, Generic[ResponseType]):
    """Mixin for calls that produce a single response via a task."""

    # Task whose result is either the response or `cygrpc.EOF`.
    _call_response: asyncio.Task

    def _init_unary_response_mixin(self, response_task: asyncio.Task):
        """Remember the task that will produce the response."""
        self._call_response = response_task

    def cancel(self) -> bool:
        """Cancel the RPC and, if that took effect, the response task too."""
        if not super().cancel():
            return False
        self._call_response.cancel()
        return True

    def __await__(self) -> Generator[Any, None, ResponseType]:
        """Wait till the ongoing RPC request finishes.

        Raises:
          asyncio.CancelledError: The awaiting was cancelled, or the call was
            cancelled locally.
          AioRpcError: The task yielded EOF without a local cancellation.
        """
        try:
            outcome = yield from self._call_response
        except asyncio.CancelledError:
            # Corner case that survives all other CancelledError handling:
            # the application cancels right after the Call object is
            # created, and the `CancelledError` shows up here.
            if not self.cancelled():
                self.cancel()
            raise

        if outcome is not cygrpc.EOF:
            return outcome

        # NOTE(lidiz) If the RpcError were raised inside the task and users
        # never 'await' it, AsyncIO logs 'Task exception was never
        # retrieved'. Raising here instead stops that spam. There can only
        # be one 'yield from' in '__await__', hence the access to the
        # private instance variables below.
        if self._cython_call.is_locally_cancelled():
            raise asyncio.CancelledError()
        raise _create_rpc_error(
            self._cython_call._initial_metadata,
            self._cython_call._status,
        )

335 

336 

class _StreamResponseMixin(Call):
    """Mixin for calls that receive a stream of responses.

    Responses can be consumed either by async iteration or by explicit
    `read()` calls; `_update_response_style` rejects mixing the two.
    """

    # Lazily created by `__aiter__`; None until iteration is first requested.
    _message_aiter: Optional[AsyncIterator[ResponseType]]
    # Task that must complete before any message can be received.
    _preparation: asyncio.Task
    _response_style: _APIStyle

    def _init_stream_response_mixin(self, preparation: asyncio.Task) -> None:
        """Reset the iteration state and remember the preparation task."""
        self._message_aiter = None
        self._preparation = preparation
        self._response_style = _APIStyle.UNKNOWN

    def _update_response_style(self, style: _APIStyle) -> None:
        """Record the API style on first use and reject a different one later.

        Raises:
          cygrpc.UsageError: A style was already recorded and differs from
            `style`.
        """
        if self._response_style is _APIStyle.UNKNOWN:
            self._response_style = style
        elif self._response_style is not style:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

    def cancel(self) -> bool:
        """Cancel the RPC and, if that took effect, the preparation task."""
        if super().cancel():
            self._preparation.cancel()
            return True
        return False

    async def _fetch_stream_responses(self) -> AsyncIterator[ResponseType]:
        """Yield each response message until EOF, then check the status.

        This is an async generator; the final status check raises if the
        stream did not end with an OK status.
        """
        message = await self._read()
        while message is not cygrpc.EOF:
            yield message
            message = await self._read()

        # If the read operation failed, Core should explain why.
        await self._raise_for_status()

    def __aiter__(self) -> AsyncIterator[ResponseType]:
        """Return the response iterator, creating it on first use.

        Every call returns the same underlying async generator.
        """
        self._update_response_style(_APIStyle.ASYNC_GENERATOR)
        if self._message_aiter is None:
            self._message_aiter = self._fetch_stream_responses()
        return self._message_aiter

    async def _read(self) -> Union[EOFType, ResponseType]:
        """Receive and deserialize one message, or return `cygrpc.EOF`."""
        # Wait for the request being sent
        await self._preparation

        # Reads response message from Core
        try:
            raw_response = await self._cython_call.receive_serialized_message()
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            raise

        if raw_response is cygrpc.EOF:
            return cygrpc.EOF
        return _common.deserialize(raw_response, self._response_deserializer)

    async def read(self) -> Union[EOFType, ResponseType]:
        """Read one response message using the reader/writer API.

        If the RPC is already done, no read is attempted and the API style
        is not recorded: the status is checked and `cygrpc.EOF` is returned.

        Returns:
          The next response message, or `cygrpc.EOF` at end of stream.
        """
        if self.done():
            await self._raise_for_status()
            return cygrpc.EOF
        self._update_response_style(_APIStyle.READER_WRITER)

        response_message = await self._read()

        if response_message is cygrpc.EOF:
            # If the read operation failed, Core should explain why.
            await self._raise_for_status()
        return response_message

402 

403 

class _StreamRequestMixin(Call):
    """Mixin for calls that send a stream of requests.

    Requests come either from a user-supplied iterator, drained by a
    background task, or from explicit `write()` / `done_writing()` calls.
    """

    _metadata_sent: asyncio.Event
    _done_writing_flag: bool
    _async_request_poller: Optional[asyncio.Task]
    _request_style: _APIStyle

    def _init_stream_request_mixin(
        self, request_iterator: Optional[RequestIterableType]
    ):
        """Set up the write state and pick the request API style."""
        self._metadata_sent = asyncio.Event()
        self._done_writing_flag = False

        if request_iterator is None:
            # No iterator: the caller drives the stream with write().
            self._async_request_poller = None
            self._request_style = _APIStyle.READER_WRITER
            return

        # If user passes in an iterator, create a consumer Task.
        self._async_request_poller = self._loop.create_task(
            self._consume_request_iterator(request_iterator)
        )
        self._request_style = _APIStyle.ASYNC_GENERATOR

    def _raise_for_different_style(self, style: _APIStyle):
        """Raise `cygrpc.UsageError` unless `style` is the one in use."""
        if self._request_style is not style:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

    def cancel(self) -> bool:
        """Cancel the RPC and, if that took effect, the consumer task."""
        if not super().cancel():
            return False
        poller = self._async_request_poller
        if poller is not None:
            poller.cancel()
        return True

    def _metadata_sent_observer(self):
        """Callback that marks the metadata as sent."""
        self._metadata_sent.set()

    async def _forward_request(self, request: RequestType) -> bool:
        """Write one request; return False if the write hit an AioRpcError."""
        try:
            await self._write(request)
        except AioRpcError as rpc_error:
            _LOGGER.debug(
                (
                    "Exception while consuming the"
                    " request_iterator: %s"
                ),
                rpc_error,
            )
            return False
        return True

    async def _consume_request_iterator(
        self, request_iterator: RequestIterableType
    ) -> None:
        """Drain `request_iterator` into the RPC, then half-close it.

        Async and plain iterators are both accepted. Draining stops early,
        without half-closing, as soon as a write fails with an AioRpcError.
        """
        try:
            is_async = inspect.isasyncgen(request_iterator) or hasattr(
                request_iterator, "__aiter__"
            )
            if is_async:
                async for request in request_iterator:
                    if not await self._forward_request(request):
                        return
            else:
                for request in request_iterator:
                    if not await self._forward_request(request):
                        return

            await self._done_writing()
        except:  # pylint: disable=bare-except # noqa: E722
            # Client iterators can raise exceptions, which we should handle by
            # cancelling the RPC and logging the client's error. No exceptions
            # should escape this function.
            _LOGGER.debug(
                "Client request_iterator raised exception:\n%s",
                traceback.format_exc(),
            )
            self.cancel()

    async def _write(self, request: RequestType) -> None:
        """Serialize and send a single request message.

        Raises:
          asyncio.InvalidStateError: The RPC already finished, or the stream
            was already half closed.
        """
        if self.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
        if self._done_writing_flag:
            raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)
        if not self._metadata_sent.is_set():
            await self._metadata_sent.wait()
            # The RPC may have finished while waiting for the metadata.
            if self.done():
                await self._raise_for_status()

        payload = _common.serialize(request, self._request_serializer)
        try:
            await self._cython_call.send_serialized_message(payload)
        except cygrpc.InternalError as err:
            self._cython_call.set_internal_error(str(err))
            await self._raise_for_status()
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            raise

    async def _done_writing(self) -> None:
        """Half-close the stream once; do nothing if finished or closed."""
        # If the RPC is finished, or done-writing was already sent, there
        # is nothing left to do.
        if self.done() or self._done_writing_flag:
            return

        self._done_writing_flag = True
        try:
            await self._cython_call.send_receive_close()
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            raise

    async def write(self, request: RequestType) -> None:
        """Send one request message using the reader/writer API."""
        self._raise_for_different_style(_APIStyle.READER_WRITER)
        await self._write(request)

    async def done_writing(self) -> None:
        """Signal peer that client is done writing.

        This method is idempotent.
        """
        self._raise_for_different_style(_APIStyle.READER_WRITER)
        await self._done_writing()

    async def wait_for_connection(self) -> None:
        """Wait until the metadata is sent; raise if the RPC already ended."""
        await self._metadata_sent.wait()
        if self.done():
            await self._raise_for_status()

537 

538 

class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall):
    """Object for managing unary-unary RPC calls.

    Returned when an instance of `UnaryUnaryMultiCallable` object is called.
    """

    _request: RequestType
    _invocation_task: asyncio.Task

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request: RequestType,
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Create the underlying call and schedule the invocation task."""
        cython_call = channel.call(
            method, deadline, credentials, wait_for_ready
        )
        super().__init__(
            cython_call,
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )
        self._request = request
        self._context = cygrpc.build_census_context()
        invocation = loop.create_task(self._invoke())
        self._invocation_task = invocation
        self._init_unary_response_mixin(invocation)

    async def _invoke(self) -> ResponseType:
        """Send the request and return the response, or EOF on failure.

        Returns:
          The deserialized response when the call finished OK, otherwise
          `cygrpc.EOF`.
        """
        payload = _common.serialize(self._request, self._request_serializer)

        # NOTE(lidiz) asyncio.CancelledError is not a good transport for status,
        # because the asyncio.Task class do not cache the exception object.
        # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785
        try:
            raw_response = await self._cython_call.unary_unary(
                payload, self._metadata, self._context
            )
        except asyncio.CancelledError:
            # Deliberately not re-raised: the status check below decides
            # what this task returns.
            if not self.cancelled():
                self.cancel()

        if not self._cython_call.is_ok():
            return cygrpc.EOF
        return _common.deserialize(raw_response, self._response_deserializer)

    async def wait_for_connection(self) -> None:
        """Wait for the invocation task; raise if the RPC already ended."""
        await self._invocation_task
        if self.done():
            await self._raise_for_status()

600 

601 

class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall):
    """Object for managing unary-stream RPC calls.

    Returned when an instance of `UnaryStreamMultiCallable` object is called.
    """

    _request: RequestType
    _send_unary_request_task: asyncio.Task

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request: RequestType,
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Create the underlying call and schedule sending the request."""
        cython_call = channel.call(
            method, deadline, credentials, wait_for_ready
        )
        super().__init__(
            cython_call,
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )
        self._request = request
        self._context = cygrpc.build_census_context()
        sender = loop.create_task(self._send_unary_request())
        self._send_unary_request_task = sender
        # Reads wait on this task before receiving any message.
        self._init_stream_response_mixin(sender)

    async def _send_unary_request(self) -> ResponseType:
        """Serialize the single request and start the unary-stream call."""
        payload = _common.serialize(self._request, self._request_serializer)
        try:
            await self._cython_call.initiate_unary_stream(
                payload, self._metadata, self._context
            )
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            raise

    async def wait_for_connection(self) -> None:
        """Wait for the request to be sent; raise if the RPC already ended."""
        await self._send_unary_request_task
        if self.done():
            await self._raise_for_status()

656 

657 

658# pylint: disable=too-many-ancestors 

class StreamUnaryCall(
    _StreamRequestMixin, _UnaryResponseMixin, Call, _base_call.StreamUnaryCall
):
    """Object for managing stream-unary RPC calls.

    Returned when an instance of `StreamUnaryMultiCallable` object is called.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request_iterator: Optional[RequestIterableType],
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Create the underlying call and start both halves of the RPC."""
        cython_call = channel.call(
            method, deadline, credentials, wait_for_ready
        )
        super().__init__(
            cython_call,
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )

        self._context = cygrpc.build_census_context()
        self._init_stream_request_mixin(request_iterator)
        rpc_task = loop.create_task(self._conduct_rpc())
        self._init_unary_response_mixin(rpc_task)

    async def _conduct_rpc(self) -> ResponseType:
        """Run the stream-unary call and return the response, or EOF.

        Returns:
          The deserialized response when the call finished OK, otherwise
          `cygrpc.EOF`.
        """
        try:
            raw_response = await self._cython_call.stream_unary(
                self._metadata, self._metadata_sent_observer, self._context
            )
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            raise

        if not self._cython_call.is_ok():
            return cygrpc.EOF
        return _common.deserialize(raw_response, self._response_deserializer)

708 

709 

class StreamStreamCall(
    _StreamRequestMixin, _StreamResponseMixin, Call, _base_call.StreamStreamCall
):
    """Object for managing stream-stream RPC calls.

    Returned when an instance of `StreamStreamMultiCallable` object is called.
    """

    _initializer: asyncio.Task

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request_iterator: Optional[RequestIterableType],
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Create the underlying call and schedule the RPC preparation."""
        cython_call = channel.call(
            method, deadline, credentials, wait_for_ready
        )
        super().__init__(
            cython_call,
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )
        self._context = cygrpc.build_census_context()
        preparation = self._loop.create_task(self._prepare_rpc())
        self._initializer = preparation
        self._init_stream_request_mixin(request_iterator)
        # Reads wait on the preparation task before receiving any message.
        self._init_stream_response_mixin(preparation)

    async def _prepare_rpc(self):
        """Prepares the RPC for receiving/sending messages.

        All other operations around the stream should only happen after the
        completion of this method.
        """
        try:
            await self._cython_call.initiate_stream_stream(
                self._metadata, self._metadata_sent_observer, self._context
            )
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            # No need to raise RpcError here, because no one will `await` this task.