Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/policystatement.py: 20%
44 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import json
5from .core import Filter
6from c7n.utils import type_schema, format_string_values
9class HasStatementFilter(Filter):
10 """Find resources with matching access policy statements.
11 :Example:
13 .. code-block:: yaml
15 policies:
16 - name: sns-check-statement-id
17 resource: sns
18 filters:
19 - type: has-statement
20 statement_ids:
21 - BlockNonSSL
22 policies:
23 - name: sns-check-block-non-ssl
24 resource: sns
25 filters:
26 - type: has-statement
27 statements:
28 - Effect: Deny
29 Action: 'SNS:Publish'
30 Principal: '*'
31 Condition:
32 Bool:
33 "aws:SecureTransport": "false"
34 """
35 schema = type_schema(
36 'has-statement',
37 statement_ids={'type': 'array', 'items': {'type': 'string'}},
38 statements={
39 'type': 'array',
40 'items': {
41 'type': 'object',
42 'properties': {
43 'Sid': {'type': 'string'},
44 'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']},
45 'Principal': {'anyOf': [
46 {'type': 'string'},
47 {'type': 'object'}, {'type': 'array'}]},
48 'NotPrincipal': {
49 'anyOf': [{'type': 'object'}, {'type': 'array'}]},
50 'Action': {
51 'anyOf': [{'type': 'string'}, {'type': 'array'}]},
52 'NotAction': {
53 'anyOf': [{'type': 'string'}, {'type': 'array'}]},
54 'Resource': {
55 'anyOf': [{'type': 'string'}, {'type': 'array'}]},
56 'NotResource': {
57 'anyOf': [{'type': 'string'}, {'type': 'array'}]},
58 'Condition': {'type': 'object'}
59 },
60 'required': ['Effect']
61 }
62 })
64 def process(self, resources, event=None):
65 return list(filter(None, map(self.process_resource, resources)))
67 def action_resource_case_insensitive(self, actions):
68 if isinstance(actions, str):
69 if len(actions.split(':')) > 1:
70 actionsFormatted = '{}:{}'.format(actions.split(':')[0].lower(),
71 actions.split(':')[1])
72 else:
73 actionsFormatted = actions
74 else:
75 actionsFormatted = []
76 for action in actions:
77 actionsFormatted.append('{}:{}'.format(action.split(':')[0].lower(),
78 action.split(':')[1]))
79 return actionsFormatted
81 def process_resource(self, resource):
82 policy_attribute = getattr(self, 'policy_attribute', 'Policy')
83 p = resource.get(policy_attribute)
84 if p is None:
85 return None
86 p = json.loads(p)
88 required = list(self.data.get('statement_ids', []))
89 statements = p.get('Statement', [])
90 for s in list(statements):
91 if s.get('Sid') in required:
92 required.remove(s['Sid'])
94 required_statements = list(self.data.get('statements', []))
96 required_statements = format_string_values(list(self.data.get('statements', [])),
97 **self.get_std_format_args(resource))
99 for required_statement in required_statements:
100 for statement in statements:
101 found = 0
102 for key, value in required_statement.items():
103 if key in ['Action', 'NotAction']:
104 if key in statement and self.action_resource_case_insensitive(value) \
105 == self.action_resource_case_insensitive(statement[key]):
106 found += 1
107 else:
108 if key in statement and value == statement[key]:
109 found += 1
110 if found and found == len(required_statement):
111 required_statements.remove(required_statement)
112 break
114 if (self.data.get('statement_ids', []) and not required) or \
115 (self.data.get('statements', []) and not required_statements):
116 return resource
117 return None