Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/filters/vpc.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.exceptions import PolicyValidationError
4from c7n.utils import local_session, type_schema
6from .core import Filter, ValueFilter
7from .related import RelatedResourceFilter
class MatchResourceValidator:
    """Validation mixin for related-resource filters that accept ``match-resource``.

    Intended to be listed ahead of the concrete filter base class so that
    ``validate`` can adjust ``required_keys`` before delegating to the next
    ``validate`` in the method resolution order.
    """

    def validate(self):
        """Relax the required keys when ``match-resource`` is set, then delegate.

        When the filter data has a truthy ``match-resource`` entry, only
        ``key`` is marked as required on the instance.

        Returns whatever the next ``validate`` in the MRO returns.
        """
        if self.data.get('match-resource'):
            # A set literal is required here: ``set('key')`` iterates the
            # string and yields {'k', 'e', 'y'}, which never contains 'key'.
            self.required_keys = {'key'}
        return super().validate()
class SecurityGroupFilter(MatchResourceValidator, RelatedResourceFilter):
    """Filter a resource by its associated security groups."""

    # Value-filter schema extended with the related-resource specific
    # ``match-resource`` and ``operator`` options.
    schema = type_schema(
        'security-group',
        rinherit=ValueFilter.schema,
        **{
            'match-resource': {'type': 'boolean'},
            'operator': {'enum': ['and', 'or']},
        }
    )
    schema_alias = True

    # Dotted path of the related resource type and the key under which
    # matches are annotated.
    RelatedResource = "c7n.resources.vpc.SecurityGroup"
    AnnotationKey = "matched-security-groups"
class SubnetFilter(MatchResourceValidator, RelatedResourceFilter):
    """Filter a resource by its associated subnets attributes.

    This filter is generally available for network attached resources.

    ie. to find lambda functions that are vpc attached to subnets with
    a tag key Location and value Database.

    :example:

    .. code-block:: yaml

      policies:
        - name: lambda
          resource: aws.lambda
          filters:
            - type: subnet
              key: tag:Location
              value: Database

    It also supports finding resources on public or private subnets
    via route table introspection to determine if the subnet is
    associated to an internet gateway or a nat gateway.

    :example:

    .. code-block:: yaml

      policies:
        - name: public-ec2
          resource: aws.ec2
          filters:
            - type: subnet
              operator: or
              igw: True
              nat: True

    """

    schema = type_schema(
        'subnet', rinherit=ValueFilter.schema,
        **{'match-resource': {'type': 'boolean'},
           'operator': {'enum': ['and', 'or']},
           'igw': {'enum': [True, False]},
           'nat': {'enum': [True, False]},
           })

    schema_alias = True
    RelatedResource = "c7n.resources.vpc.Subnet"
    AnnotationKey = "matched-subnets"
    # No value-filter keys are mandatory: the filter may be used with only
    # the ``igw`` / ``nat`` route checks.
    required_keys = set()

    def get_permissions(self):
        """Return the base permissions, plus route-table permissions when a
        route based check (``igw`` or ``nat``) is configured."""
        perms = super().get_permissions()
        if self.data.get('igw') in (True, False) or self.data.get('nat') in (True, False):
            perms += self.manager.get_resource_manager(
                'aws.route-table').get_permissions()
        return perms

    def validate(self):
        """Validate the filter and cache the ``igw`` / ``nat`` settings.

        Returns the result of the parent ``validate`` so callers that chain
        on the return value keep working.
        """
        result = super().validate()
        self.check_igw = self.data.get('igw')
        self.check_nat = self.data.get('nat')
        return result

    def match(self, related):
        """Evaluate the configured checks against one related subnet.

        Only checks the policy actually configured take part in the
        ``operator`` combination. Counting an unconfigured check as an
        implicit pass would make ``operator: or`` match every subnet.

        :param related: subnet description (dict) to evaluate.
        :return: ``True`` when the subnet satisfies the configured checks,
            or when no check is configured at all.
        """
        combine = all if self.data.get('operator', 'and') == 'and' else any

        outcomes = []
        if self.check_igw is not None:
            outcomes.append(self.match_igw(related))
        if self.check_nat is not None:
            outcomes.append(self.match_nat(related))
        # The value filter only participates when both ``key`` and ``value``
        # are present in the filter data.
        if not {'key', 'value'}.difference(self.data):
            outcomes.append(super().match(related))

        if not outcomes:
            # Nothing configured: implicitly pass.
            return True
        return combine(outcomes)

    def process(self, resources, event=None):
        """Return the resources whose related subnets match this filter.

        Route tables are only loaded when an ``igw`` or ``nat`` check is
        configured.
        """
        related = self.get_related(resources)
        if self.check_igw in (True, False) or self.check_nat in (True, False):
            self.route_tables = self.get_route_tables()
        return [r for r in resources if self.process_resource(r, related)]

    def get_route_tables(self):
        """Index route tables for lookup by subnet id or vpc id.

        A table with a main association is stored under its ``VpcId``; a
        table with an explicit subnet association is stored under that
        ``SubnetId``.
        """
        rmanager = self.manager.get_resource_manager('aws.route-table')
        route_tables = {}
        for rtable in rmanager.resources():
            for assoc in rtable['Associations']:
                if assoc['Main']:
                    route_tables[rtable['VpcId']] = rtable
                elif 'SubnetId' in assoc:
                    route_tables[assoc['SubnetId']] = rtable
        return route_tables

    def _subnet_route_table(self, subnet):
        """Return the route table for ``subnet``, or ``None`` if not found.

        The subnet specific entry wins; the entry stored under the subnet's
        vpc id is the fallback.
        """
        rtable = self.route_tables.get(
            subnet['SubnetId'],
            self.route_tables.get(subnet['VpcId']))
        if rtable is None:
            self.log.debug('route table for %s not found', subnet['SubnetId'])
        return rtable

    def match_igw(self, subnet):
        """Check the subnet's route table for an internet gateway route.

        :return: ``True`` when ``igw`` is not configured, or when the
            presence of an ``igw-`` gateway route equals the configured
            value; ``False`` otherwise, including when no route table is
            found for the subnet.
        """
        if self.check_igw is None:
            return True
        rtable = self._subnet_route_table(subnet)
        if rtable is None:
            return False
        found_igw = any(
            (route.get('GatewayId') or '').startswith('igw-')
            for route in rtable['Routes'])
        return bool(self.check_igw) == found_igw

    def match_nat(self, subnet):
        """Check the subnet's route table for a nat gateway route.

        :return: ``True`` when ``nat`` is not configured, or when the
            presence of a route with a ``NatGatewayId`` equals the
            configured value; ``False`` otherwise, including when no route
            table is found for the subnet.
        """
        if self.check_nat is None:
            return True
        rtable = self._subnet_route_table(subnet)
        if rtable is None:
            return False
        found_nat = any(
            bool(route.get('NatGatewayId')) for route in rtable['Routes'])
        return bool(self.check_nat) == found_nat
class VpcFilter(MatchResourceValidator, RelatedResourceFilter):
    """Filter a resource by its associated vpc."""

    # Value-filter schema extended with the related-resource specific
    # ``match-resource`` and ``operator`` options.
    schema = type_schema(
        'vpc',
        rinherit=ValueFilter.schema,
        **{
            'match-resource': {'type': 'boolean'},
            'operator': {'enum': ['and', 'or']},
        }
    )

    schema_alias = True
    # Dotted path of the related resource type and the key under which
    # matches are annotated.
    RelatedResource = "c7n.resources.vpc.Vpc"
    AnnotationKey = "matched-vpcs"
class DefaultVpcBase(Filter):
    """Filter to resources in a default vpc."""
    vpcs = None
    # Id of the default vpc, filled in lazily by ``match``.
    default_vpc = None
    # Records that the lookup has already run, so an account without a
    # default vpc is not re-queried on every ``match`` call.
    _default_vpc_resolved = False
    permissions = ('ec2:DescribeVpcs',)

    def match(self, vpc_id):
        """Return ``True`` when ``vpc_id`` is the default vpc.

        The default vpc id is looked up via ``describe_vpcs`` on first use
        and cached on the instance. When no default vpc is found nothing
        can match, so ``False`` is returned even for a ``vpc_id`` of
        ``None``.
        """
        if self.default_vpc is None and not self._default_vpc_resolved:
            self.log.debug("querying default vpc %s", vpc_id)
            client = local_session(self.manager.session_factory).client('ec2')
            default_ids = [
                v['VpcId'] for v in client.describe_vpcs()['Vpcs']
                if v['IsDefault']]
            if default_ids:
                self.default_vpc = default_ids.pop()
            self._default_vpc_resolved = True
        if self.default_vpc is None:
            return False
        return vpc_id == self.default_vpc
class NetworkLocation(Filter):
    """On a network attached resource, determine intersection of
    security-group attributes, subnet attributes, and resource attributes.

    The use case is a bit specialized, for most use cases using `subnet`
    and `security-group` filters suffice. but say for example you wanted to
    verify that an ec2 instance was only using subnets and security groups
    with a given tag value, and that tag was not present on the resource.

    :Example:

    .. code-block:: yaml

        policies:
          - name: ec2-mismatched-sg-remove
            resource: ec2
            filters:
              - type: network-location
                compare: ["resource","security-group"]
                key: "tag:TEAM_NAME"
                ignore:
                  - "tag:TEAM_NAME": Enterprise
            actions:
              - type: modify-security-groups
                remove: network-location
                isolation-group: sg-xxxxxxxx
    """

    schema = type_schema(
        'network-location',
        **{'missing-ok': {
            'type': 'boolean',
            'default': False,
            'description': (
                "How to handle missing keys on elements, by default this causes "
                "resources to be considered not-equal")},
           'match': {'type': 'string', 'enum': ['equal', 'not-equal', 'in'],
                     'default': 'not-equal'},
           'compare': {
               'type': 'array',
               'description': (
                   'Which elements of network location should be considered when'
                   ' matching.'),
               'default': ['resource', 'subnet', 'security-group'],
               'items': {
                   'enum': ['resource', 'subnet', 'security-group']}},
           'key': {
               'type': 'string',
               'description': 'The attribute expression that should be matched on'},
           'max-cardinality': {
               'type': 'integer', 'default': 1,
               'title': ''},
           'ignore': {'type': 'array', 'items': {'type': 'object'}},
           'required': ['key'],
           'value': {'type': 'array', 'items': {'type': 'string'}}
           })
    schema_alias = True
    permissions = ('ec2:DescribeSecurityGroups', 'ec2:DescribeSubnets')

    def validate(self):
        """Ensure the resource type registers both filters this one relies on.

        :raises PolicyValidationError: when the resource's filter registry
            lacks a ``subnet`` or a ``security-group`` filter.
        """
        rfilters = self.manager.filter_registry.keys()
        if 'subnet' not in rfilters:
            raise PolicyValidationError(
                "network-location requires resource subnet filter availability on %s" % (
                    self.manager.data))

        if 'security-group' not in rfilters:
            raise PolicyValidationError(
                "network-location requires resource security-group filter availability on %s" % (
                    self.manager.data))
        return self

    def process(self, resources, event=None):
        """Return the resources selected by the configured ``match`` mode.

        Builds helper ``security-group``, ``subnet`` and ``value`` filters
        from the resource's filter registry, resolves each resource's related
        security groups and subnets (minus ignored ones) and evaluates them
        with ``process_resource``.
        """
        self.sg = self.manager.filter_registry.get('security-group')({}, self.manager)
        related_sg = self.sg.get_related(resources)

        self.subnet = self.manager.filter_registry.get('subnet')({}, self.manager)
        related_subnet = self.subnet.get_related(resources)

        self.sg_model = self.manager.get_resource_manager('security-group').get_model()
        self.subnet_model = self.manager.get_resource_manager('subnet').get_model()
        self.vf = self.manager.filter_registry.get('value')({}, self.manager)

        # filter options
        key = self.data.get('key')
        self.compare = self.data.get('compare', ['subnet', 'security-group', 'resource'])
        self.max_cardinality = self.data.get('max-cardinality', 1)
        self.match = self.data.get('match', 'not-equal')
        self.missing_ok = self.data.get('missing-ok', False)

        results = []
        for r in resources:
            resource_sgs = self.filter_ignored(
                [related_sg[sid] for sid in self.sg.get_related_ids([r]) if sid in related_sg])
            resource_subnets = self.filter_ignored(
                [related_subnet[sid] for sid in self.subnet.get_related_ids([r])
                 if sid in related_subnet])
            found = self.process_resource(r, resource_sgs, resource_subnets, key)
            if found:
                results.append(found)

        return results

    def filter_ignored(self, resources):
        """Drop resources matching any key/value pair of any ``ignore`` entry."""
        ignores = self.data.get('ignore', ())
        return [
            r for r in resources
            if not any(
                self.vf.get_resource_value(k, r) == v
                for ignore in ignores
                for k, v in ignore.items())
        ]

    def _location_values(self, related_filter, model, related, key):
        """Map each related element's id (``model.id``) to its ``key`` value."""
        return {
            element[model.id]: related_filter.get_resource_value(key, element)
            for element in related}

    def process_resource(self, r, resource_sgs, resource_subnets, key):
        """Evaluate one resource against its subnets and security groups.

        For ``match: in`` this delegates to ``process_match_in``. Otherwise
        a list of mismatch findings is collected. With ``not-equal`` the
        resource is returned, annotated under ``c7n:NetworkLocation``, when
        there are findings; with ``equal`` it is returned when there are
        none. In every other case ``None`` is returned.
        """
        evaluation = []
        sg_space = set()
        subnet_space = set()

        if self.match == 'in':
            return self.process_match_in(r, resource_sgs, resource_subnets, key)

        if 'subnet' in self.compare:
            subnet_values = self._location_values(
                self.subnet, self.subnet_model, resource_subnets, key)

            if not self.missing_ok and None in subnet_values.values():
                evaluation.append({
                    'reason': 'SubnetLocationAbsent',
                    'subnets': subnet_values})
            # Distinct non-empty values across the resource's subnets.
            subnet_space = set(filter(None, subnet_values.values()))

            if len(subnet_space) > self.max_cardinality:
                evaluation.append({
                    'reason': 'SubnetLocationCardinality',
                    'subnets': subnet_values})

        if 'security-group' in self.compare:
            sg_values = self._location_values(
                self.sg, self.sg_model, resource_sgs, key)
            if not self.missing_ok and None in sg_values.values():
                evaluation.append({
                    'reason': 'SecurityGroupLocationAbsent',
                    'security-groups': sg_values})

            # Distinct non-empty values across the resource's security groups.
            sg_space = set(filter(None, sg_values.values()))

            if len(sg_space) > self.max_cardinality:
                evaluation.append({
                    'reason': 'SecurityGroupLocationCardinality',
                    'security-groups': sg_values})

        if ('subnet' in self.compare and
                'security-group' in self.compare and
                sg_space != subnet_space):
            evaluation.append({
                'reason': 'LocationMismatch',
                'subnets': subnet_values,
                'security-groups': sg_values})

        if 'resource' in self.compare:
            r_value = self.vf.get_resource_value(key, r)
            if not self.missing_ok and r_value is None:
                evaluation.append({
                    'reason': 'ResourceLocationAbsent',
                    'resource': r_value})
            elif 'security-group' in self.compare and resource_sgs and r_value not in sg_space:
                evaluation.append({
                    'reason': 'ResourceLocationMismatch',
                    'resource': r_value,
                    'security-groups': sg_values})
            elif 'subnet' in self.compare and resource_subnets and r_value not in subnet_space:
                evaluation.append({
                    'reason': 'ResourceLocationMismatch',
                    'resource': r_value,
                    'subnet': subnet_values})
            if 'security-group' in self.compare and resource_sgs:
                # Record exactly which security groups disagree with the
                # resource's own value.
                mismatched_sgs = {sg_id: sg_value
                                  for sg_id, sg_value in sg_values.items()
                                  if sg_value != r_value}
                if mismatched_sgs:
                    evaluation.append({
                        'reason': 'SecurityGroupMismatch',
                        'resource': r_value,
                        'security-groups': mismatched_sgs})

        if evaluation and self.match == 'not-equal':
            r['c7n:NetworkLocation'] = evaluation
            return r
        elif not evaluation and self.match == 'equal':
            return r

    def process_match_in(self, r, resource_sgs, resource_subnets, key):
        """Return ``r`` when every compared element's value is in ``value``.

        Returns ``None`` as soon as a compared element is missing the key
        (unless ``missing-ok`` is set) or carries a value outside the
        configured ``value`` list.
        """
        network_location_vals = set(self.data.get('value', []))

        if 'subnet' in self.compare:
            subnet_values = self._location_values(
                self.subnet, self.subnet_model, resource_subnets, key)
            if not self.missing_ok and None in subnet_values.values():
                return

            subnet_space = set(filter(None, subnet_values.values()))
            if not subnet_space.issubset(network_location_vals):
                return

        if 'security-group' in self.compare:
            sg_values = self._location_values(
                self.sg, self.sg_model, resource_sgs, key)
            if not self.missing_ok and None in sg_values.values():
                return

            sg_space = set(filter(None, sg_values.values()))

            if not sg_space.issubset(network_location_vals):
                return

        if 'resource' in self.compare:
            r_value = self.vf.get_resource_value(key, r)
            if not self.missing_ok and r_value is None:
                return

            if r_value not in network_location_vals:
                return

        return r