1# Copyright 2025 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for Agent Identity credentials."""
16
17import base64
18import hashlib
19import logging
20import os
21import re
22import time
23from urllib.parse import quote, urlparse
24
25from google.auth import environment_vars
26from google.auth import exceptions
27from google.auth.transport import _mtls_helper
28
29
_LOGGER = logging.getLogger(__name__)

# Message used for the ImportError raised when certificate handling is
# attempted without the optional `cryptography` dependency installed.
# NOTE: the two literals are joined by implicit concatenation, so the first
# one must end with a space to keep the sentences separated.
CRYPTOGRAPHY_NOT_FOUND_ERROR = (
    "The cryptography library is required for certificate-based authentication. "
    "Please install it with `pip install google-auth[cryptography]`."
)
36
37# SPIFFE trust domain patterns for Agent Identities.
38_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [
39 r"^agents\.global\.org-\d+\.system\.id\.goog$",
40 r"^agents\.global\.proj-\d+\.system\.id\.goog$",
41]
42
43_WELL_KNOWN_CERT_PATH = "/var/run/secrets/workload-spiffe-credentials/certificates.pem"
44
45# Constants for polling the certificate file.
46_FAST_POLL_CYCLES = 50
47_FAST_POLL_INTERVAL = 0.1 # 100ms
48_SLOW_POLL_INTERVAL = 0.5 # 500ms
49_TOTAL_TIMEOUT = 30 # seconds
50
51# Calculate the number of slow poll cycles based on the total timeout.
52_SLOW_POLL_CYCLES = int(
53 (_TOTAL_TIMEOUT - (_FAST_POLL_CYCLES * _FAST_POLL_INTERVAL)) / _SLOW_POLL_INTERVAL
54)
55
56_POLLING_INTERVALS = ([_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES) + (
57 [_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES
58)
59
60
61def _is_certificate_file_ready(path):
62 """Checks if a file exists and is not empty."""
63 return path and os.path.exists(path) and os.path.getsize(path) > 0
64
65
def _extract_workload_cert_path(cert_config):
    """Returns ``cert_configs.workload.cert_path`` from a parsed config.

    Args:
        cert_config: The object produced by parsing the certificate config
            JSON. Any JSON value is tolerated.

    Returns:
        Optional[str]: The configured certificate path, or None if any level
            of the expected structure is missing, is not a JSON object, or
            the path itself is not a string.
    """
    node = cert_config
    for key in ("cert_configs", "workload"):
        if not isinstance(node, dict):
            return None
        node = node.get(key, {})
    if not isinstance(node, dict):
        return None
    cert_path = node.get("cert_path")
    return cert_path if isinstance(cert_path, str) else None


def get_agent_identity_certificate_path():
    """Gets the certificate path from the certificate config file.

    The path to the certificate config file is read from the
    GOOGLE_API_CERTIFICATE_CONFIG environment variable. This function
    implements a retry mechanism to handle cases where the environment
    variable is set before the files are available on the filesystem.
    On every attempt the well-known certificate path is also checked as a
    fallback.

    Returns:
        Optional[str]: The path to the leaf certificate file, or None if the
            GOOGLE_API_CERTIFICATE_CONFIG environment variable is not set.

    Raises:
        google.auth.exceptions.RefreshError: If the certificate config file
            or the certificate file cannot be found after retries.
    """
    import json

    cert_config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
    if not cert_config_path:
        return None

    has_logged_warning = False

    for interval in _POLLING_INTERVALS:
        try:
            with open(cert_config_path, "r") as f:
                cert_config = json.load(f)
            # A structurally unexpected config (e.g. a JSON list or a null
            # section) yields None here instead of raising, so polling and
            # the well-known-path fallback below still get their chance.
            cert_path = _extract_workload_cert_path(cert_config)
            if _is_certificate_file_ready(cert_path):
                return cert_path
        except (IOError, ValueError, KeyError):
            # Only warn once; the condition is re-checked on every cycle.
            if not has_logged_warning:
                _LOGGER.warning(
                    "Certificate config file not found at %s (from %s environment "
                    "variable). Retrying for up to %s seconds.",
                    cert_config_path,
                    environment_vars.GOOGLE_API_CERTIFICATE_CONFIG,
                    _TOTAL_TIMEOUT,
                )
                has_logged_warning = True

        # As a fallback, check the well-known certificate path.
        if _is_certificate_file_ready(_WELL_KNOWN_CERT_PATH):
            return _WELL_KNOWN_CERT_PATH

        # A sleep is required in two cases:
        # 1. The config file is not found (the except block).
        # 2. The config file is found, but the certificate is not yet available.
        # In both cases, we need to poll, so we sleep on every iteration
        # that doesn't return a certificate.
        time.sleep(interval)

    raise exceptions.RefreshError(
        "Certificate config or certificate file not found after multiple retries. "
        "Token binding protection is failing. You can turn off this protection by setting "
        f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false "
        "to fall back to unbound tokens."
    )
129
130
def get_and_parse_agent_identity_certificate():
    """Loads the agent identity certificate unless the user opted out.

    The GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES environment
    variable is consulted first; when it is set to "false" (in any casing)
    no certificate lookup is performed. Otherwise the certificate path is
    resolved, the file is read as bytes, and the contents are parsed.

    Returns:
        The parsed certificate object, or None if the user opted out or no
        certificate path could be determined.
    """
    sharing_prevention = os.environ.get(
        environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
        "true",
    )
    # Opted out of certificate-bound tokens: skip the certificate lookup.
    if sharing_prevention.lower() == "false":
        return None

    cert_path = get_agent_identity_certificate_path()
    if cert_path:
        with open(cert_path, "rb") as pem_file:
            pem_bytes = pem_file.read()
        return parse_certificate(pem_bytes)

    return None
160
161
def parse_certificate(cert_bytes):
    """Loads an X.509 certificate from PEM-encoded data.

    Args:
        cert_bytes (bytes): The PEM-encoded certificate bytes.

    Returns:
        cryptography.x509.Certificate: The parsed certificate object.

    Raises:
        ImportError: If the cryptography library cannot be imported.
    """
    try:
        # Imported lazily: cryptography is an optional dependency.
        from cryptography import x509

        certificate = x509.load_pem_x509_certificate(cert_bytes)
    except ImportError as import_error:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from import_error
    return certificate
177
178
def _is_agent_identity_certificate(cert):
    """Tells whether a certificate belongs to an Agent Identity.

    The Subject Alternative Name (SAN) extension is inspected for URI
    entries using the ``spiffe`` scheme; the certificate qualifies when the
    trust domain (URI authority) of any such entry matches one of the Agent
    Identity trust domain patterns.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        bool: True if the certificate is an Agent Identity certificate,
            False otherwise (including when it has no SAN extension).

    Raises:
        ImportError: If the cryptography library cannot be imported.
    """
    try:
        from cryptography import x509
        from cryptography.x509.oid import ExtensionOID

        try:
            san_extension = cert.extensions.get_extension_for_oid(
                ExtensionOID.SUBJECT_ALTERNATIVE_NAME
            )
        except x509.ExtensionNotFound:
            return False

        san_uris = san_extension.value.get_values_for_type(
            x509.UniformResourceIdentifier
        )
        for san_uri in san_uris:
            uri_parts = urlparse(san_uri)
            # Only SPIFFE IDs carry a trust domain worth checking.
            if uri_parts.scheme != "spiffe":
                continue
            spiffe_trust_domain = uri_parts.netloc
            if any(
                re.match(domain_pattern, spiffe_trust_domain)
                for domain_pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS
            ):
                return True
        return False
    except ImportError as import_error:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from import_error
214
215
def calculate_certificate_fingerprint(cert):
    """Computes the fingerprint string for a certificate.

    The fingerprint is the SHA256 digest of the DER-encoded certificate,
    rendered as standard base64 with the trailing '=' padding removed and
    then passed through ``urllib.parse.quote``.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        str: The URL-quoted, unpadded, base64-encoded SHA256 fingerprint.

    Raises:
        ImportError: If the cryptography library cannot be imported.
    """
    try:
        from cryptography.hazmat.primitives import serialization

        sha256_digest = hashlib.sha256(
            cert.public_bytes(serialization.Encoding.DER)
        ).digest()
        # Standard (not URL-safe) base64 alphabet, with '=' padding dropped.
        unpadded_b64 = base64.b64encode(sha256_digest).decode("utf-8").rstrip("=")
        # quote() percent-encodes '+' (as %2B). With its default safe="/"
        # it leaves any '/' in the base64 text as-is.
        # NOTE(review): confirm the receiving side expects a literal '/'.
        return quote(unpadded_b64)
    except ImportError as import_error:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from import_error
241
242
def should_request_bound_token(cert):
    """Decides whether a certificate-bound token should be requested.

    A bound token is requested only when the certificate is an Agent
    Identity certificate and the
    GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES environment
    variable is "true" in any casing (it defaults to "true" when unset).

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        bool: True if a bound token should be requested, False otherwise.
    """
    # The certificate check runs first, before the environment is read.
    agent_cert = _is_agent_identity_certificate(cert)
    sharing_prevention = os.environ.get(
        environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
        "true",
    )
    opted_in = sharing_prevention.lower() == "true"
    return agent_cert and opted_in
264
265
def call_client_cert_callback():
    """Calls the client cert callback and returns the certificate and key.

    Returns:
        Tuple: The second and third items (certificate bytes and key bytes)
            of the 4-tuple returned by
            ``_mtls_helper.get_client_ssl_credentials``; the first and last
            items are discarded.
    """
    ssl_credentials = _mtls_helper.get_client_ssl_credentials(
        generate_encrypted_key=True
    )
    _, cert_bytes, key_bytes, _ = ssl_credentials
    return cert_bytes, key_bytes
272
273
def get_cached_cert_fingerprint(cached_cert):
    """Returns the fingerprint of the cached certificate.

    Args:
        cached_cert (bytes): The PEM-encoded cached certificate bytes.

    Returns:
        str: The fingerprint produced by calculate_certificate_fingerprint
            for the parsed certificate.

    Raises:
        ValueError: If ``cached_cert`` is empty or None.
    """
    # Guard clause: without a cached certificate there is nothing to hash.
    if not cached_cert:
        raise ValueError("mTLS connection is not configured.")
    return calculate_certificate_fingerprint(parse_certificate(cached_cert))