1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Authorization support for gRPC."""
16
17from __future__ import absolute_import
18
19import logging
20
21from google.auth import exceptions
22from google.auth.transport import _mtls_helper
23from google.oauth2 import service_account
24
# The gRPC transport cannot work without the grpcio package. Re-raise with an
# actionable message, keeping the original ImportError as the cause.
try:
    import grpc  # type: ignore
except ImportError as caught_exc:  # pragma: NO COVER
    raise ImportError(
        "gRPC is not installed, please install the grpcio package to use the gRPC transport."
    ) from caught_exc

# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)
33
34
class AuthMetadataPlugin(grpc.AuthMetadataPlugin):
    """A `gRPC AuthMetadataPlugin`_ that attaches the credentials to every
    request.

    .. _gRPC AuthMetadataPlugin:
        http://www.grpc.io/grpc/python/grpc.html#grpc.AuthMetadataPlugin

    Args:
        credentials (google.auth.credentials.Credentials): The credentials
            whose authorization headers are added to requests.
        request (google.auth.transport.Request): A HTTP transport request
            object handed to the credentials so they can refresh as needed.
        default_host (Optional[str]): A host like "pubsub.googleapis.com".
            This is used when a self-signed JWT is created from service
            account credentials.
    """

    def __init__(self, credentials, request, default_host=None):
        # pylint: disable=no-value-for-parameter
        # pylint doesn't realize that the super method takes no arguments
        # because this class is the same name as the superclass.
        super(AuthMetadataPlugin, self).__init__()
        self._default_host = default_host
        self._request = request
        self._credentials = credentials

    def _get_authorization_headers(self, context):
        """Collects the authorization headers for a single request.

        Args:
            context (grpc.AuthMetadataContext): The RPC context; its
                ``method_name`` and ``service_url`` are forwarded to the
                credentials.

        Returns:
            Sequence[Tuple[str, str]]: A list of request headers (key, value)
                to add to the request.
        """
        auth_headers = {}
        creds = self._credentials

        # https://google.aip.dev/auth/4111
        # Service account credentials get a chance to use a self-signed JWT.
        # The default host has to be supplied explicitly because it cannot
        # always be derived from context.service_url.
        if isinstance(creds, service_account.Credentials):
            if self._default_host:
                host_url = "https://{}/".format(self._default_host)
            else:
                host_url = None
            creds._create_self_signed_jwt(host_url)

        # The credentials are handed the dict and are expected to fill it in
        # place; its items are what gets returned.
        creds.before_request(
            self._request, context.method_name, context.service_url, auth_headers
        )

        return list(auth_headers.items())

    def __call__(self, context, callback):
        """Hands the authorization metadata to the given callback.

        Args:
            context (grpc.AuthMetadataContext): The RPC context.
            callback (grpc.AuthMetadataPluginCallback): The callback that will
                be invoked to pass in the authorization metadata.
        """
        metadata = self._get_authorization_headers(context)
        # The callback's second argument is always None here.
        callback(metadata, None)
94
95
def secure_authorized_channel(
    credentials,
    request,
    target,
    ssl_credentials=None,
    client_cert_callback=None,
    **kwargs
):
    """Creates a secure authorized gRPC channel.

    This creates a channel with SSL and :class:`AuthMetadataPlugin`. This
    channel can be used to create a stub that can make authorized requests.
    Users can configure client certificate or rely on device certificates to
    establish a mutual TLS channel, if the `GOOGLE_API_USE_CLIENT_CERTIFICATE`
    variable is explicitly set to `true`.

    Example::

        import grpc
        import google.auth
        import google.auth.transport.grpc
        import google.auth.transport.requests
        from google.cloud.speech.v1 import cloud_speech_pb2

        # Get credentials.
        credentials, _ = google.auth.default()

        # Get an HTTP request function to refresh credentials.
        request = google.auth.transport.requests.Request()

        # Create a channel.
        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, 'speech.googleapis.com:443',
            ssl_credentials=grpc.ssl_channel_credentials())

        # Use the channel to create a stub.
        cloud_speech.create_Speech_stub(channel)

    Usage:

    There are actually a couple of options to create a channel, depending on if
    you want to create a regular or mutual TLS channel.

    First let's list the endpoints (regular vs mutual TLS) to choose from::

        regular_endpoint = 'speech.googleapis.com:443'
        mtls_endpoint = 'speech.mtls.googleapis.com:443'

    Option 1: create a regular (non-mutual) TLS channel by explicitly setting
    the ssl_credentials::

        regular_ssl_credentials = grpc.ssl_channel_credentials()

        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, regular_endpoint,
            ssl_credentials=regular_ssl_credentials)

    Option 2: create a mutual TLS channel by calling a callback which returns
    the client side certificate and the key (Note that
    `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable must be explicitly
    set to `true`)::

        def my_client_cert_callback():
            code_to_load_client_cert_and_key()
            if loaded:
                return (pem_cert_bytes, pem_key_bytes)
            raise MyClientCertFailureException()

        try:
            channel = google.auth.transport.grpc.secure_authorized_channel(
                credentials, request, mtls_endpoint,
                client_cert_callback=my_client_cert_callback)
        except MyClientCertFailureException:
            # handle the exception

    Option 3: use application default SSL credentials. It searches and uses
    the command in a context aware metadata file, which is available on devices
    with endpoint verification support (Note that
    `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable must be explicitly
    set to `true`).
    See https://cloud.google.com/endpoint-verification/docs/overview::

        try:
            default_ssl_credentials = SslCredentials()
            channel_ssl_credentials = default_ssl_credentials.ssl_credentials
        except Exception:
            # Exception can be raised if the context aware metadata is malformed.
            # See :class:`SslCredentials` for the possible exceptions.

        # Choose the endpoint based on the SSL credentials type.
        if default_ssl_credentials.is_mtls:
            endpoint_to_use = mtls_endpoint
        else:
            endpoint_to_use = regular_endpoint
        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, endpoint_to_use,
            ssl_credentials=channel_ssl_credentials)

    Option 4: not setting ssl_credentials and client_cert_callback. For devices
    without endpoint verification support, or when the
    `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not `true`, a
    regular TLS channel is created; otherwise, a mutual TLS channel is created.
    In the latter case the call should be wrapped in a try/except block in case
    of malformed context aware metadata.

    The following code uses regular_endpoint, it works the same no matter the
    created channel is regular or mutual TLS. Regular endpoint ignores client
    certificate and key::

        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, regular_endpoint)

    The following code uses mtls_endpoint, if the created channel is regular,
    and API mtls_endpoint is configured to require client SSL credentials, API
    calls using this channel will be rejected::

        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, mtls_endpoint)

    Args:
        credentials (google.auth.credentials.Credentials): The credentials to
            add to requests.
        request (google.auth.transport.Request): A HTTP transport request
            object used to refresh credentials as needed. Even though gRPC
            is a separate transport, there's no way to refresh the credentials
            without using a standard http transport.
        target (str): The host and port of the service.
        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
            credentials. This can be used to specify different certificates.
            This argument is mutually exclusive with client_cert_callback;
            providing both will raise an exception.
            If ssl_credentials and client_cert_callback are None, application
            default SSL credentials are used if `GOOGLE_API_USE_CLIENT_CERTIFICATE`
            environment variable is explicitly set to `true`, otherwise one way TLS
            SSL credentials are used.
        client_cert_callback (Callable[[], (bytes, bytes)]): Optional
            callback function to obtain client certificate and key for mutual TLS
            connection. This argument is mutually exclusive with
            ssl_credentials; providing both will raise an exception.
            This argument does nothing unless `GOOGLE_API_USE_CLIENT_CERTIFICATE`
            environment variable is explicitly set to `true`.
        kwargs: Additional arguments to pass to :func:`grpc.secure_channel`.

    Returns:
        grpc.Channel: The created gRPC channel.

    Raises:
        google.auth.exceptions.MalformedError: If both ssl_credentials and
            client_cert_callback are provided.
        google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel
            creation failed for any reason.
    """
    # The two ways of supplying client SSL material are mutually exclusive;
    # reject the ambiguous combination before building anything.
    if ssl_credentials and client_cert_callback:
        raise exceptions.MalformedError(
            "Received both ssl_credentials and client_cert_callback; "
            "these are mutually exclusive."
        )

    # Create the metadata plugin for inserting the authorization header.
    metadata_plugin = AuthMetadataPlugin(credentials, request)

    # Create a set of grpc.CallCredentials using the metadata plugin.
    google_auth_credentials = grpc.metadata_call_credentials(metadata_plugin)

    # If SSL credentials are not explicitly set, try client_cert_callback and ADC.
    if not ssl_credentials:
        use_client_cert = _mtls_helper.check_use_client_cert()
        if use_client_cert and client_cert_callback:
            # Use the callback if provided.
            cert, key = client_cert_callback()
            ssl_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert, private_key=key
            )
        elif use_client_cert:
            # Use application default SSL credentials.
            adc_ssl_credentials = SslCredentials()
            ssl_credentials = adc_ssl_credentials.ssl_credentials
        else:
            # Client certificates are not enabled: plain one-way TLS.
            ssl_credentials = grpc.ssl_channel_credentials()

    # Combine the ssl credentials and the authorization credentials.
    composite_credentials = grpc.composite_channel_credentials(
        ssl_credentials, google_auth_credentials
    )

    return grpc.secure_channel(target, composite_credentials, **kwargs)
277
278
class SslCredentials:
    """Application default SSL credentials.

    Whether a client certificate is used is governed by the
    `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable, whose default
    value is `false`. Client certificate will not be used unless the
    environment variable is explicitly set to `true`. See
    https://google.aip.dev/auth/4114

    If the environment variable is `true`, then for devices with endpoint
    verification support, a device certificate will be automatically loaded
    and mutual TLS will be established.
    See https://cloud.google.com/endpoint-verification/docs/overview.
    """

    def __init__(self):
        if _mtls_helper.check_use_client_cert():
            # mTLS is possible only when the context aware metadata file
            # can be located.
            config_path = _mtls_helper._check_config_path(
                _mtls_helper.CONTEXT_AWARE_METADATA_PATH
            )
            self._is_mtls = config_path is not None
        else:
            self._is_mtls = False

    @property
    def ssl_credentials(self):
        """Build and return the SSL channel credentials.

        When mutual TLS applies, the client certificate and key are loaded
        and used for the channel credentials; any client certificate error
        during loading is re-raised as a mutual TLS channel error. Otherwise
        default SSL channel credentials are created.

        Returns:
            grpc.ChannelCredentials: The created grpc channel credentials.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel
                creation failed for any reason.
        """
        if not self._is_mtls:
            self._ssl_credentials = grpc.ssl_channel_credentials()
            return self._ssl_credentials

        try:
            _, cert, key, _ = _mtls_helper.get_client_ssl_credentials()
            self._ssl_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert, private_key=key
            )
        except exceptions.ClientCertError as caught_exc:
            raise exceptions.MutualTLSChannelError(caught_exc) from caught_exc

        return self._ssl_credentials

    @property
    def is_mtls(self):
        """Indicates if the created SSL channel credentials is mutual TLS."""
        return self._is_mtls