Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_regional_access_boundary_utils.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

217 statements  

1# Copyright 2026 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Utilities for Regional Access Boundary management.""" 

16 

17import asyncio 

18import copy 

19import datetime 

20import functools 

21import inspect 

22import logging 

23import threading 

24from typing import NamedTuple, Optional, TYPE_CHECKING 

25 

26from google.auth import _helpers 

27 

28if TYPE_CHECKING: # pragma: NO COVER 

29 import google.auth.credentials 

30 import google.auth.transport 

31 

_LOGGER = logging.getLogger(__name__)


# How long a successfully fetched Regional Access Boundary stays cached
# before it reaches its hard expiration.
DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6)

# Lead time before the hard expiration at which a proactive background
# refresh is triggered.
REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD = datetime.timedelta(hours=1)

# Cooldown applied after the first failed Regional Access Boundary lookup.
DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15)

# Upper bound for the cooldown applied after repeated failed lookups.
MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6)


# Name of the HTTP header that carries the Regional Access Boundary.
_REGIONAL_ACCESS_BOUNDARY_HEADER = "x-allowed-locations"

51 

52 

class _RegionalAccessBoundaryData(NamedTuple):
    """Immutable snapshot of the Regional Access Boundary state.

    Attributes:
        encoded_locations (Optional[str]): The encoded Regional Access
            Boundary string, or None when no boundary is cached.
        expiry (Optional[datetime.datetime]): The hard expiration time of
            the cached boundary, or None when nothing is cached.
        cooldown_expiry (Optional[datetime.datetime]): The time until which
            further lookups are skipped, or None when no cooldown is active.
        cooldown_duration (datetime.timedelta): The duration to use for the
            next cooldown of the exponential backoff.
    """

    encoded_locations: Optional[str]
    expiry: Optional[datetime.datetime]
    cooldown_expiry: Optional[datetime.datetime]
    cooldown_duration: datetime.timedelta

67 

68 

class _RegionalAccessBoundaryManager(object):
    """Manages the Regional Access Boundary state and its background refresh.

    The actual data is held in an immutable `_RegionalAccessBoundaryData` object
    and is swapped atomically to ensure thread-safe, lock-free reads.
    `process_regional_access_boundary_info` serializes its read-modify-write
    of that object on `_update_lock`.
    """

    def __init__(self):
        self._data = _RegionalAccessBoundaryData(
            encoded_locations=None,
            expiry=None,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )
        self.refresh_manager = _RegionalAccessBoundaryRefreshManager()
        self._update_lock = threading.Lock()
        self._use_blocking_regional_access_boundary_lookup = False

    def __getstate__(self):
        """Pickle helper that drops the un-picklable _update_lock attribute."""
        state = self.__dict__.copy()
        state["_update_lock"] = None
        return state

    def __setstate__(self, state):
        """Pickle helper that restores state and re-creates the _update_lock."""
        self.__dict__.update(state)
        self._update_lock = threading.Lock()

    def __eq__(self, other):
        """Checks if two managers are equal.

        Only the data snapshot and the blocking-lookup flag are compared; the
        lock and the refresh manager do not take part in the comparison.
        """
        if not isinstance(other, _RegionalAccessBoundaryManager):
            return NotImplemented
        return (
            self._data == other._data
            and self._use_blocking_regional_access_boundary_lookup
            == other._use_blocking_regional_access_boundary_lookup
        )

    def enable_blocking_lookup(self):
        """Enables blocking Regional Access Boundary lookup.

        When enabled, the Regional Access Boundary lookup will be performed
        synchronously in the calling thread instead of asynchronously in a
        background thread.
        """
        self._use_blocking_regional_access_boundary_lookup = True

    def set_initial_regional_access_boundary(self, encoded_locations=None, expiry=None):
        """Manually sets the regional access boundary to the client provided initial values.

        Any active cooldown is cleared and the cooldown duration is reset to
        its default.

        Args:
            encoded_locations (Optional[str]): The encoded locations string.
            expiry (Optional[datetime.datetime]): The expiry time for the boundary.
                If encoded_locations is not provided, expiry is ignored.
        """
        if not encoded_locations:
            expiry = None

        self._data = _RegionalAccessBoundaryData(
            encoded_locations=encoded_locations,
            expiry=expiry,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )

    def apply_headers(self, headers):
        """Applies the Regional Access Boundary header to the provided dictionary.

        If the boundary is valid, the 'x-allowed-locations' header is added
        or updated. Otherwise, the header is removed to ensure no stale
        data is sent.

        Args:
            headers (MutableMapping[str, str]): The headers dictionary to update.
        """
        # Read the snapshot once so both checks see the same state.
        rab_data = self._data

        if rab_data.encoded_locations and (
            rab_data.expiry is not None and _helpers.utcnow() < rab_data.expiry
        ):
            headers[_REGIONAL_ACCESS_BOUNDARY_HEADER] = rab_data.encoded_locations
        else:
            headers.pop(_REGIONAL_ACCESS_BOUNDARY_HEADER, None)

    def _should_refresh(self):
        """Checks if the Regional Access Boundary data needs a refresh and is not in cooldown.

        Returns:
            bool: True if a refresh is required, False otherwise.
        """
        rab_data = self._data

        # Don't start a new refresh if the Regional Access Boundary info is still fresh.
        if (
            rab_data.encoded_locations
            and rab_data.expiry
            and _helpers.utcnow()
            < (rab_data.expiry - REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD)
        ):
            return False

        # Don't start a new refresh if the cooldown is still in effect.
        if rab_data.cooldown_expiry and _helpers.utcnow() < rab_data.cooldown_expiry:
            return False

        return True

    def maybe_start_refresh(self, credentials, request):
        """Refreshes the Regional Access Boundary if needed.

        The refresh runs synchronously when blocking lookup is enabled, and is
        handed to the refresh manager otherwise.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        if not self._should_refresh():
            return

        # If all checks pass, start the refresh.
        if self._use_blocking_regional_access_boundary_lookup:
            self.start_blocking_refresh(credentials, request)
        else:
            self.refresh_manager.start_refresh(credentials, request, self)

    async def maybe_start_refresh_async(self, credentials, request):
        """Starts a background refresh or performs a blocking refresh asynchronously.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.aio.transport.Request): The object used to make HTTP requests.
        """
        if not self._should_refresh():
            return

        # If all checks pass, start the refresh.
        if self._use_blocking_regional_access_boundary_lookup:
            await self.start_blocking_refresh_async(credentials, request)
        else:
            self.refresh_manager.start_refresh(credentials, request, self)

    def start_blocking_refresh(self, credentials, request):
        """Initiates a blocking lookup of the Regional Access Boundary.

        If the lookup raises an exception, it is caught and logged at debug
        level, and the lookup is treated as a failure (entering cooldown).
        Exceptions are not propagated to the caller.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        # Async credentials do not support blocking lookups.
        if inspect.iscoroutinefunction(credentials._lookup_regional_access_boundary):
            _LOGGER.debug(
                "Blocking Regional Access Boundary lookup is not supported for async credentials."
            )
            self.process_regional_access_boundary_info(None)
            return

        try:
            # The fail_fast parameter is set to True to ensure we don't block the calling
            # thread for too long. This will do two things: 1) set a timeout to 3s
            # instead of the default 120s and 2) ensure we do not retry at all
            regional_access_boundary_info = (
                credentials._lookup_regional_access_boundary(request, fail_fast=True)
            )
        except Exception as e:
            _LOGGER.debug(
                "Blocking Regional Access Boundary lookup raised an exception: %s",
                e,
                exc_info=True,
            )
            regional_access_boundary_info = None

        self.process_regional_access_boundary_info(regional_access_boundary_info)

    async def start_blocking_refresh_async(self, credentials, request):
        """Initiates a blocking lookup of the Regional Access Boundary asynchronously.

        If the lookup raises an exception, it is caught and logged at debug
        level, and the lookup is treated as a failure (entering cooldown).
        Exceptions are not propagated to the caller.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.aio.transport.Request): The object used to make HTTP requests.
        """
        try:
            # The fail_fast parameter is set to True to ensure we don't block the calling
            # thread for too long. This will do two things: 1) set a timeout to 3s
            # instead of the default 120s and 2) ensure we do not retry at all
            regional_access_boundary_info = (
                await credentials._lookup_regional_access_boundary(
                    request, fail_fast=True
                )
            )
        except Exception as e:
            _LOGGER.debug(
                "Regional Access Boundary lookup raised an exception: %s",
                e,
                exc_info=True,
            )
            regional_access_boundary_info = None

        self.process_regional_access_boundary_info(regional_access_boundary_info)

    def process_regional_access_boundary_info(self, regional_access_boundary_info):
        """Processes the regional access boundary info and updates the state.

        A lookup counts as successful only when the info carries a non-empty
        "encodedLocations" value. Anything else (None, an empty mapping, or a
        mapping without usable locations) is treated as a failed lookup:
        the cooldown is started, the next cooldown duration is doubled up to
        MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, and the previously cached
        boundary is kept until it reaches its hard expiration.

        Args:
            regional_access_boundary_info (Optional[Mapping[str, str]]): The regional access
                boundary info to process.
        """
        with self._update_lock:
            # Capture the current state before calculating updates.
            current_data = self._data
            # Sample the clock once so every timestamp below is consistent.
            now = _helpers.utcnow()

            encoded_locations = None
            if regional_access_boundary_info:
                encoded_locations = regional_access_boundary_info.get(
                    "encodedLocations"
                )

            if encoded_locations:
                # On success, update the boundary and its expiry, and clear any cooldown.
                updated_data = _RegionalAccessBoundaryData(
                    encoded_locations=encoded_locations,
                    expiry=now + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL,
                    cooldown_expiry=None,
                    cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )
                _LOGGER.debug("Regional Access Boundary lookup successful.")
            else:
                # On failure, calculate cooldown and update state. Storing a
                # payload without locations as a "success" would leave no
                # boundary and no cooldown, so every later call would start
                # another lookup with no backoff.
                _LOGGER.debug(
                    "Regional Access Boundary lookup failed. Entering cooldown."
                )

                next_cooldown_expiry = now + current_data.cooldown_duration
                next_cooldown_duration = min(
                    current_data.cooldown_duration * 2,
                    MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )

                # If the refresh failed, we keep reusing the existing data unless
                # it has reached its hard expiration time.
                if current_data.expiry and now > current_data.expiry:
                    next_encoded_locations = None
                    next_expiry = None
                else:
                    next_encoded_locations = current_data.encoded_locations
                    next_expiry = current_data.expiry

                updated_data = _RegionalAccessBoundaryData(
                    encoded_locations=next_encoded_locations,
                    expiry=next_expiry,
                    cooldown_expiry=next_cooldown_expiry,
                    cooldown_duration=next_cooldown_duration,
                )

            # Perform the atomic swap of the state object.
            self._data = updated_data

330 

331 

class _RegionalAccessBoundaryRefreshThread(threading.Thread):
    """Daemon thread that performs one Regional Access Boundary lookup."""

    def __init__(
        self,
        credentials: "google.auth.credentials.CredentialsWithRegionalAccessBoundary",  # noqa: F821
        request: "google.auth.transport.Request",  # noqa: F821
        rab_manager: "_RegionalAccessBoundaryManager",
    ):
        # Daemon threads do not keep the interpreter alive at shutdown.
        super().__init__(daemon=True)
        self._credentials = credentials
        self._request = request
        self._rab_manager = rab_manager

    def run(self):
        """
        Looks up the Regional Access Boundary and hands the result to the manager.

        This method is run in a separate thread. The lookup itself is
        delegated to the credentials object's
        `_lookup_regional_access_boundary` method. Whatever it returns, or
        None when it raised, is passed to the manager's
        `process_regional_access_boundary_info`, which updates the cached
        state.
        """
        # A raised exception (e.g., from the underlying transport) must not
        # crash the background thread; it is reported to the manager as a
        # failed lookup (None) instead.
        lookup_result = None
        try:
            lookup_result = self._credentials._lookup_regional_access_boundary(
                self._request
            )
        except Exception as exc:
            _LOGGER.debug(
                "Asynchronous Regional Access Boundary lookup raised an exception: %s",
                exc,
                exc_info=True,
            )

        self._rab_manager.process_regional_access_boundary_info(lookup_result)

375 

376 

class _RegionalAccessBoundaryRefreshManager(object):
    """Manages a thread for background refreshing of the Regional Access Boundary."""

    def __init__(self):
        self._lock = threading.Lock()
        self._worker = None

    def __getstate__(self):
        """Pickle helper that drops the un-picklable _lock and _worker attributes."""
        state = self.__dict__.copy()
        state["_lock"] = None
        state["_worker"] = None
        return state

    def __setstate__(self, state):
        """Pickle helper that restores state and re-creates _lock with no worker."""
        self.__dict__.update(state)
        self._lock = threading.Lock()
        self._worker = None

    def start_refresh(self, credentials, request, rab_manager):
        """
        Starts a background thread to refresh the Regional Access Boundary if one is not already running.

        The request is deep-copied so the worker thread does not share the
        caller's transport. If the copy fails, no thread is started and the
        failure is reported to ``rab_manager`` so it enters cooldown instead
        of re-attempting the same copy on every call.

        Args:
            credentials (CredentialsWithRegionalAccessBoundary): The credentials
                to refresh.
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
        """
        with self._lock:
            if self._worker and self._worker.is_alive():
                # A refresh is already in progress.
                return

            try:
                copied_request = copy.deepcopy(request)
            except Exception as e:
                _LOGGER.debug(
                    "Could not deepcopy transport for background RAB refresh. "
                    "Skipping background refresh to avoid thread safety issues. "
                    "Exception: %s",
                    e,
                )
                # Record the failure so the manager backs off, matching how
                # the async refresh manager handles a failed clone.
                rab_manager.process_regional_access_boundary_info(None)
                return

            self._worker = _RegionalAccessBoundaryRefreshThread(
                credentials, copied_request, rab_manager
            )
            self._worker.start()

428 

429 

def _prepare_async_lookup_callable(request):
    """Builds the callable used for a lookup, cloning the transport when possible.

    A `functools.partial` is unwrapped to reach the underlying callable. If
    that callable exposes `_clone`, it is cloned and, for partials, the
    partial is rebuilt around the clone with the same arguments. Without
    `_clone`, the request is returned unchanged.

    Args:
        request: The original request callable (e.g. functools.partial or raw Request).

    Returns:
        Tuple[Callable, Any, bool]: A tuple containing the new lookup callable, the
            underlying request object, and a boolean indicating if it was cloned.
    """
    wrapped = isinstance(request, functools.partial)
    target = request.func if wrapped else request

    # Nothing to clone: hand back the request exactly as it was given.
    if not hasattr(target, "_clone"):
        return request, target, False

    clone = target._clone()

    if wrapped:
        rebuilt = functools.partial(clone, *request.args, **request.keywords)
    else:
        rebuilt = clone

    # `_clone` may return the same object; report that as "not cloned".
    return rebuilt, clone, clone is not target

457 

458 

async def _close_cloned_request(lookup_request, is_cloned):
    """Closes a cloned request transport without letting errors escape.

    Nothing happens unless the request was cloned and has a `close`
    attribute. The result of `close()` is awaited when it is awaitable.
    Any exception raised while closing is logged at debug level and
    swallowed.

    Args:
        lookup_request (Any): The request object/transport to close.
        is_cloned (bool): Whether the request was actually cloned.
    """
    if not (is_cloned and hasattr(lookup_request, "close")):
        return

    # Tracks whether close() produced an awaitable, for the log message only.
    awaited = False
    try:
        close_result = lookup_request.close()
        awaited = inspect.isawaitable(close_result)
        if awaited:
            await close_result
    except Exception as exc:
        _LOGGER.debug(
            "Failed to cleanly close cloned%srequest transport: %s",
            " asynchronous " if awaited else " ",
            exc,
            exc_info=True,
        )

482 

483 

class _AsyncRegionalAccessBoundaryRefreshManager(object):
    """Manages a task for background refreshing of the Regional Access Boundary in async flows."""

    def __init__(self):
        self._lock = threading.Lock()
        self._worker_task = None

    def __getstate__(self):
        """Pickle helper that leaves out the un-picklable _lock and _worker_task attributes."""
        state = self.__dict__.copy()
        state["_lock"] = None
        state["_worker_task"] = None
        return state

    def __setstate__(self, state):
        """Pickle helper that restores state with a fresh _lock and no worker task."""
        self.__dict__.update(state)
        self._lock = threading.Lock()
        self._worker_task = None

    def start_refresh(self, credentials, request, rab_manager):
        """
        Starts a background task to refresh the Regional Access Boundary if one is not already running.

        The request is cloned (when it supports cloning) before the task is
        created. A cloning failure is reported to ``rab_manager`` as a failed
        lookup. If the task cannot be created, the failure is reported the
        same way and the exception is re-raised.

        Args:
            credentials (CredentialsWithRegionalAccessBoundary): The credentials
                to refresh.
            request (google.auth.aio.transport.Request): The object used to make
                HTTP requests.
            rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
        """
        with self._lock:
            running_task = self._worker_task
            if running_task and not running_task.done():
                # A refresh is already in progress.
                return

            try:
                prepared = _prepare_async_lookup_callable(request)
                lookup_callable, lookup_request, is_cloned = prepared
            except Exception as exc:
                _LOGGER.debug(
                    "Synchronous cloning of request for Regional Access Boundary lookup failed: %s",
                    exc,
                    exc_info=True,
                )
                rab_manager.process_regional_access_boundary_info(None)
                return

            async def _worker():
                lookup_result = None
                try:
                    lookup_result = await credentials._lookup_regional_access_boundary(
                        lookup_callable
                    )
                except Exception as exc:
                    _LOGGER.debug(
                        "Asynchronous Regional Access Boundary lookup raised an exception: %s",
                        exc,
                        exc_info=True,
                    )
                finally:
                    # Always release the cloned transport, success or not.
                    await _close_cloned_request(lookup_request, is_cloned)

                rab_manager.process_regional_access_boundary_info(lookup_result)

            worker_coro = _worker()
            try:
                self._worker_task = asyncio.create_task(worker_coro)
            except Exception:
                # The coroutine will never run; close it so it is not left
                # un-awaited, then try to release the cloned request.
                worker_coro.close()
                try:
                    asyncio.get_running_loop().create_task(
                        _close_cloned_request(lookup_request, is_cloned)
                    )
                except RuntimeError:
                    pass
                rab_manager.process_regional_access_boundary_info(None)
                raise

570 

571 

def _get_domain() -> str:
    """Picks the IAM credentials domain according to the mTLS configuration.

    The mTLS domain is used when `_mtls_helper` provides
    `check_use_client_cert` and that check returns a truthy value; the
    regular domain is used otherwise.

    Returns:
        str: The dynamic domain string.
    """
    from google.auth.transport import _mtls_helper

    use_mtls = (
        hasattr(_mtls_helper, "check_use_client_cert")
        and _mtls_helper.check_use_client_cert()
    )
    if use_mtls:
        return f"iamcredentials.mtls.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"
    return f"iamcredentials.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"

587 

588 

def get_service_account_rab_endpoint(service_account_email: str) -> str:
    """Returns the Regional Access Boundary lookup URL of a service account.

    Args:
        service_account_email: The service account email.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return f"https://{domain}/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations"

599 

600 

def get_workforce_pool_rab_endpoint(pool_id: str) -> str:
    """Returns the Regional Access Boundary lookup URL of a workforce pool.

    Args:
        pool_id: The workforce pool ID.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return f"https://{domain}/v1/locations/global/workforcePools/{pool_id}/allowedLocations"

611 

612 

def get_workload_identity_pool_rab_endpoint(project_number: str, pool_id: str) -> str:
    """Returns the Regional Access Boundary lookup URL of a workload identity pool.

    Args:
        project_number: The Google Cloud project number.
        pool_id: The workload identity pool ID.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return f"https://{domain}/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations"