Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/_helpers.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

141 statements  

1# Copyright 2015 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helper functions for commonly used utilities.""" 

16 

17import base64 

18import calendar 

19import datetime 

20from email.message import Message 

21import hashlib 

22import json 

23import logging 

24import sys 

25from typing import Any, Dict, Mapping, Optional, Union 

26import urllib 

27 

28from google.auth import exceptions 

29 

30 

# Name of the base logger that every google-based logger hangs off.
_BASE_LOGGER_NAME = "google"

# Set once the base logger has been examined, so that it is configured at
# most one time (and not at all when the end-user configured it already).
_LOGGING_INITIALIZED = False


# The smallest MDS cache used by this library stores tokens until 4 minutes
# from expiry.
REFRESH_THRESHOLD = datetime.timedelta(seconds=3 * 60 + 45)

# Keys whose values are replaced by a hash before a payload is logged.
# TODO(https://github.com/googleapis/google-auth-library-python/issues/1684): Audit and update the list below.
_SENSITIVE_FIELDS = {
    "access_token",
    "accessToken",
    "client_id",
    "client_secret",
    "id_token",
    "refresh_token",
}

52 

53 

def copy_docstring(source_class):
    """Build a decorator that borrows a docstring from ``source_class``.

    Args:
        source_class (type): The class holding the documented method.

    Returns:
        Callable: A decorator which assigns to the decorated method the
            docstring of the attribute with the same name on
            ``source_class``.
    """

    def decorator(method):
        """Attach the borrowed docstring to ``method``.

        Args:
            method (Callable): The undocumented method to annotate.

        Returns:
            Callable: ``method`` itself, now carrying the copied docstring.

        Raises:
            google.auth.exceptions.InvalidOperation: if ``method`` already
                has a (non-empty) docstring.
        """
        if method.__doc__:
            raise exceptions.InvalidOperation("Method already has a docstring.")

        # Look the counterpart up by name; a missing attribute surfaces as
        # the AttributeError raised by getattr.
        method.__doc__ = getattr(source_class, method.__name__).__doc__
        return method

    return decorator

86 

87 

def parse_content_type(header_value):
    """Extract the bare media-type from a 'content-type' header value.

    Parsing is delegated to ``email.message.Message``, the replacement that
    PEP 594 suggests for the deprecated ``cgi`` module
    (see https://peps.python.org/pep-0594/#cgi).

    Args:
        header_value (str): The raw value of a 'content-type' header.

    Returns:
        str: The lowercase media-type with any parameters (such as
            ``charset``) dropped. A value that cannot be parsed yields
            'text/plain', the default for textual content.
    """
    message = Message()
    message["content-type"] = header_value
    # Despite the name, get_content_type() returns only the media-type.
    media_type = message.get_content_type()
    return media_type

108 

109 

def utcnow():
    """Return the current time in UTC as a naive datetime.

    Returns:
        datetime: The current UTC time, without timezone information.
    """
    # datetime.utcnow() is deprecated as of Python 3.12, so the time is
    # taken from the offset-aware datetime.now(timezone.utc) instead. The
    # old call produced offset-naive values and mixing naive with aware
    # datetimes breaks comparisons, hence tzinfo is stripped to stay
    # backward compatible.
    aware_now = datetime.datetime.now(tz=datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)

124 

125 

def datetime_to_secs(value):
    """Express a datetime as whole seconds elapsed since the UNIX epoch.

    Args:
        value (datetime): The datetime to convert.

    Returns:
        int: Seconds since the UNIX epoch.
    """
    utc_struct = value.utctimetuple()
    return calendar.timegm(utc_struct)

136 

137 

def to_bytes(value, encoding="utf-8"):
    """Return ``value`` as bytes, encoding it first when it is text.

    Args:
        value (Union[str, bytes]): The value to convert.
        encoding (str): Codec used when ``value`` is a ``str``.
            Defaults to "utf-8".

    Returns:
        bytes: The encoded text, or ``value`` unchanged when it was
            already bytes.

    Raises:
        google.auth.exceptions.InvalidValue: If the outcome is not bytes.
    """
    converted = value
    if isinstance(value, str):
        converted = value.encode(encoding)

    if not isinstance(converted, bytes):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to bytes".format(value)
        )
    return converted

160 

161 

def from_bytes(value):
    """Return ``value`` as text, decoding it from UTF-8 when it is bytes.

    Args:
        value (Union[str, bytes]): The value to convert.

    Returns:
        str: The decoded bytes, or ``value`` unchanged when it was
            already a ``str``.

    Raises:
        google.auth.exceptions.InvalidValue: If the outcome is not a str.
    """
    converted = value
    if isinstance(value, bytes):
        converted = value.decode("utf-8")

    if not isinstance(converted, str):
        raise exceptions.InvalidValue(
            "{0!r} could not be converted to unicode".format(value)
        )
    return converted

182 

183 

def update_query(url, params, remove=None):
    """Return ``url`` with its query string updated.

    Parameters already present in the URL are overwritten by the values
    given in ``params``.

    Args:
        url (str): The URL to update.
        params (Mapping[str, str]): Query parameter names mapped to their
            new values.
        remove (Sequence[str]): Names of parameters to drop from the
            query string.

    Returns:
        str: The URL carrying the updated query parameters.

    Examples:

        >>> url = 'http://example.com?a=1'
        >>> update_query(url, {'a': '2'})
        http://example.com?a=2
        >>> update_query(url, {'b': '3'})
        http://example.com?a=1&b=3
        >>> update_query(url, {'b': '3'}, remove=['a'])
        http://example.com?b=3

    """
    excluded = [] if remove is None else remove

    # Break the URL apart and read its current query parameters.
    parsed_url = urllib.parse.urlparse(url)
    merged = urllib.parse.parse_qs(parsed_url.query)

    # New values win over the ones already present.
    merged.update(params)

    # Drop whatever the caller asked to remove.
    kept = {}
    for name, values in merged.items():
        if name in excluded:
            continue
        kept[name] = values

    # Re-encode the query and stitch the URL back together.
    encoded = urllib.parse.urlencode(kept, doseq=True)
    return urllib.parse.urlunparse(parsed_url._replace(query=encoded))

227 

228 

def scopes_to_string(scopes):
    """Join scopes into the single string form sent to OAuth 2.0
    authorization servers.

    Args:
        scopes (Sequence[str]): The scopes to join.

    Returns:
        str: All scopes separated by a single space.
    """
    separator = " "
    return separator.join(scopes)

240 

241 

def string_to_scopes(scopes):
    """Converts a stringified scopes value to a list.

    Args:
        scopes (Union[Sequence, str]): Either a string of space-separated
            scopes, or scopes that are already split into a list, tuple,
            set or frozenset.

    Returns:
        Sequence(str): The separated scopes. Any falsy input (``None``,
            ``""``, an empty container) yields an empty list.
    """
    if not scopes:
        return []

    # The documented contract allows an already-split collection. It has no
    # ``split`` method, so hand back a fresh list instead of failing with
    # AttributeError.
    if isinstance(scopes, (list, tuple, set, frozenset)):
        return list(scopes)

    return scopes.split(" ")

255 

256 

def padded_urlsafe_b64decode(value):
    """Decode URL-safe base64 input whose padding may be missing.

    Google infrastructure tends to omit the base64 padding characters.

    Args:
        value (Union[str, bytes]): The encoded value.

    Returns:
        bytes: The decoded value.
    """
    raw = to_bytes(value)
    # Number of "=" needed to reach the next multiple of four.
    missing = -len(raw) % 4
    return base64.urlsafe_b64decode(raw + b"=" * missing)

271 

272 

def unpadded_urlsafe_b64encode(value):
    """Encode a value as URL-safe base64 with the padding stripped.

    `rfc 7515`_ defines Base64url to NOT include any padding
    characters, but the stdlib doesn't do that by default.

    _rfc7515: https://tools.ietf.org/html/rfc7515#page-6

    Args:
        value (Union[str|bytes]): The bytes-like value to encode. The value
            is handed to ``base64.urlsafe_b64encode`` without conversion.

    Returns:
        Union[str|bytes]: The encoded value with trailing ``=`` removed.
    """
    encoded = base64.urlsafe_b64encode(value)
    return encoded.rstrip(b"=")

288 

289 

def is_python_3():
    """Tell whether the running interpreter is Python 3 (as opposed to 2).

    Returns:
        bool: True under Python 3, False otherwise.
    """
    major_version = sys.version_info[0]
    return major_version >= 3

297 

298 

def _hash_sensitive_info(data: Union[dict, list]) -> Union[dict, list, str]:
    """
    Produces a copy of ``data`` in which sensitive values are hashed.

    Args:
        data: The dictionary (or list) to process.

    Returns:
        For a dictionary, a new dictionary where the value of every key
        listed in ``_SENSITIVE_FIELDS`` is replaced by its hash and nested
        dicts/lists are processed recursively; other values are kept.
        For a list, a new list with each element processed recursively.
        For anything else, the type of the input rendered as a string.

    """
    if isinstance(data, list):
        return [_hash_sensitive_info(item) for item in data]

    if not isinstance(data, dict):
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1701):
        # Investigate and hash sensitive info before logging when the data type is
        # not a dict or a list.
        return str(type(data))

    redacted: Dict[Any, Union[Optional[str], dict, list]] = {}
    for field, content in data.items():
        if isinstance(content, (dict, list)):
            # Containers are always descended into, even under a sensitive key.
            redacted[field] = _hash_sensitive_info(content)
        elif field in _SENSITIVE_FIELDS:
            redacted[field] = _hash_value(content, field)
        else:
            redacted[field] = content
    return redacted

332 

333 

def _hash_value(value, field_name: str) -> Optional[str]:
    """Returns ``hashed_<field_name>-<sha512 hex digest>`` for a value.

    The digest is computed over the UTF-8 encoding of ``str(value)``.
    ``None`` is passed through untouched.
    """
    if value is None:
        return None
    hex_digest = hashlib.sha512(str(value).encode("utf-8")).hexdigest()
    return f"hashed_{field_name}-{hex_digest}"

343 

344 

def _logger_configured(logger: logging.Logger) -> bool:
    """Reports whether `logger` deviates from the default configuration

    Args:
        logger: The logger to check.

    Returns:
        bool: True when the logger has at least one handler, an explicit
            level, or propagation switched off.
    """
    if logger.handlers != []:
        return True
    if logger.level != logging.NOTSET:
        return True
    return not logger.propagate

357 

358 

def is_logging_enabled(logger: logging.Logger) -> bool:
    """
    Tells whether DEBUG-level logging is active for the given logger.

    On the first call the base logger is inspected once; if it carries no
    explicit configuration, its propagation is switched off.

    Args:
        logger: The logging.Logger instance to check.

    Returns:
        True if debug logging is enabled, False otherwise.
    """
    # NOTE: Propagation to the root logger stays disabled unless the base
    # logger, i.e. logging.getLogger("google"), has been explicitly
    # configured by the end user. Ideally the client layer takes care of
    # this (GAPICs already do). It is done here as well so that nothing is
    # logged (when a root logger is configured) if a logging-capable
    # google-auth is combined with:
    # - an older GAPIC version that does not support logging.
    # - an Apiary client, which does not support logging.
    global _LOGGING_INITIALIZED
    if not _LOGGING_INITIALIZED:
        google_logger = logging.getLogger(_BASE_LOGGER_NAME)
        user_configured = _logger_configured(google_logger)
        if not user_configured:
            google_logger.propagate = False
        _LOGGING_INITIALIZED = True

    return logger.isEnabledFor(logging.DEBUG)

386 

387 

def request_log(
    logger: logging.Logger,
    method: str,
    url: str,
    body: Optional[bytes],
    headers: Optional[Mapping[str, str]],
) -> None:
    """
    Logs an HTTP request at the DEBUG level if logging is enabled.

    The body is parsed according to the request's content type and passed
    through ``_hash_sensitive_info`` before it is logged.

    Args:
        logger: The logging.Logger instance to use.
        method: The HTTP method (e.g., "GET", "POST").
        url: The URL of the request.
        body: The request body (can be None).
        headers: The request headers (can be None). The content type is
            read from the "Content-Type" entry; when that exact key is
            absent, the name is matched case-insensitively.
    """
    if not is_logging_enabled(logger):
        return

    content_type = ""
    if headers:
        if "Content-Type" in headers:
            content_type = headers["Content-Type"]
        else:
            # HTTP field names are case-insensitive, so a plain mapping may
            # spell the header e.g. "content-type". Without this fallback
            # the body would be parsed as if no content type were given.
            for name, value in headers.items():
                if (
                    isinstance(name, str)
                    and isinstance(value, str)
                    and name.lower() == "content-type"
                ):
                    content_type = value
                    break

    json_body = _parse_request_body(body, content_type=content_type)
    logged_body = _hash_sensitive_info(json_body)
    # NOTE(review): headers are attached to the record as-is (not hashed);
    # confirm callers never pass credentials such as Authorization here.
    logger.debug(
        "Making request...",
        extra={
            "httpRequest": {
                "method": method,
                "url": url,
                "body": logged_body,
                "headers": headers,
            }
        },
    )

422 

423 

def _parse_request_body(body: Optional[bytes], content_type: str = "") -> Any:
    """
    Decodes a request body and parses it according to its content type.

    Args:
        body (Optional[bytes]): The request body.
        content_type (str): The content type of the request body, e.g.,
            "application/json", "application/x-www-form-urlencoded", or
            "text/plain". Matching is case-insensitive and by substring.
            If empty, the body is treated as JSON.

    Returns:
        Parsed body (dict, str, or None).
        - JSON: The decoded document if content_type is empty or contains
          "application/json"; the decoded text itself if it is not valid JSON.
        - URL-encoded: A dict holding the first value of each field if
          content_type contains "application/x-www-form-urlencoded".
        - Plain text: The decoded text if content_type contains "text/plain".
        - None: If body is None, cannot be decoded as UTF-8 (including
          values without a ``decode`` method), or content_type is unknown.
    """
    if body is None:
        return None

    try:
        text = body.decode("utf-8")
    except (UnicodeDecodeError, AttributeError):
        return None

    normalized = content_type.lower()

    if not normalized or "application/json" in normalized:
        try:
            parsed = json.loads(text)
        except (json.JSONDecodeError, TypeError):
            return text
        return parsed

    if "application/x-www-form-urlencoded" in normalized:
        fields = urllib.parse.parse_qs(text)
        # Only the first value of a repeated field is kept.
        return {name: values[0] for name, values in fields.items()}

    return text if "text/plain" in normalized else None

460 

461 

def _parse_response(response: Any) -> Any:
    """
    Extracts the JSON payload of a response on a best-effort basis.

    Args:
        response: The response object to parse. Any type is accepted, but
            only objects exposing a ``json()`` method can yield a payload.

    Returns:
        Whatever ``response.json()`` returns (e.g., a dictionary or list).
        None when the object has no ``json()`` method or when calling it
        raises any exception.
    """
    try:
        return response.json()
    except Exception:
        # TODO(https://github.com/googleapis/google-auth-library-python/issues/1744):
        # Parse and return response payload as json based on different content types.
        return None

483 

484 

def _response_log_base(logger: logging.Logger, parsed_response: Any) -> None:
    """
    Emits an already-parsed HTTP response as a DEBUG record.

    The parsed response is first passed through ``_hash_sensitive_info``
    and the result is attached to the record under the ``httpResponse``
    key. This helper does not check whether logging is enabled.

    Args:
        logger: The logging.Logger instance to use for logging.
        parsed_response: The parsed HTTP response object (e.g., a dictionary,
            list, or None if parsing failed).
    """
    payload = {"httpResponse": _hash_sensitive_info(parsed_response)}
    logger.debug("Response received...", extra=payload)

501 

502 

def response_log(logger: logging.Logger, response: Any) -> None:
    """
    Writes an HTTP response to the log at DEBUG level when logging is enabled.

    Args:
        logger: The logging.Logger instance to use.
        response: The HTTP response object to log.
    """
    if not is_logging_enabled(logger):
        return
    _response_log_base(logger, _parse_response(response))

513 _response_log_base(logger, json_response)