1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"). You
5# may not use this file except in compliance with the License. A copy of
6# the License is located at
7#
8# http://aws.amazon.com/apache2.0/
9#
10# or in the "license" file accompanying this file. This file is
11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
12# ANY KIND, either express or implied. See the License for the specific
13# language governing permissions and limitations under the License.
14import datetime
15import getpass
16import json
17import logging
18import os
19import subprocess
20import threading
21import time
22from collections import namedtuple
23from copy import deepcopy
24from hashlib import sha1
25
26from dateutil.parser import parse
27from dateutil.tz import tzlocal, tzutc
28
29import botocore.compat
30import botocore.configloader
31from botocore import UNSIGNED
32from botocore.compat import compat_shell_split, total_seconds
33from botocore.config import Config
34from botocore.exceptions import (
35 ConfigNotFound,
36 CredentialRetrievalError,
37 InfiniteLoopConfigError,
38 InvalidConfigError,
39 MetadataRetrievalError,
40 PartialCredentialsError,
41 RefreshWithMFAUnsupportedError,
42 UnauthorizedSSOTokenError,
43 UnknownCredentialError,
44)
45from botocore.tokens import SSOTokenProvider
46from botocore.utils import (
47 ArnParser,
48 ContainerMetadataFetcher,
49 FileWebIdentityTokenLoader,
50 InstanceMetadataFetcher,
51 JSONFileCache,
52 SSOTokenLoader,
53 parse_key_val_file,
54 resolve_imds_endpoint_mode,
55)
56
57logger = logging.getLogger(__name__)
58ReadOnlyCredentials = namedtuple(
59 'ReadOnlyCredentials',
60 ['access_key', 'secret_key', 'token', 'account_id'],
61 defaults=(None,),
62)
63
64_DEFAULT_MANDATORY_REFRESH_TIMEOUT = 10 * 60 # 10 min
65_DEFAULT_ADVISORY_REFRESH_TIMEOUT = 15 * 60 # 15 min
66
67
68def create_credential_resolver(session, cache=None, region_name=None):
69 """Create a default credential resolver.
70
71 This creates a pre-configured credential resolver
72 that includes the default lookup chain for
73 credentials.
74
75 """
76 profile_name = session.get_config_variable('profile') or 'default'
77 metadata_timeout = session.get_config_variable('metadata_service_timeout')
78 num_attempts = session.get_config_variable('metadata_service_num_attempts')
79 disable_env_vars = session.instance_variables().get('profile') is not None
80
81 imds_config = {
82 'ec2_metadata_service_endpoint': session.get_config_variable(
83 'ec2_metadata_service_endpoint'
84 ),
85 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode(
86 session
87 ),
88 'ec2_credential_refresh_window': _DEFAULT_ADVISORY_REFRESH_TIMEOUT,
89 'ec2_metadata_v1_disabled': session.get_config_variable(
90 'ec2_metadata_v1_disabled'
91 ),
92 }
93
94 if cache is None:
95 cache = {}
96
97 env_provider = EnvProvider()
98 container_provider = ContainerProvider()
99 instance_metadata_provider = InstanceMetadataProvider(
100 iam_role_fetcher=InstanceMetadataFetcher(
101 timeout=metadata_timeout,
102 num_attempts=num_attempts,
103 user_agent=session.user_agent(),
104 config=imds_config,
105 )
106 )
107
108 profile_provider_builder = ProfileProviderBuilder(
109 session, cache=cache, region_name=region_name
110 )
111 assume_role_provider = AssumeRoleProvider(
112 load_config=lambda: session.full_config,
113 client_creator=_get_client_creator(session, region_name),
114 cache=cache,
115 profile_name=profile_name,
116 credential_sourcer=CanonicalNameCredentialSourcer(
117 [env_provider, container_provider, instance_metadata_provider]
118 ),
119 profile_provider_builder=profile_provider_builder,
120 )
121
122 pre_profile = [
123 env_provider,
124 assume_role_provider,
125 ]
126 profile_providers = profile_provider_builder.providers(
127 profile_name=profile_name,
128 disable_env_vars=disable_env_vars,
129 )
130 post_profile = [
131 OriginalEC2Provider(),
132 BotoProvider(),
133 container_provider,
134 instance_metadata_provider,
135 ]
136 providers = pre_profile + profile_providers + post_profile
137
138 if disable_env_vars:
139 # An explicitly provided profile will negate an EnvProvider.
140 # We will defer to providers that understand the "profile"
141 # concept to retrieve credentials.
142 # The one edge case if is all three values are provided via
143 # env vars:
144 # export AWS_ACCESS_KEY_ID=foo
145 # export AWS_SECRET_ACCESS_KEY=bar
146 # export AWS_PROFILE=baz
147 # Then, just like our client() calls, the explicit credentials
148 # will take precedence.
149 #
150 # This precedence is enforced by leaving the EnvProvider in the chain.
151 # This means that the only way a "profile" would win is if the
152 # EnvProvider does not return credentials, which is what we want
153 # in this scenario.
154 providers.remove(env_provider)
155 logger.debug(
156 'Skipping environment variable credential check'
157 ' because profile name was explicitly set.'
158 )
159
160 resolver = CredentialResolver(providers=providers)
161 return resolver
162
163
164class ProfileProviderBuilder:
165 """This class handles the creation of profile based providers.
166
167 NOTE: This class is only intended for internal use.
168
169 This class handles the creation and ordering of the various credential
170 providers that primarly source their configuration from the shared config.
171 This is needed to enable sharing between the default credential chain and
172 the source profile chain created by the assume role provider.
173 """
174
175 def __init__(
176 self, session, cache=None, region_name=None, sso_token_cache=None
177 ):
178 self._session = session
179 self._cache = cache
180 self._region_name = region_name
181 self._sso_token_cache = sso_token_cache
182
183 def providers(self, profile_name, disable_env_vars=False):
184 return [
185 self._create_web_identity_provider(
186 profile_name,
187 disable_env_vars,
188 ),
189 self._create_sso_provider(profile_name),
190 self._create_shared_credential_provider(profile_name),
191 self._create_process_provider(profile_name),
192 self._create_config_provider(profile_name),
193 ]
194
195 def _create_process_provider(self, profile_name):
196 return ProcessProvider(
197 profile_name=profile_name,
198 load_config=lambda: self._session.full_config,
199 )
200
201 def _create_shared_credential_provider(self, profile_name):
202 credential_file = self._session.get_config_variable('credentials_file')
203 return SharedCredentialProvider(
204 profile_name=profile_name,
205 creds_filename=credential_file,
206 )
207
208 def _create_config_provider(self, profile_name):
209 config_file = self._session.get_config_variable('config_file')
210 return ConfigProvider(
211 profile_name=profile_name,
212 config_filename=config_file,
213 )
214
215 def _create_web_identity_provider(self, profile_name, disable_env_vars):
216 return AssumeRoleWithWebIdentityProvider(
217 load_config=lambda: self._session.full_config,
218 client_creator=_get_client_creator(
219 self._session, self._region_name
220 ),
221 cache=self._cache,
222 profile_name=profile_name,
223 disable_env_vars=disable_env_vars,
224 )
225
226 def _create_sso_provider(self, profile_name):
227 return SSOProvider(
228 load_config=lambda: self._session.full_config,
229 client_creator=self._session.create_client,
230 profile_name=profile_name,
231 cache=self._cache,
232 token_cache=self._sso_token_cache,
233 token_provider=SSOTokenProvider(
234 self._session,
235 cache=self._sso_token_cache,
236 profile_name=profile_name,
237 ),
238 )
239
240
241def get_credentials(session):
242 resolver = create_credential_resolver(session)
243 return resolver.load_credentials()
244
245
246def _local_now():
247 return datetime.datetime.now(tzlocal())
248
249
250def _parse_if_needed(value):
251 if isinstance(value, datetime.datetime):
252 return value
253 return parse(value)
254
255
256def _serialize_if_needed(value, iso=False):
257 if isinstance(value, datetime.datetime):
258 if iso:
259 return value.isoformat()
260 return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
261 return value
262
263
264def _get_client_creator(session, region_name):
265 def client_creator(service_name, **kwargs):
266 create_client_kwargs = {'region_name': region_name}
267 create_client_kwargs.update(**kwargs)
268 return session.create_client(service_name, **create_client_kwargs)
269
270 return client_creator
271
272
273def create_assume_role_refresher(client, params):
274 def refresh():
275 response = client.assume_role(**params)
276 credentials = response['Credentials']
277 # We need to normalize the credential names to
278 # the values expected by the refresh creds.
279 return {
280 'access_key': credentials['AccessKeyId'],
281 'secret_key': credentials['SecretAccessKey'],
282 'token': credentials['SessionToken'],
283 'expiry_time': _serialize_if_needed(credentials['Expiration']),
284 }
285
286 return refresh
287
288
289def create_mfa_serial_refresher(actual_refresh):
290 class _Refresher:
291 def __init__(self, refresh):
292 self._refresh = refresh
293 self._has_been_called = False
294
295 def __call__(self):
296 if self._has_been_called:
297 # We can explore an option in the future to support
298 # reprompting for MFA, but for now we just error out
299 # when the temp creds expire.
300 raise RefreshWithMFAUnsupportedError()
301 self._has_been_called = True
302 return self._refresh()
303
304 return _Refresher(actual_refresh)
305
306
307class Credentials:
308 """
309 Holds the credentials needed to authenticate requests.
310
311 :param str access_key: The access key part of the credentials.
312 :param str secret_key: The secret key part of the credentials.
313 :param str token: The security token, valid only for session credentials.
314 :param str method: A string which identifies where the credentials
315 were found.
316 :param str account_id: (optional) An account ID associated with the credentials.
317 """
318
319 def __init__(
320 self, access_key, secret_key, token=None, method=None, account_id=None
321 ):
322 self.access_key = access_key
323 self.secret_key = secret_key
324 self.token = token
325
326 if method is None:
327 method = 'explicit'
328 self.method = method
329 self.account_id = account_id
330
331 self._normalize()
332
333 def _normalize(self):
334 # Keys would sometimes (accidentally) contain non-ascii characters.
335 # It would cause a confusing UnicodeDecodeError in Python 2.
336 # We explicitly convert them into unicode to avoid such error.
337 #
338 # Eventually the service will decide whether to accept the credential.
339 # This also complies with the behavior in Python 3.
340 self.access_key = botocore.compat.ensure_unicode(self.access_key)
341 self.secret_key = botocore.compat.ensure_unicode(self.secret_key)
342
343 def get_frozen_credentials(self):
344 return ReadOnlyCredentials(
345 self.access_key, self.secret_key, self.token, self.account_id
346 )
347
348 def get_deferred_property(self, property_name):
349 def get_property():
350 return getattr(self, property_name, None)
351
352 return get_property
353
354
355class RefreshableCredentials(Credentials):
356 """
357 Holds the credentials needed to authenticate requests. In addition, it
358 knows how to refresh itself.
359
360 :param str access_key: The access key part of the credentials.
361 :param str secret_key: The secret key part of the credentials.
362 :param str token: The security token, valid only for session credentials.
363 :param datetime expiry_time: The expiration time of the credentials.
364 :param function refresh_using: Callback function to refresh the credentials.
365 :param str method: A string which identifies where the credentials
366 were found.
367 :param function time_fetcher: Callback function to retrieve current time.
368 """
369
370 # The time at which we'll attempt to refresh, but not
371 # block if someone else is refreshing.
372 _advisory_refresh_timeout = _DEFAULT_ADVISORY_REFRESH_TIMEOUT
373 # The time at which all threads will block waiting for
374 # refreshed credentials.
375 _mandatory_refresh_timeout = _DEFAULT_MANDATORY_REFRESH_TIMEOUT
376
377 def __init__(
378 self,
379 access_key,
380 secret_key,
381 token,
382 expiry_time,
383 refresh_using,
384 method,
385 time_fetcher=_local_now,
386 advisory_timeout=None,
387 mandatory_timeout=None,
388 account_id=None,
389 ):
390 self._refresh_using = refresh_using
391 self._access_key = access_key
392 self._secret_key = secret_key
393 self._token = token
394 self._account_id = account_id
395 self._expiry_time = expiry_time
396 self._time_fetcher = time_fetcher
397 self._refresh_lock = threading.Lock()
398 self.method = method
399 self._frozen_credentials = ReadOnlyCredentials(
400 access_key, secret_key, token, account_id
401 )
402 self._normalize()
403 if advisory_timeout is not None:
404 self._advisory_refresh_timeout = advisory_timeout
405 if mandatory_timeout is not None:
406 self._mandatory_refresh_timeout = mandatory_timeout
407
408 def _normalize(self):
409 self._access_key = botocore.compat.ensure_unicode(self._access_key)
410 self._secret_key = botocore.compat.ensure_unicode(self._secret_key)
411
412 @classmethod
413 def create_from_metadata(
414 cls,
415 metadata,
416 refresh_using,
417 method,
418 advisory_timeout=None,
419 mandatory_timeout=None,
420 ):
421 kwargs = {}
422 if advisory_timeout is not None:
423 kwargs['advisory_timeout'] = advisory_timeout
424 if mandatory_timeout is not None:
425 kwargs['mandatory_timeout'] = mandatory_timeout
426
427 instance = cls(
428 access_key=metadata['access_key'],
429 secret_key=metadata['secret_key'],
430 token=metadata['token'],
431 expiry_time=cls._expiry_datetime(metadata['expiry_time']),
432 method=method,
433 refresh_using=refresh_using,
434 account_id=metadata.get('account_id'),
435 **kwargs,
436 )
437 return instance
438
439 @property
440 def access_key(self):
441 """Warning: Using this property can lead to race conditions if you
442 access another property subsequently along the refresh boundary.
443 Please use get_frozen_credentials instead.
444 """
445 self._refresh()
446 return self._access_key
447
448 @access_key.setter
449 def access_key(self, value):
450 self._access_key = value
451
452 @property
453 def secret_key(self):
454 """Warning: Using this property can lead to race conditions if you
455 access another property subsequently along the refresh boundary.
456 Please use get_frozen_credentials instead.
457 """
458 self._refresh()
459 return self._secret_key
460
461 @secret_key.setter
462 def secret_key(self, value):
463 self._secret_key = value
464
465 @property
466 def token(self):
467 """Warning: Using this property can lead to race conditions if you
468 access another property subsequently along the refresh boundary.
469 Please use get_frozen_credentials instead.
470 """
471 self._refresh()
472 return self._token
473
474 @token.setter
475 def token(self, value):
476 self._token = value
477
478 @property
479 def account_id(self):
480 """Warning: Using this property can lead to race conditions if you
481 access another property subsequently along the refresh boundary.
482 Please use get_frozen_credentials instead.
483 """
484 self._refresh()
485 return self._account_id
486
487 @account_id.setter
488 def account_id(self, value):
489 self._account_id = value
490
491 def _seconds_remaining(self):
492 delta = self._expiry_time - self._time_fetcher()
493 return total_seconds(delta)
494
495 def refresh_needed(self, refresh_in=None):
496 """Check if a refresh is needed.
497
498 A refresh is needed if the expiry time associated
499 with the temporary credentials is less than the
500 provided ``refresh_in``. If ``time_delta`` is not
501 provided, ``self.advisory_refresh_needed`` will be used.
502
503 For example, if your temporary credentials expire
504 in 10 minutes and the provided ``refresh_in`` is
505 ``15 * 60``, then this function will return ``True``.
506
507 :type refresh_in: int
508 :param refresh_in: The number of seconds before the
509 credentials expire in which refresh attempts should
510 be made.
511
512 :return: True if refresh needed, False otherwise.
513
514 """
515 if self._expiry_time is None:
516 # No expiration, so assume we don't need to refresh.
517 return False
518
519 if refresh_in is None:
520 refresh_in = self._advisory_refresh_timeout
521 # The credentials should be refreshed if they're going to expire
522 # in less than 5 minutes.
523 if self._seconds_remaining() >= refresh_in:
524 # There's enough time left. Don't refresh.
525 return False
526 logger.debug("Credentials need to be refreshed.")
527 return True
528
529 def _is_expired(self):
530 # Checks if the current credentials are expired.
531 return self.refresh_needed(refresh_in=0)
532
533 def _refresh(self):
534 # In the common case where we don't need a refresh, we
535 # can immediately exit and not require acquiring the
536 # refresh lock.
537 if not self.refresh_needed(self._advisory_refresh_timeout):
538 return
539
540 # acquire() doesn't accept kwargs, but False is indicating
541 # that we should not block if we can't acquire the lock.
542 # If we aren't able to acquire the lock, we'll trigger
543 # the else clause.
544 if self._refresh_lock.acquire(False):
545 try:
546 if not self.refresh_needed(self._advisory_refresh_timeout):
547 return
548 is_mandatory_refresh = self.refresh_needed(
549 self._mandatory_refresh_timeout
550 )
551 self._protected_refresh(is_mandatory=is_mandatory_refresh)
552 return
553 finally:
554 self._refresh_lock.release()
555 elif self.refresh_needed(self._mandatory_refresh_timeout):
556 # If we're within the mandatory refresh window,
557 # we must block until we get refreshed credentials.
558 with self._refresh_lock:
559 if not self.refresh_needed(self._mandatory_refresh_timeout):
560 return
561 self._protected_refresh(is_mandatory=True)
562
563 def _protected_refresh(self, is_mandatory):
564 # precondition: this method should only be called if you've acquired
565 # the self._refresh_lock.
566 try:
567 metadata = self._refresh_using()
568 except Exception:
569 period_name = 'mandatory' if is_mandatory else 'advisory'
570 logger.warning(
571 "Refreshing temporary credentials failed "
572 "during %s refresh period.",
573 period_name,
574 exc_info=True,
575 )
576 if is_mandatory:
577 # If this is a mandatory refresh, then
578 # all errors that occur when we attempt to refresh
579 # credentials are propagated back to the user.
580 raise
581 # Otherwise we'll just return.
582 # The end result will be that we'll use the current
583 # set of temporary credentials we have.
584 return
585 self._set_from_data(metadata)
586 self._frozen_credentials = ReadOnlyCredentials(
587 self._access_key, self._secret_key, self._token, self._account_id
588 )
589 if self._is_expired():
590 # We successfully refreshed credentials but for whatever
591 # reason, our refreshing function returned credentials
592 # that are still expired. In this scenario, the only
593 # thing we can do is let the user know and raise
594 # an exception.
595 msg = (
596 "Credentials were refreshed, but the "
597 "refreshed credentials are still expired."
598 )
599 logger.warning(msg)
600 raise RuntimeError(msg)
601
602 @staticmethod
603 def _expiry_datetime(time_str):
604 return parse(time_str)
605
606 def _set_from_data(self, data):
607 expected_keys = ['access_key', 'secret_key', 'token', 'expiry_time']
608 if not data:
609 missing_keys = expected_keys
610 else:
611 missing_keys = [k for k in expected_keys if k not in data]
612
613 if missing_keys:
614 message = "Credential refresh failed, response did not contain: %s"
615 raise CredentialRetrievalError(
616 provider=self.method,
617 error_msg=message % ', '.join(missing_keys),
618 )
619
620 self.access_key = data['access_key']
621 self.secret_key = data['secret_key']
622 self.token = data['token']
623 self._expiry_time = parse(data['expiry_time'])
624 self.account_id = data.get('account_id')
625 logger.debug(
626 "Retrieved credentials will expire at: %s", self._expiry_time
627 )
628 self._normalize()
629
630 def get_frozen_credentials(self):
631 """Return immutable credentials.
632
633 The ``access_key``, ``secret_key``, and ``token`` properties
634 on this class will always check and refresh credentials if
635 needed before returning the particular credentials.
636
637 This has an edge case where you can get inconsistent
638 credentials. Imagine this:
639
640 # Current creds are "t1"
641 tmp.access_key ---> expired? no, so return t1.access_key
642 # ---- time is now expired, creds need refreshing to "t2" ----
643 tmp.secret_key ---> expired? yes, refresh and return t2.secret_key
644
645 This means we're using the access key from t1 with the secret key
646 from t2. To fix this issue, you can request a frozen credential object
647 which is guaranteed not to change.
648
649 The frozen credentials returned from this method should be used
650 immediately and then discarded. The typical usage pattern would
651 be::
652
653 creds = RefreshableCredentials(...)
654 some_code = SomeSignerObject()
655 # I'm about to sign the request.
656 # The frozen credentials are only used for the
657 # duration of generate_presigned_url and will be
658 # immediately thrown away.
659 request = some_code.sign_some_request(
660 with_credentials=creds.get_frozen_credentials())
661 print("Signed request:", request)
662
663 """
664 self._refresh()
665 return self._frozen_credentials
666
667
668class DeferredRefreshableCredentials(RefreshableCredentials):
669 """Refreshable credentials that don't require initial credentials.
670
671 refresh_using will be called upon first access.
672 """
673
674 def __init__(self, refresh_using, method, time_fetcher=_local_now):
675 self._refresh_using = refresh_using
676 self._access_key = None
677 self._secret_key = None
678 self._token = None
679 self._account_id = None
680 self._expiry_time = None
681 self._time_fetcher = time_fetcher
682 self._refresh_lock = threading.Lock()
683 self.method = method
684 self._frozen_credentials = None
685
686 def refresh_needed(self, refresh_in=None):
687 if self._frozen_credentials is None:
688 return True
689 return super().refresh_needed(refresh_in)
690
691
692class CachedCredentialFetcher:
693 DEFAULT_EXPIRY_WINDOW_SECONDS = 60 * 15
694
695 def __init__(self, cache=None, expiry_window_seconds=None):
696 if cache is None:
697 cache = {}
698 self._cache = cache
699 self._cache_key = self._create_cache_key()
700 if expiry_window_seconds is None:
701 expiry_window_seconds = self.DEFAULT_EXPIRY_WINDOW_SECONDS
702 self._expiry_window_seconds = expiry_window_seconds
703
704 def _create_cache_key(self):
705 raise NotImplementedError('_create_cache_key()')
706
707 def _make_file_safe(self, filename):
708 # Replace :, path sep, and / to make it the string filename safe.
709 filename = filename.replace(':', '_').replace(os.sep, '_')
710 return filename.replace('/', '_')
711
712 def _get_credentials(self):
713 raise NotImplementedError('_get_credentials()')
714
715 def fetch_credentials(self):
716 return self._get_cached_credentials()
717
718 def _get_cached_credentials(self):
719 """Get up-to-date credentials.
720
721 This will check the cache for up-to-date credentials, calling assume
722 role if none are available.
723 """
724 response = self._load_from_cache()
725 if response is None:
726 response = self._get_credentials()
727 self._write_to_cache(response)
728 else:
729 logger.debug("Credentials for role retrieved from cache.")
730
731 creds = response['Credentials']
732 expiration = _serialize_if_needed(creds['Expiration'], iso=True)
733 credentials = {
734 'access_key': creds['AccessKeyId'],
735 'secret_key': creds['SecretAccessKey'],
736 'token': creds['SessionToken'],
737 'expiry_time': expiration,
738 'account_id': creds.get('AccountId'),
739 }
740
741 return credentials
742
743 def _load_from_cache(self):
744 if self._cache_key in self._cache:
745 creds = deepcopy(self._cache[self._cache_key])
746 if not self._is_expired(creds):
747 return creds
748 else:
749 logger.debug(
750 "Credentials were found in cache, but they are expired."
751 )
752 return None
753
754 def _write_to_cache(self, response):
755 self._cache[self._cache_key] = deepcopy(response)
756
757 def _is_expired(self, credentials):
758 """Check if credentials are expired."""
759 end_time = _parse_if_needed(credentials['Credentials']['Expiration'])
760 seconds = total_seconds(end_time - _local_now())
761 return seconds < self._expiry_window_seconds
762
763
764class BaseAssumeRoleCredentialFetcher(CachedCredentialFetcher):
765 def __init__(
766 self,
767 client_creator,
768 role_arn,
769 extra_args=None,
770 cache=None,
771 expiry_window_seconds=None,
772 ):
773 self._client_creator = client_creator
774 self._role_arn = role_arn
775
776 if extra_args is None:
777 self._assume_kwargs = {}
778 else:
779 self._assume_kwargs = deepcopy(extra_args)
780 self._assume_kwargs['RoleArn'] = self._role_arn
781
782 self._role_session_name = self._assume_kwargs.get('RoleSessionName')
783 self._using_default_session_name = False
784 if not self._role_session_name:
785 self._generate_assume_role_name()
786
787 super().__init__(cache, expiry_window_seconds)
788
789 def _generate_assume_role_name(self):
790 self._role_session_name = f'botocore-session-{int(time.time())}'
791 self._assume_kwargs['RoleSessionName'] = self._role_session_name
792 self._using_default_session_name = True
793
794 def _create_cache_key(self):
795 """Create a predictable cache key for the current configuration.
796
797 The cache key is intended to be compatible with file names.
798 """
799 args = deepcopy(self._assume_kwargs)
800
801 # The role session name gets randomly generated, so we don't want it
802 # in the hash.
803 if self._using_default_session_name:
804 del args['RoleSessionName']
805
806 if 'Policy' in args:
807 # To have a predictable hash, the keys of the policy must be
808 # sorted, so we have to load it here to make sure it gets sorted
809 # later on.
810 args['Policy'] = json.loads(args['Policy'])
811
812 args = json.dumps(args, sort_keys=True)
813 argument_hash = sha1(args.encode('utf-8')).hexdigest()
814 return self._make_file_safe(argument_hash)
815
816 def _add_account_id_to_response(self, response):
817 role_arn = response.get('AssumedRoleUser', {}).get('Arn')
818 if ArnParser.is_arn(role_arn):
819 arn_parser = ArnParser()
820 account_id = arn_parser.parse_arn(role_arn)['account']
821 response['Credentials']['AccountId'] = account_id
822 else:
823 logger.debug(f"Unable to extract account ID from Arn: {role_arn}")
824
825
826class AssumeRoleCredentialFetcher(BaseAssumeRoleCredentialFetcher):
827 def __init__(
828 self,
829 client_creator,
830 source_credentials,
831 role_arn,
832 extra_args=None,
833 mfa_prompter=None,
834 cache=None,
835 expiry_window_seconds=None,
836 ):
837 """
838 :type client_creator: callable
839 :param client_creator: A callable that creates a client taking
840 arguments like ``Session.create_client``.
841
842 :type source_credentials: Credentials
843 :param source_credentials: The credentials to use to create the
844 client for the call to AssumeRole.
845
846 :type role_arn: str
847 :param role_arn: The ARN of the role to be assumed.
848
849 :type extra_args: dict
850 :param extra_args: Any additional arguments to add to the assume
851 role request using the format of the botocore operation.
852 Possible keys include, but may not be limited to,
853 DurationSeconds, Policy, SerialNumber, ExternalId and
854 RoleSessionName.
855
856 :type mfa_prompter: callable
857 :param mfa_prompter: A callable that returns input provided by the
858 user (i.e raw_input, getpass.getpass, etc.).
859
860 :type cache: dict
861 :param cache: An object that supports ``__getitem__``,
862 ``__setitem__``, and ``__contains__``. An example of this is
863 the ``JSONFileCache`` class in aws-cli.
864
865 :type expiry_window_seconds: int
866 :param expiry_window_seconds: The amount of time, in seconds,
867 """
868 self._source_credentials = source_credentials
869 self._mfa_prompter = mfa_prompter
870 if self._mfa_prompter is None:
871 self._mfa_prompter = getpass.getpass
872
873 super().__init__(
874 client_creator,
875 role_arn,
876 extra_args=extra_args,
877 cache=cache,
878 expiry_window_seconds=expiry_window_seconds,
879 )
880
881 def _get_credentials(self):
882 """Get credentials by calling assume role."""
883 kwargs = self._assume_role_kwargs()
884 client = self._create_client()
885 response = client.assume_role(**kwargs)
886 self._add_account_id_to_response(response)
887 return response
888
889 def _assume_role_kwargs(self):
890 """Get the arguments for assume role based on current configuration."""
891 assume_role_kwargs = deepcopy(self._assume_kwargs)
892
893 mfa_serial = assume_role_kwargs.get('SerialNumber')
894
895 if mfa_serial is not None:
896 prompt = f'Enter MFA code for {mfa_serial}: '
897 token_code = self._mfa_prompter(prompt)
898 assume_role_kwargs['TokenCode'] = token_code
899
900 duration_seconds = assume_role_kwargs.get('DurationSeconds')
901
902 if duration_seconds is not None:
903 assume_role_kwargs['DurationSeconds'] = duration_seconds
904
905 return assume_role_kwargs
906
907 def _create_client(self):
908 """Create an STS client using the source credentials."""
909 frozen_credentials = self._source_credentials.get_frozen_credentials()
910 return self._client_creator(
911 'sts',
912 aws_access_key_id=frozen_credentials.access_key,
913 aws_secret_access_key=frozen_credentials.secret_key,
914 aws_session_token=frozen_credentials.token,
915 )
916
917
918class AssumeRoleWithWebIdentityCredentialFetcher(
919 BaseAssumeRoleCredentialFetcher
920):
921 def __init__(
922 self,
923 client_creator,
924 web_identity_token_loader,
925 role_arn,
926 extra_args=None,
927 cache=None,
928 expiry_window_seconds=None,
929 ):
930 """
931 :type client_creator: callable
932 :param client_creator: A callable that creates a client taking
933 arguments like ``Session.create_client``.
934
935 :type web_identity_token_loader: callable
936 :param web_identity_token_loader: A callable that takes no arguments
937 and returns a web identity token str.
938
939 :type role_arn: str
940 :param role_arn: The ARN of the role to be assumed.
941
942 :type extra_args: dict
943 :param extra_args: Any additional arguments to add to the assume
944 role request using the format of the botocore operation.
945 Possible keys include, but may not be limited to,
946 DurationSeconds, Policy, SerialNumber, ExternalId and
947 RoleSessionName.
948
949 :type cache: dict
950 :param cache: An object that supports ``__getitem__``,
951 ``__setitem__``, and ``__contains__``. An example of this is
952 the ``JSONFileCache`` class in aws-cli.
953
954 :type expiry_window_seconds: int
955 :param expiry_window_seconds: The amount of time, in seconds,
956 """
957 self._web_identity_token_loader = web_identity_token_loader
958
959 super().__init__(
960 client_creator,
961 role_arn,
962 extra_args=extra_args,
963 cache=cache,
964 expiry_window_seconds=expiry_window_seconds,
965 )
966
967 def _get_credentials(self):
968 """Get credentials by calling assume role."""
969 kwargs = self._assume_role_kwargs()
970 # Assume role with web identity does not require credentials other than
971 # the token, explicitly configure the client to not sign requests.
972 config = Config(signature_version=UNSIGNED)
973 client = self._client_creator('sts', config=config)
974 response = client.assume_role_with_web_identity(**kwargs)
975 self._add_account_id_to_response(response)
976 return response
977
978 def _assume_role_kwargs(self):
979 """Get the arguments for assume role based on current configuration."""
980 assume_role_kwargs = deepcopy(self._assume_kwargs)
981 identity_token = self._web_identity_token_loader()
982 assume_role_kwargs['WebIdentityToken'] = identity_token
983
984 return assume_role_kwargs
985
986
987class CredentialProvider:
988 # A short name to identify the provider within botocore.
989 METHOD = None
990
991 # A name to identify the provider for use in cross-sdk features like
992 # assume role's `credential_source` configuration option. These names
993 # are to be treated in a case-insensitive way. NOTE: any providers not
994 # implemented in botocore MUST prefix their canonical names with
995 # 'custom' or we DO NOT guarantee that it will work with any features
996 # that this provides.
997 CANONICAL_NAME = None
998
999 def __init__(self, session=None):
1000 self.session = session
1001
1002 def load(self):
1003 """
1004 Loads the credentials from their source & sets them on the object.
1005
1006 Subclasses should implement this method (by reading from disk, the
1007 environment, the network or wherever), returning ``True`` if they were
1008 found & loaded.
1009
1010 If not found, this method should return ``False``, indictating that the
1011 ``CredentialResolver`` should fall back to the next available method.
1012
1013 The default implementation does nothing, assuming the user has set the
1014 ``access_key/secret_key/token`` themselves.
1015
1016 :returns: Whether credentials were found & set
1017 :rtype: Credentials
1018 """
1019 return True
1020
1021 def _extract_creds_from_mapping(self, mapping, *key_names):
1022 found = []
1023 for key_name in key_names:
1024 try:
1025 found.append(mapping[key_name])
1026 except KeyError:
1027 raise PartialCredentialsError(
1028 provider=self.METHOD, cred_var=key_name
1029 )
1030 return found
1031
1032
1033class ProcessProvider(CredentialProvider):
1034 METHOD = 'custom-process'
1035
1036 def __init__(self, profile_name, load_config, popen=subprocess.Popen):
1037 self._profile_name = profile_name
1038 self._load_config = load_config
1039 self._loaded_config = None
1040 self._popen = popen
1041
1042 def load(self):
1043 credential_process = self._credential_process
1044 if credential_process is None:
1045 return
1046
1047 creds_dict = self._retrieve_credentials_using(credential_process)
1048 if creds_dict.get('expiry_time') is not None:
1049 return RefreshableCredentials.create_from_metadata(
1050 creds_dict,
1051 lambda: self._retrieve_credentials_using(credential_process),
1052 self.METHOD,
1053 )
1054
1055 return Credentials(
1056 access_key=creds_dict['access_key'],
1057 secret_key=creds_dict['secret_key'],
1058 token=creds_dict.get('token'),
1059 method=self.METHOD,
1060 account_id=creds_dict.get('account_id'),
1061 )
1062
1063 def _retrieve_credentials_using(self, credential_process):
1064 # We're not using shell=True, so we need to pass the
1065 # command and all arguments as a list.
1066 process_list = compat_shell_split(credential_process)
1067 p = self._popen(
1068 process_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE
1069 )
1070 stdout, stderr = p.communicate()
1071 if p.returncode != 0:
1072 raise CredentialRetrievalError(
1073 provider=self.METHOD, error_msg=stderr.decode('utf-8')
1074 )
1075 parsed = botocore.compat.json.loads(stdout.decode('utf-8'))
1076 version = parsed.get('Version', '<Version key not provided>')
1077 if version != 1:
1078 raise CredentialRetrievalError(
1079 provider=self.METHOD,
1080 error_msg=(
1081 f"Unsupported version '{version}' for credential process "
1082 f"provider, supported versions: 1"
1083 ),
1084 )
1085 try:
1086 return {
1087 'access_key': parsed['AccessKeyId'],
1088 'secret_key': parsed['SecretAccessKey'],
1089 'token': parsed.get('SessionToken'),
1090 'expiry_time': parsed.get('Expiration'),
1091 'account_id': self._get_account_id(parsed),
1092 }
1093 except KeyError as e:
1094 raise CredentialRetrievalError(
1095 provider=self.METHOD,
1096 error_msg=f"Missing required key in response: {e}",
1097 )
1098
1099 @property
1100 def _credential_process(self):
1101 return self.profile_config.get('credential_process')
1102
1103 @property
1104 def profile_config(self):
1105 if self._loaded_config is None:
1106 self._loaded_config = self._load_config()
1107 profile_config = self._loaded_config.get('profiles', {}).get(
1108 self._profile_name, {}
1109 )
1110 return profile_config
1111
1112 def _get_account_id(self, parsed):
1113 account_id = parsed.get('AccountId')
1114 return account_id or self.profile_config.get('aws_account_id')
1115
1116
1117class InstanceMetadataProvider(CredentialProvider):
1118 METHOD = 'iam-role'
1119 CANONICAL_NAME = 'Ec2InstanceMetadata'
1120
1121 def __init__(self, iam_role_fetcher):
1122 self._role_fetcher = iam_role_fetcher
1123
1124 def load(self):
1125 fetcher = self._role_fetcher
1126 # We do the first request, to see if we get useful data back.
1127 # If not, we'll pass & move on to whatever's next in the credential
1128 # chain.
1129 metadata = fetcher.retrieve_iam_role_credentials()
1130 if not metadata:
1131 return None
1132 logger.info(
1133 'Found credentials from IAM Role: %s', metadata['role_name']
1134 )
1135 # We manually set the data here, since we already made the request &
1136 # have it. When the expiry is hit, the credentials will auto-refresh
1137 # themselves.
1138 creds = RefreshableCredentials.create_from_metadata(
1139 metadata,
1140 method=self.METHOD,
1141 refresh_using=fetcher.retrieve_iam_role_credentials,
1142 )
1143 return creds
1144
1145
1146class EnvProvider(CredentialProvider):
1147 METHOD = 'env'
1148 CANONICAL_NAME = 'Environment'
1149 ACCESS_KEY = 'AWS_ACCESS_KEY_ID'
1150 SECRET_KEY = 'AWS_SECRET_ACCESS_KEY'
1151 # The token can come from either of these env var.
1152 # AWS_SESSION_TOKEN is what other AWS SDKs have standardized on.
1153 TOKENS = ['AWS_SECURITY_TOKEN', 'AWS_SESSION_TOKEN']
1154 EXPIRY_TIME = 'AWS_CREDENTIAL_EXPIRATION'
1155 ACCOUNT_ID = 'AWS_ACCOUNT_ID'
1156
1157 def __init__(self, environ=None, mapping=None):
1158 """
1159
1160 :param environ: The environment variables (defaults to
1161 ``os.environ`` if no value is provided).
1162 :param mapping: An optional mapping of variable names to
1163 environment variable names. Use this if you want to
1164 change the mapping of access_key->AWS_ACCESS_KEY_ID, etc.
1165 The dict can have up to 5 keys:
1166 * ``access_key``
1167 * ``secret_key``
1168 * ``token``
1169 * ``expiry_time``
1170 * ``account_id``
1171 """
1172 if environ is None:
1173 environ = os.environ
1174 self.environ = environ
1175 self._mapping = self._build_mapping(mapping)
1176
1177 def _build_mapping(self, mapping):
1178 # Mapping of variable name to env var name.
1179 var_mapping = {}
1180 if mapping is None:
1181 # Use the class var default.
1182 var_mapping['access_key'] = self.ACCESS_KEY
1183 var_mapping['secret_key'] = self.SECRET_KEY
1184 var_mapping['token'] = self.TOKENS
1185 var_mapping['expiry_time'] = self.EXPIRY_TIME
1186 var_mapping['account_id'] = self.ACCOUNT_ID
1187 else:
1188 var_mapping['access_key'] = mapping.get(
1189 'access_key', self.ACCESS_KEY
1190 )
1191 var_mapping['secret_key'] = mapping.get(
1192 'secret_key', self.SECRET_KEY
1193 )
1194 var_mapping['token'] = mapping.get('token', self.TOKENS)
1195 if not isinstance(var_mapping['token'], list):
1196 var_mapping['token'] = [var_mapping['token']]
1197 var_mapping['expiry_time'] = mapping.get(
1198 'expiry_time', self.EXPIRY_TIME
1199 )
1200 var_mapping['account_id'] = mapping.get(
1201 'account_id', self.ACCOUNT_ID
1202 )
1203 return var_mapping
1204
1205 def load(self):
1206 """
1207 Search for credentials in explicit environment variables.
1208 """
1209
1210 access_key = self.environ.get(self._mapping['access_key'], '')
1211
1212 if access_key:
1213 logger.info('Found credentials in environment variables.')
1214 fetcher = self._create_credentials_fetcher()
1215 credentials = fetcher(require_expiry=False)
1216
1217 expiry_time = credentials['expiry_time']
1218 if expiry_time is not None:
1219 expiry_time = parse(expiry_time)
1220 return RefreshableCredentials(
1221 credentials['access_key'],
1222 credentials['secret_key'],
1223 credentials['token'],
1224 expiry_time,
1225 refresh_using=fetcher,
1226 method=self.METHOD,
1227 account_id=credentials['account_id'],
1228 )
1229
1230 return Credentials(
1231 credentials['access_key'],
1232 credentials['secret_key'],
1233 credentials['token'],
1234 method=self.METHOD,
1235 account_id=credentials['account_id'],
1236 )
1237 else:
1238 return None
1239
1240 def _create_credentials_fetcher(self):
1241 mapping = self._mapping
1242 method = self.METHOD
1243 environ = self.environ
1244
1245 def fetch_credentials(require_expiry=True):
1246 credentials = {}
1247
1248 access_key = environ.get(mapping['access_key'], '')
1249 if not access_key:
1250 raise PartialCredentialsError(
1251 provider=method, cred_var=mapping['access_key']
1252 )
1253 credentials['access_key'] = access_key
1254
1255 secret_key = environ.get(mapping['secret_key'], '')
1256 if not secret_key:
1257 raise PartialCredentialsError(
1258 provider=method, cred_var=mapping['secret_key']
1259 )
1260 credentials['secret_key'] = secret_key
1261
1262 credentials['token'] = None
1263 for token_env_var in mapping['token']:
1264 token = environ.get(token_env_var, '')
1265 if token:
1266 credentials['token'] = token
1267 break
1268
1269 credentials['expiry_time'] = None
1270 expiry_time = environ.get(mapping['expiry_time'], '')
1271 if expiry_time:
1272 credentials['expiry_time'] = expiry_time
1273 if require_expiry and not expiry_time:
1274 raise PartialCredentialsError(
1275 provider=method, cred_var=mapping['expiry_time']
1276 )
1277
1278 credentials['account_id'] = None
1279 account_id = environ.get(mapping['account_id'], '')
1280 if account_id:
1281 credentials['account_id'] = account_id
1282
1283 return credentials
1284
1285 return fetch_credentials
1286
1287
1288class OriginalEC2Provider(CredentialProvider):
1289 METHOD = 'ec2-credentials-file'
1290 CANONICAL_NAME = 'Ec2Config'
1291
1292 CRED_FILE_ENV = 'AWS_CREDENTIAL_FILE'
1293 ACCESS_KEY = 'AWSAccessKeyId'
1294 SECRET_KEY = 'AWSSecretKey'
1295
1296 def __init__(self, environ=None, parser=None):
1297 if environ is None:
1298 environ = os.environ
1299 if parser is None:
1300 parser = parse_key_val_file
1301 self._environ = environ
1302 self._parser = parser
1303
1304 def load(self):
1305 """
1306 Search for a credential file used by original EC2 CLI tools.
1307 """
1308 if 'AWS_CREDENTIAL_FILE' in self._environ:
1309 full_path = os.path.expanduser(
1310 self._environ['AWS_CREDENTIAL_FILE']
1311 )
1312 creds = self._parser(full_path)
1313 if self.ACCESS_KEY in creds:
1314 logger.info('Found credentials in AWS_CREDENTIAL_FILE.')
1315 access_key = creds[self.ACCESS_KEY]
1316 secret_key = creds[self.SECRET_KEY]
1317 # EC2 creds file doesn't support session tokens.
1318 return Credentials(access_key, secret_key, method=self.METHOD)
1319 else:
1320 return None
1321
1322
1323class SharedCredentialProvider(CredentialProvider):
1324 METHOD = 'shared-credentials-file'
1325 CANONICAL_NAME = 'SharedCredentials'
1326
1327 ACCESS_KEY = 'aws_access_key_id'
1328 SECRET_KEY = 'aws_secret_access_key'
1329 # Same deal as the EnvProvider above. Botocore originally supported
1330 # aws_security_token, but the SDKs are standardizing on aws_session_token
1331 # so we support both.
1332 TOKENS = ['aws_security_token', 'aws_session_token']
1333 ACCOUNT_ID = 'aws_account_id'
1334
1335 def __init__(self, creds_filename, profile_name=None, ini_parser=None):
1336 self._creds_filename = creds_filename
1337 if profile_name is None:
1338 profile_name = 'default'
1339 self._profile_name = profile_name
1340 if ini_parser is None:
1341 ini_parser = botocore.configloader.raw_config_parse
1342 self._ini_parser = ini_parser
1343
1344 def load(self):
1345 try:
1346 available_creds = self._ini_parser(self._creds_filename)
1347 except ConfigNotFound:
1348 return None
1349 if self._profile_name in available_creds:
1350 config = available_creds[self._profile_name]
1351 if self.ACCESS_KEY in config:
1352 logger.info(
1353 "Found credentials in shared credentials file: %s",
1354 self._creds_filename,
1355 )
1356 access_key, secret_key = self._extract_creds_from_mapping(
1357 config, self.ACCESS_KEY, self.SECRET_KEY
1358 )
1359 token = self._get_session_token(config)
1360 account_id = self._get_account_id(config)
1361 return Credentials(
1362 access_key,
1363 secret_key,
1364 token,
1365 method=self.METHOD,
1366 account_id=account_id,
1367 )
1368
1369 def _get_session_token(self, config):
1370 for token_envvar in self.TOKENS:
1371 if token_envvar in config:
1372 return config[token_envvar]
1373
1374 def _get_account_id(self, config):
1375 return config.get(self.ACCOUNT_ID)
1376
1377
1378class ConfigProvider(CredentialProvider):
1379 """INI based config provider with profile sections."""
1380
1381 METHOD = 'config-file'
1382 CANONICAL_NAME = 'SharedConfig'
1383
1384 ACCESS_KEY = 'aws_access_key_id'
1385 SECRET_KEY = 'aws_secret_access_key'
1386 # Same deal as the EnvProvider above. Botocore originally supported
1387 # aws_security_token, but the SDKs are standardizing on aws_session_token
1388 # so we support both.
1389 TOKENS = ['aws_security_token', 'aws_session_token']
1390 ACCOUNT_ID = 'aws_account_id'
1391
1392 def __init__(self, config_filename, profile_name, config_parser=None):
1393 """
1394
1395 :param config_filename: The session configuration scoped to the current
1396 profile. This is available via ``session.config``.
1397 :param profile_name: The name of the current profile.
1398 :param config_parser: A config parser callable.
1399
1400 """
1401 self._config_filename = config_filename
1402 self._profile_name = profile_name
1403 if config_parser is None:
1404 config_parser = botocore.configloader.load_config
1405 self._config_parser = config_parser
1406
1407 def load(self):
1408 """
1409 If there is are credentials in the configuration associated with
1410 the session, use those.
1411 """
1412 try:
1413 full_config = self._config_parser(self._config_filename)
1414 except ConfigNotFound:
1415 return None
1416 if self._profile_name in full_config['profiles']:
1417 profile_config = full_config['profiles'][self._profile_name]
1418 if self.ACCESS_KEY in profile_config:
1419 logger.info(
1420 "Credentials found in config file: %s",
1421 self._config_filename,
1422 )
1423 access_key, secret_key = self._extract_creds_from_mapping(
1424 profile_config, self.ACCESS_KEY, self.SECRET_KEY
1425 )
1426 token = self._get_session_token(profile_config)
1427 account_id = self._get_account_id(profile_config)
1428 return Credentials(
1429 access_key,
1430 secret_key,
1431 token,
1432 method=self.METHOD,
1433 account_id=account_id,
1434 )
1435 else:
1436 return None
1437
1438 def _get_session_token(self, profile_config):
1439 for token_name in self.TOKENS:
1440 if token_name in profile_config:
1441 return profile_config[token_name]
1442
1443 def _get_account_id(self, config):
1444 return config.get(self.ACCOUNT_ID)
1445
1446
1447class BotoProvider(CredentialProvider):
1448 METHOD = 'boto-config'
1449 CANONICAL_NAME = 'Boto2Config'
1450
1451 BOTO_CONFIG_ENV = 'BOTO_CONFIG'
1452 DEFAULT_CONFIG_FILENAMES = ['/etc/boto.cfg', '~/.boto']
1453 ACCESS_KEY = 'aws_access_key_id'
1454 SECRET_KEY = 'aws_secret_access_key'
1455
1456 def __init__(self, environ=None, ini_parser=None):
1457 if environ is None:
1458 environ = os.environ
1459 if ini_parser is None:
1460 ini_parser = botocore.configloader.raw_config_parse
1461 self._environ = environ
1462 self._ini_parser = ini_parser
1463
1464 def load(self):
1465 """
1466 Look for credentials in boto config file.
1467 """
1468 if self.BOTO_CONFIG_ENV in self._environ:
1469 potential_locations = [self._environ[self.BOTO_CONFIG_ENV]]
1470 else:
1471 potential_locations = self.DEFAULT_CONFIG_FILENAMES
1472 for filename in potential_locations:
1473 try:
1474 config = self._ini_parser(filename)
1475 except ConfigNotFound:
1476 # Move on to the next potential config file name.
1477 continue
1478 if 'Credentials' in config:
1479 credentials = config['Credentials']
1480 if self.ACCESS_KEY in credentials:
1481 logger.info(
1482 "Found credentials in boto config file: %s", filename
1483 )
1484 access_key, secret_key = self._extract_creds_from_mapping(
1485 credentials, self.ACCESS_KEY, self.SECRET_KEY
1486 )
1487 return Credentials(
1488 access_key, secret_key, method=self.METHOD
1489 )
1490
1491
1492class AssumeRoleProvider(CredentialProvider):
1493 METHOD = 'assume-role'
1494 # The AssumeRole provider is logically part of the SharedConfig and
1495 # SharedCredentials providers. Since the purpose of the canonical name
1496 # is to provide cross-sdk compatibility, calling code will need to be
1497 # aware that either of those providers should be tied to the AssumeRole
1498 # provider as much as possible.
1499 CANONICAL_NAME = None
1500 ROLE_CONFIG_VAR = 'role_arn'
1501 WEB_IDENTITY_TOKE_FILE_VAR = 'web_identity_token_file'
1502 # Credentials are considered expired (and will be refreshed) once the total
1503 # remaining time left until the credentials expires is less than the
1504 # EXPIRY_WINDOW.
1505 EXPIRY_WINDOW_SECONDS = 60 * 15
1506
1507 def __init__(
1508 self,
1509 load_config,
1510 client_creator,
1511 cache,
1512 profile_name,
1513 prompter=getpass.getpass,
1514 credential_sourcer=None,
1515 profile_provider_builder=None,
1516 ):
1517 """
1518 :type load_config: callable
1519 :param load_config: A function that accepts no arguments, and
1520 when called, will return the full configuration dictionary
1521 for the session (``session.full_config``).
1522
1523 :type client_creator: callable
1524 :param client_creator: A factory function that will create
1525 a client when called. Has the same interface as
1526 ``botocore.session.Session.create_client``.
1527
1528 :type cache: dict
1529 :param cache: An object that supports ``__getitem__``,
1530 ``__setitem__``, and ``__contains__``. An example
1531 of this is the ``JSONFileCache`` class in the CLI.
1532
1533 :type profile_name: str
1534 :param profile_name: The name of the profile.
1535
1536 :type prompter: callable
1537 :param prompter: A callable that returns input provided
1538 by the user (i.e raw_input, getpass.getpass, etc.).
1539
1540 :type credential_sourcer: CanonicalNameCredentialSourcer
1541 :param credential_sourcer: A credential provider that takes a
1542 configuration, which is used to provide the source credentials
1543 for the STS call.
1544 """
1545 #: The cache used to first check for assumed credentials.
1546 #: This is checked before making the AssumeRole API
1547 #: calls and can be useful if you have short lived
1548 #: scripts and you'd like to avoid calling AssumeRole
1549 #: until the credentials are expired.
1550 self.cache = cache
1551 self._load_config = load_config
1552 # client_creator is a callable that creates function.
1553 # It's basically session.create_client
1554 self._client_creator = client_creator
1555 self._profile_name = profile_name
1556 self._prompter = prompter
1557 # The _loaded_config attribute will be populated from the
1558 # load_config() function once the configuration is actually
1559 # loaded. The reason we go through all this instead of just
1560 # requiring that the loaded_config be passed to us is to that
1561 # we can defer configuration loaded until we actually try
1562 # to load credentials (as opposed to when the object is
1563 # instantiated).
1564 self._loaded_config = {}
1565 self._credential_sourcer = credential_sourcer
1566 self._profile_provider_builder = profile_provider_builder
1567 self._visited_profiles = [self._profile_name]
1568
1569 def load(self):
1570 self._loaded_config = self._load_config()
1571 profiles = self._loaded_config.get('profiles', {})
1572 profile = profiles.get(self._profile_name, {})
1573 if self._has_assume_role_config_vars(profile):
1574 return self._load_creds_via_assume_role(self._profile_name)
1575
1576 def _has_assume_role_config_vars(self, profile):
1577 return (
1578 self.ROLE_CONFIG_VAR in profile
1579 and
1580 # We need to ensure this provider doesn't look at a profile when
1581 # the profile has configuration for web identity. Simply relying on
1582 # the order in the credential chain is insufficient as it doesn't
1583 # prevent the case when we're doing an assume role chain.
1584 self.WEB_IDENTITY_TOKE_FILE_VAR not in profile
1585 )
1586
1587 def _load_creds_via_assume_role(self, profile_name):
1588 role_config = self._get_role_config(profile_name)
1589 source_credentials = self._resolve_source_credentials(
1590 role_config, profile_name
1591 )
1592
1593 extra_args = {}
1594 role_session_name = role_config.get('role_session_name')
1595 if role_session_name is not None:
1596 extra_args['RoleSessionName'] = role_session_name
1597
1598 external_id = role_config.get('external_id')
1599 if external_id is not None:
1600 extra_args['ExternalId'] = external_id
1601
1602 mfa_serial = role_config.get('mfa_serial')
1603 if mfa_serial is not None:
1604 extra_args['SerialNumber'] = mfa_serial
1605
1606 duration_seconds = role_config.get('duration_seconds')
1607 if duration_seconds is not None:
1608 extra_args['DurationSeconds'] = duration_seconds
1609
1610 fetcher = AssumeRoleCredentialFetcher(
1611 client_creator=self._client_creator,
1612 source_credentials=source_credentials,
1613 role_arn=role_config['role_arn'],
1614 extra_args=extra_args,
1615 mfa_prompter=self._prompter,
1616 cache=self.cache,
1617 )
1618 refresher = fetcher.fetch_credentials
1619 if mfa_serial is not None:
1620 refresher = create_mfa_serial_refresher(refresher)
1621
1622 # The initial credentials are empty and the expiration time is set
1623 # to now so that we can delay the call to assume role until it is
1624 # strictly needed.
1625 return DeferredRefreshableCredentials(
1626 method=self.METHOD,
1627 refresh_using=refresher,
1628 time_fetcher=_local_now,
1629 )
1630
1631 def _get_role_config(self, profile_name):
1632 """Retrieves and validates the role configuration for the profile."""
1633 profiles = self._loaded_config.get('profiles', {})
1634
1635 profile = profiles[profile_name]
1636 source_profile = profile.get('source_profile')
1637 role_arn = profile['role_arn']
1638 credential_source = profile.get('credential_source')
1639 mfa_serial = profile.get('mfa_serial')
1640 external_id = profile.get('external_id')
1641 role_session_name = profile.get('role_session_name')
1642 duration_seconds = profile.get('duration_seconds')
1643
1644 role_config = {
1645 'role_arn': role_arn,
1646 'external_id': external_id,
1647 'mfa_serial': mfa_serial,
1648 'role_session_name': role_session_name,
1649 'source_profile': source_profile,
1650 'credential_source': credential_source,
1651 }
1652
1653 if duration_seconds is not None:
1654 try:
1655 role_config['duration_seconds'] = int(duration_seconds)
1656 except ValueError:
1657 pass
1658
1659 # Either the credential source or the source profile must be
1660 # specified, but not both.
1661 if credential_source is not None and source_profile is not None:
1662 raise InvalidConfigError(
1663 error_msg=(
1664 f'The profile "{profile_name}" contains both '
1665 'source_profile and credential_source.'
1666 )
1667 )
1668 elif credential_source is None and source_profile is None:
1669 raise PartialCredentialsError(
1670 provider=self.METHOD,
1671 cred_var='source_profile or credential_source',
1672 )
1673 elif credential_source is not None:
1674 self._validate_credential_source(profile_name, credential_source)
1675 else:
1676 self._validate_source_profile(profile_name, source_profile)
1677
1678 return role_config
1679
1680 def _validate_credential_source(self, parent_profile, credential_source):
1681 if self._credential_sourcer is None:
1682 raise InvalidConfigError(
1683 error_msg=(
1684 f"The credential_source \"{credential_source}\" is specified "
1685 f"in profile \"{parent_profile}\", "
1686 f"but no source provider was configured."
1687 )
1688 )
1689 if not self._credential_sourcer.is_supported(credential_source):
1690 raise InvalidConfigError(
1691 error_msg=(
1692 f"The credential source \"{credential_source}\" referenced "
1693 f"in profile \"{parent_profile}\" is not valid."
1694 )
1695 )
1696
1697 def _source_profile_has_credentials(self, profile):
1698 return any(
1699 [
1700 self._has_static_credentials(profile),
1701 self._has_assume_role_config_vars(profile),
1702 ]
1703 )
1704
1705 def _validate_source_profile(
1706 self, parent_profile_name, source_profile_name
1707 ):
1708 profiles = self._loaded_config.get('profiles', {})
1709 if source_profile_name not in profiles:
1710 raise InvalidConfigError(
1711 error_msg=(
1712 f"The source_profile \"{source_profile_name}\" referenced in "
1713 f"the profile \"{parent_profile_name}\" does not exist."
1714 )
1715 )
1716
1717 source_profile = profiles[source_profile_name]
1718
1719 # Make sure we aren't going into an infinite loop. If we haven't
1720 # visited the profile yet, we're good.
1721 if source_profile_name not in self._visited_profiles:
1722 return
1723
1724 # If we have visited the profile and the profile isn't simply
1725 # referencing itself, that's an infinite loop.
1726 if source_profile_name != parent_profile_name:
1727 raise InfiniteLoopConfigError(
1728 source_profile=source_profile_name,
1729 visited_profiles=self._visited_profiles,
1730 )
1731
1732 # A profile is allowed to reference itself so that it can source
1733 # static credentials and have configuration all in the same
1734 # profile. This will only ever work for the top level assume
1735 # role because the static credentials will otherwise take
1736 # precedence.
1737 if not self._has_static_credentials(source_profile):
1738 raise InfiniteLoopConfigError(
1739 source_profile=source_profile_name,
1740 visited_profiles=self._visited_profiles,
1741 )
1742
1743 def _has_static_credentials(self, profile):
1744 static_keys = ['aws_secret_access_key', 'aws_access_key_id']
1745 return any(static_key in profile for static_key in static_keys)
1746
1747 def _resolve_source_credentials(self, role_config, profile_name):
1748 credential_source = role_config.get('credential_source')
1749 if credential_source is not None:
1750 return self._resolve_credentials_from_source(
1751 credential_source, profile_name
1752 )
1753
1754 source_profile = role_config['source_profile']
1755 self._visited_profiles.append(source_profile)
1756 return self._resolve_credentials_from_profile(source_profile)
1757
1758 def _resolve_credentials_from_profile(self, profile_name):
1759 profiles = self._loaded_config.get('profiles', {})
1760 profile = profiles[profile_name]
1761
1762 if (
1763 self._has_static_credentials(profile)
1764 and not self._profile_provider_builder
1765 ):
1766 # This is only here for backwards compatibility. If this provider
1767 # isn't given a profile provider builder we still want to be able
1768 # to handle the basic static credential case as we would before the
1769 # profile provider builder parameter was added.
1770 return self._resolve_static_credentials_from_profile(profile)
1771 elif self._has_static_credentials(
1772 profile
1773 ) or not self._has_assume_role_config_vars(profile):
1774 profile_providers = self._profile_provider_builder.providers(
1775 profile_name=profile_name,
1776 disable_env_vars=True,
1777 )
1778 profile_chain = CredentialResolver(profile_providers)
1779 credentials = profile_chain.load_credentials()
1780 if credentials is None:
1781 error_message = (
1782 'The source profile "%s" must have credentials.'
1783 )
1784 raise InvalidConfigError(
1785 error_msg=error_message % profile_name,
1786 )
1787 return credentials
1788
1789 return self._load_creds_via_assume_role(profile_name)
1790
1791 def _resolve_static_credentials_from_profile(self, profile):
1792 try:
1793 return Credentials(
1794 access_key=profile['aws_access_key_id'],
1795 secret_key=profile['aws_secret_access_key'],
1796 token=profile.get('aws_session_token'),
1797 )
1798 except KeyError as e:
1799 raise PartialCredentialsError(
1800 provider=self.METHOD, cred_var=str(e)
1801 )
1802
1803 def _resolve_credentials_from_source(
1804 self, credential_source, profile_name
1805 ):
1806 credentials = self._credential_sourcer.source_credentials(
1807 credential_source
1808 )
1809 if credentials is None:
1810 raise CredentialRetrievalError(
1811 provider=credential_source,
1812 error_msg=(
1813 'No credentials found in credential_source referenced '
1814 f'in profile {profile_name}'
1815 ),
1816 )
1817 return credentials
1818
1819
1820class AssumeRoleWithWebIdentityProvider(CredentialProvider):
1821 METHOD = 'assume-role-with-web-identity'
1822 CANONICAL_NAME = None
1823 _CONFIG_TO_ENV_VAR = {
1824 'web_identity_token_file': 'AWS_WEB_IDENTITY_TOKEN_FILE',
1825 'role_session_name': 'AWS_ROLE_SESSION_NAME',
1826 'role_arn': 'AWS_ROLE_ARN',
1827 }
1828
1829 def __init__(
1830 self,
1831 load_config,
1832 client_creator,
1833 profile_name,
1834 cache=None,
1835 disable_env_vars=False,
1836 token_loader_cls=None,
1837 ):
1838 self.cache = cache
1839 self._load_config = load_config
1840 self._client_creator = client_creator
1841 self._profile_name = profile_name
1842 self._profile_config = None
1843 self._disable_env_vars = disable_env_vars
1844 if token_loader_cls is None:
1845 token_loader_cls = FileWebIdentityTokenLoader
1846 self._token_loader_cls = token_loader_cls
1847
1848 def load(self):
1849 return self._assume_role_with_web_identity()
1850
1851 def _get_profile_config(self, key):
1852 if self._profile_config is None:
1853 loaded_config = self._load_config()
1854 profiles = loaded_config.get('profiles', {})
1855 self._profile_config = profiles.get(self._profile_name, {})
1856 return self._profile_config.get(key)
1857
1858 def _get_env_config(self, key):
1859 if self._disable_env_vars:
1860 return None
1861 env_key = self._CONFIG_TO_ENV_VAR.get(key)
1862 if env_key and env_key in os.environ:
1863 return os.environ[env_key]
1864 return None
1865
1866 def _get_config(self, key):
1867 env_value = self._get_env_config(key)
1868 if env_value is not None:
1869 return env_value
1870 return self._get_profile_config(key)
1871
1872 def _assume_role_with_web_identity(self):
1873 token_path = self._get_config('web_identity_token_file')
1874 if not token_path:
1875 return None
1876 token_loader = self._token_loader_cls(token_path)
1877
1878 role_arn = self._get_config('role_arn')
1879 if not role_arn:
1880 error_msg = (
1881 'The provided profile or the current environment is '
1882 'configured to assume role with web identity but has no '
1883 'role ARN configured. Ensure that the profile has the role_arn'
1884 'configuration set or the AWS_ROLE_ARN env var is set.'
1885 )
1886 raise InvalidConfigError(error_msg=error_msg)
1887
1888 extra_args = {}
1889 role_session_name = self._get_config('role_session_name')
1890 if role_session_name is not None:
1891 extra_args['RoleSessionName'] = role_session_name
1892
1893 fetcher = AssumeRoleWithWebIdentityCredentialFetcher(
1894 client_creator=self._client_creator,
1895 web_identity_token_loader=token_loader,
1896 role_arn=role_arn,
1897 extra_args=extra_args,
1898 cache=self.cache,
1899 )
1900 # The initial credentials are empty and the expiration time is set
1901 # to now so that we can delay the call to assume role until it is
1902 # strictly needed.
1903 return DeferredRefreshableCredentials(
1904 method=self.METHOD,
1905 refresh_using=fetcher.fetch_credentials,
1906 )
1907
1908
1909class CanonicalNameCredentialSourcer:
1910 def __init__(self, providers):
1911 self._providers = providers
1912
1913 def is_supported(self, source_name):
1914 """Validates a given source name.
1915
1916 :type source_name: str
1917 :param source_name: The value of credential_source in the config
1918 file. This is the canonical name of the credential provider.
1919
1920 :rtype: bool
1921 :returns: True if the credential provider is supported,
1922 False otherwise.
1923 """
1924 return source_name in [p.CANONICAL_NAME for p in self._providers]
1925
1926 def source_credentials(self, source_name):
1927 """Loads source credentials based on the provided configuration.
1928
1929 :type source_name: str
1930 :param source_name: The value of credential_source in the config
1931 file. This is the canonical name of the credential provider.
1932
1933 :rtype: Credentials
1934 """
1935 source = self._get_provider(source_name)
1936 if isinstance(source, CredentialResolver):
1937 return source.load_credentials()
1938 return source.load()
1939
1940 def _get_provider(self, canonical_name):
1941 """Return a credential provider by its canonical name.
1942
1943 :type canonical_name: str
1944 :param canonical_name: The canonical name of the provider.
1945
1946 :raises UnknownCredentialError: Raised if no
1947 credential provider by the provided name
1948 is found.
1949 """
1950 provider = self._get_provider_by_canonical_name(canonical_name)
1951
1952 # The AssumeRole provider should really be part of the SharedConfig
1953 # provider rather than being its own thing, but it is not. It is
1954 # effectively part of both the SharedConfig provider and the
1955 # SharedCredentials provider now due to the way it behaves.
1956 # Therefore if we want either of those providers we should return
1957 # the AssumeRole provider with it.
1958 if canonical_name.lower() in ['sharedconfig', 'sharedcredentials']:
1959 assume_role_provider = self._get_provider_by_method('assume-role')
1960 if assume_role_provider is not None:
1961 # The SharedConfig or SharedCredentials provider may not be
1962 # present if it was removed for some reason, but the
1963 # AssumeRole provider could still be present. In that case,
1964 # return the assume role provider by itself.
1965 if provider is None:
1966 return assume_role_provider
1967
1968 # If both are present, return them both as a
1969 # CredentialResolver so that calling code can treat them as
1970 # a single entity.
1971 return CredentialResolver([assume_role_provider, provider])
1972
1973 if provider is None:
1974 raise UnknownCredentialError(name=canonical_name)
1975
1976 return provider
1977
1978 def _get_provider_by_canonical_name(self, canonical_name):
1979 """Return a credential provider by its canonical name.
1980
1981 This function is strict, it does not attempt to address
1982 compatibility issues.
1983 """
1984 for provider in self._providers:
1985 name = provider.CANONICAL_NAME
1986 # Canonical names are case-insensitive
1987 if name and name.lower() == canonical_name.lower():
1988 return provider
1989
1990 def _get_provider_by_method(self, method):
1991 """Return a credential provider by its METHOD name."""
1992 for provider in self._providers:
1993 if provider.METHOD == method:
1994 return provider
1995
1996
1997class ContainerProvider(CredentialProvider):
1998 METHOD = 'container-role'
1999 CANONICAL_NAME = 'EcsContainer'
2000 ENV_VAR = 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI'
2001 ENV_VAR_FULL = 'AWS_CONTAINER_CREDENTIALS_FULL_URI'
2002 ENV_VAR_AUTH_TOKEN = 'AWS_CONTAINER_AUTHORIZATION_TOKEN'
2003 ENV_VAR_AUTH_TOKEN_FILE = 'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE'
2004
2005 def __init__(self, environ=None, fetcher=None):
2006 if environ is None:
2007 environ = os.environ
2008 if fetcher is None:
2009 fetcher = ContainerMetadataFetcher()
2010 self._environ = environ
2011 self._fetcher = fetcher
2012
2013 def load(self):
2014 # This cred provider is only triggered if the self.ENV_VAR is set,
2015 # which only happens if you opt into this feature.
2016 if self.ENV_VAR in self._environ or self.ENV_VAR_FULL in self._environ:
2017 return self._retrieve_or_fail()
2018
2019 def _retrieve_or_fail(self):
2020 if self._provided_relative_uri():
2021 full_uri = self._fetcher.full_url(self._environ[self.ENV_VAR])
2022 else:
2023 full_uri = self._environ[self.ENV_VAR_FULL]
2024 fetcher = self._create_fetcher(full_uri)
2025 creds = fetcher()
2026 return RefreshableCredentials(
2027 access_key=creds['access_key'],
2028 secret_key=creds['secret_key'],
2029 token=creds['token'],
2030 method=self.METHOD,
2031 expiry_time=_parse_if_needed(creds['expiry_time']),
2032 refresh_using=fetcher,
2033 account_id=creds.get('account_id'),
2034 )
2035
2036 def _build_headers(self):
2037 auth_token = None
2038 if self.ENV_VAR_AUTH_TOKEN_FILE in self._environ:
2039 auth_token_file_path = self._environ[self.ENV_VAR_AUTH_TOKEN_FILE]
2040 with open(auth_token_file_path) as token_file:
2041 auth_token = token_file.read()
2042 elif self.ENV_VAR_AUTH_TOKEN in self._environ:
2043 auth_token = self._environ[self.ENV_VAR_AUTH_TOKEN]
2044 if auth_token is not None:
2045 self._validate_auth_token(auth_token)
2046 return {'Authorization': auth_token}
2047
2048 def _validate_auth_token(self, auth_token):
2049 if "\r" in auth_token or "\n" in auth_token:
2050 raise ValueError("Auth token value is not a legal header value")
2051
2052 def _create_fetcher(self, full_uri, *args, **kwargs):
2053 def fetch_creds():
2054 try:
2055 headers = self._build_headers()
2056 response = self._fetcher.retrieve_full_uri(
2057 full_uri, headers=headers
2058 )
2059 except MetadataRetrievalError as e:
2060 logger.debug(
2061 "Error retrieving container metadata: %s", e, exc_info=True
2062 )
2063 raise CredentialRetrievalError(
2064 provider=self.METHOD, error_msg=str(e)
2065 )
2066 return {
2067 'access_key': response['AccessKeyId'],
2068 'secret_key': response['SecretAccessKey'],
2069 'token': response['Token'],
2070 'expiry_time': response['Expiration'],
2071 'account_id': response.get('AccountId'),
2072 }
2073
2074 return fetch_creds
2075
2076 def _provided_relative_uri(self):
2077 return self.ENV_VAR in self._environ
2078
2079
2080class CredentialResolver:
2081 def __init__(self, providers):
2082 """
2083
2084 :param providers: A list of ``CredentialProvider`` instances.
2085
2086 """
2087 self.providers = providers
2088
2089 def insert_before(self, name, credential_provider):
2090 """
2091 Inserts a new instance of ``CredentialProvider`` into the chain that
2092 will be tried before an existing one.
2093
2094 :param name: The short name of the credentials you'd like to insert the
2095 new credentials before. (ex. ``env`` or ``config``). Existing names
2096 & ordering can be discovered via ``self.available_methods``.
2097 :type name: string
2098
2099 :param cred_instance: An instance of the new ``Credentials`` object
2100 you'd like to add to the chain.
2101 :type cred_instance: A subclass of ``Credentials``
2102 """
2103 try:
2104 offset = [p.METHOD for p in self.providers].index(name)
2105 except ValueError:
2106 raise UnknownCredentialError(name=name)
2107 self.providers.insert(offset, credential_provider)
2108
2109 def insert_after(self, name, credential_provider):
2110 """
2111 Inserts a new type of ``Credentials`` instance into the chain that will
2112 be tried after an existing one.
2113
2114 :param name: The short name of the credentials you'd like to insert the
2115 new credentials after. (ex. ``env`` or ``config``). Existing names
2116 & ordering can be discovered via ``self.available_methods``.
2117 :type name: string
2118
2119 :param cred_instance: An instance of the new ``Credentials`` object
2120 you'd like to add to the chain.
2121 :type cred_instance: A subclass of ``Credentials``
2122 """
2123 offset = self._get_provider_offset(name)
2124 self.providers.insert(offset + 1, credential_provider)
2125
2126 def remove(self, name):
2127 """
2128 Removes a given ``Credentials`` instance from the chain.
2129
2130 :param name: The short name of the credentials instance to remove.
2131 :type name: string
2132 """
2133 available_methods = [p.METHOD for p in self.providers]
2134 if name not in available_methods:
2135 # It's not present. Fail silently.
2136 return
2137
2138 offset = available_methods.index(name)
2139 self.providers.pop(offset)
2140
2141 def get_provider(self, name):
2142 """Return a credential provider by name.
2143
2144 :type name: str
2145 :param name: The name of the provider.
2146
2147 :raises UnknownCredentialError: Raised if no
2148 credential provider by the provided name
2149 is found.
2150 """
2151 return self.providers[self._get_provider_offset(name)]
2152
2153 def _get_provider_offset(self, name):
2154 try:
2155 return [p.METHOD for p in self.providers].index(name)
2156 except ValueError:
2157 raise UnknownCredentialError(name=name)
2158
2159 def load_credentials(self):
2160 """
2161 Goes through the credentials chain, returning the first ``Credentials``
2162 that could be loaded.
2163 """
2164 # First provider to return a non-None response wins.
2165 for provider in self.providers:
2166 logger.debug("Looking for credentials via: %s", provider.METHOD)
2167 creds = provider.load()
2168 if creds is not None:
2169 return creds
2170
2171 # If we got here, no credentials could be found.
2172 # This feels like it should be an exception, but historically, ``None``
2173 # is returned.
2174 #
2175 # +1
2176 # -js
2177 return None
2178
2179
2180class SSOCredentialFetcher(CachedCredentialFetcher):
2181 _UTC_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
2182
2183 def __init__(
2184 self,
2185 start_url,
2186 sso_region,
2187 role_name,
2188 account_id,
2189 client_creator,
2190 token_loader=None,
2191 cache=None,
2192 expiry_window_seconds=None,
2193 token_provider=None,
2194 sso_session_name=None,
2195 ):
2196 self._client_creator = client_creator
2197 self._sso_region = sso_region
2198 self._role_name = role_name
2199 self._account_id = account_id
2200 self._start_url = start_url
2201 self._token_loader = token_loader
2202 self._token_provider = token_provider
2203 self._sso_session_name = sso_session_name
2204 super().__init__(cache, expiry_window_seconds)
2205
2206 def _create_cache_key(self):
2207 """Create a predictable cache key for the current configuration.
2208
2209 The cache key is intended to be compatible with file names.
2210 """
2211 args = {
2212 'roleName': self._role_name,
2213 'accountId': self._account_id,
2214 }
2215 if self._sso_session_name:
2216 args['sessionName'] = self._sso_session_name
2217 else:
2218 args['startUrl'] = self._start_url
2219 # NOTE: It would be good to hoist this cache key construction logic
2220 # into the CachedCredentialFetcher class as we should be consistent.
2221 # Unfortunately, the current assume role fetchers that sub class don't
2222 # pass separators resulting in non-minified JSON. In the long term,
2223 # all fetchers should use the below caching scheme.
2224 args = json.dumps(args, sort_keys=True, separators=(',', ':'))
2225 argument_hash = sha1(args.encode('utf-8')).hexdigest()
2226 return self._make_file_safe(argument_hash)
2227
2228 def _parse_timestamp(self, timestamp_ms):
2229 # fromtimestamp expects seconds so: milliseconds / 1000 = seconds
2230 timestamp_seconds = timestamp_ms / 1000.0
2231 timestamp = datetime.datetime.fromtimestamp(timestamp_seconds, tzutc())
2232 return timestamp.strftime(self._UTC_DATE_FORMAT)
2233
2234 def _get_credentials(self):
2235 """Get credentials by calling SSO get role credentials."""
2236 config = Config(
2237 signature_version=UNSIGNED,
2238 region_name=self._sso_region,
2239 )
2240 client = self._client_creator('sso', config=config)
2241 if self._token_provider:
2242 initial_token_data = self._token_provider.load_token()
2243 token = initial_token_data.get_frozen_token().token
2244 else:
2245 token = self._token_loader(self._start_url)['accessToken']
2246
2247 kwargs = {
2248 'roleName': self._role_name,
2249 'accountId': self._account_id,
2250 'accessToken': token,
2251 }
2252 try:
2253 response = client.get_role_credentials(**kwargs)
2254 except client.exceptions.UnauthorizedException:
2255 raise UnauthorizedSSOTokenError()
2256 credentials = response['roleCredentials']
2257
2258 credentials = {
2259 'ProviderType': 'sso',
2260 'Credentials': {
2261 'AccessKeyId': credentials['accessKeyId'],
2262 'SecretAccessKey': credentials['secretAccessKey'],
2263 'SessionToken': credentials['sessionToken'],
2264 'Expiration': self._parse_timestamp(credentials['expiration']),
2265 'AccountId': self._account_id,
2266 },
2267 }
2268 return credentials
2269
2270
2271class SSOProvider(CredentialProvider):
2272 METHOD = 'sso'
2273
2274 _SSO_TOKEN_CACHE_DIR = os.path.expanduser(
2275 os.path.join('~', '.aws', 'sso', 'cache')
2276 )
2277 _PROFILE_REQUIRED_CONFIG_VARS = (
2278 'sso_role_name',
2279 'sso_account_id',
2280 )
2281 _SSO_REQUIRED_CONFIG_VARS = (
2282 'sso_start_url',
2283 'sso_region',
2284 )
2285 _ALL_REQUIRED_CONFIG_VARS = (
2286 _PROFILE_REQUIRED_CONFIG_VARS + _SSO_REQUIRED_CONFIG_VARS
2287 )
2288
2289 def __init__(
2290 self,
2291 load_config,
2292 client_creator,
2293 profile_name,
2294 cache=None,
2295 token_cache=None,
2296 token_provider=None,
2297 ):
2298 if token_cache is None:
2299 token_cache = JSONFileCache(self._SSO_TOKEN_CACHE_DIR)
2300 self._token_cache = token_cache
2301 self._token_provider = token_provider
2302 if cache is None:
2303 cache = {}
2304 self.cache = cache
2305 self._load_config = load_config
2306 self._client_creator = client_creator
2307 self._profile_name = profile_name
2308
2309 def _load_sso_config(self):
2310 loaded_config = self._load_config()
2311 profiles = loaded_config.get('profiles', {})
2312 profile_name = self._profile_name
2313 profile_config = profiles.get(self._profile_name, {})
2314 sso_sessions = loaded_config.get('sso_sessions', {})
2315
2316 # Role name & Account ID indicate the cred provider should be used
2317 if all(
2318 c not in profile_config for c in self._PROFILE_REQUIRED_CONFIG_VARS
2319 ):
2320 return None
2321
2322 resolved_config, extra_reqs = self._resolve_sso_session_reference(
2323 profile_config, sso_sessions
2324 )
2325
2326 config = {}
2327 missing_config_vars = []
2328 all_required_configs = self._ALL_REQUIRED_CONFIG_VARS + extra_reqs
2329 for config_var in all_required_configs:
2330 if config_var in resolved_config:
2331 config[config_var] = resolved_config[config_var]
2332 else:
2333 missing_config_vars.append(config_var)
2334
2335 if missing_config_vars:
2336 missing = ', '.join(missing_config_vars)
2337 raise InvalidConfigError(
2338 error_msg=(
2339 f'The profile "{profile_name}" is configured to use SSO '
2340 f'but is missing required configuration: {missing}'
2341 )
2342 )
2343 return config
2344
2345 def _resolve_sso_session_reference(self, profile_config, sso_sessions):
2346 sso_session_name = profile_config.get('sso_session')
2347 if sso_session_name is None:
2348 # No reference to resolve, proceed with legacy flow
2349 return profile_config, ()
2350
2351 if sso_session_name not in sso_sessions:
2352 error_msg = f'The specified sso-session does not exist: "{sso_session_name}"'
2353 raise InvalidConfigError(error_msg=error_msg)
2354
2355 config = profile_config.copy()
2356 session = sso_sessions[sso_session_name]
2357 for config_var, val in session.items():
2358 # Validate any keys referenced in both profile and sso_session match
2359 if config.get(config_var, val) != val:
2360 error_msg = (
2361 f"The value for {config_var} is inconsistent between "
2362 f"profile ({config[config_var]}) and sso-session ({val})."
2363 )
2364 raise InvalidConfigError(error_msg=error_msg)
2365 config[config_var] = val
2366 return config, ('sso_session',)
2367
2368 def load(self):
2369 sso_config = self._load_sso_config()
2370 if not sso_config:
2371 return None
2372
2373 fetcher_kwargs = {
2374 'start_url': sso_config['sso_start_url'],
2375 'sso_region': sso_config['sso_region'],
2376 'role_name': sso_config['sso_role_name'],
2377 'account_id': sso_config['sso_account_id'],
2378 'client_creator': self._client_creator,
2379 'token_loader': SSOTokenLoader(cache=self._token_cache),
2380 'cache': self.cache,
2381 }
2382 if 'sso_session' in sso_config:
2383 fetcher_kwargs['sso_session_name'] = sso_config['sso_session']
2384 fetcher_kwargs['token_provider'] = self._token_provider
2385
2386 sso_fetcher = SSOCredentialFetcher(**fetcher_kwargs)
2387
2388 return DeferredRefreshableCredentials(
2389 method=self.METHOD,
2390 refresh_using=sso_fetcher.fetch_credentials,
2391 )