Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_helpers.py: 30%
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helper functions for commonly used utilities."""

import base64
import calendar
import datetime
from email.message import Message
import hashlib
import json
import logging
import sys
from typing import Any, Dict, Mapping, Optional, Union
import urllib.parse

from google.auth import exceptions

# _BASE_LOGGER_NAME is the base logger for all google-based loggers.
_BASE_LOGGER_NAME = "google"

# _LOGGING_INITIALIZED ensures that base logger is only configured once
# (unless already configured by the end-user).
_LOGGING_INITIALIZED = False

# The smallest MDS cache used by this library stores tokens until 4 minutes from
# expiry.
REFRESH_THRESHOLD = datetime.timedelta(minutes=3, seconds=45)

# TODO(https://github.com/googleapis/google-auth-library-python/issues/1684): Audit and update the list below.
_SENSITIVE_FIELDS = {
    "accessToken",
    "access_token",
    "id_token",
    "client_id",
    "refresh_token",
    "client_secret",
}


def copy_docstring(source_class):
    """Decorator that copies a method's docstring from another class.

    Args:
        source_class (type): The class that has the documented method.

    Returns:
        Callable: A decorator that will copy the docstring of the same
            named method in the source class to the decorated method.
    """

    def decorator(method):
        """Decorator implementation.

        Args:
            method (Callable): The method to copy the docstring to.

        Returns:
            Callable: the same method passed in with an updated docstring.

        Raises:
            google.auth.exceptions.InvalidOperation: if the method already has a docstring.
        """
        if method.__doc__:
            raise exceptions.InvalidOperation("Method already has a docstring.")

        source_method = getattr(source_class, method.__name__)
        method.__doc__ = source_method.__doc__

        return method

    return decorator
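
# Illustrative usage sketch for copy_docstring (the classes below are hypothetical,
# not part of this module): the decorator pulls the docstring from the same-named
# method on the source class.
#
#     class _Base:
#         def refresh(self):
#             """Refreshes the credentials."""
#
#     class _Impl:
#         @copy_docstring(_Base)
#         def refresh(self):
#             pass
#
#     _Impl.refresh.__doc__  # -> "Refreshes the credentials."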


def parse_content_type(header_value):
    """Parse a 'content-type' header value to get just the plain media-type (without parameters).

    This is done using the Message class from email.message, as suggested in PEP 594
    (because the cgi module is deprecated and removed in Python 3.13,
    see https://peps.python.org/pep-0594/#cgi).

    Args:
        header_value (str): The value of a 'content-type' header as a string.

    Returns:
        str: A string with just the lowercase media-type from the parsed 'content-type' header.
            If the provided content-type is not parsable, returns 'text/plain',
            the default value for textual files.
    """
    m = Message()
    m["content-type"] = header_value
    return (
        m.get_content_type()
    )  # Despite the name, actually returns just the media-type
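
# Illustrative sketch of parse_content_type on a header carrying parameters: the
# charset parameter is dropped and only the lowercase media-type remains, while an
# unparsable header falls back to the default.
#
#     parse_content_type("application/json; charset=utf-8")  # -> "application/json"
#     parse_content_type("not a valid header")               # -> "text/plain"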


def utcnow():
    """Returns the current UTC datetime.

    Returns:
        datetime: The current time in UTC.
    """
    # We previously used datetime.utcnow(), but it is deprecated as of Python 3.12,
    # so we use datetime.now(timezone.utc) instead. "utcnow()" returns an
    # offset-naive datetime (no timezone info), while "now(timezone.utc)" returns
    # an offset-aware one, which would break datetime comparisons elsewhere.
    # For backward compatibility, we strip the timezone info.
    now = datetime.datetime.now(datetime.timezone.utc)
    now = now.replace(tzinfo=None)
    return now


def datetime_to_secs(value):
    """Convert a datetime object to the number of seconds since the UNIX epoch.

    Args:
        value (datetime): The datetime to convert.

    Returns:
        int: The number of seconds since the UNIX epoch.
    """
    return calendar.timegm(value.utctimetuple())
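
# Illustrative sketch: calendar.timegm() interprets the time tuple as UTC, so a
# naive UTC datetime converts directly to an epoch timestamp.
#
#     datetime_to_secs(datetime.datetime(1970, 1, 1, 0, 1))  # -> 60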


def to_bytes(value, encoding="utf-8"):
    """Converts a string value to bytes, if necessary.

    Args:
        value (Union[str, bytes]): The value to be converted.
        encoding (str): The encoding to use to convert unicode to bytes.
            Defaults to "utf-8".

    Returns:
        bytes: The original value converted to bytes (if unicode) or as
            passed in if it started out as bytes.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to bytes.
    """
    result = value.encode(encoding) if isinstance(value, str) else value
    if isinstance(result, bytes):
        return result
    else:
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to bytes".format(value)
        )


def from_bytes(value):
    """Converts bytes to a string value, if necessary.

    Args:
        value (Union[str, bytes]): The value to be converted.

    Returns:
        str: The original value converted to unicode (if bytes) or as passed in
            if it started out as unicode.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to unicode.
    """
    result = value.decode("utf-8") if isinstance(value, bytes) else value
    if isinstance(result, str):
        return result
    else:
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to unicode".format(value)
        )
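
# Illustrative sketch of the to_bytes/from_bytes pair: str and bytes inputs pass
# through with only the requested conversion, anything else raises
# google.auth.exceptions.InvalidValue.
#
#     to_bytes("token")     # -> b"token"
#     to_bytes(b"token")    # -> b"token"
#     from_bytes(b"token")  # -> "token"
#     to_bytes(1234)        # raises exceptions.InvalidValue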


def update_query(url, params, remove=None):
    """Updates a URL's query parameters.

    Replaces any current values if they are already present in the URL.

    Args:
        url (str): The URL to update.
        params (Mapping[str, str]): A mapping of query parameter
            keys to values.
        remove (Sequence[str]): Parameters to remove from the query string.

    Returns:
        str: The URL with updated query parameters.

    Examples:

        >>> url = 'http://example.com?a=1'
        >>> update_query(url, {'a': '2'})
        'http://example.com?a=2'
        >>> update_query(url, {'b': '3'})
        'http://example.com?a=1&b=3'
        >>> update_query(url, {'b': '3'}, remove=['a'])
        'http://example.com?b=3'
    """
    if remove is None:
        remove = []

    # Split the URL into parts.
    parts = urllib.parse.urlparse(url)
    # Parse the query string.
    query_params = urllib.parse.parse_qs(parts.query)
    # Update the query parameters with the new parameters.
    query_params.update(params)
    # Remove any values specified in remove.
    query_params = {
        key: value for key, value in query_params.items() if key not in remove
    }
    # Re-encode the query string.
    new_query = urllib.parse.urlencode(query_params, doseq=True)
    # Reassemble the URL.
    new_parts = parts._replace(query=new_query)
    return urllib.parse.urlunparse(new_parts)


def scopes_to_string(scopes):
    """Converts scope value to a string suitable for sending to OAuth 2.0
    authorization servers.

    Args:
        scopes (Sequence[str]): The sequence of scopes to convert.

    Returns:
        str: The scopes formatted as a single string.
    """
    return " ".join(scopes)


def string_to_scopes(scopes):
    """Converts a stringified scopes value to a list.

    Args:
        scopes (Union[Sequence, str]): The string of space-separated scopes
            to convert.

    Returns:
        Sequence(str): The separated scopes.
    """
    if not scopes:
        return []

    return scopes.split(" ")
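
# Illustrative round-trip sketch for the scope helpers: OAuth 2.0 scopes travel as
# a single space-delimited string on the wire.
#
#     scopes_to_string(["email", "profile"])  # -> "email profile"
#     string_to_scopes("email profile")       # -> ["email", "profile"]
#     string_to_scopes("")                    # -> []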


def padded_urlsafe_b64decode(value):
    """Decodes base64 strings lacking padding characters.

    Google infrastructure tends to omit the base64 padding characters.

    Args:
        value (Union[str, bytes]): The encoded value.

    Returns:
        bytes: The decoded value.
    """
    b64string = to_bytes(value)
    padded = b64string + b"=" * (-len(b64string) % 4)
    return base64.urlsafe_b64decode(padded)


def unpadded_urlsafe_b64encode(value):
    """Encodes base64 strings, removing any padding characters.

    `RFC 7515`_ defines base64url to NOT include any padding
    characters, but the stdlib doesn't do that by default.

    .. _RFC 7515: https://tools.ietf.org/html/rfc7515#page-6

    Args:
        value (Union[str, bytes]): The bytes-like value to encode.

    Returns:
        Union[str, bytes]: The encoded value.
    """
    return base64.urlsafe_b64encode(value).rstrip(b"=")
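
# Illustrative round-trip sketch for the base64url helpers: encoding strips the
# "=" padding, and decoding restores it before calling the stdlib decoder.
#
#     unpadded_urlsafe_b64encode(b"hello")  # -> b"aGVsbG8"
#     padded_urlsafe_b64decode("aGVsbG8")   # -> b"hello"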


def is_python_3():
    """Check if the Python interpreter is Python 2 or 3.

    Returns:
        bool: True if the Python interpreter is Python 3 and False otherwise.
    """
    return sys.version_info > (3, 0)


def _hash_sensitive_info(data: Union[dict, list]) -> Union[dict, list, str]:
    """
    Hashes sensitive information within a dictionary.

    Args:
        data: The dictionary containing data to be processed.

    Returns:
        A new dictionary with sensitive values replaced by their SHA512 hashes.
        If the input is a list, returns a list with each element recursively processed.
        If the input is neither a dict nor a list, returns the type of the input as a string.
    """
    if isinstance(data, dict):
        hashed_data: Dict[Any, Union[Optional[str], dict, list]] = {}
        for key, value in data.items():
            if key in _SENSITIVE_FIELDS and not isinstance(value, (dict, list)):
                hashed_data[key] = _hash_value(value, key)
            elif isinstance(value, (dict, list)):
                hashed_data[key] = _hash_sensitive_info(value)
            else:
                hashed_data[key] = value
        return hashed_data
    elif isinstance(data, list):
        hashed_list = []
        for val in data:
            hashed_list.append(_hash_sensitive_info(val))
        return hashed_list
    else:
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1701):
        # Investigate and hash sensitive info before logging when the data type is
        # not a dict or a list.
        return str(type(data))
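
# Illustrative sketch of how _hash_sensitive_info redacts a payload before it is
# logged: only keys listed in _SENSITIVE_FIELDS are replaced, nested containers
# are walked recursively, and non-container inputs collapse to their type name.
#
#     _hash_sensitive_info({"access_token": "ya29.x", "scope": "email"})
#     # -> {"access_token": "hashed_access_token-<sha512 hex digest>", "scope": "email"}
#     _hash_sensitive_info("raw string body")
#     # -> "<class 'str'>"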


def _hash_value(value, field_name: str) -> Optional[str]:
    """Hashes a value and returns a formatted hash string."""
    if value is None:
        return None
    encoded_value = str(value).encode("utf-8")
    hash_object = hashlib.sha512()
    hash_object.update(encoded_value)
    hex_digest = hash_object.hexdigest()
    return f"hashed_{field_name}-{hex_digest}"


def _logger_configured(logger: logging.Logger) -> bool:
    """Determines whether `logger` has non-default configuration.

    Args:
        logger: The logger to check.

    Returns:
        bool: Whether the logger has any non-default configuration.
    """
    return (
        logger.handlers != [] or logger.level != logging.NOTSET or not logger.propagate
    )


def is_logging_enabled(logger: logging.Logger) -> bool:
    """
    Checks if debug logging is enabled for the given logger.

    Args:
        logger: The logging.Logger instance to check.

    Returns:
        True if debug logging is enabled, False otherwise.
    """
    # NOTE: Log propagation to the root logger is disabled unless
    # the base logger, i.e. logging.getLogger("google"), is
    # explicitly configured by the end user. Ideally this
    # needs to happen in the client layer (already does for GAPICs).
    # However, this is implemented here to avoid logging
    # (if a root logger is configured) when a version of google-auth
    # which supports logging is used with:
    #  - an older version of a GAPIC which does not support logging.
    #  - an Apiary client which does not support logging.
    global _LOGGING_INITIALIZED
    if not _LOGGING_INITIALIZED:
        base_logger = logging.getLogger(_BASE_LOGGER_NAME)
        if not _logger_configured(base_logger):
            base_logger.propagate = False
        _LOGGING_INITIALIZED = True

    return logger.isEnabledFor(logging.DEBUG)
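
# Illustrative sketch of how an application opts in to this library's debug
# logging: configure the "google" base logger before the first request, so the
# lazy initialization above sees a configured logger and leaves propagation on.
#
#     import logging
#
#     logging.basicConfig(level=logging.DEBUG)
#     logging.getLogger("google").setLevel(logging.DEBUG)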


def request_log(
    logger: logging.Logger,
    method: str,
    url: str,
    body: Optional[bytes],
    headers: Optional[Mapping[str, str]],
) -> None:
    """
    Logs an HTTP request at the DEBUG level if logging is enabled.

    Args:
        logger: The logging.Logger instance to use.
        method: The HTTP method (e.g., "GET", "POST").
        url: The URL of the request.
        body: The request body (can be None).
        headers: The request headers (can be None).
    """
    if is_logging_enabled(logger):
        content_type = (
            headers["Content-Type"] if headers and "Content-Type" in headers else ""
        )
        json_body = _parse_request_body(body, content_type=content_type)
        logged_body = _hash_sensitive_info(json_body)
        logger.debug(
            "Making request...",
            extra={
                "httpRequest": {
                    "method": method,
                    "url": url,
                    "body": logged_body,
                    "headers": headers,
                }
            },
        )
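
# Illustrative sketch of a transport calling request_log before sending a request
# (the values below are hypothetical): the body is parsed, sensitive fields are
# hashed, and the structured record lands under the "httpRequest" extra.
#
#     request_log(
#         logging.getLogger(__name__),
#         method="POST",
#         url="https://oauth2.googleapis.com/token",
#         body=b'{"refresh_token": "1//abc"}',
#         headers={"Content-Type": "application/json"},
#     )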


def _parse_request_body(body: Optional[bytes], content_type: str = "") -> Any:
    """
    Parses a request body, handling bytes and string types, and different content types.

    Args:
        body (Optional[bytes]): The request body.
        content_type (str): The content type of the request body, e.g., "application/json",
            "application/x-www-form-urlencoded", or "text/plain". If empty, attempts
            to parse as JSON.

    Returns:
        Parsed body (dict, str, or None).
        - JSON: Decodes if content_type is "application/json" or None (fallback).
        - URL-encoded: Parses if content_type is "application/x-www-form-urlencoded".
        - Plain text: Returns string if content_type is "text/plain".
        - None: Returns if body is None, UTF-8 decode fails, or content_type is unknown.
    """
    if body is None:
        return None
    try:
        body_str = body.decode("utf-8")
    except (UnicodeDecodeError, AttributeError):
        return None
    content_type = content_type.lower()
    if not content_type or "application/json" in content_type:
        try:
            return json.loads(body_str)
        except (json.JSONDecodeError, TypeError):
            return body_str
    if "application/x-www-form-urlencoded" in content_type:
        parsed_query = urllib.parse.parse_qs(body_str)
        result = {k: v[0] for k, v in parsed_query.items()}
        return result
    if "text/plain" in content_type:
        return body_str
    return None
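
# Illustrative sketch of the content-type branches above:
#
#     _parse_request_body(b'{"a": 1}', content_type="application/json")
#     # -> {"a": 1}
#     _parse_request_body(b"grant_type=refresh_token&client_id=x",
#                         content_type="application/x-www-form-urlencoded")
#     # -> {"grant_type": "refresh_token", "client_id": "x"}
#     _parse_request_body(b"hello", content_type="text/plain")
#     # -> "hello"
#     _parse_request_body(b"anything", content_type="application/octet-stream")
#     # -> None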


def _parse_response(response: Any) -> Any:
    """
    Parses a response, attempting to decode JSON.

    Args:
        response: The response object to parse. This can be any type, but
            it is expected to have a `json()` method if it contains JSON.

    Returns:
        The parsed response. If the response contains valid JSON, the
        decoded JSON object (e.g., a dictionary or list) is returned.
        If the response does not have a `json()` method or if the JSON
        decoding fails, None is returned.
    """
    try:
        json_response = response.json()
        return json_response
    except Exception:
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1744):
        # Parse and return response payload as json based on different content types.
        return None


def _response_log_base(logger: logging.Logger, parsed_response: Any) -> None:
    """
    Logs a parsed HTTP response at the DEBUG level.

    This internal helper function takes a parsed response and logs it
    using the provided logger. It also applies a hashing function to
    potentially sensitive information before logging.

    Args:
        logger: The logging.Logger instance to use for logging.
        parsed_response: The parsed HTTP response object (e.g., a dictionary,
            list, or the original response if parsing failed).
    """
    logged_response = _hash_sensitive_info(parsed_response)
    logger.debug("Response received...", extra={"httpResponse": logged_response})


def response_log(logger: logging.Logger, response: Any) -> None:
    """
    Logs an HTTP response at the DEBUG level if logging is enabled.

    Args:
        logger: The logging.Logger instance to use.
        response: The HTTP response object to log.
    """
    if is_logging_enabled(logger):
        json_response = _parse_response(response)
        _response_log_base(logger, json_response)
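
# Illustrative sketch of response_log with a requests-style object (hypothetical
# usage; any object exposing a json() method works, otherwise the body is logged
# as None).
#
#     import requests
#
#     resp = requests.get("https://oauth2.googleapis.com/tokeninfo")
#     response_log(logging.getLogger(__name__), resp)
#     # emits "Response received..." with the hashed payload under "httpResponse"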