Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/kms.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from .core import ValueFilter
4from .related import RelatedResourceFilter
5from c7n.utils import type_schema
class KmsRelatedFilter(RelatedResourceFilter):
    """
    Filter a resource by its associated kms key and optionally the aliasname
    of the kms key by using 'c7n:AliasName'

    :example:

    Match a specific key alias:

    .. code-block:: yaml

        policies:
          - name: dms-encrypt-key-check
            resource: dms-instance
            filters:
              - type: kms-key
                key: "c7n:AliasName"
                value: alias/aws/dms

    Or match against native key attributes such as ``KeyManager``, which
    more explicitly distinguishes between ``AWS`` and ``CUSTOMER``-managed
    keys. The above policy can also be written as:

    .. code-block:: yaml

        policies:
          - name: dms-aws-managed-key
            resource: dms-instance
            filters:
              - type: kms-key
                key: KeyManager
                value: AWS
    """

    schema = type_schema(
        'kms-key', rinherit=ValueFilter.schema,
        **{'match-resource': {'type': 'boolean'},
           'operator': {'enum': ['and', 'or']}})
    RelatedResource = "c7n.resources.kms.Key"
    AnnotationKey = "matched-kms-key"

    def get_related(self, resources):
        """Return the kms keys referenced by ``resources``, keyed by ``KeyId``.

        Each returned key dict is annotated in place with ``c7n:AliasName``:
        the first entry of its ``AliasNames`` list, or ``''`` when the key
        has no aliases.

        :param resources: resources whose related kms keys should be fetched.
        :returns: dict mapping key id to the (annotated) key resource dict.
        """
        resource_manager = self.get_resource_manager()
        related_ids = self.get_related_ids(resources)
        # Below the threshold fetch only the referenced keys, otherwise
        # fetch every key the resource manager returns.
        if len(related_ids) < self.FetchThreshold:
            related = resource_manager.get_resources(list(related_ids))
        else:
            related = resource_manager.resources()
        related_map = {}

        for r in related:
            # `AliasNames` is set when we fetch keys, but only for keys
            # which have aliases defined. Fall back to an empty string
            # to avoid lookup errors in filters. `or` (rather than a
            # `.get()` default) also covers a present-but-empty or None
            # value, which would otherwise fail on the `[0]` index.
            r['c7n:AliasName'] = (r.get('AliasNames') or ('',))[0]
            related_map[r['KeyId']] = r

        return related_map

    def get_related_ids(self, resources):
        """Return the set of normalized kms key references for ``resources``.

        The references reported by the parent class may be key ids, key
        arns, alias names or alias arns. Arns are reduced to their trailing
        key id / alias name, and alias names are translated to key ids when
        the alias is known; unknown aliases are kept unchanged.

        :param resources: resources to extract kms key references from.
        :returns: set of key ids (or unresolved alias names).
        """
        related_ids = super().get_related_ids(resources)
        normalized_ids = set()
        for rid in related_ids:
            if rid.startswith('arn:'):  # key arn or alias arn
                if 'alias/' in rid:
                    rid = rid.rsplit(':', 1)[-1]  # alias name
                else:
                    rid = rid.rsplit('/', 1)[-1]  # key id
            if rid.startswith('alias/'):
                # `alias_to_id` is normally populated by process(); build
                # it on demand so calling this method (or get_related)
                # directly does not fail with an AttributeError.
                alias_to_id = getattr(self, 'alias_to_id', None)
                if alias_to_id is None:
                    alias_to_id = self.alias_to_id = self.key_alias_to_key_id()
                rid = alias_to_id.get(rid, rid)
            normalized_ids.add(rid)
        return normalized_ids

    def process(self, resources, event=None):
        """Return the resources for which ``process_resource`` is truthy.

        The alias -> key id map is rebuilt on every call, before the
        related keys are looked up.

        :param resources: resources to filter.
        :param event: unused by this method.
        :returns: list of matching resources.
        """
        self.alias_to_id = self.key_alias_to_key_id()
        related = self.get_related(resources)
        return [r for r in resources if self.process_resource(r, related)]

    def key_alias_to_key_id(self):
        """Return a dict mapping each key alias to its key id.

        Built by inverting the resource manager's ``alias_map``
        (key id -> iterable of aliases).
        """
        # convert key alias to key id for cache lookup
        # else cache lookup returns [] even if the key exists
        key_manager = self.get_resource_manager()
        alias_to_id = {}
        for kid, kaliases in key_manager.alias_map.items():
            alias_to_id.update({alias: kid for alias in kaliases})
        return alias_to_id