Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/config.py: 27%
60 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.filters import ValueFilter
4from c7n.manager import resources
5from c7n.utils import local_session, type_schema
7from .core import Filter
class ConfigCompliance(Filter):
    """Filter resources by their compliance with one or more AWS config rules.

    An example of using the filter to find all ec2 instances that have
    been registered as non compliant in the last 30 days against two
    custom AWS Config rules.

    :example:

    .. code-block:: yaml

        policies:
          - name: non-compliant-ec2
            resource: ec2
            filters:
              - type: config-compliance
                eval_filters:
                  - type: value
                    key: ResultRecordedTime
                    value_type: age
                    value: 30
                    op: less-than
                rules:
                  - custodian-ec2-encryption-required
                  - custodian-ec2-tags-required

    Also note, custodian has direct support for deploying policies as config
    rules see https://cloudcustodian.io/docs/policy/lambda.html#config-rules
    """
    # Evaluation details are fetched through the
    # GetComplianceDetailsByConfigRule API (see get_resource_map), so that
    # action is declared next to the previously listed one.
    permissions = (
        'config:DescribeComplianceByConfigRule',
        'config:GetComplianceDetailsByConfigRule',
    )
    schema = type_schema(
        'config-compliance',
        required=('rules',),
        op={'enum': ['or', 'and']},
        eval_filters={'type': 'array', 'items': {
            'oneOf': [
                {'$ref': '#/definitions/filters/valuekv'},
                {'$ref': '#/definitions/filters/value'}]}},
        states={'type': 'array', 'items': {'enum': [
            'COMPLIANT', 'NON_COMPLIANT',
            'NOT_APPLICABLE', 'INSUFFICIENT_DATA']}},
        rules={'type': 'array', 'items': {'type': 'string'}})
    schema_alias = True
    # Key under which the matching evaluations are stored on each resource.
    annotation_key = 'c7n:config-compliance'

    def get_resource_map(self, filters, resource_model, resources):
        """Collect config rule evaluations keyed by evaluated resource id.

        :param filters: filters applied to every evaluation result; when
            empty, every evaluation of a matching resource type is kept.
        :param resource_model: resource type model; its ``config_type`` and
            ``cfn_type`` select which evaluations are relevant.
        :param resources: unused here, kept for interface compatibility.
        :returns: dict mapping an evaluation's ``ResourceId`` to the list
            of evaluation results recorded for it.
        """
        rule_ids = self.data.get('rules', ())
        states = self.data.get('states', ['NON_COMPLIANT'])
        # 'or' (the default) keeps an evaluation if any filter matches,
        # 'and' requires every filter to match.
        combine = any if self.data.get('op', 'or') == 'or' else all

        client = local_session(self.manager.session_factory).client('config')
        pager = client.get_paginator('get_compliance_details_by_config_rule')
        resource_types = (resource_model.config_type, resource_model.cfn_type)
        resource_map = {}

        for rule_id in rule_ids:
            pages = pager.paginate(
                ConfigRuleName=rule_id, ComplianceTypes=states)
            for page in pages:
                for evaluation in page.get('EvaluationResults', ()):
                    qualifier = evaluation['EvaluationResultIdentifier'][
                        'EvaluationResultQualifier']
                    # for multi resource type rules, only look at
                    # results for the resource type currently being
                    # processed.
                    if qualifier['ResourceType'] not in resource_types:
                        continue
                    if not filters or combine(
                            [f.match(evaluation) for f in filters]):
                        resource_map.setdefault(
                            qualifier['ResourceId'], []).append(evaluation)

        return resource_map

    def process(self, resources, event=None):
        """Return the resources that have matching config rule evaluations.

        Each returned resource is annotated under ``annotation_key`` with
        the list of evaluations that matched it.
        """
        filters = []
        for f in self.data.get('eval_filters', ()):
            vf = ValueFilter(f)
            # The filters run against evaluation records, not resources,
            # so they must not annotate what they match.
            vf.annotate = False
            filters.append(vf)

        resource_model = self.manager.get_model()
        resource_map = self.get_resource_map(filters, resource_model, resources)

        # Avoid static/import time dep on boto in filters package
        from c7n.resources.aws import Arn
        results = []
        for arn, r in zip(self.manager.get_arns(resources), resources):
            # many aws provided rules are inconsistent in their
            # treatment of resource ids, some use arns, some use names
            # as identifiers for the same resource type. security
            # hub in particular is bad at consistency.
            resource_id = r[resource_model.id]
            if arn in resource_map:
                rid = arn
            elif resource_id in resource_map:
                rid = resource_id
            elif arn == resource_id:
                # The resource's id is its arn; fall back to the resource
                # portion of the arn as the lookup key.
                rid = Arn.parse(arn).resource
                if rid not in resource_map:
                    rid = None
            else:
                rid = None
            if rid is None:
                continue
            r[self.annotation_key] = resource_map[rid]
            results.append(r)
        return results

    @classmethod
    def register_resources(klass, registry, resource_class):
        """model resource subscriber on resource registration.

        Watch for new resource types being registered if they are
        supported by aws config, automatically, register the
        config-compliance filter.
        """
        resource_type = resource_class.resource_type
        # Skip resource types that declare neither a cfn_type nor a
        # config_type.
        if resource_type.cfn_type is None and resource_type.config_type is None:
            return
        resource_class.filter_registry.register('config-compliance', klass)
# Subscribe to resource registration so register_resources is invoked for
# resource types as they are registered.
resources.subscribe(ConfigCompliance.register_resources)