1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.actions import Action
4from c7n.filters.vpc import SecurityGroupFilter, SubnetFilter
5from c7n.manager import resources
6from c7n.filters.kms import KmsRelatedFilter
7from c7n.query import QueryResourceManager, TypeInfo, DescribeSource, ConfigSource
8from c7n.utils import local_session, type_schema
9
10from .aws import shape_validate
11
12
13class DescribeKafka(DescribeSource):
14
15 def augment(self, resources):
16 for r in resources:
17 # preserve backwards compat with extant list_clsuters api
18 if 'Provisioned' in r:
19 for k, v in r['Provisioned'].items():
20 # dont overwrite
21 if k in r:
22 continue
23 r[k] = v
24 if 'Tags' not in r:
25 continue
26 tags = []
27 for k, v in r['Tags'].items():
28 tags.append({'Key': k, 'Value': v})
29 r['Tags'] = tags
30 return resources
31
32
33@resources.register('kafka')
34class Kafka(QueryResourceManager):
35
36 class resource_type(TypeInfo):
37 service = 'kafka'
38 enum_spec = ('list_clusters_v2', 'ClusterInfoList', None)
39 arn = id = 'ClusterArn'
40 name = 'ClusterName'
41 date = 'CreationTime'
42 filter_name = 'ClusterNameFilter'
43 filter_type = 'scalar'
44 universal_taggable = object()
45 cfn_type = config_type = 'AWS::MSK::Cluster'
46 permissions_augment = ("kafka:ListTagsForResource",)
47
48 source_mapping = {
49 'describe': DescribeKafka,
50 'config': ConfigSource
51 }
52
53
54@Kafka.filter_registry.register('security-group')
55class KafkaSGFilter(SecurityGroupFilter):
56
57 RelatedIdsExpression = "BrokerNodeGroupInfo.SecurityGroups[]"
58
59
60@Kafka.filter_registry.register('subnet')
61class KafkaCompoundSubnetFilter(SubnetFilter):
62
63 RelatedIdsExpression = "compound"
64
65 def process(self, resources, event=None):
66 # kafka v2 has both serverless and provisioned resources which have two different
67 # locations for their subnet info
68
69 class ProvisionedSubnetFilter(SubnetFilter):
70 RelatedIdsExpression = "Provisioned.BrokerNodeGroupInfo.ClientSubnets[]"
71
72 class ServerlessSubnetFilter(SubnetFilter):
73 RelatedIdsExpression = "Serverless.VpcConfigs[].SubnetIds[]"
74
75 p = []
76 s = []
77
78 for r in resources:
79 if r['ClusterType'] == 'PROVISIONED':
80 p.append(r)
81 if r['ClusterType'] == 'SERVERLESS':
82 s.append(r)
83
84 result = []
85 for filtered, fil in ((p, ProvisionedSubnetFilter), (s, ServerlessSubnetFilter), ):
86 f = fil(self.data, self.manager)
87 # necessary to validate otherwise the filter wont work
88 f.validate()
89 result.extend(f.process(filtered, event))
90
91 return result
92
93
94@Kafka.filter_registry.register('kms-key')
95class KafkaKmsFilter(KmsRelatedFilter):
96 """
97
98 Filter a kafka cluster's data-volume encryption by its associcated kms key
99 and optionally the aliasname of the kms key by using 'c7n:AliasName'
100
101 :example:
102
103 .. code-block:: yaml
104
105 policies:
106 - name: kafka-kms-key-filter
107 resource: kafka
108 filters:
109 - type: kms-key
110 key: c7n:AliasName
111 value: alias/aws/kafka
112 """
113 RelatedIdsExpression = 'Provisioned.EncryptionInfo.EncryptionAtRest.DataVolumeKMSKeyId'
114
115
116@Kafka.action_registry.register('set-monitoring')
117class SetMonitoring(Action):
118
119 schema = type_schema(
120 'set-monitoring',
121 config={'type': 'object', 'minProperties': 1},
122 required=('config',))
123
124 shape = 'UpdateMonitoringRequest'
125 permissions = ('kafka:UpdateClusterConfiguration',)
126
127 def validate(self):
128 attrs = dict(self.data.get('config', {}))
129 attrs['ClusterArn'] = 'arn:'
130 attrs['CurrentVersion'] = '123'
131 shape_validate(attrs, self.shape, 'kafka')
132 return super(SetMonitoring, self).validate()
133
134 def process(self, resources):
135 client = local_session(self.manager.session_factory).client('kafka')
136 for r in self.filter_resources(resources, 'State', ('ACTIVE',)):
137 params = dict(self.data.get('config', {}))
138 params['ClusterArn'] = r['ClusterArn']
139 params['CurrentVersion'] = r['CurrentVersion']
140 client.update_monitoring(**params)
141
142
143@Kafka.action_registry.register('delete')
144class Delete(Action):
145
146 schema = type_schema('delete')
147 permissions = ('kafka:DeleteCluster',)
148
149 def process(self, resources):
150 client = local_session(self.manager.session_factory).client('kafka')
151
152 for r in resources:
153 try:
154 client.delete_cluster(ClusterArn=r['ClusterArn'])
155 except client.exceptions.NotFoundException:
156 continue
157
158
159@resources.register('kafka-config')
160class KafkaClusterConfiguration(QueryResourceManager):
161 """ Resource Manager for MSK Kafka Configuration.
162 """
163
164 class resource_type(TypeInfo):
165 service = 'kafka'
166 enum_spec = ('list_configurations', 'Configurations', None)
167 name = 'Name'
168 id = arn = 'Arn'
169 date = 'CreationTime'
170 permissions_augment = ("kafka:ListConfigurations",)
171
172
173@KafkaClusterConfiguration.action_registry.register('delete')
174class DeleteClusterConfiguration(Action):
175 """Delete MSK Cluster Configuration.
176
177 :example:
178
179 .. code-block:: yaml
180
181 policies:
182 - name: msk-delete-cluster-configuration
183 resource: aws.kafka-config
184 actions:
185 - type: delete
186 """
187 schema = type_schema('delete')
188 permissions = ('kafka:DeleteConfiguration',)
189
190 def process(self, resources):
191 client = local_session(self.manager.session_factory).client('kafka')
192 for r in resources:
193 try:
194 client.delete_configuration(Arn=r['Arn'])
195 except client.exceptions.NotFoundException:
196 continue