1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helper functions for getting mTLS cert and key."""
16
17import json
18import logging
19from os import environ, getenv, path
20import re
21import subprocess
22
23from google.auth import _agent_identity_utils
24from google.auth import environment_vars
25from google.auth import exceptions
26
# Default locations of the SecureConnect metadata file and of the gcloud
# certificate configuration file.
CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"
CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"

# JSON key that get_client_ssl_credentials looks up in the metadata file to
# obtain the cert provider command.
_CERT_PROVIDER_COMMAND = "cert_provider_command"

_LOGGER = logging.getLogger(__name__)

# PEM certificate block. ".+" is greedy and DOTALL lets it cross newlines, so a
# match runs from the first BEGIN marker to the last END marker in the input.
_CERT_REGEX = re.compile(
    b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?",
    flags=re.DOTALL,
)

# PEM private key block. The "[A-Z ]*" prefix supports the various key file
# formats, e.g.
# "-----BEGIN PRIVATE KEY-----...",
# "-----BEGIN EC PRIVATE KEY-----...",
# "-----BEGIN RSA PRIVATE KEY-----..."
# "-----BEGIN ENCRYPTED PRIVATE KEY-----"
_KEY_REGEX = re.compile(
    b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?",
    flags=re.DOTALL,
)

# Passphrase block; the capture group holds the text between the two markers.
_PASSPHRASE_REGEX = re.compile(
    b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----",
    flags=re.DOTALL,
)

# Temporary patch to accommodate incorrect cert config in Cloud Run prod
# environment: when the config names the "incorrect" pair below and neither
# file exists, the "well known" pair is used instead.
_WELL_KNOWN_CLOUD_RUN_CERT_PATH = (
    "/var/run/secrets/workload-spiffe-credentials/certificates.pem"
)
_WELL_KNOWN_CLOUD_RUN_KEY_PATH = (
    "/var/run/secrets/workload-spiffe-credentials/private_key.pem"
)
_INCORRECT_CLOUD_RUN_CERT_PATH = (
    "/var/lib/volumes/certificate/workload-certificates/certificates.pem"
)
_INCORRECT_CLOUD_RUN_KEY_PATH = (
    "/var/lib/volumes/certificate/workload-certificates/private_key.pem"
)
64
65
def _check_config_path(config_path):
    """Resolve a config file path, returning it only when the file exists.

    Args:
        config_path (str): Path of a config file, for example
            context_aware_metadata.json or certificate_config.json. A leading
            "~" is expanded to the user's home directory.

    Returns:
        Optional[str]: The user-expanded path if it exists, otherwise None.
    """
    expanded = path.expanduser(config_path)
    if path.exists(expanded):
        return expanded

    # A missing file is an expected situation, so it is only logged at debug.
    _LOGGER.debug("%s is not found.", expanded)
    return None
81
82
def _load_json_file(path):
    """Parse the file at the given location as JSON.

    Shared by the X509 workload certificate configuration and the secure
    connect configuration loaders.

    Args:
        path (str): Location of the JSON file. Note that inside this function
            the parameter shadows the ``path`` module imported from ``os``.

    Returns:
        The decoded JSON document.

    Raises:
        google.auth.exceptions.ClientCertError: If the content cannot be
            parsed as JSON.
    """
    try:
        with open(path) as json_file:
            return json.load(json_file)
    except ValueError as caught_exc:
        # Only parse failures are translated; I/O errors raised by open()
        # propagate to the caller unchanged.
        raise exceptions.ClientCertError(caught_exc) from caught_exc
104
105
def _get_workload_cert_and_key(certificate_config_path=None):
    """Load the workload identity certificate and key named by the certificate config.

    When no config path is given, the "GOOGLE_API_CERTIFICATE_CONFIG"
    environment variable is consulted first, followed by the well known gcloud
    location "~/.config/gcloud/certificate_config.json".

    Args:
        certificate_config_path (string): The certificate config path. If
            omitted, the environment variable and then the well known gcloud
            location are used.

    Returns:
        Tuple[Optional[bytes], Optional[bytes]]: The client certificate and the
            private key, both as PEM bytes, or (None, None) when no paths were
            resolved.

    Raises:
        google.auth.exceptions.ClientCertError: If the certificate or key
            information cannot be retrieved.
    """
    cert_path, key_path = _get_workload_cert_and_key_paths(certificate_config_path)

    # Files are read as soon as at least one of the two paths was resolved.
    if cert_path is not None or key_path is not None:
        return _read_cert_and_key_files(cert_path, key_path)

    return None, None
130
131
def _get_cert_config_path(certificate_config_path=None):
    """Pick the certificate configuration file to use.

    Candidates are considered in this order:

    1: Explicit override, if set
    2: Environment variable, if set
    3: Well-known location

    Only the first applicable candidate is checked; if that file does not
    exist, None is returned.

    Args:
        certificate_config_path (string): The certificate config path. When
            provided, the environment variable and the well known location
            are ignored.

    Returns:
        The user-expanded path of the certificate config file, or None if the
        file does not exist.
    """
    if certificate_config_path is None:
        # An unset or empty environment variable falls through to the default.
        certificate_config_path = (
            environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
            or CERTIFICATE_CONFIGURATION_DEFAULT_PATH
        )

    resolved = path.expanduser(certificate_config_path)
    return resolved if path.exists(resolved) else None
160
161
def _require_config_key(container, key, message):
    """Return ``container[key]`` or raise ClientCertError with ``message``.

    A container that is not a JSON object (dict) is reported the same way as a
    missing key, so malformed config files never surface as a raw TypeError.
    """
    if not isinstance(container, dict) or key not in container:
        raise exceptions.ClientCertError(message)
    return container[key]


def _get_workload_cert_and_key_paths(config_path):
    """Read the workload certificate and key paths from the certificate config.

    Args:
        config_path (Optional[str]): Explicit certificate config path. It is
            forwarded to ``_get_cert_config_path``, which falls back to the
            environment variable and the well known location when it is None.

    Returns:
        Tuple[Optional[str], Optional[str]]: The certificate path and the key
            path from the ``cert_configs.workload`` section, or (None, None)
            when no certificate config file exists.

    Raises:
        google.auth.exceptions.ClientCertError: If the config file is not valid
            JSON, or if ``cert_configs``, ``workload``, ``cert_path`` or
            ``key_path`` is missing or nested inside something that is not a
            JSON object.
    """
    absolute_path = _get_cert_config_path(config_path)
    if absolute_path is None:
        return None, None

    data = _load_json_file(absolute_path)

    invalid_format = "Certificate config file {} is in an invalid format, ".format(
        absolute_path
    )
    cert_configs = _require_config_key(
        data, "cert_configs", invalid_format + 'a "cert_configs" object is expected'
    )
    workload = _require_config_key(
        cert_configs,
        "workload",
        invalid_format + 'a "workload" cert config is expected',
    )
    cert_path = _require_config_key(
        workload,
        "cert_path",
        invalid_format + 'a "cert_path" is expected in the workload cert config',
    )
    key_path = _require_config_key(
        workload,
        "key_path",
        invalid_format + 'a "key_path" is expected in the workload cert config',
    )

    # == BEGIN Temporary Cloud Run PATCH ==
    # See https://github.com/googleapis/google-auth-library-python/issues/1881
    # The substitution only happens when the config names exactly the known
    # incorrect pair AND neither of those files exists.
    if (
        cert_path == _INCORRECT_CLOUD_RUN_CERT_PATH
        and key_path == _INCORRECT_CLOUD_RUN_KEY_PATH
        and not path.exists(cert_path)
        and not path.exists(key_path)
    ):
        _LOGGER.debug(
            "Applying Cloud Run certificate path patch. "
            "Configured paths not found: %s, %s. "
            "Using well-known paths: %s, %s",
            cert_path,
            key_path,
            _WELL_KNOWN_CLOUD_RUN_CERT_PATH,
            _WELL_KNOWN_CLOUD_RUN_KEY_PATH,
        )
        cert_path = _WELL_KNOWN_CLOUD_RUN_CERT_PATH
        key_path = _WELL_KNOWN_CLOUD_RUN_KEY_PATH
    # == END Temporary Cloud Run PATCH ==

    return cert_path, key_path
221
222
def _read_cert_and_key_files(cert_path, key_path):
    """Read the certificate file and then the key file.

    Args:
        cert_path (str): Path of the PEM certificate file.
        key_path (str): Path of the PEM private key file.

    Returns:
        Tuple[bytes, bytes]: The certificate bytes and the key bytes, as
            returned by ``_read_cert_file`` and ``_read_key_file``.
    """
    # Tuple elements are evaluated left to right: certificate first, then key.
    return _read_cert_file(cert_path), _read_key_file(key_path)
228
229
def _read_cert_file(cert_path):
    """Read a file and extract the PEM certificate data it contains.

    Args:
        cert_path (str): Path of the certificate file.

    Returns:
        bytes: The portion of the file matched by ``_CERT_REGEX``.

    Raises:
        google.auth.exceptions.ClientCertError: If the file content does not
            yield exactly one match of ``_CERT_REGEX``.
    """
    with open(cert_path, "rb") as cert_file:
        pem_bytes = cert_file.read()

    matches = _CERT_REGEX.findall(pem_bytes)
    if len(matches) == 1:
        return matches[0]

    raise exceptions.ClientCertError(
        "Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format(
            cert_path
        )
    )
242
243
def _read_key_file(key_path):
    """Read a file and extract the PEM private key data it contains.

    Args:
        key_path (str): Path of the private key file.

    Returns:
        bytes: The portion of the file matched by ``_KEY_REGEX``.

    Raises:
        google.auth.exceptions.ClientCertError: If the file content does not
            yield exactly one match of ``_KEY_REGEX``.
    """
    with open(key_path, "rb") as key_file:
        pem_bytes = key_file.read()

    matches = _KEY_REGEX.findall(pem_bytes)
    if len(matches) == 1:
        return matches[0]

    raise exceptions.ClientCertError(
        "Private key file {} is in an invalid format, a single PEM formatted private key is expected".format(
            key_path
        )
    )
257
258
def _run_cert_provider_command(command, expect_encrypted_key=False):
    """Execute the cert provider command and parse its standard output.

    Args:
        command (List[str]): cert provider command.
        expect_encrypted_key (bool): Whether the command is expected to emit
            an encrypted private key together with a passphrase.

    Returns:
        Tuple[bytes, bytes, bytes]: The client certificate in PEM format, the
            private key in PEM format, and the passphrase bytes. The
            passphrase is None when an unencrypted key is expected.

    Raises:
        google.auth.exceptions.ClientCertError: If the command cannot be run,
            exits with a non-zero status, or its output does not contain the
            expected certificate, key and passphrase.
    """
    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # stderr is captured so it does not leak to the console; it is unused.
        output, _ = process.communicate()
    except OSError as caught_exc:
        raise exceptions.ClientCertError(caught_exc) from caught_exc

    # Check cert provider command execution error.
    if process.returncode != 0:
        raise exceptions.ClientCertError(
            "Cert provider command returns non-zero status code %s" % process.returncode
        )

    # Extract certificate (chain), key and passphrase.
    certs = _CERT_REGEX.findall(output)
    if len(certs) != 1:
        raise exceptions.ClientCertError("Client SSL certificate is missing or invalid")
    keys = _KEY_REGEX.findall(output)
    if len(keys) != 1:
        raise exceptions.ClientCertError("Client SSL key is missing or invalid")
    passphrases = _PASSPHRASE_REGEX.findall(output)

    cert, key = certs[0], keys[0]
    key_is_encrypted = b"ENCRYPTED" in key

    if not expect_encrypted_key:
        # Plain key expected: neither an encrypted key nor a passphrase may appear.
        if key_is_encrypted:
            raise exceptions.ClientCertError("Encrypted private key is not expected")
        if len(passphrases) > 0:
            raise exceptions.ClientCertError("Passphrase is not expected")
        return cert, key, None

    # Encrypted key expected: exactly one passphrase plus an encrypted key.
    if len(passphrases) != 1:
        raise exceptions.ClientCertError("Passphrase is missing or invalid")
    if not key_is_encrypted:
        raise exceptions.ClientCertError("Encrypted private key is expected")
    return cert, key, passphrases[0].strip()
311
312
def get_client_ssl_credentials(
    generate_encrypted_key=False,
    context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
    certificate_config_path=None,
):
    """Returns the client side certificate, private key and passphrase.

    Sources are tried in this order of priority:
        1. Certificate and key specified by certificate_config.json.
           Currently, only X.509 workload certificates are supported.
        2. Certificate and key specified by context aware metadata
           (i.e. SecureConnect).

    Args:
        generate_encrypted_key (bool): If True, an encrypted private key and a
            passphrase are requested; otherwise an unencrypted private key is
            requested and the passphrase is None. This option only affects
            keys obtained via context_aware_metadata.json.
        context_aware_metadata_path (str): The context_aware_metadata.json
            file path.
        certificate_config_path (str): The certificate_config.json file path.

    Returns:
        Tuple[bool, bytes, bytes, bytes]:
            A boolean indicating whether credentials were obtained, the cert
            bytes and key bytes both in PEM format, and the passphrase bytes.

    Raises:
        google.auth.exceptions.ClientCertError: If a problem occurs while
            getting the cert, key and passphrase.
    """
    # 1. Attempt to retrieve X.509 Workload cert and key.
    workload_cert, workload_key = _get_workload_cert_and_key(certificate_config_path)
    if workload_cert and workload_key:
        return True, workload_cert, workload_key, None

    # 2. Check for context aware metadata json.
    metadata_path = _check_config_path(context_aware_metadata_path)
    if not metadata_path:
        return False, None, None, None

    metadata_json = _load_json_file(metadata_path)
    if _CERT_PROVIDER_COMMAND not in metadata_json:
        raise exceptions.ClientCertError("Cert provider command is not found")

    command = metadata_json[_CERT_PROVIDER_COMMAND]
    # Ask the provider for a passphrase-protected key unless the configured
    # command already carries the flag.
    if generate_encrypted_key and "--with_passphrase" not in command:
        command.append("--with_passphrase")

    # Execute the command.
    cert, key, passphrase = _run_cert_provider_command(
        command, expect_encrypted_key=generate_encrypted_key
    )
    return True, cert, key, passphrase
369
370
def get_client_cert_and_key(client_cert_callback=None):
    """Returns the client side certificate and private key.

    A truthy ``client_cert_callback`` is called and its result is returned as
    is. Otherwise the application default SSL credentials are looked up.

    Args:
        client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
            optional callback which returns client certificate bytes and
            private key bytes both in PEM format.

    Returns:
        Tuple[bool, bytes, bytes]:
            A boolean indicating if cert and key are obtained, the cert bytes
            and key bytes both in PEM format.

    Raises:
        google.auth.exceptions.ClientCertError: If a problem occurs while
            getting the cert and key.
    """
    if not client_cert_callback:
        # No callback: fall back to default credentials, dropping the passphrase.
        has_cert, cert, key, _ = get_client_ssl_credentials(
            generate_encrypted_key=False
        )
        return has_cert, cert, key

    cert, key = client_cert_callback()
    return True, cert, key
397
398
def decrypt_private_key(key, passphrase):
    """Decrypt a passphrase protected private key.

    The google-auth library doesn't support passphrase protected private keys
    for mutual TLS channels. This helper decrypts such a key so that it can be
    used to establish a mutual TLS channel.

    For example, if you have a function which produces client cert, passphrase
    protected private key and passphrase, you can convert it to a client cert
    callback function accepted by google-auth::

        from google.auth.transport import _mtls_helper

        def your_client_cert_function():
            return cert, encrypted_key, passphrase

        # callback accepted by google-auth for mutual TLS channel.
        def client_cert_callback():
            cert, encrypted_key, passphrase = your_client_cert_function()
            decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key,
                passphrase)
            return cert, decrypted_key

    Args:
        key (bytes): The private key bytes in PEM format.
        passphrase (bytes): The passphrase bytes.

    Returns:
        bytes: The decrypted private key in PEM format.

    Raises:
        ImportError: If pyOpenSSL is not installed.
        OpenSSL.crypto.Error: If there is any problem decrypting the private key.
    """
    # Imported lazily so pyOpenSSL is only required when this helper is used.
    from OpenSSL import crypto

    # Load the encrypted PEM into a PKey object, then dump it back as PEM.
    return crypto.dump_privatekey(
        crypto.FILETYPE_PEM,
        crypto.load_privatekey(crypto.FILETYPE_PEM, key, passphrase=passphrase),
    )
439
440
def check_use_client_cert():
    """Returns boolean for whether the client certificate should be used for mTLS.

    If GOOGLE_API_USE_CLIENT_CERTIFICATE is set to a non-empty value, the result
    is True only when that value equals "true" (case-insensitive); any other
    string yields False.
    If GOOGLE_API_USE_CLIENT_CERTIFICATE is unset or empty, the value is
    inferred by reading the file pointed at by GOOGLE_API_CERTIFICATE_CONFIG
    (with a leading "~" expanded) and verifying that it contains a "workload"
    section under "cert_configs". If so, the function returns True, otherwise
    False. A file that is unreadable or malformed results in False.

    Returns:
        bool: Whether the client certificate should be used for mTLS connection.
    """
    use_client_cert = getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE")
    # An empty value is treated the same as an unset variable.
    if use_client_cert:
        return use_client_cert.lower() == "true"

    cert_path = getenv("GOOGLE_API_CERTIFICATE_CONFIG")
    if not cert_path:
        return False

    try:
        # Expand "~" so the path resolves the same way _get_cert_config_path
        # resolves this environment variable when the certificate is loaded.
        with open(path.expanduser(cert_path), "r") as f:
            content = json.load(f)
        # Verify the JSON has a workload section; the value itself is unused.
        content["cert_configs"]["workload"]
    except (OSError, KeyError, TypeError, ValueError) as e:
        # OSError covers a missing or unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError for undecodable content.
        _LOGGER.debug("error decoding certificate: %s", e)
        return False
    return True
478
479
def check_parameters_for_unauthorized_response(cached_cert):
    """Returns the cached and current cert fingerprint for reconfiguring mTLS.

    Args:
        cached_cert(bytes): The cached client certificate.

    Returns:
        bytes: The client callback cert bytes.
        bytes: The client callback key bytes.
        str: The base64-encoded SHA256 cached fingerprint.
        str: The base64-encoded SHA256 current cert fingerprint.
    """
    cert_bytes, key_bytes = _agent_identity_utils.call_client_cert_callback()
    current_fingerprint = _agent_identity_utils.calculate_certificate_fingerprint(
        _agent_identity_utils.parse_certificate(cert_bytes)
    )

    # Without a cached certificate the current fingerprint stands in for the
    # cached one, so both returned fingerprints are equal.
    if not cached_cert:
        return cert_bytes, key_bytes, current_fingerprint, current_fingerprint

    cached_fingerprint = _agent_identity_utils.get_cached_cert_fingerprint(
        cached_cert
    )
    return cert_bytes, key_bytes, cached_fingerprint, current_fingerprint