Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_channel.py: 31%

893 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-06 06:03 +0000

1# Copyright 2016 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Python.""" 

15 

16import copy 

17import functools 

18import logging 

19import os 

20import sys 

21import threading 

22import time 

23import types 

24from typing import (Any, Callable, Iterator, List, Optional, Sequence, Set, 

25 Tuple, Union) 

26 

27import grpc # pytype: disable=pyi-error 

28from grpc import _common # pytype: disable=pyi-error 

29from grpc import _compression # pytype: disable=pyi-error 

30from grpc import _grpcio_metadata # pytype: disable=pyi-error 

31from grpc._cython import cygrpc 

32from grpc._typing import ChannelArgumentType 

33from grpc._typing import DeserializingFunction 

34from grpc._typing import IntegratedCallFactory 

35from grpc._typing import MetadataType 

36from grpc._typing import NullaryCallbackType 

37from grpc._typing import ResponseType 

38from grpc._typing import SerializingFunction 

39from grpc._typing import UserTag 

40import grpc.experimental # pytype: disable=pyi-error 

41 

42_LOGGER = logging.getLogger(__name__) 

43 

44_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__) 

45 

46_EMPTY_FLAGS = 0 

47 

48# NOTE(rbellevi): No guarantees are given about the maintenance of this 

49# environment variable. 

50_DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv( 

51 "GRPC_SINGLE_THREADED_UNARY_STREAM") is not None 

52 

53_UNARY_UNARY_INITIAL_DUE = ( 

54 cygrpc.OperationType.send_initial_metadata, 

55 cygrpc.OperationType.send_message, 

56 cygrpc.OperationType.send_close_from_client, 

57 cygrpc.OperationType.receive_initial_metadata, 

58 cygrpc.OperationType.receive_message, 

59 cygrpc.OperationType.receive_status_on_client, 

60) 

61_UNARY_STREAM_INITIAL_DUE = ( 

62 cygrpc.OperationType.send_initial_metadata, 

63 cygrpc.OperationType.send_message, 

64 cygrpc.OperationType.send_close_from_client, 

65 cygrpc.OperationType.receive_initial_metadata, 

66 cygrpc.OperationType.receive_status_on_client, 

67) 

68_STREAM_UNARY_INITIAL_DUE = ( 

69 cygrpc.OperationType.send_initial_metadata, 

70 cygrpc.OperationType.receive_initial_metadata, 

71 cygrpc.OperationType.receive_message, 

72 cygrpc.OperationType.receive_status_on_client, 

73) 

74_STREAM_STREAM_INITIAL_DUE = ( 

75 cygrpc.OperationType.send_initial_metadata, 

76 cygrpc.OperationType.receive_initial_metadata, 

77 cygrpc.OperationType.receive_status_on_client, 

78) 

79 

80_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( 

81 'Exception calling channel subscription callback!') 

82 

83_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n' 

84 '\tstatus = {}\n' 

85 '\tdetails = "{}"\n' 

86 '>') 

87 

88_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n' 

89 '\tstatus = {}\n' 

90 '\tdetails = "{}"\n' 

91 '\tdebug_error_string = "{}"\n' 

92 '>') 

93 

94 

95def _deadline(timeout: Optional[float]) -> Optional[float]: 

96 return None if timeout is None else time.time() + timeout 

97 

98 

99def _unknown_code_details(unknown_cygrpc_code: Optional[grpc.StatusCode], 

100 details: Optional[str]) -> str: 

101 return 'Server sent unknown code {} and details "{}"'.format( 

102 unknown_cygrpc_code, details) 

103 

104 

105class _RPCState(object): 

106 condition: threading.Condition 

107 due: Set[cygrpc.OperationType] 

108 initial_metadata: Optional[MetadataType] 

109 response: Any 

110 trailing_metadata: Optional[MetadataType] 

111 code: Optional[grpc.StatusCode] 

112 details: Optional[str] 

113 debug_error_string: Optional[str] 

114 cancelled: bool 

115 callbacks: List[NullaryCallbackType] 

116 fork_epoch: Optional[int] 

117 

118 def __init__(self, due: Sequence[cygrpc.OperationType], 

119 initial_metadata: Optional[MetadataType], 

120 trailing_metadata: Optional[MetadataType], 

121 code: Optional[grpc.StatusCode], details: Optional[str]): 

122 # `condition` guards all members of _RPCState. `notify_all` is called on 

123 # `condition` when the state of the RPC has changed. 

124 self.condition = threading.Condition() 

125 

126 # The cygrpc.OperationType objects representing events due from the RPC's 

127 # completion queue. If an operation is in `due`, it is guaranteed that 

128 # `operate()` has been called on a corresponding operation. But the 

129 # converse is not true. That is, in the case of failed `operate()` 

130 # calls, there may briefly be events in `due` that do not correspond to 

131 # operations submitted to Core. 

132 self.due = set(due) 

133 self.initial_metadata = initial_metadata 

134 self.response = None 

135 self.trailing_metadata = trailing_metadata 

136 self.code = code 

137 self.details = details 

138 self.debug_error_string = None 

139 

140 # The semantics of grpc.Future.cancel and grpc.Future.cancelled are 

141 # slightly wonky, so they have to be tracked separately from the rest of the 

142 # result of the RPC. This field tracks whether cancellation was requested 

143 # prior to termination of the RPC. 

144 self.cancelled = False 

145 self.callbacks = [] 

146 self.fork_epoch = cygrpc.get_fork_epoch() 

147 

148 def reset_postfork_child(self): 

149 self.condition = threading.Condition() 

150 

151 

152def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None: 

153 if state.code is None: 

154 state.code = code 

155 state.details = details 

156 if state.initial_metadata is None: 

157 state.initial_metadata = () 

158 state.trailing_metadata = () 

159 

160 

161def _handle_event( 

162 event: cygrpc.BaseEvent, state: _RPCState, 

163 response_deserializer: Optional[DeserializingFunction] 

164) -> List[NullaryCallbackType]: 

165 callbacks = [] 

166 for batch_operation in event.batch_operations: 

167 operation_type = batch_operation.type() 

168 state.due.remove(operation_type) 

169 if operation_type == cygrpc.OperationType.receive_initial_metadata: 

170 state.initial_metadata = batch_operation.initial_metadata() 

171 elif operation_type == cygrpc.OperationType.receive_message: 

172 serialized_response = batch_operation.message() 

173 if serialized_response is not None: 

174 response = _common.deserialize(serialized_response, 

175 response_deserializer) 

176 if response is None: 

177 details = 'Exception deserializing response!' 

178 _abort(state, grpc.StatusCode.INTERNAL, details) 

179 else: 

180 state.response = response 

181 elif operation_type == cygrpc.OperationType.receive_status_on_client: 

182 state.trailing_metadata = batch_operation.trailing_metadata() 

183 if state.code is None: 

184 code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get( 

185 batch_operation.code()) 

186 if code is None: 

187 state.code = grpc.StatusCode.UNKNOWN 

188 state.details = _unknown_code_details( 

189 code, batch_operation.details()) 

190 else: 

191 state.code = code 

192 state.details = batch_operation.details() 

193 state.debug_error_string = batch_operation.error_string() 

194 callbacks.extend(state.callbacks) 

195 state.callbacks = None 

196 return callbacks 

197 

198 

199def _event_handler( 

200 state: _RPCState, 

201 response_deserializer: Optional[DeserializingFunction]) -> UserTag: 

202 

203 def handle_event(event): 

204 with state.condition: 

205 callbacks = _handle_event(event, state, response_deserializer) 

206 state.condition.notify_all() 

207 done = not state.due 

208 for callback in callbacks: 

209 try: 

210 callback() 

211 except Exception as e: # pylint: disable=broad-except 

212 # NOTE(rbellevi): We suppress but log errors here so as not to 

213 # kill the channel spin thread. 

214 logging.error('Exception in callback %s: %s', 

215 repr(callback.func), repr(e)) 

216 return done and state.fork_epoch >= cygrpc.get_fork_epoch() 

217 

218 return handle_event 

219 

220 

221# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall. 

222#pylint: disable=too-many-statements 

223def _consume_request_iterator(request_iterator: Iterator, state: _RPCState, 

224 call: Union[cygrpc.IntegratedCall, 

225 cygrpc.SegregatedCall], 

226 request_serializer: SerializingFunction, 

227 event_handler: Optional[UserTag]) -> None: 

228 """Consume a request supplied by the user.""" 

229 

230 def consume_request_iterator(): # pylint: disable=too-many-branches 

231 # Iterate over the request iterator until it is exhausted or an error 

232 # condition is encountered. 

233 while True: 

234 return_from_user_request_generator_invoked = False 

235 try: 

236 # The thread may die in user-code. Do not block fork for this. 

237 cygrpc.enter_user_request_generator() 

238 request = next(request_iterator) 

239 except StopIteration: 

240 break 

241 except Exception: # pylint: disable=broad-except 

242 cygrpc.return_from_user_request_generator() 

243 return_from_user_request_generator_invoked = True 

244 code = grpc.StatusCode.UNKNOWN 

245 details = 'Exception iterating requests!' 

246 _LOGGER.exception(details) 

247 call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], 

248 details) 

249 _abort(state, code, details) 

250 return 

251 finally: 

252 if not return_from_user_request_generator_invoked: 

253 cygrpc.return_from_user_request_generator() 

254 serialized_request = _common.serialize(request, request_serializer) 

255 with state.condition: 

256 if state.code is None and not state.cancelled: 

257 if serialized_request is None: 

258 code = grpc.StatusCode.INTERNAL 

259 details = 'Exception serializing request!' 

260 call.cancel( 

261 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], 

262 details) 

263 _abort(state, code, details) 

264 return 

265 else: 

266 state.due.add(cygrpc.OperationType.send_message) 

267 operations = (cygrpc.SendMessageOperation( 

268 serialized_request, _EMPTY_FLAGS),) 

269 operating = call.operate(operations, event_handler) 

270 if not operating: 

271 state.due.remove(cygrpc.OperationType.send_message) 

272 return 

273 

274 def _done(): 

275 return (state.code is not None or 

276 cygrpc.OperationType.send_message 

277 not in state.due) 

278 

279 _common.wait(state.condition.wait, 

280 _done, 

281 spin_cb=functools.partial( 

282 cygrpc.block_if_fork_in_progress, 

283 state)) 

284 if state.code is not None: 

285 return 

286 else: 

287 return 

288 with state.condition: 

289 if state.code is None: 

290 state.due.add(cygrpc.OperationType.send_close_from_client) 

291 operations = ( 

292 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),) 

293 operating = call.operate(operations, event_handler) 

294 if not operating: 

295 state.due.remove( 

296 cygrpc.OperationType.send_close_from_client) 

297 

298 consumption_thread = cygrpc.ForkManagedThread( 

299 target=consume_request_iterator) 

300 consumption_thread.setDaemon(True) 

301 consumption_thread.start() 

302 

303 

304def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str: 

305 """Calculates error string for RPC.""" 

306 with rpc_state.condition: 

307 if rpc_state.code is None: 

308 return '<{} object>'.format(class_name) 

309 elif rpc_state.code is grpc.StatusCode.OK: 

310 return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, rpc_state.code, 

311 rpc_state.details) 

312 else: 

313 return _NON_OK_RENDEZVOUS_REPR_FORMAT.format( 

314 class_name, rpc_state.code, rpc_state.details, 

315 rpc_state.debug_error_string) 

316 

317 

318class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future): 

319 """An RPC error not tied to the execution of a particular RPC. 

320 

321 The RPC represented by the state object must not be in-progress or 

322 cancelled. 

323 

324 Attributes: 

325 _state: An instance of _RPCState. 

326 """ 

327 _state: _RPCState 

328 

329 def __init__(self, state: _RPCState): 

330 with state.condition: 

331 self._state = _RPCState((), copy.deepcopy(state.initial_metadata), 

332 copy.deepcopy(state.trailing_metadata), 

333 state.code, copy.deepcopy(state.details)) 

334 self._state.response = copy.copy(state.response) 

335 self._state.debug_error_string = copy.copy(state.debug_error_string) 

336 

337 def initial_metadata(self) -> Optional[MetadataType]: 

338 return self._state.initial_metadata 

339 

340 def trailing_metadata(self) -> Optional[MetadataType]: 

341 return self._state.trailing_metadata 

342 

343 def code(self) -> Optional[grpc.StatusCode]: 

344 return self._state.code 

345 

346 def details(self) -> Optional[str]: 

347 return _common.decode(self._state.details) 

348 

349 def debug_error_string(self) -> Optional[str]: 

350 return _common.decode(self._state.debug_error_string) 

351 

352 def _repr(self) -> str: 

353 return _rpc_state_string(self.__class__.__name__, self._state) 

354 

355 def __repr__(self) -> str: 

356 return self._repr() 

357 

358 def __str__(self) -> str: 

359 return self._repr() 

360 

361 def cancel(self) -> bool: 

362 """See grpc.Future.cancel.""" 

363 return False 

364 

365 def cancelled(self) -> bool: 

366 """See grpc.Future.cancelled.""" 

367 return False 

368 

369 def running(self) -> bool: 

370 """See grpc.Future.running.""" 

371 return False 

372 

373 def done(self) -> bool: 

374 """See grpc.Future.done.""" 

375 return True 

376 

377 def result(self, timeout: Optional[float] = None) -> Any: # pylint: disable=unused-argument 

378 """See grpc.Future.result.""" 

379 raise self 

380 

381 def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: # pylint: disable=unused-argument 

382 """See grpc.Future.exception.""" 

383 return self 

384 

385 def traceback( 

386 self, 

387 timeout: Optional[float] = None # pylint: disable=unused-argument 

388 ) -> Optional[types.TracebackType]: 

389 """See grpc.Future.traceback.""" 

390 try: 

391 raise self 

392 except grpc.RpcError: 

393 return sys.exc_info()[2] 

394 

395 def add_done_callback( 

396 self, 

397 fn: Callable[[grpc.Future], None], 

398 timeout: Optional[float] = None) -> None: # pylint: disable=unused-argument 

399 """See grpc.Future.add_done_callback.""" 

400 fn(self) 

401 

402 

403class _Rendezvous(grpc.RpcError, grpc.RpcContext): 

404 """An RPC iterator. 

405 

406 Attributes: 

407 _state: An instance of _RPCState. 

408 _call: An instance of SegregatedCall or IntegratedCall. 

409 In either case, the _call object is expected to have operate, cancel, 

410 and next_event methods. 

411 _response_deserializer: A callable taking bytes and return a Python 

412 object. 

413 _deadline: A float representing the deadline of the RPC in seconds. Or 

414 possibly None, to represent an RPC with no deadline at all. 

415 """ 

416 _state: _RPCState 

417 _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall] 

418 _response_deserializer: Optional[DeserializingFunction] 

419 _deadline: Optional[float] 

420 

421 def __init__(self, state: _RPCState, call: Union[cygrpc.SegregatedCall, 

422 cygrpc.IntegratedCall], 

423 response_deserializer: Optional[DeserializingFunction], 

424 deadline: Optional[float]): 

425 super(_Rendezvous, self).__init__() 

426 self._state = state 

427 self._call = call 

428 self._response_deserializer = response_deserializer 

429 self._deadline = deadline 

430 

431 def is_active(self) -> bool: 

432 """See grpc.RpcContext.is_active""" 

433 with self._state.condition: 

434 return self._state.code is None 

435 

436 def time_remaining(self) -> Optional[float]: 

437 """See grpc.RpcContext.time_remaining""" 

438 with self._state.condition: 

439 if self._deadline is None: 

440 return None 

441 else: 

442 return max(self._deadline - time.time(), 0) 

443 

444 def cancel(self) -> bool: 

445 """See grpc.RpcContext.cancel""" 

446 with self._state.condition: 

447 if self._state.code is None: 

448 code = grpc.StatusCode.CANCELLED 

449 details = 'Locally cancelled by application!' 

450 self._call.cancel( 

451 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details) 

452 self._state.cancelled = True 

453 _abort(self._state, code, details) 

454 self._state.condition.notify_all() 

455 return True 

456 else: 

457 return False 

458 

459 def add_callback(self, callback: NullaryCallbackType) -> bool: 

460 """See grpc.RpcContext.add_callback""" 

461 with self._state.condition: 

462 if self._state.callbacks is None: 

463 return False 

464 else: 

465 self._state.callbacks.append(callback) 

466 return True 

467 

468 def __iter__(self): 

469 return self 

470 

471 def next(self): 

472 return self._next() 

473 

474 def __next__(self): 

475 return self._next() 

476 

477 def _next(self): 

478 raise NotImplementedError() 

479 

480 def debug_error_string(self) -> Optional[str]: 

481 raise NotImplementedError() 

482 

483 def _repr(self) -> str: 

484 return _rpc_state_string(self.__class__.__name__, self._state) 

485 

486 def __repr__(self) -> str: 

487 return self._repr() 

488 

489 def __str__(self) -> str: 

490 return self._repr() 

491 

492 def __del__(self) -> None: 

493 with self._state.condition: 

494 if self._state.code is None: 

495 self._state.code = grpc.StatusCode.CANCELLED 

496 self._state.details = 'Cancelled upon garbage collection!' 

497 self._state.cancelled = True 

498 self._call.cancel( 

499 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code], 

500 self._state.details) 

501 self._state.condition.notify_all() 

502 

503 

504class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors 

505 """An RPC iterator operating entirely on a single thread. 

506 

507 The __next__ method of _SingleThreadedRendezvous does not depend on the 

508 existence of any other thread, including the "channel spin thread". 

509 However, this means that its interface is entirely synchronous. So this 

510 class cannot completely fulfill the grpc.Future interface. The result, 

511 exception, and traceback methods will never block and will instead raise 

512 an exception if calling the method would result in blocking. 

513 

514 This means that these methods are safe to call from add_done_callback 

515 handlers. 

516 """ 

517 _state: _RPCState 

518 

519 def _is_complete(self) -> bool: 

520 return self._state.code is not None 

521 

522 def cancelled(self) -> bool: 

523 with self._state.condition: 

524 return self._state.cancelled 

525 

526 def running(self) -> bool: 

527 with self._state.condition: 

528 return self._state.code is None 

529 

530 def done(self) -> bool: 

531 with self._state.condition: 

532 return self._state.code is not None 

533 

534 def result(self, timeout: Optional[float] = None) -> Any: 

535 """Returns the result of the computation or raises its exception. 

536 

537 This method will never block. Instead, it will raise an exception 

538 if calling this method would otherwise result in blocking. 

539 

540 Since this method will never block, any `timeout` argument passed will 

541 be ignored. 

542 """ 

543 del timeout 

544 with self._state.condition: 

545 if not self._is_complete(): 

546 raise grpc.experimental.UsageError( 

547 "_SingleThreadedRendezvous only supports result() when the RPC is complete." 

548 ) 

549 if self._state.code is grpc.StatusCode.OK: 

550 return self._state.response 

551 elif self._state.cancelled: 

552 raise grpc.FutureCancelledError() 

553 else: 

554 raise self 

555 

556 def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: 

557 """Return the exception raised by the computation. 

558 

559 This method will never block. Instead, it will raise an exception 

560 if calling this method would otherwise result in blocking. 

561 

562 Since this method will never block, any `timeout` argument passed will 

563 be ignored. 

564 """ 

565 del timeout 

566 with self._state.condition: 

567 if not self._is_complete(): 

568 raise grpc.experimental.UsageError( 

569 "_SingleThreadedRendezvous only supports exception() when the RPC is complete." 

570 ) 

571 if self._state.code is grpc.StatusCode.OK: 

572 return None 

573 elif self._state.cancelled: 

574 raise grpc.FutureCancelledError() 

575 else: 

576 return self 

577 

578 def traceback( 

579 self, 

580 timeout: Optional[float] = None) -> Optional[types.TracebackType]: 

581 """Access the traceback of the exception raised by the computation. 

582 

583 This method will never block. Instead, it will raise an exception 

584 if calling this method would otherwise result in blocking. 

585 

586 Since this method will never block, any `timeout` argument passed will 

587 be ignored. 

588 """ 

589 del timeout 

590 with self._state.condition: 

591 if not self._is_complete(): 

592 raise grpc.experimental.UsageError( 

593 "_SingleThreadedRendezvous only supports traceback() when the RPC is complete." 

594 ) 

595 if self._state.code is grpc.StatusCode.OK: 

596 return None 

597 elif self._state.cancelled: 

598 raise grpc.FutureCancelledError() 

599 else: 

600 try: 

601 raise self 

602 except grpc.RpcError: 

603 return sys.exc_info()[2] 

604 

605 def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None: 

606 with self._state.condition: 

607 if self._state.code is None: 

608 self._state.callbacks.append(functools.partial(fn, self)) 

609 return 

610 

611 fn(self) 

612 

613 def initial_metadata(self) -> Optional[MetadataType]: 

614 """See grpc.Call.initial_metadata""" 

615 with self._state.condition: 

616 # NOTE(gnossen): Based on our initial call batch, we are guaranteed 

617 # to receive initial metadata before any messages. 

618 while self._state.initial_metadata is None: 

619 self._consume_next_event() 

620 return self._state.initial_metadata 

621 

622 def trailing_metadata(self) -> Optional[MetadataType]: 

623 """See grpc.Call.trailing_metadata""" 

624 with self._state.condition: 

625 if self._state.trailing_metadata is None: 

626 raise grpc.experimental.UsageError( 

627 "Cannot get trailing metadata until RPC is completed.") 

628 return self._state.trailing_metadata 

629 

630 def code(self) -> Optional[grpc.StatusCode]: 

631 """See grpc.Call.code""" 

632 with self._state.condition: 

633 if self._state.code is None: 

634 raise grpc.experimental.UsageError( 

635 "Cannot get code until RPC is completed.") 

636 return self._state.code 

637 

638 def details(self) -> Optional[str]: 

639 """See grpc.Call.details""" 

640 with self._state.condition: 

641 if self._state.details is None: 

642 raise grpc.experimental.UsageError( 

643 "Cannot get details until RPC is completed.") 

644 return _common.decode(self._state.details) 

645 

646 def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]: 

647 event = self._call.next_event() 

648 with self._state.condition: 

649 callbacks = _handle_event(event, self._state, 

650 self._response_deserializer) 

651 for callback in callbacks: 

652 # NOTE(gnossen): We intentionally allow exceptions to bubble up 

653 # to the user when running on a single thread. 

654 callback() 

655 return event 

656 

657 def _next_response(self) -> Any: 

658 while True: 

659 self._consume_next_event() 

660 with self._state.condition: 

661 if self._state.response is not None: 

662 response = self._state.response 

663 self._state.response = None 

664 return response 

665 elif cygrpc.OperationType.receive_message not in self._state.due: 

666 if self._state.code is grpc.StatusCode.OK: 

667 raise StopIteration() 

668 elif self._state.code is not None: 

669 raise self 

670 

671 def _next(self) -> Any: 

672 with self._state.condition: 

673 if self._state.code is None: 

674 # We tentatively add the operation as expected and remove 

675 # it if the enqueue operation fails. This allows us to guarantee that 

676 # if an event has been submitted to the core completion queue, 

677 # it is in `due`. If we waited until after a successful 

678 # enqueue operation then a signal could interrupt this 

679 # thread between the enqueue operation and the addition of the 

680 # operation to `due`. This would cause an exception on the 

681 # channel spin thread when the operation completes and no 

682 # corresponding operation would be present in state.due. 

683 # Note that, since `condition` is held through this block, there is 

684 # no data race on `due`. 

685 self._state.due.add(cygrpc.OperationType.receive_message) 

686 operating = self._call.operate( 

687 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None) 

688 if not operating: 

689 self._state.due.remove(cygrpc.OperationType.receive_message) 

690 elif self._state.code is grpc.StatusCode.OK: 

691 raise StopIteration() 

692 else: 

693 raise self 

694 return self._next_response() 

695 

696 def debug_error_string(self) -> Optional[str]: 

697 with self._state.condition: 

698 if self._state.debug_error_string is None: 

699 raise grpc.experimental.UsageError( 

700 "Cannot get debug error string until RPC is completed.") 

701 return _common.decode(self._state.debug_error_string) 

702 

703 

704class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors 

705 """An RPC iterator that depends on a channel spin thread. 

706 

707 This iterator relies upon a per-channel thread running in the background, 

708 dequeueing events from the completion queue, and notifying threads waiting 

709 on the threading.Condition object in the _RPCState object. 

710 

711 This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface 

712 and to mediate a bidirection streaming RPC. 

713 """ 

714 _state: _RPCState 

715 

716 def initial_metadata(self) -> Optional[MetadataType]: 

717 """See grpc.Call.initial_metadata""" 

718 with self._state.condition: 

719 

720 def _done(): 

721 return self._state.initial_metadata is not None 

722 

723 _common.wait(self._state.condition.wait, _done) 

724 return self._state.initial_metadata 

725 

726 def trailing_metadata(self) -> Optional[MetadataType]: 

727 """See grpc.Call.trailing_metadata""" 

728 with self._state.condition: 

729 

730 def _done(): 

731 return self._state.trailing_metadata is not None 

732 

733 _common.wait(self._state.condition.wait, _done) 

734 return self._state.trailing_metadata 

735 

736 def code(self) -> Optional[grpc.StatusCode]: 

737 """See grpc.Call.code""" 

738 with self._state.condition: 

739 

740 def _done(): 

741 return self._state.code is not None 

742 

743 _common.wait(self._state.condition.wait, _done) 

744 return self._state.code 

745 

746 def details(self) -> Optional[str]: 

747 """See grpc.Call.details""" 

748 with self._state.condition: 

749 

750 def _done(): 

751 return self._state.details is not None 

752 

753 _common.wait(self._state.condition.wait, _done) 

754 return _common.decode(self._state.details) 

755 

756 def debug_error_string(self) -> Optional[str]: 

757 with self._state.condition: 

758 

759 def _done(): 

760 return self._state.debug_error_string is not None 

761 

762 _common.wait(self._state.condition.wait, _done) 

763 return _common.decode(self._state.debug_error_string) 

764 

765 def cancelled(self) -> bool: 

766 with self._state.condition: 

767 return self._state.cancelled 

768 

769 def running(self) -> bool: 

770 with self._state.condition: 

771 return self._state.code is None 

772 

773 def done(self) -> bool: 

774 with self._state.condition: 

775 return self._state.code is not None 

776 

777 def _is_complete(self) -> bool: 

778 return self._state.code is not None 

779 

780 def result(self, timeout: Optional[float] = None) -> Any: 

781 """Returns the result of the computation or raises its exception. 

782 

783 See grpc.Future.result for the full API contract. 

784 """ 

785 with self._state.condition: 

786 timed_out = _common.wait(self._state.condition.wait, 

787 self._is_complete, 

788 timeout=timeout) 

789 if timed_out: 

790 raise grpc.FutureTimeoutError() 

791 else: 

792 if self._state.code is grpc.StatusCode.OK: 

793 return self._state.response 

794 elif self._state.cancelled: 

795 raise grpc.FutureCancelledError() 

796 else: 

797 raise self 

798 

799 def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: 

800 """Return the exception raised by the computation. 

801 

802 See grpc.Future.exception for the full API contract. 

803 """ 

804 with self._state.condition: 

805 timed_out = _common.wait(self._state.condition.wait, 

806 self._is_complete, 

807 timeout=timeout) 

808 if timed_out: 

809 raise grpc.FutureTimeoutError() 

810 else: 

811 if self._state.code is grpc.StatusCode.OK: 

812 return None 

813 elif self._state.cancelled: 

814 raise grpc.FutureCancelledError() 

815 else: 

816 return self 

817 

818 def traceback( 

819 self, 

820 timeout: Optional[float] = None) -> Optional[types.TracebackType]: 

821 """Access the traceback of the exception raised by the computation. 

822 

823 See grpc.future.traceback for the full API contract. 

824 """ 

825 with self._state.condition: 

826 timed_out = _common.wait(self._state.condition.wait, 

827 self._is_complete, 

828 timeout=timeout) 

829 if timed_out: 

830 raise grpc.FutureTimeoutError() 

831 else: 

832 if self._state.code is grpc.StatusCode.OK: 

833 return None 

834 elif self._state.cancelled: 

835 raise grpc.FutureCancelledError() 

836 else: 

837 try: 

838 raise self 

839 except grpc.RpcError: 

840 return sys.exc_info()[2] 

841 

842 def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None: 

843 with self._state.condition: 

844 if self._state.code is None: 

845 self._state.callbacks.append(functools.partial(fn, self)) 

846 return 

847 

848 fn(self) 

849 

850 def _next(self) -> Any: 

851 with self._state.condition: 

852 if self._state.code is None: 

853 event_handler = _event_handler(self._state, 

854 self._response_deserializer) 

855 self._state.due.add(cygrpc.OperationType.receive_message) 

856 operating = self._call.operate( 

857 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), 

858 event_handler) 

859 if not operating: 

860 self._state.due.remove(cygrpc.OperationType.receive_message) 

861 elif self._state.code is grpc.StatusCode.OK: 

862 raise StopIteration() 

863 else: 

864 raise self 

865 

866 def _response_ready(): 

867 return (self._state.response is not None or 

868 (cygrpc.OperationType.receive_message 

869 not in self._state.due and 

870 self._state.code is not None)) 

871 

872 _common.wait(self._state.condition.wait, _response_ready) 

873 if self._state.response is not None: 

874 response = self._state.response 

875 self._state.response = None 

876 return response 

877 elif cygrpc.OperationType.receive_message not in self._state.due: 

878 if self._state.code is grpc.StatusCode.OK: 

879 raise StopIteration() 

880 elif self._state.code is not None: 

881 raise self 

882 

883 

884def _start_unary_request( 

885 request: Any, timeout: Optional[float], 

886 request_serializer: SerializingFunction 

887) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]: 

888 deadline = _deadline(timeout) 

889 serialized_request = _common.serialize(request, request_serializer) 

890 if serialized_request is None: 

891 state = _RPCState((), (), (), grpc.StatusCode.INTERNAL, 

892 'Exception serializing request!') 

893 error = _InactiveRpcError(state) 

894 return deadline, None, error 

895 else: 

896 return deadline, serialized_request, None 

897 

898 

899def _end_unary_response_blocking( 

900 state: _RPCState, call: cygrpc.SegregatedCall, with_call: bool, 

901 deadline: Optional[float] 

902) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]: 

903 if state.code is grpc.StatusCode.OK: 

904 if with_call: 

905 rendezvous = _MultiThreadedRendezvous(state, call, None, deadline) 

906 return state.response, rendezvous 

907 else: 

908 return state.response 

909 else: 

910 raise _InactiveRpcError(state) # pytype: disable=not-instantiable 

911 

912 

913def _stream_unary_invocation_operations( 

914 metadata: Optional[MetadataType], 

915 initial_metadata_flags: int) -> Sequence[Sequence[cygrpc.Operation]]: 

916 return ( 

917 ( 

918 cygrpc.SendInitialMetadataOperation(metadata, 

919 initial_metadata_flags), 

920 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 

921 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

922 ), 

923 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

924 ) 

925 

926 

927def _stream_unary_invocation_operations_and_tags( 

928 metadata: Optional[MetadataType], initial_metadata_flags: int 

929) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]: 

930 return tuple(( 

931 operations, 

932 None, 

933 ) for operations in _stream_unary_invocation_operations( 

934 metadata, initial_metadata_flags)) 

935 

936 

937def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]: 

938 parent_deadline = cygrpc.get_deadline_from_context() 

939 if parent_deadline is None and user_deadline is None: 

940 return None 

941 elif parent_deadline is not None and user_deadline is None: 

942 return parent_deadline 

943 elif user_deadline is not None and parent_deadline is None: 

944 return user_deadline 

945 else: 

946 return min(parent_deadline, user_deadline) 

947 

948 

949class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): 

950 _channel: cygrpc.Channel 

951 _managed_call: IntegratedCallFactory 

952 _method: bytes 

953 _request_serializer: Optional[SerializingFunction] 

954 _response_deserializer: Optional[DeserializingFunction] 

955 _context: Any 

956 

957 # pylint: disable=too-many-arguments 

958 def __init__(self, channel: cygrpc.Channel, 

959 managed_call: IntegratedCallFactory, method: bytes, 

960 request_serializer: Optional[SerializingFunction], 

961 response_deserializer: Optional[DeserializingFunction]): 

962 self._channel = channel 

963 self._managed_call = managed_call 

964 self._method = method 

965 self._request_serializer = request_serializer 

966 self._response_deserializer = response_deserializer 

967 self._context = cygrpc.build_census_context() 

968 

969 def _prepare( 

970 self, request: Any, timeout: Optional[float], 

971 metadata: Optional[MetadataType], wait_for_ready: Optional[bool], 

972 compression: Optional[grpc.Compression] 

973 ) -> Tuple[Optional[_RPCState], Optional[Sequence[cygrpc.Operation]], 

974 Optional[float], Optional[grpc.RpcError]]: 

975 deadline, serialized_request, rendezvous = _start_unary_request( 

976 request, timeout, self._request_serializer) 

977 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

978 wait_for_ready) 

979 augmented_metadata = _compression.augment_metadata( 

980 metadata, compression) 

981 if serialized_request is None: 

982 return None, None, None, rendezvous 

983 else: 

984 state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) 

985 operations = ( 

986 cygrpc.SendInitialMetadataOperation(augmented_metadata, 

987 initial_metadata_flags), 

988 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), 

989 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 

990 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), 

991 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 

992 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

993 ) 

994 return state, operations, deadline, None 

995 

996 def _blocking( 

997 self, 

998 request: Any, 

999 timeout: Optional[float] = None, 

1000 metadata: Optional[MetadataType] = None, 

1001 credentials: Optional[grpc.CallCredentials] = None, 

1002 wait_for_ready: Optional[bool] = None, 

1003 compression: Optional[grpc.Compression] = None 

1004 ) -> Tuple[_RPCState, cygrpc.SegregatedCall]: 

1005 state, operations, deadline, rendezvous = self._prepare( 

1006 request, timeout, metadata, wait_for_ready, compression) 

1007 if state is None: 

1008 raise rendezvous # pylint: disable-msg=raising-bad-type 

1009 else: 

1010 call = self._channel.segregated_call( 

1011 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1012 self._method, None, _determine_deadline(deadline), metadata, 

1013 None if credentials is None else credentials._credentials, (( 

1014 operations, 

1015 None, 

1016 ),), self._context) 

1017 event = call.next_event() 

1018 _handle_event(event, state, self._response_deserializer) 

1019 return state, call 

1020 

1021 def __call__(self, 

1022 request: Any, 

1023 timeout: Optional[float] = None, 

1024 metadata: Optional[MetadataType] = None, 

1025 credentials: Optional[grpc.CallCredentials] = None, 

1026 wait_for_ready: Optional[bool] = None, 

1027 compression: Optional[grpc.Compression] = None) -> Any: 

1028 state, call, = self._blocking(request, timeout, metadata, credentials, 

1029 wait_for_ready, compression) 

1030 return _end_unary_response_blocking(state, call, False, None) 

1031 

1032 def with_call( 

1033 self, 

1034 request: Any, 

1035 timeout: Optional[float] = None, 

1036 metadata: Optional[MetadataType] = None, 

1037 credentials: Optional[grpc.CallCredentials] = None, 

1038 wait_for_ready: Optional[bool] = None, 

1039 compression: Optional[grpc.Compression] = None 

1040 ) -> Tuple[Any, grpc.Call]: 

1041 state, call, = self._blocking(request, timeout, metadata, credentials, 

1042 wait_for_ready, compression) 

1043 return _end_unary_response_blocking(state, call, True, None) 

1044 

1045 def future( 

1046 self, 

1047 request: Any, 

1048 timeout: Optional[float] = None, 

1049 metadata: Optional[MetadataType] = None, 

1050 credentials: Optional[grpc.CallCredentials] = None, 

1051 wait_for_ready: Optional[bool] = None, 

1052 compression: Optional[grpc.Compression] = None 

1053 ) -> _MultiThreadedRendezvous: 

1054 state, operations, deadline, rendezvous = self._prepare( 

1055 request, timeout, metadata, wait_for_ready, compression) 

1056 if state is None: 

1057 raise rendezvous # pylint: disable-msg=raising-bad-type 

1058 else: 

1059 event_handler = _event_handler(state, self._response_deserializer) 

1060 call = self._managed_call( 

1061 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1062 self._method, None, deadline, metadata, 

1063 None if credentials is None else credentials._credentials, 

1064 (operations,), event_handler, self._context) 

1065 return _MultiThreadedRendezvous(state, call, 

1066 self._response_deserializer, 

1067 deadline) 

1068 

1069 

1070class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): 

1071 _channel: cygrpc.Channel 

1072 _method: bytes 

1073 _request_serializer: Optional[SerializingFunction] 

1074 _response_deserializer: Optional[DeserializingFunction] 

1075 _context: Any 

1076 

1077 # pylint: disable=too-many-arguments 

1078 def __init__(self, channel: cygrpc.Channel, method: bytes, 

1079 request_serializer: SerializingFunction, 

1080 response_deserializer: DeserializingFunction): 

1081 self._channel = channel 

1082 self._method = method 

1083 self._request_serializer = request_serializer 

1084 self._response_deserializer = response_deserializer 

1085 self._context = cygrpc.build_census_context() 

1086 

1087 def __call__( # pylint: disable=too-many-locals 

1088 self, 

1089 request: Any, 

1090 timeout: Optional[float] = None, 

1091 metadata: Optional[MetadataType] = None, 

1092 credentials: Optional[grpc.CallCredentials] = None, 

1093 wait_for_ready: Optional[bool] = None, 

1094 compression: Optional[grpc.Compression] = None 

1095 ) -> _SingleThreadedRendezvous: 

1096 deadline = _deadline(timeout) 

1097 serialized_request = _common.serialize(request, 

1098 self._request_serializer) 

1099 if serialized_request is None: 

1100 state = _RPCState((), (), (), grpc.StatusCode.INTERNAL, 

1101 'Exception serializing request!') 

1102 raise _InactiveRpcError(state) 

1103 

1104 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) 

1105 call_credentials = None if credentials is None else credentials._credentials 

1106 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1107 wait_for_ready) 

1108 augmented_metadata = _compression.augment_metadata( 

1109 metadata, compression) 

1110 operations = ( 

1111 (cygrpc.SendInitialMetadataOperation(augmented_metadata, 

1112 initial_metadata_flags), 

1113 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), 

1114 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS)), 

1115 (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),), 

1116 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

1117 ) 

1118 operations_and_tags = tuple((ops, None) for ops in operations) 

1119 call = self._channel.segregated_call( 

1120 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, 

1121 None, _determine_deadline(deadline), metadata, call_credentials, 

1122 operations_and_tags, self._context) 

1123 return _SingleThreadedRendezvous(state, call, 

1124 self._response_deserializer, deadline) 

1125 

1126 

1127class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): 

1128 _channel: cygrpc.Channel 

1129 _managed_call: IntegratedCallFactory 

1130 _method: bytes 

1131 _request_serializer: Optional[SerializingFunction] 

1132 _response_deserializer: Optional[DeserializingFunction] 

1133 _context: Any 

1134 

1135 # pylint: disable=too-many-arguments 

1136 def __init__(self, channel: cygrpc.Channel, 

1137 managed_call: IntegratedCallFactory, method: bytes, 

1138 request_serializer: SerializingFunction, 

1139 response_deserializer: DeserializingFunction): 

1140 self._channel = channel 

1141 self._managed_call = managed_call 

1142 self._method = method 

1143 self._request_serializer = request_serializer 

1144 self._response_deserializer = response_deserializer 

1145 self._context = cygrpc.build_census_context() 

1146 

1147 def __call__( # pylint: disable=too-many-locals 

1148 self, 

1149 request: Any, 

1150 timeout: Optional[float] = None, 

1151 metadata: Optional[MetadataType] = None, 

1152 credentials: Optional[grpc.CallCredentials] = None, 

1153 wait_for_ready: Optional[bool] = None, 

1154 compression: Optional[ 

1155 grpc.Compression] = None) -> _MultiThreadedRendezvous: 

1156 deadline, serialized_request, rendezvous = _start_unary_request( 

1157 request, timeout, self._request_serializer) 

1158 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1159 wait_for_ready) 

1160 if serialized_request is None: 

1161 raise rendezvous # pylint: disable-msg=raising-bad-type 

1162 else: 

1163 augmented_metadata = _compression.augment_metadata( 

1164 metadata, compression) 

1165 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) 

1166 operations = ( 

1167 ( 

1168 cygrpc.SendInitialMetadataOperation(augmented_metadata, 

1169 initial_metadata_flags), 

1170 cygrpc.SendMessageOperation(serialized_request, 

1171 _EMPTY_FLAGS), 

1172 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 

1173 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

1174 ), 

1175 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

1176 ) 

1177 call = self._managed_call( 

1178 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1179 self._method, None, _determine_deadline(deadline), metadata, 

1180 None if credentials is None else credentials._credentials, 

1181 operations, _event_handler(state, self._response_deserializer), 

1182 self._context) 

1183 return _MultiThreadedRendezvous(state, call, 

1184 self._response_deserializer, 

1185 deadline) 

1186 

1187 

1188class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): 

1189 _channel: cygrpc.Channel 

1190 _managed_call: IntegratedCallFactory 

1191 _method: bytes 

1192 _request_serializer: Optional[SerializingFunction] 

1193 _response_deserializer: Optional[DeserializingFunction] 

1194 _context: Any 

1195 

1196 # pylint: disable=too-many-arguments 

1197 def __init__(self, channel: cygrpc.Channel, 

1198 managed_call: IntegratedCallFactory, method: bytes, 

1199 request_serializer: Optional[SerializingFunction], 

1200 response_deserializer: Optional[DeserializingFunction]): 

1201 self._channel = channel 

1202 self._managed_call = managed_call 

1203 self._method = method 

1204 self._request_serializer = request_serializer 

1205 self._response_deserializer = response_deserializer 

1206 self._context = cygrpc.build_census_context() 

1207 

1208 def _blocking( 

1209 self, request_iterator: Iterator, timeout: Optional[float], 

1210 metadata: Optional[MetadataType], 

1211 credentials: Optional[grpc.CallCredentials], 

1212 wait_for_ready: Optional[bool], compression: Optional[grpc.Compression] 

1213 ) -> Tuple[_RPCState, cygrpc.SegregatedCall]: 

1214 deadline = _deadline(timeout) 

1215 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 

1216 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1217 wait_for_ready) 

1218 augmented_metadata = _compression.augment_metadata( 

1219 metadata, compression) 

1220 call = self._channel.segregated_call( 

1221 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, 

1222 None, _determine_deadline(deadline), augmented_metadata, 

1223 None if credentials is None else credentials._credentials, 

1224 _stream_unary_invocation_operations_and_tags( 

1225 augmented_metadata, initial_metadata_flags), self._context) 

1226 _consume_request_iterator(request_iterator, state, call, 

1227 self._request_serializer, None) 

1228 while True: 

1229 event = call.next_event() 

1230 with state.condition: 

1231 _handle_event(event, state, self._response_deserializer) 

1232 state.condition.notify_all() 

1233 if not state.due: 

1234 break 

1235 return state, call 

1236 

1237 def __call__(self, 

1238 request_iterator: Iterator, 

1239 timeout: Optional[float] = None, 

1240 metadata: Optional[MetadataType] = None, 

1241 credentials: Optional[grpc.CallCredentials] = None, 

1242 wait_for_ready: Optional[bool] = None, 

1243 compression: Optional[grpc.Compression] = None) -> Any: 

1244 state, call, = self._blocking(request_iterator, timeout, metadata, 

1245 credentials, wait_for_ready, compression) 

1246 return _end_unary_response_blocking(state, call, False, None) 

1247 

1248 def with_call( 

1249 self, 

1250 request_iterator: Iterator, 

1251 timeout: Optional[float] = None, 

1252 metadata: Optional[MetadataType] = None, 

1253 credentials: Optional[grpc.CallCredentials] = None, 

1254 wait_for_ready: Optional[bool] = None, 

1255 compression: Optional[grpc.Compression] = None 

1256 ) -> Tuple[Any, grpc.Call]: 

1257 state, call, = self._blocking(request_iterator, timeout, metadata, 

1258 credentials, wait_for_ready, compression) 

1259 return _end_unary_response_blocking(state, call, True, None) 

1260 

1261 def future( 

1262 self, 

1263 request_iterator: Iterator, 

1264 timeout: Optional[float] = None, 

1265 metadata: Optional[MetadataType] = None, 

1266 credentials: Optional[grpc.CallCredentials] = None, 

1267 wait_for_ready: Optional[bool] = None, 

1268 compression: Optional[grpc.Compression] = None 

1269 ) -> _MultiThreadedRendezvous: 

1270 deadline = _deadline(timeout) 

1271 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 

1272 event_handler = _event_handler(state, self._response_deserializer) 

1273 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1274 wait_for_ready) 

1275 augmented_metadata = _compression.augment_metadata( 

1276 metadata, compression) 

1277 call = self._managed_call( 

1278 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, 

1279 None, deadline, augmented_metadata, 

1280 None if credentials is None else credentials._credentials, 

1281 _stream_unary_invocation_operations(metadata, 

1282 initial_metadata_flags), 

1283 event_handler, self._context) 

1284 _consume_request_iterator(request_iterator, state, call, 

1285 self._request_serializer, event_handler) 

1286 return _MultiThreadedRendezvous(state, call, 

1287 self._response_deserializer, deadline) 

1288 

1289 

1290class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): 

1291 _channel: cygrpc.Channel 

1292 _managed_call: IntegratedCallFactory 

1293 _method: bytes 

1294 _request_serializer: Optional[SerializingFunction] 

1295 _response_deserializer: Optional[DeserializingFunction] 

1296 _context: Any 

1297 

1298 # pylint: disable=too-many-arguments 

1299 def __init__(self, 

1300 channel: cygrpc.Channel, 

1301 managed_call: IntegratedCallFactory, 

1302 method: bytes, 

1303 request_serializer: Optional[SerializingFunction] = None, 

1304 response_deserializer: Optional[DeserializingFunction] = None): 

1305 self._channel = channel 

1306 self._managed_call = managed_call 

1307 self._method = method 

1308 self._request_serializer = request_serializer 

1309 self._response_deserializer = response_deserializer 

1310 self._context = cygrpc.build_census_context() 

1311 

1312 def __call__( 

1313 self, 

1314 request_iterator: Iterator, 

1315 timeout: Optional[float] = None, 

1316 metadata: Optional[MetadataType] = None, 

1317 credentials: Optional[grpc.CallCredentials] = None, 

1318 wait_for_ready: Optional[bool] = None, 

1319 compression: Optional[grpc.Compression] = None 

1320 ) -> _MultiThreadedRendezvous: 

1321 deadline = _deadline(timeout) 

1322 state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) 

1323 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1324 wait_for_ready) 

1325 augmented_metadata = _compression.augment_metadata( 

1326 metadata, compression) 

1327 operations = ( 

1328 ( 

1329 cygrpc.SendInitialMetadataOperation(augmented_metadata, 

1330 initial_metadata_flags), 

1331 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

1332 ), 

1333 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

1334 ) 

1335 event_handler = _event_handler(state, self._response_deserializer) 

1336 call = self._managed_call( 

1337 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, 

1338 None, _determine_deadline(deadline), augmented_metadata, 

1339 None if credentials is None else credentials._credentials, 

1340 operations, event_handler, self._context) 

1341 _consume_request_iterator(request_iterator, state, call, 

1342 self._request_serializer, event_handler) 

1343 return _MultiThreadedRendezvous(state, call, 

1344 self._response_deserializer, deadline) 

1345 

1346 

1347class _InitialMetadataFlags(int): 

1348 """Stores immutable initial metadata flags""" 

1349 

1350 def __new__(cls, value: int = _EMPTY_FLAGS): 

1351 value &= cygrpc.InitialMetadataFlags.used_mask 

1352 return super(_InitialMetadataFlags, cls).__new__(cls, value) 

1353 

1354 def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int: 

1355 if wait_for_ready is not None: 

1356 if wait_for_ready: 

1357 return self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \ 

1358 cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) 

1359 elif not wait_for_ready: 

1360 return self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \ 

1361 cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) 

1362 return self 

1363 

1364 

1365class _ChannelCallState(object): 

1366 channel: cygrpc.Channel 

1367 managed_calls: int 

1368 threading: bool 

1369 

1370 def __init__(self, channel: cygrpc.Channel): 

1371 self.lock = threading.Lock() 

1372 self.channel = channel 

1373 self.managed_calls = 0 

1374 self.threading = False 

1375 

1376 def reset_postfork_child(self) -> None: 

1377 self.managed_calls = 0 

1378 

1379 def __del__(self): 

1380 try: 

1381 self.channel.close(cygrpc.StatusCode.cancelled, 

1382 'Channel deallocated!') 

1383 except (TypeError, AttributeError): 

1384 pass 

1385 

1386 

1387def _run_channel_spin_thread(state: _ChannelCallState) -> None: 

1388 

1389 def channel_spin(): 

1390 while True: 

1391 cygrpc.block_if_fork_in_progress(state) 

1392 event = state.channel.next_call_event() 

1393 if event.completion_type == cygrpc.CompletionType.queue_timeout: 

1394 continue 

1395 call_completed = event.tag(event) 

1396 if call_completed: 

1397 with state.lock: 

1398 state.managed_calls -= 1 

1399 if state.managed_calls == 0: 

1400 return 

1401 

1402 channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin) 

1403 channel_spin_thread.setDaemon(True) 

1404 channel_spin_thread.start() 

1405 

1406 

1407def _channel_managed_call_management(state: _ChannelCallState): 

1408 

1409 # pylint: disable=too-many-arguments 

1410 def create(flags: int, method: bytes, host: Optional[str], 

1411 deadline: Optional[float], metadata: Optional[MetadataType], 

1412 credentials: Optional[cygrpc.CallCredentials], 

1413 operations: Sequence[Sequence[cygrpc.Operation]], 

1414 event_handler: UserTag, context) -> cygrpc.IntegratedCall: 

1415 """Creates a cygrpc.IntegratedCall. 

1416 

1417 Args: 

1418 flags: An integer bitfield of call flags. 

1419 method: The RPC method. 

1420 host: A host string for the created call. 

1421 deadline: A float to be the deadline of the created call or None if 

1422 the call is to have an infinite deadline. 

1423 metadata: The metadata for the call or None. 

1424 credentials: A cygrpc.CallCredentials or None. 

1425 operations: A sequence of sequences of cygrpc.Operations to be 

1426 started on the call. 

1427 event_handler: A behavior to call to handle the events resultant from 

1428 the operations on the call. 

1429 context: Context object for distributed tracing. 

1430 Returns: 

1431 A cygrpc.IntegratedCall with which to conduct an RPC. 

1432 """ 

1433 operations_and_tags = tuple(( 

1434 operation, 

1435 event_handler, 

1436 ) for operation in operations) 

1437 with state.lock: 

1438 call = state.channel.integrated_call(flags, method, host, deadline, 

1439 metadata, credentials, 

1440 operations_and_tags, context) 

1441 if state.managed_calls == 0: 

1442 state.managed_calls = 1 

1443 _run_channel_spin_thread(state) 

1444 else: 

1445 state.managed_calls += 1 

1446 return call 

1447 

1448 return create 

1449 

1450 

1451class _ChannelConnectivityState(object): 

1452 lock: threading.RLock 

1453 channel: grpc.Channel 

1454 polling: bool 

1455 connectivity: grpc.ChannelConnectivity 

1456 try_to_connect: bool 

1457 # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704 

1458 callbacks_and_connectivities: List[Sequence[Union[Callable[ 

1459 [grpc.ChannelConnectivity], None], Optional[grpc.ChannelConnectivity]]]] 

1460 delivering: bool 

1461 

1462 def __init__(self, channel: grpc.Channel): 

1463 self.lock = threading.RLock() 

1464 self.channel = channel 

1465 self.polling = False 

1466 self.connectivity = None 

1467 self.try_to_connect = False 

1468 self.callbacks_and_connectivities = [] 

1469 self.delivering = False 

1470 

1471 def reset_postfork_child(self) -> None: 

1472 self.polling = False 

1473 self.connectivity = None 

1474 self.try_to_connect = False 

1475 self.callbacks_and_connectivities = [] 

1476 self.delivering = False 

1477 

1478 

1479def _deliveries( 

1480 state: _ChannelConnectivityState 

1481) -> List[Callable[[grpc.ChannelConnectivity], None]]: 

1482 callbacks_needing_update = [] 

1483 for callback_and_connectivity in state.callbacks_and_connectivities: 

1484 callback, callback_connectivity, = callback_and_connectivity 

1485 if callback_connectivity is not state.connectivity: 

1486 callbacks_needing_update.append(callback) 

1487 callback_and_connectivity[1] = state.connectivity 

1488 return callbacks_needing_update 

1489 

1490 

1491def _deliver( 

1492 state: _ChannelConnectivityState, 

1493 initial_connectivity: grpc.ChannelConnectivity, 

1494 initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]] 

1495) -> None: 

1496 connectivity = initial_connectivity 

1497 callbacks = initial_callbacks 

1498 while True: 

1499 for callback in callbacks: 

1500 cygrpc.block_if_fork_in_progress(state) 

1501 try: 

1502 callback(connectivity) 

1503 except Exception: # pylint: disable=broad-except 

1504 _LOGGER.exception( 

1505 _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE) 

1506 with state.lock: 

1507 callbacks = _deliveries(state) 

1508 if callbacks: 

1509 connectivity = state.connectivity 

1510 else: 

1511 state.delivering = False 

1512 return 

1513 

1514 

1515def _spawn_delivery( 

1516 state: _ChannelConnectivityState, 

1517 callbacks: Sequence[Callable[[grpc.ChannelConnectivity], 

1518 None]]) -> None: 

1519 delivering_thread = cygrpc.ForkManagedThread(target=_deliver, 

1520 args=( 

1521 state, 

1522 state.connectivity, 

1523 callbacks, 

1524 )) 

1525 delivering_thread.setDaemon(True) 

1526 delivering_thread.start() 

1527 state.delivering = True 

1528 

1529 

1530# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll. 

1531def _poll_connectivity(state: _ChannelConnectivityState, channel: grpc.Channel, 

1532 initial_try_to_connect: bool) -> None: 

1533 try_to_connect = initial_try_to_connect 

1534 connectivity = channel.check_connectivity_state(try_to_connect) 

1535 with state.lock: 

1536 state.connectivity = ( 

1537 _common. 

1538 CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[connectivity]) 

1539 callbacks = tuple( 

1540 callback for callback, unused_but_known_to_be_none_connectivity in 

1541 state.callbacks_and_connectivities) 

1542 for callback_and_connectivity in state.callbacks_and_connectivities: 

1543 callback_and_connectivity[1] = state.connectivity 

1544 if callbacks: 

1545 _spawn_delivery(state, callbacks) 

1546 while True: 

1547 event = channel.watch_connectivity_state(connectivity, 

1548 time.time() + 0.2) 

1549 cygrpc.block_if_fork_in_progress(state) 

1550 with state.lock: 

1551 if not state.callbacks_and_connectivities and not state.try_to_connect: 

1552 state.polling = False 

1553 state.connectivity = None 

1554 break 

1555 try_to_connect = state.try_to_connect 

1556 state.try_to_connect = False 

1557 if event.success or try_to_connect: 

1558 connectivity = channel.check_connectivity_state(try_to_connect) 

1559 with state.lock: 

1560 state.connectivity = ( 

1561 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ 

1562 connectivity]) 

1563 if not state.delivering: 

1564 callbacks = _deliveries(state) 

1565 if callbacks: 

1566 _spawn_delivery(state, callbacks) 

1567 

1568 

1569def _subscribe(state: _ChannelConnectivityState, 

1570 callback: Callable[[grpc.ChannelConnectivity], 

1571 None], try_to_connect: bool) -> None: 

1572 with state.lock: 

1573 if not state.callbacks_and_connectivities and not state.polling: 

1574 polling_thread = cygrpc.ForkManagedThread( 

1575 target=_poll_connectivity, 

1576 args=(state, state.channel, bool(try_to_connect))) 

1577 polling_thread.setDaemon(True) 

1578 polling_thread.start() 

1579 state.polling = True 

1580 state.callbacks_and_connectivities.append([callback, None]) 

1581 elif not state.delivering and state.connectivity is not None: 

1582 _spawn_delivery(state, (callback,)) 

1583 state.try_to_connect |= bool(try_to_connect) 

1584 state.callbacks_and_connectivities.append( 

1585 [callback, state.connectivity]) 

1586 else: 

1587 state.try_to_connect |= bool(try_to_connect) 

1588 state.callbacks_and_connectivities.append([callback, None]) 

1589 

1590 

1591def _unsubscribe(state: _ChannelConnectivityState, 

1592 callback: Callable[[grpc.ChannelConnectivity], None]) -> None: 

1593 with state.lock: 

1594 for index, (subscribed_callback, unused_connectivity) in enumerate( 

1595 state.callbacks_and_connectivities): 

1596 if callback == subscribed_callback: 

1597 state.callbacks_and_connectivities.pop(index) 

1598 break 

1599 

1600 

1601def _augment_options( 

1602 base_options: Sequence[ChannelArgumentType], 

1603 compression: Optional[grpc.Compression] 

1604) -> Sequence[ChannelArgumentType]: 

1605 compression_option = _compression.create_channel_option(compression) 

1606 return tuple(base_options) + compression_option + (( 

1607 cygrpc.ChannelArgKey.primary_user_agent_string, 

1608 _USER_AGENT, 

1609 ),) 

1610 

1611 

1612def _separate_channel_options( 

1613 options: Sequence[ChannelArgumentType] 

1614) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]: 

1615 """Separates core channel options from Python channel options.""" 

1616 core_options = [] 

1617 python_options = [] 

1618 for pair in options: 

1619 if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream: 

1620 python_options.append(pair) 

1621 else: 

1622 core_options.append(pair) 

1623 return python_options, core_options 

1624 

1625 

1626class Channel(grpc.Channel): 

1627 """A cygrpc.Channel-backed implementation of grpc.Channel.""" 

1628 _single_threaded_unary_stream: bool 

1629 _channel: cygrpc.Channel 

1630 _call_state: _ChannelCallState 

1631 _connectivity_state: _ChannelConnectivityState 

1632 

1633 def __init__(self, target: str, options: Sequence[ChannelArgumentType], 

1634 credentials: Optional[grpc.ChannelCredentials], 

1635 compression: Optional[grpc.Compression]): 

1636 """Constructor. 

1637 

1638 Args: 

1639 target: The target to which to connect. 

1640 options: Configuration options for the channel. 

1641 credentials: A cygrpc.ChannelCredentials or None. 

1642 compression: An optional value indicating the compression method to be 

1643 used over the lifetime of the channel. 

1644 """ 

1645 python_options, core_options = _separate_channel_options(options) 

1646 self._single_threaded_unary_stream = _DEFAULT_SINGLE_THREADED_UNARY_STREAM 

1647 self._process_python_options(python_options) 

1648 self._channel = cygrpc.Channel( 

1649 _common.encode(target), _augment_options(core_options, compression), 

1650 credentials) 

1651 self._call_state = _ChannelCallState(self._channel) 

1652 self._connectivity_state = _ChannelConnectivityState(self._channel) 

1653 cygrpc.fork_register_channel(self) 

1654 if cygrpc.g_gevent_activated: 

1655 cygrpc.gevent_increment_channel_count() 

1656 

1657 def _process_python_options( 

1658 self, python_options: Sequence[ChannelArgumentType]) -> None: 

1659 """Sets channel attributes according to python-only channel options.""" 

1660 for pair in python_options: 

1661 if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream: 

1662 self._single_threaded_unary_stream = True 

1663 

1664 def subscribe(self, 

1665 callback: Callable[[grpc.ChannelConnectivity], None], 

1666 try_to_connect: Optional[bool] = None) -> None: 

1667 _subscribe(self._connectivity_state, callback, try_to_connect) 

1668 

1669 def unsubscribe( 

1670 self, callback: Callable[[grpc.ChannelConnectivity], None]) -> None: 

1671 _unsubscribe(self._connectivity_state, callback) 

1672 

1673 def unary_unary( 

1674 self, 

1675 method: str, 

1676 request_serializer: Optional[SerializingFunction] = None, 

1677 response_deserializer: Optional[DeserializingFunction] = None 

1678 ) -> grpc.UnaryUnaryMultiCallable: 

1679 return _UnaryUnaryMultiCallable( 

1680 self._channel, _channel_managed_call_management(self._call_state), 

1681 _common.encode(method), request_serializer, response_deserializer) 

1682 

1683 def unary_stream( 

1684 self, 

1685 method: str, 

1686 request_serializer: Optional[SerializingFunction] = None, 

1687 response_deserializer: Optional[DeserializingFunction] = None 

1688 ) -> grpc.UnaryStreamMultiCallable: 

1689 # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC 

1690 # on a single Python thread results in an appreciable speed-up. However, 

1691 # due to slight differences in capability, the multi-threaded variant 

1692 # remains the default. 

1693 if self._single_threaded_unary_stream: 

1694 return _SingleThreadedUnaryStreamMultiCallable( 

1695 self._channel, _common.encode(method), request_serializer, 

1696 response_deserializer) 

1697 else: 

1698 return _UnaryStreamMultiCallable( 

1699 self._channel, 

1700 _channel_managed_call_management(self._call_state), 

1701 _common.encode(method), request_serializer, 

1702 response_deserializer) 

1703 

1704 def stream_unary( 

1705 self, 

1706 method: str, 

1707 request_serializer: Optional[SerializingFunction] = None, 

1708 response_deserializer: Optional[DeserializingFunction] = None 

1709 ) -> grpc.StreamUnaryMultiCallable: 

1710 return _StreamUnaryMultiCallable( 

1711 self._channel, _channel_managed_call_management(self._call_state), 

1712 _common.encode(method), request_serializer, response_deserializer) 

1713 

1714 def stream_stream( 

1715 self, 

1716 method: str, 

1717 request_serializer: Optional[SerializingFunction] = None, 

1718 response_deserializer: Optional[DeserializingFunction] = None 

1719 ) -> grpc.StreamStreamMultiCallable: 

1720 return _StreamStreamMultiCallable( 

1721 self._channel, _channel_managed_call_management(self._call_state), 

1722 _common.encode(method), request_serializer, response_deserializer) 

1723 

1724 def _unsubscribe_all(self) -> None: 

1725 state = self._connectivity_state 

1726 if state: 

1727 with state.lock: 

1728 del state.callbacks_and_connectivities[:] 

1729 

1730 def _close(self) -> None: 

1731 self._unsubscribe_all() 

1732 self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!') 

1733 cygrpc.fork_unregister_channel(self) 

1734 if cygrpc.g_gevent_activated: 

1735 cygrpc.gevent_decrement_channel_count() 

1736 

1737 def _close_on_fork(self) -> None: 

1738 self._unsubscribe_all() 

1739 self._channel.close_on_fork(cygrpc.StatusCode.cancelled, 

1740 'Channel closed due to fork') 

1741 

1742 def __enter__(self): 

1743 return self 

1744 

1745 def __exit__(self, exc_type, exc_val, exc_tb): 

1746 self._close() 

1747 return False 

1748 

1749 def close(self) -> None: 

1750 self._close() 

1751 

1752 def __del__(self): 

1753 # TODO(https://github.com/grpc/grpc/issues/12531): Several releases 

1754 # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call 

1755 # here (or more likely, call self._close() here). We don't do this today 

1756 # because many valid use cases today allow the channel to be deleted 

1757 # immediately after stubs are created. After a sufficient period of time 

1758 # has passed for all users to be trusted to freeze out to their channels 

1759 # for as long as they are in use and to close them after using them, 

1760 # then deletion of this grpc._channel.Channel instance can be made to 

1761 # effect closure of the underlying cygrpc.Channel instance. 

1762 try: 

1763 self._unsubscribe_all() 

1764 except: # pylint: disable=bare-except 

1765 # Exceptions in __del__ are ignored by Python anyway, but they can 

1766 # keep spamming logs. Just silence them. 

1767 pass