Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_channel.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

974 statements  

1# Copyright 2016 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Python.""" 

15 

16import copy 

17import functools 

18import logging 

19import os 

20import sys 

21import threading 

22import time 

23import types 

24from typing import ( 

25 Any, 

26 Callable, 

27 Dict, 

28 Iterator, 

29 List, 

30 Optional, 

31 Sequence, 

32 Set, 

33 Tuple, 

34 Union, 

35) 

36 

37import grpc # pytype: disable=pyi-error 

38from grpc import _common # pytype: disable=pyi-error 

39from grpc import _compression # pytype: disable=pyi-error 

40from grpc import _grpcio_metadata # pytype: disable=pyi-error 

41from grpc import _observability # pytype: disable=pyi-error 

42from grpc._cython import cygrpc 

43from grpc._typing import ChannelArgumentType 

44from grpc._typing import DeserializingFunction 

45from grpc._typing import IntegratedCallFactory 

46from grpc._typing import MetadataType 

47from grpc._typing import NullaryCallbackType 

48from grpc._typing import ResponseType 

49from grpc._typing import SerializingFunction 

50from grpc._typing import UserTag 

51import grpc.experimental # pytype: disable=pyi-error 

52 

# Logger used by the invocation-side helpers in this module.
_LOGGER = logging.getLogger(__name__)

# User-agent string derived from the installed grpcio version.
_USER_AGENT = f"grpc-python/{_grpcio_metadata.__version__}"

# Flag value passed to Core operations that need no special flags.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever the variable is present in the environment, whatever its value.
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    "GRPC_SINGLE_THREADED_UNARY_STREAM" in os.environ
)

# For each RPC arity, the operations submitted in the initial batch and
# therefore initially "due" from the completion queue.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# repr() templates for terminated RPCs; positional fields are the class name,
# status code, details and (non-OK only) the debug error string.
_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    ">"
)

_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n>'
)

107 

108 

def _deadline(timeout: Optional[float]) -> Optional[float]:
    """Turn a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: Number of seconds from now, or None for "no deadline".

    Returns:
      `time.time() + timeout`, or None when `timeout` is None.
    """
    if timeout is None:
        return None
    return time.time() + timeout

111 

112 

def _unknown_code_details(
    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
) -> str:
    """Format the details text used when a status code is not recognised.

    Args:
      unknown_cygrpc_code: The code value to report in the message.
      details: The details string that accompanied the code.

    Returns:
      A human-readable message embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)

119 

120 

class _RPCState(object):
    """Mutable, lock-guarded record of a single RPC's progress.

    Attributes:
      condition: Guards every other member; `notify_all` is called on it
        whenever the state of the RPC changes.
      due: Operation types still expected from the completion queue.
      initial_metadata: Initial metadata, or None if not yet known.
      response: The most recently received response, if any.
      trailing_metadata: Trailing metadata, or None if not yet known.
      code: Final status code, or None while the RPC has not terminated.
      details: Final status details, or None.
      debug_error_string: Debug error string, or None.
      cancelled: Whether cancellation was requested before termination.
      callbacks: Callbacks to run when the RPC terminates.
      fork_epoch: Value of `cygrpc.get_fork_epoch()` at construction time.
      rpc_start_time: Observability field, in relative seconds.
      rpc_end_time: Observability field, in relative seconds.
      method: Observability field.
      target: Observability field.
    """

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[float]  # In relative seconds
    rpc_end_time: Optional[float]  # In relative seconds
    method: Optional[str]
    target: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ):
        """Initialise the state from the initially-due operations and any
        already-known metadata, code and details."""
        # `condition` guards all members of _RPCState. `notify_all` is called
        # on `condition` when the state of the RPC has changed.
        self.condition = threading.Condition()

        # The cygrpc.OperationType objects representing events due from the
        # RPC's completion queue. If an operation is in `due`, it is
        # guaranteed that `operate()` has been called on a corresponding
        # operation. But the converse is not true. That is, in the case of
        # failed `operate()` calls, there may briefly be events in `due` that
        # do not correspond to operations submitted to Core.
        self.due = set(due)

        # Outcome of the RPC as known so far.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details
        self.response = None
        self.debug_error_string = None

        # The following four fields are used for observability.
        # Updates to those fields do not trigger self.condition.
        self.rpc_start_time = None
        self.rpc_end_time = None
        self.method = None
        self.target = None

        # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
        # slightly wonky, so they have to be tracked separately from the rest
        # of the result of the RPC. This field tracks whether cancellation was
        # requested prior to termination of the RPC.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a brand-new threading.Condition."""
        self.condition = threading.Condition()

180 

181 

def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Mark a not-yet-terminated RPC as finished with `code` and `details`.

    Does nothing when `state.code` is already set. Otherwise records the
    code and details, fills in empty initial metadata if none is known, and
    sets the trailing metadata to an empty tuple.

    Args:
      state: The RPC state to update in place.
      code: The status code to record.
      details: The status details to record.
    """
    if state.code is not None:
        # The RPC already has an outcome; leave it untouched.
        return
    state.code, state.details = code, details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()

189 

190 

def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Fold one completion-queue event into `state`.

    Each batch operation in the event is removed from `state.due` and its
    payload (initial metadata, message, or final status) is recorded.

    Args:
      event: The event whose `batch_operations` are to be applied.
      state: The RPC state to update in place.
      response_deserializer: Deserializer applied to a received message.

    Returns:
      The callbacks that were registered on `state` at the moment the final
      status was received (empty if this event carried no status). The caller
      is responsible for invoking them.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            if state.code is None:
                # Keep the raw code so it can be reported when it has no
                # grpc.StatusCode equivalent; the mapped value is None in
                # that case and carries no information.
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            callbacks.extend(state.callbacks)
            # No further callbacks may be registered once status has arrived.
            state.callbacks = None
    return callbacks

233 

234 

def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Build the per-RPC event handler passed to Core as the operation tag.

    Args:
      state: The RPC state the handler updates.
      response_deserializer: Deserializer applied to received messages.

    Returns:
      A callable taking an event. It applies the event to `state` under
      `state.condition`, notifies waiters, runs any termination callbacks
      outside the lock, and returns whether no operations remain due and the
      state's fork epoch is not older than the current one.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks may be functools.partial objects or plain
                # callables; only the former have `.func`, so fall back to the
                # callback itself rather than raising inside this handler.
                logging.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event

255 

256 

257# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall. 

258# pylint: disable=too-many-statements 

def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Send every request from a user-supplied iterator on a daemon thread.

    Args:
      request_iterator: The user's iterator of request objects.
      state: The RPC state shared with the rest of the call machinery.
      call: The call on which send operations are submitted.
      request_serializer: Serializer applied to each request.
      event_handler: Tag passed along with every submitted operation.
    """

    def _send_settled():
        # The outstanding send is settled once the RPC has terminated or the
        # send_message operation is no longer due.
        return (
            state.code is not None
            or cygrpc.OperationType.send_message not in state.due
        )

    def _cancel_and_abort(code, details):
        # Cancel the call in Core, then record the same outcome locally.
        call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
        _abort(state, code, details)

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Pull requests until the iterator is exhausted or an error condition
        # is encountered.
        while True:
            exit_already_reported = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                exit_already_reported = True
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                _cancel_and_abort(grpc.StatusCode.UNKNOWN, details)
                return
            finally:
                if not exit_already_reported:
                    cygrpc.return_from_user_request_generator()

            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is not None or state.cancelled:
                    # The RPC is already over; stop consuming requests.
                    return
                if serialized_request is None:
                    _cancel_and_abort(
                        grpc.StatusCode.INTERNAL,
                        "Exception serializing request!",
                    )
                    return

                # Tentatively mark the send as due, and undo that if Core
                # refuses the operation.
                state.due.add(cygrpc.OperationType.send_message)
                send_batch = (
                    cygrpc.SendMessageOperation(
                        serialized_request, _EMPTY_FLAGS
                    ),
                )
                if not call.operate(send_batch, event_handler):
                    state.due.remove(cygrpc.OperationType.send_message)
                    return

                # Wait for this send to settle before fetching the next one.
                _common.wait(
                    state.condition.wait,
                    _send_settled,
                    spin_cb=functools.partial(
                        cygrpc.block_if_fork_in_progress, state
                    ),
                )
                if state.code is not None:
                    return

        # The iterator is exhausted: half-close, unless the RPC already ended.
        with state.condition:
            if state.code is not None:
                return
            state.due.add(cygrpc.OperationType.send_close_from_client)
            close_batch = (cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
            if not call.operate(close_batch, event_handler):
                state.due.remove(cygrpc.OperationType.send_close_from_client)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()

352 

353 

def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Render the textual representation of an RPC for repr()/str().

    Args:
      class_name: Name of the class the representation is being built for.
      rpc_state: The RPC state to describe; read under its condition.

    Returns:
      A bare "<ClassName object>" while no status code is set, otherwise the
      OK or non-OK rendezvous format filled in with the state's fields.
    """
    with rpc_state.condition:
        code = rpc_state.code
        if code is None:
            return "<{} object>".format(class_name)
        if code is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name, code, rpc_state.details
            )
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name,
            code,
            rpc_state.details,
            rpc_state.debug_error_string,
        )

370 

371 

class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled.

    Attributes:
      _state: An instance of _RPCState holding a private copy of the
        outcome taken at construction time.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Snapshot the outcome fields of `state` while holding its lock."""
        with state.condition:
            snapshot = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            self._state = snapshot
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)

    # grpc.Call accessors: plain reads of the snapshot.

    def initial_metadata(self) -> Optional[MetadataType]:
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        return self._state.code

    def details(self) -> Optional[str]:
        return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        return _common.decode(self._state.debug_error_string)

    def _repr(self) -> str:
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    # grpc.Future interface: this object always reports itself as finished
    # and not cancelled.

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and immediately catch ourselves so the interpreter produces a
        # traceback object to hand back.
        try:
            raise self
        except grpc.RpcError:
            _, _, tb = sys.exc_info()
            return tb

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # Already done, so the callback runs right away.
        fn(self)

464 

465 

class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super().__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            # Never report a negative amount of time.
            return max(self._deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # Already terminated; nothing to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            if registered is None:
                # The callback list is None once the RPC's status has been
                # processed; no further callbacks are accepted.
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        """Produce the next response; implemented by subclasses."""
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string; implemented by subclasses."""
        raise NotImplementedError()

    def _repr(self) -> str:
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        # Cancel an RPC that is still in flight when its rendezvous is
        # garbage collected.
        state = self._state
        with state.condition:
            if state.code is None:
                state.code = grpc.StatusCode.CANCELLED
                state.details = "Cancelled upon garbage collection!"
                state.cancelled = True
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[state.code],
                    state.details,
                )
                state.condition.notify_all()

571 

572 

class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        """Whether a final status code has been recorded."""
        return self._state.code is not None

    def cancelled(self) -> bool:
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() "
                    "when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() "
                    "when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() "
                    "when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and catch ourselves to obtain a traceback object.
            try:
                raise self
            except grpc.RpcError:
                _, _, tb = sys.exc_info()
                return tb

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Run `fn(self)` at termination, or right away if already done."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            metadata = self._state.trailing_metadata
            if metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed."
                )
            return metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            status = self._state.code
            if status is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed."
                )
            return status

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            raw_details = self._state.details
            if raw_details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed."
                )
            return _common.decode(raw_details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Fetch one event from the call and apply it to the state."""
        event = self._call.next_event()
        with self._state.condition:
            pending = _handle_event(
                event, self._state, self._response_deserializer
            )
            for callback in pending:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self) -> Any:
        """Consume events until a response arrives or the stream ends."""
        receive_message = cygrpc.OperationType.receive_message
        while True:
            self._consume_next_event()
            with self._state.condition:
                response = self._state.response
                if response is not None:
                    # Hand the response over and clear the slot.
                    self._state.response = None
                    return response
                if receive_message in self._state.due:
                    # The receive is still outstanding; keep consuming.
                    continue
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if self._state.code is not None:
                    raise self

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if self._state.code is not None:
                raise self
            # We tentatively add the operation as expected and remove
            # it if the enqueue operation fails. This allows us to guarantee
            # that if an event has been submitted to the core completion
            # queue, it is in `due`. If we waited until after a successful
            # enqueue operation then a signal could interrupt this
            # thread between the enqueue operation and the addition of the
            # operation to `due`. This would cause an exception on the
            # channel spin thread when the operation completes and no
            # corresponding operation would be present in state.due.
            # Note that, since `condition` is held through this block, there
            # is no data race on `due`.
            self._state.due.add(cygrpc.OperationType.receive_message)
            receive_batch = (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),)
            if not self._call.operate(receive_batch, None):
                self._state.due.remove(cygrpc.OperationType.receive_message)
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        with self._state.condition:
            raw_debug = self._state.debug_error_string
            if raw_debug is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed."
                )
            return _common.decode(raw_debug)

785 

786 

class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the
    grpc.Future interface and to mediate a bidirectional streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.initial_metadata is not None,
            )
            return state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.trailing_metadata is not None,
            )
            return state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait, lambda: state.code is not None
            )
            return state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait, lambda: state.details is not None
            )
            return _common.decode(state.details)

    def debug_error_string(self) -> Optional[str]:
        """Wait for the debug error string to be set, then return it decoded."""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.debug_error_string is not None,
            )
            return _common.decode(state.debug_error_string)

    def cancelled(self) -> bool:
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        """Whether a final status code has been recorded."""
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        with self._state.condition:
            if _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        with self._state.condition:
            if _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.future.traceback for the full API contract.
        """
        with self._state.condition:
            if _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and catch ourselves to obtain a traceback object.
            try:
                raise self
            except grpc.RpcError:
                _, _, tb = sys.exc_info()
                return tb

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Run `fn(self)` at termination, or right away if already done."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            fn(self)

    def _next(self) -> Any:
        state = self._state
        with state.condition:
            if state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if state.code is not None:
                raise self

            # Still running: request another message. The operation is marked
            # due before it is submitted and unmarked if submission fails.
            event_handler = _event_handler(state, self._response_deserializer)
            state.due.add(cygrpc.OperationType.receive_message)
            receive_batch = (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),)
            if not self._call.operate(receive_batch, event_handler):
                state.due.remove(cygrpc.OperationType.receive_message)

            def _response_ready():
                if state.response is not None:
                    return True
                return (
                    cygrpc.OperationType.receive_message not in state.due
                    and state.code is not None
                )

            _common.wait(state.condition.wait, _response_ready)
            response = state.response
            if response is not None:
                # Hand the response over and clear the slot.
                state.response = None
                return response
            if cygrpc.OperationType.receive_message not in state.due:
                if state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if state.code is not None:
                    raise self

970 

971 

def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize the request of a unary-request RPC.

    Args:
      request: The request value to serialize.
      timeout: Passed to _deadline to obtain the deadline.
      request_serializer: Serializer handed to _common.serialize.

    Returns:
      A ``(deadline, serialized_request, error)`` triple. When
      _common.serialize yields None, ``serialized_request`` is None and
      ``error`` is an _InactiveRpcError carrying grpc.StatusCode.INTERNAL;
      otherwise ``error`` is None.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None

    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)

991 

992 

def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn the final state of a blocking unary-response RPC into a result.

    Args:
      state: The RPC state after its events have been handled.
      call: The call the RPC was conducted on.
      with_call: Whether to also return a call object with the response.
      deadline: Deadline handed to the returned call object.

    Returns:
      ``state.response``, or ``(state.response, rendezvous)`` when
      ``with_call`` is true.

    Raises:
      _InactiveRpcError: If ``state.code`` is not grpc.StatusCode.OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous

1007 

1008 

def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two initial operation batches of a stream-unary RPC.

    Args:
      metadata: Metadata placed in the send-initial-metadata operation.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A pair of batches: the first sends initial metadata and receives the
      message and the status; the second receives initial metadata.
    """
    send_and_receive = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    receive_initial_metadata = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return (send_and_receive, receive_initial_metadata)

1022 

1023 

def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pair every stream-unary operation batch with a None tag.

    Args:
      metadata: Forwarded to _stream_unary_invocation_operations.
      initial_metadata_flags: Forwarded to
        _stream_unary_invocation_operations.

    Returns:
      A tuple of ``(operations, None)`` pairs, one per batch.
    """
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple((batch, None) for batch in batches)

1036 

1037 

def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combine the caller's deadline with the deadline found in the context.

    Args:
      user_deadline: The deadline requested by the caller, or None.

    Returns:
      None if neither deadline is set, the one that is set if only one is,
      and the smaller of the two otherwise.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        # Covers both "neither set" (returns None) and "only user set".
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)

1048 

1049 

class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-request, unary-response RPC on a channel.

    Offers a blocking call (``__call__``), a blocking call that also
    returns the call object (``with_call``), and an asynchronous variant
    (``future``).
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so the declaration matches what is stored.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, call factory, method and (de)serializers.

        Args:
          channel: Channel used for blocking (segregated) calls.
          managed_call: Factory used by ``future`` to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for requests, or None.
          response_deserializer: Deserializer for responses, or None.
          _registered_call_handle: Call handle forwarded when creating
            calls, or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serialize the request and assemble the single operation batch.

        Returns:
          ``(state, operations, deadline, None)`` on success, or
          ``(None, None, None, error)`` when serialization produced None.
        """
        deadline, serialized_request, error = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            return None, None, None, error

        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        operations = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call and handle its one event.

        Returns:
          The RPC state after the event has been handled, and the call.

        Raises:
          grpc.RpcError: The error from ``_prepare`` when the request could
            not be serialized.
        """
        state, operations, deadline, error = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise error  # pylint: disable-msg=raising-bad-type

        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            ((operations, None),),
            self._context,
            self._registered_call_handle,
        )
        # All six operations were submitted as one batch with one tag, so a
        # single event is read and applied to the state.
        event = call.next_event()
        _handle_event(event, state, self._response_deserializer)
        return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and the call."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC on a managed call and return its rendezvous.

        Raises:
          grpc.RpcError: The error from ``_prepare`` when the request could
            not be serialized.
        """
        state, operations, deadline, error = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise error  # pylint: disable-msg=raising-bad-type

        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed here without
        # going through _determine_deadline -- confirm this is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            metadata,
            None if credentials is None else credentials._credentials,
            (operations,),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1234 

1235 

class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-request, stream-response RPC on a segregated call.

    The returned _SingleThreadedRendezvous is built on a call obtained
    from ``channel.segregated_call`` rather than from a managed-call
    factory.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so the declaration matches what is stored.
    __slots__ = [
        "_channel",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, method, target and (de)serializers.

        Args:
          channel: Channel on which segregated calls are created.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for the request.
          response_deserializer: Deserializer for responses.
          _registered_call_handle: Call handle forwarded when creating
            calls, or None.
        """
        self._channel = channel
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Start the RPC and return the rendezvous that yields responses.

        Raises:
          _InactiveRpcError: With grpc.StatusCode.INTERNAL when serializing
            the request produced None.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        if serialized_request is None:
            failed_state = _RPCState(
                (),
                (),
                (),
                grpc.StatusCode.INTERNAL,
                "Exception serializing request!",
            )
            raise _InactiveRpcError(failed_state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three batches: send everything, receive status, receive initial
        # metadata. Each batch is paired with a None tag.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        operations_and_tags = tuple((ops, None) for ops in operations)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1334 

1335 

class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-request, stream-response RPC on a managed call."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so the declaration matches what is stored.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, call factory, method and (de)serializers.

        Args:
          channel: The underlying channel.
          managed_call: Factory used to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for the request.
          response_deserializer: Deserializer for responses.
          _registered_call_handle: Call handle forwarded when creating
            calls, or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC and return the rendezvous that yields responses.

        Raises:
          grpc.RpcError: The error produced by _start_unary_request when
            the request could not be serialized.
        """
        deadline, serialized_request, error = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        if serialized_request is None:
            raise error  # pylint: disable-msg=raising-bad-type

        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # Two batches: the first sends the request and receives the status,
        # the second receives initial metadata.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            operations,
            _event_handler(state, self._response_deserializer),
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1429 

1430 

class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Invokes a stream-request, unary-response RPC on a channel.

    Offers a blocking call (``__call__``), a blocking call that also
    returns the call object (``with_call``), and an asynchronous variant
    (``future``).
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so the declaration matches what is stored.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, call factory, method and (de)serializers.

        Args:
          channel: Channel used for blocking (segregated) calls.
          managed_call: Factory used by ``future`` to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for requests, or None.
          response_deserializer: Deserializer for the response, or None.
          _registered_call_handle: Call handle forwarded when creating
            calls, or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call until no operation is due.

        Returns:
          The RPC state once ``state.due`` is empty, and the call.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        # Drain events, notifying waiters on the state condition after each
        # one, until nothing remains due.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and the call."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC on a managed call and return its rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed here without
        # going through _determine_deadline -- confirm this is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # The initial-metadata operation must carry the augmented
            # metadata, as in _blocking; passing the raw ``metadata`` here
            # dropped whatever ``compression`` had added.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1605 

1606 

class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Invokes a stream-request, stream-response RPC on a managed call."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so the declaration matches what is stored.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, call factory, method and (de)serializers.

        Args:
          channel: The underlying channel.
          managed_call: Factory used to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for requests, or None.
          response_deserializer: Deserializer for responses, or None.
          _registered_call_handle: Call handle forwarded when creating
            calls, or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC and return the rendezvous that yields responses."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: the first sends initial metadata and receives the
        # status, the second receives initial metadata.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1699 

1700 

class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        """Create the flags, keeping only bits inside ``used_mask``."""
        return super().__new__(
            cls, value & cygrpc.InitialMetadataFlags.used_mask
        )

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Return flags reflecting an explicit wait-for-ready choice.

        Args:
          wait_for_ready: True to set the wait-for-ready bit, False to
            clear it, None to leave the flags as they are.

        Returns:
          ``self`` when ``wait_for_ready`` is None; otherwise a new
          instance with the wait-for-ready bit set or cleared and the
          explicitly-set bit turned on.
        """
        if wait_for_ready is None:
            return self
        flags = cygrpc.InitialMetadataFlags
        if wait_for_ready:
            combined = (
                self | flags.wait_for_ready | flags.wait_for_ready_explicitly_set
            )
        else:
            combined = (
                self & ~flags.wait_for_ready
                | flags.wait_for_ready_explicitly_set
            )
        return self.__class__(combined)

1722 

1723 

class _ChannelCallState:
    """Holds a channel together with its managed-call bookkeeping."""

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        """Start with no managed calls on ``channel``.

        Args:
          channel: The cygrpc channel this state belongs to.
        """
        self.lock = threading.Lock()
        self.channel = channel
        self.managed_calls = 0
        self.threading = False

    def reset_postfork_child(self) -> None:
        """Reset the managed-call counter to zero."""
        self.managed_calls = 0

    def __del__(self):
        """Close the channel with a cancelled status; best effort."""
        try:
            self.channel.close(
                cygrpc.StatusCode.cancelled, "Channel deallocated!"
            )
        except (TypeError, AttributeError):
            # Deliberately ignored: these two error types are swallowed so
            # that finalization never raises them.
            pass

1745 

1746 

def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Start a daemon thread that dispatches the channel's call events.

    The thread repeatedly fetches the next call event and invokes the
    event's tag with it. Each time a tag reports a completed call, the
    managed-call counter is decremented under ``state.lock``; the thread
    exits when the counter reaches zero.

    Args:
      state: The channel call state whose events are processed.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                continue
            if not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spinner = cygrpc.ForkManagedThread(target=channel_spin)
    spinner.setDaemon(True)
    spinner.start()

1764 

1765 

def _channel_managed_call_management(state: _ChannelCallState):
    """Return a factory that creates integrated calls tracked by ``state``.

    Args:
      state: The channel call state that counts the managed calls.

    Returns:
      The ``create`` function defined below.
    """

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context: Any,
        _registered_call_handle: Optional[int],
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.
          _registered_call_handle: An int representing the call handle of the
            method, or None if the method is not registered.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch shares the same handler as its tag.
        operations_and_tags = tuple(
            (batch, event_handler) for batch in operations
        )
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                operations_and_tags,
                context,
                _registered_call_handle,
            )
            state.managed_calls += 1
            if state.managed_calls == 1:
                # First managed call: the event-dispatch thread must be
                # started.
                _run_channel_spin_thread(state)
            return call

    return create

1827 

1828 

1829class _ChannelConnectivityState(object): 

1830 lock: threading.RLock 

1831 channel: grpc.Channel 

1832 polling: bool 

1833 connectivity: grpc.ChannelConnectivity 

1834 try_to_connect: bool 

1835 # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704 

1836 callbacks_and_connectivities: List[ 

1837 Sequence[ 

1838 Union[ 

1839 Callable[[grpc.ChannelConnectivity], None], 

1840 Optional[grpc.ChannelConnectivity], 

1841 ] 

1842 ] 

1843 ] 

1844 delivering: bool 

1845 

1846 def __init__(self, channel: grpc.Channel): 

1847 self.lock = threading.RLock() 

1848 self.channel = channel 

1849 self.polling = False 

1850 self.connectivity = None 

1851 self.try_to_connect = False 

1852 self.callbacks_and_connectivities = [] 

1853 self.delivering = False 

1854 

1855 def reset_postfork_child(self) -> None: 

1856 self.polling = False 

1857 self.connectivity = None 

1858 self.try_to_connect = False 

1859 self.callbacks_and_connectivities = [] 

1860 self.delivering = False 

1861 

1862 

1863def _deliveries( 

1864 state: _ChannelConnectivityState, 

1865) -> List[Callable[[grpc.ChannelConnectivity], None]]: 

1866 callbacks_needing_update = [] 

1867 for callback_and_connectivity in state.callbacks_and_connectivities: 

1868 ( 

1869 callback, 

1870 callback_connectivity, 

1871 ) = callback_and_connectivity 

1872 if callback_connectivity is not state.connectivity: 

1873 callbacks_needing_update.append(callback) 

1874 callback_and_connectivity[1] = state.connectivity 

1875 return callbacks_needing_update 

1876 

1877 

def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invoke callbacks with connectivity values until none are pending.

    After each round the pending callbacks are recomputed under
    ``state.lock``. When there are none, ``state.delivering`` is cleared
    and the function returns. Exceptions raised by a callback are logged
    and do not stop the delivery.

    Args:
      state: The connectivity state shared with the polling code.
      initial_connectivity: The value passed to the first round.
      initial_callbacks: The callbacks invoked in the first round.
    """
    connectivity, callbacks = initial_connectivity, initial_callbacks
    while True:
        for callback in callbacks:
            cygrpc.block_if_fork_in_progress(state)
            try:
                callback(connectivity)
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            callbacks = _deliveries(state)
            if not callbacks:
                state.delivering = False
                return
            connectivity = state.connectivity

1901 

1902 

def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Start a daemon thread running _deliver and mark delivery as active.

    Args:
      state: The connectivity state; its current ``connectivity`` is the
        value handed to the first delivery round.
      callbacks: The callbacks to invoke in the first delivery round.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True

1917 state.delivering = True 

1918 

1919 

# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Polls the channel's connectivity and schedules callback deliveries.

    Used as the thread target by `_subscribe`. The loop ends, clearing
    `state.polling` and `state.connectivity`, once there are no subscriptions
    left and no connection attempt has been requested.

    NOTE(review): `_subscribe` passes `state.channel` here, and the methods
    used below (`check_connectivity_state`, `watch_connectivity_state`) look
    like the cygrpc-level channel API rather than `grpc.Channel`; confirm the
    `channel` annotation.

    Args:
      state: The shared connectivity state.
      channel: The channel whose connectivity is polled.
      initial_try_to_connect: Passed to the first connectivity check.
    """
    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        # Publish the first observation, translated to the public enum.
        state.connectivity = (
            _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                connectivity
            ]
        )
        # Every current subscriber is notified of the first observation.
        callbacks = tuple(
            callback for callback, _ in state.callbacks_and_connectivities
        )
        for callback_and_connectivity in state.callbacks_and_connectivities:
            callback_and_connectivity[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)
    while True:
        # Second argument is 0.2 seconds from now; presumably the deadline of
        # the watch -- confirm against cygrpc.
        event = channel.watch_connectivity_state(
            connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if (
                not state.callbacks_and_connectivities
                and not state.try_to_connect
            ):
                # Nobody is listening and nobody asked to connect: stop.
                state.polling = False
                state.connectivity = None
                break
            # Consume the connection request raised by `_subscribe`.
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if event.success or try_to_connect:
            connectivity = channel.check_connectivity_state(try_to_connect)
            with state.lock:
                state.connectivity = (
                    _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                        connectivity
                    ]
                )
                # A running delivery thread re-checks the subscriptions by
                # itself, so a new one is only started when none is active.
                if not state.delivering:
                    callbacks = _deliveries(state)
                    if callbacks:
                        _spawn_delivery(state, callbacks)

1968 

1969 

def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Adds a connectivity callback, starting the poller when needed.

    Under `state.lock` one of three things happens:
      * With no subscriptions and no poller running, a daemon thread running
        `_poll_connectivity` is started and the callback is recorded as not
        yet notified.
      * With a known connectivity and no delivery in progress, a delivery of
        the current connectivity to this callback is spawned and the callback
        is recorded as having seen it.
      * Otherwise the callback is only recorded as not yet notified.
    In the last two cases a truthy `try_to_connect` raises
    `state.try_to_connect`.

    Args:
      state: The shared connectivity state.
      callback: The callable to be told about connectivity changes.
      try_to_connect: Whether a connection attempt is requested; coerced with
        bool(), so None counts as False.
    """
    with state.lock:
        if not (state.callbacks_and_connectivities or state.polling):
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, bool(try_to_connect)),
            )
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            state.callbacks_and_connectivities.append([callback, None])
            return

        if state.delivering or state.connectivity is None:
            state.try_to_connect |= bool(try_to_connect)
            state.callbacks_and_connectivities.append([callback, None])
            return

        # Connectivity is known and nobody is delivering: tell this
        # subscriber right away.
        _spawn_delivery(state, (callback,))
        state.try_to_connect |= bool(try_to_connect)
        state.callbacks_and_connectivities.append(
            [callback, state.connectivity]
        )

1994 

1995 

def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Removes the first subscription registered for `callback`, if any.

    Only the earliest matching entry is dropped; later duplicates stay
    subscribed. A callback that was never subscribed is ignored.

    Args:
      state: The shared connectivity state.
      callback: The callable to stop notifying.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        position = next(
            (
                index
                for index, (subscribed_callback, _) in enumerate(subscriptions)
                if callback == subscribed_callback
            ),
            None,
        )
        if position is not None:
            subscriptions.pop(position)

2007 

2008 

def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Appends the compression and user-agent arguments to channel options.

    Args:
      base_options: The channel arguments to extend.
      compression: Handed to `_compression.create_channel_option`; whatever
        that returns is appended after `base_options`.

    Returns:
      A tuple holding `base_options`, then the compression option(s), then a
      primary-user-agent argument carrying `_USER_AGENT`.
    """
    compression_args = _compression.create_channel_option(compression)
    with_compression = tuple(base_options) + compression_args
    user_agent_arg = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return with_compression + (user_agent_arg,)

2024 

2025 

def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Splits channel options into Python-only and core options.

    An option is Python-only when its key (element 0) equals
    grpc.experimental.ChannelOptions.SingleThreadedUnaryStream; every other
    option is a core option. Relative order is kept within each group.

    Args:
      options: The (key, value) channel options to classify.

    Returns:
      A (python_options, core_options) pair of lists -- Python-only options
      come first.
    """
    python_only_key = grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
    python_options = []
    core_options = []
    for option in options:
        target = python_options if option[0] == python_only_key else core_options
        target.append(option)
    return python_options, core_options

2041 

2042 

class Channel(grpc.Channel):
    """grpc.Channel implementation that delegates to a cygrpc.Channel."""

    # True when unary-stream RPCs use the single-threaded multi-callable.
    _single_threaded_unary_stream: bool
    # The underlying Cython channel.
    _channel: cygrpc.Channel
    # Call bookkeeping handed to _channel_managed_call_management.
    _call_state: _ChannelCallState
    # Subscription and polling bookkeeping for connectivity callbacks.
    _connectivity_state: _ChannelConnectivityState
    # The target string exactly as given to the constructor.
    _target: str
    # Declared only; nothing in this class assigns it.
    _registered_call_handles: Dict[str, int]

    def __init__(
        self,
        target: str,
        options: Sequence[ChannelArgumentType],
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
    ):
        """Constructor.

        Besides creating the underlying cygrpc.Channel, this registers the
        instance with cygrpc's fork support and, when gevent is activated,
        increments the gevent channel count.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to
            be used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)
        self._single_threaded_unary_stream = (
            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        )
        self._process_python_options(python_options)

        encoded_target = _common.encode(target)
        channel_args = _augment_options(core_options, compression)
        self._channel = cygrpc.Channel(
            encoded_target, channel_args, credentials
        )
        self._target = target
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)

        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _get_registered_call_handle(self, method: str) -> int:
        """Looks up the registered call handle for a method.

        This is a semi-private method. It is intended for use only by gRPC
        generated code.

        This method is not thread-safe.

        Args:
          method: Required, the method name for the RPC.

        Returns:
          The registered call handle pointer in the form of a Python Long.
        """
        encoded_method = _common.encode(method)
        return self._channel.get_registered_call_handle(encoded_method)

    def _process_python_options(
        self, python_options: Sequence[ChannelArgumentType]
    ) -> None:
        """Applies Python-only channel options to this instance.

        The presence of a SingleThreadedUnaryStream option switches the
        single-threaded unary-stream path on; the option's value is not
        inspected.

        Args:
          python_options: The (key, value) options to apply.
        """
        single_threaded_key = (
            grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
        )
        for option in python_options:
            if option[0] == single_threaded_key:
                self._single_threaded_unary_stream = True

    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Subscribes `callback` to this channel's connectivity changes.

        Args:
          callback: The callable to be told about connectivity changes.
          try_to_connect: Whether a connection attempt is requested; forwarded
            unchanged to `_subscribe`.
        """
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Removes a callback previously passed to `subscribe`.

        Args:
          callback: The callable to stop notifying.
        """
        _unsubscribe(self._connectivity_state, callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Creates a unary-unary multi-callable for `method`.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer for request messages.
          response_deserializer: Optional deserializer for response messages.
          _registered_method: When truthy, the registered call handle for
            `method` is looked up and handed to the multi-callable.

        Returns:
          A _UnaryUnaryMultiCallable bound to this channel.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        return _UnaryUnaryMultiCallable(
            self._channel,
            managed_call,
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Creates a unary-stream multi-callable for `method`.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer for request messages.
          response_deserializer: Optional deserializer for response messages.
          _registered_method: When truthy, the registered call handle for
            `method` is looked up and handed to the multi-callable.

        Returns:
          A _SingleThreadedUnaryStreamMultiCallable when the single-threaded
          option is enabled on this channel, otherwise a
          _UnaryStreamMultiCallable.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if not self._single_threaded_unary_stream:
            return _UnaryStreamMultiCallable(
                self._channel,
                _channel_managed_call_management(self._call_state),
                _common.encode(method),
                _common.encode(self._target),
                request_serializer,
                response_deserializer,
                call_handle,
            )
        return _SingleThreadedUnaryStreamMultiCallable(
            self._channel,
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Creates a stream-unary multi-callable for `method`.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer for request messages.
          response_deserializer: Optional deserializer for response messages.
          _registered_method: When truthy, the registered call handle for
            `method` is looked up and handed to the multi-callable.

        Returns:
          A _StreamUnaryMultiCallable bound to this channel.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamUnaryMultiCallable(
            self._channel,
            managed_call,
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Creates a stream-stream multi-callable for `method`.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer for request messages.
          response_deserializer: Optional deserializer for response messages.
          _registered_method: When truthy, the registered call handle for
            `method` is looked up and handed to the multi-callable.

        Returns:
          A _StreamStreamMultiCallable bound to this channel.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamStreamMultiCallable(
            self._channel,
            managed_call,
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    def _unsubscribe_all(self) -> None:
        """Drops every connectivity subscription held by this channel."""
        state = self._connectivity_state
        if not state:
            return
        with state.lock:
            # Emptied in place so other holders of the list see the change.
            state.callbacks_and_connectivities.clear()

    def _close(self) -> None:
        """Closes the channel and undoes the constructor's registrations.

        All subscriptions are dropped, the underlying channel is closed with
        a cancelled status, the instance is unregistered from fork support,
        and the gevent channel count is decremented when gevent is activated.
        """
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Drops all subscriptions and closes the underlying channel for fork.

        Unlike `_close`, this neither unregisters the channel from fork
        support nor touches the gevent channel count.
        """
        self._unsubscribe_all()
        self._channel.close_on_fork(
            cygrpc.StatusCode.cancelled, "Channel closed due to fork"
        )

    def __enter__(self):
        """Returns this channel for use as a context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Closes the channel; returns False so exceptions propagate."""
        self._close()
        return False

    def close(self) -> None:
        """Closes the channel."""
        self._close()

    def __del__(self):
        """Drops connectivity subscriptions; does not close the channel."""
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this
        # today because many valid use cases today allow the channel to be
        # deleted immediately after stubs are created. Once enough time has
        # passed that all users can be trusted to hold on to their channels
        # for as long as they are in use, and to close them after using them,
        # deletion of this grpc._channel.Channel instance can be made to
        # effect closure of the underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs. Just silence them.
            pass