Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/kms.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.filters.iamaccess import _account, PolicyChecker
4from botocore.exceptions import ClientError
6from datetime import datetime, timezone
7import json
8from collections import defaultdict
9from functools import lru_cache
11from c7n.actions import RemovePolicyBase, BaseAction
12from c7n.filters import Filter, CrossAccountAccessFilter, ValueFilter
13from c7n.manager import resources
14from c7n.query import (
15 ConfigSource, DescribeSource, QueryResourceManager, RetryPageIterator, TypeInfo)
16from c7n.utils import local_session, type_schema, select_keys
17from c7n.tags import universal_augment
19from .securityhub import PostFinding
class DescribeAlias(DescribeSource):
    """Describe source for KMS aliases."""

    def augment(self, resources):
        """Return only the aliases that carry a ``TargetKeyId`` entry."""
        bound = []
        for alias in resources:
            if 'TargetKeyId' in alias:
                bound.append(alias)
        return bound
@resources.register('kms')
class KeyAlias(QueryResourceManager):
    """Resource manager for KMS aliases, registered under the name ``kms``."""

    class resource_type(TypeInfo):
        # Service and enumeration call used to list aliases.
        service = 'kms'
        arn_type = 'alias'
        enum_spec = ('list_aliases', 'Aliases', None)

        # Fields of an alias record used as display name and identifier.
        name = "AliasName"
        id = "AliasArn"

        # The same type string is used for CloudFormation and Config.
        cfn_type = 'AWS::KMS::Alias'
        config_type = cfn_type

    source_mapping = {
        'describe': DescribeAlias,
        'config': ConfigSource,
    }
class DescribeKey(DescribeSource):
    """Describe source for KMS keys.

    Adds ``describe_key`` metadata and alias names to each key before
    handing the resources to ``universal_augment``.
    """

    # Fewer requested ids than this are described one at a time; otherwise
    # the parent implementation is used.
    FetchThreshold = 10

    def get_resources(self, ids, cache=True):
        """Return key metadata for ``ids``.

        Keys reported as not found by ``describe_key`` are skipped when the
        ids are fetched individually.
        """
        if len(ids) >= self.FetchThreshold:
            return super().get_resources(ids, cache)

        kms = local_session(self.manager.session_factory).client('kms')
        found = []
        for key_ref in ids:
            try:
                response = self.manager.retry(kms.describe_key, KeyId=key_ref)
            except kms.exceptions.NotFoundException:
                continue
            found.append(response['KeyMetadata'])
        return found

    def augment(self, resources):
        """Fill in key details and alias names, then apply ``universal_augment``."""
        kms = local_session(self.manager.session_factory).client('kms')
        for key in resources:
            key_id = key.get('KeyId')

            # list_keys yields `KeyArn`, describe_key yields `Arn`; a record
            # that already has `Arn` has been described and is not re-fetched.
            if 'Arn' not in key:
                self._describe_into(kms, key, key_id)

            aliases = self.manager.alias_map.get(key_id)
            if aliases:
                key['AliasNames'] = aliases

        return universal_augment(self.manager, resources)

    def _describe_into(self, client, key, key_id):
        """Merge ``describe_key`` metadata into ``key``, tolerating access denial."""
        try:
            metadata = client.describe_key(
                KeyId=key.get('KeyArn', key_id))['KeyMetadata']
            key.update(metadata)
        except ClientError as err:
            if err.response['Error']['Code'] != 'AccessDeniedException':
                raise
            self.manager.log.warning(
                "Access denied when describing key:%s",
                key_id)
            # `Arn` is a core attribute, so keep it available even though
            # the describe call failed.
            key['Arn'] = key['KeyArn']
class ConfigKey(ConfigSource):
    """Config source for KMS keys that also attaches alias names."""

    def load_resource(self, item):
        """Load ``item`` via the parent source and add ``AliasNames`` when known."""
        loaded = super().load_resource(item)
        aliases = self.manager.alias_map.get(
            loaded[self.manager.resource_type.id])
        if not aliases:
            return loaded
        loaded['AliasNames'] = aliases
        return loaded
@resources.register('kms-key')
class Key(QueryResourceManager):
    """Resource manager for KMS keys, registered under the name ``kms-key``."""

    class resource_type(TypeInfo):
        service = 'kms'
        arn_type = "key"
        enum_spec = ('list_keys', 'Keys', None)
        detail_spec = ('describe_key', 'KeyId', 'Arn', 'KeyMetadata')  # overridden
        name = id = "KeyId"
        arn = 'Arn'
        universal_taggable = True
        cfn_type = config_type = 'AWS::KMS::Key'
        permissions_augment = ("kms:ListResourceTags",)

    source_mapping = {
        'config': ConfigKey,
        'describe': DescribeKey
    }

    @property
    def alias_map(self):
        """A dict mapping key IDs to aliases

        Fetch key aliases as a flat list, and convert it to a map of
        key ID -> aliases. The map is built on first access and stored on
        this manager instance, so repeated lookups while augmenting key
        resources reuse it.
        """
        # Memoise on the instance rather than with functools.lru_cache: a
        # cache attached to the method is shared at class level, keyed on
        # `self`, and keeps every manager it has seen alive.
        cached = getattr(self, '_kms_alias_map', None)
        if cached is None:
            cached = defaultdict(list)
            for alias in KeyAlias(self.ctx, {}).resources():
                cached[alias['TargetKeyId']].append(alias['AliasName'])
            self._kms_alias_map = cached
        return cached
@Key.filter_registry.register('key-rotation-status')
class KeyRotationStatus(ValueFilter):
    """Filters KMS keys by the rotation status

    :example:

    .. code-block:: yaml

            policies:
              - name: kms-key-disabled-rotation
                resource: kms-key
                filters:
                  - type: key-rotation-status
                    key: KeyRotationEnabled
                    value: false
    """

    schema = type_schema('key-rotation-status', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('kms:GetKeyRotationStatus',)

    def process(self, resources, event=None):
        """Annotate keys with their rotation status and return the matches.

        The ``get_key_rotation_status`` response is stored under
        ``KeyRotationEnabled`` on each resource that does not already have
        that entry; the filter is then matched against that stored value
        (or an empty dict when the lookup failed).
        """
        client = local_session(self.manager.session_factory).client('kms')

        def _key_rotation_status(resource):
            try:
                resource['KeyRotationEnabled'] = client.get_key_rotation_status(
                    KeyId=resource['KeyId'])
            except ClientError as e:
                code = e.response['Error']['Code']
                # Keys obtained from describe_key carry `Arn`, keys from
                # list_keys carry `KeyArn`; fall back to the id so the
                # warning always names the key.
                key_ref = (
                    resource.get('Arn')
                    or resource.get('KeyArn')
                    or resource['KeyId'])
                if code == 'AccessDeniedException':
                    self.log.warning(
                        "Access denied when getting rotation status on key:%s",
                        key_ref)
                elif code == 'UnsupportedOperationException':
                    # This is expected for keys that do not support rotation
                    # e.g. keys in custom keystores or when keys are in certain
                    # states such as PendingImport.
                    self.log.warning(
                        "UnsupportedOperationException when getting rotation status on key:%s",
                        key_ref)
                else:
                    raise

        query_resources = [
            r for r in resources if 'KeyRotationEnabled' not in r]
        self.log.debug(
            "Querying %d kms-keys' rotation status", len(query_resources))
        with self.executor_factory(max_workers=2) as w:
            list(w.map(_key_rotation_status, query_resources))

        return [r for r in resources if self.match(
            r.get('KeyRotationEnabled', {}))]
class KMSPolicyChecker(PolicyChecker):
    """Policy checker with handlers for KMS-specific condition keys."""
    # https://docs.aws.amazon.com/kms/latest/developerguide/policy-conditions.html#conditions-kms

    def handle_kms_calleraccount(self, s, c):
        """Return True when any condition value maps to a non-allowed account."""
        caller_accounts = {_account(value) for value in c['values']}
        unexpected = caller_accounts.difference(self.allowed_accounts)
        return len(unexpected) > 0

    def handle_kms_viaservice(self, s, c):
        """Always False: services are not filtered on, so all are presumed allowed."""
        return False

    def handle_kms_grantoperations(self, s, c):
        """Always False: GrantOperations are not filtered on, so all are presumed allowed."""
        return False
@Key.filter_registry.register('cross-account')
@KeyAlias.filter_registry.register('cross-account')
class KMSCrossAccountAccessFilter(CrossAccountAccessFilter):
    """Filter KMS keys which have cross account permissions

    :example:

    .. code-block:: yaml

            policies:
              - name: check-kms-key-cross-account
                resource: kms-key
                filters:
                  - type: cross-account
    """
    permissions = ('kms:GetKeyPolicy',)

    checker_factory = KMSPolicyChecker

    def process(self, resources, event=None):
        """Attach each key's ``default`` policy, then run the parent filter."""
        kms = local_session(self.manager.session_factory).client('kms')

        def attach_policy(resource):
            # Alias records reference their key via `TargetKeyId`; key
            # records use `KeyId`.
            key_id = resource.get('TargetKeyId', resource.get('KeyId'))
            assert key_id, "Invalid key resources %s" % resource
            response = kms.get_key_policy(KeyId=key_id, PolicyName='default')
            resource['Policy'] = response['Policy']
            return resource

        self.log.debug("fetching policy for %d kms keys" % len(resources))
        with self.executor_factory(max_workers=1) as pool:
            resources = [r for r in pool.map(attach_policy, resources) if r]

        return super().process(resources, event)
@KeyAlias.filter_registry.register('grant-count')
class GrantCount(Filter):
    """Filters KMS key grants

    This can be used to ensure issues around grant limits are monitored

    :example:

    .. code-block:: yaml

            policies:
              - name: kms-grants
                resource: kms
                filters:
                  - type: grant-count
                    min: 100
    """

    schema = type_schema(
        'grant-count', min={'type': 'integer', 'minimum': 0})
    permissions = ('kms:ListGrants',)

    def process(self, keys, event=None):
        """Return the keys whose grant count reaches the configured minimum."""
        kms = local_session(self.manager.session_factory).client('kms')
        checked = (self.process_key(kms, key) for key in keys)
        return [key for key in checked if key]

    def process_key(self, client, key):
        """Count grants on ``key``; return it if the count meets ``min``, else None.

        The count is always recorded on the resource as ``GrantCount``. A
        metric is emitted only for keys that are returned.
        """
        paginator = client.get_paginator('list_grants')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator

        total = sum(
            len(page['Grants'])
            for page in paginator.paginate(KeyId=key['TargetKeyId']))
        key['GrantCount'] = total

        if total < self.data.get('min', 5):
            return None

        # The metric scope is the alias name with its first six characters
        # removed.
        scope = key['AliasName'][6:]
        self.manager.ctx.metrics.put_metric(
            "ExtantGrants", total, "Count", Scope=scope)
        return key
class ResourceKmsKeyAlias(ValueFilter):
    """Value filter matched against the KMS alias of a resource's ``KmsKeyId``."""

    schema = type_schema('kms-alias', rinherit=ValueFilter.schema)
    schema_alias = False

    def get_permissions(self):
        """Return the permissions required by the KMS alias resource manager."""
        alias_manager = KeyAlias(self.manager.ctx, {})
        return alias_manager.get_permissions()

    def get_matching_aliases(self, resources, event=None):
        """Annotate resources with ``KeyAlias`` and return those that match.

        Resources without a ``KmsKeyId`` are neither annotated nor returned.
        """
        alias_by_key = {}
        for alias in KeyAlias(self.manager.ctx, {}).resources():
            alias_by_key[alias['TargetKeyId']] = alias

        selected = []
        for resource in resources:
            kms_key_id = resource.get('KmsKeyId')
            if not kms_key_id:
                continue
            # Keep only what follows "key/" so the value can be looked up
            # against the aliases' TargetKeyId.
            bare_id = kms_key_id.split("key/", 1)[-1]
            resource['KeyAlias'] = alias_by_key.get(bare_id)
            if self.match(resource.get('KeyAlias')):
                selected.append(resource)
        return selected
@Key.action_registry.register('remove-statements')
@KeyAlias.action_registry.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
    """Action to remove policy statements from KMS

    :example:

    .. code-block:: yaml

           policies:
              - name: kms-key-cross-account
                resource: kms-key
                filters:
                  - type: cross-account
                actions:
                  - type: remove-statements
                    statement_ids: matched
    """

    permissions = ('kms:GetKeyPolicy', 'kms:PutKeyPolicy')

    def process(self, resources):
        """Remove matching statements from each key policy.

        Returns a list with one summary dict per key whose policy was
        rewritten. A failure on one key is logged and does not stop the
        remaining keys from being processed.
        """
        results = []
        client = local_session(self.manager.session_factory).client('kms')
        for r in resources:
            # Alias records reference their key via `TargetKeyId`; key
            # records use `KeyId`.
            key_id = r.get('TargetKeyId', r.get('KeyId'))
            assert key_id, "Invalid key resources %s" % r
            try:
                outcome = self.process_resource(client, r, key_id)
            except Exception:
                # Deliberate best-effort: log and move on to the next key.
                self.log.exception(
                    "Error processing kms:%s", key_id)
                continue
            if outcome:
                results.append(outcome)
        return results

    def process_resource(self, client, resource, key_id):
        """Rewrite the ``default`` policy of one key.

        Returns a summary dict when statements were removed, or None when
        the key has no policy or no statement matched.
        """
        if 'Policy' not in resource:
            try:
                resource['Policy'] = client.get_key_policy(
                    KeyId=key_id, PolicyName='default')['Policy']
            except ClientError as e:
                if e.response['Error']['Code'] != "NotFoundException":
                    raise
                resource['Policy'] = None

        if not resource['Policy']:
            return

        p = json.loads(resource['Policy'])
        _, found = self.process_policy(
            p, resource, CrossAccountAccessFilter.annotation_key)

        if not found:
            return

        # NB: KMS supports only one key policy 'default'
        # http://docs.aws.amazon.com/kms/latest/developerguide/programming-key-policies.html#list-policies
        client.put_key_policy(
            KeyId=key_id,
            PolicyName='default',
            Policy=json.dumps(p)
        )

        return {'Name': key_id,
                'State': 'PolicyRemoved',
                'Statements': found}
@Key.action_registry.register('set-rotation')
class KmsKeyRotation(BaseAction):
    """Toggle KMS key rotation

    :example:

    .. code-block:: yaml

        policies:
          - name: enable-cmk-rotation
            resource: kms-key
            filters:
              - type: key-rotation-status
                key: KeyRotationEnabled
                value: False
            actions:
              - type: set-rotation
                state: True
    """
    # `state: false` calls disable_key_rotation, so both permissions are
    # declared.
    permissions = ('kms:EnableKeyRotation', 'kms:DisableKeyRotation')
    schema = type_schema('set-rotation', state={'type': 'boolean'})

    def process(self, keys):
        """Enable rotation on every key, or disable it when ``state`` is false.

        ``state`` defaults to True when omitted from the action config.
        """
        client = local_session(self.manager.session_factory).client('kms')
        # The desired state is the same for every key, so pick the API call
        # once.
        if self.data.get('state', True):
            set_rotation = client.enable_key_rotation
        else:
            set_rotation = client.disable_key_rotation
        for k in keys:
            set_rotation(KeyId=k['KeyId'])
@KeyAlias.action_registry.register('post-finding')
@Key.action_registry.register('post-finding')
class KmsPostFinding(PostFinding):
    """Post a finding for a KMS key (or for the key behind an alias)."""

    resource_type = 'AwsKmsKey'

    def format_resource(self, r):
        """Build the finding envelope for ``r``.

        An alias record (one with ``TargetKeyId``) is first resolved to its
        key; None is returned when that key cannot be fetched.
        """
        if 'TargetKeyId' in r:
            resolved = self.manager.get_resource_manager(
                'kms-key').get_resources([r['TargetKeyId']])
            if not resolved:
                return
            r = resolved[0]
            # Expose the key id under this manager's id field as well.
            r[self.manager.resource_type.id] = r['KeyId']
        envelope, payload = self.format_envelope(r)
        # KMS key metadata names the account field `AWSAccountId`.
        payload.update(self.filter_empty(
            select_keys(r, [
                'AWSAccountId', 'CreationDate', 'KeyId',
                'KeyManager', 'Origin', 'KeyState'])))

        # Securityhub expects a unix timestamp for CreationDate
        created = payload.get('CreationDate')
        if isinstance(created, datetime):
            # Only a naive datetime is assumed to be UTC. An aware datetime
            # already identifies an instant; relabelling its tzinfo would
            # shift the resulting timestamp by the zone offset.
            if created.utcoffset() is None:
                created = created.replace(tzinfo=timezone.utc)
            payload['CreationDate'] = created.timestamp()

        return envelope
@Key.filter_registry.register('last-rotation')
class LastRotation(ValueFilter):
    """Queries KMS keys by the last time they were rotated.

    :example:

    .. code-block:: yaml

        policies:
          - name: kms-not-rotated-in-last-30
            resource: kms-key
            filters:
              - type: last-rotation
                key: RotationDate
                value: 30
                value_type: age
                op: gte

    """

    schema = type_schema('last-rotation', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('kms:ListKeyRotations',)
    annotation_key = 'c7n:LastRotation'

    def get_last_rotation(self, paginator, key_id):
        """Return the rotation entry with the greatest ``RotationDate``.

        An empty rotation list is returned as-is. A ``ClientError`` is
        logged as a warning and yields None.
        """
        pages = paginator.paginate(KeyId=key_id)
        try:
            history = pages.build_full_result().get('Rotations', [])
            if not history:
                return history
            return max(history, key=lambda entry: entry.get('RotationDate', 0))
        except ClientError as err:
            self.log.warning(err)
        return None

    def process(self, resources, event=None):
        """Annotate keys with their latest rotation and return the matches."""
        kms = local_session(self.manager.session_factory).client('kms')
        matched = []
        paginator = kms.get_paginator('list_key_rotations')

        for key in resources:
            # An existing annotation is treated as cached; only keys
            # without one trigger an API call.
            if 'c7n:LastRotation' not in key:
                key[self.annotation_key] = self.get_last_rotation(
                    paginator, key['KeyId'])

            # The annotation may hold a rotation entry or an empty value, so
            # policies can also select keys that have never been rotated.
            if not self.match(key[self.annotation_key]):
                continue
            matched.append(key)

        return matched
@Key.action_registry.register("schedule-deletion")
class KmsKeyScheduleDeletion(BaseAction):
    """Schedule KMS key deletion

    If the number of days is not specified, the default value of 30 days is used.
    The number of days must be between 7 and 30.

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-tagged-keys
            resource: kms-key
            filters:
              - type: value
                key: tag:DeleteAfter
                op: ge
                value_type: age  # age is a special value type that will be converted to a timestamp
                value: 0
            actions:
              - type: schedule-deletion
                days: 7
    """

    permissions = ("kms:ScheduleKeyDeletion",)
    schema = type_schema("schedule-deletion", days={"type": "integer", "minimum": 7, "maximum": 30})

    def process(self, keys):
        """Schedule every key for deletion after the configured waiting period."""
        kms = local_session(self.manager.session_factory).client("kms")
        pending_days = self.data.get("days", 30)
        for key in keys:
            kms.schedule_key_deletion(
                KeyId=key["KeyId"],
                PendingWindowInDays=pending_days,
            )