1# Copyright 2026 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Utilities for Regional Access Boundary management."""
16
17import asyncio
18import copy
19import datetime
20import functools
21import inspect
22import logging
23import threading
24from typing import NamedTuple, Optional, TYPE_CHECKING
25
26from google.auth import _helpers
27
28if TYPE_CHECKING: # pragma: NO COVER
29 import google.auth.credentials
30 import google.auth.transport
31
# Module-level logger. The log calls visible in this module are guarded by
# `_helpers.is_logging_enabled(_LOGGER)` before anything is emitted.
_LOGGER = logging.getLogger(__name__)


# The default lifetime for a cached Regional Access Boundary. A successful
# lookup sets the snapshot's hard expiry to "now + this TTL".
DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6)

# The period of time prior to the boundary's expiration when a background refresh
# is proactively triggered.
REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD = datetime.timedelta(hours=1)

# The initial cooldown period for a failed Regional Access Boundary lookup.
# The cooldown duration doubles after each failed lookup and is reset to this
# value by a successful one.
DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15)

# The maximum cooldown period for a failed Regional Access Boundary lookup;
# the doubling of the cooldown duration is capped at this value.
MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6)


# The header key used for Regional Access Boundaries.
_REGIONAL_ACCESS_BOUNDARY_HEADER = "x-allowed-locations"
51
52
class _RegionalAccessBoundaryData(NamedTuple):
    """Data container for a Regional Access Boundary snapshot.

    Being a ``NamedTuple``, an instance cannot be modified after creation;
    a state change is expressed by building a new instance.

    Attributes:
        encoded_locations (Optional[str]): The encoded Regional Access Boundary string.
        expiry (Optional[datetime.datetime]): The hard expiration time of the boundary data.
        cooldown_expiry (Optional[datetime.datetime]): The time until which further lookups are skipped.
        cooldown_duration (datetime.timedelta): The current duration for the exponential cooldown.
    """

    # Encoded boundary value; None when no boundary is cached.
    encoded_locations: Optional[str]
    # Hard expiry of `encoded_locations`; None when there is no expiry recorded.
    expiry: Optional[datetime.datetime]
    # End of the current lookup cooldown; None when no cooldown is active.
    cooldown_expiry: Optional[datetime.datetime]
    # Cooldown length that the next failed lookup will apply.
    cooldown_duration: datetime.timedelta
67
68
class _RegionalAccessBoundaryManager(object):
    """Manages the Regional Access Boundary state and its refresh.

    The state is held in an immutable `_RegionalAccessBoundaryData` snapshot
    that is replaced wholesale on every update. Readers grab a reference to
    the current snapshot without locking; writers serialize on `_update_lock`
    so that an update computed from one snapshot cannot overwrite a newer one.
    """

    # Defining __eq__ already makes instances unhashable; state it explicitly.
    __hash__ = None

    def __init__(self):
        self._data = _RegionalAccessBoundaryData(
            encoded_locations=None,
            expiry=None,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )
        self.refresh_manager = _RegionalAccessBoundaryRefreshManager()
        self._update_lock = threading.Lock()
        self._use_blocking_regional_access_boundary_lookup = False

    def __getstate__(self):
        """Pickle helper that drops the un-picklable `_update_lock` attribute.

        Returns:
            dict: A copy of the instance state with `_update_lock` set to None.
        """
        state = self.__dict__.copy()
        state["_update_lock"] = None
        return state

    def __setstate__(self, state):
        """Pickle helper that restores the state and creates a fresh `_update_lock`.

        Args:
            state (dict): The state produced by `__getstate__`.
        """
        self.__dict__.update(state)
        self._update_lock = threading.Lock()

    def __eq__(self, other):
        """Checks if two managers are equal.

        Two managers are equal when their data snapshots and their blocking
        lookup settings are equal; locks and refresh managers are ignored.
        """
        if not isinstance(other, _RegionalAccessBoundaryManager):
            return NotImplemented
        return (
            self._data == other._data
            and self._use_blocking_regional_access_boundary_lookup
            == other._use_blocking_regional_access_boundary_lookup
        )

    def enable_blocking_lookup(self):
        """Enables blocking Regional Access Boundary lookup.

        When enabled, the Regional Access Boundary lookup will be performed
        synchronously in the calling thread instead of asynchronously in a
        background thread.
        """
        self._use_blocking_regional_access_boundary_lookup = True

    def set_initial_regional_access_boundary(self, encoded_locations=None, expiry=None):
        """Manually sets the regional access boundary to the client provided initial values.

        Any cooldown is cleared and the cooldown duration is reset to its
        default.

        Args:
            encoded_locations (Optional[str]): The encoded locations string.
            expiry (Optional[datetime.datetime]): The expiry time for the boundary.
                If encoded_locations is not provided, expiry is ignored.
        """
        if not encoded_locations:
            expiry = None

        initial_data = _RegionalAccessBoundaryData(
            encoded_locations=encoded_locations,
            expiry=expiry,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )

        # Take the writer lock so a concurrent
        # process_regional_access_boundary_info(), which derives its update
        # from a previously captured snapshot, cannot clobber this value.
        with self._update_lock:
            self._data = initial_data

    def apply_headers(self, headers):
        """Applies the Regional Access Boundary header to the provided dictionary.

        If the boundary is valid, the 'x-allowed-locations' header is added
        or updated. Otherwise, the header is removed to ensure no stale
        data is sent.

        Args:
            headers (MutableMapping[str, str]): The headers dictionary to update.
        """
        # Read the snapshot once so all checks below see a consistent state.
        rab_data = self._data

        if rab_data.encoded_locations and (
            rab_data.expiry is not None and _helpers.utcnow() < rab_data.expiry
        ):
            headers[_REGIONAL_ACCESS_BOUNDARY_HEADER] = rab_data.encoded_locations
        else:
            headers.pop(_REGIONAL_ACCESS_BOUNDARY_HEADER, None)

    def _should_refresh(self):
        """Checks if the Regional Access Boundary data needs a refresh and is not in cooldown.

        Returns:
            bool: True if a refresh is required, False otherwise.
        """
        rab_data = self._data
        now = _helpers.utcnow()

        # Don't start a new refresh if the Regional Access Boundary info is still fresh.
        if (
            rab_data.encoded_locations
            and rab_data.expiry
            and now < (rab_data.expiry - REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD)
        ):
            return False

        # Don't start a new refresh if the cooldown is still in effect.
        if rab_data.cooldown_expiry and now < rab_data.cooldown_expiry:
            return False

        return True

    def maybe_start_refresh(self, credentials, request):
        """Refreshes the Regional Access Boundary if a refresh is needed.

        When blocking lookup is enabled the lookup runs in the calling thread;
        otherwise it is handed to `refresh_manager`.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        if not self._should_refresh():
            return

        if self._use_blocking_regional_access_boundary_lookup:
            self.start_blocking_refresh(credentials, request)
        else:
            self.refresh_manager.start_refresh(credentials, request, self)

    async def maybe_start_refresh_async(self, credentials, request):
        """Starts a background refresh or performs a blocking refresh asynchronously.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.aio.transport.Request): The object used to make HTTP requests.
        """
        if not self._should_refresh():
            return

        if self._use_blocking_regional_access_boundary_lookup:
            await self.start_blocking_refresh_async(credentials, request)
        else:
            # NOTE(review): `__init__` installs the thread-based refresh
            # manager; presumably async credentials replace `refresh_manager`
            # with an async-aware one before this is called -- confirm
            # against the callers.
            self.refresh_manager.start_refresh(credentials, request, self)

    def start_blocking_refresh(self, credentials, request):
        """Initiates a blocking lookup of the Regional Access Boundary.

        If the lookup raises an exception, it is caught and logged as a warning,
        and the lookup is treated as a failure (entering cooldown). Exceptions
        are not propagated to the caller.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        # Async credentials do not support blocking lookups.
        if inspect.iscoroutinefunction(credentials._lookup_regional_access_boundary):
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Blocking Regional Access Boundary lookup is not supported for async credentials."
                )
            self.process_regional_access_boundary_info(None)
            return

        try:
            # The fail_fast parameter is set to True to ensure we don't block the calling
            # thread for too long. This will do two things: 1) set a timeout to 3s
            # instead of the default 120s and 2) ensure we do not retry at all
            regional_access_boundary_info = (
                credentials._lookup_regional_access_boundary(request, fail_fast=True)
            )
        except Exception as e:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Blocking Regional Access Boundary lookup raised an exception: %s",
                    e,
                    exc_info=True,
                )
            regional_access_boundary_info = None

        self.process_regional_access_boundary_info(regional_access_boundary_info)

    async def start_blocking_refresh_async(self, credentials, request):
        """Initiates a blocking lookup of the Regional Access Boundary asynchronously.

        If the lookup raises an exception, it is caught and logged as a warning,
        and the lookup is treated as a failure (entering cooldown). Exceptions
        are not propagated to the caller.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.aio.transport.Request): The object used to make HTTP requests.
        """
        try:
            # The fail_fast parameter is set to True to ensure we don't block the calling
            # thread for too long. This will do two things: 1) set a timeout to 3s
            # instead of the default 120s and 2) ensure we do not retry at all
            regional_access_boundary_info = (
                await credentials._lookup_regional_access_boundary(
                    request, fail_fast=True
                )
            )
        except Exception as e:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Regional Access Boundary lookup raised an exception: %s",
                    e,
                    exc_info=True,
                )
            regional_access_boundary_info = None

        self.process_regional_access_boundary_info(regional_access_boundary_info)

    def process_regional_access_boundary_info(self, regional_access_boundary_info):
        """Processes the regional access boundary info and updates the state.

        A result carrying a non-empty "encodedLocations" value is a success:
        the boundary is cached with a fresh expiry and any cooldown is
        cleared. Anything else (None, an empty mapping, or a mapping without
        a usable "encodedLocations") is a failure: the existing boundary is
        kept until its hard expiry and an exponential cooldown is applied.

        Args:
            regional_access_boundary_info (Optional[Mapping[str, str]]): The regional access
                boundary info to process.
        """
        # A response without a usable boundary must not be cached as a
        # success: it would clear the cooldown while leaving no boundary, so
        # _should_refresh() would request a new lookup on every call.
        encoded_locations = (
            regional_access_boundary_info.get("encodedLocations")
            if regional_access_boundary_info
            else None
        )

        with self._update_lock:
            # Capture the current state before calculating updates.
            current_data = self._data
            # Use one timestamp so every decision below agrees on "now".
            now = _helpers.utcnow()

            if encoded_locations:
                # On success, update the boundary and its expiry, and clear any cooldown.
                updated_data = _RegionalAccessBoundaryData(
                    encoded_locations=encoded_locations,
                    expiry=now + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL,
                    cooldown_expiry=None,
                    cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.debug("Regional Access Boundary lookup successful.")
            else:
                # On failure, calculate cooldown and update state.
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Regional Access Boundary lookup failed. Entering cooldown."
                    )

                next_cooldown_expiry = now + current_data.cooldown_duration
                # Double the cooldown for the next failure, up to the cap.
                next_cooldown_duration = min(
                    current_data.cooldown_duration * 2,
                    MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )

                # If the refresh failed, we keep reusing the existing data unless
                # it has reached its hard expiration time.
                if current_data.expiry and now > current_data.expiry:
                    next_encoded_locations = None
                    next_expiry = None
                else:
                    next_encoded_locations = current_data.encoded_locations
                    next_expiry = current_data.expiry

                updated_data = _RegionalAccessBoundaryData(
                    encoded_locations=next_encoded_locations,
                    expiry=next_expiry,
                    cooldown_expiry=next_cooldown_expiry,
                    cooldown_duration=next_cooldown_duration,
                )

            # Perform the atomic swap of the state object.
            self._data = updated_data
335
336
class _RegionalAccessBoundaryRefreshThread(threading.Thread):
    """Daemon thread that performs a single Regional Access Boundary lookup."""

    def __init__(
        self,
        credentials: "google.auth.credentials.CredentialsWithRegionalAccessBoundary",  # noqa: F821
        request: "google.auth.transport.Request",  # noqa: F821
        rab_manager: "_RegionalAccessBoundaryManager",
    ):
        # Daemon thread: it does not keep the interpreter alive on exit.
        super().__init__(daemon=True)
        self._rab_manager = rab_manager
        self._request = request
        self._credentials = credentials

    def run(self):
        """Looks up the Regional Access Boundary and reports the outcome.

        The lookup itself is delegated to the credentials'
        `_lookup_regional_access_boundary` method. Its result -- or None when
        the lookup raised -- is passed to the manager's
        `process_regional_access_boundary_info`, which updates the cached
        state.
        """
        lookup_result = None
        # Swallow lookup errors (e.g. from the underlying transport) so the
        # thread does not die with an unhandled exception; the None result
        # is reported to the manager as a failed lookup.
        try:
            lookup_result = self._credentials._lookup_regional_access_boundary(
                self._request
            )
        except Exception as exc:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Asynchronous Regional Access Boundary lookup raised an exception: %s",
                    exc,
                    exc_info=True,
                )

        self._rab_manager.process_regional_access_boundary_info(lookup_result)
381
382
class _RegionalAccessBoundaryRefreshManager(object):
    """Manages a thread for background refreshing of the Regional Access Boundary.

    At most one refresh thread is alive at a time; `_lock` guards the check
    for a running worker and the creation of a new one.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._worker = None

    def __getstate__(self):
        """Pickle helper that drops the un-picklable `_lock` and `_worker` attributes.

        Returns:
            dict: A copy of the instance state with both attributes set to None.
        """
        state = self.__dict__.copy()
        state["_lock"] = None
        state["_worker"] = None
        return state

    def __setstate__(self, state):
        """Pickle helper that restores the state with a fresh `_lock` and no worker.

        Args:
            state (dict): The state produced by `__getstate__`.
        """
        self.__dict__.update(state)
        self._lock = threading.Lock()
        self._worker = None

    def start_refresh(self, credentials, request, rab_manager):
        """
        Starts a background thread to refresh the Regional Access Boundary if one is not already running.

        The worker thread gets its own deep copy of `request`. If the copy
        cannot be made, no thread is started and the attempt is recorded on
        `rab_manager` as a failed lookup so that the manager enters its
        cooldown instead of retrying (and logging) on every subsequent call.

        Args:
            credentials (CredentialsWithRegionalAccessBoundary): The credentials
                to refresh.
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
        """
        with self._lock:
            if self._worker and self._worker.is_alive():
                # A refresh is already in progress.
                return

            try:
                copied_request = copy.deepcopy(request)
            except Exception as e:
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Could not deepcopy transport for background RAB refresh. "
                        "Skipping background refresh to avoid thread safety issues. "
                        "Exception: %s",
                        e,
                    )
                # Record the failure so the manager backs off. Without this
                # nothing changes in the manager's state and the deepcopy is
                # attempted again on every call.
                rab_manager.process_regional_access_boundary_info(None)
                return

            self._worker = _RegionalAccessBoundaryRefreshThread(
                credentials, copied_request, rab_manager
            )
            self._worker.start()
435
436
def _prepare_async_lookup_callable(request):
    """Builds the callable to use for a lookup, cloning the transport when possible.

    If `request` is a `functools.partial`, the wrapped function is treated as
    the transport and the partial is rebuilt around the clone with the same
    positional and keyword arguments. A transport without a `_clone` method
    is used as-is.

    Args:
        request: The original request callable (e.g. functools.partial or raw Request).

    Returns:
        Tuple[Callable, Any, bool]: The callable to use for the lookup, the
            underlying (possibly cloned) request object, and whether
            `_clone()` returned a different object than the original.
    """
    partial_request = request if isinstance(request, functools.partial) else None
    transport = request if partial_request is None else partial_request.func

    if not hasattr(transport, "_clone"):
        return request, transport, False

    clone = transport._clone()

    if partial_request is None:
        lookup_callable = clone
    else:
        lookup_callable = functools.partial(
            clone, *partial_request.args, **partial_request.keywords
        )

    # `_clone()` may hand back the very same object; report that as "not cloned".
    return lookup_callable, clone, clone is not transport
464
465
async def _close_cloned_request(lookup_request, is_cloned):
    """Closes a cloned request transport, logging instead of raising on failure.

    Nothing happens unless the request was cloned and exposes a `close`
    attribute. `close()` may be synchronous or return an awaitable; an
    awaitable result is awaited.

    Args:
        lookup_request (Any): The request object/transport to close.
        is_cloned (bool): Whether the request was actually cloned.
    """
    if not (is_cloned and hasattr(lookup_request, "close")):
        return

    # Tracks whether close() turned out to be asynchronous, for the log text.
    awaited_close = False
    try:
        close_result = lookup_request.close()
        awaited_close = inspect.isawaitable(close_result)
        if awaited_close:
            await close_result
    except Exception as exc:
        if not _helpers.is_logging_enabled(_LOGGER):
            return
        flavor = " asynchronous " if awaited_close else " "
        _LOGGER.warning(
            "Failed to cleanly close cloned%srequest transport: %s",
            flavor,
            exc,
            exc_info=True,
        )
490
491
class _AsyncRegionalAccessBoundaryRefreshManager(object):
    """Runs Regional Access Boundary refreshes as asyncio tasks, one at a time."""

    def __init__(self):
        self._lock = threading.Lock()
        self._worker_task = None

    def __getstate__(self):
        """Returns the picklable state: `_lock` and `_worker_task` are replaced by None."""
        picklable = dict(self.__dict__)
        picklable["_lock"] = None
        picklable["_worker_task"] = None
        return picklable

    def __setstate__(self, state):
        """Restores pickled state, creating a fresh `_lock` and clearing `_worker_task`."""
        self.__dict__.update(state)
        self._worker_task = None
        self._lock = threading.Lock()

    async def _run_lookup(
        self, credentials, lookup_callable, lookup_request, is_cloned, rab_manager
    ):
        """Performs one lookup, closes the cloned transport, and reports the result.

        A lookup that raises is logged and reported to `rab_manager` as None.
        """
        lookup_result = None
        try:
            lookup_result = await credentials._lookup_regional_access_boundary(
                lookup_callable
            )
        except Exception as exc:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Asynchronous Regional Access Boundary lookup raised an exception: %s",
                    exc,
                    exc_info=True,
                )
        finally:
            # Always release the cloned transport, even when the lookup failed.
            await _close_cloned_request(lookup_request, is_cloned)

        rab_manager.process_regional_access_boundary_info(lookup_result)

    def start_refresh(self, credentials, request, rab_manager):
        """
        Schedules a background task that refreshes the Regional Access Boundary, unless one is still pending.

        If preparing (cloning) the request fails, the failure is logged and
        reported to `rab_manager` as a failed lookup. If the task cannot be
        created, the cloned request is closed on a best-effort basis, the
        failure is reported to `rab_manager`, and the exception is re-raised.

        Args:
            credentials (CredentialsWithRegionalAccessBoundary): The credentials
                to refresh.
            request (google.auth.aio.transport.Request): The object used to make
                HTTP requests.
            rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
        """
        with self._lock:
            pending = self._worker_task
            if pending and not pending.done():
                # A refresh is already in progress.
                return

            try:
                prepared = _prepare_async_lookup_callable(request)
            except Exception as exc:
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Synchronous cloning of request for Regional Access Boundary lookup failed: %s",
                        exc,
                        exc_info=True,
                    )
                rab_manager.process_regional_access_boundary_info(None)
                return

            lookup_callable, lookup_request, is_cloned = prepared
            lookup_coro = self._run_lookup(
                credentials, lookup_callable, lookup_request, is_cloned, rab_manager
            )

            try:
                self._worker_task = asyncio.create_task(lookup_coro)
            except Exception:
                # The coroutine will never run; close it and clean up the
                # cloned request if task creation fails.
                lookup_coro.close()
                try:
                    running_loop = asyncio.get_running_loop()
                    running_loop.create_task(
                        _close_cloned_request(lookup_request, is_cloned)
                    )
                except RuntimeError:
                    # Typically: no running event loop to schedule the cleanup on.
                    pass
                rab_manager.process_regional_access_boundary_info(None)
                raise
580
581
def _get_domain() -> str:
    """Returns the IAM credentials host, using the mTLS variant when client certs are in use.

    Returns:
        str: "iamcredentials.mtls.<default universe domain>" when
            `_mtls_helper.check_use_client_cert()` exists and returns a truthy
            value, otherwise "iamcredentials.<default universe domain>".
    """
    from google.auth.transport import _mtls_helper

    # `check_use_client_cert` is probed with hasattr before it is called.
    mtls_enabled = (
        hasattr(_mtls_helper, "check_use_client_cert")
        and _mtls_helper.check_use_client_cert()
    )
    return (
        f"iamcredentials.mtls.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"
        if mtls_enabled
        else f"iamcredentials.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"
    )
597
598
def get_service_account_rab_endpoint(service_account_email: str) -> str:
    """Returns the Regional Access Boundary lookup URL of a service account.

    The email is inserted into the URL path as given.

    Args:
        service_account_email: The service account email.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return f"https://{domain}/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations"
609
610
def get_workforce_pool_rab_endpoint(pool_id: str) -> str:
    """Returns the Regional Access Boundary lookup URL of a workforce pool.

    The pool ID is inserted into the URL path as given.

    Args:
        pool_id: The workforce pool ID.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return f"https://{domain}/v1/locations/global/workforcePools/{pool_id}/allowedLocations"
621
622
def get_workload_identity_pool_rab_endpoint(project_number: str, pool_id: str) -> str:
    """Returns the Regional Access Boundary lookup URL of a workload identity pool.

    Both values are inserted into the URL path as given.

    Args:
        project_number: The Google Cloud project number.
        pool_id: The workload identity pool ID.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return f"https://{domain}/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations"