Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_helpers.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2015 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helper functions for commonly used utilities."""
17import base64
18import calendar
19import datetime
20from email.message import Message
21import hashlib
22import json
23import logging
24import sys
25from typing import Any, Dict, Mapping, Optional, Union
26import urllib
28from google.auth import exceptions
# Name of the base logger that every google-based logger hangs off.
_BASE_LOGGER_NAME = "google"

# Guard flag: makes sure the base logger is configured a single time only
# (and not at all when the end-user has already configured it).
_LOGGING_INITIALIZED = False

# The smallest MDS cache used by this library keeps tokens until 4 minutes
# before they expire; the threshold below is 3 minutes 45 seconds.
REFRESH_THRESHOLD = datetime.timedelta(minutes=3, seconds=45)

# Dictionary keys whose values are hashed before being written to a log.
# TODO(https://github.com/googleapis/google-auth-library-python/issues/1684): Audit and update the list below.
_SENSITIVE_FIELDS = {
    "access_token",
    "accessToken",
    "client_id",
    "client_secret",
    "id_token",
    "refresh_token",
}
def copy_docstring(source_class):
    """Build a decorator that borrows a method docstring from another class.

    Args:
        source_class (type): The class holding the documented method.

    Returns:
        Callable: A decorator which assigns to the decorated method the
            docstring of the identically named method on ``source_class``.
    """

    def decorator(method):
        """Copy the docstring onto ``method`` and hand it back.

        Args:
            method (Callable): The method that should receive the docstring.

        Returns:
            Callable: The very same method object, now documented.

        Raises:
            google.auth.exceptions.InvalidOperation: if the method already has a docstring.
        """
        if not method.__doc__:
            # Look the documented counterpart up by the decorated method's name.
            method.__doc__ = getattr(source_class, method.__name__).__doc__
            return method

        raise exceptions.InvalidOperation("Method already has a docstring.")

    return decorator
def parse_content_type(header_value):
    """Extract the bare media-type (no parameters) from a 'content-type' header.

    The parsing is delegated to ``email.message.Message``, as suggested in
    PEP 594 (the ``cgi`` module is deprecated and removed in python 3.13,
    see https://peps.python.org/pep-0594/#cgi).

    Args:
        header_value (str): The raw value of a 'content-type' header.

    Returns:
        str: The lowercase media-type taken from the header. When the header
            cannot be parsed, 'text/plain' (the default for textual files)
            is returned.
    """
    message = Message()
    message["content-type"] = header_value
    # Despite its name, get_content_type() yields only the media-type.
    media_type = message.get_content_type()
    return media_type
def utcnow():
    """Return the current time in UTC as a naive datetime.

    Returns:
        datetime: The current time in UTC, without timezone info attached.
    """
    # datetime.utcnow() is deprecated from python 3.12, so the time is read
    # with datetime.now(timezone.utc) instead. That call is offset-aware,
    # whereas utcnow() was offset-naive; mixing the two breaks datetime
    # comparisons, so the timezone info is stripped for backward
    # compatibility.
    aware_now = datetime.datetime.now(datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)
def utcfromtimestamp(timestamp):
    """Convert a POSIX timestamp into a naive UTC datetime.

    Args:
        timestamp (float): The timestamp to convert.

    Returns:
        datetime: The corresponding time in UTC, without timezone info.
    """
    # datetime.utcfromtimestamp() is deprecated from python 3.12, so
    # datetime.fromtimestamp(timestamp, timezone.utc) is used instead. The
    # replacement is offset-aware while the old call was offset-naive;
    # to keep datetime comparisons working for existing callers the timezone
    # info is dropped again.
    aware = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return aware.replace(tzinfo=None)
def datetime_to_secs(value):
    """Return the whole seconds elapsed since the UNIX epoch for a datetime.

    Args:
        value (datetime): The datetime to convert.

    Returns:
        int: The number of seconds since the UNIX epoch.
    """
    utc_tuple = value.utctimetuple()
    return calendar.timegm(utc_tuple)
def to_bytes(value, encoding="utf-8"):
    """Return ``value`` as bytes, encoding it first when it is a string.

    Args:
        value (Union[str, bytes]): The value to be converted.
        encoding (str): The encoding applied when ``value`` is a string.
            Defaults to "utf-8".

    Returns:
        bytes: The encoded string, or ``value`` itself when it was already
            bytes.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to bytes.
    """
    candidate = value
    if isinstance(value, str):
        candidate = value.encode(encoding)

    # Anything that is still not bytes at this point cannot be converted.
    if not isinstance(candidate, bytes):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to bytes".format(value)
        )
    return candidate
def from_bytes(value):
    """Return ``value`` as a string, decoding it as UTF-8 when it is bytes.

    Args:
        value (Union[str, bytes]): The value to be converted.

    Returns:
        str: The decoded bytes, or ``value`` itself when it was already a
            string.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to unicode.
    """
    candidate = value
    if isinstance(value, bytes):
        candidate = value.decode("utf-8")

    # Anything that is still not a string at this point cannot be converted.
    if not isinstance(candidate, str):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to unicode".format(value)
        )
    return candidate
def update_query(url, params, remove=None):
    """Updates a URL's query parameters.

    Replaces any current values if they are already present in the URL.

    Args:
        url (str): The URL to update.
        params (Mapping[str, str]): A mapping of query parameter
            keys to values.
        remove (Sequence[str]): Parameters to remove from the query string.

    Returns:
        str: The URL with updated query parameters.

    Examples:

        >>> url = 'http://example.com?a=1'
        >>> update_query(url, {'a': '2'})
        'http://example.com?a=2'
        >>> update_query(url, {'b': '3'})
        'http://example.com?a=1&b=3'
        >>> update_query(url, {'b': '3'}, remove=['a'])
        'http://example.com?b=3'

    """
    # Import the submodule explicitly: a bare ``import urllib`` does not by
    # itself guarantee that the ``urllib.parse`` attribute is available.
    import urllib.parse

    if remove is None:
        remove = []

    # Split the URL into parts.
    parts = urllib.parse.urlparse(url)
    # Parse the query string into a dict of key -> list of values.
    query_params = urllib.parse.parse_qs(parts.query)
    # New parameters override any value already present for the same key.
    query_params.update(params)
    # Drop any keys listed in remove.
    query_params = {
        key: value for key, value in query_params.items() if key not in remove
    }
    # Re-encode the query string; doseq expands the value lists from parse_qs.
    new_query = urllib.parse.urlencode(query_params, doseq=True)
    # Put the URL back together with the new query string.
    new_parts = parts._replace(query=new_query)
    return urllib.parse.urlunparse(new_parts)
def scopes_to_string(scopes):
    """Join scopes into the single string form sent to OAuth 2.0
    authorization servers.

    Args:
        scopes (Sequence[str]): The sequence of scopes to convert.

    Returns:
        str: The scopes separated by single spaces.
    """
    separator = " "
    return separator.join(scopes)
def string_to_scopes(scopes):
    """Split a stringified scopes value into a list.

    Args:
        scopes (Union[Sequence, str]): The string of space-separated scopes
            to convert.
    Returns:
        Sequence(str): The separated scopes; an empty list when ``scopes``
            is empty or otherwise falsy.
    """
    # The split is on a single space character exactly.
    return scopes.split(" ") if scopes else []
def padded_urlsafe_b64decode(value):
    """Decode a URL-safe base64 value whose padding may be missing.

    Google infrastructure tends to omit the base64 padding characters.

    Args:
        value (Union[str, bytes]): The encoded value.

    Returns:
        bytes: The decoded value
    """
    raw = to_bytes(value)
    # Number of "=" characters needed to reach a multiple of four.
    missing = -len(raw) % 4
    return base64.urlsafe_b64decode(raw + b"=" * missing)
def unpadded_urlsafe_b64encode(value):
    """Encode a value as URL-safe base64 with the padding stripped.

    RFC 7515 (https://tools.ietf.org/html/rfc7515#page-6) defines Base64url
    to NOT include any padding characters, but the stdlib emits them by
    default, so trailing "=" characters are removed here.

    Args:
        value (bytes): The bytes-like value to encode, as accepted by
            ``base64.urlsafe_b64encode``.

    Returns:
        bytes: The encoded value without trailing "=" padding.
    """
    encoded = base64.urlsafe_b64encode(value)
    return encoded.rstrip(b"=")
def is_python_3():
    """Tell whether the running interpreter is Python 3 rather than Python 2.

    Returns:
        bool: True if the Python interpreter is Python 3 and False otherwise.
    """
    version_floor = (3, 0)
    return sys.version_info > version_floor  # pragma: NO COVER
def _hash_sensitive_info(data: Union[dict, list]) -> Union[dict, list, str]:
    """
    Produces a copy of ``data`` that is safe to log.

    Args:
        data: The dictionary (or list) containing data to be processed.

    Returns:
        For a dict: a new dict in which non-container values stored under a
        sensitive key are replaced by their SHA512 hashes, dict/list values
        are processed recursively, and every other value is kept unchanged.
        For a list: a new list with each element processed recursively.
        For anything else: the type of the input rendered as a string.

    """
    if isinstance(data, list):
        return [_hash_sensitive_info(item) for item in data]

    if not isinstance(data, dict):
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1701):
        # Investigate and hash sensitive info before logging when the data type is
        # not a dict or a list.
        return str(type(data))

    sanitized: Dict[Any, Union[Optional[str], dict, list]] = {}
    for name, item in data.items():
        if isinstance(item, (dict, list)):
            # Containers are always descended into, whatever their key is.
            sanitized[name] = _hash_sensitive_info(item)
        elif name in _SENSITIVE_FIELDS:
            sanitized[name] = _hash_value(item, name)
        else:
            sanitized[name] = item
    return sanitized
def _hash_value(value, field_name: str) -> Optional[str]:
    """Returns a ``hashed_<field_name>-<sha512 hex digest>`` string for a value.

    The digest is computed over the UTF-8 encoding of ``str(value)``.
    ``None`` is passed through as ``None`` without hashing.
    """
    if value is None:
        return None
    digest = hashlib.sha512(str(value).encode("utf-8")).hexdigest()
    return f"hashed_{field_name}-{digest}"
def _logger_configured(logger: logging.Logger) -> bool:
    """Reports whether `logger` deviates from a freshly created logger.

    Args:
        logger: The logger to check.

    Returns:
        bool: True when the logger has at least one handler, has a level
            other than NOTSET, or has propagation switched off.
    """
    has_handlers = logger.handlers != []
    has_level = logger.level != logging.NOTSET
    return has_handlers or has_level or not logger.propagate
def is_logging_enabled(logger: logging.Logger) -> bool:
    """
    Reports whether the given logger will emit DEBUG records.

    On the first call this also switches off propagation on the base
    logger, unless that logger already carries configuration.

    Args:
        logger: The logging.Logger instance to check.

    Returns:
        True if debug logging is enabled, False otherwise.
    """
    global _LOGGING_INITIALIZED
    if _LOGGING_INITIALIZED:
        return logger.isEnabledFor(logging.DEBUG)

    # NOTE: Log propagation to the root logger is disabled unless
    # the base logger i.e. logging.getLogger("google") is
    # explicitly configured by the end user. Ideally this
    # needs to happen in the client layer (already does for GAPICs).
    # However, this is implemented here to avoid logging
    # (if a root logger is configured) when a version of google-auth
    # which supports logging is used with:
    #  - an older version of a GAPIC which does not support logging.
    #  - Apiary client which does not support logging.
    google_logger = logging.getLogger(_BASE_LOGGER_NAME)
    user_configured = _logger_configured(google_logger)
    if not user_configured:
        google_logger.propagate = False
    _LOGGING_INITIALIZED = True

    return logger.isEnabledFor(logging.DEBUG)
def request_log(
    logger: logging.Logger,
    method: str,
    url: str,
    body: Optional[bytes],
    headers: Optional[Mapping[str, str]],
) -> None:
    """
    Logs an HTTP request at the DEBUG level if logging is enabled.

    The body is parsed according to the request's content type and run
    through ``_hash_sensitive_info`` before it is logged. The headers are
    attached to the log record unchanged.

    Args:
        logger: The logging.Logger instance to use.
        method: The HTTP method (e.g., "GET", "POST").
        url: The URL of the request.
        body: The request body (can be None).
        headers: The request headers (can be None). The content type is read
            from the "Content-Type" entry; other casings of that name are
            accepted as well.
    """
    if not is_logging_enabled(logger):
        return

    content_type = ""
    if headers:
        if "Content-Type" in headers:
            content_type = headers["Content-Type"]
        else:
            # HTTP field names are case-insensitive, so a plain dict may
            # carry the header as e.g. "content-type".
            for name, value in headers.items():
                if isinstance(name, str) and name.lower() == "content-type":
                    content_type = value
                    break

    json_body = _parse_request_body(body, content_type=content_type)
    logged_body = _hash_sensitive_info(json_body)
    # NOTE(review): headers are logged as-is; confirm callers never pass
    # credentials (e.g. an Authorization header) through this function.
    logger.debug(
        "Making request...",
        extra={
            "httpRequest": {
                "method": method,
                "url": url,
                "body": logged_body,
                "headers": headers,
            }
        },
    )
def _parse_request_body(
    body: Optional[bytes], content_type: Optional[str] = ""
) -> Any:
    """
    Parses a request body according to its content type.

    Args:
        body (Optional[bytes]): The request body.
        content_type (Optional[str]): The content type of the request body, e.g.,
            "application/json", "application/x-www-form-urlencoded", or
            "text/plain". Matching is case-insensitive and tolerates parameters
            such as "; charset=utf-8". If empty or None, attempts to parse
            as JSON.

    Returns:
        Parsed body (dict, list, str, or None).
        - JSON: Decoded when content_type contains "application/json" or is
          empty/None (fallback). If the body is not valid JSON, the decoded
          text is returned instead.
        - URL-encoded: A dict mapping each key to its first value when
          content_type contains "application/x-www-form-urlencoded".
        - Plain text: The decoded text when content_type contains "text/plain".
        - None: When body is None, body cannot be decoded as UTF-8, body has
          no ``decode`` method (e.g. it is already a str), or content_type
          is unknown.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not by
    # itself guarantee that the ``urllib.parse`` attribute is available.
    import urllib.parse

    if body is None:
        return None
    try:
        body_str = body.decode("utf-8")
    except (UnicodeDecodeError, AttributeError):
        return None

    # A None content type is treated exactly like an empty one.
    content_type = (content_type or "").lower()

    if not content_type or "application/json" in content_type:
        try:
            return json.loads(body_str)
        except (TypeError, ValueError):
            # Not valid JSON: fall back to the raw decoded text.
            return body_str
    if "application/x-www-form-urlencoded" in content_type:
        parsed_query = urllib.parse.parse_qs(body_str)
        # parse_qs yields a list per key; keep only the first value.
        return {key: values[0] for key, values in parsed_query.items()}
    if "text/plain" in content_type:
        return body_str
    return None
def _parse_response(response: Any) -> Any:
    """
    Extracts the JSON payload of a response on a best-effort basis.

    Args:
        response: The response object to parse. This can be any type, but
            it is expected to have a `json()` method if it contains JSON.

    Returns:
        Whatever ``response.json()`` returns (e.g., a dictionary or list).
        None is returned when the response has no `json()` method or when
        calling it raises any exception.
    """
    try:
        return response.json()
    except Exception:
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1744):
        # Parse and return response payload as json based on different content types.
        return None
def _response_log_base(logger: logging.Logger, parsed_response: Any) -> None:
    """
    Writes an already-parsed HTTP response to the log at the DEBUG level.

    The parsed response is first passed through ``_hash_sensitive_info`` so
    that potentially sensitive information is not logged verbatim.

    Args:
        logger: The logging.Logger instance to use for logging.
        parsed_response: The parsed HTTP response (e.g., a dictionary or a
            list).
    """
    logger.debug(
        "Response received...",
        extra={"httpResponse": _hash_sensitive_info(parsed_response)},
    )
def response_log(logger: logging.Logger, response: Any) -> None:
    """
    Writes an HTTP response to the log at the DEBUG level, provided that
    logging is enabled.

    Args:
        logger: The logging.Logger instance to use.
        response: The HTTP response object to log.
    """
    if not is_logging_enabled(logger):
        return

    _response_log_base(logger, _parse_response(response))