1# Copyright 2015 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helper functions for commonly used utilities."""
16
17import base64
18import calendar
19import datetime
20from email.message import Message
21import hashlib
22import json
23import logging
24import os
25import sys
26from typing import Any, Dict, Mapping, Optional, Union
27import urllib
28
29from google.auth import exceptions
30
31
# _BASE_LOGGER_NAME is the base logger for all google-based loggers.
# is_logging_enabled() looks this logger up to decide whether propagation to
# the root logger should be switched off.
_BASE_LOGGER_NAME = "google"

# _LOGGING_INITIALIZED ensures that base logger is only configured once
# (unless already configured by the end-user). is_logging_enabled() flips it
# to True the first time it runs.
_LOGGING_INITIALIZED = False


# The smallest MDS cache used by this library stores tokens until 4 minutes from
# expiry.
# NOTE(review): the value below is 3 minutes 45 seconds, presumably chosen to
# sit just inside that 4-minute window -- confirm against the credential
# refresh logic that consumes it.
REFRESH_THRESHOLD = datetime.timedelta(minutes=3, seconds=45)

# TODO(https://github.com/googleapis/google-auth-library-python/issues/1684): Audit and update the list below.
# Dictionary keys whose non-container values _hash_sensitive_info() replaces
# with a SHA-512 digest before the data is logged.
_SENSITIVE_FIELDS = {
    "accessToken",
    "access_token",
    "id_token",
    "client_id",
    "refresh_token",
    "client_secret",
}
53
54
def copy_docstring(source_class):
    """Build a decorator that borrows a docstring from ``source_class``.

    Args:
        source_class (type): The class whose same-named method supplies the
            docstring.

    Returns:
        Callable: A decorator that assigns the docstring of the same-named
            attribute on ``source_class`` to the decorated method.
    """

    def decorator(method):
        """Copy the docstring onto ``method`` and hand the method back.

        Args:
            method (Callable): The undocumented method to update.

        Returns:
            Callable: ``method`` itself, now carrying the copied docstring.

        Raises:
            google.auth.exceptions.InvalidOperation: if ``method`` already
                has a non-empty docstring.
        """
        already_documented = bool(method.__doc__)
        if already_documented:
            raise exceptions.InvalidOperation("Method already has a docstring.")

        # Look the counterpart up by name so the decorated method does not
        # have to repeat which attribute it mirrors.
        method.__doc__ = getattr(source_class, method.__name__).__doc__
        return method

    return decorator
87
88
def parse_content_type(header_value):
    """Extract the bare media-type from a 'content-type' header value.

    Parsing is delegated to ``email.message.Message``, the replacement PEP 594
    recommends for the deprecated ``cgi`` module
    (see https://peps.python.org/pep-0594/#cgi).

    Args:
        header_value (str): The raw value of a 'content-type' header.

    Returns:
        str: The lowercase media-type with any parameters (such as
            ``charset``) removed. When the value cannot be parsed,
            'text/plain' is returned.
    """
    carrier = Message()
    carrier["content-type"] = header_value
    # Despite its name, get_content_type() yields only the media-type.
    media_type = carrier.get_content_type()
    return media_type
109
110
def utcnow():
    """Return the current UTC time as a naive datetime.

    Returns:
        datetime: The current time in UTC, with ``tzinfo`` set to ``None``.
    """
    # datetime.utcnow() is deprecated since Python 3.12, so the time is taken
    # from the timezone-aware datetime.now(). The tzinfo is then dropped:
    # existing callers compare against naive datetimes, and mixing naive and
    # aware values raises on comparison.
    aware_now = datetime.datetime.now(datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)
125
126
def datetime_to_secs(value):
    """Express a datetime as whole seconds elapsed since the UNIX epoch.

    Args:
        value (datetime): The datetime to convert.

    Returns:
        int: The number of seconds since the UNIX epoch.
    """
    utc_struct = value.utctimetuple()
    return calendar.timegm(utc_struct)
137
138
def to_bytes(value, encoding="utf-8"):
    """Return ``value`` as bytes, encoding it first when it is text.

    Args:
        value (Union[str, bytes]): The value to be converted.
        encoding (str): The encoding used to turn text into bytes.
            Defaults to "utf-8".

    Returns:
        bytes: The encoded text, or ``value`` itself when it was already
            bytes.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to bytes.
    """
    if isinstance(value, str):
        converted = value.encode(encoding)
    else:
        converted = value

    if not isinstance(converted, bytes):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to bytes".format(value)
        )
    return converted
161
162
def from_bytes(value):
    """Return ``value`` as text, decoding it from UTF-8 when it is bytes.

    Args:
        value (Union[str, bytes]): The value to be converted.

    Returns:
        str: The decoded bytes, or ``value`` itself when it was already text.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to unicode.
    """
    if isinstance(value, bytes):
        converted = value.decode("utf-8")
    else:
        converted = value

    if not isinstance(converted, str):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to unicode".format(value)
        )
    return converted
183
184
def update_query(url, params, remove=None):
    """Merge query parameters into a URL.

    Parameters already present in the URL are overwritten by the values in
    ``params``; parameters listed in ``remove`` are dropped afterwards.

    Args:
        url (str): The URL to update.
        params (Mapping[str, str]): A mapping of query parameter
            keys to values.
        remove (Sequence[str]): Parameters to remove from the query string.

    Returns:
        str: The URL with updated query parameters.

    Examples:

        >>> url = 'http://example.com?a=1'
        >>> update_query(url, {'a': '2'})
        http://example.com?a=2
        >>> update_query(url, {'b': '3'})
        http://example.com?a=1&b=3
        >>> update_query(url, {'b': '3'}, remove=['a'])
        http://example.com?b=3

    """
    excluded = [] if remove is None else remove

    # Break the URL apart and read the existing query string.
    parsed_url = urllib.parse.urlparse(url)
    merged = urllib.parse.parse_qs(parsed_url.query)

    # New values win over whatever the URL already carried.
    merged.update(params)

    # Drop the parameters the caller asked to remove.
    kept = {}
    for name, values in merged.items():
        if name in excluded:
            continue
        kept[name] = values

    # Re-encode the query and stitch the URL back together.
    encoded_query = urllib.parse.urlencode(kept, doseq=True)
    return urllib.parse.urlunparse(parsed_url._replace(query=encoded_query))
228
229
def scopes_to_string(scopes):
    """Join scopes into the space-delimited form OAuth 2.0 authorization
    servers expect.

    Args:
        scopes (Sequence[str]): The sequence of scopes to convert.

    Returns:
        str: The scopes formatted as a single string.
    """
    separator = " "
    return separator.join(scopes)
241
242
def string_to_scopes(scopes):
    """Split a stringified scopes value into a list.

    Args:
        scopes (Union[Sequence, str]): The string of space-separated scopes
            to convert.

    Returns:
        Sequence(str): The separated scopes; an empty list when ``scopes``
            is falsy (for example ``None`` or an empty string).
    """
    return scopes.split(" ") if scopes else []
256
257
def padded_urlsafe_b64decode(value):
    """Decode URL-safe base64 input whose padding characters may be missing.

    Google infrastructure tends to omit the base64 padding characters, so
    the padding is restored before decoding.

    Args:
        value (Union[str, bytes]): The encoded value.

    Returns:
        bytes: The decoded value
    """
    raw = to_bytes(value)
    # Number of "=" needed to bring the length up to a multiple of four.
    missing = -len(raw) % 4
    return base64.urlsafe_b64decode(raw + b"=" * missing)
272
273
def unpadded_urlsafe_b64encode(value):
    """Encode a value as URL-safe base64 with the trailing padding removed.

    `RFC 7515`_ defines Base64url to NOT include any padding characters, but
    the stdlib emits them by default, so they are stripped here.

    .. _RFC 7515: https://tools.ietf.org/html/rfc7515#page-6

    Args:
        value (Union[str, bytes]): The value to encode. Text is encoded as
            UTF-8 first; bytes-like objects are passed to the encoder
            unchanged.

    Returns:
        bytes: The Base64url encoding of ``value`` without ``=`` padding.
    """
    if isinstance(value, str):
        # base64.urlsafe_b64encode only accepts bytes-like input; without
        # this conversion the documented str input raised TypeError.
        value = value.encode("utf-8")
    return base64.urlsafe_b64encode(value).rstrip(b"=")
289
290
def get_bool_from_env(variable_name, default=False):
    """Read an environment variable and interpret it as a boolean.

    The value is matched case-insensitively:
    - "true", "1" are considered true.
    - "false", "0" are considered false.
    Any other value raises an exception.

    Args:
        variable_name (str): The name of the environment variable.
        default (bool): The value returned when the environment variable is
            not set.

    Returns:
        bool: The boolean value of the environment variable.

    Raises:
        google.auth.exceptions.InvalidValue: If the environment variable is
            set to a value that can not be interpreted as a boolean.
    """
    raw = os.environ.get(variable_name)
    if raw is None:
        return default

    recognized = {"true": True, "1": True, "false": False, "0": False}
    normalized = raw.lower()
    if normalized not in recognized:
        raise exceptions.InvalidValue(
            'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format(
                variable_name
            )
        )
    return recognized[normalized]
329
330
def is_python_3():
    """Report whether the running interpreter is Python 3.

    Returns:
        bool: True if the Python interpreter is Python 3 and False otherwise.
    """
    running_version = sys.version_info
    return running_version > (3, 0)
338
339
def _hash_sensitive_info(data: Union[dict, list]) -> Union[dict, list, str]:
    """
    Produces a copy of ``data`` that is safe to log.

    Args:
        data: The dictionary or list containing data to be processed.

    Returns:
        For a dictionary, a new dictionary in which the values of sensitive
        keys are replaced by their SHA512 hashes and nested dictionaries and
        lists are processed recursively.
        For a list, a new list with each element recursively processed.
        For anything else, the type of the input as a string.

    """
    if isinstance(data, list):
        return [_hash_sensitive_info(item) for item in data]

    if not isinstance(data, dict):
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1701):
        # Investigate and hash sensitive info before logging when the data type is
        # not a dict or a list.
        return str(type(data))

    sanitized: Dict[Any, Union[Optional[str], dict, list]] = {}
    for name, raw in data.items():
        if isinstance(raw, (dict, list)):
            # Containers are always walked, even under a sensitive key.
            sanitized[name] = _hash_sensitive_info(raw)
        elif name in _SENSITIVE_FIELDS:
            sanitized[name] = _hash_value(raw, name)
        else:
            sanitized[name] = raw
    return sanitized
373
374
def _hash_value(value, field_name: str) -> Optional[str]:
    """Returns a labelled SHA512 digest of ``value``, or None when it is None.

    Args:
        value: The value to hash; it is stringified and UTF-8 encoded first.
        field_name: The field the value belongs to, embedded in the result.

    Returns:
        A string of the form ``hashed_<field_name>-<hex digest>``, or None
        if ``value`` is None.
    """
    if value is None:
        return None
    hex_digest = hashlib.sha512(str(value).encode("utf-8")).hexdigest()
    return f"hashed_{field_name}-{hex_digest}"
384
385
def _logger_configured(logger: logging.Logger) -> bool:
    """Reports whether `logger` deviates from a freshly created logger.

    Args:
        logger: The logger to check.

    Returns:
        bool: True when the logger has handlers attached, an explicit level,
            or propagation switched off; False otherwise.
    """
    untouched = (
        logger.handlers == []
        and logger.level == logging.NOTSET
        and logger.propagate
    )
    return not untouched
398
399
def is_logging_enabled(logger: logging.Logger) -> bool:
    """
    Reports whether DEBUG-level logging is enabled for the given logger.

    On the first call this also performs a one-time setup of the base
    "google" logger (see the note in the body).

    Args:
        logger: The logging.Logger instance to check.

    Returns:
        True if debug logging is enabled, False otherwise.
    """
    # NOTE: Propagation from the base logger, i.e. logging.getLogger("google"),
    # to the root logger is switched off unless the end user has explicitly
    # configured the base logger. Ideally the client layer does this (GAPICs
    # already do). It is repeated here so that a configured root logger does
    # not start emitting these logs when a logging-capable google-auth is
    # combined with:
    # - an older version of a GAPIC which does not support logging.
    # - Apiary client which does not support logging.
    global _LOGGING_INITIALIZED
    if not _LOGGING_INITIALIZED:
        google_logger = logging.getLogger(_BASE_LOGGER_NAME)
        user_configured = _logger_configured(google_logger)
        if not user_configured:
            google_logger.propagate = False
        _LOGGING_INITIALIZED = True

    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    return debug_enabled
427
428
def request_log(
    logger: logging.Logger,
    method: str,
    url: str,
    body: Optional[bytes],
    headers: Optional[Mapping[str, str]],
) -> None:
    """
    Logs an HTTP request at the DEBUG level if logging is enabled.

    The body is parsed according to the request's Content-Type header and
    passed through `_hash_sensitive_info` before it is logged.

    Args:
        logger: The logging.Logger instance to use.
        method: The HTTP method (e.g., "GET", "POST").
        url: The URL of the request.
        body: The request body (can be None).
        headers: The request headers (can be None). The Content-Type header
            is matched case-insensitively.
    """
    if not is_logging_enabled(logger):
        return

    content_type = ""
    if headers:
        if "Content-Type" in headers:
            # Keep preferring the canonical spelling when it is present.
            content_type = headers["Content-Type"]
        else:
            # HTTP field names are case-insensitive, so a plain mapping may
            # carry "content-type" or another casing. Missing it would make
            # a form-encoded body fall through to the JSON fallback and skip
            # field-level hashing.
            for name, value in headers.items():
                if isinstance(name, str) and name.lower() == "content-type":
                    content_type = value
                    break

    json_body = _parse_request_body(body, content_type=content_type)
    logged_body = _hash_sensitive_info(json_body)
    # NOTE(review): headers are logged as-is; only the body goes through
    # _hash_sensitive_info.
    logger.debug(
        "Making request...",
        extra={
            "httpRequest": {
                "method": method,
                "url": url,
                "body": logged_body,
                "headers": headers,
            }
        },
    )
463
464
def _parse_request_body(body: Optional[bytes], content_type: str = "") -> Any:
    """
    Decodes a request body and parses it according to its content type.

    Args:
        body (Optional[bytes]): The request body.
        content_type (str): The content type of the request body, e.g., "application/json",
            "application/x-www-form-urlencoded", or "text/plain". If empty, attempts
            to parse as JSON.

    Returns:
        Parsed body (dict, str, or None).
        - JSON: Decoded if content_type is empty or contains "application/json";
          when JSON decoding fails the decoded text is returned instead.
        - URL-encoded: Parsed into a dict holding the first value of each
          field if content_type contains "application/x-www-form-urlencoded".
        - Plain text: Returned as a string if content_type contains "text/plain".
        - None: Returned if body is None, cannot be decoded as UTF-8 (including
          when it has no `decode` method), or content_type is unknown.
    """
    if body is None:
        return None

    try:
        text = body.decode("utf-8")
    except (UnicodeDecodeError, AttributeError):
        return None

    media = content_type.lower()

    if not media or "application/json" in media:
        try:
            parsed = json.loads(text)
        except (json.JSONDecodeError, TypeError):
            parsed = text
        return parsed

    if "application/x-www-form-urlencoded" in media:
        fields = urllib.parse.parse_qs(text)
        # Only the first value of a repeated field is kept.
        return {name: values[0] for name, values in fields.items()}

    return text if "text/plain" in media else None
501
502
def _parse_response(response: Any) -> Any:
    """
    Extracts the JSON payload of a response on a best-effort basis.

    Args:
        response: The response object to parse. This can be any type, but
            it is expected to have a `json()` method if it contains JSON.

    Returns:
        The decoded JSON object (e.g., a dictionary or list) returned by
        `response.json()`. If the response does not have a `json()` method
        or calling it raises, None is returned.
    """
    try:
        payload = response.json()
    except Exception:
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1744):
        # Parse and return response payload as json based on different content types.
        return None
    else:
        return payload
524
525
def _response_log_base(logger: logging.Logger, parsed_response: Any) -> None:
    """
    Logs an already-parsed HTTP response at the DEBUG level.

    The parsed response is passed through `_hash_sensitive_info` first, and
    the result is attached to the log record under the "httpResponse" key.

    Args:
        logger: The logging.Logger instance to use for logging.
        parsed_response: The parsed HTTP response (e.g., a dictionary or a
            list).
    """
    extra_fields = {"httpResponse": _hash_sensitive_info(parsed_response)}
    logger.debug("Response received...", extra=extra_fields)
542
543
def response_log(logger: logging.Logger, response: Any) -> None:
    """
    Logs an HTTP response at the DEBUG level if logging is enabled.

    Nothing is parsed or logged when debug logging is disabled for `logger`.

    Args:
        logger: The logging.Logger instance to use.
        response: The HTTP response object to log.
    """
    if not is_logging_enabled(logger):
        return
    _response_log_base(logger, _parse_response(response))