Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/emr.py: 62%

239 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import logging 

4import time 

5import json 

6 

7from c7n.actions import ActionRegistry, BaseAction 

8from c7n.exceptions import PolicyValidationError 

9from c7n.filters import FilterRegistry, MetricsFilter, ValueFilter 

10from c7n.manager import resources 

11from c7n.query import QueryResourceManager, TypeInfo, ConfigSource, DescribeSource 

12from c7n.tags import universal_augment 

13from c7n.utils import ( 

14 local_session, type_schema, get_retry, jmespath_search) 

15from c7n.tags import ( 

16 TagDelayedAction, RemoveTag, TagActionFilter, Tag) 

17import c7n.filters.vpc as net_filters 

18 

# Registries collecting the filters and actions available on the ``emr``
# resource; EMRCluster assigns them to its filter_registry/action_registry.
filters = FilterRegistry('emr.filters')
actions = ActionRegistry('emr.actions')
# Module-level logger.
log = logging.getLogger('custodian.emr')

# Register the tag-based 'marked-for-op' filter for EMR clusters.
filters.register('marked-for-op', TagActionFilter)

24 

25 

@resources.register('emr')
class EMRCluster(QueryResourceManager):
    """Resource manager for Elastic MapReduce clusters.

    Policy ``query`` entries are parsed into :class:`QueryFilter` objects and
    folded into the parameters handed to the parent ``resources()`` call.
    When the policy supplies no ``ClusterStates`` query, the enumeration is
    restricted to ``resource_type.default_cluster_states``.
    """

    class resource_type(TypeInfo):
        service = 'emr'
        arn_type = 'emr'
        permission_prefix = 'elasticmapreduce'
        # States queried when the policy gives no explicit ClusterStates.
        default_cluster_states = ['WAITING', 'BOOTSTRAPPING', 'RUNNING', 'STARTING']
        enum_spec = ('list_clusters', 'Clusters', None)
        name = 'Name'
        id = 'Id'
        date = "Status.Timeline.CreationDateTime"
        cfn_type = 'AWS::EMR::Cluster'

    action_registry = actions
    filter_registry = filters
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def __init__(self, ctx, data):
        super().__init__(ctx, data)
        # Parsed (and validated) query filters from the policy's 'query' list.
        self.queries = QueryFilter.parse(
            self.data.get('query', []))

    @classmethod
    def get_permissions(cls):
        """Return the IAM permissions needed to enumerate and describe clusters."""
        return ("elasticmapreduce:ListClusters",
                "elasticmapreduce:DescribeCluster")

    def get_resources(self, ids):
        """Describe each cluster id in ``ids`` and return the cluster records.

        The API offers no filtering by a set of ids, so every id is described
        individually. Calls go through ``self.retry`` so throttling is handled
        the same way as in :meth:`augment`.
        """
        client = local_session(self.session_factory).client('emr')
        return [
            self.retry(client.describe_cluster, ClusterId=jid)['Cluster']
            for jid in ids]

    def resources(self, query=None):
        """Return clusters, applying the policy's consolidated query filters.

        ``query`` is copied before the filters are merged in, so a mapping
        passed by the caller is never modified.
        """
        query = dict(query or {})
        for query_filter in self.consolidate_query_filter():
            query[query_filter['Name']] = query_filter['Values']
        return super().resources(query=query)

    def consolidate_query_filter(self):
        """Merge the parsed query filters into one entry per filter name.

        The same name may be specified several times in the policy; the
        values of later occurrences are appended to the first one. A default
        ``ClusterStates`` entry is added when the policy does not provide one.

        Returns a list of ``{'Name': ..., 'Values': ...}`` dicts in first-seen
        order. List values are copied, so neither the policy data nor the
        class-level default list is mutated by the merge or by callers.
        """
        merged = {}
        for q in self.queries:
            query_filter = q.query()
            name = query_filter['Name']
            values = query_filter['Values']
            if name in merged:
                merged[name]['Values'].extend(values)
            else:
                # Copy list values: q.query() may hand back the very list
                # object stored in the policy definition.
                if isinstance(values, list):
                    values = list(values)
                merged[name] = {'Name': name, 'Values': values}
        result = list(merged.values())
        if 'ClusterStates' not in merged:
            # include default query
            result.append(
                {
                    'Name': 'ClusterStates',
                    'Values': list(self.resource_type.default_cluster_states)
                }
            )
        return result

    def augment(self, resources):
        """Replace each listed cluster with its ``describe_cluster`` record."""
        client = local_session(
            self.get_resource_manager('emr').session_factory).client('emr')
        result = []
        for r in resources:
            cluster = self.retry(
                client.describe_cluster, ClusterId=r['Id'])['Cluster']
            result.append(cluster)
        return result

107 

108 

@EMRCluster.filter_registry.register('metrics')
class EMRMetrics(MetricsFilter):
    """Metrics filter for EMR clusters, dimensioned on the cluster id."""

    def get_dimensions(self, resource):
        """Return the metric dimensions identifying ``resource``."""
        # "Job flow id" is the legacy name for the cluster id.
        dimension = {'Name': 'JobFlowId', 'Value': resource['Id']}
        return [dimension]

115 

116 

# NOTE: this subclass rebinds the module-level name ``TagDelayedAction``
# imported from c7n.tags; references to that name further down in this module
# resolve to this class.
@actions.register('mark-for-op')
class TagDelayedAction(TagDelayedAction):
    """Action to specify an action to occur at a later date

    Registered as ``mark-for-op`` on the ``emr`` resource; all behavior is
    inherited from the base class of the same name.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-mark-for-op
            resource: emr
            filters:
              - "tag:Name": absent
            actions:
              - type: mark-for-op
                tag: custodian_cleanup
                op: terminate
                days: 4
                msg: "Cluster does not have required tags"
    """

137 

138 

@actions.register('tag')
class TagTable(Tag):
    """Action to create tag(s) on an EMR cluster

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-tag-table
            resource: emr
            filters:
              - "tag:target-tag": absent
            actions:
              - type: tag
                key: target-tag
                value: target-tag-value
    """

    permissions = ('elasticmapreduce:AddTags',)
    batch_size = 1
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def process_resource_set(self, client, resources, tags):
        """Add ``tags`` to every cluster in ``resources``, retrying on throttling."""
        # Ids are produced lazily so each lookup happens right before its call.
        for cluster_id in (cluster['Id'] for cluster in resources):
            self.retry(client.add_tags, Tags=tags, ResourceId=cluster_id)

165 

166 

@actions.register('remove-tag')
class UntagTable(RemoveTag):
    """Action to remove tag(s) on a resource

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-remove-tag
            resource: emr
            filters:
              - "tag:target-tag": present
            actions:
              - type: remove-tag
                tags: ["target-tag"]
    """

    concurrency = 2
    batch_size = 5
    permissions = ('elasticmapreduce:RemoveTags',)
    # Same throttling retry policy that the 'tag' action applies to add_tags.
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def process_resource_set(self, client, resources, tag_keys):
        """Remove ``tag_keys`` from every cluster in ``resources``.

        Each ``remove_tags`` call goes through ``self.retry`` so that a
        ``ThrottlingException`` does not abort the rest of the batch.
        """
        for r in resources:
            self.retry(
                client.remove_tags, ResourceId=r['Id'], TagKeys=tag_keys)

192 

193 

@actions.register('terminate')
class Terminate(BaseAction):
    """Action to terminate EMR cluster(s)

    It is recommended to apply a filter to the terminate action to avoid
    termination of all EMR clusters

    With ``force: true`` termination protection is switched off first, and
    the action sleeps for ``delay`` seconds before terminating.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-terminate
            resource: emr
            query:
              - ClusterStates: [STARTING, BOOTSTRAPPING, RUNNING, WAITING]
            actions:
              - terminate
    """

    schema = type_schema('terminate', force={'type': 'boolean'})
    permissions = ("elasticmapreduce:TerminateJobFlows",)
    # Seconds to sleep after disabling termination protection.
    delay = 5

    def process(self, emrs):
        """Terminate every cluster in ``emrs`` and return ``emrs`` unchanged."""
        emr_client = local_session(self.manager.session_factory).client('emr')
        job_flow_ids = []
        for cluster in emrs:
            job_flow_ids.append(cluster['Id'])

        force = self.data.get('force')
        if force:
            emr_client.set_termination_protection(
                TerminationProtected=False, JobFlowIds=job_flow_ids)
            time.sleep(self.delay)

        emr_client.terminate_job_flows(JobFlowIds=job_flow_ids)
        self.log.info("Deleted emrs: %s", job_flow_ids)
        return emrs

228 

229 

# Valid EMR query filter names. QueryFilter.validate accepts these keys, plus
# any key starting with 'tag:'.
EMR_VALID_FILTERS = {'CreatedAfter', 'CreatedBefore', 'ClusterStates'}

232 

233 

class QueryFilter:
    """One ``{name: value}`` entry from an EMR policy ``query`` list.

    ``parse`` turns the raw list into validated instances; ``query`` renders
    an instance as a ``{'Name': ..., 'Values': ...}`` dict.
    """

    @classmethod
    def parse(cls, data):
        """Build validated filters from ``data``, an iterable of dicts.

        Raises PolicyValidationError when an element is not a dict or fails
        :meth:`validate`.
        """
        results = []
        for d in data:
            if not isinstance(d, dict):
                # Wrap in a tuple so a tuple-valued ``d`` cannot break the
                # %-formatting of the error message.
                raise PolicyValidationError(
                    "EMR Query Filter Invalid structure %s" % (d,))
            results.append(cls(d).validate())
        return results

    def __init__(self, data):
        self.data = data
        # Populated by validate().
        self.key = None
        self.value = None

    def validate(self):
        """Check the entry and populate ``key``/``value``; return ``self``.

        Raises PolicyValidationError when the entry does not have exactly one
        key, when the key is neither in EMR_VALID_FILTERS nor a string
        starting with ``tag:``, or when the value is None.
        """
        if len(self.data) != 1:
            raise PolicyValidationError(
                "EMR Query Filter Invalid %s" % (self.data,))
        self.key, self.value = next(iter(self.data.items()))

        # Non-string keys are reported as an invalid filter name instead of
        # failing with AttributeError on .startswith.
        is_tag_key = isinstance(self.key, str) and self.key.startswith('tag:')
        if self.key not in EMR_VALID_FILTERS and not is_tag_key:
            raise PolicyValidationError(
                "EMR Query Filter invalid filter name %s" % (self.data,))

        if self.value is None:
            raise PolicyValidationError(
                "EMR Query Filters must have a value, use tag-key"
                " w/ tag name as value for tag present checks"
                " %s" % (self.data,))
        return self

    def query(self):
        """Return ``{'Name': key, 'Values': value}`` for this filter.

        A string value is wrapped in a one-element list. A list value is
        copied so callers that extend or merge the result cannot modify the
        policy data this filter was built from. Any other value is passed
        through unchanged.
        """
        value = self.value
        if isinstance(value, str):
            value = [value]
        elif isinstance(value, list):
            value = list(value)

        return {'Name': self.key, 'Values': value}

276 

277 

@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Subnet filter for EMR clusters, registered as ``subnet``."""

    # Path to the cluster's requested subnet ids within the resource record;
    # presumably evaluated as a JMESPath expression by the base filter.
    RelatedIdsExpression = "Ec2InstanceAttributes.RequestedEc2SubnetIds[]"

282 

283 

@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Security group filter for EMR clusters.

    The group ids are gathered from several attributes of the cluster record
    (see ``expressions``), so ``get_related_ids`` is overridden.
    """

    RelatedIdsExpression = ""
    # Each expression yields either a single group id or a list of ids.
    expressions = ('Ec2InstanceAttributes.EmrManagedMasterSecurityGroup',
                   'Ec2InstanceAttributes.EmrManagedSlaveSecurityGroup',
                   'Ec2InstanceAttributes.ServiceAccessSecurityGroup',
                   'Ec2InstanceAttributes.AdditionalMasterSecurityGroups[]',
                   'Ec2InstanceAttributes.AdditionalSlaveSecurityGroups[]')

    def get_related_ids(self, resources):
        """Return the distinct security group ids referenced by ``resources``."""
        found = set()
        for cluster in resources:
            for expression in self.expressions:
                match = jmespath_search(expression, cluster)
                if isinstance(match, str):
                    found.add(match)
                elif isinstance(match, list):
                    found.update(match)
        return list(found)

304 

305 

# Register the network-location filter from c7n.filters.vpc for EMR clusters.
filters.register('network-location', net_filters.NetworkLocation)

307 

308 

@filters.register('security-configuration')
class EMRSecurityConfigurationFilter(ValueFilter):
    """Filter clusters on attributes of their security configuration.

    Matching clusters are annotated with the configuration under
    ``c7n:SecurityConfiguration``. Clusters without a
    ``SecurityConfiguration`` entry never match.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-security-configuration
            resource: emr
            filters:
              - type: security-configuration
                key: EnableAtRestEncryption
                value: true

    """
    annotation_key = 'c7n:SecurityConfiguration'
    permissions = ("elasticmapreduce:ListSecurityConfigurations",
                   "elasticmapreduce:DescribeSecurityConfiguration",)
    schema = type_schema('security-configuration', rinherit=ValueFilter.schema)
    schema_alias = False

    def process(self, resources, event=None):
        """Return the clusters whose security configuration matches."""
        config_manager = self.manager.get_resource_manager(
            'emr-security-configuration')
        # Index the account's security configurations by name.
        configs_by_name = {}
        for config in config_manager.resources():
            configs_by_name[config['Name']] = config

        matched = []
        for cluster in resources:
            if 'SecurityConfiguration' not in cluster:
                continue
            record = configs_by_name.get(cluster['SecurityConfiguration'], {})
            # Unknown names fall back to an empty configuration.
            cfg = record.get('SecurityConfiguration', {})
            if not self.match(cfg):
                continue
            cluster[self.annotation_key] = cfg
            matched.append(cluster)
        return matched

346 

347 

@resources.register('emr-security-configuration')
class EMRSecurityConfiguration(QueryResourceManager):
    """Resource manager for EMR security configurations."""

    class resource_type(TypeInfo):
        service = 'emr'
        arn_type = 'emr'
        permission_prefix = 'elasticmapreduce'
        enum_spec = ('list_security_configurations', 'SecurityConfigurations', None)
        detail_spec = ('describe_security_configuration', 'Name', 'Name', None)
        id = name = 'Name'
        cfn_type = 'AWS::EMR::SecurityConfiguration'

    permissions = ('elasticmapreduce:ListSecurityConfigurations',
                   'elasticmapreduce:DescribeSecurityConfiguration',)

    def augment(self, resources):
        """Augment via the parent, then decode each configuration from JSON."""
        described = super().augment(resources)
        for config in described:
            # The configuration body is a JSON string; replace it with the
            # decoded object.
            raw = config['SecurityConfiguration']
            config['SecurityConfiguration'] = json.loads(raw)
        return described

370 

371 

@EMRSecurityConfiguration.action_registry.register('delete')
class DeleteEMRSecurityConfiguration(BaseAction):
    """Action to delete EMR security configurations."""

    schema = type_schema('delete')
    permissions = ('elasticmapreduce:DeleteSecurityConfiguration',)

    def process(self, resources):
        """Delete each configuration by name, ignoring ones already gone."""
        session = local_session(self.manager.session_factory)
        emr_client = session.client('emr')
        for config in resources:
            try:
                emr_client.delete_security_configuration(Name=config['Name'])
            # NOTE(review): confirm the EMR client actually exposes
            # EntityNotFoundException; if it does not, this lookup itself
            # fails whenever the delete call raises.
            except emr_client.exceptions.EntityNotFoundException:
                pass

385 

386 

class DescribeEMRServerlessApp(DescribeSource):
    """Describe source for EMR Serverless applications."""

    def augment(self, resources):
        """Run the parent augmentation, then ``universal_augment`` on the result."""
        described = super().augment(resources)
        return universal_augment(self.manager, described)

393 

394 

@resources.register('emr-serverless-app')
class EMRServerless(QueryResourceManager):
    """Resource manager for Elastic MapReduce Serverless Application
    """

    class resource_type(TypeInfo):
        service = 'emr-serverless'
        # Enumerated with list_applications; records are under 'applications'.
        enum_spec = ('list_applications', 'applications', None)
        arn = 'arn'
        arn_type = '/applications'
        name = 'name'
        id = 'id'
        date = "createdAt"
        cfn_type = 'AWS::EMRServerless::Application'

    # Source implementations selectable for this resource.
    source_mapping = {
        'describe': DescribeEMRServerlessApp,
        'config': ConfigSource
    }

414 

415 

# Tag-based scheduling support for EMR Serverless applications. At this point
# ``TagDelayedAction`` refers to the subclass defined earlier in this module,
# which shadows the class imported from c7n.tags.
EMRServerless.action_registry.register('mark-for-op', TagDelayedAction)
EMRServerless.filter_registry.register('marked-for-op', TagActionFilter)

418 

419 

@EMRServerless.action_registry.register('tag')
class EMRServerlessTag(Tag):
    """Action to create tag(s) on an EMR Serverless application

    :example:

    .. code-block:: yaml

        policies:
          - name: tag-emr-serverless
            resource: emr-serverless-app
            filters:
              - "tag:target-tag": absent
            actions:
              - type: tag
                key: target-tag
                value: target-tag-value
    """

    permissions = ('emr-serverless:TagResource',)

    def process_resource_set(self, client, resource_set, tags):
        """Apply ``tags`` to every application in ``resource_set``."""
        # tag_resource takes a plain mapping, so flatten the Key/Value pairs.
        tag_map = {}
        for tag in tags:
            tag_map[tag['Key']] = tag['Value']
        for app in resource_set:
            client.tag_resource(tags=tag_map, resourceArn=app['arn'])

445 

446 

@EMRServerless.action_registry.register("remove-tag")
class EMRServerlessRemoveTag(RemoveTag):
    """Action to remove tag(s) on an EMR Serverless application

    :example:

    .. code-block:: yaml

        policies:
          - name: untag-emr-serverless
            resource: emr-serverless-app
            filters:
              - "tag:target-tag": present
            actions:
              - type: remove-tag
                tags: ["target-tag"]
    """
    permissions = ('emr-serverless:UntagResource',)

    def process_resource_set(self, client, resource_set, tags):
        """Remove the tag keys in ``tags`` from every application in ``resource_set``."""
        for app in resource_set:
            app_arn = app['arn']
            client.untag_resource(tagKeys=tags, resourceArn=app_arn)

469 

470 

@EMRServerless.action_registry.register("delete")
class EMRServerlessDelete(BaseAction):
    """Deletes an EMR Serverless application

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-emr-serverless-app
            resource: emr-serverless-app
            actions:
              - type: delete
    """
    schema = type_schema('delete')
    permissions = ('emr-serverless:DeleteApplication',)

    def process(self, resources):
        """Delete each application by id, ignoring ones that no longer exist."""
        session = local_session(self.manager.session_factory)
        serverless = session.client('emr-serverless')
        for app in resources:
            try:
                serverless.delete_application(applicationId=app['id'])
            except serverless.exceptions.ResourceNotFoundException:
                pass