Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_channel.py: 20%

822 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 07:30 +0000

1# Copyright 2016 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Invocation-side implementation of gRPC Python.""" 

15 

16import copy 

17import functools 

18import logging 

19import os 

20import sys 

21import threading 

22import time 

23 

24import grpc 

25from grpc import _common 

26from grpc import _compression 

27from grpc import _grpcio_metadata 

28from grpc._cython import cygrpc 

29import grpc.experimental 

30 

# Module-level logger for the invocation side of gRPC Python.
_LOGGER = logging.getLogger(__name__)

# User-agent string derived from the installed grpcio version.
_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)

# Flags value handed to operations that need no special flags.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# The flag is on whenever the variable is present, whatever its value.
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    "GRPC_SINGLE_THREADED_UNARY_STREAM" in os.environ)

# Operation types initially outstanding ("due") for each RPC arity. These
# seed `_RPCState.due` when an RPC of the corresponding kind is started.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    'Exception calling channel subscription callback!')

# repr() templates for a terminated RPC; the placeholders are, in order, the
# class name, the status code, the details and (non-OK only) the debug error
# string.
_OK_RENDEZVOUS_REPR_FORMAT = '\n'.join((
    '<{} of RPC that terminated with:',
    '\tstatus = {}',
    '\tdetails = "{}"',
    '>',
))

_NON_OK_RENDEZVOUS_REPR_FORMAT = '\n'.join((
    '<{} of RPC that terminated with:',
    '\tstatus = {}',
    '\tdetails = "{}"',
    '\tdebug_error_string = "{}"',
    '>',
))

82 

83 

def _deadline(timeout):
    """Turn a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: A number of seconds from now, or None for "no deadline".

    Returns:
      `time.time() + timeout`, or None when `timeout` is None.
    """
    if timeout is None:
        return None
    return time.time() + timeout

86 

87 

def _unknown_code_details(unknown_cygrpc_code, details):
    """Format the details message used when a status code is unrecognized.

    Args:
      unknown_cygrpc_code: The status code value that could not be mapped.
      details: The details that accompanied that status.

    Returns:
      A human-readable string embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)

91 

92 

class _RPCState(object):
    """Mutable record of everything known about one RPC.

    Attributes:
      condition: Guards all other members; `notify_all` is called on it
        whenever the state of the RPC changes.
      due: Set of cygrpc.OperationType values still expected from the
        completion queue.
      initial_metadata: Initial metadata, or None until known.
      response: The most recent response not yet handed out, or None.
      trailing_metadata: Trailing metadata, or None until known.
      code: The RPC's status code, or None while it is in progress.
      details: The RPC's status details, or None while it is in progress.
      debug_error_string: Debug error string, or None until known.
      cancelled: Whether cancellation was requested before termination.
      callbacks: Callables to run at termination; set to None once the
        status has been processed.
      fork_epoch: Value of cygrpc.get_fork_epoch() at construction time.
    """

    def __init__(self, due, initial_metadata, trailing_metadata, code, details):
        # `condition` guards all members of _RPCState. `notify_all` is called on
        # `condition` when the state of the RPC has changed.
        self.condition = threading.Condition()

        # The cygrpc.OperationType objects representing events due from the RPC's
        # completion queue. If an operation is in `due`, it is guaranteed that
        # `operate()` has been called on a corresponding operation. But the
        # converse is not true. That is, in the case of failed `operate()`
        # calls, there may briefly be events in `due` that do not correspond to
        # operations submitted to Core.
        self.due = set(due)

        # Values supplied by the caller.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details

        # Values only filled in as the RPC progresses.
        self.response = None
        self.debug_error_string = None

        # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
        # slightly wonky, so they have to be tracked separately from the rest of the
        # result of the RPC. This field tracks whether cancellation was requested
        # prior to termination of the RPC.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a brand-new threading.Condition."""
        self.condition = threading.Condition()

124 

125 

def _abort(state, code, details):
    """Terminate the RPC locally unless it already has a status.

    Args:
      state: The _RPCState to update in place.
      code: The status code to record.
      details: The status details to record.

    When `state.code` is already set nothing is changed. Otherwise the code
    and details are stored, missing initial metadata becomes an empty tuple
    and trailing metadata is set to an empty tuple.
    """
    if state.code is not None:
        # A status was already recorded; the first one wins.
        return
    state.code = code
    state.details = details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()

133 

134 

def _handle_event(event, state, response_deserializer):
    """Fold one completion-queue event into the RPC state.

    Every batch operation in the event is removed from `state.due` and its
    payload, if any, is copied into `state`. The caller is expected to hold
    `state.condition`.

    Args:
      event: An object exposing `batch_operations`.
      state: The _RPCState to update.
      response_deserializer: Deserializer applied to received messages.

    Returns:
      The list of callbacks to invoke. It is non-empty only when the event
      carried the RPC's final status, in which case `state.callbacks` is
      also set to None so that no further callbacks can be registered.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(serialized_response,
                                               response_deserializer)
                if response is None:
                    # A None result is treated as a deserialization failure.
                    details = 'Exception deserializing response!'
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            if state.code is None:
                # Keep the raw code around: when it has no mapping, it is the
                # value that must be reported (the mapped value is just None).
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code)
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details())
                else:
                    state.code = code
                    state.details = batch_operation.details()
                # Recorded whenever the status came from this operation, so
                # that readers waiting on it are not left without a value.
                state.debug_error_string = batch_operation.error_string()
            callbacks.extend(state.callbacks)
            state.callbacks = None
    return callbacks

168 

169 

def _event_handler(state, response_deserializer):
    """Build the per-event handler passed along with submitted operations.

    Args:
      state: The _RPCState the handler updates.
      response_deserializer: Deserializer applied to received messages.

    Returns:
      A callable taking an event. It updates `state` under its condition,
      wakes all waiters, runs any termination callbacks outside the lock and
      returns True only when no operation is due any more and the state's
      fork epoch is not older than the current one.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks may be functools.partial objects (which expose
                # `.func`) or plain callables (which do not); never let the
                # logging itself raise.
                logging.error('Exception in callback %s: %s',
                              repr(getattr(callback, 'func', callback)),
                              repr(e))
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event

188 

189 

#pylint: disable=too-many-statements
def _consume_request_iterator(request_iterator, state, call, request_serializer,
                              event_handler):
    """Drain a user-supplied request iterator on a background thread.

    Args:
      request_iterator: Iterator yielding request objects.
      state: The _RPCState of the RPC the requests belong to.
      call: Call object exposing `operate` and `cancel`.
      request_serializer: Serializer applied to each request.
      event_handler: Handler passed along with each submitted operation.

    A daemon cygrpc.ForkManagedThread is started; this function returns
    without waiting for it.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Pull requests one at a time until the iterator is exhausted or the
        # RPC can no longer make progress.
        while True:
            left_user_generator = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                left_user_generator = True
                code = grpc.StatusCode.UNKNOWN
                details = 'Exception iterating requests!'
                _LOGGER.exception(details)
                call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                _abort(state, code, details)
                return
            finally:
                # Balance enter_user_request_generator() exactly once.
                if not left_user_generator:
                    cygrpc.return_from_user_request_generator()

            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is not None or state.cancelled:
                    # The RPC already ended; stop feeding it.
                    return
                if serialized_request is None:
                    code = grpc.StatusCode.INTERNAL
                    details = 'Exception serializing request!'
                    call.cancel(
                        _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                        details)
                    _abort(state, code, details)
                    return

                # Mark the send as due before submitting it.
                state.due.add(cygrpc.OperationType.send_message)
                send_operations = (cygrpc.SendMessageOperation(
                    serialized_request, _EMPTY_FLAGS),)
                if not call.operate(send_operations, event_handler):
                    state.due.remove(cygrpc.OperationType.send_message)
                    return

                def _sent_or_terminated():
                    return (state.code is not None or
                            cygrpc.OperationType.send_message
                            not in state.due)

                # Wait for this message to leave `due` (or for the RPC to
                # end) before asking the iterator for the next one.
                _common.wait(state.condition.wait,
                             _sent_or_terminated,
                             spin_cb=functools.partial(
                                 cygrpc.block_if_fork_in_progress, state))
                if state.code is not None:
                    return

        # Iterator exhausted: half-close if the RPC is still in progress.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                close_operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                if not call.operate(close_operations, event_handler):
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator)
    consumption_thread.setDaemon(True)
    consumption_thread.start()

267 

268 

def _rpc_state_string(class_name, rpc_state):
    """Render the string form of an RPC object from its state.

    Args:
      class_name: Name of the class being rendered.
      rpc_state: The _RPCState to describe; read under its condition.

    Returns:
      A short placeholder while the RPC has no status code, otherwise one of
      the OK / non-OK termination templates filled in from the state.
    """
    with rpc_state.condition:
        code = rpc_state.code
        if code is None:
            return '<{} object>'.format(class_name)
        if code is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, code,
                                                     rpc_state.details)
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name, code, rpc_state.details,
            rpc_state.debug_error_string)

281 

282 

class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled.

    The instance works on its own copy of the state it was created from, so
    all of its accessors answer immediately.

    Attributes:
      _state: An instance of _RPCState.
    """

    def __init__(self, state):
        # Copy under the source state's lock so the snapshot is consistent.
        with state.condition:
            snapshot = _RPCState((), copy.deepcopy(state.initial_metadata),
                                 copy.deepcopy(state.trailing_metadata),
                                 state.code, copy.deepcopy(state.details))
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)
            self._state = snapshot

    def initial_metadata(self):
        """Return the copied initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self):
        """Return the copied trailing metadata."""
        return self._state.trailing_metadata

    def code(self):
        """Return the status code of the RPC."""
        return self._state.code

    def details(self):
        """Return the status details, passed through `_common.decode`."""
        return _common.decode(self._state.details)

    def debug_error_string(self):
        """Return the debug error string, passed through `_common.decode`."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self):
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self):
        return self._repr()

    def __str__(self):
        return self._repr()

    def cancel(self):
        """See grpc.Future.cancel."""
        return False

    def cancelled(self):
        """See grpc.Future.cancelled."""
        return False

    def running(self):
        """See grpc.Future.running."""
        return False

    def done(self):
        """See grpc.Future.done."""
        return True

    def result(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.exception."""
        return self

    def traceback(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.traceback."""
        # Raise and immediately catch ourselves to obtain a traceback object.
        try:
            raise self
        except grpc.RpcError:
            _, _, trace = sys.exc_info()
            return trace

    def add_done_callback(self, fn, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.add_done_callback."""
        # Already done, so the callback runs right away.
        fn(self)

359 

360 

class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Subclasses provide `_next` and `debug_error_string`.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    def __init__(self, state, call, response_deserializer, deadline):
        super(_Rendezvous, self).__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self):
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self):
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            # Never report a negative amount of time.
            return max(self._deadline - time.time(), 0)

    def cancel(self):
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # Already terminated; nothing to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = 'Locally cancelled by application!'
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback):
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            if registered is None:
                # Callbacks were already collected for dispatch.
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self):
        raise NotImplementedError()

    def _repr(self):
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self):
        return self._repr()

    def __str__(self):
        return self._repr()

    def __del__(self):
        # An RPC still in progress when its last reference goes away is
        # cancelled so that it does not linger.
        state = self._state
        with state.condition:
            if state.code is None:
                state.code = grpc.StatusCode.CANCELLED
                state.details = 'Cancelled upon garbage collection!'
                state.cancelled = True
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[state.code],
                    state.details)
                state.condition.notify_all()

453 

454 

class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    def _is_complete(self):
        return self._state.code is not None

    def cancelled(self):
        """Report whether cancellation was requested before termination."""
        with self._state.condition:
            return self._state.cancelled

    def running(self):
        """Report whether the RPC still lacks a status code."""
        with self._state.condition:
            return self._state.code is None

    def done(self):
        """Report whether the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout=None):
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout=None):
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(self, timeout=None):
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and catch ourselves to obtain a traceback object.
            try:
                raise self
            except grpc.RpcError:
                _, _, trace = sys.exc_info()
                return trace

    def add_done_callback(self, fn):
        """Run `fn(self)` at termination, or right away if already done."""
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def initial_metadata(self):
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self):
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            metadata = self._state.trailing_metadata
            if metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed.")
            return metadata

    def code(self):
        """See grpc.Call.code"""
        with self._state.condition:
            status_code = self._state.code
            if status_code is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed.")
            return status_code

    def details(self):
        """See grpc.Call.details"""
        with self._state.condition:
            raw_details = self._state.details
            if raw_details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed.")
            return _common.decode(raw_details)

    def _consume_next_event(self):
        """Fetch one event from the call, apply it and run its callbacks."""
        event = self._call.next_event()
        with self._state.condition:
            pending = _handle_event(event, self._state,
                                    self._response_deserializer)
            for callback in pending:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self):
        """Consume events until a response or the final status is available."""
        state = self._state
        while True:
            self._consume_next_event()
            with state.condition:
                if state.response is not None:
                    # Hand the response out exactly once.
                    response, state.response = state.response, None
                    return response
                if cygrpc.OperationType.receive_message not in state.due:
                    if state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    if state.code is not None:
                        raise self

    def _next(self):
        with self._state.condition:
            if self._state.code is None:
                # We tentatively add the operation as expected and remove
                # it if the enqueue operation fails. This allows us to guarantee that
                # if an event has been submitted to the core completion queue,
                # it is in `due`. If we waited until after a successful
                # enqueue operation then a signal could interrupt this
                # thread between the enqueue operation and the addition of the
                # operation to `due`. This would cause an exception on the
                # channel spin thread when the operation completes and no
                # corresponding operation would be present in state.due.
                # Note that, since `condition` is held through this block, there is
                # no data race on `due`.
                self._state.due.add(cygrpc.OperationType.receive_message)
                submitted = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None)
                if not submitted:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
        return self._next_response()

    def debug_error_string(self):
        """Return the decoded debug error string once it is available."""
        with self._state.condition:
            raw_debug = self._state.debug_error_string
            if raw_debug is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed.")
            return _common.decode(raw_debug)

650 

651 

class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
    and to mediate a bidirectional streaming RPC.
    """

    def initial_metadata(self):
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            _common.wait(self._state.condition.wait,
                         lambda: self._state.initial_metadata is not None)
            return self._state.initial_metadata

    def trailing_metadata(self):
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            _common.wait(self._state.condition.wait,
                         lambda: self._state.trailing_metadata is not None)
            return self._state.trailing_metadata

    def code(self):
        """See grpc.Call.code"""
        with self._state.condition:
            _common.wait(self._state.condition.wait,
                         lambda: self._state.code is not None)
            return self._state.code

    def details(self):
        """See grpc.Call.details"""
        with self._state.condition:
            _common.wait(self._state.condition.wait,
                         lambda: self._state.details is not None)
            return _common.decode(self._state.details)

    def debug_error_string(self):
        """Wait for the debug error string and return it decoded."""
        with self._state.condition:
            _common.wait(self._state.condition.wait,
                         lambda: self._state.debug_error_string is not None)
            return _common.decode(self._state.debug_error_string)

    def cancelled(self):
        """Report whether cancellation was requested before termination."""
        with self._state.condition:
            return self._state.cancelled

    def running(self):
        """Report whether the RPC still lacks a status code."""
        with self._state.condition:
            return self._state.code is None

    def done(self):
        """Report whether the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self):
        return self._state.code is not None

    def result(self, timeout=None):
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout=None):
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(self, timeout=None):
        """Access the traceback of the exception raised by the computation.

        See grpc.future.traceback for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and catch ourselves to obtain a traceback object.
            try:
                raise self
            except grpc.RpcError:
                _, _, trace = sys.exc_info()
                return trace

    def add_done_callback(self, fn):
        """Run `fn(self)` at termination, or right away if already done."""
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def _next(self):
        state = self._state
        with state.condition:
            if state.code is None:
                handler = _event_handler(state, self._response_deserializer)
                # Mark the receive as due before submitting it; undo that if
                # the submission is refused.
                state.due.add(cygrpc.OperationType.receive_message)
                submitted = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), handler)
                if not submitted:
                    state.due.remove(cygrpc.OperationType.receive_message)
            elif state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self

            def _response_ready():
                return (state.response is not None or
                        (cygrpc.OperationType.receive_message not in state.due
                         and state.code is not None))

            _common.wait(state.condition.wait, _response_ready)
            if state.response is not None:
                # Hand the response out exactly once.
                response, state.response = state.response, None
                return response
            if cygrpc.OperationType.receive_message not in state.due:
                if state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if state.code is not None:
                    raise self

827 

828 

def _start_unary_request(request, timeout, request_serializer):
    """Compute the deadline and serialize the single request of an RPC.

    Args:
      request: The request object.
      timeout: Seconds allowed for the RPC, or None.
      request_serializer: Serializer applied to `request`.

    Returns:
      A `(deadline, serialized_request, error)` triple. On success `error`
      is None; when serialization yields None, `serialized_request` is None
      and `error` is an _InactiveRpcError with status INTERNAL.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None
    failed_state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
                             'Exception serializing request!')
    return deadline, None, _InactiveRpcError(failed_state)

839 

840 

def _end_unary_response_blocking(state, call, with_call, deadline):
    """Produce the outcome of a finished blocking single-response RPC.

    Args:
      state: The _RPCState of the finished RPC.
      call: The call object the RPC ran on.
      with_call: Whether the caller also wants a call object back.
      deadline: Deadline handed to the returned rendezvous, if any.

    Returns:
      The response, or `(response, rendezvous)` when `with_call` is true.

    Raises:
      _InactiveRpcError: If the RPC's status code is not OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous

850 

851 

def _stream_unary_invocation_operationses(metadata, initial_metadata_flags):
    """Build the two initial operation batches of a stream-unary RPC.

    Args:
      metadata: Metadata for the send-initial-metadata operation.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A pair of tuples: the first sends initial metadata and receives the
      message and the status; the second receives initial metadata.
    """
    send_and_finish_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    receive_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
    return (send_and_finish_batch, receive_metadata_batch)

862 

863 

def _stream_unary_invocation_operationses_and_tags(metadata,
                                                   initial_metadata_flags):
    """Pair every initial stream-unary operation batch with a None tag.

    Args:
      metadata: Metadata for the send-initial-metadata operation.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A tuple of `(operations, None)` pairs, one per batch.
    """
    batches = _stream_unary_invocation_operationses(metadata,
                                                    initial_metadata_flags)
    return tuple((batch, None) for batch in batches)

871 

872 

def _determine_deadline(user_deadline):
    """Combine the caller's deadline with the one from the current context.

    Args:
      user_deadline: The deadline requested by the caller, or None.

    Returns:
      None when neither deadline exists, the only existing one when just one
      does, and the smaller of the two otherwise.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        # Covers both "neither set" (returns None) and "only user set".
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)

883 

884 

class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Client-side callable for a unary-request, unary-response method.

    ``__call__`` and ``with_call`` start the RPC on a segregated call and
    handle one event from it before returning; ``future`` starts the same
    batch of operations on a managed call and returns a rendezvous object.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _prepare(self, request, timeout, metadata, wait_for_ready, compression):
        """Serializes the request and assembles the operations for the RPC.

        Returns:
          A ``(state, operations, deadline, rendezvous)`` tuple. When the
          request serialized to None, the first three elements are None and
          the last is the object produced by ``_start_unary_request``;
          otherwise the last element is None.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer)
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        if serialized_request is None:
            return None, None, None, rendezvous

        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        # One batch containing every operation of the RPC.
        operations = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata, flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(self, request, timeout, metadata, credentials, wait_for_ready,
                  compression):
        """Runs the RPC on a segregated call and returns ``(state, call)``."""
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression)
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        effective_deadline = _determine_deadline(deadline)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        # The single batch is paired with a None tag.
        operations_and_tags = ((operations, None),)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, effective_deadline, metadata, call_credentials,
            operations_and_tags, self._context)
        event = call.next_event()
        _handle_event(event, state, self._response_deserializer)
        return state, call

    def __call__(self,
                 request,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        state, call = self._blocking(request, timeout, metadata, credentials,
                                     wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(self,
                  request,
                  timeout=None,
                  metadata=None,
                  credentials=None,
                  wait_for_ready=None,
                  compression=None):
        state, call = self._blocking(request, timeout, metadata, credentials,
                                     wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, True, None)

    def future(self,
               request,
               timeout=None,
               metadata=None,
               credentials=None,
               wait_for_ready=None,
               compression=None):
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression)
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        event_handler = _event_handler(state, self._response_deserializer)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, deadline, metadata, call_credentials, (operations,),
            event_handler, self._context)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)

980 

981 

class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-request, stream-response callable backed by a segregated call.

    No managed call (and therefore no event handler) is involved; the
    returned ``_SingleThreadedRendezvous`` is handed the call directly.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, channel, method, request_serializer,
                 response_deserializer):
        self._channel = channel
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
            self,
            request,
            timeout=None,
            metadata=None,
            credentials=None,
            wait_for_ready=None,
            compression=None):
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(request,
                                               self._request_serializer)
        if serialized_request is None:
            # Serialization failed: surface it as an already-terminated RPC.
            failed_state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
                                     'Exception serializing request!')
            raise _InactiveRpcError(failed_state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)

        # Three batches: everything sent, the final status, and the initial
        # metadata. Each one is started with a None tag.
        send_batch = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata, flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
        )
        status_batch = (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operations_and_tags = (
            (send_batch, None),
            (status_batch, None),
            (initial_metadata_batch, None),
        )

        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, _determine_deadline(deadline), metadata, call_credentials,
            operations_and_tags, self._context)
        return _SingleThreadedRendezvous(state, call,
                                         self._response_deserializer, deadline)

1030 

1031 

class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-request, stream-response callable backed by a managed call."""

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
            self,
            request,
            timeout=None,
            metadata=None,
            credentials=None,
            wait_for_ready=None,
            compression=None):
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer)
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        if serialized_request is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)

        # Two batches: the sends together with the final status, and the
        # receipt of initial metadata on its own.
        send_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata, flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operationses = (send_and_status_batch, initial_metadata_batch)

        effective_deadline = _determine_deadline(deadline)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        event_handler = _event_handler(state, self._response_deserializer)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, effective_deadline, metadata, call_credentials, operationses,
            event_handler, self._context)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)

1083 

1084 

class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Client-side callable for a stream-request, unary-response method.

    ``__call__`` and ``with_call`` drive the RPC on a segregated call and
    block until no operations remain due; ``future`` starts it on a managed
    call and returns a rendezvous object immediately.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _blocking(self, request_iterator, timeout, metadata, credentials,
                  wait_for_ready, compression):
        """Runs the RPC to completion and returns ``(state, call)``.

        Events are pulled from the segregated call and handled under
        ``state.condition`` until ``state.due`` is empty.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, _determine_deadline(deadline), augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operationses_and_tags(
                augmented_metadata, initial_metadata_flags), self._context)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, None)
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                # Wake anything waiting on the condition for a state change.
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(self,
                 request_iterator,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        state, call = self._blocking(request_iterator, timeout, metadata,
                                     credentials, wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(self,
                  request_iterator,
                  timeout=None,
                  metadata=None,
                  credentials=None,
                  wait_for_ready=None,
                  compression=None):
        state, call = self._blocking(request_iterator, timeout, metadata,
                                     credentials, wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, True, None)

    def future(self,
               request_iterator,
               timeout=None,
               metadata=None,
               credentials=None,
               wait_for_ready=None,
               compression=None):
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        # Build the operations from the augmented metadata, exactly as
        # _blocking does. Using the raw ``metadata`` here would leave the
        # per-call ``compression`` setting out of the operations started on
        # the call.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, deadline, augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operationses(augmented_metadata,
                                                  initial_metadata_flags),
            event_handler, self._context)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, event_handler)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)

1169 

1170 

class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Stream-request, stream-response callable backed by a managed call."""

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(self,
                 request_iterator,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)

        # Two batches: initial metadata out plus final status in, and the
        # receipt of initial metadata on its own.
        metadata_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata, flags),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operationses = (metadata_and_status_batch, initial_metadata_batch)

        event_handler = _event_handler(state, self._response_deserializer)
        effective_deadline = _determine_deadline(deadline)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, effective_deadline, augmented_metadata, call_credentials,
            operationses, event_handler, self._context)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, event_handler)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)

1214 

1215 

class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value=_EMPTY_FLAGS):
        # Keep only the bits covered by cygrpc's used_mask.
        masked = value & cygrpc.InitialMetadataFlags.used_mask
        return int.__new__(cls, masked)

    def with_wait_for_ready(self, wait_for_ready):
        """Returns flags reflecting an explicit wait_for_ready choice.

        Args:
          wait_for_ready: None to leave the flags untouched; otherwise a value
            whose truthiness sets or clears the wait_for_ready bit. In both
            non-None cases the explicitly-set bit is turned on.

        Returns:
          ``self`` when wait_for_ready is None, else a new instance of the
          same class.
        """
        if wait_for_ready is None:
            return self
        flag_bits = cygrpc.InitialMetadataFlags
        if wait_for_ready:
            adjusted = self | flag_bits.wait_for_ready
        else:
            adjusted = self & ~flag_bits.wait_for_ready
        return self.__class__(adjusted |
                              flag_bits.wait_for_ready_explicitly_set)

1232 

1233 

1234class _ChannelCallState(object): 

1235 

1236 def __init__(self, channel): 

1237 self.lock = threading.Lock() 

1238 self.channel = channel 

1239 self.managed_calls = 0 

1240 self.threading = False 

1241 

1242 def reset_postfork_child(self): 

1243 self.managed_calls = 0 

1244 

1245 def __del__(self): 

1246 try: 

1247 self.channel.close(cygrpc.StatusCode.cancelled, 

1248 'Channel deallocated!') 

1249 except (TypeError, AttributeError): 

1250 pass 

1251 

1252 

def _run_channel_spin_thread(state):
    """Starts a daemon thread that dispatches events for ``state.channel``.

    The thread hands each non-timeout event to the event's own tag. Whenever
    a tag reports a truthy result, ``state.managed_calls`` is decremented
    under ``state.lock``; the thread exits once the counter reaches zero.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                # Nothing arrived before the queue timed out; poll again.
                continue
            if not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spinner = cygrpc.ForkManagedThread(target=channel_spin)
    spinner.setDaemon(True)
    spinner.start()

1271 

1272 

1273def _channel_managed_call_management(state): 

1274 

1275 # pylint: disable=too-many-arguments 

1276 def create(flags, method, host, deadline, metadata, credentials, 

1277 operationses, event_handler, context): 

1278 """Creates a cygrpc.IntegratedCall. 

1279 

1280 Args: 

1281 flags: An integer bitfield of call flags. 

1282 method: The RPC method. 

1283 host: A host string for the created call. 

1284 deadline: A float to be the deadline of the created call or None if 

1285 the call is to have an infinite deadline. 

1286 metadata: The metadata for the call or None. 

1287 credentials: A cygrpc.CallCredentials or None. 

1288 operationses: An iterable of iterables of cygrpc.Operations to be 

1289 started on the call. 

1290 event_handler: A behavior to call to handle the events resultant from 

1291 the operations on the call. 

1292 context: Context object for distributed tracing. 

1293 Returns: 

1294 A cygrpc.IntegratedCall with which to conduct an RPC. 

1295 """ 

1296 operationses_and_tags = tuple(( 

1297 operations, 

1298 event_handler, 

1299 ) for operations in operationses) 

1300 with state.lock: 

1301 call = state.channel.integrated_call(flags, method, host, deadline, 

1302 metadata, credentials, 

1303 operationses_and_tags, context) 

1304 if state.managed_calls == 0: 

1305 state.managed_calls = 1 

1306 _run_channel_spin_thread(state) 

1307 else: 

1308 state.managed_calls += 1 

1309 return call 

1310 

1311 return create 

1312 

1313 

1314class _ChannelConnectivityState(object): 

1315 

1316 def __init__(self, channel): 

1317 self.lock = threading.RLock() 

1318 self.channel = channel 

1319 self.polling = False 

1320 self.connectivity = None 

1321 self.try_to_connect = False 

1322 self.callbacks_and_connectivities = [] 

1323 self.delivering = False 

1324 

1325 def reset_postfork_child(self): 

1326 self.polling = False 

1327 self.connectivity = None 

1328 self.try_to_connect = False 

1329 self.callbacks_and_connectivities = [] 

1330 self.delivering = False 

1331 

1332 

1333def _deliveries(state): 

1334 callbacks_needing_update = [] 

1335 for callback_and_connectivity in state.callbacks_and_connectivities: 

1336 callback, callback_connectivity, = callback_and_connectivity 

1337 if callback_connectivity is not state.connectivity: 

1338 callbacks_needing_update.append(callback) 

1339 callback_and_connectivity[1] = state.connectivity 

1340 return callbacks_needing_update 

1341 

1342 

def _deliver(state, initial_connectivity, initial_callbacks):
    """Invokes callbacks with connectivity values until none are pending.

    After each round, ``_deliveries`` is consulted under ``state.lock``. When
    it reports nothing to do, ``state.delivering`` is cleared and the
    function returns; otherwise another round runs with the connectivity
    read at that moment.
    """
    connectivity, callbacks = initial_connectivity, initial_callbacks
    while True:
        for callback in callbacks:
            cygrpc.block_if_fork_in_progress(state)
            try:
                callback(connectivity)
            except Exception:  # pylint: disable=broad-except
                # A failing subscriber must not stop the remaining ones.
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE)
        with state.lock:
            callbacks = _deliveries(state)
            if not callbacks:
                state.delivering = False
                return
            connectivity = state.connectivity

1361 

1362 

def _spawn_delivery(state, callbacks):
    """Starts a daemon thread running ``_deliver`` and marks delivery active.

    The thread is given ``state.connectivity`` as read at spawn time.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True

1373 

1374 

# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(state, channel, initial_try_to_connect):
    """Polls ``channel`` for connectivity changes and schedules deliveries.

    Runs until a pass finds, under ``state.lock``, no subscribed callbacks
    and no pending connect request; at that point ``state.polling`` and
    ``state.connectivity`` are reset and the function returns.
    """
    to_channel_connectivity = (
        _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)

    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = to_channel_connectivity[connectivity]
        # Every subscriber present at this point gets the first value.
        callbacks = tuple(
            entry[0] for entry in state.callbacks_and_connectivities)
        for entry in state.callbacks_and_connectivities:
            entry[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)

    while True:
        # Each watch is given a deadline 0.2 seconds from now.
        event = channel.watch_connectivity_state(connectivity,
                                                 time.time() + 0.2)
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (state.callbacks_and_connectivities or
                    state.try_to_connect):
                state.polling = False
                state.connectivity = None
                break
            # Take the pending connect request and clear it.
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or try_to_connect):
            continue
        connectivity = channel.check_connectivity_state(try_to_connect)
        with state.lock:
            state.connectivity = to_channel_connectivity[connectivity]
            if not state.delivering:
                callbacks = _deliveries(state)
                if callbacks:
                    _spawn_delivery(state, callbacks)

1411 

1412 

1413def _subscribe(state, callback, try_to_connect): 

1414 with state.lock: 

1415 if not state.callbacks_and_connectivities and not state.polling: 

1416 polling_thread = cygrpc.ForkManagedThread( 

1417 target=_poll_connectivity, 

1418 args=(state, state.channel, bool(try_to_connect))) 

1419 polling_thread.setDaemon(True) 

1420 polling_thread.start() 

1421 state.polling = True 

1422 state.callbacks_and_connectivities.append([callback, None]) 

1423 elif not state.delivering and state.connectivity is not None: 

1424 _spawn_delivery(state, (callback,)) 

1425 state.try_to_connect |= bool(try_to_connect) 

1426 state.callbacks_and_connectivities.append( 

1427 [callback, state.connectivity]) 

1428 else: 

1429 state.try_to_connect |= bool(try_to_connect) 

1430 state.callbacks_and_connectivities.append([callback, None]) 

1431 

1432 

1433def _unsubscribe(state, callback): 

1434 with state.lock: 

1435 for index, (subscribed_callback, unused_connectivity) in enumerate( 

1436 state.callbacks_and_connectivities): 

1437 if callback == subscribed_callback: 

1438 state.callbacks_and_connectivities.pop(index) 

1439 break 

1440 

1441 

def _augment_options(base_options, compression):
    """Returns ``base_options`` extended with library-supplied options.

    The result is a tuple: the caller's options, then whatever
    ``_compression.create_channel_option`` yields for ``compression``, then
    the primary user-agent option.
    """
    compression_option = _compression.create_channel_option(compression)
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return tuple(base_options) + compression_option + (user_agent_option,)

1448 

1449 

1450def _separate_channel_options(options): 

1451 """Separates core channel options from Python channel options.""" 

1452 core_options = [] 

1453 python_options = [] 

1454 for pair in options: 

1455 if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream: 

1456 python_options.append(pair) 

1457 else: 

1458 core_options.append(pair) 

1459 return python_options, core_options 

1460 

1461 

class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""

    def __init__(self, target, options, credentials, compression):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)
        self._single_threaded_unary_stream = _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        self._process_python_options(python_options)

        encoded_target = _common.encode(target)
        channel_arguments = _augment_options(core_options, compression)
        self._channel = cygrpc.Channel(encoded_target, channel_arguments,
                                       credentials)
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)
        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _process_python_options(self, python_options):
        """Sets channel attributes according to python-only channel options."""
        for option in python_options:
            option_key = option[0]
            if option_key == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:
                self._single_threaded_unary_stream = True

    def subscribe(self, callback, try_to_connect=None):
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(self, callback):
        _unsubscribe(self._connectivity_state, callback)

    def unary_unary(self,
                    method,
                    request_serializer=None,
                    response_deserializer=None):
        managed_call = _channel_managed_call_management(self._call_state)
        return _UnaryUnaryMultiCallable(self._channel, managed_call,
                                        _common.encode(method),
                                        request_serializer,
                                        response_deserializer)

    def unary_stream(self,
                     method,
                     request_serializer=None,
                     response_deserializer=None):
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if self._single_threaded_unary_stream:
            return _SingleThreadedUnaryStreamMultiCallable(
                self._channel, _common.encode(method), request_serializer,
                response_deserializer)
        managed_call = _channel_managed_call_management(self._call_state)
        return _UnaryStreamMultiCallable(self._channel, managed_call,
                                         _common.encode(method),
                                         request_serializer,
                                         response_deserializer)

    def stream_unary(self,
                     method,
                     request_serializer=None,
                     response_deserializer=None):
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamUnaryMultiCallable(self._channel, managed_call,
                                         _common.encode(method),
                                         request_serializer,
                                         response_deserializer)

    def stream_stream(self,
                      method,
                      request_serializer=None,
                      response_deserializer=None):
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamStreamMultiCallable(self._channel, managed_call,
                                          _common.encode(method),
                                          request_serializer,
                                          response_deserializer)

    def _unsubscribe_all(self):
        """Drops every connectivity subscription, if any state exists."""
        state = self._connectivity_state
        if not state:
            return
        with state.lock:
            # Clear in place so other holders of the list see it emptied.
            del state.callbacks_and_connectivities[:]

    def _close(self):
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self):
        self._unsubscribe_all()
        self._channel.close_on_fork(cygrpc.StatusCode.cancelled,
                                    'Channel closed due to fork')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()
        # Returning False lets any in-flight exception propagate.
        return False

    def close(self):
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). This is not done
        # today because many valid use cases allow the channel to be deleted
        # immediately after stubs are created. Once users can be trusted to
        # hold on to their channels for as long as they are in use and to
        # close them afterwards, deletion of this grpc._channel.Channel
        # instance can be made to effect closure of the underlying
        # cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs. Just silence them.
            pass