Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_call.py: 36%

346 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:37 +0000

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Asyncio Python.""" 

15 

16import asyncio 

17import enum 

18from functools import partial 

19import inspect 

20import logging 

21import traceback 

22from typing import Any, AsyncIterator, Generator, Generic, Optional, Tuple 

23 

24import grpc 

25from grpc import _common 

26from grpc._cython import cygrpc 

27 

28from . import _base_call 

29from ._metadata import Metadata 

30from ._typing import DeserializingFunction 

31from ._typing import DoneCallbackType 

32from ._typing import MetadatumType 

33from ._typing import RequestIterableType 

34from ._typing import RequestType 

35from ._typing import ResponseType 

36from ._typing import SerializingFunction 

37 

38__all__ = "AioRpcError", "Call", "UnaryUnaryCall", "UnaryStreamCall" 

39 

40_LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!" 

41_GC_CANCELLATION_DETAILS = "Cancelled upon garbage collection!" 

42_RPC_ALREADY_FINISHED_DETAILS = "RPC already finished." 

43_RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".' 

44_API_STYLE_ERROR = ( 

45 "The iterator and read/write APIs may not be mixed on a single RPC." 

46) 

47 

48_OK_CALL_REPRESENTATION = ( 

49 '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>' 

50) 

51 

52_NON_OK_CALL_REPRESENTATION = ( 

53 "<{} of RPC that terminated with:\n" 

54 "\tstatus = {}\n" 

55 '\tdetails = "{}"\n' 

56 '\tdebug_error_string = "{}"\n' 

57 ">" 

58) 

59 

60_LOGGER = logging.getLogger(__name__) 

61 

62 

63class AioRpcError(grpc.RpcError): 

64 """An implementation of RpcError to be used by the asynchronous API. 

65 

66 Raised RpcError is a snapshot of the final status of the RPC, values are 

67 determined. Hence, its methods no longer needs to be coroutines. 

68 """ 

69 

70 _code: grpc.StatusCode 

71 _details: Optional[str] 

72 _initial_metadata: Optional[Metadata] 

73 _trailing_metadata: Optional[Metadata] 

74 _debug_error_string: Optional[str] 

75 

76 def __init__( 

77 self, 

78 code: grpc.StatusCode, 

79 initial_metadata: Metadata, 

80 trailing_metadata: Metadata, 

81 details: Optional[str] = None, 

82 debug_error_string: Optional[str] = None, 

83 ) -> None: 

84 """Constructor. 

85 

86 Args: 

87 code: The status code with which the RPC has been finalized. 

88 details: Optional details explaining the reason of the error. 

89 initial_metadata: Optional initial metadata that could be sent by the 

90 Server. 

91 trailing_metadata: Optional metadata that could be sent by the Server. 

92 """ 

93 

94 super().__init__() 

95 self._code = code 

96 self._details = details 

97 self._initial_metadata = initial_metadata 

98 self._trailing_metadata = trailing_metadata 

99 self._debug_error_string = debug_error_string 

100 

101 def code(self) -> grpc.StatusCode: 

102 """Accesses the status code sent by the server. 

103 

104 Returns: 

105 The `grpc.StatusCode` status code. 

106 """ 

107 return self._code 

108 

109 def details(self) -> Optional[str]: 

110 """Accesses the details sent by the server. 

111 

112 Returns: 

113 The description of the error. 

114 """ 

115 return self._details 

116 

117 def initial_metadata(self) -> Metadata: 

118 """Accesses the initial metadata sent by the server. 

119 

120 Returns: 

121 The initial metadata received. 

122 """ 

123 return self._initial_metadata 

124 

125 def trailing_metadata(self) -> Metadata: 

126 """Accesses the trailing metadata sent by the server. 

127 

128 Returns: 

129 The trailing metadata received. 

130 """ 

131 return self._trailing_metadata 

132 

133 def debug_error_string(self) -> str: 

134 """Accesses the debug error string sent by the server. 

135 

136 Returns: 

137 The debug error string received. 

138 """ 

139 return self._debug_error_string 

140 

141 def _repr(self) -> str: 

142 """Assembles the error string for the RPC error.""" 

143 return _NON_OK_CALL_REPRESENTATION.format( 

144 self.__class__.__name__, 

145 self._code, 

146 self._details, 

147 self._debug_error_string, 

148 ) 

149 

150 def __repr__(self) -> str: 

151 return self._repr() 

152 

153 def __str__(self) -> str: 

154 return self._repr() 

155 

156 def __reduce__(self): 

157 return ( 

158 type(self), 

159 ( 

160 self._code, 

161 self._initial_metadata, 

162 self._trailing_metadata, 

163 self._details, 

164 self._debug_error_string, 

165 ), 

166 ) 

167 

168 

169def _create_rpc_error( 

170 initial_metadata: Metadata, status: cygrpc.AioRpcStatus 

171) -> AioRpcError: 

172 return AioRpcError( 

173 _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()], 

174 Metadata.from_tuple(initial_metadata), 

175 Metadata.from_tuple(status.trailing_metadata()), 

176 details=status.details(), 

177 debug_error_string=status.debug_error_string(), 

178 ) 

179 

180 

181class Call: 

182 """Base implementation of client RPC Call object. 

183 

184 Implements logic around final status, metadata and cancellation. 

185 """ 

186 

187 _loop: asyncio.AbstractEventLoop 

188 _code: grpc.StatusCode 

189 _cython_call: cygrpc._AioCall 

190 _metadata: Tuple[MetadatumType, ...] 

191 _request_serializer: SerializingFunction 

192 _response_deserializer: DeserializingFunction 

193 

194 def __init__( 

195 self, 

196 cython_call: cygrpc._AioCall, 

197 metadata: Metadata, 

198 request_serializer: SerializingFunction, 

199 response_deserializer: DeserializingFunction, 

200 loop: asyncio.AbstractEventLoop, 

201 ) -> None: 

202 self._loop = loop 

203 self._cython_call = cython_call 

204 self._metadata = tuple(metadata) 

205 self._request_serializer = request_serializer 

206 self._response_deserializer = response_deserializer 

207 

208 def __del__(self) -> None: 

209 # The '_cython_call' object might be destructed before Call object 

210 if hasattr(self, "_cython_call"): 

211 if not self._cython_call.done(): 

212 self._cancel(_GC_CANCELLATION_DETAILS) 

213 

214 def cancelled(self) -> bool: 

215 return self._cython_call.cancelled() 

216 

217 def _cancel(self, details: str) -> bool: 

218 """Forwards the application cancellation reasoning.""" 

219 if not self._cython_call.done(): 

220 self._cython_call.cancel(details) 

221 return True 

222 else: 

223 return False 

224 

225 def cancel(self) -> bool: 

226 return self._cancel(_LOCAL_CANCELLATION_DETAILS) 

227 

228 def done(self) -> bool: 

229 return self._cython_call.done() 

230 

231 def add_done_callback(self, callback: DoneCallbackType) -> None: 

232 cb = partial(callback, self) 

233 self._cython_call.add_done_callback(cb) 

234 

235 def time_remaining(self) -> Optional[float]: 

236 return self._cython_call.time_remaining() 

237 

238 async def initial_metadata(self) -> Metadata: 

239 raw_metadata_tuple = await self._cython_call.initial_metadata() 

240 return Metadata.from_tuple(raw_metadata_tuple) 

241 

242 async def trailing_metadata(self) -> Metadata: 

243 raw_metadata_tuple = ( 

244 await self._cython_call.status() 

245 ).trailing_metadata() 

246 return Metadata.from_tuple(raw_metadata_tuple) 

247 

248 async def code(self) -> grpc.StatusCode: 

249 cygrpc_code = (await self._cython_call.status()).code() 

250 return _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[cygrpc_code] 

251 

252 async def details(self) -> str: 

253 return (await self._cython_call.status()).details() 

254 

255 async def debug_error_string(self) -> str: 

256 return (await self._cython_call.status()).debug_error_string() 

257 

258 async def _raise_for_status(self) -> None: 

259 if self._cython_call.is_locally_cancelled(): 

260 raise asyncio.CancelledError() 

261 code = await self.code() 

262 if code != grpc.StatusCode.OK: 

263 raise _create_rpc_error( 

264 await self.initial_metadata(), await self._cython_call.status() 

265 ) 

266 

267 def _repr(self) -> str: 

268 return repr(self._cython_call) 

269 

270 def __repr__(self) -> str: 

271 return self._repr() 

272 

273 def __str__(self) -> str: 

274 return self._repr() 

275 

276 

277class _APIStyle(enum.IntEnum): 

278 UNKNOWN = 0 

279 ASYNC_GENERATOR = 1 

280 READER_WRITER = 2 

281 

282 

283class _UnaryResponseMixin(Call, Generic[ResponseType]): 

284 _call_response: asyncio.Task 

285 

286 def _init_unary_response_mixin(self, response_task: asyncio.Task): 

287 self._call_response = response_task 

288 

289 def cancel(self) -> bool: 

290 if super().cancel(): 

291 self._call_response.cancel() 

292 return True 

293 else: 

294 return False 

295 

296 def __await__(self) -> Generator[Any, None, ResponseType]: 

297 """Wait till the ongoing RPC request finishes.""" 

298 try: 

299 response = yield from self._call_response 

300 except asyncio.CancelledError: 

301 # Even if we caught all other CancelledError, there is still 

302 # this corner case. If the application cancels immediately after 

303 # the Call object is created, we will observe this 

304 # `CancelledError`. 

305 if not self.cancelled(): 

306 self.cancel() 

307 raise 

308 

309 # NOTE(lidiz) If we raise RpcError in the task, and users doesn't 

310 # 'await' on it. AsyncIO will log 'Task exception was never retrieved'. 

311 # Instead, if we move the exception raising here, the spam stops. 

312 # Unfortunately, there can only be one 'yield from' in '__await__'. So, 

313 # we need to access the private instance variable. 

314 if response is cygrpc.EOF: 

315 if self._cython_call.is_locally_cancelled(): 

316 raise asyncio.CancelledError() 

317 else: 

318 raise _create_rpc_error( 

319 self._cython_call._initial_metadata, 

320 self._cython_call._status, 

321 ) 

322 else: 

323 return response 

324 

325 

326class _StreamResponseMixin(Call): 

327 _message_aiter: AsyncIterator[ResponseType] 

328 _preparation: asyncio.Task 

329 _response_style: _APIStyle 

330 

331 def _init_stream_response_mixin(self, preparation: asyncio.Task): 

332 self._message_aiter = None 

333 self._preparation = preparation 

334 self._response_style = _APIStyle.UNKNOWN 

335 

336 def _update_response_style(self, style: _APIStyle): 

337 if self._response_style is _APIStyle.UNKNOWN: 

338 self._response_style = style 

339 elif self._response_style is not style: 

340 raise cygrpc.UsageError(_API_STYLE_ERROR) 

341 

342 def cancel(self) -> bool: 

343 if super().cancel(): 

344 self._preparation.cancel() 

345 return True 

346 else: 

347 return False 

348 

349 async def _fetch_stream_responses(self) -> ResponseType: 

350 message = await self._read() 

351 while message is not cygrpc.EOF: 

352 yield message 

353 message = await self._read() 

354 

355 # If the read operation failed, Core should explain why. 

356 await self._raise_for_status() 

357 

358 def __aiter__(self) -> AsyncIterator[ResponseType]: 

359 self._update_response_style(_APIStyle.ASYNC_GENERATOR) 

360 if self._message_aiter is None: 

361 self._message_aiter = self._fetch_stream_responses() 

362 return self._message_aiter 

363 

364 async def _read(self) -> ResponseType: 

365 # Wait for the request being sent 

366 await self._preparation 

367 

368 # Reads response message from Core 

369 try: 

370 raw_response = await self._cython_call.receive_serialized_message() 

371 except asyncio.CancelledError: 

372 if not self.cancelled(): 

373 self.cancel() 

374 raise 

375 

376 if raw_response is cygrpc.EOF: 

377 return cygrpc.EOF 

378 else: 

379 return _common.deserialize( 

380 raw_response, self._response_deserializer 

381 ) 

382 

383 async def read(self) -> ResponseType: 

384 if self.done(): 

385 await self._raise_for_status() 

386 return cygrpc.EOF 

387 self._update_response_style(_APIStyle.READER_WRITER) 

388 

389 response_message = await self._read() 

390 

391 if response_message is cygrpc.EOF: 

392 # If the read operation failed, Core should explain why. 

393 await self._raise_for_status() 

394 return response_message 

395 

396 

397class _StreamRequestMixin(Call): 

398 _metadata_sent: asyncio.Event 

399 _done_writing_flag: bool 

400 _async_request_poller: Optional[asyncio.Task] 

401 _request_style: _APIStyle 

402 

403 def _init_stream_request_mixin( 

404 self, request_iterator: Optional[RequestIterableType] 

405 ): 

406 self._metadata_sent = asyncio.Event() 

407 self._done_writing_flag = False 

408 

409 # If user passes in an async iterator, create a consumer Task. 

410 if request_iterator is not None: 

411 self._async_request_poller = self._loop.create_task( 

412 self._consume_request_iterator(request_iterator) 

413 ) 

414 self._request_style = _APIStyle.ASYNC_GENERATOR 

415 else: 

416 self._async_request_poller = None 

417 self._request_style = _APIStyle.READER_WRITER 

418 

419 def _raise_for_different_style(self, style: _APIStyle): 

420 if self._request_style is not style: 

421 raise cygrpc.UsageError(_API_STYLE_ERROR) 

422 

423 def cancel(self) -> bool: 

424 if super().cancel(): 

425 if self._async_request_poller is not None: 

426 self._async_request_poller.cancel() 

427 return True 

428 else: 

429 return False 

430 

431 def _metadata_sent_observer(self): 

432 self._metadata_sent.set() 

433 

434 async def _consume_request_iterator( 

435 self, request_iterator: RequestIterableType 

436 ) -> None: 

437 try: 

438 if inspect.isasyncgen(request_iterator) or hasattr( 

439 request_iterator, "__aiter__" 

440 ): 

441 async for request in request_iterator: 

442 try: 

443 await self._write(request) 

444 except AioRpcError as rpc_error: 

445 _LOGGER.debug( 

446 ( 

447 "Exception while consuming the" 

448 " request_iterator: %s" 

449 ), 

450 rpc_error, 

451 ) 

452 return 

453 else: 

454 for request in request_iterator: 

455 try: 

456 await self._write(request) 

457 except AioRpcError as rpc_error: 

458 _LOGGER.debug( 

459 ( 

460 "Exception while consuming the" 

461 " request_iterator: %s" 

462 ), 

463 rpc_error, 

464 ) 

465 return 

466 

467 await self._done_writing() 

468 except: # pylint: disable=bare-except 

469 # Client iterators can raise exceptions, which we should handle by 

470 # cancelling the RPC and logging the client's error. No exceptions 

471 # should escape this function. 

472 _LOGGER.debug( 

473 "Client request_iterator raised exception:\n%s", 

474 traceback.format_exc(), 

475 ) 

476 self.cancel() 

477 

478 async def _write(self, request: RequestType) -> None: 

479 if self.done(): 

480 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) 

481 if self._done_writing_flag: 

482 raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS) 

483 if not self._metadata_sent.is_set(): 

484 await self._metadata_sent.wait() 

485 if self.done(): 

486 await self._raise_for_status() 

487 

488 serialized_request = _common.serialize( 

489 request, self._request_serializer 

490 ) 

491 try: 

492 await self._cython_call.send_serialized_message(serialized_request) 

493 except cygrpc.InternalError: 

494 await self._raise_for_status() 

495 except asyncio.CancelledError: 

496 if not self.cancelled(): 

497 self.cancel() 

498 raise 

499 

500 async def _done_writing(self) -> None: 

501 if self.done(): 

502 # If the RPC is finished, do nothing. 

503 return 

504 if not self._done_writing_flag: 

505 # If the done writing is not sent before, try to send it. 

506 self._done_writing_flag = True 

507 try: 

508 await self._cython_call.send_receive_close() 

509 except asyncio.CancelledError: 

510 if not self.cancelled(): 

511 self.cancel() 

512 raise 

513 

514 async def write(self, request: RequestType) -> None: 

515 self._raise_for_different_style(_APIStyle.READER_WRITER) 

516 await self._write(request) 

517 

518 async def done_writing(self) -> None: 

519 """Signal peer that client is done writing. 

520 

521 This method is idempotent. 

522 """ 

523 self._raise_for_different_style(_APIStyle.READER_WRITER) 

524 await self._done_writing() 

525 

526 async def wait_for_connection(self) -> None: 

527 await self._metadata_sent.wait() 

528 if self.done(): 

529 await self._raise_for_status() 

530 

531 

532class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall): 

533 """Object for managing unary-unary RPC calls. 

534 

535 Returned when an instance of `UnaryUnaryMultiCallable` object is called. 

536 """ 

537 

538 _request: RequestType 

539 _invocation_task: asyncio.Task 

540 

541 # pylint: disable=too-many-arguments 

542 def __init__( 

543 self, 

544 request: RequestType, 

545 deadline: Optional[float], 

546 metadata: Metadata, 

547 credentials: Optional[grpc.CallCredentials], 

548 wait_for_ready: Optional[bool], 

549 channel: cygrpc.AioChannel, 

550 method: bytes, 

551 request_serializer: SerializingFunction, 

552 response_deserializer: DeserializingFunction, 

553 loop: asyncio.AbstractEventLoop, 

554 ) -> None: 

555 super().__init__( 

556 channel.call(method, deadline, credentials, wait_for_ready), 

557 metadata, 

558 request_serializer, 

559 response_deserializer, 

560 loop, 

561 ) 

562 self._request = request 

563 self._invocation_task = loop.create_task(self._invoke()) 

564 self._init_unary_response_mixin(self._invocation_task) 

565 

566 async def _invoke(self) -> ResponseType: 

567 serialized_request = _common.serialize( 

568 self._request, self._request_serializer 

569 ) 

570 

571 # NOTE(lidiz) asyncio.CancelledError is not a good transport for status, 

572 # because the asyncio.Task class do not cache the exception object. 

573 # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785 

574 try: 

575 serialized_response = await self._cython_call.unary_unary( 

576 serialized_request, self._metadata 

577 ) 

578 except asyncio.CancelledError: 

579 if not self.cancelled(): 

580 self.cancel() 

581 

582 if self._cython_call.is_ok(): 

583 return _common.deserialize( 

584 serialized_response, self._response_deserializer 

585 ) 

586 else: 

587 return cygrpc.EOF 

588 

589 async def wait_for_connection(self) -> None: 

590 await self._invocation_task 

591 if self.done(): 

592 await self._raise_for_status() 

593 

594 

595class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall): 

596 """Object for managing unary-stream RPC calls. 

597 

598 Returned when an instance of `UnaryStreamMultiCallable` object is called. 

599 """ 

600 

601 _request: RequestType 

602 _send_unary_request_task: asyncio.Task 

603 

604 # pylint: disable=too-many-arguments 

605 def __init__( 

606 self, 

607 request: RequestType, 

608 deadline: Optional[float], 

609 metadata: Metadata, 

610 credentials: Optional[grpc.CallCredentials], 

611 wait_for_ready: Optional[bool], 

612 channel: cygrpc.AioChannel, 

613 method: bytes, 

614 request_serializer: SerializingFunction, 

615 response_deserializer: DeserializingFunction, 

616 loop: asyncio.AbstractEventLoop, 

617 ) -> None: 

618 super().__init__( 

619 channel.call(method, deadline, credentials, wait_for_ready), 

620 metadata, 

621 request_serializer, 

622 response_deserializer, 

623 loop, 

624 ) 

625 self._request = request 

626 self._send_unary_request_task = loop.create_task( 

627 self._send_unary_request() 

628 ) 

629 self._init_stream_response_mixin(self._send_unary_request_task) 

630 

631 async def _send_unary_request(self) -> ResponseType: 

632 serialized_request = _common.serialize( 

633 self._request, self._request_serializer 

634 ) 

635 try: 

636 await self._cython_call.initiate_unary_stream( 

637 serialized_request, self._metadata 

638 ) 

639 except asyncio.CancelledError: 

640 if not self.cancelled(): 

641 self.cancel() 

642 raise 

643 

644 async def wait_for_connection(self) -> None: 

645 await self._send_unary_request_task 

646 if self.done(): 

647 await self._raise_for_status() 

648 

649 

650# pylint: disable=too-many-ancestors 

651class StreamUnaryCall( 

652 _StreamRequestMixin, _UnaryResponseMixin, Call, _base_call.StreamUnaryCall 

653): 

654 """Object for managing stream-unary RPC calls. 

655 

656 Returned when an instance of `StreamUnaryMultiCallable` object is called. 

657 """ 

658 

659 # pylint: disable=too-many-arguments 

660 def __init__( 

661 self, 

662 request_iterator: Optional[RequestIterableType], 

663 deadline: Optional[float], 

664 metadata: Metadata, 

665 credentials: Optional[grpc.CallCredentials], 

666 wait_for_ready: Optional[bool], 

667 channel: cygrpc.AioChannel, 

668 method: bytes, 

669 request_serializer: SerializingFunction, 

670 response_deserializer: DeserializingFunction, 

671 loop: asyncio.AbstractEventLoop, 

672 ) -> None: 

673 super().__init__( 

674 channel.call(method, deadline, credentials, wait_for_ready), 

675 metadata, 

676 request_serializer, 

677 response_deserializer, 

678 loop, 

679 ) 

680 

681 self._init_stream_request_mixin(request_iterator) 

682 self._init_unary_response_mixin(loop.create_task(self._conduct_rpc())) 

683 

684 async def _conduct_rpc(self) -> ResponseType: 

685 try: 

686 serialized_response = await self._cython_call.stream_unary( 

687 self._metadata, self._metadata_sent_observer 

688 ) 

689 except asyncio.CancelledError: 

690 if not self.cancelled(): 

691 self.cancel() 

692 raise 

693 

694 if self._cython_call.is_ok(): 

695 return _common.deserialize( 

696 serialized_response, self._response_deserializer 

697 ) 

698 else: 

699 return cygrpc.EOF 

700 

701 

702class StreamStreamCall( 

703 _StreamRequestMixin, _StreamResponseMixin, Call, _base_call.StreamStreamCall 

704): 

705 """Object for managing stream-stream RPC calls. 

706 

707 Returned when an instance of `StreamStreamMultiCallable` object is called. 

708 """ 

709 

710 _initializer: asyncio.Task 

711 

712 # pylint: disable=too-many-arguments 

713 def __init__( 

714 self, 

715 request_iterator: Optional[RequestIterableType], 

716 deadline: Optional[float], 

717 metadata: Metadata, 

718 credentials: Optional[grpc.CallCredentials], 

719 wait_for_ready: Optional[bool], 

720 channel: cygrpc.AioChannel, 

721 method: bytes, 

722 request_serializer: SerializingFunction, 

723 response_deserializer: DeserializingFunction, 

724 loop: asyncio.AbstractEventLoop, 

725 ) -> None: 

726 super().__init__( 

727 channel.call(method, deadline, credentials, wait_for_ready), 

728 metadata, 

729 request_serializer, 

730 response_deserializer, 

731 loop, 

732 ) 

733 self._initializer = self._loop.create_task(self._prepare_rpc()) 

734 self._init_stream_request_mixin(request_iterator) 

735 self._init_stream_response_mixin(self._initializer) 

736 

737 async def _prepare_rpc(self): 

738 """This method prepares the RPC for receiving/sending messages. 

739 

740 All other operations around the stream should only happen after the 

741 completion of this method. 

742 """ 

743 try: 

744 await self._cython_call.initiate_stream_stream( 

745 self._metadata, self._metadata_sent_observer 

746 ) 

747 except asyncio.CancelledError: 

748 if not self.cancelled(): 

749 self.cancel() 

750 # No need to raise RpcError here, because no one will `await` this task.