Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_helpers.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

155 statements  

1# Copyright 2015 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for commonly used utilities.""" 

16 

17import base64 

18import calendar 

19import datetime 

20from email.message import Message 

21import hashlib 

22import json 

23import logging 

24import os 

25import sys 

26from typing import Any, Dict, Mapping, Optional, Union 

27import urllib 

28 

29from google.auth import exceptions 

30 

31 

32# _BASE_LOGGER_NAME is the base logger for all google-based loggers. 

33_BASE_LOGGER_NAME = "google" 

34 

35# _LOGGING_INITIALIZED ensures that base logger is only configured once 

36# (unless already configured by the end-user). 

37_LOGGING_INITIALIZED = False 

38 

39 

40# The smallest MDS cache used by this library stores tokens until 4 minutes from 

41# expiry. 

42REFRESH_THRESHOLD = datetime.timedelta(minutes=3, seconds=45) 

43 

44# TODO(https://github.com/googleapis/google-auth-library-python/issues/1684): Audit and update the list below. 

45_SENSITIVE_FIELDS = { 

46 "accessToken", 

47 "access_token", 

48 "id_token", 

49 "client_id", 

50 "refresh_token", 

51 "client_secret", 

52} 

53 

54 

55def copy_docstring(source_class): 

56 """Decorator that copies a method's docstring from another class. 

57 

58 Args: 

59 source_class (type): The class that has the documented method. 

60 

61 Returns: 

62 Callable: A decorator that will copy the docstring of the same 

63 named method in the source class to the decorated method. 

64 """ 

65 

66 def decorator(method): 

67 """Decorator implementation. 

68 

69 Args: 

70 method (Callable): The method to copy the docstring to. 

71 

72 Returns: 

73 Callable: the same method passed in with an updated docstring. 

74 

75 Raises: 

76 google.auth.exceptions.InvalidOperation: if the method already has a docstring. 

77 """ 

78 if method.__doc__: 

79 raise exceptions.InvalidOperation("Method already has a docstring.") 

80 

81 source_method = getattr(source_class, method.__name__) 

82 method.__doc__ = source_method.__doc__ 

83 

84 return method 

85 

86 return decorator 

87 

88 

89def parse_content_type(header_value): 

90 """Parse a 'content-type' header value to get just the plain media-type (without parameters). 

91 

92 This is done using the class Message from email.message as suggested in PEP 594 

93 (because the cgi is now deprecated and will be removed in python 3.13, 

94 see https://peps.python.org/pep-0594/#cgi). 

95 

96 Args: 

97 header_value (str): The value of a 'content-type' header as a string. 

98 

99 Returns: 

100 str: A string with just the lowercase media-type from the parsed 'content-type' header. 

101 If the provided content-type is not parsable, returns 'text/plain', 

102 the default value for textual files. 

103 """ 

104 m = Message() 

105 m["content-type"] = header_value 

106 return ( 

107 m.get_content_type() 

108 ) # Despite the name, actually returns just the media-type 

109 

110 

111def utcnow(): 

112 """Returns the current UTC datetime. 

113 

114 Returns: 

115 datetime: The current time in UTC. 

116 """ 

117 # We used datetime.utcnow() before, since it's deprecated from python 3.12, 

118 # we are using datetime.now(timezone.utc) now. "utcnow()" is offset-native 

119 # (no timezone info), but "now()" is offset-aware (with timezone info). 

120 # This will cause datetime comparison problem. For backward compatibility, 

121 # we need to remove the timezone info. 

122 now = datetime.datetime.now(datetime.timezone.utc) 

123 now = now.replace(tzinfo=None) 

124 return now 

125 

126 

127def utcfromtimestamp(timestamp): 

128 """Returns the UTC datetime from a timestamp. 

129 

130 Args: 

131 timestamp (float): The timestamp to convert. 

132 

133 Returns: 

134 datetime: The time in UTC. 

135 """ 

136 # We used datetime.utcfromtimestamp() before, since it's deprecated from 

137 # python 3.12, we are using datetime.fromtimestamp(timestamp, timezone.utc) 

138 # now. "utcfromtimestamp()" is offset-native (no timezone info), but 

139 # "fromtimestamp(timestamp, timezone.utc)" is offset-aware (with timezone 

140 # info). This will cause datetime comparison problem. For backward 

141 # compatibility, we need to remove the timezone info. 

142 dt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) 

143 dt = dt.replace(tzinfo=None) 

144 return dt 

145 

146 

147def datetime_to_secs(value): 

148 """Convert a datetime object to the number of seconds since the UNIX epoch. 

149 

150 Args: 

151 value (datetime): The datetime to convert. 

152 

153 Returns: 

154 int: The number of seconds since the UNIX epoch. 

155 """ 

156 return calendar.timegm(value.utctimetuple()) 

157 

158 

159def to_bytes(value, encoding="utf-8"): 

160 """Converts a string value to bytes, if necessary. 

161 

162 Args: 

163 value (Union[str, bytes]): The value to be converted. 

164 encoding (str): The encoding to use to convert unicode to bytes. 

165 Defaults to "utf-8". 

166 

167 Returns: 

168 bytes: The original value converted to bytes (if unicode) or as 

169 passed in if it started out as bytes. 

170 

171 Raises: 

172 google.auth.exceptions.InvalidValue: If the value could not be converted to bytes. 

173 """ 

174 result = value.encode(encoding) if isinstance(value, str) else value 

175 if isinstance(result, bytes): 

176 return result 

177 else: 

178 raise exceptions.InvalidValue( 

179 "{0!r} could not be converted to bytes".format(value) 

180 ) 

181 

182 

183def from_bytes(value): 

184 """Converts bytes to a string value, if necessary. 

185 

186 Args: 

187 value (Union[str, bytes]): The value to be converted. 

188 

189 Returns: 

190 str: The original value converted to unicode (if bytes) or as passed in 

191 if it started out as unicode. 

192 

193 Raises: 

194 google.auth.exceptions.InvalidValue: If the value could not be converted to unicode. 

195 """ 

196 result = value.decode("utf-8") if isinstance(value, bytes) else value 

197 if isinstance(result, str): 

198 return result 

199 else: 

200 raise exceptions.InvalidValue( 

201 "{0!r} could not be converted to unicode".format(value) 

202 ) 

203 

204 

205def update_query(url, params, remove=None): 

206 """Updates a URL's query parameters. 

207 

208 Replaces any current values if they are already present in the URL. 

209 

210 Args: 

211 url (str): The URL to update. 

212 params (Mapping[str, str]): A mapping of query parameter 

213 keys to values. 

214 remove (Sequence[str]): Parameters to remove from the query string. 

215 

216 Returns: 

217 str: The URL with updated query parameters. 

218 

219 Examples: 

220 

221 >>> url = 'http://example.com?a=1' 

222 >>> update_query(url, {'a': '2'}) 

223 http://example.com?a=2 

224 >>> update_query(url, {'b': '3'}) 

225 http://example.com?a=1&b=3 

226 >> update_query(url, {'b': '3'}, remove=['a']) 

227 http://example.com?b=3 

228 

229 """ 

230 if remove is None: 

231 remove = [] 

232 

233 # Split the URL into parts. 

234 parts = urllib.parse.urlparse(url) 

235 # Parse the query string. 

236 query_params = urllib.parse.parse_qs(parts.query) 

237 # Update the query parameters with the new parameters. 

238 query_params.update(params) 

239 # Remove any values specified in remove. 

240 query_params = { 

241 key: value for key, value in query_params.items() if key not in remove 

242 } 

243 # Re-encoded the query string. 

244 new_query = urllib.parse.urlencode(query_params, doseq=True) 

245 # Unsplit the url. 

246 new_parts = parts._replace(query=new_query) 

247 return urllib.parse.urlunparse(new_parts) 

248 

249 

250def scopes_to_string(scopes): 

251 """Converts scope value to a string suitable for sending to OAuth 2.0 

252 authorization servers. 

253 

254 Args: 

255 scopes (Sequence[str]): The sequence of scopes to convert. 

256 

257 Returns: 

258 str: The scopes formatted as a single string. 

259 """ 

260 return " ".join(scopes) 

261 

262 

263def string_to_scopes(scopes): 

264 """Converts stringifed scopes value to a list. 

265 

266 Args: 

267 scopes (Union[Sequence, str]): The string of space-separated scopes 

268 to convert. 

269 Returns: 

270 Sequence(str): The separated scopes. 

271 """ 

272 if not scopes: 

273 return [] 

274 

275 return scopes.split(" ") 

276 

277 

278def padded_urlsafe_b64decode(value): 

279 """Decodes base64 strings lacking padding characters. 

280 

281 Google infrastructure tends to omit the base64 padding characters. 

282 

283 Args: 

284 value (Union[str, bytes]): The encoded value. 

285 

286 Returns: 

287 bytes: The decoded value 

288 """ 

289 b64string = to_bytes(value) 

290 padded = b64string + b"=" * (-len(b64string) % 4) 

291 return base64.urlsafe_b64decode(padded) 

292 

293 

294def unpadded_urlsafe_b64encode(value): 

295 """Encodes base64 strings removing any padding characters. 

296 

297 `rfc 7515`_ defines Base64url to NOT include any padding 

298 characters, but the stdlib doesn't do that by default. 

299 

300 _rfc7515: https://tools.ietf.org/html/rfc7515#page-6 

301 

302 Args: 

303 value (Union[str|bytes]): The bytes-like value to encode 

304 

305 Returns: 

306 Union[str|bytes]: The encoded value 

307 """ 

308 return base64.urlsafe_b64encode(value).rstrip(b"=") 

309 

310 

311def get_bool_from_env(variable_name, default=False): 

312 """Gets a boolean value from an environment variable. 

313 

314 The environment variable is interpreted as a boolean with the following 

315 (case-insensitive) rules: 

316 - "true", "1" are considered true. 

317 - "false", "0" are considered false. 

318 Any other values will raise an exception. 

319 

320 Args: 

321 variable_name (str): The name of the environment variable. 

322 default (bool): The default value if the environment variable is not 

323 set. 

324 

325 Returns: 

326 bool: The boolean value of the environment variable. 

327 

328 Raises: 

329 google.auth.exceptions.InvalidValue: If the environment variable is 

330 set to a value that can not be interpreted as a boolean. 

331 """ 

332 value = os.environ.get(variable_name) 

333 

334 if value is None: 

335 return default 

336 

337 value = value.lower() 

338 

339 if value in ("true", "1"): 

340 return True 

341 elif value in ("false", "0"): 

342 return False 

343 else: 

344 raise exceptions.InvalidValue( 

345 'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format( 

346 variable_name 

347 ) 

348 ) 

349 

350 

351def is_python_3(): 

352 """Check if the Python interpreter is Python 2 or 3. 

353 

354 Returns: 

355 bool: True if the Python interpreter is Python 3 and False otherwise. 

356 """ 

357 

358 return sys.version_info > (3, 0) # pragma: NO COVER 

359 

360 

361def _hash_sensitive_info(data: Union[dict, list]) -> Union[dict, list, str]: 

362 """ 

363 Hashes sensitive information within a dictionary. 

364 

365 Args: 

366 data: The dictionary containing data to be processed. 

367 

368 Returns: 

369 A new dictionary with sensitive values replaced by their SHA512 hashes. 

370 If the input is a list, returns a list with each element recursively processed. 

371 If the input is neither a dict nor a list, returns the type of the input as a string. 

372 

373 """ 

374 if isinstance(data, dict): 

375 hashed_data: Dict[Any, Union[Optional[str], dict, list]] = {} 

376 for key, value in data.items(): 

377 if key in _SENSITIVE_FIELDS and not isinstance(value, (dict, list)): 

378 hashed_data[key] = _hash_value(value, key) 

379 elif isinstance(value, (dict, list)): 

380 hashed_data[key] = _hash_sensitive_info(value) 

381 else: 

382 hashed_data[key] = value 

383 return hashed_data 

384 elif isinstance(data, list): 

385 hashed_list = [] 

386 for val in data: 

387 hashed_list.append(_hash_sensitive_info(val)) 

388 return hashed_list 

389 else: 

390 # TODO(https://github.com/googleapis/google-auth-library-python/issues/1701): 

391 # Investigate and hash sensitive info before logging when the data type is 

392 # not a dict or a list. 

393 return str(type(data)) 

394 

395 

396def _hash_value(value, field_name: str) -> Optional[str]: 

397 """Hashes a value and returns a formatted hash string.""" 

398 if value is None: 

399 return None 

400 encoded_value = str(value).encode("utf-8") 

401 hash_object = hashlib.sha512() 

402 hash_object.update(encoded_value) 

403 hex_digest = hash_object.hexdigest() 

404 return f"hashed_{field_name}-{hex_digest}" 

405 

406 

407def _logger_configured(logger: logging.Logger) -> bool: 

408 """Determines whether `logger` has non-default configuration 

409 

410 Args: 

411 logger: The logger to check. 

412 

413 Returns: 

414 bool: Whether the logger has any non-default configuration. 

415 """ 

416 return ( 

417 logger.handlers != [] or logger.level != logging.NOTSET or not logger.propagate 

418 ) 

419 

420 

421def is_logging_enabled(logger: logging.Logger) -> bool: 

422 """ 

423 Checks if debug logging is enabled for the given logger. 

424 

425 Args: 

426 logger: The logging.Logger instance to check. 

427 

428 Returns: 

429 True if debug logging is enabled, False otherwise. 

430 """ 

431 # NOTE: Log propagation to the root logger is disabled unless 

432 # the base logger i.e. logging.getLogger("google") is 

433 # explicitly configured by the end user. Ideally this 

434 # needs to happen in the client layer (already does for GAPICs). 

435 # However, this is implemented here to avoid logging 

436 # (if a root logger is configured) when a version of google-auth 

437 # which supports logging is used with: 

438 # - an older version of a GAPIC which does not support logging. 

439 # - Apiary client which does not support logging. 

440 global _LOGGING_INITIALIZED 

441 if not _LOGGING_INITIALIZED: 

442 base_logger = logging.getLogger(_BASE_LOGGER_NAME) 

443 if not _logger_configured(base_logger): 

444 base_logger.propagate = False 

445 _LOGGING_INITIALIZED = True 

446 

447 return logger.isEnabledFor(logging.DEBUG) 

448 

449 

450def request_log( 

451 logger: logging.Logger, 

452 method: str, 

453 url: str, 

454 body: Optional[bytes], 

455 headers: Optional[Mapping[str, str]], 

456) -> None: 

457 """ 

458 Logs an HTTP request at the DEBUG level if logging is enabled. 

459 

460 Args: 

461 logger: The logging.Logger instance to use. 

462 method: The HTTP method (e.g., "GET", "POST"). 

463 url: The URL of the request. 

464 body: The request body (can be None). 

465 headers: The request headers (can be None). 

466 """ 

467 if is_logging_enabled(logger): 

468 content_type = ( 

469 headers["Content-Type"] if headers and "Content-Type" in headers else "" 

470 ) 

471 json_body = _parse_request_body(body, content_type=content_type) 

472 logged_body = _hash_sensitive_info(json_body) 

473 logger.debug( 

474 "Making request...", 

475 extra={ 

476 "httpRequest": { 

477 "method": method, 

478 "url": url, 

479 "body": logged_body, 

480 "headers": headers, 

481 } 

482 }, 

483 ) 

484 

485 

486def _parse_request_body(body: Optional[bytes], content_type: str = "") -> Any: 

487 """ 

488 Parses a request body, handling bytes and string types, and different content types. 

489 

490 Args: 

491 body (Optional[bytes]): The request body. 

492 content_type (str): The content type of the request body, e.g., "application/json", 

493 "application/x-www-form-urlencoded", or "text/plain". If empty, attempts 

494 to parse as JSON. 

495 

496 Returns: 

497 Parsed body (dict, str, or None). 

498 - JSON: Decodes if content_type is "application/json" or None (fallback). 

499 - URL-encoded: Parses if content_type is "application/x-www-form-urlencoded". 

500 - Plain text: Returns string if content_type is "text/plain". 

501 - None: Returns if body is None, UTF-8 decode fails, or content_type is unknown. 

502 """ 

503 if body is None: 

504 return None 

505 try: 

506 body_str = body.decode("utf-8") 

507 except (UnicodeDecodeError, AttributeError): 

508 return None 

509 content_type = content_type.lower() 

510 if not content_type or "application/json" in content_type: 

511 try: 

512 return json.loads(body_str) 

513 except (TypeError, ValueError): 

514 return body_str 

515 if "application/x-www-form-urlencoded" in content_type: 

516 parsed_query = urllib.parse.parse_qs(body_str) 

517 result = {k: v[0] for k, v in parsed_query.items()} 

518 return result 

519 if "text/plain" in content_type: 

520 return body_str 

521 return None 

522 

523 

524def _parse_response(response: Any) -> Any: 

525 """ 

526 Parses a response, attempting to decode JSON. 

527 

528 Args: 

529 response: The response object to parse. This can be any type, but 

530 it is expected to have a `json()` method if it contains JSON. 

531 

532 Returns: 

533 The parsed response. If the response contains valid JSON, the 

534 decoded JSON object (e.g., a dictionary or list) is returned. 

535 If the response does not have a `json()` method or if the JSON 

536 decoding fails, None is returned. 

537 """ 

538 try: 

539 json_response = response.json() 

540 return json_response 

541 except Exception: 

542 # TODO(https://github.com/googleapis/google-auth-library-python/issues/1744): 

543 # Parse and return response payload as json based on different content types. 

544 return None 

545 

546 

547def _response_log_base(logger: logging.Logger, parsed_response: Any) -> None: 

548 """ 

549 Logs a parsed HTTP response at the DEBUG level. 

550 

551 This internal helper function takes a parsed response and logs it 

552 using the provided logger. It also applies a hashing function to 

553 potentially sensitive information before logging. 

554 

555 Args: 

556 logger: The logging.Logger instance to use for logging. 

557 parsed_response: The parsed HTTP response object (e.g., a dictionary, 

558 list, or the original response if parsing failed). 

559 """ 

560 

561 logged_response = _hash_sensitive_info(parsed_response) 

562 logger.debug("Response received...", extra={"httpResponse": logged_response}) 

563 

564 

565def response_log(logger: logging.Logger, response: Any) -> None: 

566 """ 

567 Logs an HTTP response at the DEBUG level if logging is enabled. 

568 

569 Args: 

570 logger: The logging.Logger instance to use. 

571 response: The HTTP response object to log. 

572 """ 

573 if is_logging_enabled(logger): 

574 json_response = _parse_response(response) 

575 _response_log_base(logger, json_response)