Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/aio/_call.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

354 statements  

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Asyncio Python.""" 

15 

16import asyncio 

17import enum 

18from functools import partial 

19import inspect 

20import logging 

21import traceback 

22from typing import ( 

23 Any, 

24 AsyncIterator, 

25 Generator, 

26 Generic, 

27 Optional, 

28 Tuple, 

29 Union, 

30) 

31 

32import grpc 

33from grpc import _common 

34from grpc._cython import cygrpc 

35 

36from . import _base_call 

37from ._metadata import Metadata 

38from ._typing import DeserializingFunction 

39from ._typing import DoneCallbackType 

40from ._typing import EOFType 

41from ._typing import MetadataType 

42from ._typing import MetadatumType 

43from ._typing import RequestIterableType 

44from ._typing import RequestType 

45from ._typing import ResponseType 

46from ._typing import SerializingFunction 

47 

48__all__ = "AioRpcError", "Call", "UnaryUnaryCall", "UnaryStreamCall" 

49 

50_LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!" 

51_GC_CANCELLATION_DETAILS = "Cancelled upon garbage collection!" 

52_RPC_ALREADY_FINISHED_DETAILS = "RPC already finished." 

53_RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".' 

54_API_STYLE_ERROR = ( 

55 "The iterator and read/write APIs may not be mixed on a single RPC." 

56) 

57 

58_OK_CALL_REPRESENTATION = ( 

59 '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>' 

60) 

61 

62_NON_OK_CALL_REPRESENTATION = ( 

63 "<{} of RPC that terminated with:\n" 

64 "\tstatus = {}\n" 

65 '\tdetails = "{}"\n' 

66 '\tdebug_error_string = "{}"\n' 

67 ">" 

68) 

69 

70_LOGGER = logging.getLogger(__name__) 

71 

72 

73class AioRpcError(grpc.RpcError): 

74 """An implementation of RpcError to be used by the asynchronous API. 

75 

76 Raised RpcError is a snapshot of the final status of the RPC, values are 

77 determined. Hence, its methods no longer needs to be coroutines. 

78 """ 

79 

80 _code: grpc.StatusCode 

81 _details: Optional[str] 

82 _initial_metadata: Optional[Metadata] 

83 _trailing_metadata: Optional[Metadata] 

84 _debug_error_string: Optional[str] 

85 

86 def __init__( 

87 self, 

88 code: grpc.StatusCode, 

89 initial_metadata: Metadata, 

90 trailing_metadata: Metadata, 

91 details: Optional[str] = None, 

92 debug_error_string: Optional[str] = None, 

93 ) -> None: 

94 """Constructor. 

95 

96 Args: 

97 code: The status code with which the RPC has been finalized. 

98 initial_metadata: Optional initial metadata that could be sent by the 

99 Server. 

100 trailing_metadata: Optional metadata that could be sent by the Server. 

101 details: Optional details explaining the reason of the error. 

102 debug_error_string: Optional string 

103 """ 

104 super().__init__() 

105 self._code = code 

106 self._details = details 

107 self._initial_metadata = initial_metadata 

108 self._trailing_metadata = trailing_metadata 

109 self._debug_error_string = debug_error_string 

110 

111 def code(self) -> grpc.StatusCode: 

112 """Accesses the status code sent by the server. 

113 

114 Returns: 

115 The `grpc.StatusCode` status code. 

116 """ 

117 return self._code 

118 

119 def details(self) -> Optional[str]: 

120 """Accesses the details sent by the server. 

121 

122 Returns: 

123 The description of the error. 

124 """ 

125 return self._details 

126 

127 def initial_metadata(self) -> Metadata: 

128 """Accesses the initial metadata sent by the server. 

129 

130 Returns: 

131 The initial metadata received. 

132 """ 

133 return self._initial_metadata 

134 

135 def trailing_metadata(self) -> Metadata: 

136 """Accesses the trailing metadata sent by the server. 

137 

138 Returns: 

139 The trailing metadata received. 

140 """ 

141 return self._trailing_metadata 

142 

143 def debug_error_string(self) -> str: 

144 """Accesses the debug error string sent by the server. 

145 

146 Returns: 

147 The debug error string received. 

148 """ 

149 return self._debug_error_string 

150 

151 def _repr(self) -> str: 

152 """Assembles the error string for the RPC error.""" 

153 return _NON_OK_CALL_REPRESENTATION.format( 

154 self.__class__.__name__, 

155 self._code, 

156 self._details, 

157 self._debug_error_string, 

158 ) 

159 

160 def __repr__(self) -> str: 

161 return self._repr() 

162 

163 def __str__(self) -> str: 

164 return self._repr() 

165 

166 def __reduce__(self): 

167 return ( 

168 type(self), 

169 ( 

170 self._code, 

171 self._initial_metadata, 

172 self._trailing_metadata, 

173 self._details, 

174 self._debug_error_string, 

175 ), 

176 ) 

177 

178 

179def _create_rpc_error( 

180 initial_metadata: MetadataType, 

181 status: cygrpc.AioRpcStatus, 

182) -> AioRpcError: 

183 return AioRpcError( 

184 _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()], 

185 Metadata.from_tuple(initial_metadata), 

186 Metadata.from_tuple(status.trailing_metadata()), 

187 details=status.details(), 

188 debug_error_string=status.debug_error_string(), 

189 ) 

190 

191 

192class Call: 

193 """Base implementation of client RPC Call object. 

194 

195 Implements logic around final status, metadata and cancellation. 

196 """ 

197 

198 _loop: asyncio.AbstractEventLoop 

199 _code: grpc.StatusCode 

200 _cython_call: cygrpc._AioCall 

201 _metadata: Tuple[MetadatumType, ...] 

202 _request_serializer: Optional[SerializingFunction] 

203 _response_deserializer: Optional[DeserializingFunction] 

204 

205 def __init__( 

206 self, 

207 cython_call: cygrpc._AioCall, 

208 metadata: Metadata, 

209 request_serializer: Optional[SerializingFunction], 

210 response_deserializer: Optional[DeserializingFunction], 

211 loop: asyncio.AbstractEventLoop, 

212 ) -> None: 

213 self._loop = loop 

214 self._cython_call = cython_call 

215 self._metadata = tuple(metadata) 

216 self._request_serializer = request_serializer 

217 self._response_deserializer = response_deserializer 

218 

219 def __del__(self) -> None: 

220 # The '_cython_call' object might be destructed before Call object 

221 if hasattr(self, "_cython_call"): 

222 if not self._cython_call.done(): 

223 self._cancel(_GC_CANCELLATION_DETAILS) 

224 

225 def cancelled(self) -> bool: 

226 return self._cython_call.cancelled() 

227 

228 def _cancel(self, details: str) -> bool: 

229 """Forwards the application cancellation reasoning.""" 

230 if not self._cython_call.done(): 

231 self._cython_call.cancel(details) 

232 return True 

233 return False 

234 

235 def cancel(self) -> bool: 

236 return self._cancel(_LOCAL_CANCELLATION_DETAILS) 

237 

238 def done(self) -> bool: 

239 return self._cython_call.done() 

240 

241 def add_done_callback(self, callback: DoneCallbackType) -> None: 

242 cb = partial(callback, self) 

243 self._cython_call.add_done_callback(cb) 

244 

245 def time_remaining(self) -> Optional[float]: 

246 return self._cython_call.time_remaining() 

247 

248 async def initial_metadata(self) -> Metadata: 

249 raw_metadata_tuple = await self._cython_call.initial_metadata() 

250 return Metadata.from_tuple(raw_metadata_tuple) 

251 

252 async def trailing_metadata(self) -> Metadata: 

253 raw_metadata_tuple = ( 

254 await self._cython_call.status() 

255 ).trailing_metadata() 

256 return Metadata.from_tuple(raw_metadata_tuple) 

257 

258 async def code(self) -> grpc.StatusCode: 

259 cygrpc_code = (await self._cython_call.status()).code() 

260 return _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[cygrpc_code] 

261 

262 async def details(self) -> str: 

263 return (await self._cython_call.status()).details() 

264 

265 async def debug_error_string(self) -> str: 

266 return (await self._cython_call.status()).debug_error_string() 

267 

268 async def _raise_for_status(self) -> None: 

269 if self._cython_call.is_locally_cancelled(): 

270 raise asyncio.CancelledError() 

271 code = await self.code() 

272 if code != grpc.StatusCode.OK: 

273 raise _create_rpc_error( 

274 await self.initial_metadata(), 

275 await self._cython_call.status(), 

276 ) 

277 

278 def _repr(self) -> str: 

279 return repr(self._cython_call) 

280 

281 def __repr__(self) -> str: 

282 return self._repr() 

283 

284 def __str__(self) -> str: 

285 return self._repr() 

286 

287 

288class _APIStyle(enum.IntEnum): 

289 UNKNOWN = 0 

290 ASYNC_GENERATOR = 1 

291 READER_WRITER = 2 

292 

293 

294class _UnaryResponseMixin(Call, Generic[ResponseType]): 

295 _call_response: asyncio.Task 

296 

297 def _init_unary_response_mixin(self, response_task: asyncio.Task): 

298 self._call_response = response_task 

299 

300 def cancel(self) -> bool: 

301 if super().cancel(): 

302 self._call_response.cancel() 

303 return True 

304 return False 

305 

306 def __await__(self) -> Generator[Any, None, ResponseType]: 

307 """Wait till the ongoing RPC request finishes.""" 

308 try: 

309 response = yield from self._call_response 

310 except asyncio.CancelledError: 

311 # Even if we caught all other CancelledError, there is still 

312 # this corner case. If the application cancels immediately after 

313 # the Call object is created, we will observe this 

314 # `CancelledError`. 

315 if not self.cancelled(): 

316 self.cancel() 

317 raise 

318 

319 # NOTE(lidiz) If we raise RpcError in the task, and users doesn't 

320 # 'await' on it. AsyncIO will log 'Task exception was never retrieved'. 

321 # Instead, if we move the exception raising here, the spam stops. 

322 # Unfortunately, there can only be one 'yield from' in '__await__'. So, 

323 # we need to access the private instance variable. 

324 if response is cygrpc.EOF: 

325 if self._cython_call.is_locally_cancelled(): 

326 raise asyncio.CancelledError() 

327 else: 

328 raise _create_rpc_error( 

329 self._cython_call._initial_metadata, 

330 self._cython_call._status, 

331 ) 

332 else: 

333 return response 

334 

335 

336class _StreamResponseMixin(Call): 

337 _message_aiter: AsyncIterator[ResponseType] 

338 _preparation: asyncio.Task 

339 _response_style: _APIStyle 

340 

341 def _init_stream_response_mixin(self, preparation: asyncio.Task): 

342 self._message_aiter = None 

343 self._preparation = preparation 

344 self._response_style = _APIStyle.UNKNOWN 

345 

346 def _update_response_style(self, style: _APIStyle): 

347 if self._response_style is _APIStyle.UNKNOWN: 

348 self._response_style = style 

349 elif self._response_style is not style: 

350 raise cygrpc.UsageError(_API_STYLE_ERROR) 

351 

352 def cancel(self) -> bool: 

353 if super().cancel(): 

354 self._preparation.cancel() 

355 return True 

356 return False 

357 

358 async def _fetch_stream_responses(self) -> ResponseType: 

359 message = await self._read() 

360 while message is not cygrpc.EOF: 

361 yield message 

362 message = await self._read() 

363 

364 # If the read operation failed, Core should explain why. 

365 await self._raise_for_status() 

366 

367 def __aiter__(self) -> AsyncIterator[ResponseType]: 

368 self._update_response_style(_APIStyle.ASYNC_GENERATOR) 

369 if self._message_aiter is None: 

370 self._message_aiter = self._fetch_stream_responses() 

371 return self._message_aiter 

372 

373 async def _read(self) -> ResponseType: 

374 # Wait for the request being sent 

375 await self._preparation 

376 

377 # Reads response message from Core 

378 try: 

379 raw_response = await self._cython_call.receive_serialized_message() 

380 except asyncio.CancelledError: 

381 if not self.cancelled(): 

382 self.cancel() 

383 raise 

384 

385 if raw_response is cygrpc.EOF: 

386 return cygrpc.EOF 

387 return _common.deserialize(raw_response, self._response_deserializer) 

388 

389 async def read(self) -> Union[EOFType, ResponseType]: 

390 if self.done(): 

391 await self._raise_for_status() 

392 return cygrpc.EOF 

393 self._update_response_style(_APIStyle.READER_WRITER) 

394 

395 response_message = await self._read() 

396 

397 if response_message is cygrpc.EOF: 

398 # If the read operation failed, Core should explain why. 

399 await self._raise_for_status() 

400 return response_message 

401 

402 

403class _StreamRequestMixin(Call): 

404 _metadata_sent: asyncio.Event 

405 _done_writing_flag: bool 

406 _async_request_poller: Optional[asyncio.Task] 

407 _request_style: _APIStyle 

408 

409 def _init_stream_request_mixin( 

410 self, request_iterator: Optional[RequestIterableType] 

411 ): 

412 self._metadata_sent = asyncio.Event() 

413 self._done_writing_flag = False 

414 

415 # If user passes in an async iterator, create a consumer Task. 

416 if request_iterator is not None: 

417 self._async_request_poller = self._loop.create_task( 

418 self._consume_request_iterator(request_iterator) 

419 ) 

420 self._request_style = _APIStyle.ASYNC_GENERATOR 

421 else: 

422 self._async_request_poller = None 

423 self._request_style = _APIStyle.READER_WRITER 

424 

425 def _raise_for_different_style(self, style: _APIStyle): 

426 if self._request_style is not style: 

427 raise cygrpc.UsageError(_API_STYLE_ERROR) 

428 

429 def cancel(self) -> bool: 

430 if super().cancel(): 

431 if self._async_request_poller is not None: 

432 self._async_request_poller.cancel() 

433 return True 

434 return False 

435 

436 def _metadata_sent_observer(self): 

437 self._metadata_sent.set() 

438 

439 async def _consume_request_iterator( 

440 self, request_iterator: RequestIterableType 

441 ) -> None: 

442 try: 

443 if inspect.isasyncgen(request_iterator) or hasattr( 

444 request_iterator, "__aiter__" 

445 ): 

446 async for request in request_iterator: 

447 try: 

448 await self._write(request) 

449 except AioRpcError as rpc_error: 

450 _LOGGER.debug( 

451 ( 

452 "Exception while consuming the" 

453 " request_iterator: %s" 

454 ), 

455 rpc_error, 

456 ) 

457 return 

458 else: 

459 for request in request_iterator: 

460 try: 

461 await self._write(request) 

462 except AioRpcError as rpc_error: 

463 _LOGGER.debug( 

464 ( 

465 "Exception while consuming the" 

466 " request_iterator: %s" 

467 ), 

468 rpc_error, 

469 ) 

470 return 

471 

472 await self._done_writing() 

473 except: # pylint: disable=bare-except # noqa: E722 

474 # Client iterators can raise exceptions, which we should handle by 

475 # cancelling the RPC and logging the client's error. No exceptions 

476 # should escape this function. 

477 _LOGGER.debug( 

478 "Client request_iterator raised exception:\n%s", 

479 traceback.format_exc(), 

480 ) 

481 self.cancel() 

482 

483 async def _write(self, request: RequestType) -> None: 

484 if self.done(): 

485 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) 

486 if self._done_writing_flag: 

487 raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS) 

488 if not self._metadata_sent.is_set(): 

489 await self._metadata_sent.wait() 

490 if self.done(): 

491 await self._raise_for_status() 

492 

493 serialized_request = _common.serialize( 

494 request, self._request_serializer 

495 ) 

496 try: 

497 await self._cython_call.send_serialized_message(serialized_request) 

498 except cygrpc.InternalError as err: 

499 self._cython_call.set_internal_error(str(err)) 

500 await self._raise_for_status() 

501 except asyncio.CancelledError: 

502 if not self.cancelled(): 

503 self.cancel() 

504 raise 

505 

506 async def _done_writing(self) -> None: 

507 if self.done(): 

508 # If the RPC is finished, do nothing. 

509 return 

510 if not self._done_writing_flag: 

511 # If the done writing is not sent before, try to send it. 

512 self._done_writing_flag = True 

513 try: 

514 await self._cython_call.send_receive_close() 

515 except asyncio.CancelledError: 

516 if not self.cancelled(): 

517 self.cancel() 

518 raise 

519 

520 async def write(self, request: RequestType) -> None: 

521 self._raise_for_different_style(_APIStyle.READER_WRITER) 

522 await self._write(request) 

523 

524 async def done_writing(self) -> None: 

525 """Signal peer that client is done writing. 

526 

527 This method is idempotent. 

528 """ 

529 self._raise_for_different_style(_APIStyle.READER_WRITER) 

530 await self._done_writing() 

531 

532 async def wait_for_connection(self) -> None: 

533 await self._metadata_sent.wait() 

534 if self.done(): 

535 await self._raise_for_status() 

536 

537 

538class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall): 

539 """Object for managing unary-unary RPC calls. 

540 

541 Returned when an instance of `UnaryUnaryMultiCallable` object is called. 

542 """ 

543 

544 _request: RequestType 

545 _invocation_task: asyncio.Task 

546 

547 # pylint: disable=too-many-arguments 

548 def __init__( 

549 self, 

550 request: RequestType, 

551 deadline: Optional[float], 

552 metadata: Metadata, 

553 credentials: Optional[grpc.CallCredentials], 

554 wait_for_ready: Optional[bool], 

555 channel: cygrpc.AioChannel, 

556 method: bytes, 

557 request_serializer: Optional[SerializingFunction], 

558 response_deserializer: Optional[DeserializingFunction], 

559 loop: asyncio.AbstractEventLoop, 

560 ) -> None: 

561 super().__init__( 

562 channel.call(method, deadline, credentials, wait_for_ready), 

563 metadata, 

564 request_serializer, 

565 response_deserializer, 

566 loop, 

567 ) 

568 self._request = request 

569 self._context = cygrpc.build_census_context() 

570 self._invocation_task = loop.create_task(self._invoke()) 

571 self._init_unary_response_mixin(self._invocation_task) 

572 

573 async def _invoke(self) -> ResponseType: 

574 serialized_request = _common.serialize( 

575 self._request, self._request_serializer 

576 ) 

577 

578 # NOTE(lidiz) asyncio.CancelledError is not a good transport for status, 

579 # because the asyncio.Task class do not cache the exception object. 

580 # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785 

581 try: 

582 serialized_response = await self._cython_call.unary_unary( 

583 serialized_request, self._metadata, self._context 

584 ) 

585 except asyncio.CancelledError: 

586 if not self.cancelled(): 

587 self.cancel() 

588 

589 if self._cython_call.is_ok(): 

590 return _common.deserialize( 

591 serialized_response, self._response_deserializer 

592 ) 

593 return cygrpc.EOF 

594 

595 async def wait_for_connection(self) -> None: 

596 await self._invocation_task 

597 if self.done(): 

598 await self._raise_for_status() 

599 

600 

601class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall): 

602 """Object for managing unary-stream RPC calls. 

603 

604 Returned when an instance of `UnaryStreamMultiCallable` object is called. 

605 """ 

606 

607 _request: RequestType 

608 _send_unary_request_task: asyncio.Task 

609 

610 # pylint: disable=too-many-arguments 

611 def __init__( 

612 self, 

613 request: RequestType, 

614 deadline: Optional[float], 

615 metadata: Metadata, 

616 credentials: Optional[grpc.CallCredentials], 

617 wait_for_ready: Optional[bool], 

618 channel: cygrpc.AioChannel, 

619 method: bytes, 

620 request_serializer: Optional[SerializingFunction], 

621 response_deserializer: Optional[DeserializingFunction], 

622 loop: asyncio.AbstractEventLoop, 

623 ) -> None: 

624 super().__init__( 

625 channel.call(method, deadline, credentials, wait_for_ready), 

626 metadata, 

627 request_serializer, 

628 response_deserializer, 

629 loop, 

630 ) 

631 self._request = request 

632 self._context = cygrpc.build_census_context() 

633 self._send_unary_request_task = loop.create_task( 

634 self._send_unary_request() 

635 ) 

636 self._init_stream_response_mixin(self._send_unary_request_task) 

637 

638 async def _send_unary_request(self) -> ResponseType: 

639 serialized_request = _common.serialize( 

640 self._request, self._request_serializer 

641 ) 

642 try: 

643 await self._cython_call.initiate_unary_stream( 

644 serialized_request, self._metadata, self._context 

645 ) 

646 except asyncio.CancelledError: 

647 if not self.cancelled(): 

648 self.cancel() 

649 raise 

650 

651 async def wait_for_connection(self) -> None: 

652 await self._send_unary_request_task 

653 if self.done(): 

654 await self._raise_for_status() 

655 

656 

657# pylint: disable=too-many-ancestors 

658class StreamUnaryCall( 

659 _StreamRequestMixin, _UnaryResponseMixin, Call, _base_call.StreamUnaryCall 

660): 

661 """Object for managing stream-unary RPC calls. 

662 

663 Returned when an instance of `StreamUnaryMultiCallable` object is called. 

664 """ 

665 

666 # pylint: disable=too-many-arguments 

667 def __init__( 

668 self, 

669 request_iterator: Optional[RequestIterableType], 

670 deadline: Optional[float], 

671 metadata: Metadata, 

672 credentials: Optional[grpc.CallCredentials], 

673 wait_for_ready: Optional[bool], 

674 channel: cygrpc.AioChannel, 

675 method: bytes, 

676 request_serializer: Optional[SerializingFunction], 

677 response_deserializer: Optional[DeserializingFunction], 

678 loop: asyncio.AbstractEventLoop, 

679 ) -> None: 

680 super().__init__( 

681 channel.call(method, deadline, credentials, wait_for_ready), 

682 metadata, 

683 request_serializer, 

684 response_deserializer, 

685 loop, 

686 ) 

687 

688 self._context = cygrpc.build_census_context() 

689 self._init_stream_request_mixin(request_iterator) 

690 self._init_unary_response_mixin(loop.create_task(self._conduct_rpc())) 

691 

692 async def _conduct_rpc(self) -> ResponseType: 

693 try: 

694 serialized_response = await self._cython_call.stream_unary( 

695 self._metadata, self._metadata_sent_observer, self._context 

696 ) 

697 except asyncio.CancelledError: 

698 if not self.cancelled(): 

699 self.cancel() 

700 raise 

701 

702 if self._cython_call.is_ok(): 

703 return _common.deserialize( 

704 serialized_response, self._response_deserializer 

705 ) 

706 return cygrpc.EOF 

707 

708 

709class StreamStreamCall( 

710 _StreamRequestMixin, _StreamResponseMixin, Call, _base_call.StreamStreamCall 

711): 

712 """Object for managing stream-stream RPC calls. 

713 

714 Returned when an instance of `StreamStreamMultiCallable` object is called. 

715 """ 

716 

717 _initializer: asyncio.Task 

718 

719 # pylint: disable=too-many-arguments 

720 def __init__( 

721 self, 

722 request_iterator: Optional[RequestIterableType], 

723 deadline: Optional[float], 

724 metadata: Metadata, 

725 credentials: Optional[grpc.CallCredentials], 

726 wait_for_ready: Optional[bool], 

727 channel: cygrpc.AioChannel, 

728 method: bytes, 

729 request_serializer: Optional[SerializingFunction], 

730 response_deserializer: Optional[DeserializingFunction], 

731 loop: asyncio.AbstractEventLoop, 

732 ) -> None: 

733 super().__init__( 

734 channel.call(method, deadline, credentials, wait_for_ready), 

735 metadata, 

736 request_serializer, 

737 response_deserializer, 

738 loop, 

739 ) 

740 self._context = cygrpc.build_census_context() 

741 self._initializer = self._loop.create_task(self._prepare_rpc()) 

742 self._init_stream_request_mixin(request_iterator) 

743 self._init_stream_response_mixin(self._initializer) 

744 

745 async def _prepare_rpc(self): 

746 """Prepares the RPC for receiving/sending messages. 

747 

748 All other operations around the stream should only happen after the 

749 completion of this method. 

750 """ 

751 try: 

752 await self._cython_call.initiate_stream_stream( 

753 self._metadata, self._metadata_sent_observer, self._context 

754 ) 

755 except asyncio.CancelledError: 

756 if not self.cancelled(): 

757 self.cancel() 

758 # No need to raise RpcError here, because no one will `await` this task.