1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helper functions for getting mTLS cert and key."""
16
17import json
18import logging
19from os import environ, getenv, path
20import re
21import subprocess
22
23from google.auth import _agent_identity_utils
24from google.auth import environment_vars
25from google.auth import exceptions
26
# Default location of the SecureConnect context aware metadata file. The "~" is
# expanded with path.expanduser before the file is looked up.
CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"

# Default gcloud config path, to be used with path.expanduser for cross-platform compatibility.
CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
# Key of the context aware metadata JSON that holds the cert provider command.
_CERT_PROVIDER_COMMAND = "cert_provider_command"
# The ".+" is greedy and DOTALL, so the match runs from the first BEGIN marker to
# the last END marker: a chain of several certificates is captured as one match.
_CERT_REGEX = re.compile(
    b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL
)

# support various format of key files, e.g.
# "-----BEGIN PRIVATE KEY-----...",
# "-----BEGIN EC PRIVATE KEY-----...",
# "-----BEGIN RSA PRIVATE KEY-----..."
# "-----BEGIN ENCRYPTED PRIVATE KEY-----"
_KEY_REGEX = re.compile(
    b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?",
    re.DOTALL,
)

_LOGGER = logging.getLogger(__name__)


# Captures the bytes between the PASSPHRASE markers (group 1); callers strip
# surrounding whitespace from the captured value.
_PASSPHRASE_REGEX = re.compile(
    b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL
)

# Temporary patch to accommodate incorrect cert config in Cloud Run prod environment.
# When the config names both _INCORRECT_* paths and neither file exists, the
# _WELL_KNOWN_* paths are used instead.
_WELL_KNOWN_CLOUD_RUN_CERT_PATH = (
    "/var/run/secrets/workload-spiffe-credentials/certificates.pem"
)
_WELL_KNOWN_CLOUD_RUN_KEY_PATH = (
    "/var/run/secrets/workload-spiffe-credentials/private_key.pem"
)
_INCORRECT_CLOUD_RUN_CERT_PATH = (
    "/var/lib/volumes/certificate/workload-certificates/certificates.pem"
)
_INCORRECT_CLOUD_RUN_KEY_PATH = (
    "/var/lib/volumes/certificate/workload-certificates/private_key.pem"
)
66
67
def _check_config_path(config_path):
    """Expands the user home in a config file path and verifies the file exists.

    Args:
        config_path (str): The config file path, for example the path of
            context_aware_metadata.json or certificate_config.json. A leading
            "~" is expanded to the user's home directory.

    Returns:
        Optional[str]: The user-expanded path if it exists, None otherwise.
    """
    expanded_path = path.expanduser(config_path)
    if path.exists(expanded_path):
        return expanded_path

    _LOGGER.debug("%s is not found.", expanded_path)
    return None
83
84
def _load_json_file(path):
    """Reads and loads JSON from the given path. Used to read both X509 workload
    certificate and secure connect configurations.

    Args:
        path (str): the path to read from. Note that inside this function the
            name shadows the module-level ``os.path`` import.

    Returns:
        Dict[str, str]: The JSON stored at the file.

    Raises:
        google.auth.exceptions.ClientCertError: If failed to decode or parse
            the file as JSON.
        OSError: If the file cannot be opened or read (not converted here).
    """
    try:
        # JSON text is UTF-8; decode explicitly instead of relying on the
        # platform's locale encoding, which differs between systems.
        with open(path, encoding="utf-8") as json_file:
            return json.load(json_file)
    except ValueError as caught_exc:
        # Covers json.JSONDecodeError and UnicodeDecodeError alike.
        raise exceptions.ClientCertError(caught_exc) from caught_exc
106
107
def _get_workload_cert_and_key(
    certificate_config_path=None, include_context_aware=True
):
    """Loads the workload identity cert and key named by the certificate config.

    The config file is located by _get_workload_cert_and_key_paths: an explicit
    path wins, otherwise the "GOOGLE_API_CERTIFICATE_CONFIG" environment
    variable is checked first, then the well known gcloud location
    "~/.config/gcloud/certificate_config.json".

    Args:
        certificate_config_path (string): The certificate config path. If no path is
            provided, the environment variable will be checked first, then the well
            known gcloud location.
        include_context_aware (bool): If context aware metadata path should be
            checked for the SecureConnect mTLS configuration.

    Returns:
        Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM
            format and key bytes in PEM format, or (None, None) when no workload
            cert and key paths are configured.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when retrieving
            the certificate or key information.
    """
    cert_path, key_path = _get_workload_cert_and_key_paths(
        certificate_config_path, include_context_aware
    )

    # Only read the files when at least one of the two paths was resolved.
    if cert_path is not None or key_path is not None:
        return _read_cert_and_key_files(cert_path, key_path)

    return None, None
138
139
def _get_cert_config_path(certificate_config_path=None, include_context_aware=True):
    """Resolves which certificate configuration file to use.

    Candidates are considered in this order:

    1: Explicit override, if set
    2: Environment variable, if set (GOOGLE_API_CERTIFICATE_CONFIG first, then,
       when include_context_aware is true, the CLOUDSDK context aware variable)
    3: Well-known location

    Returns "None" if the selected config file does not exist.

    Args:
        certificate_config_path (string): The certificate config path. If provided,
            the well known location and environment variable will be ignored.
        include_context_aware (bool): If context aware metadata path should be
            checked for the SecureConnect mTLS configuration.

    Returns:
        The user-expanded path of the certificate config file, and None if the
        file does not exist.
    """
    selected_path = certificate_config_path

    if selected_path is None:
        google_api_path = environ.get(
            environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, None
        )
        if google_api_path:
            selected_path = google_api_path
        else:
            cloudsdk_path = environ.get(
                environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH,
                None,
            )
            # Unset and empty values are both treated as "not configured".
            if include_context_aware and cloudsdk_path:
                selected_path = cloudsdk_path
            else:
                selected_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH

    expanded_path = path.expanduser(selected_path)
    return expanded_path if path.exists(expanded_path) else None
177
178
def _get_workload_cert_and_key_paths(config_path, include_context_aware=True):
    """Extracts the workload cert and key file paths from the certificate config.

    Args:
        config_path (Optional[str]): Explicit certificate config path, or None to
            let _get_cert_config_path pick one.
        include_context_aware (bool): Forwarded to _get_cert_config_path.

    Returns:
        Tuple[Optional[str], Optional[str]]: The cert path and key path, or
            (None, None) when the config file does not exist or has no complete
            "workload" entry.

    Raises:
        google.auth.exceptions.ClientCertError: If the config file has no
            "cert_configs" entry, or cannot be parsed as JSON.
    """
    config_file = _get_cert_config_path(config_path, include_context_aware)
    if config_file is None:
        return None, None

    config = _load_json_file(config_file)
    if "cert_configs" not in config:
        raise exceptions.ClientCertError(
            'Certificate config file {} is in an invalid format, a "cert configs" object is expected'.format(
                config_file
            )
        )
    cert_configs = config["cert_configs"]

    # We return None, None if the expected workload fields are not present.
    # The certificate config might be present for other types of connections (e.g. gECC),
    # and we want to gracefully fallback to testing other mTLS configurations
    # like SecureConnect instead of throwing an exception.
    if "workload" not in cert_configs:
        return None, None
    workload = cert_configs["workload"]

    if "cert_path" not in workload or "key_path" not in workload:
        return None, None
    cert_path = workload["cert_path"]
    key_path = workload["key_path"]

    # == BEGIN Temporary Cloud Run PATCH ==
    # See https://github.com/googleapis/google-auth-library-python/issues/1881
    configured_incorrect_paths = (
        cert_path == _INCORRECT_CLOUD_RUN_CERT_PATH
        and key_path == _INCORRECT_CLOUD_RUN_KEY_PATH
    )
    # The patch only applies when neither of the configured files exists.
    if configured_incorrect_paths and not (
        path.exists(cert_path) or path.exists(key_path)
    ):
        _LOGGER.debug(
            "Applying Cloud Run certificate path patch. "
            "Configured paths not found: %s, %s. "
            "Using well-known paths: %s, %s",
            cert_path,
            key_path,
            _WELL_KNOWN_CLOUD_RUN_CERT_PATH,
            _WELL_KNOWN_CLOUD_RUN_KEY_PATH,
        )
        cert_path = _WELL_KNOWN_CLOUD_RUN_CERT_PATH
        key_path = _WELL_KNOWN_CLOUD_RUN_KEY_PATH
    # == END Temporary Cloud Run PATCH ==

    return cert_path, key_path
231
232
def _read_cert_and_key_files(cert_path, key_path):
    """Reads the certificate file and then the private key file.

    Args:
        cert_path (str): Path of the PEM certificate file.
        key_path (str): Path of the PEM private key file.

    Returns:
        Tuple[bytes, bytes]: The certificate bytes and the key bytes.
    """
    # Tuple elements are evaluated left to right: certificate first, then key.
    return _read_cert_file(cert_path), _read_key_file(key_path)
238
239
def _read_cert_file(cert_path):
    """Reads a file and returns the single PEM certificate block found in it.

    Args:
        cert_path (str): Path of the certificate file.

    Returns:
        bytes: The bytes matched by _CERT_REGEX.

    Raises:
        google.auth.exceptions.ClientCertError: If the file does not contain
            exactly one match of _CERT_REGEX.
    """
    with open(cert_path, "rb") as cert_file:
        pem_bytes = cert_file.read()

    matches = _CERT_REGEX.findall(pem_bytes)
    if len(matches) == 1:
        return matches[0]

    raise exceptions.ClientCertError(
        "Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format(
            cert_path
        )
    )
252
253
def _read_key_file(key_path):
    """Reads a file and returns the single PEM private key block found in it.

    Args:
        key_path (str): Path of the private key file.

    Returns:
        bytes: The bytes matched by _KEY_REGEX.

    Raises:
        google.auth.exceptions.ClientCertError: If the file does not contain
            exactly one match of _KEY_REGEX.
    """
    with open(key_path, "rb") as key_file:
        pem_bytes = key_file.read()

    matches = _KEY_REGEX.findall(pem_bytes)
    if len(matches) == 1:
        return matches[0]

    raise exceptions.ClientCertError(
        "Private key file {} is in an invalid format, a single PEM formatted private key is expected".format(
            key_path
        )
    )
267
268
def _run_cert_provider_command(command, expect_encrypted_key=False):
    """Executes the cert provider command and parses its standard output into
    the client side mTLS cert, key and passphrase.

    Args:
        command (List[str]): cert provider command.
        expect_encrypted_key (bool): If encrypted private key is expected.

    Returns:
        Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key
            bytes in PEM format and passphrase bytes. The passphrase is None
            when an encrypted key is not expected.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when running
            the cert provider command or generating cert, key and passphrase.
    """
    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # stderr is captured but not used.
        stdout, _ = process.communicate()
    except OSError as caught_exc:
        raise exceptions.ClientCertError(caught_exc) from caught_exc

    # Check cert provider command execution error.
    if process.returncode != 0:
        raise exceptions.ClientCertError(
            "Cert provider command returns non-zero status code %s" % process.returncode
        )

    # Extract certificate (chain), key and passphrase.
    certs = _CERT_REGEX.findall(stdout)
    if len(certs) != 1:
        raise exceptions.ClientCertError("Client SSL certificate is missing or invalid")
    keys = _KEY_REGEX.findall(stdout)
    if len(keys) != 1:
        raise exceptions.ClientCertError("Client SSL key is missing or invalid")
    passphrases = _PASSPHRASE_REGEX.findall(stdout)

    cert, key = certs[0], keys[0]
    key_is_encrypted = b"ENCRYPTED" in key

    if not expect_encrypted_key:
        # Plain key requested: neither an encrypted key nor a passphrase may appear.
        if key_is_encrypted:
            raise exceptions.ClientCertError("Encrypted private key is not expected")
        if passphrases:
            raise exceptions.ClientCertError("Passphrase is not expected")
        return cert, key, None

    # Encrypted key requested: exactly one passphrase and an encrypted key are required.
    if len(passphrases) != 1:
        raise exceptions.ClientCertError("Passphrase is missing or invalid")
    if not key_is_encrypted:
        raise exceptions.ClientCertError("Encrypted private key is expected")
    return cert, key, passphrases[0].strip()
321
322
def get_client_ssl_credentials(
    generate_encrypted_key=False,
    context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
    certificate_config_path=None,
):
    """Returns the client side certificate, private key and passphrase.

    Sources are tried in the following order of priority:
        1. Certificate and key specified by certificate_config.json.
            Currently, only X.509 workload certificates are supported.
        2. Certificate and key specified by context aware metadata (i.e. SecureConnect).

    Args:
        generate_encrypted_key (bool): If set to True, encrypted private key
            and passphrase will be generated; otherwise, unencrypted private key
            will be generated and passphrase will be None. This option only
            affects keys obtained via context_aware_metadata.json.
        context_aware_metadata_path (str): The context_aware_metadata.json file path.
        certificate_config_path (str): The certificate_config.json file path.

    Returns:
        Tuple[bool, bytes, bytes, bytes]:
            A boolean indicating if cert, key and passphrase are obtained, the
            cert bytes and key bytes both in PEM format, and passphrase bytes.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert, key and passphrase.
    """
    # 1. Attempt to retrieve X.509 Workload cert and key.
    workload_cert, workload_key = _get_workload_cert_and_key(certificate_config_path)
    if workload_cert and workload_key:
        return True, workload_cert, workload_key, None

    # 2. Check for context aware metadata json
    metadata_path = _check_config_path(context_aware_metadata_path)
    if not metadata_path:
        return False, None, None, None

    metadata_json = _load_json_file(metadata_path)
    if _CERT_PROVIDER_COMMAND not in metadata_json:
        raise exceptions.ClientCertError("Cert provider command is not found")

    command = metadata_json[_CERT_PROVIDER_COMMAND]
    # Ask the provider for a passphrase unless the configured command already does.
    if generate_encrypted_key and "--with_passphrase" not in command:
        command.append("--with_passphrase")

    # Execute the command.
    provider_cert, provider_key, passphrase = _run_cert_provider_command(
        command, expect_encrypted_key=generate_encrypted_key
    )
    return True, provider_cert, provider_key, passphrase
379
380
def get_client_cert_and_key(client_cert_callback=None):
    """Returns the client side certificate and private key.

    When client_cert_callback is provided, its result is returned as is.
    Otherwise the application default SSL credentials are looked up through
    get_client_ssl_credentials, asking for an unencrypted key.

    Args:
        client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
            optional callback which returns client certificate bytes and private
            key bytes both in PEM format.

    Returns:
        Tuple[bool, bytes, bytes]:
            A boolean indicating if cert and key are obtained, the cert bytes
            and key bytes both in PEM format.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert and key.
    """
    if not client_cert_callback:
        has_cert, default_cert, default_key, _ = get_client_ssl_credentials(
            generate_encrypted_key=False
        )
        return has_cert, default_cert, default_key

    callback_cert, callback_key = client_cert_callback()
    return True, callback_cert, callback_key
407
408
def decrypt_private_key(key, passphrase):
    """Decrypts a passphrase protected private key.

    google-auth library doesn't support passphrase protected private key for
    mutual TLS channel. This helper function can be used to decrypt the
    passphrase protected private key in order to establish mutual TLS channel.

    For example, if you have a function which produces client cert, passphrase
    protected private key and passphrase, you can convert it to a client cert
    callback function accepted by google-auth::

        from google.auth.transport import _mtls_helper

        def your_client_cert_function():
            return cert, encrypted_key, passphrase

        # callback accepted by google-auth for mutual TLS channel.
        def client_cert_callback():
            cert, encrypted_key, passphrase = your_client_cert_function()
            decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key,
                passphrase)
            return cert, decrypted_key

    Args:
        key (bytes): The private key bytes in PEM format.
        passphrase (bytes): The passphrase bytes.

    Returns:
        bytes: The decrypted private key in PEM format.

    Raises:
        ImportError: If pyOpenSSL is not installed.
        OpenSSL.crypto.Error: If there is any problem decrypting the private key.
    """
    # Imported lazily so the module can be loaded without pyOpenSSL installed.
    from OpenSSL import crypto

    pem_format = crypto.FILETYPE_PEM

    # Load the encrypted PEM into a PKey object, then serialize it back to PEM
    # without a passphrase.
    private_key = crypto.load_privatekey(pem_format, key, passphrase=passphrase)
    return crypto.dump_privatekey(pem_format, private_key)
449
450
def check_use_client_cert():
    """Returns boolean for whether the client certificate should be used for mTLS.

    If GOOGLE_API_USE_CLIENT_CERTIFICATE (or, when that is unset or empty,
    CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE) is set to true or false, a
    corresponding bool value will be returned. If the value is set to an
    unexpected string, it will default to False.
    If neither variable is set, the value will be inferred by reading the file
    pointed at by GOOGLE_API_CERTIFICATE_CONFIG (or, when that is unset or
    empty, CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH), and verifying
    it contains a "workload" section. If so, the function will return True,
    otherwise False.

    Returns:
        bool: Whether the client certificate should be used for mTLS connection.
    """
    use_client_cert = getenv(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE)
    if not use_client_cert:
        use_client_cert = getenv(
            environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE
        )

    # An explicit setting always wins over inference from the config file.
    if use_client_cert:
        return use_client_cert.lower() == "true"

    # No explicit setting: infer the answer from the certificate config file.
    cert_path = getenv(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
    # Treat an empty value like an unset one so the CLOUDSDK fallback is still
    # consulted, matching how _get_cert_config_path resolves the config path.
    if not cert_path:
        cert_path = getenv(
            environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH
        )

    if not cert_path:
        return False

    try:
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(cert_path, "r", encoding="utf-8") as f:
            content = json.load(f)
        # verify json has workload key
        content["cert_configs"]["workload"]
    except (OSError, KeyError, TypeError, ValueError) as e:
        # OSError covers FileNotFoundError; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (undecodable file).
        _LOGGER.debug("error decoding certificate: %s", e)
        return False

    return True
498
499
def check_parameters_for_unauthorized_response(cached_cert):
    """Returns the current cert and key together with the cached and current
    cert fingerprints, for reconfiguring mTLS.

    When no cached cert is given, the cached fingerprint is reported as equal
    to the current one.

    Args:
        cached_cert(bytes): The cached client certificate.

    Returns:
        bytes: The client callback cert bytes.
        bytes: The client callback key bytes.
        str: The base64-encoded SHA256 cached fingerprint.
        str: The base64-encoded SHA256 current cert fingerprint.
    """
    cert_bytes, key_bytes = call_client_cert_callback()

    current_fingerprint = _agent_identity_utils.calculate_certificate_fingerprint(
        _agent_identity_utils.parse_certificate(cert_bytes)
    )

    if not cached_cert:
        cached_fingerprint = current_fingerprint
    else:
        cached_fingerprint = _agent_identity_utils.get_cached_cert_fingerprint(
            cached_cert
        )

    return cert_bytes, key_bytes, cached_fingerprint, current_fingerprint
524
525
def call_client_cert_callback():
    """Fetches the client SSL credentials and returns the certificate and key.

    Credentials are requested with generate_encrypted_key=True; the success
    flag and the passphrase returned alongside them are discarded.

    Returns:
        Tuple[bytes, bytes]: The certificate bytes and the key bytes.
    """
    credentials = get_client_ssl_credentials(generate_encrypted_key=True)
    _, cert_bytes, key_bytes, _ = credentials
    return cert_bytes, key_bytes