
1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from .core import ValueFilter 

4from .related import RelatedResourceFilter 

5from c7n.utils import type_schema 

6 

7 

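class KmsRelatedFilter(RelatedResourceFilter):
    """Filter a resource by the KMS key it references.

    The key reference extracted by the base class's ``get_related_ids``
    may be a key id, a key ARN, an alias name (``alias/...``) or an alias
    ARN (``arn:...:alias/...``); every form is normalized to the bare key
    id before the lookup against the ``kms-key`` resources of this account.
    The value filter then runs against the key's attributes, plus the
    synthetic ``c7n:AliasName`` annotation added in ``get_related``.

    Caveats for compliance policies:

    * ``c7n:AliasName`` carries only the *first* alias reported for the
      key; a key with several aliases is matched on that one alone.
    * An alias can be re-pointed at a different key, so matching on an
      alias name identifies "whichever key currently has that alias", not
      a fixed key. Prefer the native ``KeyManager`` attribute to tell
      AWS-managed from customer-managed keys.
    * A reference that cannot be resolved (an alias unknown in this
      account, a key in another account) is absent from the map returned
      by ``get_related`` and therefore never satisfies the filter.

    :example:

    Match a specific key alias:

    .. code-block:: yaml

        policies:
          - name: dms-encrypt-key-check
            resource: dms-instance
            filters:
              - type: kms-key
                key: "c7n:AliasName"
                value: alias/aws/dms

    Or match against native key attributes such as ``KeyManager``, which
    more explicitly distinguishes between ``AWS`` and ``CUSTOMER``-managed
    keys. The above policy can also be written as:

    .. code-block:: yaml

        policies:
          - name: dms-aws-managed-key
            resource: dms-instance
            filters:
              - type: kms-key
                key: KeyManager
                value: AWS
    """

    schema = type_schema(
        'kms-key', rinherit=ValueFilter.schema,
        **{'match-resource': {'type': 'boolean'},
           'operator': {'enum': ['and', 'or']}})
    RelatedResource = "c7n.resources.kms.Key"
    AnnotationKey = "matched-kms-key"

    def get_related(self, resources):
        """Return ``{KeyId: key}`` for the keys referenced by ``resources``.

        Below ``FetchThreshold`` referenced keys are fetched individually;
        at or above it the account's full key list is fetched instead, so
        the returned map may also contain keys that no resource references.
        Each key gets a ``c7n:AliasName`` annotation (first alias, or ``''``).
        """
        resource_manager = self.get_resource_manager()
        related_ids = self.get_related_ids(resources)
        if len(related_ids) < self.FetchThreshold:
            related = resource_manager.get_resources(list(related_ids))
        else:
            related = resource_manager.resources()
        related_map = {}

        for r in related:
            # `AliasNames` is set when we fetch keys, but only for keys
            # which have aliases defined. Fall back to an empty string,
            # also when the list is present but empty, so that filtering
            # on `c7n:AliasName` never raises for an alias-less key.
            aliases = r.get('AliasNames') or ('',)
            r['c7n:AliasName'] = aliases[0]
            related_map[r['KeyId']] = r

        return related_map

    def get_related_ids(self, resources):
        """Normalize every key reference in ``resources`` to a bare key id.

        Handles the forms a ``KmsKeyId``-style attribute may take: key id,
        key ARN, alias name and alias ARN. Alias forms are resolved through
        ``self.alias_to_id`` (built in ``process``); an alias unknown in
        this account is passed through unchanged and so will not be found
        in the related map.
        """
        related_ids = super().get_related_ids(resources)
        normalized_ids = []
        for rid in related_ids:
            if rid.startswith('arn:') and ':alias/' in rid:
                # Alias ARN: reduce to the alias name so it resolves below
                # instead of degrading to a bare alias name that is not a
                # key id and could never be found.
                rid = 'alias/' + rid.partition(':alias/')[2]
            if rid.startswith('alias'):
                rid = self.alias_to_id.get(rid, rid)
            if rid.startswith('arn:'):
                # Key ARN: the key id is the last path segment.
                normalized_ids.append(rid.rsplit('/', 1)[-1])
            else:
                normalized_ids.append(rid)
        return normalized_ids

    def process(self, resources, event=None):
        """Build the alias index once per run, fetch the referenced keys,
        and keep the resources whose key(s) satisfy the value filter."""
        self.alias_to_id = self.key_alias_to_key_id()
        related = self.get_related(resources)
        return [r for r in resources if self.process_resource(r, related)]

    def key_alias_to_key_id(self):
        """Invert the key manager's ``alias_map`` (``{key_id: [alias, ...]}``)
        into ``{alias: key_id}``.

        Needed because the related-resource cache is keyed by key id; a
        lookup by alias would otherwise return nothing even when the key
        exists. If an alias is listed under several keys, the last one
        seen wins.
        """
        key_manager = self.get_resource_manager()
        return {
            alias: kid
            for kid, kaliases in key_manager.alias_map.items()
            for alias in kaliases
        }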