Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/elasticsearch.py: 40%

336 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import json 

4import time 

5from collections import defaultdict 

6 

7from c7n.actions import Action, BaseAction, ModifyVpcSecurityGroupsAction, RemovePolicyBase 

8from c7n.filters import MetricsFilter, CrossAccountAccessFilter, ValueFilter 

9from c7n.exceptions import PolicyValidationError 

10from c7n.filters.vpc import SecurityGroupFilter, SubnetFilter, VpcFilter, Filter 

11from c7n.manager import resources 

12from c7n.query import ConfigSource, DescribeSource, QueryResourceManager, TypeInfo 

13from c7n.utils import chunks, local_session, type_schema, merge_dict_list, jmespath_search 

14from c7n.tags import Tag, RemoveTag, TagActionFilter, TagDelayedAction 

15from c7n.filters.kms import KmsRelatedFilter 

16import c7n.filters.policystatement as polstmt_filter 

17 

18from .securityhub import PostFinding 

19 

20 

class DescribeDomain(DescribeSource):
    """Describe-API source for Elasticsearch domains.

    ``get_resources`` passes ids through unchanged; ``augment`` expands
    them into domain descriptions and attaches each domain's tags.
    """

    def get_resources(self, resource_ids):
        # The ids are returned as-is; augment() is what turns them into
        # resource dictionaries.
        return resource_ids

    def augment(self, domains):
        """Describe the given domain names and attach a ``Tags`` list to each."""
        es = local_session(self.manager.session_factory).client('es')
        model = self.manager.get_model()

        def describe_batch(names):
            # One describe call per batch, then one list_tags call per domain.
            described = self.manager.retry(
                es.describe_elasticsearch_domains,
                DomainNames=names)['DomainStatusList']
            for domain in described:
                domain_arn = self.manager.generate_arn(domain[model.id])
                tag_response = self.manager.retry(es.list_tags, ARN=domain_arn)
                domain['Tags'] = tag_response.get('TagList', [])
            return described

        augmented = []
        # Domain names are handed to the describe call in chunks of 5.
        for batch in chunks(domains, 5):
            augmented.extend(describe_batch(batch))
        return augmented

46 

47 

@resources.register('elasticsearch')
class ElasticSearchDomain(QueryResourceManager):
    """Resource manager registered under the name ``elasticsearch``."""

    class resource_type(TypeInfo):
        service = 'es'
        arn = 'ARN'
        arn_type = 'domain'
        enum_spec = (
            'list_domain_names', 'DomainNames[].DomainName', None)
        id = 'DomainName'
        name = 'Name'
        dimension = "DomainName"
        cfn_type = config_type = 'AWS::Elasticsearch::Domain'

    def resources(self, query=None):
        """Fetch resources, preferring a ``query`` declared in the policy data.

        A ``query`` entry in ``self.data`` is merged into one dict and
        replaces the ``query`` argument; otherwise a ``None`` argument
        becomes an empty dict.
        """
        if 'query' not in self.data:
            effective_query = {} if query is None else query
        else:
            effective_query = merge_dict_list(self.data['query'])
        return super().resources(query=effective_query)

    source_mapping = {
        'describe': DescribeDomain,
        'config': ConfigSource
    }

73 

74 

# Expose the shared TagActionFilter on this resource under the
# filter name 'marked-for-op'.
ElasticSearchDomain.filter_registry.register('marked-for-op', TagActionFilter)

76 

77 

@ElasticSearchDomain.filter_registry.register('subnet')
class Subnet(SubnetFilter):
    """Subnet filter for Elasticsearch domains, registered as ``subnet``."""

    # Expression naming where the subnet ids sit on a domain record
    # (presumably evaluated by SubnetFilter -- confirm against the base class).
    RelatedIdsExpression = "VPCOptions.SubnetIds[]"

82 

83 

@ElasticSearchDomain.filter_registry.register('security-group')
class SecurityGroup(SecurityGroupFilter):
    """Security group filter for Elasticsearch domains, registered as ``security-group``."""

    # Expression naming where the security group ids sit on a domain record
    # (presumably evaluated by SecurityGroupFilter -- confirm against the base class).
    RelatedIdsExpression = "VPCOptions.SecurityGroupIds[]"

88 

89 

@ElasticSearchDomain.filter_registry.register('vpc')
class Vpc(VpcFilter):
    """VPC filter for Elasticsearch domains, registered as ``vpc``."""

    # Expression naming where the VPC id sits on a domain record
    # (presumably evaluated by VpcFilter -- confirm against the base class).
    RelatedIdsExpression = "VPCOptions.VPCId"

94 

95 

@ElasticSearchDomain.filter_registry.register('metrics')
class Metrics(MetricsFilter):
    """Metrics filter for Elasticsearch domains, registered as ``metrics``."""

    def get_dimensions(self, resource):
        """Return the ``ClientId`` and ``DomainName`` dimensions for a domain.

        ``ClientId`` carries the manager's account id; ``DomainName`` is
        read from the resource.
        """
        client_dimension = {
            'Name': 'ClientId',
            'Value': self.manager.account_id,
        }
        domain_dimension = {
            'Name': 'DomainName',
            'Value': resource['DomainName'],
        }
        return [client_dimension, domain_dimension]

104 

105 

@ElasticSearchDomain.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    """KMS key filter for Elasticsearch domains, registered as ``kms-key``."""

    # Expression naming where the KMS key id sits on a domain record
    # (presumably evaluated by KmsRelatedFilter -- confirm against the base class).
    RelatedIdsExpression = 'EncryptionAtRestOptions.KmsKeyId'

110 

111 

@ElasticSearchDomain.filter_registry.register('cross-account')
class ElasticSearchCrossAccountAccessFilter(CrossAccountAccessFilter):
    """
    Filter to return all elasticsearch domains with cross account access permissions

    :example:

    .. code-block:: yaml

        policies:
          - name: check-elasticsearch-cross-account
            resource: aws.elasticsearch
            filters:
              - type: cross-account
    """
    policy_attribute = 'c7n:Policy'
    permissions = ('es:DescribeElasticsearchDomainConfig',)

    def process(self, resources, event=None):
        """Annotate each domain with its parsed access policy, then delegate.

        Domains that already carry ``policy_attribute`` are left alone.
        For the others the domain config is fetched
        (``ResourceNotFoundException`` is passed to retry as an ignored
        error code) and the ``AccessPolicies.Options`` JSON is parsed and
        stored; an empty or missing value is stored as ``None``.
        """
        es = local_session(self.manager.session_factory).client('es')
        for domain in resources:
            if self.policy_attribute in domain:
                continue
            config = self.manager.retry(
                es.describe_elasticsearch_domain_config,
                DomainName=domain['DomainName'],
                ignore_err_codes=('ResourceNotFoundException',))
            if not config:
                continue
            raw_policy = config.get('DomainConfig').get('AccessPolicies').get('Options')
            parsed_policy = json.loads(raw_policy) if raw_policy else None
            # A policy that parses to something falsy is stored as None too.
            domain[self.policy_attribute] = parsed_policy or None
        return super().process(resources)

142 

143 

@ElasticSearchDomain.filter_registry.register('cross-cluster')
class ElasticSearchCrossClusterFilter(Filter):
    """
    Filter to return all elasticsearch domains with inbound cross-cluster with the given info

    :example:

    .. code-block:: yaml

        policies:
          - name: check-elasticsearch-cross-cluster
            resource: aws.elasticsearch
            filters:
              - type: cross-cluster
                inbound:
                  key: SourceDomainInfo.OwnerId
                  op: eq
                  value: '123456789'
                outbound:
                  key: SourceDomainInfo.OwnerId
                  op: eq
                  value: '123456789'
    """
    schema = type_schema(type_name="cross-cluster",
                         inbound=type_schema(type_name='inbound',
                                             required=('key', 'value'),
                                             rinherit=ValueFilter.schema),
                         outbound=type_schema(type_name='outbound',
                                              required=('key', 'value'),
                                              rinherit=ValueFilter.schema),)
    schema_alias = False
    annotation_key = "c7n:SearchConnections"
    matched_key = "c7n:MatchedConnections"
    annotate = False
    permissions = ('es:ESCrossClusterGet',)

    def _describe_connections(self, client, direction, domain_name):
        """Fetch the cross-cluster search connections of one direction for a domain.

        ``direction`` is ``'inbound'`` or ``'outbound'``. The response is
        returned without its ``ResponseMetadata`` entry.
        """
        if direction == 'inbound':
            api_call = client.describe_inbound_cross_cluster_search_connections
            filter_name = 'destination-domain-info.domain-name'
        else:
            api_call = client.describe_outbound_cross_cluster_search_connections
            filter_name = 'source-domain-info.domain-name'
        response = self.manager.retry(
            api_call,
            Filters=[{'Name': filter_name, 'Values': [domain_name]}])
        # Tolerate responses that carry no metadata entry.
        response.pop('ResponseMetadata', None)
        return response

    def process(self, resources, event=None):
        """Return the domains having at least one connection matching the filter.

        Connections are cached per direction under ``annotation_key``; the
        matching ones are recorded per direction under ``matched_key``.
        Only the directions configured on this filter are fetched and
        matched, so an annotation left by another ``cross-cluster`` filter
        with different directions neither breaks matching nor hides a
        direction that still has to be looked up.
        """
        client = local_session(self.manager.session_factory).client('es')
        directions = [d for d in ('inbound', 'outbound') if d in self.data]
        results = []
        for r in resources:
            connections = r.setdefault(self.annotation_key, {})
            for direction in directions:
                if direction not in connections:
                    connections[direction] = self._describe_connections(
                        client, direction, r['DomainName'])

            match_found = False
            r[self.matched_key] = {}
            for direction in directions:
                value_filter = ValueFilter(self.data[direction])
                value_filter.annotate = False
                matched = [
                    conn for conn in
                    connections[direction].get('CrossClusterSearchConnections', [])
                    if value_filter(conn)]
                if matched:
                    match_found = True
                r[self.matched_key][direction] = matched
            if match_found:
                results.append(r)
        return results

215 

216 

@ElasticSearchDomain.filter_registry.register('has-statement')
class HasStatementFilter(polstmt_filter.HasStatementFilter):
    """``has-statement`` filter reading the policy from ``AccessPolicies``."""

    def __init__(self, data, manager=None):
        super().__init__(data, manager)
        # Name of the resource attribute that carries the policy.
        self.policy_attribute = 'AccessPolicies'

    def get_std_format_args(self, domain):
        """Return the ``domain_arn``, ``account_id`` and ``region`` format values."""
        format_args = {'domain_arn': domain['ARN']}
        format_args['account_id'] = self.manager.config.account_id
        format_args['region'] = self.manager.config.region
        return format_args

229 

230 

@ElasticSearchDomain.filter_registry.register('source-ip')
class SourceIP(Filter):
    """ValueFilter-based filter for verifying allowed source ips in
    an ElasticSearch domain's access policy. Useful for checking to see if
    an ElasticSearch domain allows traffic from non approved IP addresses/CIDRs.

    :example:

    Find ElasticSearch domains that allow traffic from IP addresses
    not in the approved list (string matching)

    .. code-block:: yaml

        - type: source-ip
          op: not-in
          value: ["103.15.250.0/24", "173.240.160.0/21", "206.108.40.0/21"]

    Same as above but using cidr matching instead of string matching

    .. code-block:: yaml

        - type: source-ip
          op: not-in
          value_type: cidr
          value: ["103.15.250.0/24", "173.240.160.0/21", "206.108.40.0/21"]

    """
    schema = type_schema('source-ip', rinherit=ValueFilter.schema)
    permissions = ("es:DescribeElasticsearchDomainConfig",)
    annotation = 'c7n:MatchedSourceIps'

    def __call__(self, resource):
        """Return True when an allowed source ip of the domain matches the filter.

        Matching entries (``{'SourceIp': ...}`` dicts) are stored on the
        resource under ``annotation``. A domain without an access policy
        has no source ips to inspect and never matches.
        """
        es_access_policy = resource.get('AccessPolicies')
        if not es_access_policy:
            # json.loads(None) / json.loads('') would raise; nothing to match.
            return False

        source_ips = self.get_source_ip_perms(json.loads(es_access_policy))
        if not self.data.get('key'):
            # Each candidate is a {'SourceIp': ip} dict, so default the key.
            self.data['key'] = 'SourceIp'
        vf = ValueFilter(self.data, self.manager)
        vf.annotate = False
        matched = [source_ip for source_ip in source_ips if vf(source_ip)]

        if matched:
            resource[self.annotation] = matched
            return True
        return False

    def get_source_ip_perms(self, es_access_policy):
        """Get SourceIps from the original access policy

        Returns one ``{'SourceIp': ip}`` dict per ip found in the policy's
        statements.
        """
        ip_perms = []
        for stmt in es_access_policy.get('Statement', []):
            ip_perms.extend(
                {'SourceIp': ip} for ip in self.source_ips_from_stmt(stmt))
        return ip_perms

    @classmethod
    def source_ips_from_stmt(cls, stmt):
        """Return the ``aws:SourceIp`` values of an ``Allow`` statement as a list.

        Statements with another effect, or without an ``IpAddress``
        condition on ``aws:SourceIp``, yield an empty list. A single
        string value is wrapped in a list.
        """
        if stmt.get('Effect', '') != 'Allow':
            return []
        ips = stmt.get('Condition', {}).get('IpAddress', {}).get('aws:SourceIp', [])
        if not ips:
            # Covers an empty list/string as well as an explicit null.
            return []
        if isinstance(ips, list):
            return list(ips)
        return [ips]

303 

304 

@ElasticSearchDomain.action_registry.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
    """
    Action to remove policy statements from elasticsearch

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticsearch-cross-account
            resource: aws.elasticsearch
            filters:
              - type: cross-account
            actions:
              - type: remove-statements
                statement_ids: matched
    """

    permissions = ('es:DescribeElasticsearchDomainConfig', 'es:UpdateElasticsearchDomainConfig',)

    def validate(self):
        """Require a ``cross-account`` filter on the policy; return ``self``."""
        has_cross_account = any(
            isinstance(f, ElasticSearchCrossAccountAccessFilter)
            for f in self.manager.iter_filters())
        if not has_cross_account:
            raise PolicyValidationError(
                '`remove-statements` may only be used in '
                'conjunction with `cross-account` filter on %s' % (self.manager.data,))
        return self

    def process(self, resources):
        """Process every domain; a failure is logged and the loop carries on."""
        es = local_session(self.manager.session_factory).client('es')
        for domain in resources:
            try:
                self.process_resource(es, domain)
            except Exception:
                self.log.exception("Error processing es:%s", domain['ARN'])

    def process_resource(self, client, resource):
        """Push the policy back to the domain when ``process_policy`` reports a hit.

        Domains without a ``c7n:Policy`` annotation are skipped.
        """
        policy = resource.get('c7n:Policy')
        if policy is None:
            return

        _, found = self.process_policy(
            policy, resource, CrossAccountAccessFilter.annotation_key)
        if not found:
            return

        client.update_elasticsearch_domain_config(
            DomainName=resource['DomainName'],
            AccessPolicies=json.dumps(policy)
        )

358 

359 

@ElasticSearchDomain.action_registry.register('post-finding')
class ElasticSearchPostFinding(PostFinding):
    """``post-finding`` action emitting ``AwsElasticsearchDomain`` resources."""

    resource_type = 'AwsElasticsearchDomain'

    def format_resource(self, r):
        """Fill the finding payload from a domain record and return the envelope.

        Top-level values and each nested option group are passed through
        ``filter_empty`` before being merged into the payload.
        """
        envelope, payload = self.format_envelope(r)

        def option_group(group, *fields):
            # Look up '<group>.<field>' on the domain for every field.
            return self.filter_empty({
                field: jmespath_search('%s.%s' % (group, field), r)
                for field in fields})

        details = {
            'AccessPolicies': r.get('AccessPolicies'),
            'DomainId': r['DomainId'],
            'DomainName': r['DomainName'],
            'Endpoint': r.get('Endpoint'),
            'Endpoints': r.get('Endpoints'),
            'DomainEndpointOptions': option_group(
                'DomainEndpointOptions', 'EnforceHTTPS', 'TLSSecurityPolicy'),
            'ElasticsearchVersion': r['ElasticsearchVersion'],
            'EncryptionAtRestOptions': option_group(
                'EncryptionAtRestOptions', 'Enabled', 'KmsKeyId'),
            'NodeToNodeEncryptionOptions': option_group(
                'NodeToNodeEncryptionOptions', 'Enabled'),
            'VPCOptions': option_group(
                'VPCOptions', 'AvailabilityZones', 'SecurityGroupIds',
                'SubnetIds', 'VPCId'),
        }
        payload.update(self.filter_empty(details))
        return envelope

400 

401 

@ElasticSearchDomain.action_registry.register('modify-security-groups')
class ElasticSearchModifySG(ModifyVpcSecurityGroupsAction):
    """Modify security groups on an Elasticsearch domain"""

    permissions = ('es:UpdateElasticsearchDomainConfig',)

    def process(self, domains):
        """Set each domain's security groups to the group list computed for it."""
        group_sets = super().get_groups(domains)
        es = local_session(self.manager.session_factory).client('es')

        # group_sets is indexed by the domain's position in ``domains``.
        for position, domain in enumerate(domains):
            vpc_options = {'SecurityGroupIds': group_sets[position]}
            es.update_elasticsearch_domain_config(
                DomainName=domain['DomainName'],
                VPCOptions=vpc_options)

417 

418 

@ElasticSearchDomain.action_registry.register('delete')
class Delete(Action):
    """``delete`` action: delete each matched Elasticsearch domain."""

    schema = type_schema('delete')
    permissions = ('es:DeleteElasticsearchDomain',)

    def process(self, resources):
        """Issue one delete call per domain, keyed by ``DomainName``."""
        es = local_session(self.manager.session_factory).client('es')
        for domain in resources:
            domain_name = domain['DomainName']
            es.delete_elasticsearch_domain(DomainName=domain_name)

429 

430 

@ElasticSearchDomain.action_registry.register('tag')
class ElasticSearchAddTag(Tag):
    """Action to create tag(s) on an existing elasticsearch domain

    :example:

    .. code-block:: yaml

        policies:
          - name: es-add-tag
            resource: elasticsearch
            filters:
              - "tag:DesiredTag": absent
            actions:
              - type: tag
                key: DesiredTag
                value: DesiredValue
    """
    permissions = ('es:AddTags',)

    def process_resource_set(self, client, domains, tags):
        """Add ``tags`` to every domain, skipping those the API rejects as invalid."""
        for domain in domains:
            domain_arn = domain['ARN']
            try:
                client.add_tags(ARN=domain_arn, TagList=tags)
            except client.exceptions.ValidationException:
                # Best effort: move on to the next domain.
                pass

457 

458 

@ElasticSearchDomain.action_registry.register('remove-tag')
class ElasticSearchRemoveTag(RemoveTag):
    """Removes tag(s) on an existing elasticsearch domain

    :example:

    .. code-block:: yaml

        policies:
          - name: es-remove-tag
            resource: elasticsearch
            filters:
              - "tag:ExpiredTag": present
            actions:
              - type: remove-tag
                tags: ['ExpiredTag']
    """
    permissions = ('es:RemoveTags',)

    def process_resource_set(self, client, domains, tags):
        """Remove the tag keys from every domain, skipping those the API rejects as invalid."""
        for domain in domains:
            domain_arn = domain['ARN']
            try:
                client.remove_tags(ARN=domain_arn, TagKeys=tags)
            except client.exceptions.ValidationException:
                # Best effort: move on to the next domain.
                pass

484 

485 

@ElasticSearchDomain.action_registry.register('mark-for-op')
class ElasticSearchMarkForOp(TagDelayedAction):
    """Tag an elasticsearch domain for action later

    This class adds nothing of its own; all behavior comes from
    ``TagDelayedAction``.

    :example:

    .. code-block:: yaml

        policies:
          - name: es-delete-missing
            resource: elasticsearch
            filters:
              - "tag:DesiredTag": absent
            actions:
              - type: mark-for-op
                days: 7
                op: delete
                tag: c7n_es_delete
    """

505 

506 

@ElasticSearchDomain.action_registry.register('remove-matched-source-ips')
class RemoveMatchedSourceIps(BaseAction):
    """Action to remove matched source ips from an Access Policy. This action
    needs to be used in conjunction with the source-ip filter. It can be used
    for removing non-approved IP addresses from the access policy of an
    ElasticSearch domain.

    :example:

    .. code-block:: yaml

        policies:
          - name: es-access-revoke
            resource: elasticsearch
            filters:
              - type: source-ip
                value_type: cidr
                op: not-in
                value_from:
                  url: s3://my-bucket/allowed_cidrs.csv
            actions:
              - type: remove-matched-source-ips
    """

    schema = type_schema('remove-matched-source-ips')
    permissions = ('es:UpdateElasticsearchDomainConfig',)

    def validate(self):
        """Require a ``source-ip`` filter on the policy; return ``self``."""
        for f in self.manager.iter_filters():
            if isinstance(f, SourceIP):
                return self

        raise PolicyValidationError(
            '`remove-matched-source-ips` can only be used in conjunction with '
            '`source-ip` filter on %s' % (self.manager.data,))

    def process(self, resources):
        """Strip the ips annotated by the ``source-ip`` filter from each domain's policy.

        Every domain is handled independently: one without matched ips or
        without an access policy is skipped and the rest are still
        processed.
        """
        client = local_session(self.manager.session_factory).client('es')

        for r in resources:
            bad_ips = {
                matched_perm.get('SourceIp')
                for matched_perm in r.get(SourceIP.annotation, [])}
            if not bad_ips:
                self.log.info('no matched IPs, no update needed')
                # Skip this domain only; the others may still need an update.
                continue

            # ES Access policy is defined as json string
            raw_policy = r.get('AccessPolicies')
            if not raw_policy:
                continue
            accpol = json.loads(raw_policy)

            update_needed = False
            for stmt in accpol.get('Statement', []):
                source_ips = SourceIP.source_ips_from_stmt(stmt)
                if not source_ips:
                    continue

                update_needed = True
                # Keep the approved ips, de-duplicated, in their original order.
                # NOTE(review): this can leave an empty list when every ip of
                # a statement matched -- confirm the service accepts that.
                good_ips = list(dict.fromkeys(
                    ip for ip in source_ips if ip not in bad_ips))
                stmt['Condition']['IpAddress']['aws:SourceIp'] = good_ips

            if update_needed:
                ap = self.update_accpol(
                    client, r.get('DomainName', ''), accpol, [])
                self.log.info('updated AccessPolicy: %s', json.dumps(ap))

    def update_accpol(self, client, domain_name, accpol, good_cidrs):
        """Update access policy to only have good ip addresses

        Each entry ``good_cidrs[i]`` is written to statement ``i`` when that
        statement is an ``Allow`` with a ``Condition``; ``process`` passes an
        empty list because it has already rewritten the statements. Returns
        the access policy parsed from the update response, or ``None`` when
        the response carries none.
        """
        statements = accpol.get('Statement', [])
        for i, cidr in enumerate(good_cidrs):
            stmt = statements[i]
            if 'Condition' not in stmt or stmt.get('Effect', '') != 'Allow':
                continue
            stmt['Condition']['IpAddress']['aws:SourceIp'] = cidr
        resp = client.update_elasticsearch_domain_config(
            DomainName=domain_name,
            AccessPolicies=json.dumps(accpol))
        options = resp.get('DomainConfig', {}).get('AccessPolicies', {}).get('Options', '')
        # json.loads('') raises, so an absent policy is reported as None.
        return json.loads(options) if options else None

587 

588 

@resources.register('elasticsearch-reserved')
class ReservedInstances(QueryResourceManager):
    """Resource manager registered under the name ``elasticsearch-reserved``."""

    class resource_type(TypeInfo):
        service = 'es'
        # The reserved instance id serves as both the name and the id field.
        name = id = 'ReservedElasticsearchInstanceId'
        date = 'StartTime'
        enum_spec = (
            'describe_reserved_elasticsearch_instances', 'ReservedElasticsearchInstances', None)
        filter_name = 'ReservedElasticsearchInstances'
        filter_type = 'list'
        arn_type = "reserved-instances"
        permissions_enum = ('es:DescribeReservedElasticsearchInstances',)

602 

603 

@ElasticSearchDomain.action_registry.register('update-tls-config')
class UpdateTlsConfig(Action):
    """Action to update tls-config on a domain endpoint

    :example:

    .. code-block:: yaml

        policies:
          - name: update-tls-config
            resource: elasticsearch
            filters:
              - type: value
                key: 'DomainEndpointOptions.TLSSecurityPolicy'
                op: eq
                value: "Policy-Min-TLS-1-0-2019-07"
            actions:
              - type: update-tls-config
                value: "Policy-Min-TLS-1-2-2019-07"
    """

    schema = type_schema(
        'update-tls-config',
        value={
            'type': 'string',
            'enum': ['Policy-Min-TLS-1-0-2019-07', 'Policy-Min-TLS-1-2-2019-07'],
        },
        required=['value'])
    permissions = ('es:UpdateElasticsearchDomainConfig', 'es:ListDomainNames')

    def process(self, resources):
        """Apply the configured TLS policy to every domain, with HTTPS enforced."""
        es = local_session(self.manager.session_factory).client('es')
        endpoint_options = {
            'EnforceHTTPS': True,
            'TLSSecurityPolicy': self.data.get('value'),
        }
        for domain in resources:
            es.update_elasticsearch_domain_config(
                DomainName=domain['DomainName'],
                DomainEndpointOptions=endpoint_options)

636 

637 

@ElasticSearchDomain.action_registry.register('enable-auditlog')
class EnableAuditLog(Action):
    """Action to enable audit logs on a domain endpoint

    :example:

    .. code-block:: yaml

        policies:
          - name: enable-auditlog
            resource: elasticsearch
            filters:
              - type: value
                key: 'LogPublishingOptions.AUDIT_LOGS.Enabled'
                op: eq
                value: false
            actions:
              - type: enable-auditlog
                state: True
                loggroup_prefix: "/aws/es/domains"

    """

    schema = type_schema(
        'enable-auditlog',
        state={'type': 'boolean'},
        loggroup_prefix={'type': 'string'},
        delay={'type': 'number'},
        required=['state'])

    # Template of the resource policy statement; ``Resource`` is filled in
    # per call by set_permissions().
    statement = {
        "Sid": "OpenSearchCloudWatchLogs",
        "Effect": "Allow",
        "Principal": {"Service": ["es.amazonaws.com"]},
        "Action": ["logs:PutLogEvents", "logs:CreateLogStream"],
        "Resource": None}

    permissions = (
        'es:UpdateElasticsearchDomainConfig',
        'es:ListDomainNames',
        'logs:DescribeLogGroups',
        'logs:CreateLogGroup',
        'logs:PutResourcePolicy')

    def get_loggroup_arn(self, domain, log_prefix=None):
        """Build the audit log group ARN pattern for a domain name.

        The group name is ``<log_prefix>/<domain>/audit-logs`` when a
        prefix is given, otherwise
        ``/aws/OpenSearchService/domains/<domain>/audit-logs``.
        """
        if log_prefix:
            log_group_name = "%s/%s/audit-logs" % (log_prefix, domain)
        else:
            log_group_name = "/aws/OpenSearchService/domains/%s/audit-logs" % (domain)
        log_group_arn = "arn:aws:logs:{}:{}:log-group:{}:*".format(
            self.manager.region, self.manager.account_id, log_group_name)
        return log_group_arn

    def merge_dict(self, d1, d2):
        """Merge two statements, concatenating their ``Resource`` values.

        ``Resource`` values (lists or scalars) from ``d1`` then ``d2`` are
        collected into one list; for every other key the value from ``d2``
        wins.
        """
        merged_dict = defaultdict(list)

        for d in (d1, d2):
            for key, value in d.items():
                if key == 'Resource':
                    if isinstance(value, list):
                        merged_dict[key].extend(value)
                    else:
                        merged_dict[key].append(value)
                else:
                    merged_dict[key] = value

        return dict(merged_dict)

    def set_permissions(self, log_group_arn):
        """Create the log group and allow the service to write to it.

        The log group is created if missing. The
        ``OpenSearchCloudwatchLogPermissions`` resource policy is then
        written so that its statement covers ``log_group_arn`` in addition
        to any resources the existing policy already lists. Returns the
        ``put_resource_policy`` response.
        """
        policy_name = 'OpenSearchCloudwatchLogPermissions'
        statement = dict(self.statement)
        statement['Resource'] = [log_group_arn]
        client = local_session(
            self.manager.session_factory).client('logs')

        try:
            # The ARN ends in ':<log group name>:*'.
            client.create_log_group(logGroupName=log_group_arn.split(":")[-2])
        except client.exceptions.ResourceAlreadyExistsException:
            pass

        for policy in client.describe_resource_policies().get('resourcePolicies', []):
            if policy['policyName'] != policy_name:
                continue
            existing = json.loads(policy['policyDocument'])['Statement']
            # This action writes ``Statement`` as a single object, so accept
            # both that form and a list (only the first entry is considered).
            if isinstance(existing, list):
                existing = existing[0] if existing else {}
            existing_resources = existing.get('Resource', [])
            if not isinstance(existing_resources, list):
                existing_resources = [existing_resources]
            if log_group_arn in existing_resources:
                # Already covered: keep the existing statement so the other
                # log groups it lists are not dropped by the put below.
                statement = existing
            else:
                statement = self.merge_dict(statement, existing)

        response = client.put_resource_policy(
            policyName=policy_name,
            policyDocument=json.dumps(
                {"Version": "2012-10-17", "Statement": statement}))

        return response

    def process(self, resources):
        """Set the audit log publishing option on every domain.

        When ``state`` is true the log group and resource policy are
        prepared first. The action then sleeps for ``delay`` seconds
        (default 15) before each domain update.
        """
        client = local_session(
            self.manager.session_factory).client('es')
        state = self.data.get('state')
        log_prefix = self.data.get('loggroup_prefix')
        log_group_arns = {
            r['DomainName']: self.get_loggroup_arn(r["DomainName"], log_prefix)
            for r in resources}

        for r in resources:
            log_group_arn = log_group_arns[r["DomainName"]]
            if state:
                self.set_permissions(log_group_arn)
            time.sleep(self.data.get('delay', 15))
            client.update_elasticsearch_domain_config(
                DomainName=r['DomainName'],
                LogPublishingOptions={
                    "AUDIT_LOGS": {
                        'CloudWatchLogsLogGroupArn': log_group_arn,
                        'Enabled': state}})