Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/rds.py: 40%
927 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4RDS Resource Manager
5====================
7Example Policies
8----------------
10Find rds instances that are publicly available
12.. code-block:: yaml
14 policies:
15 - name: rds-public
16 resource: rds
17 filters:
18 - PubliclyAccessible: true
20Find rds instances that are not encrypted
22.. code-block:: yaml
24 policies:
25 - name: rds-non-encrypted
26 resource: rds
27 filters:
28 - type: value
29 key: StorageEncrypted
30 value: true
31 op: ne
33"""
34import functools
35import itertools
36import logging
37import operator
38import re
39import datetime
41from datetime import timedelta
43from decimal import Decimal as D, ROUND_HALF_UP
45from c7n.vendored.distutils.version import LooseVersion
46from botocore.exceptions import ClientError
47from concurrent.futures import as_completed
49from c7n.actions import (
50 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction)
52from c7n.exceptions import PolicyValidationError
53from c7n.filters import (
54 CrossAccountAccessFilter, FilterRegistry, Filter, ValueFilter, AgeFilter)
55from c7n.filters.offhours import OffHour, OnHour
56from c7n.filters import related
57import c7n.filters.vpc as net_filters
58from c7n.manager import resources
59from c7n.query import (
60 QueryResourceManager, DescribeSource, ConfigSource, TypeInfo, RetryPageIterator)
61from c7n import deprecated, tags
62from c7n.tags import universal_augment
64from c7n.utils import (
65 local_session, type_schema, get_retry, chunks, snapshot_identifier,
66 merge_dict_list, filter_empty, jmespath_search)
67from c7n.resources.kms import ResourceKmsKeyAlias
68from c7n.resources.securityhub import PostFinding
69from c7n.filters.backup import ConsecutiveAwsBackupsFilter
71log = logging.getLogger('custodian.rds')
73filters = FilterRegistry('rds.filters')
74actions = ActionRegistry('rds.actions')
class DescribeRDS(DescribeSource):
    """Describe-API source for RDS instances that normalizes tag storage."""

    def augment(self, dbs):
        """Move each instance's ``TagList`` entry under the ``Tags`` key.

        Instances without a ``TagList`` get an empty tuple. The input
        collection is mutated in place and returned.
        """
        for instance in dbs:
            tag_list = instance.pop('TagList', ())
            instance['Tags'] = tag_list
        return dbs
class ConfigRDS(ConfigSource):
    """Config source for RDS instances that aliases ``Db*`` keys as ``DB*``."""

    def load_resource(self, item):
        """Load the resource and add a ``DB``-prefixed copy of every ``Db`` key.

        The original ``Db*`` keys are kept; the ``DB*`` variants are added
        (or overwritten) alongside them.
        """
        resource = super().load_resource(item)
        aliases = {
            "DB%s" % key[2:]: value
            for key, value in list(resource.items())
            if key.startswith('Db')
        }
        resource.update(aliases)
        return resource
@resources.register('rds')
class RDS(QueryResourceManager):
    """Resource manager for RDS DB instances.

    A policy-level ``query`` list, when present, is collapsed with
    ``merge_dict_list`` and forwarded to the parent ``resources`` call.
    """

    class resource_type(TypeInfo):
        # Service and ARN construction.
        service = 'rds'
        arn_type = 'db'
        arn_separator = ':'
        arn = 'DBInstanceArn'
        # Enumeration and identity.
        enum_spec = ('describe_db_instances', 'DBInstances', None)
        id = 'DBInstanceIdentifier'
        config_id = 'DbiResourceId'
        name = 'Endpoint.Address'
        filter_name = 'DBInstanceIdentifier'
        filter_type = 'scalar'
        date = 'InstanceCreateTime'
        dimension = 'DBInstanceIdentifier'
        cfn_type = config_type = 'AWS::RDS::DBInstance'
        universal_taggable = True
        default_report_fields = (
            'DBInstanceIdentifier',
            'DBName',
            'Engine',
            'EngineVersion',
            'MultiAZ',
            'AllocatedStorage',
            'StorageEncrypted',
            'PubliclyAccessible',
            'InstanceCreateTime',
        )
        permissions_enum = ('rds:DescribeDBInstances',)

    filter_registry = filters
    action_registry = actions

    # Resource loaders keyed by policy source name.
    source_mapping = {
        'describe': DescribeRDS,
        'config': ConfigRDS
    }

    def resources(self, query=None):
        """Return resources, defaulting ``query`` from the policy data.

        An explicit ``query`` argument is passed through untouched.
        Otherwise the policy's ``query`` entries are merged, or an empty
        dict is used when the policy defines none.
        """
        if query is None:
            if 'query' in self.data:
                query = merge_dict_list(self.data['query'])
            else:
                query = {}
        return super(RDS, self).resources(query=query)
144def _db_instance_eligible_for_backup(resource):
145 db_instance_id = resource['DBInstanceIdentifier']
147 # Database instance is not in available state
148 if resource.get('DBInstanceStatus', '') != 'available':
149 log.debug(
150 "DB instance %s is not in available state",
151 db_instance_id)
152 return False
153 # The specified DB Instance is a member of a cluster and its
154 # backup retention should not be modified directly. Instead,
155 # modify the backup retention of the cluster using the
156 # ModifyDbCluster API
157 if resource.get('DBClusterIdentifier', ''):
158 log.debug(
159 "DB instance %s is a cluster member",
160 db_instance_id)
161 return False
162 # DB Backups not supported on a read replica for engine postgres
163 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
164 resource.get('Engine', '') == 'postgres'):
165 log.debug(
166 "DB instance %s is a postgres read-replica",
167 db_instance_id)
168 return False
169 # DB Backups not supported on a read replica running a mysql
170 # version before 5.6
171 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
172 resource.get('Engine', '') == 'mysql'):
173 engine_version = resource.get('EngineVersion', '')
174 # Assume "<major>.<minor>.<whatever>"
175 match = re.match(r'(?P<major>\d+)\.(?P<minor>\d+)\..*', engine_version)
176 if (match and int(match.group('major')) < 5 or
177 (int(match.group('major')) == 5 and int(match.group('minor')) < 6)):
178 log.debug(
179 "DB instance %s is a version %s mysql read-replica",
180 db_instance_id,
181 engine_version)
182 return False
183 return True
186def _db_instance_eligible_for_final_snapshot(resource):
187 status = resource.get('DBInstanceStatus', '')
188 # If the DB instance you are deleting has a status of "Creating,"
189 # you will not be able to have a final DB snapshot taken
190 # If the DB instance is in a failure state with a status of "failed,"
191 # "incompatible-restore," or "incompatible-network," you can only delete
192 # the instance when the SkipFinalSnapshot parameter is set to "true."
193 eligible_for_final_snapshot = True
194 if status in ['creating', 'failed', 'incompatible-restore', 'incompatible-network']:
195 eligible_for_final_snapshot = False
197 # FinalDBSnapshotIdentifier can not be specified when deleting a
198 # replica instance
199 if resource.get('ReadReplicaSourceDBInstanceIdentifier', ''):
200 eligible_for_final_snapshot = False
202 # if it's a rds-cluster, don't try to run the rds instance snapshot api call
203 if resource.get('DBClusterIdentifier', False):
204 eligible_for_final_snapshot = False
206 if not eligible_for_final_snapshot:
207 log.debug('DB instance is not eligible for a snapshot:/n %s', resource)
208 return eligible_for_final_snapshot
211def _get_available_engine_upgrades(client, major=False):
212 """Returns all extant rds engine upgrades.
214 As a nested mapping of engine type to known versions
215 and their upgrades.
217 Defaults to minor upgrades, but configurable to major.
219 Example::
221 >>> _get_available_engine_upgrades(client)
222 {
223 'oracle-se2': {'12.1.0.2.v2': '12.1.0.2.v5',
224 '12.1.0.2.v3': '12.1.0.2.v5'},
225 'postgres': {'9.3.1': '9.3.14',
226 '9.3.10': '9.3.14',
227 '9.3.12': '9.3.14',
228 '9.3.2': '9.3.14'}
229 }
230 """
231 results = {}
232 paginator = client.get_paginator('describe_db_engine_versions')
233 for page in paginator.paginate():
234 engine_versions = page['DBEngineVersions']
235 for v in engine_versions:
236 if v['Engine'] not in results:
237 results[v['Engine']] = {}
238 if 'ValidUpgradeTarget' not in v or len(v['ValidUpgradeTarget']) == 0:
239 continue
240 for t in v['ValidUpgradeTarget']:
241 if not major and t['IsMajorVersionUpgrade']:
242 continue
243 if LooseVersion(t['EngineVersion']) > LooseVersion(
244 results[v['Engine']].get(v['EngineVersion'], '0.0.0')):
245 results[v['Engine']][v['EngineVersion']] = t['EngineVersion']
246 return results
249filters.register('offhour', OffHour)
250filters.register('onhour', OnHour)
@filters.register('default-vpc')
class DefaultVpc(net_filters.DefaultVpcBase):
    """Matches rds databases whose subnet group is in the default vpc

    :example:

    .. code-block:: yaml

            policies:
              - name: default-vpc-rds
                resource: rds
                filters:
                  - type: default-vpc
    """
    schema = type_schema('default-vpc')

    def __call__(self, rdb):
        # The VPC is taken from the instance's DB subnet group.
        vpc_id = rdb['DBSubnetGroup']['VpcId']
        return self.match(vpc_id)
@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Security group filter bound to an instance's VPC security group ids."""

    # Expression locating the related security group ids on an instance.
    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"
@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Subnet filter bound to the subnets of an instance's DB subnet group."""

    # Expression locating the related subnet ids on an instance.
    RelatedIdsExpression = "DBSubnetGroup.Subnets[].SubnetIdentifier"
@filters.register('vpc')
class VpcFilter(net_filters.VpcFilter):
    """VPC filter bound to the VPC of an instance's DB subnet group."""

    # Expression locating the related vpc id on an instance.
    RelatedIdsExpression = "DBSubnetGroup.VpcId"
291filters.register('network-location', net_filters.NetworkLocation)
@filters.register('kms-alias')
class KmsKeyAlias(ResourceKmsKeyAlias):
    """KMS alias filter for rds instances."""

    def process(self, dbs, event=None):
        """Delegate to ``get_matching_aliases``; ``event`` is unused."""
        matched = self.get_matching_aliases(dbs)
        return matched
@actions.register('auto-patch')
class AutoPatch(BaseAction):
    """Toggle AutoMinorUpgrade flag on RDS instance

    'window' parameter needs to be in the format 'ddd:hh:mm-ddd:hh:mm' and
    have at least 30 minutes between start & end time.
    If 'window' is not specified, AWS will assign a random maintenance window
    to each instance selected.

    :example:

    .. code-block:: yaml

            policies:
              - name: enable-rds-autopatch
                resource: rds
                filters:
                  - AutoMinorVersionUpgrade: false
                actions:
                  - type: auto-patch
                    minor: true
                    window: Mon:23:00-Tue:01:00
    """

    schema = type_schema(
        'auto-patch',
        minor={'type': 'boolean'}, window={'type': 'string'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        """Apply the minor-upgrade flag (and optional window) to each instance."""
        session = local_session(self.manager.session_factory)
        client = session.client('rds')

        modify_args = {
            'AutoMinorVersionUpgrade': self.data.get('minor', True)}
        window = self.data.get('window')
        # Only send a maintenance window when the policy supplies one.
        if window:
            modify_args['PreferredMaintenanceWindow'] = window

        for instance in dbs:
            client.modify_db_instance(
                DBInstanceIdentifier=instance['DBInstanceIdentifier'],
                **modify_args)
@filters.register('upgrade-available')
class UpgradeAvailable(Filter):
    """ Scan DB instances for available engine upgrades

    This will pull DB instances & check their specific engine for any
    engine version with higher release numbers than the current one

    Matching instances are annotated under the 'c7n-rds-engine-upgrade'
    key with the most recent version of the engine available

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-upgrade-available
                resource: rds
                filters:
                  - type: upgrade-available
                    major: False

    """

    schema = type_schema('upgrade-available',
                         major={'type': 'boolean'},
                         value={'type': 'boolean'})
    permissions = ('rds:DescribeDBEngineVersions',)

    def process(self, resources, event=None):
        """Return instances with an upgrade, or without one when ``value`` is False.

        Instances that have an upgrade target are always returned and
        annotated. Instances with no target are returned only when the
        policy sets ``value: false``.
        """
        client = local_session(self.manager.session_factory).client('rds')
        want_upgradable = self.data.get('value', True)
        include_major = self.data.get('major', False)
        upgrades_by_engine = _get_available_engine_upgrades(
            client, major=include_major)

        matched = []
        for resource in resources:
            known_versions = upgrades_by_engine.get(resource['Engine'], {})
            target = known_versions.get(resource['EngineVersion'])
            if target is not None:
                resource['c7n-rds-engine-upgrade'] = target
                matched.append(resource)
            elif want_upgradable is False:
                matched.append(resource)
        return matched
@actions.register('upgrade')
class UpgradeMinor(BaseAction):
    """Upgrades a RDS instance to the latest major/minor version available

    Use of the 'immediate' flag (default False) will automatically upgrade
    the RDS engine disregarding the existing maintenance window.

    :example:

    .. code-block:: yaml

            policies:
              - name: upgrade-rds-minor
                resource: rds
                actions:
                  - type: upgrade
                    major: False
                    immediate: False

    """

    schema = type_schema(
        'upgrade',
        major={'type': 'boolean'},
        immediate={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        """Request an engine upgrade for every instance that has a target.

        Instances already annotated with 'c7n-rds-engine-upgrade' use that
        value; otherwise the upgrade catalogue is fetched once, lazily, and
        instances without a target are skipped.
        """
        client = local_session(self.manager.session_factory).client('rds')
        annotation = 'c7n-rds-engine-upgrade'
        catalogue = None  # fetched on first need only

        for resource in resources:
            if 'EngineVersion' in resource['PendingModifiedValues']:
                # Upgrade has already been scheduled
                continue

            if annotation not in resource:
                if catalogue is None:
                    catalogue = _get_available_engine_upgrades(
                        client, major=self.data.get('major', False))
                target = catalogue.get(
                    resource['Engine'], {}).get(resource['EngineVersion'])
                if target is None:
                    log.debug(
                        "implicit filter no upgrade on %s",
                        resource['DBInstanceIdentifier'])
                    continue
                resource[annotation] = target

            client.modify_db_instance(
                DBInstanceIdentifier=resource['DBInstanceIdentifier'],
                EngineVersion=resource[annotation],
                ApplyImmediately=self.data.get('immediate', False))
@actions.register('tag-trim')
class TagTrim(tags.TagTrim):
    """Tag trimming for rds instances, addressed by instance ARN."""

    permissions = ('rds:RemoveTagsFromResource',)

    def process_tag_removal(self, client, resource, candidates):
        """Remove the ``candidates`` tag keys from the instance."""
        instance_arn = resource['DBInstanceArn']
        client.remove_tags_from_resource(
            ResourceName=instance_arn, TagKeys=candidates)
453START_STOP_ELIGIBLE_ENGINES = {
454 'postgres', 'sqlserver-ee',
455 'oracle-se2', 'mariadb', 'oracle-ee',
456 'sqlserver-ex', 'sqlserver-se', 'oracle-se',
457 'mysql', 'oracle-se1', 'sqlserver-web'}
460def _eligible_start_stop(db, state="available"):
461 # See conditions noted here
462 # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
463 # Note that this doesn't really specify what happens for all the nosql engines
464 # that are available as rds engines.
465 if db.get('DBInstanceStatus') != state:
466 return False
468 if db.get('MultiAZ') and db['Engine'].startswith('sqlserver-'):
469 return False
471 if db['Engine'] not in START_STOP_ELIGIBLE_ENGINES:
472 return False
474 if db.get('ReadReplicaDBInstanceIdentifiers'):
475 return False
477 if db.get('ReadReplicaSourceDBInstanceIdentifier'):
478 return False
480 # TODO is SQL Server mirror is detectable.
481 return True
@actions.register('stop')
class Stop(BaseAction):
    """Stop an rds instance.

    https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
    """

    schema = type_schema('stop')

    permissions = ("rds:StopDBInstance",)

    def process(self, resources):
        """Stop every eligible instance; API errors are logged, not raised."""
        client = local_session(self.manager.session_factory).client('rds')
        stoppable = (db for db in resources if _eligible_start_stop(db))
        for db in stoppable:
            try:
                client.stop_db_instance(
                    DBInstanceIdentifier=db['DBInstanceIdentifier'])
            except ClientError as err:
                log.exception(
                    "Error stopping db instance:%s err:%s",
                    db['DBInstanceIdentifier'], err)
@actions.register('start')
class Start(BaseAction):
    """Start an rds instance.
    """

    schema = type_schema('start')

    permissions = ("rds:StartDBInstance",)

    def process(self, resources):
        """Start every eligible stopped instance; API errors are logged, not raised."""
        client = local_session(self.manager.session_factory).client('rds')
        startable = (
            db for db in resources
            if _eligible_start_stop(db, state='stopped'))
        for db in startable:
            try:
                client.start_db_instance(
                    DBInstanceIdentifier=db['DBInstanceIdentifier'])
            except ClientError as err:
                log.exception(
                    "Error starting db instance:%s err:%s",
                    db['DBInstanceIdentifier'], err)
@actions.register('delete')
class Delete(BaseAction):
    """Deletes selected RDS instances

    This will delete RDS instances. It is recommended to apply with a filter
    to avoid deleting all RDS instances in the account.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-delete
                resource: rds
                filters:
                  - default-vpc
                actions:
                  - type: delete
                    skip-snapshot: true
    """

    schema = type_schema('delete', **{
        'skip-snapshot': {'type': 'boolean'},
        'copy-restore-info': {'type': 'boolean'}
    })

    permissions = ('rds:DeleteDBInstance', 'rds:AddTagsToResource')

    def validate(self):
        """Reject policies combining ``skip-snapshot`` with ``copy-restore-info``."""
        skip_snapshot = self.data.get('skip-snapshot', False)
        copy_info = self.data.get('copy-restore-info')
        if skip_snapshot and copy_info:
            raise PolicyValidationError(
                "skip-snapshot cannot be specified with copy-restore-info on %s" % (
                    self.manager.data,))
        return self

    def process(self, dbs):
        """Delete each non-cluster instance, taking a final snapshot when possible.

        Returns the list of instances a delete was attempted for. An
        ``InvalidDBInstanceState`` error from the API is ignored; any other
        client error propagates.
        """
        skip = self.data.get('skip-snapshot', False)
        # Can't delete an instance in an aurora cluster, use a policy on the cluster
        dbs = [r for r in dbs if not r.get('DBClusterIdentifier')]
        # Concurrency feels like overkill here.
        client = local_session(self.manager.session_factory).client('rds')
        for db in dbs:
            db_id = db['DBInstanceIdentifier']
            params = {'DBInstanceIdentifier': db_id}
            if skip or not _db_instance_eligible_for_final_snapshot(db):
                params['SkipFinalSnapshot'] = True
            else:
                params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                    'Final', db_id)

            if self.data.get('copy-restore-info', False):
                self.copy_restore_info(client, db)
                # Make sure the tags written above reach the final snapshot.
                if not db['CopyTagsToSnapshot']:
                    client.modify_db_instance(
                        DBInstanceIdentifier=db_id,
                        CopyTagsToSnapshot=True)

            self.log.info(
                "Deleting rds: %s snapshot: %s",
                db_id,
                params.get('FinalDBSnapshotIdentifier', False))

            try:
                client.delete_db_instance(**params)
            except ClientError as err:
                if err.response['Error']['Code'] != "InvalidDBInstanceState":
                    raise

        return dbs

    def copy_restore_info(self, client, instance):
        """Tag the instance with the settings needed to restore it later."""
        # NOTE(review): the group ids are concatenated with no separator;
        # confirm this matches how the restore side parses the tag.
        security_groups = ''.join([
            g['VpcSecurityGroupId'] for g in instance['VpcSecurityGroups']
        ])
        tags = [
            {'Key': 'VPCSecurityGroups',
             'Value': security_groups},
            {'Key': 'OptionGroupName',
             'Value': instance['OptionGroupMemberships'][0]['OptionGroupName']},
            {'Key': 'ParameterGroupName',
             'Value': instance['DBParameterGroups'][0]['DBParameterGroupName']},
            {'Key': 'InstanceClass',
             'Value': instance['DBInstanceClass']},
            {'Key': 'StorageType',
             'Value': instance['StorageType']},
            {'Key': 'MultiAZ',
             'Value': str(instance['MultiAZ'])},
            {'Key': 'DBSubnetGroupName',
             'Value': instance['DBSubnetGroup']['DBSubnetGroupName']},
        ]
        instance_arn = self.manager.generate_arn(
            instance['DBInstanceIdentifier'])
        client.add_tags_to_resource(ResourceName=instance_arn, Tags=tags)
@actions.register('set-snapshot-copy-tags')
class CopySnapshotTags(BaseAction):
    """Enables copying tags from rds instance to snapshot

    DEPRECATED - use modify-db instead with `CopyTagsToSnapshot`

    :example:

        .. code-block:: yaml

            policies:
              - name: enable-rds-snapshot-tags
                resource: rds
                filters:
                  - type: value
                    key: Engine
                    value: aurora
                    op: eq
                actions:
                  - type: set-snapshot-copy-tags
                    enable: True
    """
    deprecations = (
        deprecated.action("use modify-db instead with `CopyTagsToSnapshot`"),
    )

    schema = type_schema(
        'set-snapshot-copy-tags',
        enable={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        """Update instances whose flag differs from ``enable``.

        Failures are logged per instance; the last exception seen is
        re-raised after all updates have completed. Returns the instances
        that needed a change.
        """
        last_error = None
        with self.executor_factory(max_workers=2) as w:
            client = local_session(self.manager.session_factory).client('rds')
            # Skip instances that already have the requested setting.
            resources = [
                r for r in resources
                if r['CopyTagsToSnapshot'] != self.data.get('enable', True)]
            futures = {
                w.submit(self.set_snapshot_tags, client, r): r
                for r in resources}
            for done in as_completed(futures):
                if done.exception():
                    last_error = done.exception()
                    self.log.error(
                        'error updating rds:%s CopyTagsToSnapshot \n %s',
                        futures[done]['DBInstanceIdentifier'], last_error)
        if last_error:
            raise last_error
        return resources

    def set_snapshot_tags(self, client, r):
        """Set CopyTagsToSnapshot on one instance through the manager's retry."""
        enable = self.data.get('enable', True)
        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            CopyTagsToSnapshot=enable)
@RDS.action_registry.register('post-finding')
class DbInstanceFinding(PostFinding):
    """Post-finding action that formats an rds instance as ``AwsRdsDbInstance``."""

    resource_type = 'AwsRdsDbInstance'

    def format_resource(self, r):
        """Build the finding resource payload for instance ``r``.

        Only truthy attributes from the field list are copied into the
        details; datetime values are rendered with ``isoformat``. Both the
        details and the resulting payload are passed through
        ``filter_empty``.
        """
        fields = [
            'AssociatedRoles', 'CACertificateIdentifier', 'DBClusterIdentifier',
            'DBInstanceIdentifier', 'DBInstanceClass', 'DbInstancePort', 'DbiResourceId',
            'DBName', 'DeletionProtection', 'Endpoint', 'Engine', 'EngineVersion',
            'IAMDatabaseAuthenticationEnabled', 'InstanceCreateTime', 'KmsKeyId',
            'PubliclyAccessible', 'StorageEncrypted',
            'TdeCredentialArn', 'VpcSecurityGroups', 'MultiAz', 'EnhancedMonitoringResourceArn',
            'DbInstanceStatus', 'MasterUsername',
            'AllocatedStorage', 'PreferredBackupWindow', 'BackupRetentionPeriod',
            'DbSecurityGroups', 'DbParameterGroups',
            'AvailabilityZone', 'DbSubnetGroup', 'PreferredMaintenanceWindow',
            'PendingModifiedValues', 'LatestRestorableTime',
            'AutoMinorVersionUpgrade', 'ReadReplicaSourceDBInstanceIdentifier',
            'ReadReplicaDBInstanceIdentifiers',
            'ReadReplicaDBClusterIdentifiers', 'LicenseModel', 'Iops', 'OptionGroupMemberships',
            'CharacterSetName',
            'SecondaryAvailabilityZone', 'StatusInfos', 'StorageType', 'DomainMemberships',
            'CopyTagsToSnapshot',
            'MonitoringInterval', 'MonitoringRoleArn', 'PromotionTier', 'Timezone',
            'PerformanceInsightsEnabled',
            'PerformanceInsightsKmsKeyId', 'PerformanceInsightsRetentionPeriod',
            'EnabledCloudWatchLogsExports',
            'ProcessorFeatures', 'ListenerEndpoint', 'MaxAllocatedStorage'
        ]
        details = {}
        for field in fields:
            value = r.get(field)
            if not value:
                continue
            if isinstance(value, datetime.datetime):
                value = value.isoformat()
            details.setdefault(field, value)

        tag_map = {t['Key']: t['Value'] for t in r.get('Tags', [])}
        return filter_empty({
            'Type': self.resource_type,
            'Id': r['DBInstanceArn'],
            'Region': self.manager.config.region,
            'Tags': tag_map,
            'Details': {self.resource_type: filter_empty(details)},
        })
@actions.register('snapshot')
class Snapshot(BaseAction):
    """Creates a manual snapshot of a RDS instance

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot
                resource: rds
                actions:
                  - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('rds:CreateDBSnapshot',)

    def process(self, dbs):
        """Snapshot every instance on a small worker pool; failures are logged."""
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_rds_snapshot, db) for db in dbs]
            for done in as_completed(futures):
                if done.exception():
                    self.log.error(
                        "Exception creating rds snapshot \n %s",
                        done.exception())
        return dbs

    def process_rds_snapshot(self, resource):
        """Create a snapshot for ``resource`` unless it is ineligible for backup.

        The snapshot identifier is built from the ``snapshot-prefix`` data
        value (default ``Backup``) and the instance identifier.
        """
        if not _db_instance_eligible_for_backup(resource):
            return

        db_id = resource['DBInstanceIdentifier']
        prefix = self.data.get('snapshot-prefix', 'Backup')
        client = local_session(self.manager.session_factory).client('rds')
        client.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_identifier(prefix, db_id),
            DBInstanceIdentifier=db_id)
@actions.register('resize')
class ResizeInstance(BaseAction):
    """Change the allocated storage of an rds instance.

    :example:

    This will find databases using over 85% of their allocated
    storage, and resize them to have an additional 30% storage
    the resize here is async during the next maintenance.

    .. code-block:: yaml

            policies:
              - name: rds-resize-up
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: 30


    This will find databases using under 20% of their allocated
    storage, and resize them to be 30% smaller, the resize here
    is configured to be immediate.

    .. code-block:: yaml

            policies:
              - name: rds-resize-down
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: -30
                    immediate: true
    """
    schema = type_schema(
        'resize',
        percent={'type': 'number'},
        immediate={'type': 'boolean'})

    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        """Scale each instance's AllocatedStorage by the ``percent`` value.

        The new size is computed with Decimal arithmetic and rounded half
        up to a whole number before being sent to the API.
        """
        client = local_session(self.manager.session_factory).client('rds')
        hundred = D(100)
        for resource in resources:
            current_size = D(resource['AllocatedStorage'])
            scale = (hundred + D(self.data['percent'])) / hundred
            target_size = scale * current_size
            rounded = int(target_size.quantize(D('0'), ROUND_HALF_UP))
            client.modify_db_instance(
                DBInstanceIdentifier=resource['DBInstanceIdentifier'],
                AllocatedStorage=rounded,
                ApplyImmediately=self.data.get('immediate', False))
@actions.register('retention')
class RetentionWindow(BaseAction):
    """
    Sets the 'BackupRetentionPeriod' value for automated snapshots,
    enforce (min, max, exact) sets retention days accordingly.

    * ``min`` (default): retention becomes at least ``days``.
    * ``max``: retention becomes at most ``days``.
    * ``exact``: retention becomes exactly ``days``.

    The ``copy-tags`` value (default true) is applied in the same call.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-retention
                resource: rds
                filters:
                  - type: value
                    key: BackupRetentionPeriod
                    value: 7
                    op: lt
                actions:
                  - type: retention
                    days: 7
                    copy-tags: true
                    enforce: exact
    """

    date_attribute = "BackupRetentionPeriod"
    schema = type_schema(
        'retention', **{'days': {'type': 'number'},
                        'copy-tags': {'type': 'boolean'},
                        'enforce': {'type': 'string', 'enum': [
                            'min', 'max', 'exact']}})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        """Apply the retention policy to every instance on a small worker pool.

        Per-instance failures are logged; the input ``dbs`` is returned.
        """
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for db in dbs:
                futures.append(w.submit(
                    self.process_snapshot_retention,
                    db))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting rds retention  \n %s",
                        f.exception())
        return dbs

    def process_snapshot_retention(self, resource):
        """Set retention and copy-tags on one instance per the enforce mode.

        Returns ``resource`` when a modification was requested, otherwise
        ``None`` (instance not eligible for backup).

        The enforce mode alone selects the retention value. Previously a
        mismatch in the copy-tags flag caused the ``min`` branch to run
        for ``max`` and ``exact`` policies as well, applying the wrong
        retention period.
        """
        current_retention = int(resource.get('BackupRetentionPeriod', 0))
        new_retention = self.data['days']
        new_copy_tags = self.data.get('copy-tags', True)
        retention_type = self.data.get('enforce', 'min').lower()

        if retention_type == 'min':
            target_retention = max(current_retention, new_retention)
        elif retention_type == 'max':
            target_retention = min(current_retention, new_retention)
        elif retention_type == 'exact':
            target_retention = new_retention
        else:
            # The schema enum rules this out; do nothing for unknown modes.
            return None

        if not _db_instance_eligible_for_backup(resource):
            return None

        self.set_retention_window(resource, target_retention, new_copy_tags)
        return resource

    def set_retention_window(self, resource, retention, copy_tags):
        """Send the retention period and copy-tags flag for ``resource``."""
        c = local_session(self.manager.session_factory).client('rds')
        c.modify_db_instance(
            DBInstanceIdentifier=resource['DBInstanceIdentifier'],
            BackupRetentionPeriod=retention,
            CopyTagsToSnapshot=copy_tags)
@actions.register('set-public-access')
class RDSSetPublicAvailability(BaseAction):
    """
    This action allows for toggling an RDS instance
    'PubliclyAccessible' flag to true or false

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-rds-public-accessibility
                resource: rds
                filters:
                  - PubliclyAccessible: true
                actions:
                  - type: set-public-access
                    state: false
    """

    schema = type_schema(
        "set-public-access",
        state={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def set_accessibility(self, r):
        """Set PubliclyAccessible on one instance to ``state`` (default False)."""
        desired_state = self.data.get('state', False)
        client = local_session(self.manager.session_factory).client('rds')
        client.modify_db_instance(
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            PubliclyAccessible=desired_state)

    def process(self, rds):
        """Update every instance on a small worker pool; failures are logged."""
        with self.executor_factory(max_workers=2) as w:
            futures = {}
            for resource in rds:
                futures[w.submit(self.set_accessibility, resource)] = resource
            for done in as_completed(futures):
                if done.exception():
                    self.log.error(
                        "Exception setting public access on %s \n %s",
                        futures[done]['DBInstanceIdentifier'], done.exception())
        return rds
@resources.register('rds-subscription')
class RDSSubscription(QueryResourceManager):
    """Resource manager for RDS event subscriptions."""

    class resource_type(TypeInfo):
        # Service and ARN construction.
        service = 'rds'
        arn_type = 'es'
        arn = 'EventSubscriptionArn'
        cfn_type = 'AWS::RDS::EventSubscription'
        # Enumeration and identity.
        enum_spec = (
            'describe_event_subscriptions', 'EventSubscriptionsList', None)
        name = id = "CustSubscriptionId"
        date = "SubscriptionCreateTime"
        permissions_enum = ('rds:DescribeEventSubscriptions',)
        universal_taggable = object()

    # Tags are loaded through the shared universal augment helper.
    augment = universal_augment
@RDSSubscription.action_registry.register('delete')
class RDSSubscriptionDelete(BaseAction):
    """Deletes a RDS event subscription

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subscription-delete
                resource: rds-subscription
                filters:
                  - type: value
                    key: CustSubscriptionId
                    value: xyz
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteEventSubscription',)

    def process(self, resources):
        """Delete each subscription via the manager's retry helper.

        The not-found and invalid-state error codes are passed to the
        retry call as ``ignore_err_codes``.
        """
        client = local_session(self.manager.session_factory).client('rds')
        ignorable = ('SubscriptionNotFoundFault',
                     'InvalidEventSubscriptionStateFault')
        for subscription in resources:
            self.manager.retry(
                client.delete_event_subscription,
                SubscriptionName=subscription['CustSubscriptionId'],
                ignore_err_codes=ignorable)
class DescribeRDSSnapshot(DescribeSource):
    """Describe-API source for RDS snapshots."""

    def get_resources(self, ids, cache=True):
        """Fetch snapshots one identifier at a time and flatten the results.

        ``cache`` is accepted for signature compatibility but is not
        forwarded to the parent lookup.
        """
        fetch_one = super().get_resources
        per_id = [fetch_one((snapshot_id,)) for snapshot_id in ids]
        return list(itertools.chain.from_iterable(per_id))

    def augment(self, snaps):
        """Move each snapshot's ``TagList`` entry to ``Tags``, defaulting to ``()``."""
        for snapshot in snaps:
            tag_list = snapshot.pop('TagList', ())
            snapshot['Tags'] = tag_list
        return snaps
@resources.register('rds-snapshot')
class RDSSnapshot(QueryResourceManager):
    """Resource manager for RDS DB snapshots.
    """

    class resource_type(TypeInfo):
        # Service and ARN construction.
        service = 'rds'
        arn_type = 'snapshot'
        arn_separator = ':'
        # Enumeration and identity.
        enum_spec = ('describe_db_snapshots', 'DBSnapshots', None)
        name = id = 'DBSnapshotIdentifier'
        filter_name = "DBSnapshotIdentifier"
        filter_type = "scalar"
        date = 'SnapshotCreateTime'
        config_type = "AWS::RDS::DBSnapshot"
        universal_taggable = True
        permissions_enum = ('rds:DescribeDBSnapshots',)

    # Resource loaders keyed by policy source name.
    source_mapping = {
        'describe': DescribeRDSSnapshot,
        'config': ConfigSource
    }
@RDSSnapshot.filter_registry.register('onhour')
class RDSSnapshotOnHour(OnHour):
    """Scheduled action on rds snapshot.

    Adds no behavior of its own; it only registers ``OnHour`` for
    rds-snapshot resources.
    """
@RDSSnapshot.filter_registry.register('instance')
class SnapshotInstance(related.RelatedResourceFilter):
    """Filter snapshots by their database attributes.

    :example:

      Find snapshots without an extant database

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-orphan
                resource: aws.rds-snapshot
                filters:
                  - type: instance
                    value: 0
                    value_type: resource_count
    """
    schema = type_schema('instance', rinherit=ValueFilter.schema)

    # Related resource manager and where to find its id on a snapshot.
    RelatedResource = "c7n.resources.rds.RDS"
    RelatedIdsExpression = "DBInstanceIdentifier"
    FetchThreshold = 5
@RDSSnapshot.filter_registry.register('latest')
class LatestSnapshot(Filter):
    """Return the latest snapshot for each database.

    Set ``automatic: false`` to consider manual snapshots only.
    """
    schema = type_schema('latest', automatic={'type': 'boolean'})
    permissions = ('rds:DescribeDBSnapshots',)

    def process(self, resources, event=None):
        """Return exactly one snapshot, the newest, per DBInstanceIdentifier.

        Snapshots are grouped by key in a dict rather than with
        ``itertools.groupby``, which only merges *adjacent* items and so
        returned several "latest" snapshots for one database whenever the
        input was not already ordered by instance identifier.

        Results are ordered by first appearance of each database in the
        input. When two snapshots share a creation time, the later one in
        the input wins.
        """
        if not self.data.get('automatic', True):
            resources = [r for r in resources if r['SnapshotType'] == 'manual']

        latest_by_db = {}
        for snapshot in resources:
            db_identifier = snapshot['DBInstanceIdentifier']
            current = latest_by_db.get(db_identifier)
            if (current is None or
                    snapshot['SnapshotCreateTime'] >= current['SnapshotCreateTime']):
                latest_by_db[db_identifier] = snapshot
        return list(latest_by_db.values())
@RDSSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filter RDS snapshots by their age in days.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-snapshot-expired
            resource: rds-snapshot
            filters:
              - type: age
                days: 28
                op: ge
            actions:
              - delete
    """

    date_attribute = 'SnapshotCreateTime'

    schema = type_schema(
        'age',
        op={'$ref': '#/definitions/filters_common/comparison_operators'},
        days={'type': 'number'})

    def get_resource_date(self, i):
        """Return the snapshot's creation time, or None when it is absent."""
        created = i.get('SnapshotCreateTime')
        return created
@RDSSnapshot.action_registry.register('restore')
class RestoreInstance(BaseAction):
    """Restore an rds instance from a snapshot.

    Note this requires the snapshot or db deletion be taken
    with the `copy-restore-info` boolean flag set to true, as
    various instance metadata is stored on the snapshot as tags.

    Additional parameters for the restore db instance api call can be
    overridden via the `restore_options` setting. Parameters for the
    follow-up modify db instance call can be specified via the
    `modify_options` setting.
    """

    schema = type_schema(
        'restore',
        restore_options={'type': 'object'},
        modify_options={'type': 'object'})

    permissions = (
        'rds:ModifyDBInstance',
        'rds:ModifyDBParameterGroup',
        'rds:ModifyOptionGroup',
        'rds:RebootDBInstance',
        'rds:RestoreDBInstanceFromDBSnapshot')

    # Seconds between polls while waiting for the restored instance.
    poll_period = 60
    # Tag keys that must be present on a snapshot for it to be restorable.
    restore_keys = {
        'VPCSecurityGroups', 'MultiAZ', 'DBSubnetGroupName',
        'InstanceClass', 'StorageType', 'ParameterGroupName',
        'OptionGroupName'}

    def validate(self):
        """Require that the policy also uses the ``latest`` filter.

        :raises PolicyValidationError: when no LatestSnapshot filter is
            configured on the policy.
        :returns: self
        """
        found = False
        for f in self.manager.iter_filters():
            if isinstance(f, LatestSnapshot):
                found = True
                break
        if not found:
            # do we really need this...
            raise PolicyValidationError(
                "must filter by latest to use restore action %s" % (
                    self.manager.data,))
        return self

    def process(self, resources):
        """Restore each snapshot that carries the full set of restore tags.

        Snapshots missing any restore tag (including snapshots with no
        ``Tags`` at all) are skipped with a warning.  Failures of individual
        restores are logged and do not stop the others.
        """
        client = local_session(self.manager.session_factory).client('rds')
        # restore up to 10 in parallel, we have to wait on each.
        with self.executor_factory(
                max_workers=min(10, len(resources) or 1)) as w:
            futures = {}
            for r in resources:
                # .get(): an untagged snapshot may have no 'Tags' key; it
                # must be skipped like any other snapshot lacking restore
                # info rather than aborting the whole run with a KeyError.
                tag_keys = {t['Key'] for t in r.get('Tags', ())}
                if not tag_keys.issuperset(self.restore_keys):
                    self.log.warning(
                        "snapshot:%s missing restore tags",
                        r['DBSnapshotIdentifier'])
                    continue
                futures[w.submit(self.process_instance, client, r)] = r
            for f in as_completed(futures):
                r = futures[f]
                if f.exception():
                    self.log.warning(
                        "Error restoring db:%s from:%s error:\n%s",
                        r['DBInstanceIdentifier'], r['DBSnapshotIdentifier'],
                        f.exception())
                    continue

    def process_instance(self, client, r):
        """Restore one instance, wait for it, then modify and reboot it."""
        params, post_modify = self.get_restore_from_tags(r)
        self.manager.retry(
            client.restore_db_instance_from_db_snapshot, **params)
        waiter = client.get_waiter('db_instance_available')
        # Poll every poll_period seconds; the total wait is bounded by the
        # waiter's own max attempts.
        waiter.config.delay = self.poll_period
        waiter.wait(DBInstanceIdentifier=params['DBInstanceIdentifier'])
        # Settings in post_modify are applied by a separate modify call
        # once the instance is available.
        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=params['DBInstanceIdentifier'],
            ApplyImmediately=True,
            **post_modify)
        self.manager.retry(
            client.reboot_db_instance,
            DBInstanceIdentifier=params['DBInstanceIdentifier'],
            ForceFailover=False)

    def get_restore_from_tags(self, snapshot):
        """Build restore and modify parameters from the snapshot's tags.

        :param snapshot: snapshot dict with a ``Tags`` list containing all
            of ``restore_keys``.
        :returns: ``(params, post_modify)`` — keyword arguments for the
            restore call and for the follow-up modify call.  Policy supplied
            ``restore_options`` / ``modify_options`` override tag values.
        """
        params, post_modify = {}, {}
        # Named snapshot_tags so the module-level `tags` import is not
        # shadowed.
        snapshot_tags = {t['Key']: t['Value'] for t in snapshot['Tags']}

        params['DBInstanceIdentifier'] = snapshot['DBInstanceIdentifier']
        params['DBSnapshotIdentifier'] = snapshot['DBSnapshotIdentifier']
        params['MultiAZ'] = snapshot_tags['MultiAZ'] == 'True'
        params['DBSubnetGroupName'] = snapshot_tags['DBSubnetGroupName']
        params['DBInstanceClass'] = snapshot_tags['InstanceClass']
        params['CopyTagsToSnapshot'] = True
        params['StorageType'] = snapshot_tags['StorageType']
        params['OptionGroupName'] = snapshot_tags['OptionGroupName']

        post_modify['DBParameterGroupName'] = snapshot_tags['ParameterGroupName']
        post_modify['VpcSecurityGroupIds'] = (
            snapshot_tags['VPCSecurityGroups'].split(','))

        # Carry over every tag that is not restore bookkeeping.
        params['Tags'] = [
            {'Key': k, 'Value': v} for k, v in snapshot_tags.items()
            if k not in self.restore_keys]

        params.update(self.data.get('restore_options', {}))
        post_modify.update(self.data.get('modify_options', {}))
        return params, post_modify
@RDSSnapshot.filter_registry.register('cross-account')
class CrossAccountAccess(CrossAccountAccessFilter):
    """Match snapshots whose ``restore`` attribute lists accounts that are
    not in the set returned by ``get_accounts()``.

    Every inspected snapshot is annotated with its attributes under
    ``c7n:attributes``; matching snapshots additionally get the offending
    accounts under ``c7n:CrossAccountViolations``.
    """

    permissions = ('rds:DescribeDBSnapshotAttributes',)
    attributes_key = 'c7n:attributes'
    annotation_key = 'c7n:CrossAccountViolations'

    def process(self, resources, event=None):
        """Check snapshots in batches of 20 across two workers."""
        self.accounts = self.get_accounts()
        matched = []
        with self.executor_factory(max_workers=2) as pool:
            pending = [
                pool.submit(self.process_resource_set, batch)
                for batch in chunks(resources, 20)]
            for done in as_completed(pending):
                error = done.exception()
                if error:
                    # A failed batch is logged and dropped; other batches
                    # still contribute their matches.
                    self.log.error(
                        "Exception checking cross account access\n %s" % (
                            error))
                    continue
                matched.extend(done.result())
        return matched

    def process_resource_set(self, resource_set):
        """Annotate one batch and return the snapshots shared externally."""
        client = local_session(self.manager.session_factory).client('rds')
        flagged = []
        for snap in resource_set:
            response = self.manager.retry(
                client.describe_db_snapshot_attributes,
                DBSnapshotIdentifier=snap['DBSnapshotIdentifier'])
            attribute_list = response[
                'DBSnapshotAttributesResult']['DBSnapshotAttributes']
            attrs = {
                a['AttributeName']: a['AttributeValues']
                for a in attribute_list}
            snap[self.attributes_key] = attrs

            outside = set(attrs.get('restore', [])).difference(self.accounts)
            if not outside:
                continue
            snap[self.annotation_key] = list(outside)
            flagged.append(snap)
        return flagged
@RDSSnapshot.action_registry.register('set-permissions')
class SetPermissions(BaseAction):
    """Set permissions for copying or restoring an RDS snapshot

    Use the 'add' and 'remove' parameters to control which accounts to
    add or remove, respectively.  The default is to remove any
    permissions granted to other AWS accounts.

    Use `remove: matched` in combination with the `cross-account` filter
    for more flexible removal options such as preserving access for
    a set of whitelisted accounts:

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-snapshot-remove-cross-account
            resource: rds-snapshot
            filters:
              - type: cross-account
                whitelist:
                  - '112233445566'
            actions:
              - type: set-permissions
                remove: matched
    """
    schema = type_schema(
        'set-permissions',
        remove={'oneOf': [
            {'enum': ['matched']},
            {'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ],
            }}
        ]},
        add={
            'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ]
            }
        }
    )

    permissions = ('rds:ModifyDBSnapshotAttribute',)

    def validate(self):
        """Ensure ``remove: matched`` is paired with a cross-account filter.

        :raises PolicyValidationError: when ``remove`` is ``matched`` but the
            policy has no CrossAccountAccessFilter.
        :returns: self, matching the other ``validate`` methods in this
            module.
        """
        if self.data.get('remove') == 'matched':
            found = any(
                isinstance(f, CrossAccountAccessFilter)
                for f in self.manager.iter_filters())
            if not found:
                raise PolicyValidationError(
                    "policy:%s filter:%s with matched requires cross-account filter" % (
                        self.manager.ctx.policy.name, self.type))
        return self

    def process(self, snapshots):
        """Apply the configured permission changes to every snapshot."""
        client = local_session(self.manager.session_factory).client('rds')
        for s in snapshots:
            self.process_snapshot(client, s)

    def _snapshot_attributes(self, client, snapshot):
        """Return the snapshot's attribute map, fetching and caching it on
        the snapshot when the cross-account filter has not already done so."""
        if CrossAccountAccess.attributes_key not in snapshot:
            snapshot[CrossAccountAccess.attributes_key] = {
                t['AttributeName']: t['AttributeValues']
                for t in self.manager.retry(
                    client.describe_db_snapshot_attributes,
                    DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier']
                )['DBSnapshotAttributesResult']['DBSnapshotAttributes']
            }
        return snapshot[CrossAccountAccess.attributes_key]

    def process_snapshot(self, client, snapshot):
        """Add and/or remove ``restore`` permissions on a single snapshot."""
        add_accounts = self.data.get('add', [])
        remove_accounts = self.data.get('remove', [])

        if not (add_accounts or remove_accounts):
            # Nothing configured: default to revoking every current grant.
            remove_accounts = self._snapshot_attributes(
                client, snapshot).get('restore', [])
        elif remove_accounts == 'matched':
            # Revoke only what the cross-account filter flagged.
            remove_accounts = snapshot.get(CrossAccountAccess.annotation_key, [])

        if add_accounts or remove_accounts:
            client.modify_db_snapshot_attribute(
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'],
                AttributeName='restore',
                ValuesToRemove=remove_accounts,
                ValuesToAdd=add_accounts)
@RDSSnapshot.action_registry.register('region-copy')
class RegionCopySnapshot(BaseAction):
    """Copy a snapshot across regions.

    Note there is a max in flight for cross region rds snapshots
    of 5 per region. This action will attempt to retry automatically
    for an hr.

    Example::

      - name: copy-encrypted-snapshots
        description: |
          copy snapshots under 1 day old to dr region with kms
        resource: rds-snapshot
        region: us-east-1
        filters:
         - Status: available
         - type: value
           key: SnapshotCreateTime
           value_type: age
           value: 1
           op: less-than
        actions:
          - type: region-copy
            target_region: us-east-2
            target_key: arn:aws:kms:us-east-2:0000:key/cb291f53-c9cf61
            copy_tags: true
            tags:
              OriginRegion: us-east-1
    """

    schema = type_schema(
        'region-copy',
        target_region={'type': 'string'},
        target_key={'type': 'string'},
        copy_tags={'type': 'boolean'},
        tags={'type': 'object'},
        required=('target_region',))

    permissions = ('rds:CopyDBSnapshot',)
    # Retry settings used when the copy call reports SnapshotQuotaExceeded.
    min_delay = 120
    max_attempts = 30

    def validate(self):
        """Reject use of this action in policies that define a ``mode``."""
        has_target = self.data.get('target_region')
        if has_target and self.manager.data.get('mode'):
            raise PolicyValidationError(
                "cross region snapshot may require waiting for "
                "longer then lambda runtime allows %s" % (self.manager.data,))
        return self

    def process(self, resources):
        """Copy the snapshots to the target region in batches of 20."""
        same_region = self.data['target_region'] == self.manager.config.region
        if same_region:
            self.log.warning(
                "Source and destination region are the same, skipping copy")
            return
        for batch in chunks(resources, 20):
            self.process_resource_set(batch)

    def process_resource(self, target, key, tags, snapshot):
        """Copy one snapshot with the ``target`` region client.

        On success the new snapshot's ARN is recorded on the source snapshot
        under ``c7n:CopiedSnapshot``.  A copy that already exists in the
        target region is logged and skipped.
        """
        params = {
            'TargetDBSnapshotIdentifier':
                snapshot['DBSnapshotIdentifier'].replace(':', '-'),
            'SourceRegion': self.manager.config.region,
            'SourceDBSnapshotIdentifier': snapshot['DBSnapshotArn'],
        }
        if key:
            params['KmsKeyId'] = key
        if self.data.get('copy_tags', True):
            params['CopyTags'] = True
        if tags:
            params['Tags'] = tags

        # TODO make this configurable, class defaults to 1hr
        retry = get_retry(
            ('SnapshotQuotaExceeded',),
            min_delay=self.min_delay,
            max_attempts=self.max_attempts,
            log_retries=logging.DEBUG)
        try:
            result = retry(target.copy_db_snapshot, **params)
        except ClientError as e:
            if e.response['Error']['Code'] != 'DBSnapshotAlreadyExists':
                raise
            self.log.warning(
                "Snapshot %s already exists in target region",
                snapshot['DBSnapshotIdentifier'])
            return
        copied_arn = result['DBSnapshot']['DBSnapshotArn']
        snapshot['c7n:CopiedSnapshot'] = copied_arn

    def process_resource_set(self, resource_set):
        """Copy one batch of snapshots, five at a time."""
        session = self.manager.session_factory(
            region=self.data['target_region'])
        target_client = session.client('rds')
        target_key = self.data.get('target_key')
        copy_tags = self.data.get('copy_tags', True)
        tags = [
            {'Key': k, 'Value': v}
            for k, v in self.data.get('tags', {}).items()]

        for snapshot_set in chunks(resource_set, 5):
            for r in snapshot_set:
                # If tags are supplied, copy tags are ignored, and
                # we need to augment the tag set with the original
                # resource tags to preserve the common case.
                rtags = list(tags) if tags else None
                if tags and copy_tags:
                    rtags.extend(r['Tags'])
                self.process_resource(target_client, target_key, rtags, r)
@RDSSnapshot.action_registry.register('delete')
class RDSSnapshotDelete(BaseAction):
    """Delete RDS snapshots.

    Only snapshots whose ``SnapshotType`` is ``manual`` are deleted.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-snapshot-delete-stale
            resource: rds-snapshot
            filters:
              - type: age
                days: 28
                op: ge
            actions:
              - delete
    """

    permissions = ('rds:DeleteDBSnapshot',)
    schema = type_schema('delete')

    def process(self, snapshots):
        """Delete manual snapshots in sets of 50 across three workers.

        :returns: the manual snapshots that were submitted for deletion
            (an empty list when there were none).
        """
        snapshots = self.filter_resources(snapshots, 'SnapshotType', ('manual',))
        if not snapshots:
            return []
        log.info("Deleting %d rds snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as pool:
            pending = [
                pool.submit(self.process_snapshot_set, snapshot_set)
                for snapshot_set in chunks(reversed(snapshots), size=50)]
            for done in as_completed(pending):
                error = done.exception()
                if error:
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        error)
        return snapshots

    def process_snapshot_set(self, snapshots_set):
        """Delete every snapshot in ``snapshots_set``."""
        rds = local_session(self.manager.session_factory).client('rds')
        for snap in snapshots_set:
            rds.delete_db_snapshot(
                DBSnapshotIdentifier=snap['DBSnapshotIdentifier'])
@actions.register('modify-security-groups')
class RDSModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Apply resolved security groups to RDS instances.

    Instances that belong to a DB cluster are modified through their
    cluster; all other instances are modified directly.
    """

    permissions = ('rds:ModifyDBInstance', 'rds:ModifyDBCluster')
    vpc_expr = 'DBSubnetGroup.VpcId'

    def process(self, rds_instances):
        client = local_session(self.manager.session_factory).client('rds')
        groups = super().get_groups(rds_instances)

        # Cluster identifier -> security groups.  When several instances
        # share a cluster, the groups of the last one seen are used.
        cluster_groups = {}
        for position, instance in enumerate(rds_instances):
            cluster_id = instance.get('DBClusterIdentifier')
            if cluster_id:
                cluster_groups[instance['DBClusterIdentifier']] = groups[position]
                continue
            client.modify_db_instance(
                DBInstanceIdentifier=instance['DBInstanceIdentifier'],
                VpcSecurityGroupIds=groups[position])

        # handle DB cluster, if necessary
        for cluster_id, security_groups in cluster_groups.items():
            client.modify_db_cluster(
                DBClusterIdentifier=cluster_id,
                VpcSecurityGroupIds=security_groups
            )
class DescribeSubnetGroup(DescribeSource):
    """Describe source for subnet groups that adds tags during augment."""

    def augment(self, resources):
        """Attach tags to each subnet group in place and return the list.

        The helper's own return value is not used, so the caller gets back
        the same ``resources`` list it passed in.
        """
        manager = self.manager
        _db_subnet_group_tags(
            resources,
            session_factory=manager.session_factory,
            executor_factory=manager.executor_factory,
            retry=manager.retry)
        return resources
@resources.register('rds-subnet-group')
class RDSSubnetGroup(QueryResourceManager):
    """Resource manager for RDS DB subnet groups."""

    class resource_type(TypeInfo):
        # Service and ARN construction.
        service = 'rds'
        arn_type = 'subgrp'
        arn_separator = ':'

        # Enumeration call and the permission it needs.
        enum_spec = (
            'describe_db_subnet_groups', 'DBSubnetGroups', None)
        permissions_enum = ('rds:DescribeDBSubnetGroups',)

        # Identity and single-resource lookup.
        name = 'DBSubnetGroupName'
        id = 'DBSubnetGroupName'
        filter_name = 'DBSubnetGroupName'
        filter_type = 'scalar'

        config_type = 'AWS::RDS::DBSubnetGroup'
        cfn_type = 'AWS::RDS::DBSubnetGroup'
        universal_taggable = object()

    source_mapping = {
        'describe': DescribeSubnetGroup,
        'config': ConfigSource,
    }
def _db_subnet_group_tags(subnet_groups, session_factory, executor_factory, retry):
    """Populate ``Tags`` on each subnet group from ``list_tags_for_resource``.

    :param subnet_groups: iterable of subnet group dicts; each must carry
        ``DBSubnetGroupArn``.  The dicts are updated in place.
    :param session_factory: factory passed to ``local_session``.
    :param executor_factory: accepted for interface compatibility; unused.
    :param retry: callable invoked as ``retry(func, **kwargs)`` that wraps
        the tag lookup.
    :returns: list of the groups whose tags were fetched; groups that raise
        ``DBSubnetGroupNotFoundFault`` are omitted.
    """
    client = local_session(session_factory).client('rds')

    def process_tags(g):
        try:
            # Route the call through the supplied retry wrapper; it was
            # previously accepted but ignored, so a throttled tag lookup
            # failed the whole augment.
            g['Tags'] = retry(
                client.list_tags_for_resource,
                ResourceName=g['DBSubnetGroupArn'])['TagList']
        except client.exceptions.DBSubnetGroupNotFoundFault:
            # The group disappeared between enumeration and the tag lookup.
            return None
        return g

    return list(filter(None, map(process_tags, subnet_groups)))
@RDSSubnetGroup.action_registry.register('delete')
class RDSSubnetGroupDeleteAction(BaseAction):
    """Delete RDS subnet groups.

    It is recommended to apply a filter to the delete policy to avoid
    unwanted deletion of any rds subnet groups.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-subnet-group-delete
            resource: rds-subnet-group
            filters:
              - Instances: []
            actions:
              - delete
    """

    permissions = ('rds:DeleteDBSubnetGroup',)
    schema = type_schema('delete')

    def process(self, subnet_group):
        """Delete every subnet group in ``subnet_group`` using two workers."""
        with self.executor_factory(max_workers=2) as pool:
            # list() drains the lazy map so each deletion actually runs and
            # any exception it raised surfaces here.
            list(pool.map(self.process_subnetgroup, subnet_group))

    def process_subnetgroup(self, subnet_group):
        """Delete a single subnet group by name."""
        rds = local_session(self.manager.session_factory).client('rds')
        group_name = subnet_group['DBSubnetGroupName']
        rds.delete_db_subnet_group(DBSubnetGroupName=group_name)
@RDSSubnetGroup.filter_registry.register('unused')
class UnusedRDSSubnetGroup(Filter):
    """Match rds subnet groups that no rds instance or cluster refers to.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-subnet-group-delete-unused
            resource: rds-subnet-group
            filters:
              - unused
    """

    schema = type_schema('unused')

    def get_permissions(self):
        """Delegate to the permissions of the ``rds`` resource manager."""
        rds_manager = self.manager.get_resource_manager('rds')
        return rds_manager.get_permissions()

    def process(self, configs, event=None):
        """Collect the subnet group names in use, then filter ``configs``."""
        expr = '[].DBSubnetGroup.DBSubnetGroupName'
        instances = self.manager.get_resource_manager('rds').resources()
        self.used = set(jmespath_search(expr, instances))
        clusters = self.manager.get_resource_manager(
            'rds-cluster').resources(augment=False)
        self.used.update(set(jmespath_search(expr, clusters)))
        return super().process(configs)

    def __call__(self, config):
        """Return True when the group's name was not seen in use."""
        return config['DBSubnetGroupName'] not in self.used
@filters.register('db-parameter')
class ParameterFilter(ValueFilter):
    """
    Applies value type filter on set db parameter values.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-pg
            resource: rds
            filters:
              - type: db-parameter
                key: someparam
                op: eq
                value: someval
    """

    schema = type_schema('db-parameter', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters', )
    policy_annotation = 'c7n:MatchedDBParameter'

    @staticmethod
    def recast(val, datatype):
        """ Re-cast the value based upon an AWS supplied datatype
        and treat nulls sensibly.

        Values that cannot be converted for their datatype (and values of
        any other datatype) are returned unchanged.
        """
        ret_val = val
        if datatype == 'string':
            ret_val = str(val)
        elif datatype == 'boolean':
            # AWS returns 1s and 0s for boolean for most of the cases
            if val.isdigit():
                ret_val = bool(int(val))
            # AWS returns 'TRUE,FALSE' for Oracle engine
            elif val == 'TRUE':
                ret_val = True
            elif val == 'FALSE':
                ret_val = False
        elif datatype == 'integer':
            if val.isdigit():
                ret_val = int(val)
        elif datatype == 'float':
            ret_val = float(val) if val else 0.0

        return ret_val

    # Private method for 'DBParameterGroupName' paginator
    def _get_param_list(self, pg):
        """Return every parameter record of parameter group ``pg``."""
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_db_parameters')
        return list(itertools.chain.from_iterable(
            page['Parameters']
            for page in paginator.paginate(DBParameterGroupName=pg)))

    def handle_paramgroup_cache(self, param_groups):
        """Return ``{group name: {parameter name: recast value}}``.

        Each group is read from the manager cache when present; otherwise it
        is fetched from the API and saved to the cache.  Parameters without
        a ``ParameterValue`` are omitted.
        """
        pgcache = {}
        cache = self.manager._cache

        with cache:
            for pg in param_groups:
                cache_key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-pg': pg}
                pg_values = cache.get(cache_key)
                if pg_values is not None:
                    pgcache[pg] = pg_values
                    continue
                param_list = self._get_param_list(pg)
                pgcache[pg] = {
                    p['ParameterName']: self.recast(p['ParameterValue'], p['DataType'])
                    for p in param_list if 'ParameterValue' in p}
                cache.save(cache_key, pgcache[pg])
        return pgcache

    def process(self, resources, event=None):
        """Return the instances for which any parameter group matches.

        Matching instances are annotated under ``c7n:MatchedDBParameter``
        with the filter's ``key``.
        """
        results = []
        # Collect *every* parameter group of every instance.  The lookup
        # loop below visits all of an instance's groups, so prefetching only
        # the first one raised KeyError for instances with more than one
        # group (and IndexError for an instance with none).
        parameter_group_list = {
            pg['DBParameterGroupName']
            for db in resources
            for pg in db['DBParameterGroups']}
        paramcache = self.handle_paramgroup_cache(parameter_group_list)
        for resource in resources:
            for pg in resource['DBParameterGroups']:
                pg_values = paramcache[pg['DBParameterGroupName']]
                if self.match(pg_values):
                    resource.setdefault(self.policy_annotation, []).append(
                        self.data.get('key'))
                    results.append(resource)
                    # One matching group is enough; avoid duplicate results.
                    break
        return results
@actions.register('modify-db')
class ModifyDb(BaseAction):
    """Modifies an RDS instance based on specified parameter
    using ModifyDbInstance.

    'update' is an array of key value pairs that should be set to
    the property and value you wish to modify.
    'immediate' determines whether the modification is applied immediately
    or not. If 'immediate' is not specified, default is false.

    :example:

    .. code-block:: yaml

        policies:
          - name: disable-rds-deletion-protection
            resource: rds
            filters:
              - DeletionProtection: true
              - PubliclyAccessible: true
            actions:
              - type: modify-db
                update:
                  - property: 'DeletionProtection'
                    value: false
                  - property: 'PubliclyAccessible'
                    value: false
                immediate: true
    """

    schema = type_schema(
        'modify-db',
        immediate={"type": 'boolean'},
        update={
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'property': {'type': 'string', 'enum': [
                        'AllocatedStorage',
                        'DBInstanceClass',
                        'DBSubnetGroupName',
                        'DBSecurityGroups',
                        'VpcSecurityGroupIds',
                        'MasterUserPassword',
                        'DBParameterGroupName',
                        'BackupRetentionPeriod',
                        'PreferredBackupWindow',
                        'PreferredMaintenanceWindow',
                        'MultiAZ',
                        'EngineVersion',
                        'AllowMajorVersionUpgrade',
                        'AutoMinorVersionUpgrade',
                        'LicenseModel',
                        'Iops',
                        'OptionGroupName',
                        'NewDBInstanceIdentifier',
                        'StorageType',
                        'TdeCredentialArn',
                        'TdeCredentialPassword',
                        'CACertificateIdentifier',
                        'Domain',
                        'CopyTagsToSnapshot',
                        'MonitoringInterval',
                        'MonitoringRoleARN',
                        'DBPortNumber',
                        'PubliclyAccessible',
                        'DomainIAMRoleName',
                        'PromotionTier',
                        'EnableIAMDatabaseAuthentication',
                        'EnablePerformanceInsights',
                        'PerformanceInsightsKMSKeyId',
                        'PerformanceInsightsRetentionPeriod',
                        'CloudwatchLogsExportConfiguration',
                        'ProcessorFeatures',
                        'UseDefaultProcessorFeatures',
                        'DeletionProtection',
                        'MaxAllocatedStorage',
                        'CertificateRotationRestart']},
                    'value': {}
                },
            },
        },
        required=('update',))

    permissions = ('rds:ModifyDBInstance',)
    # ModifyDBInstance parameter name -> jmespath expression locating the
    # current value on the instance record, for parameters whose name is
    # not itself a key of the record.
    conversion_map = {
        'DBSubnetGroupName': 'DBSubnetGroup.DBSubnetGroupName',
        'VpcSecurityGroupIds': 'VpcSecurityGroups[].VpcSecurityGroupId',
        'DBParameterGroupName': 'DBParameterGroups[].DBParameterGroupName',
        'OptionGroupName': 'OptionGroupMemberships[].OptionGroupName',
        'NewDBInstanceIdentifier': 'DBInstanceIdentifier',
        'Domain': 'DomainMemberships[].DomainName',
        'DBPortNumber': 'Endpoint.Port',
        'EnablePerformanceInsights': 'PerformanceInsightsEnabled',
        'CloudwatchLogsExportConfiguration': 'EnabledCloudwatchLogsExports'
    }

    def validate(self):
        """Check cross-property requirements of the ``update`` list.

        :raises PolicyValidationError: when a positive MonitoringInterval is
            given without MonitoringRoleARN, or when
            CloudwatchLogsExportConfiguration contains neither
            EnableLogTypes nor DisableLogTypes.
        :returns: self
        """
        if self.data.get('update'):
            update_dict = {
                i['property']: i['value'] for i in self.data.get('update')}
            if ('MonitoringInterval' in update_dict and
                    update_dict['MonitoringInterval'] > 0 and
                    'MonitoringRoleARN' not in update_dict):
                # Adjacent literals instead of a backslash continuation
                # inside the string, which embedded the source indentation
                # in the message.
                raise PolicyValidationError(
                    "A MonitoringRoleARN value is required "
                    "if you specify a MonitoringInterval value other than 0")
            if ('CloudwatchLogsExportConfiguration' in update_dict
                and all(
                    k not in update_dict.get('CloudwatchLogsExportConfiguration')
                    for k in ('EnableLogTypes', 'DisableLogTypes'))):
                raise PolicyValidationError(
                    "A EnableLogTypes or DisableLogTypes input list is required "
                    "for setting CloudwatchLogsExportConfiguration")
        return self

    def process(self, resources):
        """Call ModifyDBInstance for each instance that needs a change.

        Only properties whose current value differs from the requested one
        are sent; instances with nothing to change are skipped.
        """
        c = local_session(self.manager.session_factory).client('rds')
        for r in resources:
            # Current value: the record's own key when present, otherwise
            # the conversion_map expression ('None' for unmapped names).
            param = {
                u['property']: u['value'] for u in self.data.get('update')
                if r.get(
                    u['property'],
                    jmespath_search(
                        self.conversion_map.get(u['property'], 'None'), r))
                != u['value']}
            if not param:
                continue
            param['ApplyImmediately'] = self.data.get('immediate', False)
            param['DBInstanceIdentifier'] = r['DBInstanceIdentifier']
            # Errors, including DBInstanceNotFoundFault, propagate to the
            # caller; the former `except ...: raise` wrapper was a no-op.
            c.modify_db_instance(**param)
@resources.register('rds-reserved')
class ReservedRDS(QueryResourceManager):
    """Resource manager for reserved RDS DB instances.

    :example:

    List the reservations that are currently active

    .. code-block:: yaml

        policies:
          - name: existing-rds-reservations
            resource: rds-reserved
            filters:
              - State: active
    """

    class resource_type(TypeInfo):
        service = 'rds'

        # Enumeration call and the permission it needs.
        enum_spec = (
            'describe_reserved_db_instances', 'ReservedDBInstances', None)
        permissions_enum = ('rds:DescribeReservedDBInstances',)

        # Identity, date and ARN fields on each reservation record.
        id = 'ReservedDBInstanceId'
        name = 'ReservedDBInstanceId'
        date = 'StartTime'
        arn = "ReservedDBInstanceArn"
        arn_type = "ri"

        filter_name = 'ReservedDBInstances'
        filter_type = 'list'
        universal_taggable = object()

    augment = universal_augment
# Register ConsecutiveAwsBackupsFilter on the RDS resource under the
# 'consecutive-aws-backups' filter name.
RDS.filter_registry.register(
    'consecutive-aws-backups', ConsecutiveAwsBackupsFilter)
@filters.register('consecutive-snapshots')
class ConsecutiveSnapshots(Filter):
    """Returns instances where number of consecutive daily snapshots is
    equal to/or greater than n days.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-daily-snapshot-count
            resource: rds
            filters:
              - type: consecutive-snapshots
                days: 7
    """
    schema = type_schema('consecutive-snapshots', days={'type': 'number', 'minimum': 1},
        required=['days'])
    permissions = ('rds:DescribeDBSnapshots', 'rds:DescribeDBInstances')
    annotation = 'c7n:DBSnapshots'

    def process_resource_set(self, client, resources):
        """Annotate each instance with the list of its DB snapshots.

        Snapshots for all instances in ``resources`` are fetched with one
        paginated, filtered ``describe_db_snapshots`` call and stored under
        ``c7n:DBSnapshots`` (an empty list when an instance has none).
        """
        rds_instances = [r['DBInstanceIdentifier'] for r in resources]
        paginator = client.get_paginator('describe_db_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        db_snapshots = paginator.paginate(Filters=[{'Name': 'db-instance-id',
          'Values': rds_instances}]).build_full_result().get('DBSnapshots', [])

        inst_map = {}
        for snapshot in db_snapshots:
            inst_map.setdefault(snapshot['DBInstanceIdentifier'], []).append(snapshot)
        for r in resources:
            r[self.annotation] = inst_map.get(r['DBInstanceIdentifier'], [])

    def process(self, resources, event=None):
        """Return instances with an available snapshot on each of the last
        ``days`` calendar days (UTC), not counting today."""
        client = local_session(self.manager.session_factory).client('rds')
        results = []
        # The schema admits any number >= 1, but range() rejects floats;
        # coerce so a value such as 7.0 does not raise TypeError.
        retention = int(self.data.get('days'))
        utcnow = datetime.datetime.utcnow()
        expected_dates = {
            (utcnow - timedelta(days=days)).strftime('%Y-%m-%d')
            for days in range(1, retention + 1)}

        # Only fetch snapshots for instances not annotated already.
        for resource_set in chunks(
                [r for r in resources if self.annotation not in r], 50):
            self.process_resource_set(client, resource_set)

        for r in resources:
            snapshot_dates = set()
            for snapshot in r[self.annotation]:
                if snapshot['Status'] == 'available':
                    snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d'))
            if expected_dates.issubset(snapshot_dates):
                results.append(r)
        return results
@filters.register('engine')
class EngineFilter(ValueFilter):
    """
    Filter a rds resource based on its Engine Metadata

    :example:

    .. code-block:: yaml

        policies:
          - name: find-deprecated-versions
            resource: aws.rds
            filters:
              - type: engine
                key: Status
                value: deprecated
    """

    permissions = ("rds:DescribeDBEngineVersions", )

    schema = type_schema('engine', rinherit=ValueFilter.schema)

    def process(self, resources, event=None):
        """Match each resource against the metadata of its engine version.

        Matching resources are annotated with that metadata under
        ``c7n:Engine``.
        """
        client = local_session(self.manager.session_factory).client('rds')

        # Distinct engines / versions in use, to narrow the describe call.
        engine_names, version_names = set(), set()
        for resource in resources:
            engine_names.add(resource['Engine'])
            version_names.add(resource['EngineVersion'])

        pages = client.get_paginator('describe_db_engine_versions').paginate(
            Filters=[
                {'Name': 'engine', 'Values': list(engine_names)},
                {'Name': 'engine-version', 'Values': list(version_names)}
            ],
            IncludeAll=True,
        )

        # engine -> version -> metadata record
        catalog = {}
        for page in pages:
            for record in page['DBEngineVersions']:
                per_engine = catalog.setdefault(record['Engine'], {})
                per_engine[record['EngineVersion']] = record

        matched = []
        for resource in resources:
            metadata = catalog[resource['Engine']][resource['EngineVersion']]
            if not self.match(metadata):
                continue
            resource['c7n:Engine'] = metadata
            matched.append(resource)
        return matched
class DescribeDBProxy(DescribeSource):
    """Describe source for RDS proxies that augments via universal_augment."""

    def augment(self, resources):
        """Return ``resources`` as processed by ``universal_augment``."""
        augmented = universal_augment(self.manager, resources)
        return augmented
@resources.register('rds-proxy')
class RDSProxy(QueryResourceManager):
    """Resource Manager for RDS DB Proxies

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-proxy-tls-check
            resource: rds-proxy
            filters:
              - type: value
                key: RequireTLS
                value: false
    """

    class resource_type(TypeInfo):
        service = 'rds'

        # Enumeration call and the permission it needs.
        enum_spec = ('describe_db_proxies', 'DBProxies', None)
        permissions_enum = ('rds:DescribeDBProxies',)

        # Identity, date and ARN fields on each proxy record.
        id = 'DBProxyName'
        name = 'DBProxyName'
        date = 'CreatedDate'
        arn = 'DBProxyArn'
        arn_type = 'db-proxy'

        config_type = 'AWS::RDS::DBInstance'
        cfn_type = 'AWS::RDS::DBInstance'
        universal_taggable = object()

    source_mapping = {
        'config': ConfigSource,
        'describe': DescribeDBProxy,
    }
@RDSProxy.action_registry.register('delete')
class DeleteRDSProxy(BaseAction):
    """
    Deletes a RDS Proxy

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-rds-proxy
            resource: aws.rds-proxy
            filters:
              - type: value
                key: "DBProxyName"
                op: eq
                value: "proxy-test-1"
            actions:
              - type: delete
    """

    permissions = ('rds:DeleteDBProxy',)

    schema = type_schema('delete')

    def process(self, resources):
        """Delete each proxy by name through the manager's retry wrapper."""
        client = local_session(self.manager.session_factory).client('rds')
        # Error codes handed to the retry wrapper as ``ignore_err_codes``.
        ignorable = ('DBProxyNotFoundFault', 'InvalidDBProxyStateFault')
        for proxy in resources:
            self.manager.retry(
                client.delete_db_proxy,
                DBProxyName=proxy['DBProxyName'],
                ignore_err_codes=ignorable)
@RDSProxy.filter_registry.register('subnet')
class RDSProxySubnetFilter(net_filters.SubnetFilter):
    """Subnet filter for RDS proxies, keyed on ``VpcSubnetIds[]``."""

    RelatedIdsExpression = "VpcSubnetIds[]"
@RDSProxy.filter_registry.register('security-group')
class RDSProxySecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Security group filter for RDS proxies, keyed on
    ``VpcSecurityGroupIds[]``."""

    RelatedIdsExpression = "VpcSecurityGroupIds[]"
@RDSProxy.filter_registry.register('vpc')
class RDSProxyVpcFilter(net_filters.VpcFilter):
    """VPC filter for RDS proxies, keyed on ``VpcId``."""

    RelatedIdsExpression = "VpcId"
@filters.register('db-option-groups')
class DbOptionGroups(ValueFilter):
    """This filter describes RDS option groups for associated RDS instances.
    Use this filter in conjunction with jmespath and value filter operators
    to filter RDS instance based on their option groups

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-data-in-transit-encrypted
            resource: aws.rds
            filters:
              - type: db-option-groups
                key: Options[].OptionName
                op: intersect
                value:
                  - SSL
                  - NATIVE_NETWORK_ENCRYPTION

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-oracle-encryption-in-transit
            resource: aws.rds
            filters:
              - Engine: oracle-ee
              - type: db-option-groups
                key: Options[].OptionSettings[?Name == 'SQLNET.ENCRYPTION_SERVER'].Value[]
                value:
                  - REQUIRED
    """

    schema = type_schema('db-option-groups', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeOptionGroups', )
    policy_annotation = 'c7n:MatchedDBOptionGroups'

    def handle_optiongroup_cache(self, client, paginator, option_groups):
        """Return ``{option group name: option group description}``.

        Each group is read from the manager cache when present; otherwise it
        is described through ``paginator`` and saved to the cache.

        :param client: accepted for interface compatibility; unused.
        :param paginator: ``describe_option_groups`` paginator.
        :param option_groups: iterable of option group names.
        """
        ogcache = {}
        cache = self.manager._cache

        with cache:
            for og in option_groups:
                # 'rds-og', not 'rds-pg': the parameter-group filter in this
                # module caches under 'rds-pg', so a parameter group and an
                # option group with the same name would otherwise read each
                # other's cached data.
                cache_key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-og': og}
                og_values = cache.get(cache_key)
                if og_values is not None:
                    ogcache[og] = og_values
                    continue
                option_groups_list = list(itertools.chain.from_iterable(
                    p['OptionGroupsList']
                    for p in paginator.paginate(OptionGroupName=og)))

                # Keep the last description returned for this name, or an
                # empty dict when nothing was returned.
                ogcache[og] = {}
                for option_group in option_groups_list:
                    ogcache[og] = option_group

                cache.save(cache_key, ogcache[og])

        return ogcache

    def process(self, resources, event=None):
        """Return the instances for which any option group matches.

        Matching instances are annotated under ``c7n:MatchedDBOptionGroups``
        with the group's name and the value found at the filter's ``key``.
        """
        results = []
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_option_groups')
        # Collect *every* option group membership of every instance.  The
        # lookup loop below visits all memberships, so prefetching only the
        # first one raised KeyError for instances with several groups (and
        # IndexError for an instance with none).
        option_groups = {
            og['OptionGroupName']
            for db in resources
            for og in db['OptionGroupMemberships']}
        optioncache = self.handle_optiongroup_cache(client, paginator, option_groups)

        for resource in resources:
            for og in resource['OptionGroupMemberships']:
                og_values = optioncache[og['OptionGroupName']]
                if self.match(og_values):
                    resource.setdefault(self.policy_annotation, []).append({
                        k: jmespath_search(k, og_values)
                        for k in {'OptionGroupName', self.data.get('key')}
                    })
                    results.append(resource)
                    # One matching group is enough; avoid duplicate results.
                    break

        return results
@filters.register('pending-maintenance')
class PendingMaintenance(Filter):
    """Select DB instances that have pending maintenance actions.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-pending-maintenance
            resource: aws.rds
            filters:
              - pending-maintenance
    """

    schema = type_schema('pending-maintenance')
    permissions = ('rds:DescribePendingMaintenanceActions',)

    def process(self, resources, event=None):
        """Return resources whose ``DBInstanceArn`` has a pending maintenance action."""
        rds = local_session(self.manager.session_factory).client('rds')

        # Identifiers of everything reported by the pending maintenance API;
        # the call is unfiltered, so entries may exist that match none of the
        # given resources.
        pages = rds.get_paginator('describe_pending_maintenance_actions').paginate()
        flagged_arns = {
            item['ResourceIdentifier']
            for page in pages
            for item in page['PendingMaintenanceActions']
        }

        return [db for db in resources if db['DBInstanceArn'] in flagged_arns]