Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/core.py: 26%
665 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4Resource Filtering Logic
5"""
6import copy
7import datetime
8from datetime import timedelta
9import fnmatch
10import ipaddress
11import logging
12import operator
13import re
15from dateutil.tz import tzutc
16from dateutil.parser import parse
17from c7n.vendored.distutils import version
18from random import sample
20from c7n.element import Element
21from c7n.exceptions import PolicyValidationError, PolicyExecutionError
22from c7n.manager import ResourceManager
23from c7n.registry import PluginRegistry
24from c7n.resolver import ValuesFrom
25from c7n.utils import (
26 set_annotation,
27 type_schema,
28 parse_cidr,
29 parse_date,
30 jmespath_search,
31 jmespath_compile
32)
33from c7n.manager import iter_filters
class FilterValidationError(Exception):
    """Exception type declared for filter validation failures.

    Nothing in this module raises it; validation errors here use
    ``PolicyValidationError`` instead.
    """
# Resource key under which a matching value filter records the key it matched on.
ANNOTATION_KEY = 'c7n:MatchedFilters'
def glob_match(value, pattern):
    """Return True when ``value`` is a string matching the glob ``pattern``.

    Non-string values never match.
    """
    return isinstance(value, str) and fnmatch.fnmatch(value, pattern)
def regex_match(value, regex):
    """Case-insensitively match ``regex`` against the start of ``value``.

    Non-string values never match.
    """
    if isinstance(value, str):
        # The re module caches compiled patterns internally, so the pattern
        # is not pre-compiled here.
        return re.match(regex, value, flags=re.IGNORECASE) is not None
    return False
def regex_case_sensitive_match(value, regex):
    """Case-sensitively match ``regex`` against the start of ``value``.

    Non-string values never match.
    """
    if isinstance(value, str):
        # The re module caches compiled patterns internally, so the pattern
        # is not pre-compiled here.
        return re.match(regex, value) is not None
    return False
def operator_in(x, y):
    """Return whether ``x`` is a member of ``y``."""
    return operator.contains(y, x)
def operator_ni(x, y):
    """Return whether ``x`` is absent from ``y``."""
    return not operator.contains(y, x)
def difference(x, y):
    """Return True when ``x`` holds at least one element that is not in ``y``."""
    only_in_x = set(x).difference(y)
    return len(only_in_x) > 0
def intersect(x, y):
    """Return True when ``x`` and ``y`` share at least one element."""
    shared = set(x).intersection(y)
    return len(shared) > 0
# Operator names accepted in a filter's `op` setting, mapped to the
# two-argument callable implementing the comparison. Filters in this module
# call them as ``op(resource_value, policy_value)``.
OPERATORS = {
    # equality
    'eq': operator.eq,
    'equal': operator.eq,
    'ne': operator.ne,
    'not-equal': operator.ne,
    # ordering
    'gt': operator.gt,
    'greater-than': operator.gt,
    'ge': operator.ge,
    'gte': operator.ge,
    'le': operator.le,
    'lte': operator.le,
    'lt': operator.lt,
    'less-than': operator.lt,
    # pattern matching
    'glob': glob_match,
    'regex': regex_match,
    'regex-case': regex_case_sensitive_match,
    # membership and set comparisons
    'in': operator_in,
    'ni': operator_ni,
    'not-in': operator_ni,
    'contains': operator.contains,
    'difference': difference,
    'intersect': intersect,
}
# Names accepted for a value filter's `value_type` setting.
VALUE_TYPES = [
    'age',
    'integer',
    'expiration',
    'normalize',
    'size',
    'cidr',
    'cidr_size',
    'swap',
    'resource_count',
    'expr',
    'unique_size',
    'date',
    'version',
    'float',
]
class FilterRegistry(PluginRegistry):
    """Plugin registry that builds filter objects from policy filter data.

    Every instance registers the generic filters defined in this module.
    """

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        generic_filters = (
            ('value', ValueFilter),
            ('or', Or),
            ('and', And),
            ('not', Not),
            ('event', EventFilter),
            ('reduce', ReduceFilter),
            ('list-item', ListItemFilter),
        )
        for name, filter_class in generic_filters:
            self.register(name, filter_class)

    def parse(self, data, manager):
        """Build one filter per entry of ``data`` and return them as a list."""
        return [self.factory(entry, manager) for entry in data]

    def factory(self, data, manager=None):
        """Factory func for filters.

        data - policy config for filters
        manager - resource type manager (ec2, s3, etc)

        Raises PolicyValidationError when the filter type is missing or
        is not registered.
        """
        # Shorthand syntax: a single-key mapping without `type` is either a
        # boolean block (`or`/`and`/`not`) or a `key: value` value filter.
        if isinstance(data, dict) and len(data) == 1 and 'type' not in data:
            op = list(data.keys())[0]
            if op in ('or', 'and', 'not'):
                return self[op](data, self, manager)
            return ValueFilter(data, manager)

        # A bare string names the filter type.
        if isinstance(data, str):
            filter_type, data = data, {'type': data}
        else:
            filter_type = data.get('type')

        if not filter_type:
            raise PolicyValidationError(
                "%s Invalid Filter %s" % (self.plugin_type, data))

        filter_class = self.get(filter_type)
        if filter_class is None:
            raise PolicyValidationError(
                "%s Invalid filter type %s" % (self.plugin_type, data))
        return filter_class(data, manager)
def trim_runtime(filters):
    """Remove runtime filters.

    Some filters, such as event filters, can only be evaluated
    effectively at policy execution. They are removed when evaluating
    conditions for the dryrun or provisioning stages.
    """
    def detach(node):
        # Remove the node from its parent block, then keep climbing for as
        # long as the removal leaves a boolean group with no filters in it.
        while True:
            parent = node.get_block_parent()
            parent.filters.remove(node)
            if not isinstance(parent, BooleanGroupFilter) or len(parent):
                return
            node = parent

    for candidate in iter_filters(filters):
        if isinstance(candidate, EventFilter):
            detach(candidate)
# Ideally this would be an abstract base class (abc) or a
# zope.interface definition.
class Filter(Element):
    """Base class for resource filters.

    An instance is called with a single resource and returns a truthy
    value when the resource matches.
    """

    log = logging.getLogger('custodian.filters')

    def __init__(self, data, manager=None):
        self.data = data
        self.manager = manager

    def process(self, resources, event=None):
        """ Bulk process resources and return filtered set."""
        return [resource for resource in resources if self(resource)]

    def get_block_operator(self):
        """Determine the immediate parent boolean operator for a filter"""
        parent = self.get_block_parent()
        # Anything that is not an explicit boolean block counts as the
        # implicit top-level `and`.
        if parent.type in ('and', 'or', 'not'):
            return parent.type
        return 'and'

    def get_block_parent(self):
        """Get the block parent for a filter"""
        parents = [self.manager]
        # With block_end=True the iterator yields None at the end of each
        # boolean block, which is the signal to pop that block again.
        for candidate in self.manager.iter_filters(block_end=True):
            if candidate is None:
                parents.pop()
            elif candidate == self:
                return parents[-1]
            elif candidate.type in ('and', 'or', 'not'):
                parents.append(candidate)

    def merge_annotation(self, r, annotation_key, values):
        """Merge ``values`` into the matched annotation already on ``r``.

        Inside an `and`/`not` block the two lists are intersected, inside
        an `or` block they are unioned. The annotation is stored under
        ``self.matched_annotation_key``; ``annotation_key`` is not used.
        """
        block_op = self.get_block_operator()
        if block_op in ('and', 'not'):
            combine = intersect_list
        elif block_op == 'or':
            combine = union_list
        else:
            return
        r[self.matched_annotation_key] = combine(
            values, r.get(self.matched_annotation_key))
class BaseValueFilter(Filter):
    """Shared value extraction and `value_regex` validation for value filters."""

    # Per-instance cache of compiled jmespath expressions, keyed by expression.
    expr = None

    def __init__(self, data, manager=None):
        super(BaseValueFilter, self).__init__(data, manager)
        self.expr = {}

    def get_resource_value(self, k, i, regex=None):
        """Extract the value for key ``k`` from resource ``i``.

        k - ``tag:<name>`` for a tag/label lookup, a top-level key of the
            resource, or a jmespath expression
        i - the resource to read from
        regex - optional expression with one capture group; when given, the
            captured text is returned instead of the raw value

        Returns None when nothing is found. A tag container that is present
        but null is treated as empty.
        """
        r = None
        if k.startswith('tag:'):
            tk = k.split(':', 1)[1]
            if 'Tags' in i:
                # List of {'Key': ..., 'Value': ...} entries.
                for t in i.get('Tags') or ():
                    if t.get('Key') == tk:
                        r = t.get('Value')
                        break
            # GCP schema: 'labels': {'key': 'value'}
            elif 'labels' in i:
                r = (i.get('labels') or {}).get(tk, None)
            # GCP has a secondary form of labels called tags
            # as labels without values.
            # Azure schema: 'tags': {'key': 'value'}
            elif 'tags' in i:
                r = (i.get('tags') or {}).get(tk, None)
        elif k in i:
            r = i.get(k)
        else:
            # Compile each jmespath expression once and reuse it afterwards.
            if k not in self.expr:
                self.expr[k] = jmespath_compile(k)
            r = self.expr[k].search(i)

        if regex:
            r = ValueRegex(regex).get_resource_value(r)
        return r

    def _validate_value_regex(self, regex):
        """Specific validation for `value_regex` type

        The `value_regex` type works a little differently. In
        particular it doesn't support OPERATORS that perform
        operations on a list of values, specifically 'intersect',
        'contains', 'difference', 'in' and 'not-in'

        Raises PolicyValidationError when the expression does not compile
        or does not have exactly one capturing group.
        """
        # Sanity check that we can compile
        try:
            pattern = re.compile(regex)
        except re.error as e:
            raise PolicyValidationError(
                "Invalid value_regex: %s %s" % (e, self.data)) from e
        if pattern.groups != 1:
            raise PolicyValidationError(
                "value_regex must have a single capturing group: %s" %
                self.data)
        return self
def intersect_list(a, b):
    """Return the items of ``a`` that are also in ``b``, in ``a``'s order.

    If either argument is None the other one is returned unchanged.
    """
    if b is None:
        return a
    if a is None:
        return b
    return [item for item in a if item in b]
def union_list(a, b):
    """Return the items of ``a`` followed by the items of ``b`` not yet present.

    If either argument is empty or None the other one is returned as is.
    Otherwise a new list is built, so neither input is modified.
    """
    if not b:
        return a
    if not a:
        return b
    # Copy first: extending `a` directly would modify the caller's list.
    res = list(a)
    # Test against the growing result so duplicates within `b` are
    # added only once.
    res.extend(x for x in b if x not in res)
    return res
class BooleanGroupFilter(Filter):
    """Base class for filters that wrap a list of nested filters."""

    def __init__(self, data, registry, manager):
        super().__init__(data)
        self.registry = registry
        # The nested filter definitions are the first value of the block's
        # data mapping.
        nested = list(self.data.values())[0]
        self.filters = registry.parse(nested, manager)
        self.manager = manager

    def validate(self):
        """Validate every nested filter and return self."""
        for child in self.filters:
            child.validate()
        return self

    def get_resource_type_id(self):
        """Return the ``id`` attribute of the manager's resource model."""
        return self.manager.get_model().id

    def __len__(self):
        return len(self.filters)

    def __bool__(self):
        # Defined explicitly so a group with no filters is still truthy
        # despite __len__ returning 0.
        return True

    def get_deprecations(self):
        """Return any matching deprecations for the nested filters."""
        return [
            deprecation
            for child in self.filters
            for deprecation in child.get_deprecations()
        ]
class Or(BooleanGroupFilter):
    """Boolean block matching resources that match any nested filter."""

    def process(self, resources, event=None):
        if not self.manager:
            return super().process(resources, event)
        return self.process_set(resources, event)

    def __call__(self, r):
        """Fallback for older unit tests that don't utilize a query manager"""
        return any(child(r) for child in self.filters)

    def process_set(self, resources, event):
        """Return the resources matched by at least one nested filter.

        Resources are tracked by id; an id containing a dot is evaluated
        as a jmespath expression.
        """
        rtype_id = self.get_resource_type_id()
        if '.' in rtype_id:
            compiled = jmespath_compile(rtype_id)
            resource_map = {compiled.search(r): r for r in resources}
        else:
            compiled = None
            resource_map = {r[rtype_id]: r for r in resources}

        matched_ids = set()
        for child in self.filters:
            matched = child.process(resources, event)
            if compiled:
                ids = [compiled.search(r) for r in matched]
            else:
                ids = [r[rtype_id] for r in matched]
            matched_ids = matched_ids.union(ids)
        return [resource_map[r_id] for r_id in matched_ids]
class And(BooleanGroupFilter):
    """Boolean block matching resources that match every nested filter."""

    def process(self, resources, events=None):
        # Without a manager there is no resource id to key annotations on,
        # so no sweeper is created.
        sweeper = None
        if self.manager:
            sweeper = AnnotationSweeper(self.get_resource_type_id(), resources)

        # Each filter only sees what survived the previous one; stop as
        # soon as nothing is left.
        for child in self.filters:
            resources = child.process(resources, events)
            if not resources:
                break

        if sweeper is not None:
            sweeper.sweep(resources)

        return resources
class Not(BooleanGroupFilter):
    """Boolean block matching resources that fail the nested filters."""

    def process(self, resources, event=None):
        if not self.manager:
            return super().process(resources, event)
        return self.process_set(resources, event)

    def __call__(self, r):
        """Fallback for older unit tests that don't utilize a query manager"""

        # The nested filters are implicitly and-ed together, so by
        # De Morgan: not (A and B and ... and Z) == not A or not B or ... or not Z
        return any(not child(r) for child in self.filters)

    def process_set(self, resources, event):
        """Return the resources that do not pass all nested filters.

        Resources are tracked by id; an id containing a dot is evaluated
        as a jmespath expression.
        """
        rtype_id = self.get_resource_type_id()
        if '.' in rtype_id:
            compiled = jmespath_compile(rtype_id)
            resource_map = {compiled.search(r): r for r in resources}
        else:
            compiled = None
            resource_map = {r[rtype_id]: r for r in resources}
        sweeper = AnnotationSweeper(rtype_id, resources)

        # Chain the nested filters, stopping once nothing is left.
        for child in self.filters:
            resources = child.process(resources, event)
            if not resources:
                break

        before = set(resource_map.keys())
        if compiled:
            after = {compiled.search(r) for r in resources}
        else:
            after = {r[rtype_id] for r in resources}
        results = before - after
        # Sweeping against an empty list resets the annotations of every
        # resource the sweeper was created with.
        sweeper.sweep([])

        return [resource_map[r_id] for r_id in results]
class AnnotationSweeper:
    """Support clearing annotations set within a block filter.

    See https://github.com/cloud-custodian/cloud-custodian/issues/2116
    """
    def __init__(self, id_key, resources):
        self.id_key = id_key
        # An id key containing a dot is evaluated as a jmespath expression.
        compiled = jmespath_compile(self.id_key) if '.' in id_key else None
        annotations_by_id = {}
        resources_by_id = {}
        for r in resources:
            if compiled:
                rid = compiled.search(r)
            else:
                rid = r[self.id_key]
            annotations_by_id[rid] = {
                key: value for key, value in r.items() if key.startswith('c7n')}
            resources_by_id[rid] = r
        # We keep a full copy of the annotation keys to allow restore.
        self.ra_map = copy.deepcopy(annotations_by_id)
        self.resource_map = resources_by_id

    def sweep(self, resources):
        """Reset annotations on every tracked resource missing from ``resources``."""
        if '.' in self.id_key:
            compiled = jmespath_compile(self.id_key)
            kept_ids = [compiled.search(r) for r in resources]
        else:
            kept_ids = [r[self.id_key] for r in resources]
        for rid in set(self.ra_map).difference(kept_ids):
            resource = self.resource_map[rid]
            # Clear annotations if the block filter didn't match
            for key in [k for k in resource if k.startswith('c7n')]:
                del resource[key]
            # Restore annotations that may have existed prior to the block filter.
            resource.update(self.ra_map[rid])
# The stock LooseVersion raises when compared against the plain strings
# that are used in the value as shorthand for certain options.
class ComparableVersion(version.LooseVersion):
    """LooseVersion whose equality test returns False instead of raising TypeError."""

    def __eq__(self, other):
        try:
            result = super().__eq__(other)
        except TypeError:
            result = False
        return result
class ValueFilter(BaseValueFilter):
    """Generic value filter using jmespath

    Extracts a value from each resource by `key`, optionally converts it
    according to `value_type`, and compares it with the configured value
    using `op` (plain equality when no operator is given).
    """
    # Populated lazily by match() on first use.
    op = v = vtype = None

    schema = {
        'type': 'object',
        # Doesn't mix well with inherits that extend
        'additionalProperties': False,
        'required': ['type'],
        'properties': {
            # Doesn't mix well as enum with inherits that extend
            'type': {'enum': ['value']},
            'key': {'type': 'string'},
            'value_type': {'$ref': '#/definitions/filters_common/value_types'},
            'default': {'type': 'object'},
            'value_regex': {'type': 'string'},
            'value_from': {'$ref': '#/definitions/filters_common/value_from'},
            'value': {'$ref': '#/definitions/filters_common/value'},
            'op': {'$ref': '#/definitions/filters_common/comparison_operators'},
            'value_path': {'type': 'string'}
        }
    }
    schema_alias = True
    annotate = True
    required_keys = {'value', 'key'}

    def _validate_resource_count(self):
        """ Specific validation for `resource_count` type

        The `resource_count` type works a little differently because it operates
        on the entire set of resources.  It:
          - does not require `key`
          - `value` must be a number
          - supports a subset of the OPERATORS list
        """
        for field in ('op', 'value'):
            if field not in self.data:
                raise PolicyValidationError(
                    "Missing '%s' in value filter %s" % (field, self.data))

        # NOTE: a list is accepted here as well, although the message only
        # mentions integers.
        if not (isinstance(self.data['value'], int) or
                isinstance(self.data['value'], list)):
            raise PolicyValidationError(
                "`value` must be an integer in resource_count filter %s" % self.data)

        # I don't see how to support regex for this?
        if (self.data['op'] not in OPERATORS or
                self.data['op'] in {'regex', 'regex-case'} or
                'value_regex' in self.data):
            raise PolicyValidationError(
                "Invalid operator in value filter %s" % self.data)

        return self

    def validate(self):
        """Check the filter configuration and return self.

        Raises PolicyValidationError on a missing key or value, an unknown
        operator, an invalid regex, or an unparseable date value.
        """
        # The `key: value` shorthand needs no further checks.
        if len(self.data) == 1:
            return self

        # `resource_count` requires a slightly different schema than the rest of
        # the value filters because it operates on the full resource list
        if self.data.get('value_type') == 'resource_count':
            return self._validate_resource_count()
        elif self.data.get('value_type') == 'date':
            if not parse_date(self.data.get('value')):
                # Interpolate explicitly; passing the value as a second
                # exception argument leaves the %s placeholder unfilled.
                raise PolicyValidationError(
                    "value_type: date with invalid date value:%s" % (
                        self.data.get('value', ''),))
        if 'key' not in self.data and 'key' in self.required_keys:
            raise PolicyValidationError(
                "Missing 'key' in value filter %s" % self.data)
        if ('value' not in self.data and
                'value_from' not in self.data and
                'value_path' not in self.data and
                'value' in self.required_keys):
            raise PolicyValidationError(
                "Missing 'value' in value filter %s" % self.data)
        if 'op' in self.data:
            if self.data['op'] not in OPERATORS:
                raise PolicyValidationError(
                    "Invalid operator in value filter %s" % self.data)
            if self.data['op'] in {'regex', 'regex-case'}:
                # Sanity check that we can compile
                try:
                    re.compile(self.data['value'])
                except re.error as e:
                    raise PolicyValidationError(
                        "Invalid regex: %s %s" % (e, self.data))
        if 'value_regex' in self.data:
            return self._validate_value_regex(self.data['value_regex'])

        return self

    def __call__(self, i):
        # resource_count works on the whole collection, so the argument is
        # handed to process() as is.
        if self.data.get('value_type') == 'resource_count':
            return self.process(i)

        matched = self.match(i)
        if matched and self.annotate:
            set_annotation(i, ANNOTATION_KEY, self.k)
        return matched

    def process(self, resources, event=None):
        # For the resource_count filter we operate on the full set of resources.
        if self.data.get('value_type') == 'resource_count':
            op = OPERATORS[self.data.get('op')]
            if op(len(resources), self.data.get('value')):
                return resources
            return []

        return super(ValueFilter, self).process(resources, event)

    def get_resource_value(self, k, i):
        """Extract ``k`` from ``i``, applying the configured `value_regex` if any."""
        return super(ValueFilter, self).get_resource_value(k, i, self.data.get('value_regex'))

    def get_path_value(self, i):
        """Retrieve values using JMESPath.

        When using a Value Filter, a ``value_path`` can be specified.
        This means the value(s) the filter will compare against are
        calculated during the initialization of the filter.

        Note that this option only pulls properties of the resource
        currently being filtered.

        .. code-block:: yaml

            - name: find-admins-with-user-roles
              resource: gcp.project
              filters:
                - type: iam-policy
                  doc:
                    key: bindings[?(role=='roles/admin')].members[]
                    op: intersect
                    value_path: bindings[?(role=='roles/user_access')].members[]

        The iam-policy use the implementation of the generic Value Filter.
        This implementation allows for the comparison of two separate lists of values
        within the same resource.
        """
        return jmespath_search(self.data.get('value_path'), i)

    def match(self, i):
        """Return True when resource ``i`` satisfies the filter."""
        # Lazily read the configuration the first time a resource is matched.
        if self.v is None and len(self.data) == 1:
            # `key: value` shorthand.
            [(self.k, self.v)] = self.data.items()
        elif self.v is None and not hasattr(self, 'content_initialized'):
            self.k = self.data.get('key')
            self.op = self.data.get('op')
            if 'value_from' in self.data:
                values = ValuesFrom(self.data['value_from'], self.manager)
                self.v = values.get_values()
            elif 'value_path' in self.data:
                self.v = self.get_path_value(i)
            else:
                self.v = self.data.get('value')
            # Guards against re-initialising when the configured value is
            # itself None.
            self.content_initialized = True
            self.vtype = self.data.get('value_type')

        if i is None:
            return False

        # value extract
        r = self.get_resource_value(self.k, i)
        if self.op in ('in', 'not-in') and r is None:
            r = ()

        # value type conversion
        if self.vtype is not None:
            v, r = self.process_value_type(self.v, r, i)
        else:
            v = self.v

        # Value match
        if r is None and v == 'absent':
            return True
        elif r is not None and v == 'present':
            return True
        elif v == 'not-null' and r:
            return True
        elif v == 'empty' and not r:
            return True
        elif self.op:
            op = OPERATORS[self.op]
            try:
                return op(r, v)
            except TypeError:
                # Values that cannot be compared count as a non-match.
                return False
        elif r == v:
            return True

        return False

    def process_value_type(self, sentinel, value, resource):
        """Convert the pair according to `value_type`.

        sentinel - the configured comparison value
        value - the value extracted from the resource
        resource - the resource itself, used by the `expr` type

        Returns a ``(sentinel, value)`` tuple. Some types return the pair
        swapped so that the comparison reads naturally.
        """
        if self.vtype == 'normalize' and isinstance(value, str):
            return sentinel, value.strip().lower()

        elif self.vtype == 'expr':
            # The configured value is itself an expression evaluated
            # against the resource.
            sentinel = self.get_resource_value(sentinel, resource)
            return sentinel, value

        elif self.vtype == 'integer':
            try:
                value = int(str(value).strip())
            except ValueError:
                value = 0
        elif self.vtype == 'float':
            try:
                value = float(str(value).strip())
            except ValueError:
                value = 0.0
        elif self.vtype == 'size':
            try:
                return sentinel, len(value)
            except TypeError:
                return sentinel, 0
        elif self.vtype == 'unique_size':
            try:
                return sentinel, len(set(value))
            except TypeError:
                return sentinel, 0
        elif self.vtype == 'swap':
            return value, sentinel
        elif self.vtype == 'date':
            return parse_date(sentinel), parse_date(value)
        elif self.vtype == 'age':
            if not isinstance(sentinel, datetime.datetime):
                sentinel = datetime.datetime.now(tz=tzutc()) - timedelta(sentinel)
            value = parse_date(value)
            if value is None:
                # compatibility
                value = 0
            # Reverse the age comparison, we want to compare the value being
            # greater than the sentinel typically. Else the syntax for age
            # comparisons is intuitively wrong.
            return value, sentinel
        elif self.vtype == 'cidr':
            s = parse_cidr(sentinel)
            v = parse_cidr(value)
            if (isinstance(s, ipaddress._BaseAddress) and isinstance(v, ipaddress._BaseNetwork)):
                return v, s
            return s, v
        elif self.vtype == 'cidr_size':
            cidr = parse_cidr(value)
            if cidr:
                return sentinel, cidr.prefixlen
            return sentinel, 0

        # Allows for expiration filtering, for events in the future as opposed
        # to events in the past which age filtering allows for.
        elif self.vtype == 'expiration':
            if not isinstance(sentinel, datetime.datetime):
                sentinel = datetime.datetime.now(tz=tzutc()) + timedelta(sentinel)
            value = parse_date(value)
            if value is None:
                value = 0
            return sentinel, value

        # Allows for comparing version numbers, for things that you expect a minimum version number.
        elif self.vtype == 'version':
            s = ComparableVersion(sentinel)
            v = ComparableVersion(value)
            return s, v

        return sentinel, value
class AgeFilter(Filter):
    """Automatically filter resources older than a given date.

    **Deprecated** use a value filter with `value_type: age` which can be
    done on any attribute.
    """
    # Computed on the first call and reused for later resources.
    threshold_date = None

    # The name of attribute to compare to threshold; must override in subclass
    date_attribute = None

    schema = None

    def validate(self):
        if self.date_attribute:
            return self
        raise NotImplementedError(
            "date_attribute must be overriden in subclass")

    def get_resource_date(self, i):
        """Return the resource's date attribute as a timezone-aware datetime.

        Non-datetime values are parsed first; a value without a timezone
        is given UTC.
        """
        when = i[self.date_attribute]
        if not isinstance(when, datetime.datetime):
            when = parse(when)
        return when if when.tzinfo else when.replace(tzinfo=tzutc())

    def __call__(self, i):
        resource_date = self.get_resource_date(i)
        if resource_date is None:
            return False
        compare = OPERATORS[self.data.get('op', 'greater-than')]

        if not self.threshold_date:
            age = timedelta(
                days=self.data.get('days', 0),
                hours=self.data.get('hours', 0),
                minutes=self.data.get('minutes', 0))
            # Work around placebo issues with tz
            if resource_date.tzinfo:
                now = datetime.datetime.now(tz=tzutc())
            else:
                now = datetime.datetime.now()
            self.threshold_date = now - age

        return compare(self.threshold_date, resource_date)
class EventFilter(ValueFilter):
    """Filter a resource based on an event."""

    schema = type_schema('event', rinherit=ValueFilter.schema)
    schema_alias = True

    def validate(self):
        if 'mode' in self.manager.data:
            return self
        raise PolicyValidationError(
            "Event filters can only be used with lambda policies in %s" % (
                self.manager.data,))

    def process(self, resources, event=None):
        """Return all resources when there is no event or the event matches, else none."""
        # The filter is evaluated against the event, not against the
        # individual resources.
        if event is None or self(event):
            return resources
        return []
class ValueRegex:
    """Allows filtering based on the output of a regex capture.
    This is useful for parsing data that has a weird format.

    Rather than comparing the 'resource value' itself with the 'value', the
    regex is applied to the 'resource value' and the text captured by its
    group is what gets compared with the 'value'.
    Therefore you must have a single capture group defined in the regex.

    If the regex doesn't find a match it will return 'None'

    Example of getting a datetime object to make an 'expiration' comparison::

        type: value
        value_regex: ".*delete_after=([0-9]{4}-[0-9]{2}-[0-9]{2}).*"
        key: "tag:company_mandated_metadata"
        value_type: expiration
        op: lte
        value: 0
    """

    def __init__(self, expr):
        self.expr = expr

    def get_resource_value(self, resource):
        """Return the first capture group of the regex applied to ``resource``.

        Returns None when ``resource`` is None, cannot be matched against
        (wrong type), or does not match.
        """
        if resource is None:
            return None
        try:
            found = re.match(self.expr, resource)
        except (ValueError, TypeError):
            return None
        # No match means there is nothing to capture.
        return None if found is None else found.group(1)
class ReduceFilter(BaseValueFilter):
    """Generic reduce filter to group, sort, and limit your resources.

    This example will select the longest running instance from each ASG,
    then randomly choose 10% of those, maxing at 15 total instances.

    :example:

    .. code-block:: yaml

      - name: oldest-instance-by-asg
        resource: ec2
        filters:
          - "tag:aws:autoscaling:groupName": present
          - type: reduce
            group-by: "tag:aws:autoscaling:groupName"
            sort-by: "LaunchTime"
            order: asc
            limit: 1

    Or you might want to randomly select a 10 percent of your resources,
    but no more than 15.

    :example:

    .. code-block:: yaml

      - name: random-selection
        resource: ec2
        filters:
          - type: reduce
            order: randomize
            limit: 15
            limit-percent: 10

    """
    annotate = False

    # NOTE(review): in the object form of `group-by`/`sort-by` the
    # key/value_type/value_regex entries sit beside `type` rather than under
    # `properties` -- confirm whether they are meant to be enforced.
    schema = {
        'type': 'object',
        # Doesn't mix well with inherits that extend
        'additionalProperties': False,
        'required': ['type'],
        'properties': {
            # Doesn't mix well as enum with inherits that extend
            'type': {'enum': ['reduce']},
            'group-by': {
                'oneOf': [
                    {'type': 'string'},
                    {
                        'type': 'object',
                        'key': {'type': 'string'},
                        'value_type': {'enum': ['string', 'number', 'date']},
                        'value_regex': 'string',
                    },
                ]
            },
            'sort-by': {
                'oneOf': [
                    {'type': 'string'},
                    {
                        'type': 'object',
                        'key': {'type': 'string'},
                        'value_type': {'enum': ['string', 'number', 'date']},
                        'value_regex': 'string',
                    },
                ]
            },
            'order': {'enum': ['asc', 'desc', 'reverse', 'randomize']},
            'null-order': {'enum': ['first', 'last']},
            'limit': {'type': 'number', 'minimum': 0},
            'limit-percent': {'type': 'number', 'minimum': 0, 'maximum': 100},
            'discard': {'type': 'number', 'minimum': 0},
            'discard-percent': {'type': 'number', 'minimum': 0, 'maximum': 100},
        },
    }
    schema_alias = True

    def __init__(self, data, manager):
        super(ReduceFilter, self).__init__(data, manager)
        # `order` has to be set first: get_sort_config() reads it through
        # null_sort_value().
        self.order = self.data.get('order', 'asc')
        self.group_by = self.get_sort_config('group-by')
        self.sort_by = self.get_sort_config('sort-by')

    def validate(self):
        # make sure the regexes compile
        if 'value_regex' in self.group_by:
            self._validate_value_regex(self.group_by['value_regex'])
        if 'value_regex' in self.sort_by:
            self._validate_value_regex(self.sort_by['value_regex'])
        return self

    def process(self, resources, event=None):
        """Group, optionally sort, then limit the resources."""
        groups = self.group(resources)

        # specified either of the sorting options, so sort
        if 'sort-by' in self.data or 'order' in self.data:
            groups = self.sort_groups(groups)

        # now apply any limits to the groups and concatenate
        return list(filter(None, self.limit(groups)))

    def group(self, resources):
        """Bucket the resources by the string form of their group-by value.

        Returns a dict mapping that string to
        ``{'sortkey': <typed value>, 'resources': [...]}``.
        """
        groups = {}
        for r in resources:
            v = self._value_to_sort(self.group_by, r)
            vstr = str(v)
            if vstr not in groups:
                groups[vstr] = {'sortkey': v, 'resources': []}
            groups[vstr]['resources'].append(r)
        return groups

    def get_sort_config(self, key):
        """Return the normalised sort configuration stored under ``key``.

        The result always carries a ``null_sort_value`` entry and is a copy,
        so the policy data itself is left untouched.
        """
        # allow `foo: bar` but convert to
        # `foo: {'key': bar}`
        d = self.data.get(key, {})
        if isinstance(d, str):
            d = {'key': d}
        else:
            # Shallow copy so the extra key below is not written back into
            # the policy's own data.
            d = copy.copy(d)
        d['null_sort_value'] = self.null_sort_value(d)
        return d

    def sort_groups(self, groups):
        """Reorder the resources inside every group by the sort-by value."""
        for g in groups:
            groups[g]['resources'] = self.reorder(
                groups[g]['resources'],
                key=lambda r: self._value_to_sort(self.sort_by, r),
            )
        return groups

    def _value_to_sort(self, config, r):
        """Extract and type-convert the value of ``r`` described by ``config``.

        Falls back to the configured null sort value when nothing can be
        extracted or converted.
        """
        expr = config.get('key')
        vtype = config.get('value_type', 'string')
        vregex = config.get('value_regex')
        v = None

        try:
            # extract value based on jmespath
            if expr:
                v = self.get_resource_value(expr, r, vregex)

            if v is not None:
                # now convert to expected type
                if vtype == 'number':
                    v = float(v)
                elif vtype == 'date':
                    v = parse_date(v)
                else:
                    v = str(v)
        except (AttributeError, ValueError):
            v = None

        if v is None:
            v = config.get('null_sort_value')
        return v

    def null_sort_value(self, config):
        """Return a stand-in value that sorts missing values first or last.

        The choice depends on `null-order` (default `last`) combined with
        the sort order, and on the configured value type.
        """
        vtype = config.get('value_type', 'string')
        placement = self.data.get('null-order', 'last')

        if (placement == 'last' and self.order == 'desc') or (
            placement != 'last' and self.order != 'desc'
        ):
            # return a value that will sort first
            if vtype == 'number':
                return float('-inf')
            elif vtype == 'date':
                return datetime.datetime.min.replace(tzinfo=tzutc())
            return ''
        else:
            # return a value that will sort last
            if vtype == 'number':
                return float('inf')
            elif vtype == 'date':
                return datetime.datetime.max.replace(tzinfo=tzutc())
            return '\uffff'

    def limit(self, groups):
        """Apply the discard and limit settings to each group and concatenate.

        Both settings are evaluated per group: the discard count is the
        larger of `discard` and `discard-percent` of that group's size.
        """
        results = []

        max_count = self.data.get('limit', 0)
        pct = self.data.get('limit-percent', 0)
        drop = self.data.get('discard', 0)
        droppct = self.data.get('discard-percent', 0)
        ordered = list(groups)
        if 'group-by' in self.data or 'order' in self.data:
            ordered = self.reorder(ordered, key=lambda r: groups[r]['sortkey'])
        for g in ordered:
            # discard X first
            # Worked out afresh for every group, so a large group does not
            # raise the discard count applied to the groups after it.
            group_drop = drop
            if droppct > 0:
                group_drop = max(
                    drop, int(droppct / 100 * len(groups[g]['resources'])))
            if group_drop > 0:
                groups[g]['resources'] = groups[g]['resources'][group_drop:]

            # then limit the remaining
            count = len(groups[g]['resources'])
            if pct > 0:
                count = int(pct / 100 * len(groups[g]['resources']))
            if max_count > 0 and max_count < count:
                count = max_count
            results.extend(groups[g]['resources'][0:count])
        return results

    def reorder(self, items, key=None):
        """Return ``items`` shuffled, reversed or sorted according to `order`."""
        if self.order == 'randomize':
            return sample(items, k=len(items))
        elif self.order == 'reverse':
            return items[::-1]
        else:
            return sorted(items, key=key, reverse=(self.order == 'desc'))
class ListItemModel:
    """Resource model returned by the list-item resource manager."""

    # Key under which list items carry their temporary position in the list.
    id = 'c7n:_id'
class ListItemRegistry(FilterRegistry):
    """Filter registry offering only the filters usable on list items."""

    def __init__(self, *args, **kw):
        # Start the super() lookup after FilterRegistry so its __init__,
        # which also registers `event` and `list-item`, is not run.
        super(FilterRegistry, self).__init__(*args, **kw)
        item_filters = (
            ('value', ValueFilter),
            ('or', Or),
            ('and', And),
            ('not', Not),
            ('reduce', ReduceFilter),
        )
        for name, filter_class in item_filters:
            self.register(name, filter_class)
class ListItemResourceManager(ResourceManager):
    """Resource manager used to run filters over the items of a list."""

    filter_registry = ListItemRegistry('filters')

    def get_model(self):
        """Return the model class whose ``id`` names the item id key."""
        return ListItemModel
class ListItemFilter(Filter):
    """
    Perform multi attribute filtering on items within a list,
    for example looking for security groups that have rules which
    include 0.0.0.0/0 and port 22 open.

    :example:

    .. code-block:: yaml

      policies:
        - name: security-group-with-22-open-to-world
          resource: aws.security-group
          filters:
            - type: list-item
              key: IpPermissions
              attrs:
                - type: value
                  key: IpRanges[].CidrIp
                  value: '0.0.0.0/0'
                  op: in
                  value_type: swap
                - type: value
                  key: FromPort
                  value: 22
                - type: value
                  key: ToPort
                  value: 22
        - name: find-task-def-not-using-registry
          resource: aws.ecs-task-definition
          filters:
            - not:
              - type: list-item
                key: containerDefinitions
                attrs:
                  - not:
                    - type: value
                      key: image
                      value: "${account_id}.dkr.ecr.us-east-2.amazonaws.com.*"
                      op: regex
    """

    schema = type_schema(
        'list-item',
        **{
            'key': {'type': 'string'},
            'attrs': {'$ref': '#/definitions/filters_common/list_item_attrs'},
            'count': {'type': 'number'},
            'count_op': {'$ref': '#/definitions/filters_common/comparison_operators'},
        },
    )

    schema_alias = True
    # When False, matches are annotated as "<key>[<index>]" strings;
    # otherwise the matching items themselves are stored.
    annotate_items = False

    # Lazily compiled jmespath expression for `key`.
    _expr = None

    @property
    def expr(self):
        """Compiled jmespath expression for `key`, built on first access."""
        if not self._expr:
            self._expr = jmespath_compile(self.data['key'])
        return self._expr

    def check_count(self, rcount):
        """Return whether ``rcount`` satisfies the configured `count`.

        Always False when no `count` is configured; `count_op` defaults
        to `eq`.
        """
        if 'count' not in self.data:
            return False
        count = self.data['count']
        op = OPERATORS[self.data.get('count_op', 'eq')]
        # Always return a real boolean rather than falling through to None.
        return bool(op(rcount, count))

    def process(self, resources, event=None):
        """Return the resources whose list items satisfy the `attrs` filters.

        With `count` configured a resource matches when the number of
        matching items satisfies it; otherwise a resource matches when at
        least one item matches, and the matches are recorded under
        'c7n:ListItemMatches'.

        Raises PolicyExecutionError when the value found at `key` is not
        a list.
        """
        result = []
        frm = ListItemResourceManager(
            self.manager.ctx, data={'filters': self.data.get('attrs', [])})
        for r in resources:
            list_values = self.get_item_values(r)
            if not list_values:
                if self.check_count(0):
                    result.append(r)
                continue
            if not isinstance(list_values, list):
                item_type = type(list_values)
                raise PolicyExecutionError(
                    f"list-item filter value for {self.data['key']} is a {item_type} not a list"
                )
            # Tag each item with its position so matches can be traced back.
            for idx, list_value in enumerate(list_values):
                list_value['c7n:_id'] = idx
            try:
                list_resources = frm.filter_resources(list_values, event)
                matched_indices = [item['c7n:_id'] for item in list_resources]
            finally:
                # Remove the temporary ids even when filtering fails, so
                # they do not stay behind on the resource.
                for list_value in list_values:
                    list_value.pop('c7n:_id', None)
            if 'count' in self.data:
                if self.check_count(len(list_resources)):
                    result.append(r)
            elif list_resources:
                if not self.annotate_items:
                    annotations = [
                        f'{self.data.get("key", self.type)}[{str(i)}]'
                        for i in matched_indices
                    ]
                else:
                    annotations = list_resources
                r.setdefault('c7n:ListItemMatches', [])
                r['c7n:ListItemMatches'].extend(annotations)
                result.append(r)
        return result

    def get_item_values(self, resource):
        """Return what the `key` expression finds in ``resource``."""
        return self.expr.search(resource)

    def __call__(self, resource):
        return bool(self.process((resource,)))