1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.filters.iamaccess import _account, PolicyChecker
4from botocore.exceptions import ClientError
5
6from datetime import datetime, timezone
7import json
8from collections import defaultdict
9from functools import lru_cache
10
11from c7n.actions import RemovePolicyBase, BaseAction
12from c7n.filters import Filter, CrossAccountAccessFilter, ValueFilter
13from c7n.manager import resources
14from c7n.query import (
15 ConfigSource, DescribeSource, QueryResourceManager, RetryPageIterator, TypeInfo)
16from c7n.utils import local_session, type_schema, select_keys
17from c7n.tags import universal_augment
18
19from .securityhub import PostFinding
20
21
class DescribeAlias(DescribeSource):
    """Describe source for KMS aliases.

    Aliases that carry no ``TargetKeyId`` are dropped during augmentation.
    """

    def augment(self, resources):
        """Return only the aliases that have a ``TargetKeyId`` entry."""
        return list(filter(lambda alias: 'TargetKeyId' in alias, resources))
26
27
@resources.register('kms')
class KeyAlias(QueryResourceManager):
    """Resource manager for KMS key aliases, registered as ``kms``."""

    class resource_type(TypeInfo):
        # Aliases are enumerated through the KMS ``list_aliases`` call and
        # read from the ``Aliases`` key of the response.
        service = 'kms'
        arn_type = 'alias'
        enum_spec = ('list_aliases', 'Aliases', None)
        name = "AliasName"
        id = "AliasArn"
        # The same type string is used for both Config and CloudFormation.
        config_type = 'AWS::KMS::Alias'
        cfn_type = config_type

    source_mapping = {
        'describe': DescribeAlias,
        'config': ConfigSource,
    }
40
41
class DescribeKey(DescribeSource):
    """Describe source for KMS keys.

    Fills in key metadata via ``describe_key`` and annotates each key with
    the alias names that point at it.
    """

    # Number of requested ids at which we stop describing keys one by one
    # and fall back to the parent's enumerate-then-filter lookup.
    FetchThreshold = 10

    def get_resources(self, ids, cache=True):
        """Fetch the keys identified by ``ids``.

        Small requests are served by describing each key directly; keys
        reported as not found are skipped. Larger requests are delegated to
        the parent implementation.
        """
        if len(ids) >= self.FetchThreshold:
            # Too many ids: let the parent fetch the full set and filter.
            return super().get_resources(ids, cache)

        kms = local_session(self.manager.session_factory).client('kms')
        described = []
        for key_ref in ids:
            try:
                response = self.manager.retry(kms.describe_key, KeyId=key_ref)
            except kms.exceptions.NotFoundException:
                continue
            described.append(response['KeyMetadata'])
        return described

    def augment(self, resources):
        """Add ``describe_key`` details and ``AliasNames`` to each key."""
        kms = local_session(self.manager.session_factory).client('kms')
        for key in resources:
            key_id = key.get('KeyId')

            # list_keys yields `KeyArn` while describe_key yields `Arn`, so
            # the presence of `Arn` means the details were already fetched.
            if 'Arn' not in key:
                try:
                    lookup_id = key.get('KeyArn', key_id)
                    metadata = kms.describe_key(KeyId=lookup_id)['KeyMetadata']
                    key.update(metadata)
                except ClientError as err:
                    if err.response['Error']['Code'] != 'AccessDeniedException':
                        raise
                    self.manager.log.warning(
                        "Access denied when describing key:%s",
                        key_id)
                    # `Arn` is a core attribute, so keep it available even
                    # when the describe call was denied.
                    key['Arn'] = key['KeyArn']

            aliases = self.manager.alias_map.get(key_id)
            if aliases:
                key['AliasNames'] = aliases

        return universal_augment(self.manager, resources)
92
93
class ConfigKey(ConfigSource):
    """Config source for KMS keys that also attaches alias names."""

    def load_resource(self, item):
        """Load a key from a config item and annotate it with ``AliasNames``.

        The annotation is only added when the manager's alias map has at
        least one alias for the key.
        """
        key = super().load_resource(item)
        id_field = self.manager.resource_type.id
        aliases = self.manager.alias_map.get(key[id_field])
        if aliases:
            key['AliasNames'] = aliases
        return key
102
103
@resources.register('kms-key')
class Key(QueryResourceManager):
    """Resource manager for KMS keys, registered as ``kms-key``."""

    class resource_type(TypeInfo):
        service = 'kms'
        arn_type = "key"
        enum_spec = ('list_keys', 'Keys', None)
        detail_spec = ('describe_key', 'KeyId', 'Arn', 'KeyMetadata')  # overridden
        name = id = "KeyId"
        arn = 'Arn'
        universal_taggable = True
        cfn_type = config_type = 'AWS::KMS::Key'
        permissions_augment = ("kms:ListResourceTags",)

    source_mapping = {
        'config': ConfigKey,
        'describe': DescribeKey
    }

    @property
    def alias_map(self):
        """A dict mapping key IDs to aliases

        Fetch key aliases as a flat list, and convert it to a map of
        key ID -> aliases. We can build this once and use it to
        augment key resources.

        The map is memoized on the instance rather than in a module-level
        ``lru_cache``, so the cache does not hold a reference to the manager
        and is released together with it.
        """
        cached = self.__dict__.get('_alias_map')
        if cached is None:
            cached = defaultdict(list)
            for a in KeyAlias(self.ctx, {}).resources():
                cached[a['TargetKeyId']].append(a['AliasName'])
            # Only store the map once it has been fully built, so a failed
            # alias fetch is retried on the next access.
            self._alias_map = cached
        return cached
137
138
@Key.filter_registry.register('key-rotation-status')
class KeyRotationStatus(ValueFilter):
    """Filters KMS keys by the rotation status

    :example:

    .. code-block:: yaml

            policies:
              - name: kms-key-disabled-rotation
                resource: kms-key
                filters:
                  - type: key-rotation-status
                    key: KeyRotationEnabled
                    value: false
    """

    schema = type_schema('key-rotation-status', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('kms:GetKeyRotationStatus',)

    def process(self, resources, event=None):
        """Annotate keys with their rotation status and return the matches.

        The ``get_key_rotation_status`` response is stored on each key under
        ``KeyRotationEnabled``; the value filter is then evaluated against
        that stored response. Keys for which access was denied are left
        without the annotation and are matched against an empty dict.
        """
        kms = local_session(self.manager.session_factory).client('kms')

        def fetch_status(key):
            try:
                status = kms.get_key_rotation_status(KeyId=key['KeyId'])
            except ClientError as err:
                if err.response['Error']['Code'] != 'AccessDeniedException':
                    raise
                self.log.warning(
                    "Access denied when getting rotation status on key:%s",
                    key.get('KeyArn'))
            else:
                key['KeyRotationEnabled'] = status

        with self.executor_factory(max_workers=2) as pool:
            # Skip keys that already carry the annotation.
            pending = [
                key for key in resources if 'KeyRotationEnabled' not in key]
            self.log.debug(
                "Querying %d kms-keys' rotation status" % len(pending))
            list(pool.map(fetch_status, pending))

        matched = []
        for key in resources:
            if self.match(key.get('KeyRotationEnabled', {})):
                matched.append(key)
        return matched
184
185
class KMSPolicyChecker(PolicyChecker):
    """Policy checker with handlers for KMS-specific condition keys.

    See https://docs.aws.amazon.com/kms/latest/developerguide/policy-conditions.html#conditions-kms
    """

    def handle_kms_calleraccount(self, s, c):
        """Return True if any account in the condition values is not allowed."""
        condition_accounts = {_account(value) for value in c['values']}
        outside = condition_accounts.difference(self.allowed_accounts)
        return len(outside) > 0

    def handle_kms_viaservice(self, s, c):
        """Always return False: services are not filtered, all are presumed allowed."""
        return False

    def handle_kms_grantoperations(self, s, c):
        """Always return False: grant operations are not filtered, all are presumed allowed."""
        return False
199
200
@Key.filter_registry.register('cross-account')
@KeyAlias.filter_registry.register('cross-account')
class KMSCrossAccountAccessFilter(CrossAccountAccessFilter):
    """Filter KMS keys which have cross account permissions

    :example:

    .. code-block:: yaml

            policies:
              - name: check-kms-key-cross-account
                resource: kms-key
                filters:
                  - type: cross-account
    """
    permissions = ('kms:GetKeyPolicy',)

    checker_factory = KMSPolicyChecker

    def process(self, resources, event=None):
        """Attach each key's ``default`` policy, then run the parent filter.

        Works for both key resources (``KeyId``) and alias resources
        (``TargetKeyId``); the alias field takes precedence when present.
        """
        kms = local_session(self.manager.session_factory).client('kms')

        def attach_policy(resource):
            key_id = resource.get('TargetKeyId', resource.get('KeyId'))
            assert key_id, "Invalid key resources %s" % resource
            response = kms.get_key_policy(KeyId=key_id, PolicyName='default')
            resource['Policy'] = response['Policy']
            return resource

        self.log.debug("fetching policy for %d kms keys" % len(resources))
        with self.executor_factory(max_workers=1) as pool:
            with_policy = [
                res for res in pool.map(attach_policy, resources) if res]

        return super().process(with_policy, event)
237
238
@KeyAlias.filter_registry.register('grant-count')
class GrantCount(Filter):
    """Filters KMS key grants

    This can be used to ensure issues around grant limits are monitored

    :example:

    .. code-block:: yaml

            policies:
              - name: kms-grants
                resource: kms
                filters:
                  - type: grant-count
                    min: 100
    """

    schema = type_schema(
        'grant-count', min={'type': 'integer', 'minimum': 0})
    permissions = ('kms:ListGrants',)

    def process(self, keys, event=None):
        """Return the aliases whose target key meets the grant threshold."""
        kms = local_session(self.manager.session_factory).client('kms')
        checked = (self.process_key(kms, alias) for alias in keys)
        return [alias for alias in checked if alias]

    def process_key(self, client, key):
        """Count grants on the alias's target key and apply the threshold.

        The count is stored on the resource as ``GrantCount``. Returns the
        resource when the count is at least ``min`` (default 5), after
        publishing an ``ExtantGrants`` metric; otherwise returns ``None``.
        """
        paginator = client.get_paginator('list_grants')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        pages = paginator.paginate(KeyId=key['TargetKeyId'])
        total = sum(len(page['Grants']) for page in pages)
        key['GrantCount'] = total

        if total < self.data.get('min', 5):
            return None

        # The metric scope is the alias name minus its first six
        # characters (presumably the ``alias/`` prefix).
        scope = key['AliasName'][6:]
        self.manager.ctx.metrics.put_metric(
            "ExtantGrants", total, "Count", Scope=scope)

        return key
285
286
class ResourceKmsKeyAlias(ValueFilter):
    """Value filter over the KMS alias associated with a resource's key.

    Resources are expected to reference their key through ``KmsKeyId``.
    """

    schema = type_schema('kms-alias', rinherit=ValueFilter.schema)
    schema_alias = False

    def get_permissions(self):
        """Return the permissions needed to enumerate KMS aliases."""
        alias_manager = KeyAlias(self.manager.ctx, {})
        return alias_manager.get_permissions()

    def get_matching_aliases(self, resources, event=None):
        """Annotate resources with ``KeyAlias`` and return those that match.

        The alias is looked up by the part of ``KmsKeyId`` that follows the
        first ``key/``. When several aliases target the same key, the last
        one enumerated is used.
        """
        alias_by_key = {}
        for alias in KeyAlias(self.manager.ctx, {}).resources():
            alias_by_key[alias['TargetKeyId']] = alias

        matched = []
        for resource in resources:
            kms_key = resource.get('KmsKeyId')
            if kms_key:
                bare_key_id = kms_key.split("key/", 1)[-1]
                resource['KeyAlias'] = alias_by_key.get(bare_key_id)
            if self.match(resource.get('KeyAlias')):
                matched.append(resource)
        return matched
307
308
@Key.action_registry.register('remove-statements')
@KeyAlias.action_registry.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
    """Action to remove policy statements from KMS

    :example:

    .. code-block:: yaml

           policies:
              - name: kms-key-cross-account
                resource: kms-key
                filters:
                  - type: cross-account
                actions:
                  - type: remove-statements
                    statement_ids: matched
    """

    permissions = ('kms:GetKeyPolicy', 'kms:PutKeyPolicy')

    def process(self, resources):
        """Remove the configured statements from each key's policy.

        Accepts key resources (``KeyId``) and alias resources
        (``TargetKeyId``). A failure on one key is logged and does not stop
        processing of the remaining keys.

        Returns a list with one result dict per key whose policy was changed.
        """
        results = []
        client = local_session(self.manager.session_factory).client('kms')
        for r in resources:
            key_id = r.get('TargetKeyId', r.get('KeyId'))
            assert key_id, "Invalid key resources %s" % r
            try:
                outcome = self.process_resource(client, r, key_id)
            except Exception:
                # Best effort: record the failure and move on to the next key.
                self.log.exception(
                    "Error processing kms:%s", key_id)
                continue
            if outcome:
                results.append(outcome)
        return results

    def process_resource(self, client, resource, key_id):
        """Rewrite one key's ``default`` policy without the matched statements.

        Fetches the policy when the resource does not already carry one.
        Returns ``None`` when the key has no policy or no statement was
        removed; otherwise returns a dict with ``Name``, ``State`` and the
        removed ``Statements``.
        """
        if 'Policy' not in resource:
            try:
                resource['Policy'] = client.get_key_policy(
                    KeyId=key_id, PolicyName='default')['Policy']
            except ClientError as e:
                if e.response['Error']['Code'] != "NotFoundException":
                    raise
                resource['Policy'] = None

        if not resource['Policy']:
            return

        p = json.loads(resource['Policy'])
        _, found = self.process_policy(
            p, resource, CrossAccountAccessFilter.annotation_key)

        if not found:
            return

        # NB: KMS supports only one key policy 'default'
        # http://docs.aws.amazon.com/kms/latest/developerguide/programming-key-policies.html#list-policies
        client.put_key_policy(
            KeyId=key_id,
            PolicyName='default',
            Policy=json.dumps(p)
        )

        return {'Name': key_id,
                'State': 'PolicyRemoved',
                'Statements': found}
374
375
@Key.action_registry.register('set-rotation')
class KmsKeyRotation(BaseAction):
    """Toggle KMS key rotation

    :example:

    .. code-block:: yaml

        policies:
          - name: enable-cmk-rotation
            resource: kms-key
            filters:
              - type: key-rotation-status
                key: KeyRotationEnabled
                value: False
            actions:
              - type: set-rotation
                state: True
    """
    # Both permissions are listed because the action either enables or
    # disables rotation depending on ``state``.
    permissions = ('kms:EnableKeyRotation', 'kms:DisableKeyRotation')
    schema = type_schema('set-rotation', state={'type': 'boolean'})

    def process(self, keys):
        """Enable rotation on every key, or disable it when ``state`` is false.

        ``state`` defaults to ``True`` when omitted from the policy.
        """
        client = local_session(self.manager.session_factory).client('kms')
        # The desired state is the same for every key, so pick the call once.
        if self.data.get('state', True):
            set_rotation = client.enable_key_rotation
        else:
            set_rotation = client.disable_key_rotation
        for k in keys:
            set_rotation(KeyId=k['KeyId'])
405
406
@KeyAlias.action_registry.register('post-finding')
@Key.action_registry.register('post-finding')
class KmsPostFinding(PostFinding):
    """Post-finding action for KMS keys and aliases, reported as ``AwsKmsKey``."""

    resource_type = 'AwsKmsKey'

    def format_resource(self, r):
        """Build the finding entry for a key or for the key behind an alias.

        Alias resources (those with ``TargetKeyId``) are first resolved to
        their key through the ``kms-key`` resource manager.

        Returns the envelope produced by ``format_envelope``, or ``None``
        when an alias's target key cannot be resolved.
        """
        if 'TargetKeyId' in r:
            resolved = self.manager.get_resource_manager(
                'kms-key').get_resources([r['TargetKeyId']])
            if not resolved:
                return
            r = resolved[0]
            # Expose the key id under the current manager's id field.
            r[self.manager.resource_type.id] = r['KeyId']
        envelope, payload = self.format_envelope(r)
        payload.update(self.filter_empty(
            select_keys(r, [
                'AWSAccount', 'CreationDate', 'KeyId',
                'KeyManager', 'Origin', 'KeyState'])))

        # Securityhub expects a unix timestamp for CreationDate
        if 'CreationDate' in payload and isinstance(payload['CreationDate'], datetime):
            created = payload['CreationDate']
            # Only naive values are assumed to be UTC. Overwriting the tzinfo
            # of an aware datetime would shift the instant whenever its
            # offset is not UTC.
            if created.utcoffset() is None:
                created = created.replace(tzinfo=timezone.utc)
            payload['CreationDate'] = created.timestamp()

        return envelope