1# Copyright 2025 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for Agent Identity credentials."""
16
17import base64
18import hashlib
19import logging
20import os
21import re
22import time
23from urllib.parse import quote, urlparse
24
25from google.auth import environment_vars, exceptions
26
27_LOGGER = logging.getLogger(__name__)
28
29CRYPTOGRAPHY_NOT_FOUND_ERROR = (
30 "The cryptography library is required for certificate-based authentication."
31 "Please install it with `pip install google-auth[cryptography]`."
32)
33
34# SPIFFE trust domain patterns for Agent Identities.
35_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [
36 r"^agents\.global\.org-\d+\.system\.id\.goog$",
37 r"^agents\.global\.proj-\d+\.system\.id\.goog$",
38 r"^agents-nonprod\.global\.org-\d+\.system\.id\.goog$",
39 r"^agents-nonprod\.global\.proj-\d+\.system\.id\.goog$",
40]
41
42_WELL_KNOWN_CERT_PATH = "/var/run/secrets/workload-spiffe-credentials/certificates.pem"
43
44# Constants for polling the certificate file.
45_FAST_POLL_CYCLES = 50
46_FAST_POLL_INTERVAL = 0.1 # 100ms
47_SLOW_POLL_INTERVAL = 0.5 # 500ms
48_TOTAL_TIMEOUT = 30 # seconds
49
50# Calculate the number of slow poll cycles based on the total timeout.
51_SLOW_POLL_CYCLES = int(
52 (_TOTAL_TIMEOUT - (_FAST_POLL_CYCLES * _FAST_POLL_INTERVAL)) / _SLOW_POLL_INTERVAL
53)
54
55_POLLING_INTERVALS = ([_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES) + (
56 [_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES
57)
58
59
60def _is_certificate_file_ready(path):
61 """Checks if a file exists and is not empty."""
62 return path and os.path.exists(path) and os.path.getsize(path) > 0
63
64
65def get_agent_identity_certificate_path():
66 """Gets the certificate path from the certificate config file.
67
68 The path to the certificate config file is read from the
69 GOOGLE_API_CERTIFICATE_CONFIG environment variable. This function
70 implements a retry mechanism to handle cases where the environment
71 variable is set before the files are available on the filesystem.
72
73 Returns:
74 str: The path to the leaf certificate file.
75
76 Raises:
77 google.auth.exceptions.RefreshError: If the certificate config file
78 or the certificate file cannot be found after retries.
79 """
80 import json
81
82 cert_config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
83
84 # Check if the well-known workload directory is mounted.
85 well_known_dir = os.path.dirname(_WELL_KNOWN_CERT_PATH)
86 has_well_known_dir = os.path.exists(well_known_dir)
87
88 # If we have neither a config path nor a well-known mount directory, exit immediately.
89 if not cert_config_path and not has_well_known_dir:
90 return None
91
92 # If ECP config path is specified but does not exist, and we are on a workstation, fail-fast immediately.
93 if (
94 cert_config_path
95 and not has_well_known_dir
96 and not os.path.exists(cert_config_path)
97 ):
98 return None
99
100 has_logged_config_warning = False
101 has_logged_cert_warning = False
102
103 for interval in _POLLING_INTERVALS:
104 try:
105 # Path A: Config file is explicitly set
106 if cert_config_path:
107 with open(cert_config_path, "r") as f:
108 cert_config = json.load(f)
109
110 cert_configs = (
111 cert_config.get("cert_configs")
112 if isinstance(cert_config, dict)
113 else None
114 )
115 workload_config = (
116 cert_configs.get("workload")
117 if isinstance(cert_configs, dict)
118 else None
119 )
120
121 if (
122 not isinstance(workload_config, dict)
123 or "cert_path" not in workload_config
124 ):
125 return None
126
127 cert_path = workload_config["cert_path"]
128 if _is_certificate_file_ready(cert_path):
129 return cert_path
130
131 # The config was parsed, but the cert file is not ready yet
132 target_path = cert_path
133
134 # Path B: Config is NOT set, fallback to the well-known path
135 else:
136 if _is_certificate_file_ready(_WELL_KNOWN_CERT_PATH):
137 return _WELL_KNOWN_CERT_PATH
138
139 # The well-known cert file is not ready yet
140 target_path = _WELL_KNOWN_CERT_PATH
141
142 # Log a warning on the first failed attempt to load the certificate file
143 if not has_logged_cert_warning:
144 _LOGGER.warning(
145 "Certificate file not ready at %s. Retrying until startup timeout (up to %s seconds total)...",
146 target_path,
147 _TOTAL_TIMEOUT,
148 )
149 has_logged_cert_warning = True
150
151 except (IOError, ValueError, KeyError) as e:
152 if cert_config_path and os.path.exists(cert_config_path):
153 # If the file exists but has invalid JSON or is unreadable,
154 # we assume it is in its final format and fail-fast by returning None.
155 return None
156
157 if not has_logged_config_warning and cert_config_path:
158 _LOGGER.warning(
159 "Certificate config file not found or incomplete: %s (from %s "
160 "environment variable). Retrying until startup timeout (up to %s seconds total)...",
161 e,
162 environment_vars.GOOGLE_API_CERTIFICATE_CONFIG,
163 _TOTAL_TIMEOUT,
164 )
165 has_logged_config_warning = True
166 pass
167
168 # A sleep is required in two cases:
169 # 1. The config file is not found (the except block).
170 # 2. The config file/well-known path is found, but the certificate is not yet available.
171 # In both cases, we need to poll, so we sleep on every iteration
172 # that doesn't return a certificate.
173 time.sleep(interval)
174
175 raise exceptions.RefreshError(
176 "Certificate config or certificate file not found after multiple retries. "
177 f"Token binding protection is failing. You can turn off this protection by setting "
178 f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false "
179 "to fall back to unbound tokens."
180 )
181
182
183def get_and_parse_agent_identity_certificate():
184 """Gets and parses the agent identity certificate if not opted out.
185
186 Checks if the user has opted out of certificate-bound tokens. If not,
187 it gets the certificate path, reads the file, and parses it.
188
189 Returns:
190 The parsed certificate object if found and not opted out, otherwise None.
191 """
192 # If the user has opted out of cert bound tokens, there is no need to
193 # look up the certificate.
194 is_opted_out = (
195 os.environ.get(
196 environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
197 "true",
198 ).lower()
199 == "false"
200 )
201 if is_opted_out:
202 return None
203
204 cert_path = get_agent_identity_certificate_path()
205 if not cert_path:
206 return None
207
208 with open(cert_path, "rb") as cert_file:
209 cert_bytes = cert_file.read()
210
211 return parse_certificate(cert_bytes)
212
213
214def parse_certificate(cert_bytes):
215 """Parses a PEM-encoded certificate.
216
217 Args:
218 cert_bytes (bytes): The PEM-encoded certificate bytes.
219
220 Returns:
221 cryptography.x509.Certificate: The parsed certificate object.
222 """
223 try:
224 from cryptography import x509
225
226 return x509.load_pem_x509_certificate(cert_bytes)
227 except ImportError as e:
228 raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
229
230
231def _is_agent_identity_certificate(cert):
232 """Checks if a certificate is an Agent Identity certificate.
233
234 This is determined by checking the Subject Alternative Name (SAN) for a
235 SPIFFE ID with a trust domain matching Agent Identity patterns.
236
237 Args:
238 cert (cryptography.x509.Certificate): The parsed certificate object.
239
240 Returns:
241 bool: True if the certificate is an Agent Identity certificate,
242 False otherwise.
243 """
244 try:
245 from cryptography import x509
246 from cryptography.x509.oid import ExtensionOID
247
248 try:
249 ext = cert.extensions.get_extension_for_oid(
250 ExtensionOID.SUBJECT_ALTERNATIVE_NAME
251 )
252 except x509.ExtensionNotFound:
253 return False
254 uris = ext.value.get_values_for_type(x509.UniformResourceIdentifier)
255
256 for uri in uris:
257 parsed_uri = urlparse(uri)
258 if parsed_uri.scheme == "spiffe":
259 trust_domain = parsed_uri.netloc
260 for pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS:
261 if re.match(pattern, trust_domain):
262 return True
263 return False
264 except ImportError as e:
265 raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
266
267
268def calculate_certificate_fingerprint(cert):
269 """Calculates the URL-encoded, unpadded, base64-encoded SHA256 hash of a
270 DER-encoded certificate.
271
272 Args:
273 cert (cryptography.x509.Certificate): The parsed certificate object.
274
275 Returns:
276 str: The URL-encoded, unpadded, base64-encoded SHA256 fingerprint.
277 """
278 try:
279 from cryptography.hazmat.primitives import serialization
280
281 der_cert = cert.public_bytes(serialization.Encoding.DER)
282 fingerprint = hashlib.sha256(der_cert).digest()
283 # The certificate fingerprint is generated in two steps to align with GFE's
284 # expectations and ensure proper URL transmission:
285 # 1. Standard base64 encoding is applied, and padding ('=') is removed.
286 # 2. The resulting string is then URL-encoded to handle special characters
287 # ('+', '/') that would otherwise be misinterpreted in URL parameters.
288 base64_fingerprint = base64.b64encode(fingerprint).decode("utf-8")
289 unpadded_base64_fingerprint = base64_fingerprint.rstrip("=")
290 return quote(unpadded_base64_fingerprint)
291 except ImportError as e:
292 raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
293
294
295def should_request_bound_token(cert):
296 """Determines if a bound token should be requested.
297
298 This is based on the GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES
299 environment variable and whether the certificate is an agent identity cert.
300
301 Args:
302 cert (cryptography.x509.Certificate): The parsed certificate object.
303
304 Returns:
305 bool: True if a bound token should be requested, False otherwise.
306 """
307 is_agent_cert = _is_agent_identity_certificate(cert)
308 is_opted_in = (
309 os.environ.get(
310 environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
311 "true",
312 ).lower()
313 == "true"
314 )
315 return is_agent_cert and is_opted_in
316
317
318def get_cached_cert_fingerprint(cached_cert):
319 """Returns the fingerprint of the cached certificate."""
320 if cached_cert:
321 cert_obj = parse_certificate(cached_cert)
322 cached_cert_fingerprint = calculate_certificate_fingerprint(cert_obj)
323 else:
324 raise ValueError("mTLS connection is not configured.")
325 return cached_cert_fingerprint