Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/related.py: 25%
136 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import importlib
4from functools import lru_cache
6from .core import ValueFilter, OPERATORS
7from c7n.query import ChildResourceQuery
8from c7n.utils import jmespath_search
class RelatedResourceFilter(ValueFilter):
    """Value filter evaluated against resources related to the filtered one.

    Subclasses configure the relationship through class attributes:

    - ``RelatedResource``: dotted path (``"package.module.ClassName"``) of the
      related resource manager class; imported lazily by
      :meth:`get_resource_manager`.
    - ``RelatedIdsExpression``: JMESPath expression, evaluated per resource,
      that yields the id(s) of the related resources.
    - ``AnnotationKey``: optional; when set, the related ids selected by the
      filter are stored on the resource under ``c7n:<AnnotationKey>``.
    - ``FetchThreshold``: when fewer related ids than this are referenced
      they are fetched by id, otherwise the full related listing is retrieved.
    """

    schema_alias = False

    RelatedResource = None
    RelatedIdsExpression = None
    AnnotationKey = None
    FetchThreshold = 10

    def get_permissions(self):
        """Return the permissions reported by the related resource manager."""
        return self.get_resource_manager().get_permissions()

    def validate(self):
        """Check the subclass configuration, then run the parent validation.

        Raises:
            ValueError: if ``RelatedIdsExpression`` or ``RelatedResource`` is
                not set on the class.
        """
        name = self.__class__.__name__
        if self.RelatedIdsExpression is None:
            raise ValueError(
                "%s Filter requires resource expression" % name)
        # AnnotationKey is deliberately not required: _add_annotations simply
        # skips annotating when it is None.
        if self.RelatedResource is None:
            raise ValueError(
                "%s Filter requires resource manager spec" % name)
        return super().validate()

    def get_related_ids(self, resources):
        """Return the set of related ids referenced by ``resources``.

        ``RelatedIdsExpression`` is projected over the list of resources, so
        the expression must yield hashable values.
        """
        return set(jmespath_search(
            "[].%s" % self.RelatedIdsExpression, resources))

    def get_related(self, resources):
        """Fetch the related resources referenced by ``resources``.

        Returns:
            dict mapping related resource id to the related resource; only
            ids actually referenced by ``resources`` are included. Empty when
            the related manager returns ``None``.
        """
        resource_manager = self.get_resource_manager()
        related_ids = self.get_related_ids(resources)
        model = resource_manager.get_model()
        if len(related_ids) < self.FetchThreshold:
            # Few ids: ask the manager for just those resources.
            related = resource_manager.get_resources(list(related_ids))
        else:
            # Many ids: retrieve everything and filter locally below.
            related = resource_manager.resources()

        if related is None:
            return {}

        return {r[model.id]: r for r in related
                if r[model.id] in related_ids}

    def get_resource_manager(self):
        """Return the related resource manager, creating it on first use.

        The manager is cached on the instance rather than with
        ``functools.lru_cache``: a cache on the method would be shared by the
        whole class, keyed on ``self``, and would keep every filter instance
        (and its manager) alive for the life of the process.
        """
        cached = getattr(self, '_related_resource_manager', None)
        if cached is None:
            mod_path, class_name = self.RelatedResource.rsplit('.', 1)
            module = importlib.import_module(mod_path)
            manager_class = getattr(module, class_name)
            cached = manager_class(self.manager.ctx, {})
            self._related_resource_manager = cached
        return cached

    def process_resource(self, resource, related):
        """Decide whether ``resource`` passes the filter.

        Args:
            resource: the resource being filtered.
            related: mapping of related id to related resource, as returned
                by :meth:`get_related`.

        Returns:
            For ``value_type: resource_count`` the result of comparing the
            number of related ids with ``value`` using ``op``. Otherwise
            ``True`` when, with ``operator: or`` (the default), at least one
            related resource matched, or with ``operator: and``, every
            related id matched (which also holds when there are no related
            ids); ``False`` in all other cases.
        """
        related_ids = self.get_related_ids([resource])
        model = self.manager.get_model()
        op = self.data.get('operator', 'or')
        found = []

        if self.data.get('match-resource') is True:
            # Compare related resources against this resource's own value
            # for the configured key.
            self.data['value'] = self.get_resource_value(
                self.data['key'], resource)

        if self.data.get('value_type') == 'resource_count':
            count_matches = OPERATORS[self.data.get('op')](
                len(related_ids), self.data.get('value'))
            if count_matches:
                self._add_annotations(related_ids, resource)
            return count_matches

        for rid in related_ids:
            robj = related.get(rid, None)
            if robj is None:
                # When the filter looks specifically for absent values, a
                # non-existent related resource can safely be treated as
                # having an absent value at any key. ``.get`` is used because
                # the filter data is not guaranteed to carry a literal
                # 'value' entry.
                if self.data.get('value') == 'absent':
                    found.append(rid)
                else:
                    self.log.warning(
                        "Resource %s:%s references non existant %s: %s",
                        self.manager.type,
                        resource[model.id],
                        self.RelatedResource.rsplit('.', 1)[-1],
                        rid)
                continue
            if self.match(robj):
                found.append(rid)

        if found:
            self._add_annotations(found, resource)

        if op == 'or' and found:
            return True
        elif op == 'and' and len(found) == len(related_ids):
            return True
        return False

    def _add_annotations(self, related_ids, resource):
        """Merge ``related_ids`` into ``resource['c7n:<AnnotationKey>']``.

        Does nothing when ``AnnotationKey`` is ``None``. Ids already present
        under the key are kept; the stored list has no duplicates.
        """
        if self.AnnotationKey is not None:
            akey = 'c7n:%s' % self.AnnotationKey
            resource[akey] = list(set(related_ids).union(resource.get(akey, [])))

    def process(self, resources, event=None):
        """Return the resources for which :meth:`process_resource` is truthy.

        Related resources are fetched once for the whole batch.
        """
        related = self.get_related(resources)
        return [r for r in resources if self.process_resource(r, related)]
class RelatedResourceByIdFilter(RelatedResourceFilter):
    """
    Value filter for related resources in which the main resource only contains the related
    resource id.

    Each related id may map to several related resources, so ``get_related``
    returns lists of resources per id. ``RelatedResourceByIdExpression`` is
    the JMESPath expression evaluated on a related resource to obtain the
    id(s) it belongs to; when unset, ``RelatedIdsExpression`` is used.
    """

    RelatedResourceByIdExpression = None

    def get_related(self, resources):
        """Group all related resources by the ids that ``resources`` reference.

        Returns:
            dict mapping each referenced id to the list of related resources
            whose own id set (see :meth:`get_related_by_ids`) contains it.
        """
        resource_manager = self.get_resource_manager()
        related_ids = self.get_related_ids(resources)

        related = {}
        for r in resource_manager.resources():
            for matched_id in self.get_related_by_ids(r) & related_ids:
                related.setdefault(matched_id, []).append(r)
        return related

    def get_related_by_ids(self, resources):
        """Return the set of ids found on a single related resource.

        Args:
            resources: one related resource (the name is kept for interface
                compatibility).

        Returns:
            set of ids; a lone string is treated as a single id, and a
            search that finds nothing yields an empty set.
        """
        RelatedResourceKey = self.RelatedResourceByIdExpression or self.RelatedIdsExpression
        ids = jmespath_search("%s" % RelatedResourceKey, resources)
        if ids is None:
            # The expression did not match this resource; without this guard
            # ``set(None)`` would raise TypeError.
            return set()
        if isinstance(ids, str):
            # Avoid splitting a single id string into characters.
            ids = [ids]
        return set(ids)

    def process_resource(self, resource, related):
        """Decide whether ``resource`` passes the filter.

        Args:
            resource: the resource being filtered.
            related: mapping of related id to a list of related resources,
                as returned by :meth:`get_related`.

        Returns:
            For ``value_type: resource_count`` the result of comparing the
            number of related ids with ``value`` using ``op``. Otherwise
            ``True`` when, with ``operator: or`` (the default), at least one
            related id has a matching related resource, or with
            ``operator: and``, every related id has one; ``False`` otherwise.
        """
        related_ids = self.get_related_ids([resource])
        op = self.data.get('operator', 'or')
        found = []

        if self.data.get('match-resource') is True:
            # Compare related resources against this resource's own value
            # for the configured key.
            self.data['value'] = self.get_resource_value(
                self.data['key'], resource)

        if self.data.get('value_type') == 'resource_count':
            count_matches = OPERATORS[self.data.get('op')](
                len(related_ids), self.data.get('value'))
            if count_matches:
                self._add_annotations(related_ids, resource)
            return count_matches

        for rid in related_ids:
            rid_matched = False
            # Every related object is still passed to match(); only the
            # bookkeeping changes.
            for robj in related.get(rid, ()):
                if robj is None:
                    continue
                if self.match(robj):
                    rid_matched = True
            # Record each id at most once. Counting one id several times
            # (once per matching related resource) could make the 'and'
            # length comparison below succeed even though another id had no
            # match at all.
            if rid_matched:
                found.append(rid)

        if found:
            self._add_annotations(found, resource)

        if op == 'or' and found:
            return True
        elif op == 'and' and len(found) == len(related_ids):
            return True
        return False
class ChildResourceFilter(RelatedResourceFilter):
    """Related-resource filter where the related resources are children.

    Children are queried through ``ChildResourceQuery`` using the ids
    produced by ``RelatedIdsExpression`` as parent ids, and are grouped by
    the value each child carries under ``ChildResourceParentKey``.
    """

    ChildResource = None
    RelatedIdsExpression = None
    ChildResourceParentKey = "c7n:parent-id"

    def get_related(self, resources):
        """Query the children of ``resources`` and group them by parent id.

        The grouping is also kept on ``self.child_resources`` so that
        ``_add_annotations`` can look children up afterwards.

        Returns:
            dict mapping parent id to the list of its child resources.
        """
        # Reset the per-run grouping before doing anything that may fail.
        by_parent = self.child_resources = {}

        parent_id_list = list(self.get_related_ids(resources))

        manager = self.get_resource_manager()
        query = ChildResourceQuery(manager.session_factory, manager)
        for child in query.filter(manager, parent_ids=parent_id_list):
            parent_id = child[self.ChildResourceParentKey]
            if parent_id not in by_parent:
                by_parent[parent_id] = []
            by_parent[parent_id].append(child)

        return by_parent

    def _add_annotations(self, related_ids, resource):
        """Store the resource's children under ``c7n:<AnnotationKey>``.

        ``related_ids`` is ignored: the annotation is the list of child
        resources recorded for this resource's own id (empty if none).
        Nothing is written when ``AnnotationKey`` is ``None``.
        """
        if self.AnnotationKey is None:
            return
        own_id = resource[self.manager.get_model().id]
        annotation_key = 'c7n:%s' % self.AnnotationKey
        resource[annotation_key] = self.child_resources.get(own_id, [])