Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_channel.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

981 statements  

1# Copyright 2016 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Python.""" 

15 

16import copy 

17import functools 

18import logging 

19import os 

20import sys 

21import threading 

22import time 

23import types 

24from typing import ( 

25 Any, 

26 Callable, 

27 Dict, 

28 Iterator, 

29 List, 

30 Optional, 

31 Sequence, 

32 Set, 

33 Tuple, 

34 Union, 

35) 

36 

37import grpc # pytype: disable=pyi-error 

38from grpc import _common # pytype: disable=pyi-error 

39from grpc import _compression # pytype: disable=pyi-error 

40from grpc import _grpcio_metadata # pytype: disable=pyi-error 

41from grpc import _observability # pytype: disable=pyi-error 

42from grpc._cython import cygrpc 

43from grpc._typing import ChannelArgumentType 

44from grpc._typing import DeserializingFunction 

45from grpc._typing import IntegratedCallFactory 

46from grpc._typing import MetadataType 

47from grpc._typing import NullaryCallbackType 

48from grpc._typing import ResponseType 

49from grpc._typing import SerializingFunction 

50from grpc._typing import UserTag 

51import grpc.experimental # pytype: disable=pyi-error 

52 

53_LOGGER = logging.getLogger(__name__) 

54 

55_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__) 

56 

57_EMPTY_FLAGS = 0 

58 

59# NOTE(rbellevi): No guarantees are given about the maintenance of this 

60# environment variable. 

61_DEFAULT_SINGLE_THREADED_UNARY_STREAM = ( 

62 os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None 

63) 

64 

65_UNARY_UNARY_INITIAL_DUE = ( 

66 cygrpc.OperationType.send_initial_metadata, 

67 cygrpc.OperationType.send_message, 

68 cygrpc.OperationType.send_close_from_client, 

69 cygrpc.OperationType.receive_initial_metadata, 

70 cygrpc.OperationType.receive_message, 

71 cygrpc.OperationType.receive_status_on_client, 

72) 

73_UNARY_STREAM_INITIAL_DUE = ( 

74 cygrpc.OperationType.send_initial_metadata, 

75 cygrpc.OperationType.send_message, 

76 cygrpc.OperationType.send_close_from_client, 

77 cygrpc.OperationType.receive_initial_metadata, 

78 cygrpc.OperationType.receive_status_on_client, 

79) 

80_STREAM_UNARY_INITIAL_DUE = ( 

81 cygrpc.OperationType.send_initial_metadata, 

82 cygrpc.OperationType.receive_initial_metadata, 

83 cygrpc.OperationType.receive_message, 

84 cygrpc.OperationType.receive_status_on_client, 

85) 

86_STREAM_STREAM_INITIAL_DUE = ( 

87 cygrpc.OperationType.send_initial_metadata, 

88 cygrpc.OperationType.receive_initial_metadata, 

89 cygrpc.OperationType.receive_status_on_client, 

90) 

91 

92_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( 

93 "Exception calling channel subscription callback!" 

94) 

95 

96_OK_RENDEZVOUS_REPR_FORMAT = ( 

97 '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>' 

98) 

99 

100_NON_OK_RENDEZVOUS_REPR_FORMAT = ( 

101 "<{} of RPC that terminated with:\n" 

102 "\tstatus = {}\n" 

103 '\tdetails = "{}"\n' 

104 '\tdebug_error_string = "{}"\n' 

105 ">" 

106) 

107 

108 

109def _deadline(timeout: Optional[float]) -> Optional[float]: 

110 return None if timeout is None else time.time() + timeout 

111 

112 

113def _unknown_code_details( 

114 unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str] 

115) -> str: 

116 return 'Server sent unknown code {} and details "{}"'.format( 

117 unknown_cygrpc_code, details 

118 ) 

119 

120 

121class _RPCState(object): 

122 condition: threading.Condition 

123 due: Set[cygrpc.OperationType] 

124 initial_metadata: Optional[MetadataType] 

125 response: Any 

126 trailing_metadata: Optional[MetadataType] 

127 code: Optional[grpc.StatusCode] 

128 details: Optional[str] 

129 debug_error_string: Optional[str] 

130 cancelled: bool 

131 callbacks: List[NullaryCallbackType] 

132 fork_epoch: Optional[int] 

133 rpc_start_time: Optional[float] # In relative seconds 

134 rpc_end_time: Optional[float] # In relative seconds 

135 method: Optional[str] 

136 target: Optional[str] 

137 

138 def __init__( 

139 self, 

140 due: Sequence[cygrpc.OperationType], 

141 initial_metadata: Optional[MetadataType], 

142 trailing_metadata: Optional[MetadataType], 

143 code: Optional[grpc.StatusCode], 

144 details: Optional[str], 

145 ): 

146 # `condition` guards all members of _RPCState. `notify_all` is called on 

147 # `condition` when the state of the RPC has changed. 

148 self.condition = threading.Condition() 

149 

150 # The cygrpc.OperationType objects representing events due from the RPC's 

151 # completion queue. If an operation is in `due`, it is guaranteed that 

152 # `operate()` has been called on a corresponding operation. But the 

153 # converse is not true. That is, in the case of failed `operate()` 

154 # calls, there may briefly be events in `due` that do not correspond to 

155 # operations submitted to Core. 

156 self.due = set(due) 

157 self.initial_metadata = initial_metadata 

158 self.response = None 

159 self.trailing_metadata = trailing_metadata 

160 self.code = code 

161 self.details = details 

162 self.debug_error_string = None 

163 # The following three fields are used for observability. 

164 # Updates to those fields do not trigger self.condition. 

165 self.rpc_start_time = None 

166 self.rpc_end_time = None 

167 self.method = None 

168 self.target = None 

169 

170 # The semantics of grpc.Future.cancel and grpc.Future.cancelled are 

171 # slightly wonky, so they have to be tracked separately from the rest of the 

172 # result of the RPC. This field tracks whether cancellation was requested 

173 # prior to termination of the RPC. 

174 self.cancelled = False 

175 self.callbacks = [] 

176 self.fork_epoch = cygrpc.get_fork_epoch() 

177 

178 def reset_postfork_child(self): 

179 self.condition = threading.Condition() 

180 

181 

182def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None: 

183 if state.code is None: 

184 state.code = code 

185 state.details = details 

186 if state.initial_metadata is None: 

187 state.initial_metadata = () 

188 state.trailing_metadata = () 

189 

190 

191def _handle_event( 

192 event: cygrpc.BaseEvent, 

193 state: _RPCState, 

194 response_deserializer: Optional[DeserializingFunction], 

195) -> List[NullaryCallbackType]: 

196 callbacks = [] 

197 for batch_operation in event.batch_operations: 

198 operation_type = batch_operation.type() 

199 state.due.remove(operation_type) 

200 if operation_type == cygrpc.OperationType.receive_initial_metadata: 

201 state.initial_metadata = batch_operation.initial_metadata() 

202 elif operation_type == cygrpc.OperationType.receive_message: 

203 serialized_response = batch_operation.message() 

204 if serialized_response is not None: 

205 response = _common.deserialize( 

206 serialized_response, response_deserializer 

207 ) 

208 if response is None: 

209 details = "Exception deserializing response!" 

210 _abort(state, grpc.StatusCode.INTERNAL, details) 

211 else: 

212 state.response = response 

213 elif operation_type == cygrpc.OperationType.receive_status_on_client: 

214 state.trailing_metadata = batch_operation.trailing_metadata() 

215 if state.code is None: 

216 code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get( 

217 batch_operation.code() 

218 ) 

219 if code is None: 

220 state.code = grpc.StatusCode.UNKNOWN 

221 state.details = _unknown_code_details( 

222 code, batch_operation.details() 

223 ) 

224 else: 

225 state.code = code 

226 state.details = batch_operation.details() 

227 state.debug_error_string = batch_operation.error_string() 

228 state.rpc_end_time = time.perf_counter() 

229 _observability.maybe_record_rpc_latency(state) 

230 callbacks.extend(state.callbacks) 

231 state.callbacks = None 

232 return callbacks 

233 

234 

235def _event_handler( 

236 state: _RPCState, response_deserializer: Optional[DeserializingFunction] 

237) -> UserTag: 

238 def handle_event(event): 

239 with state.condition: 

240 callbacks = _handle_event(event, state, response_deserializer) 

241 state.condition.notify_all() 

242 done = not state.due 

243 for callback in callbacks: 

244 try: 

245 callback() 

246 except Exception as e: # pylint: disable=broad-except 

247 # NOTE(rbellevi): We suppress but log errors here so as not to 

248 # kill the channel spin thread. 

249 _LOGGER.error( 

250 "Exception in callback %s: %s", repr(callback.func), repr(e) 

251 ) 

252 return done and state.fork_epoch >= cygrpc.get_fork_epoch() 

253 

254 return handle_event 

255 

256 

257# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall. 

258# pylint: disable=too-many-statements 

259def _consume_request_iterator( 

260 request_iterator: Iterator, 

261 state: _RPCState, 

262 call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall], 

263 request_serializer: SerializingFunction, 

264 event_handler: Optional[UserTag], 

265) -> None: 

266 """Consume a request supplied by the user.""" 

267 

268 def consume_request_iterator(): # pylint: disable=too-many-branches 

269 # Iterate over the request iterator until it is exhausted or an error 

270 # condition is encountered. 

271 while True: 

272 return_from_user_request_generator_invoked = False 

273 try: 

274 # The thread may die in user-code. Do not block fork for this. 

275 cygrpc.enter_user_request_generator() 

276 request = next(request_iterator) 

277 except StopIteration: 

278 break 

279 except Exception: # pylint: disable=broad-except 

280 cygrpc.return_from_user_request_generator() 

281 return_from_user_request_generator_invoked = True 

282 code = grpc.StatusCode.UNKNOWN 

283 details = "Exception iterating requests!" 

284 _LOGGER.exception(details) 

285 call.cancel( 

286 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details 

287 ) 

288 _abort(state, code, details) 

289 return 

290 finally: 

291 if not return_from_user_request_generator_invoked: 

292 cygrpc.return_from_user_request_generator() 

293 serialized_request = _common.serialize(request, request_serializer) 

294 with state.condition: 

295 if state.code is None and not state.cancelled: 

296 if serialized_request is None: 

297 code = grpc.StatusCode.INTERNAL 

298 details = "Exception serializing request!" 

299 call.cancel( 

300 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], 

301 details, 

302 ) 

303 _abort(state, code, details) 

304 return 

305 state.due.add(cygrpc.OperationType.send_message) 

306 operations = ( 

307 cygrpc.SendMessageOperation( 

308 serialized_request, _EMPTY_FLAGS 

309 ), 

310 ) 

311 operating = call.operate(operations, event_handler) 

312 if not operating: 

313 state.due.remove(cygrpc.OperationType.send_message) 

314 return 

315 

316 def _done(): 

317 return ( 

318 state.code is not None 

319 or cygrpc.OperationType.send_message 

320 not in state.due 

321 ) 

322 

323 _common.wait( 

324 state.condition.wait, 

325 _done, 

326 spin_cb=functools.partial( 

327 cygrpc.block_if_fork_in_progress, state 

328 ), 

329 ) 

330 if state.code is not None: 

331 return 

332 else: 

333 return 

334 with state.condition: 

335 if state.code is None: 

336 state.due.add(cygrpc.OperationType.send_close_from_client) 

337 operations = ( 

338 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 

339 ) 

340 operating = call.operate(operations, event_handler) 

341 if not operating: 

342 state.due.remove( 

343 cygrpc.OperationType.send_close_from_client 

344 ) 

345 

346 consumption_thread = cygrpc.ForkManagedThread( 

347 target=consume_request_iterator 

348 ) 

349 consumption_thread.setDaemon(True) 

350 consumption_thread.start() 

351 

352 

353def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str: 

354 """Calculates error string for RPC.""" 

355 with rpc_state.condition: 

356 if rpc_state.code is None: 

357 return "<{} object>".format(class_name) 

358 if rpc_state.code is grpc.StatusCode.OK: 

359 return _OK_RENDEZVOUS_REPR_FORMAT.format( 

360 class_name, rpc_state.code, rpc_state.details 

361 ) 

362 return _NON_OK_RENDEZVOUS_REPR_FORMAT.format( 

363 class_name, 

364 rpc_state.code, 

365 rpc_state.details, 

366 rpc_state.debug_error_string, 

367 ) 

368 

369 

370class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future): 

371 """An RPC error not tied to the execution of a particular RPC. 

372 

373 The RPC represented by the state object must not be in-progress or 

374 cancelled. 

375 

376 Attributes: 

377 _state: An instance of _RPCState. 

378 """ 

379 

380 _state: _RPCState 

381 

382 def __init__(self, state: _RPCState): 

383 with state.condition: 

384 self._state = _RPCState( 

385 (), 

386 copy.deepcopy(state.initial_metadata), 

387 copy.deepcopy(state.trailing_metadata), 

388 state.code, 

389 copy.deepcopy(state.details), 

390 ) 

391 self._state.response = copy.copy(state.response) 

392 self._state.debug_error_string = copy.copy(state.debug_error_string) 

393 

394 def initial_metadata(self) -> Optional[MetadataType]: 

395 return self._state.initial_metadata 

396 

397 def trailing_metadata(self) -> Optional[MetadataType]: 

398 return self._state.trailing_metadata 

399 

400 def code(self) -> Optional[grpc.StatusCode]: 

401 return self._state.code 

402 

403 def details(self) -> Optional[str]: 

404 return _common.decode(self._state.details) 

405 

406 def debug_error_string(self) -> Optional[str]: 

407 return _common.decode(self._state.debug_error_string) 

408 

409 def _repr(self) -> str: 

410 return _rpc_state_string(self.__class__.__name__, self._state) 

411 

412 def __repr__(self) -> str: 

413 return self._repr() 

414 

415 def __str__(self) -> str: 

416 return self._repr() 

417 

418 def cancel(self) -> bool: 

419 """See grpc.Future.cancel.""" 

420 return False 

421 

422 def cancelled(self) -> bool: 

423 """See grpc.Future.cancelled.""" 

424 return False 

425 

426 def running(self) -> bool: 

427 """See grpc.Future.running.""" 

428 return False 

429 

430 def done(self) -> bool: 

431 """See grpc.Future.done.""" 

432 return True 

433 

434 def result( 

435 self, timeout: Optional[float] = None 

436 ) -> Any: # pylint: disable=unused-argument 

437 """See grpc.Future.result.""" 

438 raise self 

439 

440 def exception( 

441 self, timeout: Optional[float] = None # pylint: disable=unused-argument 

442 ) -> Optional[Exception]: 

443 """See grpc.Future.exception.""" 

444 return self 

445 

446 def traceback( 

447 self, timeout: Optional[float] = None # pylint: disable=unused-argument 

448 ) -> Optional[types.TracebackType]: 

449 """See grpc.Future.traceback.""" 

450 try: 

451 raise self 

452 except grpc.RpcError: 

453 return sys.exc_info()[2] 

454 

455 def add_done_callback( 

456 self, 

457 fn: Callable[[grpc.Future], None], 

458 timeout: Optional[float] = None, # pylint: disable=unused-argument 

459 ) -> None: 

460 """See grpc.Future.add_done_callback.""" 

461 fn(self) 

462 

463 

464class _Rendezvous(grpc.RpcError, grpc.RpcContext): 

465 """An RPC iterator. 

466 

467 Attributes: 

468 _state: An instance of _RPCState. 

469 _call: An instance of SegregatedCall or IntegratedCall. 

470 In either case, the _call object is expected to have operate, cancel, 

471 and next_event methods. 

472 _response_deserializer: A callable taking bytes and return a Python 

473 object. 

474 _deadline: A float representing the deadline of the RPC in seconds. Or 

475 possibly None, to represent an RPC with no deadline at all. 

476 """ 

477 

478 _state: _RPCState 

479 _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall] 

480 _response_deserializer: Optional[DeserializingFunction] 

481 _deadline: Optional[float] 

482 

483 def __init__( 

484 self, 

485 state: _RPCState, 

486 call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall], 

487 response_deserializer: Optional[DeserializingFunction], 

488 deadline: Optional[float], 

489 ): 

490 super(_Rendezvous, self).__init__() 

491 self._state = state 

492 self._call = call 

493 self._response_deserializer = response_deserializer 

494 self._deadline = deadline 

495 

496 def is_active(self) -> bool: 

497 """See grpc.RpcContext.is_active""" 

498 with self._state.condition: 

499 return self._state.code is None 

500 

501 def time_remaining(self) -> Optional[float]: 

502 """See grpc.RpcContext.time_remaining""" 

503 with self._state.condition: 

504 if self._deadline is None: 

505 return None 

506 return max(self._deadline - time.time(), 0) 

507 

508 def cancel(self) -> bool: 

509 """See grpc.RpcContext.cancel""" 

510 with self._state.condition: 

511 if self._state.code is None: 

512 code = grpc.StatusCode.CANCELLED 

513 details = "Locally cancelled by application!" 

514 self._call.cancel( 

515 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details 

516 ) 

517 self._state.cancelled = True 

518 _abort(self._state, code, details) 

519 self._state.condition.notify_all() 

520 return True 

521 return False 

522 

523 def add_callback(self, callback: NullaryCallbackType) -> bool: 

524 """See grpc.RpcContext.add_callback""" 

525 with self._state.condition: 

526 if self._state.callbacks is None: 

527 return False 

528 self._state.callbacks.append(callback) 

529 return True 

530 

531 def __iter__(self): 

532 return self 

533 

534 def next(self): 

535 return self._next() 

536 

537 def __next__(self): 

538 return self._next() 

539 

540 def _next(self): 

541 raise NotImplementedError() 

542 

543 def debug_error_string(self) -> Optional[str]: 

544 raise NotImplementedError() 

545 

546 def _repr(self) -> str: 

547 return _rpc_state_string(self.__class__.__name__, self._state) 

548 

549 def __repr__(self) -> str: 

550 return self._repr() 

551 

552 def __str__(self) -> str: 

553 return self._repr() 

554 

555 def __del__(self) -> None: 

556 with self._state.condition: 

557 if self._state.code is None: 

558 self._state.code = grpc.StatusCode.CANCELLED 

559 self._state.details = "Cancelled upon garbage collection!" 

560 self._state.cancelled = True 

561 self._call.cancel( 

562 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code], 

563 self._state.details, 

564 ) 

565 self._state.condition.notify_all() 

566 

567 

568class _SingleThreadedRendezvous( 

569 _Rendezvous, grpc.Call, grpc.Future 

570): # pylint: disable=too-many-ancestors 

571 """An RPC iterator operating entirely on a single thread. 

572 

573 The __next__ method of _SingleThreadedRendezvous does not depend on the 

574 existence of any other thread, including the "channel spin thread". 

575 However, this means that its interface is entirely synchronous. So this 

576 class cannot completely fulfill the grpc.Future interface. The result, 

577 exception, and traceback methods will never block and will instead raise 

578 an exception if calling the method would result in blocking. 

579 

580 This means that these methods are safe to call from add_done_callback 

581 handlers. 

582 """ 

583 

584 _state: _RPCState 

585 

586 def _is_complete(self) -> bool: 

587 return self._state.code is not None 

588 

589 def cancelled(self) -> bool: 

590 with self._state.condition: 

591 return self._state.cancelled 

592 

593 def running(self) -> bool: 

594 with self._state.condition: 

595 return self._state.code is None 

596 

597 def done(self) -> bool: 

598 with self._state.condition: 

599 return self._state.code is not None 

600 

601 def result(self, timeout: Optional[float] = None) -> Any: 

602 """Returns the result of the computation or raises its exception. 

603 

604 This method will never block. Instead, it will raise an exception 

605 if calling this method would otherwise result in blocking. 

606 

607 Since this method will never block, any `timeout` argument passed will 

608 be ignored. 

609 """ 

610 del timeout 

611 with self._state.condition: 

612 if not self._is_complete(): 

613 error_msg = ( 

614 "_SingleThreadedRendezvous only supports " 

615 "result() when the RPC is complete." 

616 ) 

617 raise grpc.experimental.UsageError(error_msg) 

618 if self._state.code is grpc.StatusCode.OK: 

619 return self._state.response 

620 if self._state.cancelled: 

621 raise grpc.FutureCancelledError() 

622 else: 

623 raise self 

624 

625 def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: 

626 """Return the exception raised by the computation. 

627 

628 This method will never block. Instead, it will raise an exception 

629 if calling this method would otherwise result in blocking. 

630 

631 Since this method will never block, any `timeout` argument passed will 

632 be ignored. 

633 """ 

634 del timeout 

635 with self._state.condition: 

636 if not self._is_complete(): 

637 error_msg = ( 

638 "_SingleThreadedRendezvous only supports " 

639 "exception() when the RPC is complete." 

640 ) 

641 raise grpc.experimental.UsageError(error_msg) 

642 if self._state.code is grpc.StatusCode.OK: 

643 return None 

644 if self._state.cancelled: 

645 raise grpc.FutureCancelledError() 

646 else: 

647 return self 

648 

649 def traceback( 

650 self, timeout: Optional[float] = None 

651 ) -> Optional[types.TracebackType]: 

652 """Access the traceback of the exception raised by the computation. 

653 

654 This method will never block. Instead, it will raise an exception 

655 if calling this method would otherwise result in blocking. 

656 

657 Since this method will never block, any `timeout` argument passed will 

658 be ignored. 

659 """ 

660 del timeout 

661 with self._state.condition: 

662 if not self._is_complete(): 

663 msg = ( 

664 "_SingleThreadedRendezvous only supports " 

665 "traceback() when the RPC is complete." 

666 ) 

667 raise grpc.experimental.UsageError(msg) 

668 if self._state.code is grpc.StatusCode.OK: 

669 return None 

670 if self._state.cancelled: 

671 raise grpc.FutureCancelledError() 

672 else: 

673 try: 

674 raise self 

675 except grpc.RpcError: 

676 return sys.exc_info()[2] 

677 

678 def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None: 

679 with self._state.condition: 

680 if self._state.code is None: 

681 self._state.callbacks.append(functools.partial(fn, self)) 

682 return 

683 

684 fn(self) 

685 

686 def initial_metadata(self) -> Optional[MetadataType]: 

687 """See grpc.Call.initial_metadata""" 

688 with self._state.condition: 

689 # NOTE(gnossen): Based on our initial call batch, we are guaranteed 

690 # to receive initial metadata before any messages. 

691 while self._state.initial_metadata is None: 

692 self._consume_next_event() 

693 return self._state.initial_metadata 

694 

695 def trailing_metadata(self) -> Optional[MetadataType]: 

696 """See grpc.Call.trailing_metadata""" 

697 with self._state.condition: 

698 if self._state.trailing_metadata is None: 

699 error_msg = ( 

700 "Cannot get trailing metadata until RPC is completed." 

701 ) 

702 raise grpc.experimental.UsageError(error_msg) 

703 return self._state.trailing_metadata 

704 

705 def code(self) -> Optional[grpc.StatusCode]: 

706 """See grpc.Call.code""" 

707 with self._state.condition: 

708 if self._state.code is None: 

709 error_msg = "Cannot get code until RPC is completed." 

710 raise grpc.experimental.UsageError(error_msg) 

711 return self._state.code 

712 

713 def details(self) -> Optional[str]: 

714 """See grpc.Call.details""" 

715 with self._state.condition: 

716 if self._state.details is None: 

717 error_msg = "Cannot get details until RPC is completed." 

718 raise grpc.experimental.UsageError(error_msg) 

719 return _common.decode(self._state.details) 

720 

721 def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]: 

722 event = self._call.next_event() 

723 with self._state.condition: 

724 callbacks = _handle_event( 

725 event, self._state, self._response_deserializer 

726 ) 

727 for callback in callbacks: 

728 # NOTE(gnossen): We intentionally allow exceptions to bubble up 

729 # to the user when running on a single thread. 

730 callback() 

731 return event 

732 

733 def _next_response(self) -> Any: 

734 while True: 

735 self._consume_next_event() 

736 with self._state.condition: 

737 if self._state.response is not None: 

738 response = self._state.response 

739 self._state.response = None 

740 return response 

741 if cygrpc.OperationType.receive_message not in self._state.due: 

742 if self._state.code is grpc.StatusCode.OK: 

743 raise StopIteration() 

744 elif self._state.code is not None: 

745 raise self 

746 

747 def _next(self) -> Any: 

748 with self._state.condition: 

749 if self._state.code is None: 

750 # We tentatively add the operation as expected and remove 

751 # it if the enqueue operation fails. This allows us to guarantee that 

752 # if an event has been submitted to the core completion queue, 

753 # it is in `due`. If we waited until after a successful 

754 # enqueue operation then a signal could interrupt this 

755 # thread between the enqueue operation and the addition of the 

756 # operation to `due`. This would cause an exception on the 

757 # channel spin thread when the operation completes and no 

758 # corresponding operation would be present in state.due. 

759 # Note that, since `condition` is held through this block, there is 

760 # no data race on `due`. 

761 self._state.due.add(cygrpc.OperationType.receive_message) 

762 operating = self._call.operate( 

763 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None 

764 ) 

765 if not operating: 

766 self._state.due.remove(cygrpc.OperationType.receive_message) 

767 elif self._state.code is grpc.StatusCode.OK: 

768 raise StopIteration() 

769 else: 

770 raise self 

771 return self._next_response() 

772 

773 def debug_error_string(self) -> Optional[str]: 

774 with self._state.condition: 

775 if self._state.debug_error_string is None: 

776 error_msg = ( 

777 "Cannot get debug error string until RPC is completed." 

778 ) 

779 raise grpc.experimental.UsageError(error_msg) 

780 return _common.decode(self._state.debug_error_string) 

781 

782 

783class _MultiThreadedRendezvous( 

784 _Rendezvous, grpc.Call, grpc.Future 

785): # pylint: disable=too-many-ancestors 

786 """An RPC iterator that depends on a channel spin thread. 

787 

788 This iterator relies upon a per-channel thread running in the background, 

789 dequeueing events from the completion queue, and notifying threads waiting 

790 on the threading.Condition object in the _RPCState object. 

791 

792 This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface 

793 and to mediate a bidirection streaming RPC. 

794 """ 

795 

796 _state: _RPCState 

797 

798 def initial_metadata(self) -> Optional[MetadataType]: 

799 """See grpc.Call.initial_metadata""" 

800 with self._state.condition: 

801 

802 def _done(): 

803 return self._state.initial_metadata is not None 

804 

805 _common.wait(self._state.condition.wait, _done) 

806 return self._state.initial_metadata 

807 

808 def trailing_metadata(self) -> Optional[MetadataType]: 

809 """See grpc.Call.trailing_metadata""" 

810 with self._state.condition: 

811 

812 def _done(): 

813 return self._state.trailing_metadata is not None 

814 

815 _common.wait(self._state.condition.wait, _done) 

816 return self._state.trailing_metadata 

817 

818 def code(self) -> Optional[grpc.StatusCode]: 

819 """See grpc.Call.code""" 

820 with self._state.condition: 

821 

822 def _done(): 

823 return self._state.code is not None 

824 

825 _common.wait(self._state.condition.wait, _done) 

826 return self._state.code 

827 

828 def details(self) -> Optional[str]: 

829 """See grpc.Call.details""" 

830 with self._state.condition: 

831 

832 def _done(): 

833 return self._state.details is not None 

834 

835 _common.wait(self._state.condition.wait, _done) 

836 return _common.decode(self._state.details) 

837 

838 def debug_error_string(self) -> Optional[str]: 

839 with self._state.condition: 

840 

841 def _done(): 

842 return self._state.debug_error_string is not None 

843 

844 _common.wait(self._state.condition.wait, _done) 

845 return _common.decode(self._state.debug_error_string) 

846 

847 def cancelled(self) -> bool: 

848 with self._state.condition: 

849 return self._state.cancelled 

850 

851 def running(self) -> bool: 

852 with self._state.condition: 

853 return self._state.code is None 

854 

855 def done(self) -> bool: 

856 with self._state.condition: 

857 return self._state.code is not None 

858 

859 def _is_complete(self) -> bool: 

860 return self._state.code is not None 

861 

862 def result(self, timeout: Optional[float] = None) -> Any: 

863 """Returns the result of the computation or raises its exception. 

864 

865 See grpc.Future.result for the full API contract. 

866 """ 

867 with self._state.condition: 

868 timed_out = _common.wait( 

869 self._state.condition.wait, self._is_complete, timeout=timeout 

870 ) 

871 if timed_out: 

872 raise grpc.FutureTimeoutError() 

873 elif self._state.code is grpc.StatusCode.OK: 

874 return self._state.response 

875 elif self._state.cancelled: 

876 raise grpc.FutureCancelledError() 

877 else: 

878 raise self 

879 

880 def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: 

881 """Return the exception raised by the computation. 

882 

883 See grpc.Future.exception for the full API contract. 

884 """ 

885 with self._state.condition: 

886 timed_out = _common.wait( 

887 self._state.condition.wait, self._is_complete, timeout=timeout 

888 ) 

889 if timed_out: 

890 raise grpc.FutureTimeoutError() 

891 elif self._state.code is grpc.StatusCode.OK: 

892 return None 

893 elif self._state.cancelled: 

894 raise grpc.FutureCancelledError() 

895 else: 

896 return self 

897 

898 def traceback( 

899 self, timeout: Optional[float] = None 

900 ) -> Optional[types.TracebackType]: 

901 """Access the traceback of the exception raised by the computation. 

902 

903 See grpc.future.traceback for the full API contract. 

904 """ 

905 with self._state.condition: 

906 timed_out = _common.wait( 

907 self._state.condition.wait, self._is_complete, timeout=timeout 

908 ) 

909 if timed_out: 

910 raise grpc.FutureTimeoutError() 

911 elif self._state.code is grpc.StatusCode.OK: 

912 return None 

913 elif self._state.cancelled: 

914 raise grpc.FutureCancelledError() 

915 else: 

916 try: 

917 raise self 

918 except grpc.RpcError: 

919 return sys.exc_info()[2] 

920 

921 def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None: 

922 with self._state.condition: 

923 if self._state.code is None: 

924 self._state.callbacks.append(functools.partial(fn, self)) 

925 return 

926 

927 fn(self) 

928 

929 def _next(self) -> Any: 

930 with self._state.condition: 

931 if self._state.code is None: 

932 event_handler = _event_handler( 

933 self._state, self._response_deserializer 

934 ) 

935 self._state.due.add(cygrpc.OperationType.receive_message) 

936 operating = self._call.operate( 

937 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), 

938 event_handler, 

939 ) 

940 if not operating: 

941 self._state.due.remove(cygrpc.OperationType.receive_message) 

942 elif self._state.code is grpc.StatusCode.OK: 

943 raise StopIteration() 

944 else: 

945 raise self 

946 

947 def _response_ready(): 

948 return self._state.response is not None or ( 

949 cygrpc.OperationType.receive_message not in self._state.due 

950 and self._state.code is not None 

951 ) 

952 

953 _common.wait(self._state.condition.wait, _response_ready) 

954 if self._state.response is not None: 

955 response = self._state.response 

956 self._state.response = None 

957 return response 

958 if cygrpc.OperationType.receive_message not in self._state.due: 

959 if self._state.code is grpc.StatusCode.OK: 

960 raise StopIteration() 

961 elif self._state.code is not None: 

962 raise self 

963 

964 

965def _start_unary_request( 

966 request: Any, 

967 timeout: Optional[float], 

968 request_serializer: SerializingFunction, 

969) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]: 

970 deadline = _deadline(timeout) 

971 serialized_request = _common.serialize(request, request_serializer) 

972 if serialized_request is None: 

973 state = _RPCState( 

974 (), 

975 (), 

976 (), 

977 grpc.StatusCode.INTERNAL, 

978 "Exception serializing request!", 

979 ) 

980 error = _InactiveRpcError(state) 

981 return deadline, None, error 

982 return deadline, serialized_request, None 

983 

984 

985def _end_unary_response_blocking( 

986 state: _RPCState, 

987 call: cygrpc.SegregatedCall, 

988 with_call: bool, 

989 deadline: Optional[float], 

990) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]: 

991 if state.code is grpc.StatusCode.OK: 

992 if with_call: 

993 rendezvous = _MultiThreadedRendezvous(state, call, None, deadline) 

994 return state.response, rendezvous 

995 return state.response 

996 raise _InactiveRpcError(state) # pytype: disable=not-instantiable 

997 

998 

999def _stream_unary_invocation_operations( 

1000 metadata: Optional[MetadataType], initial_metadata_flags: int 

1001) -> Sequence[Sequence[cygrpc.Operation]]: 

1002 return ( 

1003 ( 

1004 cygrpc.SendInitialMetadataOperation( 

1005 metadata, initial_metadata_flags 

1006 ), 

1007 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 

1008 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

1009 ), 

1010 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

1011 ) 

1012 

1013 

1014def _stream_unary_invocation_operations_and_tags( 

1015 metadata: Optional[MetadataType], initial_metadata_flags: int 

1016) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]: 

1017 return tuple( 

1018 ( 

1019 operations, 

1020 None, 

1021 ) 

1022 for operations in _stream_unary_invocation_operations( 

1023 metadata, initial_metadata_flags 

1024 ) 

1025 ) 

1026 

1027 

1028def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]: 

1029 parent_deadline = cygrpc.get_deadline_from_context() 

1030 if parent_deadline is None and user_deadline is None: 

1031 return None 

1032 if parent_deadline is not None and user_deadline is None: 

1033 return parent_deadline 

1034 if user_deadline is not None and parent_deadline is None: 

1035 return user_deadline 

1036 return min(parent_deadline, user_deadline) 

1037 

1038 

1039class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): 

1040 _channel: cygrpc.Channel 

1041 _managed_call: IntegratedCallFactory 

1042 _method: bytes 

1043 _target: bytes 

1044 _request_serializer: Optional[SerializingFunction] 

1045 _response_deserializer: Optional[DeserializingFunction] 

1046 _context: Any 

1047 _registered_call_handle: Optional[int] 

1048 

1049 __slots__ = [ 

1050 "_channel", 

1051 "_managed_call", 

1052 "_method", 

1053 "_target", 

1054 "_request_serializer", 

1055 "_response_deserializer", 

1056 "_context", 

1057 ] 

1058 

1059 # pylint: disable=too-many-arguments 

1060 def __init__( 

1061 self, 

1062 channel: cygrpc.Channel, 

1063 managed_call: IntegratedCallFactory, 

1064 method: bytes, 

1065 target: bytes, 

1066 request_serializer: Optional[SerializingFunction], 

1067 response_deserializer: Optional[DeserializingFunction], 

1068 _registered_call_handle: Optional[int], 

1069 ): 

1070 self._channel = channel 

1071 self._managed_call = managed_call 

1072 self._method = method 

1073 self._target = target 

1074 self._request_serializer = request_serializer 

1075 self._response_deserializer = response_deserializer 

1076 self._context = cygrpc.build_census_context() 

1077 self._registered_call_handle = _registered_call_handle 

1078 

1079 def _prepare( 

1080 self, 

1081 request: Any, 

1082 timeout: Optional[float], 

1083 metadata: Optional[MetadataType], 

1084 wait_for_ready: Optional[bool], 

1085 compression: Optional[grpc.Compression], 

1086 ) -> Tuple[ 

1087 Optional[_RPCState], 

1088 Optional[Sequence[cygrpc.Operation]], 

1089 Optional[float], 

1090 Optional[grpc.RpcError], 

1091 ]: 

1092 deadline, serialized_request, rendezvous = _start_unary_request( 

1093 request, timeout, self._request_serializer 

1094 ) 

1095 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1096 wait_for_ready 

1097 ) 

1098 augmented_metadata = _compression.augment_metadata( 

1099 metadata, compression 

1100 ) 

1101 if serialized_request is None: 

1102 return None, None, None, rendezvous 

1103 state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) 

1104 operations = ( 

1105 cygrpc.SendInitialMetadataOperation( 

1106 augmented_metadata, initial_metadata_flags 

1107 ), 

1108 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), 

1109 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 

1110 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), 

1111 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 

1112 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

1113 ) 

1114 return state, operations, deadline, None 

1115 

1116 def _blocking( 

1117 self, 

1118 request: Any, 

1119 timeout: Optional[float] = None, 

1120 metadata: Optional[MetadataType] = None, 

1121 credentials: Optional[grpc.CallCredentials] = None, 

1122 wait_for_ready: Optional[bool] = None, 

1123 compression: Optional[grpc.Compression] = None, 

1124 ) -> Tuple[_RPCState, cygrpc.SegregatedCall]: 

1125 state, operations, deadline, rendezvous = self._prepare( 

1126 request, timeout, metadata, wait_for_ready, compression 

1127 ) 

1128 if state is None: 

1129 raise rendezvous # pylint: disable-msg=raising-bad-type 

1130 else: 

1131 state.rpc_start_time = time.perf_counter() 

1132 state.method = _common.decode(self._method) 

1133 state.target = _common.decode(self._target) 

1134 call = self._channel.segregated_call( 

1135 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1136 self._method, 

1137 None, 

1138 _determine_deadline(deadline), 

1139 metadata, 

1140 None if credentials is None else credentials._credentials, 

1141 ( 

1142 ( 

1143 operations, 

1144 None, 

1145 ), 

1146 ), 

1147 self._context, 

1148 self._registered_call_handle, 

1149 ) 

1150 event = call.next_event() 

1151 _handle_event(event, state, self._response_deserializer) 

1152 return state, call 

1153 

1154 def __call__( 

1155 self, 

1156 request: Any, 

1157 timeout: Optional[float] = None, 

1158 metadata: Optional[MetadataType] = None, 

1159 credentials: Optional[grpc.CallCredentials] = None, 

1160 wait_for_ready: Optional[bool] = None, 

1161 compression: Optional[grpc.Compression] = None, 

1162 ) -> Any: 

1163 state, call = self._blocking( 

1164 request, timeout, metadata, credentials, wait_for_ready, compression 

1165 ) 

1166 return _end_unary_response_blocking(state, call, False, None) 

1167 

1168 def with_call( 

1169 self, 

1170 request: Any, 

1171 timeout: Optional[float] = None, 

1172 metadata: Optional[MetadataType] = None, 

1173 credentials: Optional[grpc.CallCredentials] = None, 

1174 wait_for_ready: Optional[bool] = None, 

1175 compression: Optional[grpc.Compression] = None, 

1176 ) -> Tuple[Any, grpc.Call]: 

1177 state, call = self._blocking( 

1178 request, timeout, metadata, credentials, wait_for_ready, compression 

1179 ) 

1180 return _end_unary_response_blocking(state, call, True, None) 

1181 

1182 def future( 

1183 self, 

1184 request: Any, 

1185 timeout: Optional[float] = None, 

1186 metadata: Optional[MetadataType] = None, 

1187 credentials: Optional[grpc.CallCredentials] = None, 

1188 wait_for_ready: Optional[bool] = None, 

1189 compression: Optional[grpc.Compression] = None, 

1190 ) -> _MultiThreadedRendezvous: 

1191 state, operations, deadline, rendezvous = self._prepare( 

1192 request, timeout, metadata, wait_for_ready, compression 

1193 ) 

1194 if state is None: 

1195 raise rendezvous # pylint: disable-msg=raising-bad-type 

1196 else: 

1197 event_handler = _event_handler(state, self._response_deserializer) 

1198 state.rpc_start_time = time.perf_counter() 

1199 state.method = _common.decode(self._method) 

1200 state.target = _common.decode(self._target) 

1201 call = self._managed_call( 

1202 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1203 self._method, 

1204 None, 

1205 deadline, 

1206 metadata, 

1207 None if credentials is None else credentials._credentials, 

1208 (operations,), 

1209 event_handler, 

1210 self._context, 

1211 self._registered_call_handle, 

1212 ) 

1213 return _MultiThreadedRendezvous( 

1214 state, call, self._response_deserializer, deadline 

1215 ) 

1216 

1217 

1218class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): 

1219 _channel: cygrpc.Channel 

1220 _method: bytes 

1221 _target: bytes 

1222 _request_serializer: Optional[SerializingFunction] 

1223 _response_deserializer: Optional[DeserializingFunction] 

1224 _context: Any 

1225 _registered_call_handle: Optional[int] 

1226 

1227 __slots__ = [ 

1228 "_channel", 

1229 "_method", 

1230 "_target", 

1231 "_request_serializer", 

1232 "_response_deserializer", 

1233 "_context", 

1234 ] 

1235 

1236 # pylint: disable=too-many-arguments 

1237 def __init__( 

1238 self, 

1239 channel: cygrpc.Channel, 

1240 method: bytes, 

1241 target: bytes, 

1242 request_serializer: SerializingFunction, 

1243 response_deserializer: DeserializingFunction, 

1244 _registered_call_handle: Optional[int], 

1245 ): 

1246 self._channel = channel 

1247 self._method = method 

1248 self._target = target 

1249 self._request_serializer = request_serializer 

1250 self._response_deserializer = response_deserializer 

1251 self._context = cygrpc.build_census_context() 

1252 self._registered_call_handle = _registered_call_handle 

1253 

1254 def __call__( # pylint: disable=too-many-locals 

1255 self, 

1256 request: Any, 

1257 timeout: Optional[float] = None, 

1258 metadata: Optional[MetadataType] = None, 

1259 credentials: Optional[grpc.CallCredentials] = None, 

1260 wait_for_ready: Optional[bool] = None, 

1261 compression: Optional[grpc.Compression] = None, 

1262 ) -> _SingleThreadedRendezvous: 

1263 deadline = _deadline(timeout) 

1264 serialized_request = _common.serialize( 

1265 request, self._request_serializer 

1266 ) 

1267 if serialized_request is None: 

1268 state = _RPCState( 

1269 (), 

1270 (), 

1271 (), 

1272 grpc.StatusCode.INTERNAL, 

1273 "Exception serializing request!", 

1274 ) 

1275 raise _InactiveRpcError(state) 

1276 

1277 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) 

1278 call_credentials = ( 

1279 None if credentials is None else credentials._credentials 

1280 ) 

1281 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1282 wait_for_ready 

1283 ) 

1284 augmented_metadata = _compression.augment_metadata( 

1285 metadata, compression 

1286 ) 

1287 operations = ( 

1288 ( 

1289 cygrpc.SendInitialMetadataOperation( 

1290 augmented_metadata, initial_metadata_flags 

1291 ), 

1292 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), 

1293 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 

1294 ), 

1295 (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),), 

1296 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

1297 ) 

1298 operations_and_tags = tuple((ops, None) for ops in operations) 

1299 state.rpc_start_time = time.perf_counter() 

1300 state.method = _common.decode(self._method) 

1301 state.target = _common.decode(self._target) 

1302 call = self._channel.segregated_call( 

1303 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1304 self._method, 

1305 None, 

1306 _determine_deadline(deadline), 

1307 metadata, 

1308 call_credentials, 

1309 operations_and_tags, 

1310 self._context, 

1311 self._registered_call_handle, 

1312 ) 

1313 return _SingleThreadedRendezvous( 

1314 state, call, self._response_deserializer, deadline 

1315 ) 

1316 

1317 

1318class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): 

1319 _channel: cygrpc.Channel 

1320 _managed_call: IntegratedCallFactory 

1321 _method: bytes 

1322 _target: bytes 

1323 _request_serializer: Optional[SerializingFunction] 

1324 _response_deserializer: Optional[DeserializingFunction] 

1325 _context: Any 

1326 _registered_call_handle: Optional[int] 

1327 

1328 __slots__ = [ 

1329 "_channel", 

1330 "_managed_call", 

1331 "_method", 

1332 "_target", 

1333 "_request_serializer", 

1334 "_response_deserializer", 

1335 "_context", 

1336 ] 

1337 

1338 # pylint: disable=too-many-arguments 

1339 def __init__( 

1340 self, 

1341 channel: cygrpc.Channel, 

1342 managed_call: IntegratedCallFactory, 

1343 method: bytes, 

1344 target: bytes, 

1345 request_serializer: SerializingFunction, 

1346 response_deserializer: DeserializingFunction, 

1347 _registered_call_handle: Optional[int], 

1348 ): 

1349 self._channel = channel 

1350 self._managed_call = managed_call 

1351 self._method = method 

1352 self._target = target 

1353 self._request_serializer = request_serializer 

1354 self._response_deserializer = response_deserializer 

1355 self._context = cygrpc.build_census_context() 

1356 self._registered_call_handle = _registered_call_handle 

1357 

1358 def __call__( # pylint: disable=too-many-locals 

1359 self, 

1360 request: Any, 

1361 timeout: Optional[float] = None, 

1362 metadata: Optional[MetadataType] = None, 

1363 credentials: Optional[grpc.CallCredentials] = None, 

1364 wait_for_ready: Optional[bool] = None, 

1365 compression: Optional[grpc.Compression] = None, 

1366 ) -> _MultiThreadedRendezvous: 

1367 deadline, serialized_request, rendezvous = _start_unary_request( 

1368 request, timeout, self._request_serializer 

1369 ) 

1370 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1371 wait_for_ready 

1372 ) 

1373 if serialized_request is None: 

1374 raise rendezvous # pylint: disable-msg=raising-bad-type 

1375 else: 

1376 augmented_metadata = _compression.augment_metadata( 

1377 metadata, compression 

1378 ) 

1379 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) 

1380 operations = ( 

1381 ( 

1382 cygrpc.SendInitialMetadataOperation( 

1383 augmented_metadata, initial_metadata_flags 

1384 ), 

1385 cygrpc.SendMessageOperation( 

1386 serialized_request, _EMPTY_FLAGS 

1387 ), 

1388 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 

1389 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

1390 ), 

1391 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

1392 ) 

1393 state.rpc_start_time = time.perf_counter() 

1394 state.method = _common.decode(self._method) 

1395 state.target = _common.decode(self._target) 

1396 call = self._managed_call( 

1397 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1398 self._method, 

1399 None, 

1400 _determine_deadline(deadline), 

1401 metadata, 

1402 None if credentials is None else credentials._credentials, 

1403 operations, 

1404 _event_handler(state, self._response_deserializer), 

1405 self._context, 

1406 self._registered_call_handle, 

1407 ) 

1408 return _MultiThreadedRendezvous( 

1409 state, call, self._response_deserializer, deadline 

1410 ) 

1411 

1412 

1413class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): 

1414 _channel: cygrpc.Channel 

1415 _managed_call: IntegratedCallFactory 

1416 _method: bytes 

1417 _target: bytes 

1418 _request_serializer: Optional[SerializingFunction] 

1419 _response_deserializer: Optional[DeserializingFunction] 

1420 _context: Any 

1421 _registered_call_handle: Optional[int] 

1422 

1423 __slots__ = [ 

1424 "_channel", 

1425 "_managed_call", 

1426 "_method", 

1427 "_target", 

1428 "_request_serializer", 

1429 "_response_deserializer", 

1430 "_context", 

1431 ] 

1432 

1433 # pylint: disable=too-many-arguments 

1434 def __init__( 

1435 self, 

1436 channel: cygrpc.Channel, 

1437 managed_call: IntegratedCallFactory, 

1438 method: bytes, 

1439 target: bytes, 

1440 request_serializer: Optional[SerializingFunction], 

1441 response_deserializer: Optional[DeserializingFunction], 

1442 _registered_call_handle: Optional[int], 

1443 ): 

1444 self._channel = channel 

1445 self._managed_call = managed_call 

1446 self._method = method 

1447 self._target = target 

1448 self._request_serializer = request_serializer 

1449 self._response_deserializer = response_deserializer 

1450 self._context = cygrpc.build_census_context() 

1451 self._registered_call_handle = _registered_call_handle 

1452 

1453 def _blocking( 

1454 self, 

1455 request_iterator: Iterator, 

1456 timeout: Optional[float], 

1457 metadata: Optional[MetadataType], 

1458 credentials: Optional[grpc.CallCredentials], 

1459 wait_for_ready: Optional[bool], 

1460 compression: Optional[grpc.Compression], 

1461 ) -> Tuple[_RPCState, cygrpc.SegregatedCall]: 

1462 deadline = _deadline(timeout) 

1463 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 

1464 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1465 wait_for_ready 

1466 ) 

1467 augmented_metadata = _compression.augment_metadata( 

1468 metadata, compression 

1469 ) 

1470 state.rpc_start_time = time.perf_counter() 

1471 state.method = _common.decode(self._method) 

1472 state.target = _common.decode(self._target) 

1473 call = self._channel.segregated_call( 

1474 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1475 self._method, 

1476 None, 

1477 _determine_deadline(deadline), 

1478 augmented_metadata, 

1479 None if credentials is None else credentials._credentials, 

1480 _stream_unary_invocation_operations_and_tags( 

1481 augmented_metadata, initial_metadata_flags 

1482 ), 

1483 self._context, 

1484 self._registered_call_handle, 

1485 ) 

1486 _consume_request_iterator( 

1487 request_iterator, state, call, self._request_serializer, None 

1488 ) 

1489 while True: 

1490 event = call.next_event() 

1491 with state.condition: 

1492 _handle_event(event, state, self._response_deserializer) 

1493 state.condition.notify_all() 

1494 if not state.due: 

1495 break 

1496 return state, call 

1497 

1498 def __call__( 

1499 self, 

1500 request_iterator: Iterator, 

1501 timeout: Optional[float] = None, 

1502 metadata: Optional[MetadataType] = None, 

1503 credentials: Optional[grpc.CallCredentials] = None, 

1504 wait_for_ready: Optional[bool] = None, 

1505 compression: Optional[grpc.Compression] = None, 

1506 ) -> Any: 

1507 state, call = self._blocking( 

1508 request_iterator, 

1509 timeout, 

1510 metadata, 

1511 credentials, 

1512 wait_for_ready, 

1513 compression, 

1514 ) 

1515 return _end_unary_response_blocking(state, call, False, None) 

1516 

1517 def with_call( 

1518 self, 

1519 request_iterator: Iterator, 

1520 timeout: Optional[float] = None, 

1521 metadata: Optional[MetadataType] = None, 

1522 credentials: Optional[grpc.CallCredentials] = None, 

1523 wait_for_ready: Optional[bool] = None, 

1524 compression: Optional[grpc.Compression] = None, 

1525 ) -> Tuple[Any, grpc.Call]: 

1526 state, call = self._blocking( 

1527 request_iterator, 

1528 timeout, 

1529 metadata, 

1530 credentials, 

1531 wait_for_ready, 

1532 compression, 

1533 ) 

1534 return _end_unary_response_blocking(state, call, True, None) 

1535 

1536 def future( 

1537 self, 

1538 request_iterator: Iterator, 

1539 timeout: Optional[float] = None, 

1540 metadata: Optional[MetadataType] = None, 

1541 credentials: Optional[grpc.CallCredentials] = None, 

1542 wait_for_ready: Optional[bool] = None, 

1543 compression: Optional[grpc.Compression] = None, 

1544 ) -> _MultiThreadedRendezvous: 

1545 deadline = _deadline(timeout) 

1546 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 

1547 event_handler = _event_handler(state, self._response_deserializer) 

1548 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1549 wait_for_ready 

1550 ) 

1551 augmented_metadata = _compression.augment_metadata( 

1552 metadata, compression 

1553 ) 

1554 state.rpc_start_time = time.perf_counter() 

1555 state.method = _common.decode(self._method) 

1556 state.target = _common.decode(self._target) 

1557 call = self._managed_call( 

1558 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1559 self._method, 

1560 None, 

1561 deadline, 

1562 augmented_metadata, 

1563 None if credentials is None else credentials._credentials, 

1564 _stream_unary_invocation_operations( 

1565 metadata, initial_metadata_flags 

1566 ), 

1567 event_handler, 

1568 self._context, 

1569 self._registered_call_handle, 

1570 ) 

1571 _consume_request_iterator( 

1572 request_iterator, 

1573 state, 

1574 call, 

1575 self._request_serializer, 

1576 event_handler, 

1577 ) 

1578 return _MultiThreadedRendezvous( 

1579 state, call, self._response_deserializer, deadline 

1580 ) 

1581 

1582 

1583class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): 

1584 _channel: cygrpc.Channel 

1585 _managed_call: IntegratedCallFactory 

1586 _method: bytes 

1587 _target: bytes 

1588 _request_serializer: Optional[SerializingFunction] 

1589 _response_deserializer: Optional[DeserializingFunction] 

1590 _context: Any 

1591 _registered_call_handle: Optional[int] 

1592 

1593 __slots__ = [ 

1594 "_channel", 

1595 "_managed_call", 

1596 "_method", 

1597 "_target", 

1598 "_request_serializer", 

1599 "_response_deserializer", 

1600 "_context", 

1601 ] 

1602 

1603 # pylint: disable=too-many-arguments 

1604 def __init__( 

1605 self, 

1606 channel: cygrpc.Channel, 

1607 managed_call: IntegratedCallFactory, 

1608 method: bytes, 

1609 target: bytes, 

1610 request_serializer: Optional[SerializingFunction], 

1611 response_deserializer: Optional[DeserializingFunction], 

1612 _registered_call_handle: Optional[int], 

1613 ): 

1614 self._channel = channel 

1615 self._managed_call = managed_call 

1616 self._method = method 

1617 self._target = target 

1618 self._request_serializer = request_serializer 

1619 self._response_deserializer = response_deserializer 

1620 self._context = cygrpc.build_census_context() 

1621 self._registered_call_handle = _registered_call_handle 

1622 

1623 def __call__( 

1624 self, 

1625 request_iterator: Iterator, 

1626 timeout: Optional[float] = None, 

1627 metadata: Optional[MetadataType] = None, 

1628 credentials: Optional[grpc.CallCredentials] = None, 

1629 wait_for_ready: Optional[bool] = None, 

1630 compression: Optional[grpc.Compression] = None, 

1631 ) -> _MultiThreadedRendezvous: 

1632 deadline = _deadline(timeout) 

1633 state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) 

1634 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 

1635 wait_for_ready 

1636 ) 

1637 augmented_metadata = _compression.augment_metadata( 

1638 metadata, compression 

1639 ) 

1640 operations = ( 

1641 ( 

1642 cygrpc.SendInitialMetadataOperation( 

1643 augmented_metadata, initial_metadata_flags 

1644 ), 

1645 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 

1646 ), 

1647 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 

1648 ) 

1649 event_handler = _event_handler(state, self._response_deserializer) 

1650 state.rpc_start_time = time.perf_counter() 

1651 state.method = _common.decode(self._method) 

1652 state.target = _common.decode(self._target) 

1653 call = self._managed_call( 

1654 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 

1655 self._method, 

1656 None, 

1657 _determine_deadline(deadline), 

1658 augmented_metadata, 

1659 None if credentials is None else credentials._credentials, 

1660 operations, 

1661 event_handler, 

1662 self._context, 

1663 self._registered_call_handle, 

1664 ) 

1665 _consume_request_iterator( 

1666 request_iterator, 

1667 state, 

1668 call, 

1669 self._request_serializer, 

1670 event_handler, 

1671 ) 

1672 return _MultiThreadedRendezvous( 

1673 state, call, self._response_deserializer, deadline 

1674 ) 

1675 

1676 

1677class _InitialMetadataFlags(int): 

1678 """Stores immutable initial metadata flags""" 

1679 

1680 def __new__(cls, value: int = _EMPTY_FLAGS): 

1681 value &= cygrpc.InitialMetadataFlags.used_mask 

1682 return super(_InitialMetadataFlags, cls).__new__(cls, value) 

1683 

1684 def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int: 

1685 if wait_for_ready is not None: 

1686 if wait_for_ready: 

1687 return self.__class__( 

1688 self 

1689 | cygrpc.InitialMetadataFlags.wait_for_ready 

1690 | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set 

1691 ) 

1692 if not wait_for_ready: 

1693 return self.__class__( 

1694 self & ~cygrpc.InitialMetadataFlags.wait_for_ready 

1695 | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set 

1696 ) 

1697 return self 

1698 

1699 

1700class _ChannelCallState(object): 

1701 channel: cygrpc.Channel 

1702 managed_calls: int 

1703 threading: bool 

1704 

1705 def __init__(self, channel: cygrpc.Channel): 

1706 self.lock = threading.Lock() 

1707 self.channel = channel 

1708 self.managed_calls = 0 

1709 self.threading = False 

1710 

1711 def reset_postfork_child(self) -> None: 

1712 self.managed_calls = 0 

1713 

1714 def __del__(self): 

1715 try: 

1716 self.channel.close( 

1717 cygrpc.StatusCode.cancelled, "Channel deallocated!" 

1718 ) 

1719 except (TypeError, AttributeError): 

1720 pass 

1721 

1722 

1723def _run_channel_spin_thread(state: _ChannelCallState) -> None: 

1724 def channel_spin(): 

1725 while True: 

1726 cygrpc.block_if_fork_in_progress(state) 

1727 event = state.channel.next_call_event() 

1728 if event.completion_type == cygrpc.CompletionType.queue_timeout: 

1729 continue 

1730 call_completed = event.tag(event) 

1731 if call_completed: 

1732 with state.lock: 

1733 state.managed_calls -= 1 

1734 if state.managed_calls == 0: 

1735 return 

1736 

1737 channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin) 

1738 channel_spin_thread.setDaemon(True) 

1739 channel_spin_thread.start() 

1740 

1741 

1742def _channel_managed_call_management(state: _ChannelCallState): 

1743 # pylint: disable=too-many-arguments 

1744 def create( 

1745 flags: int, 

1746 method: bytes, 

1747 host: Optional[str], 

1748 deadline: Optional[float], 

1749 metadata: Optional[MetadataType], 

1750 credentials: Optional[cygrpc.CallCredentials], 

1751 operations: Sequence[Sequence[cygrpc.Operation]], 

1752 event_handler: UserTag, 

1753 context: Any, 

1754 _registered_call_handle: Optional[int], 

1755 ) -> cygrpc.IntegratedCall: 

1756 """Creates a cygrpc.IntegratedCall. 

1757 

1758 Args: 

1759 flags: An integer bitfield of call flags. 

1760 method: The RPC method. 

1761 host: A host string for the created call. 

1762 deadline: A float to be the deadline of the created call or None if 

1763 the call is to have an infinite deadline. 

1764 metadata: The metadata for the call or None. 

1765 credentials: A cygrpc.CallCredentials or None. 

1766 operations: A sequence of sequences of cygrpc.Operations to be 

1767 started on the call. 

1768 event_handler: A behavior to call to handle the events resultant from 

1769 the operations on the call. 

1770 context: Context object for distributed tracing. 

1771 _registered_call_handle: An int representing the call handle of the 

1772 method, or None if the method is not registered. 

1773 

1774 Returns: 

1775 A cygrpc.IntegratedCall with which to conduct an RPC. 

1776 """ 

1777 operations_and_tags = tuple( 

1778 ( 

1779 operation, 

1780 event_handler, 

1781 ) 

1782 for operation in operations 

1783 ) 

1784 with state.lock: 

1785 call = state.channel.integrated_call( 

1786 flags, 

1787 method, 

1788 host, 

1789 deadline, 

1790 metadata, 

1791 credentials, 

1792 operations_and_tags, 

1793 context, 

1794 _registered_call_handle, 

1795 ) 

1796 if state.managed_calls == 0: 

1797 state.managed_calls = 1 

1798 _run_channel_spin_thread(state) 

1799 else: 

1800 state.managed_calls += 1 

1801 return call 

1802 

1803 return create 

1804 

1805 

1806class _ChannelConnectivityState(object): 

1807 lock: threading.RLock 

1808 channel: grpc.Channel 

1809 polling: bool 

1810 connectivity: grpc.ChannelConnectivity 

1811 try_to_connect: bool 

1812 # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704 

1813 callbacks_and_connectivities: List[ 

1814 Sequence[ 

1815 Union[ 

1816 Callable[[grpc.ChannelConnectivity], None], 

1817 Optional[grpc.ChannelConnectivity], 

1818 ] 

1819 ] 

1820 ] 

1821 delivering: bool 

1822 

1823 def __init__(self, channel: grpc.Channel): 

1824 self.lock = threading.RLock() 

1825 self.channel = channel 

1826 self.polling = False 

1827 self.connectivity = None 

1828 self.try_to_connect = False 

1829 self.callbacks_and_connectivities = [] 

1830 self.delivering = False 

1831 

1832 def reset_postfork_child(self) -> None: 

1833 self.polling = False 

1834 self.connectivity = None 

1835 self.try_to_connect = False 

1836 self.callbacks_and_connectivities = [] 

1837 self.delivering = False 

1838 

1839 

1840def _deliveries( 

1841 state: _ChannelConnectivityState, 

1842) -> List[Callable[[grpc.ChannelConnectivity], None]]: 

1843 callbacks_needing_update = [] 

1844 for callback_and_connectivity in state.callbacks_and_connectivities: 

1845 callback, callback_connectivity = callback_and_connectivity 

1846 if callback_connectivity is not state.connectivity: 

1847 callbacks_needing_update.append(callback) 

1848 callback_and_connectivity[1] = state.connectivity 

1849 return callbacks_needing_update 

1850 

1851 

1852def _deliver( 

1853 state: _ChannelConnectivityState, 

1854 initial_connectivity: grpc.ChannelConnectivity, 

1855 initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]], 

1856) -> None: 

1857 connectivity = initial_connectivity 

1858 callbacks = initial_callbacks 

1859 while True: 

1860 for callback in callbacks: 

1861 cygrpc.block_if_fork_in_progress(state) 

1862 try: 

1863 callback(connectivity) 

1864 except Exception: # pylint: disable=broad-except 

1865 _LOGGER.exception( 

1866 _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE 

1867 ) 

1868 with state.lock: 

1869 callbacks = _deliveries(state) 

1870 if callbacks: 

1871 connectivity = state.connectivity 

1872 else: 

1873 state.delivering = False 

1874 return 

1875 

1876 

1877def _spawn_delivery( 

1878 state: _ChannelConnectivityState, 

1879 callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]], 

1880) -> None: 

1881 delivering_thread = cygrpc.ForkManagedThread( 

1882 target=_deliver, 

1883 args=( 

1884 state, 

1885 state.connectivity, 

1886 callbacks, 

1887 ), 

1888 ) 

1889 delivering_thread.setDaemon(True) 

1890 delivering_thread.start() 

1891 state.delivering = True 

1892 

1893 

1894# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll. 

1895def _poll_connectivity( 

1896 state: _ChannelConnectivityState, 

1897 channel: grpc.Channel, 

1898 initial_try_to_connect: bool, 

1899) -> None: 

1900 try_to_connect = initial_try_to_connect 

1901 connectivity = channel.check_connectivity_state(try_to_connect) 

1902 with state.lock: 

1903 state.connectivity = ( 

1904 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ 

1905 connectivity 

1906 ] 

1907 ) 

1908 callbacks = tuple( 

1909 callback for callback, _ in state.callbacks_and_connectivities 

1910 ) 

1911 for callback_and_connectivity in state.callbacks_and_connectivities: 

1912 callback_and_connectivity[1] = state.connectivity 

1913 if callbacks: 

1914 _spawn_delivery(state, callbacks) 

1915 while True: 

1916 event = channel.watch_connectivity_state( 

1917 connectivity, time.time() + 0.2 

1918 ) 

1919 cygrpc.block_if_fork_in_progress(state) 

1920 with state.lock: 

1921 if ( 

1922 not state.callbacks_and_connectivities 

1923 and not state.try_to_connect 

1924 ): 

1925 state.polling = False 

1926 state.connectivity = None 

1927 break 

1928 try_to_connect = state.try_to_connect 

1929 state.try_to_connect = False 

1930 if event.success or try_to_connect: 

1931 connectivity = channel.check_connectivity_state(try_to_connect) 

1932 with state.lock: 

1933 state.connectivity = ( 

1934 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ 

1935 connectivity 

1936 ] 

1937 ) 

1938 if not state.delivering: 

1939 callbacks = _deliveries(state) 

1940 if callbacks: 

1941 _spawn_delivery(state, callbacks) 

1942 

1943 

1944def _subscribe( 

1945 state: _ChannelConnectivityState, 

1946 callback: Callable[[grpc.ChannelConnectivity], None], 

1947 try_to_connect: bool, 

1948) -> None: 

1949 with state.lock: 

1950 if not state.callbacks_and_connectivities and not state.polling: 

1951 polling_thread = cygrpc.ForkManagedThread( 

1952 target=_poll_connectivity, 

1953 args=(state, state.channel, bool(try_to_connect)), 

1954 ) 

1955 polling_thread.setDaemon(True) 

1956 polling_thread.start() 

1957 state.polling = True 

1958 state.callbacks_and_connectivities.append([callback, None]) 

1959 elif not state.delivering and state.connectivity is not None: 

1960 _spawn_delivery(state, (callback,)) 

1961 state.try_to_connect |= bool(try_to_connect) 

1962 state.callbacks_and_connectivities.append( 

1963 [callback, state.connectivity] 

1964 ) 

1965 else: 

1966 state.try_to_connect |= bool(try_to_connect) 

1967 state.callbacks_and_connectivities.append([callback, None]) 

1968 

1969 

1970def _unsubscribe( 

1971 state: _ChannelConnectivityState, 

1972 callback: Callable[[grpc.ChannelConnectivity], None], 

1973) -> None: 

1974 with state.lock: 

1975 for index, (subscribed_callback, _unused_connectivity) in enumerate( 

1976 state.callbacks_and_connectivities 

1977 ): 

1978 if callback == subscribed_callback: 

1979 state.callbacks_and_connectivities.pop(index) 

1980 break 

1981 

1982 

1983def _augment_options( 

1984 base_options: Sequence[ChannelArgumentType], 

1985 compression: Optional[grpc.Compression], 

1986) -> Sequence[ChannelArgumentType]: 

1987 compression_option = _compression.create_channel_option(compression) 

1988 return ( 

1989 tuple(base_options) 

1990 + compression_option 

1991 + ( 

1992 ( 

1993 cygrpc.ChannelArgKey.primary_user_agent_string, 

1994 _USER_AGENT, 

1995 ), 

1996 ) 

1997 ) 

1998 

1999 

2000def _separate_channel_options( 

2001 options: Sequence[ChannelArgumentType], 

2002) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]: 

2003 """Separates core channel options from Python channel options.""" 

2004 core_options = [] 

2005 python_options = [] 

2006 for pair in options: 

2007 if ( 

2008 pair[0] 

2009 == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream 

2010 ): 

2011 python_options.append(pair) 

2012 else: 

2013 core_options.append(pair) 

2014 return python_options, core_options 

2015 

2016 

2017class Channel(grpc.Channel): 

2018 """A cygrpc.Channel-backed implementation of grpc.Channel.""" 

2019 

2020 _single_threaded_unary_stream: bool 

2021 _channel: cygrpc.Channel 

2022 _call_state: _ChannelCallState 

2023 _connectivity_state: _ChannelConnectivityState 

2024 _target: str 

2025 _registered_call_handles: Dict[str, int] 

2026 

2027 def __init__( 

2028 self, 

2029 target: str, 

2030 options: Sequence[ChannelArgumentType], 

2031 credentials: Optional[grpc.ChannelCredentials], 

2032 compression: Optional[grpc.Compression], 

2033 ): 

2034 """Constructor. 

2035 

2036 Args: 

2037 target: The target to which to connect. 

2038 options: Configuration options for the channel. 

2039 credentials: A cygrpc.ChannelCredentials or None. 

2040 compression: An optional value indicating the compression method to be 

2041 used over the lifetime of the channel. 

2042 """ 

2043 python_options, core_options = _separate_channel_options(options) 

2044 self._single_threaded_unary_stream = ( 

2045 _DEFAULT_SINGLE_THREADED_UNARY_STREAM 

2046 ) 

2047 self._process_python_options(python_options) 

2048 self._channel = cygrpc.Channel( 

2049 _common.encode(target), 

2050 _augment_options(core_options, compression), 

2051 credentials, 

2052 ) 

2053 self._target = target 

2054 self._call_state = _ChannelCallState(self._channel) 

2055 self._connectivity_state = _ChannelConnectivityState(self._channel) 

2056 cygrpc.fork_register_channel(self) 

2057 if cygrpc.g_gevent_activated: 

2058 cygrpc.gevent_increment_channel_count() 

2059 

2060 def _get_registered_call_handle(self, method: str) -> int: 

2061 """ 

2062 Get the registered call handle for a method. 

2063 

2064 This is a semi-private method. It is intended for use only by gRPC generated code. 

2065 

2066 This method is not thread-safe. 

2067 

2068 Args: 

2069 method: Required, the method name for the RPC. 

2070 

2071 Returns: 

2072 The registered call handle pointer in the form of a Python Long. 

2073 """ 

2074 return self._channel.get_registered_call_handle(_common.encode(method)) 

2075 

2076 def _process_python_options( 

2077 self, python_options: Sequence[ChannelArgumentType] 

2078 ) -> None: 

2079 """Sets channel attributes according to python-only channel options.""" 

2080 for pair in python_options: 

2081 if ( 

2082 pair[0] 

2083 == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream 

2084 ): 

2085 self._single_threaded_unary_stream = True 

2086 

2087 def subscribe( 

2088 self, 

2089 callback: Callable[[grpc.ChannelConnectivity], None], 

2090 try_to_connect: Optional[bool] = None, 

2091 ) -> None: 

2092 _subscribe(self._connectivity_state, callback, try_to_connect) 

2093 

2094 def unsubscribe( 

2095 self, callback: Callable[[grpc.ChannelConnectivity], None] 

2096 ) -> None: 

2097 _unsubscribe(self._connectivity_state, callback) 

2098 

2099 # pylint: disable=arguments-differ 

2100 def unary_unary( 

2101 self, 

2102 method: str, 

2103 request_serializer: Optional[SerializingFunction] = None, 

2104 response_deserializer: Optional[DeserializingFunction] = None, 

2105 _registered_method: Optional[bool] = False, 

2106 ) -> grpc.UnaryUnaryMultiCallable: 

2107 _registered_call_handle = None 

2108 if _registered_method: 

2109 _registered_call_handle = self._get_registered_call_handle(method) 

2110 return _UnaryUnaryMultiCallable( 

2111 self._channel, 

2112 _channel_managed_call_management(self._call_state), 

2113 _common.encode(method), 

2114 _common.encode(self._target), 

2115 request_serializer, 

2116 response_deserializer, 

2117 _registered_call_handle, 

2118 ) 

2119 

2120 # pylint: disable=arguments-differ 

2121 def unary_stream( 

2122 self, 

2123 method: str, 

2124 request_serializer: Optional[SerializingFunction] = None, 

2125 response_deserializer: Optional[DeserializingFunction] = None, 

2126 _registered_method: Optional[bool] = False, 

2127 ) -> grpc.UnaryStreamMultiCallable: 

2128 _registered_call_handle = None 

2129 if _registered_method: 

2130 _registered_call_handle = self._get_registered_call_handle(method) 

2131 # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC 

2132 # on a single Python thread results in an appreciable speed-up. However, 

2133 # due to slight differences in capability, the multi-threaded variant 

2134 # remains the default. 

2135 if self._single_threaded_unary_stream: 

2136 return _SingleThreadedUnaryStreamMultiCallable( 

2137 self._channel, 

2138 _common.encode(method), 

2139 _common.encode(self._target), 

2140 request_serializer, 

2141 response_deserializer, 

2142 _registered_call_handle, 

2143 ) 

2144 return _UnaryStreamMultiCallable( 

2145 self._channel, 

2146 _channel_managed_call_management(self._call_state), 

2147 _common.encode(method), 

2148 _common.encode(self._target), 

2149 request_serializer, 

2150 response_deserializer, 

2151 _registered_call_handle, 

2152 ) 

2153 

2154 # pylint: disable=arguments-differ 

2155 def stream_unary( 

2156 self, 

2157 method: str, 

2158 request_serializer: Optional[SerializingFunction] = None, 

2159 response_deserializer: Optional[DeserializingFunction] = None, 

2160 _registered_method: Optional[bool] = False, 

2161 ) -> grpc.StreamUnaryMultiCallable: 

2162 _registered_call_handle = None 

2163 if _registered_method: 

2164 _registered_call_handle = self._get_registered_call_handle(method) 

2165 return _StreamUnaryMultiCallable( 

2166 self._channel, 

2167 _channel_managed_call_management(self._call_state), 

2168 _common.encode(method), 

2169 _common.encode(self._target), 

2170 request_serializer, 

2171 response_deserializer, 

2172 _registered_call_handle, 

2173 ) 

2174 

2175 # pylint: disable=arguments-differ 

2176 def stream_stream( 

2177 self, 

2178 method: str, 

2179 request_serializer: Optional[SerializingFunction] = None, 

2180 response_deserializer: Optional[DeserializingFunction] = None, 

2181 _registered_method: Optional[bool] = False, 

2182 ) -> grpc.StreamStreamMultiCallable: 

2183 _registered_call_handle = None 

2184 if _registered_method: 

2185 _registered_call_handle = self._get_registered_call_handle(method) 

2186 return _StreamStreamMultiCallable( 

2187 self._channel, 

2188 _channel_managed_call_management(self._call_state), 

2189 _common.encode(method), 

2190 _common.encode(self._target), 

2191 request_serializer, 

2192 response_deserializer, 

2193 _registered_call_handle, 

2194 ) 

2195 

2196 def _unsubscribe_all(self) -> None: 

2197 state = self._connectivity_state 

2198 if state: 

2199 with state.lock: 

2200 del state.callbacks_and_connectivities[:] 

2201 

2202 def _close(self) -> None: 

2203 self._unsubscribe_all() 

2204 self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!") 

2205 cygrpc.fork_unregister_channel(self) 

2206 if cygrpc.g_gevent_activated: 

2207 cygrpc.gevent_decrement_channel_count() 

2208 

2209 def _close_on_fork(self) -> None: 

2210 self._unsubscribe_all() 

2211 self._channel.close_on_fork( 

2212 cygrpc.StatusCode.cancelled, "Channel closed due to fork" 

2213 ) 

2214 

2215 def __enter__(self): 

2216 return self 

2217 

2218 def __exit__(self, exc_type, exc_val, exc_tb): 

2219 self._close() 

2220 return False 

2221 

2222 def close(self) -> None: 

2223 self._close() 

2224 

2225 def __del__(self): 

2226 # TODO(https://github.com/grpc/grpc/issues/12531): Several releases 

2227 # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call 

2228 # here (or more likely, call self._close() here). We don't do this today 

2229 # because many valid use cases today allow the channel to be deleted 

2230 # immediately after stubs are created. After a sufficient period of time 

2231 # has passed for all users to be trusted to freeze out to their channels 

2232 # for as long as they are in use and to close them after using them, 

2233 # then deletion of this grpc._channel.Channel instance can be made to 

2234 # effect closure of the underlying cygrpc.Channel instance. 

2235 try: 

2236 self._unsubscribe_all() 

2237 except: # pylint: disable=bare-except # noqa: E722 

2238 # Exceptions in __del__ are ignored by Python anyway, but they can 

2239 # keep spamming logs. Just silence them. 

2240 pass