Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/transport/_mtls_helper.py: 26%
77 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helper functions for getting mTLS cert and key."""
17import json
18import logging
19from os import path
20import re
21import subprocess
23import six
25from google.auth import exceptions
# Default location of the context aware metadata file. The leading "~" is
# resolved with path.expanduser() by _check_dca_metadata_path before use.
CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"

# Key of the metadata JSON entry that holds the cert provider command.
_CERT_PROVIDER_COMMAND = "cert_provider_command"

_LOGGER = logging.getLogger(__name__)

# With re.DOTALL the greedy ".+" also crosses newlines, so everything from the
# first BEGIN marker to the last END marker is returned as a single match.
_CERT_REGEX = re.compile(
    b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?",
    flags=re.DOTALL,
)

# The optional "[A-Z ]*" prefix accepts the various private key headers, e.g.
#   "-----BEGIN PRIVATE KEY-----...",
#   "-----BEGIN EC PRIVATE KEY-----...",
#   "-----BEGIN RSA PRIVATE KEY-----...",
#   "-----BEGIN ENCRYPTED PRIVATE KEY-----..."
_KEY_REGEX = re.compile(
    b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?",
    flags=re.DOTALL,
)

# The capture group is the raw text between the passphrase markers, including
# any surrounding whitespace.
_PASSPHRASE_REGEX = re.compile(
    b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----",
    flags=re.DOTALL,
)
def _check_dca_metadata_path(metadata_path):
    """Resolves the context aware metadata path if the file is present.

    Args:
        metadata_path (str): context aware metadata path. A leading ``~`` is
            expanded to the user's home directory.

    Returns:
        str: the expanded path if it exists, and None otherwise.
    """
    expanded_path = path.expanduser(metadata_path)
    if path.exists(expanded_path):
        return expanded_path

    # A missing metadata file is not an error; it only means there is nothing
    # to configure client SSL authentication from.
    _LOGGER.debug("%s is not found, skip client SSL authentication.", expanded_path)
    return None
def _read_dca_metadata_file(metadata_path):
    """Parses the context aware metadata file at the given path as JSON.

    Args:
        metadata_path (str): context aware metadata path.

    Returns:
        Dict[str, str]: The metadata.

    Raises:
        google.auth.exceptions.ClientCertError: If the file content cannot be
            parsed as JSON.
    """
    try:
        with open(metadata_path) as metadata_file:
            parsed = json.load(metadata_file)
    except ValueError as caught_exc:
        # Re-raise as ClientCertError, keeping the parse error as the cause.
        six.raise_from(exceptions.ClientCertError(caught_exc), caught_exc)
    else:
        return parsed
def _run_cert_provider_command(command, expect_encrypted_key=False):
    """Executes the cert provider command and parses its standard output into
    the client side mTLS cert, key and passphrase.

    Args:
        command (List[str]): cert provider command.
        expect_encrypted_key (bool): If encrypted private key is expected.

    Returns:
        Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key
            bytes in PEM format and passphrase bytes. The passphrase is None
            when an encrypted key is not expected.

    Raises:
        google.auth.exceptions.ClientCertError: if the command cannot be run,
            exits with a non-zero status, or its output does not contain
            exactly the cert, key and passphrase that are expected.
    """
    try:
        provider = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # Only stdout is parsed; stderr is captured but not used.
        output, _ = provider.communicate()
    except OSError as caught_exc:
        six.raise_from(exceptions.ClientCertError(caught_exc), caught_exc)

    # Check cert provider command execution error.
    exit_code = provider.returncode
    if exit_code != 0:
        raise exceptions.ClientCertError(
            "Cert provider command returns non-zero status code %s" % exit_code
        )

    # Extract certificate (chain), key and passphrase.
    certs = _CERT_REGEX.findall(output)
    if len(certs) != 1:
        raise exceptions.ClientCertError("Client SSL certificate is missing or invalid")
    keys = _KEY_REGEX.findall(output)
    if len(keys) != 1:
        raise exceptions.ClientCertError("Client SSL key is missing or invalid")
    passphrases = _PASSPHRASE_REGEX.findall(output)

    cert, key = certs[0], keys[0]
    key_is_encrypted = b"ENCRYPTED" in key

    if not expect_encrypted_key:
        # Plain key requested: neither an encrypted key nor a passphrase may
        # appear in the output.
        if key_is_encrypted:
            raise exceptions.ClientCertError("Encrypted private key is not expected")
        if passphrases:
            raise exceptions.ClientCertError("Passphrase is not expected")
        return cert, key, None

    # Encrypted key requested: exactly one passphrase and an encrypted key are
    # required.
    if len(passphrases) != 1:
        raise exceptions.ClientCertError("Passphrase is missing or invalid")
    if not key_is_encrypted:
        raise exceptions.ClientCertError("Encrypted private key is expected")
    return cert, key, passphrases[0].strip()
def get_client_ssl_credentials(
    generate_encrypted_key=False,
    context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
):
    """Obtains the client side certificate, private key and passphrase by
    running the cert provider command listed in the context aware metadata.

    Args:
        generate_encrypted_key (bool): If set to True, encrypted private key
            and passphrase will be generated; otherwise, unencrypted private key
            will be generated and passphrase will be None.
        context_aware_metadata_path (str): The context_aware_metadata.json file path.

    Returns:
        Tuple[bool, bytes, bytes, bytes]:
            A boolean indicating if cert, key and passphrase are obtained, the
            cert bytes and key bytes both in PEM format, and passphrase bytes.
            When the metadata file does not exist the result is
            ``(False, None, None, None)``.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert, key and passphrase.
    """
    metadata_path = _check_dca_metadata_path(context_aware_metadata_path)
    if not metadata_path:
        return False, None, None, None

    metadata_json = _read_dca_metadata_file(metadata_path)
    if _CERT_PROVIDER_COMMAND not in metadata_json:
        raise exceptions.ClientCertError("Cert provider command is not found")
    command = metadata_json[_CERT_PROVIDER_COMMAND]

    # Add the passphrase flag only when an encrypted key is requested and the
    # command does not already carry it.
    needs_passphrase_flag = generate_encrypted_key and "--with_passphrase" not in command
    if needs_passphrase_flag:
        command.append("--with_passphrase")

    # Execute the command.
    cert, key, passphrase = _run_cert_provider_command(
        command, expect_encrypted_key=generate_encrypted_key
    )
    return True, cert, key, passphrase
def get_client_cert_and_key(client_cert_callback=None):
    """Obtains the client side certificate and private key.

    When a callback is supplied it is the only source consulted. Without one,
    the function falls back to the application default SSL credentials.

    Args:
        client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
            optional callback which returns client certificate bytes and private
            key bytes both in PEM format.

    Returns:
        Tuple[bool, bytes, bytes]:
            A boolean indicating if cert and key are obtained, the cert bytes
            and key bytes both in PEM format.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert and key.
    """
    if not client_cert_callback:
        # No callback: use the default credentials and drop the passphrase,
        # since an unencrypted key is requested.
        has_cert, cert, key, _ = get_client_ssl_credentials(
            generate_encrypted_key=False
        )
        return has_cert, cert, key

    cert, key = client_cert_callback()
    return True, cert, key
def decrypt_private_key(key, passphrase):
    """Decrypts a passphrase protected private key.

    google-auth library doesn't support passphrase protected private key for
    mutual TLS channel. This helper function can be used to decrypt the
    passphrase protected private key in order to establish mutual TLS channel.

    For example, if you have a function which produces client cert, passphrase
    protected private key and passphrase, you can convert it to a client cert
    callback function accepted by google-auth::

        from google.auth.transport import _mtls_helper

        def your_client_cert_function():
            return cert, encrypted_key, passphrase

        # callback accepted by google-auth for mutual TLS channel.
        def client_cert_callback():
            cert, encrypted_key, passphrase = your_client_cert_function()
            decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key,
                passphrase)
            return cert, decrypted_key

    Args:
        key (bytes): The private key bytes in PEM format.
        passphrase (bytes): The passphrase bytes.

    Returns:
        bytes: The decrypted private key in PEM format.

    Raises:
        ImportError: If pyOpenSSL is not installed.
        OpenSSL.crypto.Error: If there is any problem decrypting the private key.
    """
    # Imported here so that pyOpenSSL is only required when this helper is
    # actually called.
    from OpenSSL import crypto

    pem_format = crypto.FILETYPE_PEM

    # Load the encrypted PEM bytes into a PKey object using the passphrase,
    # then serialize that object back to PEM bytes.
    private_key = crypto.load_privatekey(pem_format, key, passphrase=passphrase)
    return crypto.dump_privatekey(pem_format, private_key)