Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_regional_access_boundary_utils.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

202 statements  

1# Copyright 2026 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Utilities for Regional Access Boundary management.""" 

16 

17import asyncio 

18import copy 

19import datetime 

20import functools 

21import inspect 

22import logging 

23import os 

24import threading 

25from typing import NamedTuple, Optional, TYPE_CHECKING 

26 

27from google.auth import _helpers 

28from google.auth import environment_vars 

29 

30if TYPE_CHECKING: 

31 import google.auth.credentials 

32 import google.auth.transport 

33 

# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)

35 

36 

@functools.lru_cache()
def is_regional_access_boundary_enabled():
    """Reports whether the Regional Access Boundary feature is switched on.

    The answer comes from the environment variable named by
    ``environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED``:
      - "true" or "1" (compared case-insensitively) turns the feature on.
      - Any other value, or an unset variable, leaves it off.

    Because the function is wrapped in ``functools.lru_cache``, the
    environment is read on the first call only; later calls return the
    cached answer until the cache is cleared.

    Returns:
        bool: True if Regional Access Boundary is enabled, False otherwise.
    """
    raw_setting = os.environ.get(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED)
    # An unset variable short-circuits to False before .lower() is attempted.
    return raw_setting is not None and raw_setting.lower() in ("true", "1")

54 

55 

# The default lifetime for a cached Regional Access Boundary. A successful
# lookup sets the snapshot's expiry to "now + this value".
DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6)

# The period of time prior to the boundary's expiration when a background refresh
# is proactively triggered. Cached data counts as fresh only while the current
# time is earlier than (expiry - this value).
REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD = datetime.timedelta(hours=1)

# The initial cooldown period for a failed Regional Access Boundary lookup.
# The cooldown duration doubles after each failed lookup and is reset to this
# value by a successful one.
DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15)

# The maximum cooldown period for a failed Regional Access Boundary lookup.
# Doubling of the cooldown duration is capped at this value.
MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6)


# The header key used for Regional Access Boundaries.
_REGIONAL_ACCESS_BOUNDARY_HEADER = "x-allowed-locations"

72 

73 

class _RegionalAccessBoundaryData(NamedTuple):
    """Immutable snapshot of the Regional Access Boundary state.

    Being a NamedTuple, an instance cannot be modified after creation; a
    state change is expressed by building a new instance.

    Attributes:
        encoded_locations (Optional[str]): The encoded Regional Access Boundary
            string, or None when no boundary is cached.
        expiry (Optional[datetime.datetime]): The hard expiration time of the
            boundary data, or None when there is none.
        cooldown_expiry (Optional[datetime.datetime]): The time until which
            further lookups are skipped, or None when no cooldown is active.
        cooldown_duration (datetime.timedelta): The current duration for the
            exponential cooldown.
    """

    encoded_locations: Optional[str]
    expiry: Optional[datetime.datetime]
    cooldown_expiry: Optional[datetime.datetime]
    cooldown_duration: datetime.timedelta

88 

89 

class _RegionalAccessBoundaryManager(object):
    """Holds the cached Regional Access Boundary and decides when to refresh it.

    All state lives in one immutable `_RegionalAccessBoundaryData` tuple bound
    to `self._data`. Writers build a replacement tuple and rebind the
    attribute; readers grab the reference once and work from that snapshot,
    so reads never take a lock.
    """

    def __init__(self):
        # Start with no boundary, no expiry and no cooldown in effect.
        self._data = _RegionalAccessBoundaryData(
            encoded_locations=None,
            expiry=None,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )
        self.refresh_manager = _RegionalAccessBoundaryRefreshManager()
        # Serializes writers inside process_regional_access_boundary_info.
        self._update_lock = threading.Lock()
        self._use_blocking_regional_access_boundary_lookup = False

    def __getstate__(self):
        """Pickle helper returning a copy of the state without the lock.

        The `_update_lock` entry is replaced by None in the returned copy;
        the live instance keeps its lock.
        """
        return dict(self.__dict__, _update_lock=None)

    def __setstate__(self, state):
        """Pickle helper that restores the state and creates a new update lock."""
        vars(self).update(state)
        self._update_lock = threading.Lock()

    def __eq__(self, other):
        """Compares two managers by data snapshot and blocking-lookup flag.

        Returns NotImplemented for anything that is not a
        `_RegionalAccessBoundaryManager`.
        """
        if not isinstance(other, _RegionalAccessBoundaryManager):
            return NotImplemented
        if not (self._data == other._data):
            return False
        return (
            self._use_blocking_regional_access_boundary_lookup
            == other._use_blocking_regional_access_boundary_lookup
        )

    def enable_blocking_lookup(self):
        """Switches this manager to blocking Regional Access Boundary lookups.

        Once set, `maybe_start_refresh` and `maybe_start_refresh_async` run
        the lookup in the caller's own flow rather than handing it to
        `refresh_manager`.
        """
        self._use_blocking_regional_access_boundary_lookup = True

    def set_initial_regional_access_boundary(self, encoded_locations=None, expiry=None):
        """Replaces the state with client-provided initial values.

        Any cooldown is cleared and the cooldown duration is reset to its
        default.

        Args:
            encoded_locations (Optional[str]): The encoded locations string.
            expiry (Optional[datetime.datetime]): The expiry time for the boundary.
                Ignored (stored as None) when encoded_locations is empty or None.
        """
        effective_expiry = expiry if encoded_locations else None

        self._data = _RegionalAccessBoundaryData(
            encoded_locations=encoded_locations,
            expiry=effective_expiry,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )

    def apply_headers(self, headers):
        """Adds or removes the Regional Access Boundary header in place.

        The 'x-allowed-locations' header is written when a boundary is cached
        and its expiry lies in the future. In every other case the header is
        removed so that stale data is never sent.

        Args:
            headers (MutableMapping[str, str]): The headers dictionary to update.
        """
        snapshot = self._data

        boundary_is_valid = (
            snapshot.encoded_locations
            and snapshot.expiry is not None
            and _helpers.utcnow() < snapshot.expiry
        )
        if not boundary_is_valid:
            headers.pop(_REGIONAL_ACCESS_BOUNDARY_HEADER, None)
            return

        headers[_REGIONAL_ACCESS_BOUNDARY_HEADER] = snapshot.encoded_locations

    def _should_refresh(self):
        """Decides whether a new lookup should be started right now.

        Returns:
            bool: False while the cached boundary is still outside its refresh
                window, or while a failure cooldown is in effect; True otherwise.
        """
        snapshot = self._data

        # Cached data counts as fresh until REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD
        # before its expiry.
        if snapshot.encoded_locations and snapshot.expiry:
            refresh_window_start = (
                snapshot.expiry - REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD
            )
            if _helpers.utcnow() < refresh_window_start:
                return False

        # A previous failure may still be holding lookups back.
        in_cooldown = bool(
            snapshot.cooldown_expiry and _helpers.utcnow() < snapshot.cooldown_expiry
        )
        return not in_cooldown

    def maybe_start_refresh(self, credentials, request):
        """Refreshes the Regional Access Boundary if `_should_refresh` allows it.

        In blocking mode the lookup runs in the calling thread; otherwise it
        is handed to `self.refresh_manager`.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        if not self._should_refresh():
            return

        if not self._use_blocking_regional_access_boundary_lookup:
            self.refresh_manager.start_refresh(credentials, request, self)
            return

        self.start_blocking_refresh(credentials, request)

    async def maybe_start_refresh_async(self, credentials, request):
        """Async counterpart of `maybe_start_refresh`.

        In blocking mode the lookup is awaited here; otherwise it is handed
        to `self.refresh_manager`.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.aio.transport.Request): The object used to make HTTP requests.
        """
        if not self._should_refresh():
            return

        if not self._use_blocking_regional_access_boundary_lookup:
            self.refresh_manager.start_refresh(credentials, request, self)
            return

        await self.start_blocking_refresh_async(credentials, request)

    def start_blocking_refresh(self, credentials, request):
        """Looks up the Regional Access Boundary in the calling thread.

        An exception raised by the lookup is logged as a warning and handled
        as a failed lookup (which enters cooldown); it is not re-raised.
        Credentials whose lookup is a coroutine function are not supported
        here and are also handled as a failed lookup.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        # A coroutine function cannot be driven from this synchronous path.
        if inspect.iscoroutinefunction(credentials._lookup_regional_access_boundary):
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Blocking Regional Access Boundary lookup is not supported for async credentials."
                )
            self.process_regional_access_boundary_info(None)
            return

        try:
            # fail_fast=True keeps the caller from being blocked for long: the
            # lookup uses a 3s timeout instead of the default 120s and does
            # not retry.
            lookup_result = credentials._lookup_regional_access_boundary(
                request, fail_fast=True
            )
        except Exception as exc:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Blocking Regional Access Boundary lookup raised an exception: %s",
                    exc,
                    exc_info=True,
                )
            lookup_result = None

        self.process_regional_access_boundary_info(lookup_result)

    async def start_blocking_refresh_async(self, credentials, request):
        """Awaits a Regional Access Boundary lookup in the calling coroutine.

        An exception raised by the lookup is logged as a warning and handled
        as a failed lookup (which enters cooldown); it is not re-raised.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.aio.transport.Request): The object used to make HTTP requests.
        """
        try:
            # fail_fast=True keeps the caller from being blocked for long: the
            # lookup uses a 3s timeout instead of the default 120s and does
            # not retry.
            lookup_result = await credentials._lookup_regional_access_boundary(
                request, fail_fast=True
            )
        except Exception as exc:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Regional Access Boundary lookup raised an exception: %s",
                    exc,
                    exc_info=True,
                )
            lookup_result = None

        self.process_regional_access_boundary_info(lookup_result)

    def process_regional_access_boundary_info(self, regional_access_boundary_info):
        """Folds a lookup result into the cached state.

        A truthy result is a success: the boundary and its expiry are
        replaced and any cooldown is cleared. A falsy result is a failure:
        a cooldown is started, the duration used for the next failure is
        doubled (up to the maximum), and the previous boundary is kept
        unless its expiry has already passed.

        Args:
            regional_access_boundary_info (Optional[Mapping[str, str]]): The regional access
                boundary info to process.
        """
        with self._update_lock:
            # Snapshot taken under the lock so the update is computed from a
            # single consistent state.
            previous = self._data

            if regional_access_boundary_info:
                new_locations = regional_access_boundary_info.get("encodedLocations")
                replacement = _RegionalAccessBoundaryData(
                    encoded_locations=new_locations,
                    expiry=_helpers.utcnow() + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL,
                    cooldown_expiry=None,
                    cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.debug("Regional Access Boundary lookup successful.")
            else:
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Regional Access Boundary lookup failed. Entering cooldown."
                    )

                # This failure waits for the current duration; the stored
                # duration is doubled for the next one, capped at the maximum.
                cooldown_until = _helpers.utcnow() + previous.cooldown_duration
                following_duration = min(
                    previous.cooldown_duration * 2,
                    MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )

                # Existing data survives a failed refresh until its hard
                # expiration time has passed.
                hard_expired = previous.expiry and _helpers.utcnow() > previous.expiry
                if hard_expired:
                    kept_locations, kept_expiry = None, None
                else:
                    kept_locations = previous.encoded_locations
                    kept_expiry = previous.expiry

                replacement = _RegionalAccessBoundaryData(
                    encoded_locations=kept_locations,
                    expiry=kept_expiry,
                    cooldown_expiry=cooldown_until,
                    cooldown_duration=following_duration,
                )

            # Publish the new snapshot by rebinding the attribute.
            self._data = replacement

356 

357 

class _RegionalAccessBoundaryRefreshThread(threading.Thread):
    """Daemon thread that performs one Regional Access Boundary lookup."""

    def __init__(
        self,
        credentials: "google.auth.credentials.CredentialsWithRegionalAccessBoundary",  # noqa: F821
        request: "google.auth.transport.Request",  # noqa: F821
        rab_manager: "_RegionalAccessBoundaryManager",
    ):
        # The thread is created as a daemon thread.
        super().__init__(daemon=True)
        self._credentials = credentials
        self._request = request
        self._rab_manager = rab_manager

    def run(self):
        """
        Runs the lookup and hands the outcome to the manager.

        The lookup itself is delegated to the credentials object's
        `_lookup_regional_access_boundary` method. Whatever it returns is
        passed to the manager's `process_regional_access_boundary_info`; if
        it raises, the exception is logged and None is passed instead so the
        manager treats the attempt as a failure.
        """
        # Exceptions (e.g., from the underlying transport) are swallowed here
        # so the thread does not die with an unhandled error and the manager
        # still gets to record the failure.
        try:
            lookup_result = self._credentials._lookup_regional_access_boundary(
                self._request
            )
        except Exception as exc:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Asynchronous Regional Access Boundary lookup raised an exception: %s",
                    exc,
                    exc_info=True,
                )
            lookup_result = None

        self._rab_manager.process_regional_access_boundary_info(lookup_result)

402 

403 

class _RegionalAccessBoundaryRefreshManager(object):
    """Runs at most one background refresh thread at a time."""

    def __init__(self):
        self._lock = threading.Lock()
        self._worker = None

    def __getstate__(self):
        """Pickle helper returning a copy of the state with `_lock` and `_worker` set to None."""
        return dict(self.__dict__, _lock=None, _worker=None)

    def __setstate__(self, state):
        """Pickle helper that restores the state with a new lock and no worker."""
        vars(self).update(state)
        self._lock = threading.Lock()
        self._worker = None

    def start_refresh(self, credentials, request, rab_manager):
        """
        Launches a background refresh thread unless one is still alive.

        The request object is deep-copied before being given to the thread.
        If the copy fails, a warning is logged and no thread is started.

        Args:
            credentials (CredentialsWithRegionalAccessBoundary): The credentials
                to refresh.
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
        """
        with self._lock:
            current_worker = self._worker
            if current_worker and current_worker.is_alive():
                # A refresh is already in progress.
                return

            try:
                request_copy = copy.deepcopy(request)
            except Exception as exc:
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Could not deepcopy transport for background RAB refresh. "
                        "Skipping background refresh to avoid thread safety issues. "
                        "Exception: %s",
                        exc,
                    )
                return

            self._worker = _RegionalAccessBoundaryRefreshThread(
                credentials, request_copy, rab_manager
            )
            self._worker.start()

456 

457 

class _AsyncRegionalAccessBoundaryRefreshManager(object):
    """Runs at most one background refresh task at a time for async flows."""

    def __init__(self):
        self._lock = threading.Lock()
        self._worker_task = None

    def __getstate__(self):
        """Pickle helper returning a copy of the state with `_lock` and `_worker_task` set to None."""
        return dict(self.__dict__, _lock=None, _worker_task=None)

    def __setstate__(self, state):
        """Pickle helper that restores the state with a new lock and no task."""
        vars(self).update(state)
        self._lock = threading.Lock()
        self._worker_task = None

    def start_refresh(self, credentials, request, rab_manager):
        """
        Schedules a background refresh task unless one is still pending.

        The task is created with `asyncio.create_task`. If scheduling fails,
        the unscheduled coroutine is closed and the exception is re-raised.

        Args:
            credentials (CredentialsWithRegionalAccessBoundary): The credentials
                to refresh.
            request (google.auth.aio.transport.Request): The object used to make
                HTTP requests.
            rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
        """
        with self._lock:
            pending_task = self._worker_task
            if pending_task and not pending_task.done():
                # A refresh is already in progress.
                return

            async def _worker():
                try:
                    # The lookup is awaited, so the credentials are expected
                    # to provide it as a coroutine function.
                    lookup_result = await credentials._lookup_regional_access_boundary(
                        request
                    )
                except Exception as exc:
                    if _helpers.is_logging_enabled(_LOGGER):
                        _LOGGER.warning(
                            "Asynchronous Regional Access Boundary lookup raised an exception: %s",
                            exc,
                            exc_info=True,
                        )
                    lookup_result = None

                rab_manager.process_regional_access_boundary_info(lookup_result)

            refresh_coro = _worker()
            try:
                self._worker_task = asyncio.create_task(refresh_coro)
            except Exception:
                # Close the coroutine that was never scheduled before
                # propagating the error.
                refresh_coro.close()
                raise

519 

520 

def _get_domain() -> str:
    """Picks the IAM credentials domain according to the mTLS configuration.

    The mTLS variant of the domain is chosen when `_mtls_helper` exposes
    `check_use_client_cert` and calling it returns a truthy value; otherwise
    the regular domain is used.

    Returns:
        str: The dynamic domain string.
    """
    from google.auth.transport import _mtls_helper

    use_mtls = (
        hasattr(_mtls_helper, "check_use_client_cert")
        and _mtls_helper.check_use_client_cert()
    )
    if use_mtls:
        return f"iamcredentials.mtls.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"
    return f"iamcredentials.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"

536 

537 

def get_service_account_rab_endpoint(service_account_email: str) -> str:
    """Returns the Regional Access Boundary lookup URL for a service account.

    The email is inserted into the URL path as given.

    Args:
        service_account_email: The service account email.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return f"https://{domain}/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations"

548 

549 

def get_workforce_pool_rab_endpoint(pool_id: str) -> str:
    """Returns the Regional Access Boundary lookup URL for a workforce pool.

    The pool ID is inserted into the URL path as given.

    Args:
        pool_id: The workforce pool ID.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return f"https://{domain}/v1/locations/global/workforcePools/{pool_id}/allowedLocations"

560 

561 

def get_workload_identity_pool_rab_endpoint(project_number: str, pool_id: str) -> str:
    """Returns the Regional Access Boundary lookup URL for a workload identity pool.

    Both identifiers are inserted into the URL path as given.

    Args:
        project_number: The Google Cloud project number.
        pool_id: The workload identity pool ID.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return f"https://{domain}/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations"