Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/credentials.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4Authentication utilities
5"""
6import threading
7import os
9from botocore.credentials import RefreshableCredentials
10from botocore.session import get_session
11from boto3 import Session
12import json
14from c7n.version import version
15from c7n.utils import get_retry
18# we still have some issues (see #5023) to work through to switch to
19# default regional endpoints, for now its opt-in.
20USE_STS_REGIONAL = os.environ.get(
21 'C7N_USE_STS_REGIONAL', '').lower() in ('yes', 'true')
24class CustodianSession(Session):
26 # track clients and return extant ones if present
27 _clients = {}
28 lock = threading.Lock()
30 def client(self, service_name, region_name=None, *args, **kw):
31 if kw.get('config'):
32 return super().client(service_name, region_name, *args, **kw)
34 key = self._cache_key(service_name, region_name)
35 client = self._clients.get(key)
36 if client is not None:
37 return client
39 with self.lock:
40 client = self._clients.get(key)
41 if client is not None:
42 return client
44 client = super().client(service_name, region_name, *args, **kw)
45 self._clients[key] = client
46 return client
48 def _cache_key(self, service_name, region_name):
49 region_name = region_name or self.region_name
50 return (
51 # namedtuple so stable comparison
52 hash(self.get_credentials().get_frozen_credentials()),
53 service_name,
54 region_name
55 )
57 @classmethod
58 def close(cls):
59 with cls.lock:
60 for c in cls._clients.values():
61 c.close()
62 cls._clients = {}
65class SessionFactory:
67 def __init__(
68 self, region, profile=None, assume_role=None, external_id=None, session_policy=None):
69 self.region = region
70 self.profile = profile
71 self.session_policy = session_policy
72 self.assume_role = assume_role
73 self.external_id = external_id
74 self.session_name = "CloudCustodian"
75 if 'C7N_SESSION_SUFFIX' in os.environ:
76 self.session_name = "%s@%s" % (
77 self.session_name, os.environ['C7N_SESSION_SUFFIX'])
78 self._subscribers = []
79 self._policy_name = ""
81 def _set_policy_name(self, name):
82 self._policy_name = name
84 policy_name = property(None, _set_policy_name)
86 def __call__(self, assume=True, region=None):
87 if self.assume_role and assume:
88 session = Session(profile_name=self.profile)
89 session = assumed_session(
90 self.assume_role, self.session_name, self.session_policy, session,
91 region or self.region, self.external_id)
92 else:
93 session = Session(
94 region_name=region or self.region, profile_name=self.profile)
96 return self.update(session)
98 def update(self, session):
99 session._session.user_agent_name = "c7n"
100 session._session.user_agent_version = version
101 if self._policy_name:
102 session._session.user_agent_extra = f"c7n/policy#{self._policy_name}"
104 for s in self._subscribers:
105 s(session)
107 return session
109 def set_subscribers(self, subscribers):
110 self._subscribers = subscribers
113def assumed_session(
114 role_arn, session_name, session_policy=None, session=None, region=None, external_id=None):
115 """STS Role assume a boto3.Session
117 With automatic credential renewal.
119 Args:
120 role_arn: iam role arn to assume
121 session_name: client session identifier
122 session: an optional extant session, note session is captured
123 in a function closure for renewing the sts assumed role.
125 :return: a boto3 session using the sts assumed role credentials
127 Notes: We have to poke at botocore internals a few times
128 """
129 if session is None:
130 session = Session()
132 retry = get_retry(('Throttling',))
134 def refresh():
136 parameters = {"RoleArn": role_arn, "RoleSessionName": session_name}
137 if session_policy is not None:
138 parameters['Policy'] = json.dumps(session_policy)
140 if external_id is not None:
141 parameters['ExternalId'] = external_id
143 credentials = retry(
144 get_sts_client(
145 session, region).assume_role, **parameters)['Credentials']
146 return dict(
147 access_key=credentials['AccessKeyId'],
148 secret_key=credentials['SecretAccessKey'],
149 token=credentials['SessionToken'],
150 # Silly that we basically stringify so it can be parsed again
151 expiry_time=credentials['Expiration'].isoformat())
153 session_credentials = RefreshableCredentials.create_from_metadata(
154 metadata=refresh(),
155 refresh_using=refresh,
156 method='sts-assume-role')
158 # so dirty.. it hurts, no clean way to set this outside of the
159 # internals poke. There's some work upstream on making this nicer
160 # but its pretty baroque as well with upstream support.
161 # https://github.com/boto/boto3/issues/443
162 # https://github.com/boto/botocore/issues/761
164 s = get_session()
165 s._credentials = session_credentials
166 if region is None:
167 region = s.get_config_variable('region') or 'us-east-1'
168 s.set_config_variable('region', region)
169 return Session(botocore_session=s)
172def get_sts_client(session, region):
173 """Get the AWS STS endpoint specific for the given region.
175 Returns the global endpoint if region is not specified.
177 For the list of regional endpoints, see https://amzn.to/2ohJgtR
178 """
179 if region and USE_STS_REGIONAL:
180 endpoint_url = "https://sts.{}.amazonaws.com".format(region)
181 region_name = region
182 else:
183 endpoint_url = None
184 region_name = None
185 return session.client(
186 'sts', endpoint_url=endpoint_url, region_name=region_name)