Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_helpers.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2015 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helper functions for commonly used utilities."""
17import base64
18import calendar
19import datetime
20from email.message import Message
21import hashlib
22import json
23import logging
24import os
25import sys
26from typing import Any, Dict, Mapping, Optional, Union
27import urllib
29from google.auth import exceptions
32# _BASE_LOGGER_NAME is the base logger for all google-based loggers.
33_BASE_LOGGER_NAME = "google"
35# _LOGGING_INITIALIZED ensures that base logger is only configured once
36# (unless already configured by the end-user).
37_LOGGING_INITIALIZED = False
40# The smallest MDS cache used by this library stores tokens until 4 minutes from
41# expiry.
42REFRESH_THRESHOLD = datetime.timedelta(minutes=3, seconds=45)
44# TODO(https://github.com/googleapis/google-auth-library-python/issues/1684): Audit and update the list below.
45_SENSITIVE_FIELDS = {
46 "accessToken",
47 "access_token",
48 "id_token",
49 "client_id",
50 "refresh_token",
51 "client_secret",
52}
55def copy_docstring(source_class):
56 """Decorator that copies a method's docstring from another class.
58 Args:
59 source_class (type): The class that has the documented method.
61 Returns:
62 Callable: A decorator that will copy the docstring of the same
63 named method in the source class to the decorated method.
64 """
66 def decorator(method):
67 """Decorator implementation.
69 Args:
70 method (Callable): The method to copy the docstring to.
72 Returns:
73 Callable: the same method passed in with an updated docstring.
75 Raises:
76 google.auth.exceptions.InvalidOperation: if the method already has a docstring.
77 """
78 if method.__doc__:
79 raise exceptions.InvalidOperation("Method already has a docstring.")
81 source_method = getattr(source_class, method.__name__)
82 method.__doc__ = source_method.__doc__
84 return method
86 return decorator
89def parse_content_type(header_value):
90 """Parse a 'content-type' header value to get just the plain media-type (without parameters).
92 This is done using the class Message from email.message as suggested in PEP 594
93 (because the cgi is now deprecated and will be removed in python 3.13,
94 see https://peps.python.org/pep-0594/#cgi).
96 Args:
97 header_value (str): The value of a 'content-type' header as a string.
99 Returns:
100 str: A string with just the lowercase media-type from the parsed 'content-type' header.
101 If the provided content-type is not parsable, returns 'text/plain',
102 the default value for textual files.
103 """
104 m = Message()
105 m["content-type"] = header_value
106 return (
107 m.get_content_type()
108 ) # Despite the name, actually returns just the media-type
111def utcnow():
112 """Returns the current UTC datetime.
114 Returns:
115 datetime: The current time in UTC.
116 """
117 # We used datetime.utcnow() before, since it's deprecated from python 3.12,
118 # we are using datetime.now(timezone.utc) now. "utcnow()" is offset-native
119 # (no timezone info), but "now()" is offset-aware (with timezone info).
120 # This will cause datetime comparison problem. For backward compatibility,
121 # we need to remove the timezone info.
122 now = datetime.datetime.now(datetime.timezone.utc)
123 now = now.replace(tzinfo=None)
124 return now
127def utcfromtimestamp(timestamp):
128 """Returns the UTC datetime from a timestamp.
130 Args:
131 timestamp (float): The timestamp to convert.
133 Returns:
134 datetime: The time in UTC.
135 """
136 # We used datetime.utcfromtimestamp() before, since it's deprecated from
137 # python 3.12, we are using datetime.fromtimestamp(timestamp, timezone.utc)
138 # now. "utcfromtimestamp()" is offset-native (no timezone info), but
139 # "fromtimestamp(timestamp, timezone.utc)" is offset-aware (with timezone
140 # info). This will cause datetime comparison problem. For backward
141 # compatibility, we need to remove the timezone info.
142 dt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
143 dt = dt.replace(tzinfo=None)
144 return dt
147def datetime_to_secs(value):
148 """Convert a datetime object to the number of seconds since the UNIX epoch.
150 Args:
151 value (datetime): The datetime to convert.
153 Returns:
154 int: The number of seconds since the UNIX epoch.
155 """
156 return calendar.timegm(value.utctimetuple())
159def to_bytes(value, encoding="utf-8"):
160 """Converts a string value to bytes, if necessary.
162 Args:
163 value (Union[str, bytes]): The value to be converted.
164 encoding (str): The encoding to use to convert unicode to bytes.
165 Defaults to "utf-8".
167 Returns:
168 bytes: The original value converted to bytes (if unicode) or as
169 passed in if it started out as bytes.
171 Raises:
172 google.auth.exceptions.InvalidValue: If the value could not be converted to bytes.
173 """
174 result = value.encode(encoding) if isinstance(value, str) else value
175 if isinstance(result, bytes):
176 return result
177 else:
178 raise exceptions.InvalidValue(
179 "{0!r} could not be converted to bytes".format(value)
180 )
183def from_bytes(value):
184 """Converts bytes to a string value, if necessary.
186 Args:
187 value (Union[str, bytes]): The value to be converted.
189 Returns:
190 str: The original value converted to unicode (if bytes) or as passed in
191 if it started out as unicode.
193 Raises:
194 google.auth.exceptions.InvalidValue: If the value could not be converted to unicode.
195 """
196 result = value.decode("utf-8") if isinstance(value, bytes) else value
197 if isinstance(result, str):
198 return result
199 else:
200 raise exceptions.InvalidValue(
201 "{0!r} could not be converted to unicode".format(value)
202 )
205def update_query(url, params, remove=None):
206 """Updates a URL's query parameters.
208 Replaces any current values if they are already present in the URL.
210 Args:
211 url (str): The URL to update.
212 params (Mapping[str, str]): A mapping of query parameter
213 keys to values.
214 remove (Sequence[str]): Parameters to remove from the query string.
216 Returns:
217 str: The URL with updated query parameters.
219 Examples:
221 >>> url = 'http://example.com?a=1'
222 >>> update_query(url, {'a': '2'})
223 http://example.com?a=2
224 >>> update_query(url, {'b': '3'})
225 http://example.com?a=1&b=3
226 >> update_query(url, {'b': '3'}, remove=['a'])
227 http://example.com?b=3
229 """
230 if remove is None:
231 remove = []
233 # Split the URL into parts.
234 parts = urllib.parse.urlparse(url)
235 # Parse the query string.
236 query_params = urllib.parse.parse_qs(parts.query)
237 # Update the query parameters with the new parameters.
238 query_params.update(params)
239 # Remove any values specified in remove.
240 query_params = {
241 key: value for key, value in query_params.items() if key not in remove
242 }
243 # Re-encoded the query string.
244 new_query = urllib.parse.urlencode(query_params, doseq=True)
245 # Unsplit the url.
246 new_parts = parts._replace(query=new_query)
247 return urllib.parse.urlunparse(new_parts)
250def scopes_to_string(scopes):
251 """Converts scope value to a string suitable for sending to OAuth 2.0
252 authorization servers.
254 Args:
255 scopes (Sequence[str]): The sequence of scopes to convert.
257 Returns:
258 str: The scopes formatted as a single string.
259 """
260 return " ".join(scopes)
263def string_to_scopes(scopes):
264 """Converts stringifed scopes value to a list.
266 Args:
267 scopes (Union[Sequence, str]): The string of space-separated scopes
268 to convert.
269 Returns:
270 Sequence(str): The separated scopes.
271 """
272 if not scopes:
273 return []
275 return scopes.split(" ")
278def padded_urlsafe_b64decode(value):
279 """Decodes base64 strings lacking padding characters.
281 Google infrastructure tends to omit the base64 padding characters.
283 Args:
284 value (Union[str, bytes]): The encoded value.
286 Returns:
287 bytes: The decoded value
288 """
289 b64string = to_bytes(value)
290 padded = b64string + b"=" * (-len(b64string) % 4)
291 return base64.urlsafe_b64decode(padded)
294def unpadded_urlsafe_b64encode(value):
295 """Encodes base64 strings removing any padding characters.
297 `rfc 7515`_ defines Base64url to NOT include any padding
298 characters, but the stdlib doesn't do that by default.
300 _rfc7515: https://tools.ietf.org/html/rfc7515#page-6
302 Args:
303 value (Union[str|bytes]): The bytes-like value to encode
305 Returns:
306 Union[str|bytes]: The encoded value
307 """
308 return base64.urlsafe_b64encode(value).rstrip(b"=")
311def get_bool_from_env(variable_name, default=False):
312 """Gets a boolean value from an environment variable.
314 The environment variable is interpreted as a boolean with the following
315 (case-insensitive) rules:
316 - "true", "1" are considered true.
317 - "false", "0" are considered false.
318 Any other values will raise an exception.
320 Args:
321 variable_name (str): The name of the environment variable.
322 default (bool): The default value if the environment variable is not
323 set.
325 Returns:
326 bool: The boolean value of the environment variable.
328 Raises:
329 google.auth.exceptions.InvalidValue: If the environment variable is
330 set to a value that can not be interpreted as a boolean.
331 """
332 value = os.environ.get(variable_name)
334 if value is None:
335 return default
337 value = value.lower()
339 if value in ("true", "1"):
340 return True
341 elif value in ("false", "0"):
342 return False
343 else:
344 raise exceptions.InvalidValue(
345 'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format(
346 variable_name
347 )
348 )
351def is_python_3():
352 """Check if the Python interpreter is Python 2 or 3.
354 Returns:
355 bool: True if the Python interpreter is Python 3 and False otherwise.
356 """
358 return sys.version_info > (3, 0) # pragma: NO COVER
361def _hash_sensitive_info(data: Union[dict, list]) -> Union[dict, list, str]:
362 """
363 Hashes sensitive information within a dictionary.
365 Args:
366 data: The dictionary containing data to be processed.
368 Returns:
369 A new dictionary with sensitive values replaced by their SHA512 hashes.
370 If the input is a list, returns a list with each element recursively processed.
371 If the input is neither a dict nor a list, returns the type of the input as a string.
373 """
374 if isinstance(data, dict):
375 hashed_data: Dict[Any, Union[Optional[str], dict, list]] = {}
376 for key, value in data.items():
377 if key in _SENSITIVE_FIELDS and not isinstance(value, (dict, list)):
378 hashed_data[key] = _hash_value(value, key)
379 elif isinstance(value, (dict, list)):
380 hashed_data[key] = _hash_sensitive_info(value)
381 else:
382 hashed_data[key] = value
383 return hashed_data
384 elif isinstance(data, list):
385 hashed_list = []
386 for val in data:
387 hashed_list.append(_hash_sensitive_info(val))
388 return hashed_list
389 else:
390 # TODO(https://github.com/googleapis/google-auth-library-python/issues/1701):
391 # Investigate and hash sensitive info before logging when the data type is
392 # not a dict or a list.
393 return str(type(data))
396def _hash_value(value, field_name: str) -> Optional[str]:
397 """Hashes a value and returns a formatted hash string."""
398 if value is None:
399 return None
400 encoded_value = str(value).encode("utf-8")
401 hash_object = hashlib.sha512()
402 hash_object.update(encoded_value)
403 hex_digest = hash_object.hexdigest()
404 return f"hashed_{field_name}-{hex_digest}"
407def _logger_configured(logger: logging.Logger) -> bool:
408 """Determines whether `logger` has non-default configuration
410 Args:
411 logger: The logger to check.
413 Returns:
414 bool: Whether the logger has any non-default configuration.
415 """
416 return (
417 logger.handlers != [] or logger.level != logging.NOTSET or not logger.propagate
418 )
421def is_logging_enabled(logger: logging.Logger) -> bool:
422 """
423 Checks if debug logging is enabled for the given logger.
425 Args:
426 logger: The logging.Logger instance to check.
428 Returns:
429 True if debug logging is enabled, False otherwise.
430 """
431 # NOTE: Log propagation to the root logger is disabled unless
432 # the base logger i.e. logging.getLogger("google") is
433 # explicitly configured by the end user. Ideally this
434 # needs to happen in the client layer (already does for GAPICs).
435 # However, this is implemented here to avoid logging
436 # (if a root logger is configured) when a version of google-auth
437 # which supports logging is used with:
438 # - an older version of a GAPIC which does not support logging.
439 # - Apiary client which does not support logging.
440 global _LOGGING_INITIALIZED
441 if not _LOGGING_INITIALIZED:
442 base_logger = logging.getLogger(_BASE_LOGGER_NAME)
443 if not _logger_configured(base_logger):
444 base_logger.propagate = False
445 _LOGGING_INITIALIZED = True
447 return logger.isEnabledFor(logging.DEBUG)
450def request_log(
451 logger: logging.Logger,
452 method: str,
453 url: str,
454 body: Optional[bytes],
455 headers: Optional[Mapping[str, str]],
456) -> None:
457 """
458 Logs an HTTP request at the DEBUG level if logging is enabled.
460 Args:
461 logger: The logging.Logger instance to use.
462 method: The HTTP method (e.g., "GET", "POST").
463 url: The URL of the request.
464 body: The request body (can be None).
465 headers: The request headers (can be None).
466 """
467 if is_logging_enabled(logger):
468 content_type = (
469 headers["Content-Type"] if headers and "Content-Type" in headers else ""
470 )
471 json_body = _parse_request_body(body, content_type=content_type)
472 logged_body = _hash_sensitive_info(json_body)
473 logger.debug(
474 "Making request...",
475 extra={
476 "httpRequest": {
477 "method": method,
478 "url": url,
479 "body": logged_body,
480 "headers": headers,
481 }
482 },
483 )
486def _parse_request_body(body: Optional[bytes], content_type: str = "") -> Any:
487 """
488 Parses a request body, handling bytes and string types, and different content types.
490 Args:
491 body (Optional[bytes]): The request body.
492 content_type (str): The content type of the request body, e.g., "application/json",
493 "application/x-www-form-urlencoded", or "text/plain". If empty, attempts
494 to parse as JSON.
496 Returns:
497 Parsed body (dict, str, or None).
498 - JSON: Decodes if content_type is "application/json" or None (fallback).
499 - URL-encoded: Parses if content_type is "application/x-www-form-urlencoded".
500 - Plain text: Returns string if content_type is "text/plain".
501 - None: Returns if body is None, UTF-8 decode fails, or content_type is unknown.
502 """
503 if body is None:
504 return None
505 try:
506 body_str = body.decode("utf-8")
507 except (UnicodeDecodeError, AttributeError):
508 return None
509 content_type = content_type.lower()
510 if not content_type or "application/json" in content_type:
511 try:
512 return json.loads(body_str)
513 except (TypeError, ValueError):
514 return body_str
515 if "application/x-www-form-urlencoded" in content_type:
516 parsed_query = urllib.parse.parse_qs(body_str)
517 result = {k: v[0] for k, v in parsed_query.items()}
518 return result
519 if "text/plain" in content_type:
520 return body_str
521 return None
524def _parse_response(response: Any) -> Any:
525 """
526 Parses a response, attempting to decode JSON.
528 Args:
529 response: The response object to parse. This can be any type, but
530 it is expected to have a `json()` method if it contains JSON.
532 Returns:
533 The parsed response. If the response contains valid JSON, the
534 decoded JSON object (e.g., a dictionary or list) is returned.
535 If the response does not have a `json()` method or if the JSON
536 decoding fails, None is returned.
537 """
538 try:
539 json_response = response.json()
540 return json_response
541 except Exception:
542 # TODO(https://github.com/googleapis/google-auth-library-python/issues/1744):
543 # Parse and return response payload as json based on different content types.
544 return None
547def _response_log_base(logger: logging.Logger, parsed_response: Any) -> None:
548 """
549 Logs a parsed HTTP response at the DEBUG level.
551 This internal helper function takes a parsed response and logs it
552 using the provided logger. It also applies a hashing function to
553 potentially sensitive information before logging.
555 Args:
556 logger: The logging.Logger instance to use for logging.
557 parsed_response: The parsed HTTP response object (e.g., a dictionary,
558 list, or the original response if parsing failed).
559 """
561 logged_response = _hash_sensitive_info(parsed_response)
562 logger.debug("Response received...", extra={"httpResponse": logged_response})
565def response_log(logger: logging.Logger, response: Any) -> None:
566 """
567 Logs an HTTP response at the DEBUG level if logging is enabled.
569 Args:
570 logger: The logging.Logger instance to use.
571 response: The HTTP response object to log.
572 """
573 if is_logging_enabled(logger):
574 json_response = _parse_response(response)
575 _response_log_base(logger, json_response)