1# Copyright 2015 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helper functions for commonly used utilities."""
16
17import base64
18import calendar
19import datetime
20from email.message import Message
21import hashlib
22import json
23import logging
24import sys
25from typing import Any, Dict, Mapping, Optional, Union
26import urllib
27
28from google.auth import exceptions
29
30
# Name of the base logger that all google-based loggers hang off.
_BASE_LOGGER_NAME = "google"

# Set to True after the base logger has been inspected, so that it is only
# configured once (and left alone if the end-user already configured it).
_LOGGING_INITIALIZED = False


# The smallest MDS cache used by this library stores tokens until 4 minutes
# from expiry.
REFRESH_THRESHOLD = datetime.timedelta(seconds=45, minutes=3)

# Keys whose values are hashed before a payload is logged.
# TODO(https://github.com/googleapis/google-auth-library-python/issues/1684): Audit and update the list below.
_SENSITIVE_FIELDS = {
    "access_token",
    "accessToken",
    "client_id",
    "client_secret",
    "id_token",
    "refresh_token",
}
52
53
def copy_docstring(source_class):
    """Build a decorator that borrows a method docstring from another class.

    Args:
        source_class (type): The class holding the documented method.

    Returns:
        Callable: A decorator that assigns the docstring of the same-named
            method on ``source_class`` to the method it decorates.
    """

    def decorator(method):
        """Copy the docstring onto ``method``.

        Args:
            method (Callable): The method that should receive the docstring.

        Returns:
            Callable: ``method`` itself, with its docstring filled in.

        Raises:
            google.auth.exceptions.InvalidOperation: if ``method`` already
                carries a docstring.
        """
        if not method.__doc__:
            # The counterpart is looked up by name on the source class.
            documented_twin = getattr(source_class, method.__name__)
            method.__doc__ = documented_twin.__doc__
            return method

        raise exceptions.InvalidOperation("Method already has a docstring.")

    return decorator
86
87
def parse_content_type(header_value):
    """Extract the bare media-type from a 'content-type' header value.

    Parsing is delegated to ``email.message.Message``, as suggested in PEP 594
    (the ``cgi`` module is deprecated and removed in python 3.13, see
    https://peps.python.org/pep-0594/#cgi).

    Args:
        header_value (str): The value of a 'content-type' header as a string.

    Returns:
        str: The lowercase media-type, with any parameters dropped. When the
            header cannot be parsed, 'text/plain' is returned, the default
            value for textual files.
    """
    carrier = Message()
    carrier["content-type"] = header_value
    # Despite its name, get_content_type() returns only the media-type.
    media_type = carrier.get_content_type()
    return media_type
108
109
def utcnow():
    """Return the current UTC time as a naive datetime.

    Returns:
        datetime: The current time in UTC, without timezone info.
    """
    # datetime.utcnow() is deprecated from python 3.12, so the time is taken
    # from datetime.now(timezone.utc) instead. That value is offset-aware,
    # whereas utcnow() was offset-naive; mixing the two breaks datetime
    # comparisons, so the timezone info is stripped for backward
    # compatibility.
    aware_now = datetime.datetime.now(tz=datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)
124
125
def datetime_to_secs(value):
    """Convert a datetime into whole seconds elapsed since the UNIX epoch.

    Args:
        value (datetime): The datetime to convert.

    Returns:
        int: The number of seconds since the UNIX epoch.
    """
    utc_tuple = value.utctimetuple()
    return calendar.timegm(utc_tuple)
136
137
def to_bytes(value, encoding="utf-8"):
    """Return ``value`` as bytes, encoding it first when it is a string.

    Args:
        value (Union[str, bytes]): The value to be converted.
        encoding (str): The encoding used to turn a string into bytes.
            Defaults to "utf-8".

    Returns:
        bytes: The encoded value when a string was given, or the value
            unchanged when it was already bytes.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to bytes.
    """
    converted = value
    if isinstance(value, str):
        converted = value.encode(encoding)

    if not isinstance(converted, bytes):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to bytes".format(value)
        )
    return converted
160
161
def from_bytes(value):
    """Return ``value`` as a string, decoding it first when it is bytes.

    Args:
        value (Union[str, bytes]): The value to be converted.

    Returns:
        str: The UTF-8 decoded value when bytes were given, or the value
            unchanged when it was already a string.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to unicode.
    """
    converted = value
    if isinstance(value, bytes):
        converted = value.decode("utf-8")

    if not isinstance(converted, str):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to unicode".format(value)
        )
    return converted
182
183
def update_query(url, params, remove=None):
    """Merge query parameters into a URL, optionally dropping some.

    A parameter already present in the URL is replaced by the value given
    in ``params``. Removal is applied after the merge.

    Args:
        url (str): The URL to update.
        params (Mapping[str, str]): A mapping of query parameter
            keys to values.
        remove (Sequence[str]): Parameters to remove from the query string.

    Returns:
        str: The URL with updated query parameters.

    Examples:

        >>> url = 'http://example.com?a=1'
        >>> update_query(url, {'a': '2'})
        'http://example.com?a=2'
        >>> update_query(url, {'b': '3'})
        'http://example.com?a=1&b=3'
        >>> update_query(url, {'b': '3'}, remove=['a'])
        'http://example.com?b=3'

    """
    excluded = [] if remove is None else remove

    # Break the URL apart and read its current query string.
    split_url = urllib.parse.urlparse(url)
    merged = urllib.parse.parse_qs(split_url.query)
    # Values from ``params`` win over what the URL already had.
    merged.update(params)
    # Keep only the pairs that were not asked to be removed.
    surviving = [
        (name, values) for name, values in merged.items() if name not in excluded
    ]
    # doseq=True expands list values into repeated ``key=value`` pairs.
    encoded_query = urllib.parse.urlencode(surviving, doseq=True)
    # Put the URL back together with the rebuilt query string.
    return urllib.parse.urlunparse(split_url._replace(query=encoded_query))
227
228
def scopes_to_string(scopes):
    """Join scopes into the single string form sent to OAuth 2.0
    authorization servers.

    Args:
        scopes (Sequence[str]): The sequence of scopes to convert.

    Returns:
        str: The scopes separated by single spaces.
    """
    delimiter = " "
    return delimiter.join(scopes)
240
241
def string_to_scopes(scopes):
    """Split a stringified scopes value into a list.

    Args:
        scopes (Union[Sequence, str]): The string of space-separated scopes
            to convert.

    Returns:
        Sequence(str): The separated scopes, or an empty list when the
            input is empty or otherwise falsy.
    """
    return scopes.split(" ") if scopes else []
255
256
def padded_urlsafe_b64decode(value):
    """Decode a URL-safe base64 value whose padding may be missing.

    Google infrastructure tends to omit the base64 padding characters, so
    the padding is restored before decoding.

    Args:
        value (Union[str, bytes]): The encoded value.

    Returns:
        bytes: The decoded value
    """
    raw = to_bytes(value)
    # Number of "=" characters needed to reach a multiple of four.
    missing = (4 - len(raw) % 4) % 4
    return base64.urlsafe_b64decode(raw + b"=" * missing)
271
272
def unpadded_urlsafe_b64encode(value):
    """Encode a value as URL-safe base64 with the padding stripped.

    `rfc 7515`_ defines Base64url to NOT include any padding
    characters, but the stdlib doesn't do that by default.

    .. _rfc 7515: https://tools.ietf.org/html/rfc7515#page-6

    Args:
        value (bytes): The bytes-like value to encode

    Returns:
        bytes: The encoded value, without trailing ``=`` characters
    """
    encoded = base64.urlsafe_b64encode(value)
    return encoded.rstrip(b"=")
288
289
def is_python_3():
    """Report whether the running interpreter is Python 3.

    Returns:
        bool: True if the Python interpreter is Python 3 and False otherwise.
    """
    return sys.version_info.major >= 3
297
298
def _hash_sensitive_info(data: Union[dict, list]) -> Union[dict, list, str]:
    """
    Produces a copy of a payload in which sensitive values are hashed.

    Args:
        data: The dictionary (or list) containing data to be processed.

    Returns:
        For a dictionary, a new dictionary in which non-container values
        stored under a sensitive key are replaced by their SHA512 hashes,
        and nested dictionaries and lists are processed recursively.
        For a list, a new list with each element recursively processed.
        For any other input, the type of the input as a string.

    """
    if isinstance(data, list):
        return [_hash_sensitive_info(element) for element in data]

    if not isinstance(data, dict):
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1701):
        # Investigate and hash sensitive info before logging when the data type is
        # not a dict or a list.
        return str(type(data))

    redacted: Dict[Any, Union[Optional[str], dict, list]] = {}
    for name, item in data.items():
        if isinstance(item, (dict, list)):
            # Containers are always walked, even under a sensitive key.
            redacted[name] = _hash_sensitive_info(item)
        elif name in _SENSITIVE_FIELDS:
            redacted[name] = _hash_value(item, name)
        else:
            redacted[name] = item
    return redacted
332
333
def _hash_value(value, field_name: str) -> Optional[str]:
    """Returns a labelled SHA512 hex digest of a value, or None for None.

    Args:
        value: The value to hash; it is stringified and UTF-8 encoded first.
        field_name: The field name embedded in the returned string.

    Returns:
        A string of the form ``hashed_<field_name>-<hex digest>``, or None
        when ``value`` is None.
    """
    if value is None:
        return None
    digest = hashlib.sha512(str(value).encode("utf-8")).hexdigest()
    return f"hashed_{field_name}-{digest}"
343
344
def _logger_configured(logger: logging.Logger) -> bool:
    """Tells whether `logger` carries any non-default configuration

    A logger counts as configured when it has at least one handler, a level
    other than NOTSET, or propagation switched off.

    Args:
        logger: The logger to check.

    Returns:
        bool: Whether the logger has any non-default configuration.
    """
    if logger.handlers != []:
        return True
    if logger.level != logging.NOTSET:
        return True
    return not logger.propagate
357
358
def is_logging_enabled(logger: logging.Logger) -> bool:
    """
    Reports whether the given logger will emit DEBUG records.

    On the first call this also switches off propagation on the base
    "google" logger, unless that logger already has non-default
    configuration.

    Args:
        logger: The logging.Logger instance to check.

    Returns:
        True if debug logging is enabled, False otherwise.
    """
    # NOTE: Log propagation to the root logger is disabled unless
    # the base logger i.e. logging.getLogger("google") is
    # explicitly configured by the end user. Ideally this
    # needs to happen in the client layer (already does for GAPICs).
    # However, this is implemented here to avoid logging
    # (if a root logger is configured) when a version of google-auth
    # which supports logging is used with:
    #  - an older version of a GAPIC which does not support logging.
    #  - Apiary client which does not support logging.
    global _LOGGING_INITIALIZED
    first_call = not _LOGGING_INITIALIZED
    if first_call:
        google_logger = logging.getLogger(_BASE_LOGGER_NAME)
        untouched = not _logger_configured(google_logger)
        if untouched:
            google_logger.propagate = False
        _LOGGING_INITIALIZED = True

    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    return debug_enabled
386
387
def request_log(
    logger: logging.Logger,
    method: str,
    url: str,
    body: Optional[bytes],
    headers: Optional[Mapping[str, str]],
) -> None:
    """
    Emits a DEBUG record describing an HTTP request, if logging is enabled.

    The body is parsed according to the "Content-Type" header (looked up
    with exactly that spelling) and passed through the sensitive-field
    hashing before it is logged. The headers are attached to the record
    as given.

    Args:
        logger: The logging.Logger instance to use.
        method: The HTTP method (e.g., "GET", "POST").
        url: The URL of the request.
        body: The request body (can be None).
        headers: The request headers (can be None).
    """
    if not is_logging_enabled(logger):
        return

    content_type = ""
    if headers and "Content-Type" in headers:
        content_type = headers["Content-Type"]

    parsed_body = _parse_request_body(body, content_type=content_type)
    http_request = {
        "method": method,
        "url": url,
        "body": _hash_sensitive_info(parsed_body),
        "headers": headers,
    }
    logger.debug("Making request...", extra={"httpRequest": http_request})
422
423
def _parse_request_body(
    body: Optional[bytes], content_type: Optional[str] = ""
) -> Any:
    """
    Parses a bytes request body according to its content type.

    Args:
        body (Optional[bytes]): The request body.
        content_type (Optional[str]): The content type of the request body, e.g.,
            "application/json", "application/x-www-form-urlencoded", or
            "text/plain". Matching is case-insensitive and tolerates
            parameters such as "; charset=utf-8". If empty or None, attempts
            to parse as JSON.

    Returns:
        Parsed body (dict, list, str, or None).
        - JSON: Decoded if content_type contains "application/json" or is
          empty/None (fallback). If the text is not valid JSON, the decoded
          string is returned instead.
        - URL-encoded: A dict mapping each key to its first value if
          content_type contains "application/x-www-form-urlencoded".
        - Plain text: The decoded string if content_type contains "text/plain".
        - None: If body is None, body cannot be decoded as UTF-8 (including
          bodies that are not bytes), or content_type is unknown.
    """
    if body is None:
        return None
    try:
        body_str = body.decode("utf-8")
    except (UnicodeDecodeError, AttributeError):
        # Either invalid UTF-8 or an object without ``decode`` (e.g. a str).
        return None

    # Treat a missing content type like an empty one so that the documented
    # JSON fallback applies instead of failing on ``None.lower()``.
    content_type = (content_type or "").lower()
    if not content_type or "application/json" in content_type:
        try:
            return json.loads(body_str)
        except (json.JSONDecodeError, TypeError):
            return body_str
    if "application/x-www-form-urlencoded" in content_type:
        parsed_query = urllib.parse.parse_qs(body_str)
        # Only the first value of a repeated key is kept.
        return {key: values[0] for key, values in parsed_query.items()}
    if "text/plain" in content_type:
        return body_str
    return None
460
461
def _parse_response(response: Any) -> Any:
    """
    Extracts the JSON payload of a response, if it has one.

    Args:
        response: The response object to parse. This can be any type, but
            it is expected to have a `json()` method if it contains JSON.

    Returns:
        Whatever `response.json()` returns (e.g., a dictionary or list).
        If the response does not have a `json()` method or if calling it
        raises any exception, None is returned.
    """
    try:
        payload = response.json()
    except Exception:
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1744):
        # Parse and return response payload as json based on different content types.
        return None
    return payload
483
484
def _response_log_base(logger: logging.Logger, parsed_response: Any) -> None:
    """
    Emits a DEBUG record for an already-parsed HTTP response.

    The parsed response is run through the sensitive-field hashing first,
    and the result is attached to the record under "httpResponse".

    Args:
        logger: The logging.Logger instance to use for logging.
        parsed_response: The parsed HTTP response object (e.g., a dictionary
            or list).
    """
    logger.debug(
        "Response received...",
        extra={"httpResponse": _hash_sensitive_info(parsed_response)},
    )
501
502
def response_log(logger: logging.Logger, response: Any) -> None:
    """
    Emits a DEBUG record describing an HTTP response, if logging is enabled.

    Args:
        logger: The logging.Logger instance to use.
        response: The HTTP response object to log.
    """
    if not is_logging_enabled(logger):
        return

    parsed_response = _parse_response(response)
    _response_log_base(logger, parsed_response)