Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_helpers.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2015 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helper functions for commonly used utilities."""
17import base64
18import calendar
19import datetime
20from email.message import Message
21import hashlib
22import json
23import logging
24import sys
25from typing import Any, Dict, Mapping, Optional, Union
26import urllib
28from google.auth import exceptions
# Default value for the universe domain.
DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"

# _BASE_LOGGER_NAME is the base logger for all google-based loggers.
_BASE_LOGGER_NAME = "google"

# _LOGGING_INITIALIZED ensures that base logger is only configured once
# (unless already configured by the end-user). It is flipped to True by
# is_logging_enabled() the first time that function runs.
_LOGGING_INITIALIZED = False


# The smallest MDS cache used by this library stores tokens until 4 minutes from
# expiry.
# NOTE(review): presumably the window before expiry in which a token is
# treated as needing a refresh -- confirm against the callers.
REFRESH_THRESHOLD = datetime.timedelta(minutes=3, seconds=45)

# Dictionary keys whose (non-container) values are replaced by a hash in
# _hash_sensitive_info() before a payload is logged.
# TODO(https://github.com/googleapis/google-auth-library-python/issues/1684): Audit and update the list below.
_SENSITIVE_FIELDS = {
    "accessToken",
    "access_token",
    "id_token",
    "client_id",
    "refresh_token",
    "client_secret",
}
def copy_docstring(source_class):
    """Build a decorator that borrows a docstring from ``source_class``.

    The decorated method receives the docstring of the attribute on
    ``source_class`` that has the same name as the method.

    Args:
        source_class (type): The class holding the documented method.

    Returns:
        Callable: A decorator that copies the docstring of the same-named
            method on ``source_class`` onto the method it decorates.
    """

    def decorator(method):
        """Attach the source docstring to ``method``.

        Args:
            method (Callable): The undocumented method to update.

        Returns:
            Callable: ``method`` itself, with ``__doc__`` filled in.

        Raises:
            google.auth.exceptions.InvalidOperation: if ``method`` already
                carries a (non-empty) docstring.
        """
        # Refuse to overwrite documentation the author wrote explicitly.
        if method.__doc__:
            raise exceptions.InvalidOperation("Method already has a docstring.")

        # Look the counterpart up by name; a missing attribute surfaces as
        # AttributeError from getattr.
        method.__doc__ = getattr(source_class, method.__name__).__doc__
        return method

    return decorator
def parse_content_type(header_value):
    """Extract the bare media-type from a 'content-type' header value.

    Parameters such as ``charset`` are discarded. Parsing is delegated to
    ``email.message.Message`` as suggested in PEP 594, because the ``cgi``
    module is deprecated and removed in Python 3.13
    (see https://peps.python.org/pep-0594/#cgi).

    Args:
        header_value (str): The raw value of a 'content-type' header.

    Returns:
        str: The lowercase media-type from the header. When the value cannot
            be parsed, 'text/plain' (the default for textual content) is
            returned.
    """
    carrier = Message()
    carrier["content-type"] = header_value
    # Despite its name, get_content_type() yields only the media-type.
    media_type = carrier.get_content_type()
    return media_type
def utcnow():
    """Return the current time in UTC as a naive datetime.

    Returns:
        datetime: The current UTC time, without timezone information.
    """
    # datetime.utcnow() is deprecated as of Python 3.12, so the time is read
    # through datetime.now(timezone.utc). That call yields an offset-aware
    # value, whereas the old API produced an offset-naive one; mixing the two
    # breaks datetime comparisons. The tzinfo is therefore stripped to keep
    # the historical, naive return value.
    aware_now = datetime.datetime.now(tz=datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)
def utcfromtimestamp(timestamp):
    """Convert a POSIX timestamp into a naive UTC datetime.

    Args:
        timestamp (float): Seconds since the UNIX epoch.

    Returns:
        datetime: The corresponding UTC time, without timezone information.
    """
    # datetime.utcfromtimestamp() is deprecated as of Python 3.12. Its
    # replacement, fromtimestamp(ts, timezone.utc), returns an offset-aware
    # value instead of an offset-naive one, which would break comparisons
    # with existing naive datetimes. Dropping tzinfo preserves the old shape.
    aware = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
    return aware.replace(tzinfo=None)
def datetime_to_secs(value):
    """Return the whole seconds between the UNIX epoch and ``value``.

    Args:
        value (datetime): The datetime to convert.

    Returns:
        int: The number of seconds since the UNIX epoch.
    """
    # utctimetuple() normalises the value to UTC; timegm() then interprets
    # that struct as UTC rather than local time.
    utc_struct = value.utctimetuple()
    return calendar.timegm(utc_struct)
def to_bytes(value, encoding="utf-8"):
    """Return ``value`` as bytes, encoding it first when it is text.

    Args:
        value (Union[str, bytes]): The value to be converted.
        encoding (str): The encoding used to turn text into bytes.
            Defaults to "utf-8".

    Returns:
        bytes: The encoded text, or ``value`` unchanged when it was already
            bytes.

    Raises:
        google.auth.exceptions.InvalidValue: If the result is not bytes.
    """
    candidate = value
    if isinstance(value, str):
        candidate = value.encode(encoding)

    # Anything that is neither text nor bytes ends up here unchanged.
    if not isinstance(candidate, bytes):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to bytes".format(value)
        )
    return candidate
def from_bytes(value):
    """Return ``value`` as text, decoding it as UTF-8 when it is bytes.

    Args:
        value (Union[str, bytes]): The value to be converted.

    Returns:
        str: The decoded bytes, or ``value`` unchanged when it was already
            text.

    Raises:
        google.auth.exceptions.InvalidValue: If the result is not a string.
    """
    candidate = value
    if isinstance(value, bytes):
        candidate = value.decode("utf-8")

    # Anything that is neither bytes nor text ends up here unchanged.
    if not isinstance(candidate, str):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to unicode".format(value)
        )
    return candidate
def update_query(url, params, remove=None):
    """Return ``url`` with its query string updated.

    Keys in ``params`` replace any values already present in the URL for the
    same key; keys listed in ``remove`` are dropped from the result.

    Args:
        url (str): The URL to update.
        params (Mapping[str, str]): A mapping of query parameter
            keys to values.
        remove (Sequence[str]): Parameters to remove from the query string.

    Returns:
        str: The URL with updated query parameters.

    Examples:

        >>> url = 'http://example.com?a=1'
        >>> update_query(url, {'a': '2'})
        'http://example.com?a=2'
        >>> update_query(url, {'b': '3'})
        'http://example.com?a=1&b=3'
        >>> update_query(url, {'b': '3'}, remove=['a'])
        'http://example.com?b=3'

    """
    excluded = [] if remove is None else remove

    # Break the URL apart and turn the query string into a dict of lists.
    pieces = urllib.parse.urlparse(url)
    merged = urllib.parse.parse_qs(pieces.query)

    # Incoming parameters win over those already in the URL.
    merged.update(params)

    # Drop the parameters the caller asked to remove.
    kept = {name: val for name, val in merged.items() if name not in excluded}

    # doseq=True expands the list values produced by parse_qs.
    encoded = urllib.parse.urlencode(kept, doseq=True)
    return urllib.parse.urlunparse(pieces._replace(query=encoded))
def scopes_to_string(scopes):
    """Join scopes into the single string form sent to OAuth 2.0
    authorization servers.

    Args:
        scopes (Sequence[str]): The sequence of scopes to convert.

    Returns:
        str: The scopes separated by single spaces.
    """
    separator = " "
    return separator.join(scopes)
def string_to_scopes(scopes):
    """Split a stringified scopes value into a list.

    Args:
        scopes (Union[Sequence, str]): The string of space-separated scopes
            to convert.

    Returns:
        Sequence(str): The separated scopes; an empty list when ``scopes``
            is empty or otherwise falsy.
    """
    # Split on a literal single space (not arbitrary whitespace).
    return scopes.split(" ") if scopes else []
def padded_urlsafe_b64decode(value):
    """Decode URL-safe base64 input whose padding may have been stripped.

    Google infrastructure tends to omit the base64 padding characters.

    Args:
        value (Union[str, bytes]): The encoded value.

    Returns:
        bytes: The decoded value.
    """
    raw = to_bytes(value)
    # Number of "=" characters needed to reach a multiple of four.
    missing = -len(raw) % 4
    return base64.urlsafe_b64decode(raw + b"=" * missing)
def unpadded_urlsafe_b64encode(value):
    """Encode a value as URL-safe base64 with the padding stripped.

    `rfc 7515`_ defines Base64url to NOT include any padding
    characters, but the stdlib doesn't do that by default.

    .. _rfc 7515: https://tools.ietf.org/html/rfc7515#page-6

    Args:
        value (bytes): The bytes-like value to encode.

    Returns:
        bytes: The encoded value with trailing "=" characters removed.
    """
    encoded = base64.urlsafe_b64encode(value)
    return encoded.rstrip(b"=")
def is_python_3():
    """Report whether the running interpreter is Python 3.

    Returns:
        bool: True if the Python interpreter is Python 3 and False otherwise.
    """
    python_3_floor = (3, 0)
    return sys.version_info > python_3_floor  # pragma: NO COVER
def _hash_sensitive_info(data: Union[dict, list]) -> Union[dict, list, str]:
    """
    Produce a copy of ``data`` that is safe to log.

    Args:
        data: The dictionary (or list) containing data to be processed.

    Returns:
        For a dict, a new dict in which values stored under a key from
        ``_SENSITIVE_FIELDS`` are replaced by their SHA512 hashes and nested
        dicts/lists are processed recursively; other values are kept as-is.
        For a list, a new list with every element processed recursively.
        For anything else, the type of the input rendered as a string.
    """
    if isinstance(data, list):
        return [_hash_sensitive_info(item) for item in data]

    if not isinstance(data, dict):
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1701):
        # Investigate and hash sensitive info before logging when the data type is
        # not a dict or a list.
        return str(type(data))

    sanitized: Dict[Any, Union[Optional[str], dict, list]] = {}
    for name, item in data.items():
        if isinstance(item, (dict, list)):
            # Containers are always walked, even under a sensitive key.
            sanitized[name] = _hash_sensitive_info(item)
        elif name in _SENSITIVE_FIELDS:
            sanitized[name] = _hash_value(item, name)
        else:
            sanitized[name] = item
    return sanitized
def _hash_value(value, field_name: str) -> Optional[str]:
    """Return ``hashed_<field_name>-<sha512 hex digest>`` for ``value``.

    The digest is computed over the UTF-8 encoding of ``str(value)``.
    ``None`` is passed through unchanged.
    """
    if value is None:
        return None
    digest = hashlib.sha512(str(value).encode("utf-8")).hexdigest()
    return f"hashed_{field_name}-{digest}"
def _logger_configured(logger: logging.Logger) -> bool:
    """Tell whether ``logger`` deviates from a freshly created logger.

    Args:
        logger: The logger to check.

    Returns:
        bool: True when the logger has at least one handler, an explicit
            level (anything other than NOTSET), or propagation turned off.
    """
    has_handlers = logger.handlers != []
    has_explicit_level = logger.level != logging.NOTSET
    propagation_disabled = not logger.propagate
    return has_handlers or has_explicit_level or propagation_disabled
def is_logging_enabled(logger: logging.Logger) -> bool:
    """
    Report whether DEBUG-level logging is active for ``logger``.

    On the first call this also performs a one-time setup of the base
    logger named by ``_BASE_LOGGER_NAME``.

    Args:
        logger: The logging.Logger instance to check.

    Returns:
        True if debug logging is enabled, False otherwise.
    """
    # NOTE: Log propagation to the root logger is disabled unless
    # the base logger i.e. logging.getLogger("google") is
    # explicitly configured by the end user. Ideally this
    # needs to happen in the client layer (already does for GAPICs).
    # However, this is implemented here to avoid logging
    # (if a root logger is configured) when a version of google-auth
    # which supports logging is used with:
    #  - an older version of a GAPIC which does not support logging.
    #  - Apiary client which does not support logging.
    global _LOGGING_INITIALIZED
    if not _LOGGING_INITIALIZED:
        google_logger = logging.getLogger(_BASE_LOGGER_NAME)
        configured_by_user = _logger_configured(google_logger)
        if not configured_by_user:
            google_logger.propagate = False
        # Mark setup as done whether or not propagation was changed.
        _LOGGING_INITIALIZED = True

    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    return debug_enabled
def request_log(
    logger: logging.Logger,
    method: str,
    url: str,
    body: Optional[bytes],
    headers: Optional[Mapping[str, str]],
) -> None:
    """
    Logs an HTTP request at the DEBUG level if logging is enabled.

    The body is parsed according to the request's Content-Type header and
    passed through ``_hash_sensitive_info`` before being attached to the log
    record. The Content-Type header is located case-insensitively, since
    HTTP field names are not case-sensitive.

    Args:
        logger: The logging.Logger instance to use.
        method: The HTTP method (e.g., "GET", "POST").
        url: The URL of the request.
        body: The request body (can be None).
        headers: The request headers (can be None).
    """
    if not is_logging_enabled(logger):
        return

    content_type = ""
    if headers:
        if "Content-Type" in headers:
            # Fast path; also covers case-insensitive mapping types.
            content_type = headers["Content-Type"]
        else:
            # A plain dict may carry the header under another casing
            # (e.g. "content-type"); fall back to a case-insensitive scan.
            for name, value in headers.items():
                if isinstance(name, str) and name.lower() == "content-type":
                    content_type = value
                    break

    json_body = _parse_request_body(body, content_type=content_type)
    logged_body = _hash_sensitive_info(json_body)
    # NOTE(review): headers are attached to the record exactly as received;
    # only the body goes through _hash_sensitive_info.
    logger.debug(
        "Making request...",
        extra={
            "httpRequest": {
                "method": method,
                "url": url,
                "body": logged_body,
                "headers": headers,
            }
        },
    )
def _parse_request_body(body: Optional[bytes], content_type: str = "") -> Any:
    """
    Decode a request body and parse it according to its content type.

    Args:
        body (Optional[bytes]): The request body.
        content_type (str): The content type of the request body, e.g., "application/json",
            "application/x-www-form-urlencoded", or "text/plain". If empty, attempts
            to parse as JSON. Matching is case-insensitive and by substring.

    Returns:
        Parsed body (dict, str, or None).
        - JSON: Decoded if content_type is empty or contains "application/json";
          when JSON decoding fails the decoded text is returned instead.
        - URL-encoded: For "application/x-www-form-urlencoded", a dict mapping
          each key to its first value.
        - Plain text: The decoded string if content_type contains "text/plain".
        - None: If body is None, body cannot be decoded as UTF-8 (including
          when it has no ``decode`` method), or content_type is unknown.
    """
    if body is None:
        return None

    try:
        text = body.decode("utf-8")
    except (UnicodeDecodeError, AttributeError):
        return None

    kind = content_type.lower()
    treat_as_json = not kind or "application/json" in kind
    if treat_as_json:
        try:
            return json.loads(text)
        except (TypeError, ValueError):
            return text

    if "application/x-www-form-urlencoded" in kind:
        pairs = urllib.parse.parse_qs(text)
        # parse_qs yields lists; keep only the first value per key.
        return {name: values[0] for name, values in pairs.items()}

    return text if "text/plain" in kind else None
def _parse_response(response: Any) -> Any:
    """
    Try to obtain the JSON payload of a response.

    Args:
        response: The response object to parse. This can be any type, but
            it is expected to have a `json()` method if it contains JSON.

    Returns:
        Whatever ``response.json()`` returns (e.g., a dictionary or list).
        None is returned if the response does not have a `json()` method or
        if calling it raises any exception.
    """
    try:
        return response.json()
    except Exception:
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1744):
        # Parse and return response payload as json based on different content types.
        return None
def _response_log_base(logger: logging.Logger, parsed_response: Any) -> None:
    """
    Emit a DEBUG record for an already-parsed HTTP response.

    The parsed response is passed through ``_hash_sensitive_info`` first so
    that potentially sensitive values are hashed before they are logged.
    This helper does not check whether logging is enabled.

    Args:
        logger: The logging.Logger instance to use for logging.
        parsed_response: The parsed HTTP response object (e.g., a dictionary,
            list, or None if parsing failed).
    """
    record_extra = {"httpResponse": _hash_sensitive_info(parsed_response)}
    logger.debug("Response received...", extra=record_extra)
def response_log(logger: logging.Logger, response: Any) -> None:
    """
    Log an HTTP response at the DEBUG level when logging is enabled.

    Args:
        logger: The logging.Logger instance to use.
        response: The HTTP response object to log.
    """
    # Skip parsing entirely when nothing would be logged.
    if not is_logging_enabled(logger):
        return
    _response_log_base(logger, _parse_response(response))