Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/credentials.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1188 statements  

1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 

2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"). You 

5# may not use this file except in compliance with the License. A copy of 

6# the License is located at 

7# 

8# http://aws.amazon.com/apache2.0/ 

9# 

10# or in the "license" file accompanying this file. This file is 

11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

12# ANY KIND, either express or implied. See the License for the specific 

13# language governing permissions and limitations under the License. 

14import base64 

15import datetime 

16import getpass 

17import json 

18import logging 

19import os 

20import subprocess 

21import threading 

22import time 

23import uuid 

24from collections import namedtuple 

25from copy import deepcopy 

26from hashlib import sha1, sha256 

27 

28import dateutil.parser 

29from dateutil.parser import parse 

30from dateutil.tz import tzlocal, tzutc 

31 

32import botocore.compat 

33import botocore.configloader 

34from botocore import UNSIGNED 

35from botocore.compat import ( 

36 EC, 

37 compat_shell_split, 

38 total_seconds, 

39) 

40from botocore.config import Config 

41from botocore.exceptions import ( 

42 ConfigNotFound, 

43 CredentialRetrievalError, 

44 InfiniteLoopConfigError, 

45 InvalidConfigError, 

46 LoginError, 

47 LoginInsufficientPermissions, 

48 LoginRefreshRequired, 

49 LoginTokenLoadError, 

50 MetadataRetrievalError, 

51 MissingDependencyException, 

52 PartialCredentialsError, 

53 RefreshWithMFAUnsupportedError, 

54 UnauthorizedSSOTokenError, 

55 UnknownCredentialError, 

56) 

57from botocore.tokens import SSOTokenProvider 

58from botocore.useragent import register_feature_id, register_feature_ids 

59from botocore.utils import ( 

60 ArnParser, 

61 ContainerMetadataFetcher, 

62 FileWebIdentityTokenLoader, 

63 InstanceMetadataFetcher, 

64 JSONFileCache, 

65 LoginTokenLoader, 

66 SSOTokenLoader, 

67 create_nested_client, 

68 get_login_token_cache_directory, 

69 parse_key_val_file, 

70 resolve_imds_endpoint_mode, 

71) 

72 

# Logger shared by everything in this module.
logger = logging.getLogger(__name__)

# Immutable snapshot of one credential set. ``account_id`` is the only
# optional field; it defaults to ``None``.
ReadOnlyCredentials = namedtuple(
    'ReadOnlyCredentials',
    ('access_key', 'secret_key', 'token', 'account_id'),
    defaults=(None,),
)

# Default refresh windows, in seconds before the credentials expire.
_DEFAULT_MANDATORY_REFRESH_TIMEOUT = 60 * 10  # 10 min
_DEFAULT_ADVISORY_REFRESH_TIMEOUT = 60 * 15  # 15 min

82 

83 

def create_credential_resolver(session, cache=None, region_name=None):
    """Create a default credential resolver.

    Assembles the pre-configured provider chain used for the default
    credential lookup and wraps it in a ``CredentialResolver``.

    :param session: Session object queried for configuration variables,
        instance variables, the full config and the user agent.
    :param cache: Optional cache object handed to the assume-role provider
        and to the profile provider builder. A new ``dict`` is used when
        ``None``.
    :param region_name: Optional region name forwarded to the client
        creator and to the profile provider builder.
    :return: The configured ``CredentialResolver``.
    """
    profile_name = session.get_config_variable('profile') or 'default'
    imds_timeout = session.get_config_variable('metadata_service_timeout')
    imds_attempts = session.get_config_variable(
        'metadata_service_num_attempts'
    )
    # A profile present in the session's instance variables counts as
    # "explicitly set" and switches off the environment variable provider.
    explicit_profile = session.instance_variables().get('profile')
    disable_env_vars = explicit_profile is not None

    imds_endpoint = session.get_config_variable(
        'ec2_metadata_service_endpoint'
    )
    imds_endpoint_mode = resolve_imds_endpoint_mode(session)
    imds_v1_disabled = session.get_config_variable(
        'ec2_metadata_v1_disabled'
    )
    imds_config = {
        'ec2_metadata_service_endpoint': imds_endpoint,
        'ec2_metadata_service_endpoint_mode': imds_endpoint_mode,
        'ec2_credential_refresh_window': _DEFAULT_ADVISORY_REFRESH_TIMEOUT,
        'ec2_metadata_v1_disabled': imds_v1_disabled,
    }

    if cache is None:
        cache = {}

    env_provider = EnvProvider()
    container_provider = ContainerProvider()
    imds_fetcher = InstanceMetadataFetcher(
        timeout=imds_timeout,
        num_attempts=imds_attempts,
        user_agent=session.user_agent(),
        config=imds_config,
    )
    instance_metadata_provider = InstanceMetadataProvider(
        iam_role_fetcher=imds_fetcher
    )

    profile_provider_builder = ProfileProviderBuilder(
        session, cache=cache, region_name=region_name
    )
    sts_client_creator = _get_client_creator(session, region_name)
    # Providers an assume-role profile may name as its credential source.
    credential_sourcer = CanonicalNameCredentialSourcer(
        [env_provider, container_provider, instance_metadata_provider]
    )
    assume_role_provider = AssumeRoleProvider(
        load_config=lambda: session.full_config,
        client_creator=sts_client_creator,
        cache=cache,
        profile_name=profile_name,
        credential_sourcer=credential_sourcer,
        profile_provider_builder=profile_provider_builder,
    )

    # Chain order: environment / assume-role first, then the profile
    # driven providers, then the legacy and metadata based fallbacks.
    providers = [env_provider, assume_role_provider]
    providers.extend(
        profile_provider_builder.providers(
            profile_name=profile_name,
            disable_env_vars=disable_env_vars,
        )
    )
    providers.extend(
        [
            OriginalEC2Provider(),
            BotoProvider(),
            container_provider,
            instance_metadata_provider,
        ]
    )

    if disable_env_vars:
        # An explicitly provided profile negates the EnvProvider: we defer
        # to the providers that understand the "profile" concept.
        #
        # The one edge case is when everything comes from env vars:
        #     export AWS_ACCESS_KEY_ID=foo
        #     export AWS_SECRET_ACCESS_KEY=bar
        #     export AWS_PROFILE=baz
        # In that case this branch is not taken, the EnvProvider stays in
        # the chain and, just like with client() calls, the explicit
        # credentials take precedence. A "profile" then only wins when the
        # EnvProvider returns no credentials, which is what we want.
        providers.remove(env_provider)
        logger.debug(
            'Skipping environment variable credential check'
            ' because profile name was explicitly set.'
        )

    return CredentialResolver(providers=providers)

178 

179 

class ProfileProviderBuilder:
    """Factory for the profile based credential providers.

    NOTE: This class is only intended for internal use.

    It creates and orders the credential providers that primarily source
    their configuration from the shared config. Keeping this in one place
    lets the default credential chain and the source profile chain created
    by the assume role provider share the same set of providers.
    """

    def __init__(
        self,
        session,
        cache=None,
        region_name=None,
        sso_token_cache=None,
        login_token_cache=None,
    ):
        """Remember the session and the caches handed to the providers.

        :param session: Session used to read config and create clients.
        :param cache: Cache passed to the web identity and SSO providers.
        :param region_name: Region used by the web identity client creator.
        :param sso_token_cache: Token cache passed to the SSO provider and
            its token provider.
        :param login_token_cache: Token cache passed to the login provider.
        """
        self._session = session
        self._cache = cache
        self._region_name = region_name
        self._sso_token_cache = sso_token_cache
        self._login_token_cache = login_token_cache

    def providers(self, profile_name, disable_env_vars=False):
        """Return the profile based providers in lookup order.

        :param profile_name: Profile every provider is created for.
        :param disable_env_vars: Forwarded to the web identity provider.
        :return: List of provider instances, web identity first.
        """
        ordered = [
            self._create_web_identity_provider(profile_name, disable_env_vars)
        ]
        # The remaining factories only need the profile name.
        remaining_factories = (
            self._create_sso_provider,
            self._create_shared_credential_provider,
            self._create_login_provider,
            self._create_process_provider,
            self._create_config_provider,
        )
        ordered.extend(
            factory(profile_name) for factory in remaining_factories
        )
        return ordered

    def _create_process_provider(self, profile_name):
        """Create the ``ProcessProvider`` for ``profile_name``."""

        def load_config():
            return self._session.full_config

        return ProcessProvider(
            profile_name=profile_name, load_config=load_config
        )

    def _create_shared_credential_provider(self, profile_name):
        """Create the provider reading the shared credentials file."""
        creds_path = self._session.get_config_variable('credentials_file')
        return SharedCredentialProvider(
            profile_name=profile_name, creds_filename=creds_path
        )

    def _create_config_provider(self, profile_name):
        """Create the provider reading the config file."""
        config_path = self._session.get_config_variable('config_file')
        return ConfigProvider(
            profile_name=profile_name, config_filename=config_path
        )

    def _create_web_identity_provider(self, profile_name, disable_env_vars):
        """Create the ``AssumeRoleWithWebIdentityProvider``."""

        def load_config():
            return self._session.full_config

        client_creator = _get_client_creator(self._session, self._region_name)
        return AssumeRoleWithWebIdentityProvider(
            load_config=load_config,
            client_creator=client_creator,
            cache=self._cache,
            profile_name=profile_name,
            disable_env_vars=disable_env_vars,
        )

    def _create_sso_provider(self, profile_name):
        """Create the ``SSOProvider`` together with its token provider."""

        def load_config():
            return self._session.full_config

        client_creator = self._session.create_client
        token_provider = SSOTokenProvider(
            self._session,
            cache=self._sso_token_cache,
            profile_name=profile_name,
        )
        return SSOProvider(
            load_config=load_config,
            client_creator=client_creator,
            profile_name=profile_name,
            cache=self._cache,
            token_cache=self._sso_token_cache,
            token_provider=token_provider,
        )

    def _create_login_provider(self, profile_name):
        """Create the ``LoginProvider`` for ``profile_name``."""

        def load_config():
            return self._session.full_config

        return LoginProvider(
            load_config=load_config,
            client_creator=self._session.create_client,
            profile_name=profile_name,
            token_cache=self._login_token_cache,
        )

270 

271 

def get_credentials(session):
    """Load credentials for ``session`` through the default resolver.

    :param session: Session passed to ``create_credential_resolver``.
    :return: Whatever the resolver's ``load_credentials()`` returns.
    """
    return create_credential_resolver(session).load_credentials()

275 

276 

def _local_now():
    """Return the current time as a datetime carrying ``tzlocal()``."""
    local_zone = tzlocal()
    return datetime.datetime.now(local_zone)

279 

280 

def _parse_if_needed(value):
    """Return ``value`` as a datetime.

    A ``datetime.datetime`` instance is handed back untouched; anything
    else is passed to ``parse``.
    """
    already_datetime = isinstance(value, datetime.datetime)
    return value if already_datetime else parse(value)

285 

286 

def _serialize_if_needed(value, iso=False):
    """Turn a datetime into a string; pass every other value through.

    :param value: A ``datetime.datetime`` or any other object.
    :param iso: When true, use ``isoformat()``; otherwise format with
        ``'%Y-%m-%dT%H:%M:%S%Z'``.
    :return: The formatted string for datetimes, ``value`` itself otherwise.
    """
    if not isinstance(value, datetime.datetime):
        return value
    if iso:
        return value.isoformat()
    return value.strftime('%Y-%m-%dT%H:%M:%S%Z')

293 

294 

def _get_client_creator(session, region_name):
    """Return a client factory bound to ``session`` and ``region_name``.

    The returned callable takes a service name plus keyword arguments and
    forwards them to ``create_nested_client``.
    """

    def client_creator(service_name, **kwargs):
        # ``region_name`` is only a default: a caller-supplied
        # ``region_name`` keyword replaces it.
        merged_kwargs = {'region_name': region_name, **kwargs}
        return create_nested_client(session, service_name, **merged_kwargs)

    return client_creator

304 

305 

def create_assume_role_refresher(client, params):
    """Return a zero-argument callable that re-runs ``assume_role``.

    :param client: Object exposing ``assume_role(**params)``.
    :param params: Keyword arguments sent with every ``assume_role`` call.
    :return: A function producing a dict with the keys ``access_key``,
        ``secret_key``, ``token`` and ``expiry_time``.
    """

    def refresh():
        sts_credentials = client.assume_role(**params)['Credentials']
        # Translate the response field names into the names the
        # refreshable credentials expect.
        return {
            'access_key': sts_credentials['AccessKeyId'],
            'secret_key': sts_credentials['SecretAccessKey'],
            'token': sts_credentials['SessionToken'],
            'expiry_time': _serialize_if_needed(
                sts_credentials['Expiration']
            ),
        }

    return refresh

320 

321 

def create_mfa_serial_refresher(actual_refresh):
    """Wrap ``actual_refresh`` so that it can be invoked only once.

    :param actual_refresh: Zero-argument callable doing the real refresh.
    :return: A callable that delegates to ``actual_refresh`` on its first
        call and raises ``RefreshWithMFAUnsupportedError`` on every later
        call.
    """

    class _Refresher:
        def __init__(self, refresh):
            self._refresh = refresh
            self._has_been_called = False

        def __call__(self):
            if not self._has_been_called:
                self._has_been_called = True
                return self._refresh()
            # Re-prompting for MFA might be supported in the future; for
            # now a second refresh (i.e. the temporary credentials have
            # expired) is an error.
            raise RefreshWithMFAUnsupportedError()

    return _Refresher(actual_refresh)

338 

339 

class Credentials:
    """
    Container for the values needed to authenticate requests.

    :param str access_key: The access key part of the credentials.
    :param str secret_key: The secret key part of the credentials.
    :param str token: The security token, valid only for session credentials.
    :param str method: A string which identifies where the credentials
        were found. Defaults to ``'explicit'`` when not given.
    :param str account_id: (optional) An account ID associated with the credentials.
    """

    def __init__(
        self, access_key, secret_key, token=None, method=None, account_id=None
    ):
        self.access_key = access_key
        self.secret_key = secret_key
        self.token = token
        self.method = 'explicit' if method is None else method
        self.account_id = account_id

        self._normalize()

    def _normalize(self):
        """Pass both keys through ``ensure_unicode``."""
        # Keys sometimes (accidentally) contain non-ascii characters, which
        # used to surface as a confusing UnicodeDecodeError on Python 2.
        # Converting them explicitly avoids that error and matches the
        # Python 3 behaviour; whether the credential is acceptable is
        # eventually decided by the service.
        to_text = botocore.compat.ensure_unicode
        self.access_key = to_text(self.access_key)
        self.secret_key = to_text(self.secret_key)

    def get_frozen_credentials(self):
        """Return the current values as a ``ReadOnlyCredentials`` tuple."""
        return ReadOnlyCredentials(
            access_key=self.access_key,
            secret_key=self.secret_key,
            token=self.token,
            account_id=self.account_id,
        )

    def get_deferred_property(self, property_name):
        """Return a callable that reads ``property_name`` when invoked.

        The attribute is looked up at call time, not now; a missing
        attribute yields ``None``.
        """

        def get_property():
            return getattr(self, property_name, None)

        return get_property

386 

387 

class RefreshableCredentials(Credentials):
    """
    Credentials that track their expiry time and can refresh themselves.

    :param str access_key: The access key part of the credentials.
    :param str secret_key: The secret key part of the credentials.
    :param str token: The security token, valid only for session credentials.
    :param datetime expiry_time: The expiration time of the credentials.
    :param function refresh_using: Callback function to refresh the credentials.
    :param str method: A string which identifies where the credentials
        were found.
    :param function time_fetcher: Callback function to retrieve current time.
    :param advisory_timeout: (optional) Overrides the advisory refresh
        window, in seconds, for this instance.
    :param mandatory_timeout: (optional) Overrides the mandatory refresh
        window, in seconds, for this instance.
    :param str account_id: (optional) An account ID associated with the credentials.
    """

    # Seconds before expiry at which a refresh is attempted without
    # blocking callers while someone else is already refreshing.
    _advisory_refresh_timeout = _DEFAULT_ADVISORY_REFRESH_TIMEOUT
    # Seconds before expiry at which every thread blocks until refreshed
    # credentials are available.
    _mandatory_refresh_timeout = _DEFAULT_MANDATORY_REFRESH_TIMEOUT

    def __init__(
        self,
        access_key,
        secret_key,
        token,
        expiry_time,
        refresh_using,
        method,
        time_fetcher=_local_now,
        advisory_timeout=None,
        mandatory_timeout=None,
        account_id=None,
    ):
        self._refresh_using = refresh_using
        self._access_key = access_key
        self._secret_key = secret_key
        self._token = token
        self._account_id = account_id
        self._expiry_time = expiry_time
        self._time_fetcher = time_fetcher
        self._refresh_lock = threading.Lock()
        self.method = method
        # The snapshot is built from the values exactly as they were
        # passed in, before normalization.
        self._frozen_credentials = ReadOnlyCredentials(
            access_key, secret_key, token, account_id
        )
        self._normalize()
        # Only shadow the class level defaults when an override was given.
        if advisory_timeout is not None:
            self._advisory_refresh_timeout = advisory_timeout
        if mandatory_timeout is not None:
            self._mandatory_refresh_timeout = mandatory_timeout

    def _normalize(self):
        """Pass both stored keys through ``ensure_unicode``."""
        to_text = botocore.compat.ensure_unicode
        self._access_key = to_text(self._access_key)
        self._secret_key = to_text(self._secret_key)

    @classmethod
    def create_from_metadata(
        cls,
        metadata,
        refresh_using,
        method,
        advisory_timeout=None,
        mandatory_timeout=None,
    ):
        """Build an instance from a metadata dict.

        :param metadata: Mapping with the keys ``access_key``,
            ``secret_key``, ``token`` and ``expiry_time``; ``account_id``
            is read when present.
        :param refresh_using: Callback function to refresh the credentials.
        :param method: A string which identifies where the credentials
            were found.
        :param advisory_timeout: Forwarded only when not ``None``.
        :param mandatory_timeout: Forwarded only when not ``None``.
        :return: A new instance of ``cls``.
        """
        candidate_timeouts = (
            ('advisory_timeout', advisory_timeout),
            ('mandatory_timeout', mandatory_timeout),
        )
        timeout_kwargs = {
            name: seconds
            for name, seconds in candidate_timeouts
            if seconds is not None
        }
        return cls(
            access_key=metadata['access_key'],
            secret_key=metadata['secret_key'],
            token=metadata['token'],
            expiry_time=cls._expiry_datetime(metadata['expiry_time']),
            method=method,
            refresh_using=refresh_using,
            account_id=metadata.get('account_id'),
            **timeout_kwargs,
        )

    @property
    def access_key(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._access_key

    @access_key.setter
    def access_key(self, value):
        self._access_key = value

    @property
    def secret_key(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._secret_key

    @secret_key.setter
    def secret_key(self, value):
        self._secret_key = value

    @property
    def token(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._token

    @token.setter
    def token(self, value):
        self._token = value

    @property
    def account_id(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._account_id

    @account_id.setter
    def account_id(self, value):
        self._account_id = value

    def _seconds_remaining(self):
        """Return the seconds left until the stored expiry time."""
        return total_seconds(self._expiry_time - self._time_fetcher())

    def refresh_needed(self, refresh_in=None):
        """Check if a refresh is needed.

        A refresh is needed if the time left before the temporary
        credentials expire is less than ``refresh_in``. If ``refresh_in``
        is not provided, the advisory refresh timeout is used.

        For example, if your temporary credentials expire
        in 10 minutes and the provided ``refresh_in`` is
        ``15 * 60``, then this function will return ``True``.

        :type refresh_in: int
        :param refresh_in: The number of seconds before the
            credentials expire in which refresh attempts should
            be made.

        :return: True if refresh needed, False otherwise.

        """
        if self._expiry_time is None:
            # Without an expiration there is nothing to refresh.
            return False

        window = (
            self._advisory_refresh_timeout
            if refresh_in is None
            else refresh_in
        )
        # Credentials are fine as long as at least ``window`` seconds
        # remain before they expire.
        needs_refresh = not (self._seconds_remaining() >= window)
        if needs_refresh:
            logger.debug("Credentials need to be refreshed.")
        return needs_refresh

    def _is_expired(self):
        """Return True when no time is left on the current credentials."""
        return self.refresh_needed(refresh_in=0)

    def _refresh(self):
        """Refresh the credentials if they are inside a refresh window."""
        # Common case: nothing to do, so leave without touching the lock.
        if not self.refresh_needed(self._advisory_refresh_timeout):
            return

        # ``False`` makes this a non-blocking acquire: when another
        # thread holds the lock we fall through to the mandatory check.
        if self._refresh_lock.acquire(False):
            try:
                # Check again now that the lock is held.
                if self.refresh_needed(self._advisory_refresh_timeout):
                    is_mandatory_refresh = self.refresh_needed(
                        self._mandatory_refresh_timeout
                    )
                    self._protected_refresh(is_mandatory=is_mandatory_refresh)
            finally:
                self._refresh_lock.release()
            return

        if not self.refresh_needed(self._mandatory_refresh_timeout):
            # Only inside the advisory window: keep using what we have
            # while the lock holder refreshes.
            return

        # Inside the mandatory window we must block until refreshed
        # credentials are available.
        with self._refresh_lock:
            if self.refresh_needed(self._mandatory_refresh_timeout):
                self._protected_refresh(is_mandatory=True)

    def _protected_refresh(self, is_mandatory):
        """Call the refresh callback and store its result.

        Precondition: the caller holds ``self._refresh_lock``.

        :param is_mandatory: When true, errors raised by the refresh
            callback are re-raised; otherwise they are logged and the
            current credentials stay in place.
        :raises RuntimeError: If the refreshed credentials are still
            expired.
        """
        try:
            metadata = self._refresh_using()
        except Exception:
            period_name = 'mandatory' if is_mandatory else 'advisory'
            logger.warning(
                "Refreshing temporary credentials failed "
                "during %s refresh period.",
                period_name,
                exc_info=True,
            )
            if not is_mandatory:
                # Advisory refresh: keep using the current set of
                # temporary credentials.
                return
            # Mandatory refresh: every error is propagated to the user.
            raise

        self._set_from_data(metadata)
        self._frozen_credentials = ReadOnlyCredentials(
            self._access_key, self._secret_key, self._token, self._account_id
        )
        if not self._is_expired():
            return

        # The refresh call succeeded but handed back credentials that are
        # still expired. All we can do is tell the user and raise.
        msg = (
            "Credentials were refreshed, but the "
            "refreshed credentials are still expired."
        )
        logger.warning(msg)
        raise RuntimeError(msg)

    @staticmethod
    def _expiry_datetime(time_str):
        """Parse ``time_str`` with ``parse`` and return the result."""
        return parse(time_str)

    def _set_from_data(self, data):
        """Store the values of a refresh response on this instance.

        :param data: Mapping with ``access_key``, ``secret_key``, ``token``
            and ``expiry_time``; ``account_id`` is optional.
        :raises CredentialRetrievalError: If ``data`` is empty or lacks
            one of the required keys.
        """
        required_keys = ['access_key', 'secret_key', 'token', 'expiry_time']
        if data:
            missing_keys = [key for key in required_keys if key not in data]
        else:
            missing_keys = required_keys

        if missing_keys:
            message = "Credential refresh failed, response did not contain: %s"
            raise CredentialRetrievalError(
                provider=self.method,
                error_msg=message % ', '.join(missing_keys),
            )

        self.access_key = data['access_key']
        self.secret_key = data['secret_key']
        self.token = data['token']
        self._expiry_time = parse(data['expiry_time'])
        self.account_id = data.get('account_id')
        logger.debug(
            "Retrieved credentials will expire at: %s", self._expiry_time
        )
        self._normalize()

    def get_frozen_credentials(self):
        """Return immutable credentials.

        The ``access_key``, ``secret_key``, and ``token`` properties
        on this class always check and, if needed, refresh the
        credentials before returning a value.

        That leaves an edge case in which you can read inconsistent
        credentials. Imagine this:

            # Current creds are "t1"
            tmp.access_key  --->  expired? no, so return t1.access_key
            # ---- time is now expired, creds need refreshing to "t2" ----
            tmp.secret_key  --->  expired? yes, refresh and return t2.secret_key

        Now the access key from t1 is combined with the secret key
        from t2. To avoid this, request a frozen credential object,
        which is guaranteed not to change.

        The frozen credentials returned from this method should be used
        immediately and then discarded. The typical usage pattern would
        be::

            creds = RefreshableCredentials(...)
            some_code = SomeSignerObject()
            # I'm about to sign the request.
            # The frozen credentials are only used for the
            # duration of generate_presigned_url and will be
            # immediately thrown away.
            request = some_code.sign_some_request(
                with_credentials=creds.get_frozen_credentials())
            print("Signed request:", request)

        """
        self._refresh()
        return self._frozen_credentials

699 

700 

class DeferredRefreshableCredentials(RefreshableCredentials):
    """Refreshable credentials that start out without any credentials.

    ``refresh_using`` is called upon first access.
    """

    def __init__(self, refresh_using, method, time_fetcher=_local_now):
        self._refresh_using = refresh_using
        self._time_fetcher = time_fetcher
        self._refresh_lock = threading.Lock()
        self.method = method
        # Nothing has been fetched yet, so every credential field is empty.
        self._access_key = None
        self._secret_key = None
        self._token = None
        self._account_id = None
        self._expiry_time = None
        self._frozen_credentials = None

    def refresh_needed(self, refresh_in=None):
        """Report True until a first set of credentials has been stored.

        After that the decision is delegated to the parent class.
        """
        never_loaded = self._frozen_credentials is None
        return never_loaded or super().refresh_needed(refresh_in)

723 

724 

class CachedCredentialFetcher:
    """Base class for fetchers that keep their credential response cached.

    Subclasses supply ``_create_cache_key()`` and ``_get_credentials()``.
    """

    DEFAULT_EXPIRY_WINDOW_SECONDS = 60 * 15

    def __init__(self, cache=None, expiry_window_seconds=None):
        """
        :param cache: Object supporting ``in``, item reads and item
            writes. A new ``dict`` is used when ``None``.
        :param expiry_window_seconds: Cached credentials with fewer than
            this many seconds left are treated as expired. Defaults to
            ``DEFAULT_EXPIRY_WINDOW_SECONDS``.
        """
        self._cache = {} if cache is None else cache
        self._cache_key = self._create_cache_key()
        self._expiry_window_seconds = (
            self.DEFAULT_EXPIRY_WINDOW_SECONDS
            if expiry_window_seconds is None
            else expiry_window_seconds
        )
        self.feature_ids = set()

    def _create_cache_key(self):
        """Return the cache key; must be provided by subclasses."""
        raise NotImplementedError('_create_cache_key()')

    def _make_file_safe(self, filename):
        """Replace ``:``, the OS path separator and ``/`` with ``_``."""
        for unsafe in (':', os.sep, '/'):
            filename = filename.replace(unsafe, '_')
        return filename

    def _get_credentials(self):
        """Fetch a fresh response; must be provided by subclasses."""
        raise NotImplementedError('_get_credentials()')

    def fetch_credentials(self):
        """Return the credentials dict, served from the cache if possible."""
        return self._get_cached_credentials()

    def _get_cached_credentials(self):
        """Get up-to-date credentials.

        The cache is consulted first; when it holds nothing usable,
        ``_get_credentials()`` is called and its response is cached.

        :return: Dict with the keys ``access_key``, ``secret_key``,
            ``token``, ``expiry_time`` and ``account_id``.
        """
        response = self._load_from_cache()
        if response is not None:
            logger.debug("Credentials for role retrieved from cache.")
        else:
            response = self._get_credentials()
            self._write_to_cache(response)

        raw = response['Credentials']
        expiry_time = _serialize_if_needed(raw['Expiration'], iso=True)
        return {
            'access_key': raw['AccessKeyId'],
            'secret_key': raw['SecretAccessKey'],
            'token': raw['SessionToken'],
            'expiry_time': expiry_time,
            'account_id': raw.get('AccountId'),
        }

    def _load_from_cache(self):
        """Return a copy of the cached response, or None if unusable."""
        if self._cache_key not in self._cache:
            return None

        cached = deepcopy(self._cache[self._cache_key])
        if self._is_expired(cached):
            logger.debug(
                "Credentials were found in cache, but they are expired."
            )
            return None
        return cached

    def _write_to_cache(self, response):
        """Store a deep copy of ``response`` under the cache key."""
        self._cache[self._cache_key] = deepcopy(response)

    def _is_expired(self, credentials):
        """Check if credentials are expired."""
        expires_at = _parse_if_needed(credentials['Credentials']['Expiration'])
        seconds_left = total_seconds(expires_at - _local_now())
        return seconds_left < self._expiry_window_seconds

796 

797 

class BaseAssumeRoleCredentialFetcher(CachedCredentialFetcher):
    """Shared argument and cache key handling for assume-role fetchers."""

    def __init__(
        self,
        client_creator,
        role_arn,
        extra_args=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :param client_creator: Stored for use by subclasses.
        :param role_arn: Stored and sent as ``RoleArn``.
        :param extra_args: Optional dict of further request arguments; it
            is deep-copied, never modified.
        :param cache: Passed on to ``CachedCredentialFetcher``.
        :param expiry_window_seconds: Passed on to
            ``CachedCredentialFetcher``.
        """
        self._client_creator = client_creator
        self._role_arn = role_arn

        self._assume_kwargs = (
            {} if extra_args is None else deepcopy(extra_args)
        )
        self._assume_kwargs['RoleArn'] = self._role_arn

        self._role_session_name = self._assume_kwargs.get('RoleSessionName')
        self._using_default_session_name = False
        if not self._role_session_name:
            self._generate_assume_role_name()

        # The parent computes the cache key, which depends on the
        # attributes assigned above, so it has to run last.
        super().__init__(cache, expiry_window_seconds)

    def _generate_assume_role_name(self):
        """Install a default session name based on the current time."""
        timestamp = int(time.time())
        self._role_session_name = f'botocore-session-{timestamp}'
        self._assume_kwargs['RoleSessionName'] = self._role_session_name
        self._using_default_session_name = True

    def _create_cache_key(self):
        """Create a predictable cache key for the current configuration.

        The cache key is intended to be compatible with file names.
        """
        key_args = deepcopy(self._assume_kwargs)

        # A generated session name changes with the clock, so it must not
        # take part in the hash.
        if self._using_default_session_name:
            key_args.pop('RoleSessionName')

        # The policy is a JSON string; load it so that its keys get sorted
        # by the dump below and the hash stays predictable.
        if 'Policy' in key_args:
            key_args['Policy'] = json.loads(key_args['Policy'])

        serialized = json.dumps(key_args, sort_keys=True)
        digest = sha1(serialized.encode('utf-8')).hexdigest()
        return self._make_file_safe(digest)

    def _add_account_id_to_response(self, response):
        """Copy the account ID from the assumed role ARN into ``response``.

        ``response['Credentials']['AccountId']`` is set when the ARN under
        ``AssumedRoleUser`` is recognised by ``ArnParser``; otherwise the
        response is left untouched and a debug message is logged.
        """
        role_arn = response.get('AssumedRoleUser', {}).get('Arn')
        if not ArnParser.is_arn(role_arn):
            logger.debug("Unable to extract account ID from Arn: %s", role_arn)
            return

        parsed_arn = ArnParser().parse_arn(role_arn)
        account_id = parsed_arn['account']
        response['Credentials']['AccountId'] = account_id

858 

859 

class AssumeRoleCredentialFetcher(BaseAssumeRoleCredentialFetcher):
    """Fetch credentials by calling ``assume_role`` on an STS client."""

    def __init__(
        self,
        client_creator,
        source_credentials,
        role_arn,
        extra_args=None,
        mfa_prompter=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client taking
            arguments like ``Session.create_client``.

        :type source_credentials: Credentials
        :param source_credentials: The credentials to use to create the
            client for the call to AssumeRole.

        :type role_arn: str
        :param role_arn: The ARN of the role to be assumed.

        :type extra_args: dict
        :param extra_args: Any additional arguments to add to the assume
            role request using the format of the botocore operation.
            Possible keys include, but may not be limited to,
            DurationSeconds, Policy, SerialNumber, ExternalId and
            RoleSessionName.

        :type mfa_prompter: callable
        :param mfa_prompter: A callable that returns input provided by the
            user (i.e raw_input, getpass.getpass, etc.). Defaults to
            ``getpass.getpass``.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``.  An example of this is
            the ``JSONFileCache`` class in aws-cli.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: The amount of time, in seconds,
            before their expiration at which cached credentials are
            already treated as expired and fetched again.
        """
        self._source_credentials = source_credentials
        self._mfa_prompter = mfa_prompter
        if self._mfa_prompter is None:
            self._mfa_prompter = getpass.getpass

        super().__init__(
            client_creator,
            role_arn,
            extra_args=extra_args,
            cache=cache,
            expiry_window_seconds=expiry_window_seconds,
        )

    def _get_credentials(self):
        """Get credentials by calling assume role."""
        register_feature_ids(self.feature_ids)
        kwargs = self._assume_role_kwargs()
        client = self._create_client()
        response = client.assume_role(**kwargs)
        self._add_account_id_to_response(response)
        return response

    def _assume_role_kwargs(self):
        """Get the arguments for assume role based on current configuration.

        Returns a deep copy of the stored arguments. When a
        ``SerialNumber`` is configured, the MFA prompter is asked for a
        code, which is added as ``TokenCode``.
        """
        # Work on a copy so the prompted token never ends up in the
        # stored arguments.
        assume_role_kwargs = deepcopy(self._assume_kwargs)

        mfa_serial = assume_role_kwargs.get('SerialNumber')
        if mfa_serial is not None:
            prompt = f'Enter MFA code for {mfa_serial}: '
            assume_role_kwargs['TokenCode'] = self._mfa_prompter(prompt)

        # ``DurationSeconds`` (like every other extra argument) is already
        # part of the copy, so it needs no special handling here.
        return assume_role_kwargs

    def _create_client(self):
        """Create an STS client using the source credentials."""
        # Take one frozen snapshot so that all three values belong to the
        # same credential set.
        frozen_credentials = self._source_credentials.get_frozen_credentials()
        return self._client_creator(
            'sts',
            aws_access_key_id=frozen_credentials.access_key,
            aws_secret_access_key=frozen_credentials.secret_key,
            aws_session_token=frozen_credentials.token,
        )

951 

952 

class AssumeRoleWithWebIdentityCredentialFetcher(
    BaseAssumeRoleCredentialFetcher
):
    """Fetch role credentials with the STS AssumeRoleWithWebIdentity call.

    The web identity token is obtained from a caller-supplied loader every
    time the request arguments are built.
    """

    def __init__(
        self,
        client_creator,
        web_identity_token_loader,
        role_arn,
        extra_args=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client taking
            arguments like ``Session.create_client``.

        :type web_identity_token_loader: callable
        :param web_identity_token_loader: A callable that takes no arguments
            and returns a web identity token str.

        :type role_arn: str
        :param role_arn: The ARN of the role to be assumed.

        :type extra_args: dict
        :param extra_args: Any additional arguments to add to the assume
            role request using the format of the botocore operation.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``. An example of this is
            the ``JSONFileCache`` class in aws-cli.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: An amount of time, in seconds. It is
            forwarded unchanged to the base fetcher's initializer.
        """
        self._web_identity_token_loader = web_identity_token_loader

        base_options = {
            'extra_args': extra_args,
            'cache': cache,
            'expiry_window_seconds': expiry_window_seconds,
        }
        super().__init__(client_creator, role_arn, **base_options)

    def _get_credentials(self):
        """Fetch credentials by calling ``assume_role_with_web_identity``."""
        register_feature_ids(self.feature_ids)
        request_params = self._assume_role_kwargs()
        # Assume role with web identity does not require credentials other
        # than the token, so the client is explicitly configured to not sign
        # its requests.
        unsigned_config = Config(signature_version=UNSIGNED)
        sts_client = self._client_creator('sts', config=unsigned_config)
        result = sts_client.assume_role_with_web_identity(**request_params)
        self._add_account_id_to_response(result)
        return result

    def _assume_role_kwargs(self):
        """Return a copy of the request arguments with the token added.

        The stored arguments are deep-copied first, then the token loader is
        called and its result is placed under ``WebIdentityToken``.
        """
        request_params = deepcopy(self._assume_kwargs)
        request_params['WebIdentityToken'] = self._web_identity_token_loader()
        return request_params

1021 

1022 

class CredentialProvider:
    """Base class for the credential providers defined in this module."""

    # A short name to identify the provider within botocore.
    METHOD = None

    # A name to identify the provider for use in cross-sdk features like
    # assume role's `credential_source` configuration option. These names
    # are to be treated in a case-insensitive way. NOTE: any providers not
    # implemented in botocore MUST prefix their canonical names with
    # 'custom' or we DO NOT guarantee that it will work with any features
    # that this provides.
    CANONICAL_NAME = None

    def __init__(self, session=None):
        self.session = session

    def load(self):
        """
        Loads the credentials from their source.

        Subclasses should implement this method (by reading from disk, the
        environment, the network or wherever). The providers in this module
        return a credentials object when credentials were found and ``None``
        otherwise, which lets the next provider in the chain be tried.

        The default implementation does nothing and simply returns ``True``,
        assuming the user has set the ``access_key/secret_key/token``
        themselves.

        :returns: ``True`` for this base implementation.
        """
        return True

    def _extract_creds_from_mapping(self, mapping, *key_names):
        """Return the values stored in ``mapping`` under ``key_names``.

        :param mapping: The mapping to read the values from.
        :param key_names: The keys to look up, in the order their values
            should be returned.
        :returns: A list holding one value per requested key.
        :raises PartialCredentialsError: If any requested key is missing.
        """
        values = []
        for name in key_names:
            try:
                value = mapping[name]
            except KeyError:
                raise PartialCredentialsError(
                    provider=self.METHOD, cred_var=name
                )
            values.append(value)
        return values

1067 

1068 

class ProcessProvider(CredentialProvider):
    """Load credentials by running a profile's ``credential_process``."""

    METHOD = 'custom-process'

    def __init__(self, profile_name, load_config, popen=subprocess.Popen):
        """
        :param profile_name: The name of the profile whose
            ``credential_process`` setting should be used.
        :param load_config: A callable taking no arguments that returns the
            loaded configuration; it is called lazily, on first use.
        :param popen: The callable used to spawn the process (defaults to
            ``subprocess.Popen``).
        """
        self._profile_name = profile_name
        self._load_config = load_config
        self._loaded_config = None
        self._popen = popen

    def load(self):
        """Run the configured process and wrap its output as credentials.

        Returns ``None`` when the profile has no ``credential_process``.
        When the process reports an expiration, refreshable credentials are
        returned that re-run the process to refresh themselves.
        """
        command = self._credential_process
        if command is None:
            return None

        register_feature_id('CREDENTIALS_PROFILE_PROCESS')
        fetched = self._retrieve_credentials_using(command)
        register_feature_id('CREDENTIALS_PROCESS')

        if fetched.get('expiry_time') is None:
            return Credentials(
                access_key=fetched['access_key'],
                secret_key=fetched['secret_key'],
                token=fetched.get('token'),
                method=self.METHOD,
                account_id=fetched.get('account_id'),
            )

        def refresh():
            return self._retrieve_credentials_using(command)

        return RefreshableCredentials.create_from_metadata(
            fetched,
            refresh,
            self.METHOD,
        )

    def _retrieve_credentials_using(self, credential_process):
        """Run ``credential_process`` and parse its JSON output.

        :raises CredentialRetrievalError: If the process exits with a
            non-zero status, reports a ``Version`` other than 1, or omits a
            required key.
        """
        # We're not using shell=True, so the command and all of its
        # arguments have to be passed as a list.
        argv = compat_shell_split(credential_process)
        child = self._popen(
            argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        stdout, stderr = child.communicate()
        if child.returncode != 0:
            raise CredentialRetrievalError(
                provider=self.METHOD, error_msg=stderr.decode('utf-8')
            )

        parsed = botocore.compat.json.loads(stdout.decode('utf-8'))
        version = parsed.get('Version', '<Version key not provided>')
        if version != 1:
            message = (
                f"Unsupported version '{version}' for credential process "
                "provider, supported versions: 1"
            )
            raise CredentialRetrievalError(
                provider=self.METHOD, error_msg=message
            )

        try:
            return {
                'access_key': parsed['AccessKeyId'],
                'secret_key': parsed['SecretAccessKey'],
                'token': parsed.get('SessionToken'),
                'expiry_time': parsed.get('Expiration'),
                'account_id': self._get_account_id(parsed),
            }
        except KeyError as e:
            raise CredentialRetrievalError(
                provider=self.METHOD,
                error_msg=f"Missing required key in response: {e}",
            )

    @property
    def _credential_process(self):
        """The profile's ``credential_process`` value, or ``None``."""
        return self.profile_config.get('credential_process')

    @property
    def profile_config(self):
        """The configuration of the selected profile (``{}`` if absent).

        The full configuration is loaded on first access and then reused.
        """
        if self._loaded_config is None:
            self._loaded_config = self._load_config()
        all_profiles = self._loaded_config.get('profiles', {})
        return all_profiles.get(self._profile_name, {})

    def _get_account_id(self, parsed):
        """Prefer the process's ``AccountId``; fall back to the profile."""
        account_id = parsed.get('AccountId')
        if account_id:
            return account_id
        return self.profile_config.get('aws_account_id')

1153 

1154 

class InstanceMetadataProvider(CredentialProvider):
    """Load credentials through an IAM role fetcher."""

    METHOD = 'iam-role'
    CANONICAL_NAME = 'Ec2InstanceMetadata'

    def __init__(self, iam_role_fetcher):
        """
        :param iam_role_fetcher: An object exposing
            ``retrieve_iam_role_credentials()``.
        """
        self._role_fetcher = iam_role_fetcher

    def load(self):
        """Return refreshable credentials, or ``None`` if none were found."""
        role_fetcher = self._role_fetcher
        # Make the first request up front to see whether useful data comes
        # back. If it does not, give up so the next provider in the
        # credential chain can be tried.
        initial_metadata = role_fetcher.retrieve_iam_role_credentials()
        if not initial_metadata:
            return None

        register_feature_id('CREDENTIALS_IMDS')
        logger.info(
            'Found credentials from IAM Role: %s',
            initial_metadata['role_name'],
        )
        # The data from the request already made is reused here. Once the
        # expiry is hit, the credentials refresh themselves through the
        # same fetcher method.
        return RefreshableCredentials.create_from_metadata(
            initial_metadata,
            method=self.METHOD,
            refresh_using=role_fetcher.retrieve_iam_role_credentials,
        )

1183 

1184 

class EnvProvider(CredentialProvider):
    """Load credentials from environment variables."""

    METHOD = 'env'
    CANONICAL_NAME = 'Environment'
    ACCESS_KEY = 'AWS_ACCESS_KEY_ID'
    SECRET_KEY = 'AWS_SECRET_ACCESS_KEY'
    # The token can come from either of these env var.
    # AWS_SESSION_TOKEN is what other AWS SDKs have standardized on.
    TOKENS = ['AWS_SECURITY_TOKEN', 'AWS_SESSION_TOKEN']
    EXPIRY_TIME = 'AWS_CREDENTIAL_EXPIRATION'
    ACCOUNT_ID = 'AWS_ACCOUNT_ID'

    def __init__(self, environ=None, mapping=None):
        """

        :param environ: The environment variables (defaults to
            ``os.environ`` if no value is provided).
        :param mapping: An optional mapping of variable names to
            environment variable names.  Use this if you want to
            change the mapping of access_key->AWS_ACCESS_KEY_ID, etc.
            The dict can have up to 5 keys:
                * ``access_key``
                * ``secret_key``
                * ``token``
                * ``expiry_time``
                * ``account_id``
        """
        self.environ = os.environ if environ is None else environ
        self._mapping = self._build_mapping(mapping)

    def _build_mapping(self, mapping):
        """Return the mapping of variable name to env var name(s).

        Without a custom ``mapping`` the class-level defaults are used
        as-is. With one, every missing entry falls back to its default and
        a non-list ``token`` entry is wrapped in a list.
        """
        defaults = (
            ('access_key', self.ACCESS_KEY),
            ('secret_key', self.SECRET_KEY),
            ('token', self.TOKENS),
            ('expiry_time', self.EXPIRY_TIME),
            ('account_id', self.ACCOUNT_ID),
        )
        if mapping is None:
            # Use the class var defaults.
            return dict(defaults)

        resolved = {
            name: mapping.get(name, default) for name, default in defaults
        }
        if not isinstance(resolved['token'], list):
            resolved['token'] = [resolved['token']]
        return resolved

    def load(self):
        """
        Search for credentials in explicit environment variables.
        """
        if not self.environ.get(self._mapping['access_key'], ''):
            return None

        logger.info('Found credentials in environment variables.')
        fetcher = self._create_credentials_fetcher()
        found = fetcher(require_expiry=False)
        register_feature_id('CREDENTIALS_ENV_VARS')

        raw_expiry = found['expiry_time']
        if raw_expiry is None:
            return Credentials(
                found['access_key'],
                found['secret_key'],
                found['token'],
                method=self.METHOD,
                account_id=found['account_id'],
            )

        # An expiration was supplied, so the credentials are refreshable
        # and re-read the environment through the same fetcher.
        return RefreshableCredentials(
            found['access_key'],
            found['secret_key'],
            found['token'],
            parse(raw_expiry),
            refresh_using=fetcher,
            method=self.METHOD,
            account_id=found['account_id'],
        )

    def _create_credentials_fetcher(self):
        """Return a callable that reads the credentials from the environment.

        The returned callable raises ``PartialCredentialsError`` when the
        access key or the secret key is unset or empty, and also when
        ``require_expiry`` is true and the expiry time is unset or empty.
        """
        mapping = self._mapping
        method = self.METHOD
        environ = self.environ

        def fetch_credentials(require_expiry=True):
            def read(var_name):
                return environ.get(var_name, '')

            def missing(var_name):
                return PartialCredentialsError(
                    provider=method, cred_var=var_name
                )

            access_key = read(mapping['access_key'])
            if not access_key:
                raise missing(mapping['access_key'])

            secret_key = read(mapping['secret_key'])
            if not secret_key:
                raise missing(mapping['secret_key'])

            # The first token variable holding a non-empty value wins.
            token = None
            for token_env_var in mapping['token']:
                candidate = read(token_env_var)
                if candidate:
                    token = candidate
                    break

            expiry_time = read(mapping['expiry_time'])
            if require_expiry and not expiry_time:
                raise missing(mapping['expiry_time'])

            account_id = read(mapping['account_id'])

            return {
                'access_key': access_key,
                'secret_key': secret_key,
                'token': token,
                'expiry_time': expiry_time or None,
                'account_id': account_id or None,
            }

        return fetch_credentials

1326 

1327 

class OriginalEC2Provider(CredentialProvider):
    """Load credentials from the file used by the original EC2 CLI tools."""

    METHOD = 'ec2-credentials-file'
    CANONICAL_NAME = 'Ec2Config'

    CRED_FILE_ENV = 'AWS_CREDENTIAL_FILE'
    ACCESS_KEY = 'AWSAccessKeyId'
    SECRET_KEY = 'AWSSecretKey'

    def __init__(self, environ=None, parser=None):
        """
        :param environ: The environment variables (defaults to
            ``os.environ`` if no value is provided).
        :param parser: A callable that takes the path of the credential
            file and returns a mapping of its keys to values (defaults to
            ``parse_key_val_file``).
        """
        if environ is None:
            environ = os.environ
        if parser is None:
            parser = parse_key_val_file
        self._environ = environ
        self._parser = parser

    def load(self):
        """
        Search for a credential file used by original EC2 CLI tools.

        :returns: A ``Credentials`` object when the file named by the
            ``CRED_FILE_ENV`` environment variable contains an access key,
            otherwise ``None``.
        :raises KeyError: If the file has the access key but not the
            secret key.
        """
        # Look the variable up through the class constant so the name is
        # defined in exactly one place.
        if self.CRED_FILE_ENV not in self._environ:
            return None

        full_path = os.path.expanduser(self._environ[self.CRED_FILE_ENV])
        creds = self._parser(full_path)
        if self.ACCESS_KEY not in creds:
            return None

        logger.info('Found credentials in AWS_CREDENTIAL_FILE.')
        access_key = creds[self.ACCESS_KEY]
        secret_key = creds[self.SECRET_KEY]
        # EC2 creds file doesn't support session tokens.
        return Credentials(access_key, secret_key, method=self.METHOD)

1361 

1362 

class SharedCredentialProvider(CredentialProvider):
    """Load static credentials from the shared credentials file."""

    METHOD = 'shared-credentials-file'
    CANONICAL_NAME = 'SharedCredentials'

    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'
    # Same deal as the EnvProvider above.  Botocore originally supported
    # aws_security_token, but the SDKs are standardizing on aws_session_token
    # so we support both.
    TOKENS = ['aws_security_token', 'aws_session_token']
    ACCOUNT_ID = 'aws_account_id'

    def __init__(self, creds_filename, profile_name=None, ini_parser=None):
        """
        :param creds_filename: The path of the credentials file.
        :param profile_name: The profile to read (defaults to
            ``'default'``).
        :param ini_parser: A callable that parses the file (defaults to
            ``botocore.configloader.raw_config_parse``).
        """
        self._creds_filename = creds_filename
        self._profile_name = 'default' if profile_name is None else profile_name
        if ini_parser is None:
            ini_parser = botocore.configloader.raw_config_parse
        self._ini_parser = ini_parser

    def load(self):
        """Return the profile's credentials, or ``None`` if it has none.

        :raises PartialCredentialsError: If the profile has an access key
            but no secret key.
        """
        try:
            parsed_file = self._ini_parser(self._creds_filename)
        except ConfigNotFound:
            return None

        if self._profile_name not in parsed_file:
            return None
        section = parsed_file[self._profile_name]
        if self.ACCESS_KEY not in section:
            return None

        logger.info(
            "Found credentials in shared credentials file: %s",
            self._creds_filename,
        )
        access_key, secret_key = self._extract_creds_from_mapping(
            section, self.ACCESS_KEY, self.SECRET_KEY
        )
        token = self._get_session_token(section)
        account_id = self._get_account_id(section)
        register_feature_id('CREDENTIALS_PROFILE')
        return Credentials(
            access_key,
            secret_key,
            token,
            method=self.METHOD,
            account_id=account_id,
        )

    def _get_session_token(self, config):
        """Return the first token setting present in ``config``, if any."""
        return next(
            (config[name] for name in self.TOKENS if name in config),
            None,
        )

    def _get_account_id(self, config):
        """Return the account id configured in ``config``, if any."""
        return config.get(self.ACCOUNT_ID)

1417 

1418 

class ConfigProvider(CredentialProvider):
    """INI based config provider with profile sections."""

    METHOD = 'config-file'
    CANONICAL_NAME = 'SharedConfig'

    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'
    # Same deal as the EnvProvider above.  Botocore originally supported
    # aws_security_token, but the SDKs are standardizing on aws_session_token
    # so we support both.
    TOKENS = ['aws_security_token', 'aws_session_token']
    ACCOUNT_ID = 'aws_account_id'

    def __init__(self, config_filename, profile_name, config_parser=None):
        """

        :param config_filename: The value passed to the config parser to
            locate the configuration; also reported in the log message.
        :param profile_name: The name of the current profile.
        :param config_parser: A config parser callable (defaults to
            ``botocore.configloader.load_config``).

        """
        self._config_filename = config_filename
        self._profile_name = profile_name
        if config_parser is None:
            config_parser = botocore.configloader.load_config
        self._config_parser = config_parser

    def load(self):
        """
        If there are credentials in the configuration associated with
        the session, use those.

        :returns: A ``Credentials`` object when the profile defines an
            access key, otherwise ``None``.
        :raises PartialCredentialsError: If the profile has an access key
            but no secret key.
        """
        try:
            full_config = self._config_parser(self._config_filename)
        except ConfigNotFound:
            return None

        # A parsed config without a ``profiles`` section is treated like one
        # without the requested profile instead of raising ``KeyError``.
        profiles = full_config.get('profiles', {})
        if self._profile_name not in profiles:
            return None

        profile_config = profiles[self._profile_name]
        if self.ACCESS_KEY not in profile_config:
            return None

        logger.info(
            "Credentials found in config file: %s",
            self._config_filename,
        )
        access_key, secret_key = self._extract_creds_from_mapping(
            profile_config, self.ACCESS_KEY, self.SECRET_KEY
        )
        token = self._get_session_token(profile_config)
        account_id = self._get_account_id(profile_config)
        register_feature_id('CREDENTIALS_PROFILE')
        return Credentials(
            access_key,
            secret_key,
            token,
            method=self.METHOD,
            account_id=account_id,
        )

    def _get_session_token(self, profile_config):
        """Return the first token setting present in the profile, if any."""
        for token_name in self.TOKENS:
            if token_name in profile_config:
                return profile_config[token_name]
        return None

    def _get_account_id(self, config):
        """Return the account id configured in ``config``, if any."""
        return config.get(self.ACCOUNT_ID)

1487 

1488 

class BotoProvider(CredentialProvider):
    """Load static credentials from a boto config file."""

    METHOD = 'boto-config'
    CANONICAL_NAME = 'Boto2Config'

    BOTO_CONFIG_ENV = 'BOTO_CONFIG'
    DEFAULT_CONFIG_FILENAMES = ['/etc/boto.cfg', '~/.boto']
    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'

    def __init__(self, environ=None, ini_parser=None):
        """
        :param environ: The environment variables (defaults to
            ``os.environ`` if no value is provided).
        :param ini_parser: A callable that parses a config file (defaults
            to ``botocore.configloader.raw_config_parse``).
        """
        self._environ = os.environ if environ is None else environ
        if ini_parser is None:
            ini_parser = botocore.configloader.raw_config_parse
        self._ini_parser = ini_parser

    def load(self):
        """
        Look for credentials in boto config file.

        The file named by the ``BOTO_CONFIG`` environment variable is the
        only candidate when that variable is set; otherwise the default
        locations are tried in order.
        """
        candidates = (
            [self._environ[self.BOTO_CONFIG_ENV]]
            if self.BOTO_CONFIG_ENV in self._environ
            else self.DEFAULT_CONFIG_FILENAMES
        )
        for filename in candidates:
            try:
                parsed = self._ini_parser(filename)
            except ConfigNotFound:
                # Move on to the next potential config file name.
                continue
            if 'Credentials' not in parsed:
                continue
            section = parsed['Credentials']
            if self.ACCESS_KEY not in section:
                continue

            logger.info(
                "Found credentials in boto config file: %s", filename
            )
            access_key, secret_key = self._extract_creds_from_mapping(
                section, self.ACCESS_KEY, self.SECRET_KEY
            )
            register_feature_id('CREDENTIALS_BOTO2_CONFIG_FILE')
            return Credentials(access_key, secret_key, method=self.METHOD)
        return None

1533 

1534 

1535class AssumeRoleProvider(CredentialProvider): 

1536 METHOD = 'assume-role' 

1537 # The AssumeRole provider is logically part of the SharedConfig and 

1538 # SharedCredentials providers. Since the purpose of the canonical name 

1539 # is to provide cross-sdk compatibility, calling code will need to be 

1540 # aware that either of those providers should be tied to the AssumeRole 

1541 # provider as much as possible. 

1542 CANONICAL_NAME = None 

1543 ROLE_CONFIG_VAR = 'role_arn' 

1544 WEB_IDENTITY_TOKE_FILE_VAR = 'web_identity_token_file' 

1545 # Credentials are considered expired (and will be refreshed) once the total 

1546 # remaining time left until the credentials expires is less than the 

1547 # EXPIRY_WINDOW. 

1548 EXPIRY_WINDOW_SECONDS = 60 * 15 

1549 NAMED_PROVIDER_FEATURE_MAP = { 

1550 'Ec2InstanceMetadata': 'CREDENTIALS_IMDS', 

1551 'Environment': 'CREDENTIALS_ENV_VARS', 

1552 'EcsContainer': 'CREDENTIALS_HTTP', 

1553 } 

1554 

1555 def __init__( 

1556 self, 

1557 load_config, 

1558 client_creator, 

1559 cache, 

1560 profile_name, 

1561 prompter=getpass.getpass, 

1562 credential_sourcer=None, 

1563 profile_provider_builder=None, 

1564 ): 

1565 """ 

1566 :type load_config: callable 

1567 :param load_config: A function that accepts no arguments, and 

1568 when called, will return the full configuration dictionary 

1569 for the session (``session.full_config``). 

1570 

1571 :type client_creator: callable 

1572 :param client_creator: A factory function that will create 

1573 a client when called. Has the same interface as 

1574 ``botocore.session.Session.create_client``. 

1575 

1576 :type cache: dict 

1577 :param cache: An object that supports ``__getitem__``, 

1578 ``__setitem__``, and ``__contains__``. An example 

1579 of this is the ``JSONFileCache`` class in the CLI. 

1580 

1581 :type profile_name: str 

1582 :param profile_name: The name of the profile. 

1583 

1584 :type prompter: callable 

1585 :param prompter: A callable that returns input provided 

1586 by the user (i.e raw_input, getpass.getpass, etc.). 

1587 

1588 :type credential_sourcer: CanonicalNameCredentialSourcer 

1589 :param credential_sourcer: A credential provider that takes a 

1590 configuration, which is used to provide the source credentials 

1591 for the STS call. 

1592 """ 

1593 #: The cache used to first check for assumed credentials. 

1594 #: This is checked before making the AssumeRole API 

1595 #: calls and can be useful if you have short lived 

1596 #: scripts and you'd like to avoid calling AssumeRole 

1597 #: until the credentials are expired. 

1598 self.cache = cache 

1599 self._load_config = load_config 

1600 # client_creator is a callable that creates function. 

1601 # It's basically session.create_client 

1602 self._client_creator = client_creator 

1603 self._profile_name = profile_name 

1604 self._prompter = prompter 

1605 # The _loaded_config attribute will be populated from the 

1606 # load_config() function once the configuration is actually 

1607 # loaded. The reason we go through all this instead of just 

1608 # requiring that the loaded_config be passed to us is to that 

1609 # we can defer configuration loaded until we actually try 

1610 # to load credentials (as opposed to when the object is 

1611 # instantiated). 

1612 self._loaded_config = {} 

1613 self._credential_sourcer = credential_sourcer 

1614 self._profile_provider_builder = profile_provider_builder 

1615 self._visited_profiles = [self._profile_name] 

1616 self._feature_ids = set() 

1617 

1618 def load(self): 

1619 self._loaded_config = self._load_config() 

1620 profiles = self._loaded_config.get('profiles', {}) 

1621 profile = profiles.get(self._profile_name, {}) 

1622 if self._has_assume_role_config_vars(profile): 

1623 return self._load_creds_via_assume_role(self._profile_name) 

1624 

1625 def _has_assume_role_config_vars(self, profile): 

1626 return ( 

1627 self.ROLE_CONFIG_VAR in profile 

1628 and 

1629 # We need to ensure this provider doesn't look at a profile when 

1630 # the profile has configuration for web identity. Simply relying on 

1631 # the order in the credential chain is insufficient as it doesn't 

1632 # prevent the case when we're doing an assume role chain. 

1633 self.WEB_IDENTITY_TOKE_FILE_VAR not in profile 

1634 ) 

1635 

1636 def _load_creds_via_assume_role(self, profile_name): 

1637 role_config = self._get_role_config(profile_name) 

1638 source_credentials = self._resolve_source_credentials( 

1639 role_config, profile_name 

1640 ) 

1641 

1642 extra_args = {} 

1643 role_session_name = role_config.get('role_session_name') 

1644 if role_session_name is not None: 

1645 extra_args['RoleSessionName'] = role_session_name 

1646 

1647 external_id = role_config.get('external_id') 

1648 if external_id is not None: 

1649 extra_args['ExternalId'] = external_id 

1650 

1651 mfa_serial = role_config.get('mfa_serial') 

1652 if mfa_serial is not None: 

1653 extra_args['SerialNumber'] = mfa_serial 

1654 

1655 duration_seconds = role_config.get('duration_seconds') 

1656 if duration_seconds is not None: 

1657 extra_args['DurationSeconds'] = duration_seconds 

1658 

1659 fetcher = AssumeRoleCredentialFetcher( 

1660 client_creator=self._client_creator, 

1661 source_credentials=source_credentials, 

1662 role_arn=role_config['role_arn'], 

1663 extra_args=extra_args, 

1664 mfa_prompter=self._prompter, 

1665 cache=self.cache, 

1666 ) 

1667 fetcher.feature_ids = self._feature_ids.copy() 

1668 refresher = fetcher.fetch_credentials 

1669 if mfa_serial is not None: 

1670 refresher = create_mfa_serial_refresher(refresher) 

1671 

1672 self._feature_ids.add('CREDENTIALS_STS_ASSUME_ROLE') 

1673 register_feature_ids(self._feature_ids) 

1674 # The initial credentials are empty and the expiration time is set 

1675 # to now so that we can delay the call to assume role until it is 

1676 # strictly needed. 

1677 return DeferredRefreshableCredentials( 

1678 method=self.METHOD, 

1679 refresh_using=refresher, 

1680 time_fetcher=_local_now, 

1681 ) 

1682 

1683 def _get_role_config(self, profile_name): 

1684 """Retrieves and validates the role configuration for the profile.""" 

1685 profiles = self._loaded_config.get('profiles', {}) 

1686 

1687 profile = profiles[profile_name] 

1688 source_profile = profile.get('source_profile') 

1689 role_arn = profile['role_arn'] 

1690 credential_source = profile.get('credential_source') 

1691 mfa_serial = profile.get('mfa_serial') 

1692 external_id = profile.get('external_id') 

1693 role_session_name = profile.get('role_session_name') 

1694 duration_seconds = profile.get('duration_seconds') 

1695 

1696 role_config = { 

1697 'role_arn': role_arn, 

1698 'external_id': external_id, 

1699 'mfa_serial': mfa_serial, 

1700 'role_session_name': role_session_name, 

1701 'source_profile': source_profile, 

1702 'credential_source': credential_source, 

1703 } 

1704 

1705 if duration_seconds is not None: 

1706 try: 

1707 role_config['duration_seconds'] = int(duration_seconds) 

1708 except ValueError: 

1709 pass 

1710 

1711 # Either the credential source or the source profile must be 

1712 # specified, but not both. 

1713 if credential_source is not None and source_profile is not None: 

1714 raise InvalidConfigError( 

1715 error_msg=( 

1716 f'The profile "{profile_name}" contains both ' 

1717 'source_profile and credential_source.' 

1718 ) 

1719 ) 

1720 elif credential_source is None and source_profile is None: 

1721 raise PartialCredentialsError( 

1722 provider=self.METHOD, 

1723 cred_var='source_profile or credential_source', 

1724 ) 

1725 elif credential_source is not None: 

1726 self._validate_credential_source(profile_name, credential_source) 

1727 else: 

1728 self._validate_source_profile(profile_name, source_profile) 

1729 

1730 return role_config 

1731 

1732 def _validate_credential_source(self, parent_profile, credential_source): 

1733 if self._credential_sourcer is None: 

1734 raise InvalidConfigError( 

1735 error_msg=( 

1736 f"The credential_source \"{credential_source}\" is specified " 

1737 f"in profile \"{parent_profile}\", " 

1738 f"but no source provider was configured." 

1739 ) 

1740 ) 

1741 if not self._credential_sourcer.is_supported(credential_source): 

1742 raise InvalidConfigError( 

1743 error_msg=( 

1744 f"The credential source \"{credential_source}\" referenced " 

1745 f"in profile \"{parent_profile}\" is not valid." 

1746 ) 

1747 ) 

1748 

1749 def _source_profile_has_credentials(self, profile): 

1750 return any( 

1751 [ 

1752 self._has_static_credentials(profile), 

1753 self._has_assume_role_config_vars(profile), 

1754 ] 

1755 ) 

1756 

1757 def _validate_source_profile( 

1758 self, parent_profile_name, source_profile_name 

1759 ): 

1760 profiles = self._loaded_config.get('profiles', {}) 

1761 if source_profile_name not in profiles: 

1762 raise InvalidConfigError( 

1763 error_msg=( 

1764 f"The source_profile \"{source_profile_name}\" referenced in " 

1765 f"the profile \"{parent_profile_name}\" does not exist." 

1766 ) 

1767 ) 

1768 

1769 source_profile = profiles[source_profile_name] 

1770 

1771 # Make sure we aren't going into an infinite loop. If we haven't 

1772 # visited the profile yet, we're good. 

1773 if source_profile_name not in self._visited_profiles: 

1774 return 

1775 

1776 # If we have visited the profile and the profile isn't simply 

1777 # referencing itself, that's an infinite loop. 

1778 if source_profile_name != parent_profile_name: 

1779 raise InfiniteLoopConfigError( 

1780 source_profile=source_profile_name, 

1781 visited_profiles=self._visited_profiles, 

1782 ) 

1783 

1784 # A profile is allowed to reference itself so that it can source 

1785 # static credentials and have configuration all in the same 

1786 # profile. This will only ever work for the top level assume 

1787 # role because the static credentials will otherwise take 

1788 # precedence. 

1789 if not self._has_static_credentials(source_profile): 

1790 raise InfiniteLoopConfigError( 

1791 source_profile=source_profile_name, 

1792 visited_profiles=self._visited_profiles, 

1793 ) 

1794 

1795 def _has_static_credentials(self, profile): 

1796 static_keys = ['aws_secret_access_key', 'aws_access_key_id'] 

1797 return any(static_key in profile for static_key in static_keys) 

1798 

1799 def _resolve_source_credentials(self, role_config, profile_name): 

1800 credential_source = role_config.get('credential_source') 

1801 if credential_source is not None: 

1802 self._feature_ids.add('CREDENTIALS_PROFILE_NAMED_PROVIDER') 

1803 return self._resolve_credentials_from_source( 

1804 credential_source, profile_name 

1805 ) 

1806 

1807 source_profile = role_config['source_profile'] 

1808 self._visited_profiles.append(source_profile) 

1809 self._feature_ids.add('CREDENTIALS_PROFILE_SOURCE_PROFILE') 

1810 return self._resolve_credentials_from_profile(source_profile) 

1811 

1812 def _resolve_credentials_from_profile(self, profile_name): 

1813 profiles = self._loaded_config.get('profiles', {}) 

1814 profile = profiles[profile_name] 

1815 self._feature_ids.add('CREDENTIALS_PROFILE') 

1816 if ( 

1817 self._has_static_credentials(profile) 

1818 and not self._profile_provider_builder 

1819 ): 

1820 # This is only here for backwards compatibility. If this provider 

1821 # isn't given a profile provider builder we still want to be able 

1822 # to handle the basic static credential case as we would before the 

1823 # profile provider builder parameter was added. 

1824 return self._resolve_static_credentials_from_profile(profile) 

1825 elif self._has_static_credentials( 

1826 profile 

1827 ) or not self._has_assume_role_config_vars(profile): 

1828 profile_providers = self._profile_provider_builder.providers( 

1829 profile_name=profile_name, 

1830 disable_env_vars=True, 

1831 ) 

1832 profile_chain = CredentialResolver(profile_providers) 

1833 credentials = profile_chain.load_credentials() 

1834 if credentials is None: 

1835 error_message = ( 

1836 'The source profile "%s" must have credentials.' 

1837 ) 

1838 raise InvalidConfigError( 

1839 error_msg=error_message % profile_name, 

1840 ) 

1841 return credentials 

1842 

1843 return self._load_creds_via_assume_role(profile_name) 

1844 

def _resolve_static_credentials_from_profile(self, profile):
    """Build ``Credentials`` from the static keys stored in ``profile``.

    :param profile: Mapping holding ``aws_access_key_id`` and
        ``aws_secret_access_key``; ``aws_session_token`` is optional.
    :raises PartialCredentialsError: If a ``KeyError`` is raised while
        reading the keys or constructing the credentials.
    """
    try:
        static_fields = {
            'access_key': profile['aws_access_key_id'],
            'secret_key': profile['aws_secret_access_key'],
            'token': profile.get('aws_session_token'),
        }
        return Credentials(**static_fields)
    except KeyError as missing_key:
        raise PartialCredentialsError(
            provider=self.METHOD, cred_var=str(missing_key)
        )

1856 

1857 def _resolve_credentials_from_source( 

1858 self, credential_source, profile_name 

1859 ): 

1860 credentials = self._credential_sourcer.source_credentials( 

1861 credential_source 

1862 ) 

1863 if credentials is None: 

1864 raise CredentialRetrievalError( 

1865 provider=credential_source, 

1866 error_msg=( 

1867 'No credentials found in credential_source referenced ' 

1868 f'in profile {profile_name}' 

1869 ), 

1870 ) 

1871 named_provider_feature_id = self.NAMED_PROVIDER_FEATURE_MAP.get( 

1872 credential_source 

1873 ) 

1874 if named_provider_feature_id: 

1875 self._feature_ids.add(named_provider_feature_id) 

1876 return credentials 

1877 

1878 

class AssumeRoleWithWebIdentityProvider(CredentialProvider):
    """Credential provider for the assume-role-with-web-identity flow.

    Each setting is looked up in environment variables first (unless
    ``disable_env_vars`` is set) and then in the configured profile.  The
    provider only applies when a ``web_identity_token_file`` is
    configured; otherwise ``load()`` returns ``None``.
    """

    METHOD = 'assume-role-with-web-identity'
    CANONICAL_NAME = None
    # Maps each supported profile key to the environment variable that can
    # supply the same setting.
    _CONFIG_TO_ENV_VAR = {
        'web_identity_token_file': 'AWS_WEB_IDENTITY_TOKEN_FILE',
        'role_session_name': 'AWS_ROLE_SESSION_NAME',
        'role_arn': 'AWS_ROLE_ARN',
    }

    def __init__(
        self,
        load_config,
        client_creator,
        profile_name,
        cache=None,
        disable_env_vars=False,
        token_loader_cls=None,
    ):
        """
        :param load_config: Zero-argument callable returning the loaded
            configuration mapping (read for its ``profiles`` key).
        :param client_creator: Passed through to the credential fetcher.
        :param profile_name: Name of the profile to read settings from.
        :param cache: Passed through to the credential fetcher.
        :param disable_env_vars: When true, environment variables are
            never consulted.
        :param token_loader_cls: Callable invoked with the token file path
            to build the token loader.  Defaults to
            ``FileWebIdentityTokenLoader``.
        """
        self.cache = cache
        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name
        # Filled in on first use by _get_profile_config().
        self._profile_config = None
        self._disable_env_vars = disable_env_vars
        if token_loader_cls is None:
            token_loader_cls = FileWebIdentityTokenLoader
        self._token_loader_cls = token_loader_cls
        self._feature_ids = set()

    def load(self):
        """Return deferred credentials, or ``None`` if not configured."""
        return self._assume_role_with_web_identity()

    def _get_profile_config(self, key):
        """Return ``key`` from the profile, loading the profile once."""
        if self._profile_config is None:
            loaded_config = self._load_config()
            profiles = loaded_config.get('profiles', {})
            self._profile_config = profiles.get(self._profile_name, {})
        return self._profile_config.get(key)

    def _get_env_config(self, key):
        """Return the environment value for ``key``, or ``None``."""
        if self._disable_env_vars:
            return None
        env_key = self._CONFIG_TO_ENV_VAR.get(key)
        if env_key and env_key in os.environ:
            return os.environ[env_key]
        return None

    def _get_config(self, key):
        """Resolve ``key`` from the environment first, then the profile.

        The matching feature id is recorded for whichever source supplied
        the value.  Returns ``None`` when neither source has the key.
        """
        env_value = self._get_env_config(key)
        if env_value is not None:
            self._feature_ids.add('CREDENTIALS_ENV_VARS_STS_WEB_ID_TOKEN')
            return env_value

        config_value = self._get_profile_config(key)
        if config_value is not None:
            self._feature_ids.add('CREDENTIALS_PROFILE_STS_WEB_ID_TOKEN')
            return config_value

        return None

    def _assume_role_with_web_identity(self):
        """Build deferred credentials for the web identity flow.

        :returns: ``DeferredRefreshableCredentials``, or ``None`` when no
            ``web_identity_token_file`` is configured.
        :raises InvalidConfigError: If a token file is configured but no
            role ARN is.
        """
        token_path = self._get_config('web_identity_token_file')
        if not token_path:
            return None
        token_loader = self._token_loader_cls(token_path)

        role_arn = self._get_config('role_arn')
        if not role_arn:
            # The trailing space after "role_arn" keeps the concatenated
            # literals from running together in the rendered message.
            error_msg = (
                'The provided profile or the current environment is '
                'configured to assume role with web identity but has no '
                'role ARN configured. Ensure that the profile has the role_arn '
                'configuration set or the AWS_ROLE_ARN env var is set.'
            )
            raise InvalidConfigError(error_msg=error_msg)

        extra_args = {}
        role_session_name = self._get_config('role_session_name')
        if role_session_name is not None:
            extra_args['RoleSessionName'] = role_session_name

        fetcher = AssumeRoleWithWebIdentityCredentialFetcher(
            client_creator=self._client_creator,
            web_identity_token_loader=token_loader,
            role_arn=role_arn,
            extra_args=extra_args,
            cache=self.cache,
        )
        # The fetcher gets a snapshot taken before the provider-level
        # feature id below is added.
        fetcher.feature_ids = self._feature_ids.copy()

        self._feature_ids.add('CREDENTIALS_STS_ASSUME_ROLE_WEB_ID')
        register_feature_ids(self._feature_ids)
        # The initial credentials are empty and the expiration time is set
        # to now so that we can delay the call to assume role until it is
        # strictly needed.
        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=fetcher.fetch_credentials,
        )

1978 

1979 

class CanonicalNameCredentialSourcer:
    """Finds and loads credential providers by their canonical name."""

    def __init__(self, providers):
        """
        :param providers: Iterable of provider objects.  Each one is read
            for its ``CANONICAL_NAME`` and ``METHOD`` attributes.
        """
        self._providers = providers

    def is_supported(self, source_name):
        """Validates a given source name.

        Canonical names are matched case-insensitively, the same way
        ``source_credentials`` looks providers up, so a name accepted here
        is never rejected there merely because of its casing.

        :type source_name: str
        :param source_name: The value of credential_source in the config
            file. This is the canonical name of the credential provider.

        :rtype: bool
        :returns: True if the credential provider is supported,
            False otherwise.
        """
        canonical_names = [p.CANONICAL_NAME for p in self._providers]
        # Exact membership first: this keeps the result for every input
        # that matched before (including non-string values).
        if source_name in canonical_names:
            return True
        if not isinstance(source_name, str):
            return False
        wanted = source_name.lower()
        return any(
            isinstance(name, str) and name.lower() == wanted
            for name in canonical_names
        )

    def source_credentials(self, source_name):
        """Loads source credentials based on the provided configuration.

        :type source_name: str
        :param source_name: The value of credential_source in the config
            file. This is the canonical name of the credential provider.

        :rtype: Credentials
        """
        source = self._get_provider(source_name)
        # _get_provider may hand back a resolver wrapping two providers,
        # which exposes load_credentials() rather than load().
        if isinstance(source, CredentialResolver):
            return source.load_credentials()
        return source.load()

    def _get_provider(self, canonical_name):
        """Return a credential provider by its canonical name.

        :type canonical_name: str
        :param canonical_name: The canonical name of the provider.

        :raises UnknownCredentialError: Raised if no
            credential provider by the provided name
            is found.
        """
        provider = self._get_provider_by_canonical_name(canonical_name)

        # The AssumeRole provider should really be part of the SharedConfig
        # provider rather than being its own thing, but it is not. It is
        # effectively part of both the SharedConfig provider and the
        # SharedCredentials provider now due to the way it behaves.
        # Therefore if we want either of those providers we should return
        # the AssumeRole provider with it.
        if canonical_name.lower() in ['sharedconfig', 'sharedcredentials']:
            assume_role_provider = self._get_provider_by_method('assume-role')
            if assume_role_provider is not None:
                # The SharedConfig or SharedCredentials provider may not be
                # present if it was removed for some reason, but the
                # AssumeRole provider could still be present. In that case,
                # return the assume role provider by itself.
                if provider is None:
                    return assume_role_provider

                # If both are present, return them both as a
                # CredentialResolver so that calling code can treat them as
                # a single entity.
                return CredentialResolver([assume_role_provider, provider])

        if provider is None:
            raise UnknownCredentialError(name=canonical_name)

        return provider

    def _get_provider_by_canonical_name(self, canonical_name):
        """Return a credential provider by its canonical name.

        This function is strict, it does not attempt to address
        compatibility issues.  Returns ``None`` when nothing matches.
        """
        for provider in self._providers:
            name = provider.CANONICAL_NAME
            # Canonical names are case-insensitive
            if name and name.lower() == canonical_name.lower():
                return provider

    def _get_provider_by_method(self, method):
        """Return a credential provider by its METHOD name.

        Returns ``None`` when no provider has that METHOD.
        """
        for provider in self._providers:
            if provider.METHOD == method:
                return provider

2066 

2067 

class ContainerProvider(CredentialProvider):
    """Credential provider driven by the container credential env vars.

    The provider is active only when the relative-URI or the full-URI
    environment variable is present in the supplied environment mapping.
    """

    METHOD = 'container-role'
    CANONICAL_NAME = 'EcsContainer'
    ENV_VAR = 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI'
    ENV_VAR_FULL = 'AWS_CONTAINER_CREDENTIALS_FULL_URI'
    ENV_VAR_AUTH_TOKEN = 'AWS_CONTAINER_AUTHORIZATION_TOKEN'
    ENV_VAR_AUTH_TOKEN_FILE = 'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE'

    def __init__(self, environ=None, fetcher=None):
        """
        :param environ: Mapping to read settings from; ``os.environ``
            when omitted.
        :param fetcher: Metadata fetcher; a new
            ``ContainerMetadataFetcher`` when omitted.
        """
        self._environ = os.environ if environ is None else environ
        if fetcher is None:
            fetcher = ContainerMetadataFetcher()
        self._fetcher = fetcher

    def load(self):
        """Return refreshable credentials, or ``None`` when not opted in."""
        # This cred provider is only triggered if one of the URI variables
        # is set, which only happens if you opt into this feature.
        opted_in = (
            self.ENV_VAR in self._environ
            or self.ENV_VAR_FULL in self._environ
        )
        if not opted_in:
            return None
        return self._retrieve_or_fail()

    def _retrieve_or_fail(self):
        """Fetch credentials once and wrap them so they can refresh."""
        if not self._provided_relative_uri():
            full_uri = self._environ[self.ENV_VAR_FULL]
        else:
            full_uri = self._fetcher.full_url(self._environ[self.ENV_VAR])
        refresher = self._create_fetcher(full_uri)
        initial = refresher()
        return RefreshableCredentials(
            access_key=initial['access_key'],
            secret_key=initial['secret_key'],
            token=initial['token'],
            method=self.METHOD,
            expiry_time=_parse_if_needed(initial['expiry_time']),
            refresh_using=refresher,
            account_id=initial.get('account_id'),
        )

    def _build_headers(self):
        """Return the ``Authorization`` header dict, or ``None``.

        The token file variable wins over the plain token variable.
        """
        environ = self._environ
        auth_token = None
        if self.ENV_VAR_AUTH_TOKEN_FILE in environ:
            token_path = environ[self.ENV_VAR_AUTH_TOKEN_FILE]
            with open(token_path) as token_file:
                auth_token = token_file.read()
        elif self.ENV_VAR_AUTH_TOKEN in environ:
            auth_token = environ[self.ENV_VAR_AUTH_TOKEN]
        if auth_token is None:
            return None
        self._validate_auth_token(auth_token)
        return {'Authorization': auth_token}

    def _validate_auth_token(self, auth_token):
        """Reject tokens containing a carriage return or a newline."""
        if any(char in auth_token for char in ("\r", "\n")):
            raise ValueError("Auth token value is not a legal header value")

    def _create_fetcher(self, full_uri, *args, **kwargs):
        """Return a zero-argument callable fetching creds from ``full_uri``."""

        def fetch_creds():
            try:
                request_headers = self._build_headers()
                response = self._fetcher.retrieve_full_uri(
                    full_uri, headers=request_headers
                )
                register_feature_id('CREDENTIALS_HTTP')
            except MetadataRetrievalError as e:
                logger.debug(
                    "Error retrieving container metadata: %s", e, exc_info=True
                )
                raise CredentialRetrievalError(
                    provider=self.METHOD, error_msg=str(e)
                )
            fetched = {
                'access_key': response['AccessKeyId'],
                'secret_key': response['SecretAccessKey'],
                'token': response['Token'],
                'expiry_time': response['Expiration'],
            }
            # The account id is optional in the response.
            fetched['account_id'] = response.get('AccountId')
            return fetched

        return fetch_creds

    def _provided_relative_uri(self):
        """Return True when the relative-URI variable is set."""
        return self.ENV_VAR in self._environ

2150 

2151 

class CredentialResolver:
    """An ordered chain of credential providers.

    Providers are identified by their ``METHOD`` attribute and consulted
    in list order by ``load_credentials``.
    """

    def __init__(self, providers):
        """

        :param providers: A list of ``CredentialProvider`` instances.

        """
        self.providers = providers

    def insert_before(self, name, credential_provider):
        """
        Inserts a new instance of ``CredentialProvider`` into the chain that
        will be tried before an existing one.

        :param name: The short name of the credentials you'd like to insert the
            new credentials before. (ex. ``env`` or ``config``). Existing names
            & ordering can be discovered via ``self.available_methods``.
        :type name: string

        :param credential_provider: The provider instance you'd like to
            add to the chain.
        :type credential_provider: A ``CredentialProvider`` instance

        :raises UnknownCredentialError: Raised if no provider named
            ``name`` is in the chain.
        """
        offset = self._get_provider_offset(name)
        self.providers.insert(offset, credential_provider)

    def insert_after(self, name, credential_provider):
        """
        Inserts a new instance of ``CredentialProvider`` into the chain that
        will be tried after an existing one.

        :param name: The short name of the credentials you'd like to insert the
            new credentials after. (ex. ``env`` or ``config``). Existing names
            & ordering can be discovered via ``self.available_methods``.
        :type name: string

        :param credential_provider: The provider instance you'd like to
            add to the chain.
        :type credential_provider: A ``CredentialProvider`` instance

        :raises UnknownCredentialError: Raised if no provider named
            ``name`` is in the chain.
        """
        offset = self._get_provider_offset(name)
        self.providers.insert(offset + 1, credential_provider)

    def remove(self, name):
        """
        Removes a given provider from the chain.

        Only the first provider whose ``METHOD`` equals ``name`` is
        removed.  If none matches, nothing happens.

        :param name: The short name of the credentials instance to remove.
        :type name: string
        """
        for offset, provider in enumerate(self.providers):
            if provider.METHOD == name:
                self.providers.pop(offset)
                return
        # It's not present. Fail silently.

    def get_provider(self, name):
        """Return a credential provider by name.

        :type name: str
        :param name: The name of the provider.

        :raises UnknownCredentialError: Raised if no
            credential provider by the provided name
            is found.
        """
        return self.providers[self._get_provider_offset(name)]

    def _get_provider_offset(self, name):
        """Return the index of the first provider whose METHOD is ``name``.

        :raises UnknownCredentialError: Raised if no provider matches.
        """
        try:
            return [p.METHOD for p in self.providers].index(name)
        except ValueError:
            raise UnknownCredentialError(name=name)

    def load_credentials(self):
        """
        Goes through the credentials chain, returning the first ``Credentials``
        that could be loaded.
        """
        # First provider to return a non-None response wins.
        for provider in self.providers:
            logger.debug("Looking for credentials via: %s", provider.METHOD)
            creds = provider.load()
            if creds is not None:
                return creds

        # If we got here, no credentials could be found.
        # This feels like it should be an exception, but historically, ``None``
        # is returned.
        #
        # +1
        # -js
        return None

2250 

2251 

class SSOCredentialFetcher(CachedCredentialFetcher):
    """Fetches role credentials from SSO using a cached access token."""

    _UTC_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    def __init__(
        self,
        start_url,
        sso_region,
        role_name,
        account_id,
        client_creator,
        token_loader=None,
        cache=None,
        expiry_window_seconds=None,
        token_provider=None,
        sso_session_name=None,
        time_fetcher=_local_now,
    ):
        """Store the SSO settings and initialise the base fetcher.

        ``token_provider`` is preferred over ``token_loader`` when both
        are given; ``token_loader`` is called with ``start_url``.
        """
        self._start_url = start_url
        self._sso_region = sso_region
        self._role_name = role_name
        self._account_id = account_id
        self._client_creator = client_creator
        self._token_loader = token_loader
        self._token_provider = token_provider
        self._sso_session_name = sso_session_name
        self._time_fetcher = time_fetcher
        super().__init__(cache, expiry_window_seconds)

    def _create_cache_key(self):
        """Create a predictable cache key for the current configuration.

        The cache key is intended to be compatible with file names.
        """
        key_fields = {
            'roleName': self._role_name,
            'accountId': self._account_id,
        }
        # A named sso-session identifies the token; otherwise fall back
        # to the start URL.
        if self._sso_session_name:
            key_fields['sessionName'] = self._sso_session_name
        else:
            key_fields['startUrl'] = self._start_url
        # NOTE: It would be good to hoist this cache key construction logic
        # into the CachedCredentialFetcher class as we should be consistent.
        # Unfortunately, the current assume role fetchers that sub class don't
        # pass separators resulting in non-minified JSON. In the long term,
        # all fetchers should use the below caching scheme.
        serialized = json.dumps(
            key_fields, sort_keys=True, separators=(',', ':')
        )
        digest = sha1(serialized.encode('utf-8')).hexdigest()
        return self._make_file_safe(digest)

    def _parse_timestamp(self, timestamp_ms):
        """Format a millisecond epoch timestamp as a UTC date string."""
        # fromtimestamp expects seconds so: milliseconds / 1000 = seconds
        as_utc = datetime.datetime.fromtimestamp(
            timestamp_ms / 1000.0, tzutc()
        )
        return as_utc.strftime(self._UTC_DATE_FORMAT)

    def _get_credentials(self):
        """Get credentials by calling SSO get role credentials."""
        client_config = Config(
            signature_version=UNSIGNED,
            region_name=self._sso_region,
        )
        client = self._client_creator('sso', config=client_config)
        if self._token_provider:
            loaded_token = self._token_provider.load_token()
            token = loaded_token.get_frozen_token().token
        else:
            token_dict = self._token_loader(self._start_url)
            token = token_dict['accessToken']

            # raise an UnauthorizedSSOTokenError if the loaded legacy token
            # is expired to save a call to GetRoleCredentials with an
            # expired token.
            expires_at = dateutil.parser.parse(token_dict['expiresAt'])
            seconds_left = total_seconds(expires_at - self._time_fetcher())
            if seconds_left <= 0:
                raise UnauthorizedSSOTokenError()

        request = {
            'roleName': self._role_name,
            'accountId': self._account_id,
            'accessToken': token,
        }
        try:
            register_feature_ids(self.feature_ids)
            response = client.get_role_credentials(**request)
        except client.exceptions.UnauthorizedException:
            raise UnauthorizedSSOTokenError()
        role_creds = response['roleCredentials']

        return {
            'ProviderType': 'sso',
            'Credentials': {
                'AccessKeyId': role_creds['accessKeyId'],
                'SecretAccessKey': role_creds['secretAccessKey'],
                'SessionToken': role_creds['sessionToken'],
                'Expiration': self._parse_timestamp(role_creds['expiration']),
                'AccountId': self._account_id,
            },
        }

2353 

2354 

class SSOProvider(CredentialProvider):
    """Credential provider for profiles configured with SSO settings."""

    METHOD = 'sso'

    _SSO_TOKEN_CACHE_DIR = os.path.expanduser(
        os.path.join('~', '.aws', 'sso', 'cache')
    )
    _PROFILE_REQUIRED_CONFIG_VARS = (
        'sso_role_name',
        'sso_account_id',
    )
    _SSO_REQUIRED_CONFIG_VARS = (
        'sso_start_url',
        'sso_region',
    )
    _ALL_REQUIRED_CONFIG_VARS = (
        _PROFILE_REQUIRED_CONFIG_VARS + _SSO_REQUIRED_CONFIG_VARS
    )

    def __init__(
        self,
        load_config,
        client_creator,
        profile_name,
        cache=None,
        token_cache=None,
        token_provider=None,
    ):
        """Store collaborators, creating default caches where omitted.

        ``token_cache`` defaults to a ``JSONFileCache`` over the SSO token
        cache directory and ``cache`` defaults to an empty dict.
        """
        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name
        self._token_provider = token_provider
        if token_cache is None:
            token_cache = JSONFileCache(self._SSO_TOKEN_CACHE_DIR)
        self._token_cache = token_cache
        self.cache = {} if cache is None else cache
        self._feature_ids = set()

    def _load_sso_config(self):
        """Return the profile's SSO settings, or ``None`` if it has none.

        :raises InvalidConfigError: If the profile opts into SSO but is
            missing one or more required settings.
        """
        loaded_config = self._load_config()
        profiles = loaded_config.get('profiles', {})
        profile_name = self._profile_name
        profile_config = profiles.get(self._profile_name, {})
        sso_sessions = loaded_config.get('sso_sessions', {})

        # Role name & Account ID indicate the cred provider should be used
        uses_sso = any(
            var in profile_config
            for var in self._PROFILE_REQUIRED_CONFIG_VARS
        )
        if not uses_sso:
            return None

        resolved_config, extra_reqs = self._resolve_sso_session_reference(
            profile_config, sso_sessions
        )

        required_vars = self._ALL_REQUIRED_CONFIG_VARS + extra_reqs
        missing_config_vars = [
            var for var in required_vars if var not in resolved_config
        ]
        if missing_config_vars:
            missing = ', '.join(missing_config_vars)
            raise InvalidConfigError(
                error_msg=(
                    f'The profile "{profile_name}" is configured to use SSO '
                    f'but is missing required configuration: {missing}'
                )
            )
        return {var: resolved_config[var] for var in required_vars}

    def _resolve_sso_session_reference(self, profile_config, sso_sessions):
        """Merge a referenced sso-session section into the profile config.

        :returns: ``(config, extra_required_vars)``.  Without an
            ``sso_session`` reference the profile config is returned
            unchanged with no extra requirements.
        :raises InvalidConfigError: If the referenced session does not
            exist or a key has conflicting values in the two sections.
        """
        sso_session_name = profile_config.get('sso_session')
        if sso_session_name is None:
            # No reference to resolve, proceed with legacy flow
            return profile_config, ()

        if sso_session_name not in sso_sessions:
            error_msg = f'The specified sso-session does not exist: "{sso_session_name}"'
            raise InvalidConfigError(error_msg=error_msg)

        config = profile_config.copy()
        for config_var, val in sso_sessions[sso_session_name].items():
            # Validate any keys referenced in both profile and sso_session match
            if config_var in config and config[config_var] != val:
                error_msg = (
                    f"The value for {config_var} is inconsistent between "
                    f"profile ({config[config_var]}) and sso-session ({val})."
                )
                raise InvalidConfigError(error_msg=error_msg)
            config[config_var] = val
        return config, ('sso_session',)

    def load(self):
        """Return deferred SSO credentials, or ``None`` if not configured."""
        sso_config = self._load_sso_config()
        if not sso_config:
            return None

        fetcher_kwargs = {
            'start_url': sso_config['sso_start_url'],
            'sso_region': sso_config['sso_region'],
            'role_name': sso_config['sso_role_name'],
            'account_id': sso_config['sso_account_id'],
            'client_creator': self._client_creator,
            'token_loader': SSOTokenLoader(cache=self._token_cache),
            'cache': self.cache,
        }
        sso_session_in_config = 'sso_session' in sso_config
        if not sso_session_in_config:
            self._feature_ids.add('CREDENTIALS_PROFILE_SSO_LEGACY')
        else:
            fetcher_kwargs['sso_session_name'] = sso_config['sso_session']
            fetcher_kwargs['token_provider'] = self._token_provider
            self._feature_ids.add('CREDENTIALS_PROFILE_SSO')

        sso_fetcher = SSOCredentialFetcher(**fetcher_kwargs)
        # The fetcher receives a snapshot taken before the provider-level
        # feature id below is added.
        sso_fetcher.feature_ids = self._feature_ids.copy()

        self._feature_ids.add(
            'CREDENTIALS_SSO'
            if sso_session_in_config
            else 'CREDENTIALS_SSO_LEGACY'
        )

        register_feature_ids(self._feature_ids)
        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=sso_fetcher.fetch_credentials,
        )

2488 

2489 

2490def _base64_url_encode_no_padding(data): 

2491 return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii') 

2492 

2493 

def _build_dpop_header(private_key, uri, uid=None, ts=None):
    """Build a signed DPoP proof in compact ``header.payload.signature`` form.

    :param private_key: Key object providing ``get_public_coords()`` and
        ``sign()``.
    :param uri: Value placed in the ``htu`` claim.
    :param uid: Value for the ``jti`` claim.  A random UUID4 string is
        generated when this is ``None`` or empty.
    :param ts: Value for the ``iat`` claim.  The current time, in whole
        seconds, is used when this is ``None``.
    :returns: The three base64url segments joined with ``.``.
    :raises MissingDependencyException: If ``EC`` is ``None``.
    """
    if EC is None:
        raise MissingDependencyException(
            msg=(
                "This operation requires an additional dependency. You"
                " will need to pip install \"botocore[crt]\" before proceeding."
            )
        )
    x, y = private_key.get_public_coords()
    # NOTE(review): the curve is hard-coded, so the key is presumably
    # always a P-256 key -- confirm against callers.
    jwk = {
        "kty": "EC",
        "x": _base64_url_encode_no_padding(x),
        "y": _base64_url_encode_no_padding(y),
        "crv": "P-256",
    }

    header = {
        "typ": "dpop+jwt",
        "alg": "ES256",
        "jwk": jwk,
    }

    payload = {
        "htm": "POST",
        "htu": uri,
        # Compare against None so an explicit timestamp of 0 is honoured
        # instead of being replaced by the current time.
        "iat": ts if ts is not None else int(time.time()),
        # An empty identifier is still replaced by a generated one.
        "jti": uid or str(uuid.uuid4()),
    }
    header_b64 = _base64_url_encode_no_padding(
        json.dumps(header, separators=(',', ':')).encode()
    )
    payload_b64 = _base64_url_encode_no_padding(
        json.dumps(payload, separators=(',', ':')).encode()
    )
    signing_input = f"{header_b64}.{payload_b64}".encode()
    signature = private_key.sign(sha256(signing_input).digest())
    # NOTE(review): ES256 expects r and s as fixed-width 32-byte values;
    # this assumes decode_der_signature returns them already padded --
    # TODO confirm.
    r, s = EC.decode_der_signature(signature)
    signature_bytes = r + s
    signature_b64 = _base64_url_encode_no_padding(signature_bytes)

    return f"{header_b64}.{payload_b64}.{signature_b64}"

2535 

2536 

2537def _build_add_dpop_header_handler(private_key): 

2538 """Builds a before-call handler for calculating and setting the DPoP header""" 

2539 

2540 def _add_dpop_header_handler(**kwargs): 

2541 kwargs['params']['headers']['DPoP'] = _build_dpop_header( 

2542 private_key, kwargs['params']['url'] 

2543 ) 

2544 

2545 return _add_dpop_header_handler 

2546 

2547 

class LoginCredentialFetcher:
    """
    Converts login access tokens from the cached token to
    credentials, and supports refreshing them.
    """

    # Seconds of remaining validity above which a cached token is reused
    # instead of being refreshed.
    _REFRESH_THRESHOLD = 5 * 60
    # Keys that must be present in a cached token for it to be usable.
    _REQUIRED_TOKEN_FIELDS = (
        'accessToken',
        'refreshToken',
        'dpopKey',
        'clientId',
    )

    def __init__(
        self,
        session_name,
        token_loader,
        client_creator,
        time_fetcher=_local_now,
        feature_ids=None,
    ):
        """
        :param session_name: Name of the login session whose token is
            loaded and saved.
        :param token_loader: Object providing ``load_token(name)`` and
            ``save_token(name, token)``.
        :param client_creator: Callable used to create the ``signin``
            client.
        :param time_fetcher: Zero-argument callable returning the current
            time.
        :param feature_ids: Feature ids registered on refresh; an empty
            set when omitted.
        """
        self._session_name = session_name
        self._token_loader = token_loader
        self._client_creator = client_creator
        self._time_fetcher = time_fetcher
        if feature_ids is None:
            feature_ids = set()
        self.feature_ids = feature_ids

    def _load_token_or_raise(self):
        """Load the cached token, raising if there is none.

        :raises LoginTokenLoadError: If the token loader returns ``None``.
        """
        token = self._token_loader.load_token(self._session_name)
        if token is None:
            raise LoginTokenLoadError(
                error_msg='Unable to load a existing login session for session '
                f'{self._session_name}. Please reauthenticate with '
                "'aws login'.",
            )
        return token

    def load_cached_credentials(self):
        """Loads cached credentials without checking their expiry.

        :raises LoginTokenLoadError: If no token is cached or the cached
            token lacks a required field.
        """
        token = self._load_token_or_raise()

        missing_fields = [
            key for key in self._REQUIRED_TOKEN_FIELDS if key not in token
        ]
        if missing_fields:
            raise LoginTokenLoadError(
                error_msg=f'Failed to load access token from token cache, missing required fields: {", ".join(missing_fields)}.'
            )

        return self._token_to_credentials(token)

    def refresh_credentials(self):
        """Refreshes login credentials, including saving them to the cache.

        :returns: Credentials dict built from the (possibly refreshed)
            token.
        :raises LoginTokenLoadError: If no token is cached, its private
            key cannot be loaded, or the service response is unusable.
        :raises LoginRefreshRequired: If the service reports the token as
            expired or the user credentials as changed.
        :raises LoginInsufficientPermissions: If the service reports
            insufficient permissions.
        :raises LoginError: For any other access-denied response.
        """
        if self.feature_ids:
            register_feature_ids(self.feature_ids)
        # Reload the token from disk, we need the refresh info.  A missing
        # token is reported as a load error here rather than surfacing as
        # a TypeError when the private key is read from ``None``.
        token = self._load_token_or_raise()
        private_key = self._load_private_key(token)

        # Check if token has already been refreshed and is still valid
        if 'accessToken' in token and 'expiresAt' in token['accessToken']:
            expiry_time = _parse_if_needed(token['accessToken']['expiresAt'])
            remaining_time = total_seconds(expiry_time - self._time_fetcher())
            if remaining_time > self._REFRESH_THRESHOLD:
                return self._token_to_credentials(token)

        config = botocore.config.Config(
            signature_version=botocore.UNSIGNED,
        )
        client = self._client_creator(
            'signin',
            config=config,
        )

        client.meta.events.register(
            'before-call.signin.CreateOAuth2Token',
            _build_add_dpop_header_handler(private_key),
        )

        try:
            response = client.create_o_auth2_token(
                tokenInput={
                    'clientId': token['clientId'],
                    'refreshToken': token['refreshToken'],
                    'grantType': 'refresh_token',
                },
            )
        except client.exceptions.AccessDeniedException as e:
            error_type = e.response.get('error', '')
            if error_type in ('TOKEN_EXPIRED', 'USER_CREDENTIALS_CHANGED'):
                raise LoginRefreshRequired() from e
            elif error_type == 'INSUFFICIENT_PERMISSIONS':
                raise LoginInsufficientPermissions() from e
            raise LoginError() from e

        if response is None or 'tokenOutput' not in response:
            raise LoginTokenLoadError(
                error_msg=(
                    "Unable to refresh access token due to an invalid service response. "
                    "Please try running 'aws login' again. If the issue persists, there "
                    "may be a temporary signin service problem."
                )
            )

        output = response.get('tokenOutput')

        # The response carries a lifetime in seconds; convert it to an
        # absolute UTC expiry based on the current time.
        expires_timestamp = self._time_fetcher().astimezone(
            tzutc()
        ) + datetime.timedelta(seconds=output['expiresIn'])

        # Overwrite token with refreshed fields
        token.update(
            {
                'accessToken': {
                    'accessKeyId': output['accessToken']['accessKeyId'],
                    'secretAccessKey': output['accessToken'][
                        'secretAccessKey'
                    ],
                    'sessionToken': output['accessToken']['sessionToken'],
                    # The account id is carried over from the cached
                    # token rather than read from the response.
                    'accountId': token['accessToken']['accountId'],
                    'expiresAt': expires_timestamp.strftime(
                        '%Y-%m-%dT%H:%M:%SZ'
                    ),
                },
                'refreshToken': output['refreshToken'],
            }
        )
        self._token_loader.save_token(self._session_name, token)

        return self._token_to_credentials(token)

    @staticmethod
    def _token_to_credentials(token):
        """Map the token's ``accessToken`` section to a credentials dict."""
        return {
            'access_key': token['accessToken']['accessKeyId'],
            'secret_key': token['accessToken']['secretAccessKey'],
            'token': token['accessToken']['sessionToken'],
            'expiry_time': token['accessToken']['expiresAt'],
            'account_id': token['accessToken']['accountId'],
        }

    @staticmethod
    def _load_private_key(token):
        """Load the key stored under the token's ``dpopKey`` field.

        :raises LoginTokenLoadError: If the token is ``None``, has no
            ``dpopKey``, or the key data cannot be loaded.
        """
        if token is None or 'dpopKey' not in token:
            raise LoginTokenLoadError(
                error_msg='Private key not found in cached token.'
            )

        # Remove the PEM header and footer lines
        lines = token['dpopKey'].splitlines()
        content_lines = [
            line
            for line in lines
            if not line.startswith('-----BEGIN')
            and not line.startswith('-----END')
        ]

        # strip should handle the optional newline at the end as well
        contents = ''.join(content_lines).strip()

        try:
            return EC.new_key_from_der_data(base64.b64decode(contents))
        except ValueError as e:
            raise LoginTokenLoadError(
                error_msg='Unable to load private key from cached token.'
            ) from e

2718 

2719 

class LoginProvider(CredentialProvider):
    """Credential provider for profiles that define ``login_session``.

    Credentials are seeded from the login token cache and refreshed
    through a ``LoginCredentialFetcher``.
    """

    METHOD = 'login'

    def __init__(
        self,
        load_config,
        client_creator,
        profile_name,
        token_cache=None,
    ):
        """Store the collaborators used later by :meth:`load`.

        :param load_config: Zero-argument callable returning the loaded
            configuration mapping.
        :param client_creator: Passed through to the credential fetcher.
        :param profile_name: Name of the profile to look up in the
            configuration's ``profiles`` section.
        :param token_cache: Cache handed to ``LoginTokenLoader``. When
            ``None``, a ``JSONFileCache`` rooted at
            ``get_login_token_cache_directory()`` is created.
        """
        super().__init__()
        self._token_cache = (
            JSONFileCache(get_login_token_cache_directory())
            if token_cache is None
            else token_cache
        )

        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name
        self._feature_ids = {'CREDENTIALS_PROFILE_LOGIN', 'CREDENTIALS_LOGIN'}

    def load(self):
        """Return refreshable credentials for the configured profile.

        :return: ``None`` when the profile has no ``login_session``
            setting; otherwise a ``RefreshableCredentials`` built from
            the cached credentials and refreshed via the fetcher.
        :raises MissingDependencyException: If ``login_session`` is
            configured but ``EC`` is ``None``.
        """
        profile = (
            self._load_config()
            .get('profiles', {})
            .get(self._profile_name, {})
        )
        if 'login_session' not in profile:
            return None

        if EC is None:
            missing_dependency_msg = (
                "Using the login credential provider requires an "
                "additional dependency. You will need to pip install "
                "\"botocore[crt]\" before proceeding."
            )
            raise MissingDependencyException(msg=missing_dependency_msg)

        token_loader = LoginTokenLoader(self._token_cache)
        fetcher = LoginCredentialFetcher(
            session_name=profile['login_session'],
            token_loader=token_loader,
            client_creator=self._client_creator,
            time_fetcher=_local_now,
            feature_ids=self._feature_ids,
        )

        register_feature_ids(self._feature_ids)

        # Start from whatever is in the cache, expired or not.
        seed = fetcher.load_cached_credentials()

        return RefreshableCredentials(
            access_key=seed['access_key'],
            secret_key=seed['secret_key'],
            token=seed['token'],
            expiry_time=_parse_if_needed(seed['expiry_time']),
            account_id=seed['account_id'],
            method=self.METHOD,
            refresh_using=fetcher.refresh_credentials,
            time_fetcher=_local_now,
        )