Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/kafka.py: 54%
91 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.actions import Action
4from c7n.filters.vpc import SecurityGroupFilter, SubnetFilter
5from c7n.manager import resources
6from c7n.filters.kms import KmsRelatedFilter
7from c7n.query import QueryResourceManager, TypeInfo, DescribeSource, ConfigSource
8from c7n.utils import local_session, type_schema
10from .aws import shape_validate
class DescribeKafka(DescribeSource):

    def augment(self, resources):
        """Normalize list_clusters_v2 output for backwards compatibility.

        Hoists the keys nested under 'Provisioned' to the top level
        (matching the shape of the older list_clusters api) and converts
        the Tags mapping into the standard AWS list-of-{Key, Value} form.
        """
        for cluster in resources:
            provisioned = cluster.get('Provisioned')
            if provisioned:
                for key, value in provisioned.items():
                    # keep any top-level value that already exists
                    if key not in cluster:
                        cluster[key] = value
            if 'Tags' in cluster:
                cluster['Tags'] = [
                    {'Key': k, 'Value': v} for k, v in cluster['Tags'].items()]
        return resources
@resources.register('kafka')
class Kafka(QueryResourceManager):
    # Resource manager for AWS MSK (Managed Streaming for Kafka) clusters.

    class resource_type(TypeInfo):
        service = 'kafka'
        # v2 enumeration returns both PROVISIONED and SERVERLESS clusters
        enum_spec = ('list_clusters_v2', 'ClusterInfoList', None)
        arn = id = 'ClusterArn'
        name = 'ClusterName'
        date = 'CreationTime'
        filter_name = 'ClusterNameFilter'
        filter_type = 'scalar'
        universal_taggable = object()
        cfn_type = config_type = 'AWS::MSK::Cluster'

    source_mapping = {
        # describe source normalizes the v2 response shape (see DescribeKafka)
        'describe': DescribeKafka,
        'config': ConfigSource
    }
@Kafka.filter_registry.register('security-group')
class KafkaSGFilter(SecurityGroupFilter):
    """Filter kafka clusters by their broker node security groups."""

    RelatedIdsExpression = "BrokerNodeGroupInfo.SecurityGroups[]"
@Kafka.filter_registry.register('subnet')
class KafkaCompoundSubnetFilter(SubnetFilter):
    """Filter kafka clusters by subnet.

    Kafka v2 returns both serverless and provisioned clusters, each of
    which stores its subnet information in a different location, so the
    resources are partitioned by cluster type and each partition is run
    through a type-specific subnet filter.
    """

    RelatedIdsExpression = "compound"

    def process(self, resources, event=None):

        class ProvisionedSubnetFilter(SubnetFilter):
            RelatedIdsExpression = "Provisioned.BrokerNodeGroupInfo.ClientSubnets[]"

        class ServerlessSubnetFilter(SubnetFilter):
            RelatedIdsExpression = "Serverless.VpcConfigs[].SubnetIds[]"

        # partition resources by cluster type; other types are ignored
        partitions = {'PROVISIONED': [], 'SERVERLESS': []}
        for resource in resources:
            if resource['ClusterType'] in partitions:
                partitions[resource['ClusterType']].append(resource)

        matched = []
        for subset, filter_class in (
                (partitions['PROVISIONED'], ProvisionedSubnetFilter),
                (partitions['SERVERLESS'], ServerlessSubnetFilter)):
            instance = filter_class(self.data, self.manager)
            # validate() is required for the related-id filter to work
            instance.validate()
            matched.extend(instance.process(subset, event))

        return matched
@Kafka.filter_registry.register('kms-key')
class KafkaKmsFilter(KmsRelatedFilter):
    """
    Filter a kafka cluster's data-volume encryption by its associated kms key
    and optionally the alias name of the kms key by using 'c7n:AliasName'

    :example:

    .. code-block:: yaml

        policies:
          - name: kafka-kms-key-filter
            resource: kafka
            filters:
              - type: kms-key
                key: c7n:AliasName
                value: alias/aws/kafka
    """
    # only provisioned clusters carry at-rest encryption config at this path
    RelatedIdsExpression = 'Provisioned.EncryptionInfo.EncryptionAtRest.DataVolumeKMSKeyId'
@Kafka.action_registry.register('set-monitoring')
class SetMonitoring(Action):
    """Update the monitoring configuration of ACTIVE kafka clusters.

    The 'config' payload is validated against the UpdateMonitoringRequest
    botocore shape before execution.
    """

    schema = type_schema(
        'set-monitoring',
        config={'type': 'object', 'minProperties': 1},
        required=('config',))

    shape = 'UpdateMonitoringRequest'
    permissions = ('kafka:UpdateClusterConfiguration',)

    def validate(self):
        # validate against the botocore shape; arn/version are required
        # members of the shape, so supply placeholder values
        payload = dict(self.data.get('config', {}))
        payload['ClusterArn'] = 'arn:'
        payload['CurrentVersion'] = '123'
        shape_validate(payload, self.shape, 'kafka')
        return super().validate()

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('kafka')
        # monitoring can only be updated on clusters in the ACTIVE state
        active = self.filter_resources(resources, 'State', ('ACTIVE',))
        for cluster in active:
            request = dict(
                self.data.get('config', {}),
                ClusterArn=cluster['ClusterArn'],
                CurrentVersion=cluster['CurrentVersion'])
            client.update_monitoring(**request)
@Kafka.action_registry.register('delete')
class Delete(Action):
    """Delete kafka clusters; clusters already gone are silently skipped."""

    schema = type_schema('delete')
    permissions = ('kafka:DeleteCluster',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('kafka')

        for cluster in resources:
            try:
                client.delete_cluster(ClusterArn=cluster['ClusterArn'])
            except client.exceptions.NotFoundException:
                # already deleted out of band; nothing to do
                pass