Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/kms.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

222 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from c7n.filters.iamaccess import _account, PolicyChecker 

4from botocore.exceptions import ClientError 

5 

6from datetime import datetime, timezone 

7import json 

8from collections import defaultdict 

9from functools import lru_cache 

10 

11from c7n.actions import RemovePolicyBase, BaseAction 

12from c7n.filters import Filter, CrossAccountAccessFilter, ValueFilter 

13from c7n.manager import resources 

14from c7n.query import ( 

15 ConfigSource, DescribeSource, QueryResourceManager, RetryPageIterator, TypeInfo) 

16from c7n.utils import local_session, type_schema, select_keys 

17from c7n.tags import universal_augment 

18 

19from .securityhub import PostFinding 

20 

21 

class DescribeAlias(DescribeSource):
    """Describe source for KMS aliases that keeps only aliases bound to a key."""

    def augment(self, resources):
        """Return the alias records that contain a ``TargetKeyId`` entry."""
        return list(filter(lambda alias: 'TargetKeyId' in alias, resources))

26 

27 

@resources.register('kms')
class KeyAlias(QueryResourceManager):
    """Resource manager for KMS aliases, registered under the name ``kms``."""

    class resource_type(TypeInfo):
        # Service and the call used to enumerate aliases
        # (presumably: client method, response key, extra args -- confirm
        # against the TypeInfo contract).
        service = 'kms'
        enum_spec = ('list_aliases', 'Aliases', None)

        # Fields of an alias record used as its identifier and display name.
        id = "AliasArn"
        name = "AliasName"
        arn_type = 'alias'

        # Both type attributes share the same type string.
        cfn_type = 'AWS::KMS::Alias'
        config_type = cfn_type

    source_mapping = {
        'describe': DescribeAlias,
        'config': ConfigSource,
    }

40 

41 

class DescribeKey(DescribeSource):
    """Describe source for KMS keys.

    Short id lists are resolved with one ``describe_key`` call per id;
    longer ones are handed to the parent implementation.
    """

    # Number of requested ids at which we stop fetching keys one by one
    # and fall back to the parent's lookup (full listing + client side filter).
    FetchThreshold = 10

    def get_resources(self, ids, cache=True):
        """Return key metadata for ``ids``.

        Ids that KMS reports as not found are silently skipped when the
        per-id path is taken.
        """
        if len(ids) >= self.FetchThreshold:
            return super().get_resources(ids, cache)

        kms = local_session(self.manager.session_factory).client('kms')
        found = []
        for key_ref in ids:
            try:
                response = self.manager.retry(kms.describe_key, KeyId=key_ref)
            except kms.exceptions.NotFoundException:
                continue
            found.append(response['KeyMetadata'])
        return found

    def augment(self, resources):
        """Fill in key details and alias names, then apply ``universal_augment``.

        Records coming from ``list_keys`` carry ``KeyArn``; records coming
        from ``describe_key`` carry ``Arn``. Only records without ``Arn``
        are described here.
        """
        kms = local_session(self.manager.session_factory).client('kms')
        for key in resources:
            key_id = key.get('KeyId')

            if 'Arn' not in key:
                try:
                    target = key.get('KeyArn', key_id)
                    key.update(kms.describe_key(KeyId=target)['KeyMetadata'])
                except ClientError as err:
                    if err.response['Error']['Code'] != 'AccessDeniedException':
                        raise
                    self.manager.log.warning(
                        "Access denied when describing key:%s",
                        key_id)
                    # Even when the describe call is denied, `Arn` must be
                    # present because it is a core attribute.
                    key['Arn'] = key['KeyArn']

            aliases = self.manager.alias_map.get(key_id)
            if aliases:
                key['AliasNames'] = aliases

        return universal_augment(self.manager, resources)

92 

93 

class ConfigKey(ConfigSource):
    """Config source for KMS keys that annotates keys with their alias names."""

    def load_resource(self, item):
        """Load ``item`` through the parent class and attach ``AliasNames``.

        ``AliasNames`` is only set when the manager's alias map has a
        non-empty entry for the resource's id.
        """
        key = super().load_resource(item)
        alias_map = self.manager.alias_map
        names = alias_map.get(key[self.manager.resource_type.id])
        if names:
            key['AliasNames'] = names
        return key

102 

103 

@resources.register('kms-key')
class Key(QueryResourceManager):
    """Resource manager for KMS keys, registered under the name ``kms-key``."""

    class resource_type(TypeInfo):
        service = 'kms'
        enum_spec = ('list_keys', 'Keys', None)
        # overridden
        detail_spec = ('describe_key', 'KeyId', 'Arn', 'KeyMetadata')

        # The key id serves as both identifier and name.
        id = "KeyId"
        name = id
        arn = 'Arn'
        arn_type = "key"

        config_type = 'AWS::KMS::Key'
        cfn_type = config_type

        universal_taggable = True
        permissions_augment = ("kms:ListResourceTags",)

    source_mapping = {
        'describe': DescribeKey,
        'config': ConfigKey,
    }

    # NOTE(review): lru_cache on a method keys on ``self`` and keeps the
    # manager referenced for as long as the entry stays in the cache.
    @property
    @lru_cache()
    def alias_map(self):
        """Mapping of key id to the list of alias names pointing at it.

        The aliases are fetched as a flat list through ``KeyAlias`` and
        grouped by ``TargetKeyId``. The result is cached so it can be
        reused while augmenting key resources.
        """
        grouped = defaultdict(list)
        for alias in KeyAlias(self.ctx, {}).resources():
            grouped[alias['TargetKeyId']].append(alias['AliasName'])
        return grouped

137 

138 

@Key.filter_registry.register('key-rotation-status')
class KeyRotationStatus(ValueFilter):
    """Filters KMS keys by the rotation status

    :example:

    .. code-block:: yaml

        policies:
          - name: kms-key-disabled-rotation
            resource: kms-key
            filters:
              - type: key-rotation-status
                key: KeyRotationEnabled
                value: false
    """

    schema = type_schema('key-rotation-status', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('kms:GetKeyRotationStatus',)

    def process(self, resources, event=None):
        """Annotate keys with their rotation status and return the matches.

        The ``get_key_rotation_status`` response is stored under
        ``KeyRotationEnabled`` on every resource that does not have that
        entry yet, and the value filter is matched against that response.
        Keys for which the call is denied are logged and left
        un-annotated; they are matched against an empty dict. Any other
        ``ClientError`` is re-raised.
        """
        client = local_session(self.manager.session_factory).client('kms')

        def _key_rotation_status(resource):
            try:
                resource['KeyRotationEnabled'] = client.get_key_rotation_status(
                    KeyId=resource['KeyId'])
            except ClientError as e:
                if e.response['Error']['Code'] != 'AccessDeniedException':
                    raise
                # Keys fetched through describe_key only carry `Arn`, keys
                # from list_keys carry `KeyArn`; fall back so the warning
                # always identifies the key instead of printing None.
                self.log.warning(
                    "Access denied when getting rotation status on key:%s",
                    resource.get('Arn')
                    or resource.get('KeyArn')
                    or resource.get('KeyId'))

        # Only query keys that have not been annotated already.
        query_resources = [
            r for r in resources if 'KeyRotationEnabled' not in r]
        self.log.debug(
            "Querying %d kms-keys' rotation status", len(query_resources))

        with self.executor_factory(max_workers=2) as w:
            # Drain the iterator so worker exceptions surface here.
            list(w.map(_key_rotation_status, query_resources))

        return [r for r in resources if self.match(
            r.get('KeyRotationEnabled', {}))]

184 

185 

class KMSPolicyChecker(PolicyChecker):
    """Policy checker with handlers for KMS-specific condition keys."""
    # https://docs.aws.amazon.com/kms/latest/developerguide/policy-conditions.html#conditions-kms

    def handle_kms_calleraccount(self, s, c):
        """Return True when a condition value names an account that is not allowed."""
        accounts = {_account(value) for value in c['values']}
        return not accounts.issubset(self.allowed_accounts)

    def handle_kms_viaservice(self, s, c):
        """Always return False: services are not filtered, so all are presumed allowed."""
        return False

    def handle_kms_grantoperations(self, s, c):
        """Always return False: grant operations are not filtered, so all are presumed allowed."""
        return False

199 

200 

@Key.filter_registry.register('cross-account')
@KeyAlias.filter_registry.register('cross-account')
class KMSCrossAccountAccessFilter(CrossAccountAccessFilter):
    """Match KMS keys whose key policy grants cross account access

    :example:

    .. code-block:: yaml

        policies:
          - name: check-kms-key-cross-account
            resource: kms-key
            filters:
              - type: cross-account
    """
    permissions = ('kms:GetKeyPolicy',)

    checker_factory = KMSPolicyChecker

    def process(self, resources, event=None):
        """Attach each key's ``default`` policy, then defer to the parent filter.

        The key id is taken from ``TargetKeyId`` when the record has one
        (alias records), otherwise from ``KeyId``.
        """
        kms = local_session(self.manager.session_factory).client('kms')

        def attach_policy(key):
            key_id = key.get('TargetKeyId', key.get('KeyId'))
            assert key_id, "Invalid key resources %s" % key
            response = kms.get_key_policy(KeyId=key_id, PolicyName='default')
            key['Policy'] = response['Policy']
            return key

        self.log.debug("fetching policy for %d kms keys" % len(resources))
        with self.executor_factory(max_workers=1) as pool:
            with_policy = [
                key for key in pool.map(attach_policy, resources) if key]

        return super().process(with_policy, event)

237 

238 

@KeyAlias.filter_registry.register('grant-count')
class GrantCount(Filter):
    """Filter KMS aliases by the number of grants on their target key

    Useful for keeping an eye on keys approaching grant limits

    :example:

    .. code-block:: yaml

        policies:
          - name: kms-grants
            resource: kms
            filters:
              - type: grant-count
                min: 100
    """

    schema = type_schema(
        'grant-count', min={'type': 'integer', 'minimum': 0})
    permissions = ('kms:ListGrants',)

    def process(self, keys, event=None):
        """Return the keys for which ``process_key`` yields a truthy result."""
        client = local_session(self.manager.session_factory).client('kms')
        outcomes = (self.process_key(client, key) for key in keys)
        return [kept for kept in outcomes if kept]

    def process_key(self, client, key):
        """Count the grants on ``key`` and return it if the count reaches ``min``.

        The count is stored on the record as ``GrantCount``. Keys below the
        threshold (``min``, defaulting to 5) yield ``None``. For the others
        an ``ExtantGrants`` metric is emitted, scoped by the alias name
        minus its first six characters (presumably the ``alias/`` prefix).
        """
        paginator = client.get_paginator('list_grants')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        pages = paginator.paginate(KeyId=key['TargetKeyId'])
        total = sum(len(page['Grants']) for page in pages)
        key['GrantCount'] = total

        if total < self.data.get('min', 5):
            return None

        self.manager.ctx.metrics.put_metric(
            "ExtantGrants", total, "Count",
            Scope=key['AliasName'][6:])
        return key

285 

286 

class ResourceKmsKeyAlias(ValueFilter):
    """Value filter matched against the KMS alias of a resource's key."""

    schema = type_schema('kms-alias', rinherit=ValueFilter.schema)
    schema_alias = False

    def get_permissions(self):
        """Return the permissions of the ``KeyAlias`` resource manager."""
        alias_manager = KeyAlias(self.manager.ctx, {})
        return alias_manager.get_permissions()

    def get_matching_aliases(self, resources, event=None):
        """Return the resources whose ``KeyAlias`` annotation matches the filter.

        Resources with a ``KmsKeyId`` get ``KeyAlias`` set to the alias
        record for that key (or ``None`` when no alias targets it). The
        lookup uses the part of ``KmsKeyId`` after ``key/``. When several
        aliases target one key, the last one listed is used.
        """
        alias_by_key = {}
        for alias in KeyAlias(self.manager.ctx, {}).resources():
            alias_by_key[alias['TargetKeyId']] = alias

        selected = []
        for res in resources:
            kms_key_id = res.get('KmsKeyId')
            if kms_key_id:
                bare_id = kms_key_id.split("key/", 1)[-1]
                res['KeyAlias'] = alias_by_key.get(bare_id)
            if self.match(res.get('KeyAlias')):
                selected.append(res)
        return selected

307 

308 

@Key.action_registry.register('remove-statements')
@KeyAlias.action_registry.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
    """Action to remove policy statements from KMS

    :example:

    .. code-block:: yaml

        policies:
          - name: kms-key-cross-account
            resource: kms-key
            filters:
              - type: cross-account
            actions:
              - type: remove-statements
                statement_ids: matched
    """

    permissions = ('kms:GetKeyPolicy', 'kms:PutKeyPolicy')

    def process(self, resources):
        """Remove statements from each key's policy and report the changes.

        Returns one result dict per key whose policy was rewritten.
        A failure on one key is logged and does not stop the remaining
        keys from being processed.
        """
        results = []
        client = local_session(self.manager.session_factory).client('kms')
        for r in resources:
            # Alias records carry the key id under `TargetKeyId`.
            key_id = r.get('TargetKeyId', r.get('KeyId'))
            assert key_id, "Invalid key resources %s" % r
            try:
                outcome = self.process_resource(client, r, key_id)
            except Exception:
                # Deliberate best-effort: log with traceback and move on.
                self.log.exception(
                    "Error processing kms:%s", key_id)
                continue
            if outcome:
                results.append(outcome)
        return results

    def process_resource(self, client, resource, key_id):
        """Rewrite the ``default`` policy of one key.

        Returns ``None`` when the key has no policy or when
        ``process_policy`` reports nothing found to remove. Otherwise
        writes the modified policy back and returns a dict describing the
        removed statements.
        """
        if 'Policy' not in resource:
            try:
                resource['Policy'] = client.get_key_policy(
                    KeyId=key_id, PolicyName='default')['Policy']
            except ClientError as e:
                if e.response['Error']['Code'] != "NotFoundException":
                    raise
                resource['Policy'] = None

        if not resource['Policy']:
            return

        p = json.loads(resource['Policy'])
        _, found = self.process_policy(
            p, resource, CrossAccountAccessFilter.annotation_key)

        if not found:
            return

        # NB: KMS supports only one key policy 'default'
        # http://docs.aws.amazon.com/kms/latest/developerguide/programming-key-policies.html#list-policies
        client.put_key_policy(
            KeyId=key_id,
            PolicyName='default',
            Policy=json.dumps(p)
        )

        return {'Name': key_id,
                'State': 'PolicyRemoved',
                'Statements': found}

374 

375 

@Key.action_registry.register('set-rotation')
class KmsKeyRotation(BaseAction):
    """Toggle KMS key rotation

    :example:

    .. code-block:: yaml

        policies:
          - name: enable-cmk-rotation
            resource: kms-key
            filters:
              - type: key-rotation-status
                key: KeyRotationEnabled
                value: False
            actions:
              - type: set-rotation
                state: True
    """
    # The action calls enable_key_rotation or disable_key_rotation depending
    # on `state`, so both permissions are declared.
    permissions = ('kms:EnableKeyRotation', 'kms:DisableKeyRotation')
    schema = type_schema('set-rotation', state={'type': 'boolean'})

    def process(self, keys):
        """Enable rotation on every key, or disable it when ``state`` is false.

        ``state`` defaults to ``True`` when it is not set in the policy.
        """
        client = local_session(self.manager.session_factory).client('kms')
        # The desired state is the same for every key; resolve the call once.
        if self.data.get('state', True):
            set_rotation = client.enable_key_rotation
        else:
            set_rotation = client.disable_key_rotation
        for k in keys:
            set_rotation(KeyId=k['KeyId'])

405 

406 

@KeyAlias.action_registry.register('post-finding')
@Key.action_registry.register('post-finding')
class KmsPostFinding(PostFinding):
    """Post-finding action for KMS keys and aliases, reported as ``AwsKmsKey``."""

    resource_type = 'AwsKmsKey'

    def format_resource(self, r):
        """Build the finding envelope for a key or alias record.

        Alias records (those with ``TargetKeyId``) are first resolved to
        their key through the ``kms-key`` resource manager. ``None`` is
        returned when the key cannot be resolved. The resolved key is also
        given the current manager's id field, set to its ``KeyId``.
        """
        if 'TargetKeyId' in r:
            resolved = self.manager.get_resource_manager(
                'kms-key').get_resources([r['TargetKeyId']])
            if not resolved:
                return
            r = resolved[0]
            r[self.manager.resource_type.id] = r['KeyId']
        envelope, payload = self.format_envelope(r)
        payload.update(self.filter_empty(
            select_keys(r, [
                'AWSAccount', 'CreationDate', 'KeyId',
                'KeyManager', 'Origin', 'KeyState'])))

        # Securityhub expects a unix timestamp for CreationDate
        if 'CreationDate' in payload and isinstance(payload['CreationDate'], datetime):
            created = payload['CreationDate']
            # Only naive values are assumed to be UTC. Overwriting the tzinfo
            # of an aware datetime would reinterpret its wall-clock time and
            # shift the timestamp by the zone's UTC offset.
            if created.tzinfo is None or created.utcoffset() is None:
                created = created.replace(tzinfo=timezone.utc)
            payload['CreationDate'] = created.timestamp()

        return envelope

427 # Securityhub expects a unix timestamp for CreationDate 

428 if 'CreationDate' in payload and isinstance(payload['CreationDate'], datetime): 

429 payload['CreationDate'] = ( 

430 payload['CreationDate'].replace(tzinfo=timezone.utc).timestamp() 

431 ) 

432 

433 return envelope