Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/related.py: 25%

136 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import importlib 

4from functools import lru_cache 

5 

6from .core import ValueFilter, OPERATORS 

7from c7n.query import ChildResourceQuery 

8from c7n.utils import jmespath_search 

9 

10 

class RelatedResourceFilter(ValueFilter):
    """Value filter evaluated against resources related to the ones being filtered.

    Subclasses describe the relationship through class attributes:

    * ``RelatedResource`` -- dotted ``module.ClassName`` path of the resource
      manager class used to fetch the related resources.
    * ``RelatedIdsExpression`` -- JMESPath expression, applied to each
      resource, that yields the ids of its related resources.
    * ``AnnotationKey`` -- optional; when set, the matching related ids are
      stored on the resource under ``c7n:<AnnotationKey>``.
    * ``FetchThreshold`` -- when fewer related ids than this are referenced
      they are fetched by id, otherwise every related resource is listed.
    """

    schema_alias = False

    RelatedResource = None
    RelatedIdsExpression = None
    AnnotationKey = None
    FetchThreshold = 10

    def get_permissions(self):
        """Return the permissions reported by the related resource manager."""
        return self.get_resource_manager().get_permissions()

    def validate(self):
        """Check the subclass configuration, then defer to the parent validation.

        Raises:
            ValueError: if ``RelatedIdsExpression`` or ``RelatedResource`` is unset.
        """
        name = self.__class__.__name__
        if self.RelatedIdsExpression is None:
            raise ValueError(
                "%s Filter requires resource expression" % name)
        # AnnotationKey is deliberately not required: _add_annotations is a
        # no-op when it is None.
        if self.RelatedResource is None:
            raise ValueError(
                "%s Filter requires resource manager spec" % name)
        return super().validate()

    def get_related_ids(self, resources):
        """Return the set of related ids referenced by ``resources`` (a list)."""
        return set(jmespath_search(
            "[].%s" % self.RelatedIdsExpression, resources))

    def get_related(self, resources):
        """Fetch the related resources referenced by ``resources``.

        Returns:
            dict mapping related id -> related resource, restricted to ids
            that ``resources`` actually reference. Empty when the manager
            returns ``None``.
        """
        resource_manager = self.get_resource_manager()
        related_ids = self.get_related_ids(resources)
        model = resource_manager.get_model()
        if len(related_ids) < self.FetchThreshold:
            # Few ids referenced: ask for exactly those.
            related = resource_manager.get_resources(list(related_ids))
        else:
            # Many ids referenced: list everything and filter below.
            related = resource_manager.resources()

        if related is None:
            return {}

        return {r[model.id]: r for r in related
                if r[model.id] in related_ids}

    # NOTE(review): lru_cache on a method keys the cache on ``self`` and keeps
    # each instance referenced for the lifetime of the cache. Left unchanged
    # so that ``get_resource_manager.cache_clear()`` / ``cache_info()`` remain
    # available to any caller relying on them -- confirm before replacing.
    @lru_cache(maxsize=None)
    def get_resource_manager(self):
        """Import and instantiate the manager class named by ``RelatedResource``."""
        mod_path, class_name = self.RelatedResource.rsplit('.', 1)
        module = importlib.import_module(mod_path)
        manager_class = getattr(module, class_name)
        return manager_class(self.manager.ctx, {})

    def process_resource(self, resource, related):
        """Decide whether ``resource`` passes the filter.

        Args:
            resource: the resource being filtered.
            related: mapping of related id -> related resource, as returned
                by ``get_related``.

        Returns:
            For ``value_type: resource_count`` the result of comparing the
            number of related ids with the configured value. Otherwise
            ``True`` when any (``operator: or``, the default) or all
            (``operator: and``) related ids match, else ``False``.
        """
        related_ids = self.get_related_ids([resource])
        model = self.manager.get_model()
        op = self.data.get('operator', 'or')
        found = []

        if self.data.get('match-resource') is True:
            # Compare related resources against this resource's own value
            # for the configured key.
            self.data['value'] = self.get_resource_value(
                self.data['key'], resource)

        if self.data.get('value_type') == 'resource_count':
            count_matches = OPERATORS[self.data.get('op')](
                len(related_ids), self.data.get('value'))
            if count_matches:
                self._add_annotations(related_ids, resource)
            return count_matches

        for rid in related_ids:
            robj = related.get(rid, None)
            if robj is None:
                # When the filter is looking specifically for absent values
                # we can safely assume that the nonexistent related resource
                # has an absent value at any given key.
                # ``.get`` because 'value' is not guaranteed to be present in
                # the filter data (e.g. when the value is sourced elsewhere);
                # indexing directly raised KeyError on a dangling reference.
                if self.data.get('value') == 'absent':
                    found.append(rid)
                else:
                    self.log.warning(
                        "Resource %s:%s references non existant %s: %s",
                        self.manager.type,
                        resource[model.id],
                        self.RelatedResource.rsplit('.', 1)[-1],
                        rid)
                continue
            if self.match(robj):
                found.append(rid)

        if found:
            self._add_annotations(found, resource)

        if op == 'or':
            return bool(found)
        if op == 'and':
            return len(found) == len(related_ids)
        return False

    def _add_annotations(self, related_ids, resource):
        """Merge ``related_ids`` into the resource's ``c7n:<AnnotationKey>`` list."""
        if self.AnnotationKey is not None:
            akey = 'c7n:%s' % self.AnnotationKey
            resource[akey] = list(set(related_ids).union(resource.get(akey, [])))

    def process(self, resources, event=None):
        """Return the resources for which ``process_resource`` is truthy."""
        related = self.get_related(resources)
        return [r for r in resources if self.process_resource(r, related)]

115 

116 

class RelatedResourceByIdFilter(RelatedResourceFilter):
    """
    Value filter for related resources in which the main resource only contains the related
    resource id.

    Every related resource is listed and grouped under each id it yields for
    ``RelatedResourceByIdExpression`` (falling back to ``RelatedIdsExpression``
    when that is unset), so one id may map to several related resources.
    """

    RelatedResourceByIdExpression = None

    def get_related(self, resources):
        """Group related resources by the ids that ``resources`` reference.

        Returns:
            dict mapping related id -> list of related resources carrying
            that id. Ids with no related resource are absent from the dict.
        """
        resource_manager = self.get_resource_manager()
        related_ids = self.get_related_ids(resources)

        related = {}
        for r in resource_manager.resources():
            # Only keep ids that at least one filtered resource refers to.
            for rid in self.get_related_by_ids(r) & related_ids:
                related.setdefault(rid, []).append(r)
        return related

    def get_related_by_ids(self, resources):
        """Return the set of ids extracted from a single related resource.

        Despite the parameter name, ``get_related`` passes one resource here.
        A lone string result is treated as a single id; a missing value
        yields an empty set instead of raising ``TypeError``.
        """
        RelatedResourceKey = self.RelatedResourceByIdExpression or self.RelatedIdsExpression
        ids = jmespath_search("%s" % RelatedResourceKey, resources)
        if ids is None:
            # Expression not present on this related resource.
            return set()
        if isinstance(ids, str):
            ids = [ids]
        return set(ids)

    def process_resource(self, resource, related):
        """Decide whether ``resource`` passes the filter.

        Args:
            resource: the resource being filtered.
            related: mapping of related id -> list of related resources, as
                returned by ``get_related``.

        Returns:
            For ``value_type: resource_count`` the result of comparing the
            number of related ids with the configured value. Otherwise
            ``True`` when any (``operator: or``, the default) or all
            (``operator: and``) related ids have a matching related
            resource, else ``False``.
        """
        related_ids = self.get_related_ids([resource])
        op = self.data.get('operator', 'or')
        found = []

        if self.data.get('match-resource') is True:
            # Compare related resources against this resource's own value
            # for the configured key.
            self.data['value'] = self.get_resource_value(
                self.data['key'], resource)

        if self.data.get('value_type') == 'resource_count':
            count_matches = OPERATORS[self.data.get('op')](
                len(related_ids), self.data.get('value'))
            if count_matches:
                self._add_annotations(related_ids, resource)
            return count_matches

        for rid in related_ids:
            candidates = related.get(rid, ())
            # Record each id at most once. Appending once per matching related
            # resource let duplicates skew the 'and' comparison of
            # len(found) against len(related_ids) below.
            if any(self.match(robj) for robj in candidates if robj is not None):
                found.append(rid)

        if found:
            self._add_annotations(found, resource)

        if op == 'or':
            return bool(found)
        if op == 'and':
            return len(found) == len(related_ids)
        return False

177 

178 

class ChildResourceFilter(RelatedResourceFilter):
    """Related-resource filter whose related resources are fetched as children.

    Children are queried through ``ChildResourceQuery`` using the ids yielded
    by ``RelatedIdsExpression`` as parent ids, and grouped by the value each
    child carries under ``ChildResourceParentKey``.
    """

    ChildResource = None
    RelatedIdsExpression = None
    # Key on each child record under which its parent's id is read.
    ChildResourceParentKey = "c7n:parent-id"

    def get_related(self, resources):
        """Query the children of ``resources`` and group them by parent id.

        The grouping is also kept on ``self.child_resources`` so that
        ``_add_annotations`` can read it afterwards.

        Returns:
            dict mapping parent id -> list of child resources.
        """
        # Reset up front so the attribute exists (and is empty) even if the
        # query below fails.
        grouped = self.child_resources = {}

        wanted_parents = self.get_related_ids(resources)

        manager = self.get_resource_manager()
        query = ChildResourceQuery(manager.session_factory, manager)

        for child in query.filter(manager, parent_ids=list(wanted_parents)):
            parent_id = child[self.ChildResourceParentKey]
            if parent_id not in grouped:
                grouped[parent_id] = []
            grouped[parent_id].append(child)

        return grouped

    def _add_annotations(self, related_ids, resource):
        """Store the children fetched for ``resource`` under ``c7n:<AnnotationKey>``.

        ``related_ids`` is accepted for signature compatibility but not used;
        the annotation is looked up by the resource's own id.
        """
        if self.AnnotationKey is None:
            return
        model = self.manager.get_model()
        annotation = 'c7n:%s' % self.AnnotationKey
        children = self.child_resources.get(resource[model.id], [])
        resource[annotation] = children
204 resource[akey] = self.child_resources.get(resource[model.id], [])