Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_interceptor.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

285 statements  

1# Copyright 2017 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Implementation of gRPC Python interceptors.""" 

15 

16import collections 

17import sys 

18import types 

19from typing import Any, Callable, Optional, Sequence, Tuple, Union 

20 

21import grpc 

22 

23from ._typing import DeserializingFunction 

24from ._typing import DoneCallbackType 

25from ._typing import MetadataType 

26from ._typing import RequestIterableType 

27from ._typing import SerializingFunction 

28 

29 

class _ServicePipeline(object):
    """Runs a handler lookup through an ordered chain of server interceptors.

    Each interceptor's ``intercept_service`` is given a continuation that
    advances to the next interceptor; once every interceptor has been
    visited, the original ``thunk`` is called with the call details.
    """

    # One entry per interceptor, so the tuple is variable-length
    # (``Tuple[X]`` on its own would describe a tuple of exactly one item).
    interceptors: Tuple[grpc.ServerInterceptor, ...]

    def __init__(self, interceptors: Sequence[grpc.ServerInterceptor]):
        """Store ``interceptors`` as a tuple, preserving their order."""
        self.interceptors = tuple(interceptors)

    def _continuation(self, thunk: Callable, index: int) -> Callable:
        """Return a one-argument callable that resumes the chain at ``index``."""
        return lambda context: self._intercept_at(thunk, index, context)

    def _intercept_at(
        self, thunk: Callable, index: int, context: grpc.HandlerCallDetails
    ) -> grpc.RpcMethodHandler:
        """Call the interceptor at ``index``, or ``thunk`` when past the end.

        Args:
            thunk: Callable invoked with ``context`` after the last interceptor.
            index: Position in ``self.interceptors`` to run next.
            context: Call details handed to the interceptor or to ``thunk``.

        Returns:
            Whatever the interceptor (or ``thunk``) returns.
        """
        if index >= len(self.interceptors):
            return thunk(context)
        next_step = self._continuation(thunk, index + 1)
        return self.interceptors[index].intercept_service(next_step, context)

    def execute(
        self, thunk: Callable, context: grpc.HandlerCallDetails
    ) -> grpc.RpcMethodHandler:
        """Run the whole chain, starting from the first interceptor."""
        return self._intercept_at(thunk, 0, context)

52 

53 

def service_pipeline(
    interceptors: Optional[Sequence[grpc.ServerInterceptor]],
) -> Optional[_ServicePipeline]:
    """Build a ``_ServicePipeline``, or return ``None`` when there is nothing to run.

    A falsy ``interceptors`` value (``None`` or an empty sequence) yields
    ``None`` rather than an empty pipeline.
    """
    if not interceptors:
        return None
    return _ServicePipeline(interceptors)

58 

59 

class _ClientCallDetails(
    collections.namedtuple(
        "_ClientCallDetails",
        (
            "method",
            "timeout",
            "metadata",
            "credentials",
            "wait_for_ready",
            "compression",
        ),
    ),
    grpc.ClientCallDetails,
):
    """Named-tuple record of per-call settings that is also a ``grpc.ClientCallDetails``.

    Fields, in positional order: ``method``, ``timeout``, ``metadata``,
    ``credentials``, ``wait_for_ready``, ``compression``.
    """

75 

76 

def _unwrap_client_call_details(
    call_details: grpc.ClientCallDetails,
    default_details: grpc.ClientCallDetails,
) -> Tuple[
    str, float, MetadataType, grpc.CallCredentials, bool, grpc.Compression
]:
    """Resolve the six call settings, falling back per attribute.

    For each setting the value is read from ``call_details``; if that
    attribute access raises ``AttributeError`` the value is read from
    ``default_details`` instead.

    Returns:
        A tuple ``(method, timeout, metadata, credentials, wait_for_ready,
        compression)``.
    """
    field_names = (
        "method",
        "timeout",
        "metadata",
        "credentials",
        "wait_for_ready",
        "compression",
    )
    resolved = []
    for field_name in field_names:
        try:
            value = getattr(call_details, field_name)
        except AttributeError:
            # The default is only consulted when the primary lookup fails.
            value = getattr(default_details, field_name)
        resolved.append(value)
    return tuple(resolved)

126 

127 

class _FailureOutcome(
    grpc.RpcError, grpc.Future, grpc.Call
):  # pylint: disable=too-many-ancestors
    """Already-finished call/future that carries a captured exception.

    It reports ``StatusCode.INTERNAL``, exposes no metadata, and raises the
    stored exception from ``result()`` and from iteration.
    """

    _exception: Exception
    _traceback: types.TracebackType

    def __init__(self, exception: Exception, traceback: types.TracebackType):
        super().__init__()
        self._exception = exception
        self._traceback = traceback

    # --- grpc.Call surface: fixed answers, there is no live call. ---

    def initial_metadata(self) -> Optional[MetadataType]:
        return None

    def trailing_metadata(self) -> Optional[MetadataType]:
        return None

    def code(self) -> Optional[grpc.StatusCode]:
        return grpc.StatusCode.INTERNAL

    def details(self) -> Optional[str]:
        return "Exception raised while intercepting the RPC"

    def cancel(self) -> bool:
        return False

    def cancelled(self) -> bool:
        return False

    def is_active(self) -> bool:
        return False

    def time_remaining(self) -> Optional[float]:
        return None

    # --- grpc.Future surface: always done, always failed. ---

    def running(self) -> bool:
        return False

    def done(self) -> bool:
        return True

    def result(self, ignored_timeout: Optional[float] = None):
        """Raise the stored exception; the timeout argument is unused."""
        raise self._exception

    def exception(
        self, ignored_timeout: Optional[float] = None
    ) -> Optional[Exception]:
        """Return the stored exception; the timeout argument is unused."""
        return self._exception

    def traceback(
        self, ignored_timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Return the stored traceback; the timeout argument is unused."""
        return self._traceback

    def add_callback(self, unused_callback) -> bool:
        """Return ``False`` without registering the callback."""
        return False

    def add_done_callback(self, fn: DoneCallbackType) -> None:
        """Invoke ``fn`` immediately with this object."""
        fn(self)

    # --- Iterator surface: the first step raises the stored exception. ---

    def __iter__(self):
        return self

    def __next__(self):
        raise self._exception

    def next(self):
        return self.__next__()

196 

197 

class _UnaryOutcome(grpc.Call, grpc.Future):
    """Finished, successful result pairing a response with its ``grpc.Call``.

    Call-related queries are forwarded to the wrapped call; future-related
    queries answer as a future that has already completed without error.
    """

    _response: Any
    _call: grpc.Call

    def __init__(self, response: Any, call: grpc.Call):
        self._response = response
        self._call = call

    # --- Forwarded to the wrapped call. ---

    def initial_metadata(self) -> Optional[MetadataType]:
        return self._call.initial_metadata()

    def trailing_metadata(self) -> Optional[MetadataType]:
        return self._call.trailing_metadata()

    def code(self) -> Optional[grpc.StatusCode]:
        return self._call.code()

    def details(self) -> Optional[str]:
        return self._call.details()

    def is_active(self) -> bool:
        return self._call.is_active()

    def time_remaining(self) -> Optional[float]:
        return self._call.time_remaining()

    def cancel(self) -> bool:
        return self._call.cancel()

    def add_callback(self, callback) -> bool:
        return self._call.add_callback(callback)

    # --- Fixed answers: the future is complete and holds a response. ---

    def cancelled(self) -> bool:
        return False

    def running(self) -> bool:
        return False

    def done(self) -> bool:
        return True

    def result(self, ignored_timeout: Optional[float] = None):
        """Return the stored response; the timeout argument is unused."""
        return self._response

    def exception(self, ignored_timeout: Optional[float] = None):
        """Return ``None``; the timeout argument is unused."""
        return None

    def traceback(self, ignored_timeout: Optional[float] = None):
        """Return ``None``; the timeout argument is unused."""
        return None

    def add_done_callback(self, fn: DoneCallbackType) -> None:
        """Invoke ``fn`` immediately with this object."""
        fn(self)

250 

251 

class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Unary-unary multi-callable that passes each invocation to an interceptor.

    ``_thunk`` maps a method name to the underlying multi-callable; the
    interceptor receives a continuation that performs the real call.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.UnaryUnaryClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.UnaryUnaryClientInterceptor,
    ):
        self._thunk = thunk
        self._method = method
        self._interceptor = interceptor

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC and return only the response."""
        response, _ = self._with_call(
            request,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )
        return response

    def _with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Run the interceptor and return ``(call.result(), call)``."""
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request):
            # Attributes missing from new_details fall back to the
            # details this invocation started with.
            resolved = _unwrap_client_call_details(
                new_details, original_details
            )
            target, deadline, headers, creds, ready, codec = resolved
            try:
                response, call = self._thunk(target).with_call(
                    request,
                    timeout=deadline,
                    metadata=headers,
                    credentials=creds,
                    wait_for_ready=ready,
                    compression=codec,
                )
                return _UnaryOutcome(response, call)
            except grpc.RpcError as rpc_error:
                # The error is returned, not raised, so the interceptor
                # receives it as the outcome.
                return rpc_error
            except Exception as exception:  # pylint:disable=broad-except
                return _FailureOutcome(exception, sys.exc_info()[2])

        call = self._interceptor.intercept_unary_unary(
            continuation, original_details, request
        )
        return call.result(), call

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC and return the response together with the call."""
        return self._with_call(
            request,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Return the interceptor's result for a ``future``-style invocation.

        If the interceptor raises, a ``_FailureOutcome`` wrapping the
        exception is returned instead.
        """
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request):
            resolved = _unwrap_client_call_details(
                new_details, original_details
            )
            target, deadline, headers, creds, ready, codec = resolved
            return self._thunk(target).future(
                request,
                timeout=deadline,
                metadata=headers,
                credentials=creds,
                wait_for_ready=ready,
                compression=codec,
            )

        try:
            return self._interceptor.intercept_unary_unary(
                continuation, original_details, request
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, sys.exc_info()[2])

393 

394 

class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-stream multi-callable that passes each invocation to an interceptor.

    ``_thunk`` maps a method name to the underlying multi-callable.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.UnaryStreamClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.UnaryStreamClientInterceptor,
    ):
        self._thunk = thunk
        self._method = method
        self._interceptor = interceptor

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ):
        """Return the interceptor's result for this invocation.

        If the interceptor raises, a ``_FailureOutcome`` wrapping the
        exception is returned instead.
        """
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request):
            # Attributes missing from new_details fall back to the
            # details this invocation started with.
            resolved = _unwrap_client_call_details(
                new_details, original_details
            )
            target, deadline, headers, creds, ready, codec = resolved
            return self._thunk(target)(
                request,
                timeout=deadline,
                metadata=headers,
                credentials=creds,
                wait_for_ready=ready,
                compression=codec,
            )

        try:
            return self._interceptor.intercept_unary_stream(
                continuation, original_details, request
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, sys.exc_info()[2])

452 

453 

class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Stream-unary multi-callable that passes each invocation to an interceptor.

    ``_thunk`` maps a method name to the underlying multi-callable; the
    interceptor receives a continuation that performs the real call.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.StreamUnaryClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.StreamUnaryClientInterceptor,
    ):
        self._thunk = thunk
        self._method = method
        self._interceptor = interceptor

    def __call__(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC and return only the response."""
        response, _ = self._with_call(
            request_iterator,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )
        return response

    def _with_call(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Run the interceptor and return ``(call.result(), call)``."""
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request_iterator):
            # Attributes missing from new_details fall back to the
            # details this invocation started with.
            resolved = _unwrap_client_call_details(
                new_details, original_details
            )
            target, deadline, headers, creds, ready, codec = resolved
            try:
                response, call = self._thunk(target).with_call(
                    request_iterator,
                    timeout=deadline,
                    metadata=headers,
                    credentials=creds,
                    wait_for_ready=ready,
                    compression=codec,
                )
                return _UnaryOutcome(response, call)
            except grpc.RpcError as rpc_error:
                # The error is returned, not raised, so the interceptor
                # receives it as the outcome.
                return rpc_error
            except Exception as exception:  # pylint:disable=broad-except
                return _FailureOutcome(exception, sys.exc_info()[2])

        call = self._interceptor.intercept_stream_unary(
            continuation, original_details, request_iterator
        )
        return call.result(), call

    def with_call(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC and return the response together with the call."""
        return self._with_call(
            request_iterator,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

    def future(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Return the interceptor's result for a ``future``-style invocation.

        If the interceptor raises, a ``_FailureOutcome`` wrapping the
        exception is returned instead.
        """
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request_iterator):
            resolved = _unwrap_client_call_details(
                new_details, original_details
            )
            target, deadline, headers, creds, ready, codec = resolved
            return self._thunk(target).future(
                request_iterator,
                timeout=deadline,
                metadata=headers,
                credentials=creds,
                wait_for_ready=ready,
                compression=codec,
            )

        try:
            return self._interceptor.intercept_stream_unary(
                continuation, original_details, request_iterator
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, sys.exc_info()[2])

595 

596 

class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Stream-stream multi-callable that passes each invocation to an interceptor.

    ``_thunk`` maps a method name to the underlying multi-callable.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.StreamStreamClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.StreamStreamClientInterceptor,
    ):
        self._thunk = thunk
        self._method = method
        self._interceptor = interceptor

    def __call__(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ):
        """Return the interceptor's result for this invocation.

        If the interceptor raises, a ``_FailureOutcome`` wrapping the
        exception is returned instead.
        """
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request_iterator):
            # Attributes missing from new_details fall back to the
            # details this invocation started with.
            resolved = _unwrap_client_call_details(
                new_details, original_details
            )
            target, deadline, headers, creds, ready, codec = resolved
            return self._thunk(target)(
                request_iterator,
                timeout=deadline,
                metadata=headers,
                credentials=creds,
                wait_for_ready=ready,
                compression=codec,
            )

        try:
            return self._interceptor.intercept_stream_stream(
                continuation, original_details, request_iterator
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, sys.exc_info()[2])

654 

655 

class _Channel(grpc.Channel):
    """Channel wrapper that applies a single client interceptor.

    Each factory method returns an intercepting multi-callable when the
    interceptor is an instance of the matching interceptor class, and the
    wrapped channel's own multi-callable otherwise.
    """

    _channel: grpc.Channel
    _interceptor: Union[
        grpc.UnaryUnaryClientInterceptor,
        grpc.UnaryStreamClientInterceptor,
        grpc.StreamStreamClientInterceptor,
        grpc.StreamUnaryClientInterceptor,
    ]

    def __init__(
        self,
        channel: grpc.Channel,
        interceptor: Union[
            grpc.UnaryUnaryClientInterceptor,
            grpc.UnaryStreamClientInterceptor,
            grpc.StreamStreamClientInterceptor,
            grpc.StreamUnaryClientInterceptor,
        ],
    ):
        self._channel = channel
        self._interceptor = interceptor

    def subscribe(
        self, callback: Callable, try_to_connect: Optional[bool] = False
    ):
        """Forward the subscription to the wrapped channel."""
        self._channel.subscribe(callback, try_to_connect=try_to_connect)

    def unsubscribe(self, callback: Callable):
        """Forward the unsubscription to the wrapped channel."""
        self._channel.unsubscribe(callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Create a unary-unary multi-callable for ``method``."""

        # pytype: disable=wrong-arg-count
        def make_callable(name):
            return self._channel.unary_unary(
                name,
                request_serializer,
                response_deserializer,
                _registered_method,
            )

        # pytype: enable=wrong-arg-count
        if not isinstance(self._interceptor, grpc.UnaryUnaryClientInterceptor):
            return make_callable(method)
        return _UnaryUnaryMultiCallable(
            make_callable, method, self._interceptor
        )

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Create a unary-stream multi-callable for ``method``."""

        # pytype: disable=wrong-arg-count
        def make_callable(name):
            return self._channel.unary_stream(
                name,
                request_serializer,
                response_deserializer,
                _registered_method,
            )

        # pytype: enable=wrong-arg-count
        if not isinstance(self._interceptor, grpc.UnaryStreamClientInterceptor):
            return make_callable(method)
        return _UnaryStreamMultiCallable(
            make_callable, method, self._interceptor
        )

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Create a stream-unary multi-callable for ``method``."""

        # pytype: disable=wrong-arg-count
        def make_callable(name):
            return self._channel.stream_unary(
                name,
                request_serializer,
                response_deserializer,
                _registered_method,
            )

        # pytype: enable=wrong-arg-count
        if not isinstance(self._interceptor, grpc.StreamUnaryClientInterceptor):
            return make_callable(method)
        return _StreamUnaryMultiCallable(
            make_callable, method, self._interceptor
        )

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Create a stream-stream multi-callable for ``method``."""

        # pytype: disable=wrong-arg-count
        def make_callable(name):
            return self._channel.stream_stream(
                name,
                request_serializer,
                response_deserializer,
                _registered_method,
            )

        # pytype: enable=wrong-arg-count
        if not isinstance(
            self._interceptor, grpc.StreamStreamClientInterceptor
        ):
            return make_callable(method)
        return _StreamStreamMultiCallable(
            make_callable, method, self._interceptor
        )

    def _close(self):
        self._channel.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close the wrapped channel on exit; returning False means an
        # exception raised inside the ``with`` body is not suppressed.
        self._close()
        return False

    def close(self):
        """Close the wrapped channel."""
        self._channel.close()

778 

779 

def intercept_channel(
    channel: grpc.Channel,
    *interceptors: Union[
        grpc.UnaryUnaryClientInterceptor,
        grpc.UnaryStreamClientInterceptor,
        grpc.StreamStreamClientInterceptor,
        grpc.StreamUnaryClientInterceptor,
    ],
) -> grpc.Channel:
    """Wrap ``channel`` so that calls pass through the given interceptors.

    Args:
        channel: The channel to wrap.
        *interceptors: Zero or more client interceptor objects, each passed
            as its own positional argument.

    Returns:
        ``channel`` itself when no interceptors are given; otherwise a
        ``_Channel`` wrapping it once per interceptor.

    Raises:
        TypeError: If any argument is not an instance of one of the four
            client interceptor classes.
    """
    accepted_types = (
        grpc.UnaryUnaryClientInterceptor,
        grpc.UnaryStreamClientInterceptor,
        grpc.StreamUnaryClientInterceptor,
        grpc.StreamStreamClientInterceptor,
    )
    # Wrapping proceeds from the last interceptor to the first, so the
    # first interceptor supplied ends up as the outermost wrapper.
    for interceptor in reversed(interceptors):
        if not isinstance(interceptor, accepted_types):
            error_msg = (
                "interceptor must be "
                "grpc.UnaryUnaryClientInterceptor or "
                "grpc.UnaryStreamClientInterceptor or "
                "grpc.StreamUnaryClientInterceptor or "
                "grpc.StreamStreamClientInterceptor"
            )
            raise TypeError(error_msg)
        channel = _Channel(channel, interceptor)
    return channel