Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/iamaccess.py: 40%
195 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4IAM Resource Policy Checker
5---------------------------
7When securing resources with iam policies, we want to parse and evaluate
8the resource's policy for any cross account or public access grants that
9are not intended.
11In general, iam policies can be complex, and where possible using iam
12simulate is preferable, but requires passing the caller's arn, which
13is not feasible when we're evaluating who the valid set of callers
14are.
17References
19- IAM Policy Evaluation
20 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_evaluation-logic.html
22- IAM Policy Reference
23 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html
25- IAM Global Condition Context Keys
26 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html
28"""
29import fnmatch
30import logging
31import json
33from c7n.filters import Filter
34from c7n.resolver import ValuesFrom
35from c7n.utils import type_schema
37log = logging.getLogger('custodian.iamaccess')
def _account(arn):
    """Return the account-id field of *arn*.

    The value is split on ``:`` and the fifth field (index 4) is returned.
    Values with fewer than four colons (for example a bare account id or
    ``*``) are not treated as ARNs and are returned unchanged.

    :param arn: an ARN string, or any other principal / condition value.
    :returns: the fifth colon-separated field, or *arn* itself when it
        contains fewer than four colons.
    """
    # we could try except but some minor runtime cost, basically flag
    # invalids values
    if arn.count(":") < 4:
        return arn
    return arn.split(':', 5)[4]
class PolicyChecker:
    """Scan the statements of a resource policy for unwanted access grants.

    checker_config:
      - check_actions: only check one of the specified actions
      - everyone_only: only check for wildcard permission grants
      - allowed_accounts: permission grants to these accounts are okay
      - whitelist_conditions: a list of conditions that are considered
        sufficient enough to whitelist the statement.
      - allowed_vpce: vpc endpoint ids accepted in ``aws:SourceVpce`` conditions
      - allowed_vpc: vpc ids accepted in ``aws:SourceVpc`` conditions
      - allowed_orgid: organization ids accepted in ``aws:PrincipalOrgID``
        conditions
    """

    def __init__(self, checker_config):
        self.checker_config = checker_config

    # Config properties
    @property
    def allowed_accounts(self):
        return self.checker_config.get('allowed_accounts', ())

    @property
    def everyone_only(self):
        return self.checker_config.get('everyone_only', False)

    @property
    def check_actions(self):
        return self.checker_config.get('check_actions', ())

    @property
    def whitelist_conditions(self):
        # Condition keys are lower-cased by normalize_conditions, so the
        # whitelist is lower-cased as well to make the comparison match.
        return set(v.lower() for v in self.checker_config.get('whitelist_conditions', ()))

    @property
    def allowed_vpce(self):
        return self.checker_config.get('allowed_vpce', ())

    @property
    def allowed_vpc(self):
        return self.checker_config.get('allowed_vpc', ())

    @property
    def allowed_orgid(self):
        return self.checker_config.get('allowed_orgid', ())

    # Policy statement handling
    def check(self, policy_text):
        """Return the list of statements in the policy that are violations.

        :param policy_text: the policy, either as a JSON string or as an
            already-decoded mapping.
        :returns: list of the statement dicts for which
            :meth:`handle_statement` returned a truthy value.
        """
        if isinstance(policy_text, str):
            policy = json.loads(policy_text)
        else:
            policy = policy_text

        statements = policy.get('Statement', ())
        # The policy grammar permits a single statement object in place of
        # a list; iterating the dict directly would walk its keys.
        if isinstance(statements, dict):
            statements = (statements,)

        violations = []
        for s in statements:
            if self.handle_statement(s):
                violations.append(s)
        return violations

    def handle_statement(self, s):
        """Return the statement if it is a violation, otherwise ``None``.

        A statement is a violation when the principal, effect and action
        checks all pass and its conditions do not whitelist it.
        """
        # The tuple is built eagerly, so all three checks always run.
        if (all((self.handle_principal(s),
                 self.handle_effect(s),
                 self.handle_action(s))) and not self.handle_conditions(s)):
            return s

    def handle_action(self, s):
        """Return True if the statement grants one of ``check_actions``.

        When no ``check_actions`` are configured every statement matches.
        The statement's actions are used as fnmatch patterns against the
        configured action names.
        """
        if not self.check_actions:
            return True

        actions = s.get('Action')
        if actions is not None:
            if isinstance(actions, str):
                actions = (actions,)
            return any(fnmatch.filter(self.check_actions, a) for a in actions)

        # No 'Action' key: the statement may use 'NotAction', which covers
        # every action except those listed. A checked action is covered
        # unless one of the NotAction patterns matches it.
        not_actions = s.get('NotAction')
        if not_actions is None:
            return False
        if isinstance(not_actions, str):
            not_actions = (not_actions,)
        excluded = set()
        for pattern in not_actions:
            excluded.update(fnmatch.filter(self.check_actions, pattern))
        return any(a not in excluded for a in self.check_actions)

    def handle_effect(self, s):
        """Return True only for ``Allow`` statements."""
        return s.get('Effect') == 'Allow'

    def handle_principal(self, s):
        """Return True if the statement's principals need further checks.

        Statements using ``NotPrincipal`` or carrying no ``Principal`` are
        always considered. Service principals are ignored. Of the rest,
        ``*`` always counts, cloudfront user arns never count, and any other
        principal counts when its account is not in ``allowed_accounts``
        (unless ``everyone_only`` is set, in which case only ``*`` counts).
        """
        if 'NotPrincipal' in s:
            return True

        principals = s.get('Principal')
        if not principals:
            return True
        if not isinstance(principals, dict):
            principals = {'AWS': principals}

        # Ignore service principals, merge the rest into a single set
        non_service_principals = set()
        for principal_type in set(principals) - {'Service'}:
            p = principals[principal_type]
            non_service_principals.update({p} if isinstance(p, str) else p)

        if not non_service_principals:
            return False

        principal_ok = True
        for pid in non_service_principals:
            if pid == '*':
                principal_ok = False
            elif self.everyone_only:
                continue
            elif pid.startswith('arn:aws:iam::cloudfront:user'):
                continue
            else:
                account_id = _account(pid)
                if account_id not in self.allowed_accounts:
                    principal_ok = False
        return not principal_ok

    def handle_conditions(self, s):
        """Return True if the statement's conditions whitelist it.

        Returns False when there are no recognised conditions; otherwise
        every recognised condition must be acceptable.
        """
        conditions = self.normalize_conditions(s)
        if not conditions:
            return False

        # Built as a list (not a generator) so every condition is
        # evaluated, including the warning for unhandled keys.
        results = []
        for c in conditions:
            results.append(self.handle_condition(s, c))

        return all(results)

    def handle_condition(self, s, c):
        """Return True if a single normalized condition is acceptable.

        Whitelisted keys are accepted outright. Otherwise the condition is
        dispatched to ``handle_<key>`` (``-`` and ``:`` replaced by ``_``);
        that handler returns True when the condition is NOT sufficient, so
        its result is inverted here. Keys without a handler are logged and
        treated as not acceptable.
        """
        if not c['op']:
            return False
        if c['key'] in self.whitelist_conditions:
            return True
        handler_name = "handle_%s" % c['key'].replace('-', '_').replace(':', '_')
        handler = getattr(self, handler_name, None)
        if handler is None:
            log.warning(
                "no handler:%s op:%s key:%s values:%s",
                handler_name, c['op'], c['key'], c['values'])
            return False
        return not handler(s, c)

    def normalize_conditions(self, s):
        """Flatten a statement's ``Condition`` block into a list of dicts.

        Each entry has ``op`` (the condition operator as written), ``key``
        (the lower-cased condition key) and ``values`` (a tuple for a single
        string value, otherwise the value as given). Operators outside the
        supported set are skipped.
        """
        s_cond = []
        if 'Condition' not in s:
            return s_cond

        conditions = (
            'StringEquals',
            'StringEqualsIgnoreCase',
            'StringLike',
            'ArnEquals',
            'ArnLike',
            'IpAddress',
            'NotIpAddress')
        # IAM's set-operator qualifiers are "ForAllValues" and "ForAnyValue"
        # (singular); the latter prefix also still matches "ForAnyValues".
        set_conditions = ('ForAllValues', 'ForAnyValue')

        for s_cond_op in list(s['Condition'].keys()):
            cond = {'op': s_cond_op}

            if s_cond_op not in conditions:
                if not s_cond_op.startswith(set_conditions):
                    continue

            op_block = s['Condition'][s_cond_op]
            if not op_block:
                # nothing to evaluate for an empty operator block
                continue

            # NOTE: only the first key under each operator is examined.
            cond['key'] = list(op_block.keys())[0]
            values = op_block[cond['key']]
            cond['values'] = (values,) if isinstance(values, str) else values
            cond['key'] = cond['key'].lower()
            s_cond.append(cond)

        return s_cond

    # Condition handlers
    #
    # Each handler returns True when the condition's values include
    # something outside the relevant allowed set, i.e. when the condition
    # is NOT enough to whitelist the statement.

    def _has_disallowed(self, c, allowed):
        """Return True if any condition value falls outside *allowed*."""
        return bool(set(map(_account, c['values'])).difference(allowed))

    # sns default policy
    def handle_aws_sourceowner(self, s, c):
        return self._has_disallowed(c, self.allowed_accounts)

    # AWS Connect default policy on Lex
    def handle_aws_sourceaccount(self, s, c):
        return self._has_disallowed(c, self.allowed_accounts)

    # s3 logging
    def handle_aws_sourcearn(self, s, c):
        return self._has_disallowed(c, self.allowed_accounts)

    def handle_aws_sourceip(self, s, c):
        # any source ip restriction is accepted as sufficient
        return False

    def handle_aws_sourcevpce(self, s, c):
        # with no configured endpoints, any vpce restriction is accepted
        if not self.allowed_vpce:
            return False
        return self._has_disallowed(c, self.allowed_vpce)

    def handle_aws_sourcevpc(self, s, c):
        # with no configured vpcs, any vpc restriction is accepted
        if not self.allowed_vpc:
            return False
        return self._has_disallowed(c, self.allowed_vpc)

    def handle_aws_principalorgid(self, s, c):
        # with no configured org ids, an org restriction is never accepted
        if not self.allowed_orgid:
            return True
        return self._has_disallowed(c, self.allowed_orgid)
class CrossAccountAccessFilter(Filter):
    """Check a resource's embedded iam policy for cross account access.

    The policy is read from ``policy_attribute`` on each resource and
    evaluated with a checker built by ``checker_factory``. Violating
    statements are stored on the resource under ``annotation_key``.
    """

    schema = type_schema(
        'cross-account',
        # only consider policies that grant one of the given actions.
        actions={'type': 'array', 'items': {'type': 'string'}},
        # only consider policies which grant to *
        everyone_only={'type': 'boolean'},
        # disregard statements using these conditions.
        whitelist_conditions={'type': 'array', 'items': {'type': 'string'}},
        # white list accounts
        whitelist_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist={'type': 'array', 'items': {'type': 'string'}},
        whitelist_orgids_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_orgids={'type': 'array', 'items': {'type': 'string'}},
        whitelist_vpce_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_vpce={'type': 'array', 'items': {'type': 'string'}},
        whitelist_vpc_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_vpc={'type': 'array', 'items': {'type': 'string'}})

    policy_attribute = 'Policy'
    annotation_key = 'CrossAccountViolations'

    checker_factory = PolicyChecker

    def process(self, resources, event=None):
        """Build the policy checker from filter data, then filter resources."""
        self.everyone_only = self.data.get('everyone_only', False)
        self.conditions = set(self.data.get(
            'whitelist_conditions',
            ("aws:userid", "aws:username")))
        self.actions = self.data.get('actions', ())
        self.accounts = self.get_accounts()
        self.vpcs = self.get_vpcs()
        self.vpces = self.get_vpces()
        self.orgid = self.get_orgids()
        # Work on a copy: ``checker_config`` may be a class-level dict
        # shared by every instance, and updating it in place would leak
        # one filter's settings into another.
        self.checker_config = dict(getattr(self, 'checker_config', None) or {})
        self.checker_config.update(
            {'allowed_accounts': self.accounts,
             'allowed_vpc': self.vpcs,
             'allowed_vpce': self.vpces,
             'allowed_orgid': self.orgid,
             'check_actions': self.actions,
             'everyone_only': self.everyone_only,
             'whitelist_conditions': self.conditions})
        self.checker = self.checker_factory(self.checker_config)
        return super(CrossAccountAccessFilter, self).process(resources, event)

    def _whitelist_values(self, key):
        """Return the set of values from ``data[key]`` and ``data[key + '_from']``."""
        values = set(self.data.get(key, ()))
        from_key = key + '_from'
        if from_key in self.data:
            resolver = ValuesFrom(self.data[from_key], self.manager)
            values = values.union(resolver.get_values())
        return values

    def get_accounts(self):
        """Return whitelisted account ids, always including the owner account."""
        owner_id = self.manager.config.account_id
        accounts = self._whitelist_values('whitelist')
        accounts.add(owner_id)
        return accounts

    def get_vpcs(self):
        """Return the whitelisted vpc ids."""
        return self._whitelist_values('whitelist_vpc')

    def get_vpces(self):
        """Return the whitelisted vpc endpoint ids."""
        return self._whitelist_values('whitelist_vpce')

    def get_orgids(self):
        """Return the whitelisted organization ids."""
        return self._whitelist_values('whitelist_orgids')

    def get_resource_policy(self, r):
        """Return the resource's policy, or None when it has none."""
        return r.get(self.policy_attribute, None)

    def __call__(self, r):
        """Return True and annotate the resource if its policy has violations."""
        p = self.get_resource_policy(r)
        if p is None:
            return False
        violations = self.checker.check(p)
        if violations:
            r[self.annotation_key] = violations
            return True
        return False