1# Copyright 2026 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Utilities for Regional Access Boundary management."""
16
17import copy
18import datetime
19import functools
20import logging
21import os
22import threading
23from typing import NamedTuple, Optional, TYPE_CHECKING
24
25from google.auth import _helpers
26from google.auth import environment_vars
27
28if TYPE_CHECKING:
29 import google.auth.credentials
30 import google.auth.transport
31
32_LOGGER = logging.getLogger(__name__)
33
34
35@functools.lru_cache()
36def is_regional_access_boundary_enabled():
37 """Checks if Regional Access Boundary is enabled via environment variable.
38
39 The environment variable is interpreted as a boolean with the following
40 (case-insensitive) rules:
41 - "true", "1" are considered true.
42 - Any other value (or unset) is considered false.
43
44 Returns:
45 bool: True if Regional Access Boundary is enabled, False otherwise.
46 """
47 value = os.environ.get(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED)
48 if value is None:
49 return False
50
51 return value.lower() in ("true", "1")
52
53
54# The default lifetime for a cached Regional Access Boundary.
55DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6)
56
57# The period of time prior to the boundary's expiration when a background refresh
58# is proactively triggered.
59REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD = datetime.timedelta(hours=1)
60
61# The initial cooldown period for a failed Regional Access Boundary lookup.
62DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15)
63
64# The maximum cooldown period for a failed Regional Access Boundary lookup.
65MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6)
66
67
68# The header key used for Regional Access Boundaries.
69_REGIONAL_ACCESS_BOUNDARY_HEADER = "x-allowed-locations"
70
71
72class _RegionalAccessBoundaryData(NamedTuple):
73 """Data container for a Regional Access Boundary snapshot.
74
75 Attributes:
76 encoded_locations (Optional[str]): The encoded Regional Access Boundary string.
77 expiry (Optional[datetime.datetime]): The hard expiration time of the boundary data.
78 cooldown_expiry (Optional[datetime.datetime]): The time until which further lookups are skipped.
79 cooldown_duration (datetime.timedelta): The current duration for the exponential cooldown.
80 """
81
82 encoded_locations: Optional[str]
83 expiry: Optional[datetime.datetime]
84 cooldown_expiry: Optional[datetime.datetime]
85 cooldown_duration: datetime.timedelta
86
87
88class _RegionalAccessBoundaryManager(object):
89 """Manages the Regional Access Boundary state and its background refresh.
90
91 The actual data is held in an immutable `_RegionalAccessBoundaryData` object
92 and is swapped atomically to ensure thread-safe, lock-free reads.
93 """
94
95 def __init__(self):
96 self._data = _RegionalAccessBoundaryData(
97 encoded_locations=None,
98 expiry=None,
99 cooldown_expiry=None,
100 cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
101 )
102 self.refresh_manager = _RegionalAccessBoundaryRefreshManager()
103 self._update_lock = threading.Lock()
104 self._use_blocking_regional_access_boundary_lookup = False
105
106 def __getstate__(self):
107 """Pickle helper that serializes the _update_lock attribute."""
108 state = self.__dict__.copy()
109 state["_update_lock"] = None
110 return state
111
112 def __setstate__(self, state):
113 """Pickle helper that deserializes the _update_lock attribute."""
114 self.__dict__.update(state)
115 self._update_lock = threading.Lock()
116
117 def __eq__(self, other):
118 """Checks if two managers are equal."""
119 if not isinstance(other, _RegionalAccessBoundaryManager):
120 return NotImplemented
121 return (
122 self._data == other._data
123 and self._use_blocking_regional_access_boundary_lookup
124 == other._use_blocking_regional_access_boundary_lookup
125 )
126
127 def enable_blocking_lookup(self):
128 """Enables blocking Regional Access Boundary lookup.
129
130 When enabled, the Regional Access Boundary lookup will be performed
131 synchronously in the calling thread instead of asynchronously in a
132 background thread.
133 """
134 self._use_blocking_regional_access_boundary_lookup = True
135
136 def set_initial_regional_access_boundary(self, encoded_locations=None, expiry=None):
137 """Manually sets the regional access boundary to the client provided seed.
138
139 Args:
140 encoded_locations (Optional[str]): The encoded locations string.
141 expiry (Optional[datetime.datetime]): The expiry time for the boundary.
142 If encoded_locations is not provided, expiry is ignored.
143 """
144 if not encoded_locations:
145 expiry = None
146
147 self._data = _RegionalAccessBoundaryData(
148 encoded_locations=encoded_locations,
149 expiry=expiry,
150 cooldown_expiry=None,
151 cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
152 )
153
154 def apply_headers(self, headers):
155 """Applies the Regional Access Boundary header to the provided dictionary.
156
157 If the boundary is valid, the 'x-allowed-locations' header is added
158 or updated. Otherwise, the header is removed to ensure no stale
159 data is sent.
160
161 Args:
162 headers (MutableMapping[str, str]): The headers dictionary to update.
163 """
164 rab_data = self._data
165
166 if rab_data.encoded_locations and (
167 rab_data.expiry is not None and _helpers.utcnow() < rab_data.expiry
168 ):
169 headers[_REGIONAL_ACCESS_BOUNDARY_HEADER] = rab_data.encoded_locations
170 else:
171 headers.pop(_REGIONAL_ACCESS_BOUNDARY_HEADER, None)
172
173 def maybe_start_refresh(self, credentials, request):
174 """Starts a background thread to refresh the Regional Access Boundary if needed.
175
176 Args:
177 credentials (google.auth.credentials.Credentials): The credentials to refresh.
178 request (google.auth.transport.Request): The object used to make HTTP requests.
179 """
180 rab_data = self._data
181
182 # Don't start a new refresh if the Regional Access Boundary info is still fresh.
183 if (
184 rab_data.encoded_locations
185 and rab_data.expiry
186 and _helpers.utcnow()
187 < (rab_data.expiry - REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD)
188 ):
189 return
190
191 # Don't start a new refresh if the cooldown is still in effect.
192 if rab_data.cooldown_expiry and _helpers.utcnow() < rab_data.cooldown_expiry:
193 return
194
195 # If all checks pass, start the background refresh.
196 if self._use_blocking_regional_access_boundary_lookup:
197 self.start_blocking_refresh(credentials, request)
198 else:
199 self.refresh_manager.start_refresh(credentials, request, self)
200
201 def start_blocking_refresh(self, credentials, request):
202 """Initiates a blocking lookup of the Regional Access Boundary.
203
204 If the lookup raises an exception, it is caught and logged as a warning,
205 and the lookup is treated as a failure (entering cooldown). Exceptions
206 are not propagated to the caller.
207
208 Args:
209 credentials (google.auth.credentials.Credentials): The credentials to refresh.
210 request (google.auth.transport.Request): The object used to make HTTP requests.
211 """
212 try:
213 # The fail_fast parameter is set to True to ensure we don't block the calling
214 # thread for too long. This will do two things: 1) set a timeout to 3s
215 # instead of the default 120s and 2) ensure we do not retry at all
216 regional_access_boundary_info = (
217 credentials._lookup_regional_access_boundary(request, fail_fast=True)
218 )
219 except Exception as e:
220 if _helpers.is_logging_enabled(_LOGGER):
221 _LOGGER.warning(
222 "Blocking Regional Access Boundary lookup raised an exception: %s",
223 e,
224 exc_info=True,
225 )
226 regional_access_boundary_info = None
227
228 self.process_regional_access_boundary_info(regional_access_boundary_info)
229
230 def process_regional_access_boundary_info(self, regional_access_boundary_info):
231 """Processes the regional access boundary info and updates the state.
232
233 Args:
234 regional_access_boundary_info (Optional[Mapping[str, str]]): The regional access
235 boundary info to process.
236 """
237 with self._update_lock:
238 # Capture the current state before calculating updates.
239 current_data = self._data
240
241 if regional_access_boundary_info:
242 # On success, update the boundary and its expiry, and clear any cooldown.
243 encoded_locations = regional_access_boundary_info.get(
244 "encodedLocations"
245 )
246 updated_data = _RegionalAccessBoundaryData(
247 encoded_locations=encoded_locations,
248 expiry=_helpers.utcnow() + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL,
249 cooldown_expiry=None,
250 cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
251 )
252 if _helpers.is_logging_enabled(_LOGGER):
253 _LOGGER.debug("Regional Access Boundary lookup successful.")
254 else:
255 # On failure, calculate cooldown and update state.
256 if _helpers.is_logging_enabled(_LOGGER):
257 _LOGGER.warning(
258 "Regional Access Boundary lookup failed. Entering cooldown."
259 )
260
261 next_cooldown_expiry = (
262 _helpers.utcnow() + current_data.cooldown_duration
263 )
264 next_cooldown_duration = min(
265 current_data.cooldown_duration * 2,
266 MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
267 )
268
269 # If the refresh failed, we keep reusing the existing data unless
270 # it has reached its hard expiration time.
271 if current_data.expiry and _helpers.utcnow() > current_data.expiry:
272 next_encoded_locations = None
273 next_expiry = None
274 else:
275 next_encoded_locations = current_data.encoded_locations
276 next_expiry = current_data.expiry
277
278 updated_data = _RegionalAccessBoundaryData(
279 encoded_locations=next_encoded_locations,
280 expiry=next_expiry,
281 cooldown_expiry=next_cooldown_expiry,
282 cooldown_duration=next_cooldown_duration,
283 )
284
285 # Perform the atomic swap of the state object.
286 self._data = updated_data
287
288
289class _RegionalAccessBoundaryRefreshThread(threading.Thread):
290 """Thread for background refreshing of the Regional Access Boundary."""
291
292 def __init__(
293 self,
294 credentials: "google.auth.credentials.CredentialsWithRegionalAccessBoundary", # noqa: F821
295 request: "google.auth.transport.Request", # noqa: F821
296 rab_manager: "_RegionalAccessBoundaryManager",
297 ):
298 super().__init__()
299 self.daemon = True
300 self._credentials = credentials
301 self._request = request
302 self._rab_manager = rab_manager
303
304 def run(self):
305 """
306 Performs the Regional Access Boundary lookup and updates the state.
307
308 This method is run in a separate thread. It delegates the actual lookup
309 to the credentials object's `_lookup_regional_access_boundary` method.
310 Based on the lookup's outcome (success or complete failure after retries),
311 it updates the cached Regional Access Boundary information,
312 its expiry, its cooldown expiry, and its exponential cooldown duration.
313 """
314 # Catch exceptions (e.g., from the underlying transport) to prevent the
315 # background thread from crashing. This ensures we can gracefully enter
316 # an exponential cooldown state on failure.
317 try:
318 regional_access_boundary_info = (
319 self._credentials._lookup_regional_access_boundary(self._request)
320 )
321 except Exception as e:
322 if _helpers.is_logging_enabled(_LOGGER):
323 _LOGGER.warning(
324 "Asynchronous Regional Access Boundary lookup raised an exception: %s",
325 e,
326 exc_info=True,
327 )
328 regional_access_boundary_info = None
329
330 self._rab_manager.process_regional_access_boundary_info(
331 regional_access_boundary_info
332 )
333
334
335class _RegionalAccessBoundaryRefreshManager(object):
336 """Manages a thread for background refreshing of the Regional Access Boundary."""
337
338 def __init__(self):
339 self._lock = threading.Lock()
340 self._worker = None
341
342 def __getstate__(self):
343 """Pickle helper that serializes the _lock and _worker attributes."""
344 state = self.__dict__.copy()
345 state["_lock"] = None
346 state["_worker"] = None
347 return state
348
349 def __setstate__(self, state):
350 """Pickle helper that deserializes the _lock and _worker attributes."""
351 self.__dict__.update(state)
352 self._lock = threading.Lock()
353 self._worker = None
354
355 def start_refresh(self, credentials, request, rab_manager):
356 """
357 Starts a background thread to refresh the Regional Access Boundary if one is not already running.
358
359 Args:
360 credentials (CredentialsWithRegionalAccessBoundary): The credentials
361 to refresh.
362 request (google.auth.transport.Request): The object used to make
363 HTTP requests.
364 rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
365 """
366 with self._lock:
367 if self._worker and self._worker.is_alive():
368 # A refresh is already in progress.
369 return
370
371 try:
372 copied_request = copy.deepcopy(request)
373 except Exception as e:
374 if _helpers.is_logging_enabled(_LOGGER):
375 _LOGGER.warning(
376 "Could not deepcopy transport for background RAB refresh. "
377 "Skipping background refresh to avoid thread safety issues. "
378 "Exception: %s",
379 e,
380 )
381 return
382
383 self._worker = _RegionalAccessBoundaryRefreshThread(
384 credentials, copied_request, rab_manager
385 )
386 self._worker.start()