Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/filters/policystatement.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

72 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import json 

4 

5from .core import Filter 

6from c7n.utils import ( 

7 type_schema, 

8 format_string_values, 

9 merge_dict, 

10 compare_dicts_using_sets, 

11 format_to_set, 

12 format_dict_with_sets 

13) 

14 

15 

class HasStatementFilter(Filter):
    """Find resources with matching access policy statements.

    A resource is kept when both of the following hold for its policy
    document:

    * every id listed under ``statement_ids`` is the ``Sid`` of one of the
      policy's statements, and
    * every entry under ``statements`` is matched by at least one of the
      policy's statements.

    By default every key of a required statement must fully match the value
    found in the resource statement. If you want to return resource
    statements that merely *include* the listed value for a key, e.g.
    ``Action``, name that key in ``PartialMatch`` instead of relying on an
    exact match. ``Action`` and ``NotAction`` values are always compared
    case-insensitively.

    :example:

    .. code-block:: yaml

        policies:
          - name: sns-check-statement-id
            resource: sns
            filters:
              - type: has-statement
                statement_ids:
                  - BlockNonSSL
          - name: sns-check-block-non-ssl
            resource: sns
            filters:
              - type: has-statement
                statements:
                  - Effect: Deny
                    Action: 'SNS:Publish'
                    Principal: '*'
                    Condition:
                      Bool:
                        "aws:SecureTransport": "false"
                    PartialMatch: 'Action'
    """

    # Statement keys that may be named in ``PartialMatch``.
    PARTIAL_MATCH_ELEMENTS = [
        'Action',
        'NotAction',
        'Principal',
        'NotPrincipal',
        'Resource',
        'NotResource',
        'Condition',
    ]

    schema = type_schema(
        'has-statement',
        statement_ids={'type': 'array', 'items': {'type': 'string'}},
        statements={
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'Sid': {'type': 'string'},
                    'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']},
                    'Principal': {'anyOf': [
                        {'type': 'string'},
                        {'type': 'object'}, {'type': 'array'}]},
                    'NotPrincipal': {
                        'anyOf': [{'type': 'object'}, {'type': 'array'}]},
                    'Action': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'NotAction': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'Resource': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'NotResource': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'Condition': {'type': 'object'},
                    'PartialMatch': {
                        'anyOf': [
                            {'type': 'string', "enum": PARTIAL_MATCH_ELEMENTS},
                            # NOTE(review): list-form ``items`` is positional
                            # (tuple) validation, so only the first array
                            # element is checked against the enum. Switching
                            # to a single schema object would validate every
                            # element but may reject policies accepted today;
                            # left unchanged pending confirmation.
                            {'type': 'array', 'items': [
                                {"type": "string", "enum": PARTIAL_MATCH_ELEMENTS}
                            ]}
                        ]
                    }
                },
                'required': ['Effect']
            }
        })

    def process(self, resources, event=None):
        """Return the truthy results of ``process_resource`` for ``resources``.

        :param resources: iterable of resource dicts to evaluate.
        :param event: unused here; accepted for interface compatibility.
        :returns: list of the resources that satisfied the filter.
        """
        return [
            matched
            for matched in map(self.process_resource, resources)
            if matched
        ]

    def process_resource(self, resource):
        """Return ``resource`` if its policy satisfies the filter, else None.

        The policy is read from the resource key named by the
        ``policy_attribute`` attribute of the filter (``'Policy'`` when the
        attribute is not set) and is expected to be a JSON string.

        :param resource: resource dict carrying the serialized policy.
        :returns: ``resource`` when all required statement ids and all
            required statements are found; ``None`` otherwise, including
            when the resource has no (or an empty) policy.
        """
        policy_attribute = getattr(self, 'policy_attribute', 'Policy')
        policy = resource.get(policy_attribute)
        # A missing or empty policy cannot contain any statement; treating
        # the empty string like None also avoids a JSON decode error.
        if not policy:
            return None
        policy = json.loads(policy)

        resource_statements = policy.get('Statement', [])
        # IAM permits a single statement object in place of a list.
        if isinstance(resource_statements, dict):
            resource_statements = [resource_statements]

        # Strike each required id off the list once a statement carries it.
        required_ids_not_found = list(self.data.get('statement_ids', []))
        for statement in resource_statements:
            sid = statement.get('Sid')
            if sid in required_ids_not_found:
                required_ids_not_found.remove(sid)

        # required_statements is the filter that we get from the c7n policy,
        # with format placeholders expanded for this resource (the format
        # arguments come from get_std_format_args, defined outside this
        # class).
        required_statements = format_string_values(
            list(self.data.get('statements', [])),
            **self.get_std_format_args(resource)
        )

        found_required_statements = self.__get_matched_statements(
            required_statements,
            resource_statements
        )

        # Both statement_ids and required_statements are found in the resource
        if required_ids_not_found:
            return None
        if required_statements != found_required_statements:
            return None
        return resource

    def action_resource_case_insensitive(self, actions):
        """Return ``actions`` as a set of lower-cased strings.

        A set is used so that lists holding the same items in a different
        order compare equal.

        :param actions: a single action string or an iterable of them.
        :returns: set of the lower-cased action strings.
        """
        if isinstance(actions, str):
            actions = [actions]
        return {action.lower() for action in actions}

    def __get_matched_statements(self, required_stmts, resource_stmts):
        """Return the required statements matched by a resource statement.

        The ``PartialMatch`` key is removed from each required statement
        before matching, so the returned statements (and ``required_stmts``
        itself) no longer contain it.

        :param required_stmts: list of required statement dicts.
        :param resource_stmts: list of statement dicts from the policy.
        :returns: the subset of ``required_stmts``, in order, for which at
            least one resource statement matches every remaining key.
        """
        matched_statements = []
        for required_statement in required_stmts:
            partial_match_elements = required_statement.pop('PartialMatch', [])
            if isinstance(partial_match_elements, str):
                # If there's only one string value, make the value a list
                partial_match_elements = [partial_match_elements]

            # A statement with nothing left to compare never matches.
            if not required_statement:
                continue

            if any(
                self.__statement_matches(
                    required_statement, partial_match_elements, candidate)
                for candidate in resource_stmts
            ):
                matched_statements.append(required_statement)

        return matched_statements

    def __statement_matches(self, required_statement, partial_match_elements,
                            resource_statement):
        """Tell whether one resource statement satisfies a required one.

        Values are normalized into local variables only; the resource
        statement is never modified, so evaluating one required statement
        cannot influence how the next one is evaluated.

        :param required_statement: required statement dict, without the
            ``PartialMatch`` key.
        :param partial_match_elements: keys to compare by containment
            instead of equality.
        :param resource_statement: statement dict from the policy.
        :returns: True when every key of ``required_statement`` matches.
        """
        for req_key, req_value in required_statement.items():
            key_present = req_key in resource_statement
            res_value = resource_statement.get(req_key)

            if key_present and req_key in ('Action', 'NotAction'):
                # Actions are compared without regard to letter case.
                res_value = self.action_resource_case_insensitive(res_value)
                req_value = self.action_resource_case_insensitive(req_value)

            if req_key in partial_match_elements:
                matched = key_present and self.__match_partial_statement(
                    req_key, req_value, {req_key: res_value})
            else:
                # If req_key is not a partial_match element, do a regular
                # full value match between req_value and the value in the
                # resource statement, after normalizing both sides.
                if key_present:
                    req_value = self.__normalize_value(req_value)
                    res_value = self.__normalize_value(res_value)
                matched = req_value == res_value

            if not matched:
                return False
        return True

    @staticmethod
    def __normalize_value(value):
        """Convert ``value`` for comparison using the c7n set helpers."""
        if isinstance(value, dict):
            return format_dict_with_sets(value)
        return format_to_set(value)

    def __match_partial_statement(self, partial_match_key,
                                  partial_match_value, resource_stmt):
        """Tell whether a resource statement value contains a required value.

        :param partial_match_key: statement key being compared.
        :param partial_match_value: required value (str, list, set or dict).
        :param resource_stmt: mapping holding the resource statement value
            under ``partial_match_key``.
        :returns: True when the required value is contained in the resource
            value; False when the key is absent or, for a dict requirement,
            when the resource value is not a dict.
        """
        if partial_match_key not in resource_stmt:
            return False
        resource_stmt_value = resource_stmt.get(partial_match_key)

        # Convert plain strings and lists so that membership and subset
        # checks behave the same for single values and one-element lists.
        if isinstance(resource_stmt_value, (str, list)):
            resource_stmt_value = format_to_set(resource_stmt_value)

        if isinstance(partial_match_value, list):
            return format_to_set(partial_match_value).issubset(resource_stmt_value)
        if isinstance(partial_match_value, set):
            return partial_match_value.issubset(resource_stmt_value)
        if isinstance(partial_match_value, dict):
            # A mapping requirement cannot be contained in a non-mapping
            # value (e.g. Principal "*"); report no match rather than
            # handing a non-dict to merge_dict, which is assumed to
            # require mappings.
            if not isinstance(resource_stmt_value, dict):
                return False
            # Merging the requirement into the resource value leaves the
            # resource value unchanged only if it already contains the
            # requirement.
            merged_stmts = merge_dict(
                partial_match_value, resource_stmt_value
            )
            return compare_dicts_using_sets(
                merged_stmts, resource_stmt_value
            )
        return partial_match_value in resource_stmt_value