Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_channel.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

981 statements  

1# Copyright 2016 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Python.""" 

15 

16import copy 

17import functools 

18import logging 

19import os 

20import sys 

21import threading 

22import time 

23import types 

24from typing import ( 

25 Any, 

26 Callable, 

27 Dict, 

28 Iterator, 

29 List, 

30 Optional, 

31 Sequence, 

32 Set, 

33 Tuple, 

34 Union, 

35) 

36 

37import grpc # pytype: disable=pyi-error 

38from grpc import _common # pytype: disable=pyi-error 

39from grpc import _compression # pytype: disable=pyi-error 

40from grpc import _grpcio_metadata # pytype: disable=pyi-error 

41from grpc import _observability # pytype: disable=pyi-error 

42from grpc._cython import cygrpc 

43from grpc._typing import ChannelArgumentType 

44from grpc._typing import DeserializingFunction 

45from grpc._typing import IntegratedCallFactory 

46from grpc._typing import MetadataType 

47from grpc._typing import NullaryCallbackType 

48from grpc._typing import ResponseType 

49from grpc._typing import SerializingFunction 

50from grpc._typing import UserTag 

51import grpc.experimental # pytype: disable=pyi-error 

52 

# Module-wide logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# User-agent string built from the installed grpcio version.
_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__)

# Flag value used for every operation that takes no special flags.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# Only the presence of the variable matters; its value is never inspected.
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    "GRPC_SINGLE_THREADED_UNARY_STREAM" in os.environ
)

# Operation types initially recorded as due for each of the four RPC
# arities. Each tuple is used to seed `_RPCState.due`.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# repr() template for an RPC that finished with StatusCode.OK.
# Placeholders: class name, status code, details.
_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    ">"
)

# repr() template for an RPC that finished with any other status.
# Placeholders: class name, status code, details, debug error string.
_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n>'
)

107 

108 

def _deadline(timeout: Optional[float]) -> Optional[float]:
    """Turn a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: Number of seconds from now, or None for "no deadline".

    Returns:
      None when `timeout` is None; otherwise `time.time()` plus `timeout`.
    """
    if timeout is None:
        return None
    return time.time() + timeout

111 

112 

def _unknown_code_details(
    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
) -> str:
    """Build the details text used when the server's status code is unknown.

    Args:
      unknown_cygrpc_code: The code to show in the message.
      details: The details string that accompanied the status.

    Returns:
      A human-readable message embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)

119 

120 

class _RPCState:
    """Mutable record of what is currently known about one RPC.

    All members are protected by `condition`; code that changes the state of
    the RPC calls `notify_all` on it so that waiters can re-check.
    """

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[float]  # In relative seconds
    rpc_end_time: Optional[float]  # In relative seconds
    method: Optional[str]
    target: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ):
        """Create the state for a new (or already finished) RPC.

        Args:
          due: Operation types whose completion events are still expected.
          initial_metadata: Initial metadata, if already known.
          trailing_metadata: Trailing metadata, if already known.
          code: Final status code, or None while the RPC is in progress.
          details: Final status details, or None while in progress.
        """
        # Guards every member of this object and is notified on each change.
        self.condition = threading.Condition()

        # Operation types whose completion events are still outstanding.
        # Anything in `due` has had `operate()` called for it, but the
        # converse does not hold: when an `operate()` call fails, an entry
        # may sit here briefly without a matching operation in Core.
        self.due = set(due)

        # Outcome of the RPC as known so far.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details
        self.response = None
        self.debug_error_string = None

        # Observability-only fields. Changing them does not notify
        # `condition`.
        self.rpc_start_time = None
        self.rpc_end_time = None
        self.method = None
        self.target = None

        # grpc.Future.cancel / grpc.Future.cancelled have slightly odd
        # semantics, so "was cancellation requested before the RPC
        # terminated" is tracked apart from the RPC's result.
        self.cancelled = False

        # Callbacks to run once the RPC terminates.
        self.callbacks = []

        # Fork epoch in effect when this state was created.
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a brand-new Condition object."""
        self.condition = threading.Condition()

180 

181 

def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Mark an RPC as locally terminated, unless it already has a status.

    Does nothing when `state.code` is already set. Otherwise records `code`
    and `details`, fills in empty initial metadata if none was received, and
    sets trailing metadata to an empty tuple.

    Args:
      state: The RPC state to update.
      code: Status code to record.
      details: Status details to record.
    """
    if state.code is not None:
        # A status was already recorded; never overwrite it.
        return
    state.code = code
    state.details = details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()

189 

190 

def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Fold one completion-queue event into `state`.

    Each batch operation carried by `event` is removed from `state.due` and
    its payload (initial metadata, message, or final status) is copied into
    `state`.

    Args:
      event: The event whose `batch_operations` are to be applied.
      state: The RPC state to update.
      response_deserializer: Deserializer applied to a received message.

    Returns:
      The callbacks that were registered on `state` if this event carried
      the final status; otherwise an empty list. The caller is responsible
      for invoking them.

    Raises:
      KeyError: If an operation in the event is not present in `state.due`.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A locally recorded status (e.g. from _abort) takes precedence
            # over whatever the event reports.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw, unmapped code. `code` is always None
                    # in this branch and would make the message useless.
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            callbacks.extend(state.callbacks)
            # None (rather than an empty list) signals that the RPC has
            # terminated and no further callbacks can be registered.
            state.callbacks = None
    return callbacks

233 

234 

def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Create the per-RPC callback that applies events to `state`.

    Args:
      state: The RPC state the returned handler updates.
      response_deserializer: Deserializer applied to received messages.

    Returns:
      A function taking one event. It applies the event to `state` under
      `state.condition`, notifies all waiters, runs any termination
      callbacks outside the lock, and returns True only when no operations
      remain due and `state.fork_epoch` is not older than the current fork
      epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks registered through add_done_callback are
                # functools.partial objects (which expose `.func`), but those
                # registered through add_callback are plain callables.
                # Falling back to the callback itself keeps this handler
                # from raising AttributeError while reporting the error.
                _LOGGER.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event

255 

256 

257# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall. 

258# pylint: disable=too-many-statements 

def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Consume a request supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator` one at a time, serializes each and submits it as a
    send-message operation on `call`, waiting for that operation to leave
    `state.due` before fetching the next request. When the iterator is
    exhausted, a send-close-from-client operation is submitted. This
    function itself returns as soon as the thread has been started.

    Args:
      request_iterator: The user's iterator of request objects.
      state: The RPC state shared with the rest of the call machinery.
      call: The call on which operations are submitted and cancelled.
      request_serializer: Serializer applied to each request.
      event_handler: Tag passed to `call.operate` with every operation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the except-branch already made the matching
            # return_from_user_request_generator() call, so that the
            # finally-clause does not make it a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
                )
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            # Serialization happens before the lock is taken.
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        code = grpc.StatusCode.INTERNAL
                        details = "Exception serializing request!"
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details,
                        )
                        _abort(state, code, details)
                        return
                    # Record the operation as due before submitting it, and
                    # take it back out if the submission is refused.
                    state.due.add(cygrpc.OperationType.send_message)
                    operations = (
                        cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS
                        ),
                    )
                    operating = call.operate(operations, event_handler)
                    if not operating:
                        state.due.remove(cygrpc.OperationType.send_message)
                        return

                    # True once the RPC has a status or the send completed.
                    def _done():
                        return (
                            state.code is not None
                            or cygrpc.OperationType.send_message
                            not in state.due
                        )

                    _common.wait(
                        state.condition.wait,
                        _done,
                        spin_cb=functools.partial(
                            cygrpc.block_if_fork_in_progress, state
                        ),
                    )
                    if state.code is not None:
                        return
                else:
                    # The RPC already terminated or was cancelled; stop
                    # consuming requests.
                    return
        # Reached only when the iterator was exhausted normally.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                )
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client
                    )

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()

351 

352 

def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Render a repr-style description of an RPC's current status.

    Args:
      class_name: Name to show as the object's class.
      rpc_state: The state to describe; read under its condition lock.

    Returns:
      A short placeholder while the RPC has no status code, otherwise a
      multi-line description that includes the status and details (and the
      debug error string for non-OK statuses).
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            return "<{} object>".format(class_name)
        if status is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name, status, rpc_state.details
            )
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name,
            status,
            rpc_state.details,
            rpc_state.debug_error_string,
        )

368 

369 

class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled. The error keeps its own copy of the relevant state, taken at
    construction time.

    Attributes:
      _state: An instance of _RPCState.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Snapshot `state` while holding its condition lock."""
        with state.condition:
            snapshot = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)
            self._state = snapshot

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the initial metadata captured in the snapshot."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the trailing metadata captured in the snapshot."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the status code captured in the snapshot."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the status details, passed through `_common.decode`."""
        return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string, passed through `_common.decode`."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self) -> str:
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and immediately catch ourselves to obtain a traceback object.
        try:
            raise self
        except grpc.RpcError:
            _, _, tb = sys.exc_info()
            return tb

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # done() is always True for this class, so the callback runs at once.
        fn(self)

462 

463 

class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Base class holding the pieces shared by the concrete rendezvous types;
    `_next` and `debug_error_string` are left for subclasses to implement.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super().__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            remaining = self._deadline - time.time()
            # Never report a negative amount of time.
            return max(remaining, 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # The RPC already has a status; there is nothing to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            if registered is None:
                # The callback list is None once the RPC has terminated.
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        raise NotImplementedError()

    def _repr(self) -> str:
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        # Cancel an RPC that is still in progress when this object is
        # garbage collected.
        with self._state.condition:
            if self._state.code is not None:
                return
            self._state.code = grpc.StatusCode.CANCELLED
            self._state.details = "Cancelled upon garbage collection!"
            self._state.cancelled = True
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
                self._state.details,
            )
            self._state.condition.notify_all()

566 

567 

class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        """Return True once the RPC has a status code (lock not taken)."""
        return self._state.code is not None

    def cancelled(self) -> bool:
        """Return whether cancellation was requested for this RPC."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while the RPC has no status code yet."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                error_msg = (
                    "_SingleThreadedRendezvous only supports "
                    "result() when the RPC is complete."
                )
                raise grpc.experimental.UsageError(error_msg)
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Any other terminal status: this object is itself the RpcError.
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                error_msg = (
                    "_SingleThreadedRendezvous only supports "
                    "exception() when the RPC is complete."
                )
                raise grpc.experimental.UsageError(error_msg)
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                msg = (
                    "_SingleThreadedRendezvous only supports "
                    "traceback() when the RPC is complete."
                )
                raise grpc.experimental.UsageError(msg)
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and immediately catch this object to obtain a traceback.
            try:
                raise self
            except grpc.RpcError:
                return sys.exc_info()[2]

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Run `fn(self)` when the RPC terminates, or now if it already has."""
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        # Already terminated: invoke immediately, outside the lock.
        fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            if self._state.trailing_metadata is None:
                error_msg = (
                    "Cannot get trailing metadata until RPC is completed."
                )
                raise grpc.experimental.UsageError(error_msg)
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            if self._state.code is None:
                error_msg = "Cannot get code until RPC is completed."
                raise grpc.experimental.UsageError(error_msg)
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            if self._state.details is None:
                error_msg = "Cannot get details until RPC is completed."
                raise grpc.experimental.UsageError(error_msg)
            return _common.decode(self._state.details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Fetch the next event from the call and apply it to the state.

        Any termination callbacks returned by `_handle_event` are run here,
        while the condition lock is held.

        Returns:
          The event obtained from `self._call.next_event()`.
        """
        event = self._call.next_event()
        with self._state.condition:
            callbacks = _handle_event(
                event, self._state, self._response_deserializer
            )
            for callback in callbacks:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self) -> Any:
        """Pump events until a response is available or the RPC has ended.

        Returns:
          The next response message; the stored response is cleared.

        Raises:
          StopIteration: No receive is outstanding and the status is OK.
          _SingleThreadedRendezvous: (self) No receive is outstanding and the
            RPC ended with a non-OK status.
        """
        while True:
            self._consume_next_event()
            with self._state.condition:
                if self._state.response is not None:
                    response = self._state.response
                    self._state.response = None
                    return response
                if cygrpc.OperationType.receive_message not in self._state.due:
                    if self._state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    if self._state.code is not None:
                        raise self

    def _next(self) -> Any:
        """Request one more message and return it.

        Raises StopIteration or self immediately if the RPC has already
        terminated (with OK or another status, respectively).
        """
        with self._state.condition:
            if self._state.code is None:
                # We tentatively add the operation as expected and remove
                # it if the enqueue operation fails. This allows us to guarantee that
                # if an event has been submitted to the core completion queue,
                # it is in `due`. If we waited until after a successful
                # enqueue operation then a signal could interrupt this
                # thread between the enqueue operation and the addition of the
                # operation to `due`. This would cause an exception on the
                # channel spin thread when the operation completes and no
                # corresponding operation would be present in state.due.
                # Note that, since `condition` is held through this block, there is
                # no data race on `due`.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string; raises UsageError if not yet set."""
        with self._state.condition:
            if self._state.debug_error_string is None:
                error_msg = (
                    "Cannot get debug error string until RPC is completed."
                )
                raise grpc.experimental.UsageError(error_msg)
            return _common.decode(self._state.debug_error_string)

778 

779 

class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    A per-channel background thread dequeues events from the completion
    queue and notifies the threading.Condition held in the _RPCState object;
    the methods of this class wait on that condition.

    Thanks to that extra thread, _MultiThreadedRendezvous can fulfill the
    grpc.Future interface and mediate a bidirectional streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.initial_metadata is not None,
            )
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.trailing_metadata is not None,
            )
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.code is not None,
            )
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.details is not None,
            )
            return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Wait for and return the decoded debug error string."""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.debug_error_string is not None,
            )
            return _common.decode(self._state.debug_error_string)

    def cancelled(self) -> bool:
        """Return whether cancellation was requested for this RPC."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while the RPC has no status code yet."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        # Predicate handed to _common.wait; does not take the lock itself.
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Return the RPC's response, or raise its failure.

        See grpc.Future.result for the full API contract.
        """
        with self._state.condition:
            expired = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if expired:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception the RPC failed with, or None on success.

        See grpc.Future.exception for the full API contract.
        """
        with self._state.condition:
            expired = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if expired:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Return the traceback of the RPC's failure, or None on success.

        See grpc.Future.traceback for the full API contract.
        """
        with self._state.condition:
            expired = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if expired:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and immediately catch this object to obtain a traceback.
            try:
                raise self
            except grpc.RpcError:
                _, _, tb = sys.exc_info()
                return tb

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Run `fn(self)` when the RPC terminates, or now if it already has."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Already terminated: invoke immediately, outside the lock.
            fn(self)

    def _next(self) -> Any:
        """Request one more message, wait for it, and return it.

        Raises StopIteration when the RPC ended with OK and raises self when
        it ended with any other status.
        """
        receive_message = cygrpc.OperationType.receive_message
        with self._state.condition:
            if self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if self._state.code is not None:
                raise self

            # Still in progress: record the receive as due, submit it, and
            # undo the bookkeeping if the submission is refused.
            event_handler = _event_handler(
                self._state, self._response_deserializer
            )
            self._state.due.add(receive_message)
            operating = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                event_handler,
            )
            if not operating:
                self._state.due.remove(receive_message)

            def _response_ready():
                if self._state.response is not None:
                    return True
                return (
                    receive_message not in self._state.due
                    and self._state.code is not None
                )

            _common.wait(self._state.condition.wait, _response_ready)

            response = self._state.response
            if response is not None:
                self._state.response = None
                return response
            if receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if self._state.code is not None:
                    raise self

957 

958 

def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize the request of a unary-request RPC.

    Args:
      request: The request value to serialize.
      timeout: Timeout handed to ``_deadline``.
      request_serializer: Serializer handed to ``_common.serialize``.

    Returns:
      A ``(deadline, serialized_request, error)`` tuple. When serialization
      yields None the second element is None and the third is an
      ``_InactiveRpcError`` carrying StatusCode.INTERNAL; otherwise the
      third element is None.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None

    failure_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failure_state)

977 

978 

def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn the final state of a blocking unary-response RPC into a result.

    Args:
      state: The RPC state after the call has finished.
      call: The call the RPC was conducted on.
      with_call: Whether the caller also wants a call object back.
      deadline: Deadline forwarded to the rendezvous built for ``with_call``.

    Returns:
      ``state.response``, or ``(state.response, rendezvous)`` when
      ``with_call`` is true.

    Raises:
      _InactiveRpcError: ``state.code`` is not StatusCode.OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous

991 

992 

def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two initial operation batches of a stream-unary RPC.

    Args:
      metadata: Metadata placed in the send-initial-metadata operation.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A pair of batches: the first sends initial metadata and receives the
      message and the status; the second receives initial metadata.
    """
    send_and_receive_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return (send_and_receive_batch, initial_metadata_batch)

1006 

1007 

def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pair every stream-unary operation batch with a ``None`` tag.

    Args:
      metadata: Forwarded to ``_stream_unary_invocation_operations``.
      initial_metadata_flags: Forwarded likewise.

    Returns:
      A tuple of ``(operations, None)`` pairs, one per batch.
    """
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple((batch, None) for batch in batches)

1020 

1021 

def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combine the caller's deadline with the one found in the context.

    Args:
      user_deadline: Deadline requested by the caller, or None.

    Returns:
      None if neither deadline is set, the only one that is set, or the
      smaller of the two when both are set.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        # Covers both "only user deadline" and "no deadline at all".
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)

1031 

1032 

class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-request, unary-response RPC method on a channel.

    Blocking invocations (``__call__`` and ``with_call``) run on a segregated
    call created directly from the channel; ``future`` runs on a call created
    by the managed-call factory.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so the declaration matches the annotations.
    __slots__ = [
        "_channel",
        "_context",
        "_managed_call",
        "_method",
        "_registered_call_handle",
        "_request_serializer",
        "_response_deserializer",
        "_target",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators needed to invoke the method.

        Args:
          channel: Channel used to create segregated (blocking) calls.
          managed_call: Factory used to create calls for ``future``.
          method: The RPC method name as bytes.
          target: The channel target as bytes.
          request_serializer: Serializer applied to each request.
          response_deserializer: Deserializer applied to each response.
          _registered_call_handle: Handle forwarded unchanged to every call
            creation.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serialize the request and build the single batch of operations.

        Returns:
          ``(state, operations, deadline, None)`` on success, or
          ``(None, None, None, error)`` when the request could not be
          serialized.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            return None, None, None, rendezvous
        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        operations = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call and handle its single event.

        Returns:
          The RPC state after the event has been handled, and the call.

        Raises:
          grpc.RpcError: the request could not be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            (
                (
                    operations,
                    None,
                ),
            ),
            self._context,
            self._registered_call_handle,
        )
        # All operations were submitted as one batch, so one event is read.
        event = call.next_event()
        _handle_event(event, state, self._response_deserializer)
        return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return the response."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and a call."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Invoke the RPC asynchronously and return a rendezvous for it.

        Raises:
          grpc.RpcError: the request could not be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed here without
        # going through _determine_deadline -- confirm this is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            metadata,
            None if credentials is None else credentials._credentials,
            (operations,),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1208 

1209 

class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-request, stream-response RPC on a segregated call.

    The returned rendezvous is a ``_SingleThreadedRendezvous``; the call is
    created directly from the channel rather than through a managed-call
    factory.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so the declaration matches the annotations.
    __slots__ = [
        "_channel",
        "_context",
        "_method",
        "_registered_call_handle",
        "_request_serializer",
        "_response_deserializer",
        "_target",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators needed to invoke the method.

        Args:
          channel: Channel used to create the segregated call.
          method: The RPC method name as bytes.
          target: The channel target as bytes.
          request_serializer: Serializer applied to the request.
          response_deserializer: Deserializer applied to each response.
          _registered_call_handle: Handle forwarded unchanged to the call
            creation.
        """
        self._channel = channel
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Start the RPC and return a rendezvous for its responses.

        Raises:
          _InactiveRpcError: the request could not be serialized.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        if serialized_request is None:
            state = _RPCState(
                (),
                (),
                (),
                grpc.StatusCode.INTERNAL,
                "Exception serializing request!",
            )
            raise _InactiveRpcError(state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three batches: the sends, the status receive, and the
        # initial-metadata receive.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        operations_and_tags = tuple((ops, None) for ops in operations)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1308 

1309 

class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-request, stream-response RPC on a managed call.

    The returned rendezvous is a ``_MultiThreadedRendezvous``.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so the declaration matches the annotations.
    __slots__ = [
        "_channel",
        "_context",
        "_managed_call",
        "_method",
        "_registered_call_handle",
        "_request_serializer",
        "_response_deserializer",
        "_target",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators needed to invoke the method.

        Args:
          channel: The underlying channel.
          managed_call: Factory used to create the call.
          method: The RPC method name as bytes.
          target: The channel target as bytes.
          request_serializer: Serializer applied to the request.
          response_deserializer: Deserializer applied to each response.
          _registered_call_handle: Handle forwarded unchanged to the call
            creation.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC and return a rendezvous for its responses.

        Raises:
          grpc.RpcError: the request could not be serialized.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        if serialized_request is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # Two batches: sends plus the status receive, then the
        # initial-metadata receive.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            operations,
            _event_handler(state, self._response_deserializer),
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1400 

1401 

class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Invokes a stream-request, unary-response RPC method on a channel.

    Blocking invocations (``__call__`` and ``with_call``) run on a segregated
    call created directly from the channel; ``future`` runs on a call created
    by the managed-call factory.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so the declaration matches the annotations.
    __slots__ = [
        "_channel",
        "_context",
        "_managed_call",
        "_method",
        "_registered_call_handle",
        "_request_serializer",
        "_response_deserializer",
        "_target",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators needed to invoke the method.

        Args:
          channel: Channel used to create segregated (blocking) calls.
          managed_call: Factory used to create calls for ``future``.
          method: The RPC method name as bytes.
          target: The channel target as bytes.
          request_serializer: Serializer applied to each request.
          response_deserializer: Deserializer applied to the response.
          _registered_call_handle: Handle forwarded unchanged to every call
            creation.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call until no operation is due.

        Returns:
          The RPC state once ``state.due`` is empty, and the call.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        # Drain events until every outstanding operation has completed.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return the response."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and a call."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Invoke the RPC asynchronously and return a rendezvous for it."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed here without
        # going through _determine_deadline -- confirm this is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # The send-initial-metadata operation must carry the augmented
            # metadata, as in _blocking; with the raw metadata the
            # `compression` argument would not reach the sent metadata.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1570 

1571 

class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Invokes a stream-request, stream-response RPC on a managed call.

    The returned rendezvous is a ``_MultiThreadedRendezvous``.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so the declaration matches the annotations.
    __slots__ = [
        "_channel",
        "_context",
        "_managed_call",
        "_method",
        "_registered_call_handle",
        "_request_serializer",
        "_response_deserializer",
        "_target",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators needed to invoke the method.

        Args:
          channel: The underlying channel.
          managed_call: Factory used to create the call.
          method: The RPC method name as bytes.
          target: The channel target as bytes.
          request_serializer: Serializer applied to each request.
          response_deserializer: Deserializer applied to each response.
          _registered_call_handle: Handle forwarded unchanged to the call
            creation.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC and return a rendezvous for its responses."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: send initial metadata plus the status receive, then
        # the initial-metadata receive.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )

1664 

1665 

class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        """Create the flags, keeping only bits inside ``used_mask``."""
        masked_value = value & cygrpc.InitialMetadataFlags.used_mask
        return super().__new__(cls, masked_value)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Return flags reflecting an explicit wait-for-ready choice.

        Args:
          wait_for_ready: True to set the wait-for-ready bit, False to clear
            it, or None to leave the flags untouched.

        Returns:
          ``self`` when ``wait_for_ready`` is None; otherwise a new instance
          with the wait-for-ready bit set or cleared and the
          explicitly-set bit turned on.
        """
        if wait_for_ready is None:
            return self
        if wait_for_ready:
            updated = (
                self
                | cygrpc.InitialMetadataFlags.wait_for_ready
                | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
            )
        else:
            cleared = self & ~cygrpc.InitialMetadataFlags.wait_for_ready
            updated = (
                cleared
                | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
            )
        return self.__class__(updated)

1687 

1688 

class _ChannelCallState(object):
    """Bookkeeping shared by the managed calls of one channel.

    Holds the channel, a lock, and the count of managed calls currently
    tracked for that channel.
    """

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        """Start with no managed calls on ``channel``."""
        self.channel = channel
        self.lock = threading.Lock()
        self.threading = False
        self.managed_calls = 0

    def reset_postfork_child(self) -> None:
        """Forget all managed calls (named for use in a post-fork child)."""
        self.managed_calls = 0

    def __del__(self):
        """Close the channel on deallocation, on a best-effort basis."""
        try:
            close_channel = self.channel.close
            close_channel(cygrpc.StatusCode.cancelled, "Channel deallocated!")
        except (TypeError, AttributeError):
            # Deliberately ignored: closing here is best-effort only.
            pass

1710 

1711 

def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Start a daemon thread that dispatches the channel's call events.

    The thread keeps pulling events from ``state.channel`` and invoking each
    event's tag with the event. It exits once the managed-call count in
    ``state`` drops to zero.

    Args:
      state: The channel call state whose events are to be dispatched.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            timed_out = (
                event.completion_type == cygrpc.CompletionType.queue_timeout
            )
            # The tag reports whether its call has completed.
            if timed_out or not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    spin_thread.setDaemon(True)
    spin_thread.start()

1729 

1730 

def _channel_managed_call_management(state: _ChannelCallState):
    """Build the factory that creates managed calls on ``state.channel``.

    Args:
      state: The channel call state tracking the managed-call count.

    Returns:
      A function that creates a cygrpc.IntegratedCall and makes sure the
      channel spin thread is running while managed calls exist.
    """

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context: Any,
        _registered_call_handle: Optional[int],
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.
          _registered_call_handle: An int representing the call handle of the
            method, or None if the method is not registered.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch of operations shares the same event handler as its tag.
        tagged_batches = tuple((batch, event_handler) for batch in operations)
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                tagged_batches,
                context,
                _registered_call_handle,
            )
            was_idle = state.managed_calls == 0
            state.managed_calls += 1
            if was_idle:
                # First managed call: start the thread that dispatches events.
                _run_channel_spin_thread(state)
        return call

    return create

1793 

1794 

class _ChannelConnectivityState(object):
    """Mutable connectivity-subscription state of one channel.

    Tracks the last known connectivity, whether polling or callback delivery
    is in progress, and the subscribed callbacks together with the
    connectivity value last recorded for each.
    """

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        """Create an idle state for ``channel`` with no subscribers."""
        self.lock = threading.RLock()
        self.channel = channel
        # A fresh state and a post-fork reset share the same field values.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Return every field except the lock and channel to its initial value."""
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False

1827 

1828 

def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collect the callbacks that have not seen the current connectivity.

    Each selected subscription is updated in place to record
    ``state.connectivity`` as its last recorded value.

    Args:
      state: The connectivity state holding the subscriptions.

    Returns:
      The callbacks whose recorded connectivity differed from the current
      one, in subscription order.
    """
    current = state.connectivity
    outdated_callbacks = []
    for subscription in state.callbacks_and_connectivities:
        callback, last_recorded = subscription
        if last_recorded is current:
            continue
        outdated_callbacks.append(callback)
        subscription[1] = current
    return outdated_callbacks

1839 

1840 

def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invoke callbacks with connectivity updates until none are pending.

    After each round the state is re-checked under its lock; when no callback
    needs a further update, ``state.delivering`` is cleared and the function
    returns.

    Args:
      state: The connectivity state holding the subscriptions.
      initial_connectivity: Connectivity passed to the first round.
      initial_callbacks: Callbacks invoked in the first round.
    """
    connectivity, callbacks = initial_connectivity, initial_callbacks
    while True:
        for notify in callbacks:
            cygrpc.block_if_fork_in_progress(state)
            try:
                notify(connectivity)
            except Exception:  # pylint: disable=broad-except
                # A failing subscriber is logged and the rest still run.
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            callbacks = _deliveries(state)
            if not callbacks:
                state.delivering = False
                return
            connectivity = state.connectivity

1864 

1865 

def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Start a daemon thread running ``_deliver`` and mark delivery active.

    Args:
      state: The connectivity state; its current connectivity is what the
        first delivery round passes to the callbacks.
      callbacks: Callbacks to invoke in the first delivery round.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True

1881 

1882 

# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Poll the channel's connectivity and notify subscribers of changes.

    Records the initial connectivity and delivers it to every subscriber,
    then repeatedly watches the channel with a 0.2 second deadline. Polling
    stops once there are no subscribers and no connection attempt has been
    requested; ``state.polling`` and ``state.connectivity`` are reset first.

    Args:
      state: The connectivity state shared with subscribers.
      channel: The channel whose connectivity is checked and watched.
      initial_try_to_connect: Passed to the first connectivity check.
    """
    to_channel_connectivity = (
        _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY
    )
    connectivity = channel.check_connectivity_state(initial_try_to_connect)
    with state.lock:
        state.connectivity = to_channel_connectivity[connectivity]
        callbacks = tuple(
            callback for callback, _ in state.callbacks_and_connectivities
        )
        # Every subscriber is about to be told the initial connectivity.
        for subscription in state.callbacks_and_connectivities:
            subscription[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)

    while True:
        event = channel.watch_connectivity_state(
            connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            try_to_connect = state.try_to_connect
            if not (state.callbacks_and_connectivities or try_to_connect):
                # Nobody is listening and nothing was requested: stop.
                state.polling = False
                state.connectivity = None
                return
            state.try_to_connect = False
        if not (event.success or try_to_connect):
            continue
        connectivity = channel.check_connectivity_state(try_to_connect)
        with state.lock:
            state.connectivity = to_channel_connectivity[connectivity]
            # A running delivery thread picks up the new value by itself.
            if not state.delivering:
                callbacks = _deliveries(state)
                if callbacks:
                    _spawn_delivery(state, callbacks)

1931 

1932 

def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Registers a connectivity callback, starting the poller if necessary.

    Three cases are handled under ``state.lock``:

    * No subscriptions and no polling thread: a daemon polling thread is
      started and the callback is recorded with no known connectivity.
    * No delivery in progress and a connectivity is already known: the
      callback is delivered the current connectivity right away and is
      recorded as having seen it.
    * Otherwise: the callback is recorded with no known connectivity.

    In the last two cases a truthy ``try_to_connect`` is OR-ed into
    ``state.try_to_connect``.

    Args:
      state: The shared connectivity state.
      callback: The callable to invoke with connectivity values.
      try_to_connect: Whether the channel should attempt to connect; any
        value is coerced with ``bool``.
    """
    wants_connect = bool(try_to_connect)
    with state.lock:
        if not (state.callbacks_and_connectivities or state.polling):
            # First subscriber: the new polling thread receives the connect
            # request directly as its initial argument.
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, wants_connect),
            )
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            state.callbacks_and_connectivities.append([callback, None])
            return

        if not state.delivering and state.connectivity is not None:
            # A value is already known and nobody is delivering: notify the
            # newcomer immediately and record what it has been told.
            _spawn_delivery(state, (callback,))
            seen_connectivity = state.connectivity
        else:
            seen_connectivity = None
        state.try_to_connect |= wants_connect
        state.callbacks_and_connectivities.append(
            [callback, seen_connectivity]
        )

1957 

1958 

def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Removes the first subscription registered for ``callback``.

    The subscription list is searched in order under ``state.lock``; only
    the first entry whose callback compares equal is removed. Nothing
    happens when the callback is not subscribed.

    Args:
      state: The shared connectivity state.
      callback: The callback to remove.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        for position, entry in enumerate(subscriptions):
            if callback == entry[0]:
                del subscriptions[position]
                break

1970 

1971 

def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Appends the compression and user-agent options to ``base_options``.

    Args:
      base_options: The caller-supplied channel options.
      compression: The optional channel-wide compression setting; it is
        converted to channel options by
        ``_compression.create_channel_option``.

    Returns:
      A tuple holding ``base_options``, followed by the compression
      option(s), followed by the primary user-agent option.
    """
    compression_option = _compression.create_channel_option(compression)
    augmented = tuple(base_options) + compression_option
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return augmented + (user_agent_option,)

1987 

1988 

def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Separates core channel options from Python channel options.

    An option is Python-only when its key (the first element of the pair)
    equals ``grpc.experimental.ChannelOptions.SingleThreadedUnaryStream``;
    every other option is treated as a core option.

    Args:
      options: The channel options to partition.

    Returns:
      A ``(python_options, core_options)`` pair of lists - note that the
      Python-only options come first. Relative order within each list is
      preserved.
    """
    python_only = []
    core = []
    for option in options:
        is_python_only = (
            option[0]
            == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
        )
        bucket = python_only if is_python_only else core
        bucket.append(option)
    return python_only, core

2004 

2005 

class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""

    _single_threaded_unary_stream: bool
    _channel: cygrpc.Channel
    _call_state: _ChannelCallState
    _connectivity_state: _ChannelConnectivityState
    _target: str
    _registered_call_handles: Dict[str, int]

    def __init__(
        self,
        target: str,
        options: Sequence[ChannelArgumentType],
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
    ):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)

        # Python-only options are applied first, on top of the default.
        self._single_threaded_unary_stream = (
            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        )
        self._process_python_options(python_options)

        encoded_target = _common.encode(target)
        channel_args = _augment_options(core_options, compression)
        self._channel = cygrpc.Channel(
            encoded_target, channel_args, credentials
        )
        self._target = target
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)

        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _get_registered_call_handle(self, method: str) -> int:
        """
        Look up the registered call handle for a method.

        This is a semi-private method. It is intended for use only by gRPC generated code.

        This method is not thread-safe.

        Args:
          method: Required, the method name for the RPC.

        Returns:
          The registered call handle pointer in the form of a Python Long.
        """
        encoded_method = _common.encode(method)
        return self._channel.get_registered_call_handle(encoded_method)

    def _process_python_options(
        self, python_options: Sequence[ChannelArgumentType]
    ) -> None:
        """Sets channel attributes according to python-only channel options.

        The presence of the SingleThreadedUnaryStream key is what enables
        the single-threaded unary-stream path; the value paired with the
        key is not inspected.
        """
        for option in python_options:
            option_key = option[0]
            if (
                option_key
                == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
            ):
                self._single_threaded_unary_stream = True

    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Subscribes ``callback`` to this channel's connectivity changes."""
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Removes a previously subscribed connectivity ``callback``."""
        _unsubscribe(self._connectivity_state, callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Creates a multi-callable for a unary-unary method."""
        # A call handle is only looked up for registered methods.
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _UnaryUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Creates a multi-callable for a unary-stream method."""
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if self._single_threaded_unary_stream:
            return _SingleThreadedUnaryStreamMultiCallable(
                self._channel,
                _common.encode(method),
                _common.encode(self._target),
                request_serializer,
                response_deserializer,
                call_handle,
            )
        else:
            return _UnaryStreamMultiCallable(
                self._channel,
                _channel_managed_call_management(self._call_state),
                _common.encode(method),
                _common.encode(self._target),
                request_serializer,
                response_deserializer,
                call_handle,
            )

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Creates a multi-callable for a stream-unary method."""
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _StreamUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Creates a multi-callable for a stream-stream method."""
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _StreamStreamMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    def _unsubscribe_all(self) -> None:
        """Drops every connectivity subscription, if any state exists."""
        state = self._connectivity_state
        if not state:
            return
        with state.lock:
            # Empty the list in place so other holders of the same list
            # object observe the removal.
            state.callbacks_and_connectivities.clear()

    def _close(self) -> None:
        """Closes the channel and undoes the registrations made in __init__."""
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Drops subscriptions and closes the underlying channel for a fork."""
        self._unsubscribe_all()
        self._channel.close_on_fork(
            cygrpc.StatusCode.cancelled, "Channel closed due to fork"
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()
        # Returning False lets any exception from the with-body propagate.
        return False

    def close(self) -> None:
        """Closes the channel."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this today
        # because many valid use cases today allow the channel to be deleted
        # immediately after stubs are created. After a sufficient period of time
        # has passed for all users to be trusted to hold on to their channels
        # for as long as they are in use and to close them after using them,
        # then deletion of this grpc._channel.Channel instance can be made to
        # effect closure of the underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except # noqa: E722
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs. Just silence them.
            pass