Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_helpers.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

144 statements  

1# Copyright 2015 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for commonly used utilities.""" 

16 

17import base64 

18import calendar 

19import datetime 

20from email.message import Message 

21import hashlib 

22import json 

23import logging 

24import sys 

25from typing import Any, Dict, Mapping, Optional, Union 

26import urllib 

27 

28from google.auth import exceptions 

29 

30 

# _BASE_LOGGER_NAME is the name of the base logger for all google-based
# loggers; is_logging_enabled() looks it up via logging.getLogger().
_BASE_LOGGER_NAME = "google"

# _LOGGING_INITIALIZED ensures that the base logger is only configured once
# (unless already configured by the end-user). It is flipped to True by the
# first call to is_logging_enabled().
_LOGGING_INITIALIZED = False


# The smallest MDS cache used by this library stores tokens until 4 minutes from
# expiry.
# NOTE(review): the value below is 3 minutes 45 seconds, i.e. slightly less
# than the 4 minutes mentioned above -- presumably a deliberate margin; confirm.
REFRESH_THRESHOLD = datetime.timedelta(minutes=3, seconds=45)

# TODO(https://github.com/googleapis/google-auth-library-python/issues/1684): Audit and update the list below.
# Dictionary keys whose (non-container) values _hash_sensitive_info() replaces
# with a hash before the data is logged.
_SENSITIVE_FIELDS = {
    "accessToken",
    "access_token",
    "id_token",
    "client_id",
    "refresh_token",
    "client_secret",
}

52 

53 

def copy_docstring(source_class):
    """Build a decorator that borrows a docstring from ``source_class``.

    Args:
        source_class (type): The class that owns the documented method.

    Returns:
        Callable: A decorator which looks up the method of the same name on
            ``source_class`` and assigns its docstring to the decorated
            method.
    """

    def decorator(method):
        """Attach the source docstring to ``method``.

        Args:
            method (Callable): The undocumented method to annotate.

        Returns:
            Callable: ``method`` itself, now carrying the copied docstring.

        Raises:
            google.auth.exceptions.InvalidOperation: if ``method`` already
                has a docstring.
        """
        if not method.__doc__:
            # Look the counterpart up by name on the source class.
            method.__doc__ = getattr(source_class, method.__name__).__doc__
            return method

        raise exceptions.InvalidOperation("Method already has a docstring.")

    return decorator

86 

87 

def parse_content_type(header_value):
    """Extract the bare media-type from a 'content-type' header value.

    Parameters such as ``charset`` are dropped. The parsing is delegated to
    ``email.message.Message``, as suggested in PEP 594 (the ``cgi`` module is
    deprecated and removed in python 3.13, see
    https://peps.python.org/pep-0594/#cgi).

    Args:
        header_value (str): The value of a 'content-type' header as a string.

    Returns:
        str: The lowercase media-type from the parsed header. If the value is
            not parsable, 'text/plain' (the default for textual files) is
            returned.
    """
    message = Message()
    message["content-type"] = header_value
    # Despite the name, get_content_type() yields only the media-type.
    media_type = message.get_content_type()
    return media_type

108 

109 

def utcnow():
    """Return the current time in UTC as a naive datetime.

    Returns:
        datetime: The current UTC time, without timezone info attached.
    """
    # datetime.utcnow() is deprecated from python 3.12, so the time is taken
    # with datetime.now(timezone.utc) instead. That call yields an
    # offset-aware value whereas utcnow() was offset-naive; the tzinfo is
    # stripped so comparisons with naive datetimes keep working.
    aware_now = datetime.datetime.now(datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)

124 

125 

def utcfromtimestamp(timestamp):
    """Convert a POSIX timestamp to a naive UTC datetime.

    Args:
        timestamp (float): The timestamp to convert.

    Returns:
        datetime: The corresponding UTC time, without timezone info attached.
    """
    # datetime.utcfromtimestamp() is deprecated from python 3.12, so
    # datetime.fromtimestamp(timestamp, timezone.utc) is used instead. Its
    # result is offset-aware while the old call was offset-naive; the tzinfo
    # is dropped to stay comparable with naive datetimes.
    aware = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return aware.replace(tzinfo=None)

144 

145 

def datetime_to_secs(value):
    """Return the whole seconds elapsed since the UNIX epoch for a datetime.

    Args:
        value (datetime): The datetime to convert.

    Returns:
        int: The number of seconds since the UNIX epoch.
    """
    utc_struct = value.utctimetuple()
    return calendar.timegm(utc_struct)

156 

157 

def to_bytes(value, encoding="utf-8"):
    """Return ``value`` as bytes, encoding it first when it is a string.

    Args:
        value (Union[str, bytes]): The value to be converted.
        encoding (str): The encoding used to turn a string into bytes.
            Defaults to "utf-8".

    Returns:
        bytes: The encoded string, or ``value`` unchanged if it was already
            bytes.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be
            converted to bytes.
    """
    if isinstance(value, str):
        candidate = value.encode(encoding)
    else:
        candidate = value

    if not isinstance(candidate, bytes):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to bytes".format(value)
        )
    return candidate

180 

181 

def from_bytes(value):
    """Return ``value`` as a string, decoding it as UTF-8 when it is bytes.

    Args:
        value (Union[str, bytes]): The value to be converted.

    Returns:
        str: The decoded bytes, or ``value`` unchanged if it was already a
            string.

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be
            converted to unicode.
    """
    if isinstance(value, bytes):
        candidate = value.decode("utf-8")
    else:
        candidate = value

    if not isinstance(candidate, str):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to unicode".format(value)
        )
    return candidate

202 

203 

def update_query(url, params, remove=None):
    """Updates a URL's query parameters.

    Replaces any current values if they are already present in the URL.

    Args:
        url (str): The URL to update.
        params (Mapping[str, str]): A mapping of query parameter
            keys to values.
        remove (Sequence[str]): Parameters to remove from the query string.
            Removal is applied after ``params`` has been merged in.

    Returns:
        str: The URL with updated query parameters.

    Examples:

        >>> url = 'http://example.com?a=1'
        >>> update_query(url, {'a': '2'})
        'http://example.com?a=2'
        >>> update_query(url, {'b': '3'})
        'http://example.com?a=1&b=3'
        >>> update_query(url, {'b': '3'}, remove=['a'])
        'http://example.com?b=3'

    """
    # Import the submodule explicitly: a bare ``import urllib`` does not by
    # itself guarantee that ``urllib.parse`` has been loaded.
    import urllib.parse

    if remove is None:
        remove = []

    # Split the URL into parts.
    parts = urllib.parse.urlparse(url)
    # Parse the query string (values come back as lists).
    query_params = urllib.parse.parse_qs(parts.query)
    # Update the query parameters with the new parameters.
    query_params.update(params)
    # Remove any values specified in remove.
    query_params = {
        key: value for key, value in query_params.items() if key not in remove
    }
    # Re-encode the query string; doseq=True expands the list values that
    # parse_qs produced into repeated key=value pairs.
    new_query = urllib.parse.urlencode(query_params, doseq=True)
    # Unsplit the url.
    new_parts = parts._replace(query=new_query)
    return urllib.parse.urlunparse(new_parts)

247 

248 

def scopes_to_string(scopes):
    """Join scopes into the single string expected by OAuth 2.0
    authorization servers.

    Args:
        scopes (Sequence[str]): The sequence of scopes to convert.

    Returns:
        str: The scopes separated by single spaces.
    """
    separator = " "
    return separator.join(scopes)

260 

261 

def string_to_scopes(scopes):
    """Split a stringified scopes value into a list.

    Args:
        scopes (Union[Sequence, str]): The string of space-separated scopes
            to convert.

    Returns:
        Sequence(str): The separated scopes; an empty list when ``scopes`` is
            falsy.
    """
    return scopes.split(" ") if scopes else []

275 

276 

def padded_urlsafe_b64decode(value):
    """Decode a URL-safe base64 value whose padding may be missing.

    Google infrastructure tends to omit the base64 padding characters, so the
    padding is restored before decoding.

    Args:
        value (Union[str, bytes]): The encoded value.

    Returns:
        bytes: The decoded value
    """
    raw = to_bytes(value)
    # Number of "=" needed to reach the next multiple of four (0 if aligned).
    missing = -len(raw) % 4
    return base64.urlsafe_b64decode(raw + b"=" * missing)

291 

292 

def unpadded_urlsafe_b64encode(value):
    """Encode a value as URL-safe base64 with the padding stripped.

    `rfc 7515`_ defines Base64url to NOT include any padding
    characters, but the stdlib doesn't do that by default.

    _rfc7515: https://tools.ietf.org/html/rfc7515#page-6

    Args:
        value (Union[str|bytes]): The bytes-like value to encode

    Returns:
        Union[str|bytes]: The encoded value
    """
    encoded = base64.urlsafe_b64encode(value)
    return encoded.rstrip(b"=")

308 

309 

def is_python_3():
    """Report whether the running interpreter is Python 3.

    Returns:
        bool: True if the Python interpreter is Python 3 and False otherwise.
    """
    minimum = (3, 0)
    return minimum < sys.version_info  # pragma: NO COVER

318 

319 

def _hash_sensitive_info(data: Union[dict, list]) -> Union[dict, list, str]:
    """
    Produces a copy of ``data`` that is safe to log.

    Args:
        data: The dictionary (or list) containing data to be processed.

    Returns:
        For a dict: a new dictionary in which values of sensitive keys are
        replaced by their hashes and nested dicts/lists are processed
        recursively; other values are kept as they are.
        For a list: a new list with each element recursively processed.
        For anything else: the type of the input as a string.
    """
    if isinstance(data, list):
        return [_hash_sensitive_info(element) for element in data]

    if not isinstance(data, dict):
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1701):
        # Investigate and hash sensitive info before logging when the data type is
        # not a dict or a list.
        return str(type(data))

    redacted: Dict[Any, Union[Optional[str], dict, list]] = {}
    for field, field_value in data.items():
        if isinstance(field_value, (dict, list)):
            # Containers are always descended into, even under sensitive keys.
            redacted[field] = _hash_sensitive_info(field_value)
        elif field in _SENSITIVE_FIELDS:
            redacted[field] = _hash_value(field_value, field)
        else:
            redacted[field] = field_value
    return redacted

353 

354 

def _hash_value(value, field_name: str) -> Optional[str]:
    """Returns a labelled SHA512 hash of ``value``, or None when value is None.

    The value is stringified and UTF-8 encoded before hashing; the result is
    prefixed with the field name it belongs to.
    """
    if value is None:
        return None
    hex_digest = hashlib.sha512(str(value).encode("utf-8")).hexdigest()
    return f"hashed_{field_name}-{hex_digest}"

364 

365 

def _logger_configured(logger: logging.Logger) -> bool:
    """Tells whether ``logger`` deviates from the default configuration.

    A logger counts as configured when it has at least one handler, an
    explicit level, or propagation switched off.

    Args:
        logger: The logger to check.

    Returns:
        bool: Whether the logger has any non-default configuration.
    """
    if logger.handlers != []:
        return True
    if logger.level != logging.NOTSET:
        return True
    return not logger.propagate

378 

379 

def is_logging_enabled(logger: logging.Logger) -> bool:
    """
    Tells whether the given logger will emit DEBUG records.

    On the first call this also performs a one-time setup of the base
    "google" logger (see the note in the body).

    Args:
        logger: The logging.Logger instance to check.

    Returns:
        True if debug logging is enabled, False otherwise.
    """
    # NOTE: Unless the end user has explicitly configured the base logger,
    # i.e. logging.getLogger("google"), propagation from it to the root
    # logger is switched off. Ideally the client layer would do this (GAPICs
    # already do). It is done here so that nothing is logged through a
    # configured root logger when a logging-capable google-auth is combined
    # with:
    # - an older version of a GAPIC which does not support logging.
    # - Apiary client which does not support logging.
    global _LOGGING_INITIALIZED
    if not _LOGGING_INITIALIZED:
        google_logger = logging.getLogger(_BASE_LOGGER_NAME)
        user_configured = _logger_configured(google_logger)
        if not user_configured:
            google_logger.propagate = False
        _LOGGING_INITIALIZED = True

    return logger.isEnabledFor(logging.DEBUG)

407 

408 

def request_log(
    logger: logging.Logger,
    method: str,
    url: str,
    body: Optional[bytes],
    headers: Optional[Mapping[str, str]],
) -> None:
    """
    Logs an HTTP request at the DEBUG level if logging is enabled.

    The body is parsed according to the request's Content-Type header and
    passed through ``_hash_sensitive_info`` before being logged. The headers
    are attached to the log record as given (they are not passed through
    ``_hash_sensitive_info``).

    Args:
        logger: The logging.Logger instance to use.
        method: The HTTP method (e.g., "GET", "POST").
        url: The URL of the request.
        body: The request body (can be None).
        headers: The request headers (can be None).
    """
    if not is_logging_enabled(logger):
        return

    content_type = ""
    if headers:
        if "Content-Type" in headers:
            content_type = headers["Content-Type"]
        else:
            # HTTP header names are case-insensitive, so also accept other
            # spellings such as "content-type" when the canonical one is
            # absent from the mapping.
            for name, value in headers.items():
                if isinstance(name, str) and name.lower() == "content-type":
                    content_type = value
                    break

    json_body = _parse_request_body(body, content_type=content_type)
    logged_body = _hash_sensitive_info(json_body)
    logger.debug(
        "Making request...",
        extra={
            "httpRequest": {
                "method": method,
                "url": url,
                "body": logged_body,
                "headers": headers,
            }
        },
    )

443 

444 

def _parse_request_body(body: Optional[bytes], content_type: str = "") -> Any:
    """
    Parses a bytes request body according to its content type.

    Args:
        body (Optional[bytes]): The request body. Values without a
            ``decode`` method (e.g. ``str``) are not parsed and yield None.
        content_type (str): The content type of the request body, e.g., "application/json",
            "application/x-www-form-urlencoded", or "text/plain". Matching is
            case-insensitive and by substring, so parameters such as
            "; charset=utf-8" are tolerated. If empty, attempts to parse as JSON.

    Returns:
        Parsed body (dict, list, str, or None).
        - JSON: Decoded if content_type contains "application/json" or is
          empty (fallback); if JSON decoding fails the decoded text is
          returned instead.
        - URL-encoded: A dict mapping each key to its first value if
          content_type contains "application/x-www-form-urlencoded".
        - Plain text: The decoded string if content_type contains "text/plain".
        - None: If body is None, cannot be decoded as UTF-8, or content_type
          is unknown.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not by
    # itself guarantee that ``urllib.parse`` has been loaded.
    import urllib.parse

    if body is None:
        return None
    try:
        body_str = body.decode("utf-8")
    except (UnicodeDecodeError, AttributeError):
        # Not valid UTF-8, or not a bytes-like object at all.
        return None
    content_type = content_type.lower()
    if not content_type or "application/json" in content_type:
        try:
            return json.loads(body_str)
        except (TypeError, ValueError):
            return body_str
    if "application/x-www-form-urlencoded" in content_type:
        parsed_query = urllib.parse.parse_qs(body_str)
        # parse_qs returns a list per key; only the first value is kept.
        return {k: v[0] for k, v in parsed_query.items()}
    if "text/plain" in content_type:
        return body_str
    return None

481 

482 

def _parse_response(response: Any) -> Any:
    """
    Extracts the JSON payload of a response on a best-effort basis.

    Args:
        response: The response object to parse. This can be any type, but
            it is expected to have a `json()` method if it contains JSON.

    Returns:
        The decoded JSON object (e.g., a dictionary or list) returned by
        `response.json()`. If the response has no `json()` method or the
        call raises any exception, None is returned.
    """
    try:
        payload = response.json()
    except Exception:
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1744):
        # Parse and return response payload as json based on different content types.
        return None
    else:
        return payload

504 

505 

def _response_log_base(logger: logging.Logger, parsed_response: Any) -> None:
    """
    Emits a DEBUG record describing an already-parsed HTTP response.

    The parsed response is first run through ``_hash_sensitive_info`` so that
    potentially sensitive values are not logged in the clear, then attached
    to the log record under the "httpResponse" key.

    Args:
        logger: The logging.Logger instance to use for logging.
        parsed_response: The parsed HTTP response object (e.g., a dictionary,
            list, or None if parsing failed).
    """
    record_extra = {"httpResponse": _hash_sensitive_info(parsed_response)}
    logger.debug("Response received...", extra=record_extra)

522 

523 

def response_log(logger: logging.Logger, response: Any) -> None:
    """
    Logs an HTTP response at the DEBUG level if logging is enabled.

    The response is parsed with ``_parse_response`` and handed to
    ``_response_log_base`` for logging; nothing happens when debug logging is
    disabled for ``logger``.

    Args:
        logger: The logging.Logger instance to use.
        response: The HTTP response object to log.
    """
    if not is_logging_enabled(logger):
        return

    _response_log_base(logger, _parse_response(response))

534 _response_log_base(logger, json_response)