Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_channel.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

981 statements  

1# Copyright 2016 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Python.""" 

15 

16import copy 

17import functools 

18import logging 

19import os 

20import sys 

21import threading 

22import time 

23import types 

24from typing import ( 

25 Any, 

26 Callable, 

27 Dict, 

28 Iterator, 

29 List, 

30 Optional, 

31 Sequence, 

32 Set, 

33 Tuple, 

34 Union, 

35) 

36 

37import grpc # pytype: disable=pyi-error 

38from grpc import _common # pytype: disable=pyi-error 

39from grpc import _compression # pytype: disable=pyi-error 

40from grpc import _grpcio_metadata # pytype: disable=pyi-error 

41from grpc import _observability # pytype: disable=pyi-error 

42from grpc._cython import cygrpc 

43from grpc._typing import ChannelArgumentType 

44from grpc._typing import DeserializingFunction 

45from grpc._typing import IntegratedCallFactory 

46from grpc._typing import MetadataType 

47from grpc._typing import NullaryCallbackType 

48from grpc._typing import ResponseType 

49from grpc._typing import SerializingFunction 

50from grpc._typing import UserTag 

51import grpc.experimental # pytype: disable=pyi-error 

52 

_LOGGER = logging.getLogger(__name__)

# User-agent token carrying the installed grpcio version.
_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__)

# Flags value passed to cygrpc operations that need no special flags.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# Only the variable's presence is tested, so any value (including the empty
# string) switches the setting on.
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
)

# Operation types grouped per RPC arity.
# NOTE(review): presumably these seed `_RPCState.due` for the first batch of
# each call type; the call sites are outside this part of the file -- confirm.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# repr() templates consumed by `_rpc_state_string`. Positional fields are:
# class name, status code, details and (non-OK only) debug error string.
_OK_RENDEZVOUS_REPR_FORMAT = (
    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
)

_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)

107 

108 

109def _deadline(timeout: Optional[float]) -> Optional[float]: 

110 return None if timeout is None else time.time() + timeout 

111 

112 

def _unknown_code_details(
    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
) -> str:
    """Build the details text used when a status code is not recognised.

    Args:
      unknown_cygrpc_code: The code to report; interpolated with `str.format`.
      details: The details that accompanied the code.

    Returns:
      A message embedding both values.
    """
    return 'Server sent unknown code {} and details "{}"'.format(
        unknown_cygrpc_code, details
    )

119 

120 

class _RPCState:
    """Mutable record of one RPC's progress and outcome.

    Attributes:
      condition: Guards the members below; notified when the RPC's state
        changes.
      due: Operation types whose completion events are still expected.
      initial_metadata: Initial metadata, or None until known.
      response: The most recently stored response, or None.
      trailing_metadata: Trailing metadata, or None until known.
      code: Final status code, or None while the RPC has no status.
      details: Final status details, or None while the RPC has no status.
      debug_error_string: Debug error string, or None if never recorded.
      cancelled: Whether cancellation was requested before termination.
      callbacks: Callbacks to run at termination; set to None once they have
        been handed off by `_handle_event`.
      fork_epoch: Value of `cygrpc.get_fork_epoch()` at construction.
      rpc_start_time: Observability timestamp, in relative seconds.
      rpc_end_time: Observability timestamp, in relative seconds.
      method: Observability field; None until assigned elsewhere.
      target: Observability field; None until assigned elsewhere.
    """

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[float]  # In relative seconds
    rpc_end_time: Optional[float]  # In relative seconds
    method: Optional[str]
    target: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ):
        """Initialise the state.

        Args:
          due: Operation types initially expected; copied into a set.
          initial_metadata: Initial metadata if already known, else None.
          trailing_metadata: Trailing metadata if already known, else None.
          code: Status code if the RPC is already terminated, else None.
          details: Status details if the RPC is already terminated, else None.
        """
        # `condition` guards all members of _RPCState. `notify_all` is called on
        # `condition` when the state of the RPC has changed.
        self.condition = threading.Condition()

        # The cygrpc.OperationType objects representing events due from the RPC's
        # completion queue. If an operation is in `due`, it is guaranteed that
        # `operate()` has been called on a corresponding operation. But the
        # converse is not true. That is, in the case of failed `operate()`
        # calls, there may briefly be events in `due` that do not correspond to
        # operations submitted to Core.
        self.due = set(due)
        self.initial_metadata = initial_metadata
        self.response = None
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details
        self.debug_error_string = None
        # The following four fields are used for observability.
        # Updates to those fields do not trigger self.condition.
        self.rpc_start_time = None
        self.rpc_end_time = None
        self.method = None
        self.target = None

        # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
        # slightly wonky, so they have to be tracked separately from the rest of the
        # result of the RPC. This field tracks whether cancellation was requested
        # prior to termination of the RPC.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a newly constructed Condition.

        NOTE(review): judging by the name, this is meant to run in the child
        process after a fork; the caller is not visible here -- confirm.
        """
        self.condition = threading.Condition()

180 

181 

def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Record a locally decided final status unless one is already set.

    A no-op when `state.code` is already populated. Otherwise stores `code`
    and `details`, defaults missing initial metadata to an empty tuple and
    sets the trailing metadata to an empty tuple. `debug_error_string` is
    left untouched.

    This function does not acquire `state.condition` and does not notify it.

    Args:
      state: The RPC state to update in place.
      code: Status code to record.
      details: Status details to record.
    """
    if state.code is None:
        state.code = code
        state.details = details
        if state.initial_metadata is None:
            state.initial_metadata = ()
        state.trailing_metadata = ()

189 

190 

def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Fold one completion-queue event into `state`.

    Each operation in `event.batch_operations` is removed from `state.due`
    and its payload (initial metadata, message or final status) is recorded
    on `state`. The function itself does not acquire `state.condition`; the
    callers visible in this file invoke it with the condition held.

    Args:
      event: The event whose batch operations have completed.
      state: The RPC state to update in place.
      response_deserializer: Forwarded to `_common.deserialize` for each
        received message.

    Returns:
      The callbacks that were registered on `state` if the event carried the
      RPC's final status, otherwise an empty list. They are returned rather
      than invoked; running them is left to the caller.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A status recorded earlier (e.g. by `_abort`) takes precedence
            # over the one carried by this event.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw code that failed to map; `code` itself
                    # is always None on this branch.
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            callbacks.extend(state.callbacks)
            # None marks the callbacks as handed off; `add_callback` checks
            # for it and refuses later registrations.
            state.callbacks = None
    return callbacks

233 

234 

def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Create the tag callable that processes events for one RPC.

    Args:
      state: The RPC state the events belong to.
      response_deserializer: Forwarded to `_handle_event`.

    Returns:
      A callable taking an event. It applies the event to `state` under
      `state.condition`, notifies all waiters, then runs any termination
      callbacks outside the lock. Its result is True only when no operations
      remain in `state.due` and `state.fork_epoch` is not older than
      `cygrpc.get_fork_epoch()`.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks added through `add_done_callback` are
                # functools.partial objects, but those added through
                # `add_callback` are plain callables without `.func`; fall
                # back to the callback itself so logging cannot raise.
                _LOGGER.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event

255 

256 

257# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall. 

258# pylint: disable=too-many-statements 

def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Consume a request supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator`, serializes them and submits them on `call` one at a
    time, then submits a close-from-client operation once the iterator is
    exhausted. This function returns as soon as the thread is started.

    Args:
      request_iterator: Iterator yielding the request messages.
      state: State of the RPC the requests belong to.
      call: The call on which send operations are submitted.
      request_serializer: Forwarded to `_common.serialize` for each request.
      event_handler: Tag passed along with every submitted operation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Ensures cygrpc.return_from_user_request_generator() is called
            # exactly once per iteration, whichever path is taken.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                # NOTE(review): unlike `_Rendezvous.cancel`, this path calls
                # `_abort` without holding `state.condition` and without
                # `notify_all()` -- confirm this is intentional.
                call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
                )
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        code = grpc.StatusCode.INTERNAL
                        details = "Exception serializing request!"
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details,
                        )
                        _abort(state, code, details)
                        return
                    else:
                        # Mark the send as due before submitting it and roll
                        # back if the submission is refused.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (
                            cygrpc.SendMessageOperation(
                                serialized_request, _EMPTY_FLAGS
                            ),
                        )
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            return (
                                state.code is not None
                                or cygrpc.OperationType.send_message
                                not in state.due
                            )

                        # Wait for this send to complete (or for the RPC to
                        # terminate) before pulling the next request.
                        _common.wait(
                            state.condition.wait,
                            _done,
                            spin_cb=functools.partial(
                                cygrpc.block_if_fork_in_progress, state
                            ),
                        )
                        if state.code is not None:
                            return
                else:
                    # The RPC already terminated or was cancelled.
                    return
        # Reached only when the iterator was exhausted normally.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                )
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client
                    )

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()

352 

353 

def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Calculates error string for RPC.

    Args:
      class_name: Name to show for the object being described.
      rpc_state: The state to describe; read under its condition.

    Returns:
      A placeholder `<... object>` string while the RPC has no status code,
      the OK template when the code is `grpc.StatusCode.OK`, and otherwise
      the non-OK template, which also includes the debug error string.
    """
    with rpc_state.condition:
        if rpc_state.code is None:
            return "<{} object>".format(class_name)
        elif rpc_state.code is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name, rpc_state.code, rpc_state.details
            )
        else:
            return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name,
                rpc_state.code,
                rpc_state.details,
                rpc_state.debug_error_string,
            )

370 

371 

class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled.

    Because the instance holds a private snapshot of a finished RPC, every
    grpc.Future method answers immediately and none of them blocks.

    Attributes:
      _state: An instance of _RPCState.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Snapshot `state` so later changes to it do not affect this error.

        Args:
          state: State of the terminated RPC; read under its condition.
        """
        with state.condition:
            # The new state has an empty `due` set: nothing more is expected.
            self._state = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            self._state.response = copy.copy(state.response)
            self._state.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the snapshotted initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the snapshotted trailing metadata."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the snapshotted status code."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the snapshotted details, passed through `_common.decode`."""
        return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string, passed through `_common.decode`."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self) -> str:
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and immediately catch the error to obtain a traceback object.
        try:
            raise self
        except grpc.RpcError:
            return sys.exc_info()[2]

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # `fn` is invoked synchronously, before this method returns.
        fn(self)

464 

465 

class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Iteration is delegated to `_next`, which this base class leaves
    unimplemented, as it does `debug_error_string`.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super().__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            else:
                # Never negative: an expired deadline reports 0.
                return max(self._deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is None:
                code = grpc.StatusCode.CANCELLED
                details = "Locally cancelled by application!"
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
                )
                self._state.cancelled = True
                _abort(self._state, code, details)
                self._state.condition.notify_all()
                return True
            else:
                # The RPC already has a status; nothing left to cancel.
                return False

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            # `callbacks` becomes None once the termination callbacks have
            # been handed off; registration is refused from then on.
            if self._state.callbacks is None:
                return False
            else:
                self._state.callbacks.append(callback)
                return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        raise NotImplementedError()

    def _repr(self) -> str:
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        # Cancel an RPC that still has no status when this object is
        # finalized, and wake any thread waiting on the state.
        with self._state.condition:
            if self._state.code is None:
                self._state.code = grpc.StatusCode.CANCELLED
                self._state.details = "Cancelled upon garbage collection!"
                self._state.cancelled = True
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
                    self._state.details,
                )
                self._state.condition.notify_all()

571 

572 

class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        # Does not take the condition itself; callers in this class hold it.
        return self._state.code is not None

    def cancelled(self) -> bool:
        """Return whether cancellation was requested before termination."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while the RPC has no status code."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                error_msg = (
                    "_SingleThreadedRendezvous only supports "
                    "result() when the RPC is complete."
                )
                raise grpc.experimental.UsageError(error_msg)
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                error_msg = (
                    "_SingleThreadedRendezvous only supports "
                    "exception() when the RPC is complete."
                )
                raise grpc.experimental.UsageError(error_msg)
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                msg = (
                    "_SingleThreadedRendezvous only supports "
                    "traceback() when the RPC is complete."
                )
                raise grpc.experimental.UsageError(msg)
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                # Raise and immediately catch to obtain a traceback object.
                try:
                    raise self
                except grpc.RpcError:
                    return sys.exc_info()[2]

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn` to be called with this object at termination.

        If the RPC already has a status code, `fn` is called immediately,
        after the condition has been released.
        """
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            if self._state.trailing_metadata is None:
                error_msg = (
                    "Cannot get trailing metadata until RPC is completed."
                )
                raise grpc.experimental.UsageError(error_msg)
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            if self._state.code is None:
                error_msg = "Cannot get code until RPC is completed."
                raise grpc.experimental.UsageError(error_msg)
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            if self._state.details is None:
                error_msg = "Cannot get details until RPC is completed."
                raise grpc.experimental.UsageError(error_msg)
            return _common.decode(self._state.details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Fetch one event from the call and apply it to the state.

        Returns:
          The event obtained from `self._call.next_event()`.
        """
        event = self._call.next_event()
        with self._state.condition:
            callbacks = _handle_event(
                event, self._state, self._response_deserializer
            )
            for callback in callbacks:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self) -> Any:
        """Consume events until a response or the end of the stream arrives.

        Returns:
          The next response; `state.response` is cleared before returning.

        Raises:
          StopIteration: No receive is outstanding and the code is OK.
          _SingleThreadedRendezvous: (self) No receive is outstanding and the
            RPC terminated with a non-OK code.
        """
        while True:
            self._consume_next_event()
            with self._state.condition:
                if self._state.response is not None:
                    response = self._state.response
                    self._state.response = None
                    return response
                elif (
                    cygrpc.OperationType.receive_message not in self._state.due
                ):
                    if self._state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    elif self._state.code is not None:
                        raise self

    def _next(self) -> Any:
        """Request one more message and return it once it arrives."""
        with self._state.condition:
            if self._state.code is None:
                # We tentatively add the operation as expected and remove
                # it if the enqueue operation fails. This allows us to guarantee that
                # if an event has been submitted to the core completion queue,
                # it is in `due`. If we waited until after a successful
                # enqueue operation then a signal could interrupt this
                # thread between the enqueue operation and the addition of the
                # operation to `due`. This would cause an exception on the
                # channel spin thread when the operation completes and no
                # corresponding operation would be present in state.due.
                # Note that, since `condition` is held through this block, there is
                # no data race on `due`.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string without blocking.

        Raises:
          grpc.experimental.UsageError: No debug error string has been
            recorded on the state.
        """
        with self._state.condition:
            if self._state.debug_error_string is None:
                error_msg = (
                    "Cannot get debug error string until RPC is completed."
                )
                raise grpc.experimental.UsageError(error_msg)
            return _common.decode(self._state.debug_error_string)

788 

789 

class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
    and to mediate a bidirection streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:

            def _done():
                return self._state.initial_metadata is not None

            _common.wait(self._state.condition.wait, _done)
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:

            def _done():
                return self._state.trailing_metadata is not None

            _common.wait(self._state.condition.wait, _done)
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:

            def _done():
                return self._state.code is not None

            _common.wait(self._state.condition.wait, _done)
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:

            def _done():
                return self._state.details is not None

            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Wait for the RPC to terminate and return its debug error string.

        Returns:
          The decoded debug error string, or None when the RPC terminated
          without one being recorded (for example after a local abort).
        """
        with self._state.condition:

            def _done():
                # `_abort` and the unknown-status branch of `_handle_event`
                # set `code` but never `debug_error_string`. Waiting on the
                # string alone would block forever in those cases, so a
                # recorded status code also ends the wait.
                return (
                    self._state.debug_error_string is not None
                    or self._state.code is not None
                )

            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.debug_error_string)

    def cancelled(self) -> bool:
        """Return whether cancellation was requested before termination."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while the RPC has no status code."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        # Used as a wait predicate; evaluated with the condition held.
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return self._state.response
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.future.traceback for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    # Raise and immediately catch to obtain a traceback.
                    try:
                        raise self
                    except grpc.RpcError:
                        return sys.exc_info()[2]

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn` to be called with this object at termination.

        If the RPC already has a status code, `fn` is called immediately,
        after the condition has been released.
        """
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def _next(self) -> Any:
        """Request one more message and wait for it or for termination.

        Raises:
          StopIteration: The RPC terminated with an OK code.
          _MultiThreadedRendezvous: (self) The RPC terminated with a non-OK
            code.
        """
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                # Mark the receive as due before submitting it and roll back
                # if the submission is refused.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self

            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )

            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
                    raise self

973 

974 

def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize a unary request.

    Args:
      request: The request value to serialize.
      timeout: Timeout passed to _deadline, or None.
      request_serializer: Serializer passed to _common.serialize.

    Returns:
      A ``(deadline, serialized_request, error)`` tuple. When
      serialization yields None, ``serialized_request`` is None and
      ``error`` is an _InactiveRpcError carrying StatusCode.INTERNAL;
      otherwise ``error`` is None.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None

    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)

994 

995 

def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn the final state of a blocking unary-response RPC into a result.

    Args:
      state: The RPC state after the call has been driven.
      call: The call object the RPC was conducted on.
      with_call: Whether to also return a call object with the response.
      deadline: Deadline handed to the returned call object, or None.

    Returns:
      ``state.response``, or ``(state.response, rendezvous)`` when
      ``with_call`` is true.

    Raises:
      _InactiveRpcError: If ``state.code`` is not StatusCode.OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    call_view = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, call_view

1010 

1011 

def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two initial operation batches for a stream-unary RPC.

    Args:
      metadata: Metadata placed in the send-initial-metadata operation.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A pair of batches: (send initial metadata, receive message, receive
      status) and (receive initial metadata,).
    """
    main_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return main_batch, initial_metadata_batch

1025 

1026 

def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pair each stream-unary operation batch with a None tag.

    Args:
      metadata: Forwarded to _stream_unary_invocation_operations.
      initial_metadata_flags: Forwarded to
        _stream_unary_invocation_operations.

    Returns:
      A tuple of ``(operations, None)`` pairs, one per batch.
    """
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple([(batch, None) for batch in batches])

1039 

1040 

def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combine the caller's deadline with the one found in the context.

    Args:
      user_deadline: The deadline requested by the caller, or None.

    Returns:
      None if neither deadline is set, the one that is set if only one
      is, and the smaller of the two otherwise.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)

1051 

1052 

class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-request, unary-response method on a channel.

    Offers a blocking call, a blocking call that also returns the call
    object, and a future-returning call.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # NOTE(review): _registered_call_handle is assigned in __init__ but is
    # not listed here - confirm whether that omission is intentional.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serialize the request and assemble the single operation batch.

        Returns:
          ``(state, operations, deadline, None)`` on success, or
          ``(None, None, None, error)`` when serialization failed.
        """
        (
            deadline,
            serialized_request,
            serialization_error,
        ) = _start_unary_request(request, timeout, self._request_serializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            return None, None, None, serialization_error

        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        # One batch carries the whole exchange: send everything, then
        # receive initial metadata, the message and the status.
        operations = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call and handle its single event.

        Raises:
          grpc.RpcError: The error from _prepare if serialization failed.
        """
        state, operations, deadline, prepare_error = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise prepare_error  # pylint: disable-msg=raising-bad-type

        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        operations_and_tags = ((operations, None),)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        _handle_event(call.next_event(), state, self._response_deserializer)
        return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        final_state, finished_call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(
            final_state, finished_call, False, None
        )

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and the call."""
        final_state, finished_call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(
            final_state, finished_call, True, None
        )

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC on a managed call and return its rendezvous.

        Raises:
          grpc.RpcError: The error from _prepare if serialization failed.
        """
        state, operations, deadline, prepare_error = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise prepare_error  # pylint: disable-msg=raising-bad-type

        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            metadata,
            call_credentials,
            (operations,),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1231 

1232 

class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-request, stream-response method on a segregated call.

    The call is created through the channel's segregated_call and wrapped
    in a _SingleThreadedRendezvous.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # NOTE(review): _registered_call_handle is assigned in __init__ but is
    # not listed here - confirm whether that omission is intentional.
    __slots__ = [
        "_channel",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, method, target and (de)serializers."""
        self._channel = channel
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Start the RPC and return a rendezvous for its responses.

        Raises:
          _InactiveRpcError: With StatusCode.INTERNAL if the request
            serializes to None.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        if serialized_request is None:
            raise _InactiveRpcError(
                _RPCState(
                    (),
                    (),
                    (),
                    grpc.StatusCode.INTERNAL,
                    "Exception serializing request!",
                )
            )

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        if credentials is None:
            call_credentials = None
        else:
            call_credentials = credentials._credentials
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three batches, each started with a None tag: the sends, the
        # status receive, and the initial-metadata receive.
        send_batch = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
        )
        status_batch = (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations_and_tags = (
            (send_batch, None),
            (status_batch, None),
            (initial_metadata_batch, None),
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1331 

1332 

class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-request, stream-response method on a managed call."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # NOTE(review): _registered_call_handle is assigned in __init__ but is
    # not listed here - confirm whether that omission is intentional.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC and return a rendezvous for its responses.

        Raises:
          grpc.RpcError: The error from _start_unary_request if the
            request failed to serialize.
        """
        (
            deadline,
            serialized_request,
            serialization_error,
        ) = _start_unary_request(request, timeout, self._request_serializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        if serialized_request is None:
            raise serialization_error  # pylint: disable-msg=raising-bad-type

        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # Two batches: the sends together with the status receive, and
        # the initial-metadata receive.
        send_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations = (send_and_status_batch, initial_metadata_batch)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            operations,
            _event_handler(state, self._response_deserializer),
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1426 

1427 

class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Invokes a stream-request, unary-response method on a channel.

    Offers a blocking call, a blocking call that also returns the call
    object, and a future-returning call.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call until no operation is due.

        Returns:
          The final RPC state and the call it was conducted on.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        # Drain events from the call, updating the state under its
        # condition, until nothing is left outstanding.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and the call."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC on a managed call and return its rendezvous.

        The request iterator is consumed via _consume_request_iterator
        after the call has been created.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed here
        # without going through _determine_deadline - confirm intent.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # The send-initial-metadata operation must carry the
            # compression-augmented metadata, exactly as _blocking does;
            # passing the raw metadata here would drop the requested
            # compression setting from what is actually sent.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1596 

1597 

class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Invokes a stream-request, stream-response method on a managed call."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # NOTE(review): _registered_call_handle is assigned in __init__ but is
    # not listed here - confirm whether that omission is intentional.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC and return a rendezvous for its responses.

        The request iterator is consumed via _consume_request_iterator
        after the call has been created.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: send initial metadata together with the status
        # receive, and the initial-metadata receive.
        metadata_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations = (metadata_and_status_batch, initial_metadata_batch)
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            call_credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1690 

1691 

class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        """Create the flags, keeping only the bits inside ``used_mask``."""
        masked = value & cygrpc.InitialMetadataFlags.used_mask
        return super().__new__(cls, masked)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Return flags reflecting an explicit wait-for-ready choice.

        Args:
          wait_for_ready: True to set the wait_for_ready bit, False to
            clear it, or None to leave the flags untouched.

        Returns:
          ``self`` when ``wait_for_ready`` is None; otherwise a new
          instance that also has ``wait_for_ready_explicitly_set`` set.
        """
        if wait_for_ready is None:
            return self

        ready_bit = cygrpc.InitialMetadataFlags.wait_for_ready
        explicit_bit = cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
        if wait_for_ready:
            combined = self | ready_bit | explicit_bit
        else:
            # ``&`` binds tighter than ``|``: clear the ready bit first,
            # then mark the choice as explicit.
            combined = (self & ~ready_bit) | explicit_bit
        return self.__class__(combined)

1713 

1714 

class _ChannelCallState(object):
    """Bookkeeping for the managed calls running on one channel."""

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        """Start with zero managed calls on ``channel``."""
        self.channel = channel
        self.lock = threading.Lock()
        self.threading = False
        self.managed_calls = 0

    def reset_postfork_child(self) -> None:
        """Reset the managed-call counter to zero."""
        self.managed_calls = 0

    def __del__(self):
        """Close the channel on finalization, on a best-effort basis."""
        try:
            self.channel.close(
                cygrpc.StatusCode.cancelled, "Channel deallocated!"
            )
        except (TypeError, AttributeError):
            # Deliberately ignored; presumably guards against partially
            # torn-down objects at finalization time - confirm.
            pass

1736 

1737 

def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Start a daemon thread that processes the channel's call events.

    Args:
      state: The call state whose channel is polled and whose
        ``managed_calls`` counter is decremented as calls complete.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            # A queue timeout carries nothing to dispatch; poll again.
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                continue
            # The tag is called with its own event; a truthy result is
            # treated as "this call has completed".
            call_completed = event.tag(event)
            if call_completed:
                with state.lock:
                    state.managed_calls -= 1
                    # The thread exits once the counter reaches zero.
                    if state.managed_calls == 0:
                        return

    channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    channel_spin_thread.setDaemon(True)
    channel_spin_thread.start()

1755 

1756 

def _channel_managed_call_management(state: _ChannelCallState):
    """Build the managed-call factory bound to ``state``.

    Args:
      state: The call state shared by every call the factory creates.

    Returns:
      The ``create`` function defined below.
    """

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context: Any,
        _registered_call_handle: Optional[int],
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.
          _registered_call_handle: An int representing the call handle of the
            method, or None if the method is not registered.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch is tagged with the same event handler.
        operations_and_tags = tuple(
            (
                operation,
                event_handler,
            )
            for operation in operations
        )
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                operations_and_tags,
                context,
                _registered_call_handle,
            )
            # The first managed call starts the spin thread; later calls
            # only bump the counter.
            if state.managed_calls == 0:
                state.managed_calls = 1
                _run_channel_spin_thread(state)
            else:
                state.managed_calls += 1
            return call

    return create

1819 

1820 

class _ChannelConnectivityState(object):
    """Mutable state shared by the connectivity polling and delivery code."""

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        """Create the state for ``channel`` with no subscribers."""
        self.lock = threading.RLock()
        self.channel = channel
        # The remaining fields start out exactly as they are after a
        # post-fork reset.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Return every field except ``lock`` and ``channel`` to its default."""
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False

1853 

1854 

def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collect callbacks that have not yet seen the current connectivity.

    Each collected entry is updated in place to record
    ``state.connectivity`` as the value last delivered to it.

    Args:
      state: The connectivity state holding the subscriber entries.

    Returns:
      The callbacks whose recorded connectivity differed from the
      current one.
    """
    current = state.connectivity
    stale_callbacks = []
    for entry in state.callbacks_and_connectivities:
        callback, last_seen = entry
        if last_seen is current:
            continue
        stale_callbacks.append(callback)
        entry[1] = current
    return stale_callbacks

1865 

1866 

def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invoke connectivity callbacks until none is out of date.

    Args:
      state: The shared connectivity state.
      initial_connectivity: The connectivity value for the first round.
      initial_callbacks: The callbacks to invoke in the first round.
    """
    connectivity = initial_connectivity
    callbacks = initial_callbacks
    while True:
        # Callbacks run without holding state.lock.
        for callback in callbacks:
            cygrpc.block_if_fork_in_progress(state)
            try:
                callback(connectivity)
            except Exception:  # pylint: disable=broad-except
                # A failing subscriber is logged and must not stop
                # delivery to the others.
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            # Connectivity may have changed while callbacks were running;
            # pick up anyone who is now behind.
            callbacks = _deliveries(state)
            if callbacks:
                connectivity = state.connectivity
            else:
                # Nothing left to deliver: clear the flag and finish.
                state.delivering = False
                return

1890 

1891 

def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Start a daemon thread running _deliver and mark delivery as active.

    Args:
      state: The shared connectivity state; its current connectivity is
        passed to _deliver as the initial value.
      callbacks: The callbacks for _deliver's first round.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True

1907 

1908 

# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Poll ``channel`` for connectivity changes and fan them out.

    An initial connectivity check is recorded and delivered to every
    current subscriber. The loop then watches the channel with a deadline
    0.2 seconds ahead; whenever the watch succeeds or a connection attempt
    was requested, the connectivity is re-read and stale subscribers are
    notified (unless a delivery is already running). Polling stops once
    there are no subscribers and no pending connect request.

    Args:
      state: The shared connectivity state.
      channel: The channel to query and watch.
      initial_try_to_connect: Whether the first connectivity check should
        also ask the channel to connect.
    """
    raw_connectivity = channel.check_connectivity_state(initial_try_to_connect)
    with state.lock:
        state.connectivity = (
            _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                raw_connectivity
            ]
        )
        # Everybody subscribed so far gets the initial value, so mark each
        # subscription as up to date while collecting its callback.
        subscribers = []
        for subscription in state.callbacks_and_connectivities:
            subscriber, _ = subscription
            subscribers.append(subscriber)
            subscription[1] = state.connectivity
        if subscribers:
            _spawn_delivery(state, tuple(subscribers))

    while True:
        event = channel.watch_connectivity_state(
            raw_connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            connect_requested = state.try_to_connect
            if not (state.callbacks_and_connectivities or connect_requested):
                # Nobody is listening and nothing is pending: stop polling.
                state.polling = False
                state.connectivity = None
                return
            # Consume the connect request so it is acted on only once.
            state.try_to_connect = False

        if not (event.success or connect_requested):
            continue

        raw_connectivity = channel.check_connectivity_state(connect_requested)
        with state.lock:
            state.connectivity = (
                _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                    raw_connectivity
                ]
            )
            if state.delivering:
                # The running delivery picks up the new value by itself.
                continue
            stale = _deliveries(state)
            if stale:
                _spawn_delivery(state, stale)

1957 

1958 

def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Register ``callback`` for connectivity updates.

    Three situations are handled, all under ``state.lock``:

    * No subscribers and no poller: a daemon polling thread is started and
      the callback is recorded as not yet notified.
    * A connectivity value is known and no delivery is running: a delivery
      thread is spawned for this callback alone and it is recorded as
      already up to date.
    * Otherwise: the callback is recorded as not yet notified.

    In the last two situations a truthy ``try_to_connect`` is merged into
    ``state.try_to_connect``; in the first it is handed to the new poller.

    Args:
      state: The shared connectivity state.
      callback: Callable invoked with each connectivity value.
      try_to_connect: Whether the channel should attempt to connect.
    """
    wants_connect = bool(try_to_connect)
    with state.lock:
        subscriptions = state.callbacks_and_connectivities

        if not (subscriptions or state.polling):
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, wants_connect),
            )
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            subscriptions.append([callback, None])
            return

        if state.delivering or state.connectivity is None:
            # Either a delivery is running or nothing is known yet; leave
            # the subscription marked as never notified.
            state.try_to_connect |= wants_connect
            subscriptions.append([callback, None])
            return

        _spawn_delivery(state, (callback,))
        state.try_to_connect |= wants_connect
        subscriptions.append([callback, state.connectivity])

1983 

1984 

def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Remove the first subscription registered for ``callback``.

    The search runs under ``state.lock`` and compares with ``==``. Only the
    first matching entry is removed; if none matches, nothing happens.

    Args:
      state: The shared connectivity state.
      callback: The callback to unsubscribe.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        for position, subscription in enumerate(subscriptions):
            subscriber, _ = subscription
            if callback == subscriber:
                del subscriptions[position]
                return

1996 

1997 

def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Append the compression and user-agent options to ``base_options``.

    Args:
      base_options: Caller-supplied channel options.
      compression: Optional compression setting, translated into channel
        option(s) by ``_compression.create_channel_option``.

    Returns:
      A tuple holding the base options, then the compression option(s),
      then the primary user-agent option.
    """
    compression_option = _compression.create_channel_option(compression)
    augmented = tuple(base_options) + compression_option
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return augmented + (user_agent_option,)

2013 

2014 

def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Separates core channel options from Python channel options.

    An option is Python-only when its key (first element) equals
    ``grpc.experimental.ChannelOptions.SingleThreadedUnaryStream``; every
    other option is treated as a core option.

    Args:
      options: The channel options to partition.

    Returns:
      A ``(python_options, core_options)`` pair of lists, each preserving
      the input order.
    """
    python_options = []
    core_options = []
    for option in options:
        is_python_only = (
            option[0]
            == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
        )
        destination = python_options if is_python_only else core_options
        destination.append(option)
    return python_options, core_options

2030 

2031 

class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""

    _single_threaded_unary_stream: bool
    _channel: cygrpc.Channel
    _call_state: _ChannelCallState
    _connectivity_state: _ChannelConnectivityState
    _target: str
    _registered_call_handles: Dict[str, int]

    def __init__(
        self,
        target: str,
        options: Sequence[ChannelArgumentType],
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
    ):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)
        # Start from the module default; Python-only options may override it.
        self._single_threaded_unary_stream = (
            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        )
        self._process_python_options(python_options)

        encoded_target = _common.encode(target)
        channel_arguments = _augment_options(core_options, compression)
        self._channel = cygrpc.Channel(
            encoded_target, channel_arguments, credentials
        )
        self._target = target
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)

        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _get_registered_call_handle(self, method: str) -> int:
        """Get the registered call handle for a method.

        This is a semi-private method. It is intended for use only by gRPC
        generated code.

        This method is not thread-safe.

        Args:
          method: Required, the method name for the RPC.

        Returns:
          The registered call handle pointer in the form of a Python Long.
        """
        encoded_method = _common.encode(method)
        return self._channel.get_registered_call_handle(encoded_method)

    def _process_python_options(
        self, python_options: Sequence[ChannelArgumentType]
    ) -> None:
        """Sets channel attributes according to python-only channel options.

        The presence of the SingleThreadedUnaryStream key is what enables
        the single-threaded unary-stream path; the option's value is not
        inspected.
        """
        if any(
            option[0]
            == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
            for option in python_options
        ):
            self._single_threaded_unary_stream = True

    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Register ``callback`` for this channel's connectivity updates."""
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Remove a previously subscribed connectivity ``callback``."""
        _unsubscribe(self._connectivity_state, callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Create a unary-unary multi-callable for ``method``.

        A registered call handle is looked up only when
        ``_registered_method`` is truthy; otherwise ``None`` is passed on.
        """
        handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _UnaryUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            handle,
        )

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Create a unary-stream multi-callable for ``method``.

        A registered call handle is looked up only when
        ``_registered_method`` is truthy; otherwise ``None`` is passed on.
        """
        handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if not self._single_threaded_unary_stream:
            return _UnaryStreamMultiCallable(
                self._channel,
                _channel_managed_call_management(self._call_state),
                _common.encode(method),
                _common.encode(self._target),
                request_serializer,
                response_deserializer,
                handle,
            )
        return _SingleThreadedUnaryStreamMultiCallable(
            self._channel,
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            handle,
        )

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Create a stream-unary multi-callable for ``method``.

        A registered call handle is looked up only when
        ``_registered_method`` is truthy; otherwise ``None`` is passed on.
        """
        handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _StreamUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            handle,
        )

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Create a stream-stream multi-callable for ``method``.

        A registered call handle is looked up only when
        ``_registered_method`` is truthy; otherwise ``None`` is passed on.
        """
        handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _StreamStreamMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            handle,
        )

    def _unsubscribe_all(self) -> None:
        """Drop every connectivity subscription, if state exists."""
        state = self._connectivity_state
        if not state:
            return
        with state.lock:
            # Clear in place so other holders of the list see it emptied.
            state.callbacks_and_connectivities.clear()

    def _close(self) -> None:
        """Unsubscribe everyone, close the channel and unregister it."""
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Unsubscribe everyone and close the channel because of a fork."""
        self._unsubscribe_all()
        self._channel.close_on_fork(
            cygrpc.StatusCode.cancelled, "Channel closed due to fork"
        )

    def __enter__(self):
        """Enter the context manager; the channel itself is the value."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the channel on exit; never suppresses the exception."""
        self._close()
        return False

    def close(self) -> None:
        """Close the channel."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this
        # today because many valid use cases today allow the channel to be
        # deleted immediately after stubs are created. After a sufficient
        # period of time has passed for all users to be trusted to hold on to
        # their channels for as long as they are in use and to close them
        # after using them, then deletion of this grpc._channel.Channel
        # instance can be made to effect closure of the underlying
        # cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except # noqa: E722
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs. Just silence them.
            pass