Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_agent_identity_utils.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

116 statements  

1# Copyright 2025 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for Agent Identity credentials.""" 

16 

17import base64 

18import hashlib 

19import logging 

20import os 

21import re 

22import time 

23from urllib.parse import quote, urlparse 

24 

25from google.auth import environment_vars, exceptions 

26 

_LOGGER = logging.getLogger(__name__)

# Message used for the ImportError raised by the helpers in this module when
# the optional ``cryptography`` dependency cannot be imported.
# The trailing space on the first literal matters: adjacent string literals
# are concatenated verbatim, so without it the two sentences run together.
CRYPTOGRAPHY_NOT_FOUND_ERROR = (
    "The cryptography library is required for certificate-based authentication. "
    "Please install it with `pip install google-auth[cryptography]`."
)

33 

# SPIFFE trust domain patterns for Agent Identities.
# One anchored regex per (environment, scope) combination, e.g.
# ``^agents\.global\.org-\d+\.system\.id\.goog$``. The order is: all scopes
# for "agents" first, then all scopes for "agents-nonprod".
_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [
    rf"^{environment}\.global\.{scope}-\d+\.system\.id\.goog$"
    for environment in ("agents", "agents-nonprod")
    for scope in ("org", "proj")
]

41 

# Certificate location used when no certificate config file is specified.
_WELL_KNOWN_CERT_PATH = "/var/run/secrets/workload-spiffe-credentials/certificates.pem"

# Polling schedule for the certificate file: a burst of fast polls followed
# by slow polls, bounded overall by _TOTAL_TIMEOUT.
_FAST_POLL_CYCLES = 50
_FAST_POLL_INTERVAL = 0.1  # 100ms
_SLOW_POLL_INTERVAL = 0.5  # 500ms
_TOTAL_TIMEOUT = 30  # seconds

# The slow phase fills whatever part of the total timeout the fast phase
# leaves unused; int() truncates any fractional cycle.
_SLOW_POLL_CYCLES = int(
    (_TOTAL_TIMEOUT - (_FAST_POLL_INTERVAL * _FAST_POLL_CYCLES)) / _SLOW_POLL_INTERVAL
)

# Sleep durations in the order they are used: fast intervals, then slow ones.
_POLLING_INTERVALS = [
    *([_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES),
    *([_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES),
]

58 

59 

60def _is_certificate_file_ready(path): 

61 """Checks if a file exists and is not empty.""" 

62 return path and os.path.exists(path) and os.path.getsize(path) > 0 

63 

64 

def get_agent_identity_certificate_path():
    """Locates the agent identity leaf certificate, polling until it is ready.

    The certificate location is read from the ``cert_configs.workload.cert_path``
    entry of the certificate config file named by the
    GOOGLE_API_CERTIFICATE_CONFIG environment variable. When that variable is
    not set, the well-known workload certificate path is used instead. Since
    the environment variable may be set before the files are available on the
    filesystem, the lookup is retried following ``_POLLING_INTERVALS``.

    Returns:
        Optional[str]: The path to the leaf certificate file, or None when:
            * neither the config variable is set nor the well-known
              directory exists;
            * the well-known directory does not exist and the configured
              config file does not exist either;
            * the config file has no ``cert_configs.workload.cert_path``
              entry;
            * an I/O or parsing error occurs while the config file exists.

    Raises:
        google.auth.exceptions.RefreshError: If the certificate config file
            or the certificate file cannot be found after retries.
    """
    import json

    config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)

    # Check if the well-known workload directory is mounted.
    well_known_mounted = os.path.exists(os.path.dirname(_WELL_KNOWN_CERT_PATH))

    # Without the mount there is nothing worth waiting for unless a config
    # file is both named and already present, so exit immediately.
    if not well_known_mounted and not (config_path and os.path.exists(config_path)):
        return None

    # Each warning is emitted at most once across all polling attempts.
    warned_about_config = False
    warned_about_cert = False

    for delay in _POLLING_INTERVALS:
        try:
            if config_path:
                # Config file is explicitly set: resolve the path from it.
                with open(config_path, "r") as config_file:
                    node = json.load(config_file)

                # Descend cert_configs -> workload; any level that is not a
                # dict collapses the result to None.
                for key in ("cert_configs", "workload"):
                    node = node.get(key) if isinstance(node, dict) else None

                if not (isinstance(node, dict) and "cert_path" in node):
                    return None

                candidate = node["cert_path"]
            else:
                # Config is NOT set: fall back to the well-known path.
                candidate = _WELL_KNOWN_CERT_PATH

            if _is_certificate_file_ready(candidate):
                return candidate

            # The location is known but the certificate is not ready yet.
            if not warned_about_cert:
                _LOGGER.warning(
                    "Certificate file not ready at %s. Retrying until startup timeout (up to %s seconds total)...",
                    candidate,
                    _TOTAL_TIMEOUT,
                )
                warned_about_cert = True

        except (IOError, ValueError, KeyError) as err:
            if config_path:
                if os.path.exists(config_path):
                    # The file exists but has invalid JSON or is unreadable:
                    # assume it is in its final format and fail fast.
                    return None

                if not warned_about_config:
                    _LOGGER.warning(
                        "Certificate config file not found or incomplete: %s (from %s "
                        "environment variable). Retrying until startup timeout (up to %s seconds total)...",
                        err,
                        environment_vars.GOOGLE_API_CERTIFICATE_CONFIG,
                        _TOTAL_TIMEOUT,
                    )
                    warned_about_config = True

        # Reached on every attempt that did not return: either the config
        # file was not found, or the certificate is not yet available.
        time.sleep(delay)

    raise exceptions.RefreshError(
        "Certificate config or certificate file not found after multiple retries. "
        "Token binding protection is failing. You can turn off this protection by setting "
        f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false "
        "to fall back to unbound tokens."
    )

181 

182 

def get_and_parse_agent_identity_certificate():
    """Loads and parses the agent identity certificate unless opted out.

    The GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES environment
    variable (default ``"true"``) is consulted first. If it is set to
    ``"false"`` (case-insensitively) the user has opted out of
    certificate-bound tokens and no lookup is performed. Otherwise the
    certificate path is resolved, and the file is read and parsed.

    Returns:
        The parsed certificate object if found and not opted out, otherwise
        None.
    """
    prevent_sharing = os.environ.get(
        environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
        "true",
    )
    # Only an explicit "false" counts as opting out.
    if prevent_sharing.lower() == "false":
        return None

    cert_path = get_agent_identity_certificate_path()
    if cert_path:
        with open(cert_path, "rb") as pem_file:
            pem_data = pem_file.read()
        return parse_certificate(pem_data)

    return None

212 

213 

def parse_certificate(cert_bytes):
    """Parses a PEM-encoded certificate.

    Args:
        cert_bytes (bytes): The PEM-encoded certificate bytes.

    Returns:
        cryptography.x509.Certificate: The parsed certificate object.

    Raises:
        ImportError: If an ImportError occurs while importing or using the
            ``cryptography`` library; the message explains how to install it.
    """
    try:
        # Imported lazily so that this module can be loaded without the
        # optional cryptography dependency.
        from cryptography import x509

        certificate = x509.load_pem_x509_certificate(cert_bytes)
    except ImportError as missing_dependency:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from missing_dependency
    return certificate

229 

230 

def _is_agent_identity_certificate(cert):
    """Checks if a certificate is an Agent Identity certificate.

    A certificate qualifies when its Subject Alternative Name (SAN) extension
    contains at least one URI with the ``spiffe`` scheme whose trust domain
    (the URI's network location) matches one of
    ``_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS``.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        bool: True if the certificate is an Agent Identity certificate,
            False otherwise (including when it has no SAN extension).

    Raises:
        ImportError: If an ImportError occurs while importing or using the
            ``cryptography`` library.
    """
    try:
        from cryptography import x509
        from cryptography.x509.oid import ExtensionOID

        try:
            san_extension = cert.extensions.get_extension_for_oid(
                ExtensionOID.SUBJECT_ALTERNATIVE_NAME
            )
        except x509.ExtensionNotFound:
            # No SAN at all means there is no SPIFFE ID to inspect.
            return False

        san_uris = san_extension.value.get_values_for_type(
            x509.UniformResourceIdentifier
        )
        for san_uri in san_uris:
            components = urlparse(san_uri)
            if components.scheme != "spiffe":
                continue
            trust_domain = components.netloc
            if any(
                re.match(pattern, trust_domain)
                for pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS
            ):
                return True
        return False
    except ImportError as missing_dependency:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from missing_dependency

266 

267 

def calculate_certificate_fingerprint(cert):
    """Calculates the URL-encoded, unpadded, base64-encoded SHA256 hash of a
    DER-encoded certificate.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        str: The URL-encoded, unpadded, base64-encoded SHA256 fingerprint.

    Raises:
        ImportError: If an ImportError occurs while importing or using the
            ``cryptography`` library.
    """
    try:
        from cryptography.hazmat.primitives import serialization

        digest = hashlib.sha256(
            cert.public_bytes(serialization.Encoding.DER)
        ).digest()
        # The fingerprint is produced in two steps to align with GFE's
        # expectations and to survive transmission in a URL:
        #   1. standard (not URL-safe) base64, with the '=' padding removed;
        #   2. percent-encoding of the result via urllib.parse.quote().
        # NOTE(review): quote() is called with its default safe="/", so '+'
        # becomes "%2B" while '/' is left as-is -- confirm that this is what
        # the receiving side expects before changing it.
        unpadded = base64.b64encode(digest).decode("utf-8").rstrip("=")
        return quote(unpadded)
    except ImportError as missing_dependency:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from missing_dependency

293 

294 

def should_request_bound_token(cert):
    """Determines if a bound token should be requested.

    A bound token is requested only when the certificate is an agent identity
    certificate and the
    GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES environment
    variable is ``"true"`` (case-insensitively). The variable defaults to
    ``"true"`` when unset.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        bool: True if a bound token should be requested, False otherwise.
    """
    # The certificate is inspected first, so any error it raises surfaces
    # regardless of the environment setting.
    agent_cert = _is_agent_identity_certificate(cert)

    setting = os.environ.get(
        environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
        "true",
    )
    opted_in = setting.lower() == "true"

    return agent_cert and opted_in

316 

317 

def get_cached_cert_fingerprint(cached_cert):
    """Returns the fingerprint of the cached certificate.

    Args:
        cached_cert (bytes): The PEM-encoded certificate that was cached.

    Returns:
        str: The fingerprint computed by ``calculate_certificate_fingerprint``
            for the parsed certificate.

    Raises:
        ValueError: If ``cached_cert`` is empty or None.
    """
    # Guard first: without a cached certificate there is nothing to hash.
    if not cached_cert:
        raise ValueError("mTLS connection is not configured.")

    return calculate_certificate_fingerprint(parse_certificate(cached_cert))