Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/elasticsearch.py: 40%
336 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import json
4import time
5from collections import defaultdict
7from c7n.actions import Action, BaseAction, ModifyVpcSecurityGroupsAction, RemovePolicyBase
8from c7n.filters import MetricsFilter, CrossAccountAccessFilter, ValueFilter
9from c7n.exceptions import PolicyValidationError
10from c7n.filters.vpc import SecurityGroupFilter, SubnetFilter, VpcFilter, Filter
11from c7n.manager import resources
12from c7n.query import ConfigSource, DescribeSource, QueryResourceManager, TypeInfo
13from c7n.utils import chunks, local_session, type_schema, merge_dict_list, jmespath_search
14from c7n.tags import Tag, RemoveTag, TagActionFilter, TagDelayedAction
15from c7n.filters.kms import KmsRelatedFilter
16import c7n.filters.policystatement as polstmt_filter
18from .securityhub import PostFinding
class DescribeDomain(DescribeSource):
    """Describe-API source for Elasticsearch domains.

    ``get_resources`` hands back the identifiers it is given; the full
    domain records (plus tags) are produced by ``augment``.
    """

    def get_resources(self, resource_ids):
        """Return ``resource_ids`` as-is; ``augment`` expands them into dicts."""
        return resource_ids

    def augment(self, domains):
        """Expand domain names into described domain dicts with ``Tags`` attached.

        :param domains: iterable of domain names
        :returns: list of ``DomainStatusList`` entries, each carrying a
            ``Tags`` key populated from ``list_tags``
        """
        es = local_session(self.manager.session_factory).client('es')
        id_field = self.manager.get_model().id
        described = []

        # Domains are described five at a time -- presumably the per-call
        # limit of describe_elasticsearch_domains (confirm against the API).
        for batch in chunks(domains, 5):
            statuses = self.manager.retry(
                es.describe_elasticsearch_domains,
                DomainNames=batch)['DomainStatusList']
            for status in statuses:
                domain_arn = self.manager.generate_arn(status[id_field])
                tag_response = self.manager.retry(es.list_tags, ARN=domain_arn)
                status['Tags'] = tag_response.get('TagList', [])
            described.extend(statuses)

        return described
@resources.register('elasticsearch')
class ElasticSearchDomain(QueryResourceManager):
    """Resource manager for Elasticsearch Service domains."""

    class resource_type(TypeInfo):
        service = 'es'
        arn = 'ARN'
        arn_type = 'domain'
        # Enumeration yields bare domain names.
        enum_spec = ('list_domain_names', 'DomainNames[].DomainName', None)
        id = 'DomainName'
        name = 'Name'
        dimension = "DomainName"
        cfn_type = config_type = 'AWS::Elasticsearch::Domain'

    def resources(self, query=None):
        """Fetch resources, letting a policy-level ``query`` override the argument.

        When the policy data carries a ``query`` entry it is merged into a
        single dict and used; otherwise the caller's ``query`` is used, with
        ``None`` replaced by an empty dict.
        """
        if 'query' in self.data:
            effective_query = merge_dict_list(self.data['query'])
        else:
            effective_query = {} if query is None else query
        return super().resources(query=effective_query)

    source_mapping = {
        'describe': DescribeDomain,
        'config': ConfigSource,
    }
# Make the shared tag-based `marked-for-op` filter available on this resource.
ElasticSearchDomain.filter_registry.register("marked-for-op", TagActionFilter)
@ElasticSearchDomain.filter_registry.register('subnet')
class Subnet(SubnetFilter):
    """Subnet filter for domains; subnet ids come from ``VPCOptions.SubnetIds[]``."""

    RelatedIdsExpression = 'VPCOptions.SubnetIds[]'
@ElasticSearchDomain.filter_registry.register('security-group')
class SecurityGroup(SecurityGroupFilter):
    """Security-group filter; group ids come from ``VPCOptions.SecurityGroupIds[]``."""

    RelatedIdsExpression = 'VPCOptions.SecurityGroupIds[]'
@ElasticSearchDomain.filter_registry.register('vpc')
class Vpc(VpcFilter):
    """VPC filter for domains; the VPC id comes from ``VPCOptions.VPCId``."""

    RelatedIdsExpression = 'VPCOptions.VPCId'
@ElasticSearchDomain.filter_registry.register('metrics')
class Metrics(MetricsFilter):
    """Metrics filter for domains, keyed by account id and domain name."""

    def get_dimensions(self, resource):
        """Return the ``ClientId`` / ``DomainName`` dimension pair for ``resource``."""
        pairs = (
            ('ClientId', self.manager.account_id),
            ('DomainName', resource['DomainName']),
        )
        return [{'Name': name, 'Value': value} for name, value in pairs]
@ElasticSearchDomain.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    """KMS key filter; the key id comes from ``EncryptionAtRestOptions.KmsKeyId``."""

    RelatedIdsExpression = "EncryptionAtRestOptions.KmsKeyId"
@ElasticSearchDomain.filter_registry.register('cross-account')
class ElasticSearchCrossAccountAccessFilter(CrossAccountAccessFilter):
    """
    Select elasticsearch domains whose access policy grants cross account access

    :example:

    .. code-block:: yaml

        policies:
          - name: check-elasticsearch-cross-account
            resource: aws.elasticsearch
            filters:
              - type: cross-account
    """
    policy_attribute = 'c7n:Policy'
    permissions = ('es:DescribeElasticsearchDomainConfig',)

    def process(self, resources, event=None):
        """Annotate each domain with its parsed access policy, then delegate.

        Domains already carrying ``c7n:Policy`` are not looked up again. The
        annotation is left unset when the config lookup returns nothing
        (``ResourceNotFoundException`` is passed to ``retry`` as an ignorable
        error code), and is ``None`` when the policy is empty.
        """
        es = local_session(self.manager.session_factory).client('es')
        for domain in resources:
            if self.policy_attribute in domain:
                continue
            config = self.manager.retry(
                es.describe_elasticsearch_domain_config,
                DomainName=domain['DomainName'],
                ignore_err_codes=('ResourceNotFoundException',))
            if not config:
                continue
            raw_policy = config.get('DomainConfig').get('AccessPolicies').get('Options')
            parsed = json.loads(raw_policy) if raw_policy else None
            # An empty parsed document is normalised to None as well.
            domain[self.policy_attribute] = parsed or None
        return super().process(resources)
@ElasticSearchDomain.filter_registry.register('cross-cluster')
class ElasticSearchCrossClusterFilter(Filter):
    """
    Filter elasticsearch domains by their cross-cluster search connections.

    ``inbound`` and ``outbound`` each take a value-filter style matcher that
    is evaluated against every connection of that direction. A domain is
    selected when at least one connection of any configured direction matches.

    :example:

    .. code-block:: yaml

        policies:
          - name: check-elasticsearch-cross-cluster
            resource: aws.elasticsearch
            filters:
              - type: cross-cluster
                inbound:
                  key: SourceDomainInfo.OwnerId
                  op: eq
                  value: '123456789'
                outbound:
                  key: SourceDomainInfo.OwnerId
                  op: eq
                  value: '123456789'
    """
    schema = type_schema(type_name="cross-cluster",
                         inbound=type_schema(type_name='inbound',
                                             required=('key', 'value'),
                                             rinherit=ValueFilter.schema),
                         outbound=type_schema(type_name='outbound',
                                              required=('key', 'value'),
                                              rinherit=ValueFilter.schema),)
    schema_alias = False
    annotation_key = "c7n:SearchConnections"
    matched_key = "c7n:MatchedConnections"
    annotate = False
    permissions = ('es:ESCrossClusterGet',)

    # direction -> (client operation, name of the filter selecting this domain)
    _lookups = {
        'inbound': ('describe_inbound_cross_cluster_search_connections',
                    'destination-domain-info.domain-name'),
        'outbound': ('describe_outbound_cross_cluster_search_connections',
                     'source-domain-info.domain-name'),
    }

    def _fetch_connections(self, client, direction, domain_name):
        """Return the describe response for one direction of one domain."""
        operation, filter_name = self._lookups[direction]
        response = self.manager.retry(
            getattr(client, operation),
            Filters=[{'Name': filter_name, 'Values': [domain_name]}])
        # Tolerate responses without metadata instead of raising KeyError.
        response.pop('ResponseMetadata', None)
        return response

    def process(self, resources, event=None):
        """Return the domains having a matching connection.

        Each resource is annotated with the fetched connections under
        ``c7n:SearchConnections`` and with the matching ones, per direction,
        under ``c7n:MatchedConnections``.

        Connections are cached on the resource per direction: a direction is
        fetched only when it is configured on this filter and not cached yet,
        and only configured directions are evaluated. This keeps a second
        ``cross-cluster`` filter in the same policy from skipping its own
        lookup or evaluating a direction it has no matcher for.
        """
        client = local_session(self.manager.session_factory).client('es')

        matchers = {}
        for direction in self._lookups:
            if direction not in self.data:
                continue
            matcher = ValueFilter(self.data[direction])
            matcher.annotate = False
            matchers[direction] = matcher

        results = []
        for r in resources:
            connections = r.setdefault(self.annotation_key, {})
            matched = {}
            for direction, matcher in matchers.items():
                if direction not in connections:
                    connections[direction] = self._fetch_connections(
                        client, direction, r['DomainName'])
                candidates = connections[direction].get(
                    'CrossClusterSearchConnections', [])
                matched[direction] = [c for c in candidates if matcher(c)]
            r[self.matched_key] = matched
            if any(matched.values()):
                results.append(r)
        return results
@ElasticSearchDomain.filter_registry.register('has-statement')
class HasStatementFilter(polstmt_filter.HasStatementFilter):
    """Statement filter reading the policy from the domain's ``AccessPolicies``."""

    def __init__(self, data, manager=None):
        super().__init__(data, manager)
        # The policy document lives under this key on the domain record.
        self.policy_attribute = 'AccessPolicies'

    def get_std_format_args(self, domain):
        """Return the format arguments derived from ``domain`` and the manager config."""
        domain_arn = domain['ARN']
        config = self.manager.config
        return dict(
            domain_arn=domain_arn,
            account_id=config.account_id,
            region=config.region,
        )
@ElasticSearchDomain.filter_registry.register('source-ip')
class SourceIP(Filter):
    """ValueFilter-based filter for verifying allowed source ips in
    an ElasticSearch domain's access policy. Useful for checking to see if
    an ElasticSearch domain allows traffic from non approved IP addresses/CIDRs.

    Domains without an access policy never match.

    :example:

    Find ElasticSearch domains that allow traffic from IP addresses
    not in the approved list (string matching)

    .. code-block:: yaml

        - type: source-ip
          op: not-in
          value: ["103.15.250.0/24", "173.240.160.0/21", "206.108.40.0/21"]

    Same as above but using cidr matching instead of string matching

    .. code-block:: yaml

        - type: source-ip
          op: not-in
          value_type: cidr
          value: ["103.15.250.0/24", "173.240.160.0/21", "206.108.40.0/21"]

    """
    schema = type_schema('source-ip', rinherit=ValueFilter.schema)
    permissions = ("es:DescribeElasticsearchDomainConfig",)
    annotation = 'c7n:MatchedSourceIps'

    def __call__(self, resource):
        """Return True when any allowed source ip of ``resource`` matches.

        Matching entries (``{'SourceIp': ...}`` dicts) are stored on the
        resource under ``c7n:MatchedSourceIps``.
        """
        es_access_policy = resource.get('AccessPolicies')
        if not es_access_policy:
            # No policy document: nothing to parse and no ip to match.
            return False

        source_ips = self.get_source_ip_perms(json.loads(es_access_policy))
        if not self.data.get('key'):
            # Default the value filter onto the synthetic SourceIp records.
            self.data['key'] = 'SourceIp'
        vf = ValueFilter(self.data, self.manager)
        vf.annotate = False

        matched = [source_ip for source_ip in source_ips if vf(source_ip)]
        if matched:
            resource[self.annotation] = matched
            return True
        return False

    def get_source_ip_perms(self, es_access_policy):
        """Get SourceIps from the original access policy

        :param es_access_policy: parsed policy document (dict)
        :returns: list of ``{'SourceIp': ip}`` dicts, one per allowed ip
        """
        ip_perms = []
        for stmt in es_access_policy.get('Statement', []):
            ip_perms.extend(
                {'SourceIp': ip} for ip in self.source_ips_from_stmt(stmt))
        return ip_perms

    @classmethod
    def source_ips_from_stmt(cls, stmt):
        """Return the ``aws:SourceIp`` values of an ``Allow`` statement.

        Only the ``IpAddress`` condition operator is inspected. A single
        string value is returned as a one-element list; statements with any
        other effect, or without such a condition, yield an empty list.
        """
        if stmt.get('Effect', '') != 'Allow':
            return []
        ips = stmt.get('Condition', {}).get('IpAddress', {}).get('aws:SourceIp', [])
        if not ips:
            return []
        return list(ips) if isinstance(ips, list) else [ips]
@ElasticSearchDomain.action_registry.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
    """
    Remove statements from an elasticsearch domain's access policy

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticsearch-cross-account
            resource: aws.elasticsearch
            filters:
              - type: cross-account
            actions:
              - type: remove-statements
                statement_ids: matched
    """

    permissions = ('es:DescribeElasticsearchDomainConfig', 'es:UpdateElasticsearchDomainConfig',)

    def validate(self):
        """Require a ``cross-account`` filter on the policy.

        :raises PolicyValidationError: when no such filter is configured
        """
        uses_cross_account = any(
            isinstance(f, ElasticSearchCrossAccountAccessFilter)
            for f in self.manager.iter_filters())
        if uses_cross_account:
            return self
        raise PolicyValidationError(
            '`remove-statements` may only be used in '
            'conjunction with `cross-account` filter on %s' % (self.manager.data,))

    def process(self, resources):
        """Process each domain independently; failures are logged, not raised."""
        es = local_session(self.manager.session_factory).client('es')
        for domain in resources:
            try:
                self.process_resource(es, domain)
            except Exception:
                self.log.exception("Error processing es:%s", domain['ARN'])

    def process_resource(self, client, resource):
        """Update the domain's access policy when statements were removed.

        Does nothing when the resource has no ``c7n:Policy`` annotation or
        when ``process_policy`` reports nothing found.
        """
        policy = resource.get('c7n:Policy')
        if policy is None:
            return

        _, found = self.process_policy(
            policy, resource, CrossAccountAccessFilter.annotation_key)
        if not found:
            return

        # NOTE(review): the annotated policy object itself is serialised, so
        # process_policy is presumably pruning its statements in place --
        # confirm against RemovePolicyBase.
        client.update_elasticsearch_domain_config(
            DomainName=resource['DomainName'],
            AccessPolicies=json.dumps(policy))
@ElasticSearchDomain.action_registry.register('post-finding')
class ElasticSearchPostFinding(PostFinding):
    """Post findings for domains using the ``AwsElasticsearchDomain`` resource type."""

    resource_type = 'AwsElasticsearchDomain'

    def format_resource(self, r):
        """Fill the finding payload with the domain's details and return the envelope.

        Nested option groups and the payload as a whole are passed through
        ``filter_empty`` before being applied.
        """
        envelope, payload = self.format_envelope(r)

        def section(group, *fields):
            # Collect `<group>.<field>` lookups into one filtered dict.
            return self.filter_empty({
                field: jmespath_search('%s.%s' % (group, field), r)
                for field in fields})

        details = {
            'AccessPolicies': r.get('AccessPolicies'),
            'DomainId': r['DomainId'],
            'DomainName': r['DomainName'],
            'Endpoint': r.get('Endpoint'),
            'Endpoints': r.get('Endpoints'),
            'DomainEndpointOptions': section(
                'DomainEndpointOptions', 'EnforceHTTPS', 'TLSSecurityPolicy'),
            'ElasticsearchVersion': r['ElasticsearchVersion'],
            'EncryptionAtRestOptions': section(
                'EncryptionAtRestOptions', 'Enabled', 'KmsKeyId'),
            'NodeToNodeEncryptionOptions': section(
                'NodeToNodeEncryptionOptions', 'Enabled'),
            'VPCOptions': section(
                'VPCOptions',
                'AvailabilityZones', 'SecurityGroupIds', 'SubnetIds', 'VPCId'),
        }
        payload.update(self.filter_empty(details))
        return envelope
@ElasticSearchDomain.action_registry.register('modify-security-groups')
class ElasticSearchModifySG(ModifyVpcSecurityGroupsAction):
    """Modify security groups on an Elasticsearch domain"""

    permissions = ('es:UpdateElasticsearchDomainConfig',)

    def process(self, domains):
        """Apply the resolved security group ids to each domain's VPC options."""
        # get_groups yields one group-id list per domain, in the same order.
        resolved_groups = super().get_groups(domains)
        es = local_session(self.manager.session_factory).client('es')

        for position, domain in enumerate(domains):
            vpc_options = {'SecurityGroupIds': resolved_groups[position]}
            es.update_elasticsearch_domain_config(
                DomainName=domain['DomainName'],
                VPCOptions=vpc_options)
@ElasticSearchDomain.action_registry.register('delete')
class Delete(Action):
    """Delete the selected Elasticsearch domains."""

    schema = type_schema('delete')
    permissions = ('es:DeleteElasticsearchDomain',)

    def process(self, resources):
        """Issue one delete call per domain, by domain name."""
        es = local_session(self.manager.session_factory).client('es')
        for domain in resources:
            es.delete_elasticsearch_domain(DomainName=domain['DomainName'])
@ElasticSearchDomain.action_registry.register('tag')
class ElasticSearchAddTag(Tag):
    """Add tag(s) to an existing elasticsearch domain

    :example:

    .. code-block:: yaml

        policies:
          - name: es-add-tag
            resource: elasticsearch
            filters:
              - "tag:DesiredTag": absent
            actions:
              - type: tag
                key: DesiredTag
                value: DesiredValue
    """
    permissions = ('es:AddTags',)

    def process_resource_set(self, client, domains, tags):
        """Tag each domain by ARN, skipping domains the service rejects."""
        for domain in domains:
            try:
                client.add_tags(ARN=domain['ARN'], TagList=tags)
            except client.exceptions.ValidationException:
                # Best effort: a validation failure on one domain must not
                # stop the rest of the set.
                pass
@ElasticSearchDomain.action_registry.register('remove-tag')
class ElasticSearchRemoveTag(RemoveTag):
    """Remove tag(s) from an existing elasticsearch domain

    :example:

    .. code-block:: yaml

        policies:
          - name: es-remove-tag
            resource: elasticsearch
            filters:
              - "tag:ExpiredTag": present
            actions:
              - type: remove-tag
                tags: ['ExpiredTag']
    """
    permissions = ('es:RemoveTags',)

    def process_resource_set(self, client, domains, tags):
        """Remove the tag keys from each domain by ARN, skipping rejected ones."""
        for domain in domains:
            try:
                client.remove_tags(ARN=domain['ARN'], TagKeys=tags)
            except client.exceptions.ValidationException:
                # Best effort: a validation failure on one domain must not
                # stop the rest of the set.
                pass
@ElasticSearchDomain.action_registry.register('mark-for-op')
class ElasticSearchMarkForOp(TagDelayedAction):
    """Mark an elasticsearch domain with a tag so an action can run on it later

    :example:

    .. code-block:: yaml

        policies:
          - name: es-delete-missing
            resource: elasticsearch
            filters:
              - "tag:DesiredTag": absent
            actions:
              - type: mark-for-op
                days: 7
                op: delete
                tag: c7n_es_delete
    """
@ElasticSearchDomain.action_registry.register('remove-matched-source-ips')
class RemoveMatchedSourceIps(BaseAction):
    """Action to remove matched source ips from an Access Policy. This action
    needs to be used in conjunction with the source-ip filter. It can be used
    for removing non-approved IP addresses from the access policy of an
    ElasticSearch domain.

    :example:

    .. code-block:: yaml

        policies:
          - name: es-access-revoke
            resource: elasticsearch
            filters:
              - type: source-ip
                value_type: cidr
                op: not-in
                value_from:
                   url: s3://my-bucket/allowed_cidrs.csv
            actions:
              - type: remove-matched-source-ips
    """

    schema = type_schema('remove-matched-source-ips')
    permissions = ('es:UpdateElasticsearchDomainConfig',)

    def validate(self):
        """Require a ``source-ip`` filter on the policy.

        :raises PolicyValidationError: when no such filter is configured
        """
        for f in self.manager.iter_filters():
            if isinstance(f, SourceIP):
                return self

        raise PolicyValidationError(
            '`remove-matched-source-ips` can only be used in conjunction with '
            '`source-ip` filter on %s' % (self.manager.data,))

    def process(self, resources):
        """Strip the ips annotated by the ``source-ip`` filter from each domain.

        A domain without matched ips (or without a policy document) is
        skipped; the remaining domains are still processed.
        """
        client = local_session(self.manager.session_factory).client('es')

        for r in resources:
            bad_ips = {
                perm.get('SourceIp') for perm in r.get(SourceIP.annotation, [])}
            if not bad_ips:
                self.log.info('no matched IPs, no update needed')
                # Skip only this domain; returning here would silently drop
                # every domain after it.
                continue

            # ES Access policy is defined as json string
            raw_policy = r.get('AccessPolicies')
            if not raw_policy:
                continue
            accpol = json.loads(raw_policy)

            update_needed = False
            for stmt in accpol.get('Statement', []):
                source_ips = SourceIP.source_ips_from_stmt(stmt)
                if not source_ips:
                    continue
                update_needed = True
                # Keep the original ordering so the rewritten policy is
                # deterministic.
                stmt['Condition']['IpAddress']['aws:SourceIp'] = [
                    ip for ip in source_ips if ip not in bad_ips]

            if update_needed:
                ap = self.update_accpol(
                    client, r.get('DomainName', ''), accpol, [])
                self.log.info('updated AccessPolicy: %s', json.dumps(ap))

    def update_accpol(self, client, domain_name, accpol, good_cidrs):
        """Update access policy to only have good ip addresses

        :param client: Elasticsearch service client
        :param domain_name: name of the domain to update
        :param accpol: parsed access policy; serialised and sent as-is after
            the optional ``good_cidrs`` rewrite
        :param good_cidrs: optional per-statement replacement values, applied
            by position to ``Allow`` statements that have a ``Condition``
        :returns: the access policy echoed by the service as a dict, or an
            empty dict when the response carries none
        """
        statements = accpol.get('Statement', [])
        for i, cidr in enumerate(good_cidrs):
            stmt = statements[i]
            if 'Condition' not in stmt or stmt.get('Effect', '') != 'Allow':
                continue
            stmt['Condition']['IpAddress']['aws:SourceIp'] = cidr

        resp = client.update_elasticsearch_domain_config(
            DomainName=domain_name,
            AccessPolicies=json.dumps(accpol))
        options = resp.get('DomainConfig', {}).get('AccessPolicies', {}).get('Options', '')
        # The update already succeeded; don't fail on an empty echo.
        return json.loads(options) if options else {}
@resources.register('elasticsearch-reserved')
class ReservedInstances(QueryResourceManager):
    """Resource manager for reserved Elasticsearch instances."""

    class resource_type(TypeInfo):
        service = 'es'
        name = id = 'ReservedElasticsearchInstanceId'
        date = 'StartTime'
        enum_spec = (
            'describe_reserved_elasticsearch_instances',
            'ReservedElasticsearchInstances',
            None,
        )
        filter_name = 'ReservedElasticsearchInstances'
        filter_type = 'list'
        arn_type = 'reserved-instances'
        permissions_enum = ('es:DescribeReservedElasticsearchInstances',)
@ElasticSearchDomain.action_registry.register('update-tls-config')
class UpdateTlsConfig(Action):

    """Set the TLS security policy of a domain endpoint

    HTTPS enforcement is switched on together with the requested policy.

    :example:

    .. code-block:: yaml

        policies:
          - name: update-tls-config
            resource: elasticsearch
            filters:
              - type: value
                key: 'DomainEndpointOptions.TLSSecurityPolicy'
                op: eq
                value: "Policy-Min-TLS-1-0-2019-07"
            actions:
              - type: update-tls-config
                value: "Policy-Min-TLS-1-2-2019-07"
    """

    schema = type_schema(
        'update-tls-config',
        value={
            'type': 'string',
            'enum': ['Policy-Min-TLS-1-0-2019-07', 'Policy-Min-TLS-1-2-2019-07']},
        required=['value'])
    permissions = ('es:UpdateElasticsearchDomainConfig', 'es:ListDomainNames')

    def process(self, resources):
        """Apply the configured TLS policy to every selected domain."""
        es = local_session(self.manager.session_factory).client('es')
        requested_policy = self.data.get('value')
        for domain in resources:
            endpoint_options = {
                'EnforceHTTPS': True,
                'TLSSecurityPolicy': requested_policy,
            }
            es.update_elasticsearch_domain_config(
                DomainName=domain['DomainName'],
                DomainEndpointOptions=endpoint_options)
@ElasticSearchDomain.action_registry.register('enable-auditlog')
class EnableAuditLog(Action):

    """Action to enable audit logs on a domain endpoint

    When ``state`` is true the audit log group is created if needed and the
    ``OpenSearchCloudwatchLogPermissions`` CloudWatch Logs resource policy is
    extended to cover it before the domain is updated. ``loggroup_prefix``
    overrides the default log group prefix and ``delay`` is the number of
    seconds to wait between writing the resource policy and updating the
    domain (15 by default).

    :example:

    .. code-block:: yaml

        policies:
          - name: enable-auditlog
            resource: elasticsearch
            filters:
              - type: value
                key: 'LogPublishingOptions.AUDIT_LOGS.Enabled'
                op: eq
                value: false
            actions:
              - type: enable-auditlog
                state: True
                loggroup_prefix: "/aws/es/domains"

    """

    schema = type_schema(
        'enable-auditlog',
        state={'type': 'boolean'},
        loggroup_prefix={'type': 'string'},
        delay={'type': 'number'},
        required=['state'])

    # Template for the resource policy statement; Resource is filled per call.
    statement = {
        "Sid": "OpenSearchCloudWatchLogs",
        "Effect": "Allow",
        "Principal": {"Service": ["es.amazonaws.com"]},
        "Action": ["logs:PutLogEvents", "logs:CreateLogStream"],
        "Resource": None}

    # Name of the CloudWatch Logs resource policy maintained by this action.
    resource_policy_name = 'OpenSearchCloudwatchLogPermissions'

    permissions = (
        'es:UpdateElasticsearchDomainConfig',
        'es:ListDomainNames',
        'logs:DescribeLogGroups',
        'logs:DescribeResourcePolicies',
        'logs:CreateLogGroup',
        'logs:PutResourcePolicy')

    def get_loggroup_arn(self, domain, log_prefix=None):
        """Return the audit log group ARN (with ``:*`` suffix) for ``domain``.

        The group name is ``<log_prefix>/<domain>/audit-logs`` when a prefix
        is given, else ``/aws/OpenSearchService/domains/<domain>/audit-logs``.
        """
        # NOTE(review): the `aws` partition is hard-coded here.
        if log_prefix:
            log_group_name = "%s/%s/audit-logs" % (log_prefix, domain)
        else:
            log_group_name = "/aws/OpenSearchService/domains/%s/audit-logs" % (domain)
        return "arn:aws:logs:{}:{}:log-group:{}:*".format(
            self.manager.region, self.manager.account_id, log_group_name)

    def merge_dict(self, d1, d2):
        """Merge two policy statements into a new dict.

        ``Resource`` values from both statements are concatenated into one
        list (``d1`` first); for every other key the value from ``d2`` wins.
        """
        merged_dict = defaultdict(list)

        for d in (d1, d2):
            for key, value in d.items():
                if key != 'Resource':
                    merged_dict[key] = value
                elif isinstance(value, list):
                    merged_dict[key].extend(value)
                else:
                    merged_dict[key].append(value)

        return dict(merged_dict)

    def set_permissions(self, log_group_arn):
        """Ensure the log group exists and the service may write to it.

        The log group is created when missing. The resource policy is then
        written so that its first statement covers ``log_group_arn``: if an
        existing policy already covers it, the existing statements are written
        back unchanged; otherwise the ARN is merged into the first statement.
        Any further existing statements are preserved.

        :param log_group_arn: ARN as built by ``get_loggroup_arn``
        :returns: the ``put_resource_policy`` response
        """
        client = local_session(
            self.manager.session_factory).client('logs')

        try:
            # The ARN ends in `...:log-group:<name>:*`, so the name is the
            # second to last colon-separated field.
            client.create_log_group(logGroupName=log_group_arn.split(":")[-2])
        except client.exceptions.ResourceAlreadyExistsException:
            pass

        statement = dict(self.statement)
        statement['Resource'] = [log_group_arn]
        statements = [statement]

        policies = client.describe_resource_policies().get('resourcePolicies') or []
        for policy in policies:
            if policy['policyName'] != self.resource_policy_name:
                continue
            existing = json.loads(policy['policyDocument']).get('Statement') or []
            # A policy document may carry a single statement object rather
            # than a list; normalise so both shapes can be read back.
            if isinstance(existing, dict):
                existing = [existing]
            if existing:
                head = existing[0]
                granted = head.get('Resource', [])
                if isinstance(granted, str):
                    granted = [granted]
                if log_group_arn in granted:
                    # Already covered: keep every existing grant rather than
                    # replacing the policy with this single ARN.
                    statements = existing
                else:
                    statements = [self.merge_dict(statement, head)] + existing[1:]
            break

        return client.put_resource_policy(
            policyName=self.resource_policy_name,
            policyDocument=json.dumps(
                {"Version": "2012-10-17", "Statement": statements}))

    def process(self, resources):
        """Set the audit log publishing state on every selected domain."""
        client = local_session(
            self.manager.session_factory).client('es')
        state = self.data.get('state')
        log_prefix = self.data.get('loggroup_prefix')

        for r in resources:
            log_group_arn = self.get_loggroup_arn(r['DomainName'], log_prefix)
            if state:
                self.set_permissions(log_group_arn)
                # Pause before updating the domain -- presumably to let the
                # resource policy take effect (confirm).
                time.sleep(self.data.get('delay', 15))
            client.update_elasticsearch_domain_config(
                DomainName=r['DomainName'],
                LogPublishingOptions={
                    "AUDIT_LOGS": {
                        'CloudWatchLogsLogGroupArn': log_group_arn,
                        'Enabled': state}})