Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/kms.py: 40%
219 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.filters.iamaccess import _account, PolicyChecker
4from botocore.exceptions import ClientError
6from datetime import datetime, timezone
7import json
8from collections import defaultdict
9from functools import lru_cache
11from c7n.actions import RemovePolicyBase, BaseAction
12from c7n.filters import Filter, CrossAccountAccessFilter, ValueFilter
13from c7n.manager import resources
14from c7n.query import (
15 ConfigSource, DescribeSource, QueryResourceManager, RetryPageIterator, TypeInfo)
16from c7n.utils import local_session, type_schema, select_keys
17from c7n.tags import universal_augment
19from .securityhub import PostFinding
@resources.register('kms')
class KeyAlias(QueryResourceManager):
    """Resource manager for KMS aliases, registered under the name ``kms``."""

    class resource_type(TypeInfo):
        # Aliases are enumerated with the `list_aliases` call; each record is
        # named by `AliasName` and identified by `AliasArn`.
        service = 'kms'
        arn_type = 'alias'
        enum_spec = ('list_aliases', 'Aliases', None)
        name = "AliasName"
        id = "AliasArn"
        cfn_type = 'AWS::KMS::Alias'

    def augment(self, resources):
        """Return only the aliases that carry a ``TargetKeyId`` entry.

        :param resources: iterable of alias dicts.
        :returns: list of the aliases that contain the ``TargetKeyId`` key,
            in their original order.
        """
        return list(filter(lambda alias: 'TargetKeyId' in alias, resources))
class DescribeKey(DescribeSource):
    """Describe-API backed source for KMS key resources."""

    FetchThreshold = 10  # ie should we describe all keys or just fetch them directly

    def get_resources(self, ids, cache=True):
        """Fetch key metadata for the given key ids.

        At or above ``FetchThreshold`` ids the lookup is delegated to the
        parent source; below it each key is described individually.

        :param ids: key identifiers to look up.
        :param cache: forwarded to the parent implementation when delegating.
        :returns: list of ``KeyMetadata`` dicts; ids the service reports as
            not found are silently skipped.
        """
        # Large requests: let the parent walk the full set and filter client side.
        if len(ids) >= self.FetchThreshold:
            return super().get_resources(ids, cache)

        # Small requests: fetch just the individual keys of interest.
        client = local_session(self.manager.session_factory).client('kms')
        found = []
        for key_ref in ids:
            try:
                response = self.manager.retry(client.describe_key, KeyId=key_ref)
            except client.exceptions.NotFoundException:
                continue
            found.append(response['KeyMetadata'])
        return found

    def augment(self, resources):
        """Fill in describe_key details and alias names on each key.

        :param resources: list of key dicts, mutated in place.
        :returns: the result of ``universal_augment`` for these resources.
        :raises ClientError: for any describe failure other than
            ``AccessDeniedException``.
        """
        client = local_session(self.manager.session_factory).client('kms')
        for key in resources:
            key_id = key.get('KeyId')

            # list_keys yields `KeyArn` while describe_key yields `Arn`, so a
            # record that already has `Arn` has been described and is skipped.
            if 'Arn' not in key:
                lookup_id = key.get('KeyArn', key_id)
                try:
                    detail = client.describe_key(KeyId=lookup_id)['KeyMetadata']
                except ClientError as e:
                    if e.response['Error']['Code'] != 'AccessDeniedException':
                        raise
                    self.manager.log.warning(
                        "Access denied when describing key:%s",
                        key_id)
                    # `Arn` is a core attribute, so keep it available even
                    # when the describe call was refused.
                    key['Arn'] = key['KeyArn']
                else:
                    key.update(detail)

            alias_names = self.manager.alias_map.get(key_id)
            if alias_names:
                key['AliasNames'] = alias_names

        return universal_augment(self.manager, resources)
class ConfigKey(ConfigSource):
    """Config-backed source for KMS keys that also attaches alias names."""

    def load_resource(self, item):
        """Load a key from a config item and annotate it with its aliases.

        :param item: config item handed to the parent ``load_resource``.
        :returns: the loaded resource, with ``AliasNames`` set when the
            manager's alias map has a non-empty entry for the resource id.
        """
        resource = super().load_resource(item)
        id_field = self.manager.resource_type.id
        names = self.manager.alias_map.get(resource[id_field])
        if names:
            resource['AliasNames'] = names
        return resource
@resources.register('kms-key')
class Key(QueryResourceManager):
    """Resource manager for KMS keys, registered under the name ``kms-key``."""

    class resource_type(TypeInfo):
        service = 'kms'
        arn_type = "key"
        enum_spec = ('list_keys', 'Keys', None)
        detail_spec = ('describe_key', 'KeyId', 'Arn', 'KeyMetadata')  # overridden
        name = id = "KeyId"
        arn = 'Arn'
        universal_taggable = True
        cfn_type = config_type = 'AWS::KMS::Key'

    source_mapping = {
        'config': ConfigKey,
        'describe': DescribeKey
    }

    @property
    def alias_map(self):
        """A dict mapping key IDs to aliases

        Fetch key aliases as a flat list, and convert it to a map of
        key ID -> aliases. We can build this once and use it to
        augment key resources.

        The map is memoized on the instance itself rather than through
        ``functools.lru_cache``: a cache on the method would key on ``self``
        and keep every manager instance alive for as long as the module-level
        cache holds it.

        :returns: ``defaultdict(list)`` of ``TargetKeyId`` -> list of
            ``AliasName`` values.
        """
        # An empty map is a valid cached result, so compare against None
        # instead of relying on truthiness.
        cached = self.__dict__.get('_alias_map')
        if cached is None:
            cached = defaultdict(list)
            for alias in KeyAlias(self.ctx, {}).resources():
                cached[alias['TargetKeyId']].append(alias['AliasName'])
            self.__dict__['_alias_map'] = cached
        return cached
@Key.filter_registry.register('key-rotation-status')
class KeyRotationStatus(ValueFilter):
    """Filters KMS keys by the rotation status

    The rotation status response for each key is stored on the resource
    under ``KeyRotationEnabled`` and the value filter is evaluated against
    that stored response.

    :example:

    .. code-block:: yaml

        policies:
          - name: kms-key-disabled-rotation
            resource: kms-key
            filters:
              - type: key-rotation-status
                key: KeyRotationEnabled
                value: false
    """

    schema = type_schema('key-rotation-status', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('kms:GetKeyRotationStatus',)

    def process(self, resources, event=None):
        """Annotate keys with their rotation status and return the matches.

        :param resources: key dicts; those lacking ``KeyRotationEnabled``
            are queried and annotated in place.
        :param event: unused.
        :returns: the resources whose rotation status satisfies the filter.
        :raises ClientError: for any failure other than
            ``AccessDeniedException``, which is logged and skipped.
        """
        client = local_session(self.manager.session_factory).client('kms')

        def _key_rotation_status(resource):
            try:
                status = client.get_key_rotation_status(KeyId=resource['KeyId'])
            except ClientError as e:
                if e.response['Error']['Code'] != 'AccessDeniedException':
                    raise
                self.log.warning(
                    "Access denied when getting rotation status on key:%s",
                    resource.get('KeyArn'))
            else:
                resource['KeyRotationEnabled'] = status

        # Only keys that have not been annotated yet need a service call.
        pending = [r for r in resources if 'KeyRotationEnabled' not in r]
        with self.executor_factory(max_workers=2) as w:
            self.log.debug(
                "Querying %d kms-keys' rotation status" % len(pending))
            # Drain the iterator so every call runs and errors surface here.
            for _ in w.map(_key_rotation_status, pending):
                pass

        matched = []
        for r in resources:
            # Keys whose status could not be fetched are matched against {}.
            if self.match(r.get('KeyRotationEnabled', {})):
                matched.append(r)
        return matched
class KMSPolicyChecker(PolicyChecker):
    """Policy checker with handlers for KMS-specific condition keys."""
    # https://docs.aws.amazon.com/kms/latest/developerguide/policy-conditions.html#conditions-kms

    def handle_kms_calleraccount(self, s, c):
        """Return True when any caller account value is not an allowed account.

        :param s: the statement being checked (unused here).
        :param c: the condition; its ``values`` entry is read.
        """
        caller_accounts = {_account(value) for value in c['values']}
        outsiders = caller_accounts.difference(self.allowed_accounts)
        return len(outsiders) > 0

    def handle_kms_viaservice(self, s, c):
        """Always return False: ViaService conditions are not filtered on."""
        # We dont filter on service so all are presumed allowed
        return False

    def handle_kms_grantoperations(self, s, c):
        """Always return False: GrantOperations conditions are not filtered on."""
        # We dont filter on GrantOperations so all are presumed allowed
        return False
@Key.filter_registry.register('cross-account')
@KeyAlias.filter_registry.register('cross-account')
class KMSCrossAccountAccessFilter(CrossAccountAccessFilter):
    """Filter KMS keys which have cross account permissions

    The ``default`` key policy is fetched for every resource and stored
    under ``Policy`` before the parent cross-account check runs.

    :example:

    .. code-block:: yaml

        policies:
          - name: check-kms-key-cross-account
            resource: kms-key
            filters:
              - type: cross-account
    """
    permissions = ('kms:GetKeyPolicy',)

    checker_factory = KMSPolicyChecker

    def process(self, resources, event=None):
        """Attach each key's policy, then defer to the parent filter.

        :param resources: key or alias dicts; the key id is read from
            ``TargetKeyId``, falling back to ``KeyId``.
        :param event: forwarded to the parent ``process``.
        :returns: whatever the parent ``process`` returns.
        """
        client = local_session(self.manager.session_factory).client('kms')

        def _augment(r):
            key_id = r.get('TargetKeyId', r.get('KeyId'))
            assert key_id, "Invalid key resources %s" % r
            response = client.get_key_policy(
                KeyId=key_id, PolicyName='default')
            r['Policy'] = response['Policy']
            return r

        self.log.debug("fetching policy for %d kms keys" % len(resources))
        with self.executor_factory(max_workers=1) as w:
            resources = [r for r in w.map(_augment, resources) if r]

        return super().process(resources, event)
@KeyAlias.filter_registry.register('grant-count')
class GrantCount(Filter):
    """Filters KMS key grants

    This can be used to ensure issues around grant limits are monitored

    :example:

    .. code-block:: yaml

        policies:
          - name: kms-grants
            resource: kms
            filters:
              - type: grant-count
                min: 100
    """

    schema = type_schema(
        'grant-count', min={'type': 'integer', 'minimum': 0})
    permissions = ('kms:ListGrants',)

    def process(self, keys, event=None):
        """Return the aliases whose key has at least ``min`` grants.

        :param keys: alias dicts; each must carry ``TargetKeyId``.
        :param event: unused.
        """
        client = local_session(self.manager.session_factory).client('kms')
        outcomes = [self.process_key(client, k) for k in keys]
        return [kept for kept in outcomes if kept]

    def process_key(self, client, key):
        """Count the grants on one key and apply the threshold.

        The count is stored on the resource under ``GrantCount``. When the
        threshold (``min``, default 5) is met an ``ExtantGrants`` metric is
        published.

        :returns: the resource when the count meets the threshold, else None.
        """
        paginator = client.get_paginator('list_grants')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        pages = paginator.paginate(KeyId=key['TargetKeyId'])
        grant_count = sum(len(page['Grants']) for page in pages)
        key['GrantCount'] = grant_count

        if grant_count < self.data.get('min', 5):
            return None

        # The metric scope is the alias name minus its first six characters
        # (presumably the "alias/" prefix — confirm against AliasName format).
        self.manager.ctx.metrics.put_metric(
            "ExtantGrants", grant_count, "Count",
            Scope=key['AliasName'][6:])

        return key
class ResourceKmsKeyAlias(ValueFilter):
    """Value filter matching resources against the alias of their KMS key."""

    schema = type_schema('kms-alias', rinherit=ValueFilter.schema)
    schema_alias = False

    def get_permissions(self):
        """Return the permissions needed to enumerate KMS aliases."""
        return KeyAlias(self.manager.ctx, {}).get_permissions()

    def get_matching_aliases(self, resources, event=None):
        """Annotate resources with their key alias and return the matches.

        Resources without a truthy ``KmsKeyId`` are skipped. For the rest,
        the alias record found for the key is stored under ``KeyAlias``
        (None when no alias targets the key) and the value filter is
        evaluated against it.

        :param resources: resource dicts, mutated in place.
        :param event: unused.
        :returns: list of resources whose alias satisfies the filter.
        """
        # When several aliases target one key, the last one listed wins.
        alias_by_key = {}
        for alias in KeyAlias(self.manager.ctx, {}).resources():
            alias_by_key[alias['TargetKeyId']] = alias

        matched = []
        for r in resources:
            kms_key_id = r.get('KmsKeyId')
            if not kms_key_id:
                continue
            # Keep whatever follows the first "key/" (the whole value when
            # that marker is absent) as the lookup id.
            bare_key_id = kms_key_id.split("key/", 1)[-1]
            r['KeyAlias'] = alias_by_key.get(bare_key_id)
            if self.match(r['KeyAlias']):
                matched.append(r)
        return matched
@Key.action_registry.register('remove-statements')
@KeyAlias.action_registry.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
    """Action to remove policy statements from KMS

    :example:

    .. code-block:: yaml

        policies:
          - name: kms-key-cross-account
            resource: kms-key
            filters:
              - type: cross-account
            actions:
              - type: remove-statements
                statement_ids: matched
    """

    permissions = ('kms:GetKeyPolicy', 'kms:PutKeyPolicy')

    def process(self, resources):
        """Remove matching statements from each key's policy.

        Failures on an individual key are logged and do not stop the
        remaining keys from being processed.

        :param resources: key or alias dicts; the key id is read from
            ``TargetKeyId``, falling back to ``KeyId``.
        :returns: list of result dicts for the keys whose policy was changed.
        """
        results = []
        client = local_session(self.manager.session_factory).client('kms')
        for r in resources:
            key_id = r.get('TargetKeyId', r.get('KeyId'))
            assert key_id, "Invalid key resources %s" % r
            try:
                outcome = self.process_resource(client, r, key_id)
            except Exception:
                # Best effort per key: record the failure and move on.
                self.log.exception(
                    "Error processing kms:%s", key_id)
                continue
            if outcome:
                results.append(outcome)
        return results

    def process_resource(self, client, resource, key_id):
        """Rewrite one key's ``default`` policy without the matched statements.

        :param client: KMS client.
        :param resource: resource dict; ``Policy`` is fetched and stored on
            it when missing.
        :param key_id: id of the key whose policy is edited.
        :returns: a dict with ``Name``, ``State`` and the removed
            ``Statements``, or None when there is no policy or nothing
            was removed.
        :raises ClientError: when fetching the policy fails for a reason
            other than ``NotFoundException``.
        """
        if 'Policy' not in resource:
            try:
                resource['Policy'] = client.get_key_policy(
                    KeyId=key_id, PolicyName='default')['Policy']
            except ClientError as e:
                if e.response['Error']['Code'] != "NotFoundException":
                    raise
                resource['Policy'] = None

        if not resource['Policy']:
            return

        p = json.loads(resource['Policy'])
        # NOTE(review): only `found` is used below and `p` is written back
        # as-is, so this presumably relies on process_policy editing the
        # statements of `p` in place — confirm against RemovePolicyBase.
        statements, found = self.process_policy(
            p, resource, CrossAccountAccessFilter.annotation_key)

        if not found:
            return

        # NB: KMS supports only one key policy 'default'
        # http://docs.aws.amazon.com/kms/latest/developerguide/programming-key-policies.html#list-policies
        client.put_key_policy(
            KeyId=key_id,
            PolicyName='default',
            Policy=json.dumps(p)
        )

        return {'Name': key_id,
                'State': 'PolicyRemoved',
                'Statements': found}
@Key.action_registry.register('set-rotation')
class KmsKeyRotation(BaseAction):
    """Toggle KMS key rotation

    Rotation is enabled when ``state`` is true (the default) and disabled
    when it is false.

    :example:

    .. code-block:: yaml

        policies:
          - name: enable-cmk-rotation
            resource: kms-key
            filters:
              - type: key-rotation-status
                key: KeyRotationEnabled
                value: False
            actions:
              - type: set-rotation
                state: True
    """
    # The action can call either API depending on `state`, so both
    # permissions are declared.
    permissions = ('kms:EnableKeyRotation', 'kms:DisableKeyRotation')
    schema = type_schema('set-rotation', state={'type': 'boolean'})

    def process(self, keys):
        """Enable or disable rotation on every key.

        :param keys: key dicts; each must carry ``KeyId``.
        """
        client = local_session(self.manager.session_factory).client('kms')
        # `state` is the same for every key, so pick the API call once.
        if self.data.get('state', True):
            set_rotation = client.enable_key_rotation
        else:
            set_rotation = client.disable_key_rotation
        for k in keys:
            set_rotation(KeyId=k['KeyId'])
@KeyAlias.action_registry.register('post-finding')
@Key.action_registry.register('post-finding')
class KmsPostFinding(PostFinding):
    """Post-finding action for KMS keys and aliases."""

    resource_type = 'AwsKmsKey'

    def format_resource(self, r):
        """Build the finding envelope for a key or alias resource.

        An alias (a resource carrying ``TargetKeyId``) is first resolved to
        the key it targets through the ``kms-key`` resource manager.

        :param r: key or alias dict.
        :returns: the envelope produced by ``format_envelope``, or None when
            an alias's target key cannot be resolved.
        """
        if 'TargetKeyId' in r:
            resolved = self.manager.get_resource_manager(
                'kms-key').get_resources([r['TargetKeyId']])
            if not resolved:
                return
            r = resolved[0]
            # Expose the key id under the current manager's id field as well.
            r[self.manager.resource_type.id] = r['KeyId']
        envelope, payload = self.format_envelope(r)
        payload.update(self.filter_empty(
            select_keys(r, [
                'AWSAccount', 'CreationDate', 'KeyId',
                'KeyManager', 'Origin', 'KeyState'])))

        # Securityhub expects a unix timestamp for CreationDate
        created = payload.get('CreationDate')
        if isinstance(created, datetime):
            # Only a naive value is assumed to be UTC. Overwriting the zone
            # of an aware datetime would shift the instant it represents.
            if created.utcoffset() is None:
                created = created.replace(tzinfo=timezone.utc)
            payload['CreationDate'] = created.timestamp()

        return envelope