Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/rdscluster.py: 44%
345 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import logging
4import itertools
5from concurrent.futures import as_completed
6from datetime import datetime, timedelta
8from c7n.actions import BaseAction
9from c7n.filters import AgeFilter, CrossAccountAccessFilter, Filter, ValueFilter
10from c7n.filters.offhours import OffHour, OnHour
11import c7n.filters.vpc as net_filters
12from c7n.manager import resources
13from c7n.query import (
14 ConfigSource, QueryResourceManager, TypeInfo, DescribeSource, RetryPageIterator)
15from c7n.resources import rds
16from c7n.filters.kms import KmsRelatedFilter
17from .aws import shape_validate
18from c7n.exceptions import PolicyValidationError
19from c7n.utils import (
20 type_schema, local_session, snapshot_identifier, chunks)
22from c7n.resources.rds import ParameterFilter
23from c7n.filters.backup import ConsecutiveAwsBackupsFilter
# Module-level logger used by the helpers in this file.
log = logging.getLogger(
    'custodian.rds-cluster')
class DescribeCluster(DescribeSource):
    """Describe-API source for RDS clusters."""

    def get_resources(self, ids):
        """Query the clusters whose identifiers are listed in ``ids``."""
        id_filter = {'Name': 'db-cluster-id', 'Values': ids}
        return self.query.filter(self.manager, Filters=[id_filter])

    def augment(self, resources):
        """Move each resource's ``TagList`` to ``Tags`` (empty tuple if absent)."""
        for cluster in resources:
            tags = cluster.pop('TagList', ())
            cluster['Tags'] = tags
        return resources
class ConfigCluster(ConfigSource):
    """AWS Config source for RDS clusters."""

    def load_resource(self, item):
        """Load a config item and rewrite mis-cased key prefixes.

        Keys starting with ``Dbc``, ``Iamd`` or ``Dbs`` are renamed so the
        prefix becomes ``DBC``, ``IAMD`` or ``DBS`` respectively.
        """
        resource = super().load_resource(item)
        # we pull tags from supplementary config
        resource.pop('TagList', None)
        # (prefix as loaded, replacement prefix), checked in this order.
        prefix_fixes = (('Dbc', 'DBC'), ('Iamd', 'IAMD'), ('Dbs', 'DBS'))
        for key in list(resource):
            for loaded, fixed in prefix_fixes:
                if key.startswith(loaded):
                    resource[fixed + key[len(loaded):]] = resource.pop(key)
                    break
        return resource
@resources.register('rds-cluster')
class RDSCluster(QueryResourceManager):
    """Resource manager for RDS clusters.
    """

    class resource_type(TypeInfo):
        # Service and enumeration call.
        service = 'rds'
        enum_spec = ('describe_db_clusters', 'DBClusters', None)
        permissions_enum = ('rds:DescribeDBClusters',)

        # Identity attributes of a cluster record.
        id = 'DBClusterIdentifier'
        name = id
        dimension = 'DBClusterIdentifier'
        config_id = 'DbClusterResourceId'

        # ARN attribute and layout.
        arn = 'DBClusterArn'
        arn_type = 'cluster'
        arn_separator = ":"

        # Tagging and type names.
        universal_taggable = True
        config_type = 'AWS::RDS::DBCluster'
        cfn_type = config_type

    # Source name -> source implementation for this resource.
    source_mapping = {
        'config': ConfigCluster,
        'describe': DescribeCluster,
    }
# Register the off-hours / on-hours filters on the rds-cluster resource.
RDSCluster.filter_registry.register(
    'offhour', OffHour)
RDSCluster.filter_registry.register(
    'onhour', OnHour)
@RDSCluster.filter_registry.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):

    # Expression selecting the cluster's VPC security group ids.
    RelatedIdsExpression = (
        "VpcSecurityGroups[].VpcSecurityGroupId")
@RDSCluster.filter_registry.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):

    # Subnet ids are resolved through the cluster's DB subnet group in
    # get_related_ids(), so no expression is used.
    RelatedIdsExpression = ""
    # Cache of subnet groups keyed by DBSubnetGroupName; filled on first use.
    groups = None

    def get_permissions(self):
        """Return the permissions of the ``rds-subnet-group`` resource manager."""
        subnet_group_manager = self.manager.get_resource_manager('rds-subnet-group')
        return subnet_group_manager.get_permissions()

    def get_subnet_groups(self):
        """Return all rds-subnet-group resources keyed by ``DBSubnetGroupName``."""
        subnet_group_manager = self.manager.get_resource_manager('rds-subnet-group')
        by_name = {}
        for group in subnet_group_manager.resources():
            by_name[group['DBSubnetGroupName']] = group
        return by_name

    def get_related_ids(self, resources):
        """Collect the subnet identifiers of each resource's DB subnet group."""
        if not self.groups:
            self.groups = self.get_subnet_groups()
        subnet_ids = set()
        for cluster in resources:
            subnet_group = self.groups[cluster['DBSubnetGroup']]
            subnet_ids.update(
                [subnet['SubnetIdentifier'] for subnet in subnet_group['Subnets']])
        return subnet_ids

    def process(self, resources, event=None):
        """Load the subnet group cache if needed, then run the parent filter."""
        if not self.groups:
            self.groups = self.get_subnet_groups()
        return super(SubnetFilter, self).process(resources, event)
# Register the generic network-location filter on the rds-cluster resource.
RDSCluster.filter_registry.register(
    'network-location', net_filters.NetworkLocation)
@RDSCluster.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):

    # Attribute of the cluster record that holds the related KMS key id.
    RelatedIdsExpression = (
        'KmsKeyId')
@RDSCluster.action_registry.register('delete')
class Delete(BaseAction):
    """Action to delete a RDS cluster

    To prevent unwanted deletion of clusters, it is recommended to apply a
    filter to the rule

    ``skip-snapshot`` (default false) skips the final cluster snapshot.
    ``delete-instances`` (default true) deletes every member instance of
    the cluster, without a final instance snapshot, before the cluster
    itself is deleted.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-delete-unused
            resource: rds-cluster
            filters:
              - type: metrics
                name: CPUUtilization
                days: 21
                value: 1.0
                op: le
            actions:
              - type: delete
                skip-snapshot: false
                delete-instances: true
    """

    schema = type_schema(
        'delete', **{'skip-snapshot': {'type': 'boolean'},
                     'delete-instances': {'type': 'boolean'}})

    # process() calls delete_db_instance for member instances (enabled by
    # default), so that permission is declared next to DeleteDBCluster.
    permissions = ('rds:DeleteDBCluster', 'rds:DeleteDBInstance')

    def process(self, clusters):
        """Delete each cluster, optionally deleting its member instances first.

        Not-found errors from the cluster delete are ignored and
        invalid-state errors are logged as warnings by ``_run_cluster_method``.
        """
        skip = self.data.get('skip-snapshot', False)
        delete_instances = self.data.get('delete-instances', True)
        client = local_session(self.manager.session_factory).client('rds')

        for cluster in clusters:
            if delete_instances:
                for instance in cluster.get('DBClusterMembers', []):
                    client.delete_db_instance(
                        DBInstanceIdentifier=instance['DBInstanceIdentifier'],
                        SkipFinalSnapshot=True)
                    self.log.info(
                        'Deleted RDS instance: %s',
                        instance['DBInstanceIdentifier'])

            params = {'DBClusterIdentifier': cluster['DBClusterIdentifier']}
            if skip:
                params['SkipFinalSnapshot'] = True
            else:
                params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                    'Final', cluster['DBClusterIdentifier'])

            _run_cluster_method(
                client.delete_db_cluster, params,
                (client.exceptions.DBClusterNotFoundFault,
                 client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)
@RDSCluster.action_registry.register('retention')
class RetentionWindow(BaseAction):
    """
    Action to set the backup retention period of an rds cluster.

    ``enforce`` (min, max, exact; default min) controls how ``days`` is
    combined with the cluster's current retention: ``min`` keeps the larger
    of the two, ``max`` keeps the smaller, ``exact`` always uses ``days``.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-backup-retention
            resource: rds-cluster
            filters:
              - type: value
                key: BackupRetentionPeriod
                value: 21
                op: ne
            actions:
              - type: retention
                days: 21
                enforce: min
    """

    date_attribute = "BackupRetentionPeriod"
    # Tag copy not yet available for Aurora:
    # https://forums.aws.amazon.com/thread.jspa?threadID=225812
    schema = type_schema(
        'retention',
        days={'type': 'number'},
        enforce={'type': 'string', 'enum': ['min', 'max', 'exact']})
    permissions = ('rds:ModifyDBCluster',)

    def process(self, clusters):
        """Apply the configured retention to every cluster."""
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            self.process_snapshot_retention(client, cluster)

    def process_snapshot_retention(self, client, cluster):
        """Work out the target retention for ``cluster`` and apply it."""
        current = int(cluster.get('BackupRetentionPeriod', 0))
        requested = self.data['days']
        mode = self.data.get('enforce', 'min').lower()

        if mode == 'exact':
            target = requested
        elif mode == 'min':
            target = max(current, requested)
        elif mode == 'max':
            target = min(current, requested)
        else:
            # Unrecognised mode: nothing to do.
            return
        self.set_retention_window(client, cluster, target)

    def set_retention_window(self, client, cluster, retention):
        """Call modify_db_cluster with the new retention period.

        For clusters whose ``EngineMode`` is not ``serverless`` the existing
        backup and maintenance windows are sent along unchanged.
        """
        params = {
            'DBClusterIdentifier': cluster['DBClusterIdentifier'],
            'BackupRetentionPeriod': retention,
        }
        if cluster.get('EngineMode') != 'serverless':
            params['PreferredBackupWindow'] = cluster['PreferredBackupWindow']
            params['PreferredMaintenanceWindow'] = cluster['PreferredMaintenanceWindow']

        ignored = (
            client.exceptions.DBClusterNotFoundFault,
            client.exceptions.ResourceNotFoundFault)
        warned = client.exceptions.InvalidDBClusterStateFault
        _run_cluster_method(client.modify_db_cluster, params, ignored, warned)
@RDSCluster.action_registry.register('stop')
class Stop(BaseAction):
    """Stop a running db cluster
    """

    schema = type_schema('stop')
    permissions = ('rds:StopDBCluster',)

    def process(self, clusters):
        """Stop each cluster; not-found is ignored, invalid state is logged."""
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            stop = client.stop_db_cluster
            params = {'DBClusterIdentifier': cluster['DBClusterIdentifier']}
            ignored = (
                client.exceptions.DBClusterNotFoundFault,
                client.exceptions.ResourceNotFoundFault)
            warned = client.exceptions.InvalidDBClusterStateFault
            _run_cluster_method(stop, params, ignored, warned)
@RDSCluster.action_registry.register('start')
class Start(BaseAction):
    """Start a stopped db cluster
    """

    schema = type_schema('start')
    permissions = ('rds:StartDBCluster',)

    def process(self, clusters):
        """Start each cluster; not-found is ignored, invalid state is logged."""
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            start = client.start_db_cluster
            params = {'DBClusterIdentifier': cluster['DBClusterIdentifier']}
            ignored = (
                client.exceptions.DBClusterNotFoundFault,
                client.exceptions.ResourceNotFoundFault)
            warned = client.exceptions.InvalidDBClusterStateFault
            _run_cluster_method(start, params, ignored, warned)
def _run_cluster_method(method, params, ignore=(), warn=(), method_name=""):
    """Call ``method(**params)``, tolerating selected exceptions.

    Exceptions matching ``ignore`` are swallowed silently. Exceptions
    matching ``warn`` are logged as a warning naming ``method_name`` (or
    the method's ``__name__``) and ``params['DBClusterIdentifier']``.
    Anything else propagates.
    """
    try:
        method(**params)
    except ignore:
        return
    except warn as err:
        label = method_name or method.__name__
        log.warning(
            "error %s on cluster %s error %s",
            label, params['DBClusterIdentifier'], err)
@RDSCluster.action_registry.register('snapshot')
class Snapshot(BaseAction):
    """Action to create a snapshot of a rds cluster

    The snapshot identifier is generated with ``snapshot_identifier``
    from the ``Backup`` prefix and the cluster identifier.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshot
            resource: rds-cluster
            actions:
              - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('rds:CreateDBClusterSnapshot',)

    def process(self, clusters):
        """Snapshot each cluster; not-found is ignored, invalid state is logged."""
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            create = client.create_db_cluster_snapshot
            params = {
                'DBClusterSnapshotIdentifier': snapshot_identifier(
                    'Backup', cluster['DBClusterIdentifier']),
                'DBClusterIdentifier': cluster['DBClusterIdentifier'],
            }
            ignored = (
                client.exceptions.DBClusterNotFoundFault,
                client.exceptions.ResourceNotFoundFault)
            warned = client.exceptions.InvalidDBClusterStateFault
            _run_cluster_method(create, params, ignored, warned)
@RDSCluster.action_registry.register('modify-db-cluster')
class ModifyDbCluster(BaseAction):
    """Modifies an RDS cluster using ModifyDBCluster.

    The ``attributes`` mapping is passed as keyword arguments to
    ``modify_db_cluster`` and is validated against the
    ``ModifyDBClusterMessage`` shape. ``DBClusterIdentifier`` must not be
    included; it is taken from each matched cluster.

    :example:

    .. code-block:: yaml

        policies:
          - name: disable-db-cluster-deletion-protection
            resource: rds-cluster
            filters:
              - DeletionProtection: true
              - PubliclyAccessible: true
            actions:
              - type: modify-db-cluster
                attributes:
                  CopyTagsToSnapshot: true
                  DeletionProtection: false
    """

    schema = type_schema(
        'modify-db-cluster',
        required=('attributes',),
        attributes={'type': 'object'})

    permissions = ('rds:ModifyDBCluster',)
    shape = 'ModifyDBClusterMessage'

    def validate(self):
        """Reject a policy-supplied identifier, then shape-validate the attributes."""
        candidate = dict(self.data['attributes'])
        if 'DBClusterIdentifier' in candidate:
            raise PolicyValidationError(
                "Can't include DBClusterIdentifier in modify-db-cluster action")
        # Placeholder identifier so the attributes form a complete message.
        candidate['DBClusterIdentifier'] = 'PolicyValidation'
        return shape_validate(candidate, self.shape, 'rds')

    def process(self, clusters):
        """Apply the configured attributes to every cluster."""
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            cluster_id = cluster['DBClusterIdentifier']
            client.modify_db_cluster(
                DBClusterIdentifier=cluster_id,
                **self.data['attributes'])
class DescribeClusterSnapshot(DescribeSource):
    """Describe-API source for RDS cluster snapshots."""

    def get_resources(self, resource_ids, cache=True):
        """Fetch the cluster snapshots whose ids are in ``resource_ids``."""
        client = local_session(self.manager.session_factory).client('rds')
        id_filter = {
            'Name': 'db-cluster-snapshot-id',
            'Values': resource_ids}
        response = self.manager.retry(
            client.describe_db_cluster_snapshots, Filters=[id_filter])
        return response.get('DBClusterSnapshots', ())

    def augment(self, resources):
        """Move each resource's ``TagList`` to ``Tags`` (empty tuple if absent)."""
        for snapshot in resources:
            tags = snapshot.pop('TagList', ())
            snapshot['Tags'] = tags
        return resources
class ConfigClusterSnapshot(ConfigSource):
    """AWS Config source for RDS cluster snapshots."""

    def load_resource(self, item):
        """Load a config item and rewrite ``Dbcl``/``Iamd`` key prefixes."""
        resource = super(ConfigClusterSnapshot, self).load_resource(item)
        # db cluster snapshots are particularly mangled on keys
        for key, value in list(resource.items()):
            if key.startswith('Dbcl'):
                fixed_key = 'DBCl%s' % key[4:]
            elif key.startswith('Iamd'):
                fixed_key = 'IAMD%s' % key[4:]
            else:
                continue
            resource.pop(key)
            resource[fixed_key] = value
        return resource
@resources.register('rds-cluster-snapshot')
class RDSClusterSnapshot(QueryResourceManager):
    """Resource manager for RDS cluster snapshots.
    """

    class resource_type(TypeInfo):
        # Service and enumeration call.
        service = 'rds'
        enum_spec = (
            'describe_db_cluster_snapshots', 'DBClusterSnapshots', None)
        permissions_enum = ('rds:DescribeDBClusterSnapshots',)

        # Identity and date attributes of a snapshot record.
        id = 'DBClusterSnapshotIdentifier'
        name = id
        date = 'SnapshotCreateTime'

        # ARN attribute and layout.
        arn = 'DBClusterSnapshotArn'
        arn_type = 'cluster-snapshot'
        arn_separator = ':'

        # Tagging and config type.
        universal_taggable = object()
        config_type = 'AWS::RDS::DBClusterSnapshot'

    # Source name -> source implementation for this resource.
    source_mapping = {
        'describe': DescribeClusterSnapshot,
        'config': ConfigClusterSnapshot,
    }
@RDSClusterSnapshot.filter_registry.register('cross-account')
class CrossAccountSnapshot(CrossAccountAccessFilter):

    permissions = ('rds:DescribeDBClusterSnapshotAttributes',)
    # Annotation keys written onto each snapshot by process_resource_set().
    attributes_key = 'c7n:attributes'
    annotation_key = 'c7n:CrossAccountViolations'

    def process(self, resources, event=None):
        """Check snapshots in batches of 20 on two workers; return the matches."""
        self.accounts = self.get_accounts()
        matched = []
        with self.executor_factory(max_workers=2) as pool:
            pending = [
                pool.submit(self.process_resource_set, batch)
                for batch in chunks(resources, 20)]
            for finished in as_completed(pending):
                matched.extend(finished.result())
        return matched

    def process_resource_set(self, resource_set):
        """Annotate each snapshot with its attributes and unknown accounts.

        Returns the snapshots whose ``restore`` attribute lists at least one
        account that is not in ``self.accounts``.
        """
        client = local_session(self.manager.session_factory).client('rds')
        flagged = []
        for snapshot in resource_set:
            response = self.manager.retry(
                client.describe_db_cluster_snapshot_attributes,
                DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier'])
            attribute_list = response[
                'DBClusterSnapshotAttributesResult']['DBClusterSnapshotAttributes']
            attrs = {}
            for entry in attribute_list:
                attrs[entry['AttributeName']] = entry['AttributeValues']
            snapshot[self.attributes_key] = attrs

            unknown_accounts = set(attrs.get('restore', [])).difference(self.accounts)
            if not unknown_accounts:
                continue
            snapshot[self.annotation_key] = list(unknown_accounts)
            flagged.append(snapshot)
        return flagged
@RDSClusterSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filters rds cluster snapshots based on age (in days)

    Age is measured from the snapshot's ``SnapshotCreateTime``.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshots-expired
            resource: rds-cluster-snapshot
            filters:
              - type: age
                days: 30
                op: gt
    """

    schema = type_schema(
        'age',
        **{'days': {'type': 'number'},
           'op': {'$ref': '#/definitions/filters_common/comparison_operators'}})

    # Attribute of the snapshot record the age is computed from.
    date_attribute = 'SnapshotCreateTime'
@RDSClusterSnapshot.action_registry.register('set-permissions')
class SetPermissions(rds.SetPermissions):
    """Set permissions for copying or restoring an RDS cluster snapshot

    The 'add' and 'remove' parameters list the accounts to grant or revoke.
    With neither given, every account currently in the snapshot's
    ``restore`` attribute is removed.

    Use `remove: matched` in combination with the `cross-account` filter
    for more flexible removal options such as preserving access for
    a set of whitelisted accounts:

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshot-prune-permissions
            resource: rds-cluster-snapshot
            filters:
              - type: cross-account
                whitelist:
                  - '112233445566'
            actions:
              - type: set-permissions
                remove: matched
    """
    permissions = ('rds:ModifyDBClusterSnapshotAttribute',)

    def process_snapshot(self, client, snapshot):
        """Resolve the accounts to add/remove for ``snapshot`` and apply them."""
        to_add = self.data.get('add', [])
        to_remove = self.data.get('remove', [])
        attrs_key = CrossAccountSnapshot.attributes_key

        if to_remove == 'matched':
            # Remove only what the cross-account filter flagged.
            to_remove = snapshot.get(CrossAccountSnapshot.annotation_key, [])
        elif not (to_add or to_remove):
            # Default: remove every account that currently has restore access,
            # fetching the attributes if the filter has not annotated them.
            if attrs_key not in snapshot:
                response = self.manager.retry(
                    client.describe_db_cluster_snapshot_attributes,
                    DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier'])
                attribute_list = response[
                    'DBClusterSnapshotAttributesResult']['DBClusterSnapshotAttributes']
                snapshot[attrs_key] = {
                    entry['AttributeName']: entry['AttributeValues']
                    for entry in attribute_list}
            to_remove = snapshot[attrs_key].get('restore', [])

        if not (to_add or to_remove):
            return
        client.modify_db_cluster_snapshot_attribute(
            DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier'],
            AttributeName='restore',
            ValuesToRemove=to_remove,
            ValuesToAdd=to_add)
@RDSClusterSnapshot.action_registry.register('delete')
class RDSClusterSnapshotDelete(BaseAction):
    """Action to delete rds cluster snapshots

    To prevent unwanted deletion of rds cluster snapshots, it is recommended
    to apply a filter to the rule

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshots-expired-delete
            resource: rds-cluster-snapshot
            filters:
              - type: age
                days: 30
                op: gt
            actions:
              - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBClusterSnapshot',)

    def process(self, snapshots):
        """Delete ``snapshots`` in batches of 50 on two workers.

        Every batch is allowed to finish. If any batch raised, the error is
        logged and the last one seen is re-raised afterwards. Returns the
        input ``snapshots``.
        """
        self.log.info("Deleting %d RDS cluster snapshots", len(snapshots))
        client = local_session(self.manager.session_factory).client('rds')
        error = None
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_snapshot_set, client, snapshot_set)
                for snapshot_set in chunks(reversed(snapshots), size=50)]
            for f in as_completed(futures):
                failure = f.exception()
                if failure:
                    error = failure
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        failure)
        if error:
            raise error
        return snapshots

    def process_snapshot_set(self, client, snapshots_set):
        """Delete each snapshot, skipping ones that are gone or not deletable."""
        for s in snapshots_set:
            try:
                client.delete_db_cluster_snapshot(
                    DBClusterSnapshotIdentifier=s['DBClusterSnapshotIdentifier'])
            except (
                    # Faults raised by DeleteDBClusterSnapshot.
                    client.exceptions.DBClusterSnapshotNotFoundFault,
                    client.exceptions.InvalidDBClusterSnapshotStateFault,
                    # Instance-snapshot faults the original code listed;
                    # kept so nothing previously tolerated now raises.
                    client.exceptions.DBSnapshotNotFoundFault,
                    client.exceptions.InvalidDBSnapshotStateFault):
                continue
# Register the shared AWS Backup consecutive-backups filter on rds-cluster.
RDSCluster.filter_registry.register(
    'consecutive-aws-backups', ConsecutiveAwsBackupsFilter)
@RDSCluster.filter_registry.register('consecutive-snapshots')
class ConsecutiveSnapshots(Filter):
    """Returns RDS clusters that have an available snapshot for each of the
    previous n days (counted back from yesterday, UTC).

    :example:

    .. code-block:: yaml

        policies:
          - name: rdscluster-daily-snapshot-count
            resource: rds-cluster
            filters:
              - type: consecutive-snapshots
                days: 7
    """
    schema = type_schema(
        'consecutive-snapshots',
        days={'type': 'number', 'minimum': 1},
        required=['days'])
    permissions = ('rds:DescribeDBClusterSnapshots', 'rds:DescribeDBClusters')
    # Key under which a cluster's snapshots are cached on the resource.
    annotation = 'c7n:DBClusterSnapshots'

    def process_resource_set(self, client, resources):
        """Fetch the snapshots of ``resources`` and annotate each cluster."""
        cluster_ids = [r['DBClusterIdentifier'] for r in resources]
        paginator = client.get_paginator('describe_db_cluster_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        pages = paginator.paginate(
            Filters=[{'Name': 'db-cluster-id', 'Values': cluster_ids}])
        snapshots = pages.build_full_result().get('DBClusterSnapshots', [])

        # Group snapshots by the cluster they belong to.
        by_cluster = {}
        for snap in snapshots:
            by_cluster.setdefault(snap['DBClusterIdentifier'], []).append(snap)
        for cluster in resources:
            cluster[self.annotation] = by_cluster.get(
                cluster['DBClusterIdentifier'], [])

    def process(self, resources, event=None):
        """Return the clusters with an available snapshot on every expected day."""
        client = local_session(self.manager.session_factory).client('rds')
        retention = self.data.get('days')
        utcnow = datetime.utcnow()
        expected_dates = {
            (utcnow - timedelta(days=offset)).strftime('%Y-%m-%d')
            for offset in range(1, retention + 1)}

        # Only look up clusters that have not been annotated already.
        unannotated = [r for r in resources if self.annotation not in r]
        for batch in chunks(unannotated, 50):
            self.process_resource_set(client, batch)

        matched = []
        for cluster in resources:
            available_dates = {
                snap['SnapshotCreateTime'].strftime('%Y-%m-%d')
                for snap in cluster[self.annotation]
                if snap['Status'] == 'available'}
            if expected_dates.issubset(available_dates):
                matched.append(cluster)
        return matched
@RDSCluster.filter_registry.register('db-cluster-parameter')
class ClusterParameterFilter(ParameterFilter):
    """
    Applies value type filter on set db cluster parameter values.

    :example:

    .. code-block:: yaml

        policies:
          - name: rdscluster-pg
            resource: rds-cluster
            filters:
              - type: db-cluster-parameter
                key: someparam
                op: eq
                value: someval
    """
    schema = type_schema('db-cluster-parameter', rinherit=ValueFilter.schema)
    schema_alias = False
    # _get_param_list() calls describe_db_cluster_parameters, so its IAM
    # action is declared in addition to the previously listed ones.
    permissions = (
        'rds:DescribeDBInstances', 'rds:DescribeDBParameters',
        'rds:DescribeDBClusterParameters',)
    policy_annotation = 'c7n:MatchedDBClusterParameter'
    # Attribute of the cluster record naming its parameter group.
    param_group_attribute = 'DBClusterParameterGroup'

    def _get_param_list(self, pg):
        """Return every parameter of cluster parameter group ``pg``."""
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_db_cluster_parameters')
        pages = paginator.paginate(DBClusterParameterGroupName=pg)
        return list(itertools.chain.from_iterable(
            page['Parameters'] for page in pages))

    def process(self, resources, event=None):
        """Return the clusters whose parameter group values match the filter.

        Matching clusters are annotated with the filter's ``key`` under
        ``policy_annotation``.
        """
        results = []
        parameter_group_list = {db.get(self.param_group_attribute) for db in resources}
        paramcache = self.handle_paramgroup_cache(parameter_group_list)
        for resource in resources:
            # Same attribute as used to build the cache above.
            pg_values = paramcache[resource[self.param_group_attribute]]
            if self.match(pg_values):
                resource.setdefault(self.policy_annotation, []).append(
                    self.data.get('key'))
                results.append(resource)
        return results
@RDSCluster.filter_registry.register('pending-maintenance')
class PendingMaintenance(Filter):
    """
    Select DB Clusters that have a pending maintenance action

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-pending-maintenance
            resource: rds-cluster
            filters:
              - pending-maintenance
    """

    schema = type_schema('pending-maintenance')
    permissions = ('rds:DescribePendingMaintenanceActions',)

    def process(self, resources, event=None):
        """Return the clusters whose ARN appears in a pending maintenance action."""
        client = local_session(self.manager.session_factory).client('rds')

        # Collect the identifier of every resource with pending maintenance.
        pending_arns = set()
        paginator = client.get_paginator('describe_pending_maintenance_actions')
        for page in paginator.paginate():
            for action in page['PendingMaintenanceActions']:
                pending_arns.add(action['ResourceIdentifier'])

        return [
            cluster for cluster in resources
            if cluster['DBClusterArn'] in pending_arns]