Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/config.py: 27%

60 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from c7n.filters import ValueFilter 

4from c7n.manager import resources 

5from c7n.utils import local_session, type_schema 

6 

7from .core import Filter 

8 

9 

class ConfigCompliance(Filter):
    """Filter resources by their compliance with one or more AWS config rules.

    An example of using the filter to find all ec2 instances that have
    been registered as non compliant in the last 30 days against two
    custom AWS Config rules.

    :example:

    .. code-block:: yaml

       policies:
         - name: non-compliant-ec2
           resource: ec2
           filters:
            - type: config-compliance
              eval_filters:
               - type: value
                 key: ResultRecordedTime
                 value_type: age
                 value: 30
                 op: less-than
              rules:
               - custodian-ec2-encryption-required
               - custodian-ec2-tags-required

    Also note, custodian has direct support for deploying policies as config
    rules see https://cloudcustodian.io/docs/policy/lambda.html#config-rules
    """
    # NOTE(review): get_resource_map pages through the config client's
    # get_compliance_details_by_config_rule operation -- confirm this is
    # the IAM action that operation actually requires.
    permissions = ('config:DescribeComplianceByConfigRule',)
    schema = type_schema(
        'config-compliance',
        required=('rules',),
        op={'enum': ['or', 'and']},
        eval_filters={'type': 'array', 'items': {
            'oneOf': [
                {'$ref': '#/definitions/filters/valuekv'},
                {'$ref': '#/definitions/filters/value'}]}},
        states={'type': 'array', 'items': {'enum': [
            'COMPLIANT', 'NON_COMPLIANT',
            'NOT_APPLICABLE', 'INSUFFICIENT_DATA']}},
        rules={'type': 'array', 'items': {'type': 'string'}})
    schema_alias = True
    annotation_key = 'c7n:config-compliance'

    def get_resource_map(self, filters, resource_model, resources):
        """Collect matching config rule evaluations keyed by resource id.

        For every rule named in the filter's ``rules`` setting, page through
        the rule's evaluation results restricted to the configured ``states``
        (``NON_COMPLIANT`` when unset) and keep the evaluations that belong
        to this resource type and satisfy ``filters``.

        :param filters: filter objects exposing ``match(evaluation)``; when
            empty, every evaluation of the right resource type is kept.
        :param resource_model: object providing ``config_type`` and
            ``cfn_type``, used to discard evaluations of other resource types.
        :param resources: unused here; kept so the signature stays stable
            for callers and overrides.
        :returns: dict mapping the evaluation's ``ResourceId`` to the list of
            evaluation result dicts recorded for it.
        """
        rule_ids = self.data.get('rules')
        states = self.data.get('states', ['NON_COMPLIANT'])
        # 'or' (the default) keeps an evaluation when any eval filter
        # matches; 'and' requires all of them to match.
        combine = any if self.data.get('op', 'or') == 'or' else all

        client = local_session(self.manager.session_factory).client('config')
        # The paginator does not depend on the rule, so build it once.
        pager = client.get_paginator('get_compliance_details_by_config_rule')
        resource_map = {}

        for rule_id in rule_ids:
            for page in pager.paginate(
                    ConfigRuleName=rule_id, ComplianceTypes=states):
                for evaluation in page.get('EvaluationResults', ()):
                    qualifier = evaluation['EvaluationResultIdentifier'][
                        'EvaluationResultQualifier']
                    # for multi resource type rules, only look at
                    # results for the resource type currently being
                    # processed.
                    if qualifier['ResourceType'] not in (
                            resource_model.config_type,
                            resource_model.cfn_type):
                        continue

                    # Every filter is evaluated (list, not generator) before
                    # the results are combined.
                    if not filters or combine(
                            [f.match(evaluation) for f in filters]):
                        resource_map.setdefault(
                            qualifier['ResourceId'], []).append(evaluation)

        return resource_map

    def process(self, resources, event=None):
        """Return the resources that have matching config rule evaluations.

        Each returned resource is annotated under ``annotation_key`` with the
        list of evaluations that matched it.

        :param resources: resource dicts to filter.
        :param event: unused by this filter.
        :returns: list of the matching (annotated) resources, in input order.
        """
        filters = []
        for f in self.data.get('eval_filters', ()):
            vf = ValueFilter(f)
            vf.annotate = False
            filters.append(vf)

        resource_model = self.manager.get_model()
        resource_map = self.get_resource_map(filters, resource_model, resources)

        # Avoid static/import time dep on boto in filters package
        from c7n.resources.aws import Arn
        results = []
        for arn, r in zip(self.manager.get_arns(resources), resources):
            # many aws provided rules are inconsistent in their
            # treatment of resource ids, some use arns, some use names
            # as identifiers for the same resource type. security
            # hub in particular is bad at consistency.
            resource_id = r[resource_model.id]
            rid = None
            if arn in resource_map:
                rid = arn
            elif resource_id in resource_map:
                rid = resource_id
            if arn == resource_id and not rid:
                # The resource's id is itself an arn; fall back to the
                # resource component parsed out of that arn.
                rid = Arn.parse(arn).resource
                if rid not in resource_map:
                    rid = None
            if rid is None:
                continue
            r[self.annotation_key] = resource_map[rid]
            results.append(r)
        return results

    @classmethod
    def register_resources(klass, registry, resource_class):
        """Model resource subscriber on resource registration.

        Watch for new resource types being registered; if they are
        supported by aws config (they declare a ``cfn_type`` or a
        ``config_type``), automatically register the config-compliance
        filter on them.
        """
        if (resource_class.resource_type.cfn_type is None and
                resource_class.resource_type.config_type is None):
            return
        resource_class.filter_registry.register('config-compliance', klass)

136 

137 

# Subscribe to resource registrations so ConfigCompliance.register_resources
# is called for each resource class and can attach the filter where it applies.
resources.subscribe(ConfigCompliance.register_resources)