Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/kms.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

233 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from c7n.filters.iamaccess import _account, PolicyChecker 

4from botocore.exceptions import ClientError 

5 

6from datetime import datetime, timezone 

7import json 

8from collections import defaultdict 

9from functools import lru_cache 

10 

11from c7n.actions import RemovePolicyBase, BaseAction 

12from c7n.filters import Filter, CrossAccountAccessFilter, ValueFilter 

13from c7n.manager import resources 

14from c7n.query import ( 

15 ConfigSource, DescribeSource, QueryResourceManager, RetryPageIterator, TypeInfo) 

16from c7n.utils import local_session, type_schema, select_keys 

17from c7n.tags import universal_augment 

18 

19from .securityhub import PostFinding 

20 

21 

22class DescribeAlias(DescribeSource): 

23 

24 def augment(self, resources): 

25 return [r for r in resources if 'TargetKeyId' in r] 

26 

27 

28@resources.register('kms') 

29class KeyAlias(QueryResourceManager): 

30 

31 class resource_type(TypeInfo): 

32 service = 'kms' 

33 arn_type = 'alias' 

34 enum_spec = ('list_aliases', 'Aliases', None) 

35 name = "AliasName" 

36 id = "AliasArn" 

37 config_type = cfn_type = 'AWS::KMS::Alias' 

38 

39 source_mapping = {'describe': DescribeAlias, 'config': ConfigSource} 

40 

41 

42class DescribeKey(DescribeSource): 

43 

44 FetchThreshold = 10 # ie should we describe all keys or just fetch them directly 

45 

46 def get_resources(self, ids, cache=True): 

47 # this forms a threshold beyond which we'll fetch individual keys of interest. 

48 # else we'll need to fetch through the full set and client side filter. 

49 if len(ids) < self.FetchThreshold: 

50 client = local_session(self.manager.session_factory).client('kms') 

51 results = [] 

52 for rid in ids: 

53 try: 

54 results.append( 

55 self.manager.retry( 

56 client.describe_key, 

57 KeyId=rid)['KeyMetadata']) 

58 except client.exceptions.NotFoundException: 

59 continue 

60 return results 

61 return super().get_resources(ids, cache) 

62 

63 def augment(self, resources): 

64 client = local_session(self.manager.session_factory).client('kms') 

65 for r in resources: 

66 key_id = r.get('KeyId') 

67 

68 # We get `KeyArn` from list_keys and `Arn` from describe_key. 

69 # If we already have describe_key details we don't need to fetch 

70 # it again. 

71 if 'Arn' not in r: 

72 try: 

73 key_arn = r.get('KeyArn', key_id) 

74 key_detail = client.describe_key(KeyId=key_arn)['KeyMetadata'] 

75 r.update(key_detail) 

76 except ClientError as e: 

77 if e.response['Error']['Code'] == 'AccessDeniedException': 

78 self.manager.log.warning( 

79 "Access denied when describing key:%s", 

80 key_id) 

81 # If a describe fails, we still want the `Arn` key 

82 # available since it is a core attribute 

83 r['Arn'] = r['KeyArn'] 

84 else: 

85 raise 

86 

87 alias_names = self.manager.alias_map.get(key_id) 

88 if alias_names: 

89 r['AliasNames'] = alias_names 

90 

91 return universal_augment(self.manager, resources) 

92 

93 

94class ConfigKey(ConfigSource): 

95 

96 def load_resource(self, item): 

97 resource = super().load_resource(item) 

98 alias_names = self.manager.alias_map.get(resource[self.manager.resource_type.id]) 

99 if alias_names: 

100 resource['AliasNames'] = alias_names 

101 return resource 

102 

103 

104@resources.register('kms-key') 

105class Key(QueryResourceManager): 

106 

107 class resource_type(TypeInfo): 

108 service = 'kms' 

109 arn_type = "key" 

110 enum_spec = ('list_keys', 'Keys', None) 

111 detail_spec = ('describe_key', 'KeyId', 'Arn', 'KeyMetadata') # overriden 

112 name = id = "KeyId" 

113 arn = 'Arn' 

114 universal_taggable = True 

115 cfn_type = config_type = 'AWS::KMS::Key' 

116 permissions_augment = ("kms:ListResourceTags",) 

117 

118 source_mapping = { 

119 'config': ConfigKey, 

120 'describe': DescribeKey 

121 } 

122 

123 @property 

124 @lru_cache() 

125 def alias_map(self): 

126 """A dict mapping key IDs to aliases 

127 

128 Fetch key aliases as a flat list, and convert it to a map of 

129 key ID -> aliases. We can build this once and use it to 

130 augment key resources. 

131 """ 

132 aliases = KeyAlias(self.ctx, {}).resources() 

133 alias_map = defaultdict(list) 

134 for a in aliases: 

135 alias_map[a['TargetKeyId']].append(a['AliasName']) 

136 return alias_map 

137 

138 

139@Key.filter_registry.register('key-rotation-status') 

140class KeyRotationStatus(ValueFilter): 

141 """Filters KMS keys by the rotation status 

142 

143 :example: 

144 

145 .. code-block:: yaml 

146 

147 policies: 

148 - name: kms-key-disabled-rotation 

149 resource: kms-key 

150 filters: 

151 - type: key-rotation-status 

152 key: KeyRotationEnabled 

153 value: false 

154 """ 

155 

156 schema = type_schema('key-rotation-status', rinherit=ValueFilter.schema) 

157 schema_alias = False 

158 permissions = ('kms:GetKeyRotationStatus',) 

159 

160 def process(self, resources, event=None): 

161 client = local_session(self.manager.session_factory).client('kms') 

162 

163 def _key_rotation_status(resource): 

164 try: 

165 resource['KeyRotationEnabled'] = client.get_key_rotation_status( 

166 KeyId=resource['KeyId']) 

167 except ClientError as e: 

168 if e.response['Error']['Code'] == 'AccessDeniedException': 

169 self.log.warning( 

170 "Access denied when getting rotation status on key:%s", 

171 resource.get('KeyArn')) 

172 elif e.response['Error']['Code'] == 'UnsupportedOperationException': 

173 # This is expected for keys that do not support rotation 

174 # e.g. keys in custom keystores or when keys are in certain 

175 # states such as PendingImport. 

176 self.log.warning( 

177 "UnsupportedOperationException when getting rotation status on key:%s", 

178 resource.get('KeyArn')) 

179 else: 

180 raise 

181 

182 with self.executor_factory(max_workers=2) as w: 

183 query_resources = [ 

184 r for r in resources if 'KeyRotationEnabled' not in r] 

185 self.log.debug( 

186 "Querying %d kms-keys' rotation status" % len(query_resources)) 

187 list(w.map(_key_rotation_status, query_resources)) 

188 

189 return [r for r in resources if self.match( 

190 r.get('KeyRotationEnabled', {}))] 

191 

192 

193class KMSPolicyChecker(PolicyChecker): 

194 # https://docs.aws.amazon.com/kms/latest/developerguide/policy-conditions.html#conditions-kms 

195 

196 def handle_kms_calleraccount(self, s, c): 

197 return bool(set(map(_account, c['values'])).difference(self.allowed_accounts)) 

198 

199 def handle_kms_viaservice(self, s, c): 

200 # We dont filter on service so all are presumed allowed 

201 return False 

202 

203 def handle_kms_grantoperations(self, s, c): 

204 # We dont filter on GrantOperations so all are presumed allowed 

205 return False 

206 

207 

208@Key.filter_registry.register('cross-account') 

209@KeyAlias.filter_registry.register('cross-account') 

210class KMSCrossAccountAccessFilter(CrossAccountAccessFilter): 

211 """Filter KMS keys which have cross account permissions 

212 

213 :example: 

214 

215 .. code-block:: yaml 

216 

217 policies: 

218 - name: check-kms-key-cross-account 

219 resource: kms-key 

220 filters: 

221 - type: cross-account 

222 """ 

223 permissions = ('kms:GetKeyPolicy',) 

224 

225 checker_factory = KMSPolicyChecker 

226 

227 def process(self, resources, event=None): 

228 client = local_session( 

229 self.manager.session_factory).client('kms') 

230 

231 def _augment(r): 

232 key_id = r.get('TargetKeyId', r.get('KeyId')) 

233 assert key_id, "Invalid key resources %s" % r 

234 r['Policy'] = client.get_key_policy( 

235 KeyId=key_id, PolicyName='default')['Policy'] 

236 return r 

237 

238 self.log.debug("fetching policy for %d kms keys" % len(resources)) 

239 with self.executor_factory(max_workers=1) as w: 

240 resources = list(filter(None, w.map(_augment, resources))) 

241 

242 return super(KMSCrossAccountAccessFilter, self).process( 

243 resources, event) 

244 

245 

246@KeyAlias.filter_registry.register('grant-count') 

247class GrantCount(Filter): 

248 """Filters KMS key grants 

249 

250 This can be used to ensure issues around grant limits are monitored 

251 

252 :example: 

253 

254 .. code-block:: yaml 

255 

256 policies: 

257 - name: kms-grants 

258 resource: kms 

259 filters: 

260 - type: grant-count 

261 min: 100 

262 """ 

263 

264 schema = type_schema( 

265 'grant-count', min={'type': 'integer', 'minimum': 0}) 

266 permissions = ('kms:ListGrants',) 

267 

268 def process(self, keys, event=None): 

269 client = local_session(self.manager.session_factory).client('kms') 

270 results = [] 

271 for k in keys: 

272 results.append(self.process_key(client, k)) 

273 return [r for r in results if r] 

274 

275 def process_key(self, client, key): 

276 p = client.get_paginator('list_grants') 

277 p.PAGE_ITERATOR_CLS = RetryPageIterator 

278 grant_count = 0 

279 for rp in p.paginate(KeyId=key['TargetKeyId']): 

280 grant_count += len(rp['Grants']) 

281 key['GrantCount'] = grant_count 

282 

283 grant_threshold = self.data.get('min', 5) 

284 if grant_count < grant_threshold: 

285 return None 

286 

287 self.manager.ctx.metrics.put_metric( 

288 "ExtantGrants", grant_count, "Count", 

289 Scope=key['AliasName'][6:]) 

290 

291 return key 

292 

293 

294class ResourceKmsKeyAlias(ValueFilter): 

295 

296 schema = type_schema('kms-alias', rinherit=ValueFilter.schema) 

297 schema_alias = False 

298 

299 def get_permissions(self): 

300 return KeyAlias(self.manager.ctx, {}).get_permissions() 

301 

302 def get_matching_aliases(self, resources, event=None): 

303 key_aliases = KeyAlias(self.manager.ctx, {}).resources() 

304 key_aliases_dict = {a['TargetKeyId']: a for a in key_aliases} 

305 

306 matched = [] 

307 for r in resources: 

308 if r.get('KmsKeyId'): 

309 r['KeyAlias'] = key_aliases_dict.get( 

310 r.get('KmsKeyId').split("key/", 1)[-1]) 

311 if self.match(r.get('KeyAlias')): 

312 matched.append(r) 

313 return matched 

314 

315 

316@Key.action_registry.register('remove-statements') 

317@KeyAlias.action_registry.register('remove-statements') 

318class RemovePolicyStatement(RemovePolicyBase): 

319 """Action to remove policy statements from KMS 

320 

321 :example: 

322 

323 .. code-block:: yaml 

324 

325 policies: 

326 - name: kms-key-cross-account 

327 resource: kms-key 

328 filters: 

329 - type: cross-account 

330 actions: 

331 - type: remove-statements 

332 statement_ids: matched 

333 """ 

334 

335 permissions = ('kms:GetKeyPolicy', 'kms:PutKeyPolicy') 

336 

337 def process(self, resources): 

338 results = [] 

339 client = local_session(self.manager.session_factory).client('kms') 

340 for r in resources: 

341 key_id = r.get('TargetKeyId', r.get('KeyId')) 

342 assert key_id, "Invalid key resources %s" % r 

343 try: 

344 results += filter(None, [self.process_resource(client, r, key_id)]) 

345 except Exception: 

346 self.log.exception( 

347 "Error processing sns:%s", key_id) 

348 return results 

349 

350 def process_resource(self, client, resource, key_id): 

351 if 'Policy' not in resource: 

352 try: 

353 resource['Policy'] = client.get_key_policy( 

354 KeyId=key_id, PolicyName='default')['Policy'] 

355 except ClientError as e: 

356 if e.response['Error']['Code'] != "NotFoundException": 

357 raise 

358 resource['Policy'] = None 

359 

360 if not resource['Policy']: 

361 return 

362 

363 p = json.loads(resource['Policy']) 

364 _, found = self.process_policy( 

365 p, resource, CrossAccountAccessFilter.annotation_key) 

366 

367 if not found: 

368 return 

369 

370 # NB: KMS supports only one key policy 'default' 

371 # http://docs.aws.amazon.com/kms/latest/developerguide/programming-key-policies.html#list-policies 

372 client.put_key_policy( 

373 KeyId=key_id, 

374 PolicyName='default', 

375 Policy=json.dumps(p) 

376 ) 

377 

378 return {'Name': key_id, 

379 'State': 'PolicyRemoved', 

380 'Statements': found} 

381 

382 

383@Key.action_registry.register('set-rotation') 

384class KmsKeyRotation(BaseAction): 

385 """Toggle KMS key rotation 

386 

387 :example: 

388 

389 .. code-block:: yaml 

390 

391 policies: 

392 - name: enable-cmk-rotation 

393 resource: kms-key 

394 filters: 

395 - type: key-rotation-status 

396 key: KeyRotationEnabled 

397 value: False 

398 actions: 

399 - type: set-rotation 

400 state: True 

401 """ 

402 permissions = ('kms:EnableKeyRotation',) 

403 schema = type_schema('set-rotation', state={'type': 'boolean'}) 

404 

405 def process(self, keys): 

406 client = local_session(self.manager.session_factory).client('kms') 

407 for k in keys: 

408 if self.data.get('state', True): 

409 client.enable_key_rotation(KeyId=k['KeyId']) 

410 continue 

411 client.disable_key_rotation(KeyId=k['KeyId']) 

412 

413 

414@KeyAlias.action_registry.register('post-finding') 

415@Key.action_registry.register('post-finding') 

416class KmsPostFinding(PostFinding): 

417 

418 resource_type = 'AwsKmsKey' 

419 

420 def format_resource(self, r): 

421 if 'TargetKeyId' in r: 

422 resolved = self.manager.get_resource_manager( 

423 'kms-key').get_resources([r['TargetKeyId']]) 

424 if not resolved: 

425 return 

426 r = resolved[0] 

427 r[self.manager.resource_type.id] = r['KeyId'] 

428 envelope, payload = self.format_envelope(r) 

429 payload.update(self.filter_empty( 

430 select_keys(r, [ 

431 'AWSAccount', 'CreationDate', 'KeyId', 

432 'KeyManager', 'Origin', 'KeyState']))) 

433 

434 # Securityhub expects a unix timestamp for CreationDate 

435 if 'CreationDate' in payload and isinstance(payload['CreationDate'], datetime): 

436 payload['CreationDate'] = ( 

437 payload['CreationDate'].replace(tzinfo=timezone.utc).timestamp() 

438 ) 

439 

440 return envelope 

441 

442 

443@Key.action_registry.register("schedule-deletion") 

444class KmsKeyScheduleDeletion(BaseAction): 

445 """Schedule KMS key deletion 

446 

447 If the number of days is not specified, the default value of 30 days is used. 

448 The number of days must be between 7 and 30. 

449 

450 :example: 

451 

452 .. code-block:: yaml 

453 

454 policies: 

455 - name: delete-tagged-keys 

456 resource: kms-key 

457 filters: 

458 - type: value 

459 key: tag:DeleteAfter 

460 op: ge 

461 value_type: age # age is a special value type that will be converted to a timestamp 

462 value: 0 

463 actions: 

464 - type: schedule-deletion 

465 days: 7 

466 """ 

467 

468 permissions = ("kms:ScheduleKeyDeletion",) 

469 schema = type_schema("schedule-deletion", days={"type": "integer", "minimum": 7, "maximum": 30}) 

470 

471 def process(self, keys): 

472 client = local_session(self.manager.session_factory).client("kms") 

473 for k in keys: 

474 client.schedule_key_deletion( 

475 KeyId=k["KeyId"], PendingWindowInDays=self.data.get("days", 30) 

476 )