1# Copyright 2025 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for Agent Identity credentials."""
16
17import base64
18import hashlib
19import logging
20import os
21import re
22import time
23from urllib.parse import quote, urlparse
24
25from google.auth import environment_vars, exceptions
26
27_LOGGER = logging.getLogger(__name__)
28
29CRYPTOGRAPHY_NOT_FOUND_ERROR = (
30 "The cryptography library is required for certificate-based authentication."
31 "Please install it with `pip install google-auth[cryptography]`."
32)
33
34# SPIFFE trust domain patterns for Agent Identities.
35_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [
36 r"^agents\.global\.org-\d+\.system\.id\.goog$",
37 r"^agents\.global\.proj-\d+\.system\.id\.goog$",
38 r"^agents-nonprod\.global\.org-\d+\.system\.id\.goog$",
39 r"^agents-nonprod\.global\.proj-\d+\.system\.id\.goog$",
40]
41
42_WELL_KNOWN_CERT_PATH = "/var/run/secrets/workload-spiffe-credentials/certificates.pem"
43
44# Constants for polling the certificate file.
45_FAST_POLL_CYCLES = 50
46_FAST_POLL_INTERVAL = 0.1 # 100ms
47_SLOW_POLL_INTERVAL = 0.5 # 500ms
48_TOTAL_TIMEOUT = 30 # seconds
49
50# Calculate the number of slow poll cycles based on the total timeout.
51_SLOW_POLL_CYCLES = int(
52 (_TOTAL_TIMEOUT - (_FAST_POLL_CYCLES * _FAST_POLL_INTERVAL)) / _SLOW_POLL_INTERVAL
53)
54
55_POLLING_INTERVALS = ([_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES) + (
56 [_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES
57)
58
59
60def _is_certificate_file_ready(path):
61 """Checks if a file exists and is not empty."""
62 return path and os.path.exists(path) and os.path.getsize(path) > 0
63
64
65def get_agent_identity_certificate_path():
66 """Gets the certificate path from the certificate config file.
67
68 The path to the certificate config file is read from the
69 GOOGLE_API_CERTIFICATE_CONFIG environment variable. This function
70 implements a retry mechanism to handle cases where the environment
71 variable is set before the files are available on the filesystem.
72
73 Returns:
74 str: The path to the leaf certificate file.
75
76 Raises:
77 google.auth.exceptions.RefreshError: If the certificate config file
78 or the certificate file cannot be found after retries.
79 """
80 import json
81
82 cert_config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
83
84 # Check if the well-known workload directory is mounted.
85 well_known_dir = os.path.dirname(_WELL_KNOWN_CERT_PATH)
86 has_well_known_dir = os.path.exists(well_known_dir)
87
88 # If we have neither a config path nor a well-known mount directory, exit immediately.
89 if not cert_config_path and not has_well_known_dir:
90 return None
91
92 has_logged_config_warning = False
93 has_logged_cert_warning = False
94
95 for interval in _POLLING_INTERVALS:
96 try:
97 # Path A: Config file is explicitly set
98 if cert_config_path:
99 with open(cert_config_path, "r") as f:
100 cert_config = json.load(f)
101
102 cert_configs = (
103 cert_config.get("cert_configs")
104 if isinstance(cert_config, dict)
105 else None
106 )
107 workload_config = (
108 cert_configs.get("workload")
109 if isinstance(cert_configs, dict)
110 else None
111 )
112
113 if (
114 not isinstance(workload_config, dict)
115 or "cert_path" not in workload_config
116 ):
117 return None
118
119 cert_path = workload_config["cert_path"]
120 if _is_certificate_file_ready(cert_path):
121 return cert_path
122
123 # The config was parsed, but the cert file is not ready yet
124 target_path = cert_path
125
126 # Path B: Config is NOT set, fallback to the well-known path
127 else:
128 if _is_certificate_file_ready(_WELL_KNOWN_CERT_PATH):
129 return _WELL_KNOWN_CERT_PATH
130
131 # The well-known cert file is not ready yet
132 target_path = _WELL_KNOWN_CERT_PATH
133
134 # Log a warning on the first failed attempt to load the certificate file
135 if not has_logged_cert_warning:
136 _LOGGER.warning(
137 "Certificate file not ready at %s. Retrying until startup timeout (up to %s seconds total)...",
138 target_path,
139 _TOTAL_TIMEOUT,
140 )
141 has_logged_cert_warning = True
142
143 except (IOError, ValueError, KeyError) as e:
144 if cert_config_path and os.path.exists(cert_config_path):
145 # If the file exists but has invalid JSON or is unreadable,
146 # we assume it is in its final format and fail-fast by returning None.
147 return None
148
149 if not has_logged_config_warning and cert_config_path:
150 _LOGGER.warning(
151 "Certificate config file not found or incomplete: %s (from %s "
152 "environment variable). Retrying until startup timeout (up to %s seconds total)...",
153 e,
154 environment_vars.GOOGLE_API_CERTIFICATE_CONFIG,
155 _TOTAL_TIMEOUT,
156 )
157 has_logged_config_warning = True
158 pass
159
160 # A sleep is required in two cases:
161 # 1. The config file is not found (the except block).
162 # 2. The config file/well-known path is found, but the certificate is not yet available.
163 # In both cases, we need to poll, so we sleep on every iteration
164 # that doesn't return a certificate.
165 time.sleep(interval)
166
167 raise exceptions.RefreshError(
168 "Certificate config or certificate file not found after multiple retries. "
169 f"Token binding protection is failing. You can turn off this protection by setting "
170 f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false "
171 "to fall back to unbound tokens."
172 )
173
174
175def get_and_parse_agent_identity_certificate():
176 """Gets and parses the agent identity certificate if not opted out.
177
178 Checks if the user has opted out of certificate-bound tokens. If not,
179 it gets the certificate path, reads the file, and parses it.
180
181 Returns:
182 The parsed certificate object if found and not opted out, otherwise None.
183 """
184 # If the user has opted out of cert bound tokens, there is no need to
185 # look up the certificate.
186 is_opted_out = (
187 os.environ.get(
188 environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
189 "true",
190 ).lower()
191 == "false"
192 )
193 if is_opted_out:
194 return None
195
196 cert_path = get_agent_identity_certificate_path()
197 if not cert_path:
198 return None
199
200 with open(cert_path, "rb") as cert_file:
201 cert_bytes = cert_file.read()
202
203 return parse_certificate(cert_bytes)
204
205
206def parse_certificate(cert_bytes):
207 """Parses a PEM-encoded certificate.
208
209 Args:
210 cert_bytes (bytes): The PEM-encoded certificate bytes.
211
212 Returns:
213 cryptography.x509.Certificate: The parsed certificate object.
214 """
215 try:
216 from cryptography import x509
217
218 return x509.load_pem_x509_certificate(cert_bytes)
219 except ImportError as e:
220 raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
221
222
223def _is_agent_identity_certificate(cert):
224 """Checks if a certificate is an Agent Identity certificate.
225
226 This is determined by checking the Subject Alternative Name (SAN) for a
227 SPIFFE ID with a trust domain matching Agent Identity patterns.
228
229 Args:
230 cert (cryptography.x509.Certificate): The parsed certificate object.
231
232 Returns:
233 bool: True if the certificate is an Agent Identity certificate,
234 False otherwise.
235 """
236 try:
237 from cryptography import x509
238 from cryptography.x509.oid import ExtensionOID
239
240 try:
241 ext = cert.extensions.get_extension_for_oid(
242 ExtensionOID.SUBJECT_ALTERNATIVE_NAME
243 )
244 except x509.ExtensionNotFound:
245 return False
246 uris = ext.value.get_values_for_type(x509.UniformResourceIdentifier)
247
248 for uri in uris:
249 parsed_uri = urlparse(uri)
250 if parsed_uri.scheme == "spiffe":
251 trust_domain = parsed_uri.netloc
252 for pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS:
253 if re.match(pattern, trust_domain):
254 return True
255 return False
256 except ImportError as e:
257 raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
258
259
260def calculate_certificate_fingerprint(cert):
261 """Calculates the URL-encoded, unpadded, base64-encoded SHA256 hash of a
262 DER-encoded certificate.
263
264 Args:
265 cert (cryptography.x509.Certificate): The parsed certificate object.
266
267 Returns:
268 str: The URL-encoded, unpadded, base64-encoded SHA256 fingerprint.
269 """
270 try:
271 from cryptography.hazmat.primitives import serialization
272
273 der_cert = cert.public_bytes(serialization.Encoding.DER)
274 fingerprint = hashlib.sha256(der_cert).digest()
275 # The certificate fingerprint is generated in two steps to align with GFE's
276 # expectations and ensure proper URL transmission:
277 # 1. Standard base64 encoding is applied, and padding ('=') is removed.
278 # 2. The resulting string is then URL-encoded to handle special characters
279 # ('+', '/') that would otherwise be misinterpreted in URL parameters.
280 base64_fingerprint = base64.b64encode(fingerprint).decode("utf-8")
281 unpadded_base64_fingerprint = base64_fingerprint.rstrip("=")
282 return quote(unpadded_base64_fingerprint)
283 except ImportError as e:
284 raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
285
286
287def should_request_bound_token(cert):
288 """Determines if a bound token should be requested.
289
290 This is based on the GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES
291 environment variable and whether the certificate is an agent identity cert.
292
293 Args:
294 cert (cryptography.x509.Certificate): The parsed certificate object.
295
296 Returns:
297 bool: True if a bound token should be requested, False otherwise.
298 """
299 is_agent_cert = _is_agent_identity_certificate(cert)
300 is_opted_in = (
301 os.environ.get(
302 environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
303 "true",
304 ).lower()
305 == "true"
306 )
307 return is_agent_cert and is_opted_in
308
309
310def get_cached_cert_fingerprint(cached_cert):
311 """Returns the fingerprint of the cached certificate."""
312 if cached_cert:
313 cert_obj = parse_certificate(cached_cert)
314 cached_cert_fingerprint = calculate_certificate_fingerprint(cert_obj)
315 else:
316 raise ValueError("mTLS connection is not configured.")
317 return cached_cert_fingerprint