Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_call.py: 36%

343 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 07:30 +0000

1# Copyright 2019 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Asyncio Python.""" 

15 

16import asyncio 

17import enum 

18from functools import partial 

19import inspect 

20import logging 

21import traceback 

22from typing import AsyncIterable, Optional, Tuple 

23 

24import grpc 

25from grpc import _common 

26from grpc._cython import cygrpc 

27 

28from . import _base_call 

29from ._metadata import Metadata 

30from ._typing import DeserializingFunction 

31from ._typing import DoneCallbackType 

32from ._typing import MetadatumType 

33from ._typing import RequestIterableType 

34from ._typing import RequestType 

35from ._typing import ResponseType 

36from ._typing import SerializingFunction 

37 

38__all__ = 'AioRpcError', 'Call', 'UnaryUnaryCall', 'UnaryStreamCall' 

39 

40_LOCAL_CANCELLATION_DETAILS = 'Locally cancelled by application!' 

41_GC_CANCELLATION_DETAILS = 'Cancelled upon garbage collection!' 

42_RPC_ALREADY_FINISHED_DETAILS = 'RPC already finished.' 

43_RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".' 

44_API_STYLE_ERROR = 'The iterator and read/write APIs may not be mixed on a single RPC.' 

45 

46_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n' 

47 '\tstatus = {}\n' 

48 '\tdetails = "{}"\n' 

49 '>') 

50 

51_NON_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n' 

52 '\tstatus = {}\n' 

53 '\tdetails = "{}"\n' 

54 '\tdebug_error_string = "{}"\n' 

55 '>') 

56 

57_LOGGER = logging.getLogger(__name__) 

58 

59 

60class AioRpcError(grpc.RpcError): 

61 """An implementation of RpcError to be used by the asynchronous API. 

62 

63 Raised RpcError is a snapshot of the final status of the RPC, values are 

64 determined. Hence, its methods no longer needs to be coroutines. 

65 """ 

66 

67 _code: grpc.StatusCode 

68 _details: Optional[str] 

69 _initial_metadata: Optional[Metadata] 

70 _trailing_metadata: Optional[Metadata] 

71 _debug_error_string: Optional[str] 

72 

73 def __init__(self, 

74 code: grpc.StatusCode, 

75 initial_metadata: Metadata, 

76 trailing_metadata: Metadata, 

77 details: Optional[str] = None, 

78 debug_error_string: Optional[str] = None) -> None: 

79 """Constructor. 

80 

81 Args: 

82 code: The status code with which the RPC has been finalized. 

83 details: Optional details explaining the reason of the error. 

84 initial_metadata: Optional initial metadata that could be sent by the 

85 Server. 

86 trailing_metadata: Optional metadata that could be sent by the Server. 

87 """ 

88 

89 super().__init__(self) 

90 self._code = code 

91 self._details = details 

92 self._initial_metadata = initial_metadata 

93 self._trailing_metadata = trailing_metadata 

94 self._debug_error_string = debug_error_string 

95 

96 def code(self) -> grpc.StatusCode: 

97 """Accesses the status code sent by the server. 

98 

99 Returns: 

100 The `grpc.StatusCode` status code. 

101 """ 

102 return self._code 

103 

104 def details(self) -> Optional[str]: 

105 """Accesses the details sent by the server. 

106 

107 Returns: 

108 The description of the error. 

109 """ 

110 return self._details 

111 

112 def initial_metadata(self) -> Metadata: 

113 """Accesses the initial metadata sent by the server. 

114 

115 Returns: 

116 The initial metadata received. 

117 """ 

118 return self._initial_metadata 

119 

120 def trailing_metadata(self) -> Metadata: 

121 """Accesses the trailing metadata sent by the server. 

122 

123 Returns: 

124 The trailing metadata received. 

125 """ 

126 return self._trailing_metadata 

127 

128 def debug_error_string(self) -> str: 

129 """Accesses the debug error string sent by the server. 

130 

131 Returns: 

132 The debug error string received. 

133 """ 

134 return self._debug_error_string 

135 

136 def _repr(self) -> str: 

137 """Assembles the error string for the RPC error.""" 

138 return _NON_OK_CALL_REPRESENTATION.format(self.__class__.__name__, 

139 self._code, self._details, 

140 self._debug_error_string) 

141 

142 def __repr__(self) -> str: 

143 return self._repr() 

144 

145 def __str__(self) -> str: 

146 return self._repr() 

147 

148 

149def _create_rpc_error(initial_metadata: Metadata, 

150 status: cygrpc.AioRpcStatus) -> AioRpcError: 

151 return AioRpcError( 

152 _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()], 

153 Metadata.from_tuple(initial_metadata), 

154 Metadata.from_tuple(status.trailing_metadata()), 

155 details=status.details(), 

156 debug_error_string=status.debug_error_string(), 

157 ) 

158 

159 

160class Call: 

161 """Base implementation of client RPC Call object. 

162 

163 Implements logic around final status, metadata and cancellation. 

164 """ 

165 _loop: asyncio.AbstractEventLoop 

166 _code: grpc.StatusCode 

167 _cython_call: cygrpc._AioCall 

168 _metadata: Tuple[MetadatumType, ...] 

169 _request_serializer: SerializingFunction 

170 _response_deserializer: DeserializingFunction 

171 

172 def __init__(self, cython_call: cygrpc._AioCall, metadata: Metadata, 

173 request_serializer: SerializingFunction, 

174 response_deserializer: DeserializingFunction, 

175 loop: asyncio.AbstractEventLoop) -> None: 

176 self._loop = loop 

177 self._cython_call = cython_call 

178 self._metadata = tuple(metadata) 

179 self._request_serializer = request_serializer 

180 self._response_deserializer = response_deserializer 

181 

182 def __del__(self) -> None: 

183 # The '_cython_call' object might be destructed before Call object 

184 if hasattr(self, '_cython_call'): 

185 if not self._cython_call.done(): 

186 self._cancel(_GC_CANCELLATION_DETAILS) 

187 

188 def cancelled(self) -> bool: 

189 return self._cython_call.cancelled() 

190 

191 def _cancel(self, details: str) -> bool: 

192 """Forwards the application cancellation reasoning.""" 

193 if not self._cython_call.done(): 

194 self._cython_call.cancel(details) 

195 return True 

196 else: 

197 return False 

198 

199 def cancel(self) -> bool: 

200 return self._cancel(_LOCAL_CANCELLATION_DETAILS) 

201 

202 def done(self) -> bool: 

203 return self._cython_call.done() 

204 

205 def add_done_callback(self, callback: DoneCallbackType) -> None: 

206 cb = partial(callback, self) 

207 self._cython_call.add_done_callback(cb) 

208 

209 def time_remaining(self) -> Optional[float]: 

210 return self._cython_call.time_remaining() 

211 

212 async def initial_metadata(self) -> Metadata: 

213 raw_metadata_tuple = await self._cython_call.initial_metadata() 

214 return Metadata.from_tuple(raw_metadata_tuple) 

215 

216 async def trailing_metadata(self) -> Metadata: 

217 raw_metadata_tuple = (await 

218 self._cython_call.status()).trailing_metadata() 

219 return Metadata.from_tuple(raw_metadata_tuple) 

220 

221 async def code(self) -> grpc.StatusCode: 

222 cygrpc_code = (await self._cython_call.status()).code() 

223 return _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[cygrpc_code] 

224 

225 async def details(self) -> str: 

226 return (await self._cython_call.status()).details() 

227 

228 async def debug_error_string(self) -> str: 

229 return (await self._cython_call.status()).debug_error_string() 

230 

231 async def _raise_for_status(self) -> None: 

232 if self._cython_call.is_locally_cancelled(): 

233 raise asyncio.CancelledError() 

234 code = await self.code() 

235 if code != grpc.StatusCode.OK: 

236 raise _create_rpc_error(await self.initial_metadata(), await 

237 self._cython_call.status()) 

238 

239 def _repr(self) -> str: 

240 return repr(self._cython_call) 

241 

242 def __repr__(self) -> str: 

243 return self._repr() 

244 

245 def __str__(self) -> str: 

246 return self._repr() 

247 

248 

249class _APIStyle(enum.IntEnum): 

250 UNKNOWN = 0 

251 ASYNC_GENERATOR = 1 

252 READER_WRITER = 2 

253 

254 

255class _UnaryResponseMixin(Call): 

256 _call_response: asyncio.Task 

257 

258 def _init_unary_response_mixin(self, response_task: asyncio.Task): 

259 self._call_response = response_task 

260 

261 def cancel(self) -> bool: 

262 if super().cancel(): 

263 self._call_response.cancel() 

264 return True 

265 else: 

266 return False 

267 

268 def __await__(self) -> ResponseType: 

269 """Wait till the ongoing RPC request finishes.""" 

270 try: 

271 response = yield from self._call_response 

272 except asyncio.CancelledError: 

273 # Even if we caught all other CancelledError, there is still 

274 # this corner case. If the application cancels immediately after 

275 # the Call object is created, we will observe this 

276 # `CancelledError`. 

277 if not self.cancelled(): 

278 self.cancel() 

279 raise 

280 

281 # NOTE(lidiz) If we raise RpcError in the task, and users doesn't 

282 # 'await' on it. AsyncIO will log 'Task exception was never retrieved'. 

283 # Instead, if we move the exception raising here, the spam stops. 

284 # Unfortunately, there can only be one 'yield from' in '__await__'. So, 

285 # we need to access the private instance variable. 

286 if response is cygrpc.EOF: 

287 if self._cython_call.is_locally_cancelled(): 

288 raise asyncio.CancelledError() 

289 else: 

290 raise _create_rpc_error(self._cython_call._initial_metadata, 

291 self._cython_call._status) 

292 else: 

293 return response 

294 

295 

296class _StreamResponseMixin(Call): 

297 _message_aiter: AsyncIterable[ResponseType] 

298 _preparation: asyncio.Task 

299 _response_style: _APIStyle 

300 

301 def _init_stream_response_mixin(self, preparation: asyncio.Task): 

302 self._message_aiter = None 

303 self._preparation = preparation 

304 self._response_style = _APIStyle.UNKNOWN 

305 

306 def _update_response_style(self, style: _APIStyle): 

307 if self._response_style is _APIStyle.UNKNOWN: 

308 self._response_style = style 

309 elif self._response_style is not style: 

310 raise cygrpc.UsageError(_API_STYLE_ERROR) 

311 

312 def cancel(self) -> bool: 

313 if super().cancel(): 

314 self._preparation.cancel() 

315 return True 

316 else: 

317 return False 

318 

319 async def _fetch_stream_responses(self) -> ResponseType: 

320 message = await self._read() 

321 while message is not cygrpc.EOF: 

322 yield message 

323 message = await self._read() 

324 

325 # If the read operation failed, Core should explain why. 

326 await self._raise_for_status() 

327 

328 def __aiter__(self) -> AsyncIterable[ResponseType]: 

329 self._update_response_style(_APIStyle.ASYNC_GENERATOR) 

330 if self._message_aiter is None: 

331 self._message_aiter = self._fetch_stream_responses() 

332 return self._message_aiter 

333 

334 async def _read(self) -> ResponseType: 

335 # Wait for the request being sent 

336 await self._preparation 

337 

338 # Reads response message from Core 

339 try: 

340 raw_response = await self._cython_call.receive_serialized_message() 

341 except asyncio.CancelledError: 

342 if not self.cancelled(): 

343 self.cancel() 

344 await self._raise_for_status() 

345 

346 if raw_response is cygrpc.EOF: 

347 return cygrpc.EOF 

348 else: 

349 return _common.deserialize(raw_response, 

350 self._response_deserializer) 

351 

352 async def read(self) -> ResponseType: 

353 if self.done(): 

354 await self._raise_for_status() 

355 return cygrpc.EOF 

356 self._update_response_style(_APIStyle.READER_WRITER) 

357 

358 response_message = await self._read() 

359 

360 if response_message is cygrpc.EOF: 

361 # If the read operation failed, Core should explain why. 

362 await self._raise_for_status() 

363 return response_message 

364 

365 

366class _StreamRequestMixin(Call): 

367 _metadata_sent: asyncio.Event 

368 _done_writing_flag: bool 

369 _async_request_poller: Optional[asyncio.Task] 

370 _request_style: _APIStyle 

371 

372 def _init_stream_request_mixin( 

373 self, request_iterator: Optional[RequestIterableType]): 

374 self._metadata_sent = asyncio.Event() 

375 self._done_writing_flag = False 

376 

377 # If user passes in an async iterator, create a consumer Task. 

378 if request_iterator is not None: 

379 self._async_request_poller = self._loop.create_task( 

380 self._consume_request_iterator(request_iterator)) 

381 self._request_style = _APIStyle.ASYNC_GENERATOR 

382 else: 

383 self._async_request_poller = None 

384 self._request_style = _APIStyle.READER_WRITER 

385 

386 def _raise_for_different_style(self, style: _APIStyle): 

387 if self._request_style is not style: 

388 raise cygrpc.UsageError(_API_STYLE_ERROR) 

389 

390 def cancel(self) -> bool: 

391 if super().cancel(): 

392 if self._async_request_poller is not None: 

393 self._async_request_poller.cancel() 

394 return True 

395 else: 

396 return False 

397 

398 def _metadata_sent_observer(self): 

399 self._metadata_sent.set() 

400 

401 async def _consume_request_iterator( 

402 self, request_iterator: RequestIterableType) -> None: 

403 try: 

404 if inspect.isasyncgen(request_iterator) or hasattr( 

405 request_iterator, '__aiter__'): 

406 async for request in request_iterator: 

407 try: 

408 await self._write(request) 

409 except AioRpcError as rpc_error: 

410 _LOGGER.debug( 

411 'Exception while consuming the request_iterator: %s', 

412 rpc_error) 

413 return 

414 else: 

415 for request in request_iterator: 

416 try: 

417 await self._write(request) 

418 except AioRpcError as rpc_error: 

419 _LOGGER.debug( 

420 'Exception while consuming the request_iterator: %s', 

421 rpc_error) 

422 return 

423 

424 await self._done_writing() 

425 except: # pylint: disable=bare-except 

426 # Client iterators can raise exceptions, which we should handle by 

427 # cancelling the RPC and logging the client's error. No exceptions 

428 # should escape this function. 

429 _LOGGER.debug('Client request_iterator raised exception:\n%s', 

430 traceback.format_exc()) 

431 self.cancel() 

432 

433 async def _write(self, request: RequestType) -> None: 

434 if self.done(): 

435 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) 

436 if self._done_writing_flag: 

437 raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS) 

438 if not self._metadata_sent.is_set(): 

439 await self._metadata_sent.wait() 

440 if self.done(): 

441 await self._raise_for_status() 

442 

443 serialized_request = _common.serialize(request, 

444 self._request_serializer) 

445 try: 

446 await self._cython_call.send_serialized_message(serialized_request) 

447 except cygrpc.InternalError: 

448 await self._raise_for_status() 

449 except asyncio.CancelledError: 

450 if not self.cancelled(): 

451 self.cancel() 

452 await self._raise_for_status() 

453 

454 async def _done_writing(self) -> None: 

455 if self.done(): 

456 # If the RPC is finished, do nothing. 

457 return 

458 if not self._done_writing_flag: 

459 # If the done writing is not sent before, try to send it. 

460 self._done_writing_flag = True 

461 try: 

462 await self._cython_call.send_receive_close() 

463 except asyncio.CancelledError: 

464 if not self.cancelled(): 

465 self.cancel() 

466 await self._raise_for_status() 

467 

468 async def write(self, request: RequestType) -> None: 

469 self._raise_for_different_style(_APIStyle.READER_WRITER) 

470 await self._write(request) 

471 

472 async def done_writing(self) -> None: 

473 """Signal peer that client is done writing. 

474 

475 This method is idempotent. 

476 """ 

477 self._raise_for_different_style(_APIStyle.READER_WRITER) 

478 await self._done_writing() 

479 

480 async def wait_for_connection(self) -> None: 

481 await self._metadata_sent.wait() 

482 if self.done(): 

483 await self._raise_for_status() 

484 

485 

486class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall): 

487 """Object for managing unary-unary RPC calls. 

488 

489 Returned when an instance of `UnaryUnaryMultiCallable` object is called. 

490 """ 

491 _request: RequestType 

492 _invocation_task: asyncio.Task 

493 

494 # pylint: disable=too-many-arguments 

495 def __init__(self, request: RequestType, deadline: Optional[float], 

496 metadata: Metadata, 

497 credentials: Optional[grpc.CallCredentials], 

498 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel, 

499 method: bytes, request_serializer: SerializingFunction, 

500 response_deserializer: DeserializingFunction, 

501 loop: asyncio.AbstractEventLoop) -> None: 

502 super().__init__( 

503 channel.call(method, deadline, credentials, wait_for_ready), 

504 metadata, request_serializer, response_deserializer, loop) 

505 self._request = request 

506 self._invocation_task = loop.create_task(self._invoke()) 

507 self._init_unary_response_mixin(self._invocation_task) 

508 

509 async def _invoke(self) -> ResponseType: 

510 serialized_request = _common.serialize(self._request, 

511 self._request_serializer) 

512 

513 # NOTE(lidiz) asyncio.CancelledError is not a good transport for status, 

514 # because the asyncio.Task class do not cache the exception object. 

515 # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785 

516 try: 

517 serialized_response = await self._cython_call.unary_unary( 

518 serialized_request, self._metadata) 

519 except asyncio.CancelledError: 

520 if not self.cancelled(): 

521 self.cancel() 

522 

523 if self._cython_call.is_ok(): 

524 return _common.deserialize(serialized_response, 

525 self._response_deserializer) 

526 else: 

527 return cygrpc.EOF 

528 

529 async def wait_for_connection(self) -> None: 

530 await self._invocation_task 

531 if self.done(): 

532 await self._raise_for_status() 

533 

534 

535class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall): 

536 """Object for managing unary-stream RPC calls. 

537 

538 Returned when an instance of `UnaryStreamMultiCallable` object is called. 

539 """ 

540 _request: RequestType 

541 _send_unary_request_task: asyncio.Task 

542 

543 # pylint: disable=too-many-arguments 

544 def __init__(self, request: RequestType, deadline: Optional[float], 

545 metadata: Metadata, 

546 credentials: Optional[grpc.CallCredentials], 

547 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel, 

548 method: bytes, request_serializer: SerializingFunction, 

549 response_deserializer: DeserializingFunction, 

550 loop: asyncio.AbstractEventLoop) -> None: 

551 super().__init__( 

552 channel.call(method, deadline, credentials, wait_for_ready), 

553 metadata, request_serializer, response_deserializer, loop) 

554 self._request = request 

555 self._send_unary_request_task = loop.create_task( 

556 self._send_unary_request()) 

557 self._init_stream_response_mixin(self._send_unary_request_task) 

558 

559 async def _send_unary_request(self) -> ResponseType: 

560 serialized_request = _common.serialize(self._request, 

561 self._request_serializer) 

562 try: 

563 await self._cython_call.initiate_unary_stream( 

564 serialized_request, self._metadata) 

565 except asyncio.CancelledError: 

566 if not self.cancelled(): 

567 self.cancel() 

568 raise 

569 

570 async def wait_for_connection(self) -> None: 

571 await self._send_unary_request_task 

572 if self.done(): 

573 await self._raise_for_status() 

574 

575 

576class StreamUnaryCall(_StreamRequestMixin, _UnaryResponseMixin, Call, 

577 _base_call.StreamUnaryCall): 

578 """Object for managing stream-unary RPC calls. 

579 

580 Returned when an instance of `StreamUnaryMultiCallable` object is called. 

581 """ 

582 

583 # pylint: disable=too-many-arguments 

584 def __init__(self, request_iterator: Optional[RequestIterableType], 

585 deadline: Optional[float], metadata: Metadata, 

586 credentials: Optional[grpc.CallCredentials], 

587 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel, 

588 method: bytes, request_serializer: SerializingFunction, 

589 response_deserializer: DeserializingFunction, 

590 loop: asyncio.AbstractEventLoop) -> None: 

591 super().__init__( 

592 channel.call(method, deadline, credentials, wait_for_ready), 

593 metadata, request_serializer, response_deserializer, loop) 

594 

595 self._init_stream_request_mixin(request_iterator) 

596 self._init_unary_response_mixin(loop.create_task(self._conduct_rpc())) 

597 

598 async def _conduct_rpc(self) -> ResponseType: 

599 try: 

600 serialized_response = await self._cython_call.stream_unary( 

601 self._metadata, self._metadata_sent_observer) 

602 except asyncio.CancelledError: 

603 if not self.cancelled(): 

604 self.cancel() 

605 

606 if self._cython_call.is_ok(): 

607 return _common.deserialize(serialized_response, 

608 self._response_deserializer) 

609 else: 

610 return cygrpc.EOF 

611 

612 

613class StreamStreamCall(_StreamRequestMixin, _StreamResponseMixin, Call, 

614 _base_call.StreamStreamCall): 

615 """Object for managing stream-stream RPC calls. 

616 

617 Returned when an instance of `StreamStreamMultiCallable` object is called. 

618 """ 

619 _initializer: asyncio.Task 

620 

621 # pylint: disable=too-many-arguments 

622 def __init__(self, request_iterator: Optional[RequestIterableType], 

623 deadline: Optional[float], metadata: Metadata, 

624 credentials: Optional[grpc.CallCredentials], 

625 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel, 

626 method: bytes, request_serializer: SerializingFunction, 

627 response_deserializer: DeserializingFunction, 

628 loop: asyncio.AbstractEventLoop) -> None: 

629 super().__init__( 

630 channel.call(method, deadline, credentials, wait_for_ready), 

631 metadata, request_serializer, response_deserializer, loop) 

632 self._initializer = self._loop.create_task(self._prepare_rpc()) 

633 self._init_stream_request_mixin(request_iterator) 

634 self._init_stream_response_mixin(self._initializer) 

635 

636 async def _prepare_rpc(self): 

637 """This method prepares the RPC for receiving/sending messages. 

638 

639 All other operations around the stream should only happen after the 

640 completion of this method. 

641 """ 

642 try: 

643 await self._cython_call.initiate_stream_stream( 

644 self._metadata, self._metadata_sent_observer) 

645 except asyncio.CancelledError: 

646 if not self.cancelled(): 

647 self.cancel() 

648 # No need to raise RpcError here, because no one will `await` this task.