Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_helpers.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

151 statements  

1# Copyright 2015 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for commonly used utilities.""" 

16 

17import base64 

18import calendar 

19import datetime 

20from email.message import Message 

21import hashlib 

22import json 

23import logging 

24import os 

25import sys 

26from typing import Any, Dict, Mapping, Optional, Union 

27import urllib 

28 

29from google.auth import exceptions 

30 

31 

# _BASE_LOGGER_NAME is the base logger for all google-based loggers.
# is_logging_enabled() looks this logger up with logging.getLogger().
_BASE_LOGGER_NAME = "google"

# _LOGGING_INITIALIZED ensures that base logger is only configured once
# (unless already configured by the end-user). is_logging_enabled() sets it
# to True the first time it runs.
_LOGGING_INITIALIZED = False


# The smallest MDS cache used by this library stores tokens until 4 minutes from
# expiry.
# NOTE(review): the value below is 3m45s, presumably a small margin inside that
# 4 minute window -- confirm against the code that consumes this constant.
REFRESH_THRESHOLD = datetime.timedelta(minutes=3, seconds=45)

# TODO(https://github.com/googleapis/google-auth-library-python/issues/1684): Audit and update the list below.
# Dictionary keys whose (non-container) values _hash_sensitive_info() replaces
# with a hash instead of passing through verbatim.
_SENSITIVE_FIELDS = {
    "accessToken",
    "access_token",
    "id_token",
    "client_id",
    "refresh_token",
    "client_secret",
}

53 

54 

def copy_docstring(source_class):
    """Build a decorator that borrows a docstring from ``source_class``.

    Args:
        source_class (type): The class holding the documented method.

    Returns:
        Callable: A decorator that assigns the docstring of the same-named
            attribute on ``source_class`` to the method it decorates.
    """

    def decorator(method):
        """Attach the source docstring to ``method``.

        Args:
            method (Callable): The undocumented method to receive the docstring.

        Returns:
            Callable: ``method`` itself, with ``__doc__`` filled in.

        Raises:
            google.auth.exceptions.InvalidOperation: if ``method`` already has
                a non-empty docstring.
        """
        already_documented = bool(method.__doc__)
        if already_documented:
            raise exceptions.InvalidOperation("Method already has a docstring.")

        # Look the counterpart up by name on the source class and reuse its text.
        method.__doc__ = getattr(source_class, method.__name__).__doc__
        return method

    return decorator

87 

88 

def parse_content_type(header_value):
    """Extract the bare media-type from a 'content-type' header value.

    Parsing is delegated to ``email.message.Message``, the replacement that
    PEP 594 suggests for the deprecated ``cgi`` module
    (see https://peps.python.org/pep-0594/#cgi).

    Args:
        header_value (str): The raw value of a 'content-type' header.

    Returns:
        str: The lowercase media-type without any parameters. When the value
            cannot be parsed, 'text/plain' is returned.
    """
    message = Message()
    message["content-type"] = header_value
    # Despite the name, get_content_type() yields only the media-type.
    media_type = message.get_content_type()
    return media_type

109 

110 

def utcnow():
    """Return the current UTC time as a naive datetime.

    Returns:
        datetime: The current time in UTC, with ``tzinfo`` stripped.
    """
    # datetime.utcnow() is deprecated from python 3.12, so the time is taken
    # from the offset-aware datetime.now(timezone.utc) instead. Callers still
    # compare against offset-naive values, so the timezone info is removed to
    # stay backward compatible.
    aware_now = datetime.datetime.now(datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)

125 

126 

def datetime_to_secs(value):
    """Convert a datetime into whole seconds since the UNIX epoch.

    Args:
        value (datetime): The datetime to convert.

    Returns:
        int: The number of seconds since the UNIX epoch.
    """
    utc_struct = value.utctimetuple()
    return calendar.timegm(utc_struct)

137 

138 

def to_bytes(value, encoding="utf-8"):
    """Return ``value`` as bytes, encoding it first when it is text.

    Args:
        value (Union[str, bytes]): The value to be converted.
        encoding (str): The encoding applied to text input.
            Defaults to "utf-8".

    Returns:
        bytes: The encoded text, or the input itself if it was already bytes.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to bytes.
    """
    candidate = value
    if isinstance(value, str):
        candidate = value.encode(encoding)

    if not isinstance(candidate, bytes):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to bytes".format(value)
        )
    return candidate

161 

162 

def from_bytes(value):
    """Return ``value`` as text, decoding it as UTF-8 when it is bytes.

    Args:
        value (Union[str, bytes]): The value to be converted.

    Returns:
        str: The decoded bytes, or the input itself if it was already text.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be converted to unicode.
    """
    text = value
    if isinstance(value, bytes):
        text = value.decode("utf-8")

    if not isinstance(text, str):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to unicode".format(value)
        )
    return text

183 

184 

def update_query(url, params, remove=None):
    """Return ``url`` with its query string updated.

    Parameters already present in the URL are replaced by the values given
    in ``params``.

    Args:
        url (str): The URL to update.
        params (Mapping[str, str]): A mapping of query parameter
            keys to values.
        remove (Sequence[str]): Parameters to remove from the query string.

    Returns:
        str: The URL with updated query parameters.

    Examples:

        >>> url = 'http://example.com?a=1'
        >>> update_query(url, {'a': '2'})
        'http://example.com?a=2'
        >>> update_query(url, {'b': '3'})
        'http://example.com?a=1&b=3'
        >>> update_query(url, {'b': '3'}, remove=['a'])
        'http://example.com?b=3'

    """
    excluded = [] if remove is None else remove

    parsed_url = urllib.parse.urlparse(url)

    # Start from the parameters already in the URL, then overlay the new ones.
    merged = urllib.parse.parse_qs(parsed_url.query)
    merged.update(params)

    # Drop every parameter the caller asked to remove.
    kept = {}
    for name, item in merged.items():
        if name in excluded:
            continue
        kept[name] = item

    # doseq=True expands the list values produced by parse_qs.
    encoded_query = urllib.parse.urlencode(kept, doseq=True)
    return urllib.parse.urlunparse(parsed_url._replace(query=encoded_query))

228 

229 

def scopes_to_string(scopes):
    """Join scopes into the single string sent to OAuth 2.0 authorization
    servers.

    Args:
        scopes (Sequence[str]): The sequence of scopes to convert.

    Returns:
        str: The scopes separated by single spaces.
    """
    delimiter = " "
    return delimiter.join(scopes)

241 

242 

def string_to_scopes(scopes):
    """Split a stringified scopes value into a list.

    Args:
        scopes (Union[Sequence, str]): The string of space-separated scopes
            to convert.
    Returns:
        Sequence(str): The separated scopes; an empty list when ``scopes``
            is falsy.
    """
    return scopes.split(" ") if scopes else []

256 

257 

def padded_urlsafe_b64decode(value):
    """Decode a URL-safe base64 value whose padding may be missing.

    Google infrastructure tends to omit the base64 padding characters, so
    the padding is restored before decoding.

    Args:
        value (Union[str, bytes]): The encoded value.

    Returns:
        bytes: The decoded value
    """
    raw = to_bytes(value)
    # Number of "=" characters needed to reach a multiple of four.
    missing = -len(raw) % 4
    return base64.urlsafe_b64decode(raw + b"=" * missing)

272 

273 

def unpadded_urlsafe_b64encode(value):
    """Encodes a value as URL-safe base64 with the padding removed.

    `rfc 7515`_ defines Base64url to NOT include any padding
    characters, but the stdlib doesn't do that by default.

    .. _rfc 7515: https://tools.ietf.org/html/rfc7515#page-6

    Args:
        value (Union[str, bytes]): The value to encode. Text is encoded as
            UTF-8 first; bytes-like input is passed to the encoder as-is.

    Returns:
        bytes: The encoded value with any trailing ``=`` padding stripped.
    """
    # base64.urlsafe_b64encode only accepts bytes-like objects, so text
    # input (which this function documents as supported) is encoded first.
    if isinstance(value, str):
        value = value.encode("utf-8")
    return base64.urlsafe_b64encode(value).rstrip(b"=")

289 

290 

def get_bool_from_env(variable_name, default=False):
    """Read an environment variable and interpret it as a boolean.

    The value is matched case-insensitively:
    - "true", "1" are considered true.
    - "false", "0" are considered false.
    Any other values will raise an exception.

    Args:
        variable_name (str): The name of the environment variable.
        default (bool): The value returned when the environment variable is
            not set.

    Returns:
        bool: The boolean value of the environment variable.

    Raises:
        google.auth.exceptions.InvalidValue: If the environment variable is
            set to a value that can not be interpreted as a boolean.
    """
    raw = os.environ.get(variable_name)
    if raw is None:
        return default

    normalized = raw.lower()
    if normalized in ("true", "1"):
        return True
    if normalized in ("false", "0"):
        return False

    raise exceptions.InvalidValue(
        'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format(
            variable_name
        )
    )

329 

330 

def is_python_3():
    """Report whether the running interpreter is Python 3.

    Returns:
        bool: True if the Python interpreter is Python 3 and False otherwise.
    """
    minimum_version = (3, 0)
    return sys.version_info > minimum_version  # pragma: NO COVER

339 

340 

def _hash_sensitive_info(data: Union[dict, list]) -> Union[dict, list, str]:
    """
    Replaces sensitive values in nested dicts/lists with their hashes.

    Args:
        data: The dictionary (or list) containing data to be processed.

    Returns:
        For a dict: a new dictionary in which values stored under a key in
        _SENSITIVE_FIELDS are replaced by the result of _hash_value, nested
        dicts/lists are processed recursively, and all other values are kept.
        For a list: a new list with each element recursively processed.
        For anything else: the type of the input rendered as a string.

    """
    if isinstance(data, dict):
        redacted: Dict[Any, Union[Optional[str], dict, list]] = {}
        for name, item in data.items():
            if isinstance(item, (dict, list)):
                # Containers are always descended into, even under a sensitive key.
                redacted[name] = _hash_sensitive_info(item)
            elif name in _SENSITIVE_FIELDS:
                redacted[name] = _hash_value(item, name)
            else:
                redacted[name] = item
        return redacted

    if isinstance(data, list):
        return [_hash_sensitive_info(element) for element in data]

    # TODO(https://github.com/googleapis/google-auth-library-python/issues/1701):
    # Investigate and hash sensitive info before logging when the data type is
    # not a dict or a list.
    return str(type(data))

374 

375 

376def _hash_value(value, field_name: str) -> Optional[str]: 

377 """Hashes a value and returns a formatted hash string.""" 

378 if value is None: 

379 return None 

380 encoded_value = str(value).encode("utf-8") 

381 hash_object = hashlib.sha512() 

382 hash_object.update(encoded_value) 

383 hex_digest = hash_object.hexdigest() 

384 return f"hashed_{field_name}-{hex_digest}" 

385 

386 

387def _logger_configured(logger: logging.Logger) -> bool: 

388 """Determines whether `logger` has non-default configuration 

389 

390 Args: 

391 logger: The logger to check. 

392 

393 Returns: 

394 bool: Whether the logger has any non-default configuration. 

395 """ 

396 return ( 

397 logger.handlers != [] or logger.level != logging.NOTSET or not logger.propagate 

398 ) 

399 

400 

def is_logging_enabled(logger: logging.Logger) -> bool:
    """
    Reports whether DEBUG-level logging is enabled for the given logger.

    On the first call it also switches off propagation on the base
    ("google") logger, unless that logger already has non-default
    configuration.

    Args:
        logger: The logging.Logger instance to check.

    Returns:
        True if debug logging is enabled, False otherwise.
    """
    # NOTE: Log propagation to the root logger is disabled unless
    # the base logger i.e. logging.getLogger("google") is
    # explicitly configured by the end user. Ideally this
    # needs to happen in the client layer (already does for GAPICs).
    # However, this is implemented here to avoid logging
    # (if a root logger is configured) when a version of google-auth
    # which supports logging is used with:
    #  - an older version of a GAPIC which does not support logging.
    #  - Apiary client which does not support logging.
    global _LOGGING_INITIALIZED
    first_call = not _LOGGING_INITIALIZED
    if first_call:
        google_logger = logging.getLogger(_BASE_LOGGER_NAME)
        user_configured = _logger_configured(google_logger)
        if not user_configured:
            google_logger.propagate = False
        _LOGGING_INITIALIZED = True

    debug_level = logging.DEBUG
    return logger.isEnabledFor(debug_level)

428 

429 

def request_log(
    logger: logging.Logger,
    method: str,
    url: str,
    body: Optional[bytes],
    headers: Optional[Mapping[str, str]],
) -> None:
    """
    Emits a DEBUG record describing an HTTP request, if logging is enabled.

    The body is parsed according to the "Content-Type" header and passed
    through _hash_sensitive_info before being logged. Headers are attached
    to the record unchanged.

    Args:
        logger: The logging.Logger instance to use.
        method: The HTTP method (e.g., "GET", "POST").
        url: The URL of the request.
        body: The request body (can be None).
        headers: The request headers (can be None).
    """
    if not is_logging_enabled(logger):
        return

    # Only the exact "Content-Type" key is consulted; default to "".
    content_type = ""
    if headers and "Content-Type" in headers:
        content_type = headers["Content-Type"]

    parsed_body = _parse_request_body(body, content_type=content_type)
    http_request = {
        "method": method,
        "url": url,
        "body": _hash_sensitive_info(parsed_body),
        "headers": headers,
    }
    logger.debug("Making request...", extra={"httpRequest": http_request})

464 

465 

466def _parse_request_body(body: Optional[bytes], content_type: str = "") -> Any: 

467 """ 

468 Parses a request body, handling bytes and string types, and different content types. 

469 

470 Args: 

471 body (Optional[bytes]): The request body. 

472 content_type (str): The content type of the request body, e.g., "application/json", 

473 "application/x-www-form-urlencoded", or "text/plain". If empty, attempts 

474 to parse as JSON. 

475 

476 Returns: 

477 Parsed body (dict, str, or None). 

478 - JSON: Decodes if content_type is "application/json" or None (fallback). 

479 - URL-encoded: Parses if content_type is "application/x-www-form-urlencoded". 

480 - Plain text: Returns string if content_type is "text/plain". 

481 - None: Returns if body is None, UTF-8 decode fails, or content_type is unknown. 

482 """ 

483 if body is None: 

484 return None 

485 try: 

486 body_str = body.decode("utf-8") 

487 except (UnicodeDecodeError, AttributeError): 

488 return None 

489 content_type = content_type.lower() 

490 if not content_type or "application/json" in content_type: 

491 try: 

492 return json.loads(body_str) 

493 except (TypeError, ValueError): 

494 return body_str 

495 if "application/x-www-form-urlencoded" in content_type: 

496 parsed_query = urllib.parse.parse_qs(body_str) 

497 result = {k: v[0] for k, v in parsed_query.items()} 

498 return result 

499 if "text/plain" in content_type: 

500 return body_str 

501 return None 

502 

503 

504def _parse_response(response: Any) -> Any: 

505 """ 

506 Parses a response, attempting to decode JSON. 

507 

508 Args: 

509 response: The response object to parse. This can be any type, but 

510 it is expected to have a `json()` method if it contains JSON. 

511 

512 Returns: 

513 The parsed response. If the response contains valid JSON, the 

514 decoded JSON object (e.g., a dictionary or list) is returned. 

515 If the response does not have a `json()` method or if the JSON 

516 decoding fails, None is returned. 

517 """ 

518 try: 

519 json_response = response.json() 

520 return json_response 

521 except Exception: 

522 # TODO(https://github.com/googleapis/google-auth-library-python/issues/1744): 

523 # Parse and return response payload as json based on different content types. 

524 return None 

525 

526 

def _response_log_base(logger: logging.Logger, parsed_response: Any) -> None:
    """
    Emits a DEBUG record for an already-parsed HTTP response.

    The parsed response is passed through _hash_sensitive_info before it is
    attached to the log record, so potentially sensitive values are not
    logged verbatim.

    Args:
        logger: The logging.Logger instance to use for logging.
        parsed_response: The parsed HTTP response payload supplied by the
            caller (e.g., a dictionary or list).
    """
    redacted = _hash_sensitive_info(parsed_response)
    record_extra = {"httpResponse": redacted}
    logger.debug("Response received...", extra=record_extra)

543 

544 

def response_log(logger: logging.Logger, response: Any) -> None:
    """
    Emits a DEBUG record describing an HTTP response, if logging is enabled.

    Args:
        logger: The logging.Logger instance to use.
        response: The HTTP response object to log.
    """
    if not is_logging_enabled(logger):
        return

    parsed = _parse_response(response)
    _response_log_base(logger, parsed)