1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helper functions for getting mTLS cert and key."""
16
17import json
18import logging
19from os import environ, path
20import re
21import subprocess
22
23from google.auth import exceptions
24
25CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"
26CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
27_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG"
28_CERT_PROVIDER_COMMAND = "cert_provider_command"
29_CERT_REGEX = re.compile(
30 b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL
31)
32
33# support various format of key files, e.g.
34# "-----BEGIN PRIVATE KEY-----...",
35# "-----BEGIN EC PRIVATE KEY-----...",
36# "-----BEGIN RSA PRIVATE KEY-----..."
37# "-----BEGIN ENCRYPTED PRIVATE KEY-----"
38_KEY_REGEX = re.compile(
39 b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?",
40 re.DOTALL,
41)
42
43_LOGGER = logging.getLogger(__name__)
44
45
46_PASSPHRASE_REGEX = re.compile(
47 b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL
48)
49
50
51def _check_config_path(config_path):
52 """Checks for config file path. If it exists, returns the absolute path with user expansion;
53 otherwise returns None.
54
55 Args:
56 config_path (str): The config file path for either context_aware_metadata.json or certificate_config.json for example
57
58 Returns:
59 str: absolute path if exists and None otherwise.
60 """
61 config_path = path.expanduser(config_path)
62 if not path.exists(config_path):
63 _LOGGER.debug("%s is not found.", config_path)
64 return None
65 return config_path
66
67
68def _load_json_file(path):
69 """Reads and loads JSON from the given path. Used to read both X509 workload certificate and
70 secure connect configurations.
71
72 Args:
73 path (str): the path to read from.
74
75 Returns:
76 Dict[str, str]: The JSON stored at the file.
77
78 Raises:
79 google.auth.exceptions.ClientCertError: If failed to parse the file as JSON.
80 """
81 try:
82 with open(path) as f:
83 json_data = json.load(f)
84 except ValueError as caught_exc:
85 new_exc = exceptions.ClientCertError(caught_exc)
86 raise new_exc from caught_exc
87
88 return json_data
89
90
91def _get_workload_cert_and_key(certificate_config_path=None):
92 """Read the workload identity cert and key files specified in the certificate config provided.
93 If no config path is provided, check the environment variable: "GOOGLE_API_CERTIFICATE_CONFIG"
94 first, then the well known gcloud location: "~/.config/gcloud/certificate_config.json".
95
96 Args:
97 certificate_config_path (string): The certificate config path. If no path is provided,
98 the environment variable will be checked first, then the well known gcloud location.
99
100 Returns:
101 Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM format and key
102 bytes in PEM format.
103
104 Raises:
105 google.auth.exceptions.ClientCertError: if problems occurs when retrieving
106 the certificate or key information.
107 """
108
109 cert_path, key_path = _get_workload_cert_and_key_paths(certificate_config_path)
110
111 if cert_path is None and key_path is None:
112 return None, None
113
114 return _read_cert_and_key_files(cert_path, key_path)
115
116
117def _get_cert_config_path(certificate_config_path=None):
118 """Get the certificate configuration path based on the following order:
119
120 1: Explicit override, if set
121 2: Environment variable, if set
122 3: Well-known location
123
124 Returns "None" if the selected config file does not exist.
125
126 Args:
127 certificate_config_path (string): The certificate config path. If provided, the well known
128 location and environment variable will be ignored.
129
130 Returns:
131 The absolute path of the certificate config file, and None if the file does not exist.
132 """
133
134 if certificate_config_path is None:
135 env_path = environ.get(_CERTIFICATE_CONFIGURATION_ENV, None)
136 if env_path is not None and env_path != "":
137 certificate_config_path = env_path
138 else:
139 certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH
140
141 certificate_config_path = path.expanduser(certificate_config_path)
142 if not path.exists(certificate_config_path):
143 return None
144 return certificate_config_path
145
146
147def _get_workload_cert_and_key_paths(config_path):
148 absolute_path = _get_cert_config_path(config_path)
149 if absolute_path is None:
150 return None, None
151
152 data = _load_json_file(absolute_path)
153
154 if "cert_configs" not in data:
155 raise exceptions.ClientCertError(
156 'Certificate config file {} is in an invalid format, a "cert configs" object is expected'.format(
157 absolute_path
158 )
159 )
160 cert_configs = data["cert_configs"]
161
162 if "workload" not in cert_configs:
163 raise exceptions.ClientCertError(
164 'Certificate config file {} is in an invalid format, a "workload" cert config is expected'.format(
165 absolute_path
166 )
167 )
168 workload = cert_configs["workload"]
169
170 if "cert_path" not in workload:
171 raise exceptions.ClientCertError(
172 'Certificate config file {} is in an invalid format, a "cert_path" is expected in the workload cert config'.format(
173 absolute_path
174 )
175 )
176 cert_path = workload["cert_path"]
177
178 if "key_path" not in workload:
179 raise exceptions.ClientCertError(
180 'Certificate config file {} is in an invalid format, a "key_path" is expected in the workload cert config'.format(
181 absolute_path
182 )
183 )
184 key_path = workload["key_path"]
185
186 return cert_path, key_path
187
188
189def _read_cert_and_key_files(cert_path, key_path):
190 cert_data = _read_cert_file(cert_path)
191 key_data = _read_key_file(key_path)
192
193 return cert_data, key_data
194
195
196def _read_cert_file(cert_path):
197 with open(cert_path, "rb") as cert_file:
198 cert_data = cert_file.read()
199
200 cert_match = re.findall(_CERT_REGEX, cert_data)
201 if len(cert_match) != 1:
202 raise exceptions.ClientCertError(
203 "Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format(
204 cert_path
205 )
206 )
207 return cert_match[0]
208
209
210def _read_key_file(key_path):
211 with open(key_path, "rb") as key_file:
212 key_data = key_file.read()
213
214 key_match = re.findall(_KEY_REGEX, key_data)
215 if len(key_match) != 1:
216 raise exceptions.ClientCertError(
217 "Private key file {} is in an invalid format, a single PEM formatted private key is expected".format(
218 key_path
219 )
220 )
221
222 return key_match[0]
223
224
225def _run_cert_provider_command(command, expect_encrypted_key=False):
226 """Run the provided command, and return client side mTLS cert, key and
227 passphrase.
228
229 Args:
230 command (List[str]): cert provider command.
231 expect_encrypted_key (bool): If encrypted private key is expected.
232
233 Returns:
234 Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key
235 bytes in PEM format and passphrase bytes.
236
237 Raises:
238 google.auth.exceptions.ClientCertError: if problems occurs when running
239 the cert provider command or generating cert, key and passphrase.
240 """
241 try:
242 process = subprocess.Popen(
243 command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
244 )
245 stdout, stderr = process.communicate()
246 except OSError as caught_exc:
247 new_exc = exceptions.ClientCertError(caught_exc)
248 raise new_exc from caught_exc
249
250 # Check cert provider command execution error.
251 if process.returncode != 0:
252 raise exceptions.ClientCertError(
253 "Cert provider command returns non-zero status code %s" % process.returncode
254 )
255
256 # Extract certificate (chain), key and passphrase.
257 cert_match = re.findall(_CERT_REGEX, stdout)
258 if len(cert_match) != 1:
259 raise exceptions.ClientCertError("Client SSL certificate is missing or invalid")
260 key_match = re.findall(_KEY_REGEX, stdout)
261 if len(key_match) != 1:
262 raise exceptions.ClientCertError("Client SSL key is missing or invalid")
263 passphrase_match = re.findall(_PASSPHRASE_REGEX, stdout)
264
265 if expect_encrypted_key:
266 if len(passphrase_match) != 1:
267 raise exceptions.ClientCertError("Passphrase is missing or invalid")
268 if b"ENCRYPTED" not in key_match[0]:
269 raise exceptions.ClientCertError("Encrypted private key is expected")
270 return cert_match[0], key_match[0], passphrase_match[0].strip()
271
272 if b"ENCRYPTED" in key_match[0]:
273 raise exceptions.ClientCertError("Encrypted private key is not expected")
274 if len(passphrase_match) > 0:
275 raise exceptions.ClientCertError("Passphrase is not expected")
276 return cert_match[0], key_match[0], None
277
278
279def get_client_ssl_credentials(
280 generate_encrypted_key=False,
281 context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
282 certificate_config_path=CERTIFICATE_CONFIGURATION_DEFAULT_PATH,
283):
284 """Returns the client side certificate, private key and passphrase.
285
286 We look for certificates and keys with the following order of priority:
287 1. Certificate and key specified by certificate_config.json.
288 Currently, only X.509 workload certificates are supported.
289 2. Certificate and key specified by context aware metadata (i.e. SecureConnect).
290
291 Args:
292 generate_encrypted_key (bool): If set to True, encrypted private key
293 and passphrase will be generated; otherwise, unencrypted private key
294 will be generated and passphrase will be None. This option only
295 affects keys obtained via context_aware_metadata.json.
296 context_aware_metadata_path (str): The context_aware_metadata.json file path.
297 certificate_config_path (str): The certificate_config.json file path.
298
299 Returns:
300 Tuple[bool, bytes, bytes, bytes]:
301 A boolean indicating if cert, key and passphrase are obtained, the
302 cert bytes and key bytes both in PEM format, and passphrase bytes.
303
304 Raises:
305 google.auth.exceptions.ClientCertError: if problems occurs when getting
306 the cert, key and passphrase.
307 """
308
309 # 1. Check for certificate config json.
310 cert_config_path = _check_config_path(certificate_config_path)
311 if cert_config_path:
312 # Attempt to retrieve X.509 Workload cert and key.
313 cert, key = _get_workload_cert_and_key(cert_config_path)
314 if cert and key:
315 return True, cert, key, None
316
317 # 2. Check for context aware metadata json
318 metadata_path = _check_config_path(context_aware_metadata_path)
319
320 if metadata_path:
321 metadata_json = _load_json_file(metadata_path)
322
323 if _CERT_PROVIDER_COMMAND not in metadata_json:
324 raise exceptions.ClientCertError("Cert provider command is not found")
325
326 command = metadata_json[_CERT_PROVIDER_COMMAND]
327
328 if generate_encrypted_key and "--with_passphrase" not in command:
329 command.append("--with_passphrase")
330
331 # Execute the command.
332 cert, key, passphrase = _run_cert_provider_command(
333 command, expect_encrypted_key=generate_encrypted_key
334 )
335 return True, cert, key, passphrase
336
337 return False, None, None, None
338
339
340def get_client_cert_and_key(client_cert_callback=None):
341 """Returns the client side certificate and private key. The function first
342 tries to get certificate and key from client_cert_callback; if the callback
343 is None or doesn't provide certificate and key, the function tries application
344 default SSL credentials.
345
346 Args:
347 client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
348 optional callback which returns client certificate bytes and private
349 key bytes both in PEM format.
350
351 Returns:
352 Tuple[bool, bytes, bytes]:
353 A boolean indicating if cert and key are obtained, the cert bytes
354 and key bytes both in PEM format.
355
356 Raises:
357 google.auth.exceptions.ClientCertError: if problems occurs when getting
358 the cert and key.
359 """
360 if client_cert_callback:
361 cert, key = client_cert_callback()
362 return True, cert, key
363
364 has_cert, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False)
365 return has_cert, cert, key
366
367
368def decrypt_private_key(key, passphrase):
369 """A helper function to decrypt the private key with the given passphrase.
370 google-auth library doesn't support passphrase protected private key for
371 mutual TLS channel. This helper function can be used to decrypt the
372 passphrase protected private key in order to estalish mutual TLS channel.
373
374 For example, if you have a function which produces client cert, passphrase
375 protected private key and passphrase, you can convert it to a client cert
376 callback function accepted by google-auth::
377
378 from google.auth.transport import _mtls_helper
379
380 def your_client_cert_function():
381 return cert, encrypted_key, passphrase
382
383 # callback accepted by google-auth for mutual TLS channel.
384 def client_cert_callback():
385 cert, encrypted_key, passphrase = your_client_cert_function()
386 decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key,
387 passphrase)
388 return cert, decrypted_key
389
390 Args:
391 key (bytes): The private key bytes in PEM format.
392 passphrase (bytes): The passphrase bytes.
393
394 Returns:
395 bytes: The decrypted private key in PEM format.
396
397 Raises:
398 ImportError: If pyOpenSSL is not installed.
399 OpenSSL.crypto.Error: If there is any problem decrypting the private key.
400 """
401 from OpenSSL import crypto
402
403 # First convert encrypted_key_bytes to PKey object
404 pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key, passphrase=passphrase)
405
406 # Then dump the decrypted key bytes
407 return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)