Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_channel.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

974 statements  

1# Copyright 2016 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Python.""" 

15 

16import copy 

17import functools 

18import logging 

19import os 

20import sys 

21import threading 

22import time 

23import types 

24from typing import ( 

25 Any, 

26 Callable, 

27 Dict, 

28 Iterator, 

29 List, 

30 Optional, 

31 Sequence, 

32 Set, 

33 Tuple, 

34 Union, 

35) 

36 

37import grpc # pytype: disable=pyi-error 

38from grpc import _common # pytype: disable=pyi-error 

39from grpc import _compression # pytype: disable=pyi-error 

40from grpc import _grpcio_metadata # pytype: disable=pyi-error 

41from grpc import _observability # pytype: disable=pyi-error 

42from grpc._cython import cygrpc 

43from grpc._typing import ChannelArgumentType 

44from grpc._typing import DeserializingFunction 

45from grpc._typing import IntegratedCallFactory 

46from grpc._typing import MetadataType 

47from grpc._typing import NullaryCallbackType 

48from grpc._typing import ResponseType 

49from grpc._typing import SerializingFunction 

50from grpc._typing import UserTag 

51import grpc.experimental # pytype: disable=pyi-error 

52 

_LOGGER = logging.getLogger(__name__)

# Identifier string built from the installed grpcio version.
_USER_AGENT = f"grpc-python/{_grpcio_metadata.__version__}"

_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever the variable is present in the environment, whatever its value.
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    "GRPC_SINGLE_THREADED_UNARY_STREAM" in os.environ
)

# Operation types initially recorded as due for each RPC arity.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# repr() templates used by _rpc_state_string for terminated RPCs.
_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    ">"
)

_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)

107 

108 

def _deadline(timeout: Optional[float]) -> Optional[float]:
    """Turn a relative timeout into an absolute `time.time()` deadline.

    Args:
      timeout: Number of seconds from now, or None for "no deadline".

    Returns:
      None when `timeout` is None, otherwise the current `time.time()` value
      plus `timeout`.
    """
    if timeout is None:
        return None
    return time.time() + timeout

111 

112 

def _unknown_code_details(
    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
) -> str:
    """Build the details message used when a status code is not recognized.

    Args:
      unknown_cygrpc_code: The code value to show in the message.
      details: The details value to show in the message.

    Returns:
      A string embedding both values verbatim.
    """
    return (
        f"Server sent unknown code {unknown_cygrpc_code} "
        f'and details "{details}"'
    )

119 

120 

class _RPCState(object):
    """Mutable record of one RPC's progress and outcome.

    `condition` guards all members of this object, and `notify_all` is called
    on it when the state of the RPC has changed. The observability fields
    (`rpc_start_time`, `rpc_end_time`, `method`, `target`) are updated without
    triggering `condition`.
    """

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[float]  # In relative seconds
    rpc_end_time: Optional[float]  # In relative seconds
    method: Optional[str]
    target: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ):
        """Initialise the state.

        Args:
          due: Operation types to record as outstanding; copied into a set.
          initial_metadata: Initial metadata already known, or None.
          trailing_metadata: Trailing metadata already known, or None.
          code: Status code already known, or None.
          details: Status details already known, or None.
        """
        # Lock + wait/notify channel for every member of this object.
        self.condition = threading.Condition()

        # The cygrpc.OperationType objects representing events due from the
        # RPC's completion queue. Membership in `due` guarantees `operate()`
        # was called for a corresponding operation; the converse does not
        # hold, because after a failed `operate()` call `due` may briefly
        # contain entries that were never submitted to Core.
        self.due = set(due)

        # Outcome of the RPC.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details
        self.response = None
        self.debug_error_string = None

        # grpc.Future.cancel / grpc.Future.cancelled have slightly wonky
        # semantics, so whether cancellation was requested prior to
        # termination of the RPC is tracked separately from the rest of the
        # result.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

        # The following four fields are used for observability.
        # Updates to those fields do not trigger self.condition.
        self.rpc_start_time = None
        self.rpc_end_time = None
        self.method = None
        self.target = None

    def reset_postfork_child(self):
        """Replace `condition` with a brand-new threading.Condition."""
        self.condition = threading.Condition()

180 

181 

def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Record a terminal status on `state` unless one is already present.

    Does nothing when `state.code` is already set. Otherwise stores `code`
    and `details`, replaces a missing `initial_metadata` with an empty tuple,
    and sets `trailing_metadata` to an empty tuple.

    Args:
      state: The RPC state to update in place.
      code: Status code to record.
      details: Status details to record.
    """
    if state.code is not None:
        # A status was recorded earlier; leave the state untouched.
        return
    state.code = code
    state.details = details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()

189 

190 

def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Fold a completion-queue event into `state`.

    Each batch operation of `event` is removed from `state.due` and its
    payload (initial metadata, message, or final status) is copied into
    `state`. This function does not acquire `state.condition` itself.

    Args:
      event: The event whose `batch_operations` are processed.
      state: The RPC state to update in place.
      response_deserializer: Deserializer handed to `_common.deserialize` for
        received messages.

    Returns:
      The callbacks that were registered on `state` at the moment the final
      status was processed (empty if the event carried no status). The
      callbacks are returned, not invoked.

    Raises:
      KeyError: If an operation in the event is not present in `state.due`.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            if state.code is None:
                # Keep the raw value so it can be reported if the lookup fails.
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the code that was actually received; passing
                    # `code` here would always render as "None".
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            callbacks.extend(state.callbacks)
            # None marks that callbacks can no longer be registered.
            state.callbacks = None
    return callbacks

233 

234 

def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Create the per-RPC event callback bound to `state`.

    Args:
      state: The RPC state the returned handler updates.
      response_deserializer: Deserializer forwarded to `_handle_event`.

    Returns:
      A callable taking an event. It applies the event to `state` under
      `state.condition`, notifies all waiters, runs the callbacks returned by
      `_handle_event` outside the lock, and returns True only when no
      operations remain in `state.due` and `state.fork_epoch` is not older
      than the current fork epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks are not always functools.partial objects (those
                # registered through add_callback are plain callables), so
                # fall back to the callback itself when `.func` is absent
                # instead of raising AttributeError from this handler.
                logging.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event

255 

256 

257# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall. 

258# pylint: disable=too-many-statements 

def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Consume a request supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator`, serializes each one, and submits it on `call` as a
    send-message operation, waiting for each send to leave `state.due` before
    fetching the next request. When the iterator is exhausted and the RPC has
    no status yet, a send-close-from-client operation is submitted.

    Args:
      request_iterator: Iterator yielding the request objects.
      state: The RPC state shared with the rest of the invocation.
      call: The call on which operations are submitted and, on failure,
        cancelled.
      request_serializer: Serializer passed to `_common.serialize`.
      event_handler: Tag passed along with every `call.operate` invocation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the except branch already balanced the
            # enter_user_request_generator() call, so `finally` does not
            # do it a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
                )
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            # Serialization happens before the condition is acquired.
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        # A None result is treated as a serialization failure.
                        code = grpc.StatusCode.INTERNAL
                        details = "Exception serializing request!"
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details,
                        )
                        _abort(state, code, details)
                        return
                    else:
                        # Record the send as due before submitting it and
                        # take it back out if the submission is refused.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (
                            cygrpc.SendMessageOperation(
                                serialized_request, _EMPTY_FLAGS
                            ),
                        )
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            # Finished waiting once the RPC has a status or
                            # the send is no longer outstanding.
                            return (
                                state.code is not None
                                or cygrpc.OperationType.send_message
                                not in state.due
                            )

                        _common.wait(
                            state.condition.wait,
                            _done,
                            spin_cb=functools.partial(
                                cygrpc.block_if_fork_in_progress, state
                            ),
                        )
                        if state.code is not None:
                            return
                else:
                    # The RPC already has a status or was cancelled.
                    return
        # Reached only via `break`, i.e. the iterator was exhausted.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                )
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client
                    )

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()

352 

353 

def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Render `rpc_state` as the repr text for an object named `class_name`.

    The state is read while holding `rpc_state.condition`.

    Returns:
      "<ClassName object>" while no status code is set, the OK template for
      an OK status, and the non-OK template (which additionally includes the
      debug error string) for any other status.
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            return "<{} object>".format(class_name)
        if status is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name, status, rpc_state.details
            )
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name,
            status,
            rpc_state.details,
            rpc_state.debug_error_string,
        )

370 

371 

class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled.

    Attributes:
      _state: An instance of _RPCState holding a private copy of the
        source state's result fields.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Copy the result fields out of `state` while holding its condition."""
        with state.condition:
            self._state = snapshot = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the copied initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the copied trailing metadata."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the copied status code."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the copied details passed through `_common.decode`."""
        return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Return the copied debug error string via `_common.decode`."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self) -> str:
        """Format the state using this object's class name."""
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        # The "result" of an inactive error is always the error itself.
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and immediately catch ourselves to obtain a traceback object.
        try:
            raise self
        except grpc.RpcError as raised:
            return raised.__traceback__

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # Already done, so the callback runs synchronously.
        fn(self)

464 

465 

class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super(_Rendezvous, self).__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            deadline = self._deadline
            if deadline is None:
                return None
            # Never report a negative amount of time.
            return max(deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # The RPC already has a status; nothing to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            if registered is None:
                # The callback list has been retired; refuse registration.
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        """Produce the next response; subclasses must override."""
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string; subclasses must override."""
        raise NotImplementedError()

    def _repr(self) -> str:
        """Format the state using this object's class name."""
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        """Cancel the RPC if it still has no status when collected."""
        with self._state.condition:
            if self._state.code is not None:
                return
            code = grpc.StatusCode.CANCELLED
            details = "Cancelled upon garbage collection!"
            self._state.code = code
            self._state.details = details
            self._state.cancelled = True
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                details,
            )
            self._state.condition.notify_all()

571 

572 

class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        """Tell whether a status code has been recorded."""
        return self._state.code is not None

    def cancelled(self) -> bool:
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() when the"
                    " RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() when"
                    " the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() when"
                    " the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and catch ourselves to obtain a traceback object.
            try:
                raise self
            except grpc.RpcError as raised:
                return raised.__traceback__

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn`, or call it right away if a status is already set."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if still_running:
            return

        # Invoked after the condition has been released.
        fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            metadata = self._state.trailing_metadata
            if metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed."
                )
            return metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            status = self._state.code
            if status is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed."
                )
            return status

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            raw_details = self._state.details
            if raw_details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed."
                )
            return _common.decode(raw_details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Fetch one event from the call, apply it, and run its callbacks."""
        event = self._call.next_event()
        with self._state.condition:
            # NOTE(gnossen): We intentionally allow exceptions to bubble up
            # to the user when running on a single thread.
            for callback in _handle_event(
                event, self._state, self._response_deserializer
            ):
                callback()
        return event

    def _next_response(self) -> Any:
        """Consume events until a response or a final status is available."""
        while True:
            self._consume_next_event()
            with self._state.condition:
                response = self._state.response
                if response is not None:
                    # Hand the message over and clear the slot.
                    self._state.response = None
                    return response
                if cygrpc.OperationType.receive_message in self._state.due:
                    continue
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if self._state.code is not None:
                    raise self

    def _next(self) -> Any:
        """Request one more message and return it once it has arrived."""
        with self._state.condition:
            status = self._state.code
            if status is grpc.StatusCode.OK:
                raise StopIteration()
            if status is not None:
                raise self
            # We tentatively add the operation as expected and remove
            # it if the enqueue operation fails. This allows us to guarantee that
            # if an event has been submitted to the core completion queue,
            # it is in `due`. If we waited until after a successful
            # enqueue operation then a signal could interrupt this
            # thread between the enqueue operation and the addition of the
            # operation to `due`. This would cause an exception on the
            # channel spin thread when the operation completes and no
            # corresponding operation would be present in state.due.
            # Note that, since `condition` is held through this block, there is
            # no data race on `due`.
            receive = cygrpc.OperationType.receive_message
            self._state.due.add(receive)
            operating = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
            )
            if not operating:
                self._state.due.remove(receive)
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        """Return the decoded debug error string once it is available."""
        with self._state.condition:
            raw_debug = self._state.debug_error_string
            if raw_debug is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed."
                )
            return _common.decode(raw_debug)

785 

786 

class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
    and to mediate a bidirectional streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:

            def _done():
                return self._state.initial_metadata is not None

            # Wait on the condition until initial metadata has been recorded.
            _common.wait(self._state.condition.wait, _done)
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:

            def _done():
                return self._state.trailing_metadata is not None

            # Wait on the condition until trailing metadata has been recorded.
            _common.wait(self._state.condition.wait, _done)
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:

            def _done():
                return self._state.code is not None

            # Wait on the condition until a status code has been recorded.
            _common.wait(self._state.condition.wait, _done)
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:

            def _done():
                return self._state.details is not None

            # Wait on the condition until details have been recorded.
            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Wait for the debug error string and return it decoded."""
        with self._state.condition:

            def _done():
                return self._state.debug_error_string is not None

            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.debug_error_string)

    def cancelled(self) -> bool:
        """Return the state's `cancelled` flag."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while no status code has been recorded."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once a status code has been recorded."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        # Used as the wait predicate below; it does not acquire the
        # condition itself.
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return self._state.response
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.Future.traceback for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    # Raise and catch ourselves to obtain a traceback object.
                    try:
                        raise self
                    except grpc.RpcError:
                        return sys.exc_info()[2]

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn`, or call it right away if a status is already set."""
        with self._state.condition:
            if self._state.code is None:
                # Stored bound to this object so it can later be invoked
                # without arguments.
                self._state.callbacks.append(functools.partial(fn, self))
                return

        # A status is already set: invoke after releasing the condition.
        fn(self)

    def _next(self) -> Any:
        """Request one more message and wait until it or a status arrives."""
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                # Record the receive as due before submitting it and take it
                # back out if the submission is refused.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self

            def _response_ready():
                # Ready when a message is available, or when no receive is
                # outstanding and the RPC has a status.
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )

            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                # Hand the message over and clear the slot.
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
                    raise self

970 

971 

def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize the request of a unary-request RPC.

    Args:
      request: The request value to serialize.
      timeout: The timeout handed to ``_deadline``.
      request_serializer: The serializer handed to ``_common.serialize``.

    Returns:
      A ``(deadline, serialized_request, error)`` tuple. Exactly one of the
      last two items is None: when ``_common.serialize`` returns None the
      error is an ``_InactiveRpcError`` with status INTERNAL, otherwise the
      error is None.
    """
    deadline = _deadline(timeout)
    payload = _common.serialize(request, request_serializer)
    if payload is not None:
        return deadline, payload, None

    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)

991 

992 

def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn the final state of a blocking unary-response RPC into a result.

    Args:
      state: The RPC state after the call has been driven to completion.
      call: The call the RPC was conducted on.
      with_call: Whether to also return a call object alongside the response.
      deadline: The deadline given to the returned call object.

    Returns:
      ``state.response``, or ``(state.response, rendezvous)`` when
      ``with_call`` is true.

    Raises:
      _InactiveRpcError: If the status code in ``state`` is not OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    call_view = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, call_view

1007 

1008 

def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two operation batches that open a stream-unary RPC.

    Args:
      metadata: The metadata placed in the send-initial-metadata operation.
      initial_metadata_flags: The flags for that same operation.

    Returns:
      A pair of batches: the first sends initial metadata and receives the
      message and the status; the second receives initial metadata.
    """
    send_and_finish_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    receive_initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return (send_and_finish_batch, receive_initial_metadata_batch)

1022 

1023 

def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pair every stream-unary opening batch with a None tag.

    Args:
      metadata: Forwarded to ``_stream_unary_invocation_operations``.
      initial_metadata_flags: Forwarded likewise.

    Returns:
      A tuple of ``(operations, None)`` pairs, one per batch.
    """
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple([(batch, None) for batch in batches])

1036 

1037 

def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combine the caller's deadline with the one found in the context.

    Args:
      user_deadline: The deadline requested by the caller, or None.

    Returns:
      None if neither deadline exists, the only one that exists if just one
      does, and the smaller of the two otherwise.
    """
    context_deadline = cygrpc.get_deadline_from_context()
    if context_deadline is None:
        return user_deadline
    if user_deadline is None:
        return context_deadline
    return min(context_deadline, user_deadline)

1048 

1049 

class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-request, unary-response RPC method.

    Blocking invocations (``__call__`` and ``with_call``) run on a segregated
    call created directly on the channel; ``future`` uses the managed-call
    factory and returns immediately.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__, including
    # _registered_call_handle.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators needed to invoke the method.

        Args:
          channel: The channel used for blocking invocations.
          managed_call: The factory used for ``future`` invocations.
          method: The RPC method name.
          target: The channel target; recorded on each RPC's state.
          request_serializer: Serializer applied to the request, or None.
          response_deserializer: Deserializer for the response, or None.
          _registered_call_handle: Handle forwarded to every created call,
            or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serialize the request and build the single operation batch.

        Returns:
          ``(state, operations, deadline, None)`` on success, or
          ``(None, None, None, error)`` when the request could not be
          serialized.
        """
        # The third item is an error (or None), despite its historical name.
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            return None, None, None, rendezvous
        else:
            state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
            operations = (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            )
            return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call and handle its one event.

        Returns:
          The RPC state after the event was handled, and the call.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        else:
            state.rpc_start_time = time.perf_counter()
            state.method = _common.decode(self._method)
            state.target = _common.decode(self._target)
            call = self._channel.segregated_call(
                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
                self._method,
                None,
                _determine_deadline(deadline),
                metadata,
                None if credentials is None else credentials._credentials,
                (
                    (
                        operations,
                        None,
                    ),
                ),
                self._context,
                self._registered_call_handle,
            )
            # All six operations were submitted as one batch, so a single
            # event is awaited here.
            event = call.next_event()
            _handle_event(event, state, self._response_deserializer)
            return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return the response.

        Raises:
          grpc.RpcError: If the RPC did not finish with status OK.
        """
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and a call.

        Raises:
          grpc.RpcError: If the RPC did not finish with status OK.
        """
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC through the managed-call factory.

        Returns:
          A ``_MultiThreadedRendezvous`` wrapping the started call.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        else:
            event_handler = _event_handler(state, self._response_deserializer)
            state.rpc_start_time = time.perf_counter()
            state.method = _common.decode(self._method)
            state.target = _common.decode(self._target)
            # NOTE(review): unlike _blocking, the deadline is passed here
            # without going through _determine_deadline -- confirm whether
            # that is intentional.
            call = self._managed_call(
                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
                self._method,
                None,
                deadline,
                metadata,
                None if credentials is None else credentials._credentials,
                (operations,),
                event_handler,
                self._context,
                self._registered_call_handle,
            )
            return _MultiThreadedRendezvous(
                state, call, self._response_deserializer, deadline
            )

1228 

1229 

class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-request, stream-response RPC on a segregated call.

    The call is created directly on the channel and the result is a
    ``_SingleThreadedRendezvous``.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__, including
    # _registered_call_handle.
    __slots__ = [
        "_channel",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators needed to invoke the method.

        Args:
          channel: The channel on which calls are created.
          method: The RPC method name.
          target: The channel target; recorded on each RPC's state.
          request_serializer: Serializer applied to the request.
          response_deserializer: Deserializer for each response.
          _registered_call_handle: Handle forwarded to every created call,
            or None.
        """
        self._channel = channel
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Start the RPC and return a rendezvous over its responses.

        Raises:
          _InactiveRpcError: With status INTERNAL, if the request could not
            be serialized.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        if serialized_request is None:
            state = _RPCState(
                (),
                (),
                (),
                grpc.StatusCode.INTERNAL,
                "Exception serializing request!",
            )
            raise _InactiveRpcError(state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three batches: the sends, the status receive, and the
        # initial-metadata receive.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        operations_and_tags = tuple((ops, None) for ops in operations)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1328 

1329 

class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-request, stream-response RPC via the managed-call
    factory, returning a ``_MultiThreadedRendezvous``.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__, including
    # _registered_call_handle.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators needed to invoke the method.

        Args:
          channel: The underlying channel.
          managed_call: The factory used to create calls.
          method: The RPC method name.
          target: The channel target; recorded on each RPC's state.
          request_serializer: Serializer applied to the request.
          response_deserializer: Deserializer for each response.
          _registered_call_handle: Handle forwarded to every created call,
            or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC and return a rendezvous over its responses.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        # The third item is an error (or None), despite its historical name.
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        if serialized_request is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        else:
            augmented_metadata = _compression.augment_metadata(
                metadata, compression
            )
            state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
            # Two batches: sends plus the status receive, and the
            # initial-metadata receive.
            operations = (
                (
                    cygrpc.SendInitialMetadataOperation(
                        augmented_metadata, initial_metadata_flags
                    ),
                    cygrpc.SendMessageOperation(
                        serialized_request, _EMPTY_FLAGS
                    ),
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                    cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
                ),
                (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
            )
            state.rpc_start_time = time.perf_counter()
            state.method = _common.decode(self._method)
            state.target = _common.decode(self._target)
            call = self._managed_call(
                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
                self._method,
                None,
                _determine_deadline(deadline),
                metadata,
                None if credentials is None else credentials._credentials,
                operations,
                _event_handler(state, self._response_deserializer),
                self._context,
                self._registered_call_handle,
            )
            return _MultiThreadedRendezvous(
                state, call, self._response_deserializer, deadline
            )

1423 

1424 

class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Invokes a stream-request, unary-response RPC method.

    Blocking invocations (``__call__`` and ``with_call``) run on a segregated
    call created directly on the channel; ``future`` uses the managed-call
    factory and returns immediately.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__, including
    # _registered_call_handle.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators needed to invoke the method.

        Args:
          channel: The channel used for blocking invocations.
          managed_call: The factory used for ``future`` invocations.
          method: The RPC method name.
          target: The channel target; recorded on each RPC's state.
          request_serializer: Serializer applied to each request, or None.
          response_deserializer: Deserializer for the response, or None.
          _registered_call_handle: Handle forwarded to every created call,
            or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call until no operation is due.

        Returns:
          The final RPC state and the call it was conducted on.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        # Drain events until every due operation has been accounted for,
        # notifying waiters on the state's condition after each one.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return the response.

        Raises:
          grpc.RpcError: If the RPC did not finish with status OK.
        """
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and a call.

        Raises:
          grpc.RpcError: If the RPC did not finish with status OK.
        """
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC through the managed-call factory.

        Returns:
          A ``_MultiThreadedRendezvous`` wrapping the started call.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed here without
        # going through _determine_deadline -- confirm whether that is
        # intentional.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # The send-initial-metadata operation must carry the augmented
            # metadata (as _blocking does); with the raw metadata the
            # ``compression`` argument would never be sent.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1593 

1594 

class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Invokes a stream-request, stream-response RPC via the managed-call
    factory, returning a ``_MultiThreadedRendezvous``.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__, including
    # _registered_call_handle.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators needed to invoke the method.

        Args:
          channel: The underlying channel.
          managed_call: The factory used to create calls.
          method: The RPC method name.
          target: The channel target; recorded on each RPC's state.
          request_serializer: Serializer applied to each request, or None.
          response_deserializer: Deserializer for each response, or None.
          _registered_call_handle: Handle forwarded to every created call,
            or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC and return a rendezvous over its responses.

        The request iterator is handed to ``_consume_request_iterator`` after
        the call has been created.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: initial-metadata send plus the status receive, and
        # the initial-metadata receive.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1687 

1688 

class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        """Create the flags, keeping only the bits inside ``used_mask``."""
        masked_value = value & cygrpc.InitialMetadataFlags.used_mask
        return super().__new__(cls, masked_value)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Return flags reflecting an explicit wait-for-ready choice.

        Args:
          wait_for_ready: True to set the wait-for-ready bit, False to clear
            it, or None to leave the flags untouched.

        Returns:
          ``self`` when ``wait_for_ready`` is None; otherwise a new instance
          that also has the "explicitly set" bit turned on.
        """
        if wait_for_ready is None:
            return self

        wait_bit = cygrpc.InitialMetadataFlags.wait_for_ready
        explicit_bit = cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
        if wait_for_ready:
            updated = self | wait_bit | explicit_bit
        else:
            updated = (self & ~wait_bit) | explicit_bit
        return self.__class__(updated)

1710 

1711 

class _ChannelCallState:
    """Bookkeeping shared by the managed calls of one channel."""

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        """Start with no managed calls on ``channel``."""
        self.lock = threading.Lock()
        self.channel = channel
        self.managed_calls, self.threading = 0, False

    def reset_postfork_child(self) -> None:
        """Forget all managed calls (named for use in a post-fork child)."""
        self.managed_calls = 0

    def __del__(self):
        """Close the channel, best effort, when this object is collected."""
        try:
            self.channel.close(
                cygrpc.StatusCode.cancelled, "Channel deallocated!"
            )
        except (TypeError, AttributeError):
            # Deliberately ignored: closing here is best effort only.
            pass

1733 

1734 

def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Start a daemon thread that dispatches the channel's call events.

    The thread exits once the number of managed calls drops to zero.

    Args:
      state: The channel's call state; its ``managed_calls`` counter is
        decremented under ``state.lock`` whenever a tag reports completion.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            timed_out = (
                event.completion_type == cygrpc.CompletionType.queue_timeout
            )
            if timed_out:
                continue
            # The tag is a callable; a truthy result means its call is done.
            if not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spinner = cygrpc.ForkManagedThread(target=channel_spin)
    spinner.setDaemon(True)
    spinner.start()

1752 

1753 

def _channel_managed_call_management(state: _ChannelCallState):
    """Build the managed-call factory bound to ``state``.

    Args:
      state: The channel call state whose counter and lock the factory uses.

    Returns:
      A ``create`` function that makes integrated calls on ``state.channel``.
    """

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context: Any,
        _registered_call_handle: Optional[int],
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant
            from the operations on the call.
          context: Context object for distributed tracing.
          _registered_call_handle: An int representing the call handle of the
            method, or None if the method is not registered.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch is tagged with the same handler.
        tagged_batches = tuple([(batch, event_handler) for batch in operations])
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                tagged_batches,
                context,
                _registered_call_handle,
            )
            state.managed_calls += 1
            # Going from zero to one call means no spin thread is running.
            if state.managed_calls == 1:
                _run_channel_spin_thread(state)
            return call

    return create

1815 

1816 

class _ChannelConnectivityState(object):
    """Mutable state behind a channel's connectivity subscriptions."""

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        """Create the lock, remember ``channel`` and start from a clean slate."""
        self.lock = threading.RLock()
        self.channel = channel
        # The remaining fields start out exactly as after a reset.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Return every polling/delivery field to its initial value.

        The lock and the channel are left untouched.
        """
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False

1849 

1850 

def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collect the callbacks that have not seen the current connectivity.

    Each selected entry of ``state.callbacks_and_connectivities`` is updated
    in place to record ``state.connectivity`` as its last-seen value.

    Args:
      state: The channel connectivity state to inspect and update.

    Returns:
      The callbacks whose recorded connectivity differed from the current
      one, in subscription order.
    """
    stale_callbacks = []
    for entry in state.callbacks_and_connectivities:
        subscriber, last_seen = entry
        if last_seen is state.connectivity:
            continue
        stale_callbacks.append(subscriber)
        entry[1] = state.connectivity
    return stale_callbacks

1861 

1862 

def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invoke connectivity callbacks until none of them is out of date.

    Args:
      state: The channel connectivity state.
      initial_connectivity: The value passed to the first round of callbacks.
      initial_callbacks: The callbacks invoked in the first round.
    """
    current_connectivity = initial_connectivity
    pending_callbacks = initial_callbacks
    while True:
        for subscriber in pending_callbacks:
            cygrpc.block_if_fork_in_progress(state)
            try:
                subscriber(current_connectivity)
            except Exception:  # pylint: disable=broad-except
                # A failing subscriber is logged and must not stop delivery.
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            pending_callbacks = _deliveries(state)
            if not pending_callbacks:
                # Nothing changed while we were delivering: we are done.
                state.delivering = False
                return
            current_connectivity = state.connectivity

1886 

1887 

def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Start a daemon thread running ``_deliver`` and flag delivery as active.

    Args:
      state: The channel connectivity state; its current connectivity is
        captured as the first value to deliver.
      callbacks: The callbacks for the first delivery round.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True

1903 

1904 

# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Poll the channel's connectivity and fan changes out to subscribers.

    Runs as the target of the polling thread. It publishes the initial
    connectivity to every current subscriber, then repeatedly watches the
    channel with a deadline 0.2 seconds ahead. The loop ends, clearing
    ``state.polling`` and ``state.connectivity``, once there are neither
    subscribers nor a pending connect request.

    Args:
      state: The connectivity bookkeeping shared with subscribers and the
        delivery thread; all of its fields are accessed under ``state.lock``.
      channel: The object whose ``check_connectivity_state`` and
        ``watch_connectivity_state`` methods are polled.
      initial_try_to_connect: Passed to the first connectivity check.
    """
    to_channel_connectivity = (
        _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY
    )
    core_connectivity = channel.check_connectivity_state(
        initial_try_to_connect
    )
    with state.lock:
        state.connectivity = to_channel_connectivity[core_connectivity]
        subscribers = tuple(
            entry[0] for entry in state.callbacks_and_connectivities
        )
        # Everyone subscribed so far is about to be told this value.
        for entry in state.callbacks_and_connectivities:
            entry[1] = state.connectivity
        if subscribers:
            _spawn_delivery(state, subscribers)

    while True:
        event = channel.watch_connectivity_state(
            core_connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (state.callbacks_and_connectivities or state.try_to_connect):
                state.polling = False
                state.connectivity = None
                return
            # Consume the pending connect request, if any.
            connect_requested = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or connect_requested):
            continue
        core_connectivity = channel.check_connectivity_state(connect_requested)
        with state.lock:
            state.connectivity = to_channel_connectivity[core_connectivity]
            if state.delivering:
                # The running delivery thread re-checks for out-of-date
                # subscribers itself before it exits.
                continue
            stale_callbacks = _deliveries(state)
            if stale_callbacks:
                _spawn_delivery(state, stale_callbacks)

1953 

1954 

1955def _subscribe( 

1956 state: _ChannelConnectivityState, 

1957 callback: Callable[[grpc.ChannelConnectivity], None], 

1958 try_to_connect: bool, 

1959) -> None: 

1960 with state.lock: 

1961 if not state.callbacks_and_connectivities and not state.polling: 

1962 polling_thread = cygrpc.ForkManagedThread( 

1963 target=_poll_connectivity, 

1964 args=(state, state.channel, bool(try_to_connect)), 

1965 ) 

1966 polling_thread.setDaemon(True) 

1967 polling_thread.start() 

1968 state.polling = True 

1969 state.callbacks_and_connectivities.append([callback, None]) 

1970 elif not state.delivering and state.connectivity is not None: 

1971 _spawn_delivery(state, (callback,)) 

1972 state.try_to_connect |= bool(try_to_connect) 

1973 state.callbacks_and_connectivities.append( 

1974 [callback, state.connectivity] 

1975 ) 

1976 else: 

1977 state.try_to_connect |= bool(try_to_connect) 

1978 state.callbacks_and_connectivities.append([callback, None]) 

1979 

1980 

1981def _unsubscribe( 

1982 state: _ChannelConnectivityState, 

1983 callback: Callable[[grpc.ChannelConnectivity], None], 

1984) -> None: 

1985 with state.lock: 

1986 for index, (subscribed_callback, unused_connectivity) in enumerate( 

1987 state.callbacks_and_connectivities 

1988 ): 

1989 if callback == subscribed_callback: 

1990 state.callbacks_and_connectivities.pop(index) 

1991 break 

1992 

1993 

def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Append the compression and user-agent options to ``base_options``.

    Args:
      base_options: The channel options supplied so far.
      compression: Passed to ``_compression.create_channel_option`` to
        obtain the compression-related option(s).

    Returns:
      A tuple holding ``base_options``, then the compression option(s), then
      a ``(primary_user_agent_string, _USER_AGENT)`` pair.
    """
    compression_option = _compression.create_channel_option(compression)
    caller_options = tuple(base_options)
    user_agent_option = (
        (cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT),
    )
    return caller_options + compression_option + user_agent_option

2009 

2010 

2011def _separate_channel_options( 

2012 options: Sequence[ChannelArgumentType], 

2013) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]: 

2014 """Separates core channel options from Python channel options.""" 

2015 core_options = [] 

2016 python_options = [] 

2017 for pair in options: 

2018 if ( 

2019 pair[0] 

2020 == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream 

2021 ): 

2022 python_options.append(pair) 

2023 else: 

2024 core_options.append(pair) 

2025 return python_options, core_options 

2026 

2027 

class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""

    _single_threaded_unary_stream: bool
    _channel: cygrpc.Channel
    _call_state: _ChannelCallState
    _connectivity_state: _ChannelConnectivityState
    _target: str
    _registered_call_handles: Dict[str, int]
    # True only while this instance holds the unit of the gevent channel
    # count it took in __init__; lets _close() give it back exactly once.
    _counted_for_gevent: bool = False

    def __init__(
        self,
        target: str,
        options: Sequence[ChannelArgumentType],
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
    ):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)
        self._single_threaded_unary_stream = (
            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        )
        self._process_python_options(python_options)
        self._channel = cygrpc.Channel(
            _common.encode(target),
            _augment_options(core_options, compression),
            credentials,
        )
        self._target = target
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)
        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()
            # Remember the increment so that closing releases it once and
            # only if it was actually taken.
            self._counted_for_gevent = True

    def _get_registered_call_handle(self, method: str) -> int:
        """
        Get the registered call handle for a method.

        This is a semi-private method. It is intended for use only by gRPC generated code.

        This method is not thread-safe.

        Args:
          method: Required, the method name for the RPC.

        Returns:
          The registered call handle pointer in the form of a Python Long.
        """
        return self._channel.get_registered_call_handle(_common.encode(method))

    def _process_python_options(
        self, python_options: Sequence[ChannelArgumentType]
    ) -> None:
        """Sets channel attributes according to python-only channel options.

        The mere presence of the SingleThreadedUnaryStream key enables
        single-threaded unary-stream calls; the option's value is not
        inspected.

        Args:
          python_options: The Python-only ``(key, value)`` option pairs.
        """
        for pair in python_options:
            if (
                pair[0]
                == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
            ):
                self._single_threaded_unary_stream = True

    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Subscribes ``callback`` to this channel's connectivity updates.

        Args:
          callback: Called with each connectivity value to be reported.
          try_to_connect: Truthy to request a connection attempt; None is
            treated like False.
        """
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Removes one subscription previously made for ``callback``.

        Args:
          callback: The callback to stop notifying.
        """
        _unsubscribe(self._connectivity_state, callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Creates a multi-callable for a unary-unary method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer for the request message.
          response_deserializer: Optional deserializer for the response
            message.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and handed to the multi-callable;
            otherwise None is passed as the handle.

        Returns:
          A _UnaryUnaryMultiCallable bound to this channel.
        """
        _registered_call_handle = None
        if _registered_method:
            _registered_call_handle = self._get_registered_call_handle(method)
        return _UnaryUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            _registered_call_handle,
        )

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Creates a multi-callable for a unary-stream method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer for the request message.
          response_deserializer: Optional deserializer for the response
            messages.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and handed to the multi-callable;
            otherwise None is passed as the handle.

        Returns:
          A _SingleThreadedUnaryStreamMultiCallable when the channel was
          configured for single-threaded unary-stream calls, otherwise a
          _UnaryStreamMultiCallable.
        """
        _registered_call_handle = None
        if _registered_method:
            _registered_call_handle = self._get_registered_call_handle(method)
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if self._single_threaded_unary_stream:
            return _SingleThreadedUnaryStreamMultiCallable(
                self._channel,
                _common.encode(method),
                _common.encode(self._target),
                request_serializer,
                response_deserializer,
                _registered_call_handle,
            )
        else:
            return _UnaryStreamMultiCallable(
                self._channel,
                _channel_managed_call_management(self._call_state),
                _common.encode(method),
                _common.encode(self._target),
                request_serializer,
                response_deserializer,
                _registered_call_handle,
            )

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Creates a multi-callable for a stream-unary method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer for the request messages.
          response_deserializer: Optional deserializer for the response
            message.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and handed to the multi-callable;
            otherwise None is passed as the handle.

        Returns:
          A _StreamUnaryMultiCallable bound to this channel.
        """
        _registered_call_handle = None
        if _registered_method:
            _registered_call_handle = self._get_registered_call_handle(method)
        return _StreamUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            _registered_call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Creates a multi-callable for a stream-stream method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer for the request messages.
          response_deserializer: Optional deserializer for the response
            messages.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and handed to the multi-callable;
            otherwise None is passed as the handle.

        Returns:
          A _StreamStreamMultiCallable bound to this channel.
        """
        _registered_call_handle = None
        if _registered_method:
            _registered_call_handle = self._get_registered_call_handle(method)
        return _StreamStreamMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            _registered_call_handle,
        )

    def _unsubscribe_all(self) -> None:
        """Drops every connectivity subscription, if the state exists."""
        state = self._connectivity_state
        if state:
            with state.lock:
                # Cleared in place so every holder of the list sees it empty.
                del state.callbacks_and_connectivities[:]

    def _close(self) -> None:
        """Closes the underlying channel and releases this channel's records.

        Safe to call repeatedly: the gevent channel count taken in __init__
        is given back only on the first call that still holds it.
        """
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
        cygrpc.fork_unregister_channel(self)
        # close() is documented as idempotent, and leaving a ``with`` block
        # followed by an explicit close() reaches this method twice. Key the
        # decrement on what this instance actually took rather than on the
        # current value of cygrpc.g_gevent_activated, so the count stays
        # balanced.
        if self._counted_for_gevent:
            self._counted_for_gevent = False
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Closes the underlying channel because of a fork.

        Unlike _close, this neither unregisters the channel from fork
        handling nor touches the gevent channel count.
        """
        self._unsubscribe_all()
        self._channel.close_on_fork(
            cygrpc.StatusCode.cancelled, "Channel closed due to fork"
        )

    def __enter__(self):
        """Returns this channel for use as a context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Closes the channel; returns False so exceptions propagate."""
        self._close()
        return False

    def close(self) -> None:
        """Closes this channel; calling it again is harmless."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this today
        # because many valid use cases today allow the channel to be deleted
        # immediately after stubs are created. After a sufficient period of time
        # has passed for all users to be trusted to hold on to their channels
        # for as long as they are in use and to close them after using them,
        # then deletion of this grpc._channel.Channel instance can be made to
        # effect closure of the underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs. Just silence them.
            pass