Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_channel.py: 31%

917 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:37 +0000

1# Copyright 2016 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Python.""" 

15 

16import copy 

17from datetime import datetime 

18import functools 

19import logging 

20import os 

21import sys 

22import threading 

23import time 

24import types 

25from typing import ( 

26 Any, 

27 Callable, 

28 Iterator, 

29 List, 

30 Optional, 

31 Sequence, 

32 Set, 

33 Tuple, 

34 Union, 

35) 

36 

37import grpc # pytype: disable=pyi-error 

38from grpc import _common # pytype: disable=pyi-error 

39from grpc import _compression # pytype: disable=pyi-error 

40from grpc import _grpcio_metadata # pytype: disable=pyi-error 

41from grpc import _observability # pytype: disable=pyi-error 

42from grpc._cython import cygrpc 

43from grpc._typing import ChannelArgumentType 

44from grpc._typing import DeserializingFunction 

45from grpc._typing import IntegratedCallFactory 

46from grpc._typing import MetadataType 

47from grpc._typing import NullaryCallbackType 

48from grpc._typing import ResponseType 

49from grpc._typing import SerializingFunction 

50from grpc._typing import UserTag 

51import grpc.experimental # pytype: disable=pyi-error 

52 

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# "grpc-python/<version>", built from the grpcio package metadata.
_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__)

# Flags argument handed to cygrpc operation constructors when no flag is set.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# Only the presence of the variable is checked: any value, including the
# empty string, makes this True.
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
)

# The four tuples below list cygrpc operation types per RPC arity.
# NOTE(review): presumably each seeds `_RPCState.due` for the first batch of
# a call of that arity -- the call sites are outside this view; confirm.
# The variants with a streaming request omit send_message and
# send_close_from_client; the variants with a streaming response omit
# receive_message.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

# Log message text for a failing channel subscription callback.
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# repr() template used by `_rpc_state_string` for an RPC that ended with
# StatusCode.OK. Placeholders: class name, status code, details.
_OK_RENDEZVOUS_REPR_FORMAT = (
    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
)

# repr() template used by `_rpc_state_string` for an RPC that ended with any
# other status. Placeholders: class name, status code, details, debug error
# string.
_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)

107 

108 

def _deadline(timeout: Optional[float]) -> Optional[float]:
    """Turn a relative timeout into an absolute deadline.

    Args:
      timeout: Number of seconds from now, or None for "no deadline".

    Returns:
      `time.time() + timeout`, or None when `timeout` is None.
    """
    if timeout is None:
        return None
    return time.time() + timeout

111 

112 

def _unknown_code_details(
    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
) -> str:
    """Build the details string reported for an unrecognized status code.

    Args:
      unknown_cygrpc_code: The status code value that could not be mapped.
      details: The details that accompanied that status.

    Returns:
      A human-readable message embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)

119 

120 

class _RPCState(object):
    """Mutable bookkeeping for a single RPC, guarded by `condition`."""

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[datetime]
    rpc_end_time: Optional[datetime]
    method: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ):
        """Initialize the state for an RPC.

        Args:
          due: Operation types whose completion events are still expected.
          initial_metadata: Initial metadata known so far, or None.
          trailing_metadata: Trailing metadata known so far, or None.
          code: Final status code if already known, else None.
          details: Final status details if already known, else None.
        """
        # Every member of this object is protected by `condition`; whoever
        # changes the RPC's state calls `notify_all` on it.
        self.condition = threading.Condition()

        # Operation types for which a completion-queue event is outstanding.
        # Anything in `due` has had `operate()` called for it, but not the
        # other way round: after a failed `operate()` call an entry may
        # linger here briefly without a matching operation in Core.
        self.due = set(due)

        # Outcome of the RPC as it becomes known.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details
        self.response = None
        self.debug_error_string = None

        # Observability-only fields; changing them does not signal
        # `self.condition`.
        self.rpc_start_time = None
        self.rpc_end_time = None
        self.method = None

        # grpc.Future.cancel / grpc.Future.cancelled have slightly odd
        # semantics, so "was cancellation requested before the RPC
        # terminated" is remembered separately from the RPC's result.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a fresh threading.Condition."""
        self.condition = threading.Condition()

178 

179 

def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Mark `state` as terminated locally, unless it already has a code.

    Args:
      state: The RPC state to terminate.
      code: Status code to record.
      details: Status details to record.
    """
    if state.code is not None:
        # Already terminated; the first recorded outcome wins.
        return
    state.code = code
    state.details = details
    # Initial metadata that already arrived is kept; trailing metadata is
    # always reset to empty.
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()

187 

188 

def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Fold one completion-queue event into `state`.

    Both callers in this file invoke this while holding `state.condition`.

    Args:
      event: The event whose `batch_operations` are to be processed.
      state: The RPC state to update in place.
      response_deserializer: Passed to `_common.deserialize` for any received
        message.

    Returns:
      The callbacks that became runnable because the RPC's final status
      arrived in this event (empty otherwise). The caller is responsible for
      invoking them.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        # The operation has completed, so its event is no longer outstanding.
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    # A None result is treated as a deserialization failure.
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A locally recorded outcome (e.g. from `_abort`) takes
            # precedence over the status carried by the event.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw code from the event: the mapped `code`
                    # is None on this branch and would hide the actual value.
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = datetime.utcnow()
            _observability.maybe_record_rpc_latency(state)
            callbacks.extend(state.callbacks)
            # None (rather than an empty list) marks that no further
            # callbacks can be registered; `_Rendezvous.add_callback` checks
            # for it.
            state.callbacks = None
    return callbacks

231 

232 

def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Create the per-event callback for an RPC.

    Args:
      state: The RPC state the returned handler updates.
      response_deserializer: Forwarded to `_handle_event`.

    Returns:
      A function taking an event. It applies the event to `state` under
      `state.condition`, wakes all waiters, runs any completion callbacks
      outside the lock, and returns True when no events remain due and the
      state's fork epoch is not older than the current one.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks registered through `add_done_callback` are
                # functools.partial objects, but `add_callback` stores bare
                # callables without a `.func` attribute; fall back to the
                # callback itself so that logging cannot raise in here.
                target = getattr(callback, "func", callback)
                _LOGGER.error(
                    "Exception in callback %s: %s", repr(target), repr(e)
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event

253 

254 

255# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall. 

256# pylint: disable=too-many-statements 

def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Consume a request supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator`, serializes each one and submits it on `call` as a
    send-message operation, waiting for each send to complete before pulling
    the next request. When the iterator is exhausted a
    send-close-from-client operation is submitted. This function itself
    returns as soon as the thread has been started.

    Args:
      request_iterator: The user's iterator of request objects.
      state: The RPC state shared with the rest of the call machinery.
      call: The call on which operations are submitted and, on failure,
        cancelled.
      request_serializer: Passed to `_common.serialize` for each request.
      event_handler: Tag passed along with every `call.operate` invocation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the except branch already paired the
            # enter_user_request_generator() call below, so that the finally
            # clause does not do it a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
                )
                # NOTE(review): unlike the serialization-failure path below,
                # `state` is modified here without holding `state.condition`
                # -- confirm this is intended.
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            # Serialization happens outside the lock.
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        # A None result is treated as a serialization failure.
                        code = grpc.StatusCode.INTERNAL
                        details = "Exception serializing request!"
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details,
                        )
                        _abort(state, code, details)
                        return
                    else:
                        # Register the operation as due before submitting it
                        # and roll back if submission fails.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (
                            cygrpc.SendMessageOperation(
                                serialized_request, _EMPTY_FLAGS
                            ),
                        )
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            # Finished waiting once the RPC has terminated or
                            # the send-message event has been processed.
                            return (
                                state.code is not None
                                or cygrpc.OperationType.send_message
                                not in state.due
                            )

                        _common.wait(
                            state.condition.wait,
                            _done,
                            spin_cb=functools.partial(
                                cygrpc.block_if_fork_in_progress, state
                            ),
                        )
                        if state.code is not None:
                            return
                else:
                    # The RPC already terminated or was cancelled; stop
                    # consuming requests.
                    return
        # Reached only via `break`, i.e. the iterator was exhausted normally.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                )
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client
                    )

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()

350 

351 

def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Render the repr/str text for an RPC object.

    Args:
      class_name: Name of the class being rendered.
      rpc_state: The state to describe; read under its condition lock.

    Returns:
      A bare "<Class object>" string while the RPC has no status code, or one
      of the two "terminated with" templates once it does.
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            return "<{} object>".format(class_name)
        if status is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name, status, rpc_state.details
            )
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name,
            status,
            rpc_state.details,
            rpc_state.debug_error_string,
        )

368 

369 

class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled.

    Attributes:
      _state: An instance of _RPCState.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Snapshot `state` so this error is independent of the live RPC."""
        with state.condition:
            snapshot = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            self._state = snapshot
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the copied initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the copied trailing metadata."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the status code captured at construction."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the captured details, passed through `_common.decode`."""
        raw_details = self._state.details
        return _common.decode(raw_details)

    def debug_error_string(self) -> Optional[str]:
        """Return the captured debug string, passed through `_common.decode`."""
        raw_debug = self._state.debug_error_string
        return _common.decode(raw_debug)

    def _repr(self) -> str:
        class_name = self.__class__.__name__
        return _rpc_state_string(class_name, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and immediately catch ourselves to obtain a traceback object.
        try:
            raise self
        except grpc.RpcError:
            _, _, trace = sys.exc_info()
            return trace

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # This object is always done, so the callback runs right away.
        fn(self)

462 

463 

class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super(_Rendezvous, self).__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is not None:
                # Never report a negative amount of time.
                return max(self._deadline - time.time(), 0)
            return None

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # Already terminated; nothing left to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            if registered is None:
                # The callback list is None once the final status has been
                # processed; registration is refused from then on.
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        raise NotImplementedError()

    def _repr(self) -> str:
        class_name = self.__class__.__name__
        return _rpc_state_string(class_name, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        # An RPC still in flight when its handle is collected is cancelled.
        state = self._state
        with state.condition:
            if state.code is None:
                state.code = grpc.StatusCode.CANCELLED
                state.details = "Cancelled upon garbage collection!"
                state.cancelled = True
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[state.code],
                    state.details,
                )
                state.condition.notify_all()

569 

570 

class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        """Return whether a status code has been recorded.

        Does not take the condition lock itself; the callers in this class
        invoke it while holding `self._state.condition`.
        """
        return self._state.code is not None

    def cancelled(self) -> bool:
        """Return whether cancellation was requested for this RPC."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while no status code has been recorded."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once a status code has been recorded."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.

        Raises:
          grpc.experimental.UsageError: If the RPC is not yet complete.
          grpc.FutureCancelledError: If the RPC ended non-OK after
            cancellation was requested.
          _SingleThreadedRendezvous: This object itself, for any other non-OK
            outcome.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() when the"
                    " RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.

        Returns:
          None if the RPC ended with StatusCode.OK, otherwise this object.

        Raises:
          grpc.experimental.UsageError: If the RPC is not yet complete.
          grpc.FutureCancelledError: If the RPC ended non-OK after
            cancellation was requested.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() when"
                    " the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.

        Returns:
          None if the RPC ended with StatusCode.OK, otherwise a traceback
          obtained by raising and catching this object.

        Raises:
          grpc.experimental.UsageError: If the RPC is not yet complete.
          grpc.FutureCancelledError: If the RPC ended non-OK after
            cancellation was requested.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() when"
                    " the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                try:
                    raise self
                except grpc.RpcError:
                    return sys.exc_info()[2]

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn` to be called with this object when the RPC is done.

        If the RPC already has a status code, `fn` is called immediately,
        after the condition lock has been released.
        """
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            # Events are pumped on the calling thread until the metadata
            # has been recorded.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata

        Raises grpc.experimental.UsageError instead of blocking when no
        trailing metadata has been recorded yet.
        """
        with self._state.condition:
            if self._state.trailing_metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed."
                )
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code

        Raises grpc.experimental.UsageError instead of blocking when no
        status code has been recorded yet.
        """
        with self._state.condition:
            if self._state.code is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed."
                )
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details

        Raises grpc.experimental.UsageError instead of blocking when no
        details have been recorded yet.
        """
        with self._state.condition:
            if self._state.details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed."
                )
            return _common.decode(self._state.details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Fetch the call's next event and apply it to the state.

        Any completion callbacks released by the event are run here, on the
        calling thread, while the condition lock is held.

        Returns:
          The event obtained from `self._call.next_event()`.
        """
        event = self._call.next_event()
        with self._state.condition:
            callbacks = _handle_event(
                event, self._state, self._response_deserializer
            )
            for callback in callbacks:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self) -> Any:
        """Pump events until a response is available or the RPC has ended.

        Returns:
          The next response; the stored response is cleared before returning.

        Raises:
          StopIteration: If no receive is outstanding and the RPC ended OK.
          _SingleThreadedRendezvous: This object itself, if no receive is
            outstanding and the RPC ended with any other status.
        """
        while True:
            self._consume_next_event()
            with self._state.condition:
                if self._state.response is not None:
                    response = self._state.response
                    self._state.response = None
                    return response
                elif (
                    cygrpc.OperationType.receive_message not in self._state.due
                ):
                    if self._state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    elif self._state.code is not None:
                        raise self
                    # Otherwise no status has arrived yet: keep pumping.

    def _next(self) -> Any:
        """Request the next message and wait for it on the calling thread.

        Raises:
          StopIteration: If the RPC already ended with StatusCode.OK.
          _SingleThreadedRendezvous: This object itself, if the RPC already
            ended with any other status.
        """
        with self._state.condition:
            if self._state.code is None:
                # We tentatively add the operation as expected and remove
                # it if the enqueue operation fails. This allows us to guarantee that
                # if an event has been submitted to the core completion queue,
                # it is in `due`. If we waited until after a successful
                # enqueue operation then a signal could interrupt this
                # thread between the enqueue operation and the addition of the
                # operation to `due`. This would cause an exception on the
                # channel spin thread when the operation completes and no
                # corresponding operation would be present in state.due.
                # Note that, since `condition` is held through this block, there is
                # no data race on `due`.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        """Return the decoded debug error string.

        Raises grpc.experimental.UsageError instead of blocking when no debug
        error string has been recorded yet.
        """
        with self._state.condition:
            if self._state.debug_error_string is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed."
                )
            return _common.decode(self._state.debug_error_string)

783 

784 

class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
    and to mediate a bidirectional streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:

            def _done():
                return self._state.initial_metadata is not None

            # Waits until initial metadata has been recorded.
            _common.wait(self._state.condition.wait, _done)
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:

            def _done():
                return self._state.trailing_metadata is not None

            # Waits until trailing metadata has been recorded.
            _common.wait(self._state.condition.wait, _done)
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:

            def _done():
                return self._state.code is not None

            # Waits until a status code has been recorded.
            _common.wait(self._state.condition.wait, _done)
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:

            def _done():
                return self._state.details is not None

            # Waits until status details have been recorded.
            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Wait for, then return, the decoded debug error string."""
        with self._state.condition:

            def _done():
                return self._state.debug_error_string is not None

            # NOTE(review): `_abort` records a code without setting
            # `debug_error_string`, and `_handle_event` only sets it when no
            # code was recorded beforehand -- so after a local cancellation
            # this wait may never be satisfied. Confirm against the
            # semantics of `_common.wait`.
            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.debug_error_string)

    def cancelled(self) -> bool:
        """Return whether cancellation was requested for this RPC."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while no status code has been recorded."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once a status code has been recorded."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        """Return whether a status code has been recorded.

        Does not take the condition lock itself; it is used as the wait
        predicate by callers that already hold `self._state.condition`.
        """
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.

        Raises:
          grpc.FutureTimeoutError: If the wait for completion timed out.
          grpc.FutureCancelledError: If the RPC ended non-OK after
            cancellation was requested.
          _MultiThreadedRendezvous: This object itself, for any other non-OK
            outcome.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return self._state.response
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.

        Returns:
          None if the RPC ended with StatusCode.OK, otherwise this object.

        Raises:
          grpc.FutureTimeoutError: If the wait for completion timed out.
          grpc.FutureCancelledError: If the RPC ended non-OK after
            cancellation was requested.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.future.traceback for the full API contract.

        Returns:
          None if the RPC ended with StatusCode.OK, otherwise a traceback
          obtained by raising and catching this object.

        Raises:
          grpc.FutureTimeoutError: If the wait for completion timed out.
          grpc.FutureCancelledError: If the RPC ended non-OK after
            cancellation was requested.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    try:
                        raise self
                    except grpc.RpcError:
                        return sys.exc_info()[2]

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn` to be called with this object when the RPC is done.

        If the RPC already has a status code, `fn` is called immediately,
        after the condition lock has been released.
        """
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def _next(self) -> Any:
        """Request the next message and wait for it to be delivered.

        Raises:
          StopIteration: If the RPC ended with StatusCode.OK and no response
            is pending.
          _MultiThreadedRendezvous: This object itself, if the RPC ended with
            any other status and no response is pending.
        """
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                # Register the receive as due before submitting it and roll
                # back if submission fails.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self

            def _response_ready():
                # Ready when a response is stored, or when the RPC has
                # terminated and no receive is outstanding any more.
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )

            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                # Hand the response over and clear the slot for the next one.
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
                    raise self

968 

969 

def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize the request of a unary call.

    Args:
      request: The request value handed to ``_common.serialize``.
      timeout: Value handed to ``_deadline``.
      request_serializer: Serializer handed to ``_common.serialize``.

    Returns:
      A ``(deadline, serialized_request, error)`` triple. When
      ``_common.serialize`` returns None the serialized request is None and
      ``error`` is an ``_InactiveRpcError`` with status INTERNAL; otherwise
      ``error`` is None.
    """
    deadline = _deadline(timeout)
    payload = _common.serialize(request, request_serializer)
    if payload is not None:
        return deadline, payload, None

    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)

989 

990 

def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn a finished blocking RPC into its return value or its error.

    Args:
      state: The RPC state whose ``code`` and ``response`` are read.
      call: The call object wrapped into the rendezvous when requested.
      with_call: Whether to return a ``(response, call)`` pair instead of
        the bare response.
      deadline: Deadline handed to the rendezvous constructor.

    Returns:
      ``state.response``, or ``(state.response, rendezvous)`` when
      ``with_call`` is truthy.

    Raises:
      _InactiveRpcError: If the status code is not OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    call_handle = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, call_handle

1005 

1006 

def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two operation batches that open a stream-unary call.

    Args:
      metadata: Metadata handed to the send-initial-metadata operation.
      initial_metadata_flags: Flags handed to the same operation.

    Returns:
      A pair of tuples: the first sends initial metadata and receives the
      message and the status; the second receives the initial metadata.
    """
    opening_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return (opening_batch, initial_metadata_batch)

1020 

1021 

def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pair every stream-unary operation batch with a ``None`` tag.

    Args:
      metadata: Forwarded to ``_stream_unary_invocation_operations``.
      initial_metadata_flags: Forwarded likewise.

    Returns:
      A tuple of ``(operations, None)`` pairs, one per batch.
    """
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    tagged = []
    for batch in batches:
        tagged.append((batch, None))
    return tuple(tagged)

1034 

1035 

def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combine the caller's deadline with the one from the current context.

    Args:
      user_deadline: The deadline requested by the caller, or None.

    Returns:
      None when neither deadline is set, the one that is set when only one
      is, and the smaller of the two when both are.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        # Covers "neither set" (returns None) and "only the user's set".
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)

1046 

1047 

class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Unary-unary multi-callable on top of a cygrpc.Channel.

    The blocking entry points (``__call__`` and ``with_call``) run a
    segregated call on the channel; ``future`` goes through the managed-call
    factory supplied at construction.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
    ):
        """Store the collaborators and build a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serialize the request and assemble the single operation batch.

        Returns:
          ``(state, operations, deadline, None)`` on success, or
          ``(None, None, None, error)`` when serialization produced None.
        """
        deadline, request_bytes, serialization_error = _start_unary_request(
            request, timeout, self._request_serializer
        )
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        outgoing_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if request_bytes is None:
            return None, None, None, serialization_error

        rpc_state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        batch = (
            cygrpc.SendInitialMetadataOperation(outgoing_metadata, flags),
            cygrpc.SendMessageOperation(request_bytes, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return rpc_state, batch, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call and process its single event.

        Returns:
          The RPC state and the segregated call.

        Raises:
          grpc.RpcError: The error from ``_prepare`` when it yields no state.
        """
        state, operations, deadline, failure = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise failure  # pylint: disable-msg=raising-bad-type

        state.rpc_start_time = datetime.utcnow()
        state.method = _common.decode(self._method)
        effective_deadline = _determine_deadline(deadline)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        # One batch, tagged with None.
        operations_and_tags = ((operations, None),)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            effective_deadline,
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
        )
        _handle_event(call.next_event(), state, self._response_deserializer)
        return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously and return ``(response, call)``."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC through the managed-call factory.

        Returns:
          A ``_MultiThreadedRendezvous`` wrapping the state and the call.

        Raises:
          grpc.RpcError: The error from ``_prepare`` when it yields no state.
        """
        state, operations, deadline, failure = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise failure  # pylint: disable-msg=raising-bad-type

        handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = datetime.utcnow()
        state.method = _common.decode(self._method)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            metadata,
            call_credentials,
            (operations,),
            handler,
            self._context,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1212 

1213 

class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-stream multi-callable that runs on a segregated call.

    Invocations return a ``_SingleThreadedRendezvous``.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
    ):
        """Store the collaborators and build a census context."""
        self._channel = channel
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Start the RPC and return its rendezvous.

        Raises:
          _InactiveRpcError: With status INTERNAL when serializing the
            request produced None.
        """
        deadline = _deadline(timeout)
        request_bytes = _common.serialize(request, self._request_serializer)
        if request_bytes is None:
            raise _InactiveRpcError(
                _RPCState(
                    (),
                    (),
                    (),
                    grpc.StatusCode.INTERNAL,
                    "Exception serializing request!",
                )
            )

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        outgoing_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three batches: send everything, receive status, receive initial
        # metadata. Each one is tagged with None.
        send_batch = (
            cygrpc.SendInitialMetadataOperation(outgoing_metadata, flags),
            cygrpc.SendMessageOperation(request_bytes, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
        )
        status_batch = (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations_and_tags = (
            (send_batch, None),
            (status_batch, None),
            (initial_metadata_batch, None),
        )
        state.rpc_start_time = datetime.utcnow()
        state.method = _common.decode(self._method)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1295 

1296 

class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-stream multi-callable that uses the managed-call factory.

    Invocations return a ``_MultiThreadedRendezvous``.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
    ):
        """Store the collaborators and build a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC and return its rendezvous.

        Raises:
          grpc.RpcError: The error from ``_start_unary_request`` when the
            serialized request is None.
        """
        deadline, request_bytes, serialization_error = _start_unary_request(
            request, timeout, self._request_serializer
        )
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        if request_bytes is None:
            raise serialization_error  # pylint: disable-msg=raising-bad-type

        outgoing_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # Two batches: send everything plus receive status, then receive
        # the initial metadata.
        send_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(outgoing_metadata, flags),
            cygrpc.SendMessageOperation(request_bytes, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations = (send_and_status_batch, initial_metadata_batch)
        state.rpc_start_time = datetime.utcnow()
        state.method = _common.decode(self._method)
        effective_deadline = _determine_deadline(deadline)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            effective_deadline,
            metadata,
            call_credentials,
            operations,
            _event_handler(state, self._response_deserializer),
            self._context,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1372 

1373 

class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Stream-unary multi-callable on top of a cygrpc.Channel.

    The blocking entry points (``__call__`` and ``with_call``) run a
    segregated call on the channel; ``future`` goes through the managed-call
    factory supplied at construction.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
    ):
        """Store the collaborators and build a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call until nothing is due.

        Args:
          request_iterator: Iterator of requests handed to
            ``_consume_request_iterator``.
          timeout: Value handed to ``_deadline``.
          metadata: Call metadata, augmented with ``compression``.
          credentials: Optional call credentials.
          wait_for_ready: Optional wait-for-ready setting.
          compression: Optional compression setting.

        Returns:
          The RPC state and the segregated call.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = datetime.utcnow()
        state.method = _common.decode(self._method)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        # Pull events off the call and fold them into the state until no
        # operation remains due.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        (
            state,
            call,
        ) = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously and return ``(response, call)``."""
        (
            state,
            call,
        ) = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC through the managed-call factory.

        Args:
          request_iterator: Iterator of requests handed to
            ``_consume_request_iterator``.
          timeout: Value handed to ``_deadline``.
          metadata: Call metadata, augmented with ``compression``.
          credentials: Optional call credentials.
          wait_for_ready: Optional wait-for-ready setting.
          compression: Optional compression setting.

        Returns:
          A ``_MultiThreadedRendezvous`` wrapping the state and the call.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = datetime.utcnow()
        state.method = _common.decode(self._method)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # Build the operations from the augmented metadata, as
            # ``_blocking`` does; using the raw ``metadata`` here would drop
            # the per-call compression entry from the initial metadata sent.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1528 

1529 

class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Stream-stream multi-callable that uses the managed-call factory.

    Invocations return a ``_MultiThreadedRendezvous``.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
    ):
        """Store the collaborators and build a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC, begin consuming requests, return the rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        outgoing_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: send initial metadata plus receive status, then
        # receive the initial metadata.
        opening_batch = (
            cygrpc.SendInitialMetadataOperation(outgoing_metadata, flags),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations = (opening_batch, initial_metadata_batch)
        handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = datetime.utcnow()
        state.method = _common.decode(self._method)
        effective_deadline = _determine_deadline(deadline)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            effective_deadline,
            outgoing_metadata,
            call_credentials,
            operations,
            handler,
            self._context,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, handler
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1604 

1605 

class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        """Create the flags, keeping only the bits inside ``used_mask``."""
        masked = value & cygrpc.InitialMetadataFlags.used_mask
        return super(_InitialMetadataFlags, cls).__new__(cls, masked)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Return flags that reflect the given wait-for-ready choice.

        Args:
          wait_for_ready: None leaves the flags as they are. Any other value
            sets or clears the wait_for_ready bit by its truthiness and sets
            the wait_for_ready_explicitly_set bit.

        Returns:
          This object when ``wait_for_ready`` is None, otherwise a new
          instance of the same class.
        """
        if wait_for_ready is None:
            return self

        bits = cygrpc.InitialMetadataFlags
        if wait_for_ready:
            adjusted = self | bits.wait_for_ready
        else:
            adjusted = self & ~bits.wait_for_ready
        return self.__class__(adjusted | bits.wait_for_ready_explicitly_set)

1627 

1628 

class _ChannelCallState(object):
    """Bookkeeping for the managed calls running on one cygrpc.Channel."""

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        """Start with no managed calls and a fresh lock."""
        self.channel = channel
        self.lock = threading.Lock()
        self.threading = False
        self.managed_calls = 0

    def reset_postfork_child(self) -> None:
        """Zero the managed-call counter."""
        self.managed_calls = 0

    def __del__(self):
        """Close the channel; TypeError and AttributeError are ignored."""
        try:
            self.channel.close(
                cygrpc.StatusCode.cancelled, "Channel deallocated!"
            )
        except (TypeError, AttributeError):
            # Best-effort cleanup: these two errors are deliberately dropped.
            pass

1650 

1651 

def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Start a daemon thread that drains call events from the channel.

    The thread loops until the managed-call counter on ``state`` reaches
    zero.

    Args:
      state: The call state whose channel is polled and whose
        ``managed_calls`` counter is decremented.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                # Nothing happened within the queue's timeout; poll again.
                continue
            # The event's tag is invoked with the event itself; a truthy
            # result is treated as "this call has completed".
            call_completed = event.tag(event)
            if call_completed:
                with state.lock:
                    state.managed_calls -= 1
                    if state.managed_calls == 0:
                        # Last managed call finished: end the thread.
                        return

    channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    channel_spin_thread.setDaemon(True)
    channel_spin_thread.start()

1669 

1670 

def _channel_managed_call_management(state: _ChannelCallState):
    """Return a factory that creates managed calls on ``state.channel``.

    Args:
      state: The call state whose lock, channel and managed-call counter the
        returned factory uses.

    Returns:
      The ``create`` callable defined below.
    """

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context,
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.
        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch is tagged with the same event handler.
        tagged_batches = tuple([(batch, event_handler) for batch in operations])
        with state.lock:
            integrated_call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                tagged_batches,
                context,
            )
            state.managed_calls += 1
            if state.managed_calls == 1:
                # First managed call: start the thread that drains events.
                _run_channel_spin_thread(state)
            return integrated_call

    return create

1728 

1729 

class _ChannelConnectivityState(object):
    """Mutable connectivity-subscription state of one channel."""

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        """Create the lock, keep the channel, start from the idle state."""
        self.lock = threading.RLock()
        self.channel = channel
        # The initial field values are the same ones the post-fork reset
        # installs.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Put every field except ``lock`` and ``channel`` back to idle."""
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False

1762 

1763 

def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collect the callbacks that have not seen the current connectivity.

    Each entry whose recorded connectivity is not ``state.connectivity`` is
    updated in place to record it.

    Args:
      state: The connectivity state whose subscriptions are scanned.

    Returns:
      The callbacks of the entries that were updated, in list order.
    """
    current = state.connectivity
    outdated = []
    for entry in state.callbacks_and_connectivities:
        subscriber, last_seen = entry
        if last_seen is current:
            continue
        outdated.append(subscriber)
        entry[1] = current
    return outdated

1777 

1778 

def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Call connectivity callbacks until none of them is out of date.

    Args:
      state: The connectivity state shared with the poller.
      initial_connectivity: The connectivity passed to the first round of
        callbacks.
      initial_callbacks: The callbacks invoked in the first round.
    """
    connectivity = initial_connectivity
    callbacks = initial_callbacks
    while True:
        for callback in callbacks:
            cygrpc.block_if_fork_in_progress(state)
            try:
                callback(connectivity)
            except Exception:  # pylint: disable=broad-except
                # A failing callback is logged and the remaining ones still
                # run.
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            # Find out whether connectivity changed while the callbacks ran.
            callbacks = _deliveries(state)
            if callbacks:
                connectivity = state.connectivity
            else:
                # Nothing left to deliver: clear the flag while still holding
                # the lock, then stop.
                state.delivering = False
                return

1802 

1803 

def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Start a daemon thread running ``_deliver`` and flag the state.

    Args:
      state: The connectivity state; its current ``connectivity`` is
        captured as the first value to deliver.
      callbacks: The callbacks for the first delivery round.
    """
    deliver_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=deliver_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True

1819 

1820 

1821# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll. 

def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Poll the channel's connectivity and dispatch changes to subscribers.

    Runs until there are no subscriptions left and no connect attempt is
    pending, then marks the state as no longer polling.

    Args:
      state: The connectivity state shared with subscribers.
      channel: The channel whose connectivity is checked and watched.
      initial_try_to_connect: Passed to the first connectivity check.
    """
    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = (
            _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                connectivity
            ]
        )
        # Every current subscriber gets the first reading, and each entry is
        # marked as having seen it.
        callbacks = tuple(
            callback for callback, _ in state.callbacks_and_connectivities
        )
        for callback_and_connectivity in state.callbacks_and_connectivities:
            callback_and_connectivity[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)
    while True:
        # Watch for a change from the last reading, with a deadline 0.2
        # seconds from now.
        event = channel.watch_connectivity_state(
            connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if (
                not state.callbacks_and_connectivities
                and not state.try_to_connect
            ):
                # No subscribers and no pending connect request: stop.
                state.polling = False
                state.connectivity = None
                break
            # Take over the pending connect request and clear it.
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if event.success or try_to_connect:
            connectivity = channel.check_connectivity_state(try_to_connect)
            with state.lock:
                state.connectivity = (
                    _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                        connectivity
                    ]
                )
                # While a delivery thread is active it picks up the new value
                # itself through _deliveries.
                if not state.delivering:
                    callbacks = _deliveries(state)
                    if callbacks:
                        _spawn_delivery(state, callbacks)

1869 

1870 

def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Register a connectivity callback, starting the poller if needed.

    Args:
      state: The connectivity state to register the callback on.
      callback: The callable to receive connectivity values.
      try_to_connect: Whether a connect attempt is requested; coerced with
        ``bool`` before use.
    """
    with state.lock:
        if not state.callbacks_and_connectivities and not state.polling:
            # First subscriber and no poller running: start the polling
            # thread. The callback is recorded as having seen nothing yet.
            polling_thread = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, bool(try_to_connect)),
            )
            polling_thread.setDaemon(True)
            polling_thread.start()
            state.polling = True
            state.callbacks_and_connectivities.append([callback, None])
        elif not state.delivering and state.connectivity is not None:
            # A connectivity value is known and no delivery is in flight:
            # deliver it to this callback now and record it as seen.
            _spawn_delivery(state, (callback,))
            state.try_to_connect |= bool(try_to_connect)
            state.callbacks_and_connectivities.append(
                [callback, state.connectivity]
            )
        else:
            # Otherwise just register; the entry is recorded as having seen
            # nothing yet.
            state.try_to_connect |= bool(try_to_connect)
            state.callbacks_and_connectivities.append([callback, None])

1895 

1896 

def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Remove the first subscription whose callback equals ``callback``.

    Does nothing when no subscription matches.

    Args:
      state: The connectivity state holding the subscriptions.
      callback: The callback to remove, compared with ``==``.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        first_match = next(
            (
                position
                for position, (subscribed_callback, _) in enumerate(
                    subscriptions
                )
                if callback == subscribed_callback
            ),
            None,
        )
        if first_match is not None:
            del subscriptions[first_match]

1908 

1909 

def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Build the final channel-argument tuple.

    The result is the caller's options, followed by whatever
    `_compression.create_channel_option(compression)` returns, followed by a
    single primary-user-agent entry carrying `_USER_AGENT`.

    Args:
      base_options: The caller-supplied channel arguments.
      compression: The channel-level compression setting, or None.

    Returns:
      A tuple of channel arguments in the order described above.
    """
    compression_option = _compression.create_channel_option(compression)
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    augmented = tuple(base_options)
    augmented = augmented + compression_option
    augmented = augmented + (user_agent_option,)
    return augmented

1925 

1926 

def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Split channel options into Python-only options and core options.

    An option counts as Python-only when its key (element 0) equals
    `grpc.experimental.ChannelOptions.SingleThreadedUnaryStream`; every other
    option is treated as a core option. Relative order is preserved within
    each group.

    Args:
      options: The channel options to partition.

    Returns:
      A pair `(python_options, core_options)` of lists, in that order.
    """
    single_threaded_key = (
        grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
    )
    python_options, core_options = [], []
    for option in options:
        bucket = (
            python_options
            if option[0] == single_threaded_key
            else core_options
        )
        bucket.append(option)
    return python_options, core_options

1942 

1943 

class Channel(grpc.Channel):
    """A grpc.Channel implementation that delegates to a cygrpc.Channel."""

    # Whether unary-stream RPCs use the single-threaded multi-callable.
    _single_threaded_unary_stream: bool
    # The underlying Cython channel.
    _channel: cygrpc.Channel
    # Bookkeeping shared by the channel-managed calls.
    _call_state: _ChannelCallState
    # Bookkeeping for connectivity subscriptions.
    _connectivity_state: _ChannelConnectivityState

    def __init__(
        self,
        target: str,
        options: Sequence[ChannelArgumentType],
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
    ):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)

        # Start from the module default; Python-only options may override it.
        self._single_threaded_unary_stream = (
            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        )
        self._process_python_options(python_options)

        encoded_target = _common.encode(target)
        channel_arguments = _augment_options(core_options, compression)
        self._channel = cygrpc.Channel(
            encoded_target, channel_arguments, credentials
        )
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)

        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _process_python_options(
        self, python_options: Sequence[ChannelArgumentType]
    ) -> None:
        """Apply Python-only channel options to this instance.

        The presence of a SingleThreadedUnaryStream key switches the channel
        to single-threaded unary-stream mode; the value paired with the key
        is not inspected.
        """
        single_threaded_key = (
            grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
        )
        if any(option[0] == single_threaded_key for option in python_options):
            self._single_threaded_unary_stream = True

    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Register `callback` on this channel's connectivity state."""
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Deregister `callback` from this channel's connectivity state."""
        _unsubscribe(self._connectivity_state, callback)

    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Create a unary-unary multi-callable for `method`."""
        managed_call = _channel_managed_call_management(self._call_state)
        encoded_method = _common.encode(method)
        return _UnaryUnaryMultiCallable(
            self._channel,
            managed_call,
            encoded_method,
            request_serializer,
            response_deserializer,
        )

    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
    ) -> grpc.UnaryStreamMultiCallable:
        """Create a unary-stream multi-callable for `method`."""
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if not self._single_threaded_unary_stream:
            return _UnaryStreamMultiCallable(
                self._channel,
                _channel_managed_call_management(self._call_state),
                _common.encode(method),
                request_serializer,
                response_deserializer,
            )
        return _SingleThreadedUnaryStreamMultiCallable(
            self._channel,
            _common.encode(method),
            request_serializer,
            response_deserializer,
        )

    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
    ) -> grpc.StreamUnaryMultiCallable:
        """Create a stream-unary multi-callable for `method`."""
        managed_call = _channel_managed_call_management(self._call_state)
        encoded_method = _common.encode(method)
        return _StreamUnaryMultiCallable(
            self._channel,
            managed_call,
            encoded_method,
            request_serializer,
            response_deserializer,
        )

    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
    ) -> grpc.StreamStreamMultiCallable:
        """Create a stream-stream multi-callable for `method`."""
        managed_call = _channel_managed_call_management(self._call_state)
        encoded_method = _common.encode(method)
        return _StreamStreamMultiCallable(
            self._channel,
            managed_call,
            encoded_method,
            request_serializer,
            response_deserializer,
        )

    def _unsubscribe_all(self) -> None:
        """Drop every connectivity subscription, emptying the list in place."""
        state = self._connectivity_state
        if not state:
            return
        with state.lock:
            state.callbacks_and_connectivities.clear()

    def _close(self) -> None:
        """Drop subscriptions, close the underlying channel and deregister it."""
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Drop subscriptions and close the underlying channel for a fork."""
        self._unsubscribe_all()
        self._channel.close_on_fork(
            cygrpc.StatusCode.cancelled, "Channel closed due to fork"
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close on leaving the `with` block; returning False lets any
        # exception raised inside the block propagate.
        self._close()
        return False

    def close(self) -> None:
        """Close this channel."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). This is not done
        # today because many valid use cases allow the channel to be deleted
        # immediately after stubs are created. Once enough time has passed
        # that all users can be trusted to hold on to their channels for as
        # long as they are in use, and to close them afterwards, deletion of
        # this grpc._channel.Channel instance can be made to effect closure
        # of the underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Python ignores exceptions raised from __del__ anyway, but they
            # can still spam the logs, so they are silenced here.
            pass