Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/credentials.py: 25%

979 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 

2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"). You 

5# may not use this file except in compliance with the License. A copy of 

6# the License is located at 

7# 

8# http://aws.amazon.com/apache2.0/ 

9# 

10# or in the "license" file accompanying this file. This file is 

11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

12# ANY KIND, either express or implied. See the License for the specific 

13# language governing permissions and limitations under the License. 

14import datetime 

15import getpass 

16import json 

17import logging 

18import os 

19import subprocess 

20import threading 

21import time 

22from collections import namedtuple 

23from copy import deepcopy 

24from hashlib import sha1 

25 

26from dateutil.parser import parse 

27from dateutil.tz import tzlocal, tzutc 

28 

29import botocore.compat 

30import botocore.configloader 

31from botocore import UNSIGNED 

32from botocore.compat import compat_shell_split, total_seconds 

33from botocore.config import Config 

34from botocore.exceptions import ( 

35 ConfigNotFound, 

36 CredentialRetrievalError, 

37 InfiniteLoopConfigError, 

38 InvalidConfigError, 

39 MetadataRetrievalError, 

40 PartialCredentialsError, 

41 RefreshWithMFAUnsupportedError, 

42 UnauthorizedSSOTokenError, 

43 UnknownCredentialError, 

44) 

45from botocore.tokens import SSOTokenProvider 

46from botocore.utils import ( 

47 ContainerMetadataFetcher, 

48 FileWebIdentityTokenLoader, 

49 InstanceMetadataFetcher, 

50 JSONFileCache, 

51 SSOTokenLoader, 

52 parse_key_val_file, 

53 resolve_imds_endpoint_mode, 

54) 

55 

56logger = logging.getLogger(__name__) 

57ReadOnlyCredentials = namedtuple( 

58 'ReadOnlyCredentials', ['access_key', 'secret_key', 'token'] 

59) 

60 

61_DEFAULT_MANDATORY_REFRESH_TIMEOUT = 10 * 60 # 10 min 

62_DEFAULT_ADVISORY_REFRESH_TIMEOUT = 15 * 60 # 15 min 

63 

64 

65def create_credential_resolver(session, cache=None, region_name=None): 

66 """Create a default credential resolver. 

67 

68 This creates a pre-configured credential resolver 

69 that includes the default lookup chain for 

70 credentials. 

71 

72 """ 

73 profile_name = session.get_config_variable('profile') or 'default' 

74 metadata_timeout = session.get_config_variable('metadata_service_timeout') 

75 num_attempts = session.get_config_variable('metadata_service_num_attempts') 

76 disable_env_vars = session.instance_variables().get('profile') is not None 

77 

78 imds_config = { 

79 'ec2_metadata_service_endpoint': session.get_config_variable( 

80 'ec2_metadata_service_endpoint' 

81 ), 

82 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode( 

83 session 

84 ), 

85 'ec2_credential_refresh_window': _DEFAULT_ADVISORY_REFRESH_TIMEOUT, 

86 'ec2_metadata_v1_disabled': session.get_config_variable( 

87 'ec2_metadata_v1_disabled' 

88 ), 

89 } 

90 

91 if cache is None: 

92 cache = {} 

93 

94 env_provider = EnvProvider() 

95 container_provider = ContainerProvider() 

96 instance_metadata_provider = InstanceMetadataProvider( 

97 iam_role_fetcher=InstanceMetadataFetcher( 

98 timeout=metadata_timeout, 

99 num_attempts=num_attempts, 

100 user_agent=session.user_agent(), 

101 config=imds_config, 

102 ) 

103 ) 

104 

105 profile_provider_builder = ProfileProviderBuilder( 

106 session, cache=cache, region_name=region_name 

107 ) 

108 assume_role_provider = AssumeRoleProvider( 

109 load_config=lambda: session.full_config, 

110 client_creator=_get_client_creator(session, region_name), 

111 cache=cache, 

112 profile_name=profile_name, 

113 credential_sourcer=CanonicalNameCredentialSourcer( 

114 [env_provider, container_provider, instance_metadata_provider] 

115 ), 

116 profile_provider_builder=profile_provider_builder, 

117 ) 

118 

119 pre_profile = [ 

120 env_provider, 

121 assume_role_provider, 

122 ] 

123 profile_providers = profile_provider_builder.providers( 

124 profile_name=profile_name, 

125 disable_env_vars=disable_env_vars, 

126 ) 

127 post_profile = [ 

128 OriginalEC2Provider(), 

129 BotoProvider(), 

130 container_provider, 

131 instance_metadata_provider, 

132 ] 

133 providers = pre_profile + profile_providers + post_profile 

134 

135 if disable_env_vars: 

136 # An explicitly provided profile will negate an EnvProvider. 

137 # We will defer to providers that understand the "profile" 

138 # concept to retrieve credentials. 

139 # The one edge case if is all three values are provided via 

140 # env vars: 

141 # export AWS_ACCESS_KEY_ID=foo 

142 # export AWS_SECRET_ACCESS_KEY=bar 

143 # export AWS_PROFILE=baz 

144 # Then, just like our client() calls, the explicit credentials 

145 # will take precedence. 

146 # 

147 # This precedence is enforced by leaving the EnvProvider in the chain. 

148 # This means that the only way a "profile" would win is if the 

149 # EnvProvider does not return credentials, which is what we want 

150 # in this scenario. 

151 providers.remove(env_provider) 

152 logger.debug( 

153 'Skipping environment variable credential check' 

154 ' because profile name was explicitly set.' 

155 ) 

156 

157 resolver = CredentialResolver(providers=providers) 

158 return resolver 

159 

160 

161class ProfileProviderBuilder: 

162 """This class handles the creation of profile based providers. 

163 

164 NOTE: This class is only intended for internal use. 

165 

166 This class handles the creation and ordering of the various credential 

167 providers that primarly source their configuration from the shared config. 

168 This is needed to enable sharing between the default credential chain and 

169 the source profile chain created by the assume role provider. 

170 """ 

171 

172 def __init__( 

173 self, session, cache=None, region_name=None, sso_token_cache=None 

174 ): 

175 self._session = session 

176 self._cache = cache 

177 self._region_name = region_name 

178 self._sso_token_cache = sso_token_cache 

179 

180 def providers(self, profile_name, disable_env_vars=False): 

181 return [ 

182 self._create_web_identity_provider( 

183 profile_name, 

184 disable_env_vars, 

185 ), 

186 self._create_sso_provider(profile_name), 

187 self._create_shared_credential_provider(profile_name), 

188 self._create_process_provider(profile_name), 

189 self._create_config_provider(profile_name), 

190 ] 

191 

192 def _create_process_provider(self, profile_name): 

193 return ProcessProvider( 

194 profile_name=profile_name, 

195 load_config=lambda: self._session.full_config, 

196 ) 

197 

198 def _create_shared_credential_provider(self, profile_name): 

199 credential_file = self._session.get_config_variable('credentials_file') 

200 return SharedCredentialProvider( 

201 profile_name=profile_name, 

202 creds_filename=credential_file, 

203 ) 

204 

205 def _create_config_provider(self, profile_name): 

206 config_file = self._session.get_config_variable('config_file') 

207 return ConfigProvider( 

208 profile_name=profile_name, 

209 config_filename=config_file, 

210 ) 

211 

212 def _create_web_identity_provider(self, profile_name, disable_env_vars): 

213 return AssumeRoleWithWebIdentityProvider( 

214 load_config=lambda: self._session.full_config, 

215 client_creator=_get_client_creator( 

216 self._session, self._region_name 

217 ), 

218 cache=self._cache, 

219 profile_name=profile_name, 

220 disable_env_vars=disable_env_vars, 

221 ) 

222 

223 def _create_sso_provider(self, profile_name): 

224 return SSOProvider( 

225 load_config=lambda: self._session.full_config, 

226 client_creator=self._session.create_client, 

227 profile_name=profile_name, 

228 cache=self._cache, 

229 token_cache=self._sso_token_cache, 

230 token_provider=SSOTokenProvider( 

231 self._session, 

232 cache=self._sso_token_cache, 

233 profile_name=profile_name, 

234 ), 

235 ) 

236 

237 

238def get_credentials(session): 

239 resolver = create_credential_resolver(session) 

240 return resolver.load_credentials() 

241 

242 

243def _local_now(): 

244 return datetime.datetime.now(tzlocal()) 

245 

246 

247def _parse_if_needed(value): 

248 if isinstance(value, datetime.datetime): 

249 return value 

250 return parse(value) 

251 

252 

253def _serialize_if_needed(value, iso=False): 

254 if isinstance(value, datetime.datetime): 

255 if iso: 

256 return value.isoformat() 

257 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

258 return value 

259 

260 

261def _get_client_creator(session, region_name): 

262 def client_creator(service_name, **kwargs): 

263 create_client_kwargs = {'region_name': region_name} 

264 create_client_kwargs.update(**kwargs) 

265 return session.create_client(service_name, **create_client_kwargs) 

266 

267 return client_creator 

268 

269 

270def create_assume_role_refresher(client, params): 

271 def refresh(): 

272 response = client.assume_role(**params) 

273 credentials = response['Credentials'] 

274 # We need to normalize the credential names to 

275 # the values expected by the refresh creds. 

276 return { 

277 'access_key': credentials['AccessKeyId'], 

278 'secret_key': credentials['SecretAccessKey'], 

279 'token': credentials['SessionToken'], 

280 'expiry_time': _serialize_if_needed(credentials['Expiration']), 

281 } 

282 

283 return refresh 

284 

285 

286def create_mfa_serial_refresher(actual_refresh): 

287 class _Refresher: 

288 def __init__(self, refresh): 

289 self._refresh = refresh 

290 self._has_been_called = False 

291 

292 def __call__(self): 

293 if self._has_been_called: 

294 # We can explore an option in the future to support 

295 # reprompting for MFA, but for now we just error out 

296 # when the temp creds expire. 

297 raise RefreshWithMFAUnsupportedError() 

298 self._has_been_called = True 

299 return self._refresh() 

300 

301 return _Refresher(actual_refresh) 

302 

303 

304class Credentials: 

305 """ 

306 Holds the credentials needed to authenticate requests. 

307 

308 :param str access_key: The access key part of the credentials. 

309 :param str secret_key: The secret key part of the credentials. 

310 :param str token: The security token, valid only for session credentials. 

311 :param str method: A string which identifies where the credentials 

312 were found. 

313 """ 

314 

315 def __init__(self, access_key, secret_key, token=None, method=None): 

316 self.access_key = access_key 

317 self.secret_key = secret_key 

318 self.token = token 

319 

320 if method is None: 

321 method = 'explicit' 

322 self.method = method 

323 

324 self._normalize() 

325 

326 def _normalize(self): 

327 # Keys would sometimes (accidentally) contain non-ascii characters. 

328 # It would cause a confusing UnicodeDecodeError in Python 2. 

329 # We explicitly convert them into unicode to avoid such error. 

330 # 

331 # Eventually the service will decide whether to accept the credential. 

332 # This also complies with the behavior in Python 3. 

333 self.access_key = botocore.compat.ensure_unicode(self.access_key) 

334 self.secret_key = botocore.compat.ensure_unicode(self.secret_key) 

335 

336 def get_frozen_credentials(self): 

337 return ReadOnlyCredentials( 

338 self.access_key, self.secret_key, self.token 

339 ) 

340 

341 

342class RefreshableCredentials(Credentials): 

343 """ 

344 Holds the credentials needed to authenticate requests. In addition, it 

345 knows how to refresh itself. 

346 

347 :param str access_key: The access key part of the credentials. 

348 :param str secret_key: The secret key part of the credentials. 

349 :param str token: The security token, valid only for session credentials. 

350 :param datetime expiry_time: The expiration time of the credentials. 

351 :param function refresh_using: Callback function to refresh the credentials. 

352 :param str method: A string which identifies where the credentials 

353 were found. 

354 :param function time_fetcher: Callback function to retrieve current time. 

355 """ 

356 

357 # The time at which we'll attempt to refresh, but not 

358 # block if someone else is refreshing. 

359 _advisory_refresh_timeout = _DEFAULT_ADVISORY_REFRESH_TIMEOUT 

360 # The time at which all threads will block waiting for 

361 # refreshed credentials. 

362 _mandatory_refresh_timeout = _DEFAULT_MANDATORY_REFRESH_TIMEOUT 

363 

364 def __init__( 

365 self, 

366 access_key, 

367 secret_key, 

368 token, 

369 expiry_time, 

370 refresh_using, 

371 method, 

372 time_fetcher=_local_now, 

373 advisory_timeout=None, 

374 mandatory_timeout=None, 

375 ): 

376 self._refresh_using = refresh_using 

377 self._access_key = access_key 

378 self._secret_key = secret_key 

379 self._token = token 

380 self._expiry_time = expiry_time 

381 self._time_fetcher = time_fetcher 

382 self._refresh_lock = threading.Lock() 

383 self.method = method 

384 self._frozen_credentials = ReadOnlyCredentials( 

385 access_key, secret_key, token 

386 ) 

387 self._normalize() 

388 if advisory_timeout is not None: 

389 self._advisory_refresh_timeout = advisory_timeout 

390 if mandatory_timeout is not None: 

391 self._mandatory_refresh_timeout = mandatory_timeout 

392 

393 def _normalize(self): 

394 self._access_key = botocore.compat.ensure_unicode(self._access_key) 

395 self._secret_key = botocore.compat.ensure_unicode(self._secret_key) 

396 

397 @classmethod 

398 def create_from_metadata( 

399 cls, 

400 metadata, 

401 refresh_using, 

402 method, 

403 advisory_timeout=None, 

404 mandatory_timeout=None, 

405 ): 

406 kwargs = {} 

407 if advisory_timeout is not None: 

408 kwargs['advisory_timeout'] = advisory_timeout 

409 if mandatory_timeout is not None: 

410 kwargs['mandatory_timeout'] = mandatory_timeout 

411 

412 instance = cls( 

413 access_key=metadata['access_key'], 

414 secret_key=metadata['secret_key'], 

415 token=metadata['token'], 

416 expiry_time=cls._expiry_datetime(metadata['expiry_time']), 

417 method=method, 

418 refresh_using=refresh_using, 

419 **kwargs, 

420 ) 

421 return instance 

422 

423 @property 

424 def access_key(self): 

425 """Warning: Using this property can lead to race conditions if you 

426 access another property subsequently along the refresh boundary. 

427 Please use get_frozen_credentials instead. 

428 """ 

429 self._refresh() 

430 return self._access_key 

431 

432 @access_key.setter 

433 def access_key(self, value): 

434 self._access_key = value 

435 

436 @property 

437 def secret_key(self): 

438 """Warning: Using this property can lead to race conditions if you 

439 access another property subsequently along the refresh boundary. 

440 Please use get_frozen_credentials instead. 

441 """ 

442 self._refresh() 

443 return self._secret_key 

444 

445 @secret_key.setter 

446 def secret_key(self, value): 

447 self._secret_key = value 

448 

449 @property 

450 def token(self): 

451 """Warning: Using this property can lead to race conditions if you 

452 access another property subsequently along the refresh boundary. 

453 Please use get_frozen_credentials instead. 

454 """ 

455 self._refresh() 

456 return self._token 

457 

458 @token.setter 

459 def token(self, value): 

460 self._token = value 

461 

462 def _seconds_remaining(self): 

463 delta = self._expiry_time - self._time_fetcher() 

464 return total_seconds(delta) 

465 

466 def refresh_needed(self, refresh_in=None): 

467 """Check if a refresh is needed. 

468 

469 A refresh is needed if the expiry time associated 

470 with the temporary credentials is less than the 

471 provided ``refresh_in``. If ``time_delta`` is not 

472 provided, ``self.advisory_refresh_needed`` will be used. 

473 

474 For example, if your temporary credentials expire 

475 in 10 minutes and the provided ``refresh_in`` is 

476 ``15 * 60``, then this function will return ``True``. 

477 

478 :type refresh_in: int 

479 :param refresh_in: The number of seconds before the 

480 credentials expire in which refresh attempts should 

481 be made. 

482 

483 :return: True if refresh needed, False otherwise. 

484 

485 """ 

486 if self._expiry_time is None: 

487 # No expiration, so assume we don't need to refresh. 

488 return False 

489 

490 if refresh_in is None: 

491 refresh_in = self._advisory_refresh_timeout 

492 # The credentials should be refreshed if they're going to expire 

493 # in less than 5 minutes. 

494 if self._seconds_remaining() >= refresh_in: 

495 # There's enough time left. Don't refresh. 

496 return False 

497 logger.debug("Credentials need to be refreshed.") 

498 return True 

499 

500 def _is_expired(self): 

501 # Checks if the current credentials are expired. 

502 return self.refresh_needed(refresh_in=0) 

503 

504 def _refresh(self): 

505 # In the common case where we don't need a refresh, we 

506 # can immediately exit and not require acquiring the 

507 # refresh lock. 

508 if not self.refresh_needed(self._advisory_refresh_timeout): 

509 return 

510 

511 # acquire() doesn't accept kwargs, but False is indicating 

512 # that we should not block if we can't acquire the lock. 

513 # If we aren't able to acquire the lock, we'll trigger 

514 # the else clause. 

515 if self._refresh_lock.acquire(False): 

516 try: 

517 if not self.refresh_needed(self._advisory_refresh_timeout): 

518 return 

519 is_mandatory_refresh = self.refresh_needed( 

520 self._mandatory_refresh_timeout 

521 ) 

522 self._protected_refresh(is_mandatory=is_mandatory_refresh) 

523 return 

524 finally: 

525 self._refresh_lock.release() 

526 elif self.refresh_needed(self._mandatory_refresh_timeout): 

527 # If we're within the mandatory refresh window, 

528 # we must block until we get refreshed credentials. 

529 with self._refresh_lock: 

530 if not self.refresh_needed(self._mandatory_refresh_timeout): 

531 return 

532 self._protected_refresh(is_mandatory=True) 

533 

534 def _protected_refresh(self, is_mandatory): 

535 # precondition: this method should only be called if you've acquired 

536 # the self._refresh_lock. 

537 try: 

538 metadata = self._refresh_using() 

539 except Exception: 

540 period_name = 'mandatory' if is_mandatory else 'advisory' 

541 logger.warning( 

542 "Refreshing temporary credentials failed " 

543 "during %s refresh period.", 

544 period_name, 

545 exc_info=True, 

546 ) 

547 if is_mandatory: 

548 # If this is a mandatory refresh, then 

549 # all errors that occur when we attempt to refresh 

550 # credentials are propagated back to the user. 

551 raise 

552 # Otherwise we'll just return. 

553 # The end result will be that we'll use the current 

554 # set of temporary credentials we have. 

555 return 

556 self._set_from_data(metadata) 

557 self._frozen_credentials = ReadOnlyCredentials( 

558 self._access_key, self._secret_key, self._token 

559 ) 

560 if self._is_expired(): 

561 # We successfully refreshed credentials but for whatever 

562 # reason, our refreshing function returned credentials 

563 # that are still expired. In this scenario, the only 

564 # thing we can do is let the user know and raise 

565 # an exception. 

566 msg = ( 

567 "Credentials were refreshed, but the " 

568 "refreshed credentials are still expired." 

569 ) 

570 logger.warning(msg) 

571 raise RuntimeError(msg) 

572 

573 @staticmethod 

574 def _expiry_datetime(time_str): 

575 return parse(time_str) 

576 

577 def _set_from_data(self, data): 

578 expected_keys = ['access_key', 'secret_key', 'token', 'expiry_time'] 

579 if not data: 

580 missing_keys = expected_keys 

581 else: 

582 missing_keys = [k for k in expected_keys if k not in data] 

583 

584 if missing_keys: 

585 message = "Credential refresh failed, response did not contain: %s" 

586 raise CredentialRetrievalError( 

587 provider=self.method, 

588 error_msg=message % ', '.join(missing_keys), 

589 ) 

590 

591 self.access_key = data['access_key'] 

592 self.secret_key = data['secret_key'] 

593 self.token = data['token'] 

594 self._expiry_time = parse(data['expiry_time']) 

595 logger.debug( 

596 "Retrieved credentials will expire at: %s", self._expiry_time 

597 ) 

598 self._normalize() 

599 

600 def get_frozen_credentials(self): 

601 """Return immutable credentials. 

602 

603 The ``access_key``, ``secret_key``, and ``token`` properties 

604 on this class will always check and refresh credentials if 

605 needed before returning the particular credentials. 

606 

607 This has an edge case where you can get inconsistent 

608 credentials. Imagine this: 

609 

610 # Current creds are "t1" 

611 tmp.access_key ---> expired? no, so return t1.access_key 

612 # ---- time is now expired, creds need refreshing to "t2" ---- 

613 tmp.secret_key ---> expired? yes, refresh and return t2.secret_key 

614 

615 This means we're using the access key from t1 with the secret key 

616 from t2. To fix this issue, you can request a frozen credential object 

617 which is guaranteed not to change. 

618 

619 The frozen credentials returned from this method should be used 

620 immediately and then discarded. The typical usage pattern would 

621 be:: 

622 

623 creds = RefreshableCredentials(...) 

624 some_code = SomeSignerObject() 

625 # I'm about to sign the request. 

626 # The frozen credentials are only used for the 

627 # duration of generate_presigned_url and will be 

628 # immediately thrown away. 

629 request = some_code.sign_some_request( 

630 with_credentials=creds.get_frozen_credentials()) 

631 print("Signed request:", request) 

632 

633 """ 

634 self._refresh() 

635 return self._frozen_credentials 

636 

637 

638class DeferredRefreshableCredentials(RefreshableCredentials): 

639 """Refreshable credentials that don't require initial credentials. 

640 

641 refresh_using will be called upon first access. 

642 """ 

643 

644 def __init__(self, refresh_using, method, time_fetcher=_local_now): 

645 self._refresh_using = refresh_using 

646 self._access_key = None 

647 self._secret_key = None 

648 self._token = None 

649 self._expiry_time = None 

650 self._time_fetcher = time_fetcher 

651 self._refresh_lock = threading.Lock() 

652 self.method = method 

653 self._frozen_credentials = None 

654 

655 def refresh_needed(self, refresh_in=None): 

656 if self._frozen_credentials is None: 

657 return True 

658 return super().refresh_needed(refresh_in) 

659 

660 

661class CachedCredentialFetcher: 

662 DEFAULT_EXPIRY_WINDOW_SECONDS = 60 * 15 

663 

664 def __init__(self, cache=None, expiry_window_seconds=None): 

665 if cache is None: 

666 cache = {} 

667 self._cache = cache 

668 self._cache_key = self._create_cache_key() 

669 if expiry_window_seconds is None: 

670 expiry_window_seconds = self.DEFAULT_EXPIRY_WINDOW_SECONDS 

671 self._expiry_window_seconds = expiry_window_seconds 

672 

673 def _create_cache_key(self): 

674 raise NotImplementedError('_create_cache_key()') 

675 

676 def _make_file_safe(self, filename): 

677 # Replace :, path sep, and / to make it the string filename safe. 

678 filename = filename.replace(':', '_').replace(os.sep, '_') 

679 return filename.replace('/', '_') 

680 

681 def _get_credentials(self): 

682 raise NotImplementedError('_get_credentials()') 

683 

684 def fetch_credentials(self): 

685 return self._get_cached_credentials() 

686 

687 def _get_cached_credentials(self): 

688 """Get up-to-date credentials. 

689 

690 This will check the cache for up-to-date credentials, calling assume 

691 role if none are available. 

692 """ 

693 response = self._load_from_cache() 

694 if response is None: 

695 response = self._get_credentials() 

696 self._write_to_cache(response) 

697 else: 

698 logger.debug("Credentials for role retrieved from cache.") 

699 

700 creds = response['Credentials'] 

701 expiration = _serialize_if_needed(creds['Expiration'], iso=True) 

702 return { 

703 'access_key': creds['AccessKeyId'], 

704 'secret_key': creds['SecretAccessKey'], 

705 'token': creds['SessionToken'], 

706 'expiry_time': expiration, 

707 } 

708 

709 def _load_from_cache(self): 

710 if self._cache_key in self._cache: 

711 creds = deepcopy(self._cache[self._cache_key]) 

712 if not self._is_expired(creds): 

713 return creds 

714 else: 

715 logger.debug( 

716 "Credentials were found in cache, but they are expired." 

717 ) 

718 return None 

719 

720 def _write_to_cache(self, response): 

721 self._cache[self._cache_key] = deepcopy(response) 

722 

723 def _is_expired(self, credentials): 

724 """Check if credentials are expired.""" 

725 end_time = _parse_if_needed(credentials['Credentials']['Expiration']) 

726 seconds = total_seconds(end_time - _local_now()) 

727 return seconds < self._expiry_window_seconds 

728 

729 

730class BaseAssumeRoleCredentialFetcher(CachedCredentialFetcher): 

731 def __init__( 

732 self, 

733 client_creator, 

734 role_arn, 

735 extra_args=None, 

736 cache=None, 

737 expiry_window_seconds=None, 

738 ): 

739 self._client_creator = client_creator 

740 self._role_arn = role_arn 

741 

742 if extra_args is None: 

743 self._assume_kwargs = {} 

744 else: 

745 self._assume_kwargs = deepcopy(extra_args) 

746 self._assume_kwargs['RoleArn'] = self._role_arn 

747 

748 self._role_session_name = self._assume_kwargs.get('RoleSessionName') 

749 self._using_default_session_name = False 

750 if not self._role_session_name: 

751 self._generate_assume_role_name() 

752 

753 super().__init__(cache, expiry_window_seconds) 

754 

755 def _generate_assume_role_name(self): 

756 self._role_session_name = 'botocore-session-%s' % (int(time.time())) 

757 self._assume_kwargs['RoleSessionName'] = self._role_session_name 

758 self._using_default_session_name = True 

759 

760 def _create_cache_key(self): 

761 """Create a predictable cache key for the current configuration. 

762 

763 The cache key is intended to be compatible with file names. 

764 """ 

765 args = deepcopy(self._assume_kwargs) 

766 

767 # The role session name gets randomly generated, so we don't want it 

768 # in the hash. 

769 if self._using_default_session_name: 

770 del args['RoleSessionName'] 

771 

772 if 'Policy' in args: 

773 # To have a predictable hash, the keys of the policy must be 

774 # sorted, so we have to load it here to make sure it gets sorted 

775 # later on. 

776 args['Policy'] = json.loads(args['Policy']) 

777 

778 args = json.dumps(args, sort_keys=True) 

779 argument_hash = sha1(args.encode('utf-8')).hexdigest() 

780 return self._make_file_safe(argument_hash) 

781 

782 

783class AssumeRoleCredentialFetcher(BaseAssumeRoleCredentialFetcher): 

784 def __init__( 

785 self, 

786 client_creator, 

787 source_credentials, 

788 role_arn, 

789 extra_args=None, 

790 mfa_prompter=None, 

791 cache=None, 

792 expiry_window_seconds=None, 

793 ): 

794 """ 

795 :type client_creator: callable 

796 :param client_creator: A callable that creates a client taking 

797 arguments like ``Session.create_client``. 

798 

799 :type source_credentials: Credentials 

800 :param source_credentials: The credentials to use to create the 

801 client for the call to AssumeRole. 

802 

803 :type role_arn: str 

804 :param role_arn: The ARN of the role to be assumed. 

805 

806 :type extra_args: dict 

807 :param extra_args: Any additional arguments to add to the assume 

808 role request using the format of the botocore operation. 

809 Possible keys include, but may not be limited to, 

810 DurationSeconds, Policy, SerialNumber, ExternalId and 

811 RoleSessionName. 

812 

813 :type mfa_prompter: callable 

814 :param mfa_prompter: A callable that returns input provided by the 

815 user (i.e raw_input, getpass.getpass, etc.). 

816 

817 :type cache: dict 

818 :param cache: An object that supports ``__getitem__``, 

819 ``__setitem__``, and ``__contains__``. An example of this is 

820 the ``JSONFileCache`` class in aws-cli. 

821 

822 :type expiry_window_seconds: int 

823 :param expiry_window_seconds: The amount of time, in seconds, 

824 """ 

825 self._source_credentials = source_credentials 

826 self._mfa_prompter = mfa_prompter 

827 if self._mfa_prompter is None: 

828 self._mfa_prompter = getpass.getpass 

829 

830 super().__init__( 

831 client_creator, 

832 role_arn, 

833 extra_args=extra_args, 

834 cache=cache, 

835 expiry_window_seconds=expiry_window_seconds, 

836 ) 

837 

838 def _get_credentials(self): 

839 """Get credentials by calling assume role.""" 

840 kwargs = self._assume_role_kwargs() 

841 client = self._create_client() 

842 return client.assume_role(**kwargs) 

843 

844 def _assume_role_kwargs(self): 

845 """Get the arguments for assume role based on current configuration.""" 

846 assume_role_kwargs = deepcopy(self._assume_kwargs) 

847 

848 mfa_serial = assume_role_kwargs.get('SerialNumber') 

849 

850 if mfa_serial is not None: 

851 prompt = 'Enter MFA code for %s: ' % mfa_serial 

852 token_code = self._mfa_prompter(prompt) 

853 assume_role_kwargs['TokenCode'] = token_code 

854 

855 duration_seconds = assume_role_kwargs.get('DurationSeconds') 

856 

857 if duration_seconds is not None: 

858 assume_role_kwargs['DurationSeconds'] = duration_seconds 

859 

860 return assume_role_kwargs 

861 

862 def _create_client(self): 

863 """Create an STS client using the source credentials.""" 

864 frozen_credentials = self._source_credentials.get_frozen_credentials() 

865 return self._client_creator( 

866 'sts', 

867 aws_access_key_id=frozen_credentials.access_key, 

868 aws_secret_access_key=frozen_credentials.secret_key, 

869 aws_session_token=frozen_credentials.token, 

870 ) 

871 

872 

873class AssumeRoleWithWebIdentityCredentialFetcher( 

874 BaseAssumeRoleCredentialFetcher 

875): 

876 def __init__( 

877 self, 

878 client_creator, 

879 web_identity_token_loader, 

880 role_arn, 

881 extra_args=None, 

882 cache=None, 

883 expiry_window_seconds=None, 

884 ): 

885 """ 

886 :type client_creator: callable 

887 :param client_creator: A callable that creates a client taking 

888 arguments like ``Session.create_client``. 

889 

890 :type web_identity_token_loader: callable 

891 :param web_identity_token_loader: A callable that takes no arguments 

892 and returns a web identity token str. 

893 

894 :type role_arn: str 

895 :param role_arn: The ARN of the role to be assumed. 

896 

897 :type extra_args: dict 

898 :param extra_args: Any additional arguments to add to the assume 

899 role request using the format of the botocore operation. 

900 Possible keys include, but may not be limited to, 

901 DurationSeconds, Policy, SerialNumber, ExternalId and 

902 RoleSessionName. 

903 

904 :type cache: dict 

905 :param cache: An object that supports ``__getitem__``, 

906 ``__setitem__``, and ``__contains__``. An example of this is 

907 the ``JSONFileCache`` class in aws-cli. 

908 

909 :type expiry_window_seconds: int 

910 :param expiry_window_seconds: The amount of time, in seconds, 

911 """ 

912 self._web_identity_token_loader = web_identity_token_loader 

913 

914 super().__init__( 

915 client_creator, 

916 role_arn, 

917 extra_args=extra_args, 

918 cache=cache, 

919 expiry_window_seconds=expiry_window_seconds, 

920 ) 

921 

922 def _get_credentials(self): 

923 """Get credentials by calling assume role.""" 

924 kwargs = self._assume_role_kwargs() 

925 # Assume role with web identity does not require credentials other than 

926 # the token, explicitly configure the client to not sign requests. 

927 config = Config(signature_version=UNSIGNED) 

928 client = self._client_creator('sts', config=config) 

929 return client.assume_role_with_web_identity(**kwargs) 

930 

931 def _assume_role_kwargs(self): 

932 """Get the arguments for assume role based on current configuration.""" 

933 assume_role_kwargs = deepcopy(self._assume_kwargs) 

934 identity_token = self._web_identity_token_loader() 

935 assume_role_kwargs['WebIdentityToken'] = identity_token 

936 

937 return assume_role_kwargs 

938 

939 

940class CredentialProvider: 

941 # A short name to identify the provider within botocore. 

942 METHOD = None 

943 

944 # A name to identify the provider for use in cross-sdk features like 

945 # assume role's `credential_source` configuration option. These names 

946 # are to be treated in a case-insensitive way. NOTE: any providers not 

947 # implemented in botocore MUST prefix their canonical names with 

948 # 'custom' or we DO NOT guarantee that it will work with any features 

949 # that this provides. 

950 CANONICAL_NAME = None 

951 

952 def __init__(self, session=None): 

953 self.session = session 

954 

955 def load(self): 

956 """ 

957 Loads the credentials from their source & sets them on the object. 

958 

959 Subclasses should implement this method (by reading from disk, the 

960 environment, the network or wherever), returning ``True`` if they were 

961 found & loaded. 

962 

963 If not found, this method should return ``False``, indictating that the 

964 ``CredentialResolver`` should fall back to the next available method. 

965 

966 The default implementation does nothing, assuming the user has set the 

967 ``access_key/secret_key/token`` themselves. 

968 

969 :returns: Whether credentials were found & set 

970 :rtype: Credentials 

971 """ 

972 return True 

973 

974 def _extract_creds_from_mapping(self, mapping, *key_names): 

975 found = [] 

976 for key_name in key_names: 

977 try: 

978 found.append(mapping[key_name]) 

979 except KeyError: 

980 raise PartialCredentialsError( 

981 provider=self.METHOD, cred_var=key_name 

982 ) 

983 return found 

984 

985 

986class ProcessProvider(CredentialProvider): 

987 METHOD = 'custom-process' 

988 

989 def __init__(self, profile_name, load_config, popen=subprocess.Popen): 

990 self._profile_name = profile_name 

991 self._load_config = load_config 

992 self._loaded_config = None 

993 self._popen = popen 

994 

995 def load(self): 

996 credential_process = self._credential_process 

997 if credential_process is None: 

998 return 

999 

1000 creds_dict = self._retrieve_credentials_using(credential_process) 

1001 if creds_dict.get('expiry_time') is not None: 

1002 return RefreshableCredentials.create_from_metadata( 

1003 creds_dict, 

1004 lambda: self._retrieve_credentials_using(credential_process), 

1005 self.METHOD, 

1006 ) 

1007 

1008 return Credentials( 

1009 access_key=creds_dict['access_key'], 

1010 secret_key=creds_dict['secret_key'], 

1011 token=creds_dict.get('token'), 

1012 method=self.METHOD, 

1013 ) 

1014 

1015 def _retrieve_credentials_using(self, credential_process): 

1016 # We're not using shell=True, so we need to pass the 

1017 # command and all arguments as a list. 

1018 process_list = compat_shell_split(credential_process) 

1019 p = self._popen( 

1020 process_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE 

1021 ) 

1022 stdout, stderr = p.communicate() 

1023 if p.returncode != 0: 

1024 raise CredentialRetrievalError( 

1025 provider=self.METHOD, error_msg=stderr.decode('utf-8') 

1026 ) 

1027 parsed = botocore.compat.json.loads(stdout.decode('utf-8')) 

1028 version = parsed.get('Version', '<Version key not provided>') 

1029 if version != 1: 

1030 raise CredentialRetrievalError( 

1031 provider=self.METHOD, 

1032 error_msg=( 

1033 f"Unsupported version '{version}' for credential process " 

1034 f"provider, supported versions: 1" 

1035 ), 

1036 ) 

1037 try: 

1038 return { 

1039 'access_key': parsed['AccessKeyId'], 

1040 'secret_key': parsed['SecretAccessKey'], 

1041 'token': parsed.get('SessionToken'), 

1042 'expiry_time': parsed.get('Expiration'), 

1043 } 

1044 except KeyError as e: 

1045 raise CredentialRetrievalError( 

1046 provider=self.METHOD, 

1047 error_msg=f"Missing required key in response: {e}", 

1048 ) 

1049 

1050 @property 

1051 def _credential_process(self): 

1052 if self._loaded_config is None: 

1053 self._loaded_config = self._load_config() 

1054 profile_config = self._loaded_config.get('profiles', {}).get( 

1055 self._profile_name, {} 

1056 ) 

1057 return profile_config.get('credential_process') 

1058 

1059 

1060class InstanceMetadataProvider(CredentialProvider): 

1061 METHOD = 'iam-role' 

1062 CANONICAL_NAME = 'Ec2InstanceMetadata' 

1063 

1064 def __init__(self, iam_role_fetcher): 

1065 self._role_fetcher = iam_role_fetcher 

1066 

1067 def load(self): 

1068 fetcher = self._role_fetcher 

1069 # We do the first request, to see if we get useful data back. 

1070 # If not, we'll pass & move on to whatever's next in the credential 

1071 # chain. 

1072 metadata = fetcher.retrieve_iam_role_credentials() 

1073 if not metadata: 

1074 return None 

1075 logger.info( 

1076 'Found credentials from IAM Role: %s', metadata['role_name'] 

1077 ) 

1078 # We manually set the data here, since we already made the request & 

1079 # have it. When the expiry is hit, the credentials will auto-refresh 

1080 # themselves. 

1081 creds = RefreshableCredentials.create_from_metadata( 

1082 metadata, 

1083 method=self.METHOD, 

1084 refresh_using=fetcher.retrieve_iam_role_credentials, 

1085 ) 

1086 return creds 

1087 

1088 

1089class EnvProvider(CredentialProvider): 

1090 METHOD = 'env' 

1091 CANONICAL_NAME = 'Environment' 

1092 ACCESS_KEY = 'AWS_ACCESS_KEY_ID' 

1093 SECRET_KEY = 'AWS_SECRET_ACCESS_KEY' 

1094 # The token can come from either of these env var. 

1095 # AWS_SESSION_TOKEN is what other AWS SDKs have standardized on. 

1096 TOKENS = ['AWS_SECURITY_TOKEN', 'AWS_SESSION_TOKEN'] 

1097 EXPIRY_TIME = 'AWS_CREDENTIAL_EXPIRATION' 

1098 

1099 def __init__(self, environ=None, mapping=None): 

1100 """ 

1101 

1102 :param environ: The environment variables (defaults to 

1103 ``os.environ`` if no value is provided). 

1104 :param mapping: An optional mapping of variable names to 

1105 environment variable names. Use this if you want to 

1106 change the mapping of access_key->AWS_ACCESS_KEY_ID, etc. 

1107 The dict can have up to 3 keys: ``access_key``, ``secret_key``, 

1108 ``session_token``. 

1109 """ 

1110 if environ is None: 

1111 environ = os.environ 

1112 self.environ = environ 

1113 self._mapping = self._build_mapping(mapping) 

1114 

1115 def _build_mapping(self, mapping): 

1116 # Mapping of variable name to env var name. 

1117 var_mapping = {} 

1118 if mapping is None: 

1119 # Use the class var default. 

1120 var_mapping['access_key'] = self.ACCESS_KEY 

1121 var_mapping['secret_key'] = self.SECRET_KEY 

1122 var_mapping['token'] = self.TOKENS 

1123 var_mapping['expiry_time'] = self.EXPIRY_TIME 

1124 else: 

1125 var_mapping['access_key'] = mapping.get( 

1126 'access_key', self.ACCESS_KEY 

1127 ) 

1128 var_mapping['secret_key'] = mapping.get( 

1129 'secret_key', self.SECRET_KEY 

1130 ) 

1131 var_mapping['token'] = mapping.get('token', self.TOKENS) 

1132 if not isinstance(var_mapping['token'], list): 

1133 var_mapping['token'] = [var_mapping['token']] 

1134 var_mapping['expiry_time'] = mapping.get( 

1135 'expiry_time', self.EXPIRY_TIME 

1136 ) 

1137 return var_mapping 

1138 

1139 def load(self): 

1140 """ 

1141 Search for credentials in explicit environment variables. 

1142 """ 

1143 

1144 access_key = self.environ.get(self._mapping['access_key'], '') 

1145 

1146 if access_key: 

1147 logger.info('Found credentials in environment variables.') 

1148 fetcher = self._create_credentials_fetcher() 

1149 credentials = fetcher(require_expiry=False) 

1150 

1151 expiry_time = credentials['expiry_time'] 

1152 if expiry_time is not None: 

1153 expiry_time = parse(expiry_time) 

1154 return RefreshableCredentials( 

1155 credentials['access_key'], 

1156 credentials['secret_key'], 

1157 credentials['token'], 

1158 expiry_time, 

1159 refresh_using=fetcher, 

1160 method=self.METHOD, 

1161 ) 

1162 

1163 return Credentials( 

1164 credentials['access_key'], 

1165 credentials['secret_key'], 

1166 credentials['token'], 

1167 method=self.METHOD, 

1168 ) 

1169 else: 

1170 return None 

1171 

1172 def _create_credentials_fetcher(self): 

1173 mapping = self._mapping 

1174 method = self.METHOD 

1175 environ = self.environ 

1176 

1177 def fetch_credentials(require_expiry=True): 

1178 credentials = {} 

1179 

1180 access_key = environ.get(mapping['access_key'], '') 

1181 if not access_key: 

1182 raise PartialCredentialsError( 

1183 provider=method, cred_var=mapping['access_key'] 

1184 ) 

1185 credentials['access_key'] = access_key 

1186 

1187 secret_key = environ.get(mapping['secret_key'], '') 

1188 if not secret_key: 

1189 raise PartialCredentialsError( 

1190 provider=method, cred_var=mapping['secret_key'] 

1191 ) 

1192 credentials['secret_key'] = secret_key 

1193 

1194 credentials['token'] = None 

1195 for token_env_var in mapping['token']: 

1196 token = environ.get(token_env_var, '') 

1197 if token: 

1198 credentials['token'] = token 

1199 break 

1200 

1201 credentials['expiry_time'] = None 

1202 expiry_time = environ.get(mapping['expiry_time'], '') 

1203 if expiry_time: 

1204 credentials['expiry_time'] = expiry_time 

1205 if require_expiry and not expiry_time: 

1206 raise PartialCredentialsError( 

1207 provider=method, cred_var=mapping['expiry_time'] 

1208 ) 

1209 

1210 return credentials 

1211 

1212 return fetch_credentials 

1213 

1214 

1215class OriginalEC2Provider(CredentialProvider): 

1216 METHOD = 'ec2-credentials-file' 

1217 CANONICAL_NAME = 'Ec2Config' 

1218 

1219 CRED_FILE_ENV = 'AWS_CREDENTIAL_FILE' 

1220 ACCESS_KEY = 'AWSAccessKeyId' 

1221 SECRET_KEY = 'AWSSecretKey' 

1222 

1223 def __init__(self, environ=None, parser=None): 

1224 if environ is None: 

1225 environ = os.environ 

1226 if parser is None: 

1227 parser = parse_key_val_file 

1228 self._environ = environ 

1229 self._parser = parser 

1230 

1231 def load(self): 

1232 """ 

1233 Search for a credential file used by original EC2 CLI tools. 

1234 """ 

1235 if 'AWS_CREDENTIAL_FILE' in self._environ: 

1236 full_path = os.path.expanduser( 

1237 self._environ['AWS_CREDENTIAL_FILE'] 

1238 ) 

1239 creds = self._parser(full_path) 

1240 if self.ACCESS_KEY in creds: 

1241 logger.info('Found credentials in AWS_CREDENTIAL_FILE.') 

1242 access_key = creds[self.ACCESS_KEY] 

1243 secret_key = creds[self.SECRET_KEY] 

1244 # EC2 creds file doesn't support session tokens. 

1245 return Credentials(access_key, secret_key, method=self.METHOD) 

1246 else: 

1247 return None 

1248 

1249 

1250class SharedCredentialProvider(CredentialProvider): 

1251 METHOD = 'shared-credentials-file' 

1252 CANONICAL_NAME = 'SharedCredentials' 

1253 

1254 ACCESS_KEY = 'aws_access_key_id' 

1255 SECRET_KEY = 'aws_secret_access_key' 

1256 # Same deal as the EnvProvider above. Botocore originally supported 

1257 # aws_security_token, but the SDKs are standardizing on aws_session_token 

1258 # so we support both. 

1259 TOKENS = ['aws_security_token', 'aws_session_token'] 

1260 

1261 def __init__(self, creds_filename, profile_name=None, ini_parser=None): 

1262 self._creds_filename = creds_filename 

1263 if profile_name is None: 

1264 profile_name = 'default' 

1265 self._profile_name = profile_name 

1266 if ini_parser is None: 

1267 ini_parser = botocore.configloader.raw_config_parse 

1268 self._ini_parser = ini_parser 

1269 

1270 def load(self): 

1271 try: 

1272 available_creds = self._ini_parser(self._creds_filename) 

1273 except ConfigNotFound: 

1274 return None 

1275 if self._profile_name in available_creds: 

1276 config = available_creds[self._profile_name] 

1277 if self.ACCESS_KEY in config: 

1278 logger.info( 

1279 "Found credentials in shared credentials file: %s", 

1280 self._creds_filename, 

1281 ) 

1282 access_key, secret_key = self._extract_creds_from_mapping( 

1283 config, self.ACCESS_KEY, self.SECRET_KEY 

1284 ) 

1285 token = self._get_session_token(config) 

1286 return Credentials( 

1287 access_key, secret_key, token, method=self.METHOD 

1288 ) 

1289 

1290 def _get_session_token(self, config): 

1291 for token_envvar in self.TOKENS: 

1292 if token_envvar in config: 

1293 return config[token_envvar] 

1294 

1295 

1296class ConfigProvider(CredentialProvider): 

1297 """INI based config provider with profile sections.""" 

1298 

1299 METHOD = 'config-file' 

1300 CANONICAL_NAME = 'SharedConfig' 

1301 

1302 ACCESS_KEY = 'aws_access_key_id' 

1303 SECRET_KEY = 'aws_secret_access_key' 

1304 # Same deal as the EnvProvider above. Botocore originally supported 

1305 # aws_security_token, but the SDKs are standardizing on aws_session_token 

1306 # so we support both. 

1307 TOKENS = ['aws_security_token', 'aws_session_token'] 

1308 

1309 def __init__(self, config_filename, profile_name, config_parser=None): 

1310 """ 

1311 

1312 :param config_filename: The session configuration scoped to the current 

1313 profile. This is available via ``session.config``. 

1314 :param profile_name: The name of the current profile. 

1315 :param config_parser: A config parser callable. 

1316 

1317 """ 

1318 self._config_filename = config_filename 

1319 self._profile_name = profile_name 

1320 if config_parser is None: 

1321 config_parser = botocore.configloader.load_config 

1322 self._config_parser = config_parser 

1323 

1324 def load(self): 

1325 """ 

1326 If there is are credentials in the configuration associated with 

1327 the session, use those. 

1328 """ 

1329 try: 

1330 full_config = self._config_parser(self._config_filename) 

1331 except ConfigNotFound: 

1332 return None 

1333 if self._profile_name in full_config['profiles']: 

1334 profile_config = full_config['profiles'][self._profile_name] 

1335 if self.ACCESS_KEY in profile_config: 

1336 logger.info( 

1337 "Credentials found in config file: %s", 

1338 self._config_filename, 

1339 ) 

1340 access_key, secret_key = self._extract_creds_from_mapping( 

1341 profile_config, self.ACCESS_KEY, self.SECRET_KEY 

1342 ) 

1343 token = self._get_session_token(profile_config) 

1344 return Credentials( 

1345 access_key, secret_key, token, method=self.METHOD 

1346 ) 

1347 else: 

1348 return None 

1349 

1350 def _get_session_token(self, profile_config): 

1351 for token_name in self.TOKENS: 

1352 if token_name in profile_config: 

1353 return profile_config[token_name] 

1354 

1355 

1356class BotoProvider(CredentialProvider): 

1357 METHOD = 'boto-config' 

1358 CANONICAL_NAME = 'Boto2Config' 

1359 

1360 BOTO_CONFIG_ENV = 'BOTO_CONFIG' 

1361 DEFAULT_CONFIG_FILENAMES = ['/etc/boto.cfg', '~/.boto'] 

1362 ACCESS_KEY = 'aws_access_key_id' 

1363 SECRET_KEY = 'aws_secret_access_key' 

1364 

1365 def __init__(self, environ=None, ini_parser=None): 

1366 if environ is None: 

1367 environ = os.environ 

1368 if ini_parser is None: 

1369 ini_parser = botocore.configloader.raw_config_parse 

1370 self._environ = environ 

1371 self._ini_parser = ini_parser 

1372 

1373 def load(self): 

1374 """ 

1375 Look for credentials in boto config file. 

1376 """ 

1377 if self.BOTO_CONFIG_ENV in self._environ: 

1378 potential_locations = [self._environ[self.BOTO_CONFIG_ENV]] 

1379 else: 

1380 potential_locations = self.DEFAULT_CONFIG_FILENAMES 

1381 for filename in potential_locations: 

1382 try: 

1383 config = self._ini_parser(filename) 

1384 except ConfigNotFound: 

1385 # Move on to the next potential config file name. 

1386 continue 

1387 if 'Credentials' in config: 

1388 credentials = config['Credentials'] 

1389 if self.ACCESS_KEY in credentials: 

1390 logger.info( 

1391 "Found credentials in boto config file: %s", filename 

1392 ) 

1393 access_key, secret_key = self._extract_creds_from_mapping( 

1394 credentials, self.ACCESS_KEY, self.SECRET_KEY 

1395 ) 

1396 return Credentials( 

1397 access_key, secret_key, method=self.METHOD 

1398 ) 

1399 

1400 

1401class AssumeRoleProvider(CredentialProvider): 

1402 METHOD = 'assume-role' 

1403 # The AssumeRole provider is logically part of the SharedConfig and 

1404 # SharedCredentials providers. Since the purpose of the canonical name 

1405 # is to provide cross-sdk compatibility, calling code will need to be 

1406 # aware that either of those providers should be tied to the AssumeRole 

1407 # provider as much as possible. 

1408 CANONICAL_NAME = None 

1409 ROLE_CONFIG_VAR = 'role_arn' 

1410 WEB_IDENTITY_TOKE_FILE_VAR = 'web_identity_token_file' 

1411 # Credentials are considered expired (and will be refreshed) once the total 

1412 # remaining time left until the credentials expires is less than the 

1413 # EXPIRY_WINDOW. 

1414 EXPIRY_WINDOW_SECONDS = 60 * 15 

1415 

1416 def __init__( 

1417 self, 

1418 load_config, 

1419 client_creator, 

1420 cache, 

1421 profile_name, 

1422 prompter=getpass.getpass, 

1423 credential_sourcer=None, 

1424 profile_provider_builder=None, 

1425 ): 

1426 """ 

1427 :type load_config: callable 

1428 :param load_config: A function that accepts no arguments, and 

1429 when called, will return the full configuration dictionary 

1430 for the session (``session.full_config``). 

1431 

1432 :type client_creator: callable 

1433 :param client_creator: A factory function that will create 

1434 a client when called. Has the same interface as 

1435 ``botocore.session.Session.create_client``. 

1436 

1437 :type cache: dict 

1438 :param cache: An object that supports ``__getitem__``, 

1439 ``__setitem__``, and ``__contains__``. An example 

1440 of this is the ``JSONFileCache`` class in the CLI. 

1441 

1442 :type profile_name: str 

1443 :param profile_name: The name of the profile. 

1444 

1445 :type prompter: callable 

1446 :param prompter: A callable that returns input provided 

1447 by the user (i.e raw_input, getpass.getpass, etc.). 

1448 

1449 :type credential_sourcer: CanonicalNameCredentialSourcer 

1450 :param credential_sourcer: A credential provider that takes a 

1451 configuration, which is used to provide the source credentials 

1452 for the STS call. 

1453 """ 

1454 #: The cache used to first check for assumed credentials. 

1455 #: This is checked before making the AssumeRole API 

1456 #: calls and can be useful if you have short lived 

1457 #: scripts and you'd like to avoid calling AssumeRole 

1458 #: until the credentials are expired. 

1459 self.cache = cache 

1460 self._load_config = load_config 

1461 # client_creator is a callable that creates function. 

1462 # It's basically session.create_client 

1463 self._client_creator = client_creator 

1464 self._profile_name = profile_name 

1465 self._prompter = prompter 

1466 # The _loaded_config attribute will be populated from the 

1467 # load_config() function once the configuration is actually 

1468 # loaded. The reason we go through all this instead of just 

1469 # requiring that the loaded_config be passed to us is to that 

1470 # we can defer configuration loaded until we actually try 

1471 # to load credentials (as opposed to when the object is 

1472 # instantiated). 

1473 self._loaded_config = {} 

1474 self._credential_sourcer = credential_sourcer 

1475 self._profile_provider_builder = profile_provider_builder 

1476 self._visited_profiles = [self._profile_name] 

1477 

1478 def load(self): 

1479 self._loaded_config = self._load_config() 

1480 profiles = self._loaded_config.get('profiles', {}) 

1481 profile = profiles.get(self._profile_name, {}) 

1482 if self._has_assume_role_config_vars(profile): 

1483 return self._load_creds_via_assume_role(self._profile_name) 

1484 

1485 def _has_assume_role_config_vars(self, profile): 

1486 return ( 

1487 self.ROLE_CONFIG_VAR in profile 

1488 and 

1489 # We need to ensure this provider doesn't look at a profile when 

1490 # the profile has configuration for web identity. Simply relying on 

1491 # the order in the credential chain is insufficient as it doesn't 

1492 # prevent the case when we're doing an assume role chain. 

1493 self.WEB_IDENTITY_TOKE_FILE_VAR not in profile 

1494 ) 

1495 

1496 def _load_creds_via_assume_role(self, profile_name): 

1497 role_config = self._get_role_config(profile_name) 

1498 source_credentials = self._resolve_source_credentials( 

1499 role_config, profile_name 

1500 ) 

1501 

1502 extra_args = {} 

1503 role_session_name = role_config.get('role_session_name') 

1504 if role_session_name is not None: 

1505 extra_args['RoleSessionName'] = role_session_name 

1506 

1507 external_id = role_config.get('external_id') 

1508 if external_id is not None: 

1509 extra_args['ExternalId'] = external_id 

1510 

1511 mfa_serial = role_config.get('mfa_serial') 

1512 if mfa_serial is not None: 

1513 extra_args['SerialNumber'] = mfa_serial 

1514 

1515 duration_seconds = role_config.get('duration_seconds') 

1516 if duration_seconds is not None: 

1517 extra_args['DurationSeconds'] = duration_seconds 

1518 

1519 fetcher = AssumeRoleCredentialFetcher( 

1520 client_creator=self._client_creator, 

1521 source_credentials=source_credentials, 

1522 role_arn=role_config['role_arn'], 

1523 extra_args=extra_args, 

1524 mfa_prompter=self._prompter, 

1525 cache=self.cache, 

1526 ) 

1527 refresher = fetcher.fetch_credentials 

1528 if mfa_serial is not None: 

1529 refresher = create_mfa_serial_refresher(refresher) 

1530 

1531 # The initial credentials are empty and the expiration time is set 

1532 # to now so that we can delay the call to assume role until it is 

1533 # strictly needed. 

1534 return DeferredRefreshableCredentials( 

1535 method=self.METHOD, 

1536 refresh_using=refresher, 

1537 time_fetcher=_local_now, 

1538 ) 

1539 

1540 def _get_role_config(self, profile_name): 

1541 """Retrieves and validates the role configuration for the profile.""" 

1542 profiles = self._loaded_config.get('profiles', {}) 

1543 

1544 profile = profiles[profile_name] 

1545 source_profile = profile.get('source_profile') 

1546 role_arn = profile['role_arn'] 

1547 credential_source = profile.get('credential_source') 

1548 mfa_serial = profile.get('mfa_serial') 

1549 external_id = profile.get('external_id') 

1550 role_session_name = profile.get('role_session_name') 

1551 duration_seconds = profile.get('duration_seconds') 

1552 

1553 role_config = { 

1554 'role_arn': role_arn, 

1555 'external_id': external_id, 

1556 'mfa_serial': mfa_serial, 

1557 'role_session_name': role_session_name, 

1558 'source_profile': source_profile, 

1559 'credential_source': credential_source, 

1560 } 

1561 

1562 if duration_seconds is not None: 

1563 try: 

1564 role_config['duration_seconds'] = int(duration_seconds) 

1565 except ValueError: 

1566 pass 

1567 

1568 # Either the credential source or the source profile must be 

1569 # specified, but not both. 

1570 if credential_source is not None and source_profile is not None: 

1571 raise InvalidConfigError( 

1572 error_msg=( 

1573 'The profile "%s" contains both source_profile and ' 

1574 'credential_source.' % profile_name 

1575 ) 

1576 ) 

1577 elif credential_source is None and source_profile is None: 

1578 raise PartialCredentialsError( 

1579 provider=self.METHOD, 

1580 cred_var='source_profile or credential_source', 

1581 ) 

1582 elif credential_source is not None: 

1583 self._validate_credential_source(profile_name, credential_source) 

1584 else: 

1585 self._validate_source_profile(profile_name, source_profile) 

1586 

1587 return role_config 

1588 

1589 def _validate_credential_source(self, parent_profile, credential_source): 

1590 if self._credential_sourcer is None: 

1591 raise InvalidConfigError( 

1592 error_msg=( 

1593 f"The credential_source \"{credential_source}\" is specified " 

1594 f"in profile \"{parent_profile}\", " 

1595 f"but no source provider was configured." 

1596 ) 

1597 ) 

1598 if not self._credential_sourcer.is_supported(credential_source): 

1599 raise InvalidConfigError( 

1600 error_msg=( 

1601 f"The credential source \"{credential_source}\" referenced " 

1602 f"in profile \"{parent_profile}\" is not valid." 

1603 ) 

1604 ) 

1605 

1606 def _source_profile_has_credentials(self, profile): 

1607 return any( 

1608 [ 

1609 self._has_static_credentials(profile), 

1610 self._has_assume_role_config_vars(profile), 

1611 ] 

1612 ) 

1613 

1614 def _validate_source_profile( 

1615 self, parent_profile_name, source_profile_name 

1616 ): 

1617 profiles = self._loaded_config.get('profiles', {}) 

1618 if source_profile_name not in profiles: 

1619 raise InvalidConfigError( 

1620 error_msg=( 

1621 f"The source_profile \"{source_profile_name}\" referenced in " 

1622 f"the profile \"{parent_profile_name}\" does not exist." 

1623 ) 

1624 ) 

1625 

1626 source_profile = profiles[source_profile_name] 

1627 

1628 # Make sure we aren't going into an infinite loop. If we haven't 

1629 # visited the profile yet, we're good. 

1630 if source_profile_name not in self._visited_profiles: 

1631 return 

1632 

1633 # If we have visited the profile and the profile isn't simply 

1634 # referencing itself, that's an infinite loop. 

1635 if source_profile_name != parent_profile_name: 

1636 raise InfiniteLoopConfigError( 

1637 source_profile=source_profile_name, 

1638 visited_profiles=self._visited_profiles, 

1639 ) 

1640 

1641 # A profile is allowed to reference itself so that it can source 

1642 # static credentials and have configuration all in the same 

1643 # profile. This will only ever work for the top level assume 

1644 # role because the static credentials will otherwise take 

1645 # precedence. 

1646 if not self._has_static_credentials(source_profile): 

1647 raise InfiniteLoopConfigError( 

1648 source_profile=source_profile_name, 

1649 visited_profiles=self._visited_profiles, 

1650 ) 

1651 

1652 def _has_static_credentials(self, profile): 

1653 static_keys = ['aws_secret_access_key', 'aws_access_key_id'] 

1654 return any(static_key in profile for static_key in static_keys) 

1655 

1656 def _resolve_source_credentials(self, role_config, profile_name): 

1657 credential_source = role_config.get('credential_source') 

1658 if credential_source is not None: 

1659 return self._resolve_credentials_from_source( 

1660 credential_source, profile_name 

1661 ) 

1662 

1663 source_profile = role_config['source_profile'] 

1664 self._visited_profiles.append(source_profile) 

1665 return self._resolve_credentials_from_profile(source_profile) 

1666 

1667 def _resolve_credentials_from_profile(self, profile_name): 

1668 profiles = self._loaded_config.get('profiles', {}) 

1669 profile = profiles[profile_name] 

1670 

1671 if ( 

1672 self._has_static_credentials(profile) 

1673 and not self._profile_provider_builder 

1674 ): 

1675 # This is only here for backwards compatibility. If this provider 

1676 # isn't given a profile provider builder we still want to be able 

1677 # handle the basic static credential case as we would before the 

1678 # provile provider builder parameter was added. 

1679 return self._resolve_static_credentials_from_profile(profile) 

1680 elif self._has_static_credentials( 

1681 profile 

1682 ) or not self._has_assume_role_config_vars(profile): 

1683 profile_providers = self._profile_provider_builder.providers( 

1684 profile_name=profile_name, 

1685 disable_env_vars=True, 

1686 ) 

1687 profile_chain = CredentialResolver(profile_providers) 

1688 credentials = profile_chain.load_credentials() 

1689 if credentials is None: 

1690 error_message = ( 

1691 'The source profile "%s" must have credentials.' 

1692 ) 

1693 raise InvalidConfigError( 

1694 error_msg=error_message % profile_name, 

1695 ) 

1696 return credentials 

1697 

1698 return self._load_creds_via_assume_role(profile_name) 

1699 

1700 def _resolve_static_credentials_from_profile(self, profile): 

1701 try: 

1702 return Credentials( 

1703 access_key=profile['aws_access_key_id'], 

1704 secret_key=profile['aws_secret_access_key'], 

1705 token=profile.get('aws_session_token'), 

1706 ) 

1707 except KeyError as e: 

1708 raise PartialCredentialsError( 

1709 provider=self.METHOD, cred_var=str(e) 

1710 ) 

1711 

1712 def _resolve_credentials_from_source( 

1713 self, credential_source, profile_name 

1714 ): 

1715 credentials = self._credential_sourcer.source_credentials( 

1716 credential_source 

1717 ) 

1718 if credentials is None: 

1719 raise CredentialRetrievalError( 

1720 provider=credential_source, 

1721 error_msg=( 

1722 'No credentials found in credential_source referenced ' 

1723 'in profile %s' % profile_name 

1724 ), 

1725 ) 

1726 return credentials 

1727 

1728 

1729class AssumeRoleWithWebIdentityProvider(CredentialProvider): 

1730 METHOD = 'assume-role-with-web-identity' 

1731 CANONICAL_NAME = None 

1732 _CONFIG_TO_ENV_VAR = { 

1733 'web_identity_token_file': 'AWS_WEB_IDENTITY_TOKEN_FILE', 

1734 'role_session_name': 'AWS_ROLE_SESSION_NAME', 

1735 'role_arn': 'AWS_ROLE_ARN', 

1736 } 

1737 

1738 def __init__( 

1739 self, 

1740 load_config, 

1741 client_creator, 

1742 profile_name, 

1743 cache=None, 

1744 disable_env_vars=False, 

1745 token_loader_cls=None, 

1746 ): 

1747 self.cache = cache 

1748 self._load_config = load_config 

1749 self._client_creator = client_creator 

1750 self._profile_name = profile_name 

1751 self._profile_config = None 

1752 self._disable_env_vars = disable_env_vars 

1753 if token_loader_cls is None: 

1754 token_loader_cls = FileWebIdentityTokenLoader 

1755 self._token_loader_cls = token_loader_cls 

1756 

1757 def load(self): 

1758 return self._assume_role_with_web_identity() 

1759 

1760 def _get_profile_config(self, key): 

1761 if self._profile_config is None: 

1762 loaded_config = self._load_config() 

1763 profiles = loaded_config.get('profiles', {}) 

1764 self._profile_config = profiles.get(self._profile_name, {}) 

1765 return self._profile_config.get(key) 

1766 

1767 def _get_env_config(self, key): 

1768 if self._disable_env_vars: 

1769 return None 

1770 env_key = self._CONFIG_TO_ENV_VAR.get(key) 

1771 if env_key and env_key in os.environ: 

1772 return os.environ[env_key] 

1773 return None 

1774 

1775 def _get_config(self, key): 

1776 env_value = self._get_env_config(key) 

1777 if env_value is not None: 

1778 return env_value 

1779 return self._get_profile_config(key) 

1780 

1781 def _assume_role_with_web_identity(self): 

1782 token_path = self._get_config('web_identity_token_file') 

1783 if not token_path: 

1784 return None 

1785 token_loader = self._token_loader_cls(token_path) 

1786 

1787 role_arn = self._get_config('role_arn') 

1788 if not role_arn: 

1789 error_msg = ( 

1790 'The provided profile or the current environment is ' 

1791 'configured to assume role with web identity but has no ' 

1792 'role ARN configured. Ensure that the profile has the role_arn' 

1793 'configuration set or the AWS_ROLE_ARN env var is set.' 

1794 ) 

1795 raise InvalidConfigError(error_msg=error_msg) 

1796 

1797 extra_args = {} 

1798 role_session_name = self._get_config('role_session_name') 

1799 if role_session_name is not None: 

1800 extra_args['RoleSessionName'] = role_session_name 

1801 

1802 fetcher = AssumeRoleWithWebIdentityCredentialFetcher( 

1803 client_creator=self._client_creator, 

1804 web_identity_token_loader=token_loader, 

1805 role_arn=role_arn, 

1806 extra_args=extra_args, 

1807 cache=self.cache, 

1808 ) 

1809 # The initial credentials are empty and the expiration time is set 

1810 # to now so that we can delay the call to assume role until it is 

1811 # strictly needed. 

1812 return DeferredRefreshableCredentials( 

1813 method=self.METHOD, 

1814 refresh_using=fetcher.fetch_credentials, 

1815 ) 

1816 

1817 

1818class CanonicalNameCredentialSourcer: 

1819 def __init__(self, providers): 

1820 self._providers = providers 

1821 

1822 def is_supported(self, source_name): 

1823 """Validates a given source name. 

1824 

1825 :type source_name: str 

1826 :param source_name: The value of credential_source in the config 

1827 file. This is the canonical name of the credential provider. 

1828 

1829 :rtype: bool 

1830 :returns: True if the credential provider is supported, 

1831 False otherwise. 

1832 """ 

1833 return source_name in [p.CANONICAL_NAME for p in self._providers] 

1834 

1835 def source_credentials(self, source_name): 

1836 """Loads source credentials based on the provided configuration. 

1837 

1838 :type source_name: str 

1839 :param source_name: The value of credential_source in the config 

1840 file. This is the canonical name of the credential provider. 

1841 

1842 :rtype: Credentials 

1843 """ 

1844 source = self._get_provider(source_name) 

1845 if isinstance(source, CredentialResolver): 

1846 return source.load_credentials() 

1847 return source.load() 

1848 

1849 def _get_provider(self, canonical_name): 

1850 """Return a credential provider by its canonical name. 

1851 

1852 :type canonical_name: str 

1853 :param canonical_name: The canonical name of the provider. 

1854 

1855 :raises UnknownCredentialError: Raised if no 

1856 credential provider by the provided name 

1857 is found. 

1858 """ 

1859 provider = self._get_provider_by_canonical_name(canonical_name) 

1860 

1861 # The AssumeRole provider should really be part of the SharedConfig 

1862 # provider rather than being its own thing, but it is not. It is 

1863 # effectively part of both the SharedConfig provider and the 

1864 # SharedCredentials provider now due to the way it behaves. 

1865 # Therefore if we want either of those providers we should return 

1866 # the AssumeRole provider with it. 

1867 if canonical_name.lower() in ['sharedconfig', 'sharedcredentials']: 

1868 assume_role_provider = self._get_provider_by_method('assume-role') 

1869 if assume_role_provider is not None: 

1870 # The SharedConfig or SharedCredentials provider may not be 

1871 # present if it was removed for some reason, but the 

1872 # AssumeRole provider could still be present. In that case, 

1873 # return the assume role provider by itself. 

1874 if provider is None: 

1875 return assume_role_provider 

1876 

1877 # If both are present, return them both as a 

1878 # CredentialResolver so that calling code can treat them as 

1879 # a single entity. 

1880 return CredentialResolver([assume_role_provider, provider]) 

1881 

1882 if provider is None: 

1883 raise UnknownCredentialError(name=canonical_name) 

1884 

1885 return provider 

1886 

1887 def _get_provider_by_canonical_name(self, canonical_name): 

1888 """Return a credential provider by its canonical name. 

1889 

1890 This function is strict, it does not attempt to address 

1891 compatibility issues. 

1892 """ 

1893 for provider in self._providers: 

1894 name = provider.CANONICAL_NAME 

1895 # Canonical names are case-insensitive 

1896 if name and name.lower() == canonical_name.lower(): 

1897 return provider 

1898 

1899 def _get_provider_by_method(self, method): 

1900 """Return a credential provider by its METHOD name.""" 

1901 for provider in self._providers: 

1902 if provider.METHOD == method: 

1903 return provider 

1904 

1905 

1906class ContainerProvider(CredentialProvider): 

1907 METHOD = 'container-role' 

1908 CANONICAL_NAME = 'EcsContainer' 

1909 ENV_VAR = 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI' 

1910 ENV_VAR_FULL = 'AWS_CONTAINER_CREDENTIALS_FULL_URI' 

1911 ENV_VAR_AUTH_TOKEN = 'AWS_CONTAINER_AUTHORIZATION_TOKEN' 

1912 ENV_VAR_AUTH_TOKEN_FILE = 'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE' 

1913 

1914 def __init__(self, environ=None, fetcher=None): 

1915 if environ is None: 

1916 environ = os.environ 

1917 if fetcher is None: 

1918 fetcher = ContainerMetadataFetcher() 

1919 self._environ = environ 

1920 self._fetcher = fetcher 

1921 

1922 def load(self): 

1923 # This cred provider is only triggered if the self.ENV_VAR is set, 

1924 # which only happens if you opt into this feature. 

1925 if self.ENV_VAR in self._environ or self.ENV_VAR_FULL in self._environ: 

1926 return self._retrieve_or_fail() 

1927 

1928 def _retrieve_or_fail(self): 

1929 if self._provided_relative_uri(): 

1930 full_uri = self._fetcher.full_url(self._environ[self.ENV_VAR]) 

1931 else: 

1932 full_uri = self._environ[self.ENV_VAR_FULL] 

1933 headers = self._build_headers() 

1934 fetcher = self._create_fetcher(full_uri, headers) 

1935 creds = fetcher() 

1936 return RefreshableCredentials( 

1937 access_key=creds['access_key'], 

1938 secret_key=creds['secret_key'], 

1939 token=creds['token'], 

1940 method=self.METHOD, 

1941 expiry_time=_parse_if_needed(creds['expiry_time']), 

1942 refresh_using=fetcher, 

1943 ) 

1944 

1945 def _build_headers(self): 

1946 auth_token = None 

1947 if self.ENV_VAR_AUTH_TOKEN_FILE in self._environ: 

1948 auth_token_file_path = self._environ[self.ENV_VAR_AUTH_TOKEN_FILE] 

1949 with open(auth_token_file_path) as token_file: 

1950 auth_token = token_file.read() 

1951 elif self.ENV_VAR_AUTH_TOKEN in self._environ: 

1952 auth_token = self._environ[self.ENV_VAR_AUTH_TOKEN] 

1953 if auth_token is not None: 

1954 self._validate_auth_token(auth_token) 

1955 return {'Authorization': auth_token} 

1956 

1957 def _validate_auth_token(self, auth_token): 

1958 if "\r" in auth_token or "\n" in auth_token: 

1959 raise ValueError("Auth token value is not a legal header value") 

1960 

1961 def _create_fetcher(self, full_uri, headers): 

1962 def fetch_creds(): 

1963 try: 

1964 response = self._fetcher.retrieve_full_uri( 

1965 full_uri, headers=headers 

1966 ) 

1967 except MetadataRetrievalError as e: 

1968 logger.debug( 

1969 "Error retrieving container metadata: %s", e, exc_info=True 

1970 ) 

1971 raise CredentialRetrievalError( 

1972 provider=self.METHOD, error_msg=str(e) 

1973 ) 

1974 return { 

1975 'access_key': response['AccessKeyId'], 

1976 'secret_key': response['SecretAccessKey'], 

1977 'token': response['Token'], 

1978 'expiry_time': response['Expiration'], 

1979 } 

1980 

1981 return fetch_creds 

1982 

1983 def _provided_relative_uri(self): 

1984 return self.ENV_VAR in self._environ 

1985 

1986 

1987class CredentialResolver: 

1988 def __init__(self, providers): 

1989 """ 

1990 

1991 :param providers: A list of ``CredentialProvider`` instances. 

1992 

1993 """ 

1994 self.providers = providers 

1995 

1996 def insert_before(self, name, credential_provider): 

1997 """ 

1998 Inserts a new instance of ``CredentialProvider`` into the chain that 

1999 will be tried before an existing one. 

2000 

2001 :param name: The short name of the credentials you'd like to insert the 

2002 new credentials before. (ex. ``env`` or ``config``). Existing names 

2003 & ordering can be discovered via ``self.available_methods``. 

2004 :type name: string 

2005 

2006 :param cred_instance: An instance of the new ``Credentials`` object 

2007 you'd like to add to the chain. 

2008 :type cred_instance: A subclass of ``Credentials`` 

2009 """ 

2010 try: 

2011 offset = [p.METHOD for p in self.providers].index(name) 

2012 except ValueError: 

2013 raise UnknownCredentialError(name=name) 

2014 self.providers.insert(offset, credential_provider) 

2015 

2016 def insert_after(self, name, credential_provider): 

2017 """ 

2018 Inserts a new type of ``Credentials`` instance into the chain that will 

2019 be tried after an existing one. 

2020 

2021 :param name: The short name of the credentials you'd like to insert the 

2022 new credentials after. (ex. ``env`` or ``config``). Existing names 

2023 & ordering can be discovered via ``self.available_methods``. 

2024 :type name: string 

2025 

2026 :param cred_instance: An instance of the new ``Credentials`` object 

2027 you'd like to add to the chain. 

2028 :type cred_instance: A subclass of ``Credentials`` 

2029 """ 

2030 offset = self._get_provider_offset(name) 

2031 self.providers.insert(offset + 1, credential_provider) 

2032 

2033 def remove(self, name): 

2034 """ 

2035 Removes a given ``Credentials`` instance from the chain. 

2036 

2037 :param name: The short name of the credentials instance to remove. 

2038 :type name: string 

2039 """ 

2040 available_methods = [p.METHOD for p in self.providers] 

2041 if name not in available_methods: 

2042 # It's not present. Fail silently. 

2043 return 

2044 

2045 offset = available_methods.index(name) 

2046 self.providers.pop(offset) 

2047 

2048 def get_provider(self, name): 

2049 """Return a credential provider by name. 

2050 

2051 :type name: str 

2052 :param name: The name of the provider. 

2053 

2054 :raises UnknownCredentialError: Raised if no 

2055 credential provider by the provided name 

2056 is found. 

2057 """ 

2058 return self.providers[self._get_provider_offset(name)] 

2059 

2060 def _get_provider_offset(self, name): 

2061 try: 

2062 return [p.METHOD for p in self.providers].index(name) 

2063 except ValueError: 

2064 raise UnknownCredentialError(name=name) 

2065 

2066 def load_credentials(self): 

2067 """ 

2068 Goes through the credentials chain, returning the first ``Credentials`` 

2069 that could be loaded. 

2070 """ 

2071 # First provider to return a non-None response wins. 

2072 for provider in self.providers: 

2073 logger.debug("Looking for credentials via: %s", provider.METHOD) 

2074 creds = provider.load() 

2075 if creds is not None: 

2076 return creds 

2077 

2078 # If we got here, no credentials could be found. 

2079 # This feels like it should be an exception, but historically, ``None`` 

2080 # is returned. 

2081 # 

2082 # +1 

2083 # -js 

2084 return None 

2085 

2086 

2087class SSOCredentialFetcher(CachedCredentialFetcher): 

2088 _UTC_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' 

2089 

2090 def __init__( 

2091 self, 

2092 start_url, 

2093 sso_region, 

2094 role_name, 

2095 account_id, 

2096 client_creator, 

2097 token_loader=None, 

2098 cache=None, 

2099 expiry_window_seconds=None, 

2100 token_provider=None, 

2101 sso_session_name=None, 

2102 ): 

2103 self._client_creator = client_creator 

2104 self._sso_region = sso_region 

2105 self._role_name = role_name 

2106 self._account_id = account_id 

2107 self._start_url = start_url 

2108 self._token_loader = token_loader 

2109 self._token_provider = token_provider 

2110 self._sso_session_name = sso_session_name 

2111 super().__init__(cache, expiry_window_seconds) 

2112 

2113 def _create_cache_key(self): 

2114 """Create a predictable cache key for the current configuration. 

2115 

2116 The cache key is intended to be compatible with file names. 

2117 """ 

2118 args = { 

2119 'roleName': self._role_name, 

2120 'accountId': self._account_id, 

2121 } 

2122 if self._sso_session_name: 

2123 args['sessionName'] = self._sso_session_name 

2124 else: 

2125 args['startUrl'] = self._start_url 

2126 # NOTE: It would be good to hoist this cache key construction logic 

2127 # into the CachedCredentialFetcher class as we should be consistent. 

2128 # Unfortunately, the current assume role fetchers that sub class don't 

2129 # pass separators resulting in non-minified JSON. In the long term, 

2130 # all fetchers should use the below caching scheme. 

2131 args = json.dumps(args, sort_keys=True, separators=(',', ':')) 

2132 argument_hash = sha1(args.encode('utf-8')).hexdigest() 

2133 return self._make_file_safe(argument_hash) 

2134 

2135 def _parse_timestamp(self, timestamp_ms): 

2136 # fromtimestamp expects seconds so: milliseconds / 1000 = seconds 

2137 timestamp_seconds = timestamp_ms / 1000.0 

2138 timestamp = datetime.datetime.fromtimestamp(timestamp_seconds, tzutc()) 

2139 return timestamp.strftime(self._UTC_DATE_FORMAT) 

2140 

2141 def _get_credentials(self): 

2142 """Get credentials by calling SSO get role credentials.""" 

2143 config = Config( 

2144 signature_version=UNSIGNED, 

2145 region_name=self._sso_region, 

2146 ) 

2147 client = self._client_creator('sso', config=config) 

2148 if self._token_provider: 

2149 initial_token_data = self._token_provider.load_token() 

2150 token = initial_token_data.get_frozen_token().token 

2151 else: 

2152 token = self._token_loader(self._start_url)['accessToken'] 

2153 

2154 kwargs = { 

2155 'roleName': self._role_name, 

2156 'accountId': self._account_id, 

2157 'accessToken': token, 

2158 } 

2159 try: 

2160 response = client.get_role_credentials(**kwargs) 

2161 except client.exceptions.UnauthorizedException: 

2162 raise UnauthorizedSSOTokenError() 

2163 credentials = response['roleCredentials'] 

2164 

2165 credentials = { 

2166 'ProviderType': 'sso', 

2167 'Credentials': { 

2168 'AccessKeyId': credentials['accessKeyId'], 

2169 'SecretAccessKey': credentials['secretAccessKey'], 

2170 'SessionToken': credentials['sessionToken'], 

2171 'Expiration': self._parse_timestamp(credentials['expiration']), 

2172 }, 

2173 } 

2174 return credentials 

2175 

2176 

2177class SSOProvider(CredentialProvider): 

2178 METHOD = 'sso' 

2179 

2180 _SSO_TOKEN_CACHE_DIR = os.path.expanduser( 

2181 os.path.join('~', '.aws', 'sso', 'cache') 

2182 ) 

2183 _PROFILE_REQUIRED_CONFIG_VARS = ( 

2184 'sso_role_name', 

2185 'sso_account_id', 

2186 ) 

2187 _SSO_REQUIRED_CONFIG_VARS = ( 

2188 'sso_start_url', 

2189 'sso_region', 

2190 ) 

2191 _ALL_REQUIRED_CONFIG_VARS = ( 

2192 _PROFILE_REQUIRED_CONFIG_VARS + _SSO_REQUIRED_CONFIG_VARS 

2193 ) 

2194 

2195 def __init__( 

2196 self, 

2197 load_config, 

2198 client_creator, 

2199 profile_name, 

2200 cache=None, 

2201 token_cache=None, 

2202 token_provider=None, 

2203 ): 

2204 if token_cache is None: 

2205 token_cache = JSONFileCache(self._SSO_TOKEN_CACHE_DIR) 

2206 self._token_cache = token_cache 

2207 self._token_provider = token_provider 

2208 if cache is None: 

2209 cache = {} 

2210 self.cache = cache 

2211 self._load_config = load_config 

2212 self._client_creator = client_creator 

2213 self._profile_name = profile_name 

2214 

2215 def _load_sso_config(self): 

2216 loaded_config = self._load_config() 

2217 profiles = loaded_config.get('profiles', {}) 

2218 profile_name = self._profile_name 

2219 profile_config = profiles.get(self._profile_name, {}) 

2220 sso_sessions = loaded_config.get('sso_sessions', {}) 

2221 

2222 # Role name & Account ID indicate the cred provider should be used 

2223 if all( 

2224 c not in profile_config for c in self._PROFILE_REQUIRED_CONFIG_VARS 

2225 ): 

2226 return None 

2227 

2228 resolved_config, extra_reqs = self._resolve_sso_session_reference( 

2229 profile_config, sso_sessions 

2230 ) 

2231 

2232 config = {} 

2233 missing_config_vars = [] 

2234 all_required_configs = self._ALL_REQUIRED_CONFIG_VARS + extra_reqs 

2235 for config_var in all_required_configs: 

2236 if config_var in resolved_config: 

2237 config[config_var] = resolved_config[config_var] 

2238 else: 

2239 missing_config_vars.append(config_var) 

2240 

2241 if missing_config_vars: 

2242 missing = ', '.join(missing_config_vars) 

2243 raise InvalidConfigError( 

2244 error_msg=( 

2245 'The profile "%s" is configured to use SSO but is missing ' 

2246 'required configuration: %s' % (profile_name, missing) 

2247 ) 

2248 ) 

2249 return config 

2250 

2251 def _resolve_sso_session_reference(self, profile_config, sso_sessions): 

2252 sso_session_name = profile_config.get('sso_session') 

2253 if sso_session_name is None: 

2254 # No reference to resolve, proceed with legacy flow 

2255 return profile_config, () 

2256 

2257 if sso_session_name not in sso_sessions: 

2258 error_msg = f'The specified sso-session does not exist: "{sso_session_name}"' 

2259 raise InvalidConfigError(error_msg=error_msg) 

2260 

2261 config = profile_config.copy() 

2262 session = sso_sessions[sso_session_name] 

2263 for config_var, val in session.items(): 

2264 # Validate any keys referenced in both profile and sso_session match 

2265 if config.get(config_var, val) != val: 

2266 error_msg = ( 

2267 f"The value for {config_var} is inconsistent between " 

2268 f"profile ({config[config_var]}) and sso-session ({val})." 

2269 ) 

2270 raise InvalidConfigError(error_msg=error_msg) 

2271 config[config_var] = val 

2272 return config, ('sso_session',) 

2273 

2274 def load(self): 

2275 sso_config = self._load_sso_config() 

2276 if not sso_config: 

2277 return None 

2278 

2279 fetcher_kwargs = { 

2280 'start_url': sso_config['sso_start_url'], 

2281 'sso_region': sso_config['sso_region'], 

2282 'role_name': sso_config['sso_role_name'], 

2283 'account_id': sso_config['sso_account_id'], 

2284 'client_creator': self._client_creator, 

2285 'token_loader': SSOTokenLoader(cache=self._token_cache), 

2286 'cache': self.cache, 

2287 } 

2288 if 'sso_session' in sso_config: 

2289 fetcher_kwargs['sso_session_name'] = sso_config['sso_session'] 

2290 fetcher_kwargs['token_provider'] = self._token_provider 

2291 

2292 sso_fetcher = SSOCredentialFetcher(**fetcher_kwargs) 

2293 

2294 return DeferredRefreshableCredentials( 

2295 method=self.METHOD, 

2296 refresh_using=sso_fetcher.fetch_credentials, 

2297 )