Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/emr.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

200 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import logging 

4import time 

5import json 

6 

7from c7n.actions import ActionRegistry, BaseAction 

8from c7n.filters import FilterRegistry, MetricsFilter, ValueFilter 

9from c7n.manager import resources 

10from c7n.query import QueryResourceManager, TypeInfo, ConfigSource, DescribeSource 

11from c7n.tags import universal_augment 

12from c7n.utils import ( 

13 local_session, type_schema, get_retry, jmespath_search, QueryParser) 

14from c7n.tags import ( 

15 TagDelayedAction, RemoveTag, TagActionFilter, Tag) 

16import c7n.filters.vpc as net_filters 

17 

# Module logger for the EMR resources.
log = logging.getLogger('custodian.emr')

# Registries wired into EMRCluster as its filter/action registries.
actions = ActionRegistry('emr.actions')
filters = FilterRegistry('emr.filters')

# Companion filter for the 'mark-for-op' action registered further down.
filters.register('marked-for-op', TagActionFilter)

23 

24 

@resources.register('emr')
class EMRCluster(QueryResourceManager):
    """Resource manager for Elastic MapReduce clusters.

    The policy's ``query`` entries are parsed with ``EMRQueryParser`` and
    merged into the enumeration query. When no ``ClusterStates`` is given,
    enumeration is restricted to ``resource_type.default_cluster_states``.
    """

    class resource_type(TypeInfo):
        service = 'emr'
        arn_type = 'emr'
        permission_prefix = 'elasticmapreduce'
        default_cluster_states = ['WAITING', 'BOOTSTRAPPING', 'RUNNING', 'STARTING']
        enum_spec = ('list_clusters', 'Clusters', None)
        name = 'Name'
        id = 'Id'
        date = "Status.Timeline.CreationDateTime"
        cfn_type = 'AWS::EMR::Cluster'

    action_registry = actions
    filter_registry = filters
    # Retries API calls that fail with ThrottlingException.
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def __init__(self, ctx, data):
        super(EMRCluster, self).__init__(ctx, data)
        self.queries = EMRQueryParser.parse(
            self.data.get('query', []))

    @classmethod
    def get_permissions(cls):
        """Return the IAM permissions needed to list and describe clusters."""
        return ("elasticmapreduce:ListClusters",
                "elasticmapreduce:DescribeCluster")

    def get_resources(self, ids):
        """Describe each cluster in ``ids`` and return the cluster records.

        The API offers no filtering by id set, so every id is described
        individually. Calls go through ``self.retry`` so throttling is
        handled the same way as in ``augment``.
        """
        client = local_session(self.session_factory).client('emr')
        return [
            self.retry(client.describe_cluster, ClusterId=jid)['Cluster']
            for jid in ids]

    def resources(self, query=None):
        """Enumerate clusters, merging policy queries into ``query``.

        :param query: optional dict of list_clusters parameters; it is
            copied, so the caller's dict is never modified.
        """
        # Work on a copy so the caller's dict is not mutated by the merge.
        query = dict(query or {})
        for q in self.queries:
            query.update(q)
        if 'ClusterStates' not in query:
            # Hand out a copy so downstream code cannot alter the class default.
            query['ClusterStates'] = list(
                self.resource_type.default_cluster_states)
        return super(EMRCluster, self).resources(query=query)

    def augment(self, resources):
        """Replace each listed cluster with its full describe_cluster record."""
        client = local_session(
            self.get_resource_manager('emr').session_factory).client('emr')
        return [
            self.retry(client.describe_cluster, ClusterId=r['Id'])['Cluster']
            for r in resources]

82 

83 

@EMRCluster.filter_registry.register('metrics')
class EMRMetrics(MetricsFilter):
    """Metrics filter for EMR clusters, keyed on the cluster id."""

    def get_dimensions(self, resource):
        """Return the metric dimensions identifying ``resource``."""
        # "Job flow id" is the legacy name for the cluster id.
        cluster_id = resource['Id']
        dimension = {'Name': 'JobFlowId', 'Value': cluster_id}
        return [dimension]

90 

91 

@actions.register('mark-for-op')
class TagDelayedAction(TagDelayedAction):
    """Tag EMR clusters so that an operation is applied to them at a later date.

    This class intentionally reuses (and therefore shadows) the name of the
    ``TagDelayedAction`` imported from ``c7n.tags``; it adds no behavior of
    its own beyond this EMR-specific documentation.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-mark-for-op
            resource: emr
            filters:
              - "tag:Name": absent
            actions:
              - type: mark-for-op
                tag: custodian_cleanup
                op: terminate
                days: 4
                msg: "Cluster does not have required tags"
    """

112 

113 

@actions.register('tag')
class TagTable(Tag):
    """Add tag(s) to EMR clusters.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-tag-table
            resource: emr
            filters:
              - "tag:target-tag": absent
            actions:
              - type: tag
                key: target-tag
                value: target-tag-value
    """

    permissions = ('elasticmapreduce:AddTags',)
    batch_size = 1
    # add_tags calls are retried when they fail with ThrottlingException.
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def process_resource_set(self, client, resources, tags):
        """Apply ``tags`` to every cluster in ``resources``, one call each."""
        for cluster in resources:
            cluster_id = cluster['Id']
            self.retry(client.add_tags, ResourceId=cluster_id, Tags=tags)

140 

141 

@actions.register('remove-tag')
class UntagTable(RemoveTag):
    """Action to remove tag(s) on a resource

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-remove-tag
            resource: emr
            filters:
              - "tag:target-tag": present
            actions:
              - type: remove-tag
                tags: ["target-tag"]
    """

    concurrency = 2
    batch_size = 5
    permissions = ('elasticmapreduce:RemoveTags',)
    # Same throttling retry the sibling 'tag' action uses for add_tags.
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def process_resource_set(self, client, resources, tag_keys):
        """Remove ``tag_keys`` from every cluster in ``resources``.

        Each remove_tags call goes through ``self.retry`` so a
        ThrottlingException is retried instead of failing the batch.
        """
        for r in resources:
            self.retry(
                client.remove_tags, ResourceId=r['Id'], TagKeys=tag_keys)

167 

168 

@actions.register('terminate')
class Terminate(BaseAction):
    """Terminate EMR cluster(s).

    Pair this action with a filter or query; without one, every cluster the
    policy enumerates is terminated.

    With ``force: true`` termination protection is switched off first, and
    the action waits ``delay`` seconds before terminating.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-terminate
            resource: emr
            query:
              - ClusterStates: [STARTING, BOOTSTRAPPING, RUNNING, WAITING]
            actions:
              - terminate
    """

    schema = type_schema('terminate', force={'type': 'boolean'})
    permissions = ("elasticmapreduce:TerminateJobFlows",)
    # Seconds to wait after disabling termination protection.
    delay = 5

    def process(self, emrs):
        """Terminate all clusters in ``emrs`` with a single API call.

        Returns the ``emrs`` list unchanged.
        """
        emr_client = local_session(self.manager.session_factory).client('emr')
        ids = []
        for cluster in emrs:
            ids.append(cluster['Id'])

        force = self.data.get('force')
        if force:
            emr_client.set_termination_protection(
                JobFlowIds=ids, TerminationProtected=False)
            time.sleep(self.delay)

        emr_client.terminate_job_flows(JobFlowIds=ids)
        self.log.info("Deleted emrs: %s", ids)
        return emrs

203 

204 

class EMRQueryParser(QueryParser):
    """Parser for the ``query`` section of EMR cluster policies."""

    # Allowed query keys mapped to their permitted values or value type.
    QuerySchema = {
        'ClusterStates': (
            'STARTING',
            'BOOTSTRAPPING',
            'RUNNING',
            'WAITING',
            'TERMINATING',
            'TERMINATED',
            'TERMINATED_WITH_ERRORS',
        ),
        'CreatedBefore': 'date',
        'CreatedAfter': 'date',
    }
    single_value_fields = ('CreatedBefore', 'CreatedAfter')

    type_name = "EMR"

216 

217 

@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Subnet filter for EMR clusters."""

    # JMESPath expression locating the subnet ids on a cluster record.
    RelatedIdsExpression = "Ec2InstanceAttributes.RequestedEc2SubnetIds[]"

222 

223 

@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Security group filter for EMR clusters.

    Group ids are spread over several cluster attributes, so they are
    collected from ``expressions`` rather than from one
    ``RelatedIdsExpression``.
    """

    RelatedIdsExpression = ""
    expressions = (
        'Ec2InstanceAttributes.EmrManagedMasterSecurityGroup',
        'Ec2InstanceAttributes.EmrManagedSlaveSecurityGroup',
        'Ec2InstanceAttributes.ServiceAccessSecurityGroup',
        'Ec2InstanceAttributes.AdditionalMasterSecurityGroups[]',
        'Ec2InstanceAttributes.AdditionalSlaveSecurityGroups[]',
    )

    def get_related_ids(self, resources):
        """Return the distinct security group ids referenced by ``resources``."""
        found = set()
        for cluster in resources:
            for expression in self.expressions:
                value = jmespath_search(expression, cluster)
                # An expression yields either a list of ids or a single id
                # string; anything else (e.g. None) is ignored.
                if isinstance(value, list):
                    found.update(value)
                elif isinstance(value, str):
                    found.add(value)
        return list(found)

244 

245 

# Expose the shared network-location filter on EMR clusters.
filters.register(
    'network-location',
    net_filters.NetworkLocation,
)

247 

248 

@filters.register('security-configuration')
class EMRSecurityConfigurationFilter(ValueFilter):
    """Match clusters on attributes of their security configuration.

    Clusters without a ``SecurityConfiguration`` are never matched. Matching
    clusters are annotated with the configuration under
    ``c7n:SecurityConfiguration``.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-security-configuration
            resource: emr
            filters:
              - type: security-configuration
                key: EnableAtRestEncryption
                value: true

    """
    annotation_key = 'c7n:SecurityConfiguration'
    permissions = ("elasticmapreduce:ListSecurityConfigurations",
                   "elasticmapreduce:DescribeSecurityConfiguration",)
    schema = type_schema('security-configuration', rinherit=ValueFilter.schema)
    schema_alias = False

    def process(self, resources, event=None):
        """Return the clusters whose security configuration matches."""
        matched = []

        # Index every known security configuration by its name.
        config_manager = self.manager.get_resource_manager(
            'emr-security-configuration')
        configs_by_name = {}
        for config in config_manager.resources():
            configs_by_name[config['Name']] = config

        for cluster in resources:
            if 'SecurityConfiguration' in cluster:
                entry = configs_by_name.get(cluster['SecurityConfiguration'], {})
                # An unknown configuration name is matched as an empty dict.
                details = entry.get('SecurityConfiguration', {})
                if self.match(details):
                    cluster[self.annotation_key] = details
                    matched.append(cluster)
        return matched

286 

287 

@resources.register('emr-security-configuration')
class EMRSecurityConfiguration(QueryResourceManager):
    """Resource manager for EMR security configurations."""

    class resource_type(TypeInfo):
        service = 'emr'
        arn_type = 'emr'
        permission_prefix = 'elasticmapreduce'
        enum_spec = (
            'list_security_configurations', 'SecurityConfigurations', None)
        detail_spec = (
            'describe_security_configuration', 'Name', 'Name', None)
        id = 'Name'
        name = 'Name'
        cfn_type = 'AWS::EMR::SecurityConfiguration'

    permissions = (
        'elasticmapreduce:ListSecurityConfigurations',
        'elasticmapreduce:DescribeSecurityConfiguration',
    )

    def augment(self, resources):
        """Run the base augmentation, then decode each configuration.

        The ``SecurityConfiguration`` value is parsed from a JSON string
        and stored back on the resource under the same key.
        """
        augmented = super().augment(resources)
        for config in augmented:
            raw = config['SecurityConfiguration']
            config['SecurityConfiguration'] = json.loads(raw)
        return augmented

310 

311 

@EMRSecurityConfiguration.action_registry.register('delete')
class DeleteEMRSecurityConfiguration(BaseAction):
    """Delete EMR security configurations by name."""

    schema = type_schema('delete')
    permissions = ('elasticmapreduce:DeleteSecurityConfiguration',)

    def process(self, resources):
        """Delete every configuration in ``resources``."""
        emr = local_session(self.manager.session_factory).client('emr')
        for config in resources:
            try:
                emr.delete_security_configuration(Name=config['Name'])
            # NOTE(review): confirm the EMR client actually models
            # EntityNotFoundException; if it does not, evaluating this
            # clause would itself fail whenever the delete call raises.
            except emr.exceptions.EntityNotFoundException:
                # Skip this configuration and move on to the next one.
                pass

325 

326 

class DescribeEMRServerlessApp(DescribeSource):
    """Describe source for EMR Serverless applications."""

    def augment(self, resources):
        """Run the base augmentation, then pass the result to ``universal_augment``."""
        described = super().augment(resources)
        return universal_augment(self.manager, described)

333 

334 

335@resources.register('emr-serverless-app') 

336class EMRServerless(QueryResourceManager): 

337 """Resource manager for Elastic MapReduce Serverless Application 

338 """ 

339 

340 class resource_type(TypeInfo): 

341 service = 'emr-serverless' 

342 enum_spec = ('list_applications', 'applications', None) 

343 arn = 'arn' 

344 arn_type = '/applications' 

345 name = 'name' 

346 id = 'id' 

347 date = "createdAt" 

348 cfn_type = 'AWS::EMRServerless::Application' 

349 

350 source_mapping = { 

351 'describe': DescribeEMRServerlessApp, 

352 'config': ConfigSource 

353 } 

354 

355 

# Register the mark-for-op action and its companion filter for serverless apps.
# At this point in the module ``TagDelayedAction`` is the EMR class defined
# above, which shadows the one imported from c7n.tags.
EMRServerless.filter_registry.register('marked-for-op', TagActionFilter)
EMRServerless.action_registry.register('mark-for-op', TagDelayedAction)

358 

359 

@EMRServerless.action_registry.register('tag')
class EMRServerlessTag(Tag):
    """Add tag(s) to EMR Serverless applications.

    :example:

    .. code-block:: yaml

        policies:
          - name: tag-emr-serverless
            resource: emr-serverless-app
            filters:
              - "tag:target-tag": absent
            actions:
              - type: tag
                key: target-tag
                value: target-tag-value
    """

    permissions = ('emr-serverless:TagResource',)

    def process_resource_set(self, client, resource_set, tags):
        """Apply ``tags`` to every application in ``resource_set``."""
        # tag_resource takes a key -> value mapping, not a list of
        # {'Key': ..., 'Value': ...} pairs.
        tag_map = {}
        for tag in tags:
            tag_map[tag['Key']] = tag['Value']
        for app in resource_set:
            client.tag_resource(resourceArn=app['arn'], tags=tag_map)

385 

386 

387@EMRServerless.action_registry.register("remove-tag") 

388class EMRServerlessRemoveTag(RemoveTag): 

389 """Action to create tag(s) on EMR-Serverless 

390 

391 :example: 

392 

393 .. code-block:: yaml 

394 

395 policies: 

396 - name: untag-emr-serverless 

397 resource: emr-serverless-app 

398 filters: 

399 - "tag:target-tag": present 

400 actions: 

401 - type: remove-tag 

402 tags: ["target-tag"] 

403 """ 

404 permissions = ('emr-serverless:UntagResource',) 

405 

406 def process_resource_set(self, client, resource_set, tags): 

407 for r in resource_set: 

408 client.untag_resource(resourceArn=r['arn'], tagKeys=tags) 

409 

410 

@EMRServerless.action_registry.register("delete")
class EMRServerlessDelete(BaseAction):
    """Delete EMR Serverless applications.

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-emr-serverless-app
            resource: emr-serverless-app
            actions:
              - type: delete
    """
    schema = type_schema('delete')
    permissions = ('emr-serverless:DeleteApplication',)

    def process(self, resources):
        """Delete each application, skipping ones that no longer exist."""
        serverless = local_session(
            self.manager.session_factory).client('emr-serverless')
        for app in resources:
            app_id = app['id']
            try:
                serverless.delete_application(applicationId=app_id)
            except serverless.exceptions.ResourceNotFoundException:
                # The application is already gone; nothing to delete.
                pass