Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_plugin_wrapping.py: 55%
62 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import collections
16import logging
17import threading
18from typing import Callable, Optional, Type
20import grpc
21from grpc import _common
22from grpc._cython import cygrpc
23from grpc._typing import MetadataType
25_LOGGER = logging.getLogger(__name__)
28class _AuthMetadataContext(
29 collections.namedtuple('AuthMetadataContext', (
30 'service_url',
31 'method_name',
32 )), grpc.AuthMetadataContext):
33 pass
36class _CallbackState(object):
38 def __init__(self):
39 self.lock = threading.Lock()
40 self.called = False
41 self.exception = None
44class _AuthMetadataPluginCallback(grpc.AuthMetadataPluginCallback):
45 _state: _CallbackState
46 _callback: Callable
48 def __init__(self, state: _CallbackState, callback: Callable):
49 self._state = state
50 self._callback = callback
52 def __call__(self, metadata: MetadataType,
53 error: Optional[Type[BaseException]]):
54 with self._state.lock:
55 if self._state.exception is None:
56 if self._state.called:
57 raise RuntimeError(
58 'AuthMetadataPluginCallback invoked more than once!')
59 else:
60 self._state.called = True
61 else:
62 raise RuntimeError(
63 'AuthMetadataPluginCallback raised exception "{}"!'.format(
64 self._state.exception))
65 if error is None:
66 self._callback(metadata, cygrpc.StatusCode.ok, None)
67 else:
68 self._callback(None, cygrpc.StatusCode.internal,
69 _common.encode(str(error)))
72class _Plugin(object):
73 _metadata_plugin: grpc.AuthMetadataPlugin
75 def __init__(self, metadata_plugin: grpc.AuthMetadataPlugin):
76 self._metadata_plugin = metadata_plugin
77 self._stored_ctx = None
79 try:
80 import contextvars # pylint: disable=wrong-import-position
82 # The plugin may be invoked on a thread created by Core, which will not
83 # have the context propagated. This context is stored and installed in
84 # the thread invoking the plugin.
85 self._stored_ctx = contextvars.copy_context()
86 except ImportError:
87 # Support versions predating contextvars.
88 pass
90 def __call__(self, service_url: str, method_name: str, callback: Callable):
91 context = _AuthMetadataContext(_common.decode(service_url),
92 _common.decode(method_name))
93 callback_state = _CallbackState()
94 try:
95 self._metadata_plugin(
96 context, _AuthMetadataPluginCallback(callback_state, callback))
97 except Exception as exception: # pylint: disable=broad-except
98 _LOGGER.exception(
99 'AuthMetadataPluginCallback "%s" raised exception!',
100 self._metadata_plugin)
101 with callback_state.lock:
102 callback_state.exception = exception
103 if callback_state.called:
104 return
105 callback(None, cygrpc.StatusCode.internal,
106 _common.encode(str(exception)))
109def metadata_plugin_call_credentials(
110 metadata_plugin: grpc.AuthMetadataPlugin,
111 name: Optional[str]) -> grpc.CallCredentials:
112 if name is None:
113 try:
114 effective_name = metadata_plugin.__name__
115 except AttributeError:
116 effective_name = metadata_plugin.__class__.__name__
117 else:
118 effective_name = name
119 return grpc.CallCredentials(
120 cygrpc.MetadataPluginCallCredentials(_Plugin(metadata_plugin),
121 _common.encode(effective_name)))