Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/credentials.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1038 statements  

1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 

2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"). You 

5# may not use this file except in compliance with the License. A copy of 

6# the License is located at 

7# 

8# http://aws.amazon.com/apache2.0/ 

9# 

10# or in the "license" file accompanying this file. This file is 

11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

12# ANY KIND, either express or implied. See the License for the specific 

13# language governing permissions and limitations under the License. 

14import datetime 

15import getpass 

16import json 

17import logging 

18import os 

19import subprocess 

20import threading 

21import time 

22from collections import namedtuple 

23from copy import deepcopy 

24from hashlib import sha1 

25 

26import dateutil.parser 

27from dateutil.parser import parse 

28from dateutil.tz import tzlocal, tzutc 

29 

30import botocore.compat 

31import botocore.configloader 

32from botocore import UNSIGNED 

33from botocore.compat import compat_shell_split, total_seconds 

34from botocore.config import Config 

35from botocore.exceptions import ( 

36 ConfigNotFound, 

37 CredentialRetrievalError, 

38 InfiniteLoopConfigError, 

39 InvalidConfigError, 

40 MetadataRetrievalError, 

41 PartialCredentialsError, 

42 RefreshWithMFAUnsupportedError, 

43 UnauthorizedSSOTokenError, 

44 UnknownCredentialError, 

45) 

46from botocore.tokens import SSOTokenProvider 

47from botocore.useragent import register_feature_id 

48from botocore.utils import ( 

49 ArnParser, 

50 ContainerMetadataFetcher, 

51 FileWebIdentityTokenLoader, 

52 InstanceMetadataFetcher, 

53 JSONFileCache, 

54 SSOTokenLoader, 

55 create_nested_client, 

56 parse_key_val_file, 

57 resolve_imds_endpoint_mode, 

58) 

59 

60logger = logging.getLogger(__name__) 

61ReadOnlyCredentials = namedtuple( 

62 'ReadOnlyCredentials', 

63 ['access_key', 'secret_key', 'token', 'account_id'], 

64 defaults=(None,), 

65) 

66 

67_DEFAULT_MANDATORY_REFRESH_TIMEOUT = 10 * 60 # 10 min 

68_DEFAULT_ADVISORY_REFRESH_TIMEOUT = 15 * 60 # 15 min 

69 

70 

71def create_credential_resolver(session, cache=None, region_name=None): 

72 """Create a default credential resolver. 

73 

74 This creates a pre-configured credential resolver 

75 that includes the default lookup chain for 

76 credentials. 

77 

78 """ 

79 profile_name = session.get_config_variable('profile') or 'default' 

80 metadata_timeout = session.get_config_variable('metadata_service_timeout') 

81 num_attempts = session.get_config_variable('metadata_service_num_attempts') 

82 disable_env_vars = session.instance_variables().get('profile') is not None 

83 

84 imds_config = { 

85 'ec2_metadata_service_endpoint': session.get_config_variable( 

86 'ec2_metadata_service_endpoint' 

87 ), 

88 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode( 

89 session 

90 ), 

91 'ec2_credential_refresh_window': _DEFAULT_ADVISORY_REFRESH_TIMEOUT, 

92 'ec2_metadata_v1_disabled': session.get_config_variable( 

93 'ec2_metadata_v1_disabled' 

94 ), 

95 } 

96 

97 if cache is None: 

98 cache = {} 

99 

100 env_provider = EnvProvider() 

101 container_provider = ContainerProvider() 

102 instance_metadata_provider = InstanceMetadataProvider( 

103 iam_role_fetcher=InstanceMetadataFetcher( 

104 timeout=metadata_timeout, 

105 num_attempts=num_attempts, 

106 user_agent=session.user_agent(), 

107 config=imds_config, 

108 ) 

109 ) 

110 

111 profile_provider_builder = ProfileProviderBuilder( 

112 session, cache=cache, region_name=region_name 

113 ) 

114 assume_role_provider = AssumeRoleProvider( 

115 load_config=lambda: session.full_config, 

116 client_creator=_get_client_creator(session, region_name), 

117 cache=cache, 

118 profile_name=profile_name, 

119 credential_sourcer=CanonicalNameCredentialSourcer( 

120 [env_provider, container_provider, instance_metadata_provider] 

121 ), 

122 profile_provider_builder=profile_provider_builder, 

123 ) 

124 

125 pre_profile = [ 

126 env_provider, 

127 assume_role_provider, 

128 ] 

129 profile_providers = profile_provider_builder.providers( 

130 profile_name=profile_name, 

131 disable_env_vars=disable_env_vars, 

132 ) 

133 post_profile = [ 

134 OriginalEC2Provider(), 

135 BotoProvider(), 

136 container_provider, 

137 instance_metadata_provider, 

138 ] 

139 providers = pre_profile + profile_providers + post_profile 

140 

141 if disable_env_vars: 

142 # An explicitly provided profile will negate an EnvProvider. 

143 # We will defer to providers that understand the "profile" 

144 # concept to retrieve credentials. 

145 # The one edge case if is all three values are provided via 

146 # env vars: 

147 # export AWS_ACCESS_KEY_ID=foo 

148 # export AWS_SECRET_ACCESS_KEY=bar 

149 # export AWS_PROFILE=baz 

150 # Then, just like our client() calls, the explicit credentials 

151 # will take precedence. 

152 # 

153 # This precedence is enforced by leaving the EnvProvider in the chain. 

154 # This means that the only way a "profile" would win is if the 

155 # EnvProvider does not return credentials, which is what we want 

156 # in this scenario. 

157 providers.remove(env_provider) 

158 logger.debug( 

159 'Skipping environment variable credential check' 

160 ' because profile name was explicitly set.' 

161 ) 

162 

163 resolver = CredentialResolver(providers=providers) 

164 return resolver 

165 

166 

167class ProfileProviderBuilder: 

168 """This class handles the creation of profile based providers. 

169 

170 NOTE: This class is only intended for internal use. 

171 

172 This class handles the creation and ordering of the various credential 

173 providers that primarly source their configuration from the shared config. 

174 This is needed to enable sharing between the default credential chain and 

175 the source profile chain created by the assume role provider. 

176 """ 

177 

178 def __init__( 

179 self, session, cache=None, region_name=None, sso_token_cache=None 

180 ): 

181 self._session = session 

182 self._cache = cache 

183 self._region_name = region_name 

184 self._sso_token_cache = sso_token_cache 

185 

186 def providers(self, profile_name, disable_env_vars=False): 

187 return [ 

188 self._create_web_identity_provider( 

189 profile_name, 

190 disable_env_vars, 

191 ), 

192 self._create_sso_provider(profile_name), 

193 self._create_shared_credential_provider(profile_name), 

194 self._create_process_provider(profile_name), 

195 self._create_config_provider(profile_name), 

196 ] 

197 

198 def _create_process_provider(self, profile_name): 

199 return ProcessProvider( 

200 profile_name=profile_name, 

201 load_config=lambda: self._session.full_config, 

202 ) 

203 

204 def _create_shared_credential_provider(self, profile_name): 

205 credential_file = self._session.get_config_variable('credentials_file') 

206 return SharedCredentialProvider( 

207 profile_name=profile_name, 

208 creds_filename=credential_file, 

209 ) 

210 

211 def _create_config_provider(self, profile_name): 

212 config_file = self._session.get_config_variable('config_file') 

213 return ConfigProvider( 

214 profile_name=profile_name, 

215 config_filename=config_file, 

216 ) 

217 

218 def _create_web_identity_provider(self, profile_name, disable_env_vars): 

219 return AssumeRoleWithWebIdentityProvider( 

220 load_config=lambda: self._session.full_config, 

221 client_creator=_get_client_creator( 

222 self._session, self._region_name 

223 ), 

224 cache=self._cache, 

225 profile_name=profile_name, 

226 disable_env_vars=disable_env_vars, 

227 ) 

228 

229 def _create_sso_provider(self, profile_name): 

230 return SSOProvider( 

231 load_config=lambda: self._session.full_config, 

232 client_creator=self._session.create_client, 

233 profile_name=profile_name, 

234 cache=self._cache, 

235 token_cache=self._sso_token_cache, 

236 token_provider=SSOTokenProvider( 

237 self._session, 

238 cache=self._sso_token_cache, 

239 profile_name=profile_name, 

240 ), 

241 ) 

242 

243 

244def get_credentials(session): 

245 resolver = create_credential_resolver(session) 

246 return resolver.load_credentials() 

247 

248 

249def _local_now(): 

250 return datetime.datetime.now(tzlocal()) 

251 

252 

253def _parse_if_needed(value): 

254 if isinstance(value, datetime.datetime): 

255 return value 

256 return parse(value) 

257 

258 

259def _serialize_if_needed(value, iso=False): 

260 if isinstance(value, datetime.datetime): 

261 if iso: 

262 return value.isoformat() 

263 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

264 return value 

265 

266 

267def _get_client_creator(session, region_name): 

268 def client_creator(service_name, **kwargs): 

269 create_client_kwargs = {'region_name': region_name} 

270 create_client_kwargs.update(**kwargs) 

271 return create_nested_client( 

272 session, service_name, **create_client_kwargs 

273 ) 

274 

275 return client_creator 

276 

277 

278def create_assume_role_refresher(client, params): 

279 def refresh(): 

280 response = client.assume_role(**params) 

281 credentials = response['Credentials'] 

282 # We need to normalize the credential names to 

283 # the values expected by the refresh creds. 

284 return { 

285 'access_key': credentials['AccessKeyId'], 

286 'secret_key': credentials['SecretAccessKey'], 

287 'token': credentials['SessionToken'], 

288 'expiry_time': _serialize_if_needed(credentials['Expiration']), 

289 } 

290 

291 return refresh 

292 

293 

294def create_mfa_serial_refresher(actual_refresh): 

295 class _Refresher: 

296 def __init__(self, refresh): 

297 self._refresh = refresh 

298 self._has_been_called = False 

299 

300 def __call__(self): 

301 if self._has_been_called: 

302 # We can explore an option in the future to support 

303 # reprompting for MFA, but for now we just error out 

304 # when the temp creds expire. 

305 raise RefreshWithMFAUnsupportedError() 

306 self._has_been_called = True 

307 return self._refresh() 

308 

309 return _Refresher(actual_refresh) 

310 

311 

312class Credentials: 

313 """ 

314 Holds the credentials needed to authenticate requests. 

315 

316 :param str access_key: The access key part of the credentials. 

317 :param str secret_key: The secret key part of the credentials. 

318 :param str token: The security token, valid only for session credentials. 

319 :param str method: A string which identifies where the credentials 

320 were found. 

321 :param str account_id: (optional) An account ID associated with the credentials. 

322 """ 

323 

324 def __init__( 

325 self, access_key, secret_key, token=None, method=None, account_id=None 

326 ): 

327 self.access_key = access_key 

328 self.secret_key = secret_key 

329 self.token = token 

330 

331 if method is None: 

332 method = 'explicit' 

333 self.method = method 

334 self.account_id = account_id 

335 

336 self._normalize() 

337 

338 def _normalize(self): 

339 # Keys would sometimes (accidentally) contain non-ascii characters. 

340 # It would cause a confusing UnicodeDecodeError in Python 2. 

341 # We explicitly convert them into unicode to avoid such error. 

342 # 

343 # Eventually the service will decide whether to accept the credential. 

344 # This also complies with the behavior in Python 3. 

345 self.access_key = botocore.compat.ensure_unicode(self.access_key) 

346 self.secret_key = botocore.compat.ensure_unicode(self.secret_key) 

347 

348 def get_frozen_credentials(self): 

349 return ReadOnlyCredentials( 

350 self.access_key, self.secret_key, self.token, self.account_id 

351 ) 

352 

353 def get_deferred_property(self, property_name): 

354 def get_property(): 

355 return getattr(self, property_name, None) 

356 

357 return get_property 

358 

359 

360class RefreshableCredentials(Credentials): 

361 """ 

362 Holds the credentials needed to authenticate requests. In addition, it 

363 knows how to refresh itself. 

364 

365 :param str access_key: The access key part of the credentials. 

366 :param str secret_key: The secret key part of the credentials. 

367 :param str token: The security token, valid only for session credentials. 

368 :param datetime expiry_time: The expiration time of the credentials. 

369 :param function refresh_using: Callback function to refresh the credentials. 

370 :param str method: A string which identifies where the credentials 

371 were found. 

372 :param function time_fetcher: Callback function to retrieve current time. 

373 """ 

374 

375 # The time at which we'll attempt to refresh, but not 

376 # block if someone else is refreshing. 

377 _advisory_refresh_timeout = _DEFAULT_ADVISORY_REFRESH_TIMEOUT 

378 # The time at which all threads will block waiting for 

379 # refreshed credentials. 

380 _mandatory_refresh_timeout = _DEFAULT_MANDATORY_REFRESH_TIMEOUT 

381 

382 def __init__( 

383 self, 

384 access_key, 

385 secret_key, 

386 token, 

387 expiry_time, 

388 refresh_using, 

389 method, 

390 time_fetcher=_local_now, 

391 advisory_timeout=None, 

392 mandatory_timeout=None, 

393 account_id=None, 

394 ): 

395 self._refresh_using = refresh_using 

396 self._access_key = access_key 

397 self._secret_key = secret_key 

398 self._token = token 

399 self._account_id = account_id 

400 self._expiry_time = expiry_time 

401 self._time_fetcher = time_fetcher 

402 self._refresh_lock = threading.Lock() 

403 self.method = method 

404 self._frozen_credentials = ReadOnlyCredentials( 

405 access_key, secret_key, token, account_id 

406 ) 

407 self._normalize() 

408 if advisory_timeout is not None: 

409 self._advisory_refresh_timeout = advisory_timeout 

410 if mandatory_timeout is not None: 

411 self._mandatory_refresh_timeout = mandatory_timeout 

412 

413 def _normalize(self): 

414 self._access_key = botocore.compat.ensure_unicode(self._access_key) 

415 self._secret_key = botocore.compat.ensure_unicode(self._secret_key) 

416 

417 @classmethod 

418 def create_from_metadata( 

419 cls, 

420 metadata, 

421 refresh_using, 

422 method, 

423 advisory_timeout=None, 

424 mandatory_timeout=None, 

425 ): 

426 kwargs = {} 

427 if advisory_timeout is not None: 

428 kwargs['advisory_timeout'] = advisory_timeout 

429 if mandatory_timeout is not None: 

430 kwargs['mandatory_timeout'] = mandatory_timeout 

431 

432 instance = cls( 

433 access_key=metadata['access_key'], 

434 secret_key=metadata['secret_key'], 

435 token=metadata['token'], 

436 expiry_time=cls._expiry_datetime(metadata['expiry_time']), 

437 method=method, 

438 refresh_using=refresh_using, 

439 account_id=metadata.get('account_id'), 

440 **kwargs, 

441 ) 

442 return instance 

443 

444 @property 

445 def access_key(self): 

446 """Warning: Using this property can lead to race conditions if you 

447 access another property subsequently along the refresh boundary. 

448 Please use get_frozen_credentials instead. 

449 """ 

450 self._refresh() 

451 return self._access_key 

452 

453 @access_key.setter 

454 def access_key(self, value): 

455 self._access_key = value 

456 

457 @property 

458 def secret_key(self): 

459 """Warning: Using this property can lead to race conditions if you 

460 access another property subsequently along the refresh boundary. 

461 Please use get_frozen_credentials instead. 

462 """ 

463 self._refresh() 

464 return self._secret_key 

465 

466 @secret_key.setter 

467 def secret_key(self, value): 

468 self._secret_key = value 

469 

470 @property 

471 def token(self): 

472 """Warning: Using this property can lead to race conditions if you 

473 access another property subsequently along the refresh boundary. 

474 Please use get_frozen_credentials instead. 

475 """ 

476 self._refresh() 

477 return self._token 

478 

479 @token.setter 

480 def token(self, value): 

481 self._token = value 

482 

483 @property 

484 def account_id(self): 

485 """Warning: Using this property can lead to race conditions if you 

486 access another property subsequently along the refresh boundary. 

487 Please use get_frozen_credentials instead. 

488 """ 

489 self._refresh() 

490 return self._account_id 

491 

492 @account_id.setter 

493 def account_id(self, value): 

494 self._account_id = value 

495 

496 def _seconds_remaining(self): 

497 delta = self._expiry_time - self._time_fetcher() 

498 return total_seconds(delta) 

499 

500 def refresh_needed(self, refresh_in=None): 

501 """Check if a refresh is needed. 

502 

503 A refresh is needed if the expiry time associated 

504 with the temporary credentials is less than the 

505 provided ``refresh_in``. If ``time_delta`` is not 

506 provided, ``self.advisory_refresh_needed`` will be used. 

507 

508 For example, if your temporary credentials expire 

509 in 10 minutes and the provided ``refresh_in`` is 

510 ``15 * 60``, then this function will return ``True``. 

511 

512 :type refresh_in: int 

513 :param refresh_in: The number of seconds before the 

514 credentials expire in which refresh attempts should 

515 be made. 

516 

517 :return: True if refresh needed, False otherwise. 

518 

519 """ 

520 if self._expiry_time is None: 

521 # No expiration, so assume we don't need to refresh. 

522 return False 

523 

524 if refresh_in is None: 

525 refresh_in = self._advisory_refresh_timeout 

526 # The credentials should be refreshed if they're going to expire 

527 # in less than 5 minutes. 

528 if self._seconds_remaining() >= refresh_in: 

529 # There's enough time left. Don't refresh. 

530 return False 

531 logger.debug("Credentials need to be refreshed.") 

532 return True 

533 

534 def _is_expired(self): 

535 # Checks if the current credentials are expired. 

536 return self.refresh_needed(refresh_in=0) 

537 

538 def _refresh(self): 

539 # In the common case where we don't need a refresh, we 

540 # can immediately exit and not require acquiring the 

541 # refresh lock. 

542 if not self.refresh_needed(self._advisory_refresh_timeout): 

543 return 

544 

545 # acquire() doesn't accept kwargs, but False is indicating 

546 # that we should not block if we can't acquire the lock. 

547 # If we aren't able to acquire the lock, we'll trigger 

548 # the else clause. 

549 if self._refresh_lock.acquire(False): 

550 try: 

551 if not self.refresh_needed(self._advisory_refresh_timeout): 

552 return 

553 is_mandatory_refresh = self.refresh_needed( 

554 self._mandatory_refresh_timeout 

555 ) 

556 self._protected_refresh(is_mandatory=is_mandatory_refresh) 

557 return 

558 finally: 

559 self._refresh_lock.release() 

560 elif self.refresh_needed(self._mandatory_refresh_timeout): 

561 # If we're within the mandatory refresh window, 

562 # we must block until we get refreshed credentials. 

563 with self._refresh_lock: 

564 if not self.refresh_needed(self._mandatory_refresh_timeout): 

565 return 

566 self._protected_refresh(is_mandatory=True) 

567 

568 def _protected_refresh(self, is_mandatory): 

569 # precondition: this method should only be called if you've acquired 

570 # the self._refresh_lock. 

571 try: 

572 metadata = self._refresh_using() 

573 except Exception: 

574 period_name = 'mandatory' if is_mandatory else 'advisory' 

575 logger.warning( 

576 "Refreshing temporary credentials failed " 

577 "during %s refresh period.", 

578 period_name, 

579 exc_info=True, 

580 ) 

581 if is_mandatory: 

582 # If this is a mandatory refresh, then 

583 # all errors that occur when we attempt to refresh 

584 # credentials are propagated back to the user. 

585 raise 

586 # Otherwise we'll just return. 

587 # The end result will be that we'll use the current 

588 # set of temporary credentials we have. 

589 return 

590 self._set_from_data(metadata) 

591 self._frozen_credentials = ReadOnlyCredentials( 

592 self._access_key, self._secret_key, self._token, self._account_id 

593 ) 

594 if self._is_expired(): 

595 # We successfully refreshed credentials but for whatever 

596 # reason, our refreshing function returned credentials 

597 # that are still expired. In this scenario, the only 

598 # thing we can do is let the user know and raise 

599 # an exception. 

600 msg = ( 

601 "Credentials were refreshed, but the " 

602 "refreshed credentials are still expired." 

603 ) 

604 logger.warning(msg) 

605 raise RuntimeError(msg) 

606 

607 @staticmethod 

608 def _expiry_datetime(time_str): 

609 return parse(time_str) 

610 

611 def _set_from_data(self, data): 

612 expected_keys = ['access_key', 'secret_key', 'token', 'expiry_time'] 

613 if not data: 

614 missing_keys = expected_keys 

615 else: 

616 missing_keys = [k for k in expected_keys if k not in data] 

617 

618 if missing_keys: 

619 message = "Credential refresh failed, response did not contain: %s" 

620 raise CredentialRetrievalError( 

621 provider=self.method, 

622 error_msg=message % ', '.join(missing_keys), 

623 ) 

624 

625 self.access_key = data['access_key'] 

626 self.secret_key = data['secret_key'] 

627 self.token = data['token'] 

628 self._expiry_time = parse(data['expiry_time']) 

629 self.account_id = data.get('account_id') 

630 logger.debug( 

631 "Retrieved credentials will expire at: %s", self._expiry_time 

632 ) 

633 self._normalize() 

634 

635 def get_frozen_credentials(self): 

636 """Return immutable credentials. 

637 

638 The ``access_key``, ``secret_key``, and ``token`` properties 

639 on this class will always check and refresh credentials if 

640 needed before returning the particular credentials. 

641 

642 This has an edge case where you can get inconsistent 

643 credentials. Imagine this: 

644 

645 # Current creds are "t1" 

646 tmp.access_key ---> expired? no, so return t1.access_key 

647 # ---- time is now expired, creds need refreshing to "t2" ---- 

648 tmp.secret_key ---> expired? yes, refresh and return t2.secret_key 

649 

650 This means we're using the access key from t1 with the secret key 

651 from t2. To fix this issue, you can request a frozen credential object 

652 which is guaranteed not to change. 

653 

654 The frozen credentials returned from this method should be used 

655 immediately and then discarded. The typical usage pattern would 

656 be:: 

657 

658 creds = RefreshableCredentials(...) 

659 some_code = SomeSignerObject() 

660 # I'm about to sign the request. 

661 # The frozen credentials are only used for the 

662 # duration of generate_presigned_url and will be 

663 # immediately thrown away. 

664 request = some_code.sign_some_request( 

665 with_credentials=creds.get_frozen_credentials()) 

666 print("Signed request:", request) 

667 

668 """ 

669 self._refresh() 

670 return self._frozen_credentials 

671 

672 

673class DeferredRefreshableCredentials(RefreshableCredentials): 

674 """Refreshable credentials that don't require initial credentials. 

675 

676 refresh_using will be called upon first access. 

677 """ 

678 

679 def __init__(self, refresh_using, method, time_fetcher=_local_now): 

680 self._refresh_using = refresh_using 

681 self._access_key = None 

682 self._secret_key = None 

683 self._token = None 

684 self._account_id = None 

685 self._expiry_time = None 

686 self._time_fetcher = time_fetcher 

687 self._refresh_lock = threading.Lock() 

688 self.method = method 

689 self._frozen_credentials = None 

690 

691 def refresh_needed(self, refresh_in=None): 

692 if self._frozen_credentials is None: 

693 return True 

694 return super().refresh_needed(refresh_in) 

695 

696 

697class CachedCredentialFetcher: 

698 DEFAULT_EXPIRY_WINDOW_SECONDS = 60 * 15 

699 

700 def __init__(self, cache=None, expiry_window_seconds=None): 

701 if cache is None: 

702 cache = {} 

703 self._cache = cache 

704 self._cache_key = self._create_cache_key() 

705 if expiry_window_seconds is None: 

706 expiry_window_seconds = self.DEFAULT_EXPIRY_WINDOW_SECONDS 

707 self._expiry_window_seconds = expiry_window_seconds 

708 

709 def _create_cache_key(self): 

710 raise NotImplementedError('_create_cache_key()') 

711 

712 def _make_file_safe(self, filename): 

713 # Replace :, path sep, and / to make it the string filename safe. 

714 filename = filename.replace(':', '_').replace(os.sep, '_') 

715 return filename.replace('/', '_') 

716 

717 def _get_credentials(self): 

718 raise NotImplementedError('_get_credentials()') 

719 

720 def fetch_credentials(self): 

721 return self._get_cached_credentials() 

722 

723 def _get_cached_credentials(self): 

724 """Get up-to-date credentials. 

725 

726 This will check the cache for up-to-date credentials, calling assume 

727 role if none are available. 

728 """ 

729 response = self._load_from_cache() 

730 if response is None: 

731 response = self._get_credentials() 

732 self._write_to_cache(response) 

733 else: 

734 logger.debug("Credentials for role retrieved from cache.") 

735 

736 creds = response['Credentials'] 

737 expiration = _serialize_if_needed(creds['Expiration'], iso=True) 

738 credentials = { 

739 'access_key': creds['AccessKeyId'], 

740 'secret_key': creds['SecretAccessKey'], 

741 'token': creds['SessionToken'], 

742 'expiry_time': expiration, 

743 'account_id': creds.get('AccountId'), 

744 } 

745 

746 return credentials 

747 

748 def _load_from_cache(self): 

749 if self._cache_key in self._cache: 

750 creds = deepcopy(self._cache[self._cache_key]) 

751 if not self._is_expired(creds): 

752 return creds 

753 else: 

754 logger.debug( 

755 "Credentials were found in cache, but they are expired." 

756 ) 

757 return None 

758 

759 def _write_to_cache(self, response): 

760 self._cache[self._cache_key] = deepcopy(response) 

761 

762 def _is_expired(self, credentials): 

763 """Check if credentials are expired.""" 

764 end_time = _parse_if_needed(credentials['Credentials']['Expiration']) 

765 seconds = total_seconds(end_time - _local_now()) 

766 return seconds < self._expiry_window_seconds 

767 

768 

769class BaseAssumeRoleCredentialFetcher(CachedCredentialFetcher): 

770 def __init__( 

771 self, 

772 client_creator, 

773 role_arn, 

774 extra_args=None, 

775 cache=None, 

776 expiry_window_seconds=None, 

777 ): 

778 self._client_creator = client_creator 

779 self._role_arn = role_arn 

780 

781 if extra_args is None: 

782 self._assume_kwargs = {} 

783 else: 

784 self._assume_kwargs = deepcopy(extra_args) 

785 self._assume_kwargs['RoleArn'] = self._role_arn 

786 

787 self._role_session_name = self._assume_kwargs.get('RoleSessionName') 

788 self._using_default_session_name = False 

789 if not self._role_session_name: 

790 self._generate_assume_role_name() 

791 

792 super().__init__(cache, expiry_window_seconds) 

793 

794 def _generate_assume_role_name(self): 

795 self._role_session_name = f'botocore-session-{int(time.time())}' 

796 self._assume_kwargs['RoleSessionName'] = self._role_session_name 

797 self._using_default_session_name = True 

798 

799 def _create_cache_key(self): 

800 """Create a predictable cache key for the current configuration. 

801 

802 The cache key is intended to be compatible with file names. 

803 """ 

804 args = deepcopy(self._assume_kwargs) 

805 

806 # The role session name gets randomly generated, so we don't want it 

807 # in the hash. 

808 if self._using_default_session_name: 

809 del args['RoleSessionName'] 

810 

811 if 'Policy' in args: 

812 # To have a predictable hash, the keys of the policy must be 

813 # sorted, so we have to load it here to make sure it gets sorted 

814 # later on. 

815 args['Policy'] = json.loads(args['Policy']) 

816 

817 args = json.dumps(args, sort_keys=True) 

818 argument_hash = sha1(args.encode('utf-8')).hexdigest() 

819 return self._make_file_safe(argument_hash) 

820 

821 def _add_account_id_to_response(self, response): 

822 role_arn = response.get('AssumedRoleUser', {}).get('Arn') 

823 if ArnParser.is_arn(role_arn): 

824 arn_parser = ArnParser() 

825 account_id = arn_parser.parse_arn(role_arn)['account'] 

826 response['Credentials']['AccountId'] = account_id 

827 else: 

828 logger.debug("Unable to extract account ID from Arn: %s", role_arn) 

829 

830 

831class AssumeRoleCredentialFetcher(BaseAssumeRoleCredentialFetcher): 

832 def __init__( 

833 self, 

834 client_creator, 

835 source_credentials, 

836 role_arn, 

837 extra_args=None, 

838 mfa_prompter=None, 

839 cache=None, 

840 expiry_window_seconds=None, 

841 ): 

842 """ 

843 :type client_creator: callable 

844 :param client_creator: A callable that creates a client taking 

845 arguments like ``Session.create_client``. 

846 

847 :type source_credentials: Credentials 

848 :param source_credentials: The credentials to use to create the 

849 client for the call to AssumeRole. 

850 

851 :type role_arn: str 

852 :param role_arn: The ARN of the role to be assumed. 

853 

854 :type extra_args: dict 

855 :param extra_args: Any additional arguments to add to the assume 

856 role request using the format of the botocore operation. 

857 Possible keys include, but may not be limited to, 

858 DurationSeconds, Policy, SerialNumber, ExternalId and 

859 RoleSessionName. 

860 

861 :type mfa_prompter: callable 

862 :param mfa_prompter: A callable that returns input provided by the 

863 user (i.e raw_input, getpass.getpass, etc.). 

864 

865 :type cache: dict 

866 :param cache: An object that supports ``__getitem__``, 

867 ``__setitem__``, and ``__contains__``. An example of this is 

868 the ``JSONFileCache`` class in aws-cli. 

869 

870 :type expiry_window_seconds: int 

871 :param expiry_window_seconds: The amount of time, in seconds, 

872 """ 

873 self._source_credentials = source_credentials 

874 self._mfa_prompter = mfa_prompter 

875 if self._mfa_prompter is None: 

876 self._mfa_prompter = getpass.getpass 

877 

878 super().__init__( 

879 client_creator, 

880 role_arn, 

881 extra_args=extra_args, 

882 cache=cache, 

883 expiry_window_seconds=expiry_window_seconds, 

884 ) 

885 

886 def _get_credentials(self): 

887 """Get credentials by calling assume role.""" 

888 kwargs = self._assume_role_kwargs() 

889 client = self._create_client() 

890 response = client.assume_role(**kwargs) 

891 self._add_account_id_to_response(response) 

892 return response 

893 

894 def _assume_role_kwargs(self): 

895 """Get the arguments for assume role based on current configuration.""" 

896 assume_role_kwargs = deepcopy(self._assume_kwargs) 

897 

898 mfa_serial = assume_role_kwargs.get('SerialNumber') 

899 

900 if mfa_serial is not None: 

901 prompt = f'Enter MFA code for {mfa_serial}: ' 

902 token_code = self._mfa_prompter(prompt) 

903 assume_role_kwargs['TokenCode'] = token_code 

904 

905 duration_seconds = assume_role_kwargs.get('DurationSeconds') 

906 

907 if duration_seconds is not None: 

908 assume_role_kwargs['DurationSeconds'] = duration_seconds 

909 

910 return assume_role_kwargs 

911 

912 def _create_client(self): 

913 """Create an STS client using the source credentials.""" 

914 frozen_credentials = self._source_credentials.get_frozen_credentials() 

915 return self._client_creator( 

916 'sts', 

917 aws_access_key_id=frozen_credentials.access_key, 

918 aws_secret_access_key=frozen_credentials.secret_key, 

919 aws_session_token=frozen_credentials.token, 

920 ) 

921 

922 

923class AssumeRoleWithWebIdentityCredentialFetcher( 

924 BaseAssumeRoleCredentialFetcher 

925): 

926 def __init__( 

927 self, 

928 client_creator, 

929 web_identity_token_loader, 

930 role_arn, 

931 extra_args=None, 

932 cache=None, 

933 expiry_window_seconds=None, 

934 ): 

935 """ 

936 :type client_creator: callable 

937 :param client_creator: A callable that creates a client taking 

938 arguments like ``Session.create_client``. 

939 

940 :type web_identity_token_loader: callable 

941 :param web_identity_token_loader: A callable that takes no arguments 

942 and returns a web identity token str. 

943 

944 :type role_arn: str 

945 :param role_arn: The ARN of the role to be assumed. 

946 

947 :type extra_args: dict 

948 :param extra_args: Any additional arguments to add to the assume 

949 role request using the format of the botocore operation. 

950 Possible keys include, but may not be limited to, 

951 DurationSeconds, Policy, SerialNumber, ExternalId and 

952 RoleSessionName. 

953 

954 :type cache: dict 

955 :param cache: An object that supports ``__getitem__``, 

956 ``__setitem__``, and ``__contains__``. An example of this is 

957 the ``JSONFileCache`` class in aws-cli. 

958 

959 :type expiry_window_seconds: int 

960 :param expiry_window_seconds: The amount of time, in seconds, 

961 """ 

962 self._web_identity_token_loader = web_identity_token_loader 

963 

964 super().__init__( 

965 client_creator, 

966 role_arn, 

967 extra_args=extra_args, 

968 cache=cache, 

969 expiry_window_seconds=expiry_window_seconds, 

970 ) 

971 

972 def _get_credentials(self): 

973 """Get credentials by calling assume role.""" 

974 kwargs = self._assume_role_kwargs() 

975 # Assume role with web identity does not require credentials other than 

976 # the token, explicitly configure the client to not sign requests. 

977 config = Config(signature_version=UNSIGNED) 

978 client = self._client_creator('sts', config=config) 

979 response = client.assume_role_with_web_identity(**kwargs) 

980 self._add_account_id_to_response(response) 

981 return response 

982 

983 def _assume_role_kwargs(self): 

984 """Get the arguments for assume role based on current configuration.""" 

985 assume_role_kwargs = deepcopy(self._assume_kwargs) 

986 identity_token = self._web_identity_token_loader() 

987 assume_role_kwargs['WebIdentityToken'] = identity_token 

988 

989 return assume_role_kwargs 

990 

991 

992class CredentialProvider: 

993 # A short name to identify the provider within botocore. 

994 METHOD = None 

995 

996 # A name to identify the provider for use in cross-sdk features like 

997 # assume role's `credential_source` configuration option. These names 

998 # are to be treated in a case-insensitive way. NOTE: any providers not 

999 # implemented in botocore MUST prefix their canonical names with 

1000 # 'custom' or we DO NOT guarantee that it will work with any features 

1001 # that this provides. 

1002 CANONICAL_NAME = None 

1003 

1004 def __init__(self, session=None): 

1005 self.session = session 

1006 

1007 def load(self): 

1008 """ 

1009 Loads the credentials from their source & sets them on the object. 

1010 

1011 Subclasses should implement this method (by reading from disk, the 

1012 environment, the network or wherever), returning ``True`` if they were 

1013 found & loaded. 

1014 

1015 If not found, this method should return ``False``, indictating that the 

1016 ``CredentialResolver`` should fall back to the next available method. 

1017 

1018 The default implementation does nothing, assuming the user has set the 

1019 ``access_key/secret_key/token`` themselves. 

1020 

1021 :returns: Whether credentials were found & set 

1022 :rtype: Credentials 

1023 """ 

1024 return True 

1025 

1026 def _extract_creds_from_mapping(self, mapping, *key_names): 

1027 found = [] 

1028 for key_name in key_names: 

1029 try: 

1030 found.append(mapping[key_name]) 

1031 except KeyError: 

1032 raise PartialCredentialsError( 

1033 provider=self.METHOD, cred_var=key_name 

1034 ) 

1035 return found 

1036 

1037 

1038class ProcessProvider(CredentialProvider): 

1039 METHOD = 'custom-process' 

1040 

1041 def __init__(self, profile_name, load_config, popen=subprocess.Popen): 

1042 self._profile_name = profile_name 

1043 self._load_config = load_config 

1044 self._loaded_config = None 

1045 self._popen = popen 

1046 

1047 def load(self): 

1048 credential_process = self._credential_process 

1049 if credential_process is None: 

1050 return 

1051 

1052 creds_dict = self._retrieve_credentials_using(credential_process) 

1053 if creds_dict.get('expiry_time') is not None: 

1054 return RefreshableCredentials.create_from_metadata( 

1055 creds_dict, 

1056 lambda: self._retrieve_credentials_using(credential_process), 

1057 self.METHOD, 

1058 ) 

1059 

1060 return Credentials( 

1061 access_key=creds_dict['access_key'], 

1062 secret_key=creds_dict['secret_key'], 

1063 token=creds_dict.get('token'), 

1064 method=self.METHOD, 

1065 account_id=creds_dict.get('account_id'), 

1066 ) 

1067 

1068 def _retrieve_credentials_using(self, credential_process): 

1069 # We're not using shell=True, so we need to pass the 

1070 # command and all arguments as a list. 

1071 process_list = compat_shell_split(credential_process) 

1072 p = self._popen( 

1073 process_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE 

1074 ) 

1075 stdout, stderr = p.communicate() 

1076 if p.returncode != 0: 

1077 raise CredentialRetrievalError( 

1078 provider=self.METHOD, error_msg=stderr.decode('utf-8') 

1079 ) 

1080 parsed = botocore.compat.json.loads(stdout.decode('utf-8')) 

1081 version = parsed.get('Version', '<Version key not provided>') 

1082 if version != 1: 

1083 raise CredentialRetrievalError( 

1084 provider=self.METHOD, 

1085 error_msg=( 

1086 f"Unsupported version '{version}' for credential process " 

1087 f"provider, supported versions: 1" 

1088 ), 

1089 ) 

1090 try: 

1091 return { 

1092 'access_key': parsed['AccessKeyId'], 

1093 'secret_key': parsed['SecretAccessKey'], 

1094 'token': parsed.get('SessionToken'), 

1095 'expiry_time': parsed.get('Expiration'), 

1096 'account_id': self._get_account_id(parsed), 

1097 } 

1098 except KeyError as e: 

1099 raise CredentialRetrievalError( 

1100 provider=self.METHOD, 

1101 error_msg=f"Missing required key in response: {e}", 

1102 ) 

1103 

1104 @property 

1105 def _credential_process(self): 

1106 return self.profile_config.get('credential_process') 

1107 

1108 @property 

1109 def profile_config(self): 

1110 if self._loaded_config is None: 

1111 self._loaded_config = self._load_config() 

1112 profile_config = self._loaded_config.get('profiles', {}).get( 

1113 self._profile_name, {} 

1114 ) 

1115 return profile_config 

1116 

1117 def _get_account_id(self, parsed): 

1118 account_id = parsed.get('AccountId') 

1119 return account_id or self.profile_config.get('aws_account_id') 

1120 

1121 

1122class InstanceMetadataProvider(CredentialProvider): 

1123 METHOD = 'iam-role' 

1124 CANONICAL_NAME = 'Ec2InstanceMetadata' 

1125 

1126 def __init__(self, iam_role_fetcher): 

1127 self._role_fetcher = iam_role_fetcher 

1128 

1129 def load(self): 

1130 fetcher = self._role_fetcher 

1131 # We do the first request, to see if we get useful data back. 

1132 # If not, we'll pass & move on to whatever's next in the credential 

1133 # chain. 

1134 metadata = fetcher.retrieve_iam_role_credentials() 

1135 if not metadata: 

1136 return None 

1137 register_feature_id('CREDENTIALS_IMDS') 

1138 logger.info( 

1139 'Found credentials from IAM Role: %s', metadata['role_name'] 

1140 ) 

1141 # We manually set the data here, since we already made the request & 

1142 # have it. When the expiry is hit, the credentials will auto-refresh 

1143 # themselves. 

1144 creds = RefreshableCredentials.create_from_metadata( 

1145 metadata, 

1146 method=self.METHOD, 

1147 refresh_using=fetcher.retrieve_iam_role_credentials, 

1148 ) 

1149 return creds 

1150 

1151 

1152class EnvProvider(CredentialProvider): 

1153 METHOD = 'env' 

1154 CANONICAL_NAME = 'Environment' 

1155 ACCESS_KEY = 'AWS_ACCESS_KEY_ID' 

1156 SECRET_KEY = 'AWS_SECRET_ACCESS_KEY' 

1157 # The token can come from either of these env var. 

1158 # AWS_SESSION_TOKEN is what other AWS SDKs have standardized on. 

1159 TOKENS = ['AWS_SECURITY_TOKEN', 'AWS_SESSION_TOKEN'] 

1160 EXPIRY_TIME = 'AWS_CREDENTIAL_EXPIRATION' 

1161 ACCOUNT_ID = 'AWS_ACCOUNT_ID' 

1162 

1163 def __init__(self, environ=None, mapping=None): 

1164 """ 

1165 

1166 :param environ: The environment variables (defaults to 

1167 ``os.environ`` if no value is provided). 

1168 :param mapping: An optional mapping of variable names to 

1169 environment variable names. Use this if you want to 

1170 change the mapping of access_key->AWS_ACCESS_KEY_ID, etc. 

1171 The dict can have up to 5 keys: 

1172 * ``access_key`` 

1173 * ``secret_key`` 

1174 * ``token`` 

1175 * ``expiry_time`` 

1176 * ``account_id`` 

1177 """ 

1178 if environ is None: 

1179 environ = os.environ 

1180 self.environ = environ 

1181 self._mapping = self._build_mapping(mapping) 

1182 

1183 def _build_mapping(self, mapping): 

1184 # Mapping of variable name to env var name. 

1185 var_mapping = {} 

1186 if mapping is None: 

1187 # Use the class var default. 

1188 var_mapping['access_key'] = self.ACCESS_KEY 

1189 var_mapping['secret_key'] = self.SECRET_KEY 

1190 var_mapping['token'] = self.TOKENS 

1191 var_mapping['expiry_time'] = self.EXPIRY_TIME 

1192 var_mapping['account_id'] = self.ACCOUNT_ID 

1193 else: 

1194 var_mapping['access_key'] = mapping.get( 

1195 'access_key', self.ACCESS_KEY 

1196 ) 

1197 var_mapping['secret_key'] = mapping.get( 

1198 'secret_key', self.SECRET_KEY 

1199 ) 

1200 var_mapping['token'] = mapping.get('token', self.TOKENS) 

1201 if not isinstance(var_mapping['token'], list): 

1202 var_mapping['token'] = [var_mapping['token']] 

1203 var_mapping['expiry_time'] = mapping.get( 

1204 'expiry_time', self.EXPIRY_TIME 

1205 ) 

1206 var_mapping['account_id'] = mapping.get( 

1207 'account_id', self.ACCOUNT_ID 

1208 ) 

1209 return var_mapping 

1210 

1211 def load(self): 

1212 """ 

1213 Search for credentials in explicit environment variables. 

1214 """ 

1215 

1216 access_key = self.environ.get(self._mapping['access_key'], '') 

1217 

1218 if access_key: 

1219 logger.info('Found credentials in environment variables.') 

1220 fetcher = self._create_credentials_fetcher() 

1221 credentials = fetcher(require_expiry=False) 

1222 

1223 expiry_time = credentials['expiry_time'] 

1224 if expiry_time is not None: 

1225 expiry_time = parse(expiry_time) 

1226 return RefreshableCredentials( 

1227 credentials['access_key'], 

1228 credentials['secret_key'], 

1229 credentials['token'], 

1230 expiry_time, 

1231 refresh_using=fetcher, 

1232 method=self.METHOD, 

1233 account_id=credentials['account_id'], 

1234 ) 

1235 

1236 return Credentials( 

1237 credentials['access_key'], 

1238 credentials['secret_key'], 

1239 credentials['token'], 

1240 method=self.METHOD, 

1241 account_id=credentials['account_id'], 

1242 ) 

1243 else: 

1244 return None 

1245 

1246 def _create_credentials_fetcher(self): 

1247 mapping = self._mapping 

1248 method = self.METHOD 

1249 environ = self.environ 

1250 

1251 def fetch_credentials(require_expiry=True): 

1252 credentials = {} 

1253 

1254 access_key = environ.get(mapping['access_key'], '') 

1255 if not access_key: 

1256 raise PartialCredentialsError( 

1257 provider=method, cred_var=mapping['access_key'] 

1258 ) 

1259 credentials['access_key'] = access_key 

1260 

1261 secret_key = environ.get(mapping['secret_key'], '') 

1262 if not secret_key: 

1263 raise PartialCredentialsError( 

1264 provider=method, cred_var=mapping['secret_key'] 

1265 ) 

1266 credentials['secret_key'] = secret_key 

1267 

1268 credentials['token'] = None 

1269 for token_env_var in mapping['token']: 

1270 token = environ.get(token_env_var, '') 

1271 if token: 

1272 credentials['token'] = token 

1273 break 

1274 

1275 credentials['expiry_time'] = None 

1276 expiry_time = environ.get(mapping['expiry_time'], '') 

1277 if expiry_time: 

1278 credentials['expiry_time'] = expiry_time 

1279 if require_expiry and not expiry_time: 

1280 raise PartialCredentialsError( 

1281 provider=method, cred_var=mapping['expiry_time'] 

1282 ) 

1283 

1284 credentials['account_id'] = None 

1285 account_id = environ.get(mapping['account_id'], '') 

1286 if account_id: 

1287 credentials['account_id'] = account_id 

1288 

1289 return credentials 

1290 

1291 return fetch_credentials 

1292 

1293 

1294class OriginalEC2Provider(CredentialProvider): 

1295 METHOD = 'ec2-credentials-file' 

1296 CANONICAL_NAME = 'Ec2Config' 

1297 

1298 CRED_FILE_ENV = 'AWS_CREDENTIAL_FILE' 

1299 ACCESS_KEY = 'AWSAccessKeyId' 

1300 SECRET_KEY = 'AWSSecretKey' 

1301 

1302 def __init__(self, environ=None, parser=None): 

1303 if environ is None: 

1304 environ = os.environ 

1305 if parser is None: 

1306 parser = parse_key_val_file 

1307 self._environ = environ 

1308 self._parser = parser 

1309 

1310 def load(self): 

1311 """ 

1312 Search for a credential file used by original EC2 CLI tools. 

1313 """ 

1314 if 'AWS_CREDENTIAL_FILE' in self._environ: 

1315 full_path = os.path.expanduser( 

1316 self._environ['AWS_CREDENTIAL_FILE'] 

1317 ) 

1318 creds = self._parser(full_path) 

1319 if self.ACCESS_KEY in creds: 

1320 logger.info('Found credentials in AWS_CREDENTIAL_FILE.') 

1321 access_key = creds[self.ACCESS_KEY] 

1322 secret_key = creds[self.SECRET_KEY] 

1323 # EC2 creds file doesn't support session tokens. 

1324 return Credentials(access_key, secret_key, method=self.METHOD) 

1325 else: 

1326 return None 

1327 

1328 

1329class SharedCredentialProvider(CredentialProvider): 

1330 METHOD = 'shared-credentials-file' 

1331 CANONICAL_NAME = 'SharedCredentials' 

1332 

1333 ACCESS_KEY = 'aws_access_key_id' 

1334 SECRET_KEY = 'aws_secret_access_key' 

1335 # Same deal as the EnvProvider above. Botocore originally supported 

1336 # aws_security_token, but the SDKs are standardizing on aws_session_token 

1337 # so we support both. 

1338 TOKENS = ['aws_security_token', 'aws_session_token'] 

1339 ACCOUNT_ID = 'aws_account_id' 

1340 

1341 def __init__(self, creds_filename, profile_name=None, ini_parser=None): 

1342 self._creds_filename = creds_filename 

1343 if profile_name is None: 

1344 profile_name = 'default' 

1345 self._profile_name = profile_name 

1346 if ini_parser is None: 

1347 ini_parser = botocore.configloader.raw_config_parse 

1348 self._ini_parser = ini_parser 

1349 

1350 def load(self): 

1351 try: 

1352 available_creds = self._ini_parser(self._creds_filename) 

1353 except ConfigNotFound: 

1354 return None 

1355 if self._profile_name in available_creds: 

1356 config = available_creds[self._profile_name] 

1357 if self.ACCESS_KEY in config: 

1358 logger.info( 

1359 "Found credentials in shared credentials file: %s", 

1360 self._creds_filename, 

1361 ) 

1362 access_key, secret_key = self._extract_creds_from_mapping( 

1363 config, self.ACCESS_KEY, self.SECRET_KEY 

1364 ) 

1365 token = self._get_session_token(config) 

1366 account_id = self._get_account_id(config) 

1367 return Credentials( 

1368 access_key, 

1369 secret_key, 

1370 token, 

1371 method=self.METHOD, 

1372 account_id=account_id, 

1373 ) 

1374 

1375 def _get_session_token(self, config): 

1376 for token_envvar in self.TOKENS: 

1377 if token_envvar in config: 

1378 return config[token_envvar] 

1379 

1380 def _get_account_id(self, config): 

1381 return config.get(self.ACCOUNT_ID) 

1382 

1383 

1384class ConfigProvider(CredentialProvider): 

1385 """INI based config provider with profile sections.""" 

1386 

1387 METHOD = 'config-file' 

1388 CANONICAL_NAME = 'SharedConfig' 

1389 

1390 ACCESS_KEY = 'aws_access_key_id' 

1391 SECRET_KEY = 'aws_secret_access_key' 

1392 # Same deal as the EnvProvider above. Botocore originally supported 

1393 # aws_security_token, but the SDKs are standardizing on aws_session_token 

1394 # so we support both. 

1395 TOKENS = ['aws_security_token', 'aws_session_token'] 

1396 ACCOUNT_ID = 'aws_account_id' 

1397 

1398 def __init__(self, config_filename, profile_name, config_parser=None): 

1399 """ 

1400 

1401 :param config_filename: The session configuration scoped to the current 

1402 profile. This is available via ``session.config``. 

1403 :param profile_name: The name of the current profile. 

1404 :param config_parser: A config parser callable. 

1405 

1406 """ 

1407 self._config_filename = config_filename 

1408 self._profile_name = profile_name 

1409 if config_parser is None: 

1410 config_parser = botocore.configloader.load_config 

1411 self._config_parser = config_parser 

1412 

1413 def load(self): 

1414 """ 

1415 If there is are credentials in the configuration associated with 

1416 the session, use those. 

1417 """ 

1418 try: 

1419 full_config = self._config_parser(self._config_filename) 

1420 except ConfigNotFound: 

1421 return None 

1422 if self._profile_name in full_config['profiles']: 

1423 profile_config = full_config['profiles'][self._profile_name] 

1424 if self.ACCESS_KEY in profile_config: 

1425 logger.info( 

1426 "Credentials found in config file: %s", 

1427 self._config_filename, 

1428 ) 

1429 access_key, secret_key = self._extract_creds_from_mapping( 

1430 profile_config, self.ACCESS_KEY, self.SECRET_KEY 

1431 ) 

1432 token = self._get_session_token(profile_config) 

1433 account_id = self._get_account_id(profile_config) 

1434 return Credentials( 

1435 access_key, 

1436 secret_key, 

1437 token, 

1438 method=self.METHOD, 

1439 account_id=account_id, 

1440 ) 

1441 else: 

1442 return None 

1443 

1444 def _get_session_token(self, profile_config): 

1445 for token_name in self.TOKENS: 

1446 if token_name in profile_config: 

1447 return profile_config[token_name] 

1448 

1449 def _get_account_id(self, config): 

1450 return config.get(self.ACCOUNT_ID) 

1451 

1452 

1453class BotoProvider(CredentialProvider): 

1454 METHOD = 'boto-config' 

1455 CANONICAL_NAME = 'Boto2Config' 

1456 

1457 BOTO_CONFIG_ENV = 'BOTO_CONFIG' 

1458 DEFAULT_CONFIG_FILENAMES = ['/etc/boto.cfg', '~/.boto'] 

1459 ACCESS_KEY = 'aws_access_key_id' 

1460 SECRET_KEY = 'aws_secret_access_key' 

1461 

1462 def __init__(self, environ=None, ini_parser=None): 

1463 if environ is None: 

1464 environ = os.environ 

1465 if ini_parser is None: 

1466 ini_parser = botocore.configloader.raw_config_parse 

1467 self._environ = environ 

1468 self._ini_parser = ini_parser 

1469 

1470 def load(self): 

1471 """ 

1472 Look for credentials in boto config file. 

1473 """ 

1474 if self.BOTO_CONFIG_ENV in self._environ: 

1475 potential_locations = [self._environ[self.BOTO_CONFIG_ENV]] 

1476 else: 

1477 potential_locations = self.DEFAULT_CONFIG_FILENAMES 

1478 for filename in potential_locations: 

1479 try: 

1480 config = self._ini_parser(filename) 

1481 except ConfigNotFound: 

1482 # Move on to the next potential config file name. 

1483 continue 

1484 if 'Credentials' in config: 

1485 credentials = config['Credentials'] 

1486 if self.ACCESS_KEY in credentials: 

1487 logger.info( 

1488 "Found credentials in boto config file: %s", filename 

1489 ) 

1490 access_key, secret_key = self._extract_creds_from_mapping( 

1491 credentials, self.ACCESS_KEY, self.SECRET_KEY 

1492 ) 

1493 return Credentials( 

1494 access_key, secret_key, method=self.METHOD 

1495 ) 

1496 

1497 

1498class AssumeRoleProvider(CredentialProvider): 

1499 METHOD = 'assume-role' 

1500 # The AssumeRole provider is logically part of the SharedConfig and 

1501 # SharedCredentials providers. Since the purpose of the canonical name 

1502 # is to provide cross-sdk compatibility, calling code will need to be 

1503 # aware that either of those providers should be tied to the AssumeRole 

1504 # provider as much as possible. 

1505 CANONICAL_NAME = None 

1506 ROLE_CONFIG_VAR = 'role_arn' 

1507 WEB_IDENTITY_TOKE_FILE_VAR = 'web_identity_token_file' 

1508 # Credentials are considered expired (and will be refreshed) once the total 

1509 # remaining time left until the credentials expires is less than the 

1510 # EXPIRY_WINDOW. 

1511 EXPIRY_WINDOW_SECONDS = 60 * 15 

1512 

1513 def __init__( 

1514 self, 

1515 load_config, 

1516 client_creator, 

1517 cache, 

1518 profile_name, 

1519 prompter=getpass.getpass, 

1520 credential_sourcer=None, 

1521 profile_provider_builder=None, 

1522 ): 

1523 """ 

1524 :type load_config: callable 

1525 :param load_config: A function that accepts no arguments, and 

1526 when called, will return the full configuration dictionary 

1527 for the session (``session.full_config``). 

1528 

1529 :type client_creator: callable 

1530 :param client_creator: A factory function that will create 

1531 a client when called. Has the same interface as 

1532 ``botocore.session.Session.create_client``. 

1533 

1534 :type cache: dict 

1535 :param cache: An object that supports ``__getitem__``, 

1536 ``__setitem__``, and ``__contains__``. An example 

1537 of this is the ``JSONFileCache`` class in the CLI. 

1538 

1539 :type profile_name: str 

1540 :param profile_name: The name of the profile. 

1541 

1542 :type prompter: callable 

1543 :param prompter: A callable that returns input provided 

1544 by the user (i.e raw_input, getpass.getpass, etc.). 

1545 

1546 :type credential_sourcer: CanonicalNameCredentialSourcer 

1547 :param credential_sourcer: A credential provider that takes a 

1548 configuration, which is used to provide the source credentials 

1549 for the STS call. 

1550 """ 

1551 #: The cache used to first check for assumed credentials. 

1552 #: This is checked before making the AssumeRole API 

1553 #: calls and can be useful if you have short lived 

1554 #: scripts and you'd like to avoid calling AssumeRole 

1555 #: until the credentials are expired. 

1556 self.cache = cache 

1557 self._load_config = load_config 

1558 # client_creator is a callable that creates function. 

1559 # It's basically session.create_client 

1560 self._client_creator = client_creator 

1561 self._profile_name = profile_name 

1562 self._prompter = prompter 

1563 # The _loaded_config attribute will be populated from the 

1564 # load_config() function once the configuration is actually 

1565 # loaded. The reason we go through all this instead of just 

1566 # requiring that the loaded_config be passed to us is to that 

1567 # we can defer configuration loaded until we actually try 

1568 # to load credentials (as opposed to when the object is 

1569 # instantiated). 

1570 self._loaded_config = {} 

1571 self._credential_sourcer = credential_sourcer 

1572 self._profile_provider_builder = profile_provider_builder 

1573 self._visited_profiles = [self._profile_name] 

1574 

1575 def load(self): 

1576 self._loaded_config = self._load_config() 

1577 profiles = self._loaded_config.get('profiles', {}) 

1578 profile = profiles.get(self._profile_name, {}) 

1579 if self._has_assume_role_config_vars(profile): 

1580 return self._load_creds_via_assume_role(self._profile_name) 

1581 

1582 def _has_assume_role_config_vars(self, profile): 

1583 return ( 

1584 self.ROLE_CONFIG_VAR in profile 

1585 and 

1586 # We need to ensure this provider doesn't look at a profile when 

1587 # the profile has configuration for web identity. Simply relying on 

1588 # the order in the credential chain is insufficient as it doesn't 

1589 # prevent the case when we're doing an assume role chain. 

1590 self.WEB_IDENTITY_TOKE_FILE_VAR not in profile 

1591 ) 

1592 

1593 def _load_creds_via_assume_role(self, profile_name): 

1594 role_config = self._get_role_config(profile_name) 

1595 source_credentials = self._resolve_source_credentials( 

1596 role_config, profile_name 

1597 ) 

1598 

1599 extra_args = {} 

1600 role_session_name = role_config.get('role_session_name') 

1601 if role_session_name is not None: 

1602 extra_args['RoleSessionName'] = role_session_name 

1603 

1604 external_id = role_config.get('external_id') 

1605 if external_id is not None: 

1606 extra_args['ExternalId'] = external_id 

1607 

1608 mfa_serial = role_config.get('mfa_serial') 

1609 if mfa_serial is not None: 

1610 extra_args['SerialNumber'] = mfa_serial 

1611 

1612 duration_seconds = role_config.get('duration_seconds') 

1613 if duration_seconds is not None: 

1614 extra_args['DurationSeconds'] = duration_seconds 

1615 

1616 fetcher = AssumeRoleCredentialFetcher( 

1617 client_creator=self._client_creator, 

1618 source_credentials=source_credentials, 

1619 role_arn=role_config['role_arn'], 

1620 extra_args=extra_args, 

1621 mfa_prompter=self._prompter, 

1622 cache=self.cache, 

1623 ) 

1624 refresher = fetcher.fetch_credentials 

1625 if mfa_serial is not None: 

1626 refresher = create_mfa_serial_refresher(refresher) 

1627 

1628 # The initial credentials are empty and the expiration time is set 

1629 # to now so that we can delay the call to assume role until it is 

1630 # strictly needed. 

1631 return DeferredRefreshableCredentials( 

1632 method=self.METHOD, 

1633 refresh_using=refresher, 

1634 time_fetcher=_local_now, 

1635 ) 

1636 

1637 def _get_role_config(self, profile_name): 

1638 """Retrieves and validates the role configuration for the profile.""" 

1639 profiles = self._loaded_config.get('profiles', {}) 

1640 

1641 profile = profiles[profile_name] 

1642 source_profile = profile.get('source_profile') 

1643 role_arn = profile['role_arn'] 

1644 credential_source = profile.get('credential_source') 

1645 mfa_serial = profile.get('mfa_serial') 

1646 external_id = profile.get('external_id') 

1647 role_session_name = profile.get('role_session_name') 

1648 duration_seconds = profile.get('duration_seconds') 

1649 

1650 role_config = { 

1651 'role_arn': role_arn, 

1652 'external_id': external_id, 

1653 'mfa_serial': mfa_serial, 

1654 'role_session_name': role_session_name, 

1655 'source_profile': source_profile, 

1656 'credential_source': credential_source, 

1657 } 

1658 

1659 if duration_seconds is not None: 

1660 try: 

1661 role_config['duration_seconds'] = int(duration_seconds) 

1662 except ValueError: 

1663 pass 

1664 

1665 # Either the credential source or the source profile must be 

1666 # specified, but not both. 

1667 if credential_source is not None and source_profile is not None: 

1668 raise InvalidConfigError( 

1669 error_msg=( 

1670 f'The profile "{profile_name}" contains both ' 

1671 'source_profile and credential_source.' 

1672 ) 

1673 ) 

1674 elif credential_source is None and source_profile is None: 

1675 raise PartialCredentialsError( 

1676 provider=self.METHOD, 

1677 cred_var='source_profile or credential_source', 

1678 ) 

1679 elif credential_source is not None: 

1680 self._validate_credential_source(profile_name, credential_source) 

1681 else: 

1682 self._validate_source_profile(profile_name, source_profile) 

1683 

1684 return role_config 

1685 

1686 def _validate_credential_source(self, parent_profile, credential_source): 

1687 if self._credential_sourcer is None: 

1688 raise InvalidConfigError( 

1689 error_msg=( 

1690 f"The credential_source \"{credential_source}\" is specified " 

1691 f"in profile \"{parent_profile}\", " 

1692 f"but no source provider was configured." 

1693 ) 

1694 ) 

1695 if not self._credential_sourcer.is_supported(credential_source): 

1696 raise InvalidConfigError( 

1697 error_msg=( 

1698 f"The credential source \"{credential_source}\" referenced " 

1699 f"in profile \"{parent_profile}\" is not valid." 

1700 ) 

1701 ) 

1702 

1703 def _source_profile_has_credentials(self, profile): 

1704 return any( 

1705 [ 

1706 self._has_static_credentials(profile), 

1707 self._has_assume_role_config_vars(profile), 

1708 ] 

1709 ) 

1710 

1711 def _validate_source_profile( 

1712 self, parent_profile_name, source_profile_name 

1713 ): 

1714 profiles = self._loaded_config.get('profiles', {}) 

1715 if source_profile_name not in profiles: 

1716 raise InvalidConfigError( 

1717 error_msg=( 

1718 f"The source_profile \"{source_profile_name}\" referenced in " 

1719 f"the profile \"{parent_profile_name}\" does not exist." 

1720 ) 

1721 ) 

1722 

1723 source_profile = profiles[source_profile_name] 

1724 

1725 # Make sure we aren't going into an infinite loop. If we haven't 

1726 # visited the profile yet, we're good. 

1727 if source_profile_name not in self._visited_profiles: 

1728 return 

1729 

1730 # If we have visited the profile and the profile isn't simply 

1731 # referencing itself, that's an infinite loop. 

1732 if source_profile_name != parent_profile_name: 

1733 raise InfiniteLoopConfigError( 

1734 source_profile=source_profile_name, 

1735 visited_profiles=self._visited_profiles, 

1736 ) 

1737 

1738 # A profile is allowed to reference itself so that it can source 

1739 # static credentials and have configuration all in the same 

1740 # profile. This will only ever work for the top level assume 

1741 # role because the static credentials will otherwise take 

1742 # precedence. 

1743 if not self._has_static_credentials(source_profile): 

1744 raise InfiniteLoopConfigError( 

1745 source_profile=source_profile_name, 

1746 visited_profiles=self._visited_profiles, 

1747 ) 

1748 

1749 def _has_static_credentials(self, profile): 

1750 static_keys = ['aws_secret_access_key', 'aws_access_key_id'] 

1751 return any(static_key in profile for static_key in static_keys) 

1752 

1753 def _resolve_source_credentials(self, role_config, profile_name): 

1754 credential_source = role_config.get('credential_source') 

1755 if credential_source is not None: 

1756 return self._resolve_credentials_from_source( 

1757 credential_source, profile_name 

1758 ) 

1759 

1760 source_profile = role_config['source_profile'] 

1761 self._visited_profiles.append(source_profile) 

1762 return self._resolve_credentials_from_profile(source_profile) 

1763 

1764 def _resolve_credentials_from_profile(self, profile_name): 

1765 profiles = self._loaded_config.get('profiles', {}) 

1766 profile = profiles[profile_name] 

1767 

1768 if ( 

1769 self._has_static_credentials(profile) 

1770 and not self._profile_provider_builder 

1771 ): 

1772 # This is only here for backwards compatibility. If this provider 

1773 # isn't given a profile provider builder we still want to be able 

1774 # to handle the basic static credential case as we would before the 

1775 # profile provider builder parameter was added. 

1776 return self._resolve_static_credentials_from_profile(profile) 

1777 elif self._has_static_credentials( 

1778 profile 

1779 ) or not self._has_assume_role_config_vars(profile): 

1780 profile_providers = self._profile_provider_builder.providers( 

1781 profile_name=profile_name, 

1782 disable_env_vars=True, 

1783 ) 

1784 profile_chain = CredentialResolver(profile_providers) 

1785 credentials = profile_chain.load_credentials() 

1786 if credentials is None: 

1787 error_message = ( 

1788 'The source profile "%s" must have credentials.' 

1789 ) 

1790 raise InvalidConfigError( 

1791 error_msg=error_message % profile_name, 

1792 ) 

1793 return credentials 

1794 

1795 return self._load_creds_via_assume_role(profile_name) 

1796 

1797 def _resolve_static_credentials_from_profile(self, profile): 

1798 try: 

1799 return Credentials( 

1800 access_key=profile['aws_access_key_id'], 

1801 secret_key=profile['aws_secret_access_key'], 

1802 token=profile.get('aws_session_token'), 

1803 ) 

1804 except KeyError as e: 

1805 raise PartialCredentialsError( 

1806 provider=self.METHOD, cred_var=str(e) 

1807 ) 

1808 

1809 def _resolve_credentials_from_source( 

1810 self, credential_source, profile_name 

1811 ): 

1812 credentials = self._credential_sourcer.source_credentials( 

1813 credential_source 

1814 ) 

1815 if credentials is None: 

1816 raise CredentialRetrievalError( 

1817 provider=credential_source, 

1818 error_msg=( 

1819 'No credentials found in credential_source referenced ' 

1820 f'in profile {profile_name}' 

1821 ), 

1822 ) 

1823 return credentials 

1824 

1825 

1826class AssumeRoleWithWebIdentityProvider(CredentialProvider): 

1827 METHOD = 'assume-role-with-web-identity' 

1828 CANONICAL_NAME = None 

1829 _CONFIG_TO_ENV_VAR = { 

1830 'web_identity_token_file': 'AWS_WEB_IDENTITY_TOKEN_FILE', 

1831 'role_session_name': 'AWS_ROLE_SESSION_NAME', 

1832 'role_arn': 'AWS_ROLE_ARN', 

1833 } 

1834 

1835 def __init__( 

1836 self, 

1837 load_config, 

1838 client_creator, 

1839 profile_name, 

1840 cache=None, 

1841 disable_env_vars=False, 

1842 token_loader_cls=None, 

1843 ): 

1844 self.cache = cache 

1845 self._load_config = load_config 

1846 self._client_creator = client_creator 

1847 self._profile_name = profile_name 

1848 self._profile_config = None 

1849 self._disable_env_vars = disable_env_vars 

1850 if token_loader_cls is None: 

1851 token_loader_cls = FileWebIdentityTokenLoader 

1852 self._token_loader_cls = token_loader_cls 

1853 

1854 def load(self): 

1855 return self._assume_role_with_web_identity() 

1856 

1857 def _get_profile_config(self, key): 

1858 if self._profile_config is None: 

1859 loaded_config = self._load_config() 

1860 profiles = loaded_config.get('profiles', {}) 

1861 self._profile_config = profiles.get(self._profile_name, {}) 

1862 return self._profile_config.get(key) 

1863 

1864 def _get_env_config(self, key): 

1865 if self._disable_env_vars: 

1866 return None 

1867 env_key = self._CONFIG_TO_ENV_VAR.get(key) 

1868 if env_key and env_key in os.environ: 

1869 return os.environ[env_key] 

1870 return None 

1871 

1872 def _get_config(self, key): 

1873 env_value = self._get_env_config(key) 

1874 if env_value is not None: 

1875 return env_value 

1876 return self._get_profile_config(key) 

1877 

1878 def _assume_role_with_web_identity(self): 

1879 token_path = self._get_config('web_identity_token_file') 

1880 if not token_path: 

1881 return None 

1882 token_loader = self._token_loader_cls(token_path) 

1883 

1884 role_arn = self._get_config('role_arn') 

1885 if not role_arn: 

1886 error_msg = ( 

1887 'The provided profile or the current environment is ' 

1888 'configured to assume role with web identity but has no ' 

1889 'role ARN configured. Ensure that the profile has the role_arn' 

1890 'configuration set or the AWS_ROLE_ARN env var is set.' 

1891 ) 

1892 raise InvalidConfigError(error_msg=error_msg) 

1893 

1894 extra_args = {} 

1895 role_session_name = self._get_config('role_session_name') 

1896 if role_session_name is not None: 

1897 extra_args['RoleSessionName'] = role_session_name 

1898 

1899 fetcher = AssumeRoleWithWebIdentityCredentialFetcher( 

1900 client_creator=self._client_creator, 

1901 web_identity_token_loader=token_loader, 

1902 role_arn=role_arn, 

1903 extra_args=extra_args, 

1904 cache=self.cache, 

1905 ) 

1906 # The initial credentials are empty and the expiration time is set 

1907 # to now so that we can delay the call to assume role until it is 

1908 # strictly needed. 

1909 return DeferredRefreshableCredentials( 

1910 method=self.METHOD, 

1911 refresh_using=fetcher.fetch_credentials, 

1912 ) 

1913 

1914 

1915class CanonicalNameCredentialSourcer: 

1916 def __init__(self, providers): 

1917 self._providers = providers 

1918 

1919 def is_supported(self, source_name): 

1920 """Validates a given source name. 

1921 

1922 :type source_name: str 

1923 :param source_name: The value of credential_source in the config 

1924 file. This is the canonical name of the credential provider. 

1925 

1926 :rtype: bool 

1927 :returns: True if the credential provider is supported, 

1928 False otherwise. 

1929 """ 

1930 return source_name in [p.CANONICAL_NAME for p in self._providers] 

1931 

1932 def source_credentials(self, source_name): 

1933 """Loads source credentials based on the provided configuration. 

1934 

1935 :type source_name: str 

1936 :param source_name: The value of credential_source in the config 

1937 file. This is the canonical name of the credential provider. 

1938 

1939 :rtype: Credentials 

1940 """ 

1941 source = self._get_provider(source_name) 

1942 if isinstance(source, CredentialResolver): 

1943 return source.load_credentials() 

1944 return source.load() 

1945 

1946 def _get_provider(self, canonical_name): 

1947 """Return a credential provider by its canonical name. 

1948 

1949 :type canonical_name: str 

1950 :param canonical_name: The canonical name of the provider. 

1951 

1952 :raises UnknownCredentialError: Raised if no 

1953 credential provider by the provided name 

1954 is found. 

1955 """ 

1956 provider = self._get_provider_by_canonical_name(canonical_name) 

1957 

1958 # The AssumeRole provider should really be part of the SharedConfig 

1959 # provider rather than being its own thing, but it is not. It is 

1960 # effectively part of both the SharedConfig provider and the 

1961 # SharedCredentials provider now due to the way it behaves. 

1962 # Therefore if we want either of those providers we should return 

1963 # the AssumeRole provider with it. 

1964 if canonical_name.lower() in ['sharedconfig', 'sharedcredentials']: 

1965 assume_role_provider = self._get_provider_by_method('assume-role') 

1966 if assume_role_provider is not None: 

1967 # The SharedConfig or SharedCredentials provider may not be 

1968 # present if it was removed for some reason, but the 

1969 # AssumeRole provider could still be present. In that case, 

1970 # return the assume role provider by itself. 

1971 if provider is None: 

1972 return assume_role_provider 

1973 

1974 # If both are present, return them both as a 

1975 # CredentialResolver so that calling code can treat them as 

1976 # a single entity. 

1977 return CredentialResolver([assume_role_provider, provider]) 

1978 

1979 if provider is None: 

1980 raise UnknownCredentialError(name=canonical_name) 

1981 

1982 return provider 

1983 

1984 def _get_provider_by_canonical_name(self, canonical_name): 

1985 """Return a credential provider by its canonical name. 

1986 

1987 This function is strict, it does not attempt to address 

1988 compatibility issues. 

1989 """ 

1990 for provider in self._providers: 

1991 name = provider.CANONICAL_NAME 

1992 # Canonical names are case-insensitive 

1993 if name and name.lower() == canonical_name.lower(): 

1994 return provider 

1995 

1996 def _get_provider_by_method(self, method): 

1997 """Return a credential provider by its METHOD name.""" 

1998 for provider in self._providers: 

1999 if provider.METHOD == method: 

2000 return provider 

2001 

2002 

2003class ContainerProvider(CredentialProvider): 

2004 METHOD = 'container-role' 

2005 CANONICAL_NAME = 'EcsContainer' 

2006 ENV_VAR = 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI' 

2007 ENV_VAR_FULL = 'AWS_CONTAINER_CREDENTIALS_FULL_URI' 

2008 ENV_VAR_AUTH_TOKEN = 'AWS_CONTAINER_AUTHORIZATION_TOKEN' 

2009 ENV_VAR_AUTH_TOKEN_FILE = 'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE' 

2010 

2011 def __init__(self, environ=None, fetcher=None): 

2012 if environ is None: 

2013 environ = os.environ 

2014 if fetcher is None: 

2015 fetcher = ContainerMetadataFetcher() 

2016 self._environ = environ 

2017 self._fetcher = fetcher 

2018 

2019 def load(self): 

2020 # This cred provider is only triggered if the self.ENV_VAR is set, 

2021 # which only happens if you opt into this feature. 

2022 if self.ENV_VAR in self._environ or self.ENV_VAR_FULL in self._environ: 

2023 return self._retrieve_or_fail() 

2024 

2025 def _retrieve_or_fail(self): 

2026 if self._provided_relative_uri(): 

2027 full_uri = self._fetcher.full_url(self._environ[self.ENV_VAR]) 

2028 else: 

2029 full_uri = self._environ[self.ENV_VAR_FULL] 

2030 fetcher = self._create_fetcher(full_uri) 

2031 creds = fetcher() 

2032 return RefreshableCredentials( 

2033 access_key=creds['access_key'], 

2034 secret_key=creds['secret_key'], 

2035 token=creds['token'], 

2036 method=self.METHOD, 

2037 expiry_time=_parse_if_needed(creds['expiry_time']), 

2038 refresh_using=fetcher, 

2039 account_id=creds.get('account_id'), 

2040 ) 

2041 

2042 def _build_headers(self): 

2043 auth_token = None 

2044 if self.ENV_VAR_AUTH_TOKEN_FILE in self._environ: 

2045 auth_token_file_path = self._environ[self.ENV_VAR_AUTH_TOKEN_FILE] 

2046 with open(auth_token_file_path) as token_file: 

2047 auth_token = token_file.read() 

2048 elif self.ENV_VAR_AUTH_TOKEN in self._environ: 

2049 auth_token = self._environ[self.ENV_VAR_AUTH_TOKEN] 

2050 if auth_token is not None: 

2051 self._validate_auth_token(auth_token) 

2052 return {'Authorization': auth_token} 

2053 

2054 def _validate_auth_token(self, auth_token): 

2055 if "\r" in auth_token or "\n" in auth_token: 

2056 raise ValueError("Auth token value is not a legal header value") 

2057 

2058 def _create_fetcher(self, full_uri, *args, **kwargs): 

2059 def fetch_creds(): 

2060 try: 

2061 headers = self._build_headers() 

2062 response = self._fetcher.retrieve_full_uri( 

2063 full_uri, headers=headers 

2064 ) 

2065 register_feature_id('CREDENTIALS_HTTP') 

2066 except MetadataRetrievalError as e: 

2067 logger.debug( 

2068 "Error retrieving container metadata: %s", e, exc_info=True 

2069 ) 

2070 raise CredentialRetrievalError( 

2071 provider=self.METHOD, error_msg=str(e) 

2072 ) 

2073 return { 

2074 'access_key': response['AccessKeyId'], 

2075 'secret_key': response['SecretAccessKey'], 

2076 'token': response['Token'], 

2077 'expiry_time': response['Expiration'], 

2078 'account_id': response.get('AccountId'), 

2079 } 

2080 

2081 return fetch_creds 

2082 

2083 def _provided_relative_uri(self): 

2084 return self.ENV_VAR in self._environ 

2085 

2086 

2087class CredentialResolver: 

2088 def __init__(self, providers): 

2089 """ 

2090 

2091 :param providers: A list of ``CredentialProvider`` instances. 

2092 

2093 """ 

2094 self.providers = providers 

2095 

2096 def insert_before(self, name, credential_provider): 

2097 """ 

2098 Inserts a new instance of ``CredentialProvider`` into the chain that 

2099 will be tried before an existing one. 

2100 

2101 :param name: The short name of the credentials you'd like to insert the 

2102 new credentials before. (ex. ``env`` or ``config``). Existing names 

2103 & ordering can be discovered via ``self.available_methods``. 

2104 :type name: string 

2105 

2106 :param cred_instance: An instance of the new ``Credentials`` object 

2107 you'd like to add to the chain. 

2108 :type cred_instance: A subclass of ``Credentials`` 

2109 """ 

2110 try: 

2111 offset = [p.METHOD for p in self.providers].index(name) 

2112 except ValueError: 

2113 raise UnknownCredentialError(name=name) 

2114 self.providers.insert(offset, credential_provider) 

2115 

2116 def insert_after(self, name, credential_provider): 

2117 """ 

2118 Inserts a new type of ``Credentials`` instance into the chain that will 

2119 be tried after an existing one. 

2120 

2121 :param name: The short name of the credentials you'd like to insert the 

2122 new credentials after. (ex. ``env`` or ``config``). Existing names 

2123 & ordering can be discovered via ``self.available_methods``. 

2124 :type name: string 

2125 

2126 :param cred_instance: An instance of the new ``Credentials`` object 

2127 you'd like to add to the chain. 

2128 :type cred_instance: A subclass of ``Credentials`` 

2129 """ 

2130 offset = self._get_provider_offset(name) 

2131 self.providers.insert(offset + 1, credential_provider) 

2132 

2133 def remove(self, name): 

2134 """ 

2135 Removes a given ``Credentials`` instance from the chain. 

2136 

2137 :param name: The short name of the credentials instance to remove. 

2138 :type name: string 

2139 """ 

2140 available_methods = [p.METHOD for p in self.providers] 

2141 if name not in available_methods: 

2142 # It's not present. Fail silently. 

2143 return 

2144 

2145 offset = available_methods.index(name) 

2146 self.providers.pop(offset) 

2147 

2148 def get_provider(self, name): 

2149 """Return a credential provider by name. 

2150 

2151 :type name: str 

2152 :param name: The name of the provider. 

2153 

2154 :raises UnknownCredentialError: Raised if no 

2155 credential provider by the provided name 

2156 is found. 

2157 """ 

2158 return self.providers[self._get_provider_offset(name)] 

2159 

2160 def _get_provider_offset(self, name): 

2161 try: 

2162 return [p.METHOD for p in self.providers].index(name) 

2163 except ValueError: 

2164 raise UnknownCredentialError(name=name) 

2165 

2166 def load_credentials(self): 

2167 """ 

2168 Goes through the credentials chain, returning the first ``Credentials`` 

2169 that could be loaded. 

2170 """ 

2171 # First provider to return a non-None response wins. 

2172 for provider in self.providers: 

2173 logger.debug("Looking for credentials via: %s", provider.METHOD) 

2174 creds = provider.load() 

2175 if creds is not None: 

2176 return creds 

2177 

2178 # If we got here, no credentials could be found. 

2179 # This feels like it should be an exception, but historically, ``None`` 

2180 # is returned. 

2181 # 

2182 # +1 

2183 # -js 

2184 return None 

2185 

2186 

2187class SSOCredentialFetcher(CachedCredentialFetcher): 

2188 _UTC_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' 

2189 

2190 def __init__( 

2191 self, 

2192 start_url, 

2193 sso_region, 

2194 role_name, 

2195 account_id, 

2196 client_creator, 

2197 token_loader=None, 

2198 cache=None, 

2199 expiry_window_seconds=None, 

2200 token_provider=None, 

2201 sso_session_name=None, 

2202 time_fetcher=_local_now, 

2203 ): 

2204 self._client_creator = client_creator 

2205 self._sso_region = sso_region 

2206 self._role_name = role_name 

2207 self._account_id = account_id 

2208 self._start_url = start_url 

2209 self._token_loader = token_loader 

2210 self._token_provider = token_provider 

2211 self._sso_session_name = sso_session_name 

2212 self._time_fetcher = time_fetcher 

2213 super().__init__(cache, expiry_window_seconds) 

2214 

2215 def _create_cache_key(self): 

2216 """Create a predictable cache key for the current configuration. 

2217 

2218 The cache key is intended to be compatible with file names. 

2219 """ 

2220 args = { 

2221 'roleName': self._role_name, 

2222 'accountId': self._account_id, 

2223 } 

2224 if self._sso_session_name: 

2225 args['sessionName'] = self._sso_session_name 

2226 else: 

2227 args['startUrl'] = self._start_url 

2228 # NOTE: It would be good to hoist this cache key construction logic 

2229 # into the CachedCredentialFetcher class as we should be consistent. 

2230 # Unfortunately, the current assume role fetchers that sub class don't 

2231 # pass separators resulting in non-minified JSON. In the long term, 

2232 # all fetchers should use the below caching scheme. 

2233 args = json.dumps(args, sort_keys=True, separators=(',', ':')) 

2234 argument_hash = sha1(args.encode('utf-8')).hexdigest() 

2235 return self._make_file_safe(argument_hash) 

2236 

2237 def _parse_timestamp(self, timestamp_ms): 

2238 # fromtimestamp expects seconds so: milliseconds / 1000 = seconds 

2239 timestamp_seconds = timestamp_ms / 1000.0 

2240 timestamp = datetime.datetime.fromtimestamp(timestamp_seconds, tzutc()) 

2241 return timestamp.strftime(self._UTC_DATE_FORMAT) 

2242 

2243 def _get_credentials(self): 

2244 """Get credentials by calling SSO get role credentials.""" 

2245 config = Config( 

2246 signature_version=UNSIGNED, 

2247 region_name=self._sso_region, 

2248 ) 

2249 client = self._client_creator('sso', config=config) 

2250 if self._token_provider: 

2251 initial_token_data = self._token_provider.load_token() 

2252 token = initial_token_data.get_frozen_token().token 

2253 else: 

2254 token_dict = self._token_loader(self._start_url) 

2255 token = token_dict['accessToken'] 

2256 

2257 # raise an UnauthorizedSSOTokenError if the loaded legacy token 

2258 # is expired to save a call to GetRoleCredentials with an 

2259 # expired token. 

2260 expiration = dateutil.parser.parse(token_dict['expiresAt']) 

2261 remaining = total_seconds(expiration - self._time_fetcher()) 

2262 if remaining <= 0: 

2263 raise UnauthorizedSSOTokenError() 

2264 

2265 kwargs = { 

2266 'roleName': self._role_name, 

2267 'accountId': self._account_id, 

2268 'accessToken': token, 

2269 } 

2270 try: 

2271 response = client.get_role_credentials(**kwargs) 

2272 except client.exceptions.UnauthorizedException: 

2273 raise UnauthorizedSSOTokenError() 

2274 credentials = response['roleCredentials'] 

2275 

2276 credentials = { 

2277 'ProviderType': 'sso', 

2278 'Credentials': { 

2279 'AccessKeyId': credentials['accessKeyId'], 

2280 'SecretAccessKey': credentials['secretAccessKey'], 

2281 'SessionToken': credentials['sessionToken'], 

2282 'Expiration': self._parse_timestamp(credentials['expiration']), 

2283 'AccountId': self._account_id, 

2284 }, 

2285 } 

2286 return credentials 

2287 

2288 

2289class SSOProvider(CredentialProvider): 

2290 METHOD = 'sso' 

2291 

2292 _SSO_TOKEN_CACHE_DIR = os.path.expanduser( 

2293 os.path.join('~', '.aws', 'sso', 'cache') 

2294 ) 

2295 _PROFILE_REQUIRED_CONFIG_VARS = ( 

2296 'sso_role_name', 

2297 'sso_account_id', 

2298 ) 

2299 _SSO_REQUIRED_CONFIG_VARS = ( 

2300 'sso_start_url', 

2301 'sso_region', 

2302 ) 

2303 _ALL_REQUIRED_CONFIG_VARS = ( 

2304 _PROFILE_REQUIRED_CONFIG_VARS + _SSO_REQUIRED_CONFIG_VARS 

2305 ) 

2306 

2307 def __init__( 

2308 self, 

2309 load_config, 

2310 client_creator, 

2311 profile_name, 

2312 cache=None, 

2313 token_cache=None, 

2314 token_provider=None, 

2315 ): 

2316 if token_cache is None: 

2317 token_cache = JSONFileCache(self._SSO_TOKEN_CACHE_DIR) 

2318 self._token_cache = token_cache 

2319 self._token_provider = token_provider 

2320 if cache is None: 

2321 cache = {} 

2322 self.cache = cache 

2323 self._load_config = load_config 

2324 self._client_creator = client_creator 

2325 self._profile_name = profile_name 

2326 

2327 def _load_sso_config(self): 

2328 loaded_config = self._load_config() 

2329 profiles = loaded_config.get('profiles', {}) 

2330 profile_name = self._profile_name 

2331 profile_config = profiles.get(self._profile_name, {}) 

2332 sso_sessions = loaded_config.get('sso_sessions', {}) 

2333 

2334 # Role name & Account ID indicate the cred provider should be used 

2335 if all( 

2336 c not in profile_config for c in self._PROFILE_REQUIRED_CONFIG_VARS 

2337 ): 

2338 return None 

2339 

2340 resolved_config, extra_reqs = self._resolve_sso_session_reference( 

2341 profile_config, sso_sessions 

2342 ) 

2343 

2344 config = {} 

2345 missing_config_vars = [] 

2346 all_required_configs = self._ALL_REQUIRED_CONFIG_VARS + extra_reqs 

2347 for config_var in all_required_configs: 

2348 if config_var in resolved_config: 

2349 config[config_var] = resolved_config[config_var] 

2350 else: 

2351 missing_config_vars.append(config_var) 

2352 

2353 if missing_config_vars: 

2354 missing = ', '.join(missing_config_vars) 

2355 raise InvalidConfigError( 

2356 error_msg=( 

2357 f'The profile "{profile_name}" is configured to use SSO ' 

2358 f'but is missing required configuration: {missing}' 

2359 ) 

2360 ) 

2361 return config 

2362 

2363 def _resolve_sso_session_reference(self, profile_config, sso_sessions): 

2364 sso_session_name = profile_config.get('sso_session') 

2365 if sso_session_name is None: 

2366 # No reference to resolve, proceed with legacy flow 

2367 return profile_config, () 

2368 

2369 if sso_session_name not in sso_sessions: 

2370 error_msg = f'The specified sso-session does not exist: "{sso_session_name}"' 

2371 raise InvalidConfigError(error_msg=error_msg) 

2372 

2373 config = profile_config.copy() 

2374 session = sso_sessions[sso_session_name] 

2375 for config_var, val in session.items(): 

2376 # Validate any keys referenced in both profile and sso_session match 

2377 if config.get(config_var, val) != val: 

2378 error_msg = ( 

2379 f"The value for {config_var} is inconsistent between " 

2380 f"profile ({config[config_var]}) and sso-session ({val})." 

2381 ) 

2382 raise InvalidConfigError(error_msg=error_msg) 

2383 config[config_var] = val 

2384 return config, ('sso_session',) 

2385 

2386 def load(self): 

2387 sso_config = self._load_sso_config() 

2388 if not sso_config: 

2389 return None 

2390 

2391 fetcher_kwargs = { 

2392 'start_url': sso_config['sso_start_url'], 

2393 'sso_region': sso_config['sso_region'], 

2394 'role_name': sso_config['sso_role_name'], 

2395 'account_id': sso_config['sso_account_id'], 

2396 'client_creator': self._client_creator, 

2397 'token_loader': SSOTokenLoader(cache=self._token_cache), 

2398 'cache': self.cache, 

2399 } 

2400 if 'sso_session' in sso_config: 

2401 fetcher_kwargs['sso_session_name'] = sso_config['sso_session'] 

2402 fetcher_kwargs['token_provider'] = self._token_provider 

2403 

2404 sso_fetcher = SSOCredentialFetcher(**fetcher_kwargs) 

2405 

2406 return DeferredRefreshableCredentials( 

2407 method=self.METHOD, 

2408 refresh_using=sso_fetcher.fetch_credentials, 

2409 )