Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/kms.py: 40%

219 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from c7n.filters.iamaccess import _account, PolicyChecker 

4from botocore.exceptions import ClientError 

5 

6from datetime import datetime, timezone 

7import json 

8from collections import defaultdict 

9from functools import lru_cache 

10 

11from c7n.actions import RemovePolicyBase, BaseAction 

12from c7n.filters import Filter, CrossAccountAccessFilter, ValueFilter 

13from c7n.manager import resources 

14from c7n.query import ( 

15 ConfigSource, DescribeSource, QueryResourceManager, RetryPageIterator, TypeInfo) 

16from c7n.utils import local_session, type_schema, select_keys 

17from c7n.tags import universal_augment 

18 

19from .securityhub import PostFinding 

20 

21 

22@resources.register('kms') 

23class KeyAlias(QueryResourceManager): 

24 

25 class resource_type(TypeInfo): 

26 service = 'kms' 

27 arn_type = 'alias' 

28 enum_spec = ('list_aliases', 'Aliases', None) 

29 name = "AliasName" 

30 id = "AliasArn" 

31 cfn_type = 'AWS::KMS::Alias' 

32 

33 def augment(self, resources): 

34 return [r for r in resources if 'TargetKeyId' in r] 

35 

36 

37class DescribeKey(DescribeSource): 

38 

39 FetchThreshold = 10 # ie should we describe all keys or just fetch them directly 

40 

41 def get_resources(self, ids, cache=True): 

42 # this forms a threshold beyond which we'll fetch individual keys of interest. 

43 # else we'll need to fetch through the full set and client side filter. 

44 if len(ids) < self.FetchThreshold: 

45 client = local_session(self.manager.session_factory).client('kms') 

46 results = [] 

47 for rid in ids: 

48 try: 

49 results.append( 

50 self.manager.retry( 

51 client.describe_key, 

52 KeyId=rid)['KeyMetadata']) 

53 except client.exceptions.NotFoundException: 

54 continue 

55 return results 

56 return super().get_resources(ids, cache) 

57 

58 def augment(self, resources): 

59 client = local_session(self.manager.session_factory).client('kms') 

60 for r in resources: 

61 key_id = r.get('KeyId') 

62 

63 # We get `KeyArn` from list_keys and `Arn` from describe_key. 

64 # If we already have describe_key details we don't need to fetch 

65 # it again. 

66 if 'Arn' not in r: 

67 try: 

68 key_arn = r.get('KeyArn', key_id) 

69 key_detail = client.describe_key(KeyId=key_arn)['KeyMetadata'] 

70 r.update(key_detail) 

71 except ClientError as e: 

72 if e.response['Error']['Code'] == 'AccessDeniedException': 

73 self.manager.log.warning( 

74 "Access denied when describing key:%s", 

75 key_id) 

76 # If a describe fails, we still want the `Arn` key 

77 # available since it is a core attribute 

78 r['Arn'] = r['KeyArn'] 

79 else: 

80 raise 

81 

82 alias_names = self.manager.alias_map.get(key_id) 

83 if alias_names: 

84 r['AliasNames'] = alias_names 

85 

86 return universal_augment(self.manager, resources) 

87 

88 

89class ConfigKey(ConfigSource): 

90 

91 def load_resource(self, item): 

92 resource = super().load_resource(item) 

93 alias_names = self.manager.alias_map.get(resource[self.manager.resource_type.id]) 

94 if alias_names: 

95 resource['AliasNames'] = alias_names 

96 return resource 

97 

98 

99@resources.register('kms-key') 

100class Key(QueryResourceManager): 

101 

102 class resource_type(TypeInfo): 

103 service = 'kms' 

104 arn_type = "key" 

105 enum_spec = ('list_keys', 'Keys', None) 

106 detail_spec = ('describe_key', 'KeyId', 'Arn', 'KeyMetadata') # overriden 

107 name = id = "KeyId" 

108 arn = 'Arn' 

109 universal_taggable = True 

110 cfn_type = config_type = 'AWS::KMS::Key' 

111 

112 source_mapping = { 

113 'config': ConfigKey, 

114 'describe': DescribeKey 

115 } 

116 

117 @property 

118 @lru_cache() 

119 def alias_map(self): 

120 """A dict mapping key IDs to aliases 

121 

122 Fetch key aliases as a flat list, and convert it to a map of 

123 key ID -> aliases. We can build this once and use it to 

124 augment key resources. 

125 """ 

126 aliases = KeyAlias(self.ctx, {}).resources() 

127 alias_map = defaultdict(list) 

128 for a in aliases: 

129 alias_map[a['TargetKeyId']].append(a['AliasName']) 

130 return alias_map 

131 

132 

133@Key.filter_registry.register('key-rotation-status') 

134class KeyRotationStatus(ValueFilter): 

135 """Filters KMS keys by the rotation status 

136 

137 :example: 

138 

139 .. code-block:: yaml 

140 

141 policies: 

142 - name: kms-key-disabled-rotation 

143 resource: kms-key 

144 filters: 

145 - type: key-rotation-status 

146 key: KeyRotationEnabled 

147 value: false 

148 """ 

149 

150 schema = type_schema('key-rotation-status', rinherit=ValueFilter.schema) 

151 schema_alias = False 

152 permissions = ('kms:GetKeyRotationStatus',) 

153 

154 def process(self, resources, event=None): 

155 client = local_session(self.manager.session_factory).client('kms') 

156 

157 def _key_rotation_status(resource): 

158 try: 

159 resource['KeyRotationEnabled'] = client.get_key_rotation_status( 

160 KeyId=resource['KeyId']) 

161 except ClientError as e: 

162 if e.response['Error']['Code'] == 'AccessDeniedException': 

163 self.log.warning( 

164 "Access denied when getting rotation status on key:%s", 

165 resource.get('KeyArn')) 

166 else: 

167 raise 

168 

169 with self.executor_factory(max_workers=2) as w: 

170 query_resources = [ 

171 r for r in resources if 'KeyRotationEnabled' not in r] 

172 self.log.debug( 

173 "Querying %d kms-keys' rotation status" % len(query_resources)) 

174 list(w.map(_key_rotation_status, query_resources)) 

175 

176 return [r for r in resources if self.match( 

177 r.get('KeyRotationEnabled', {}))] 

178 

179 

180class KMSPolicyChecker(PolicyChecker): 

181 # https://docs.aws.amazon.com/kms/latest/developerguide/policy-conditions.html#conditions-kms 

182 

183 def handle_kms_calleraccount(self, s, c): 

184 return bool(set(map(_account, c['values'])).difference(self.allowed_accounts)) 

185 

186 def handle_kms_viaservice(self, s, c): 

187 # We dont filter on service so all are presumed allowed 

188 return False 

189 

190 def handle_kms_grantoperations(self, s, c): 

191 # We dont filter on GrantOperations so all are presumed allowed 

192 return False 

193 

194 

195@Key.filter_registry.register('cross-account') 

196@KeyAlias.filter_registry.register('cross-account') 

197class KMSCrossAccountAccessFilter(CrossAccountAccessFilter): 

198 """Filter KMS keys which have cross account permissions 

199 

200 :example: 

201 

202 .. code-block:: yaml 

203 

204 policies: 

205 - name: check-kms-key-cross-account 

206 resource: kms-key 

207 filters: 

208 - type: cross-account 

209 """ 

210 permissions = ('kms:GetKeyPolicy',) 

211 

212 checker_factory = KMSPolicyChecker 

213 

214 def process(self, resources, event=None): 

215 client = local_session( 

216 self.manager.session_factory).client('kms') 

217 

218 def _augment(r): 

219 key_id = r.get('TargetKeyId', r.get('KeyId')) 

220 assert key_id, "Invalid key resources %s" % r 

221 r['Policy'] = client.get_key_policy( 

222 KeyId=key_id, PolicyName='default')['Policy'] 

223 return r 

224 

225 self.log.debug("fetching policy for %d kms keys" % len(resources)) 

226 with self.executor_factory(max_workers=1) as w: 

227 resources = list(filter(None, w.map(_augment, resources))) 

228 

229 return super(KMSCrossAccountAccessFilter, self).process( 

230 resources, event) 

231 

232 

233@KeyAlias.filter_registry.register('grant-count') 

234class GrantCount(Filter): 

235 """Filters KMS key grants 

236 

237 This can be used to ensure issues around grant limits are monitored 

238 

239 :example: 

240 

241 .. code-block:: yaml 

242 

243 policies: 

244 - name: kms-grants 

245 resource: kms 

246 filters: 

247 - type: grant-count 

248 min: 100 

249 """ 

250 

251 schema = type_schema( 

252 'grant-count', min={'type': 'integer', 'minimum': 0}) 

253 permissions = ('kms:ListGrants',) 

254 

255 def process(self, keys, event=None): 

256 client = local_session(self.manager.session_factory).client('kms') 

257 results = [] 

258 for k in keys: 

259 results.append(self.process_key(client, k)) 

260 return [r for r in results if r] 

261 

262 def process_key(self, client, key): 

263 p = client.get_paginator('list_grants') 

264 p.PAGE_ITERATOR_CLS = RetryPageIterator 

265 grant_count = 0 

266 for rp in p.paginate(KeyId=key['TargetKeyId']): 

267 grant_count += len(rp['Grants']) 

268 key['GrantCount'] = grant_count 

269 

270 grant_threshold = self.data.get('min', 5) 

271 if grant_count < grant_threshold: 

272 return None 

273 

274 self.manager.ctx.metrics.put_metric( 

275 "ExtantGrants", grant_count, "Count", 

276 Scope=key['AliasName'][6:]) 

277 

278 return key 

279 

280 

281class ResourceKmsKeyAlias(ValueFilter): 

282 

283 schema = type_schema('kms-alias', rinherit=ValueFilter.schema) 

284 schema_alias = False 

285 

286 def get_permissions(self): 

287 return KeyAlias(self.manager.ctx, {}).get_permissions() 

288 

289 def get_matching_aliases(self, resources, event=None): 

290 key_aliases = KeyAlias(self.manager.ctx, {}).resources() 

291 key_aliases_dict = {a['TargetKeyId']: a for a in key_aliases} 

292 

293 matched = [] 

294 for r in resources: 

295 if r.get('KmsKeyId'): 

296 r['KeyAlias'] = key_aliases_dict.get( 

297 r.get('KmsKeyId').split("key/", 1)[-1]) 

298 if self.match(r.get('KeyAlias')): 

299 matched.append(r) 

300 return matched 

301 

302 

303@Key.action_registry.register('remove-statements') 

304@KeyAlias.action_registry.register('remove-statements') 

305class RemovePolicyStatement(RemovePolicyBase): 

306 """Action to remove policy statements from KMS 

307 

308 :example: 

309 

310 .. code-block:: yaml 

311 

312 policies: 

313 - name: kms-key-cross-account 

314 resource: kms-key 

315 filters: 

316 - type: cross-account 

317 actions: 

318 - type: remove-statements 

319 statement_ids: matched 

320 """ 

321 

322 permissions = ('kms:GetKeyPolicy', 'kms:PutKeyPolicy') 

323 

324 def process(self, resources): 

325 results = [] 

326 client = local_session(self.manager.session_factory).client('kms') 

327 for r in resources: 

328 key_id = r.get('TargetKeyId', r.get('KeyId')) 

329 assert key_id, "Invalid key resources %s" % r 

330 try: 

331 results += filter(None, [self.process_resource(client, r, key_id)]) 

332 except Exception: 

333 self.log.exception( 

334 "Error processing sns:%s", key_id) 

335 return results 

336 

337 def process_resource(self, client, resource, key_id): 

338 if 'Policy' not in resource: 

339 try: 

340 resource['Policy'] = client.get_key_policy( 

341 KeyId=key_id, PolicyName='default')['Policy'] 

342 except ClientError as e: 

343 if e.response['Error']['Code'] != "NotFoundException": 

344 raise 

345 resource['Policy'] = None 

346 

347 if not resource['Policy']: 

348 return 

349 

350 p = json.loads(resource['Policy']) 

351 statements, found = self.process_policy( 

352 p, resource, CrossAccountAccessFilter.annotation_key) 

353 

354 if not found: 

355 return 

356 

357 # NB: KMS supports only one key policy 'default' 

358 # http://docs.aws.amazon.com/kms/latest/developerguide/programming-key-policies.html#list-policies 

359 client.put_key_policy( 

360 KeyId=key_id, 

361 PolicyName='default', 

362 Policy=json.dumps(p) 

363 ) 

364 

365 return {'Name': key_id, 

366 'State': 'PolicyRemoved', 

367 'Statements': found} 

368 

369 

370@Key.action_registry.register('set-rotation') 

371class KmsKeyRotation(BaseAction): 

372 """Toggle KMS key rotation 

373 

374 :example: 

375 

376 .. code-block:: yaml 

377 

378 policies: 

379 - name: enable-cmk-rotation 

380 resource: kms-key 

381 filters: 

382 - type: key-rotation-status 

383 key: KeyRotationEnabled 

384 value: False 

385 actions: 

386 - type: set-rotation 

387 state: True 

388 """ 

389 permissions = ('kms:EnableKeyRotation',) 

390 schema = type_schema('set-rotation', state={'type': 'boolean'}) 

391 

392 def process(self, keys): 

393 client = local_session(self.manager.session_factory).client('kms') 

394 for k in keys: 

395 if self.data.get('state', True): 

396 client.enable_key_rotation(KeyId=k['KeyId']) 

397 continue 

398 client.disable_key_rotation(KeyId=k['KeyId']) 

399 

400 

401@KeyAlias.action_registry.register('post-finding') 

402@Key.action_registry.register('post-finding') 

403class KmsPostFinding(PostFinding): 

404 

405 resource_type = 'AwsKmsKey' 

406 

407 def format_resource(self, r): 

408 if 'TargetKeyId' in r: 

409 resolved = self.manager.get_resource_manager( 

410 'kms-key').get_resources([r['TargetKeyId']]) 

411 if not resolved: 

412 return 

413 r = resolved[0] 

414 r[self.manager.resource_type.id] = r['KeyId'] 

415 envelope, payload = self.format_envelope(r) 

416 payload.update(self.filter_empty( 

417 select_keys(r, [ 

418 'AWSAccount', 'CreationDate', 'KeyId', 

419 'KeyManager', 'Origin', 'KeyState']))) 

420 

421 # Securityhub expects a unix timestamp for CreationDate 

422 if 'CreationDate' in payload and isinstance(payload['CreationDate'], datetime): 

423 payload['CreationDate'] = ( 

424 payload['CreationDate'].replace(tzinfo=timezone.utc).timestamp() 

425 ) 

426 

427 return envelope