Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/filters/policystatement.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

59 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import json 

4 

5from .core import Filter 

6from c7n.utils import type_schema, format_string_values 

7 

8 

class HasStatementFilter(Filter):
    """Find resources with matching access policy statements.

    A resource matches when every Sid listed under ``statement_ids`` is
    present in its policy document and every entry under ``statements``
    matches at least one statement of that policy.

    If you want to return resource statements that include the listed Action or
    NotAction, you can use PartialMatch instead of an exact match.

    :example:

    .. code-block:: yaml

        policies:
          - name: sns-check-statement-id
            resource: sns
            filters:
              - type: has-statement
                statement_ids:
                  - BlockNonSSL
          - name: sns-check-block-non-ssl
            resource: sns
            filters:
              - type: has-statement
                statements:
                  - Effect: Deny
                    Action: 'SNS:Publish'
                    Principal: '*'
                    Condition:
                        Bool:
                            "aws:SecureTransport": "false"
                    PartialMatch: 'Action'
    """
    # Statement elements for which a subset ("partial") match may be requested.
    PARTIAL_MATCH_ELEMENTS = ['Action', 'NotAction']
    schema = type_schema(
        'has-statement',
        statement_ids={'type': 'array', 'items': {'type': 'string'}},
        statements={
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'Sid': {'type': 'string'},
                    'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']},
                    'Principal': {'anyOf': [
                        {'type': 'string'},
                        {'type': 'object'}, {'type': 'array'}]},
                    'NotPrincipal': {
                        'anyOf': [{'type': 'object'}, {'type': 'array'}]},
                    'Action': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'NotAction': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'Resource': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'NotResource': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'Condition': {'type': 'object'},
                    'PartialMatch': {
                        'anyOf': [
                            {'type': 'string', "enum": PARTIAL_MATCH_ELEMENTS},
                            {'type': 'array', 'items': [
                                {"type": "string", "enum": PARTIAL_MATCH_ELEMENTS}
                            ]}
                        ]
                    }
                },
                'required': ['Effect']
            }
        })

    def process(self, resources, event=None):
        """Return the resources whose policy satisfies this filter."""
        return list(filter(None, map(self.process_resource, resources)))

    def process_resource(self, resource):
        """Return ``resource`` if its policy matches the filter, else ``None``.

        The policy is read from the attribute named by ``policy_attribute``
        (``'Policy'`` unless the instance/subclass defines it) and is expected
        to be a JSON document serialized as a string.
        """
        policy_attribute = getattr(self, 'policy_attribute', 'Policy')
        p = resource.get(policy_attribute)
        # A missing or empty policy cannot contain any statement; treating the
        # empty string like None avoids a JSON decode error on blank policies.
        if not p:
            return None
        p = json.loads(p)

        resource_statements = p.get('Statement', [])
        # Policy JSON may carry a single statement object instead of a list;
        # normalize so the loops below always iterate over statement dicts.
        if isinstance(resource_statements, dict):
            resource_statements = [resource_statements]

        # Sids required by the filter that the policy does not contain. A set
        # lookup (rather than list.remove) keeps a Sid repeated in the filter
        # from being reported as missing when the policy has it once.
        present_ids = {s.get('Sid') for s in resource_statements}
        required_ids_not_found = [
            sid for sid in self.data.get('statement_ids', [])
            if sid not in present_ids
        ]

        # required_statements is the filter that we get from the c7n policy
        required_statements = format_string_values(
            list(self.data.get('statements', [])),
            **self.get_std_format_args(resource)
        )

        found_required_statements = self.__get_matched_statements(
            required_statements,
            resource_statements
        )

        # Both statement_ids and required_statements are found in the resource
        if (not required_ids_not_found) and \
                (required_statements == found_required_statements):
            return resource
        return None

    def action_resource_case_insensitive(self, actions):
        """Return ``actions`` (a string or an iterable of strings) as a set
        of lower-cased strings so they can be compared case-insensitively."""
        if isinstance(actions, str):
            return {actions.lower()}
        return {action.lower() for action in actions}

    def __get_matched_statements(self, required_stmts, resource_stmts):
        """Return the entries of ``required_stmts`` matched by some statement
        in ``resource_stmts``, in their original order.

        Side effects: the ``PartialMatch`` key is popped from each required
        statement, and ``Action``/``NotAction`` values of the resource
        statements are replaced by lower-cased sets.
        """
        matched_statements = []
        for required_statement in required_stmts:
            partial_match_elements = required_statement.pop('PartialMatch', [])

            if isinstance(partial_match_elements, str):
                # If there's only one string value, make the value a list
                partial_match_elements = [partial_match_elements]

            for resource_statement in resource_stmts:
                found = 0
                for req_key, req_value in required_statement.items():
                    if req_key in ('Action', 'NotAction') and \
                            req_key in resource_statement:
                        # Compare actions as lower-cased sets: neither case
                        # nor ordering should affect the match. Re-normalizing
                        # an already normalized set is harmless.
                        resource_statement[req_key] = \
                            self.action_resource_case_insensitive(
                                resource_statement[req_key])
                        req_value = self.action_resource_case_insensitive(
                            req_value)

                    if req_key in partial_match_elements:
                        if self.__match_partial_statement(req_key,
                                                          req_value,
                                                          resource_statement):
                            found += 1

                    # If req_key is not a partial_match element,
                    # do a regular full value match for a given req_key
                    elif req_value == resource_statement.get(req_key):
                        found += 1

                # Every element of the required statement must have matched.
                if found and found == len(required_statement):
                    matched_statements.append(required_statement)
                    break

        return matched_statements

    def __match_partial_statement(self, partial_match_key,
                                  partial_match_value, resource_stmt):
        """Return True when ``partial_match_value`` is contained in
        ``resource_stmt[partial_match_key]``.

        List/set values must be a subset of the resource value; any other
        value is checked with the ``in`` operator.
        """
        # TO-DO: Add support for Condition json subset match.
        if partial_match_key not in resource_stmt:
            return False

        resource_value = resource_stmt[partial_match_key]
        if isinstance(partial_match_value, (list, set)):
            return set(partial_match_value).issubset(resource_value)
        return partial_match_value in resource_value