Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_regional_access_boundary_utils.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

135 statements  

1# Copyright 2026 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Utilities for Regional Access Boundary management.""" 

16 

17import copy 

18import datetime 

19import functools 

20import logging 

21import os 

22import threading 

23from typing import NamedTuple, Optional, TYPE_CHECKING 

24 

25from google.auth import _helpers 

26from google.auth import environment_vars 

27 

28if TYPE_CHECKING: 

29 import google.auth.credentials 

30 import google.auth.transport 

31 

# Module-level logger shared by the lookup and refresh code paths in this file.
_LOGGER = logging.getLogger(__name__)

33 

34 

@functools.lru_cache()
def is_regional_access_boundary_enabled():
    """Reports whether the Regional Access Boundary feature is switched on.

    The switch is the environment variable named by
    ``environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED``. Its value is
    compared case-insensitively:

    - "true" and "1" enable the feature.
    - Anything else, including an unset variable, disables it.

    The result is memoized by ``functools.lru_cache``, so the environment is
    only consulted on the first call (until the cache is cleared).

    Returns:
        bool: True if Regional Access Boundary is enabled, False otherwise.
    """
    raw_setting = os.environ.get(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED)
    return raw_setting is not None and raw_setting.lower() in ("true", "1")

52 

53 

# How long a successfully fetched Regional Access Boundary stays cached.
DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6)

# Once the cached boundary is within this window of its expiry, a refresh is
# triggered proactively instead of waiting for the hard expiration.
REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD = datetime.timedelta(hours=1)

# Cooldown applied after the first failed lookup; it doubles on each
# consecutive failure.
DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15)

# Upper bound for the doubling cooldown.
MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6)


# Name of the request header that carries the Regional Access Boundary.
_REGIONAL_ACCESS_BOUNDARY_HEADER = "x-allowed-locations"

70 

71 

class _RegionalAccessBoundaryData(NamedTuple):
    """Immutable snapshot of the Regional Access Boundary state.

    Attributes:
        encoded_locations (Optional[str]): The encoded boundary string, or
            None when no boundary is currently held.
        expiry (Optional[datetime.datetime]): Hard expiration time of
            ``encoded_locations``.
        cooldown_expiry (Optional[datetime.datetime]): Lookups are skipped
            until this time; None when no cooldown is active.
        cooldown_duration (datetime.timedelta): Length of the cooldown that
            the next failed lookup will apply.
    """

    encoded_locations: Optional[str]
    expiry: Optional[datetime.datetime]
    cooldown_expiry: Optional[datetime.datetime]
    cooldown_duration: datetime.timedelta

86 

87 

class _RegionalAccessBoundaryManager(object):
    """Manages the Regional Access Boundary state and its refresh.

    The state is held in an immutable `_RegionalAccessBoundaryData` snapshot
    stored on `self._data`. Readers grab the snapshot once and work from it
    without locking; `process_regional_access_boundary_info` holds
    `_update_lock` while it derives a replacement snapshot and rebinds
    `self._data`.
    """

    def __init__(self):
        self._data = _RegionalAccessBoundaryData(
            encoded_locations=None,
            expiry=None,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )
        self.refresh_manager = _RegionalAccessBoundaryRefreshManager()
        self._update_lock = threading.Lock()
        self._use_blocking_regional_access_boundary_lookup = False

    def __getstate__(self):
        """Pickle helper that returns the instance state without the lock.

        `threading.Lock` objects cannot be pickled, so `_update_lock` is
        replaced by None; `__setstate__` creates a fresh lock.
        """
        state = self.__dict__.copy()
        state["_update_lock"] = None
        return state

    def __setstate__(self, state):
        """Pickle helper that restores the state and recreates `_update_lock`."""
        self.__dict__.update(state)
        self._update_lock = threading.Lock()

    def __eq__(self, other):
        """Checks if two managers are equal.

        Two managers are equal when they hold equal data snapshots and the
        same blocking-lookup setting. The lock and the refresh manager are
        not part of the comparison.
        """
        if not isinstance(other, _RegionalAccessBoundaryManager):
            return NotImplemented
        return (
            self._data == other._data
            and self._use_blocking_regional_access_boundary_lookup
            == other._use_blocking_regional_access_boundary_lookup
        )

    def enable_blocking_lookup(self):
        """Enables blocking Regional Access Boundary lookup.

        When enabled, the Regional Access Boundary lookup will be performed
        synchronously in the calling thread instead of asynchronously in a
        background thread.
        """
        self._use_blocking_regional_access_boundary_lookup = True

    def set_initial_regional_access_boundary(self, encoded_locations=None, expiry=None):
        """Manually sets the regional access boundary to the client provided seed.

        Any active cooldown is cleared and the cooldown duration is reset to
        its default.

        Args:
            encoded_locations (Optional[str]): The encoded locations string.
            expiry (Optional[datetime.datetime]): The expiry time for the boundary.
                If encoded_locations is not provided, expiry is ignored.
        """
        if not encoded_locations:
            expiry = None

        self._data = _RegionalAccessBoundaryData(
            encoded_locations=encoded_locations,
            expiry=expiry,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )

    def apply_headers(self, headers):
        """Applies the Regional Access Boundary header to the provided dictionary.

        If the boundary is valid (present, with an expiry that lies in the
        future), the 'x-allowed-locations' header is added or updated.
        Otherwise, the header is removed to ensure no stale data is sent.

        Args:
            headers (MutableMapping[str, str]): The headers dictionary to update.
        """
        # Work from a single snapshot so the check and the write agree.
        rab_data = self._data

        if rab_data.encoded_locations and (
            rab_data.expiry is not None and _helpers.utcnow() < rab_data.expiry
        ):
            headers[_REGIONAL_ACCESS_BOUNDARY_HEADER] = rab_data.encoded_locations
        else:
            headers.pop(_REGIONAL_ACCESS_BOUNDARY_HEADER, None)

    def maybe_start_refresh(self, credentials, request):
        """Refreshes the Regional Access Boundary if it is needed and allowed.

        Nothing happens while the cached boundary is still fresh or while a
        cooldown is in effect. Otherwise the lookup runs in the calling
        thread when blocking lookup is enabled, or is handed to the refresh
        manager to run in a background thread.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        rab_data = self._data

        # Don't start a new refresh if the Regional Access Boundary info is still fresh.
        if (
            rab_data.encoded_locations
            and rab_data.expiry
            and _helpers.utcnow()
            < (rab_data.expiry - REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD)
        ):
            return

        # Don't start a new refresh if the cooldown is still in effect.
        if rab_data.cooldown_expiry and _helpers.utcnow() < rab_data.cooldown_expiry:
            return

        # All checks passed: refresh inline or in the background.
        if self._use_blocking_regional_access_boundary_lookup:
            self.start_blocking_refresh(credentials, request)
        else:
            self.refresh_manager.start_refresh(credentials, request, self)

    def start_blocking_refresh(self, credentials, request):
        """Initiates a blocking lookup of the Regional Access Boundary.

        If the lookup raises an exception, it is caught and logged as a warning,
        and the lookup is treated as a failure (entering cooldown). Exceptions
        are not propagated to the caller.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        try:
            # The fail_fast parameter is set to True to ensure we don't block the calling
            # thread for too long. This will do two things: 1) set a timeout to 3s
            # instead of the default 120s and 2) ensure we do not retry at all
            regional_access_boundary_info = (
                credentials._lookup_regional_access_boundary(request, fail_fast=True)
            )
        except Exception as e:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Blocking Regional Access Boundary lookup raised an exception: %s",
                    e,
                    exc_info=True,
                )
            regional_access_boundary_info = None

        self.process_regional_access_boundary_info(regional_access_boundary_info)

    def process_regional_access_boundary_info(self, regional_access_boundary_info):
        """Processes the regional access boundary info and updates the state.

        A payload counts as a success only if it carries a non-empty
        "encodedLocations" value. In that case the boundary is stored with a
        fresh expiry and any cooldown is cleared.

        Everything else (None, an empty mapping, or a mapping without a usable
        "encodedLocations") is a failure: a cooldown is started, the cooldown
        duration for the next failure is doubled up to
        MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, and the existing boundary is
        kept unless it is past its hard expiry.

        Args:
            regional_access_boundary_info (Optional[Mapping[str, str]]): The regional access
                boundary info to process.
        """
        with self._update_lock:
            # Capture the current state and one timestamp so that every
            # decision below is made against the same instant.
            current_data = self._data
            now = _helpers.utcnow()

            encoded_locations = None
            if regional_access_boundary_info:
                encoded_locations = regional_access_boundary_info.get(
                    "encodedLocations"
                )

            if encoded_locations:
                # On success, update the boundary and its expiry, and clear any cooldown.
                updated_data = _RegionalAccessBoundaryData(
                    encoded_locations=encoded_locations,
                    expiry=now + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL,
                    cooldown_expiry=None,
                    cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.debug("Regional Access Boundary lookup successful.")
            else:
                # A payload without a usable boundary must not be stored as a
                # success: with no encoded locations and no cooldown, neither
                # guard in maybe_start_refresh would stop a new lookup from
                # being issued on every call.
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Regional Access Boundary lookup failed. Entering cooldown."
                    )

                next_cooldown_expiry = now + current_data.cooldown_duration
                next_cooldown_duration = min(
                    current_data.cooldown_duration * 2,
                    MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )

                # If the refresh failed, we keep reusing the existing data unless
                # it has reached its hard expiration time.
                if current_data.expiry and now > current_data.expiry:
                    next_encoded_locations = None
                    next_expiry = None
                else:
                    next_encoded_locations = current_data.encoded_locations
                    next_expiry = current_data.expiry

                updated_data = _RegionalAccessBoundaryData(
                    encoded_locations=next_encoded_locations,
                    expiry=next_expiry,
                    cooldown_expiry=next_cooldown_expiry,
                    cooldown_duration=next_cooldown_duration,
                )

            # Perform the atomic swap of the state object.
            self._data = updated_data

287 

288 

class _RegionalAccessBoundaryRefreshThread(threading.Thread):
    """Daemon thread that performs one Regional Access Boundary lookup."""

    def __init__(
        self,
        credentials: "google.auth.credentials.CredentialsWithRegionalAccessBoundary",  # noqa: F821
        request: "google.auth.transport.Request",  # noqa: F821
        rab_manager: "_RegionalAccessBoundaryManager",
    ):
        # Daemon threads do not keep the interpreter alive at shutdown.
        super().__init__(daemon=True)
        self._credentials = credentials
        self._request = request
        self._rab_manager = rab_manager

    def run(self):
        """
        Looks up the Regional Access Boundary and hands the result to the manager.

        The lookup itself is delegated to the credentials object's
        `_lookup_regional_access_boundary` method. Whatever it returns is
        passed to the manager's `process_regional_access_boundary_info`; if
        the lookup raises, the exception is logged and None is passed instead,
        which the manager handles as a failed lookup.
        """
        # Default to the failure value so that an exception (e.g. from the
        # underlying transport) neither crashes the thread nor skips the
        # manager update below.
        lookup_result = None
        try:
            lookup_result = self._credentials._lookup_regional_access_boundary(
                self._request
            )
        except Exception as exc:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Asynchronous Regional Access Boundary lookup raised an exception: %s",
                    exc,
                    exc_info=True,
                )

        self._rab_manager.process_regional_access_boundary_info(lookup_result)

333 

334 

class _RegionalAccessBoundaryRefreshManager(object):
    """Owns the single background thread that refreshes the Regional Access Boundary."""

    def __init__(self):
        self._lock = threading.Lock()
        self._worker = None

    def __getstate__(self):
        """Pickle helper that returns the state with `_lock` and `_worker` set to None."""
        state = dict(self.__dict__)
        state.update(_lock=None, _worker=None)
        return state

    def __setstate__(self, state):
        """Pickle helper that restores the state with a new lock and no worker."""
        self.__dict__.update(state)
        self._worker = None
        self._lock = threading.Lock()

    def start_refresh(self, credentials, request, rab_manager):
        """
        Starts a background refresh thread unless one is still running.

        The thread receives a deep copy of `request`. If the copy cannot be
        made, a warning is logged and no thread is started.

        Args:
            credentials (CredentialsWithRegionalAccessBoundary): The credentials
                to refresh.
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
        """
        with self._lock:
            active_worker = self._worker
            if active_worker and active_worker.is_alive():
                # The previous refresh has not finished yet.
                return

            try:
                request_copy = copy.deepcopy(request)
            except Exception as exc:
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Could not deepcopy transport for background RAB refresh. "
                        "Skipping background refresh to avoid thread safety issues. "
                        "Exception: %s",
                        exc,
                    )
                return

            new_worker = _RegionalAccessBoundaryRefreshThread(
                credentials, request_copy, rab_manager
            )
            self._worker = new_worker
            new_worker.start()