Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/kafka.py: 57%


113 statements  

# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
from c7n.actions import Action
from c7n.filters.vpc import SecurityGroupFilter, SubnetFilter
from c7n.manager import resources
from c7n.filters.kms import KmsRelatedFilter
from c7n.query import QueryResourceManager, TypeInfo, DescribeSource, ConfigSource
from c7n.utils import local_session, type_schema

from .aws import shape_validate


class DescribeKafka(DescribeSource):

    def augment(self, resources):
        for r in resources:
            # preserve backwards compat with the extant list_clusters api
            if 'Provisioned' in r:
                for k, v in r['Provisioned'].items():
                    # don't overwrite existing top-level keys
                    if k in r:
                        continue
                    r[k] = v
            if 'Tags' not in r:
                continue
            tags = []
            for k, v in r['Tags'].items():
                tags.append({'Key': k, 'Value': v})
            r['Tags'] = tags
        return resources
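
The normalization above is easiest to see on a single made-up record. This sketch repeats the two steps augment() performs, outside of any resource manager: flatten the Provisioned block without overwriting existing keys, then rewrite the tag mapping into Key/Value pairs. All field values are illustrative.

# Illustrative record shaped like a list_clusters_v2 entry; values are made up.
record = {
    'ClusterArn': 'arn:aws:kafka:us-east-1:111122223333:cluster/example/abc123',
    'ClusterType': 'PROVISIONED',
    'Provisioned': {'NumberOfBrokerNodes': 3,
                    'BrokerNodeGroupInfo': {'SecurityGroups': ['sg-0123456789abcdef0']}},
    'Tags': {'env': 'prod'},
}

# Flatten Provisioned into the top level, never overwriting existing keys.
for k, v in record['Provisioned'].items():
    record.setdefault(k, v)

# Rewrite the tag mapping into the Key/Value list form used elsewhere in c7n.
record['Tags'] = [{'Key': k, 'Value': v} for k, v in record['Tags'].items()]

# record now exposes NumberOfBrokerNodes and BrokerNodeGroupInfo at the top
# level, and record['Tags'] == [{'Key': 'env', 'Value': 'prod'}].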

@resources.register('kafka')
class Kafka(QueryResourceManager):

    class resource_type(TypeInfo):
        service = 'kafka'
        enum_spec = ('list_clusters_v2', 'ClusterInfoList', None)
        arn = id = 'ClusterArn'
        name = 'ClusterName'
        date = 'CreationTime'
        filter_name = 'ClusterNameFilter'
        filter_type = 'scalar'
        universal_taggable = object()
        cfn_type = config_type = 'AWS::MSK::Cluster'
        permissions_augment = ("kafka:ListTagsForResource",)

    source_mapping = {
        'describe': DescribeKafka,
        'config': ConfigSource
    }
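
For orientation, the enum_spec above maps to the kafka ListClustersV2 API. A rough boto3 sketch of that enumeration follows; QueryResourceManager handles sessions, retries, and pagination itself, so this is illustrative only.

import boto3

client = boto3.client('kafka')

# enum_spec ('list_clusters_v2', 'ClusterInfoList', None) means: call
# list_clusters_v2 and gather the ClusterInfoList element from every page.
clusters, token = [], None
while True:
    page = client.list_clusters_v2(**({'NextToken': token} if token else {}))
    clusters.extend(page.get('ClusterInfoList', []))
    token = page.get('NextToken')
    if not token:
        break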

@Kafka.filter_registry.register('security-group')
class KafkaSGFilter(SecurityGroupFilter):

    RelatedIdsExpression = "BrokerNodeGroupInfo.SecurityGroups[]"
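
Note that this expression reads BrokerNodeGroupInfo at the top level of the resource, which for provisioned v2 cluster records is where DescribeKafka.augment copies it from the Provisioned block. A small jmespath sketch against the augmented record from earlier (made-up ids):

import jmespath

augmented = {'BrokerNodeGroupInfo': {'SecurityGroups': ['sg-0123456789abcdef0']}}
jmespath.search("BrokerNodeGroupInfo.SecurityGroups[]", augmented)
# -> ['sg-0123456789abcdef0']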

@Kafka.filter_registry.register('subnet')
class KafkaCompoundSubnetFilter(SubnetFilter):

    RelatedIdsExpression = "compound"

    def process(self, resources, event=None):
        # kafka v2 has both serverless and provisioned clusters, which keep
        # their subnet info in two different locations

        class ProvisionedSubnetFilter(SubnetFilter):
            RelatedIdsExpression = "Provisioned.BrokerNodeGroupInfo.ClientSubnets[]"

        class ServerlessSubnetFilter(SubnetFilter):
            RelatedIdsExpression = "Serverless.VpcConfigs[].SubnetIds[]"

        p = []
        s = []

        for r in resources:
            if r['ClusterType'] == 'PROVISIONED':
                p.append(r)
            if r['ClusterType'] == 'SERVERLESS':
                s.append(r)

        result = []
        for filtered, fil in ((p, ProvisionedSubnetFilter), (s, ServerlessSubnetFilter), ):
            f = fil(self.data, self.manager)
            # validation is required, otherwise the delegate filter won't work
            f.validate()
            result.extend(f.process(filtered, event))

        return result
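
To make the two expressions concrete, here is a sketch of how they resolve against illustrative provisioned and serverless records (subnet ids made up), using the jmespath library. Each expression only matches its own cluster shape, which is why the compound filter splits resources by ClusterType before delegating.

import jmespath

provisioned = {
    'ClusterType': 'PROVISIONED',
    'Provisioned': {'BrokerNodeGroupInfo': {'ClientSubnets': ['subnet-aaa', 'subnet-bbb']}},
}
serverless = {
    'ClusterType': 'SERVERLESS',
    'Serverless': {'VpcConfigs': [{'SubnetIds': ['subnet-ccc']}]},
}

jmespath.search("Provisioned.BrokerNodeGroupInfo.ClientSubnets[]", provisioned)
# -> ['subnet-aaa', 'subnet-bbb']
jmespath.search("Serverless.VpcConfigs[].SubnetIds[]", serverless)
# -> ['subnet-ccc']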

@Kafka.filter_registry.register('kms-key')
class KafkaKmsFilter(KmsRelatedFilter):
    """
    Filter a kafka cluster's data-volume encryption by its associated kms key
    and, optionally, by the alias name of the kms key using 'c7n:AliasName'

    :example:

    .. code-block:: yaml

        policies:
          - name: kafka-kms-key-filter
            resource: kafka
            filters:
              - type: kms-key
                key: c7n:AliasName
                value: alias/aws/kafka
    """
    RelatedIdsExpression = 'Provisioned.EncryptionInfo.EncryptionAtRest.DataVolumeKMSKeyId'

@Kafka.action_registry.register('set-monitoring')
class SetMonitoring(Action):

    schema = type_schema(
        'set-monitoring',
        config={'type': 'object', 'minProperties': 1},
        required=('config',))

    shape = 'UpdateMonitoringRequest'
    permissions = ('kafka:UpdateClusterConfiguration',)

    def validate(self):
        # check the policy's config against the UpdateMonitoringRequest shape at
        # load time, using placeholder values for the required arn/version members
        attrs = dict(self.data.get('config', {}))
        attrs['ClusterArn'] = 'arn:'
        attrs['CurrentVersion'] = '123'
        shape_validate(attrs, self.shape, 'kafka')
        return super(SetMonitoring, self).validate()

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('kafka')
        # only act on clusters in the ACTIVE state
        for r in self.filter_resources(resources, 'State', ('ACTIVE',)):
            params = dict(self.data.get('config', {}))
            params['ClusterArn'] = r['ClusterArn']
            params['CurrentVersion'] = r['CurrentVersion']
            client.update_monitoring(**params)
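
A hedged sketch of how a policy's config block flows through this action. The EnhancedMonitoring key is an assumption about one field of the kafka UpdateMonitoring request, not something this file defines; the arn and version values are illustrative.

# Hypothetical 'config' a policy might pass to set-monitoring.
config = {'EnhancedMonitoring': 'PER_BROKER'}  # assumed request field

# validate() pads the config with placeholder identifiers so the whole dict
# can be checked against the UpdateMonitoringRequest shape at load time...
load_time_attrs = {**config, 'ClusterArn': 'arn:', 'CurrentVersion': '123'}

# ...while process() substitutes each ACTIVE cluster's real identifiers
# before calling update_monitoring.
run_time_params = {
    **config,
    'ClusterArn': 'arn:aws:kafka:us-east-1:111122223333:cluster/example/abc123',
    'CurrentVersion': 'K3AEGXETSR30VB',  # illustrative version token
}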

@Kafka.action_registry.register('delete')
class Delete(Action):

    schema = type_schema('delete')
    permissions = ('kafka:DeleteCluster',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('kafka')

        for r in resources:
            try:
                client.delete_cluster(ClusterArn=r['ClusterArn'])
            except client.exceptions.NotFoundException:
                continue

@resources.register('kafka-config')
class KafkaClusterConfiguration(QueryResourceManager):
    """Resource Manager for MSK Kafka Configuration."""

    class resource_type(TypeInfo):
        service = 'kafka'
        enum_spec = ('list_configurations', 'Configurations', None)
        name = 'Name'
        id = arn = 'Arn'
        date = 'CreationTime'
        permissions_augment = ("kafka:ListConfigurations",)

@KafkaClusterConfiguration.action_registry.register('delete')
class DeleteClusterConfiguration(Action):
    """Delete MSK Cluster Configuration.

    :example:

    .. code-block:: yaml

        policies:
          - name: msk-delete-cluster-configuration
            resource: aws.kafka-config
            actions:
              - type: delete
    """
    schema = type_schema('delete')
    permissions = ('kafka:DeleteConfiguration',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('kafka')
        for r in resources:
            try:
                client.delete_configuration(Arn=r['Arn'])
            except client.exceptions.NotFoundException:
                continue