Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_agent_identity_utils.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

100 statements  

1# Copyright 2025 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for Agent Identity credentials.""" 

16 

17import base64 

18import hashlib 

19import logging 

20import os 

21import re 

22import time 

23from urllib.parse import quote, urlparse 

24 

25from google.auth import environment_vars 

26from google.auth import exceptions 

27 

28 

_LOGGER = logging.getLogger(__name__)

# Message attached to the ImportError raised when the cryptography package
# cannot be imported. The first literal ends with a space so the two
# sentences do not run together when the adjacent literals are concatenated.
CRYPTOGRAPHY_NOT_FOUND_ERROR = (
    "The cryptography library is required for certificate-based authentication. "
    "Please install it with `pip install google-auth[cryptography]`."
)

# SPIFFE trust domain patterns for Agent Identities.
_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [
    r"^agents\.global\.org-\d+\.system\.id\.goog$",
    r"^agents\.global\.proj-\d+\.system\.id\.goog$",
]

# Fallback certificate location checked when the configured path is not ready.
_WELL_KNOWN_CERT_PATH = "/var/run/secrets/workload-spiffe-credentials/certificates.pem"

43 

# Polling schedule for the certificate file: a burst of short waits followed
# by longer waits until the overall time budget has been consumed.
_FAST_POLL_CYCLES = 50
_FAST_POLL_INTERVAL = 0.1  # 100ms
_SLOW_POLL_INTERVAL = 0.5  # 500ms
_TOTAL_TIMEOUT = 30  # seconds

# Whatever is left of the budget after the fast phase is spent on slow polls.
_SLOW_POLL_CYCLES = int(
    (_TOTAL_TIMEOUT - _FAST_POLL_INTERVAL * _FAST_POLL_CYCLES) / _SLOW_POLL_INTERVAL
)

# One entry per attempt: the sleep duration that follows that attempt.
_POLLING_INTERVALS = [
    *([_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES),
    *([_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES),
]

58 

59 

def _is_certificate_file_ready(path):
    """Checks if a file exists and is not empty.

    Args:
        path (Optional[str]): The filesystem path to check. May be ``None``
            or empty, in which case the file is reported as not ready.

    Returns:
        bool: True if ``path`` refers to an existing entry with a size
            greater than zero, False otherwise.
    """
    if not path:
        return False
    try:
        # A single stat call replaces the separate exists/getsize pair, so a
        # file removed between the two checks can no longer raise here.
        return os.path.getsize(path) > 0
    except (OSError, ValueError):
        # OSError: missing or inaccessible file. ValueError: a path the OS
        # layer rejects outright (os.path.exists reports False for these).
        return False

63 

64 

def get_agent_identity_certificate_path():
    """Gets the certificate path from the certificate config file.

    The path to the certificate config file is read from the
    GOOGLE_API_CERTIFICATE_CONFIG environment variable. This function
    implements a retry mechanism to handle cases where the environment
    variable is set before the files are available on the filesystem.
    On every attempt the well-known certificate path is also checked as a
    fallback.

    Returns:
        Optional[str]: The path to the leaf certificate file, or None if the
            GOOGLE_API_CERTIFICATE_CONFIG environment variable is not set.

    Raises:
        google.auth.exceptions.RefreshError: If the certificate config file
            or the certificate file cannot be found after retries.
    """
    import json

    cert_config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
    if not cert_config_path:
        return None

    has_logged_warning = False

    for interval in _POLLING_INTERVALS:
        try:
            with open(cert_config_path, "r") as f:
                cert_config = json.load(f)
            cert_path = (
                cert_config.get("cert_configs", {})
                .get("workload", {})
                .get("cert_path")
            )
            if _is_certificate_file_ready(cert_path):
                return cert_path
        except (IOError, ValueError, KeyError, AttributeError, TypeError):
            # IOError: the config file is missing or unreadable.
            # ValueError: the file does not (yet) contain valid JSON.
            # AttributeError / TypeError: the JSON parsed but does not have
            # the expected nested-object shape (for example ``null`` or a
            # list). Treat that like any other not-ready state and keep
            # polling instead of letting the error escape.
            if not has_logged_warning:
                _LOGGER.warning(
                    "Certificate config file not found at %s (from %s environment "
                    "variable). Retrying for up to %s seconds.",
                    cert_config_path,
                    environment_vars.GOOGLE_API_CERTIFICATE_CONFIG,
                    _TOTAL_TIMEOUT,
                )
                has_logged_warning = True

        # As a fallback, check the well-known certificate path.
        if _is_certificate_file_ready(_WELL_KNOWN_CERT_PATH):
            return _WELL_KNOWN_CERT_PATH

        # A sleep is required in two cases:
        # 1. The config file is not found (the except block).
        # 2. The config file is found, but the certificate is not yet available.
        # In both cases, we need to poll, so we sleep on every iteration
        # that doesn't return a certificate.
        time.sleep(interval)

    raise exceptions.RefreshError(
        "Certificate config or certificate file not found after multiple retries. "
        f"Token binding protection is failing. You can turn off this protection by setting "
        f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false "
        "to fall back to unbound tokens."
    )

128 

129 

def get_and_parse_agent_identity_certificate():
    """Loads and parses the agent identity certificate unless opted out.

    The GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES environment
    variable (default "true") is consulted first; a value of "false"
    (case-insensitive) skips the certificate lookup entirely.

    Returns:
        The parsed certificate object, or None when the user opted out or no
        certificate path is available.
    """
    sharing_prevention = os.environ.get(
        environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
        "true",
    )
    if sharing_prevention.lower() == "false":
        # Opted out of certificate-bound tokens: nothing to look up.
        return None

    certificate_location = get_agent_identity_certificate_path()
    if certificate_location:
        with open(certificate_location, "rb") as stream:
            pem_data = stream.read()
        return parse_certificate(pem_data)

    return None

159 

160 

def parse_certificate(cert_bytes):
    """Turns PEM-encoded certificate bytes into a certificate object.

    Args:
        cert_bytes (bytes): The PEM-encoded certificate bytes.

    Returns:
        cryptography.x509.Certificate: The parsed certificate object.

    Raises:
        ImportError: If the cryptography library cannot be imported.
    """
    try:
        from cryptography import x509

        load_pem = x509.load_pem_x509_certificate
        certificate = load_pem(cert_bytes)
        return certificate
    except ImportError as import_failure:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from import_failure

176 

177 

def _is_agent_identity_certificate(cert):
    """Tells whether a certificate belongs to an Agent Identity.

    The Subject Alternative Name (SAN) extension is inspected for a URI with
    the ``spiffe`` scheme whose trust domain (the URI's network location)
    matches one of the Agent Identity trust domain patterns.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        bool: True if the certificate is an Agent Identity certificate,
            False otherwise (including when it has no SAN extension).

    Raises:
        ImportError: If the cryptography library cannot be imported.
    """
    try:
        from cryptography import x509
        from cryptography.x509.oid import ExtensionOID

        try:
            san_extension = cert.extensions.get_extension_for_oid(
                ExtensionOID.SUBJECT_ALTERNATIVE_NAME
            )
        except x509.ExtensionNotFound:
            return False

        san_uris = san_extension.value.get_values_for_type(
            x509.UniformResourceIdentifier
        )

        # URIs are parsed lazily, one at a time, in certificate order.
        for candidate in map(urlparse, san_uris):
            if candidate.scheme != "spiffe":
                continue
            if any(
                re.match(domain_pattern, candidate.netloc)
                for domain_pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS
            ):
                return True
        return False
    except ImportError as import_failure:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from import_failure

213 

214 

def calculate_certificate_fingerprint(cert):
    """Computes the SHA256 fingerprint of a certificate's DER encoding.

    The digest is rendered as standard base64 with the '=' padding removed
    and then passed through ``urllib.parse.quote``.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        str: The URL-encoded, unpadded, base64-encoded SHA256 fingerprint.

    Raises:
        ImportError: If the cryptography library cannot be imported.
    """
    try:
        from cryptography.hazmat.primitives import serialization

        digest = hashlib.sha256(
            cert.public_bytes(serialization.Encoding.DER)
        ).digest()
        # Two steps, to align with GFE's expectations and to keep the value
        # safe for URL transmission:
        # 1. Standard base64 with the trailing '=' padding stripped.
        # 2. Percent-encoding via quote(). With quote()'s default ``safe``
        #    value, '+' becomes '%2B' while '/' is left as-is.
        unpadded = base64.b64encode(digest).decode("utf-8").rstrip("=")
        return quote(unpadded)
    except ImportError as import_failure:
        raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from import_failure

240 

241 

def should_request_bound_token(cert):
    """Decides whether a certificate-bound token should be requested.

    Both conditions must hold: the certificate is an Agent Identity
    certificate, and GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES
    (default "true") equals "true" when compared case-insensitively.

    Args:
        cert (cryptography.x509.Certificate): The parsed certificate object.

    Returns:
        bool: True if a bound token should be requested, False otherwise.
    """
    # The certificate check always runs first, regardless of the env setting.
    agent_certificate = _is_agent_identity_certificate(cert)
    sharing_prevention = os.environ.get(
        environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
        "true",
    )
    return agent_certificate and sharing_prevention.lower() == "true"

263 

264 

def get_cached_cert_fingerprint(cached_cert):
    """Returns the fingerprint of the cached certificate.

    Args:
        cached_cert (bytes): The PEM-encoded cached certificate.

    Returns:
        str: The fingerprint computed from the parsed certificate.

    Raises:
        ValueError: If no cached certificate is available.
    """
    if not cached_cert:
        raise ValueError("mTLS connection is not configured.")
    return calculate_certificate_fingerprint(parse_certificate(cached_cert))