Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/credentials.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1028 statements  

1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 

2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"). You 

5# may not use this file except in compliance with the License. A copy of 

6# the License is located at 

7# 

8# http://aws.amazon.com/apache2.0/ 

9# 

10# or in the "license" file accompanying this file. This file is 

11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

12# ANY KIND, either express or implied. See the License for the specific 

13# language governing permissions and limitations under the License. 

14import datetime 

15import getpass 

16import json 

17import logging 

18import os 

19import subprocess 

20import threading 

21import time 

22from collections import namedtuple 

23from copy import deepcopy 

24from hashlib import sha1 

25 

26from dateutil.parser import parse 

27from dateutil.tz import tzlocal, tzutc 

28 

29import botocore.compat 

30import botocore.configloader 

31from botocore import UNSIGNED 

32from botocore.compat import compat_shell_split, total_seconds 

33from botocore.config import Config 

34from botocore.exceptions import ( 

35 ConfigNotFound, 

36 CredentialRetrievalError, 

37 InfiniteLoopConfigError, 

38 InvalidConfigError, 

39 MetadataRetrievalError, 

40 PartialCredentialsError, 

41 RefreshWithMFAUnsupportedError, 

42 UnauthorizedSSOTokenError, 

43 UnknownCredentialError, 

44) 

45from botocore.tokens import SSOTokenProvider 

46from botocore.utils import ( 

47 ArnParser, 

48 ContainerMetadataFetcher, 

49 FileWebIdentityTokenLoader, 

50 InstanceMetadataFetcher, 

51 JSONFileCache, 

52 SSOTokenLoader, 

53 parse_key_val_file, 

54 resolve_imds_endpoint_mode, 

55) 

56 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Immutable snapshot of one set of credentials. ``account_id`` is optional:
# ``defaults`` applies to the rightmost field, so it defaults to ``None``.
ReadOnlyCredentials = namedtuple(
    'ReadOnlyCredentials',
    ['access_key', 'secret_key', 'token', 'account_id'],
    defaults=(None,),
)

# Refresh windows, in seconds before expiry.
_DEFAULT_MANDATORY_REFRESH_TIMEOUT = 10 * 60  # 10 min
_DEFAULT_ADVISORY_REFRESH_TIMEOUT = 15 * 60  # 15 min

66 

67 

def create_credential_resolver(session, cache=None, region_name=None):
    """Create a default credential resolver.

    This creates a pre-configured credential resolver
    that includes the default lookup chain for
    credentials.

    :param session: The session whose config variables (``profile``,
        ``metadata_service_timeout``, ``metadata_service_num_attempts``,
        the ``ec2_metadata_*`` settings) and user agent are used to
        configure the providers.

    :type cache: dict
    :param cache: An optional cache handed to the profile provider builder
        and to the assume role provider. A new ``dict`` is used when
        ``None``.

    :type region_name: str
    :param region_name: An optional region name used by the client creator
        given to the assume role style providers.

    :return: A ``CredentialResolver`` wrapping the ordered provider chain.
    """
    profile_name = session.get_config_variable('profile') or 'default'
    metadata_timeout = session.get_config_variable('metadata_service_timeout')
    num_attempts = session.get_config_variable('metadata_service_num_attempts')
    # Only a profile present in the session's instance variables switches
    # off the environment variable based lookups.
    disable_env_vars = session.instance_variables().get('profile') is not None

    imds_config = {
        'ec2_metadata_service_endpoint': session.get_config_variable(
            'ec2_metadata_service_endpoint'
        ),
        'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode(
            session
        ),
        'ec2_credential_refresh_window': _DEFAULT_ADVISORY_REFRESH_TIMEOUT,
        'ec2_metadata_v1_disabled': session.get_config_variable(
            'ec2_metadata_v1_disabled'
        ),
    }

    if cache is None:
        cache = {}

    # These three providers are shared: they appear in the main chain and
    # are also offered to the assume role provider as credential sources.
    env_provider = EnvProvider()
    container_provider = ContainerProvider()
    instance_metadata_provider = InstanceMetadataProvider(
        iam_role_fetcher=InstanceMetadataFetcher(
            timeout=metadata_timeout,
            num_attempts=num_attempts,
            user_agent=session.user_agent(),
            config=imds_config,
        )
    )

    profile_provider_builder = ProfileProviderBuilder(
        session, cache=cache, region_name=region_name
    )
    assume_role_provider = AssumeRoleProvider(
        load_config=lambda: session.full_config,
        client_creator=_get_client_creator(session, region_name),
        cache=cache,
        profile_name=profile_name,
        credential_sourcer=CanonicalNameCredentialSourcer(
            [env_provider, container_provider, instance_metadata_provider]
        ),
        profile_provider_builder=profile_provider_builder,
    )

    pre_profile = [
        env_provider,
        assume_role_provider,
    ]
    profile_providers = profile_provider_builder.providers(
        profile_name=profile_name,
        disable_env_vars=disable_env_vars,
    )
    post_profile = [
        OriginalEC2Provider(),
        BotoProvider(),
        container_provider,
        instance_metadata_provider,
    ]
    providers = pre_profile + profile_providers + post_profile

    if disable_env_vars:
        # An explicitly provided profile will negate an EnvProvider.
        # We will defer to providers that understand the "profile"
        # concept to retrieve credentials.
        #
        # This branch only runs when ``profile`` is present in the
        # session's instance variables. In every other case the
        # EnvProvider stays at the front of the chain, so a profile can
        # only win when the EnvProvider does not return credentials.
        #
        # NOTE(review): presumably a profile selected through the
        # AWS_PROFILE environment variable is not an instance variable,
        # which is what lets
        #     export AWS_ACCESS_KEY_ID=foo
        #     export AWS_SECRET_ACCESS_KEY=bar
        #     export AWS_PROFILE=baz
        # resolve to the explicit keys -- confirm against the session.
        providers.remove(env_provider)
        logger.debug(
            'Skipping environment variable credential check'
            ' because profile name was explicitly set.'
        )

    resolver = CredentialResolver(providers=providers)
    return resolver

162 

163 

class ProfileProviderBuilder:
    """This class handles the creation of profile based providers.

    NOTE: This class is only intended for internal use.

    This class handles the creation and ordering of the various credential
    providers that primarily source their configuration from the shared
    config. This is needed to enable sharing between the default credential
    chain and the source profile chain created by the assume role provider.
    """

    def __init__(
        self, session, cache=None, region_name=None, sso_token_cache=None
    ):
        """Store the collaborators used to build the providers.

        :param session: The session used to read config variables, load
            the full config and create clients.
        :param cache: Cache passed to the web identity and SSO providers.
        :param region_name: Region name used by the web identity
            provider's client creator.
        :param sso_token_cache: Token cache passed to the SSO provider and
            its token provider.
        """
        self._session = session
        self._cache = cache
        self._region_name = region_name
        self._sso_token_cache = sso_token_cache

    def providers(self, profile_name, disable_env_vars=False):
        """Return the profile based providers in lookup order.

        :param profile_name: The profile every provider is bound to.
        :param disable_env_vars: Forwarded to the web identity provider
            only.
        :return: A list of newly created providers: web identity, SSO,
            shared credentials file, process, config file.
        """
        return [
            self._create_web_identity_provider(
                profile_name,
                disable_env_vars,
            ),
            self._create_sso_provider(profile_name),
            self._create_shared_credential_provider(profile_name),
            self._create_process_provider(profile_name),
            self._create_config_provider(profile_name),
        ]

    def _create_process_provider(self, profile_name):
        """Build a ``ProcessProvider`` that lazily loads the full config."""
        return ProcessProvider(
            profile_name=profile_name,
            load_config=lambda: self._session.full_config,
        )

    def _create_shared_credential_provider(self, profile_name):
        """Build a ``SharedCredentialProvider`` for the session's
        ``credentials_file`` config variable."""
        credential_file = self._session.get_config_variable('credentials_file')
        return SharedCredentialProvider(
            profile_name=profile_name,
            creds_filename=credential_file,
        )

    def _create_config_provider(self, profile_name):
        """Build a ``ConfigProvider`` for the session's ``config_file``
        config variable."""
        config_file = self._session.get_config_variable('config_file')
        return ConfigProvider(
            profile_name=profile_name,
            config_filename=config_file,
        )

    def _create_web_identity_provider(self, profile_name, disable_env_vars):
        """Build an ``AssumeRoleWithWebIdentityProvider`` sharing this
        builder's cache and region."""
        return AssumeRoleWithWebIdentityProvider(
            load_config=lambda: self._session.full_config,
            client_creator=_get_client_creator(
                self._session, self._region_name
            ),
            cache=self._cache,
            profile_name=profile_name,
            disable_env_vars=disable_env_vars,
        )

    def _create_sso_provider(self, profile_name):
        """Build an ``SSOProvider`` together with its ``SSOTokenProvider``."""
        return SSOProvider(
            load_config=lambda: self._session.full_config,
            client_creator=self._session.create_client,
            profile_name=profile_name,
            cache=self._cache,
            token_cache=self._sso_token_cache,
            token_provider=SSOTokenProvider(
                self._session,
                cache=self._sso_token_cache,
                profile_name=profile_name,
            ),
        )

239 

240 

def get_credentials(session):
    """Load credentials for ``session`` using the default resolver.

    :param session: The session passed to ``create_credential_resolver``.
    :return: Whatever the resolver's ``load_credentials()`` returns.
    """
    resolver = create_credential_resolver(session)
    return resolver.load_credentials()

244 

245 

def _local_now():
    """Return the current time as a datetime aware of the local timezone."""
    return datetime.datetime.now(tzlocal())

248 

249 

def _parse_if_needed(value):
    """Return ``value`` as a datetime.

    :param value: Either a ``datetime.datetime`` (returned unchanged) or
        a value handed to ``dateutil.parser.parse``.
    :return: A ``datetime.datetime``.
    """
    if isinstance(value, datetime.datetime):
        return value
    return parse(value)

254 

255 

def _serialize_if_needed(value, iso=False):
    """Turn a datetime into a string; pass anything else through.

    :param value: The value to serialize.
    :param iso: When true, datetimes are rendered with ``isoformat()``;
        otherwise with the ``'%Y-%m-%dT%H:%M:%S%Z'`` format.
    :return: A string for datetime input, otherwise ``value`` itself.
    """
    if isinstance(value, datetime.datetime):
        if iso:
            return value.isoformat()
        return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
    return value

262 

263 

def _get_client_creator(session, region_name):
    """Return a callable that creates clients from ``session``.

    :param session: The object whose ``create_client`` is called.
    :param region_name: Default ``region_name`` for created clients.
    :return: ``client_creator(service_name, **kwargs)``. Keyword arguments
        given by the caller override the default ``region_name``.
    """

    def client_creator(service_name, **kwargs):
        create_client_kwargs = {'region_name': region_name}
        create_client_kwargs.update(**kwargs)
        return session.create_client(service_name, **create_client_kwargs)

    return client_creator

271 

272 

def create_assume_role_refresher(client, params):
    """Return a refresh callable that calls ``client.assume_role``.

    :param client: An object exposing ``assume_role(**params)``.
    :param params: Keyword arguments for every ``assume_role`` call.
    :return: A zero-argument callable returning a dict with the keys
        ``access_key``, ``secret_key``, ``token`` and ``expiry_time``.
    """

    def refresh():
        response = client.assume_role(**params)
        credentials = response['Credentials']
        # We need to normalize the credential names to
        # the values expected by the refresh creds.
        return {
            'access_key': credentials['AccessKeyId'],
            'secret_key': credentials['SecretAccessKey'],
            'token': credentials['SessionToken'],
            'expiry_time': _serialize_if_needed(credentials['Expiration']),
        }

    return refresh

287 

288 

def create_mfa_serial_refresher(actual_refresh):
    """Wrap ``actual_refresh`` so that it can only be called once.

    :param actual_refresh: The zero-argument refresh callable to wrap.
    :return: A callable that delegates to ``actual_refresh`` on its first
        call and raises ``RefreshWithMFAUnsupportedError`` on every
        later call.
    """

    class _Refresher:
        def __init__(self, refresh):
            self._refresh = refresh
            self._has_been_called = False

        def __call__(self):
            if self._has_been_called:
                # We can explore an option in the future to support
                # reprompting for MFA, but for now we just error out
                # when the temp creds expire.
                raise RefreshWithMFAUnsupportedError()
            # Set the flag before delegating, so a failing first call
            # still counts as the one allowed call.
            self._has_been_called = True
            return self._refresh()

    return _Refresher(actual_refresh)

305 

306 

class Credentials:
    """
    Holds the credentials needed to authenticate requests.

    :param str access_key: The access key part of the credentials.
    :param str secret_key: The secret key part of the credentials.
    :param str token: The security token, valid only for session credentials.
    :param str method: A string which identifies where the credentials
        were found. Defaults to ``'explicit'`` when not given.
    :param str account_id: (optional) An account ID associated with the credentials.
    """

    def __init__(
        self, access_key, secret_key, token=None, method=None, account_id=None
    ):
        self.access_key = access_key
        self.secret_key = secret_key
        self.token = token

        if method is None:
            method = 'explicit'
        self.method = method
        self.account_id = account_id

        self._normalize()

    def _normalize(self):
        """Pass the access key and secret key through ``ensure_unicode``."""
        # Keys would sometimes (accidentally) contain non-ascii characters.
        # It would cause a confusing UnicodeDecodeError in Python 2.
        # We explicitly convert them into unicode to avoid such error.
        #
        # Eventually the service will decide whether to accept the credential.
        # This also complies with the behavior in Python 3.
        self.access_key = botocore.compat.ensure_unicode(self.access_key)
        self.secret_key = botocore.compat.ensure_unicode(self.secret_key)

    def get_frozen_credentials(self):
        """Return the current values as a ``ReadOnlyCredentials`` tuple."""
        return ReadOnlyCredentials(
            self.access_key, self.secret_key, self.token, self.account_id
        )

    def get_deferred_property(self, property_name):
        """Return a callable that reads ``property_name`` when invoked.

        The attribute is looked up at call time, not when the callable is
        created. A missing attribute yields ``None``.
        """

        def get_property():
            return getattr(self, property_name, None)

        return get_property

353 

354 

class RefreshableCredentials(Credentials):
    """
    Holds the credentials needed to authenticate requests. In addition, it
    knows how to refresh itself.

    :param str access_key: The access key part of the credentials.
    :param str secret_key: The secret key part of the credentials.
    :param str token: The security token, valid only for session credentials.
    :param datetime expiry_time: The expiration time of the credentials.
    :param function refresh_using: Callback function to refresh the credentials.
    :param str method: A string which identifies where the credentials
        were found.
    :param function time_fetcher: Callback function to retrieve current time.
    :param int advisory_timeout: (optional) Overrides the advisory refresh
        window, in seconds, for this instance.
    :param int mandatory_timeout: (optional) Overrides the mandatory refresh
        window, in seconds, for this instance.
    :param str account_id: (optional) An account ID associated with the credentials.
    """

    # The time at which we'll attempt to refresh, but not
    # block if someone else is refreshing.
    _advisory_refresh_timeout = _DEFAULT_ADVISORY_REFRESH_TIMEOUT
    # The time at which all threads will block waiting for
    # refreshed credentials.
    _mandatory_refresh_timeout = _DEFAULT_MANDATORY_REFRESH_TIMEOUT

    def __init__(
        self,
        access_key,
        secret_key,
        token,
        expiry_time,
        refresh_using,
        method,
        time_fetcher=_local_now,
        advisory_timeout=None,
        mandatory_timeout=None,
        account_id=None,
    ):
        # The parent initializer is not called: the public attributes are
        # refresh-aware properties here, so the private fields are set
        # directly to avoid triggering a refresh during construction.
        self._refresh_using = refresh_using
        self._access_key = access_key
        self._secret_key = secret_key
        self._token = token
        self._account_id = account_id
        self._expiry_time = expiry_time
        self._time_fetcher = time_fetcher
        self._refresh_lock = threading.Lock()
        self.method = method
        self._frozen_credentials = ReadOnlyCredentials(
            access_key, secret_key, token, account_id
        )
        self._normalize()
        if advisory_timeout is not None:
            self._advisory_refresh_timeout = advisory_timeout
        if mandatory_timeout is not None:
            self._mandatory_refresh_timeout = mandatory_timeout

    def _normalize(self):
        """Pass the stored access key and secret key through
        ``ensure_unicode`` without going through the properties."""
        self._access_key = botocore.compat.ensure_unicode(self._access_key)
        self._secret_key = botocore.compat.ensure_unicode(self._secret_key)

    @classmethod
    def create_from_metadata(
        cls,
        metadata,
        refresh_using,
        method,
        advisory_timeout=None,
        mandatory_timeout=None,
    ):
        """Build an instance from a metadata dict.

        :param dict metadata: Must contain ``access_key``, ``secret_key``,
            ``token`` and ``expiry_time``; ``account_id`` is optional.
        :param function refresh_using: Callback used to refresh.
        :param str method: Identifies where the credentials were found.
        :param advisory_timeout: Only forwarded when not ``None``.
        :param mandatory_timeout: Only forwarded when not ``None``.
        :return: A new instance of ``cls``.
        """
        kwargs = {}
        if advisory_timeout is not None:
            kwargs['advisory_timeout'] = advisory_timeout
        if mandatory_timeout is not None:
            kwargs['mandatory_timeout'] = mandatory_timeout

        instance = cls(
            access_key=metadata['access_key'],
            secret_key=metadata['secret_key'],
            token=metadata['token'],
            expiry_time=cls._expiry_datetime(metadata['expiry_time']),
            method=method,
            refresh_using=refresh_using,
            account_id=metadata.get('account_id'),
            **kwargs,
        )
        return instance

    @property
    def access_key(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._access_key

    @access_key.setter
    def access_key(self, value):
        self._access_key = value

    @property
    def secret_key(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._secret_key

    @secret_key.setter
    def secret_key(self, value):
        self._secret_key = value

    @property
    def token(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._token

    @token.setter
    def token(self, value):
        self._token = value

    @property
    def account_id(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._account_id

    @account_id.setter
    def account_id(self, value):
        self._account_id = value

    def _seconds_remaining(self):
        """Return the seconds between the current time (as reported by the
        time fetcher) and the expiry time."""
        delta = self._expiry_time - self._time_fetcher()
        return total_seconds(delta)

    def refresh_needed(self, refresh_in=None):
        """Check if a refresh is needed.

        A refresh is needed if the time remaining before the
        temporary credentials expire is less than the
        provided ``refresh_in``. If ``refresh_in`` is not
        provided, ``self._advisory_refresh_timeout`` will be used.

        For example, if your temporary credentials expire
        in 10 minutes and the provided ``refresh_in`` is
        ``15 * 60``, then this function will return ``True``.

        :type refresh_in: int
        :param refresh_in: The number of seconds before the
            credentials expire in which refresh attempts should
            be made.

        :return: True if refresh needed, False otherwise.

        """
        if self._expiry_time is None:
            # No expiration, so assume we don't need to refresh.
            return False

        if refresh_in is None:
            refresh_in = self._advisory_refresh_timeout
        # The credentials should be refreshed if they're going to expire
        # in less than ``refresh_in`` seconds.
        if self._seconds_remaining() >= refresh_in:
            # There's enough time left. Don't refresh.
            return False
        logger.debug("Credentials need to be refreshed.")
        return True

    def _is_expired(self):
        # Checks if the current credentials are expired.
        return self.refresh_needed(refresh_in=0)

    def _refresh(self):
        """Refresh the credentials if they are inside a refresh window.

        Inside the advisory window only the caller that obtains the lock
        refreshes; other callers keep using the current credentials. Inside
        the mandatory window callers block on the lock until a refresh has
        happened.
        """
        # In the common case where we don't need a refresh, we
        # can immediately exit and not require acquiring the
        # refresh lock.
        if not self.refresh_needed(self._advisory_refresh_timeout):
            return

        # Try to take the lock without blocking. If another caller
        # already holds it, fall through to the elif clause.
        if self._refresh_lock.acquire(blocking=False):
            try:
                # Re-check: someone may have refreshed between the first
                # check and the lock being acquired.
                if not self.refresh_needed(self._advisory_refresh_timeout):
                    return
                is_mandatory_refresh = self.refresh_needed(
                    self._mandatory_refresh_timeout
                )
                self._protected_refresh(is_mandatory=is_mandatory_refresh)
                return
            finally:
                self._refresh_lock.release()
        elif self.refresh_needed(self._mandatory_refresh_timeout):
            # If we're within the mandatory refresh window,
            # we must block until we get refreshed credentials.
            with self._refresh_lock:
                if not self.refresh_needed(self._mandatory_refresh_timeout):
                    return
                self._protected_refresh(is_mandatory=True)

    def _protected_refresh(self, is_mandatory):
        """Call the refresh callback and store its result.

        :param bool is_mandatory: When true, errors raised by the refresh
            callback propagate; otherwise they are logged and the current
            credentials are kept.
        :raises RuntimeError: If the refreshed credentials are already
            expired.
        """
        # precondition: this method should only be called if you've acquired
        # the self._refresh_lock.
        try:
            metadata = self._refresh_using()
        except Exception:
            period_name = 'mandatory' if is_mandatory else 'advisory'
            logger.warning(
                "Refreshing temporary credentials failed "
                "during %s refresh period.",
                period_name,
                exc_info=True,
            )
            if is_mandatory:
                # If this is a mandatory refresh, then
                # all errors that occur when we attempt to refresh
                # credentials are propagated back to the user.
                raise
            # Otherwise we'll just return.
            # The end result will be that we'll use the current
            # set of temporary credentials we have.
            return
        self._set_from_data(metadata)
        self._frozen_credentials = ReadOnlyCredentials(
            self._access_key, self._secret_key, self._token, self._account_id
        )
        if self._is_expired():
            # We successfully refreshed credentials but for whatever
            # reason, our refreshing function returned credentials
            # that are still expired. In this scenario, the only
            # thing we can do is let the user know and raise
            # an exception.
            msg = (
                "Credentials were refreshed, but the "
                "refreshed credentials are still expired."
            )
            logger.warning(msg)
            raise RuntimeError(msg)

    @staticmethod
    def _expiry_datetime(time_str):
        """Parse an expiry time string with ``dateutil.parser.parse``."""
        return parse(time_str)

    def _set_from_data(self, data):
        """Store the values returned by the refresh callback.

        :param dict data: Must contain ``access_key``, ``secret_key``,
            ``token`` and ``expiry_time``; ``account_id`` is optional.
        :raises CredentialRetrievalError: If ``data`` is empty or lacks
            any of the required keys.
        """
        expected_keys = ['access_key', 'secret_key', 'token', 'expiry_time']
        if not data:
            missing_keys = expected_keys
        else:
            missing_keys = [k for k in expected_keys if k not in data]

        if missing_keys:
            message = "Credential refresh failed, response did not contain: %s"
            raise CredentialRetrievalError(
                provider=self.method,
                error_msg=message % ', '.join(missing_keys),
            )

        # These go through the property setters, which write the private
        # fields without triggering another refresh.
        self.access_key = data['access_key']
        self.secret_key = data['secret_key']
        self.token = data['token']
        self._expiry_time = parse(data['expiry_time'])
        self.account_id = data.get('account_id')
        logger.debug(
            "Retrieved credentials will expire at: %s", self._expiry_time
        )
        self._normalize()

    def get_frozen_credentials(self):
        """Return immutable credentials.

        The ``access_key``, ``secret_key``, and ``token`` properties
        on this class will always check and refresh credentials if
        needed before returning the particular credentials.

        This has an edge case where you can get inconsistent
        credentials. Imagine this:

            # Current creds are "t1"
            tmp.access_key ---> expired? no, so return t1.access_key
            # ---- time is now expired, creds need refreshing to "t2" ----
            tmp.secret_key ---> expired? yes, refresh and return t2.secret_key

        This means we're using the access key from t1 with the secret key
        from t2. To fix this issue, you can request a frozen credential object
        which is guaranteed not to change.

        The frozen credentials returned from this method should be used
        immediately and then discarded. The typical usage pattern would
        be::

            creds = RefreshableCredentials(...)
            some_code = SomeSignerObject()
            # I'm about to sign the request.
            # The frozen credentials are only used for the
            # duration of generate_presigned_url and will be
            # immediately thrown away.
            request = some_code.sign_some_request(
                with_credentials=creds.get_frozen_credentials())
            print("Signed request:", request)

        """
        self._refresh()
        return self._frozen_credentials

666 

667 

class DeferredRefreshableCredentials(RefreshableCredentials):
    """Refreshable credentials that don't require initial credentials.

    refresh_using will be called upon first access.

    :param function refresh_using: Callback function to refresh the credentials.
    :param str method: A string which identifies where the credentials
        were found.
    :param function time_fetcher: Callback function to retrieve current time.
    """

    def __init__(self, refresh_using, method, time_fetcher=_local_now):
        # The parent initializer is not called: there are no initial
        # credentials, so every credential field starts out as None.
        self._refresh_using = refresh_using
        self._access_key = None
        self._secret_key = None
        self._token = None
        self._account_id = None
        self._expiry_time = None
        self._time_fetcher = time_fetcher
        self._refresh_lock = threading.Lock()
        self.method = method
        self._frozen_credentials = None

    def refresh_needed(self, refresh_in=None):
        """Report a refresh as needed until credentials have been fetched
        once; afterwards defer to the parent implementation."""
        if self._frozen_credentials is None:
            return True
        return super().refresh_needed(refresh_in)

690 

691 

class CachedCredentialFetcher:
    """Base class for fetchers that keep their responses in a cache.

    Subclasses must implement ``_create_cache_key`` and
    ``_get_credentials``.
    """

    DEFAULT_EXPIRY_WINDOW_SECONDS = 60 * 15

    def __init__(self, cache=None, expiry_window_seconds=None):
        """
        :type cache: dict
        :param cache: An object supporting item access and ``in``. A new
            ``dict`` is used when ``None``.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: Cached credentials with fewer than
            this many seconds left before expiration are treated as
            expired. Defaults to ``DEFAULT_EXPIRY_WINDOW_SECONDS``.
        """
        if cache is None:
            cache = {}
        self._cache = cache
        self._cache_key = self._create_cache_key()
        if expiry_window_seconds is None:
            expiry_window_seconds = self.DEFAULT_EXPIRY_WINDOW_SECONDS
        self._expiry_window_seconds = expiry_window_seconds

    def _create_cache_key(self):
        """Return the cache key for this fetcher; subclasses implement."""
        raise NotImplementedError('_create_cache_key()')

    def _make_file_safe(self, filename):
        """Replace ``:``, the OS path separator and ``/`` with ``_``."""
        # Replace :, path sep, and / to make it the string filename safe.
        filename = filename.replace(':', '_').replace(os.sep, '_')
        return filename.replace('/', '_')

    def _get_credentials(self):
        """Fetch a fresh response; subclasses implement."""
        raise NotImplementedError('_get_credentials()')

    def fetch_credentials(self):
        """Return a credentials dict, served from the cache when possible."""
        return self._get_cached_credentials()

    def _get_cached_credentials(self):
        """Get up-to-date credentials.

        This will check the cache for up-to-date credentials, calling
        ``_get_credentials`` and caching its response if none are
        available.

        :return: A dict with the keys ``access_key``, ``secret_key``,
            ``token``, ``expiry_time`` (ISO formatted when the expiration
            is a datetime) and ``account_id`` (``None`` when absent).
        """
        response = self._load_from_cache()
        if response is None:
            response = self._get_credentials()
            self._write_to_cache(response)
        else:
            logger.debug("Credentials for role retrieved from cache.")

        creds = response['Credentials']
        expiration = _serialize_if_needed(creds['Expiration'], iso=True)
        credentials = {
            'access_key': creds['AccessKeyId'],
            'secret_key': creds['SecretAccessKey'],
            'token': creds['SessionToken'],
            'expiry_time': expiration,
            'account_id': creds.get('AccountId'),
        }

        return credentials

    def _load_from_cache(self):
        """Return a copy of the cached response, or ``None`` when there is
        no entry or the entry is inside the expiry window."""
        if self._cache_key in self._cache:
            creds = deepcopy(self._cache[self._cache_key])
            if not self._is_expired(creds):
                return creds
            else:
                logger.debug(
                    "Credentials were found in cache, but they are expired."
                )
        return None

    def _write_to_cache(self, response):
        """Store a deep copy of ``response`` under this fetcher's key."""
        self._cache[self._cache_key] = deepcopy(response)

    def _is_expired(self, credentials):
        """Check if credentials are expired."""
        end_time = _parse_if_needed(credentials['Credentials']['Expiration'])
        seconds = total_seconds(end_time - _local_now())
        return seconds < self._expiry_window_seconds

762 

763 

class BaseAssumeRoleCredentialFetcher(CachedCredentialFetcher):
    """Shared state and cache key handling for assume role fetchers."""

    def __init__(
        self,
        client_creator,
        role_arn,
        extra_args=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client.

        :type role_arn: str
        :param role_arn: The ARN of the role to be assumed.

        :type extra_args: dict
        :param extra_args: Additional arguments for the assume role
            request. The dict is deep copied, not modified.

        :type cache: dict
        :param cache: Passed to the parent initializer.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: Passed to the parent initializer.
        """
        self._client_creator = client_creator
        self._role_arn = role_arn

        if extra_args is None:
            self._assume_kwargs = {}
        else:
            self._assume_kwargs = deepcopy(extra_args)
        self._assume_kwargs['RoleArn'] = self._role_arn

        self._role_session_name = self._assume_kwargs.get('RoleSessionName')
        self._using_default_session_name = False
        if not self._role_session_name:
            self._generate_assume_role_name()

        # Called last: the parent initializer computes the cache key, which
        # reads the attributes set above.
        super().__init__(cache, expiry_window_seconds)

    def _generate_assume_role_name(self):
        """Set a ``botocore-session-<unix time>`` role session name and
        record that it was generated."""
        self._role_session_name = f'botocore-session-{int(time.time())}'
        self._assume_kwargs['RoleSessionName'] = self._role_session_name
        self._using_default_session_name = True

    def _create_cache_key(self):
        """Create a predictable cache key for the current configuration.

        The cache key is intended to be compatible with file names.
        """
        args = deepcopy(self._assume_kwargs)

        # A generated role session name is derived from the current time,
        # so we don't want it in the hash.
        if self._using_default_session_name:
            del args['RoleSessionName']

        if 'Policy' in args:
            # To have a predictable hash, the keys of the policy must be
            # sorted, so we have to load it here to make sure it gets sorted
            # later on.
            args['Policy'] = json.loads(args['Policy'])

        args = json.dumps(args, sort_keys=True)
        argument_hash = sha1(args.encode('utf-8')).hexdigest()
        return self._make_file_safe(argument_hash)

    def _add_account_id_to_response(self, response):
        """Copy the account ID from the assumed role ARN into
        ``response['Credentials']['AccountId']``.

        The response is modified in place. When the ARN is not recognised
        as an ARN, the response is left untouched and a debug message is
        logged.
        """
        role_arn = response.get('AssumedRoleUser', {}).get('Arn')
        if ArnParser.is_arn(role_arn):
            arn_parser = ArnParser()
            account_id = arn_parser.parse_arn(role_arn)['account']
            response['Credentials']['AccountId'] = account_id
        else:
            logger.debug(
                "Unable to extract account ID from Arn: %s", role_arn
            )

824 

825 

class AssumeRoleCredentialFetcher(BaseAssumeRoleCredentialFetcher):
    """Fetch credentials by calling ``assume_role`` with source credentials."""

    def __init__(
        self,
        client_creator,
        source_credentials,
        role_arn,
        extra_args=None,
        mfa_prompter=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client taking
            arguments like ``Session.create_client``.

        :type source_credentials: Credentials
        :param source_credentials: The credentials to use to create the
            client for the call to AssumeRole.

        :type role_arn: str
        :param role_arn: The ARN of the role to be assumed.

        :type extra_args: dict
        :param extra_args: Any additional arguments to add to the assume
            role request using the format of the botocore operation.
            Possible keys include, but may not be limited to,
            DurationSeconds, Policy, SerialNumber, ExternalId and
            RoleSessionName.

        :type mfa_prompter: callable
        :param mfa_prompter: A callable that takes a prompt string and
            returns input provided by the user. Defaults to
            ``getpass.getpass``.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``. An example of this is
            the ``JSONFileCache`` class in aws-cli.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: The amount of time, in seconds,
            before expiration at which cached credentials are already
            treated as expired.
        """
        self._source_credentials = source_credentials
        self._mfa_prompter = mfa_prompter
        if self._mfa_prompter is None:
            self._mfa_prompter = getpass.getpass

        super().__init__(
            client_creator,
            role_arn,
            extra_args=extra_args,
            cache=cache,
            expiry_window_seconds=expiry_window_seconds,
        )

    def _get_credentials(self):
        """Get credentials by calling assume role."""
        kwargs = self._assume_role_kwargs()
        client = self._create_client()
        response = client.assume_role(**kwargs)
        self._add_account_id_to_response(response)
        return response

    def _assume_role_kwargs(self):
        """Get the arguments for assume role based on current configuration.

        Returns a deep copy of the stored arguments. When a
        ``SerialNumber`` is configured, the MFA prompter is called and its
        result is added as ``TokenCode``.
        """
        assume_role_kwargs = deepcopy(self._assume_kwargs)

        mfa_serial = assume_role_kwargs.get('SerialNumber')

        if mfa_serial is not None:
            prompt = f'Enter MFA code for {mfa_serial}: '
            token_code = self._mfa_prompter(prompt)
            assume_role_kwargs['TokenCode'] = token_code

        # ``DurationSeconds``, when configured, is already part of the
        # copied arguments and is passed through unchanged.
        return assume_role_kwargs

    def _create_client(self):
        """Create an STS client using the source credentials."""
        frozen_credentials = self._source_credentials.get_frozen_credentials()
        return self._client_creator(
            'sts',
            aws_access_key_id=frozen_credentials.access_key,
            aws_secret_access_key=frozen_credentials.secret_key,
            aws_session_token=frozen_credentials.token,
        )

916 

917 

class AssumeRoleWithWebIdentityCredentialFetcher(
    BaseAssumeRoleCredentialFetcher
):
    """Fetch role credentials via STS ``AssumeRoleWithWebIdentity``."""

    def __init__(
        self,
        client_creator,
        web_identity_token_loader,
        role_arn,
        extra_args=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client taking
            arguments like ``Session.create_client``.

        :type web_identity_token_loader: callable
        :param web_identity_token_loader: A zero-argument callable that
            returns the web identity token as a str.  It is called each
            time the request arguments are built.

        :type role_arn: str
        :param role_arn: The ARN of the role to be assumed.

        :type extra_args: dict
        :param extra_args: Additional arguments for the assume role
            request, in the format of the botocore operation (for
            example ``DurationSeconds``, ``Policy`` or
            ``RoleSessionName``).

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``. An example of this is
            the ``JSONFileCache`` class in aws-cli.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: An amount of time, in seconds.
            Forwarded unchanged to the base fetcher.
            NOTE(review): presumably the window before expiry in which
            credentials are refreshed -- confirm against the base class.
        """
        self._web_identity_token_loader = web_identity_token_loader

        super().__init__(
            client_creator,
            role_arn,
            extra_args=extra_args,
            cache=cache,
            expiry_window_seconds=expiry_window_seconds,
        )

    def _get_credentials(self):
        """Call ``assume_role_with_web_identity`` and return the response."""
        request_args = self._assume_role_kwargs()
        # The web identity token is the only credential this call needs,
        # so the client is explicitly configured not to sign requests.
        unsigned_config = Config(signature_version=UNSIGNED)
        sts_client = self._client_creator('sts', config=unsigned_config)
        sts_response = sts_client.assume_role_with_web_identity(
            **request_args
        )
        self._add_account_id_to_response(sts_response)
        return sts_response

    def _assume_role_kwargs(self):
        """Return a copy of the stored arguments plus a fresh token."""
        request_args = deepcopy(self._assume_kwargs)
        request_args['WebIdentityToken'] = self._web_identity_token_loader()
        return request_args

985 

986 

class CredentialProvider:
    """Base class for the credential providers in this module."""

    # A short name to identify the provider within botocore.
    METHOD = None

    # A name to identify the provider for use in cross-sdk features like
    # assume role's `credential_source` configuration option. These names
    # are to be treated in a case-insensitive way. NOTE: any providers not
    # implemented in botocore MUST prefix their canonical names with
    # 'custom' or we DO NOT guarantee that it will work with any features
    # that this provides.
    CANONICAL_NAME = None

    def __init__(self, session=None):
        self.session = session

    def load(self):
        """
        Load credentials from the provider's source.

        Subclasses should implement this method (by reading from disk, the
        environment, the network or wherever).  The subclasses in this
        module return a credentials object when credentials were found and
        ``None`` otherwise, indicating that the ``CredentialResolver``
        should fall back to the next available method.

        The default implementation does nothing, assuming the user has set
        the ``access_key/secret_key/token`` themselves, and simply returns
        ``True``.

        :returns: ``True`` in this base implementation.
        """
        return True

    def _extract_creds_from_mapping(self, mapping, *key_names):
        """Return the values of ``key_names`` from ``mapping``, in order.

        :raises PartialCredentialsError: If looking up one of the keys
            raises ``KeyError``; the error names the first such key.
        """
        values = []
        for name in key_names:
            try:
                value = mapping[name]
            except KeyError:
                raise PartialCredentialsError(
                    provider=self.METHOD, cred_var=name
                )
            values.append(value)
        return values

1031 

1032 

class ProcessProvider(CredentialProvider):
    """Load credentials by running the profile's ``credential_process``."""

    METHOD = 'custom-process'

    def __init__(self, profile_name, load_config, popen=subprocess.Popen):
        """
        :param profile_name: Name of the profile whose configuration is
            searched for ``credential_process``.
        :param load_config: A zero-argument callable returning the loaded
            configuration.  It is called on first use and the result is
            cached on the instance.
        :param popen: Callable used to start the process (defaults to
            ``subprocess.Popen``).
        """
        self._popen = popen
        self._load_config = load_config
        self._profile_name = profile_name
        self._loaded_config = None

    def load(self):
        """Run the configured process and wrap its output as credentials.

        Returns ``None`` when the profile has no ``credential_process``.
        When the process reports an expiration, refreshable credentials
        that re-run the same command are returned; otherwise static
        credentials are returned.
        """
        command = self._credential_process
        if command is None:
            return None

        def rerun_process():
            return self._retrieve_credentials_using(command)

        fetched = self._retrieve_credentials_using(command)
        if fetched.get('expiry_time') is None:
            return Credentials(
                access_key=fetched['access_key'],
                secret_key=fetched['secret_key'],
                token=fetched.get('token'),
                method=self.METHOD,
                account_id=fetched.get('account_id'),
            )

        return RefreshableCredentials.create_from_metadata(
            fetched,
            rerun_process,
            self.METHOD,
        )

    def _retrieve_credentials_using(self, credential_process):
        """Execute ``credential_process`` and parse its JSON output.

        :raises CredentialRetrievalError: If the process exits non-zero,
            reports a ``Version`` other than ``1``, or omits a required
            key.
        """
        # shell=True is not used, so the command and all of its arguments
        # must be passed as a list.
        argv = compat_shell_split(credential_process)
        process = self._popen(
            argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            raise CredentialRetrievalError(
                provider=self.METHOD, error_msg=stderr.decode('utf-8')
            )

        parsed = botocore.compat.json.loads(stdout.decode('utf-8'))
        version = parsed.get('Version', '<Version key not provided>')
        if version != 1:
            message = (
                f"Unsupported version '{version}' for credential process "
                "provider, supported versions: 1"
            )
            raise CredentialRetrievalError(
                provider=self.METHOD, error_msg=message
            )

        try:
            access_key = parsed['AccessKeyId']
            secret_key = parsed['SecretAccessKey']
            session_token = parsed.get('SessionToken')
            expiration = parsed.get('Expiration')
            account_id = self._get_account_id(parsed)
        except KeyError as missing:
            raise CredentialRetrievalError(
                provider=self.METHOD,
                error_msg=f"Missing required key in response: {missing}",
            )
        return {
            'access_key': access_key,
            'secret_key': secret_key,
            'token': session_token,
            'expiry_time': expiration,
            'account_id': account_id,
        }

    @property
    def _credential_process(self):
        """The profile's ``credential_process`` setting, or ``None``."""
        return self.profile_config.get('credential_process')

    @property
    def profile_config(self):
        """The configuration dict of this provider's profile.

        An empty dict is returned when the profile is not present.
        """
        if self._loaded_config is None:
            self._loaded_config = self._load_config()
        all_profiles = self._loaded_config.get('profiles', {})
        return all_profiles.get(self._profile_name, {})

    def _get_account_id(self, parsed):
        """Prefer the process output's account id over the profile's."""
        return parsed.get('AccountId') or self.profile_config.get(
            'aws_account_id'
        )

1115 

1116 

class InstanceMetadataProvider(CredentialProvider):
    """Load credentials through an IAM role fetcher."""

    METHOD = 'iam-role'
    CANONICAL_NAME = 'Ec2InstanceMetadata'

    def __init__(self, iam_role_fetcher):
        self._role_fetcher = iam_role_fetcher

    def load(self):
        """Return refreshable credentials, or ``None`` if none are found."""
        retrieve = self._role_fetcher.retrieve_iam_role_credentials
        # Make the first request up front to see whether anything useful
        # comes back.  If not, return None so the next provider in the
        # credential chain gets its turn.
        metadata = retrieve()
        if not metadata:
            return None
        logger.info(
            'Found credentials from IAM Role: %s', metadata['role_name']
        )
        # The metadata already fetched seeds the credentials; the same
        # fetcher call is registered as the refresh function.
        return RefreshableCredentials.create_from_metadata(
            metadata,
            method=self.METHOD,
            refresh_using=retrieve,
        )

1144 

1145 

class EnvProvider(CredentialProvider):
    """Load credentials from environment variables."""

    METHOD = 'env'
    CANONICAL_NAME = 'Environment'
    ACCESS_KEY = 'AWS_ACCESS_KEY_ID'
    SECRET_KEY = 'AWS_SECRET_ACCESS_KEY'
    # The token can come from either of these env vars; they are checked
    # in this order.  AWS_SESSION_TOKEN is what other AWS SDKs have
    # standardized on.
    TOKENS = ['AWS_SECURITY_TOKEN', 'AWS_SESSION_TOKEN']
    EXPIRY_TIME = 'AWS_CREDENTIAL_EXPIRATION'
    ACCOUNT_ID = 'AWS_ACCOUNT_ID'

    def __init__(self, environ=None, mapping=None):
        """

        :param environ: The environment variables (defaults to
            ``os.environ`` if no value is provided).
        :param mapping: An optional mapping of variable names to
            environment variable names.  Use this if you want to
            change the mapping of access_key->AWS_ACCESS_KEY_ID, etc.
            The dict can have up to 5 keys:
                * ``access_key``
                * ``secret_key``
                * ``token``
                * ``expiry_time``
                * ``account_id``
        """
        self.environ = os.environ if environ is None else environ
        self._mapping = self._build_mapping(mapping)

    def _build_mapping(self, mapping):
        """Return the variable-name -> env-var-name mapping to use.

        Keys missing from ``mapping`` fall back to the class defaults.  A
        user-supplied ``token`` entry that is not a list is wrapped in one.
        """
        if mapping is None:
            # No overrides: use the class-level defaults as they are.
            return {
                'access_key': self.ACCESS_KEY,
                'secret_key': self.SECRET_KEY,
                'token': self.TOKENS,
                'expiry_time': self.EXPIRY_TIME,
                'account_id': self.ACCOUNT_ID,
            }

        resolved = {
            'access_key': mapping.get('access_key', self.ACCESS_KEY),
            'secret_key': mapping.get('secret_key', self.SECRET_KEY),
            'token': mapping.get('token', self.TOKENS),
            'expiry_time': mapping.get('expiry_time', self.EXPIRY_TIME),
            'account_id': mapping.get('account_id', self.ACCOUNT_ID),
        }
        if not isinstance(resolved['token'], list):
            resolved['token'] = [resolved['token']]
        return resolved

    def load(self):
        """
        Search for credentials in explicit environment variables.

        Returns ``None`` when the access key variable is unset or empty.
        Refreshable credentials are returned when an expiry time is set,
        static credentials otherwise.
        """
        if not self.environ.get(self._mapping['access_key'], ''):
            return None

        logger.info('Found credentials in environment variables.')
        fetcher = self._create_credentials_fetcher()
        found = fetcher(require_expiry=False)

        raw_expiry = found['expiry_time']
        if raw_expiry is None:
            return Credentials(
                found['access_key'],
                found['secret_key'],
                found['token'],
                method=self.METHOD,
                account_id=found['account_id'],
            )

        return RefreshableCredentials(
            found['access_key'],
            found['secret_key'],
            found['token'],
            parse(raw_expiry),
            refresh_using=fetcher,
            method=self.METHOD,
            account_id=found['account_id'],
        )

    def _create_credentials_fetcher(self):
        """Return a callable that reads the credentials from the environment.

        The callable raises ``PartialCredentialsError`` when the access key
        or secret key is missing or empty, and also when ``require_expiry``
        is true and no expiry time is set.
        """
        mapping = self._mapping
        method = self.METHOD
        environ = self.environ

        def fetch_credentials(require_expiry=True):
            def required(field):
                # Read a mandatory variable; unset and empty both count
                # as missing.
                value = environ.get(mapping[field], '')
                if not value:
                    raise PartialCredentialsError(
                        provider=method, cred_var=mapping[field]
                    )
                return value

            access_key = required('access_key')
            secret_key = required('secret_key')

            # The first token variable holding a non-empty value wins.
            token = None
            for token_var in mapping['token']:
                token = environ.get(token_var, '') or None
                if token is not None:
                    break

            expiry_time = environ.get(mapping['expiry_time'], '')
            if require_expiry and not expiry_time:
                raise PartialCredentialsError(
                    provider=method, cred_var=mapping['expiry_time']
                )

            account_id = environ.get(mapping['account_id'], '')

            return {
                'access_key': access_key,
                'secret_key': secret_key,
                'token': token,
                'expiry_time': expiry_time or None,
                'account_id': account_id or None,
            }

        return fetch_credentials

1286 

1287 

class OriginalEC2Provider(CredentialProvider):
    """Load credentials from the file used by the original EC2 CLI tools."""

    METHOD = 'ec2-credentials-file'
    CANONICAL_NAME = 'Ec2Config'

    # Name of the environment variable holding the credential file path.
    CRED_FILE_ENV = 'AWS_CREDENTIAL_FILE'
    # Keys looked up in the parsed credential file.
    ACCESS_KEY = 'AWSAccessKeyId'
    SECRET_KEY = 'AWSSecretKey'

    def __init__(self, environ=None, parser=None):
        """
        :param environ: The environment variables (defaults to
            ``os.environ`` if no value is provided).
        :param parser: A callable taking the expanded file path and
            returning a mapping of the file's keys to values (defaults to
            ``parse_key_val_file``).
        """
        if environ is None:
            environ = os.environ
        if parser is None:
            parser = parse_key_val_file
        self._environ = environ
        self._parser = parser

    def load(self):
        """
        Search for a credential file used by original EC2 CLI tools.

        Returns ``None`` when the ``CRED_FILE_ENV`` variable is not set or
        the parsed file has no access key entry.  A missing secret key
        entry raises ``KeyError``.
        """
        # Use the class constant rather than a repeated literal so the
        # lookup and the constant cannot drift apart.
        if self.CRED_FILE_ENV not in self._environ:
            return None

        full_path = os.path.expanduser(self._environ[self.CRED_FILE_ENV])
        creds = self._parser(full_path)
        if self.ACCESS_KEY not in creds:
            return None

        logger.info('Found credentials in %s.', self.CRED_FILE_ENV)
        access_key = creds[self.ACCESS_KEY]
        secret_key = creds[self.SECRET_KEY]
        # EC2 creds file doesn't support session tokens.
        return Credentials(access_key, secret_key, method=self.METHOD)

1321 

1322 

class SharedCredentialProvider(CredentialProvider):
    """Load static credentials from a profile of the credentials file."""

    METHOD = 'shared-credentials-file'
    CANONICAL_NAME = 'SharedCredentials'

    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'
    # Botocore originally supported aws_security_token, but the SDKs are
    # standardizing on aws_session_token, so both are supported.  They
    # are checked in this order.
    TOKENS = ['aws_security_token', 'aws_session_token']
    ACCOUNT_ID = 'aws_account_id'

    def __init__(self, creds_filename, profile_name=None, ini_parser=None):
        """
        :param creds_filename: Path handed to ``ini_parser``.
        :param profile_name: Profile to read (defaults to ``'default'``).
        :param ini_parser: Callable that parses the file (defaults to
            ``botocore.configloader.raw_config_parse``).
        """
        self._creds_filename = creds_filename
        self._profile_name = 'default' if profile_name is None else profile_name
        if ini_parser is None:
            ini_parser = botocore.configloader.raw_config_parse
        self._ini_parser = ini_parser

    def load(self):
        """Return the profile's credentials, or ``None`` if it has none.

        ``None`` is returned when the file is not found, the profile is
        absent, or the profile has no access key entry.
        """
        try:
            parsed_file = self._ini_parser(self._creds_filename)
        except ConfigNotFound:
            return None

        if self._profile_name not in parsed_file:
            return None
        section = parsed_file[self._profile_name]
        if self.ACCESS_KEY not in section:
            return None

        logger.info(
            "Found credentials in shared credentials file: %s",
            self._creds_filename,
        )
        access_key, secret_key = self._extract_creds_from_mapping(
            section, self.ACCESS_KEY, self.SECRET_KEY
        )
        token = self._get_session_token(section)
        account_id = self._get_account_id(section)
        return Credentials(
            access_key,
            secret_key,
            token,
            method=self.METHOD,
            account_id=account_id,
        )

    def _get_session_token(self, config):
        """Return the first token entry present in ``config``, else ``None``."""
        return next(
            (config[name] for name in self.TOKENS if name in config), None
        )

    def _get_account_id(self, config):
        """Return the profile's account id entry, or ``None``."""
        return config.get(self.ACCOUNT_ID)

1376 

1377 

class ConfigProvider(CredentialProvider):
    """INI based config provider with profile sections."""

    METHOD = 'config-file'
    CANONICAL_NAME = 'SharedConfig'

    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'
    # Botocore originally supported aws_security_token, but the SDKs are
    # standardizing on aws_session_token, so both are supported.  They
    # are checked in this order.
    TOKENS = ['aws_security_token', 'aws_session_token']
    ACCOUNT_ID = 'aws_account_id'

    def __init__(self, config_filename, profile_name, config_parser=None):
        """

        :param config_filename: The config file name; it is passed as-is
            to ``config_parser``.
        :param profile_name: The name of the current profile.
        :param config_parser: A config parser callable (defaults to
            ``botocore.configloader.load_config``).

        """
        self._config_filename = config_filename
        self._profile_name = profile_name
        if config_parser is None:
            config_parser = botocore.configloader.load_config
        self._config_parser = config_parser

    def load(self):
        """
        If there are credentials in the configuration for this profile,
        use those.

        ``None`` is returned when the file is not found, the profile is
        absent, or the profile has no access key entry.
        """
        try:
            full_config = self._config_parser(self._config_filename)
        except ConfigNotFound:
            return None

        profiles = full_config['profiles']
        if self._profile_name not in profiles:
            return None
        profile_config = profiles[self._profile_name]
        if self.ACCESS_KEY not in profile_config:
            return None

        logger.info(
            "Credentials found in config file: %s",
            self._config_filename,
        )
        access_key, secret_key = self._extract_creds_from_mapping(
            profile_config, self.ACCESS_KEY, self.SECRET_KEY
        )
        token = self._get_session_token(profile_config)
        account_id = self._get_account_id(profile_config)
        return Credentials(
            access_key,
            secret_key,
            token,
            method=self.METHOD,
            account_id=account_id,
        )

    def _get_session_token(self, profile_config):
        """Return the first token entry in the profile, else ``None``."""
        return next(
            (
                profile_config[name]
                for name in self.TOKENS
                if name in profile_config
            ),
            None,
        )

    def _get_account_id(self, config):
        """Return the profile's account id entry, or ``None``."""
        return config.get(self.ACCOUNT_ID)

1445 

1446 

class BotoProvider(CredentialProvider):
    """Load static credentials from a boto config file."""

    METHOD = 'boto-config'
    CANONICAL_NAME = 'Boto2Config'

    BOTO_CONFIG_ENV = 'BOTO_CONFIG'
    DEFAULT_CONFIG_FILENAMES = ['/etc/boto.cfg', '~/.boto']
    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'

    def __init__(self, environ=None, ini_parser=None):
        """
        :param environ: The environment variables (defaults to
            ``os.environ`` if no value is provided).
        :param ini_parser: Callable that parses a config file (defaults
            to ``botocore.configloader.raw_config_parse``).
        """
        self._environ = os.environ if environ is None else environ
        if ini_parser is None:
            ini_parser = botocore.configloader.raw_config_parse
        self._ini_parser = ini_parser

    def load(self):
        """
        Look for credentials in boto config file.

        When ``BOTO_CONFIG`` is set only that file is tried; otherwise the
        default locations are tried in order.  The first file whose
        ``Credentials`` section has an access key entry wins.
        """
        if self.BOTO_CONFIG_ENV in self._environ:
            candidates = [self._environ[self.BOTO_CONFIG_ENV]]
        else:
            candidates = self.DEFAULT_CONFIG_FILENAMES

        for filename in candidates:
            try:
                config = self._ini_parser(filename)
            except ConfigNotFound:
                # Move on to the next potential config file name.
                continue
            if 'Credentials' not in config:
                continue
            section = config['Credentials']
            if self.ACCESS_KEY not in section:
                continue
            logger.info(
                "Found credentials in boto config file: %s", filename
            )
            access_key, secret_key = self._extract_creds_from_mapping(
                section, self.ACCESS_KEY, self.SECRET_KEY
            )
            return Credentials(access_key, secret_key, method=self.METHOD)
        return None

1490 

1491 

class AssumeRoleProvider(CredentialProvider):
    """Provide credentials by assuming the role configured in a profile."""

    METHOD = 'assume-role'
    # The AssumeRole provider is logically part of the SharedConfig and
    # SharedCredentials providers. Since the purpose of the canonical name
    # is to provide cross-sdk compatibility, calling code will need to be
    # aware that either of those providers should be tied to the AssumeRole
    # provider as much as possible.
    CANONICAL_NAME = None
    ROLE_CONFIG_VAR = 'role_arn'
    # NOTE: the misspelled attribute name ("TOKE") is kept as-is so that
    # existing references to it keep working.
    WEB_IDENTITY_TOKE_FILE_VAR = 'web_identity_token_file'
    # Credentials are considered expired (and will be refreshed) once the total
    # remaining time left until the credentials expires is less than the
    # EXPIRY_WINDOW.
    EXPIRY_WINDOW_SECONDS = 60 * 15

    def __init__(
        self,
        load_config,
        client_creator,
        cache,
        profile_name,
        prompter=getpass.getpass,
        credential_sourcer=None,
        profile_provider_builder=None,
    ):
        """
        :type load_config: callable
        :param load_config: A function that accepts no arguments, and
            when called, will return the full configuration dictionary
            for the session (``session.full_config``).

        :type client_creator: callable
        :param client_creator: A factory function that will create
            a client when called.  Has the same interface as
            ``botocore.session.Session.create_client``.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``.  An example
            of this is the ``JSONFileCache`` class in the CLI.

        :type profile_name: str
        :param profile_name: The name of the profile.

        :type prompter: callable
        :param prompter: A callable that returns input provided
            by the user (i.e raw_input, getpass.getpass, etc.).

        :type credential_sourcer: CanonicalNameCredentialSourcer
        :param credential_sourcer: A credential provider that takes a
            configuration, which is used to provide the source credentials
            for the STS call.

        :param profile_provider_builder: Optional object whose
            ``providers(profile_name=..., disable_env_vars=True)`` method
            returns the providers used to resolve credentials for a
            source profile.  When omitted, only static credentials and
            chained assume-role profiles can be used as a source profile.
        """
        #: The cache used to first check for assumed credentials.
        #: This is checked before making the AssumeRole API
        #: calls and can be useful if you have short lived
        #: scripts and you'd like to avoid calling AssumeRole
        #: until the credentials are expired.
        self.cache = cache
        self._load_config = load_config
        # client_creator is a callable that creates a client.
        # It's basically session.create_client
        self._client_creator = client_creator
        self._profile_name = profile_name
        self._prompter = prompter
        # The _loaded_config attribute will be populated from the
        # load_config() function once the configuration is actually
        # loaded.  The reason we go through all this instead of just
        # requiring that the loaded_config be passed to us is so that
        # we can defer configuration loading until we actually try
        # to load credentials (as opposed to when the object is
        # instantiated).
        self._loaded_config = {}
        self._credential_sourcer = credential_sourcer
        self._profile_provider_builder = profile_provider_builder
        self._visited_profiles = [self._profile_name]

    def load(self):
        """Return deferred assume-role credentials for the profile.

        Returns ``None`` when the profile has no ``role_arn`` or is
        configured for web identity instead.
        """
        self._loaded_config = self._load_config()
        # Start each resolution with a fresh visit history.  Without this
        # reset, the source profiles recorded by an earlier ``load()`` call
        # on the same instance would be reported as an infinite loop.
        self._visited_profiles = [self._profile_name]
        profiles = self._loaded_config.get('profiles', {})
        profile = profiles.get(self._profile_name, {})
        if self._has_assume_role_config_vars(profile):
            return self._load_creds_via_assume_role(self._profile_name)
        return None

    def _has_assume_role_config_vars(self, profile):
        """Whether ``profile`` is a plain (non web identity) role profile."""
        return (
            self.ROLE_CONFIG_VAR in profile
            and
            # We need to ensure this provider doesn't look at a profile when
            # the profile has configuration for web identity. Simply relying on
            # the order in the credential chain is insufficient as it doesn't
            # prevent the case when we're doing an assume role chain.
            self.WEB_IDENTITY_TOKE_FILE_VAR not in profile
        )

    def _load_creds_via_assume_role(self, profile_name):
        """Build deferred credentials that assume ``profile_name``'s role."""
        role_config = self._get_role_config(profile_name)
        source_credentials = self._resolve_source_credentials(
            role_config, profile_name
        )

        extra_args = {}
        role_session_name = role_config.get('role_session_name')
        if role_session_name is not None:
            extra_args['RoleSessionName'] = role_session_name

        external_id = role_config.get('external_id')
        if external_id is not None:
            extra_args['ExternalId'] = external_id

        mfa_serial = role_config.get('mfa_serial')
        if mfa_serial is not None:
            extra_args['SerialNumber'] = mfa_serial

        duration_seconds = role_config.get('duration_seconds')
        if duration_seconds is not None:
            extra_args['DurationSeconds'] = duration_seconds

        fetcher = AssumeRoleCredentialFetcher(
            client_creator=self._client_creator,
            source_credentials=source_credentials,
            role_arn=role_config['role_arn'],
            extra_args=extra_args,
            mfa_prompter=self._prompter,
            cache=self.cache,
        )
        refresher = fetcher.fetch_credentials
        if mfa_serial is not None:
            refresher = create_mfa_serial_refresher(refresher)

        # The initial credentials are empty and the expiration time is set
        # to now so that we can delay the call to assume role until it is
        # strictly needed.
        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=refresher,
            time_fetcher=_local_now,
        )

    def _get_role_config(self, profile_name):
        """Retrieves and validates the role configuration for the profile.

        :raises InvalidConfigError: If both ``source_profile`` and
            ``credential_source`` are set, or the one that is set fails
            validation.
        :raises PartialCredentialsError: If neither is set.
        """
        profiles = self._loaded_config.get('profiles', {})

        profile = profiles[profile_name]
        source_profile = profile.get('source_profile')
        role_arn = profile['role_arn']
        credential_source = profile.get('credential_source')
        mfa_serial = profile.get('mfa_serial')
        external_id = profile.get('external_id')
        role_session_name = profile.get('role_session_name')
        duration_seconds = profile.get('duration_seconds')

        role_config = {
            'role_arn': role_arn,
            'external_id': external_id,
            'mfa_serial': mfa_serial,
            'role_session_name': role_session_name,
            'source_profile': source_profile,
            'credential_source': credential_source,
        }

        if duration_seconds is not None:
            try:
                role_config['duration_seconds'] = int(duration_seconds)
            except ValueError:
                # Best effort: a non-integer value is ignored and no
                # duration is requested, but say so instead of silently
                # dropping the setting.
                logger.warning(
                    'Ignoring invalid duration_seconds value %r in '
                    'profile "%s".',
                    duration_seconds,
                    profile_name,
                )

        # Either the credential source or the source profile must be
        # specified, but not both.
        if credential_source is not None and source_profile is not None:
            raise InvalidConfigError(
                error_msg=(
                    f'The profile "{profile_name}" contains both '
                    'source_profile and credential_source.'
                )
            )
        elif credential_source is None and source_profile is None:
            raise PartialCredentialsError(
                provider=self.METHOD,
                cred_var='source_profile or credential_source',
            )
        elif credential_source is not None:
            self._validate_credential_source(profile_name, credential_source)
        else:
            self._validate_source_profile(profile_name, source_profile)

        return role_config

    def _validate_credential_source(self, parent_profile, credential_source):
        """Ensure a sourcer exists and supports ``credential_source``."""
        if self._credential_sourcer is None:
            raise InvalidConfigError(
                error_msg=(
                    f'The credential_source "{credential_source}" is specified '
                    f'in profile "{parent_profile}", '
                    'but no source provider was configured.'
                )
            )
        if not self._credential_sourcer.is_supported(credential_source):
            raise InvalidConfigError(
                error_msg=(
                    f'The credential source "{credential_source}" referenced '
                    f'in profile "{parent_profile}" is not valid.'
                )
            )

    def _source_profile_has_credentials(self, profile):
        """Whether ``profile`` has static keys or assume-role settings."""
        return self._has_static_credentials(
            profile
        ) or self._has_assume_role_config_vars(profile)

    def _validate_source_profile(
        self, parent_profile_name, source_profile_name
    ):
        """Ensure the source profile exists and does not form a loop.

        :raises InvalidConfigError: If the source profile does not exist.
        :raises InfiniteLoopConfigError: If following it would revisit a
            profile (other than a self-reference with static credentials).
        """
        profiles = self._loaded_config.get('profiles', {})
        if source_profile_name not in profiles:
            raise InvalidConfigError(
                error_msg=(
                    f'The source_profile "{source_profile_name}" referenced in '
                    f'the profile "{parent_profile_name}" does not exist.'
                )
            )

        source_profile = profiles[source_profile_name]

        # Make sure we aren't going into an infinite loop. If we haven't
        # visited the profile yet, we're good.
        if source_profile_name not in self._visited_profiles:
            return

        # If we have visited the profile and the profile isn't simply
        # referencing itself, that's an infinite loop.
        if source_profile_name != parent_profile_name:
            raise InfiniteLoopConfigError(
                source_profile=source_profile_name,
                visited_profiles=self._visited_profiles,
            )

        # A profile is allowed to reference itself so that it can source
        # static credentials and have configuration all in the same
        # profile. This will only ever work for the top level assume
        # role because the static credentials will otherwise take
        # precedence.
        if not self._has_static_credentials(source_profile):
            raise InfiniteLoopConfigError(
                source_profile=source_profile_name,
                visited_profiles=self._visited_profiles,
            )

    def _has_static_credentials(self, profile):
        """Whether ``profile`` contains either static key setting."""
        static_keys = ['aws_secret_access_key', 'aws_access_key_id']
        return any(static_key in profile for static_key in static_keys)

    def _resolve_source_credentials(self, role_config, profile_name):
        """Return the credentials used to sign the assume-role call."""
        credential_source = role_config.get('credential_source')
        if credential_source is not None:
            return self._resolve_credentials_from_source(
                credential_source, profile_name
            )

        source_profile = role_config['source_profile']
        self._visited_profiles.append(source_profile)
        return self._resolve_credentials_from_profile(source_profile)

    def _resolve_credentials_from_profile(self, profile_name):
        """Resolve credentials for a source profile.

        :raises InvalidConfigError: If no credentials can be resolved for
            the profile.
        """
        profiles = self._loaded_config.get('profiles', {})
        profile = profiles[profile_name]

        if (
            self._has_static_credentials(profile)
            and not self._profile_provider_builder
        ):
            # This is only here for backwards compatibility. If this provider
            # isn't given a profile provider builder we still want to be able
            # to handle the basic static credential case as we would before the
            # profile provider builder parameter was added.
            return self._resolve_static_credentials_from_profile(profile)
        elif self._has_static_credentials(
            profile
        ) or not self._has_assume_role_config_vars(profile):
            error_message = 'The source profile "%s" must have credentials.'
            if self._profile_provider_builder is None:
                # Without a builder there is no way to resolve anything
                # but static credentials; report that as a configuration
                # problem rather than failing on ``None.providers``.
                raise InvalidConfigError(
                    error_msg=error_message % profile_name,
                )
            profile_providers = self._profile_provider_builder.providers(
                profile_name=profile_name,
                disable_env_vars=True,
            )
            profile_chain = CredentialResolver(profile_providers)
            credentials = profile_chain.load_credentials()
            if credentials is None:
                raise InvalidConfigError(
                    error_msg=error_message % profile_name,
                )
            return credentials

        return self._load_creds_via_assume_role(profile_name)

    def _resolve_static_credentials_from_profile(self, profile):
        """Build static credentials from the profile's key settings.

        :raises PartialCredentialsError: If either key setting is missing.
        """
        try:
            return Credentials(
                access_key=profile['aws_access_key_id'],
                secret_key=profile['aws_secret_access_key'],
                token=profile.get('aws_session_token'),
            )
        except KeyError as e:
            raise PartialCredentialsError(
                provider=self.METHOD, cred_var=str(e)
            )

    def _resolve_credentials_from_source(
        self, credential_source, profile_name
    ):
        """Fetch credentials from the named ``credential_source``.

        :raises CredentialRetrievalError: If the sourcer returns ``None``.
        """
        credentials = self._credential_sourcer.source_credentials(
            credential_source
        )
        if credentials is None:
            raise CredentialRetrievalError(
                provider=credential_source,
                error_msg=(
                    'No credentials found in credential_source referenced '
                    f'in profile {profile_name}'
                ),
            )
        return credentials

1818 

1819 

class AssumeRoleWithWebIdentityProvider(CredentialProvider):
    """Credential provider for the assume-role-with-web-identity flow.

    Each setting is read from its environment variable first (unless
    environment variables are disabled) and then from the named profile
    of the loaded config.
    """

    METHOD = 'assume-role-with-web-identity'
    CANONICAL_NAME = None
    # Profile key -> environment variable that can supply the same value.
    _CONFIG_TO_ENV_VAR = {
        'web_identity_token_file': 'AWS_WEB_IDENTITY_TOKEN_FILE',
        'role_session_name': 'AWS_ROLE_SESSION_NAME',
        'role_arn': 'AWS_ROLE_ARN',
    }

    def __init__(
        self,
        load_config,
        client_creator,
        profile_name,
        cache=None,
        disable_env_vars=False,
        token_loader_cls=None,
    ):
        """
        :param load_config: Zero-argument callable returning the loaded
            config mapping; its ``profiles`` entry is consulted.

        :param client_creator: Passed through to the credential fetcher.

        :param profile_name: Name of the profile to read settings from.

        :param cache: Passed through to the credential fetcher.

        :param disable_env_vars: When true, environment variables are
            ignored and only the profile is consulted.

        :param token_loader_cls: Callable invoked with the token file
            path to build the web identity token loader. Defaults to
            ``FileWebIdentityTokenLoader``.
        """
        self.cache = cache
        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name
        # Filled in lazily on the first profile lookup.
        self._profile_config = None
        self._disable_env_vars = disable_env_vars
        if token_loader_cls is None:
            token_loader_cls = FileWebIdentityTokenLoader
        self._token_loader_cls = token_loader_cls

    def load(self):
        """Return deferred credentials, or ``None`` if not configured."""
        return self._assume_role_with_web_identity()

    def _get_profile_config(self, key):
        """Return ``key`` from the profile, loading the config once."""
        if self._profile_config is None:
            loaded_config = self._load_config()
            profiles = loaded_config.get('profiles', {})
            self._profile_config = profiles.get(self._profile_name, {})
        return self._profile_config.get(key)

    def _get_env_config(self, key):
        """Return the environment value for ``key``, or ``None``.

        ``None`` is returned when environment variables are disabled,
        when ``key`` has no mapped variable, or when the variable is not
        set.
        """
        if self._disable_env_vars:
            return None
        env_key = self._CONFIG_TO_ENV_VAR.get(key)
        if env_key and env_key in os.environ:
            return os.environ[env_key]
        return None

    def _get_config(self, key):
        """Return ``key`` from the environment, else from the profile."""
        env_value = self._get_env_config(key)
        if env_value is not None:
            return env_value
        return self._get_profile_config(key)

    def _assume_role_with_web_identity(self):
        """Build deferred credentials for the configured web identity.

        :returns: ``None`` when no web identity token file is
            configured, otherwise ``DeferredRefreshableCredentials``.

        :raises InvalidConfigError: If a token file is configured but no
            role ARN is.
        """
        token_path = self._get_config('web_identity_token_file')
        if not token_path:
            return None
        token_loader = self._token_loader_cls(token_path)

        role_arn = self._get_config('role_arn')
        if not role_arn:
            # The adjacent literals are concatenated, so each fragment
            # must end with a space to keep the words separated.
            error_msg = (
                'The provided profile or the current environment is '
                'configured to assume role with web identity but has no '
                'role ARN configured. Ensure that the profile has the role_arn '
                'configuration set or the AWS_ROLE_ARN env var is set.'
            )
            raise InvalidConfigError(error_msg=error_msg)

        extra_args = {}
        role_session_name = self._get_config('role_session_name')
        if role_session_name is not None:
            extra_args['RoleSessionName'] = role_session_name

        fetcher = AssumeRoleWithWebIdentityCredentialFetcher(
            client_creator=self._client_creator,
            web_identity_token_loader=token_loader,
            role_arn=role_arn,
            extra_args=extra_args,
            cache=self.cache,
        )
        # The initial credentials are empty and the expiration time is set
        # to now so that we can delay the call to assume role until it is
        # strictly needed.
        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=fetcher.fetch_credentials,
        )

1907 

1908 

class CanonicalNameCredentialSourcer:
    """Selects credential providers by their canonical name."""

    def __init__(self, providers):
        self._providers = providers

    def is_supported(self, source_name):
        """Check whether a source name matches a known provider.

        :type source_name: str
        :param source_name: The value of credential_source in the config
            file. This is the canonical name of the credential provider.

        :rtype: bool
        :returns: True if some provider's ``CANONICAL_NAME`` equals
            ``source_name`` exactly, False otherwise.
        """
        return any(
            candidate.CANONICAL_NAME == source_name
            for candidate in self._providers
        )

    def source_credentials(self, source_name):
        """Load credentials from the provider named by ``source_name``.

        :type source_name: str
        :param source_name: The value of credential_source in the config
            file. This is the canonical name of the credential provider.

        :rtype: Credentials
        """
        source = self._get_provider(source_name)
        # A resolver exposes ``load_credentials``; a single provider
        # exposes ``load``.
        if isinstance(source, CredentialResolver):
            loader = source.load_credentials
        else:
            loader = source.load
        return loader()

    def _get_provider(self, canonical_name):
        """Find the provider (or provider chain) for a canonical name.

        :type canonical_name: str
        :param canonical_name: The canonical name of the provider.

        :raises UnknownCredentialError: Raised if no
            credential provider by the provided name
            is found.
        """
        match = self._get_provider_by_canonical_name(canonical_name)

        # The AssumeRole provider should really be part of the SharedConfig
        # provider rather than being its own thing, but it is not. It is
        # effectively part of both the SharedConfig provider and the
        # SharedCredentials provider now due to the way it behaves.
        # Therefore a request for either of those also brings in the
        # AssumeRole provider when one is registered.
        assume_role = None
        if canonical_name.lower() in ['sharedconfig', 'sharedcredentials']:
            assume_role = self._get_provider_by_method('assume-role')

        if assume_role is not None:
            # The shared provider itself may have been removed while the
            # AssumeRole provider is still present; then AssumeRole is
            # returned alone. Otherwise both are wrapped in a resolver
            # so calling code can treat them as a single entity.
            if match is None:
                return assume_role
            return CredentialResolver([assume_role, match])

        if match is None:
            raise UnknownCredentialError(name=canonical_name)
        return match

    def _get_provider_by_canonical_name(self, canonical_name):
        """Return the first provider whose canonical name matches.

        The comparison ignores case. No compatibility handling is done
        here; ``None`` is returned when nothing matches.
        """
        for candidate in self._providers:
            label = candidate.CANONICAL_NAME
            if not label:
                continue
            if label.lower() == canonical_name.lower():
                return candidate
        return None

    def _get_provider_by_method(self, method):
        """Return the first provider whose METHOD equals ``method``."""
        matches = (
            candidate
            for candidate in self._providers
            if candidate.METHOD == method
        )
        return next(matches, None)

1995 

1996 

class ContainerProvider(CredentialProvider):
    """Credential provider driven by the container credential env vars."""

    METHOD = 'container-role'
    CANONICAL_NAME = 'EcsContainer'
    ENV_VAR = 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI'
    ENV_VAR_FULL = 'AWS_CONTAINER_CREDENTIALS_FULL_URI'
    ENV_VAR_AUTH_TOKEN = 'AWS_CONTAINER_AUTHORIZATION_TOKEN'
    ENV_VAR_AUTH_TOKEN_FILE = 'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE'

    def __init__(self, environ=None, fetcher=None):
        """
        :param environ: Mapping to read the settings from. Defaults to
            ``os.environ``.

        :param fetcher: Object used to build and retrieve the metadata
            URI. Defaults to a new ``ContainerMetadataFetcher``.
        """
        self._environ = os.environ if environ is None else environ
        self._fetcher = (
            ContainerMetadataFetcher() if fetcher is None else fetcher
        )

    def load(self):
        """Return container credentials, or ``None`` if not opted in."""
        # The provider only activates when one of the URI variables is
        # set, i.e. when the user has opted into this feature.
        uri_vars = (self.ENV_VAR, self.ENV_VAR_FULL)
        if not any(name in self._environ for name in uri_vars):
            return None
        return self._retrieve_or_fail()

    def _retrieve_or_fail(self):
        """Fetch credentials once and wrap them as refreshable."""
        env = self._environ
        if self._provided_relative_uri():
            endpoint = self._fetcher.full_url(env[self.ENV_VAR])
        else:
            endpoint = env[self.ENV_VAR_FULL]
        refresher = self._create_fetcher(endpoint)
        initial = refresher()
        return RefreshableCredentials(
            access_key=initial['access_key'],
            secret_key=initial['secret_key'],
            token=initial['token'],
            method=self.METHOD,
            expiry_time=_parse_if_needed(initial['expiry_time']),
            refresh_using=refresher,
            account_id=initial.get('account_id'),
        )

    def _build_headers(self):
        """Return the Authorization header dict, or ``None``.

        The token file variable takes precedence over the plain token
        variable.
        """
        env = self._environ
        token_value = None
        if self.ENV_VAR_AUTH_TOKEN_FILE in env:
            with open(env[self.ENV_VAR_AUTH_TOKEN_FILE]) as handle:
                token_value = handle.read()
        elif self.ENV_VAR_AUTH_TOKEN in env:
            token_value = env[self.ENV_VAR_AUTH_TOKEN]

        if token_value is None:
            return None
        self._validate_auth_token(token_value)
        return {'Authorization': token_value}

    def _validate_auth_token(self, auth_token):
        """Reject tokens that contain a carriage return or newline."""
        if any(forbidden in auth_token for forbidden in ("\r", "\n")):
            raise ValueError("Auth token value is not a legal header value")

    def _create_fetcher(self, full_uri, *args, **kwargs):
        """Return a zero-argument callable fetching creds from the URI."""

        def fetch_creds():
            try:
                # Headers are rebuilt on every call.
                payload = self._fetcher.retrieve_full_uri(
                    full_uri, headers=self._build_headers()
                )
            except MetadataRetrievalError as err:
                logger.debug(
                    "Error retrieving container metadata: %s",
                    err,
                    exc_info=True,
                )
                raise CredentialRetrievalError(
                    provider=self.METHOD, error_msg=str(err)
                )
            return {
                'access_key': payload['AccessKeyId'],
                'secret_key': payload['SecretAccessKey'],
                'token': payload['Token'],
                'expiry_time': payload['Expiration'],
                'account_id': payload.get('AccountId'),
            }

        return fetch_creds

    def _provided_relative_uri(self):
        """Tell whether the relative URI variable is set."""
        return self.ENV_VAR in self._environ

2078 

2079 

class CredentialResolver:
    """An ordered chain of credential providers.

    Providers are identified by their ``METHOD`` attribute.
    """

    def __init__(self, providers):
        """

        :param providers: A list of ``CredentialProvider`` instances.

        """
        self.providers = providers

    def insert_before(self, name, credential_provider):
        """
        Inserts a new instance of ``CredentialProvider`` into the chain that
        will be tried before an existing one.

        :param name: The ``METHOD`` name of the existing provider you'd
            like to insert the new provider before. (ex. ``env`` or
            ``config``).
        :type name: string

        :param credential_provider: The provider instance to add to the
            chain.
        :type credential_provider: ``CredentialProvider``

        :raises UnknownCredentialError: Raised if no provider in the
            chain has the given name.
        """
        offset = self._get_provider_offset(name)
        self.providers.insert(offset, credential_provider)

    def insert_after(self, name, credential_provider):
        """
        Inserts a new instance of ``CredentialProvider`` into the chain that
        will be tried after an existing one.

        :param name: The ``METHOD`` name of the existing provider you'd
            like to insert the new provider after. (ex. ``env`` or
            ``config``).
        :type name: string

        :param credential_provider: The provider instance to add to the
            chain.
        :type credential_provider: ``CredentialProvider``

        :raises UnknownCredentialError: Raised if no provider in the
            chain has the given name.
        """
        offset = self._get_provider_offset(name)
        self.providers.insert(offset + 1, credential_provider)

    def remove(self, name):
        """
        Removes the first provider with the given name from the chain.

        Nothing happens if no provider has that name.

        :param name: The ``METHOD`` name of the provider to remove.
        :type name: string
        """
        for offset, provider in enumerate(self.providers):
            if provider.METHOD == name:
                self.providers.pop(offset)
                return
        # It's not present. Fail silently.

    def get_provider(self, name):
        """Return a credential provider by name.

        :type name: str
        :param name: The ``METHOD`` name of the provider.

        :raises UnknownCredentialError: Raised if no
            credential provider by the provided name
            is found.
        """
        return self.providers[self._get_provider_offset(name)]

    def _get_provider_offset(self, name):
        """Return the index of the first provider named ``name``.

        :raises UnknownCredentialError: Raised if no provider in the
            chain has the given name.
        """
        try:
            return [p.METHOD for p in self.providers].index(name)
        except ValueError:
            raise UnknownCredentialError(name=name)

    def load_credentials(self):
        """
        Goes through the credentials chain, returning the first ``Credentials``
        that could be loaded.

        :returns: The first non-``None`` result of a provider's
            ``load()``, or ``None`` if every provider returned ``None``.
        """
        # First provider to return a non-None response wins.
        for provider in self.providers:
            logger.debug("Looking for credentials via: %s", provider.METHOD)
            creds = provider.load()
            if creds is not None:
                return creds

        # If we got here, no credentials could be found.
        # This feels like it should be an exception, but historically, ``None``
        # is returned.
        return None

2178 

2179 

class SSOCredentialFetcher(CachedCredentialFetcher):
    """Fetches role credentials from the SSO ``get_role_credentials`` API."""

    _UTC_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    def __init__(
        self,
        start_url,
        sso_region,
        role_name,
        account_id,
        client_creator,
        token_loader=None,
        cache=None,
        expiry_window_seconds=None,
        token_provider=None,
        sso_session_name=None,
    ):
        # Identity of the role being requested.
        self._start_url = start_url
        self._sso_region = sso_region
        self._role_name = role_name
        self._account_id = account_id
        self._sso_session_name = sso_session_name
        # Collaborators used to reach the service and obtain a token.
        self._client_creator = client_creator
        self._token_loader = token_loader
        self._token_provider = token_provider
        # All attributes are assigned before the base initializer runs.
        super().__init__(cache, expiry_window_seconds)

    def _create_cache_key(self):
        """Derive a deterministic, file-name-safe cache key.

        The key is the SHA-1 hex digest of the minified, key-sorted JSON
        of the role name, the account id and either the session name or
        the start URL.
        """
        key_fields = {
            'roleName': self._role_name,
            'accountId': self._account_id,
        }
        if self._sso_session_name:
            key_fields['sessionName'] = self._sso_session_name
        else:
            key_fields['startUrl'] = self._start_url
        # NOTE: It would be good to hoist this cache key construction logic
        # into the CachedCredentialFetcher class as we should be consistent.
        # Unfortunately, the current assume role fetchers that sub class don't
        # pass separators resulting in non-minified JSON. In the long term,
        # all fetchers should use the below caching scheme.
        serialized = json.dumps(
            key_fields, sort_keys=True, separators=(',', ':')
        )
        digest = sha1(serialized.encode('utf-8')).hexdigest()
        return self._make_file_safe(digest)

    def _parse_timestamp(self, timestamp_ms):
        """Format a millisecond epoch timestamp as a UTC date string."""
        # fromtimestamp works in seconds, hence the division by 1000.
        as_utc = datetime.datetime.fromtimestamp(
            timestamp_ms / 1000.0, tzutc()
        )
        return as_utc.strftime(self._UTC_DATE_FORMAT)

    def _get_credentials(self):
        """Call SSO ``get_role_credentials`` and reshape the response.

        :raises UnauthorizedSSOTokenError: If the service rejects the
            access token as unauthorized.
        """
        sso_client = self._client_creator(
            'sso',
            config=Config(
                signature_version=UNSIGNED,
                region_name=self._sso_region,
            ),
        )
        # A token provider, when present, takes priority over the
        # token loader.
        if self._token_provider:
            loaded_token = self._token_provider.load_token()
            access_token = loaded_token.get_frozen_token().token
        else:
            access_token = self._token_loader(self._start_url)['accessToken']

        try:
            response = sso_client.get_role_credentials(
                roleName=self._role_name,
                accountId=self._account_id,
                accessToken=access_token,
            )
        except sso_client.exceptions.UnauthorizedException:
            raise UnauthorizedSSOTokenError()

        role_creds = response['roleCredentials']
        return {
            'ProviderType': 'sso',
            'Credentials': {
                'AccessKeyId': role_creds['accessKeyId'],
                'SecretAccessKey': role_creds['secretAccessKey'],
                'SessionToken': role_creds['sessionToken'],
                'Expiration': self._parse_timestamp(role_creds['expiration']),
                'AccountId': self._account_id,
            },
        }

2269 

2270 

class SSOProvider(CredentialProvider):
    """Credential provider for profiles configured to use SSO."""

    METHOD = 'sso'

    _SSO_TOKEN_CACHE_DIR = os.path.expanduser(
        os.path.join('~', '.aws', 'sso', 'cache')
    )
    # Keys that must come from the profile itself.
    _PROFILE_REQUIRED_CONFIG_VARS = (
        'sso_role_name',
        'sso_account_id',
    )
    # Keys that may come from the profile or a referenced sso-session.
    _SSO_REQUIRED_CONFIG_VARS = (
        'sso_start_url',
        'sso_region',
    )
    _ALL_REQUIRED_CONFIG_VARS = (
        _PROFILE_REQUIRED_CONFIG_VARS + _SSO_REQUIRED_CONFIG_VARS
    )

    def __init__(
        self,
        load_config,
        client_creator,
        profile_name,
        cache=None,
        token_cache=None,
        token_provider=None,
    ):
        self._token_cache = (
            JSONFileCache(self._SSO_TOKEN_CACHE_DIR)
            if token_cache is None
            else token_cache
        )
        self._token_provider = token_provider
        self.cache = {} if cache is None else cache
        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name

    def _load_sso_config(self):
        """Collect the SSO settings for the configured profile.

        :returns: ``None`` when the profile contains neither the role
            name nor the account id, otherwise a dict of the required
            settings.

        :raises InvalidConfigError: If any required setting is missing.
        """
        loaded_config = self._load_config()
        profile_name = self._profile_name
        profile_config = loaded_config.get('profiles', {}).get(
            profile_name, {}
        )
        sso_sessions = loaded_config.get('sso_sessions', {})

        # Role name & Account ID indicate the cred provider should be used
        if not any(
            key in profile_config
            for key in self._PROFILE_REQUIRED_CONFIG_VARS
        ):
            return None

        resolved_config, extra_reqs = self._resolve_sso_session_reference(
            profile_config, sso_sessions
        )

        required_vars = self._ALL_REQUIRED_CONFIG_VARS + extra_reqs
        absent_vars = [
            name for name in required_vars if name not in resolved_config
        ]
        if absent_vars:
            missing = ', '.join(absent_vars)
            raise InvalidConfigError(
                error_msg=(
                    f'The profile "{profile_name}" is configured to use SSO '
                    f'but is missing required configuration: {missing}'
                )
            )
        return {name: resolved_config[name] for name in required_vars}

    def _resolve_sso_session_reference(self, profile_config, sso_sessions):
        """Merge a referenced sso-session into the profile settings.

        :returns: A ``(config, extra_required_keys)`` tuple. Without an
            ``sso_session`` reference the profile is returned unchanged
            with no extra keys.

        :raises InvalidConfigError: If the referenced session does not
            exist, or a key set in both places has different values.
        """
        sso_session_name = profile_config.get('sso_session')
        if sso_session_name is None:
            # No reference to resolve, proceed with legacy flow
            return profile_config, ()

        if sso_session_name not in sso_sessions:
            error_msg = f'The specified sso-session does not exist: "{sso_session_name}"'
            raise InvalidConfigError(error_msg=error_msg)

        # Work on a copy so the caller's profile mapping is not mutated.
        config = profile_config.copy()
        for config_var, val in sso_sessions[sso_session_name].items():
            # A key present in both places must carry the same value.
            if config_var in config and config[config_var] != val:
                error_msg = (
                    f"The value for {config_var} is inconsistent between "
                    f"profile ({config[config_var]}) and sso-session ({val})."
                )
                raise InvalidConfigError(error_msg=error_msg)
            config[config_var] = val
        return config, ('sso_session',)

    def load(self):
        """Return deferred SSO credentials, or ``None`` if not configured."""
        sso_config = self._load_sso_config()
        if not sso_config:
            return None

        # The session name and token provider are only passed when the
        # profile references an sso-session.
        session_kwargs = {}
        if 'sso_session' in sso_config:
            session_kwargs['sso_session_name'] = sso_config['sso_session']
            session_kwargs['token_provider'] = self._token_provider

        sso_fetcher = SSOCredentialFetcher(
            start_url=sso_config['sso_start_url'],
            sso_region=sso_config['sso_region'],
            role_name=sso_config['sso_role_name'],
            account_id=sso_config['sso_account_id'],
            client_creator=self._client_creator,
            token_loader=SSOTokenLoader(cache=self._token_cache),
            cache=self.cache,
            **session_kwargs,
        )

        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=sso_fetcher.fetch_credentials,
        )