Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_agent_identity_utils.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

114 statements  

1# Copyright 2025 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for Agent Identity credentials.""" 

16 

17import base64 

18import hashlib 

19import logging 

20import os 

21import re 

22import time 

23from urllib.parse import quote, urlparse 

24 

25from google.auth import environment_vars, exceptions 

26 

27_LOGGER = logging.getLogger(__name__) 

28 

29CRYPTOGRAPHY_NOT_FOUND_ERROR = ( 

30 "The cryptography library is required for certificate-based authentication." 

31 "Please install it with `pip install google-auth[cryptography]`." 

32) 

33 

34# SPIFFE trust domain patterns for Agent Identities. 

35_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [ 

36 r"^agents\.global\.org-\d+\.system\.id\.goog$", 

37 r"^agents\.global\.proj-\d+\.system\.id\.goog$", 

38 r"^agents-nonprod\.global\.org-\d+\.system\.id\.goog$", 

39 r"^agents-nonprod\.global\.proj-\d+\.system\.id\.goog$", 

40] 

41 

42_WELL_KNOWN_CERT_PATH = "/var/run/secrets/workload-spiffe-credentials/certificates.pem" 

43 

44# Constants for polling the certificate file. 

45_FAST_POLL_CYCLES = 50 

46_FAST_POLL_INTERVAL = 0.1 # 100ms 

47_SLOW_POLL_INTERVAL = 0.5 # 500ms 

48_TOTAL_TIMEOUT = 30 # seconds 

49 

50# Calculate the number of slow poll cycles based on the total timeout. 

51_SLOW_POLL_CYCLES = int( 

52 (_TOTAL_TIMEOUT - (_FAST_POLL_CYCLES * _FAST_POLL_INTERVAL)) / _SLOW_POLL_INTERVAL 

53) 

54 

55_POLLING_INTERVALS = ([_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES) + ( 

56 [_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES 

57) 

58 

59 

60def _is_certificate_file_ready(path): 

61 """Checks if a file exists and is not empty.""" 

62 return path and os.path.exists(path) and os.path.getsize(path) > 0 

63 

64 

65def get_agent_identity_certificate_path(): 

66 """Gets the certificate path from the certificate config file. 

67 

68 The path to the certificate config file is read from the 

69 GOOGLE_API_CERTIFICATE_CONFIG environment variable. This function 

70 implements a retry mechanism to handle cases where the environment 

71 variable is set before the files are available on the filesystem. 

72 

73 Returns: 

74 str: The path to the leaf certificate file. 

75 

76 Raises: 

77 google.auth.exceptions.RefreshError: If the certificate config file 

78 or the certificate file cannot be found after retries. 

79 """ 

80 import json 

81 

82 cert_config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG) 

83 

84 # Check if the well-known workload directory is mounted. 

85 well_known_dir = os.path.dirname(_WELL_KNOWN_CERT_PATH) 

86 has_well_known_dir = os.path.exists(well_known_dir) 

87 

88 # If we have neither a config path nor a well-known mount directory, exit immediately. 

89 if not cert_config_path and not has_well_known_dir: 

90 return None 

91 

92 has_logged_config_warning = False 

93 has_logged_cert_warning = False 

94 

95 for interval in _POLLING_INTERVALS: 

96 try: 

97 # Path A: Config file is explicitly set 

98 if cert_config_path: 

99 with open(cert_config_path, "r") as f: 

100 cert_config = json.load(f) 

101 

102 cert_configs = ( 

103 cert_config.get("cert_configs") 

104 if isinstance(cert_config, dict) 

105 else None 

106 ) 

107 workload_config = ( 

108 cert_configs.get("workload") 

109 if isinstance(cert_configs, dict) 

110 else None 

111 ) 

112 

113 if ( 

114 not isinstance(workload_config, dict) 

115 or "cert_path" not in workload_config 

116 ): 

117 return None 

118 

119 cert_path = workload_config["cert_path"] 

120 if _is_certificate_file_ready(cert_path): 

121 return cert_path 

122 

123 # The config was parsed, but the cert file is not ready yet 

124 target_path = cert_path 

125 

126 # Path B: Config is NOT set, fallback to the well-known path 

127 else: 

128 if _is_certificate_file_ready(_WELL_KNOWN_CERT_PATH): 

129 return _WELL_KNOWN_CERT_PATH 

130 

131 # The well-known cert file is not ready yet 

132 target_path = _WELL_KNOWN_CERT_PATH 

133 

134 # Log a warning on the first failed attempt to load the certificate file 

135 if not has_logged_cert_warning: 

136 _LOGGER.warning( 

137 "Certificate file not ready at %s. Retrying until startup timeout (up to %s seconds total)...", 

138 target_path, 

139 _TOTAL_TIMEOUT, 

140 ) 

141 has_logged_cert_warning = True 

142 

143 except (IOError, ValueError, KeyError) as e: 

144 if cert_config_path and os.path.exists(cert_config_path): 

145 # If the file exists but has invalid JSON or is unreadable, 

146 # we assume it is in its final format and fail-fast by returning None. 

147 return None 

148 

149 if not has_logged_config_warning and cert_config_path: 

150 _LOGGER.warning( 

151 "Certificate config file not found or incomplete: %s (from %s " 

152 "environment variable). Retrying until startup timeout (up to %s seconds total)...", 

153 e, 

154 environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, 

155 _TOTAL_TIMEOUT, 

156 ) 

157 has_logged_config_warning = True 

158 pass 

159 

160 # A sleep is required in two cases: 

161 # 1. The config file is not found (the except block). 

162 # 2. The config file/well-known path is found, but the certificate is not yet available. 

163 # In both cases, we need to poll, so we sleep on every iteration 

164 # that doesn't return a certificate. 

165 time.sleep(interval) 

166 

167 raise exceptions.RefreshError( 

168 "Certificate config or certificate file not found after multiple retries. " 

169 f"Token binding protection is failing. You can turn off this protection by setting " 

170 f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false " 

171 "to fall back to unbound tokens." 

172 ) 

173 

174 

175def get_and_parse_agent_identity_certificate(): 

176 """Gets and parses the agent identity certificate if not opted out. 

177 

178 Checks if the user has opted out of certificate-bound tokens. If not, 

179 it gets the certificate path, reads the file, and parses it. 

180 

181 Returns: 

182 The parsed certificate object if found and not opted out, otherwise None. 

183 """ 

184 # If the user has opted out of cert bound tokens, there is no need to 

185 # look up the certificate. 

186 is_opted_out = ( 

187 os.environ.get( 

188 environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES, 

189 "true", 

190 ).lower() 

191 == "false" 

192 ) 

193 if is_opted_out: 

194 return None 

195 

196 cert_path = get_agent_identity_certificate_path() 

197 if not cert_path: 

198 return None 

199 

200 with open(cert_path, "rb") as cert_file: 

201 cert_bytes = cert_file.read() 

202 

203 return parse_certificate(cert_bytes) 

204 

205 

206def parse_certificate(cert_bytes): 

207 """Parses a PEM-encoded certificate. 

208 

209 Args: 

210 cert_bytes (bytes): The PEM-encoded certificate bytes. 

211 

212 Returns: 

213 cryptography.x509.Certificate: The parsed certificate object. 

214 """ 

215 try: 

216 from cryptography import x509 

217 

218 return x509.load_pem_x509_certificate(cert_bytes) 

219 except ImportError as e: 

220 raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e 

221 

222 

223def _is_agent_identity_certificate(cert): 

224 """Checks if a certificate is an Agent Identity certificate. 

225 

226 This is determined by checking the Subject Alternative Name (SAN) for a 

227 SPIFFE ID with a trust domain matching Agent Identity patterns. 

228 

229 Args: 

230 cert (cryptography.x509.Certificate): The parsed certificate object. 

231 

232 Returns: 

233 bool: True if the certificate is an Agent Identity certificate, 

234 False otherwise. 

235 """ 

236 try: 

237 from cryptography import x509 

238 from cryptography.x509.oid import ExtensionOID 

239 

240 try: 

241 ext = cert.extensions.get_extension_for_oid( 

242 ExtensionOID.SUBJECT_ALTERNATIVE_NAME 

243 ) 

244 except x509.ExtensionNotFound: 

245 return False 

246 uris = ext.value.get_values_for_type(x509.UniformResourceIdentifier) 

247 

248 for uri in uris: 

249 parsed_uri = urlparse(uri) 

250 if parsed_uri.scheme == "spiffe": 

251 trust_domain = parsed_uri.netloc 

252 for pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS: 

253 if re.match(pattern, trust_domain): 

254 return True 

255 return False 

256 except ImportError as e: 

257 raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e 

258 

259 

260def calculate_certificate_fingerprint(cert): 

261 """Calculates the URL-encoded, unpadded, base64-encoded SHA256 hash of a 

262 DER-encoded certificate. 

263 

264 Args: 

265 cert (cryptography.x509.Certificate): The parsed certificate object. 

266 

267 Returns: 

268 str: The URL-encoded, unpadded, base64-encoded SHA256 fingerprint. 

269 """ 

270 try: 

271 from cryptography.hazmat.primitives import serialization 

272 

273 der_cert = cert.public_bytes(serialization.Encoding.DER) 

274 fingerprint = hashlib.sha256(der_cert).digest() 

275 # The certificate fingerprint is generated in two steps to align with GFE's 

276 # expectations and ensure proper URL transmission: 

277 # 1. Standard base64 encoding is applied, and padding ('=') is removed. 

278 # 2. The resulting string is then URL-encoded to handle special characters 

279 # ('+', '/') that would otherwise be misinterpreted in URL parameters. 

280 base64_fingerprint = base64.b64encode(fingerprint).decode("utf-8") 

281 unpadded_base64_fingerprint = base64_fingerprint.rstrip("=") 

282 return quote(unpadded_base64_fingerprint) 

283 except ImportError as e: 

284 raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e 

285 

286 

287def should_request_bound_token(cert): 

288 """Determines if a bound token should be requested. 

289 

290 This is based on the GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES 

291 environment variable and whether the certificate is an agent identity cert. 

292 

293 Args: 

294 cert (cryptography.x509.Certificate): The parsed certificate object. 

295 

296 Returns: 

297 bool: True if a bound token should be requested, False otherwise. 

298 """ 

299 is_agent_cert = _is_agent_identity_certificate(cert) 

300 is_opted_in = ( 

301 os.environ.get( 

302 environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES, 

303 "true", 

304 ).lower() 

305 == "true" 

306 ) 

307 return is_agent_cert and is_opted_in 

308 

309 

310def get_cached_cert_fingerprint(cached_cert): 

311 """Returns the fingerprint of the cached certificate.""" 

312 if cached_cert: 

313 cert_obj = parse_certificate(cached_cert) 

314 cached_cert_fingerprint = calculate_certificate_fingerprint(cert_obj) 

315 else: 

316 raise ValueError("mTLS connection is not configured.") 

317 return cached_cert_fingerprint