1# Copyright 2026 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Utilities for Regional Access Boundary management."""
16
17import asyncio
18import copy
19import datetime
20import functools
21import inspect
22import logging
23import os
24import threading
25from typing import NamedTuple, Optional, TYPE_CHECKING
26
27from google.auth import _helpers
28from google.auth import environment_vars
29
30if TYPE_CHECKING:
31 import google.auth.credentials
32 import google.auth.transport
33
# Module-level logger, named after this module; every log call in this file
# goes through it and is guarded by `_helpers.is_logging_enabled(_LOGGER)`.
_LOGGER = logging.getLogger(__name__)
35
36
@functools.lru_cache()
def is_regional_access_boundary_enabled():
    """Reports whether the Regional Access Boundary feature is switched on.

    The switch is the environment variable named by
    ``environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED``. Its value is
    compared case-insensitively:
        - "true" and "1" enable the feature.
        - Anything else, including an unset variable, leaves it disabled.

    Because the function is wrapped in ``functools.lru_cache`` and takes no
    arguments, the environment is read on the first call only; later calls
    return the memoized answer until ``cache_clear()`` is invoked.

    Returns:
        bool: True if Regional Access Boundary is enabled, False otherwise.
    """
    raw_setting = os.environ.get(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED)
    return raw_setting is not None and raw_setting.lower() in ("true", "1")
54
55
# The default lifetime for a cached Regional Access Boundary. A successful
# lookup stamps the cached value with an expiry of "now + this TTL".
DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6)

# The period of time prior to the boundary's expiration when a refresh is
# proactively triggered. The cached value counts as fresh only while "now" is
# earlier than (expiry - this threshold).
REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD = datetime.timedelta(hours=1)

# The initial cooldown period for a failed Regional Access Boundary lookup.
# The cooldown doubles after each failure and is reset to this value by a
# successful lookup.
DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15)

# The maximum cooldown period for a failed Regional Access Boundary lookup;
# the doubling described above is capped at this value.
MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6)


# The header key used for Regional Access Boundaries. The manager sets it to
# the encoded locations while the cached boundary is valid and removes it
# otherwise.
_REGIONAL_ACCESS_BOUNDARY_HEADER = "x-allowed-locations"
72
73
class _RegionalAccessBoundaryData(NamedTuple):
    """Immutable snapshot of the Regional Access Boundary state.

    Being a ``NamedTuple``, an instance cannot be modified in place; the
    manager replaces the whole snapshot whenever the state changes.

    Attributes:
        encoded_locations (Optional[str]): The encoded Regional Access Boundary string,
            or None when no boundary is cached.
        expiry (Optional[datetime.datetime]): The hard expiration time of the boundary data.
        cooldown_expiry (Optional[datetime.datetime]): The time until which further lookups
            are skipped, or None when no cooldown is in effect.
        cooldown_duration (datetime.timedelta): The current duration for the exponential
            cooldown, i.e. how long the next cooldown will last if a lookup fails.
    """

    # Value sent in the boundary header; None means "nothing cached".
    encoded_locations: Optional[str]
    # Point in time after which `encoded_locations` must no longer be used.
    expiry: Optional[datetime.datetime]
    # Lookups are suppressed while the current time is before this instant.
    cooldown_expiry: Optional[datetime.datetime]
    # Length of the cooldown to apply on the next failed lookup.
    cooldown_duration: datetime.timedelta
88
89
class _RegionalAccessBoundaryManager(object):
    """Manages the Regional Access Boundary state and its refresh.

    The state is held in an immutable `_RegionalAccessBoundaryData` snapshot
    bound to `self._data`. Readers take the reference once and work from that
    snapshot without locking; writers build a new snapshot and rebind
    `self._data` in a single assignment. The read-modify-write cycle in
    `process_regional_access_boundary_info` is serialized with `_update_lock`.
    """

    def __init__(self):
        # Start with nothing cached, no cooldown, and the initial backoff.
        self._data = _RegionalAccessBoundaryData(
            encoded_locations=None,
            expiry=None,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )
        self.refresh_manager = _RegionalAccessBoundaryRefreshManager()
        self._update_lock = threading.Lock()
        self._use_blocking_regional_access_boundary_lookup = False

    def __getstate__(self):
        """Pickle helper that drops the un-picklable `_update_lock` from the state.

        Returns:
            dict: A shallow copy of the instance dictionary with
                `_update_lock` replaced by None.
        """
        state = self.__dict__.copy()
        state["_update_lock"] = None
        return state

    def __setstate__(self, state):
        """Pickle helper that restores the state and creates a fresh `_update_lock`.

        Args:
            state (dict): The dictionary produced by `__getstate__`.
        """
        self.__dict__.update(state)
        self._update_lock = threading.Lock()

    def __eq__(self, other):
        """Checks if two managers are equal.

        Only the data snapshot and the blocking-lookup flag are compared; the
        refresh manager and the lock do not take part in the comparison.

        Args:
            other (object): The object to compare against.

        Returns:
            bool: The comparison result, or NotImplemented when `other` is
                not a `_RegionalAccessBoundaryManager`.
        """
        if not isinstance(other, _RegionalAccessBoundaryManager):
            return NotImplemented
        return (
            self._data == other._data
            and self._use_blocking_regional_access_boundary_lookup
            == other._use_blocking_regional_access_boundary_lookup
        )

    def enable_blocking_lookup(self):
        """Enables blocking Regional Access Boundary lookup.

        When enabled, the Regional Access Boundary lookup will be performed
        synchronously in the calling thread instead of asynchronously in a
        background thread.
        """
        self._use_blocking_regional_access_boundary_lookup = True

    def set_initial_regional_access_boundary(self, encoded_locations=None, expiry=None):
        """Manually sets the regional access boundary to the client provided initial values.

        Any cooldown is cleared and the cooldown duration is reset to its
        initial value.

        Note that `apply_headers` only emits the header when `expiry` is set
        and in the future, so a seed without an expiry is stored but not
        applied.

        Args:
            encoded_locations (Optional[str]): The encoded locations string.
            expiry (Optional[datetime.datetime]): The expiry time for the boundary.
                If encoded_locations is not provided, expiry is ignored.
        """
        if not encoded_locations:
            expiry = None

        self._data = _RegionalAccessBoundaryData(
            encoded_locations=encoded_locations,
            expiry=expiry,
            cooldown_expiry=None,
            cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
        )

    def apply_headers(self, headers):
        """Applies the Regional Access Boundary header to the provided dictionary.

        If the boundary is valid (non-empty encoded locations and an expiry
        that lies in the future), the 'x-allowed-locations' header is added
        or updated. Otherwise, the header is removed to ensure no stale
        data is sent.

        Args:
            headers (MutableMapping[str, str]): The headers dictionary to update.
        """
        # Read the reference once so the checks below see one consistent snapshot.
        rab_data = self._data

        if rab_data.encoded_locations and (
            rab_data.expiry is not None and _helpers.utcnow() < rab_data.expiry
        ):
            headers[_REGIONAL_ACCESS_BOUNDARY_HEADER] = rab_data.encoded_locations
        else:
            headers.pop(_REGIONAL_ACCESS_BOUNDARY_HEADER, None)

    def _should_refresh(self):
        """Checks if the Regional Access Boundary data needs a refresh and is not in cooldown.

        Returns:
            bool: True if a refresh is required, False otherwise.
        """
        rab_data = self._data

        # Don't start a new refresh if the Regional Access Boundary info is still fresh,
        # i.e. we have not yet entered the refresh window before its expiry.
        if (
            rab_data.encoded_locations
            and rab_data.expiry
            and _helpers.utcnow()
            < (rab_data.expiry - REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD)
        ):
            return False

        # Don't start a new refresh if the cooldown is still in effect.
        if rab_data.cooldown_expiry and _helpers.utcnow() < rab_data.cooldown_expiry:
            return False

        return True

    def maybe_start_refresh(self, credentials, request):
        """Refreshes the Regional Access Boundary if needed.

        Does nothing while the cached data is fresh or a cooldown is active.
        Otherwise the lookup runs in the calling thread when blocking lookup
        is enabled, and is handed to `refresh_manager` if not.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        if not self._should_refresh():
            return

        # If all checks pass, start the refresh (blocking or background).
        if self._use_blocking_regional_access_boundary_lookup:
            self.start_blocking_refresh(credentials, request)
        else:
            self.refresh_manager.start_refresh(credentials, request, self)

    async def maybe_start_refresh_async(self, credentials, request):
        """Starts a background refresh or performs a blocking refresh asynchronously.

        Does nothing while the cached data is fresh or a cooldown is active.

        NOTE(review): the non-blocking branch delegates to `refresh_manager`,
        which `__init__` sets to the thread-based manager. Presumably async
        credentials replace it with the async refresh manager; confirm
        against the caller.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.aio.transport.Request): The object used to make HTTP requests.
        """
        if not self._should_refresh():
            return

        # If all checks pass, start the refresh.
        if self._use_blocking_regional_access_boundary_lookup:
            await self.start_blocking_refresh_async(credentials, request)
        else:
            self.refresh_manager.start_refresh(credentials, request, self)

    def start_blocking_refresh(self, credentials, request):
        """Initiates a blocking lookup of the Regional Access Boundary.

        If the lookup raises an exception, it is caught and logged as a warning,
        and the lookup is treated as a failure (entering cooldown). Exceptions
        are not propagated to the caller.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.transport.Request): The object used to make HTTP requests.
        """
        # Async credentials do not support blocking lookups; record a failure
        # so the cooldown prevents this branch from being hit on every call.
        if inspect.iscoroutinefunction(credentials._lookup_regional_access_boundary):
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Blocking Regional Access Boundary lookup is not supported for async credentials."
                )
            self.process_regional_access_boundary_info(None)
            return

        try:
            # The fail_fast parameter is set to True to ensure we don't block the calling
            # thread for too long. This will do two things: 1) set a timeout to 3s
            # instead of the default 120s and 2) ensure we do not retry at all
            regional_access_boundary_info = (
                credentials._lookup_regional_access_boundary(request, fail_fast=True)
            )
        except Exception as e:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Blocking Regional Access Boundary lookup raised an exception: %s",
                    e,
                    exc_info=True,
                )
            regional_access_boundary_info = None

        self.process_regional_access_boundary_info(regional_access_boundary_info)

    async def start_blocking_refresh_async(self, credentials, request):
        """Initiates a blocking lookup of the Regional Access Boundary asynchronously.

        If the lookup raises an exception, it is caught and logged as a warning,
        and the lookup is treated as a failure (entering cooldown). Exceptions
        are not propagated to the caller.

        Args:
            credentials (google.auth.credentials.Credentials): The credentials to refresh.
            request (google.auth.aio.transport.Request): The object used to make HTTP requests.
        """
        try:
            # The fail_fast parameter is set to True to ensure we don't block the calling
            # thread for too long. This will do two things: 1) set a timeout to 3s
            # instead of the default 120s and 2) ensure we do not retry at all
            regional_access_boundary_info = (
                await credentials._lookup_regional_access_boundary(
                    request, fail_fast=True
                )
            )
        except Exception as e:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Regional Access Boundary lookup raised an exception: %s",
                    e,
                    exc_info=True,
                )
            regional_access_boundary_info = None

        self.process_regional_access_boundary_info(regional_access_boundary_info)

    def process_regional_access_boundary_info(self, regional_access_boundary_info):
        """Processes the regional access boundary info and updates the state.

        A lookup counts as successful only when the payload carries a
        non-empty "encodedLocations" value. In that case the boundary is
        cached with a fresh expiry and any cooldown is cleared.

        Everything else (None, an empty mapping, or a mapping without usable
        "encodedLocations") is a failure: a cooldown is started, the cooldown
        duration for the next failure is doubled up to the maximum, and the
        previously cached boundary is kept unless it is past its expiry.

        Args:
            regional_access_boundary_info (Optional[Mapping[str, str]]): The regional access
                boundary info to process.
        """
        with self._update_lock:
            # Capture the current state before calculating updates.
            current_data = self._data

            # A payload without usable locations must not be stored as a
            # success: it would wipe the cached boundary, clear the cooldown
            # and, being falsy, never count as fresh, so every subsequent
            # call would trigger another lookup with no backoff.
            encoded_locations = (
                regional_access_boundary_info.get("encodedLocations")
                if regional_access_boundary_info
                else None
            )

            if encoded_locations:
                # On success, update the boundary and its expiry, and clear any cooldown.
                updated_data = _RegionalAccessBoundaryData(
                    encoded_locations=encoded_locations,
                    expiry=_helpers.utcnow() + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL,
                    cooldown_expiry=None,
                    cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.debug("Regional Access Boundary lookup successful.")
            else:
                # On failure, calculate cooldown and update state.
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Regional Access Boundary lookup failed. Entering cooldown."
                    )

                # The cooldown that starts now uses the current duration; the
                # doubled (capped) duration applies to the next failure.
                next_cooldown_expiry = (
                    _helpers.utcnow() + current_data.cooldown_duration
                )
                next_cooldown_duration = min(
                    current_data.cooldown_duration * 2,
                    MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
                )

                # If the refresh failed, we keep reusing the existing data unless
                # it has reached its hard expiration time.
                if current_data.expiry and _helpers.utcnow() > current_data.expiry:
                    next_encoded_locations = None
                    next_expiry = None
                else:
                    next_encoded_locations = current_data.encoded_locations
                    next_expiry = current_data.expiry

                updated_data = _RegionalAccessBoundaryData(
                    encoded_locations=next_encoded_locations,
                    expiry=next_expiry,
                    cooldown_expiry=next_cooldown_expiry,
                    cooldown_duration=next_cooldown_duration,
                )

            # Perform the atomic swap of the state object.
            self._data = updated_data
356
357
class _RegionalAccessBoundaryRefreshThread(threading.Thread):
    """Daemon thread that performs a single Regional Access Boundary lookup."""

    def __init__(
        self,
        credentials: "google.auth.credentials.CredentialsWithRegionalAccessBoundary",  # noqa: F821
        request: "google.auth.transport.Request",  # noqa: F821
        rab_manager: "_RegionalAccessBoundaryManager",
    ):
        # Daemon threads do not keep the interpreter alive at shutdown.
        super().__init__(daemon=True)
        self._credentials = credentials
        self._request = request
        self._rab_manager = rab_manager

    def run(self):
        """
        Looks up the Regional Access Boundary and reports the outcome.

        Executed in the background thread. The lookup itself is delegated to
        the credentials' `_lookup_regional_access_boundary` method; whatever it
        returns is passed to the manager's
        `process_regional_access_boundary_info`, which is responsible for
        updating the cached boundary, its expiry and the cooldown state.
        """
        # An exception from the lookup (e.g., from the underlying transport)
        # must not kill the thread before the manager is notified, so it is
        # logged and reported to the manager as a failed lookup (None).
        lookup_result = None
        try:
            lookup_result = self._credentials._lookup_regional_access_boundary(
                self._request
            )
        except Exception as exc:
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.warning(
                    "Asynchronous Regional Access Boundary lookup raised an exception: %s",
                    exc,
                    exc_info=True,
                )

        self._rab_manager.process_regional_access_boundary_info(lookup_result)
402
403
class _RegionalAccessBoundaryRefreshManager(object):
    """Owns the single background thread that refreshes the Regional Access Boundary."""

    def __init__(self):
        self._lock = threading.Lock()
        self._worker = None

    def __getstate__(self):
        """Pickle helper that leaves the un-picklable `_lock` and `_worker` out of the state."""
        state = dict(self.__dict__)
        state.update(_lock=None, _worker=None)
        return state

    def __setstate__(self, state):
        """Pickle helper that restores the state with a new lock and no worker."""
        self.__dict__.update(state)
        self._lock = threading.Lock()
        self._worker = None

    def start_refresh(self, credentials, request, rab_manager):
        """
        Launches a background refresh thread unless one is still running.

        The request object is deep-copied before being handed to the thread;
        if that copy fails, a warning is logged and no thread is started.

        Args:
            credentials (CredentialsWithRegionalAccessBoundary): The credentials
                to refresh.
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
        """
        with self._lock:
            active = self._worker
            if active and active.is_alive():
                # A refresh is already in progress.
                return

            try:
                request_copy = copy.deepcopy(request)
            except Exception as exc:
                if _helpers.is_logging_enabled(_LOGGER):
                    _LOGGER.warning(
                        "Could not deepcopy transport for background RAB refresh. "
                        "Skipping background refresh to avoid thread safety issues. "
                        "Exception: %s",
                        exc,
                    )
                return

            self._worker = _RegionalAccessBoundaryRefreshThread(
                credentials, request_copy, rab_manager
            )
            self._worker.start()
456
457
class _AsyncRegionalAccessBoundaryRefreshManager(object):
    """Owns the single asyncio task that refreshes the Regional Access Boundary in async flows."""

    def __init__(self):
        self._lock = threading.Lock()
        self._worker_task = None

    def __getstate__(self):
        """Pickle helper that leaves the un-picklable `_lock` and `_worker_task` out of the state."""
        state = dict(self.__dict__)
        state.update(_lock=None, _worker_task=None)
        return state

    def __setstate__(self, state):
        """Pickle helper that restores the state with a new lock and no task."""
        self.__dict__.update(state)
        self._lock = threading.Lock()
        self._worker_task = None

    def start_refresh(self, credentials, request, rab_manager):
        """
        Schedules a background refresh task unless one is still pending.

        The task is created with `asyncio.create_task`. If scheduling raises,
        the coroutine is closed and the exception is re-raised to the caller.

        Args:
            credentials (CredentialsWithRegionalAccessBoundary): The credentials
                to refresh.
            request (google.auth.aio.transport.Request): The object used to make
                HTTP requests.
            rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
        """
        with self._lock:
            pending = self._worker_task
            if pending and not pending.done():
                # A refresh is already in progress.
                return

            async def _worker():
                # A lookup that raises is logged and reported as a failure (None).
                lookup_result = None
                try:
                    # credentials._lookup_regional_access_boundary should be async in the async creds class
                    lookup_result = await credentials._lookup_regional_access_boundary(
                        request
                    )
                except Exception as exc:
                    if _helpers.is_logging_enabled(_LOGGER):
                        _LOGGER.warning(
                            "Asynchronous Regional Access Boundary lookup raised an exception: %s",
                            exc,
                            exc_info=True,
                        )

                rab_manager.process_regional_access_boundary_info(lookup_result)

            coro = _worker()
            try:
                self._worker_task = asyncio.create_task(coro)
            except Exception:
                # Close the never-scheduled coroutine so it does not trigger a
                # "never awaited" warning, then surface the original error.
                coro.close()
                raise
519
520
def _get_domain() -> str:
    """Picks the IAM credentials host name according to the mTLS configuration.

    The mTLS host is used when the `_mtls_helper` module provides
    `check_use_client_cert` and that function returns a truthy value; in
    every other case the regular host is used. Both hosts are built on
    `_helpers.DEFAULT_UNIVERSE_DOMAIN`.

    Returns:
        str: The dynamic domain string.
    """
    # Imported here rather than at module level, as in the original.
    from google.auth.transport import _mtls_helper

    mtls_active = (
        hasattr(_mtls_helper, "check_use_client_cert")
        and _mtls_helper.check_use_client_cert()
    )
    if not mtls_active:
        return f"iamcredentials.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"
    return f"iamcredentials.mtls.{_helpers.DEFAULT_UNIVERSE_DOMAIN}"
536
537
def get_service_account_rab_endpoint(service_account_email: str) -> str:
    """Returns the Regional Access Boundary lookup URL for a service account.

    The email is inserted into the URL path as given.

    Args:
        service_account_email: The service account email.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return (
        f"https://{domain}/v1/projects/-/serviceAccounts/"
        f"{service_account_email}/allowedLocations"
    )
548
549
def get_workforce_pool_rab_endpoint(pool_id: str) -> str:
    """Returns the Regional Access Boundary lookup URL for a workforce pool.

    The pool ID is inserted into the URL path as given.

    Args:
        pool_id: The workforce pool ID.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return (
        f"https://{domain}/v1/locations/global/workforcePools/"
        f"{pool_id}/allowedLocations"
    )
560
561
def get_workload_identity_pool_rab_endpoint(project_number: str, pool_id: str) -> str:
    """Returns the Regional Access Boundary lookup URL for a workload identity pool.

    Both arguments are inserted into the URL path as given.

    Args:
        project_number: The Google Cloud project number.
        pool_id: The workload identity pool ID.

    Returns:
        str: The complete lookup URL.
    """
    domain = _get_domain()
    return (
        f"https://{domain}/v1/projects/{project_number}"
        f"/locations/global/workloadIdentityPools/{pool_id}/allowedLocations"
    )