Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_helpers.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

145 statements  

1# Copyright 2015 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for commonly used utilities.""" 

16 

17import base64 

18import calendar 

19import datetime 

20from email.message import Message 

21import hashlib 

22import json 

23import logging 

24import sys 

25from typing import Any, Dict, Mapping, Optional, Union 

26import urllib 

27 

28from google.auth import exceptions 

29 

30 

DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"

# Name of the base logger that every google-based logger hangs off.
_BASE_LOGGER_NAME = "google"

# Flipped to True once the base logger has been looked at, so that it is
# only configured once (unless already configured by the end-user).
_LOGGING_INITIALIZED = False


# The smallest MDS cache used by this library stores tokens until 4 minutes
# from expiry.
REFRESH_THRESHOLD = datetime.timedelta(minutes=3, seconds=45)

# Keys whose values are replaced by a hash before being logged.
# TODO(https://github.com/googleapis/google-auth-library-python/issues/1684): Audit and update the list below.
_SENSITIVE_FIELDS = {
    "accessToken",
    "access_token",
    "id_token",
    "client_id",
    "refresh_token",
    "client_secret",
}

54 

55 

def copy_docstring(source_class):
    """Build a decorator that borrows a docstring from another class.

    Args:
        source_class (type): The class that has the documented method.

    Returns:
        Callable: A decorator that copies the docstring of the method with
            the same name on ``source_class`` onto the decorated method.
    """

    def decorator(method):
        """Attach the source method's docstring to ``method``.

        Args:
            method (Callable): The method that receives the docstring.

        Returns:
            Callable: ``method`` itself, with its ``__doc__`` filled in.

        Raises:
            google.auth.exceptions.InvalidOperation: if the method already
                has a docstring.
        """
        if not method.__doc__:
            # Look the documented counterpart up by the decorated name.
            documented = getattr(source_class, method.__name__)
            method.__doc__ = documented.__doc__
            return method

        raise exceptions.InvalidOperation("Method already has a docstring.")

    return decorator

88 

89 

def parse_content_type(header_value):
    """Extract the bare media-type from a 'content-type' header value.

    Parsing is delegated to ``email.message.Message`` as suggested in PEP 594
    (the ``cgi`` module is deprecated and removed in python 3.13, see
    https://peps.python.org/pep-0594/#cgi).

    Args:
        header_value (str): The value of a 'content-type' header as a string.

    Returns:
        str: The lowercase media-type, without any parameters. If the
            provided content-type is not parsable, returns 'text/plain',
            the default value for textual files.
    """
    message = Message()
    message["content-type"] = header_value
    # Despite the name, get_content_type() returns just the media-type.
    media_type = message.get_content_type()
    return media_type

110 

111 

def utcnow():
    """Return the current UTC time as a naive datetime.

    Returns:
        datetime: The current time in UTC, without timezone info.
    """
    # datetime.utcnow() is deprecated from python 3.12, so the time is taken
    # with datetime.now(timezone.utc) instead. That value is offset-aware
    # while utcnow() was offset-naive; mixing the two breaks datetime
    # comparisons, so the timezone info is dropped for backward compatibility.
    aware_now = datetime.datetime.now(datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)

126 

127 

def utcfromtimestamp(timestamp):
    """Convert a timestamp to a naive UTC datetime.

    Args:
        timestamp (float): The timestamp to convert.

    Returns:
        datetime: The time in UTC, without timezone info.
    """
    # datetime.utcfromtimestamp() is deprecated from python 3.12, so
    # datetime.fromtimestamp(timestamp, timezone.utc) is used instead. That
    # value is offset-aware while utcfromtimestamp() was offset-naive; mixing
    # the two breaks datetime comparisons, so the timezone info is dropped
    # for backward compatibility.
    aware = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return aware.replace(tzinfo=None)

146 

147 

def datetime_to_secs(value):
    """Return the number of seconds between the UNIX epoch and ``value``.

    Args:
        value (datetime): The datetime to convert.

    Returns:
        int: The number of seconds since the UNIX epoch.
    """
    utc_tuple = value.utctimetuple()
    return calendar.timegm(utc_tuple)

158 

159 

def to_bytes(value, encoding="utf-8"):
    """Return ``value`` as bytes, encoding it first when it is a string.

    Args:
        value (Union[str, bytes]): The value to be converted.
        encoding (str): The encoding to use to convert unicode to bytes.
            Defaults to "utf-8".

    Returns:
        bytes: The encoded value (if it was unicode) or the value as passed
            in (if it already was bytes).

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be
            converted to bytes.
    """
    converted = value
    if isinstance(value, str):
        converted = value.encode(encoding)

    if not isinstance(converted, bytes):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to bytes".format(value)
        )
    return converted

182 

183 

def from_bytes(value):
    """Return ``value`` as a string, decoding it first when it is bytes.

    Args:
        value (Union[str, bytes]): The value to be converted.

    Returns:
        str: The UTF-8 decoded value (if it was bytes) or the value as
            passed in (if it already was unicode).

    Raises:
        google.auth.exceptions.InvalidValue: If the value could not be
            converted to unicode.
    """
    converted = value
    if isinstance(value, bytes):
        converted = value.decode("utf-8")

    if not isinstance(converted, str):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to unicode".format(value)
        )
    return converted

204 

205 

def update_query(url, params, remove=None):
    """Return ``url`` with its query parameters updated.

    Parameters already present in the URL are replaced by the values given
    in ``params``.

    Args:
        url (str): The URL to update.
        params (Mapping[str, str]): A mapping of query parameter
            keys to values.
        remove (Sequence[str]): Parameters to remove from the query string.

    Returns:
        str: The URL with updated query parameters.

    Examples:

        >>> url = 'http://example.com?a=1'
        >>> update_query(url, {'a': '2'})
        http://example.com?a=2
        >>> update_query(url, {'b': '3'})
        http://example.com?a=1&b=3
        >>> update_query(url, {'b': '3'}, remove=['a'])
        http://example.com?b=3

    """
    excluded = [] if remove is None else remove

    parsed_url = urllib.parse.urlparse(url)
    # Start from the parameters already in the URL, then overlay the new ones.
    merged = urllib.parse.parse_qs(parsed_url.query)
    merged.update(params)
    retained = {
        name: entry for name, entry in merged.items() if name not in excluded
    }
    # doseq=True expands the list values produced by parse_qs.
    encoded_query = urllib.parse.urlencode(retained, doseq=True)
    return urllib.parse.urlunparse(parsed_url._replace(query=encoded_query))

249 

250 

def scopes_to_string(scopes):
    """Join scopes into the single string form sent to OAuth 2.0
    authorization servers.

    Args:
        scopes (Sequence[str]): The sequence of scopes to convert.

    Returns:
        str: The scopes formatted as a single, space-separated string.
    """
    separator = " "
    return separator.join(scopes)

262 

263 

def string_to_scopes(scopes):
    """Converts stringified scopes value to a list.

    Args:
        scopes (Union[Sequence, str]): The string of space-separated scopes
            to convert. A list, tuple, set or frozenset of scopes is accepted
            as well and is returned as a new list.

    Returns:
        Sequence(str): The separated scopes. Empty entries produced by
            repeated, leading or trailing spaces are dropped. An empty or
            ``None`` input yields an empty list.
    """
    if not scopes:
        return []

    # The docstring allows an already-split sequence; pass it through
    # instead of failing on the missing ``split`` method.
    if isinstance(scopes, (list, tuple, set, frozenset)):
        return list(scopes)

    # Split on the single space delimiter only, and discard the empty
    # tokens that consecutive/leading/trailing spaces would otherwise yield.
    return [scope for scope in scopes.split(" ") if scope]

277 

278 

def padded_urlsafe_b64decode(value):
    """Decode a URL-safe base64 value whose padding may be missing.

    Google infrastructure tends to omit the base64 padding characters.

    Args:
        value (Union[str, bytes]): The encoded value.

    Returns:
        bytes: The decoded value
    """
    raw = to_bytes(value)
    # Number of "=" needed to bring the length up to a multiple of four.
    missing = -len(raw) % 4
    return base64.urlsafe_b64decode(raw + b"=" * missing)

293 

294 

def unpadded_urlsafe_b64encode(value):
    """Encodes a value as URL-safe base64 without padding characters.

    `rfc 7515`_ defines Base64url to NOT include any padding
    characters, but the stdlib doesn't do that by default.

    .. _rfc 7515: https://tools.ietf.org/html/rfc7515#page-6

    Args:
        value (Union[str, bytes]): The value to encode. A ``str`` is
            encoded as UTF-8 before being base64 encoded.

    Returns:
        bytes: The encoded value, with any trailing ``=`` removed.
    """
    # base64.urlsafe_b64encode only accepts bytes-like input, so a str
    # (which the documented signature allows) is converted first.
    if isinstance(value, str):
        value = value.encode("utf-8")
    return base64.urlsafe_b64encode(value).rstrip(b"=")

310 

311 

def is_python_3():
    """Tell whether the running interpreter is Python 3 rather than Python 2.

    Returns:
        bool: True if the Python interpreter is Python 3 and False otherwise.
    """
    python_3_floor = (3, 0)
    return sys.version_info > python_3_floor  # pragma: NO COVER

320 

321 

def _hash_sensitive_info(data: Union[dict, list]) -> Union[dict, list, str]:
    """
    Produces a copy of ``data`` that is safe to log.

    Args:
        data: The dictionary (or list) containing data to be processed.

    Returns:
        For a dictionary, a new dictionary in which the values of sensitive
        keys are replaced by their SHA512 hashes and nested dictionaries and
        lists are processed recursively.
        For a list, a new list with each element recursively processed.
        For anything else, the type of the input as a string.

    """
    if isinstance(data, list):
        return [_hash_sensitive_info(item) for item in data]

    if not isinstance(data, dict):
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1701):
        # Investigate and hash sensitive info before logging when the data type is
        # not a dict or a list.
        return str(type(data))

    sanitized: Dict[Any, Union[Optional[str], dict, list]] = {}
    for name, item in data.items():
        if isinstance(item, (dict, list)):
            # Containers are always descended into, even under a sensitive key.
            sanitized[name] = _hash_sensitive_info(item)
        elif name in _SENSITIVE_FIELDS:
            sanitized[name] = _hash_value(item, name)
        else:
            sanitized[name] = item
    return sanitized

355 

356 

def _hash_value(value, field_name: str) -> Optional[str]:
    """Returns ``hashed_<field_name>-<sha512 hex digest>`` for a value.

    ``None`` is passed through unchanged; any other value is stringified and
    UTF-8 encoded before hashing.
    """
    if value is None:
        return None
    hex_digest = hashlib.sha512(str(value).encode("utf-8")).hexdigest()
    return f"hashed_{field_name}-{hex_digest}"

366 

367 

def _logger_configured(logger: logging.Logger) -> bool:
    """Determines whether `logger` has non-default configuration

    A logger counts as configured when it has at least one handler, an
    explicit level, or propagation turned off.

    Args:
        logger: The logger to check.

    Returns:
        bool: Whether the logger has any non-default configuration.
    """
    has_handlers = logger.handlers != []
    has_explicit_level = logger.level != logging.NOTSET
    return has_handlers or has_explicit_level or not logger.propagate

380 

381 

def is_logging_enabled(logger: logging.Logger) -> bool:
    """
    Reports whether debug logging is enabled for the given logger.

    On the first call, propagation on the base logger is switched off
    unless that logger already carries non-default configuration.

    Args:
        logger: The logging.Logger instance to check.

    Returns:
        True if debug logging is enabled, False otherwise.
    """
    # NOTE: Log propagation to the root logger is disabled unless
    # the base logger i.e. logging.getLogger("google") is
    # explicitly configured by the end user. Ideally this
    # needs to happen in the client layer (already does for GAPICs).
    # However, this is implemented here to avoid logging
    # (if a root logger is configured) when a version of google-auth
    # which supports logging is used with:
    #  - an older version of a GAPIC which does not support logging.
    #  - Apiary client which does not support logging.
    global _LOGGING_INITIALIZED
    if not _LOGGING_INITIALIZED:
        google_logger = logging.getLogger(_BASE_LOGGER_NAME)
        user_configured = _logger_configured(google_logger)
        if not user_configured:
            google_logger.propagate = False
        _LOGGING_INITIALIZED = True

    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    return debug_enabled

409 

410 

def request_log(
    logger: logging.Logger,
    method: str,
    url: str,
    body: Optional[bytes],
    headers: Optional[Mapping[str, str]],
) -> None:
    """
    Logs an HTTP request at the DEBUG level if logging is enabled.

    The body is parsed according to the request's content type and has its
    sensitive fields hashed before it is attached to the log record.

    Args:
        logger: The logging.Logger instance to use.
        method: The HTTP method (e.g., "GET", "POST").
        url: The URL of the request.
        body: The request body (can be None).
        headers: The request headers (can be None). The content type is
            looked up case-insensitively; an exact "Content-Type" key takes
            precedence when present.
    """
    if not is_logging_enabled(logger):
        return

    content_type = ""
    if headers:
        if "Content-Type" in headers:
            content_type = headers["Content-Type"]
        else:
            # HTTP field names are case-insensitive, so a plain mapping may
            # carry e.g. "content-type"; fall back to a case-folded search.
            content_type = next(
                (
                    header_value
                    for name, header_value in headers.items()
                    if isinstance(name, str) and name.lower() == "content-type"
                ),
                "",
            )

    json_body = _parse_request_body(body, content_type=content_type)
    logged_body = _hash_sensitive_info(json_body)
    # NOTE(review): unlike the body, the headers are attached to the log
    # record as-is; confirm callers never pass credentials in them.
    logger.debug(
        "Making request...",
        extra={
            "httpRequest": {
                "method": method,
                "url": url,
                "body": logged_body,
                "headers": headers,
            }
        },
    )

445 

446 

def _parse_request_body(body: Optional[bytes], content_type: str = "") -> Any:
    """
    Parses a request body according to its content type.

    Args:
        body (Optional[bytes]): The request body.
        content_type (str): The content type of the request body, e.g., "application/json",
            "application/x-www-form-urlencoded", or "text/plain". If empty or None,
            attempts to parse as JSON.

    Returns:
        Parsed body (dict, str, or None).
        - JSON: Decodes if content_type is "application/json", empty or None (fallback).
          If the body is not valid JSON, the decoded string is returned instead.
        - URL-encoded: Parses if content_type is "application/x-www-form-urlencoded";
          only the first value of each key is kept.
        - Plain text: Returns string if content_type is "text/plain".
        - None: Returns if body is None, body cannot be decoded as UTF-8 bytes
          (including non-bytes input), or content_type is unknown.
    """
    if body is None:
        return None
    try:
        body_str = body.decode("utf-8")
    except (UnicodeDecodeError, AttributeError):
        # AttributeError covers inputs without ``decode`` (e.g. str, int).
        return None

    # Treat a None content type like an empty one, as documented, instead of
    # failing on ``None.lower()``.
    content_type = (content_type or "").lower()

    if not content_type or "application/json" in content_type:
        try:
            return json.loads(body_str)
        except (TypeError, ValueError):
            return body_str
    if "application/x-www-form-urlencoded" in content_type:
        parsed_query = urllib.parse.parse_qs(body_str)
        return {key: values[0] for key, values in parsed_query.items()}
    if "text/plain" in content_type:
        return body_str
    return None

483 

484 

def _parse_response(response: Any) -> Any:
    """
    Extracts the JSON payload from a response, if there is one.

    Args:
        response: The response object to parse. This can be any type, but
            it is expected to have a `json()` method if it contains JSON.

    Returns:
        The decoded JSON object (e.g., a dictionary or list) returned by
        `response.json()`. If the response does not have a `json()` method
        or if calling it raises, None is returned.
    """
    try:
        return response.json()
    except Exception:
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1744):
        # Parse and return response payload as json based on different content types.
        return None

506 

507 

def _response_log_base(logger: logging.Logger, parsed_response: Any) -> None:
    """
    Emits a DEBUG log record for an already-parsed HTTP response.

    The parsed response is first passed through the sensitive-info hashing
    helper, and the result is attached to the record under the
    "httpResponse" key.

    Args:
        logger: The logging.Logger instance to use for logging.
        parsed_response: The parsed HTTP response object (e.g., a dictionary,
            list, or the original response if parsing failed).
    """
    logger.debug(
        "Response received...",
        extra={"httpResponse": _hash_sensitive_info(parsed_response)},
    )

524 

525 

def response_log(logger: logging.Logger, response: Any) -> None:
    """
    Writes an HTTP response to the log at the DEBUG level, provided that
    logging is enabled.

    Args:
        logger: The logging.Logger instance to use.
        response: The HTTP response object to log.
    """
    if not is_logging_enabled(logger):
        return

    parsed = _parse_response(response)
    _response_log_base(logger, parsed)