1# Copyright 2026 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Utilities for Regional Access Boundary management."""
16
17import asyncio
18import copy
19import datetime
20import functools
21import inspect
22import logging
23import threading
24from typing import NamedTuple, Optional, TYPE_CHECKING
25
26from google.auth import _helpers
27
28if TYPE_CHECKING: # pragma: NO COVER
29 import google.auth.credentials
30 import google.auth.transport
31
32_LOGGER = logging.getLogger(__name__)
33
34
35# The default lifetime for a cached Regional Access Boundary.
36DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6)
37
38# The period of time prior to the boundary's expiration when a background refresh
39# is proactively triggered.
40REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD = datetime.timedelta(hours=1)
41
42# The initial cooldown period for a failed Regional Access Boundary lookup.
43DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15)
44
45# The maximum cooldown period for a failed Regional Access Boundary lookup.
46MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6)
47
48
49# The header key used for Regional Access Boundaries.
50_REGIONAL_ACCESS_BOUNDARY_HEADER = "x-allowed-locations"
51
52
53class _RegionalAccessBoundaryData(NamedTuple):
54 """Data container for a Regional Access Boundary snapshot.
55
56 Attributes:
57 encoded_locations (Optional[str]): The encoded Regional Access Boundary string.
58 expiry (Optional[datetime.datetime]): The hard expiration time of the boundary data.
59 cooldown_expiry (Optional[datetime.datetime]): The time until which further lookups are skipped.
60 cooldown_duration (datetime.timedelta): The current duration for the exponential cooldown.
61 """
62
63 encoded_locations: Optional[str]
64 expiry: Optional[datetime.datetime]
65 cooldown_expiry: Optional[datetime.datetime]
66 cooldown_duration: datetime.timedelta
67
68
69class _RegionalAccessBoundaryManager(object):
70 """Manages the Regional Access Boundary state and its background refresh.
71
72 The actual data is held in an immutable `_RegionalAccessBoundaryData` object
73 and is swapped atomically to ensure thread-safe, lock-free reads.
74 """
75
76 def __init__(self):
77 self._data = _RegionalAccessBoundaryData(
78 encoded_locations=None,
79 expiry=None,
80 cooldown_expiry=None,
81 cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
82 )
83 self.refresh_manager = _RegionalAccessBoundaryRefreshManager()
84 self._update_lock = threading.Lock()
85 self._use_blocking_regional_access_boundary_lookup = False
86
87 def __getstate__(self):
88 """Pickle helper that serializes the _update_lock attribute."""
89 state = self.__dict__.copy()
90 state["_update_lock"] = None
91 return state
92
93 def __setstate__(self, state):
94 """Pickle helper that deserializes the _update_lock attribute."""
95 self.__dict__.update(state)
96 self._update_lock = threading.Lock()
97
98 def __eq__(self, other):
99 """Checks if two managers are equal."""
100 if not isinstance(other, _RegionalAccessBoundaryManager):
101 return NotImplemented
102 return (
103 self._data == other._data
104 and self._use_blocking_regional_access_boundary_lookup
105 == other._use_blocking_regional_access_boundary_lookup
106 )
107
108 def enable_blocking_lookup(self):
109 """Enables blocking Regional Access Boundary lookup.
110
111 When enabled, the Regional Access Boundary lookup will be performed
112 synchronously in the calling thread instead of asynchronously in a
113 background thread.
114 """
115 self._use_blocking_regional_access_boundary_lookup = True
116
117 def set_initial_regional_access_boundary(self, encoded_locations=None, expiry=None):
118 """Manually sets the regional access boundary to the client provided initial values.
119
120 Args:
121 encoded_locations (Optional[str]): The encoded locations string.
122 expiry (Optional[datetime.datetime]): The expiry time for the boundary.
123 If encoded_locations is not provided, expiry is ignored.
124 """
125 if not encoded_locations:
126 expiry = None
127
128 self._data = _RegionalAccessBoundaryData(
129 encoded_locations=encoded_locations,
130 expiry=expiry,
131 cooldown_expiry=None,
132 cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
133 )
134
135 def apply_headers(self, headers):
136 """Applies the Regional Access Boundary header to the provided dictionary.
137
138 If the boundary is valid, the 'x-allowed-locations' header is added
139 or updated. Otherwise, the header is removed to ensure no stale
140 data is sent.
141
142 Args:
143 headers (MutableMapping[str, str]): The headers dictionary to update.
144 """
145 rab_data = self._data
146
147 if rab_data.encoded_locations and (
148 rab_data.expiry is not None and _helpers.utcnow() < rab_data.expiry
149 ):
150 headers[_REGIONAL_ACCESS_BOUNDARY_HEADER] = rab_data.encoded_locations
151 else:
152 headers.pop(_REGIONAL_ACCESS_BOUNDARY_HEADER, None)
153
154 def _should_refresh(self):
155 """Checks if the Regional Access Boundary data needs a refresh and is not in cooldown.
156
157 Returns:
158 bool: True if a refresh is required, False otherwise.
159 """
160 rab_data = self._data
161
162 # Don't start a new refresh if the Regional Access Boundary info is still fresh.
163 if (
164 rab_data.encoded_locations
165 and rab_data.expiry
166 and _helpers.utcnow()
167 < (rab_data.expiry - REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD)
168 ):
169 return False
170
171 # Don't start a new refresh if the cooldown is still in effect.
172 if rab_data.cooldown_expiry and _helpers.utcnow() < rab_data.cooldown_expiry:
173 return False
174
175 return True
176
177 def maybe_start_refresh(self, credentials, request):
178 """Starts a background thread to refresh the Regional Access Boundary if needed.
179
180 Args:
181 credentials (google.auth.credentials.Credentials): The credentials to refresh.
182 request (google.auth.transport.Request): The object used to make HTTP requests.
183 """
184 if not self._should_refresh():
185 return
186
187 # If all checks pass, start the background refresh.
188 if self._use_blocking_regional_access_boundary_lookup:
189 self.start_blocking_refresh(credentials, request)
190 else:
191 self.refresh_manager.start_refresh(credentials, request, self)
192
193 async def maybe_start_refresh_async(self, credentials, request):
194 """Starts a background refresh or performs a blocking refresh asynchronously.
195
196 Args:
197 credentials (google.auth.credentials.Credentials): The credentials to refresh.
198 request (google.auth.aio.transport.Request): The object used to make HTTP requests.
199 """
200 if not self._should_refresh():
201 return
202
203 # If all checks pass, start the refresh.
204 if self._use_blocking_regional_access_boundary_lookup:
205 await self.start_blocking_refresh_async(credentials, request)
206 else:
207 self.refresh_manager.start_refresh(credentials, request, self)
208
209 def start_blocking_refresh(self, credentials, request):
210 """Initiates a blocking lookup of the Regional Access Boundary.
211
212 If the lookup raises an exception, it is caught and logged as a warning,
213 and the lookup is treated as a failure (entering cooldown). Exceptions
214 are not propagated to the caller.
215
216 Args:
217 credentials (google.auth.credentials.Credentials): The credentials to refresh.
218 request (google.auth.transport.Request): The object used to make HTTP requests.
219 """
220 # Async credentials do not support blocking lookups.
221 if inspect.iscoroutinefunction(credentials._lookup_regional_access_boundary):
222 _LOGGER.debug(
223 "Blocking Regional Access Boundary lookup is not supported for async credentials."
224 )
225 self.process_regional_access_boundary_info(None)
226 return
227
228 try:
229 # The fail_fast parameter is set to True to ensure we don't block the calling
230 # thread for too long. This will do two things: 1) set a timeout to 3s
231 # instead of the default 120s and 2) ensure we do not retry at all
232 regional_access_boundary_info = (
233 credentials._lookup_regional_access_boundary(request, fail_fast=True)
234 )
235 except Exception as e:
236 _LOGGER.debug(
237 "Blocking Regional Access Boundary lookup raised an exception: %s",
238 e,
239 exc_info=True,
240 )
241 regional_access_boundary_info = None
242
243 self.process_regional_access_boundary_info(regional_access_boundary_info)
244
245 async def start_blocking_refresh_async(self, credentials, request):
246 """Initiates a blocking lookup of the Regional Access Boundary asynchronously.
247
248 If the lookup raises an exception, it is caught and logged as a warning,
249 and the lookup is treated as a failure (entering cooldown). Exceptions
250 are not propagated to the caller.
251
252 Args:
253 credentials (google.auth.credentials.Credentials): The credentials to refresh.
254 request (google.auth.aio.transport.Request): The object used to make HTTP requests.
255 """
256 try:
257 # The fail_fast parameter is set to True to ensure we don't block the calling
258 # thread for too long. This will do two things: 1) set a timeout to 3s
259 # instead of the default 120s and 2) ensure we do not retry at all
260 regional_access_boundary_info = (
261 await credentials._lookup_regional_access_boundary(
262 request, fail_fast=True
263 )
264 )
265 except Exception as e:
266 _LOGGER.debug(
267 "Regional Access Boundary lookup raised an exception: %s",
268 e,
269 exc_info=True,
270 )
271 regional_access_boundary_info = None
272
273 self.process_regional_access_boundary_info(regional_access_boundary_info)
274
275 def process_regional_access_boundary_info(self, regional_access_boundary_info):
276 """Processes the regional access boundary info and updates the state.
277
278 Args:
279 regional_access_boundary_info (Optional[Mapping[str, str]]): The regional access
280 boundary info to process.
281 """
282 with self._update_lock:
283 # Capture the current state before calculating updates.
284 current_data = self._data
285
286 if regional_access_boundary_info:
287 # On success, update the boundary and its expiry, and clear any cooldown.
288 encoded_locations = regional_access_boundary_info.get(
289 "encodedLocations"
290 )
291 updated_data = _RegionalAccessBoundaryData(
292 encoded_locations=encoded_locations,
293 expiry=_helpers.utcnow() + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL,
294 cooldown_expiry=None,
295 cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
296 )
297 _LOGGER.debug("Regional Access Boundary lookup successful.")
298 else:
299 # On failure, calculate cooldown and update state.
300 _LOGGER.debug(
301 "Regional Access Boundary lookup failed. Entering cooldown."
302 )
303
304 next_cooldown_expiry = (
305 _helpers.utcnow() + current_data.cooldown_duration
306 )
307 next_cooldown_duration = min(
308 current_data.cooldown_duration * 2,
309 MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
310 )
311
312 # If the refresh failed, we keep reusing the existing data unless
313 # it has reached its hard expiration time.
314 if current_data.expiry and _helpers.utcnow() > current_data.expiry:
315 next_encoded_locations = None
316 next_expiry = None
317 else:
318 next_encoded_locations = current_data.encoded_locations
319 next_expiry = current_data.expiry
320
321 updated_data = _RegionalAccessBoundaryData(
322 encoded_locations=next_encoded_locations,
323 expiry=next_expiry,
324 cooldown_expiry=next_cooldown_expiry,
325 cooldown_duration=next_cooldown_duration,
326 )
327
328 # Perform the atomic swap of the state object.
329 self._data = updated_data
330
331
332class _RegionalAccessBoundaryRefreshThread(threading.Thread):
333 """Thread for background refreshing of the Regional Access Boundary."""
334
335 def __init__(
336 self,
337 credentials: "google.auth.credentials.CredentialsWithRegionalAccessBoundary", # noqa: F821
338 request: "google.auth.transport.Request", # noqa: F821
339 rab_manager: "_RegionalAccessBoundaryManager",
340 ):
341 super().__init__()
342 self.daemon = True
343 self._credentials = credentials
344 self._request = request
345 self._rab_manager = rab_manager
346
347 def run(self):
348 """
349 Performs the Regional Access Boundary lookup and updates the state.
350
351 This method is run in a separate thread. It delegates the actual lookup
352 to the credentials object's `_lookup_regional_access_boundary` method.
353 Based on the lookup's outcome (success or complete failure after retries),
354 it updates the cached Regional Access Boundary information,
355 its expiry, its cooldown expiry, and its exponential cooldown duration.
356 """
357 # Catch exceptions (e.g., from the underlying transport) to prevent the
358 # background thread from crashing. This ensures we can gracefully enter
359 # an exponential cooldown state on failure.
360 try:
361 regional_access_boundary_info = (
362 self._credentials._lookup_regional_access_boundary(self._request)
363 )
364 except Exception as e:
365 _LOGGER.debug(
366 "Asynchronous Regional Access Boundary lookup raised an exception: %s",
367 e,
368 exc_info=True,
369 )
370 regional_access_boundary_info = None
371
372 self._rab_manager.process_regional_access_boundary_info(
373 regional_access_boundary_info
374 )
375
376
377class _RegionalAccessBoundaryRefreshManager(object):
378 """Manages a thread for background refreshing of the Regional Access Boundary."""
379
380 def __init__(self):
381 self._lock = threading.Lock()
382 self._worker = None
383
384 def __getstate__(self):
385 """Pickle helper that serializes the _lock and _worker attributes."""
386 state = self.__dict__.copy()
387 state["_lock"] = None
388 state["_worker"] = None
389 return state
390
391 def __setstate__(self, state):
392 """Pickle helper that deserializes the _lock and _worker attributes."""
393 self.__dict__.update(state)
394 self._lock = threading.Lock()
395 self._worker = None
396
397 def start_refresh(self, credentials, request, rab_manager):
398 """
399 Starts a background thread to refresh the Regional Access Boundary if one is not already running.
400
401 Args:
402 credentials (CredentialsWithRegionalAccessBoundary): The credentials
403 to refresh.
404 request (google.auth.transport.Request): The object used to make
405 HTTP requests.
406 rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
407 """
408 with self._lock:
409 if self._worker and self._worker.is_alive():
410 # A refresh is already in progress.
411 return
412
413 try:
414 copied_request = copy.deepcopy(request)
415 except Exception as e:
416 _LOGGER.debug(
417 "Could not deepcopy transport for background RAB refresh. "
418 "Skipping background refresh to avoid thread safety issues. "
419 "Exception: %s",
420 e,
421 )
422 return
423
424 self._worker = _RegionalAccessBoundaryRefreshThread(
425 credentials, copied_request, rab_manager
426 )
427 self._worker.start()
428
429
430def _prepare_async_lookup_callable(request):
431 """Unwraps a request callable, clones the transport, and returns the new callable.
432
433 Args:
434 request: The original request callable (e.g. functools.partial or raw Request).
435
436 Returns:
437 Tuple[Callable, Any, bool]: A tuple containing the new lookup callable, the
438 underlying request object, and a boolean indicating if it was cloned.
439 """
440 is_partial = isinstance(request, functools.partial)
441 base_callable = request.func if is_partial else request
442
443 if not hasattr(base_callable, "_clone"):
444 return request, base_callable, False
445
446 cloned_callable = base_callable._clone()
447 is_cloned = cloned_callable is not base_callable
448
449 if is_partial:
450 new_request = functools.partial(
451 cloned_callable, *request.args, **request.keywords
452 )
453 else:
454 new_request = cloned_callable
455
456 return new_request, cloned_callable, is_cloned
457
458
459async def _close_cloned_request(lookup_request, is_cloned):
460 """Safely closes the underlying cloned request transport, if applicable.
461
462 Args:
463 lookup_request (Any): The request object/transport to close.
464 is_cloned (bool): Whether the request was actually cloned.
465 """
466 if not is_cloned or not hasattr(lookup_request, "close"):
467 return
468
469 is_async = False
470 try:
471 maybe_coro = lookup_request.close()
472 if is_async := inspect.isawaitable(maybe_coro):
473 await maybe_coro
474 except Exception as e:
475 adapter_type = " asynchronous " if is_async else " "
476 _LOGGER.debug(
477 "Failed to cleanly close cloned%srequest transport: %s",
478 adapter_type,
479 e,
480 exc_info=True,
481 )
482
483
484class _AsyncRegionalAccessBoundaryRefreshManager(object):
485 """Manages a task for background refreshing of the Regional Access Boundary in async flows."""
486
487 def __init__(self):
488 self._lock = threading.Lock()
489 self._worker_task = None
490
491 def __getstate__(self):
492 """Pickle helper that excludes the un-picklable _lock and _worker_task attributes from serialization."""
493 state = self.__dict__.copy()
494 state["_lock"] = None
495 state["_worker_task"] = None
496 return state
497
498 def __setstate__(self, state):
499 """Pickle helper that restores state and re-initializes the _lock and _worker_task attributes."""
500 self.__dict__.update(state)
501 self._lock = threading.Lock()
502 self._worker_task = None
503
504 def start_refresh(self, credentials, request, rab_manager):
505 """
506 Starts a background task to refresh the Regional Access Boundary if one is not already running.
507
508 Args:
509 credentials (CredentialsWithRegionalAccessBoundary): The credentials
510 to refresh.
511 request (google.auth.aio.transport.Request): The object used to make
512 HTTP requests.
513 rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
514 """
515 with self._lock:
516 if self._worker_task and not self._worker_task.done():
517 # A refresh is already in progress.
518 return
519
520 try:
521 (
522 lookup_callable,
523 lookup_request,
524 is_cloned,
525 ) = _prepare_async_lookup_callable(request)
526 except Exception as e:
527 _LOGGER.debug(
528 "Synchronous cloning of request for Regional Access Boundary lookup failed: %s",
529 e,
530 exc_info=True,
531 )
532 rab_manager.process_regional_access_boundary_info(None)
533 return
534
535 async def _worker():
536 try:
537 regional_access_boundary_info = (
538 await credentials._lookup_regional_access_boundary(
539 lookup_callable
540 )
541 )
542 except Exception as e:
543 _LOGGER.debug(
544 "Asynchronous Regional Access Boundary lookup raised an exception: %s",
545 e,
546 exc_info=True,
547 )
548 regional_access_boundary_info = None
549 finally:
550 await _close_cloned_request(lookup_request, is_cloned)
551
552 rab_manager.process_regional_access_boundary_info(
553 regional_access_boundary_info
554 )
555
556 coro = _worker()
557 try:
558 self._worker_task = asyncio.create_task(coro)
559 except Exception:
560 # Clean up cloned request if task creation fails
561 coro.close()
562 try:
563 asyncio.get_running_loop().create_task(
564 _close_cloned_request(lookup_request, is_cloned)
565 )
566 except RuntimeError:
567 pass
568 rab_manager.process_regional_access_boundary_info(None)
569 raise
570
571
572def _get_domain() -> str:
573 """Dynamically determines the domain for IAM credentials based on active mTLS configuration.
574
575 Returns:
576 str: The dynamic domain string.
577 """
578 from google.auth.transport import _mtls_helper
579
580 if (
581 hasattr(_mtls_helper, "check_use_client_cert")
582 and _mtls_helper.check_use_client_cert()
583 ):
584 return f"iamcredentials.mtls.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"
585 else:
586 return f"iamcredentials.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"
587
588
589def get_service_account_rab_endpoint(service_account_email: str) -> str:
590 """Builds the Regional Access Boundary lookup URL for service accounts.
591
592 Args:
593 service_account_email: The service account email.
594
595 Returns:
596 str: The complete lookup URL.
597 """
598 return f"https://{_get_domain()}/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations"
599
600
601def get_workforce_pool_rab_endpoint(pool_id: str) -> str:
602 """Builds the Regional Access Boundary lookup URL for workforce pools.
603
604 Args:
605 pool_id: The workforce pool ID.
606
607 Returns:
608 str: The complete lookup URL.
609 """
610 return f"https://{_get_domain()}/v1/locations/global/workforcePools/{pool_id}/allowedLocations"
611
612
613def get_workload_identity_pool_rab_endpoint(project_number: str, pool_id: str) -> str:
614 """Builds the Regional Access Boundary lookup URL for workload identity pools.
615
616 Args:
617 project_number: The Google Cloud project number.
618 pool_id: The workload identity pool ID.
619
620 Returns:
621 str: The complete lookup URL.
622 """
623 return f"https://{_get_domain()}/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations"