1# Copyright 2025 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for Agent Identity credentials."""
16
17import base64
18import hashlib
19import logging
20import os
21import re
22import time
23from urllib.parse import quote, urlparse
24
25from google.auth import environment_vars
26from google.auth import exceptions
27
28
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Message used for the ImportError raised when the optional `cryptography`
# dependency cannot be imported. The two literals are concatenated, so the
# first one must end with a space to keep the sentences separated.
CRYPTOGRAPHY_NOT_FOUND_ERROR = (
    "The cryptography library is required for certificate-based authentication. "
    "Please install it with `pip install google-auth[cryptography]`."
)
35
# SPIFFE trust domain patterns for Agent Identities. Each entry is a regular
# expression, anchored with ^ and $, that is matched against the trust domain
# (the authority part) of a `spiffe://` URI taken from a certificate's
# Subject Alternative Name. The accepted shapes are:
#   agents.global.org-<digits>.system.id.goog
#   agents.global.proj-<digits>.system.id.goog
_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [
    r"^agents\.global\.org-\d+\.system\.id\.goog$",
    r"^agents\.global\.proj-\d+\.system\.id\.goog$",
]

# Certificate location that get_agent_identity_certificate_path() falls back
# to when the path obtained from the certificate config file is not usable.
_WELL_KNOWN_CERT_PATH = "/var/run/secrets/workload-spiffe-credentials/certificates.pem"
43
# Polling schedule for the certificate file: a burst of quick checks followed
# by slower checks until the overall timeout has been used up.
_FAST_POLL_CYCLES = 50
_FAST_POLL_INTERVAL = 0.1  # 100ms
_SLOW_POLL_INTERVAL = 0.5  # 500ms
_TOTAL_TIMEOUT = 30  # seconds

# The part of the timeout not consumed by the fast phase is filled with slow
# polls.
_SLOW_POLL_CYCLES = int(
    (_TOTAL_TIMEOUT - _FAST_POLL_INTERVAL * _FAST_POLL_CYCLES) / _SLOW_POLL_INTERVAL
)

# One sleep duration per polling attempt, in the order they are performed.
_POLLING_INTERVALS = [
    *([_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES),
    *([_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES),
]
58
59
def _is_certificate_file_ready(path):
    """Checks if a file exists and is not empty.

    Args:
        path: Candidate filesystem path. May be None, empty, or (when it
            comes from a malformed config file) not a path-like value at all.

    Returns:
        bool: True if the path can be stat'ed and reports a size greater
            than zero, False otherwise.
    """
    if not path:
        return False
    try:
        # A single getsize() call replaces the exists()/getsize() pair so the
        # file cannot vanish between the two checks.
        return os.path.getsize(path) > 0
    except (OSError, TypeError, ValueError):
        # OSError: missing or inaccessible file. TypeError / ValueError: the
        # value is not usable as a path (wrong type, embedded null byte).
        return False
63
64
def get_agent_identity_certificate_path():
    """Gets the certificate path from the certificate config file.

    The path to the certificate config file is read from the
    GOOGLE_API_CERTIFICATE_CONFIG environment variable. Because that variable
    may be set before the files are available on the filesystem, the lookup
    is retried following the _POLLING_INTERVALS schedule. On every attempt
    the config file is consulted first and the well-known certificate path is
    checked as a fallback.

    Returns:
        Optional[str]: The path to the leaf certificate file, or None if the
            GOOGLE_API_CERTIFICATE_CONFIG environment variable is not set.

    Raises:
        google.auth.exceptions.RefreshError: If neither the configured
            certificate file nor the well-known certificate file is
            available after all retries.
    """
    import json

    cert_config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
    if not cert_config_path:
        return None

    has_logged_warning = False

    for interval in _POLLING_INTERVALS:
        try:
            with open(cert_config_path, "r") as f:
                cert_config = json.load(f)
            cert_path = (
                cert_config.get("cert_configs", {})
                .get("workload", {})
                .get("cert_path")
            )
            if _is_certificate_file_ready(cert_path):
                return cert_path
        except (IOError, ValueError, KeyError, AttributeError, TypeError):
            # IOError: the config file is missing or unreadable.
            # ValueError: the file does not contain valid JSON (it may still
            # be in the middle of being written).
            # AttributeError / TypeError: the JSON does not have the expected
            # nested-object shape, or "cert_path" is not a usable path.
            # All of these are treated as "not ready yet" and retried.
            if not has_logged_warning:
                _LOGGER.warning(
                    "Certificate config file not found at %s (from %s environment "
                    "variable). Retrying for up to %s seconds.",
                    cert_config_path,
                    environment_vars.GOOGLE_API_CERTIFICATE_CONFIG,
                    _TOTAL_TIMEOUT,
                )
                has_logged_warning = True

        # As a fallback, check the well-known certificate path.
        if _is_certificate_file_ready(_WELL_KNOWN_CERT_PATH):
            return _WELL_KNOWN_CERT_PATH

        # A sleep is required in two cases:
        # 1. The config file is not usable (the except block).
        # 2. The config file is found, but the certificate is not yet available.
        # In both cases, we need to poll, so we sleep on every iteration
        # that doesn't return a certificate.
        time.sleep(interval)

    raise exceptions.RefreshError(
        "Certificate config or certificate file not found after multiple retries. "
        "Token binding protection is failing. You can turn off this protection by setting "
        f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false "
        "to fall back to unbound tokens."
    )
128
129
def get_and_parse_agent_identity_certificate():
    """Loads the agent identity certificate unless the user opted out.

    The GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES environment
    variable is consulted first. Only when it is not set to "false"
    (case-insensitive) is the certificate path resolved and the file read
    and parsed.

    Returns:
        The parsed certificate object, or None when the user has opted out
        or no certificate path is available.
    """
    opt_out_setting = os.environ.get(
        environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
        "true",
    )
    # Opting out of cert-bound tokens makes the certificate lookup pointless.
    if opt_out_setting.lower() == "false":
        return None

    path = get_agent_identity_certificate_path()
    if not path:
        return None

    with open(path, "rb") as pem_file:
        pem_data = pem_file.read()
    return parse_certificate(pem_data)
159
160
def parse_certificate(cert_bytes):
    """Turns PEM-encoded certificate data into a certificate object.

    Args:
        cert_bytes (bytes): The PEM-encoded certificate bytes.

    Returns:
        cryptography.x509.Certificate: The parsed certificate object.

    Raises:
        ImportError: If the cryptography library cannot be imported.
    """
    try:
        from cryptography import x509

        certificate = x509.load_pem_x509_certificate(cert_bytes)
    except ImportError as import_error:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from import_error
    return certificate
176
177
def _is_agent_identity_certificate(cert):
    """Checks if a certificate is an Agent Identity certificate.

    This is determined by checking the Subject Alternative Name (SAN) for a
    SPIFFE ID with a trust domain matching Agent Identity patterns.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        bool: True if the certificate is an Agent Identity certificate,
            False otherwise (including when the certificate has no SAN
            extension or none of its URIs is a matching SPIFFE ID).

    Raises:
        ImportError: If the cryptography library cannot be imported.
    """
    try:
        from cryptography import x509
        from cryptography.x509.oid import ExtensionOID
    except ImportError as e:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e

    try:
        ext = cert.extensions.get_extension_for_oid(
            ExtensionOID.SUBJECT_ALTERNATIVE_NAME
        )
    except x509.ExtensionNotFound:
        return False

    for uri in ext.value.get_values_for_type(x509.UniformResourceIdentifier):
        try:
            parsed_uri = urlparse(uri)
        except ValueError:
            # urlparse rejects some malformed authorities (for example
            # unbalanced square brackets). Such a URI cannot be an Agent
            # Identity SPIFFE ID, so skip it and keep checking the others.
            continue
        if parsed_uri.scheme != "spiffe":
            continue
        trust_domain = parsed_uri.netloc
        if any(
            re.match(pattern, trust_domain)
            for pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS
        ):
            return True
    return False
213
214
def calculate_certificate_fingerprint(cert):
    """Computes the fingerprint string for a certificate.

    The fingerprint is the SHA256 digest of the DER-encoded certificate,
    base64-encoded with the standard alphabet, stripped of '=' padding and
    finally passed through urllib.parse.quote.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        str: The URL-encoded, unpadded, base64-encoded SHA256 fingerprint.

    Raises:
        ImportError: If the cryptography library cannot be imported.
    """
    try:
        from cryptography.hazmat.primitives import serialization

        digest = hashlib.sha256(
            cert.public_bytes(serialization.Encoding.DER)
        ).digest()
        # Two steps, matching what the receiving side expects:
        # 1. Standard base64 with the trailing '=' padding removed.
        # 2. Percent-encoding via quote(). With quote()'s default
        #    safe="/" this escapes '+' (as %2B) but leaves '/' as is.
        unpadded = base64.b64encode(digest).decode("utf-8").rstrip("=")
        return quote(unpadded)
    except ImportError as import_error:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from import_error
240
241
def should_request_bound_token(cert):
    """Determines if a bound token should be requested.

    This is based on the GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES
    environment variable and whether the certificate is an agent identity cert.
    The environment variable is evaluated first, so the certificate is not
    inspected at all when the feature is not enabled.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        bool: True if a bound token should be requested, False otherwise.
    """
    is_opted_in = (
        os.environ.get(
            environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
            "true",
        ).lower()
        == "true"
    )
    if not is_opted_in:
        # Nothing about the certificate can change the answer, and
        # get_and_parse_agent_identity_certificate() yields None in the
        # opted-out case, so avoid touching `cert` here.
        return False
    return _is_agent_identity_certificate(cert)
263
264
def get_cached_cert_fingerprint(cached_cert):
    """Returns the fingerprint of the cached certificate.

    Args:
        cached_cert: The cached certificate data; it is handed to
            parse_certificate(), which expects PEM-encoded bytes.

    Returns:
        str: The fingerprint produced by calculate_certificate_fingerprint().

    Raises:
        ValueError: If no cached certificate is provided (falsy value).
    """
    if not cached_cert:
        raise ValueError("mTLS connection is not configured.")
    return calculate_certificate_fingerprint(parse_certificate(cached_cert))