Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/credentials.py: 45%
62 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4Authentication utilities
5"""
6import os
8from botocore.credentials import RefreshableCredentials
9from botocore.session import get_session
10from boto3 import Session
12from c7n.version import version
13from c7n.utils import get_retry
16# we still have some issues (see #5023) to work through to switch to
17# default regional endpoints, for now its opt-in.
18USE_STS_REGIONAL = os.environ.get(
19 'C7N_USE_STS_REGIONAL', '').lower() in ('yes', 'true')
22class SessionFactory:
24 def __init__(self, region, profile=None, assume_role=None, external_id=None):
25 self.region = region
26 self.profile = profile
27 self.assume_role = assume_role
28 self.external_id = external_id
29 self.session_name = "CloudCustodian"
30 if 'C7N_SESSION_SUFFIX' in os.environ:
31 self.session_name = "%s@%s" % (
32 self.session_name, os.environ['C7N_SESSION_SUFFIX'])
33 self._subscribers = []
34 self._policy_name = ""
36 def _set_policy_name(self, name):
37 self._policy_name = name
39 policy_name = property(None, _set_policy_name)
41 def __call__(self, assume=True, region=None):
42 if self.assume_role and assume:
43 session = Session(profile_name=self.profile)
44 session = assumed_session(
45 self.assume_role, self.session_name, session,
46 region or self.region, self.external_id)
47 else:
48 session = Session(
49 region_name=region or self.region, profile_name=self.profile)
51 return self.update(session)
53 def update(self, session):
54 session._session.user_agent_name = "c7n"
55 session._session.user_agent_version = version
56 if self._policy_name:
57 session._session.user_agent_extra = f"c7n/policy#{self._policy_name}"
59 for s in self._subscribers:
60 s(session)
62 return session
64 def set_subscribers(self, subscribers):
65 self._subscribers = subscribers
68def assumed_session(role_arn, session_name, session=None, region=None, external_id=None):
69 """STS Role assume a boto3.Session
71 With automatic credential renewal.
73 Args:
74 role_arn: iam role arn to assume
75 session_name: client session identifier
76 session: an optional extant session, note session is captured
77 in a function closure for renewing the sts assumed role.
79 :return: a boto3 session using the sts assumed role credentials
81 Notes: We have to poke at botocore internals a few times
82 """
83 if session is None:
84 session = Session()
86 retry = get_retry(('Throttling',))
88 def refresh():
90 parameters = {"RoleArn": role_arn, "RoleSessionName": session_name}
92 if external_id is not None:
93 parameters['ExternalId'] = external_id
95 credentials = retry(
96 get_sts_client(
97 session, region).assume_role, **parameters)['Credentials']
98 return dict(
99 access_key=credentials['AccessKeyId'],
100 secret_key=credentials['SecretAccessKey'],
101 token=credentials['SessionToken'],
102 # Silly that we basically stringify so it can be parsed again
103 expiry_time=credentials['Expiration'].isoformat())
105 session_credentials = RefreshableCredentials.create_from_metadata(
106 metadata=refresh(),
107 refresh_using=refresh,
108 method='sts-assume-role')
110 # so dirty.. it hurts, no clean way to set this outside of the
111 # internals poke. There's some work upstream on making this nicer
112 # but its pretty baroque as well with upstream support.
113 # https://github.com/boto/boto3/issues/443
114 # https://github.com/boto/botocore/issues/761
116 s = get_session()
117 s._credentials = session_credentials
118 if region is None:
119 region = s.get_config_variable('region') or 'us-east-1'
120 s.set_config_variable('region', region)
121 return Session(botocore_session=s)
124def get_sts_client(session, region):
125 """Get the AWS STS endpoint specific for the given region.
127 Returns the global endpoint if region is not specified.
129 For the list of regional endpoints, see https://amzn.to/2ohJgtR
130 """
131 if region and USE_STS_REGIONAL:
132 endpoint_url = "https://sts.{}.amazonaws.com".format(region)
133 region_name = region
134 else:
135 endpoint_url = None
136 region_name = None
137 return session.client(
138 'sts', endpoint_url=endpoint_url, region_name=region_name)