1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import collections
16import logging
17import threading
18from typing import Callable, Optional, Type
19
20import grpc
21from grpc import _common
22from grpc._cython import cygrpc
23from grpc._typing import MetadataType
24
25_LOGGER = logging.getLogger(__name__)
26
27
28class _AuthMetadataContext(
29 collections.namedtuple(
30 "AuthMetadataContext",
31 (
32 "service_url",
33 "method_name",
34 ),
35 ),
36 grpc.AuthMetadataContext,
37):
38 pass
39
40
41class _CallbackState(object):
42 def __init__(self):
43 self.lock = threading.Lock()
44 self.called = False
45 self.exception = None
46
47
48class _AuthMetadataPluginCallback(grpc.AuthMetadataPluginCallback):
49 _state: _CallbackState
50 _callback: Callable
51
52 def __init__(self, state: _CallbackState, callback: Callable):
53 self._state = state
54 self._callback = callback
55
56 def __call__(
57 self, metadata: MetadataType, error: Optional[Type[BaseException]]
58 ):
59 with self._state.lock:
60 if self._state.exception is None:
61 if self._state.called:
62 error_msg = (
63 "AuthMetadataPluginCallback invoked more than once!"
64 )
65 raise RuntimeError(error_msg)
66 else:
67 self._state.called = True
68 else:
69 error_msg = (
70 "AuthMetadataPluginCallback"
71 'raised exception "{self._state.exception}"!'
72 )
73 raise RuntimeError(error_msg)
74 if error is None:
75 self._callback(metadata, cygrpc.StatusCode.ok, None)
76 else:
77 self._callback(
78 None, cygrpc.StatusCode.internal, _common.encode(str(error))
79 )
80
81
82class _Plugin(object):
83 _metadata_plugin: grpc.AuthMetadataPlugin
84
85 def __init__(self, metadata_plugin: grpc.AuthMetadataPlugin):
86 self._metadata_plugin = metadata_plugin
87 self._stored_ctx = None
88
89 try:
90 import contextvars # pylint: disable=wrong-import-position
91
92 # The plugin may be invoked on a thread created by Core, which will not
93 # have the context propagated. This context is stored and installed in
94 # the thread invoking the plugin.
95 self._stored_ctx = contextvars.copy_context()
96 except ImportError:
97 # Support versions predating contextvars.
98 pass
99
100 def __call__(self, service_url: str, method_name: str, callback: Callable):
101 context = _AuthMetadataContext(
102 _common.decode(service_url), _common.decode(method_name)
103 )
104 callback_state = _CallbackState()
105 try:
106 self._metadata_plugin(
107 context, _AuthMetadataPluginCallback(callback_state, callback)
108 )
109 except Exception as exception: # pylint: disable=broad-except
110 _LOGGER.exception(
111 'AuthMetadataPluginCallback "%s" raised exception!',
112 self._metadata_plugin,
113 )
114 with callback_state.lock:
115 callback_state.exception = exception
116 if callback_state.called:
117 return
118 callback(
119 None, cygrpc.StatusCode.internal, _common.encode(str(exception))
120 )
121
122
123def metadata_plugin_call_credentials(
124 metadata_plugin: grpc.AuthMetadataPlugin, name: Optional[str]
125) -> grpc.CallCredentials:
126 if name is None:
127 try:
128 effective_name = metadata_plugin.__name__
129 except AttributeError:
130 effective_name = metadata_plugin.__class__.__name__
131 else:
132 effective_name = name
133 return grpc.CallCredentials(
134 cygrpc.MetadataPluginCallCredentials(
135 _Plugin(metadata_plugin), _common.encode(effective_name)
136 )
137 )