Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_plugin_wrapping.py: 55%

62 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-06 06:03 +0000

1# Copyright 2015 gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import collections 

16import logging 

17import threading 

18from typing import Callable, Optional, Type 

19 

20import grpc 

21from grpc import _common 

22from grpc._cython import cygrpc 

23from grpc._typing import MetadataType 

24 

25_LOGGER = logging.getLogger(__name__) 

26 

27 

class _AuthMetadataContext(
        collections.namedtuple('AuthMetadataContext',
                               ('service_url', 'method_name')),
        grpc.AuthMetadataContext):
    """Named-tuple implementation of grpc.AuthMetadataContext.

    Fields:
      service_url: first positional field of the tuple.
      method_name: second positional field of the tuple.
    """

34 

35 

36class _CallbackState(object): 

37 

38 def __init__(self): 

39 self.lock = threading.Lock() 

40 self.called = False 

41 self.exception = None 

42 

43 

class _AuthMetadataPluginCallback(grpc.AuthMetadataPluginCallback):
    """One-shot callback that relays a plugin's result to a wrapped callable.

    The shared _CallbackState is consulted under its lock so that a second
    invocation, or an invocation after an exception has been recorded on the
    state, is rejected with RuntimeError.
    """
    _state: _CallbackState
    _callback: Callable

    def __init__(self, state: _CallbackState, callback: Callable):
        self._state = state
        self._callback = callback

    def __call__(self, metadata: MetadataType,
                 error: Optional[Type[BaseException]]):
        """Forward either the metadata or the error to the wrapped callable.

        Args:
          metadata: passed through with cygrpc.StatusCode.ok when error is
            None.
          error: when not None, its str() is encoded and reported with
            cygrpc.StatusCode.internal instead of the metadata.

        Raises:
          RuntimeError: if the state already holds an exception, or if this
            callback has already been invoked.
        """
        shared = self._state
        with shared.lock:
            if shared.exception is not None:
                raise RuntimeError(
                    'AuthMetadataPluginCallback raised exception "{}"!'.format(
                        shared.exception))
            if shared.called:
                raise RuntimeError(
                    'AuthMetadataPluginCallback invoked more than once!')
            shared.called = True

        # The wrapped callable runs after the lock has been released.
        if error is not None:
            self._callback(None, cygrpc.StatusCode.internal,
                           _common.encode(str(error)))
            return
        self._callback(metadata, cygrpc.StatusCode.ok, None)

70 

71 

class _Plugin(object):
    """Callable adapter that invokes a grpc.AuthMetadataPlugin.

    Decodes the service URL and method name, calls the wrapped plugin with a
    one-shot _AuthMetadataPluginCallback, and reports any exception the plugin
    raises through the supplied callback as an internal-status error.
    """
    _metadata_plugin: grpc.AuthMetadataPlugin

    def __init__(self, metadata_plugin: grpc.AuthMetadataPlugin):
        self._metadata_plugin = metadata_plugin

        try:
            import contextvars  # pylint: disable=wrong-import-position
        except ImportError:
            # Support versions predating contextvars.
            self._stored_ctx = None
        else:
            # The plugin may be invoked on a thread created by Core, which
            # will not have the context propagated. This context is stored
            # and installed in the thread invoking the plugin.
            # NOTE(review): nothing in this class reads _stored_ctx; the
            # consumer is presumably elsewhere - confirm before removing.
            self._stored_ctx = contextvars.copy_context()

    def __call__(self, service_url: str, method_name: str, callback: Callable):
        """Run the wrapped plugin for one call.

        Args:
          service_url: decoded with _common.decode before use.
          method_name: decoded with _common.decode before use.
          callback: invoked as callback(metadata, status, error_details);
            called directly from here only when the plugin raised before
            invoking its own callback.
        """
        plugin_context = _AuthMetadataContext(_common.decode(service_url),
                                              _common.decode(method_name))
        state = _CallbackState()
        one_shot = _AuthMetadataPluginCallback(state, callback)
        try:
            self._metadata_plugin(plugin_context, one_shot)
        except Exception as exception:  # pylint: disable=broad-except
            _LOGGER.exception(
                'AuthMetadataPluginCallback "%s" raised exception!',
                self._metadata_plugin)
            with state.lock:
                # Recording the exception makes any later use of the
                # one-shot callback raise RuntimeError.
                state.exception = exception
                already_reported = state.called
            if not already_reported:
                callback(None, cygrpc.StatusCode.internal,
                         _common.encode(str(exception)))

107 

108 

def metadata_plugin_call_credentials(
        metadata_plugin: grpc.AuthMetadataPlugin,
        name: Optional[str]) -> grpc.CallCredentials:
    """Build grpc.CallCredentials backed by an auth metadata plugin.

    Args:
      metadata_plugin: the plugin to wrap in a _Plugin adapter.
      name: name for the credentials; when None, the plugin's __name__ is
        used, falling back to its class name if it has no __name__.

    Returns:
      A grpc.CallCredentials wrapping cygrpc.MetadataPluginCallCredentials.
    """
    effective_name = name
    if effective_name is None:
        try:
            effective_name = metadata_plugin.__name__
        except AttributeError:
            # Objects without __name__ are identified by their class.
            effective_name = metadata_plugin.__class__.__name__

    plugin_credentials = cygrpc.MetadataPluginCallCredentials(
        _Plugin(metadata_plugin), _common.encode(effective_name))
    return grpc.CallCredentials(plugin_credentials)