Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/kafka.py: 54%

91 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from c7n.actions import Action 

4from c7n.filters.vpc import SecurityGroupFilter, SubnetFilter 

5from c7n.manager import resources 

6from c7n.filters.kms import KmsRelatedFilter 

7from c7n.query import QueryResourceManager, TypeInfo, DescribeSource, ConfigSource 

8from c7n.utils import local_session, type_schema 

9 

10from .aws import shape_validate 

11 

12 

class DescribeKafka(DescribeSource):
    """Describe source that normalizes the cluster records it is given."""

    def augment(self, resources):
        """Flatten provisioned details and reshape tags on each resource.

        Every key found under ``Provisioned`` is copied to the top level of
        the resource unless a key of that name is already present there.  A
        ``Tags`` mapping, when present, is replaced by a list of
        ``{'Key': ..., 'Value': ...}`` dicts.

        :param resources: list of cluster dicts; modified in place.
        :returns: the same ``resources`` object.
        """
        for cluster in resources:
            # preserve backwards compat with the extant list_clusters api
            if 'Provisioned' in cluster:
                for name, value in cluster['Provisioned'].items():
                    # setdefault never overwrites an existing top-level key
                    cluster.setdefault(name, value)
            if 'Tags' in cluster:
                cluster['Tags'] = [
                    {'Key': key, 'Value': value}
                    for key, value in cluster['Tags'].items()
                ]
        return resources

31 

32 

@resources.register('kafka')
class Kafka(QueryResourceManager):
    """Resource manager registered under the ``kafka`` resource name."""

    class resource_type(TypeInfo):
        # Metadata describing how clusters are enumerated and identified.
        service = 'kafka'
        # Enumerate via list_clusters_v2, reading the ClusterInfoList key.
        enum_spec = ('list_clusters_v2', 'ClusterInfoList', None)
        # The cluster ARN serves as both the id and the arn.
        id = 'ClusterArn'
        arn = 'ClusterArn'
        name = 'ClusterName'
        date = 'CreationTime'
        filter_name = 'ClusterNameFilter'
        filter_type = 'scalar'
        universal_taggable = object()
        # The same type name is used for CloudFormation and Config.
        config_type = 'AWS::MSK::Cluster'
        cfn_type = 'AWS::MSK::Cluster'

    # Source implementation to use for each source name.
    source_mapping = dict(
        describe=DescribeKafka,
        config=ConfigSource,
    )

51 

52 

@Kafka.filter_registry.register('security-group')
class KafkaSGFilter(SecurityGroupFilter):
    """Security group filter for kafka, registered as ``security-group``.

    Related security group ids are looked up on each resource with the
    expression below.
    """

    RelatedIdsExpression = 'BrokerNodeGroupInfo.SecurityGroups[]'

57 

58 

@Kafka.filter_registry.register('subnet')
class KafkaCompoundSubnetFilter(SubnetFilter):
    """Subnet filter for kafka that handles both cluster types.

    kafka v2 has both serverless and provisioned resources, which keep
    their subnet info in two different locations, so the work is delegated
    to one inner subnet filter per cluster type.
    """

    RelatedIdsExpression = "compound"

    def process(self, resources, event=None):
        """Apply the matching per-type subnet filter to each cluster.

        :param resources: cluster dicts, each carrying a ``ClusterType`` key.
        :param event: passed through unchanged to the delegate filters.
        :returns: provisioned matches followed by serverless matches.
        """

        class ProvisionedSubnetFilter(SubnetFilter):
            RelatedIdsExpression = "Provisioned.BrokerNodeGroupInfo.ClientSubnets[]"

        class ServerlessSubnetFilter(SubnetFilter):
            RelatedIdsExpression = "Serverless.VpcConfigs[].SubnetIds[]"

        # Split the resources by cluster type in a single pass; clusters of
        # any other type are left out of both groups.
        by_type = {'PROVISIONED': [], 'SERVERLESS': []}
        for cluster in resources:
            group = by_type.get(cluster['ClusterType'])
            if group is not None:
                group.append(cluster)

        delegates = (
            (by_type['PROVISIONED'], ProvisionedSubnetFilter),
            (by_type['SERVERLESS'], ServerlessSubnetFilter),
        )

        matched = []
        for subset, filter_class in delegates:
            delegate = filter_class(self.data, self.manager)
            # necessary to validate otherwise the filter wont work
            delegate.validate()
            matched += delegate.process(subset, event)
        return matched

91 

92 

@Kafka.filter_registry.register('kms-key')
class KafkaKmsFilter(KmsRelatedFilter):
    """
    Filter kafka clusters by the kms key associated with their data-volume
    encryption. The alias name of the kms key can optionally be matched by
    using 'c7n:AliasName'.

    :example:

    .. code-block:: yaml

        policies:
          - name: kafka-kms-key-filter
            resource: kafka
            filters:
              - type: kms-key
                key: c7n:AliasName
                value: alias/aws/kafka
    """

    # Path on the resource that holds the id of the related kms key.
    RelatedIdsExpression = (
        'Provisioned.EncryptionInfo.EncryptionAtRest.DataVolumeKMSKeyId'
    )

113 

114 

@Kafka.action_registry.register('set-monitoring')
class SetMonitoring(Action):
    """Update the monitoring settings of active clusters.

    The required ``config`` object is passed as keyword arguments to the
    kafka client's ``update_monitoring`` call, together with the
    ``ClusterArn`` and ``CurrentVersion`` of each resource.
    """

    schema = type_schema(
        'set-monitoring',
        config={'type': 'object', 'minProperties': 1},
        required=('config',))

    shape = 'UpdateMonitoringRequest'
    # process() calls update_monitoring, which is authorized by the
    # kafka:UpdateMonitoring IAM action (not kafka:UpdateClusterConfiguration).
    permissions = ('kafka:UpdateMonitoring',)

    def validate(self):
        """Check the policy's ``config`` against the request shape.

        :returns: the result of the parent class ``validate``.
        """
        attrs = dict(self.data.get('config', {}))
        # Placeholder values only: process() supplies the real ClusterArn
        # and CurrentVersion from each resource.
        attrs['ClusterArn'] = 'arn:'
        attrs['CurrentVersion'] = '123'
        shape_validate(attrs, self.shape, 'kafka')
        return super(SetMonitoring, self).validate()

    def process(self, resources):
        """Call ``update_monitoring`` for each resource whose State is ACTIVE.

        :param resources: cluster dicts carrying ``State``, ``ClusterArn``
            and ``CurrentVersion``.
        """
        client = local_session(self.manager.session_factory).client('kafka')
        for r in self.filter_resources(resources, 'State', ('ACTIVE',)):
            # Copy so the policy's own config dict is never mutated.
            params = dict(self.data.get('config', {}))
            params['ClusterArn'] = r['ClusterArn']
            params['CurrentVersion'] = r['CurrentVersion']
            client.update_monitoring(**params)

140 

141 

@Kafka.action_registry.register('delete')
class Delete(Action):
    """Delete each matched cluster by its ARN."""

    schema = type_schema('delete')
    permissions = ('kafka:DeleteCluster',)

    def process(self, resources):
        """Issue ``delete_cluster`` for every resource.

        A ``NotFoundException`` from the client is ignored and processing
        moves on to the next resource.

        :param resources: cluster dicts carrying a ``ClusterArn`` key.
        """
        session = local_session(self.manager.session_factory)
        kafka = session.client('kafka')

        for cluster in resources:
            try:
                kafka.delete_cluster(ClusterArn=cluster['ClusterArn'])
            except kafka.exceptions.NotFoundException:
                # Nothing to delete for this ARN; carry on with the rest.
                pass