1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helper functions for getting mTLS cert and key."""
16
17import json
18import logging
19from os import environ, path
20import re
21import subprocess
22
23from google.auth import exceptions
24
# Default location of the context aware metadata file. The "~" prefix is
# expanded with os.path.expanduser before the path is checked.
CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"
# Well-known gcloud location of the certificate config file, used when neither
# an explicit path nor the environment variable below provides one.
_CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
# Name of the environment variable that may hold the certificate config path.
_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG"
# Key in the context aware metadata JSON that holds the cert provider command.
_CERT_PROVIDER_COMMAND = "cert_provider_command"
# Matches PEM certificate data. Because ".+" is greedy and DOTALL is set, the
# match runs from the first BEGIN marker to the last END marker, so several
# consecutive certificates are captured as a single match.
_CERT_REGEX = re.compile(
    b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL
)

# Supports various formats of key files, e.g.
# "-----BEGIN PRIVATE KEY-----...",
# "-----BEGIN EC PRIVATE KEY-----...",
# "-----BEGIN RSA PRIVATE KEY-----..."
# "-----BEGIN ENCRYPTED PRIVATE KEY-----"
_KEY_REGEX = re.compile(
    b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?",
    re.DOTALL,
)

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)


# Captures the bytes between the PASSPHRASE markers (group 1); surrounding
# whitespace is part of the capture and is left to the caller to strip.
_PASSPHRASE_REGEX = re.compile(
    b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL
)
49
50
def _check_dca_metadata_path(metadata_path):
    """Resolves the context aware metadata path if the file is present.

    Args:
        metadata_path (str): context aware metadata path. A leading "~" is
            expanded to the current user's home directory.

    Returns:
        Optional[str]: the user-expanded path when it exists on disk, and
            None otherwise.
    """
    expanded_path = path.expanduser(metadata_path)
    if path.exists(expanded_path):
        return expanded_path

    # A missing file is not an error: it only means there is no metadata to use.
    _LOGGER.debug("%s is not found, skip client SSL authentication.", expanded_path)
    return None
66
67
def _load_json_file(path):
    """Opens the file at the given path and parses its contents as JSON. Shared by
    the X509 workload certificate and secure connect configuration readers.

    Args:
        path (str): the path of the file to read. Note that inside this function
            the name shadows the module-level ``path`` import.

    Returns:
        Dict[str, str]: The parsed JSON content of the file.

    Raises:
        google.auth.exceptions.ClientCertError: If the file content cannot be
            parsed as JSON.
    """
    try:
        with open(path) as json_file:
            return json.load(json_file)
    except ValueError as parse_error:
        # json.JSONDecodeError is a ValueError; re-raise it as the library's own
        # error type while keeping the original exception as the cause.
        raise exceptions.ClientCertError(parse_error) from parse_error
89
90
def _get_workload_cert_and_key(certificate_config_path=None):
    """Loads the workload identity certificate and key named by a certificate config.

    When no config path is given, the "GOOGLE_API_CERTIFICATE_CONFIG" environment
    variable is consulted first, followed by the well known gcloud location
    "~/.config/gcloud/certificate_config.json".

    Args:
        certificate_config_path (string): The certificate config path. If omitted,
            the environment variable and then the well known gcloud location are used.

    Returns:
        Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM
            format and key bytes in PEM format, or (None, None) when no config
            file is found.

    Raises:
        google.auth.exceptions.ClientCertError: if a problem occurs when retrieving
            the certificate or key information.
    """
    cert_path, key_path = _get_workload_cert_and_key_paths(certificate_config_path)

    # Both paths being None means there is nothing to read.
    if cert_path is not None or key_path is not None:
        return _read_cert_and_key_files(cert_path, key_path)

    return None, None
115
116
def _get_cert_config_path(certificate_config_path=None):
    """Selects the certificate configuration path using this precedence:

    1: Explicit override, if set
    2: Environment variable, if set and non-empty
    3: Well-known location

    Args:
        certificate_config_path (string): The certificate config path. If provided, the
            environment variable and the well known location are ignored.

    Returns:
        The user-expanded path of the selected certificate config file, or None if
        that file does not exist.
    """
    selected_path = certificate_config_path
    if selected_path is None:
        # An unset or empty environment variable falls through to the default.
        selected_path = (
            environ.get(_CERTIFICATE_CONFIGURATION_ENV)
            or _CERTIFICATE_CONFIGURATION_DEFAULT_PATH
        )

    expanded_path = path.expanduser(selected_path)
    return expanded_path if path.exists(expanded_path) else None
145
146
def _get_workload_cert_and_key_paths(config_path):
    """Extracts the workload certificate and key paths from a certificate config file.

    Args:
        config_path (Optional[str]): The certificate config path, resolved through
            ``_get_cert_config_path``.

    Returns:
        Tuple[Optional[str], Optional[str]]: the "cert_path" and "key_path" values of
            the workload cert config, or (None, None) when no config file is found.

    Raises:
        google.auth.exceptions.ClientCertError: if the config file is not valid JSON
            or lacks any of the expected "cert_configs", "workload", "cert_path" or
            "key_path" entries.
    """
    resolved_path = _get_cert_config_path(config_path)
    if resolved_path is None:
        return None, None

    def _require(container, key, message):
        # Return container[key], or fail with the message filled in with the
        # config file path when the key is absent.
        if key not in container:
            raise exceptions.ClientCertError(message.format(resolved_path))
        return container[key]

    config = _load_json_file(resolved_path)

    cert_configs = _require(
        config,
        "cert_configs",
        'Certificate config file {} is in an invalid format, a "cert configs" object is expected',
    )
    workload = _require(
        cert_configs,
        "workload",
        'Certificate config file {} is in an invalid format, a "workload" cert config is expected',
    )
    cert_path = _require(
        workload,
        "cert_path",
        'Certificate config file {} is in an invalid format, a "cert_path" is expected in the workload cert config',
    )
    key_path = _require(
        workload,
        "key_path",
        'Certificate config file {} is in an invalid format, a "key_path" is expected in the workload cert config',
    )

    return cert_path, key_path
187
188
def _read_cert_and_key_files(cert_path, key_path):
    """Reads the certificate file and then the private key file.

    Args:
        cert_path (str): path of the PEM certificate file.
        key_path (str): path of the PEM private key file.

    Returns:
        Tuple[bytes, bytes]: the certificate bytes and the key bytes.

    Raises:
        google.auth.exceptions.ClientCertError: if either file does not contain
            the expected PEM data.
    """
    # Tuple elements are evaluated left to right, so the certificate is read first.
    return _read_cert_file(cert_path), _read_key_file(key_path)
194
195
def _read_cert_file(cert_path):
    """Reads a certificate file and returns the PEM certificate data it contains.

    Args:
        cert_path (str): path of the certificate file.

    Returns:
        bytes: the portion of the file matched by the certificate pattern.

    Raises:
        google.auth.exceptions.ClientCertError: if the file does not yield exactly
            one match of the certificate pattern.
    """
    with open(cert_path, "rb") as pem_file:
        found = _CERT_REGEX.findall(pem_file.read())

    if len(found) == 1:
        return found[0]

    raise exceptions.ClientCertError(
        "Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format(
            cert_path
        )
    )
208
209
def _read_key_file(key_path):
    """Reads a private key file and returns the PEM key data it contains.

    Args:
        key_path (str): path of the private key file.

    Returns:
        bytes: the portion of the file matched by the private key pattern.

    Raises:
        google.auth.exceptions.ClientCertError: if the file does not yield exactly
            one match of the private key pattern.
    """
    with open(key_path, "rb") as pem_file:
        found = _KEY_REGEX.findall(pem_file.read())

    if len(found) == 1:
        return found[0]

    raise exceptions.ClientCertError(
        "Private key file {} is in an invalid format, a single PEM formatted private key is expected".format(
            key_path
        )
    )
223
224
def _run_cert_provider_command(command, expect_encrypted_key=False):
    """Executes the cert provider command and extracts the client side mTLS cert,
    key and passphrase from its standard output.

    Args:
        command (List[str]): cert provider command.
        expect_encrypted_key (bool): Whether an encrypted private key (together
            with its passphrase) is expected in the output.

    Returns:
        Tuple[bytes, bytes, Optional[bytes]]: client certificate bytes in PEM
            format, key bytes in PEM format, and the passphrase bytes (None when
            an unencrypted key is expected).

    Raises:
        google.auth.exceptions.ClientCertError: if a problem occurs when running
            the cert provider command or when extracting cert, key and passphrase.
    """
    try:
        provider = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # stderr is collected as well, but only stdout is inspected below.
        output, _ = provider.communicate()
    except OSError as launch_error:
        raise exceptions.ClientCertError(launch_error) from launch_error

    # A non-zero exit status means the provider itself reported a failure.
    if provider.returncode != 0:
        raise exceptions.ClientCertError(
            "Cert provider command returns non-zero status code %s"
            % provider.returncode
        )

    # Pull the certificate (chain), key and passphrase out of the output.
    certs = _CERT_REGEX.findall(output)
    if len(certs) != 1:
        raise exceptions.ClientCertError("Client SSL certificate is missing or invalid")
    keys = _KEY_REGEX.findall(output)
    if len(keys) != 1:
        raise exceptions.ClientCertError("Client SSL key is missing or invalid")
    passphrases = _PASSPHRASE_REGEX.findall(output)

    cert, key = certs[0], keys[0]
    key_is_encrypted = b"ENCRYPTED" in key

    if not expect_encrypted_key:
        # Plain key requested: neither an encrypted key nor a passphrase may appear.
        if key_is_encrypted:
            raise exceptions.ClientCertError("Encrypted private key is not expected")
        if passphrases:
            raise exceptions.ClientCertError("Passphrase is not expected")
        return cert, key, None

    # Encrypted key requested: exactly one passphrase and an encrypted key are required.
    if len(passphrases) != 1:
        raise exceptions.ClientCertError("Passphrase is missing or invalid")
    if not key_is_encrypted:
        raise exceptions.ClientCertError("Encrypted private key is expected")
    return cert, key, passphrases[0].strip()
277
278
def get_client_ssl_credentials(
    generate_encrypted_key=False,
    context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
):
    """Obtains the client side certificate, private key and passphrase by running
    the cert provider command listed in the context aware metadata file.

    Args:
        generate_encrypted_key (bool): If set to True, an encrypted private key
            and a passphrase will be generated; otherwise, an unencrypted private
            key will be generated and the passphrase will be None.
        context_aware_metadata_path (str): The context_aware_metadata.json file path.

    Returns:
        Tuple[bool, bytes, bytes, bytes]:
            A boolean indicating if cert, key and passphrase are obtained, the
            cert bytes and key bytes both in PEM format, and passphrase bytes.
            When the metadata file does not exist the result is
            (False, None, None, None).

    Raises:
        google.auth.exceptions.ClientCertError: if a problem occurs when getting
            the cert, key and passphrase.
    """
    resolved_path = _check_dca_metadata_path(context_aware_metadata_path)
    if not resolved_path:
        return False, None, None, None

    metadata = _load_json_file(resolved_path)
    if _CERT_PROVIDER_COMMAND not in metadata:
        raise exceptions.ClientCertError("Cert provider command is not found")

    provider_command = metadata[_CERT_PROVIDER_COMMAND]

    # Ask the provider for an encrypted key unless the flag is already present.
    if generate_encrypted_key and "--with_passphrase" not in provider_command:
        provider_command.append("--with_passphrase")

    cert, key, passphrase = _run_cert_provider_command(
        provider_command, expect_encrypted_key=generate_encrypted_key
    )
    return True, cert, key, passphrase
320
321
def get_client_cert_and_key(client_cert_callback=None):
    """Obtains the client side certificate and private key.

    When a callback is supplied, its result is returned as-is and reported as
    found. Otherwise the application default SSL credentials are looked up with
    an unencrypted key requested.

    Args:
        client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
            optional callback which returns client certificate bytes and private
            key bytes both in PEM format.

    Returns:
        Tuple[bool, bytes, bytes]:
            A boolean indicating if cert and key are obtained, the cert bytes
            and key bytes both in PEM format.

    Raises:
        google.auth.exceptions.ClientCertError: if a problem occurs when getting
            the cert and key.
    """
    if not client_cert_callback:
        # No callback: fall back to the default credentials and drop the passphrase slot.
        found, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False)
        return found, cert, key

    cert, key = client_cert_callback()
    return True, cert, key
348
349
def decrypt_private_key(key, passphrase):
    """Decrypts a passphrase protected private key.

    The google-auth library doesn't support passphrase protected private keys
    for mutual TLS channels. This helper can be used to decrypt such a key in
    order to establish a mutual TLS channel.

    For example, if you have a function which produces client cert, passphrase
    protected private key and passphrase, you can convert it to a client cert
    callback function accepted by google-auth::

        from google.auth.transport import _mtls_helper

        def your_client_cert_function():
            return cert, encrypted_key, passphrase

        # callback accepted by google-auth for mutual TLS channel.
        def client_cert_callback():
            cert, encrypted_key, passphrase = your_client_cert_function()
            decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key,
                passphrase)
            return cert, decrypted_key

    Args:
        key (bytes): The private key bytes in PEM format.
        passphrase (bytes): The passphrase bytes.

    Returns:
        bytes: The decrypted private key in PEM format.

    Raises:
        ImportError: If pyOpenSSL is not installed.
        OpenSSL.crypto.Error: If there is any problem decrypting the private key.
    """
    # Imported here so pyOpenSSL is only needed when this helper is called.
    from OpenSSL import crypto

    pem = crypto.FILETYPE_PEM
    # Load the encrypted key into a PKey object, then serialize it back to PEM
    # without a passphrase.
    return crypto.dump_privatekey(
        pem, crypto.load_privatekey(pem, key, passphrase=passphrase)
    )