Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_channel.py: 26%

916 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:45 +0000

1# Copyright 2016 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Python.""" 

15 

16import copy 

17import functools 

18import logging 

19import os 

20import sys 

21import threading 

22import time 

23import types 

24from typing import ( 

25 Any, 

26 Callable, 

27 Iterator, 

28 List, 

29 Optional, 

30 Sequence, 

31 Set, 

32 Tuple, 

33 Union, 

34) 

35 

36import grpc # pytype: disable=pyi-error 

37from grpc import _common # pytype: disable=pyi-error 

38from grpc import _compression # pytype: disable=pyi-error 

39from grpc import _grpcio_metadata # pytype: disable=pyi-error 

40from grpc import _observability # pytype: disable=pyi-error 

41from grpc._cython import cygrpc 

42from grpc._typing import ChannelArgumentType 

43from grpc._typing import DeserializingFunction 

44from grpc._typing import IntegratedCallFactory 

45from grpc._typing import MetadataType 

46from grpc._typing import NullaryCallbackType 

47from grpc._typing import ResponseType 

48from grpc._typing import SerializingFunction 

49from grpc._typing import UserTag 

50import grpc.experimental # pytype: disable=pyi-error 

51 

52_LOGGER = logging.getLogger(__name__) 

53 

54_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__) 

55 

56_EMPTY_FLAGS = 0 

57 

58# NOTE(rbellevi): No guarantees are given about the maintenance of this 

59# environment variable. 

60_DEFAULT_SINGLE_THREADED_UNARY_STREAM = ( 

61 os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None 

62) 

63 

64_UNARY_UNARY_INITIAL_DUE = ( 

65 cygrpc.OperationType.send_initial_metadata, 

66 cygrpc.OperationType.send_message, 

67 cygrpc.OperationType.send_close_from_client, 

68 cygrpc.OperationType.receive_initial_metadata, 

69 cygrpc.OperationType.receive_message, 

70 cygrpc.OperationType.receive_status_on_client, 

71) 

72_UNARY_STREAM_INITIAL_DUE = ( 

73 cygrpc.OperationType.send_initial_metadata, 

74 cygrpc.OperationType.send_message, 

75 cygrpc.OperationType.send_close_from_client, 

76 cygrpc.OperationType.receive_initial_metadata, 

77 cygrpc.OperationType.receive_status_on_client, 

78) 

79_STREAM_UNARY_INITIAL_DUE = ( 

80 cygrpc.OperationType.send_initial_metadata, 

81 cygrpc.OperationType.receive_initial_metadata, 

82 cygrpc.OperationType.receive_message, 

83 cygrpc.OperationType.receive_status_on_client, 

84) 

85_STREAM_STREAM_INITIAL_DUE = ( 

86 cygrpc.OperationType.send_initial_metadata, 

87 cygrpc.OperationType.receive_initial_metadata, 

88 cygrpc.OperationType.receive_status_on_client, 

89) 

90 

91_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( 

92 "Exception calling channel subscription callback!" 

93) 

94 

95_OK_RENDEZVOUS_REPR_FORMAT = ( 

96 '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>' 

97) 

98 

99_NON_OK_RENDEZVOUS_REPR_FORMAT = ( 

100 "<{} of RPC that terminated with:\n" 

101 "\tstatus = {}\n" 

102 '\tdetails = "{}"\n' 

103 '\tdebug_error_string = "{}"\n' 

104 ">" 

105) 

106 

107 

108def _deadline(timeout: Optional[float]) -> Optional[float]: 

109 return None if timeout is None else time.time() + timeout 

110 

111 

112def _unknown_code_details( 

113 unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str] 

114) -> str: 

115 return 'Server sent unknown code {} and details "{}"'.format( 

116 unknown_cygrpc_code, details 

117 ) 

118 

119 

120class _RPCState(object): 

121 condition: threading.Condition 

122 due: Set[cygrpc.OperationType] 

123 initial_metadata: Optional[MetadataType] 

124 response: Any 

125 trailing_metadata: Optional[MetadataType] 

126 code: Optional[grpc.StatusCode] 

127 details: Optional[str] 

128 debug_error_string: Optional[str] 

129 cancelled: bool 

130 callbacks: List[NullaryCallbackType] 

131 fork_epoch: Optional[int] 

132 rpc_start_time: Optional[float] # In relative seconds 

133 rpc_end_time: Optional[float] # In relative seconds 

134 method: Optional[str] 

135 

136 def __init__( 

137 self, 

138 due: Sequence[cygrpc.OperationType], 

139 initial_metadata: Optional[MetadataType], 

140 trailing_metadata: Optional[MetadataType], 

141 code: Optional[grpc.StatusCode], 

142 details: Optional[str], 

143 ): 

144 # `condition` guards all members of _RPCState. `notify_all` is called on 

145 # `condition` when the state of the RPC has changed. 

146 self.condition = threading.Condition() 

147 

148 # The cygrpc.OperationType objects representing events due from the RPC's 

149 # completion queue. If an operation is in `due`, it is guaranteed that 

150 # `operate()` has been called on a corresponding operation. But the 

151 # converse is not true. That is, in the case of failed `operate()` 

152 # calls, there may briefly be events in `due` that do not correspond to 

153 # operations submitted to Core. 

154 self.due = set(due) 

155 self.initial_metadata = initial_metadata 

156 self.response = None 

157 self.trailing_metadata = trailing_metadata 

158 self.code = code 

159 self.details = details 

160 self.debug_error_string = None 

161 # The following three fields are used for observability. 

162 # Updates to those fields do not trigger self.condition. 

163 self.rpc_start_time = None 

164 self.rpc_end_time = None 

165 self.method = None 

166 

167 # The semantics of grpc.Future.cancel and grpc.Future.cancelled are 

168 # slightly wonky, so they have to be tracked separately from the rest of the 

169 # result of the RPC. This field tracks whether cancellation was requested 

170 # prior to termination of the RPC. 

171 self.cancelled = False 

172 self.callbacks = [] 

173 self.fork_epoch = cygrpc.get_fork_epoch() 

174 

175 def reset_postfork_child(self): 

176 self.condition = threading.Condition() 

177 

178 

179def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None: 

180 if state.code is None: 

181 state.code = code 

182 state.details = details 

183 if state.initial_metadata is None: 

184 state.initial_metadata = () 

185 state.trailing_metadata = () 

186 

187 

188def _handle_event( 

189 event: cygrpc.BaseEvent, 

190 state: _RPCState, 

191 response_deserializer: Optional[DeserializingFunction], 

192) -> List[NullaryCallbackType]: 

193 callbacks = [] 

194 for batch_operation in event.batch_operations: 

195 operation_type = batch_operation.type() 

196 state.due.remove(operation_type) 

197 if operation_type == cygrpc.OperationType.receive_initial_metadata: 

198 state.initial_metadata = batch_operation.initial_metadata() 

199 elif operation_type == cygrpc.OperationType.receive_message: 

200 serialized_response = batch_operation.message() 

201 if serialized_response is not None: 

202 response = _common.deserialize( 

203 serialized_response, response_deserializer 

204 ) 

205 if response is None: 

206 details = "Exception deserializing response!" 

207 _abort(state, grpc.StatusCode.INTERNAL, details) 

208 else: 

209 state.response = response 

210 elif operation_type == cygrpc.OperationType.receive_status_on_client: 

211 state.trailing_metadata = batch_operation.trailing_metadata() 

212 if state.code is None: 

213 code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get( 

214 batch_operation.code() 

215 ) 

216 if code is None: 

217 state.code = grpc.StatusCode.UNKNOWN 

218 state.details = _unknown_code_details( 

219 code, batch_operation.details() 

220 ) 

221 else: 

222 state.code = code 

223 state.details = batch_operation.details() 

224 state.debug_error_string = batch_operation.error_string() 

225 state.rpc_end_time = time.perf_counter() 

226 _observability.maybe_record_rpc_latency(state) 

227 callbacks.extend(state.callbacks) 

228 state.callbacks = None 

229 return callbacks 

230 

231 

232def _event_handler( 

233 state: _RPCState, response_deserializer: Optional[DeserializingFunction] 

234) -> UserTag: 

235 def handle_event(event): 

236 with state.condition: 

237 callbacks = _handle_event(event, state, response_deserializer) 

238 state.condition.notify_all() 

239 done = not state.due 

240 for callback in callbacks: 

241 try: 

242 callback() 

243 except Exception as e: # pylint: disable=broad-except 

244 # NOTE(rbellevi): We suppress but log errors here so as not to 

245 # kill the channel spin thread. 

246 logging.error( 

247 "Exception in callback %s: %s", repr(callback.func), repr(e) 

248 ) 

249 return done and state.fork_epoch >= cygrpc.get_fork_epoch() 

250 

251 return handle_event 

252 

253 

254# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall. 

255# pylint: disable=too-many-statements 

256def _consume_request_iterator( 

257 request_iterator: Iterator, 

258 state: _RPCState, 

259 call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall], 

260 request_serializer: SerializingFunction, 

261 event_handler: Optional[UserTag], 

262) -> None: 

263 """Consume a request supplied by the user.""" 

264 

265 def consume_request_iterator(): # pylint: disable=too-many-branches 

266 # Iterate over the request iterator until it is exhausted or an error 

267 # condition is encountered. 

268 while True: 

269 return_from_user_request_generator_invoked = False 

270 try: 

271 # The thread may die in user-code. Do not block fork for this. 

272 cygrpc.enter_user_request_generator() 

273 request = next(request_iterator) 

274 except StopIteration: 

275 break 

276 except Exception: # pylint: disable=broad-except 

277 cygrpc.return_from_user_request_generator() 

278 return_from_user_request_generator_invoked = True 

279 code = grpc.StatusCode.UNKNOWN 

280 details = "Exception iterating requests!" 

281 _LOGGER.exception(details) 

282 call.cancel( 

283 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details 

284 ) 

285 _abort(state, code, details) 

286 return 

287 finally: 

288 if not return_from_user_request_generator_invoked: 

289 cygrpc.return_from_user_request_generator() 

290 serialized_request = _common.serialize(request, request_serializer) 

291 with state.condition: 

292 if state.code is None and not state.cancelled: 

293 if serialized_request is None: 

294 code = grpc.StatusCode.INTERNAL 

295 details = "Exception serializing request!" 

296 call.cancel( 

297 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], 

298 details, 

299 ) 

300 _abort(state, code, details) 

301 return 

302 else: 

303 state.due.add(cygrpc.OperationType.send_message) 

304 operations = ( 

305 cygrpc.SendMessageOperation( 

306 serialized_request, _EMPTY_FLAGS 

307 ), 

308 ) 

309 operating = call.operate(operations, event_handler) 

310 if not operating: 

311 state.due.remove(cygrpc.OperationType.send_message) 

312 return 

313 

314 def _done(): 

315 return ( 

316 state.code is not None 

317 or cygrpc.OperationType.send_message 

318 not in state.due 

319 ) 

320 

321 _common.wait( 

322 state.condition.wait, 

323 _done, 

324 spin_cb=functools.partial( 

325 cygrpc.block_if_fork_in_progress, state 

326 ), 

327 ) 

328 if state.code is not None: 

329 return 

330 else: 

331 return 

332 with state.condition: 

333 if state.code is None: 

334 state.due.add(cygrpc.OperationType.send_close_from_client) 

335 operations = ( 

336 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 

337 ) 

338 operating = call.operate(operations, event_handler) 

339 if not operating: 

340 state.due.remove( 

341 cygrpc.OperationType.send_close_from_client 

342 ) 

343 

344 consumption_thread = cygrpc.ForkManagedThread( 

345 target=consume_request_iterator 

346 ) 

347 consumption_thread.setDaemon(True) 

348 consumption_thread.start() 

349 

350 

351def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str: 

352 """Calculates error string for RPC.""" 

353 with rpc_state.condition: 

354 if rpc_state.code is None: 

355 return "<{} object>".format(class_name) 

356 elif rpc_state.code is grpc.StatusCode.OK: 

357 return _OK_RENDEZVOUS_REPR_FORMAT.format( 

358 class_name, rpc_state.code, rpc_state.details 

359 ) 

360 else: 

361 return _NON_OK_RENDEZVOUS_REPR_FORMAT.format( 

362 class_name, 

363 rpc_state.code, 

364 rpc_state.details, 

365 rpc_state.debug_error_string, 

366 ) 

367 

368 

369class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future): 

370 """An RPC error not tied to the execution of a particular RPC. 

371 

372 The RPC represented by the state object must not be in-progress or 

373 cancelled. 

374 

375 Attributes: 

376 _state: An instance of _RPCState. 

377 """ 

378 

379 _state: _RPCState 

380 

381 def __init__(self, state: _RPCState): 

382 with state.condition: 

383 self._state = _RPCState( 

384 (), 

385 copy.deepcopy(state.initial_metadata), 

386 copy.deepcopy(state.trailing_metadata), 

387 state.code, 

388 copy.deepcopy(state.details), 

389 ) 

390 self._state.response = copy.copy(state.response) 

391 self._state.debug_error_string = copy.copy(state.debug_error_string) 

392 

393 def initial_metadata(self) -> Optional[MetadataType]: 

394 return self._state.initial_metadata 

395 

396 def trailing_metadata(self) -> Optional[MetadataType]: 

397 return self._state.trailing_metadata 

398 

399 def code(self) -> Optional[grpc.StatusCode]: 

400 return self._state.code 

401 

402 def details(self) -> Optional[str]: 

403 return _common.decode(self._state.details) 

404 

405 def debug_error_string(self) -> Optional[str]: 

406 return _common.decode(self._state.debug_error_string) 

407 

408 def _repr(self) -> str: 

409 return _rpc_state_string(self.__class__.__name__, self._state) 

410 

411 def __repr__(self) -> str: 

412 return self._repr() 

413 

414 def __str__(self) -> str: 

415 return self._repr() 

416 

417 def cancel(self) -> bool: 

418 """See grpc.Future.cancel.""" 

419 return False 

420 

421 def cancelled(self) -> bool: 

422 """See grpc.Future.cancelled.""" 

423 return False 

424 

425 def running(self) -> bool: 

426 """See grpc.Future.running.""" 

427 return False 

428 

429 def done(self) -> bool: 

430 """See grpc.Future.done.""" 

431 return True 

432 

433 def result( 

434 self, timeout: Optional[float] = None 

435 ) -> Any: # pylint: disable=unused-argument 

436 """See grpc.Future.result.""" 

437 raise self 

438 

439 def exception( 

440 self, timeout: Optional[float] = None # pylint: disable=unused-argument 

441 ) -> Optional[Exception]: 

442 """See grpc.Future.exception.""" 

443 return self 

444 

445 def traceback( 

446 self, timeout: Optional[float] = None # pylint: disable=unused-argument 

447 ) -> Optional[types.TracebackType]: 

448 """See grpc.Future.traceback.""" 

449 try: 

450 raise self 

451 except grpc.RpcError: 

452 return sys.exc_info()[2] 

453 

454 def add_done_callback( 

455 self, 

456 fn: Callable[[grpc.Future], None], 

457 timeout: Optional[float] = None, # pylint: disable=unused-argument 

458 ) -> None: 

459 """See grpc.Future.add_done_callback.""" 

460 fn(self) 

461 

462 

463class _Rendezvous(grpc.RpcError, grpc.RpcContext): 

464 """An RPC iterator. 

465 

466 Attributes: 

467 _state: An instance of _RPCState. 

468 _call: An instance of SegregatedCall or IntegratedCall. 

469 In either case, the _call object is expected to have operate, cancel, 

470 and next_event methods. 

471 _response_deserializer: A callable taking bytes and return a Python 

472 object. 

473 _deadline: A float representing the deadline of the RPC in seconds. Or 

474 possibly None, to represent an RPC with no deadline at all. 

475 """ 

476 

477 _state: _RPCState 

478 _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall] 

479 _response_deserializer: Optional[DeserializingFunction] 

480 _deadline: Optional[float] 

481 

482 def __init__( 

483 self, 

484 state: _RPCState, 

485 call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall], 

486 response_deserializer: Optional[DeserializingFunction], 

487 deadline: Optional[float], 

488 ): 

489 super(_Rendezvous, self).__init__() 

490 self._state = state 

491 self._call = call 

492 self._response_deserializer = response_deserializer 

493 self._deadline = deadline 

494 

495 def is_active(self) -> bool: 

496 """See grpc.RpcContext.is_active""" 

497 with self._state.condition: 

498 return self._state.code is None 

499 

500 def time_remaining(self) -> Optional[float]: 

501 """See grpc.RpcContext.time_remaining""" 

502 with self._state.condition: 

503 if self._deadline is None: 

504 return None 

505 else: 

506 return max(self._deadline - time.time(), 0) 

507 

508 def cancel(self) -> bool: 

509 """See grpc.RpcContext.cancel""" 

510 with self._state.condition: 

511 if self._state.code is None: 

512 code = grpc.StatusCode.CANCELLED 

513 details = "Locally cancelled by application!" 

514 self._call.cancel( 

515 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details 

516 ) 

517 self._state.cancelled = True 

518 _abort(self._state, code, details) 

519 self._state.condition.notify_all() 

520 return True 

521 else: 

522 return False 

523 

524 def add_callback(self, callback: NullaryCallbackType) -> bool: 

525 """See grpc.RpcContext.add_callback""" 

526 with self._state.condition: 

527 if self._state.callbacks is None: 

528 return False 

529 else: 

530 self._state.callbacks.append(callback) 

531 return True 

532 

533 def __iter__(self): 

534 return self 

535 

536 def next(self): 

537 return self._next() 

538 

539 def __next__(self): 

540 return self._next() 

541 

542 def _next(self): 

543 raise NotImplementedError() 

544 

545 def debug_error_string(self) -> Optional[str]: 

546 raise NotImplementedError() 

547 

548 def _repr(self) -> str: 

549 return _rpc_state_string(self.__class__.__name__, self._state) 

550 

551 def __repr__(self) -> str: 

552 return self._repr() 

553 

554 def __str__(self) -> str: 

555 return self._repr() 

556 

557 def __del__(self) -> None: 

558 with self._state.condition: 

559 if self._state.code is None: 

560 self._state.code = grpc.StatusCode.CANCELLED 

561 self._state.details = "Cancelled upon garbage collection!" 

562 self._state.cancelled = True 

563 self._call.cancel( 

564 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code], 

565 self._state.details, 

566 ) 

567 self._state.condition.notify_all() 

568 

569 

570class _SingleThreadedRendezvous( 

571 _Rendezvous, grpc.Call, grpc.Future 

572): # pylint: disable=too-many-ancestors 

573 """An RPC iterator operating entirely on a single thread. 

574 

575 The __next__ method of _SingleThreadedRendezvous does not depend on the 

576 existence of any other thread, including the "channel spin thread". 

577 However, this means that its interface is entirely synchronous. So this 

578 class cannot completely fulfill the grpc.Future interface. The result, 

579 exception, and traceback methods will never block and will instead raise 

580 an exception if calling the method would result in blocking. 

581 

582 This means that these methods are safe to call from add_done_callback 

583 handlers. 

584 """ 

585 

586 _state: _RPCState 

587 

588 def _is_complete(self) -> bool: 

589 return self._state.code is not None 

590 

591 def cancelled(self) -> bool: 

592 with self._state.condition: 

593 return self._state.cancelled 

594 

595 def running(self) -> bool: 

596 with self._state.condition: 

597 return self._state.code is None 

598 

599 def done(self) -> bool: 

600 with self._state.condition: 

601 return self._state.code is not None 

602 

603 def result(self, timeout: Optional[float] = None) -> Any: 

604 """Returns the result of the computation or raises its exception. 

605 

606 This method will never block. Instead, it will raise an exception 

607 if calling this method would otherwise result in blocking. 

608 

609 Since this method will never block, any `timeout` argument passed will 

610 be ignored. 

611 """ 

612 del timeout 

613 with self._state.condition: 

614 if not self._is_complete(): 

615 raise grpc.experimental.UsageError( 

616 "_SingleThreadedRendezvous only supports result() when the" 

617 " RPC is complete." 

618 ) 

619 if self._state.code is grpc.StatusCode.OK: 

620 return self._state.response 

621 elif self._state.cancelled: 

622 raise grpc.FutureCancelledError() 

623 else: 

624 raise self 

625 

626 def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: 

627 """Return the exception raised by the computation. 

628 

629 This method will never block. Instead, it will raise an exception 

630 if calling this method would otherwise result in blocking. 

631 

632 Since this method will never block, any `timeout` argument passed will 

633 be ignored. 

634 """ 

635 del timeout 

636 with self._state.condition: 

637 if not self._is_complete(): 

638 raise grpc.experimental.UsageError( 

639 "_SingleThreadedRendezvous only supports exception() when" 

640 " the RPC is complete." 

641 ) 

642 if self._state.code is grpc.StatusCode.OK: 

643 return None 

644 elif self._state.cancelled: 

645 raise grpc.FutureCancelledError() 

646 else: 

647 return self 

648 

649 def traceback( 

650 self, timeout: Optional[float] = None 

651 ) -> Optional[types.TracebackType]: 

652 """Access the traceback of the exception raised by the computation. 

653 

654 This method will never block. Instead, it will raise an exception 

655 if calling this method would otherwise result in blocking. 

656 

657 Since this method will never block, any `timeout` argument passed will 

658 be ignored. 

659 """ 

660 del timeout 

661 with self._state.condition: 

662 if not self._is_complete(): 

663 raise grpc.experimental.UsageError( 

664 "_SingleThreadedRendezvous only supports traceback() when" 

665 " the RPC is complete." 

666 ) 

667 if self._state.code is grpc.StatusCode.OK: 

668 return None 

669 elif self._state.cancelled: 

670 raise grpc.FutureCancelledError() 

671 else: 

672 try: 

673 raise self 

674 except grpc.RpcError: 

675 return sys.exc_info()[2] 

676 

677 def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None: 

678 with self._state.condition: 

679 if self._state.code is None: 

680 self._state.callbacks.append(functools.partial(fn, self)) 

681 return 

682 

683 fn(self) 

684 

685 def initial_metadata(self) -> Optional[MetadataType]: 

686 """See grpc.Call.initial_metadata""" 

687 with self._state.condition: 

688 # NOTE(gnossen): Based on our initial call batch, we are guaranteed 

689 # to receive initial metadata before any messages. 

690 while self._state.initial_metadata is None: 

691 self._consume_next_event() 

692 return self._state.initial_metadata 

693 

694 def trailing_metadata(self) -> Optional[MetadataType]: 

695 """See grpc.Call.trailing_metadata""" 

696 with self._state.condition: 

697 if self._state.trailing_metadata is None: 

698 raise grpc.experimental.UsageError( 

699 "Cannot get trailing metadata until RPC is completed." 

700 ) 

701 return self._state.trailing_metadata 

702 

703 def code(self) -> Optional[grpc.StatusCode]: 

704 """See grpc.Call.code""" 

705 with self._state.condition: 

706 if self._state.code is None: 

707 raise grpc.experimental.UsageError( 

708 "Cannot get code until RPC is completed." 

709 ) 

710 return self._state.code 

711 

712 def details(self) -> Optional[str]: 

713 """See grpc.Call.details""" 

714 with self._state.condition: 

715 if self._state.details is None: 

716 raise grpc.experimental.UsageError( 

717 "Cannot get details until RPC is completed." 

718 ) 

719 return _common.decode(self._state.details) 

720 

721 def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]: 

722 event = self._call.next_event() 

723 with self._state.condition: 

724 callbacks = _handle_event( 

725 event, self._state, self._response_deserializer 

726 ) 

727 for callback in callbacks: 

728 # NOTE(gnossen): We intentionally allow exceptions to bubble up 

729 # to the user when running on a single thread. 

730 callback() 

731 return event 

732 

733 def _next_response(self) -> Any: 

734 while True: 

735 self._consume_next_event() 

736 with self._state.condition: 

737 if self._state.response is not None: 

738 response = self._state.response 

739 self._state.response = None 

740 return response 

741 elif ( 

742 cygrpc.OperationType.receive_message not in self._state.due 

743 ): 

744 if self._state.code is grpc.StatusCode.OK: 

745 raise StopIteration() 

746 elif self._state.code is not None: 

747 raise self 

748 

749 def _next(self) -> Any: 

750 with self._state.condition: 

751 if self._state.code is None: 

752 # We tentatively add the operation as expected and remove 

753 # it if the enqueue operation fails. This allows us to guarantee that 

754 # if an event has been submitted to the core completion queue, 

755 # it is in `due`. If we waited until after a successful 

756 # enqueue operation then a signal could interrupt this 

757 # thread between the enqueue operation and the addition of the 

758 # operation to `due`. This would cause an exception on the 

759 # channel spin thread when the operation completes and no 

760 # corresponding operation would be present in state.due. 

761 # Note that, since `condition` is held through this block, there is 

762 # no data race on `due`. 

763 self._state.due.add(cygrpc.OperationType.receive_message) 

764 operating = self._call.operate( 

765 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None 

766 ) 

767 if not operating: 

768 self._state.due.remove(cygrpc.OperationType.receive_message) 

769 elif self._state.code is grpc.StatusCode.OK: 

770 raise StopIteration() 

771 else: 

772 raise self 

773 return self._next_response() 

774 

775 def debug_error_string(self) -> Optional[str]: 

776 with self._state.condition: 

777 if self._state.debug_error_string is None: 

778 raise grpc.experimental.UsageError( 

779 "Cannot get debug error string until RPC is completed." 

780 ) 

781 return _common.decode(self._state.debug_error_string) 

782 

783 

784class _MultiThreadedRendezvous( 

785 _Rendezvous, grpc.Call, grpc.Future 

786): # pylint: disable=too-many-ancestors 

787 """An RPC iterator that depends on a channel spin thread. 

788 

789 This iterator relies upon a per-channel thread running in the background, 

790 dequeueing events from the completion queue, and notifying threads waiting 

791 on the threading.Condition object in the _RPCState object. 

792 

793 This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface 

794 and to mediate a bidirection streaming RPC. 

795 """ 

796 

797 _state: _RPCState 

798 

799 def initial_metadata(self) -> Optional[MetadataType]: 

800 """See grpc.Call.initial_metadata""" 

801 with self._state.condition: 

802 

803 def _done(): 

804 return self._state.initial_metadata is not None 

805 

806 _common.wait(self._state.condition.wait, _done) 

807 return self._state.initial_metadata 

808 

809 def trailing_metadata(self) -> Optional[MetadataType]: 

810 """See grpc.Call.trailing_metadata""" 

811 with self._state.condition: 

812 

813 def _done(): 

814 return self._state.trailing_metadata is not None 

815 

816 _common.wait(self._state.condition.wait, _done) 

817 return self._state.trailing_metadata 

818 

819 def code(self) -> Optional[grpc.StatusCode]: 

820 """See grpc.Call.code""" 

821 with self._state.condition: 

822 

823 def _done(): 

824 return self._state.code is not None 

825 

826 _common.wait(self._state.condition.wait, _done) 

827 return self._state.code 

828 

829 def details(self) -> Optional[str]: 

830 """See grpc.Call.details""" 

831 with self._state.condition: 

832 

833 def _done(): 

834 return self._state.details is not None 

835 

836 _common.wait(self._state.condition.wait, _done) 

837 return _common.decode(self._state.details) 

838 

839 def debug_error_string(self) -> Optional[str]: 

840 with self._state.condition: 

841 

842 def _done(): 

843 return self._state.debug_error_string is not None 

844 

845 _common.wait(self._state.condition.wait, _done) 

846 return _common.decode(self._state.debug_error_string) 

847 

848 def cancelled(self) -> bool: 

849 with self._state.condition: 

850 return self._state.cancelled 

851 

852 def running(self) -> bool: 

853 with self._state.condition: 

854 return self._state.code is None 

855 

856 def done(self) -> bool: 

857 with self._state.condition: 

858 return self._state.code is not None 

859 

860 def _is_complete(self) -> bool: 

861 return self._state.code is not None 

862 

863 def result(self, timeout: Optional[float] = None) -> Any: 

864 """Returns the result of the computation or raises its exception. 

865 

866 See grpc.Future.result for the full API contract. 

867 """ 

868 with self._state.condition: 

869 timed_out = _common.wait( 

870 self._state.condition.wait, self._is_complete, timeout=timeout 

871 ) 

872 if timed_out: 

873 raise grpc.FutureTimeoutError() 

874 else: 

875 if self._state.code is grpc.StatusCode.OK: 

876 return self._state.response 

877 elif self._state.cancelled: 

878 raise grpc.FutureCancelledError() 

879 else: 

880 raise self 

881 

882 def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: 

883 """Return the exception raised by the computation. 

884 

885 See grpc.Future.exception for the full API contract. 

886 """ 

887 with self._state.condition: 

888 timed_out = _common.wait( 

889 self._state.condition.wait, self._is_complete, timeout=timeout 

890 ) 

891 if timed_out: 

892 raise grpc.FutureTimeoutError() 

893 else: 

894 if self._state.code is grpc.StatusCode.OK: 

895 return None 

896 elif self._state.cancelled: 

897 raise grpc.FutureCancelledError() 

898 else: 

899 return self 

900 

901 def traceback( 

902 self, timeout: Optional[float] = None 

903 ) -> Optional[types.TracebackType]: 

904 """Access the traceback of the exception raised by the computation. 

905 

906 See grpc.future.traceback for the full API contract. 

907 """ 

908 with self._state.condition: 

909 timed_out = _common.wait( 

910 self._state.condition.wait, self._is_complete, timeout=timeout 

911 ) 

912 if timed_out: 

913 raise grpc.FutureTimeoutError() 

914 else: 

915 if self._state.code is grpc.StatusCode.OK: 

916 return None 

917 elif self._state.cancelled: 

918 raise grpc.FutureCancelledError() 

919 else: 

920 try: 

921 raise self 

922 except grpc.RpcError: 

923 return sys.exc_info()[2] 

924 

925 def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None: 

926 with self._state.condition: 

927 if self._state.code is None: 

928 self._state.callbacks.append(functools.partial(fn, self)) 

929 return 

930 

931 fn(self) 

932 

933 def _next(self) -> Any: 

934 with self._state.condition: 

935 if self._state.code is None: 

936 event_handler = _event_handler( 

937 self._state, self._response_deserializer 

938 ) 

939 self._state.due.add(cygrpc.OperationType.receive_message) 

940 operating = self._call.operate( 

941 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), 

942 event_handler, 

943 ) 

944 if not operating: 

945 self._state.due.remove(cygrpc.OperationType.receive_message) 

946 elif self._state.code is grpc.StatusCode.OK: 

947 raise StopIteration() 

948 else: 

949 raise self 

950 

951 def _response_ready(): 

952 return self._state.response is not None or ( 

953 cygrpc.OperationType.receive_message not in self._state.due 

954 and self._state.code is not None 

955 ) 

956 

957 _common.wait(self._state.condition.wait, _response_ready) 

958 if self._state.response is not None: 

959 response = self._state.response 

960 self._state.response = None 

961 return response 

962 elif cygrpc.OperationType.receive_message not in self._state.due: 

963 if self._state.code is grpc.StatusCode.OK: 

964 raise StopIteration() 

965 elif self._state.code is not None: 

966 raise self 

967 

968 

969def _start_unary_request( 

970 request: Any, 

971 timeout: Optional[float], 

972 request_serializer: SerializingFunction, 

973) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]: 

974 deadline = _deadline(timeout) 

975 serialized_request = _common.serialize(request, request_serializer) 

976 if serialized_request is None: 

977 state = _RPCState( 

978 (), 

979 (), 

980 (), 

981 grpc.StatusCode.INTERNAL, 

982 "Exception serializing request!", 

983 ) 

984 error = _InactiveRpcError(state) 

985 return deadline, None, error 

986 else: 

987 return deadline, serialized_request, None 

988 

989 

990def _end_unary_response_blocking( 

991 state: _RPCState, 

992 call: cygrpc.SegregatedCall, 

993 with_call: bool, 

994 deadline: Optional[float], 

995) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]: 

996 if state.code is grpc.StatusCode.OK: 

997 if with_call: 

998 rendezvous = _MultiThreadedRendezvous(state, call, None, deadline) 

999 return state.response, rendezvous 

1000 else: 

1001 return state.response 

1002 else: 

1003 raise _InactiveRpcError(state) # pytype: disable=not-instantiable 

1004 

1005 

1006def _stream_unary_invocation_operations( 

1007 metadata: Optional[MetadataType], initial_metadata_flags: int 

1008) -> Sequence[Sequence[cygrpc.Operation]]: 

1009 return ( 

1010 ( 

1011 cygrpc.SendInitialMetadataOperation( 

1012 metadata, initial_metadata_flags 

1013 ), 

1014 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 

1015 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

1016 ), 

1017 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

1018 ) 

1019 

1020 

1021def _stream_unary_invocation_operations_and_tags( 

1022 metadata: Optional[MetadataType], initial_metadata_flags: int 

1023) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]: 

1024 return tuple( 

1025 ( 

1026 operations, 

1027 None, 

1028 ) 

1029 for operations in _stream_unary_invocation_operations( 

1030 metadata, initial_metadata_flags 

1031 ) 

1032 ) 

1033 

1034 

1035def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]: 

1036 parent_deadline = cygrpc.get_deadline_from_context() 

1037 if parent_deadline is None and user_deadline is None: 

1038 return None 

1039 elif parent_deadline is not None and user_deadline is None: 

1040 return parent_deadline 

1041 elif user_deadline is not None and parent_deadline is None: 

1042 return user_deadline 

1043 else: 

1044 return min(parent_deadline, user_deadline) 

1045 

1046 

1047class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): 

1048 _channel: cygrpc.Channel 

1049 _managed_call: IntegratedCallFactory 

1050 _method: bytes 

1051 _request_serializer: Optional[SerializingFunction] 

1052 _response_deserializer: Optional[DeserializingFunction] 

1053 _context: Any 

1054 

1055 # pylint: disable=too-many-arguments 

1056 def __init__( 

1057 self, 

1058 channel: cygrpc.Channel, 

1059 managed_call: IntegratedCallFactory, 

1060 method: bytes, 

1061 request_serializer: Optional[SerializingFunction], 

1062 response_deserializer: Optional[DeserializingFunction], 

1063 ): 

1064 self._channel = channel 

1065 self._managed_call = managed_call 

1066 self._method = method 

1067 self._request_serializer = request_serializer 

1068 self._response_deserializer = response_deserializer 

1069 self._context = cygrpc.build_census_context() 

1070 

1071 def _prepare( 

1072 self, 

1073 request: Any, 

1074 timeout: Optional[float], 

1075 metadata: Optional[MetadataType], 

1076 wait_for_ready: Optional[bool], 

1077 compression: Optional[grpc.Compression], 

1078 ) -> Tuple[ 

1079 Optional[_RPCState], 

1080 Optional[Sequence[cygrpc.Operation]], 

1081 Optional[float], 

1082 Optional[grpc.RpcError], 

1083 ]: 

1084 deadline, serialized_request, rendezvous = _start_unary_request( 

1085 request, timeout, self._request_serializer 

1086 ) 

1087 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1088 wait_for_ready 

1089 ) 

1090 augmented_metadata = _compression.augment_metadata( 

1091 metadata, compression 

1092 ) 

1093 if serialized_request is None: 

1094 return None, None, None, rendezvous 

1095 else: 

1096 state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) 

1097 operations = ( 

1098 cygrpc.SendInitialMetadataOperation( 

1099 augmented_metadata, initial_metadata_flags 

1100 ), 

1101 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), 

1102 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 

1103 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), 

1104 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 

1105 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

1106 ) 

1107 return state, operations, deadline, None 

1108 

1109 def _blocking( 

1110 self, 

1111 request: Any, 

1112 timeout: Optional[float] = None, 

1113 metadata: Optional[MetadataType] = None, 

1114 credentials: Optional[grpc.CallCredentials] = None, 

1115 wait_for_ready: Optional[bool] = None, 

1116 compression: Optional[grpc.Compression] = None, 

1117 ) -> Tuple[_RPCState, cygrpc.SegregatedCall]: 

1118 state, operations, deadline, rendezvous = self._prepare( 

1119 request, timeout, metadata, wait_for_ready, compression 

1120 ) 

1121 if state is None: 

1122 raise rendezvous # pylint: disable-msg=raising-bad-type 

1123 else: 

1124 state.rpc_start_time = time.perf_counter() 

1125 state.method = _common.decode(self._method) 

1126 call = self._channel.segregated_call( 

1127 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1128 self._method, 

1129 None, 

1130 _determine_deadline(deadline), 

1131 metadata, 

1132 None if credentials is None else credentials._credentials, 

1133 ( 

1134 ( 

1135 operations, 

1136 None, 

1137 ), 

1138 ), 

1139 self._context, 

1140 ) 

1141 event = call.next_event() 

1142 _handle_event(event, state, self._response_deserializer) 

1143 return state, call 

1144 

1145 def __call__( 

1146 self, 

1147 request: Any, 

1148 timeout: Optional[float] = None, 

1149 metadata: Optional[MetadataType] = None, 

1150 credentials: Optional[grpc.CallCredentials] = None, 

1151 wait_for_ready: Optional[bool] = None, 

1152 compression: Optional[grpc.Compression] = None, 

1153 ) -> Any: 

1154 ( 

1155 state, 

1156 call, 

1157 ) = self._blocking( 

1158 request, timeout, metadata, credentials, wait_for_ready, compression 

1159 ) 

1160 return _end_unary_response_blocking(state, call, False, None) 

1161 

1162 def with_call( 

1163 self, 

1164 request: Any, 

1165 timeout: Optional[float] = None, 

1166 metadata: Optional[MetadataType] = None, 

1167 credentials: Optional[grpc.CallCredentials] = None, 

1168 wait_for_ready: Optional[bool] = None, 

1169 compression: Optional[grpc.Compression] = None, 

1170 ) -> Tuple[Any, grpc.Call]: 

1171 ( 

1172 state, 

1173 call, 

1174 ) = self._blocking( 

1175 request, timeout, metadata, credentials, wait_for_ready, compression 

1176 ) 

1177 return _end_unary_response_blocking(state, call, True, None) 

1178 

1179 def future( 

1180 self, 

1181 request: Any, 

1182 timeout: Optional[float] = None, 

1183 metadata: Optional[MetadataType] = None, 

1184 credentials: Optional[grpc.CallCredentials] = None, 

1185 wait_for_ready: Optional[bool] = None, 

1186 compression: Optional[grpc.Compression] = None, 

1187 ) -> _MultiThreadedRendezvous: 

1188 state, operations, deadline, rendezvous = self._prepare( 

1189 request, timeout, metadata, wait_for_ready, compression 

1190 ) 

1191 if state is None: 

1192 raise rendezvous # pylint: disable-msg=raising-bad-type 

1193 else: 

1194 event_handler = _event_handler(state, self._response_deserializer) 

1195 state.rpc_start_time = time.perf_counter() 

1196 state.method = _common.decode(self._method) 

1197 call = self._managed_call( 

1198 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1199 self._method, 

1200 None, 

1201 deadline, 

1202 metadata, 

1203 None if credentials is None else credentials._credentials, 

1204 (operations,), 

1205 event_handler, 

1206 self._context, 

1207 ) 

1208 return _MultiThreadedRendezvous( 

1209 state, call, self._response_deserializer, deadline 

1210 ) 

1211 

1212 

1213class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): 

1214 _channel: cygrpc.Channel 

1215 _method: bytes 

1216 _request_serializer: Optional[SerializingFunction] 

1217 _response_deserializer: Optional[DeserializingFunction] 

1218 _context: Any 

1219 

1220 # pylint: disable=too-many-arguments 

1221 def __init__( 

1222 self, 

1223 channel: cygrpc.Channel, 

1224 method: bytes, 

1225 request_serializer: SerializingFunction, 

1226 response_deserializer: DeserializingFunction, 

1227 ): 

1228 self._channel = channel 

1229 self._method = method 

1230 self._request_serializer = request_serializer 

1231 self._response_deserializer = response_deserializer 

1232 self._context = cygrpc.build_census_context() 

1233 

1234 def __call__( # pylint: disable=too-many-locals 

1235 self, 

1236 request: Any, 

1237 timeout: Optional[float] = None, 

1238 metadata: Optional[MetadataType] = None, 

1239 credentials: Optional[grpc.CallCredentials] = None, 

1240 wait_for_ready: Optional[bool] = None, 

1241 compression: Optional[grpc.Compression] = None, 

1242 ) -> _SingleThreadedRendezvous: 

1243 deadline = _deadline(timeout) 

1244 serialized_request = _common.serialize( 

1245 request, self._request_serializer 

1246 ) 

1247 if serialized_request is None: 

1248 state = _RPCState( 

1249 (), 

1250 (), 

1251 (), 

1252 grpc.StatusCode.INTERNAL, 

1253 "Exception serializing request!", 

1254 ) 

1255 raise _InactiveRpcError(state) 

1256 

1257 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) 

1258 call_credentials = ( 

1259 None if credentials is None else credentials._credentials 

1260 ) 

1261 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1262 wait_for_ready 

1263 ) 

1264 augmented_metadata = _compression.augment_metadata( 

1265 metadata, compression 

1266 ) 

1267 operations = ( 

1268 ( 

1269 cygrpc.SendInitialMetadataOperation( 

1270 augmented_metadata, initial_metadata_flags 

1271 ), 

1272 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), 

1273 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 

1274 ), 

1275 (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),), 

1276 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

1277 ) 

1278 operations_and_tags = tuple((ops, None) for ops in operations) 

1279 state.rpc_start_time = time.perf_counter() 

1280 state.method = _common.decode(self._method) 

1281 call = self._channel.segregated_call( 

1282 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1283 self._method, 

1284 None, 

1285 _determine_deadline(deadline), 

1286 metadata, 

1287 call_credentials, 

1288 operations_and_tags, 

1289 self._context, 

1290 ) 

1291 return _SingleThreadedRendezvous( 

1292 state, call, self._response_deserializer, deadline 

1293 ) 

1294 

1295 

1296class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): 

1297 _channel: cygrpc.Channel 

1298 _managed_call: IntegratedCallFactory 

1299 _method: bytes 

1300 _request_serializer: Optional[SerializingFunction] 

1301 _response_deserializer: Optional[DeserializingFunction] 

1302 _context: Any 

1303 

1304 # pylint: disable=too-many-arguments 

1305 def __init__( 

1306 self, 

1307 channel: cygrpc.Channel, 

1308 managed_call: IntegratedCallFactory, 

1309 method: bytes, 

1310 request_serializer: SerializingFunction, 

1311 response_deserializer: DeserializingFunction, 

1312 ): 

1313 self._channel = channel 

1314 self._managed_call = managed_call 

1315 self._method = method 

1316 self._request_serializer = request_serializer 

1317 self._response_deserializer = response_deserializer 

1318 self._context = cygrpc.build_census_context() 

1319 

1320 def __call__( # pylint: disable=too-many-locals 

1321 self, 

1322 request: Any, 

1323 timeout: Optional[float] = None, 

1324 metadata: Optional[MetadataType] = None, 

1325 credentials: Optional[grpc.CallCredentials] = None, 

1326 wait_for_ready: Optional[bool] = None, 

1327 compression: Optional[grpc.Compression] = None, 

1328 ) -> _MultiThreadedRendezvous: 

1329 deadline, serialized_request, rendezvous = _start_unary_request( 

1330 request, timeout, self._request_serializer 

1331 ) 

1332 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1333 wait_for_ready 

1334 ) 

1335 if serialized_request is None: 

1336 raise rendezvous # pylint: disable-msg=raising-bad-type 

1337 else: 

1338 augmented_metadata = _compression.augment_metadata( 

1339 metadata, compression 

1340 ) 

1341 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) 

1342 operations = ( 

1343 ( 

1344 cygrpc.SendInitialMetadataOperation( 

1345 augmented_metadata, initial_metadata_flags 

1346 ), 

1347 cygrpc.SendMessageOperation( 

1348 serialized_request, _EMPTY_FLAGS 

1349 ), 

1350 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 

1351 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

1352 ), 

1353 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

1354 ) 

1355 state.rpc_start_time = time.perf_counter() 

1356 state.method = _common.decode(self._method) 

1357 call = self._managed_call( 

1358 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1359 self._method, 

1360 None, 

1361 _determine_deadline(deadline), 

1362 metadata, 

1363 None if credentials is None else credentials._credentials, 

1364 operations, 

1365 _event_handler(state, self._response_deserializer), 

1366 self._context, 

1367 ) 

1368 return _MultiThreadedRendezvous( 

1369 state, call, self._response_deserializer, deadline 

1370 ) 

1371 

1372 

1373class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): 

1374 _channel: cygrpc.Channel 

1375 _managed_call: IntegratedCallFactory 

1376 _method: bytes 

1377 _request_serializer: Optional[SerializingFunction] 

1378 _response_deserializer: Optional[DeserializingFunction] 

1379 _context: Any 

1380 

1381 # pylint: disable=too-many-arguments 

1382 def __init__( 

1383 self, 

1384 channel: cygrpc.Channel, 

1385 managed_call: IntegratedCallFactory, 

1386 method: bytes, 

1387 request_serializer: Optional[SerializingFunction], 

1388 response_deserializer: Optional[DeserializingFunction], 

1389 ): 

1390 self._channel = channel 

1391 self._managed_call = managed_call 

1392 self._method = method 

1393 self._request_serializer = request_serializer 

1394 self._response_deserializer = response_deserializer 

1395 self._context = cygrpc.build_census_context() 

1396 

1397 def _blocking( 

1398 self, 

1399 request_iterator: Iterator, 

1400 timeout: Optional[float], 

1401 metadata: Optional[MetadataType], 

1402 credentials: Optional[grpc.CallCredentials], 

1403 wait_for_ready: Optional[bool], 

1404 compression: Optional[grpc.Compression], 

1405 ) -> Tuple[_RPCState, cygrpc.SegregatedCall]: 

1406 deadline = _deadline(timeout) 

1407 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 

1408 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1409 wait_for_ready 

1410 ) 

1411 augmented_metadata = _compression.augment_metadata( 

1412 metadata, compression 

1413 ) 

1414 state.rpc_start_time = time.perf_counter() 

1415 state.method = _common.decode(self._method) 

1416 call = self._channel.segregated_call( 

1417 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1418 self._method, 

1419 None, 

1420 _determine_deadline(deadline), 

1421 augmented_metadata, 

1422 None if credentials is None else credentials._credentials, 

1423 _stream_unary_invocation_operations_and_tags( 

1424 augmented_metadata, initial_metadata_flags 

1425 ), 

1426 self._context, 

1427 ) 

1428 _consume_request_iterator( 

1429 request_iterator, state, call, self._request_serializer, None 

1430 ) 

1431 while True: 

1432 event = call.next_event() 

1433 with state.condition: 

1434 _handle_event(event, state, self._response_deserializer) 

1435 state.condition.notify_all() 

1436 if not state.due: 

1437 break 

1438 return state, call 

1439 

1440 def __call__( 

1441 self, 

1442 request_iterator: Iterator, 

1443 timeout: Optional[float] = None, 

1444 metadata: Optional[MetadataType] = None, 

1445 credentials: Optional[grpc.CallCredentials] = None, 

1446 wait_for_ready: Optional[bool] = None, 

1447 compression: Optional[grpc.Compression] = None, 

1448 ) -> Any: 

1449 ( 

1450 state, 

1451 call, 

1452 ) = self._blocking( 

1453 request_iterator, 

1454 timeout, 

1455 metadata, 

1456 credentials, 

1457 wait_for_ready, 

1458 compression, 

1459 ) 

1460 return _end_unary_response_blocking(state, call, False, None) 

1461 

1462 def with_call( 

1463 self, 

1464 request_iterator: Iterator, 

1465 timeout: Optional[float] = None, 

1466 metadata: Optional[MetadataType] = None, 

1467 credentials: Optional[grpc.CallCredentials] = None, 

1468 wait_for_ready: Optional[bool] = None, 

1469 compression: Optional[grpc.Compression] = None, 

1470 ) -> Tuple[Any, grpc.Call]: 

1471 ( 

1472 state, 

1473 call, 

1474 ) = self._blocking( 

1475 request_iterator, 

1476 timeout, 

1477 metadata, 

1478 credentials, 

1479 wait_for_ready, 

1480 compression, 

1481 ) 

1482 return _end_unary_response_blocking(state, call, True, None) 

1483 

1484 def future( 

1485 self, 

1486 request_iterator: Iterator, 

1487 timeout: Optional[float] = None, 

1488 metadata: Optional[MetadataType] = None, 

1489 credentials: Optional[grpc.CallCredentials] = None, 

1490 wait_for_ready: Optional[bool] = None, 

1491 compression: Optional[grpc.Compression] = None, 

1492 ) -> _MultiThreadedRendezvous: 

1493 deadline = _deadline(timeout) 

1494 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 

1495 event_handler = _event_handler(state, self._response_deserializer) 

1496 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1497 wait_for_ready 

1498 ) 

1499 augmented_metadata = _compression.augment_metadata( 

1500 metadata, compression 

1501 ) 

1502 state.rpc_start_time = time.perf_counter() 

1503 state.method = _common.decode(self._method) 

1504 call = self._managed_call( 

1505 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1506 self._method, 

1507 None, 

1508 deadline, 

1509 augmented_metadata, 

1510 None if credentials is None else credentials._credentials, 

1511 _stream_unary_invocation_operations( 

1512 metadata, initial_metadata_flags 

1513 ), 

1514 event_handler, 

1515 self._context, 

1516 ) 

1517 _consume_request_iterator( 

1518 request_iterator, 

1519 state, 

1520 call, 

1521 self._request_serializer, 

1522 event_handler, 

1523 ) 

1524 return _MultiThreadedRendezvous( 

1525 state, call, self._response_deserializer, deadline 

1526 ) 

1527 

1528 

1529class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): 

1530 _channel: cygrpc.Channel 

1531 _managed_call: IntegratedCallFactory 

1532 _method: bytes 

1533 _request_serializer: Optional[SerializingFunction] 

1534 _response_deserializer: Optional[DeserializingFunction] 

1535 _context: Any 

1536 

1537 # pylint: disable=too-many-arguments 

1538 def __init__( 

1539 self, 

1540 channel: cygrpc.Channel, 

1541 managed_call: IntegratedCallFactory, 

1542 method: bytes, 

1543 request_serializer: Optional[SerializingFunction] = None, 

1544 response_deserializer: Optional[DeserializingFunction] = None, 

1545 ): 

1546 self._channel = channel 

1547 self._managed_call = managed_call 

1548 self._method = method 

1549 self._request_serializer = request_serializer 

1550 self._response_deserializer = response_deserializer 

1551 self._context = cygrpc.build_census_context() 

1552 

1553 def __call__( 

1554 self, 

1555 request_iterator: Iterator, 

1556 timeout: Optional[float] = None, 

1557 metadata: Optional[MetadataType] = None, 

1558 credentials: Optional[grpc.CallCredentials] = None, 

1559 wait_for_ready: Optional[bool] = None, 

1560 compression: Optional[grpc.Compression] = None, 

1561 ) -> _MultiThreadedRendezvous: 

1562 deadline = _deadline(timeout) 

1563 state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) 

1564 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1565 wait_for_ready 

1566 ) 

1567 augmented_metadata = _compression.augment_metadata( 

1568 metadata, compression 

1569 ) 

1570 operations = ( 

1571 ( 

1572 cygrpc.SendInitialMetadataOperation( 

1573 augmented_metadata, initial_metadata_flags 

1574 ), 

1575 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

1576 ), 

1577 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

1578 ) 

1579 event_handler = _event_handler(state, self._response_deserializer) 

1580 state.rpc_start_time = time.perf_counter() 

1581 state.method = _common.decode(self._method) 

1582 call = self._managed_call( 

1583 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1584 self._method, 

1585 None, 

1586 _determine_deadline(deadline), 

1587 augmented_metadata, 

1588 None if credentials is None else credentials._credentials, 

1589 operations, 

1590 event_handler, 

1591 self._context, 

1592 ) 

1593 _consume_request_iterator( 

1594 request_iterator, 

1595 state, 

1596 call, 

1597 self._request_serializer, 

1598 event_handler, 

1599 ) 

1600 return _MultiThreadedRendezvous( 

1601 state, call, self._response_deserializer, deadline 

1602 ) 

1603 

1604 

1605class _InitialMetadataFlags(int): 

1606 """Stores immutable initial metadata flags""" 

1607 

1608 def __new__(cls, value: int = _EMPTY_FLAGS): 

1609 value &= cygrpc.InitialMetadataFlags.used_mask 

1610 return super(_InitialMetadataFlags, cls).__new__(cls, value) 

1611 

1612 def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int: 

1613 if wait_for_ready is not None: 

1614 if wait_for_ready: 

1615 return self.__class__( 

1616 self 

1617 | cygrpc.InitialMetadataFlags.wait_for_ready 

1618 | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set 

1619 ) 

1620 elif not wait_for_ready: 

1621 return self.__class__( 

1622 self & ~cygrpc.InitialMetadataFlags.wait_for_ready 

1623 | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set 

1624 ) 

1625 return self 

1626 

1627 

1628class _ChannelCallState(object): 

1629 channel: cygrpc.Channel 

1630 managed_calls: int 

1631 threading: bool 

1632 

1633 def __init__(self, channel: cygrpc.Channel): 

1634 self.lock = threading.Lock() 

1635 self.channel = channel 

1636 self.managed_calls = 0 

1637 self.threading = False 

1638 

1639 def reset_postfork_child(self) -> None: 

1640 self.managed_calls = 0 

1641 

1642 def __del__(self): 

1643 try: 

1644 self.channel.close( 

1645 cygrpc.StatusCode.cancelled, "Channel deallocated!" 

1646 ) 

1647 except (TypeError, AttributeError): 

1648 pass 

1649 

1650 

1651def _run_channel_spin_thread(state: _ChannelCallState) -> None: 

1652 def channel_spin(): 

1653 while True: 

1654 cygrpc.block_if_fork_in_progress(state) 

1655 event = state.channel.next_call_event() 

1656 if event.completion_type == cygrpc.CompletionType.queue_timeout: 

1657 continue 

1658 call_completed = event.tag(event) 

1659 if call_completed: 

1660 with state.lock: 

1661 state.managed_calls -= 1 

1662 if state.managed_calls == 0: 

1663 return 

1664 

1665 channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin) 

1666 channel_spin_thread.setDaemon(True) 

1667 channel_spin_thread.start() 

1668 

1669 

1670def _channel_managed_call_management(state: _ChannelCallState): 

1671 # pylint: disable=too-many-arguments 

1672 def create( 

1673 flags: int, 

1674 method: bytes, 

1675 host: Optional[str], 

1676 deadline: Optional[float], 

1677 metadata: Optional[MetadataType], 

1678 credentials: Optional[cygrpc.CallCredentials], 

1679 operations: Sequence[Sequence[cygrpc.Operation]], 

1680 event_handler: UserTag, 

1681 context, 

1682 ) -> cygrpc.IntegratedCall: 

1683 """Creates a cygrpc.IntegratedCall. 

1684 

1685 Args: 

1686 flags: An integer bitfield of call flags. 

1687 method: The RPC method. 

1688 host: A host string for the created call. 

1689 deadline: A float to be the deadline of the created call or None if 

1690 the call is to have an infinite deadline. 

1691 metadata: The metadata for the call or None. 

1692 credentials: A cygrpc.CallCredentials or None. 

1693 operations: A sequence of sequences of cygrpc.Operations to be 

1694 started on the call. 

1695 event_handler: A behavior to call to handle the events resultant from 

1696 the operations on the call. 

1697 context: Context object for distributed tracing. 

1698 Returns: 

1699 A cygrpc.IntegratedCall with which to conduct an RPC. 

1700 """ 

1701 operations_and_tags = tuple( 

1702 ( 

1703 operation, 

1704 event_handler, 

1705 ) 

1706 for operation in operations 

1707 ) 

1708 with state.lock: 

1709 call = state.channel.integrated_call( 

1710 flags, 

1711 method, 

1712 host, 

1713 deadline, 

1714 metadata, 

1715 credentials, 

1716 operations_and_tags, 

1717 context, 

1718 ) 

1719 if state.managed_calls == 0: 

1720 state.managed_calls = 1 

1721 _run_channel_spin_thread(state) 

1722 else: 

1723 state.managed_calls += 1 

1724 return call 

1725 

1726 return create 

1727 

1728 

1729class _ChannelConnectivityState(object): 

1730 lock: threading.RLock 

1731 channel: grpc.Channel 

1732 polling: bool 

1733 connectivity: grpc.ChannelConnectivity 

1734 try_to_connect: bool 

1735 # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704 

1736 callbacks_and_connectivities: List[ 

1737 Sequence[ 

1738 Union[ 

1739 Callable[[grpc.ChannelConnectivity], None], 

1740 Optional[grpc.ChannelConnectivity], 

1741 ] 

1742 ] 

1743 ] 

1744 delivering: bool 

1745 

1746 def __init__(self, channel: grpc.Channel): 

1747 self.lock = threading.RLock() 

1748 self.channel = channel 

1749 self.polling = False 

1750 self.connectivity = None 

1751 self.try_to_connect = False 

1752 self.callbacks_and_connectivities = [] 

1753 self.delivering = False 

1754 

1755 def reset_postfork_child(self) -> None: 

1756 self.polling = False 

1757 self.connectivity = None 

1758 self.try_to_connect = False 

1759 self.callbacks_and_connectivities = [] 

1760 self.delivering = False 

1761 

1762 

1763def _deliveries( 

1764 state: _ChannelConnectivityState, 

1765) -> List[Callable[[grpc.ChannelConnectivity], None]]: 

1766 callbacks_needing_update = [] 

1767 for callback_and_connectivity in state.callbacks_and_connectivities: 

1768 ( 

1769 callback, 

1770 callback_connectivity, 

1771 ) = callback_and_connectivity 

1772 if callback_connectivity is not state.connectivity: 

1773 callbacks_needing_update.append(callback) 

1774 callback_and_connectivity[1] = state.connectivity 

1775 return callbacks_needing_update 

1776 

1777 

1778def _deliver( 

1779 state: _ChannelConnectivityState, 

1780 initial_connectivity: grpc.ChannelConnectivity, 

1781 initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]], 

1782) -> None: 

1783 connectivity = initial_connectivity 

1784 callbacks = initial_callbacks 

1785 while True: 

1786 for callback in callbacks: 

1787 cygrpc.block_if_fork_in_progress(state) 

1788 try: 

1789 callback(connectivity) 

1790 except Exception: # pylint: disable=broad-except 

1791 _LOGGER.exception( 

1792 _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE 

1793 ) 

1794 with state.lock: 

1795 callbacks = _deliveries(state) 

1796 if callbacks: 

1797 connectivity = state.connectivity 

1798 else: 

1799 state.delivering = False 

1800 return 

1801 

1802 

1803def _spawn_delivery( 

1804 state: _ChannelConnectivityState, 

1805 callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]], 

1806) -> None: 

1807 delivering_thread = cygrpc.ForkManagedThread( 

1808 target=_deliver, 

1809 args=( 

1810 state, 

1811 state.connectivity, 

1812 callbacks, 

1813 ), 

1814 ) 

1815 delivering_thread.setDaemon(True) 

1816 delivering_thread.start() 

1817 state.delivering = True 

1818 

1819 

1820# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll. 

1821def _poll_connectivity( 

1822 state: _ChannelConnectivityState, 

1823 channel: grpc.Channel, 

1824 initial_try_to_connect: bool, 

1825) -> None: 

1826 try_to_connect = initial_try_to_connect 

1827 connectivity = channel.check_connectivity_state(try_to_connect) 

1828 with state.lock: 

1829 state.connectivity = ( 

1830 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ 

1831 connectivity 

1832 ] 

1833 ) 

1834 callbacks = tuple( 

1835 callback for callback, _ in state.callbacks_and_connectivities 

1836 ) 

1837 for callback_and_connectivity in state.callbacks_and_connectivities: 

1838 callback_and_connectivity[1] = state.connectivity 

1839 if callbacks: 

1840 _spawn_delivery(state, callbacks) 

1841 while True: 

1842 event = channel.watch_connectivity_state( 

1843 connectivity, time.time() + 0.2 

1844 ) 

1845 cygrpc.block_if_fork_in_progress(state) 

1846 with state.lock: 

1847 if ( 

1848 not state.callbacks_and_connectivities 

1849 and not state.try_to_connect 

1850 ): 

1851 state.polling = False 

1852 state.connectivity = None 

1853 break 

1854 try_to_connect = state.try_to_connect 

1855 state.try_to_connect = False 

1856 if event.success or try_to_connect: 

1857 connectivity = channel.check_connectivity_state(try_to_connect) 

1858 with state.lock: 

1859 state.connectivity = ( 

1860 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ 

1861 connectivity 

1862 ] 

1863 ) 

1864 if not state.delivering: 

1865 callbacks = _deliveries(state) 

1866 if callbacks: 

1867 _spawn_delivery(state, callbacks) 

1868 

1869 

1870def _subscribe( 

1871 state: _ChannelConnectivityState, 

1872 callback: Callable[[grpc.ChannelConnectivity], None], 

1873 try_to_connect: bool, 

1874) -> None: 

1875 with state.lock: 

1876 if not state.callbacks_and_connectivities and not state.polling: 

1877 polling_thread = cygrpc.ForkManagedThread( 

1878 target=_poll_connectivity, 

1879 args=(state, state.channel, bool(try_to_connect)), 

1880 ) 

1881 polling_thread.setDaemon(True) 

1882 polling_thread.start() 

1883 state.polling = True 

1884 state.callbacks_and_connectivities.append([callback, None]) 

1885 elif not state.delivering and state.connectivity is not None: 

1886 _spawn_delivery(state, (callback,)) 

1887 state.try_to_connect |= bool(try_to_connect) 

1888 state.callbacks_and_connectivities.append( 

1889 [callback, state.connectivity] 

1890 ) 

1891 else: 

1892 state.try_to_connect |= bool(try_to_connect) 

1893 state.callbacks_and_connectivities.append([callback, None]) 

1894 

1895 

1896def _unsubscribe( 

1897 state: _ChannelConnectivityState, 

1898 callback: Callable[[grpc.ChannelConnectivity], None], 

1899) -> None: 

1900 with state.lock: 

1901 for index, (subscribed_callback, unused_connectivity) in enumerate( 

1902 state.callbacks_and_connectivities 

1903 ): 

1904 if callback == subscribed_callback: 

1905 state.callbacks_and_connectivities.pop(index) 

1906 break 

1907 

1908 

1909def _augment_options( 

1910 base_options: Sequence[ChannelArgumentType], 

1911 compression: Optional[grpc.Compression], 

1912) -> Sequence[ChannelArgumentType]: 

1913 compression_option = _compression.create_channel_option(compression) 

1914 return ( 

1915 tuple(base_options) 

1916 + compression_option 

1917 + ( 

1918 ( 

1919 cygrpc.ChannelArgKey.primary_user_agent_string, 

1920 _USER_AGENT, 

1921 ), 

1922 ) 

1923 ) 

1924 

1925 

1926def _separate_channel_options( 

1927 options: Sequence[ChannelArgumentType], 

1928) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]: 

1929 """Separates core channel options from Python channel options.""" 

1930 core_options = [] 

1931 python_options = [] 

1932 for pair in options: 

1933 if ( 

1934 pair[0] 

1935 == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream 

1936 ): 

1937 python_options.append(pair) 

1938 else: 

1939 core_options.append(pair) 

1940 return python_options, core_options 

1941 

1942 

1943class Channel(grpc.Channel): 

1944 """A cygrpc.Channel-backed implementation of grpc.Channel.""" 

1945 

1946 _single_threaded_unary_stream: bool 

1947 _channel: cygrpc.Channel 

1948 _call_state: _ChannelCallState 

1949 _connectivity_state: _ChannelConnectivityState 

1950 

1951 def __init__( 

1952 self, 

1953 target: str, 

1954 options: Sequence[ChannelArgumentType], 

1955 credentials: Optional[grpc.ChannelCredentials], 

1956 compression: Optional[grpc.Compression], 

1957 ): 

1958 """Constructor. 

1959 

1960 Args: 

1961 target: The target to which to connect. 

1962 options: Configuration options for the channel. 

1963 credentials: A cygrpc.ChannelCredentials or None. 

1964 compression: An optional value indicating the compression method to be 

1965 used over the lifetime of the channel. 

1966 """ 

1967 python_options, core_options = _separate_channel_options(options) 

1968 self._single_threaded_unary_stream = ( 

1969 _DEFAULT_SINGLE_THREADED_UNARY_STREAM 

1970 ) 

1971 self._process_python_options(python_options) 

1972 self._channel = cygrpc.Channel( 

1973 _common.encode(target), 

1974 _augment_options(core_options, compression), 

1975 credentials, 

1976 ) 

1977 self._call_state = _ChannelCallState(self._channel) 

1978 self._connectivity_state = _ChannelConnectivityState(self._channel) 

1979 cygrpc.fork_register_channel(self) 

1980 if cygrpc.g_gevent_activated: 

1981 cygrpc.gevent_increment_channel_count() 

1982 

1983 def _process_python_options( 

1984 self, python_options: Sequence[ChannelArgumentType] 

1985 ) -> None: 

1986 """Sets channel attributes according to python-only channel options.""" 

1987 for pair in python_options: 

1988 if ( 

1989 pair[0] 

1990 == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream 

1991 ): 

1992 self._single_threaded_unary_stream = True 

1993 

1994 def subscribe( 

1995 self, 

1996 callback: Callable[[grpc.ChannelConnectivity], None], 

1997 try_to_connect: Optional[bool] = None, 

1998 ) -> None: 

1999 _subscribe(self._connectivity_state, callback, try_to_connect) 

2000 

2001 def unsubscribe( 

2002 self, callback: Callable[[grpc.ChannelConnectivity], None] 

2003 ) -> None: 

2004 _unsubscribe(self._connectivity_state, callback) 

2005 

2006 def unary_unary( 

2007 self, 

2008 method: str, 

2009 request_serializer: Optional[SerializingFunction] = None, 

2010 response_deserializer: Optional[DeserializingFunction] = None, 

2011 ) -> grpc.UnaryUnaryMultiCallable: 

2012 return _UnaryUnaryMultiCallable( 

2013 self._channel, 

2014 _channel_managed_call_management(self._call_state), 

2015 _common.encode(method), 

2016 request_serializer, 

2017 response_deserializer, 

2018 ) 

2019 

2020 def unary_stream( 

2021 self, 

2022 method: str, 

2023 request_serializer: Optional[SerializingFunction] = None, 

2024 response_deserializer: Optional[DeserializingFunction] = None, 

2025 ) -> grpc.UnaryStreamMultiCallable: 

2026 # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC 

2027 # on a single Python thread results in an appreciable speed-up. However, 

2028 # due to slight differences in capability, the multi-threaded variant 

2029 # remains the default. 

2030 if self._single_threaded_unary_stream: 

2031 return _SingleThreadedUnaryStreamMultiCallable( 

2032 self._channel, 

2033 _common.encode(method), 

2034 request_serializer, 

2035 response_deserializer, 

2036 ) 

2037 else: 

2038 return _UnaryStreamMultiCallable( 

2039 self._channel, 

2040 _channel_managed_call_management(self._call_state), 

2041 _common.encode(method), 

2042 request_serializer, 

2043 response_deserializer, 

2044 ) 

2045 

2046 def stream_unary( 

2047 self, 

2048 method: str, 

2049 request_serializer: Optional[SerializingFunction] = None, 

2050 response_deserializer: Optional[DeserializingFunction] = None, 

2051 ) -> grpc.StreamUnaryMultiCallable: 

2052 return _StreamUnaryMultiCallable( 

2053 self._channel, 

2054 _channel_managed_call_management(self._call_state), 

2055 _common.encode(method), 

2056 request_serializer, 

2057 response_deserializer, 

2058 ) 

2059 

2060 def stream_stream( 

2061 self, 

2062 method: str, 

2063 request_serializer: Optional[SerializingFunction] = None, 

2064 response_deserializer: Optional[DeserializingFunction] = None, 

2065 ) -> grpc.StreamStreamMultiCallable: 

2066 return _StreamStreamMultiCallable( 

2067 self._channel, 

2068 _channel_managed_call_management(self._call_state), 

2069 _common.encode(method), 

2070 request_serializer, 

2071 response_deserializer, 

2072 ) 

2073 

2074 def _unsubscribe_all(self) -> None: 

2075 state = self._connectivity_state 

2076 if state: 

2077 with state.lock: 

2078 del state.callbacks_and_connectivities[:] 

2079 

2080 def _close(self) -> None: 

2081 self._unsubscribe_all() 

2082 self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!") 

2083 cygrpc.fork_unregister_channel(self) 

2084 if cygrpc.g_gevent_activated: 

2085 cygrpc.gevent_decrement_channel_count() 

2086 

2087 def _close_on_fork(self) -> None: 

2088 self._unsubscribe_all() 

2089 self._channel.close_on_fork( 

2090 cygrpc.StatusCode.cancelled, "Channel closed due to fork" 

2091 ) 

2092 

2093 def __enter__(self): 

2094 return self 

2095 

2096 def __exit__(self, exc_type, exc_val, exc_tb): 

2097 self._close() 

2098 return False 

2099 

2100 def close(self) -> None: 

2101 self._close() 

2102 

2103 def __del__(self): 

2104 # TODO(https://github.com/grpc/grpc/issues/12531): Several releases 

2105 # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call 

2106 # here (or more likely, call self._close() here). We don't do this today 

2107 # because many valid use cases today allow the channel to be deleted 

2108 # immediately after stubs are created. After a sufficient period of time 

2109 # has passed for all users to be trusted to freeze out to their channels 

2110 # for as long as they are in use and to close them after using them, 

2111 # then deletion of this grpc._channel.Channel instance can be made to 

2112 # effect closure of the underlying cygrpc.Channel instance. 

2113 try: 

2114 self._unsubscribe_all() 

2115 except: # pylint: disable=bare-except 

2116 # Exceptions in __del__ are ignored by Python anyway, but they can 

2117 # keep spamming logs. Just silence them. 

2118 pass