Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_regional_access_boundary_utils.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

227 statements  

1# Copyright 2026 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Utilities for Regional Access Boundary management.""" 

16 

17import asyncio 

18import copy 

19import datetime 

20import functools 

21import inspect 

22import logging 

23import threading 

24from typing import NamedTuple, Optional, TYPE_CHECKING 

25 

26from google.auth import _helpers 

27 

28if TYPE_CHECKING: # pragma: NO COVER 

29 import google.auth.credentials 

30 import google.auth.transport 

31 

_LOGGER = logging.getLogger(__name__)


# How long a fetched Regional Access Boundary is cached before it expires.
DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6)

# Window ahead of the cached boundary's expiration in which a refresh is
# started proactively.
REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD = datetime.timedelta(hours=1)

# Cooldown applied after the first failed Regional Access Boundary lookup.
DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15)

# Upper bound for the cooldown after a failed Regional Access Boundary lookup.
MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6)


# Name of the header that carries the Regional Access Boundary.
_REGIONAL_ACCESS_BOUNDARY_HEADER = "x-allowed-locations"

51 

52 

class _RegionalAccessBoundaryData(NamedTuple):
    """Immutable snapshot of the Regional Access Boundary state.

    Attributes:
        encoded_locations (Optional[str]): Encoded Regional Access Boundary
            string, or None when no boundary is cached.
        expiry (Optional[datetime.datetime]): Hard expiration time of the
            boundary data.
        cooldown_expiry (Optional[datetime.datetime]): Time until which
            further lookups are skipped.
        cooldown_duration (datetime.timedelta): Current duration used for the
            exponential cooldown.
    """

    encoded_locations: Optional[str]
    expiry: Optional[datetime.datetime]
    cooldown_expiry: Optional[datetime.datetime]
    cooldown_duration: datetime.timedelta

67 

68 

class _RegionalAccessBoundaryManager(object):
    """Manages the Regional Access Boundary state and its refresh.

    The state is held in an immutable `_RegionalAccessBoundaryData` tuple bound
    to `self._data`. Readers take that reference once and work from the
    snapshot; lookup results are merged into a new tuple while holding
    `_update_lock` and published by rebinding `self._data`.
    """

    def __init__(self):
        self._data = _RegionalAccessBoundaryData(
            encoded_locations=None,
            expiry=None,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )
        self.refresh_manager = _RegionalAccessBoundaryRefreshManager()
        self._update_lock = threading.Lock()
        self._use_blocking_regional_access_boundary_lookup = False

    def __getstate__(self):
        """Pickle helper that drops the un-picklable _update_lock from the state."""
        state = self.__dict__.copy()
        state["_update_lock"] = None
        return state

    def __setstate__(self, state):
        """Pickle helper that restores the state and creates a fresh _update_lock."""
        self.__dict__.update(state)
        self._update_lock = threading.Lock()

    def __eq__(self, other):
        """Checks if two managers are equal.

        Two managers compare equal when their boundary snapshots and their
        blocking-lookup settings match. The lock and the refresh manager are
        not part of the comparison.
        """
        if not isinstance(other, _RegionalAccessBoundaryManager):
            return NotImplemented
        return (
            self._data == other._data
            and self._use_blocking_regional_access_boundary_lookup
            == other._use_blocking_regional_access_boundary_lookup
        )

    def enable_blocking_lookup(self):
        """Enables blocking Regional Access Boundary lookup.

        When enabled, `maybe_start_refresh` performs the lookup in the calling
        thread instead of handing it to the refresh manager.
        """
        self._use_blocking_regional_access_boundary_lookup = True

    def set_initial_regional_access_boundary(self, encoded_locations=None, expiry=None):
        """Manually sets the regional access boundary to the client provided initial values.

        Any cooldown is cleared and the cooldown duration is reset to its
        default.

        Args:
            encoded_locations (Optional[str]): The encoded locations string.
            expiry (Optional[datetime.datetime]): The expiry time for the boundary.
                If encoded_locations is not provided, expiry is ignored.
        """
        if not encoded_locations:
            expiry = None

        self._data = _RegionalAccessBoundaryData(
            encoded_locations=encoded_locations,
            expiry=expiry,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )

    def apply_headers(self, headers):
        """Applies the Regional Access Boundary header to the provided dictionary.

        If the boundary is valid (present, with an expiry that lies in the
        future), the 'x-allowed-locations' header is added or updated.
        Otherwise, the header is removed to ensure no stale data is sent.

        Args:
            headers (MutableMapping[str, str]): The headers dictionary to update.
        """
        # Work from one snapshot so both fields come from the same state.
        rab_data = self._data

        if rab_data.encoded_locations and (
            rab_data.expiry is not None and _helpers.utcnow() < rab_data.expiry
        ):
            headers[_REGIONAL_ACCESS_BOUNDARY_HEADER] = rab_data.encoded_locations
        else:
            headers.pop(_REGIONAL_ACCESS_BOUNDARY_HEADER, None)

    def _should_refresh(self):
        """Checks if the Regional Access Boundary data needs a refresh and is not in cooldown.

        Returns:
            bool: True if a refresh is required, False otherwise.
        """
        rab_data = self._data

        # Don't start a new refresh if the Regional Access Boundary info is still fresh.
        if (
            rab_data.encoded_locations
            and rab_data.expiry
            and _helpers.utcnow()
            < (rab_data.expiry - REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD)
        ):
            return False

        # Don't start a new refresh if the cooldown is still in effect.
        if rab_data.cooldown_expiry and _helpers.utcnow() < rab_data.cooldown_expiry:
            return False

        return True

    def maybe_start_refresh(self, credentials, request):
        """Refreshes the Regional Access Boundary if needed.

        Depending on whether blocking lookup is enabled, the lookup either runs
        in the calling thread or is handed to the refresh manager.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        if not self._should_refresh():
            return

        # If all checks pass, start the refresh.
        if self._use_blocking_regional_access_boundary_lookup:
            self.start_blocking_refresh(credentials, request)
        else:
            self.refresh_manager.start_refresh(credentials, request, self)

    async def maybe_start_refresh_async(self, credentials, request):
        """Starts a background refresh or performs a blocking refresh asynchronously.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.aio.transport.Request): The object used to make HTTP requests.
        """
        if not self._should_refresh():
            return

        # If all checks pass, start the refresh.
        if self._use_blocking_regional_access_boundary_lookup:
            await self.start_blocking_refresh_async(credentials, request)
        else:
            self.refresh_manager.start_refresh(credentials, request, self)

    def start_blocking_refresh(self, credentials, request):
        """Initiates a blocking lookup of the Regional Access Boundary.

        If the lookup raises an exception, it is caught and logged as a warning,
        and the lookup is treated as a failure (entering cooldown). Exceptions
        are not propagated to the caller.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        # Async credentials do not support blocking lookups.
        if inspect.iscoroutinefunction(credentials._lookup_regional_access_boundary):
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Blocking Regional Access Boundary lookup is not supported for async credentials."
                )
            self.process_regional_access_boundary_info(None)
            return

        try:
            # The fail_fast parameter is set to True to ensure we don't block the calling
            # thread for too long. This will do two things: 1) set a timeout to 3s
            # instead of the default 120s and 2) ensure we do not retry at all
            regional_access_boundary_info = (
                credentials._lookup_regional_access_boundary(request, fail_fast=True)
            )
        except Exception as e:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Blocking Regional Access Boundary lookup raised an exception: %s",
                    e,
                    exc_info=True,
                )
            regional_access_boundary_info = None

        self.process_regional_access_boundary_info(regional_access_boundary_info)

    async def start_blocking_refresh_async(self, credentials, request):
        """Initiates a blocking lookup of the Regional Access Boundary asynchronously.

        If the lookup raises an exception, it is caught and logged as a warning,
        and the lookup is treated as a failure (entering cooldown). Exceptions
        are not propagated to the caller.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.aio.transport.Request): The object used to make HTTP requests.
        """
        try:
            # The fail_fast parameter is set to True to ensure we don't block the calling
            # thread for too long. This will do two things: 1) set a timeout to 3s
            # instead of the default 120s and 2) ensure we do not retry at all
            regional_access_boundary_info = (
                await credentials._lookup_regional_access_boundary(
                    request, fail_fast=True
                )
            )
        except Exception as e:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Regional Access Boundary lookup raised an exception: %s",
                    e,
                    exc_info=True,
                )
            regional_access_boundary_info = None

        self.process_regional_access_boundary_info(regional_access_boundary_info)

    def process_regional_access_boundary_info(self, regional_access_boundary_info):
        """Processes the regional access boundary info and updates the state.

        A lookup counts as successful only when it yields a mapping with a
        non-empty "encodedLocations" entry. Anything else (None, an empty
        mapping, or a mapping without usable "encodedLocations") is handled as
        a failure: the cooldown starts and its duration doubles, capped at
        MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN. Caching an empty boundary as a
        success would clear the cooldown while leaving `_should_refresh()`
        permanently True, i.e. a lookup on every request.

        Args:
            regional_access_boundary_info (Optional[Mapping[str, str]]): The regional access
                boundary info to process.
        """
        with self._update_lock:
            # Capture the current state before calculating updates.
            current_data = self._data
            # Read the clock once so every timestamp below is consistent.
            now = _helpers.utcnow()

            encoded_locations = None
            if regional_access_boundary_info:
                encoded_locations = regional_access_boundary_info.get(
                    "encodedLocations"
                )

            if encoded_locations:
                # On success, update the boundary and its expiry, and clear any cooldown.
                updated_data = _RegionalAccessBoundaryData(
                    encoded_locations=encoded_locations,
                    expiry=now + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL,
                    cooldown_expiry=None,
                    cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.debug("Regional Access Boundary lookup successful.")
            else:
                # On failure, calculate cooldown and update state.
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Regional Access Boundary lookup failed. Entering cooldown."
                    )

                next_cooldown_expiry = now + current_data.cooldown_duration
                next_cooldown_duration = min(
                    current_data.cooldown_duration * 2,
                    MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )

                # If the refresh failed, we keep reusing the existing data unless
                # it has reached its hard expiration time. `>=` mirrors
                # apply_headers, which only sends the boundary while now < expiry.
                if current_data.expiry and now >= current_data.expiry:
                    next_encoded_locations = None
                    next_expiry = None
                else:
                    next_encoded_locations = current_data.encoded_locations
                    next_expiry = current_data.expiry

                updated_data = _RegionalAccessBoundaryData(
                    encoded_locations=next_encoded_locations,
                    expiry=next_expiry,
                    cooldown_expiry=next_cooldown_expiry,
                    cooldown_duration=next_cooldown_duration,
                )

            # Publish the new snapshot by rebinding the attribute.
            self._data = updated_data

335 

336 

class _RegionalAccessBoundaryRefreshThread(threading.Thread):
    """Daemon thread that runs one background Regional Access Boundary lookup."""

    def __init__(
        self,
        credentials: "google.auth.credentials.CredentialsWithRegionalAccessBoundary",  # noqa: F821
        request: "google.auth.transport.Request",  # noqa: F821
        rab_manager: "_RegionalAccessBoundaryManager",
    ):
        super().__init__(daemon=True)
        self._credentials = credentials
        self._request = request
        self._rab_manager = rab_manager

    def run(self):
        """
        Looks up the Regional Access Boundary and reports the result.

        The lookup itself is delegated to the credentials object's
        `_lookup_regional_access_boundary` method. Whatever it returns, or
        None when it raised, is handed to the manager's
        `process_regional_access_boundary_info`, which updates the cached
        state.
        """
        outcome = None
        # Swallow lookup errors (e.g., from the underlying transport) so the
        # thread does not die with a traceback; the manager treats None as a
        # failed lookup.
        try:
            outcome = self._credentials._lookup_regional_access_boundary(
                self._request
            )
        except Exception as e:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Asynchronous Regional Access Boundary lookup raised an exception: %s",
                    e,
                    exc_info=True,
                )

        self._rab_manager.process_regional_access_boundary_info(outcome)

381 

382 

class _RegionalAccessBoundaryRefreshManager(object):
    """Owns the background thread that refreshes the Regional Access Boundary."""

    def __init__(self):
        self._lock = threading.Lock()
        self._worker = None

    def __getstate__(self):
        """Pickle helper that blanks the _lock and _worker attributes in the state."""
        state = dict(self.__dict__)
        state.update(_lock=None, _worker=None)
        return state

    def __setstate__(self, state):
        """Pickle helper that restores the state with a new _lock and no _worker."""
        self.__dict__.update(state)
        self._lock = threading.Lock()
        self._worker = None

    def start_refresh(self, credentials, request, rab_manager):
        """
        Starts a background refresh thread unless one is still alive.

        The request is deep-copied for the thread. If it cannot be copied, a
        warning is logged and no thread is started.

        Args:
            credentials (CredentialsWithRegionalAccessBoundary): The credentials
                to refresh.
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
        """
        with self._lock:
            running = self._worker
            if running and running.is_alive():
                # The previous refresh has not finished yet.
                return

            try:
                thread_request = copy.deepcopy(request)
            except Exception as e:
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Could not deepcopy transport for background RAB refresh. "
                        "Skipping background refresh to avoid thread safety issues. "
                        "Exception: %s",
                        e,
                    )
                return

            self._worker = _RegionalAccessBoundaryRefreshThread(
                credentials, thread_request, rab_manager
            )
            self._worker.start()

435 

436 

def _prepare_async_lookup_callable(request):
    """Builds the callable used for a lookup, cloning the transport when possible.

    A `functools.partial` is unwrapped to reach the underlying callable. When
    that callable exposes `_clone`, the clone is used and, for a partial, the
    original positional and keyword arguments are re-applied to it.

    Args:
        request: The original request callable (e.g. functools.partial or raw Request).

    Returns:
        Tuple[Callable, Any, bool]: A tuple containing the new lookup callable, the
            underlying request object, and a boolean indicating if it was cloned.
    """
    if isinstance(request, functools.partial):
        wrapped = True
        target = request.func
    else:
        wrapped = False
        target = request

    # Nothing to clone: hand the request back untouched.
    if not hasattr(target, "_clone"):
        return request, target, False

    duplicate = target._clone()

    if wrapped:
        rebuilt = functools.partial(duplicate, *request.args, **request.keywords)
    else:
        rebuilt = duplicate

    # `_clone` may return the same object, in which case nothing was cloned.
    return rebuilt, duplicate, duplicate is not target

464 

465 

async def _close_cloned_request(lookup_request, is_cloned):
    """Closes a cloned request transport, logging instead of raising on failure.

    Nothing happens when the request was not cloned or has no `close`
    attribute. If `close()` returns an awaitable, it is awaited.

    Args:
        lookup_request (Any): The request object/transport to close.
        is_cloned (bool): Whether the request was actually cloned.
    """
    if not (is_cloned and hasattr(lookup_request, "close")):
        return

    is_async = False
    try:
        pending = lookup_request.close()
        is_async = inspect.isawaitable(pending)
        if is_async:
            await pending
    except Exception as e:
        if _helpers.is_logging_enabled(_LOGGER):
            # Only the wording of the warning depends on the close flavour.
            adapter_type = " asynchronous " if is_async else " "
            _LOGGER.warning(
                "Failed to cleanly close cloned%srequest transport: %s",
                adapter_type,
                e,
                exc_info=True,
            )

490 

491 

class _AsyncRegionalAccessBoundaryRefreshManager(object):
    """Owns the asyncio task that refreshes the Regional Access Boundary in async flows."""

    def __init__(self):
        self._lock = threading.Lock()
        self._worker_task = None

    def __getstate__(self):
        """Pickle helper that blanks the un-picklable _lock and _worker_task attributes."""
        state = dict(self.__dict__)
        state.update(_lock=None, _worker_task=None)
        return state

    def __setstate__(self, state):
        """Pickle helper that restores the state with a new _lock and no _worker_task."""
        self.__dict__.update(state)
        self._lock = threading.Lock()
        self._worker_task = None

    def start_refresh(self, credentials, request, rab_manager):
        """
        Starts a background task to refresh the Regional Access Boundary if one is not already running.

        If preparing the lookup callable fails, the failure is reported to the
        manager and no task is created. If the task cannot be created, the
        failure is reported to the manager and the exception is re-raised.

        Args:
            credentials (CredentialsWithRegionalAccessBoundary): The credentials
                to refresh.
            request (google.auth.aio.transport.Request): The object used to make
                HTTP requests.
            rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
        """
        with self._lock:
            current_task = self._worker_task
            if current_task and not current_task.done():
                # The previous refresh has not finished yet.
                return

            try:
                prepared = _prepare_async_lookup_callable(request)
                lookup_callable, lookup_request, is_cloned = prepared
            except Exception as e:
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Synchronous cloning of request for Regional Access Boundary lookup failed: %s",
                        e,
                        exc_info=True,
                    )
                rab_manager.process_regional_access_boundary_info(None)
                return

            async def _worker():
                outcome = None
                try:
                    outcome = await credentials._lookup_regional_access_boundary(
                        lookup_callable
                    )
                except Exception as e:
                    if _helpers.is_logging_enabled(_LOGGER):
                        _LOGGER.warning(
                            "Asynchronous Regional Access Boundary lookup raised an exception: %s",
                            e,
                            exc_info=True,
                        )
                finally:
                    # Release the cloned transport whether the lookup worked or not.
                    await _close_cloned_request(lookup_request, is_cloned)

                rab_manager.process_regional_access_boundary_info(outcome)

            coro = _worker()
            try:
                self._worker_task = asyncio.create_task(coro)
            except Exception:
                # The coroutine will never run; close it and try to schedule
                # cleanup of the cloned request on the running loop, if any.
                coro.close()
                try:
                    asyncio.get_running_loop().create_task(
                        _close_cloned_request(lookup_request, is_cloned)
                    )
                except RuntimeError:
                    pass
                rab_manager.process_regional_access_boundary_info(None)
                raise

580 

581 

def _get_domain() -> str:
    """Picks the IAM credentials domain, using the mTLS variant when client certs are in use.

    The mTLS domain is chosen when `_mtls_helper` provides
    `check_use_client_cert` and that check returns a truthy value.

    Returns:
        str: The dynamic domain string.
    """
    from google.auth.transport import _mtls_helper

    use_client_cert = (
        hasattr(_mtls_helper, "check_use_client_cert")
        and _mtls_helper.check_use_client_cert()
    )
    if use_client_cert:
        return f"iamcredentials.mtls.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"
    return f"iamcredentials.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"

597 

598 

def get_service_account_rab_endpoint(service_account_email: str) -> str:
    """Returns the Regional Access Boundary lookup URL for a service account.

    Args:
        service_account_email: The service account email.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return (
        f"https://{domain}/v1/projects/-/serviceAccounts/"
        f"{service_account_email}/allowedLocations"
    )

609 

610 

def get_workforce_pool_rab_endpoint(pool_id: str) -> str:
    """Returns the Regional Access Boundary lookup URL for a workforce pool.

    Args:
        pool_id: The workforce pool ID.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return (
        f"https://{domain}/v1/locations/global/workforcePools/"
        f"{pool_id}/allowedLocations"
    )

621 

622 

def get_workload_identity_pool_rab_endpoint(project_number: str, pool_id: str) -> str:
    """Returns the Regional Access Boundary lookup URL for a workload identity pool.

    Args:
        project_number: The Google Cloud project number.
        pool_id: The workload identity pool ID.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return (
        f"https://{domain}/v1/projects/{project_number}"
        f"/locations/global/workloadIdentityPools/{pool_id}/allowedLocations"
    )