Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/vpc.py: 28%
163 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.exceptions import PolicyValidationError
4from c7n.utils import local_session, type_schema
6from .core import Filter, ValueFilter
7from .related import RelatedResourceFilter
10class MatchResourceValidator:
12 def validate(self):
13 if self.data.get('match-resource'):
14 self.required_keys = set('key',)
15 return super(MatchResourceValidator, self).validate()
18class SecurityGroupFilter(MatchResourceValidator, RelatedResourceFilter):
19 """Filter a resource by its associated security groups."""
20 schema = type_schema(
21 'security-group', rinherit=ValueFilter.schema,
22 **{'match-resource': {'type': 'boolean'},
23 'operator': {'enum': ['and', 'or']}})
24 schema_alias = True
26 RelatedResource = "c7n.resources.vpc.SecurityGroup"
27 AnnotationKey = "matched-security-groups"
30class SubnetFilter(MatchResourceValidator, RelatedResourceFilter):
31 """Filter a resource by its associated subnets attributes.
33 This filter is generally available for network attached resources.
35 ie. to find lambda functions that are vpc attached to subnets with
36 a tag key Location and value Database.
38 :example:
40 .. code-block:: yaml
42 policies:
43 - name: lambda
44 resource: aws.lambda
45 filters:
46 - type: subnet
47 key: tag:Location
48 value: Database
50 It also supports finding resources on public or private subnets
51 via route table introspection to determine if the subnet is
52 associated to an internet gateway.
54 :example:
56 .. code-block:: yaml
58 policies:
59 - name: public-ec2
60 resource: aws.ec2
61 filters:
62 - type: subnet
63 igw: True
64 key: SubnetId
65 value: present
67 """
69 schema = type_schema(
70 'subnet', rinherit=ValueFilter.schema,
71 **{'match-resource': {'type': 'boolean'},
72 'operator': {'enum': ['and', 'or']},
73 'igw': {'enum': [True, False]},
74 })
76 schema_alias = True
77 RelatedResource = "c7n.resources.vpc.Subnet"
78 AnnotationKey = "matched-subnets"
80 def get_permissions(self):
81 perms = super().get_permissions()
82 if self.data.get('igw') in (True, False):
83 perms += self.manager.get_resource_manager(
84 'aws.route-table').get_permissions()
85 return perms
87 def validate(self):
88 super().validate()
89 self.check_igw = self.data.get('igw')
91 def match(self, related):
92 if self.check_igw in [True, False]:
93 if not self.match_igw(related):
94 return False
95 return super().match(related)
97 def process(self, resources, event=None):
98 related = self.get_related(resources)
99 if self.check_igw in [True, False]:
100 self.route_tables = self.get_route_tables()
101 return [r for r in resources if self.process_resource(r, related)]
103 def get_route_tables(self):
104 rmanager = self.manager.get_resource_manager('aws.route-table')
105 route_tables = {}
106 for r in rmanager.resources():
107 for a in r['Associations']:
108 if a['Main']:
109 route_tables[r['VpcId']] = r
110 elif 'SubnetId' in a:
111 route_tables[a['SubnetId']] = r
112 return route_tables
114 def match_igw(self, subnet):
115 rtable = self.route_tables.get(
116 subnet['SubnetId'],
117 self.route_tables.get(subnet['VpcId']))
118 if rtable is None:
119 self.log.debug('route table for %s not found', subnet['SubnetId'])
120 return
121 found_igw = False
122 for route in rtable['Routes']:
123 if route.get('GatewayId') and route['GatewayId'].startswith('igw-'):
124 found_igw = True
125 break
126 if self.check_igw and found_igw:
127 return True
128 elif not self.check_igw and not found_igw:
129 return True
130 return False
133class VpcFilter(MatchResourceValidator, RelatedResourceFilter):
134 """Filter a resource by its associated vpc."""
135 schema = type_schema(
136 'vpc', rinherit=ValueFilter.schema,
137 **{'match-resource': {'type': 'boolean'},
138 'operator': {'enum': ['and', 'or']}})
140 schema_alias = True
141 RelatedResource = "c7n.resources.vpc.Vpc"
142 AnnotationKey = "matched-vpcs"
145class DefaultVpcBase(Filter):
146 """Filter to resources in a default vpc."""
147 vpcs = None
148 default_vpc = None
149 permissions = ('ec2:DescribeVpcs',)
151 def match(self, vpc_id):
152 if self.default_vpc is None:
153 self.log.debug("querying default vpc %s" % vpc_id)
154 client = local_session(self.manager.session_factory).client('ec2')
155 vpcs = [v['VpcId'] for v
156 in client.describe_vpcs()['Vpcs']
157 if v['IsDefault']]
158 if vpcs:
159 self.default_vpc = vpcs.pop()
160 return vpc_id == self.default_vpc and True or False
163class NetworkLocation(Filter):
164 """On a network attached resource, determine intersection of
165 security-group attributes, subnet attributes, and resource attributes.
167 The use case is a bit specialized, for most use cases using `subnet`
168 and `security-group` filters suffice. but say for example you wanted to
169 verify that an ec2 instance was only using subnets and security groups
170 with a given tag value, and that tag was not present on the resource.
172 :Example:
174 .. code-block:: yaml
176 policies:
177 - name: ec2-mismatched-sg-remove
178 resource: ec2
179 filters:
180 - type: network-location
181 compare: ["resource","security-group"]
182 key: "tag:TEAM_NAME"
183 ignore:
184 - "tag:TEAM_NAME": Enterprise
185 actions:
186 - type: modify-security-groups
187 remove: network-location
188 isolation-group: sg-xxxxxxxx
189 """
191 schema = type_schema(
192 'network-location',
193 **{'missing-ok': {
194 'type': 'boolean',
195 'default': False,
196 'description': (
197 "How to handle missing keys on elements, by default this causes"
198 "resources to be considered not-equal")},
199 'match': {'type': 'string', 'enum': ['equal', 'not-equal'],
200 'default': 'non-equal'},
201 'compare': {
202 'type': 'array',
203 'description': (
204 'Which elements of network location should be considered when'
205 ' matching.'),
206 'default': ['resource', 'subnet', 'security-group'],
207 'items': {
208 'enum': ['resource', 'subnet', 'security-group']}},
209 'key': {
210 'type': 'string',
211 'description': 'The attribute expression that should be matched on'},
212 'max-cardinality': {
213 'type': 'integer', 'default': 1,
214 'title': ''},
215 'ignore': {'type': 'array', 'items': {'type': 'object'}},
216 'required': ['key'],
218 })
219 schema_alias = True
220 permissions = ('ec2:DescribeSecurityGroups', 'ec2:DescribeSubnets')
222 def validate(self):
223 rfilters = self.manager.filter_registry.keys()
224 if 'subnet' not in rfilters:
225 raise PolicyValidationError(
226 "network-location requires resource subnet filter availability on %s" % (
227 self.manager.data))
229 if 'security-group' not in rfilters:
230 raise PolicyValidationError(
231 "network-location requires resource security-group filter availability on %s" % (
232 self.manager.data))
233 return self
235 def process(self, resources, event=None):
236 self.sg = self.manager.filter_registry.get('security-group')({}, self.manager)
237 related_sg = self.sg.get_related(resources)
239 self.subnet = self.manager.filter_registry.get('subnet')({}, self.manager)
240 related_subnet = self.subnet.get_related(resources)
242 self.sg_model = self.manager.get_resource_manager('security-group').get_model()
243 self.subnet_model = self.manager.get_resource_manager('subnet').get_model()
244 self.vf = self.manager.filter_registry.get('value')({}, self.manager)
246 # filter options
247 key = self.data.get('key')
248 self.compare = self.data.get('compare', ['subnet', 'security-group', 'resource'])
249 self.max_cardinality = self.data.get('max-cardinality', 1)
250 self.match = self.data.get('match', 'not-equal')
251 self.missing_ok = self.data.get('missing-ok', False)
253 results = []
254 for r in resources:
255 resource_sgs = self.filter_ignored(
256 [related_sg[sid] for sid in self.sg.get_related_ids([r]) if sid in related_sg])
257 resource_subnets = self.filter_ignored(
258 [related_subnet[sid] for sid in self.subnet.get_related_ids([r])
259 if sid in related_subnet])
260 found = self.process_resource(r, resource_sgs, resource_subnets, key)
261 if found:
262 results.append(found)
264 return results
266 def filter_ignored(self, resources):
267 ignores = self.data.get('ignore', ())
268 results = []
270 for r in resources:
271 found = False
272 for i in ignores:
273 for k, v in i.items():
274 if self.vf.get_resource_value(k, r) == v:
275 found = True
276 if found is True:
277 break
278 if found is True:
279 continue
280 results.append(r)
281 return results
283 def process_resource(self, r, resource_sgs, resource_subnets, key):
284 evaluation = []
285 sg_space = set()
286 subnet_space = set()
288 if 'subnet' in self.compare:
289 subnet_values = {
290 rsub[self.subnet_model.id]: self.subnet.get_resource_value(key, rsub)
291 for rsub in resource_subnets}
293 if not self.missing_ok and None in subnet_values.values():
294 evaluation.append({
295 'reason': 'SubnetLocationAbsent',
296 'subnets': subnet_values})
297 subnet_space = set(filter(None, subnet_values.values()))
299 if len(subnet_space) > self.max_cardinality:
300 evaluation.append({
301 'reason': 'SubnetLocationCardinality',
302 'subnets': subnet_values})
304 if 'security-group' in self.compare:
305 sg_values = {
306 rsg[self.sg_model.id]: self.sg.get_resource_value(key, rsg)
307 for rsg in resource_sgs}
308 if not self.missing_ok and None in sg_values.values():
309 evaluation.append({
310 'reason': 'SecurityGroupLocationAbsent',
311 'security-groups': sg_values})
313 sg_space = set(filter(None, sg_values.values()))
315 if len(sg_space) > self.max_cardinality:
316 evaluation.append({
317 'reason': 'SecurityGroupLocationCardinality',
318 'security-groups': sg_values})
320 if ('subnet' in self.compare and
321 'security-group' in self.compare and
322 sg_space != subnet_space):
323 evaluation.append({
324 'reason': 'LocationMismatch',
325 'subnets': subnet_values,
326 'security-groups': sg_values})
328 if 'resource' in self.compare:
329 r_value = self.vf.get_resource_value(key, r)
330 if not self.missing_ok and r_value is None:
331 evaluation.append({
332 'reason': 'ResourceLocationAbsent',
333 'resource': r_value})
334 elif 'security-group' in self.compare and resource_sgs and r_value not in sg_space:
335 evaluation.append({
336 'reason': 'ResourceLocationMismatch',
337 'resource': r_value,
338 'security-groups': sg_values})
339 elif 'subnet' in self.compare and resource_subnets and r_value not in subnet_space:
340 evaluation.append({
341 'reason': 'ResourceLocationMismatch',
342 'resource': r_value,
343 'subnet': subnet_values})
344 if 'security-group' in self.compare and resource_sgs:
345 mismatched_sgs = {sg_id: sg_value
346 for sg_id, sg_value in sg_values.items()
347 if sg_value != r_value}
348 if mismatched_sgs:
349 evaluation.append({
350 'reason': 'SecurityGroupMismatch',
351 'resource': r_value,
352 'security-groups': mismatched_sgs})
354 if evaluation and self.match == 'not-equal':
355 r['c7n:NetworkLocation'] = evaluation
356 return r
357 elif not evaluation and self.match == 'equal':
358 return r