Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_agent_identity_utils.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

104 statements  

1# Copyright 2025 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for Agent Identity credentials.""" 

16 

17import base64 

18import hashlib 

19import logging 

20import os 

21import re 

22import time 

23from urllib.parse import quote, urlparse 

24 

25from google.auth import environment_vars 

26from google.auth import exceptions 

27from google.auth.transport import _mtls_helper 

28 

29 

_LOGGER = logging.getLogger(__name__)

# Message used when the optional ``cryptography`` dependency cannot be imported.
# The first fragment ends with a space: adjacent string literals are joined
# verbatim, so without it the two sentences would run together.
CRYPTOGRAPHY_NOT_FOUND_ERROR = (
    "The cryptography library is required for certificate-based authentication. "
    "Please install it with `pip install google-auth[cryptography]`."
)

36 

# Regular expressions for the SPIFFE trust domains that identify Agent
# Identities (an ``org-<digits>`` variant and a ``proj-<digits>`` variant).
_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [
    r"^agents\.global\.org-\d+\.system\.id\.goog$",
    r"^agents\.global\.proj-\d+\.system\.id\.goog$",
]

# Certificate location that is checked as a fallback while polling.
_WELL_KNOWN_CERT_PATH = "/var/run/secrets/workload-spiffe-credentials/certificates.pem"

# Polling schedule for the certificate file: a burst of short waits followed
# by longer waits until the overall timeout budget is used up.
_TOTAL_TIMEOUT = 30  # seconds
_FAST_POLL_INTERVAL = 0.1  # 100ms
_FAST_POLL_CYCLES = 50
_SLOW_POLL_INTERVAL = 0.5  # 500ms

# Whatever part of the timeout is left after the fast phase is spent on
# slow polls.
_SLOW_POLL_CYCLES = int(
    (_TOTAL_TIMEOUT - (_FAST_POLL_CYCLES * _FAST_POLL_INTERVAL)) / _SLOW_POLL_INTERVAL
)

# One entry per polling attempt; each value is the sleep (in seconds) that
# follows an unsuccessful attempt.
_POLLING_INTERVALS = [_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES
_POLLING_INTERVALS += [_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES

59 

60 

def _is_certificate_file_ready(path):
    """Checks if a file exists and is not empty.

    Args:
        path (Optional[str]): The path to check. ``None`` or an empty string
            is treated as "not ready".

    Returns:
        bool: True if ``path`` exists and its size is greater than zero,
            False otherwise.
    """
    if not path:
        return False
    try:
        # Ask for the size directly rather than testing for existence first:
        # the file can be removed or replaced between two separate calls, and
        # a missing or unreadable file just means "not ready yet".
        return os.path.getsize(path) > 0
    except (OSError, ValueError):
        # ValueError covers paths the OS layer rejects outright (for example
        # an embedded NUL byte), which os.path.exists() also reports as False.
        return False

64 

65 

def get_agent_identity_certificate_path():
    """Gets the certificate path from the certificate config file.

    The path to the certificate config file is read from the
    GOOGLE_API_CERTIFICATE_CONFIG environment variable. This function
    implements a retry mechanism to handle cases where the environment
    variable is set before the files are available on the filesystem.

    On every polling attempt the config file is read and its
    ``cert_configs.workload.cert_path`` entry is checked; if that does not
    yield a ready certificate, the well-known certificate path is checked
    as a fallback before sleeping.

    Returns:
        Optional[str]: The path to the leaf certificate file, or None if the
            GOOGLE_API_CERTIFICATE_CONFIG environment variable is not set.

    Raises:
        google.auth.exceptions.RefreshError: If the certificate config file
            or the certificate file cannot be found after retries.
    """
    import json

    cert_config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
    if not cert_config_path:
        return None

    has_logged_warning = False

    for interval in _POLLING_INTERVALS:
        try:
            with open(cert_config_path, "r") as f:
                cert_config = json.load(f)
                cert_path = (
                    cert_config.get("cert_configs", {})
                    .get("workload", {})
                    .get("cert_path")
                )
                if _is_certificate_file_ready(cert_path):
                    return cert_path
        except (IOError, ValueError, KeyError, AttributeError, TypeError):
            # IOError: the config file is missing or unreadable.
            # ValueError: the file does not contain valid JSON.
            # AttributeError / TypeError: the JSON is valid but does not have
            # the expected nested-object shape (e.g. a list or null where an
            # object is expected). Treat it like any other unusable config so
            # the caller gets the documented RefreshError instead of an
            # unrelated exception.
            if not has_logged_warning:
                # Only warn once to avoid flooding the log while polling.
                _LOGGER.warning(
                    "Certificate config file not found at %s (from %s environment "
                    "variable). Retrying for up to %s seconds.",
                    cert_config_path,
                    environment_vars.GOOGLE_API_CERTIFICATE_CONFIG,
                    _TOTAL_TIMEOUT,
                )
                has_logged_warning = True

        # As a fallback, check the well-known certificate path.
        if _is_certificate_file_ready(_WELL_KNOWN_CERT_PATH):
            return _WELL_KNOWN_CERT_PATH

        # A sleep is required in two cases:
        # 1. The config file is not found (the except block).
        # 2. The config file is found, but the certificate is not yet available.
        # In both cases, we need to poll, so we sleep on every iteration
        # that doesn't return a certificate.
        time.sleep(interval)

    raise exceptions.RefreshError(
        "Certificate config or certificate file not found after multiple retries. "
        "Token binding protection is failing. You can turn off this protection by setting "
        f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false "
        "to fall back to unbound tokens."
    )

129 

130 

def get_and_parse_agent_identity_certificate():
    """Loads and parses the agent identity certificate unless opted out.

    The GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES environment
    variable (default ``"true"``) is consulted first; a case-insensitive
    value of ``"false"`` means the user opted out and no lookup is done.
    Otherwise the certificate path is resolved, the file is read as bytes
    and the content is handed to ``parse_certificate``.

    Returns:
        The parsed certificate object, or None when the user opted out or
        no certificate path is available.
    """
    sharing_prevention = os.environ.get(
        environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
        "true",
    )
    # Opted out of certificate-bound tokens: skip the certificate lookup.
    if sharing_prevention.lower() == "false":
        return None

    path = get_agent_identity_certificate_path()
    if not path:
        return None

    with open(path, "rb") as pem_file:
        pem_bytes = pem_file.read()

    return parse_certificate(pem_bytes)

160 

161 

def parse_certificate(cert_bytes):
    """Parses a PEM-encoded certificate.

    Args:
        cert_bytes (bytes): The PEM-encoded certificate bytes.

    Returns:
        cryptography.x509.Certificate: The parsed certificate object.

    Raises:
        ImportError: If an ImportError occurs while importing or using the
            ``cryptography`` library; it is re-raised with installation
            instructions and the original error as its cause.
    """
    try:
        # Imported lazily so the module can be loaded without cryptography.
        from cryptography import x509

        certificate = x509.load_pem_x509_certificate(cert_bytes)
    except ImportError as import_error:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from import_error
    return certificate

177 

178 

def _is_agent_identity_certificate(cert):
    """Tells whether a certificate is an Agent Identity certificate.

    The certificate's Subject Alternative Name (SAN) extension is scanned
    for URI entries with the ``spiffe`` scheme; the certificate qualifies
    if the trust domain (the URI's network location) of any such entry
    matches one of the Agent Identity trust domain patterns.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        bool: True if the certificate is an Agent Identity certificate,
            False otherwise (including when it has no SAN extension).

    Raises:
        ImportError: If an ImportError occurs while importing or using the
            ``cryptography`` library.
    """
    try:
        from cryptography import x509
        from cryptography.x509.oid import ExtensionOID

        try:
            san_extension = cert.extensions.get_extension_for_oid(
                ExtensionOID.SUBJECT_ALTERNATIVE_NAME
            )
        except x509.ExtensionNotFound:
            # Without a SAN there is no SPIFFE ID to inspect.
            return False

        san_uris = san_extension.value.get_values_for_type(
            x509.UniformResourceIdentifier
        )
        for san_uri in san_uris:
            components = urlparse(san_uri)
            if components.scheme != "spiffe":
                continue
            domain = components.netloc
            if any(
                re.match(domain_pattern, domain)
                for domain_pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS
            ):
                return True
        return False
    except ImportError as import_error:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from import_error

214 

215 

def calculate_certificate_fingerprint(cert):
    """Calculates the URL-encoded, unpadded, base64-encoded SHA256 hash of a
    DER-encoded certificate.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        str: The URL-encoded, unpadded, base64-encoded SHA256 fingerprint.

    Raises:
        ImportError: If an ImportError occurs while importing or using the
            ``cryptography`` library.
    """
    try:
        from cryptography.hazmat.primitives import serialization

        digest = hashlib.sha256(
            cert.public_bytes(serialization.Encoding.DER)
        ).digest()
        # The fingerprint is built in two steps:
        # 1. Standard (not URL-safe) base64 encoding with the '=' padding
        #    stripped from the end.
        # 2. Percent-encoding of the result so it can be placed in a URL.
        # NOTE(review): quote() keeps '/' unescaped by default (safe="/"), so
        # only '+' is actually percent-encoded here -- confirm this is what
        # the receiving side expects before changing it.
        unpadded = base64.b64encode(digest).decode("utf-8").rstrip("=")
        return quote(unpadded)
    except ImportError as import_error:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from import_error

241 

242 

def should_request_bound_token(cert):
    """Decides whether a bound token should be requested.

    A bound token is requested only when the certificate is an Agent
    Identity certificate and the
    GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES environment
    variable (default ``"true"``) is, case-insensitively, ``"true"``.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        bool: True if a bound token should be requested, False otherwise.
    """
    # The certificate check always runs first, regardless of the env setting.
    agent_cert = _is_agent_identity_certificate(cert)
    env_setting = os.environ.get(
        environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
        "true",
    )
    return agent_cert and env_setting.lower() == "true"

264 

265 

def call_client_cert_callback():
    """Calls the client cert callback and returns the certificate and key.

    Returns:
        Tuple: ``(cert_bytes, key_bytes)``, the second and third items of
            the 4-tuple returned by
            ``_mtls_helper.get_client_ssl_credentials``. The first and last
            items are discarded.
    """
    # NOTE(review): the last item (named ``passphrase`` by the original code)
    # is dropped even though an encrypted key is requested -- confirm callers
    # do not need it.
    credentials = _mtls_helper.get_client_ssl_credentials(
        generate_encrypted_key=True
    )
    _, cert_bytes, key_bytes, _ = credentials
    return cert_bytes, key_bytes

272 

273 

def get_cached_cert_fingerprint(cached_cert):
    """Returns the fingerprint of the cached certificate.

    Args:
        cached_cert: The cached certificate data, in the form accepted by
            ``parse_certificate``. A falsy value (None or empty) means no
            certificate is cached.

    Returns:
        The value produced by ``calculate_certificate_fingerprint`` for the
        parsed certificate.

    Raises:
        ValueError: If ``cached_cert`` is falsy.
    """
    if not cached_cert:
        raise ValueError("mTLS connection is not configured.")
    return calculate_certificate_fingerprint(parse_certificate(cached_cert))