1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.filters.iamaccess import _account, PolicyChecker
4from botocore.exceptions import ClientError
5
6from datetime import datetime, timezone
7import json
8from collections import defaultdict
9from functools import lru_cache
10
11from c7n.actions import RemovePolicyBase, BaseAction
12from c7n.filters import Filter, CrossAccountAccessFilter, ValueFilter
13from c7n.manager import resources
14from c7n.query import (
15 ConfigSource, DescribeSource, QueryResourceManager, RetryPageIterator, TypeInfo)
16from c7n.utils import local_session, type_schema, select_keys
17from c7n.tags import universal_augment
18
19from .securityhub import PostFinding
20
21
22class DescribeAlias(DescribeSource):
23
24 def augment(self, resources):
25 return [r for r in resources if 'TargetKeyId' in r]
26
27
28@resources.register('kms')
29class KeyAlias(QueryResourceManager):
30
31 class resource_type(TypeInfo):
32 service = 'kms'
33 arn_type = 'alias'
34 enum_spec = ('list_aliases', 'Aliases', None)
35 name = "AliasName"
36 id = "AliasArn"
37 config_type = cfn_type = 'AWS::KMS::Alias'
38
39 source_mapping = {'describe': DescribeAlias, 'config': ConfigSource}
40
41
42class DescribeKey(DescribeSource):
43
44 FetchThreshold = 10 # ie should we describe all keys or just fetch them directly
45
46 def get_resources(self, ids, cache=True):
47 # this forms a threshold beyond which we'll fetch individual keys of interest.
48 # else we'll need to fetch through the full set and client side filter.
49 if len(ids) < self.FetchThreshold:
50 client = local_session(self.manager.session_factory).client('kms')
51 results = []
52 for rid in ids:
53 try:
54 results.append(
55 self.manager.retry(
56 client.describe_key,
57 KeyId=rid)['KeyMetadata'])
58 except client.exceptions.NotFoundException:
59 continue
60 return results
61 return super().get_resources(ids, cache)
62
63 def augment(self, resources):
64 client = local_session(self.manager.session_factory).client('kms')
65 for r in resources:
66 key_id = r.get('KeyId')
67
68 # We get `KeyArn` from list_keys and `Arn` from describe_key.
69 # If we already have describe_key details we don't need to fetch
70 # it again.
71 if 'Arn' not in r:
72 try:
73 key_arn = r.get('KeyArn', key_id)
74 key_detail = client.describe_key(KeyId=key_arn)['KeyMetadata']
75 r.update(key_detail)
76 except ClientError as e:
77 if e.response['Error']['Code'] == 'AccessDeniedException':
78 self.manager.log.warning(
79 "Access denied when describing key:%s",
80 key_id)
81 # If a describe fails, we still want the `Arn` key
82 # available since it is a core attribute
83 r['Arn'] = r['KeyArn']
84 else:
85 raise
86
87 alias_names = self.manager.alias_map.get(key_id)
88 if alias_names:
89 r['AliasNames'] = alias_names
90
91 return universal_augment(self.manager, resources)
92
93
94class ConfigKey(ConfigSource):
95
96 def load_resource(self, item):
97 resource = super().load_resource(item)
98 alias_names = self.manager.alias_map.get(resource[self.manager.resource_type.id])
99 if alias_names:
100 resource['AliasNames'] = alias_names
101 return resource
102
103
104@resources.register('kms-key')
105class Key(QueryResourceManager):
106
107 class resource_type(TypeInfo):
108 service = 'kms'
109 arn_type = "key"
110 enum_spec = ('list_keys', 'Keys', None)
111 detail_spec = ('describe_key', 'KeyId', 'Arn', 'KeyMetadata') # overriden
112 name = id = "KeyId"
113 arn = 'Arn'
114 universal_taggable = True
115 cfn_type = config_type = 'AWS::KMS::Key'
116 permissions_augment = ("kms:ListResourceTags",)
117
118 source_mapping = {
119 'config': ConfigKey,
120 'describe': DescribeKey
121 }
122
123 @property
124 @lru_cache()
125 def alias_map(self):
126 """A dict mapping key IDs to aliases
127
128 Fetch key aliases as a flat list, and convert it to a map of
129 key ID -> aliases. We can build this once and use it to
130 augment key resources.
131 """
132 aliases = KeyAlias(self.ctx, {}).resources()
133 alias_map = defaultdict(list)
134 for a in aliases:
135 alias_map[a['TargetKeyId']].append(a['AliasName'])
136 return alias_map
137
138
139@Key.filter_registry.register('key-rotation-status')
140class KeyRotationStatus(ValueFilter):
141 """Filters KMS keys by the rotation status
142
143 :example:
144
145 .. code-block:: yaml
146
147 policies:
148 - name: kms-key-disabled-rotation
149 resource: kms-key
150 filters:
151 - type: key-rotation-status
152 key: KeyRotationEnabled
153 value: false
154 """
155
156 schema = type_schema('key-rotation-status', rinherit=ValueFilter.schema)
157 schema_alias = False
158 permissions = ('kms:GetKeyRotationStatus',)
159
160 def process(self, resources, event=None):
161 client = local_session(self.manager.session_factory).client('kms')
162
163 def _key_rotation_status(resource):
164 try:
165 resource['KeyRotationEnabled'] = client.get_key_rotation_status(
166 KeyId=resource['KeyId'])
167 except ClientError as e:
168 if e.response['Error']['Code'] == 'AccessDeniedException':
169 self.log.warning(
170 "Access denied when getting rotation status on key:%s",
171 resource.get('KeyArn'))
172 elif e.response['Error']['Code'] == 'UnsupportedOperationException':
173 # This is expected for keys that do not support rotation
174 # e.g. keys in custom keystores or when keys are in certain
175 # states such as PendingImport.
176 self.log.warning(
177 "UnsupportedOperationException when getting rotation status on key:%s",
178 resource.get('KeyArn'))
179 else:
180 raise
181
182 with self.executor_factory(max_workers=2) as w:
183 query_resources = [
184 r for r in resources if 'KeyRotationEnabled' not in r]
185 self.log.debug(
186 "Querying %d kms-keys' rotation status" % len(query_resources))
187 list(w.map(_key_rotation_status, query_resources))
188
189 return [r for r in resources if self.match(
190 r.get('KeyRotationEnabled', {}))]
191
192
193class KMSPolicyChecker(PolicyChecker):
194 # https://docs.aws.amazon.com/kms/latest/developerguide/policy-conditions.html#conditions-kms
195
196 def handle_kms_calleraccount(self, s, c):
197 return bool(set(map(_account, c['values'])).difference(self.allowed_accounts))
198
199 def handle_kms_viaservice(self, s, c):
200 # We dont filter on service so all are presumed allowed
201 return False
202
203 def handle_kms_grantoperations(self, s, c):
204 # We dont filter on GrantOperations so all are presumed allowed
205 return False
206
207
208@Key.filter_registry.register('cross-account')
209@KeyAlias.filter_registry.register('cross-account')
210class KMSCrossAccountAccessFilter(CrossAccountAccessFilter):
211 """Filter KMS keys which have cross account permissions
212
213 :example:
214
215 .. code-block:: yaml
216
217 policies:
218 - name: check-kms-key-cross-account
219 resource: kms-key
220 filters:
221 - type: cross-account
222 """
223 permissions = ('kms:GetKeyPolicy',)
224
225 checker_factory = KMSPolicyChecker
226
227 def process(self, resources, event=None):
228 client = local_session(
229 self.manager.session_factory).client('kms')
230
231 def _augment(r):
232 key_id = r.get('TargetKeyId', r.get('KeyId'))
233 assert key_id, "Invalid key resources %s" % r
234 r['Policy'] = client.get_key_policy(
235 KeyId=key_id, PolicyName='default')['Policy']
236 return r
237
238 self.log.debug("fetching policy for %d kms keys" % len(resources))
239 with self.executor_factory(max_workers=1) as w:
240 resources = list(filter(None, w.map(_augment, resources)))
241
242 return super(KMSCrossAccountAccessFilter, self).process(
243 resources, event)
244
245
246@KeyAlias.filter_registry.register('grant-count')
247class GrantCount(Filter):
248 """Filters KMS key grants
249
250 This can be used to ensure issues around grant limits are monitored
251
252 :example:
253
254 .. code-block:: yaml
255
256 policies:
257 - name: kms-grants
258 resource: kms
259 filters:
260 - type: grant-count
261 min: 100
262 """
263
264 schema = type_schema(
265 'grant-count', min={'type': 'integer', 'minimum': 0})
266 permissions = ('kms:ListGrants',)
267
268 def process(self, keys, event=None):
269 client = local_session(self.manager.session_factory).client('kms')
270 results = []
271 for k in keys:
272 results.append(self.process_key(client, k))
273 return [r for r in results if r]
274
275 def process_key(self, client, key):
276 p = client.get_paginator('list_grants')
277 p.PAGE_ITERATOR_CLS = RetryPageIterator
278 grant_count = 0
279 for rp in p.paginate(KeyId=key['TargetKeyId']):
280 grant_count += len(rp['Grants'])
281 key['GrantCount'] = grant_count
282
283 grant_threshold = self.data.get('min', 5)
284 if grant_count < grant_threshold:
285 return None
286
287 self.manager.ctx.metrics.put_metric(
288 "ExtantGrants", grant_count, "Count",
289 Scope=key['AliasName'][6:])
290
291 return key
292
293
294class ResourceKmsKeyAlias(ValueFilter):
295
296 schema = type_schema('kms-alias', rinherit=ValueFilter.schema)
297 schema_alias = False
298
299 def get_permissions(self):
300 return KeyAlias(self.manager.ctx, {}).get_permissions()
301
302 def get_matching_aliases(self, resources, event=None):
303 key_aliases = KeyAlias(self.manager.ctx, {}).resources()
304 key_aliases_dict = {a['TargetKeyId']: a for a in key_aliases}
305
306 matched = []
307 for r in resources:
308 if r.get('KmsKeyId'):
309 r['KeyAlias'] = key_aliases_dict.get(
310 r.get('KmsKeyId').split("key/", 1)[-1])
311 if self.match(r.get('KeyAlias')):
312 matched.append(r)
313 return matched
314
315
316@Key.action_registry.register('remove-statements')
317@KeyAlias.action_registry.register('remove-statements')
318class RemovePolicyStatement(RemovePolicyBase):
319 """Action to remove policy statements from KMS
320
321 :example:
322
323 .. code-block:: yaml
324
325 policies:
326 - name: kms-key-cross-account
327 resource: kms-key
328 filters:
329 - type: cross-account
330 actions:
331 - type: remove-statements
332 statement_ids: matched
333 """
334
335 permissions = ('kms:GetKeyPolicy', 'kms:PutKeyPolicy')
336
337 def process(self, resources):
338 results = []
339 client = local_session(self.manager.session_factory).client('kms')
340 for r in resources:
341 key_id = r.get('TargetKeyId', r.get('KeyId'))
342 assert key_id, "Invalid key resources %s" % r
343 try:
344 results += filter(None, [self.process_resource(client, r, key_id)])
345 except Exception:
346 self.log.exception(
347 "Error processing sns:%s", key_id)
348 return results
349
350 def process_resource(self, client, resource, key_id):
351 if 'Policy' not in resource:
352 try:
353 resource['Policy'] = client.get_key_policy(
354 KeyId=key_id, PolicyName='default')['Policy']
355 except ClientError as e:
356 if e.response['Error']['Code'] != "NotFoundException":
357 raise
358 resource['Policy'] = None
359
360 if not resource['Policy']:
361 return
362
363 p = json.loads(resource['Policy'])
364 _, found = self.process_policy(
365 p, resource, CrossAccountAccessFilter.annotation_key)
366
367 if not found:
368 return
369
370 # NB: KMS supports only one key policy 'default'
371 # http://docs.aws.amazon.com/kms/latest/developerguide/programming-key-policies.html#list-policies
372 client.put_key_policy(
373 KeyId=key_id,
374 PolicyName='default',
375 Policy=json.dumps(p)
376 )
377
378 return {'Name': key_id,
379 'State': 'PolicyRemoved',
380 'Statements': found}
381
382
383@Key.action_registry.register('set-rotation')
384class KmsKeyRotation(BaseAction):
385 """Toggle KMS key rotation
386
387 :example:
388
389 .. code-block:: yaml
390
391 policies:
392 - name: enable-cmk-rotation
393 resource: kms-key
394 filters:
395 - type: key-rotation-status
396 key: KeyRotationEnabled
397 value: False
398 actions:
399 - type: set-rotation
400 state: True
401 """
402 permissions = ('kms:EnableKeyRotation',)
403 schema = type_schema('set-rotation', state={'type': 'boolean'})
404
405 def process(self, keys):
406 client = local_session(self.manager.session_factory).client('kms')
407 for k in keys:
408 if self.data.get('state', True):
409 client.enable_key_rotation(KeyId=k['KeyId'])
410 continue
411 client.disable_key_rotation(KeyId=k['KeyId'])
412
413
414@KeyAlias.action_registry.register('post-finding')
415@Key.action_registry.register('post-finding')
416class KmsPostFinding(PostFinding):
417
418 resource_type = 'AwsKmsKey'
419
420 def format_resource(self, r):
421 if 'TargetKeyId' in r:
422 resolved = self.manager.get_resource_manager(
423 'kms-key').get_resources([r['TargetKeyId']])
424 if not resolved:
425 return
426 r = resolved[0]
427 r[self.manager.resource_type.id] = r['KeyId']
428 envelope, payload = self.format_envelope(r)
429 payload.update(self.filter_empty(
430 select_keys(r, [
431 'AWSAccount', 'CreationDate', 'KeyId',
432 'KeyManager', 'Origin', 'KeyState'])))
433
434 # Securityhub expects a unix timestamp for CreationDate
435 if 'CreationDate' in payload and isinstance(payload['CreationDate'], datetime):
436 payload['CreationDate'] = (
437 payload['CreationDate'].replace(tzinfo=timezone.utc).timestamp()
438 )
439
440 return envelope
441
442
443@Key.action_registry.register("schedule-deletion")
444class KmsKeyScheduleDeletion(BaseAction):
445 """Schedule KMS key deletion
446
447 If the number of days is not specified, the default value of 30 days is used.
448 The number of days must be between 7 and 30.
449
450 :example:
451
452 .. code-block:: yaml
453
454 policies:
455 - name: delete-tagged-keys
456 resource: kms-key
457 filters:
458 - type: value
459 key: tag:DeleteAfter
460 op: ge
461 value_type: age # age is a special value type that will be converted to a timestamp
462 value: 0
463 actions:
464 - type: schedule-deletion
465 days: 7
466 """
467
468 permissions = ("kms:ScheduleKeyDeletion",)
469 schema = type_schema("schedule-deletion", days={"type": "integer", "minimum": 7, "maximum": 30})
470
471 def process(self, keys):
472 client = local_session(self.manager.session_factory).client("kms")
473 for k in keys:
474 client.schedule_key_deletion(
475 KeyId=k["KeyId"], PendingWindowInDays=self.data.get("days", 30)
476 )