1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from .core import ValueFilter
4from .related import RelatedResourceFilter
5from c7n.utils import type_schema
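class KmsRelatedFilter(RelatedResourceFilter):
    """
    Filter a resource by its associated kms key and optionally the aliasname
    of the kms key by using 'c7n:AliasName'

    :example:

    Match a specific key alias:

    .. code-block:: yaml

        policies:
          - name: dms-encrypt-key-check
            resource: dms-instance
            filters:
              - type: kms-key
                key: "c7n:AliasName"
                value: alias/aws/dms

    Or match against native key attributes such as ``KeyManager``, which
    more explicitly distinguishes between ``AWS`` and ``CUSTOMER``-managed
    keys. The above policy can also be written as:

    .. code-block:: yaml

        policies:
          - name: dms-aws-managed-key
            resource: dms-instance
            filters:
              - type: kms-key
                key: KeyManager
                value: AWS

    Note that ``c7n:AliasName`` exposes only the *first* alias reported for
    a key, so a key carrying several aliases may not match an alias-based
    policy; ``KeyManager``, ``KeyId`` or ``Arn`` are the more reliable
    attributes for compliance checks.
    """

    schema = type_schema(
        'kms-key', rinherit=ValueFilter.schema,
        **{'match-resource': {'type': 'boolean'},
           'operator': {'enum': ['and', 'or']}})
    RelatedResource = "c7n.resources.kms.Key"
    AnnotationKey = "matched-kms-key"

    def get_related(self, resources):
        """Fetch the KMS keys referenced by ``resources``, keyed by ``KeyId``.

        :param resources: resources whose related key ids are collected via
            :meth:`get_related_ids`.
        :return: dict mapping each key's ``KeyId`` to the key record, with a
            ``c7n:AliasName`` entry added to every record.
        """
        resource_manager = self.get_resource_manager()
        related_ids = self.get_related_ids(resources)
        # Below the threshold fetch only the referenced keys; above it a full
        # listing is cheaper than many point lookups.
        if len(related_ids) < self.FetchThreshold:
            related = resource_manager.get_resources(list(related_ids))
        else:
            related = resource_manager.resources()

        related_map = {}
        for r in related:
            # `AliasNames` is set when keys are fetched, but only for keys
            # which have aliases defined. Fall back to an empty string when
            # it is missing *or* empty, so value filters never hit an
            # IndexError on a key without aliases. Only the first alias is
            # surfaced.
            aliases = r.get('AliasNames') or ('',)
            r['c7n:AliasName'] = aliases[0]
            related_map[r['KeyId']] = r
        return related_map

    def get_related_ids(self, resources):
        """Return the key identifiers referenced by ``resources`` as bare key ids.

        Alias names are resolved through ``self.alias_to_id`` (populated by
        :meth:`process`; calling this before ``process`` raises
        ``AttributeError``). ARNs are reduced to their last path component,
        which for ``.../key/<id>`` is the key id. Identifiers that are neither
        are passed through unchanged, as are aliases with no known key.
        """
        normalized_ids = []
        for rid in super().get_related_ids(resources):
            if rid.startswith('alias'):
                rid = self.alias_to_id.get(rid, rid)
            if rid.startswith('arn:'):
                rid = rid.rsplit('/', 1)[-1]
            normalized_ids.append(rid)
        return normalized_ids

    def process(self, resources, event=None):
        """Return the subset of ``resources`` whose related KMS key matches.

        :param resources: resources to evaluate.
        :param event: unused here; kept for the filter interface.
        """
        # Resolve aliases up front so that cache lookups keyed by key id
        # succeed for resources that reference their key by alias.
        self.alias_to_id = self.key_alias_to_key_id()
        related = self.get_related(resources)
        return [r for r in resources if self.process_resource(r, related)]

    def key_alias_to_key_id(self):
        """Build an ``{alias_name: key_id}`` map from the key manager's alias map.

        The resource cache is keyed by key id, so an alias must be translated
        first or the cache lookup returns ``[]`` even though the key exists.
        If an alias appears under several key ids, the last one seen wins.
        """
        key_manager = self.get_resource_manager()
        return {
            alias: kid
            for kid, kaliases in key_manager.alias_map.items()
            for alias in kaliases
        }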