Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/rds.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4RDS Resource Manager
5====================
7Example Policies
8----------------
10Find rds instances that are publicly available
12.. code-block:: yaml
14 policies:
15 - name: rds-public
16 resource: rds
17 filters:
18 - PubliclyAccessible: true
20Find rds instances that are not encrypted
22.. code-block:: yaml
24 policies:
25 - name: rds-non-encrypted
26 resource: rds
27 filters:
28 - type: value
29 key: StorageEncrypted
30 value: true
31 op: ne
33"""
34import functools
35import itertools
36import logging
37import operator
38import re
39import datetime
41from datetime import timedelta
43from decimal import Decimal as D, ROUND_HALF_UP
45from c7n.vendored.distutils.version import LooseVersion
46from botocore.exceptions import ClientError
47from concurrent.futures import as_completed
49from c7n.actions import (
50 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction)
52from c7n.exceptions import PolicyValidationError
53from c7n.filters import (
54 CrossAccountAccessFilter, FilterRegistry, Filter, ValueFilter, AgeFilter)
55from c7n.filters.offhours import OffHour, OnHour
56from c7n.filters import related
57import c7n.filters.vpc as net_filters
58from c7n.manager import resources
59from c7n.query import (
60 QueryResourceManager, DescribeSource, ConfigSource, TypeInfo, RetryPageIterator)
61from c7n import deprecated, tags
62from c7n.tags import universal_augment
64from c7n.utils import (
65 local_session, type_schema, get_retry, chunks, snapshot_identifier,
66 merge_dict_list, filter_empty, jmespath_search)
67from c7n.resources.kms import ResourceKmsKeyAlias
68from c7n.resources.securityhub import PostFinding
69from c7n.filters.backup import ConsecutiveAwsBackupsFilter
log = logging.getLogger('custodian.rds')

# Shared filter/action registries attached to the `rds` resource below.
filters = FilterRegistry('rds.filters')
actions = ActionRegistry('rds.actions')
class DescribeRDS(DescribeSource):
    """Describe-API source for RDS instances.

    Normalizes each record by moving the API's ``TagList`` field into the
    ``Tags`` key the rest of custodian expects.
    """

    def augment(self, dbs):
        # Rename TagList -> Tags in place; a missing tag list becomes ().
        for record in dbs:
            record['Tags'] = record.pop('TagList', ())
        return dbs
class ConfigRDS(ConfigSource):
    """Config-API source for RDS instances.

    Config returns some keys with a ``Db`` prefix (e.g. ``DbiResourceId``);
    mirror each of them under the ``DB`` spelling used by the describe API.
    """

    def load_resource(self, item):
        resource = super().load_resource(item)
        # Snapshot the key list since entries are added during iteration.
        for key in list(resource.keys()):
            if key.startswith('Db'):
                resource["DB%s" % key[2:]] = resource[key]
        return resource
@resources.register('rds')
class RDS(QueryResourceManager):
    """Resource manager for RDS DB instances.
    """

    class resource_type(TypeInfo):
        # Service / ARN construction metadata.
        service = 'rds'
        arn_type = 'db'
        arn_separator = ':'
        # boto3 call and response key used to enumerate instances.
        enum_spec = ('describe_db_instances', 'DBInstances', None)
        id = 'DBInstanceIdentifier'
        config_id = 'DbiResourceId'
        name = 'Endpoint.Address'
        filter_name = 'DBInstanceIdentifier'
        filter_type = 'scalar'
        date = 'InstanceCreateTime'
        # Cloudwatch metrics dimension for the `metrics` filter.
        dimension = 'DBInstanceIdentifier'
        cfn_type = config_type = 'AWS::RDS::DBInstance'
        arn = 'DBInstanceArn'
        universal_taggable = True
        default_report_fields = (
            'DBInstanceIdentifier',
            'DBName',
            'Engine',
            'EngineVersion',
            'MultiAZ',
            'AllocatedStorage',
            'StorageEncrypted',
            'PubliclyAccessible',
            'InstanceCreateTime',
        )
        permissions_enum = ('rds:DescribeDBInstances',)

    filter_registry = filters
    action_registry = actions

    def resources(self, query=None):
        # Merge any policy-supplied `query` stanzas into the describe call
        # parameters; an explicit caller-provided query wins as-is.
        if query is None and 'query' in self.data:
            query = merge_dict_list(self.data['query'])
        elif query is None:
            query = {}
        return super(RDS, self).resources(query=query)

    source_mapping = {
        'describe': DescribeRDS,
        'config': ConfigRDS
    }
def _db_instance_eligible_for_backup(resource):
    """Return True if the instance's automated-backup settings may be modified.

    Mirrors the RDS service restrictions visible below: the instance must be
    in the ``available`` state, must not be a cluster member (retention is
    managed via ModifyDbCluster), and must not be a read replica of an
    engine that doesn't support replica backups (postgres, or mysql < 5.6).

    :param resource: rds instance record from describe_db_instances
    :return: bool
    """
    db_instance_id = resource['DBInstanceIdentifier']

    # Database instance is not in available state
    if resource.get('DBInstanceStatus', '') != 'available':
        log.debug(
            "DB instance %s is not in available state",
            db_instance_id)
        return False
    # The specified DB Instance is a member of a cluster and its
    # backup retention should not be modified directly.  Instead,
    # modify the backup retention of the cluster using the
    # ModifyDbCluster API
    if resource.get('DBClusterIdentifier', ''):
        log.debug(
            "DB instance %s is a cluster member",
            db_instance_id)
        return False
    # DB Backups not supported on a read replica for engine postgres
    if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
            resource.get('Engine', '') == 'postgres'):
        log.debug(
            "DB instance %s is a postgres read-replica",
            db_instance_id)
        return False
    # DB Backups not supported on a read replica running a mysql
    # version before 5.6
    if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
            resource.get('Engine', '') == 'mysql'):
        engine_version = resource.get('EngineVersion', '')
        # Assume "<major>.<minor>.<whatever>"
        match = re.match(r'(?P<major>\d+)\.(?P<minor>\d+)\..*', engine_version)
        # Guard on `match` before touching its groups.  The previous
        # expression's operator precedence left the `== 5` clause outside
        # the `match and` guard, raising AttributeError whenever the
        # engine version did not parse.
        if match:
            major = int(match.group('major'))
            minor = int(match.group('minor'))
            if major < 5 or (major == 5 and minor < 6):
                log.debug(
                    "DB instance %s is a version %s mysql read-replica",
                    db_instance_id,
                    engine_version)
                return False
    return True
def _db_instance_eligible_for_final_snapshot(resource):
    """Return True if a final snapshot may be taken when deleting *resource*.

    :param resource: rds instance record from describe_db_instances
    :return: bool
    """
    status = resource.get('DBInstanceStatus', '')
    # If the DB instance you are deleting has a status of "Creating,"
    # you will not be able to have a final DB snapshot taken
    # If the DB instance is in a failure state with a status of "failed,"
    # "incompatible-restore," or "incompatible-network," you can only delete
    # the instance when the SkipFinalSnapshot parameter is set to "true."
    eligible_for_final_snapshot = True
    if status in ['creating', 'failed', 'incompatible-restore', 'incompatible-network']:
        eligible_for_final_snapshot = False

    # FinalDBSnapshotIdentifier can not be specified when deleting a
    # replica instance
    if resource.get('ReadReplicaSourceDBInstanceIdentifier', ''):
        eligible_for_final_snapshot = False

    # if it's a rds-cluster, don't try to run the rds instance snapshot api call
    if resource.get('DBClusterIdentifier', False):
        eligible_for_final_snapshot = False

    if not eligible_for_final_snapshot:
        # Fixed typo: the message used ':/n' where a newline was intended.
        log.debug('DB instance is not eligible for a snapshot:\n %s', resource)
    return eligible_for_final_snapshot
def _get_available_engine_upgrades(client, major=False):
    """Returns all extant rds engine upgrades.

    As a nested mapping of engine type to known versions
    and their upgrades.

    Defaults to minor upgrades, but configurable to major.

    Example::

      >>> _get_available_engine_upgrades(client)
      {
         'oracle-se2': {'12.1.0.2.v2': '12.1.0.2.v5',
                        '12.1.0.2.v3': '12.1.0.2.v5'},
         'postgres': {'9.3.1': '9.3.14',
                      '9.3.10': '9.3.14',
                      '9.3.12': '9.3.14',
                      '9.3.2': '9.3.14'}
      }
    """
    upgrades = {}
    paginator = client.get_paginator('describe_db_engine_versions')
    for page in paginator.paginate():
        for version in page['DBEngineVersions']:
            # Every engine seen gets an entry, even without upgrade targets.
            engine_map = upgrades.setdefault(version['Engine'], {})
            for target in version.get('ValidUpgradeTarget', ()):
                if target['IsMajorVersionUpgrade'] and not major:
                    continue
                # Keep only the highest target seen for this source version.
                current_best = engine_map.get(version['EngineVersion'], '0.0.0')
                if LooseVersion(target['EngineVersion']) > LooseVersion(current_best):
                    engine_map[version['EngineVersion']] = target['EngineVersion']
    return upgrades
# Reuse the generic offhour/onhour scheduling filters on rds instances.
filters.register('offhour', OffHour)
filters.register('onhour', OnHour)
@filters.register('default-vpc')
class DefaultVpc(net_filters.DefaultVpcBase):
    """ Matches if an rds database is in the default vpc

    :example:

    .. code-block:: yaml

            policies:
              - name: default-vpc-rds
                resource: rds
                filters:
                  - type: default-vpc
    """
    schema = type_schema('default-vpc')

    def __call__(self, rdb):
        # Compare the instance's subnet-group vpc against the default vpc.
        return self.match(rdb['DBSubnetGroup']['VpcId'])
@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Filter rds instances by attributes of their vpc security groups."""

    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"
@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Filter rds instances by attributes of their subnet-group subnets."""

    RelatedIdsExpression = "DBSubnetGroup.Subnets[].SubnetIdentifier"
@filters.register('vpc')
class VpcFilter(net_filters.VpcFilter):
    """Filter rds instances by attributes of their vpc."""

    RelatedIdsExpression = "DBSubnetGroup.VpcId"
# Generic network-location filter (security group / subnet cross-check).
filters.register('network-location', net_filters.NetworkLocation)
@filters.register('kms-alias')
class KmsKeyAlias(ResourceKmsKeyAlias):
    """Filter rds instances by the alias of their kms encryption key."""

    def process(self, dbs, event=None):
        # Delegate to the shared kms alias matching helper.
        return self.get_matching_aliases(dbs)
@actions.register('auto-patch')
class AutoPatch(BaseAction):
    """Toggle AutoMinorUpgrade flag on RDS instance

    'window' parameter needs to be in the format 'ddd:hh:mm-ddd:hh:mm' and
    have at least 30 minutes between start & end time.
    If 'window' is not specified, AWS will assign a random maintenance window
    to each instance selected.

    :example:

    .. code-block:: yaml

            policies:
              - name: enable-rds-autopatch
                resource: rds
                filters:
                  - AutoMinorVersionUpgrade: false
                actions:
                  - type: auto-patch
                    minor: true
                    window: Mon:23:00-Tue:01:00
    """

    schema = type_schema(
        'auto-patch',
        minor={'type': 'boolean'}, window={'type': 'string'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        client = local_session(
            self.manager.session_factory).client('rds')

        # Build the modify parameters once; they apply to every instance.
        update = {'AutoMinorVersionUpgrade': self.data.get('minor', True)}
        window = self.data.get('window')
        if window:
            update['PreferredMaintenanceWindow'] = window

        for db in dbs:
            client.modify_db_instance(
                DBInstanceIdentifier=db['DBInstanceIdentifier'],
                **update)
@filters.register('upgrade-available')
class UpgradeAvailable(Filter):
    """ Scan DB instances for available engine upgrades

    This will pull DB instances & check their specific engine for any
    engine version with higher release numbers than the current one

    This will also annotate the rds instance with 'target_engine' which is
    the most recent version of the engine available

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-upgrade-available
                resource: rds
                filters:
                  - type: upgrade-available
                    major: False
    """

    schema = type_schema('upgrade-available',
                         major={'type': 'boolean'},
                         value={'type': 'boolean'})
    permissions = ('rds:DescribeDBEngineVersions',)

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')
        # value: False inverts the filter to select instances with no upgrade.
        require_upgrade = self.data.get('value', True)
        upgrades = _get_available_engine_upgrades(
            client, major=self.data.get('major', False))
        matched = []
        for r in resources:
            target = upgrades.get(r['Engine'], {}).get(r['EngineVersion'])
            if target is None:
                if require_upgrade is False:
                    matched.append(r)
                continue
            # Annotate the best available target for the upgrade action.
            r['c7n-rds-engine-upgrade'] = target
            matched.append(r)
        return matched
@actions.register('upgrade')
class UpgradeMinor(BaseAction):
    """Upgrades a RDS instance to the latest major/minor version available

    Use of the 'immediate' flag (default False) will automatically upgrade
    the RDS engine disregarding the existing maintenance window.

    :example:

    .. code-block:: yaml

            policies:
              - name: upgrade-rds-minor
                resource: rds
                actions:
                  - type: upgrade
                    major: False
                    immediate: False
    """

    schema = type_schema(
        'upgrade',
        major={'type': 'boolean'},
        immediate={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # Fetched lazily, only if some instance lacks the filter annotation.
        upgrades = None
        for r in resources:
            if 'EngineVersion' in r['PendingModifiedValues']:
                # Upgrade has already been scheduled
                continue
            if 'c7n-rds-engine-upgrade' not in r:
                if upgrades is None:
                    upgrades = _get_available_engine_upgrades(
                        client, major=self.data.get('major', False))
                target = upgrades.get(r['Engine'], {}).get(r['EngineVersion'])
                if target is None:
                    log.debug(
                        "implicit filter no upgrade on %s",
                        r['DBInstanceIdentifier'])
                    continue
                r['c7n-rds-engine-upgrade'] = target
            client.modify_db_instance(
                DBInstanceIdentifier=r['DBInstanceIdentifier'],
                EngineVersion=r['c7n-rds-engine-upgrade'],
                ApplyImmediately=self.data.get('immediate', False))
@actions.register('tag-trim')
class TagTrim(tags.TagTrim):
    """RDS implementation of the generic tag-trim action."""

    permissions = ('rds:RemoveTagsFromResource',)

    def process_tag_removal(self, client, resource, candidates):
        # Remove the selected candidate tag keys from the instance.
        client.remove_tags_from_resource(ResourceName=resource['DBInstanceArn'], TagKeys=candidates)
# Engines eligible for the stop/start instance APIs; consumed by
# _eligible_start_stop below.  The original literal listed 'sqlserver-ee'
# twice; the duplicate is removed and the entries grouped by engine family.
START_STOP_ELIGIBLE_ENGINES = {
    'db2-ae', 'db2-se',
    'mariadb', 'mysql',
    'oracle-ee', 'oracle-ee-cdb',
    'oracle-se', 'oracle-se1', 'oracle-se2', 'oracle-se2-cdb',
    'postgres',
    'sqlserver-ee', 'sqlserver-ex', 'sqlserver-se', 'sqlserver-web',
}
def _eligible_start_stop(db, state="available"):
    """Return True when *db* is in *state* and may be stopped/started.

    See the conditions noted at
    https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
    Note that doc doesn't really specify what happens for all the nosql
    engines that are available as rds engines.
    """
    if db.get('DBInstanceStatus') != state:
        return False

    engine = db['Engine']
    if db.get('MultiAZ') and engine.startswith('sqlserver-'):
        return False

    if engine not in START_STOP_ELIGIBLE_ENGINES:
        return False

    # Instances participating in replication (either direction) are skipped.
    if db.get('ReadReplicaDBInstanceIdentifiers') or db.get(
            'ReadReplicaSourceDBInstanceIdentifier'):
        return False

    # TODO is SQL Server mirror is detectable.
    return True
@actions.register('stop')
class Stop(BaseAction):
    """Stop an rds instance.

    https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
    """

    schema = type_schema('stop')

    permissions = ("rds:StopDBInstance",)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        for resource in resources:
            # Skip instances that can't be stopped (wrong state, engine, etc.)
            if not _eligible_start_stop(resource):
                continue
            try:
                client.stop_db_instance(
                    DBInstanceIdentifier=resource['DBInstanceIdentifier'])
            except ClientError as e:
                log.exception(
                    "Error stopping db instance:%s err:%s",
                    resource['DBInstanceIdentifier'], e)
@actions.register('start')
class Start(BaseAction):
    """Start an rds instance.
    """

    schema = type_schema('start')

    permissions = ("rds:StartDBInstance",)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        for resource in resources:
            # Only stopped instances are eligible for a start call.
            if not _eligible_start_stop(resource, state='stopped'):
                continue
            try:
                client.start_db_instance(
                    DBInstanceIdentifier=resource['DBInstanceIdentifier'])
            except ClientError as e:
                log.exception(
                    "Error starting db instance:%s err:%s",
                    resource['DBInstanceIdentifier'], e)
@actions.register('delete')
class Delete(BaseAction):
    """Deletes selected RDS instances

    This will delete RDS instances. It is recommended to apply with a filter
    to avoid deleting all RDS instances in the account.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-delete
                resource: rds
                filters:
                  - default-vpc
                actions:
                  - type: delete
                    skip-snapshot: true
    """

    schema = type_schema('delete', **{
        'skip-snapshot': {'type': 'boolean'},
        'copy-restore-info': {'type': 'boolean'}
    })

    permissions = ('rds:DeleteDBInstance', 'rds:AddTagsToResource')

    def validate(self):
        # copy-restore-info records settings as tags on the final snapshot,
        # so it is incompatible with skipping that snapshot.
        if self.data.get('skip-snapshot', False) and self.data.get(
                'copy-restore-info'):
            raise PolicyValidationError(
                "skip-snapshot cannot be specified with copy-restore-info on %s" % (
                    self.manager.data,))
        return self

    def process(self, dbs):
        skip = self.data.get('skip-snapshot', False)
        # Can't delete an instance in an aurora cluster, use a policy on the cluster
        dbs = [r for r in dbs if not r.get('DBClusterIdentifier')]
        # Concurrency feels like overkill here.
        client = local_session(self.manager.session_factory).client('rds')
        for db in dbs:
            params = dict(
                DBInstanceIdentifier=db['DBInstanceIdentifier'])
            # Take a final snapshot unless skipped or the instance is in a
            # state/configuration where one is not allowed.
            if skip or not _db_instance_eligible_for_final_snapshot(db):
                params['SkipFinalSnapshot'] = True
            else:
                params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                    'Final', db['DBInstanceIdentifier'])
            if self.data.get('copy-restore-info', False):
                self.copy_restore_info(client, db)
                # Ensure restore-info tags propagate to the final snapshot.
                if not db['CopyTagsToSnapshot']:
                    client.modify_db_instance(
                        DBInstanceIdentifier=db['DBInstanceIdentifier'],
                        CopyTagsToSnapshot=True)
            self.log.info(
                "Deleting rds: %s snapshot: %s",
                db['DBInstanceIdentifier'],
                params.get('FinalDBSnapshotIdentifier', False))

            try:
                client.delete_db_instance(**params)
            except ClientError as e:
                # Instance already transitioning/deleting; treat as done.
                if e.response['Error']['Code'] == "InvalidDBInstanceState":
                    continue
                raise

        return dbs

    def copy_restore_info(self, client, instance):
        # Record the instance settings the `restore` action needs as tags
        # on the instance (carried onto its final snapshot).
        tags = []
        tags.append({
            'Key': 'VPCSecurityGroups',
            'Value': ''.join([
                g['VpcSecurityGroupId'] for g in instance['VpcSecurityGroups']
            ])})
        tags.append({
            'Key': 'OptionGroupName',
            'Value': instance['OptionGroupMemberships'][0]['OptionGroupName']})
        tags.append({
            'Key': 'ParameterGroupName',
            'Value': instance['DBParameterGroups'][0]['DBParameterGroupName']})
        tags.append({
            'Key': 'InstanceClass',
            'Value': instance['DBInstanceClass']})
        tags.append({
            'Key': 'StorageType',
            'Value': instance['StorageType']})
        tags.append({
            'Key': 'MultiAZ',
            'Value': str(instance['MultiAZ'])})
        tags.append({
            'Key': 'DBSubnetGroupName',
            'Value': instance['DBSubnetGroup']['DBSubnetGroupName']})
        client.add_tags_to_resource(
            ResourceName=self.manager.generate_arn(
                instance['DBInstanceIdentifier']),
            Tags=tags)
@actions.register('set-snapshot-copy-tags')
class CopySnapshotTags(BaseAction):
    """Enables copying tags from rds instance to snapshot

    DEPRECATED - use modify-db instead with `CopyTagsToSnapshot`

    :example:

    .. code-block:: yaml

            policies:
              - name: enable-rds-snapshot-tags
                resource: rds
                filters:
                  - type: value
                    key: Engine
                    value: aurora
                    op: eq
                actions:
                  - type: set-snapshot-copy-tags
                    enable: True
    """
    deprecations = (
        deprecated.action("use modify-db instead with `CopyTagsToSnapshot`"),
    )

    schema = type_schema(
        'set-snapshot-copy-tags',
        enable={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        # Remember the last error and re-raise after all futures settle so
        # one failed instance doesn't abort the remaining updates.
        error = None
        with self.executor_factory(max_workers=2) as w:
            futures = {}
            client = local_session(self.manager.session_factory).client('rds')
            # Only touch instances not already in the desired state.
            resources = [r for r in resources
                         if r['CopyTagsToSnapshot'] != self.data.get('enable', True)]
            for r in resources:
                futures[w.submit(self.set_snapshot_tags, client, r)] = r
            for f in as_completed(futures):
                if f.exception():
                    error = f.exception()
                    self.log.error(
                        'error updating rds:%s CopyTagsToSnapshot \n %s',
                        futures[f]['DBInstanceIdentifier'], error)
        if error:
            raise error
        return resources

    def set_snapshot_tags(self, client, r):
        # retry wrapper handles rds api throttling
        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            CopyTagsToSnapshot=self.data.get('enable', True))
@RDS.action_registry.register('post-finding')
class DbInstanceFinding(PostFinding):
    """Format an rds instance as an AwsRdsDbInstance security hub finding."""

    resource_type = 'AwsRdsDbInstance'

    def format_resource(self, r):
        # Attributes forwarded from the describe record into finding details.
        fields = [
            'AssociatedRoles', 'CACertificateIdentifier', 'DBClusterIdentifier',
            'DBInstanceIdentifier', 'DBInstanceClass', 'DbInstancePort', 'DbiResourceId',
            'DBName', 'DeletionProtection', 'Endpoint', 'Engine', 'EngineVersion',
            'IAMDatabaseAuthenticationEnabled', 'InstanceCreateTime', 'KmsKeyId',
            'PubliclyAccessible', 'StorageEncrypted',
            'TdeCredentialArn', 'VpcSecurityGroups', 'MultiAz', 'EnhancedMonitoringResourceArn',
            'DbInstanceStatus', 'MasterUsername',
            'AllocatedStorage', 'PreferredBackupWindow', 'BackupRetentionPeriod',
            'DbSecurityGroups', 'DbParameterGroups',
            'AvailabilityZone', 'DbSubnetGroup', 'PreferredMaintenanceWindow',
            'PendingModifiedValues', 'LatestRestorableTime',
            'AutoMinorVersionUpgrade', 'ReadReplicaSourceDBInstanceIdentifier',
            'ReadReplicaDBInstanceIdentifiers',
            'ReadReplicaDBClusterIdentifiers', 'LicenseModel', 'Iops', 'OptionGroupMemberships',
            'CharacterSetName',
            'SecondaryAvailabilityZone', 'StatusInfos', 'StorageType', 'DomainMemberships',
            'CopyTagsToSnapshot',
            'MonitoringInterval', 'MonitoringRoleArn', 'PromotionTier', 'Timezone',
            'PerformanceInsightsEnabled',
            'PerformanceInsightsKmsKeyId', 'PerformanceInsightsRetentionPeriod',
            'EnabledCloudWatchLogsExports',
            'ProcessorFeatures', 'ListenerEndpoint', 'MaxAllocatedStorage'
        ]
        details = {}
        for name in fields:
            value = r.get(name)
            # Skip absent/falsy values; finding details only carry real data.
            if not value:
                continue
            # Datetimes are not JSON serializable; use ISO-8601 strings.
            if isinstance(value, datetime.datetime):
                value = value.isoformat()
            details[name] = value

        finding = {
            'Type': self.resource_type,
            'Id': r['DBInstanceArn'],
            'Region': self.manager.config.region,
            'Tags': {t['Key']: t['Value'] for t in r.get('Tags', [])},
            'Details': {self.resource_type: filter_empty(details)},
        }
        return filter_empty(finding)
@actions.register('snapshot')
class Snapshot(BaseAction):
    """Creates a manual snapshot of a RDS instance

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot
                resource: rds
                actions:
                  - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('rds:CreateDBSnapshot',)

    def process(self, dbs):
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_rds_snapshot, db) for db in dbs]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception creating rds snapshot \n %s",
                        f.exception())
        return dbs

    def process_rds_snapshot(self, resource):
        # Cluster members, certain replicas, and unavailable instances
        # can't be snapshotted at the instance level.
        if not _db_instance_eligible_for_backup(resource):
            return

        c = local_session(self.manager.session_factory).client('rds')
        c.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_identifier(
                self.data.get('snapshot-prefix', 'Backup'),
                resource['DBInstanceIdentifier']),
            DBInstanceIdentifier=resource['DBInstanceIdentifier'])
@actions.register('resize')
class ResizeInstance(BaseAction):
    """Change the allocated storage of an rds instance.

    :example:

    This will find databases using over 85% of their allocated
    storage, and resize them to have an additional 30% storage
    the resize here is async during the next maintenance.

    .. code-block:: yaml

            policies:
              - name: rds-resize-up
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: 30

    This will find databases using under 20% of their allocated
    storage, and resize them to be 30% smaller, the resize here
    is configured to be immediate.

    .. code-block:: yaml

            policies:
              - name: rds-resize-down
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: -30
                    immediate: true
    """
    schema = type_schema(
        'resize',
        percent={'type': 'number'},
        immediate={'type': 'boolean'})

    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # Scaling factor is constant across resources; compute it once.
        # Decimal arithmetic avoids float rounding surprises.
        hundred = D(100)
        factor = (hundred + D(self.data['percent'])) / hundred
        for resource in resources:
            scaled = factor * D(resource['AllocatedStorage'])
            # Round half-up to a whole GiB as the API requires an integer.
            target = int(scaled.quantize(D('0'), ROUND_HALF_UP))
            client.modify_db_instance(
                DBInstanceIdentifier=resource['DBInstanceIdentifier'],
                AllocatedStorage=target,
                ApplyImmediately=self.data.get('immediate', False))
@actions.register('retention')
class RetentionWindow(BaseAction):
    """
    Sets the 'BackupRetentionPeriod' value for automated snapshots,
    enforce (min, max, exact) sets retention days accordingly.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-retention
                resource: rds
                filters:
                  - type: value
                    key: BackupRetentionPeriod
                    value: 7
                    op: lt
                actions:
                  - type: retention
                    days: 7
                    copy-tags: true
                    enforce: exact
    """

    date_attribute = "BackupRetentionPeriod"
    schema = type_schema(
        'retention', **{'days': {'type': 'number'},
                        'copy-tags': {'type': 'boolean'},
                        'enforce': {'type': 'string', 'enum': [
                            'min', 'max', 'exact']}})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for db in dbs:
                futures.append(w.submit(
                    self.process_snapshot_retention,
                    db))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting rds retention  \n %s",
                        f.exception())
        return dbs

    def process_snapshot_retention(self, resource):
        current_retention = int(resource.get('BackupRetentionPeriod', 0))
        current_copy_tags = resource['CopyTagsToSnapshot']
        new_retention = self.data['days']
        new_copy_tags = self.data.get('copy-tags', True)
        retention_type = self.data.get('enforce', 'min').lower()

        # NOTE(review): each branch below also fires when only the
        # copy-tags setting differs, regardless of the configured enforce
        # mode — e.g. a copy-tags drift under enforce=max is corrected via
        # the first (max(...)) branch. Confirm this is intended behavior.

        # enforce=min: never lower the retention below the configured days.
        if ((retention_type == 'min' or
             current_copy_tags != new_copy_tags) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(
                resource,
                max(current_retention, new_retention),
                new_copy_tags)
            return resource

        # enforce=max: never raise the retention above the configured days.
        if ((retention_type == 'max' or
             current_copy_tags != new_copy_tags) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(
                resource,
                min(current_retention, new_retention),
                new_copy_tags)
            return resource

        # enforce=exact: always set the retention to the configured days.
        if ((retention_type == 'exact' or
             current_copy_tags != new_copy_tags) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(resource, new_retention, new_copy_tags)
            return resource

    def set_retention_window(self, resource, retention, copy_tags):
        c = local_session(self.manager.session_factory).client('rds')
        c.modify_db_instance(
            DBInstanceIdentifier=resource['DBInstanceIdentifier'],
            BackupRetentionPeriod=retention,
            CopyTagsToSnapshot=copy_tags)
@actions.register('set-public-access')
class RDSSetPublicAvailability(BaseAction):
    """
    This action allows for toggling an RDS instance
    'PubliclyAccessible' flag to true or false

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-rds-public-accessibility
                resource: rds
                filters:
                  - PubliclyAccessible: true
                actions:
                  - type: set-public-access
                    state: false
    """

    schema = type_schema(
        "set-public-access",
        state={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def set_accessibility(self, r):
        client = local_session(self.manager.session_factory).client('rds')
        client.modify_db_instance(
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            PubliclyAccessible=self.data.get('state', False))

    def process(self, rds):
        with self.executor_factory(max_workers=2) as w:
            jobs = {}
            for resource in rds:
                jobs[w.submit(self.set_accessibility, resource)] = resource
            for job in as_completed(jobs):
                if job.exception():
                    self.log.error(
                        "Exception setting public access on %s  \n %s",
                        jobs[job]['DBInstanceIdentifier'], job.exception())
        return rds
@resources.register('rds-subscription')
class RDSSubscription(QueryResourceManager):
    """Resource manager for RDS event subscriptions."""

    class resource_type(TypeInfo):
        service = 'rds'
        arn_type = 'es'
        cfn_type = 'AWS::RDS::EventSubscription'
        enum_spec = (
            'describe_event_subscriptions', 'EventSubscriptionsList', None)
        name = id = "CustSubscriptionId"
        arn = 'EventSubscriptionArn'
        date = "SubscriptionCreateTime"
        permissions_enum = ('rds:DescribeEventSubscriptions',)
        universal_taggable = object()

    # Fetch universal tags for each subscription record.
    augment = universal_augment
@RDSSubscription.action_registry.register('delete')
class RDSSubscriptionDelete(BaseAction):
    """Deletes a RDS event subscription resource

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subscription-delete
                resource: rds-subscription
                filters:
                  - type: value
                    key: CustSubscriptionId
                    value: xyz
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteEventSubscription',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        for r in resources:
            # Tolerate races where the subscription is already gone or is
            # being mutated concurrently.
            self.manager.retry(
                client.delete_event_subscription, SubscriptionName=r['CustSubscriptionId'],
                ignore_err_codes=('SubscriptionNotFoundFault',
                                  'InvalidEventSubscriptionStateFault'))
class DescribeRDSSnapshot(DescribeSource):
    """Describe-API source for RDS snapshots."""

    def get_resources(self, ids, cache=True):
        # Fetch snapshots one id at a time rather than in a single call.
        fetch = super().get_resources
        collected = []
        for snapshot_id in ids:
            collected.extend(fetch((snapshot_id,)))
        return collected

    def augment(self, snaps):
        # Normalize the API's TagList field into the Tags key.
        for snap in snaps:
            snap['Tags'] = snap.pop('TagList', ())
        return snaps
@resources.register('rds-snapshot')
class RDSSnapshot(QueryResourceManager):
    """Resource manager for RDS DB snapshots.
    """

    class resource_type(TypeInfo):
        # Service / ARN construction metadata.
        service = 'rds'
        arn_type = 'snapshot'
        arn_separator = ':'
        # boto3 call and response key used to enumerate snapshots.
        enum_spec = ('describe_db_snapshots', 'DBSnapshots', None)
        name = id = 'DBSnapshotIdentifier'
        date = 'SnapshotCreateTime'
        config_type = "AWS::RDS::DBSnapshot"
        filter_name = "DBSnapshotIdentifier"
        filter_type = "scalar"
        universal_taggable = True
        permissions_enum = ('rds:DescribeDBSnapshots',)

    source_mapping = {
        'describe': DescribeRDSSnapshot,
        'config': ConfigSource
    }
# Reuse the generic OnHour scheduling filter against snapshot resources.
@RDSSnapshot.filter_registry.register('onhour')
class RDSSnapshotOnHour(OnHour):
    """Scheduled action on rds snapshot."""
@RDSSnapshot.filter_registry.register('instance')
class SnapshotInstance(related.RelatedResourceFilter):
    """Filter snapshots by their database attributes.

    :example:

    Find snapshots without an extant database

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-orphan
                resource: aws.rds-snapshot
                filters:
                  - type: instance
                    value: 0
                    value_type: resource_count
    """
    schema = type_schema(
        'instance', rinherit=ValueFilter.schema
    )

    # Resolve the related database through the snapshot's instance id.
    RelatedResource = "c7n.resources.rds.RDS"
    RelatedIdsExpression = "DBInstanceIdentifier"
    # Presumably the cutoff below which related instances are fetched by
    # id rather than fully enumerated — confirm in RelatedResourceFilter.
    FetchThreshold = 5
@RDSSnapshot.filter_registry.register('latest')
class LatestSnapshot(Filter):
    """Return the latest snapshot for each database.
    """
    schema = type_schema('latest', automatic={'type': 'boolean'})
    permissions = ('rds:DescribeDBSnapshots',)

    def process(self, resources, event=None):
        results = []
        # automatic: false restricts consideration to manual snapshots.
        if not self.data.get('automatic', True):
            resources = [r for r in resources if r['SnapshotType'] == 'manual']
        # itertools.groupby only groups *consecutive* records, so sort by
        # database identifier first; otherwise a database whose snapshots
        # are interleaved with another's yields multiple "latest" entries.
        for db_identifier, snapshots in itertools.groupby(
                sorted(resources,
                       key=operator.itemgetter('DBInstanceIdentifier')),
                operator.itemgetter('DBInstanceIdentifier')):
            results.append(
                sorted(snapshots,
                       key=operator.itemgetter('SnapshotCreateTime'))[-1])
        return results
@RDSSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filters RDS snapshots based on age (in days)

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-snapshot-expired
            resource: rds-snapshot
            filters:
              - type: age
                days: 28
                op: ge
            actions:
              - delete
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    # attribute AgeFilter compares against the configured `days`
    date_attribute = 'SnapshotCreateTime'

    def get_resource_date(self, i):
        # .get() rather than [] -- NOTE(review): presumably the field can
        # be absent on snapshots still being created; confirm.
        return i.get('SnapshotCreateTime')
@RDSSnapshot.action_registry.register('restore')
class RestoreInstance(BaseAction):
    """Restore an rds instance from a snapshot.

    Note this requires the snapshot or db deletion be taken
    with the `copy-restore-info` boolean flag set to true, as
    various instance metadata is stored on the snapshot as tags.

    Additional parameters to the restore db instance api call can be
    overridden via `restore_options` settings. Various modify db instance
    parameters can be specified via `modify_options` settings.
    """

    schema = type_schema(
        'restore',
        restore_options={'type': 'object'},
        modify_options={'type': 'object'})

    permissions = (
        'rds:ModifyDBInstance',
        'rds:ModifyDBParameterGroup',
        'rds:ModifyOptionGroup',
        'rds:RebootDBInstance',
        'rds:RestoreDBInstanceFromDBSnapshot')

    # seconds between waiter polls while the restored instance comes up
    poll_period = 60
    # snapshot tags (written by copy-restore-info) required for a restore
    restore_keys = {
        'VPCSecurityGroups', 'MultiAZ', 'DBSubnetGroupName',
        'InstanceClass', 'StorageType', 'ParameterGroupName',
        'OptionGroupName'}

    def validate(self):
        # require the `latest` filter so we only restore from the newest
        # snapshot of each database
        found = False
        for f in self.manager.iter_filters():
            if isinstance(f, LatestSnapshot):
                found = True
        if not found:
            # do we really need this...
            raise PolicyValidationError(
                "must filter by latest to use restore action %s" % (
                    self.manager.data,))
        return self

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # restore up to 10 in parallel, we have to wait on each.
        with self.executor_factory(
                max_workers=min(10, len(resources) or 1)) as w:
            futures = {}
            for r in resources:
                tags = {t['Key']: t['Value'] for t in r['Tags']}
                # skip snapshots lacking the metadata needed to restore
                if not set(tags).issuperset(self.restore_keys):
                    self.log.warning(
                        "snapshot:%s missing restore tags",
                        r['DBSnapshotIdentifier'])
                    continue
                futures[w.submit(self.process_instance, client, r)] = r
            for f in as_completed(futures):
                r = futures[f]
                if f.exception():
                    self.log.warning(
                        "Error restoring db:%s from:%s error:\n%s",
                        r['DBInstanceIdentifier'], r['DBSnapshotIdentifier'],
                        f.exception())
                    continue

    def process_instance(self, client, r):
        """Restore one instance and apply post-restore settings.

        Sequence: restore from snapshot, wait for availability, modify
        the instance with the settings not accepted at restore time,
        then reboot it.
        """
        params, post_modify = self.get_restore_from_tags(r)
        self.manager.retry(
            client.restore_db_instance_from_db_snapshot, **params)
        waiter = client.get_waiter('db_instance_available')
        # wait up to 40m
        waiter.config.delay = self.poll_period
        waiter.wait(DBInstanceIdentifier=params['DBInstanceIdentifier'])
        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=params['DBInstanceIdentifier'],
            ApplyImmediately=True,
            **post_modify)
        self.manager.retry(
            client.reboot_db_instance,
            DBInstanceIdentifier=params['DBInstanceIdentifier'],
            ForceFailover=False)

    def get_restore_from_tags(self, snapshot):
        """Split snapshot tag metadata into restore-call and modify-call
        parameters.

        Returns ``(params, post_modify)`` where ``params`` feeds
        ``restore_db_instance_from_db_snapshot`` and ``post_modify``
        feeds the follow-up ``modify_db_instance`` call.
        """
        params, post_modify = {}, {}
        tags = {t['Key']: t['Value'] for t in snapshot['Tags']}

        params['DBInstanceIdentifier'] = snapshot['DBInstanceIdentifier']
        params['DBSnapshotIdentifier'] = snapshot['DBSnapshotIdentifier']
        # legacy and/or idiom; equivalent to tags['MultiAZ'] == 'True'
        params['MultiAZ'] = tags['MultiAZ'] == 'True' and True or False
        params['DBSubnetGroupName'] = tags['DBSubnetGroupName']
        params['DBInstanceClass'] = tags['InstanceClass']
        params['CopyTagsToSnapshot'] = True
        params['StorageType'] = tags['StorageType']
        params['OptionGroupName'] = tags['OptionGroupName']

        # these are applied after the instance exists, via modify
        post_modify['DBParameterGroupName'] = tags['ParameterGroupName']
        post_modify['VpcSecurityGroupIds'] = tags['VPCSecurityGroups'].split(',')

        # carry over all non-bookkeeping tags to the restored instance
        params['Tags'] = [
            {'Key': k, 'Value': v} for k, v in tags.items()
            if k not in self.restore_keys]

        # user-supplied overrides take precedence
        params.update(self.data.get('restore_options', {}))
        post_modify.update(self.data.get('modify_options', {}))
        return params, post_modify
@RDSSnapshot.filter_registry.register('cross-account')
class CrossAccountAccess(CrossAccountAccessFilter):
    """Flag snapshots whose restore attribute grants accounts outside
    the allowed set.

    Matched snapshots are annotated with the full attribute map under
    ``c7n:attributes`` and the offending account ids under
    ``c7n:CrossAccountViolations``.
    """

    permissions = ('rds:DescribeDBSnapshotAttributes',)
    attributes_key = 'c7n:attributes'
    annotation_key = 'c7n:CrossAccountViolations'

    def process(self, resources, event=None):
        self.accounts = self.get_accounts()
        matched = []
        # fan out attribute lookups in batches of 20 over two workers
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_resource_set, batch)
                for batch in chunks(resources, 20)]
            for future in as_completed(futures):
                if future.exception():
                    self.log.error(
                        "Exception checking cross account access\n %s" % (
                            future.exception()))
                    continue
                matched.extend(future.result())
        return matched

    def process_resource_set(self, resource_set):
        client = local_session(self.manager.session_factory).client('rds')
        matched = []
        for snapshot in resource_set:
            response = self.manager.retry(
                client.describe_db_snapshot_attributes,
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'])
            attrs = {
                attr['AttributeName']: attr['AttributeValues']
                for attr in response[
                    'DBSnapshotAttributesResult']['DBSnapshotAttributes']}
            snapshot[self.attributes_key] = attrs
            # accounts granted restore access but not in the allowed set
            shared = set(attrs.get('restore', []))
            delta = shared.difference(self.accounts)
            if delta:
                snapshot[self.annotation_key] = list(delta)
                matched.append(snapshot)
        return matched
@RDSSnapshot.action_registry.register('set-permissions')
class SetPermissions(BaseAction):
    """Set permissions for copying or restoring an RDS snapshot

    Use the 'add' and 'remove' parameters to control which accounts to
    add or remove, respectively. The default is to remove any
    permissions granted to other AWS accounts.

    Use `remove: matched` in combination with the `cross-account` filter
    for more flexible removal options such as preserving access for
    a set of whitelisted accounts:

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-snapshot-remove-cross-account
            resource: rds-snapshot
            filters:
              - type: cross-account
                whitelist:
                  - '112233445566'
            actions:
              - type: set-permissions
                remove: matched
    """
    schema = type_schema(
        'set-permissions',
        # 'matched' removes accounts flagged by the cross-account filter;
        # otherwise a list of 12-digit account ids (or 'all')
        remove={'oneOf': [
            {'enum': ['matched']},
            {'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ],
            }}
        ]},
        add={
            'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ]
            }
        }
    )

    permissions = ('rds:ModifyDBSnapshotAttribute',)

    def validate(self):
        # `remove: matched` only makes sense when the policy also ran the
        # cross-account filter that produces the matched annotation
        if self.data.get('remove') == 'matched':
            found = False
            for f in self.manager.iter_filters():
                if isinstance(f, CrossAccountAccessFilter):
                    found = True
                    break
            if not found:
                raise PolicyValidationError(
                    "policy:%s filter:%s with matched requires cross-account filter" % (
                        self.manager.ctx.policy.name, self.type))

    def process(self, snapshots):
        client = local_session(self.manager.session_factory).client('rds')
        for s in snapshots:
            self.process_snapshot(client, s)

    def process_snapshot(self, client, snapshot):
        """Apply the configured add/remove restore grants to one snapshot.

        With neither `add` nor `remove` configured, every currently
        granted account is revoked.
        """
        add_accounts = self.data.get('add', [])
        remove_accounts = self.data.get('remove', [])

        if not (add_accounts or remove_accounts):
            # default: remove everything -- fetch the current grants if
            # the cross-account filter has not already annotated them
            if CrossAccountAccess.attributes_key not in snapshot:
                attrs = {
                    t['AttributeName']: t['AttributeValues']
                    for t in self.manager.retry(
                        client.describe_db_snapshot_attributes,
                        DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier']
                    )['DBSnapshotAttributesResult']['DBSnapshotAttributes']
                }
                snapshot[CrossAccountAccess.attributes_key] = attrs
            remove_accounts = snapshot[CrossAccountAccess.attributes_key].get('restore', [])
        elif remove_accounts == 'matched':
            # accounts flagged by the cross-account filter on this snapshot
            remove_accounts = snapshot.get(CrossAccountAccess.annotation_key, [])

        if add_accounts or remove_accounts:
            client.modify_db_snapshot_attribute(
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'],
                AttributeName='restore',
                ValuesToRemove=remove_accounts,
                ValuesToAdd=add_accounts)
@RDSSnapshot.action_registry.register('region-copy')
class RegionCopySnapshot(BaseAction):
    """Copy a snapshot across regions.

    Note there is a max in flight for cross region rds snapshots
    of 5 per region. This action will attempt to retry automatically
    for an hr.

    Example::

      - name: copy-encrypted-snapshots
        description: |
          copy snapshots under 1 day old to dr region with kms
        resource: rds-snapshot
        region: us-east-1
        filters:
         - Status: available
         - type: value
           key: SnapshotCreateTime
           value_type: age
           value: 1
           op: less-than
        actions:
          - type: region-copy
            target_region: us-east-2
            target_key: arn:aws:kms:us-east-2:0000:key/cb291f53-c9cf61
            copy_tags: true
            tags:
              OriginRegion: us-east-1
    """

    schema = type_schema(
        'region-copy',
        target_region={'type': 'string'},
        target_key={'type': 'string'},
        copy_tags={'type': 'boolean'},
        tags={'type': 'object'},
        required=('target_region',))

    permissions = ('rds:CopyDBSnapshot',)
    # retry pacing when the per-region in-flight copy quota is exceeded
    min_delay = 120
    max_attempts = 30

    def validate(self):
        # cross-region copies can take longer than a lambda invocation
        if self.data.get('target_region') and self.manager.data.get('mode'):
            raise PolicyValidationError(
                "cross region snapshot may require waiting for "
                "longer then lambda runtime allows %s" % (self.manager.data,))
        return self

    def process(self, resources):
        if self.data['target_region'] == self.manager.config.region:
            self.log.warning(
                "Source and destination region are the same, skipping copy")
            return
        for resource_set in chunks(resources, 20):
            self.process_resource_set(resource_set)

    def process_resource(self, target, key, tags, snapshot):
        """Copy one snapshot using the target region's client.

        Retries on SnapshotQuotaExceeded; a pre-existing snapshot in the
        target region is logged and skipped.
        """
        p = {}
        if key:
            p['KmsKeyId'] = key
        # ':' is not valid in a target snapshot identifier
        p['TargetDBSnapshotIdentifier'] = snapshot[
            'DBSnapshotIdentifier'].replace(':', '-')
        p['SourceRegion'] = self.manager.config.region
        p['SourceDBSnapshotIdentifier'] = snapshot['DBSnapshotArn']

        if self.data.get('copy_tags', True):
            p['CopyTags'] = True
        if tags:
            p['Tags'] = tags

        retry = get_retry(
            ('SnapshotQuotaExceeded',),
            # TODO make this configurable, class defaults to 1hr
            min_delay=self.min_delay,
            max_attempts=self.max_attempts,
            log_retries=logging.DEBUG)
        try:
            result = retry(target.copy_db_snapshot, **p)
        except ClientError as e:
            if e.response['Error']['Code'] == 'DBSnapshotAlreadyExists':
                self.log.warning(
                    "Snapshot %s already exists in target region",
                    snapshot['DBSnapshotIdentifier'])
                return
            raise
        # annotate the source snapshot with the new copy's arn
        snapshot['c7n:CopiedSnapshot'] = result[
            'DBSnapshot']['DBSnapshotArn']

    def process_resource_set(self, resource_set):
        target_client = self.manager.session_factory(
            region=self.data['target_region']).client('rds')
        target_key = self.data.get('target_key')
        tags = [{'Key': k, 'Value': v} for k, v
                in self.data.get('tags', {}).items()]

        for snapshot_set in chunks(resource_set, 5):
            for r in snapshot_set:
                # If tags are supplied, copy tags are ignored, and
                # we need to augment the tag set with the original
                # resource tags to preserve the common case.
                rtags = tags and list(tags) or None
                if tags and self.data.get('copy_tags', True):
                    rtags.extend(r['Tags'])
                self.process_resource(target_client, target_key, rtags, r)
@RDSSnapshot.action_registry.register('delete')
class RDSSnapshotDelete(BaseAction):
    """Deletes a RDS snapshot resource

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-snapshot-delete-stale
            resource: rds-snapshot
            filters:
              - type: age
                days: 28
                op: ge
            actions:
              - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBSnapshot',)

    def process(self, snapshots):
        # only manual snapshots can be deleted via DeleteDBSnapshot;
        # automated snapshots are lifecycle-managed by the service
        snapshots = self.filter_resources(snapshots, 'SnapshotType', ('manual',))
        if not snapshots:
            return []
        # use the action's own logger (was module-level `log`) for
        # consistency with the error path below
        self.log.info("Deleting %d rds snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for snapshot_set in chunks(reversed(snapshots), size=50):
                futures.append(
                    w.submit(self.process_snapshot_set, snapshot_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        f.exception())
        return snapshots

    def process_snapshot_set(self, snapshots_set):
        # one client per worker thread
        c = local_session(self.manager.session_factory).client('rds')
        for s in snapshots_set:
            c.delete_db_snapshot(
                DBSnapshotIdentifier=s['DBSnapshotIdentifier'])
@actions.register('modify-security-groups')
class RDSModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify the VPC security groups attached to rds instances.

    Instances belonging to a DB cluster have their security groups
    applied at the cluster level; standalone instances are modified
    directly.
    """

    permissions = ('rds:ModifyDBInstance', 'rds:ModifyDBCluster')
    vpc_expr = 'DBSubnetGroup.VpcId'

    def process(self, rds_instances):
        replication_group_map = {}
        client = local_session(self.manager.session_factory).client('rds')
        groups = super(RDSModifyVpcSecurityGroups, self).get_groups(
            rds_instances)

        # either build map for DB cluster or modify DB instance directly
        for idx, i in enumerate(rds_instances):
            if i.get('DBClusterIdentifier'):
                # build map of Replication Groups to Security Groups
                replication_group_map[i['DBClusterIdentifier']] = groups[idx]
            else:
                client.modify_db_instance(
                    DBInstanceIdentifier=i['DBInstanceIdentifier'],
                    VpcSecurityGroupIds=groups[idx])

        # handle DB clusters, if any; iterate items() directly -- the
        # previous enumerate() over .keys() produced an unused index
        for cluster_id, group_ids in replication_group_map.items():
            client.modify_db_cluster(
                DBClusterIdentifier=cluster_id,
                VpcSecurityGroupIds=group_ids
            )
class DescribeSubnetGroup(DescribeSource):
    """Describe source that attaches tags to rds subnet groups."""

    def augment(self, resources):
        # Tags are attached in place by _db_subnet_group_tags; its
        # filtered return value (which drops groups deleted mid-flight)
        # is ignored here. NOTE(review): not-found groups therefore
        # remain in the result set without a Tags key -- confirm intent.
        _db_subnet_group_tags(
            resources, self.manager.session_factory,
            self.manager.executor_factory, self.manager.retry)
        return resources
@resources.register('rds-subnet-group')
class RDSSubnetGroup(QueryResourceManager):
    """RDS subnet group."""

    class resource_type(TypeInfo):
        # query-framework metadata: API mapping, ARN shape, config type
        service = 'rds'
        arn_type = 'subgrp'
        id = name = 'DBSubnetGroupName'
        arn_separator = ':'
        enum_spec = (
            'describe_db_subnet_groups', 'DBSubnetGroups', None)
        filter_name = 'DBSubnetGroupName'
        filter_type = 'scalar'
        permissions_enum = ('rds:DescribeDBSubnetGroups',)
        cfn_type = config_type = 'AWS::RDS::DBSubnetGroup'
        universal_taggable = object()

    source_mapping = {
        'config': ConfigSource,
        'describe': DescribeSubnetGroup
    }
def _db_subnet_group_tags(subnet_groups, session_factory, executor_factory, retry):
    """Annotate each subnet group in place with its tags.

    Returns the subnet groups that still exist; groups deleted between
    enumeration and tagging are omitted from the returned list (though
    callers may ignore the return value and keep the full input list).
    Note ``executor_factory`` and ``retry`` are accepted for interface
    parity but are not used here.
    """
    client = local_session(session_factory).client('rds')

    tagged = []
    for group in subnet_groups:
        try:
            group['Tags'] = client.list_tags_for_resource(
                ResourceName=group['DBSubnetGroupArn'])['TagList']
        except client.exceptions.DBSubnetGroupNotFoundFault:
            # group vanished mid-flight; leave it untagged and skip it
            continue
        tagged.append(group)
    return tagged
@RDSSubnetGroup.action_registry.register('delete')
class RDSSubnetGroupDeleteAction(BaseAction):
    """Action to delete RDS Subnet Group

    It is recommended to apply a filter to the delete policy to avoid
    unwanted deletion of any rds subnet groups.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-subnet-group-delete
            resource: rds-subnet-group
            filters:
              - Instances: []
            actions:
              - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBSubnetGroup',)

    def process(self, subnet_group):
        # fan the deletions out over a small thread pool; list() drains
        # the map iterator so all work completes before returning
        with self.executor_factory(max_workers=2) as pool:
            list(pool.map(self.process_subnetgroup, subnet_group))

    def process_subnetgroup(self, subnet_group):
        # one client per worker thread
        rds = local_session(self.manager.session_factory).client('rds')
        rds.delete_db_subnet_group(
            DBSubnetGroupName=subnet_group['DBSubnetGroupName'])
@RDSSubnetGroup.filter_registry.register('unused')
class UnusedRDSSubnetGroup(Filter):
    """Filters all launch rds subnet groups that are not in use but exist

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-subnet-group-delete-unused
            resource: rds-subnet-group
            filters:
              - unused
    """

    schema = type_schema('unused')

    def get_permissions(self):
        # permissions mirror those of the rds resource enumerated below
        return self.manager.get_resource_manager('rds').get_permissions()

    def process(self, configs, event=None):
        # build the set of subnet group names referenced by any rds
        # instance or cluster, then keep only groups outside that set
        rds = self.manager.get_resource_manager('rds').resources()
        self.used = set(jmespath_search('[].DBSubnetGroup.DBSubnetGroupName', rds))
        self.used.update(set(jmespath_search('[].DBSubnetGroup.DBSubnetGroupName',
            self.manager.get_resource_manager('rds-cluster').resources(augment=False))))
        return super(UnusedRDSSubnetGroup, self).process(configs)

    def __call__(self, config):
        # per-resource predicate applied by Filter.process
        return config['DBSubnetGroupName'] not in self.used
@filters.register('db-parameter')
class ParameterFilter(ValueFilter):
    """
    Applies value type filter on set db parameter values.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-pg
            resource: rds
            filters:
              - type: db-parameter
                key: someparam
                op: eq
                value: someval
    """

    schema = type_schema('db-parameter', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters', )
    policy_annotation = 'c7n:MatchedDBParameter'

    @staticmethod
    def recast(val, datatype):
        """ Re-cast the value based upon an AWS supplied datatype
        and treat nulls sensibly.
        """
        ret_val = val
        if datatype == 'string':
            ret_val = str(val)
        elif datatype == 'boolean':
            # AWS returns 1s and 0s for boolean for most of the cases
            if val.isdigit():
                ret_val = bool(int(val))
            # AWS returns 'TRUE,FALSE' for Oracle engine
            elif val == 'TRUE':
                ret_val = True
            elif val == 'FALSE':
                ret_val = False
        elif datatype == 'integer':
            if val.isdigit():
                ret_val = int(val)
        elif datatype == 'float':
            ret_val = float(val) if val else 0.0

        return ret_val

    # Private method for 'DBParameterGroupName' paginator
    def _get_param_list(self, pg):
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_db_parameters')
        param_list = list(itertools.chain(*[p['Parameters']
            for p in paginator.paginate(DBParameterGroupName=pg)]))
        return param_list

    def handle_paramgroup_cache(self, param_groups):
        """Return {group_name: {param_name: value}} for the given groups,
        consulting the policy cache before hitting the API."""
        pgcache = {}
        cache = self.manager._cache

        with cache:
            for pg in param_groups:
                cache_key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-pg': pg}
                pg_values = cache.get(cache_key)
                if pg_values is not None:
                    pgcache[pg] = pg_values
                    continue
                param_list = self._get_param_list(pg)
                # only parameters with an explicit value are recorded
                pgcache[pg] = {
                    p['ParameterName']: self.recast(p['ParameterValue'], p['DataType'])
                    for p in param_list if 'ParameterValue' in p}
                cache.save(cache_key, pgcache[pg])
        return pgcache

    def process(self, resources, event=None):
        results = []
        # Collect *every* parameter group attached to any instance; the
        # previous implementation only cached the first group per
        # instance, which raised KeyError in the lookup below for
        # instances with more than one DBParameterGroup attached.
        parameter_group_list = {
            pg['DBParameterGroupName']
            for db in resources
            for pg in db['DBParameterGroups']}
        paramcache = self.handle_paramgroup_cache(parameter_group_list)
        for resource in resources:
            for pg in resource['DBParameterGroups']:
                pg_values = paramcache[pg['DBParameterGroupName']]
                if self.match(pg_values):
                    # annotate the matched key; first matching group wins
                    resource.setdefault(self.policy_annotation, []).append(
                        self.data.get('key'))
                    results.append(resource)
                    break
        return results
@actions.register('modify-db')
class ModifyDb(BaseAction):
    """Modifies an RDS instance based on specified parameter
    using ModifyDbInstance.

    'Update' is an array with key value pairs that should be set to
    the property and value you wish to modify.
    'Immediate' determines whether the modification is applied immediately
    or not. If 'immediate' is not specified, default is false.

    :example:

    .. code-block:: yaml

        policies:
          - name: disable-rds-deletion-protection
            resource: rds
            filters:
              - DeletionProtection: true
              - PubliclyAccessible: true
            actions:
              - type: modify-db
                update:
                  - property: 'DeletionProtection'
                    value: false
                  - property: 'PubliclyAccessible'
                    value: false
                immediate: true
    """

    schema = type_schema(
        'modify-db',
        immediate={"type": 'boolean'},
        update={
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    # properties accepted by the ModifyDBInstance API
                    'property': {'type': 'string', 'enum': [
                        'AllocatedStorage',
                        'DBInstanceClass',
                        'DBSubnetGroupName',
                        'DBSecurityGroups',
                        'VpcSecurityGroupIds',
                        'MasterUserPassword',
                        'DBParameterGroupName',
                        'BackupRetentionPeriod',
                        'PreferredBackupWindow',
                        'PreferredMaintenanceWindow',
                        'MultiAZ',
                        'EngineVersion',
                        'AllowMajorVersionUpgrade',
                        'AutoMinorVersionUpgrade',
                        'LicenseModel',
                        'Iops',
                        'OptionGroupName',
                        'NewDBInstanceIdentifier',
                        'StorageType',
                        'TdeCredentialArn',
                        'TdeCredentialPassword',
                        'CACertificateIdentifier',
                        'Domain',
                        'CopyTagsToSnapshot',
                        'MonitoringInterval',
                        'MonitoringRoleARN',
                        'DBPortNumber',
                        'PubliclyAccessible',
                        'DomainIAMRoleName',
                        'PromotionTier',
                        'EnableIAMDatabaseAuthentication',
                        'EnablePerformanceInsights',
                        'PerformanceInsightsKMSKeyId',
                        'PerformanceInsightsRetentionPeriod',
                        'CloudwatchLogsExportConfiguration',
                        'ProcessorFeatures',
                        'UseDefaultProcessorFeatures',
                        'DeletionProtection',
                        'MaxAllocatedStorage',
                        'CertificateRotationRestart']},
                    'value': {}
                },
            },
        },
        required=('update',))

    permissions = ('rds:ModifyDBInstance',)
    # maps a ModifyDBInstance request property to the jmespath expression
    # that reads its current value off the describe resource, used below
    # to skip already-satisfied updates
    conversion_map = {
        'DBSubnetGroupName': 'DBSubnetGroup.DBSubnetGroupName',
        'VpcSecurityGroupIds': 'VpcSecurityGroups[].VpcSecurityGroupId',
        'DBParameterGroupName': 'DBParameterGroups[].DBParameterGroupName',
        'OptionGroupName': 'OptionGroupMemberships[].OptionGroupName',
        'NewDBInstanceIdentifier': 'DBInstanceIdentifier',
        'Domain': 'DomainMemberships[].DomainName',
        'DBPortNumber': 'Endpoint.Port',
        'EnablePerformanceInsights': 'PerformanceInsightsEnabled',
        'CloudwatchLogsExportConfiguration': 'EnabledCloudwatchLogsExports'
    }

    def validate(self):
        # enforce API-level parameter coupling at policy load time
        if self.data.get('update'):
            update_dict = dict((i['property'], i['value']) for i in self.data.get('update'))
            if ('MonitoringInterval' in update_dict and update_dict['MonitoringInterval'] > 0 and
                    'MonitoringRoleARN' not in update_dict):
                raise PolicyValidationError(
                    "A MonitoringRoleARN value is required \
                    if you specify a MonitoringInterval value other than 0")
            if ('CloudwatchLogsExportConfiguration' in update_dict
                    and all(
                        k not in update_dict.get('CloudwatchLogsExportConfiguration')
                        for k in ('EnableLogTypes', 'DisableLogTypes'))):
                raise PolicyValidationError(
                    "A EnableLogTypes or DisableLogTypes input list is required\
                    for setting CloudwatchLogsExportConfiguration")
        return self

    def process(self, resources):
        c = local_session(self.manager.session_factory).client('rds')
        for r in resources:
            # only send properties whose current value differs from the
            # requested value (current value read via conversion_map or
            # directly off the resource)
            param = {
                u['property']: u['value'] for u in self.data.get('update')
                if r.get(
                    u['property'],
                    jmespath_search(
                        self.conversion_map.get(u['property'], 'None'), r))
                != u['value']}
            if not param:
                continue
            param['ApplyImmediately'] = self.data.get('immediate', False)
            param['DBInstanceIdentifier'] = r['DBInstanceIdentifier']
            try:
                c.modify_db_instance(**param)
            except c.exceptions.DBInstanceNotFoundFault:
                raise
@resources.register('rds-reserved')
class ReservedRDS(QueryResourceManager):
    """Lists all active rds reservations

    :example:

    .. code-block:: yaml

        policies:
          - name: existing-rds-reservations
            resource: rds-reserved
            filters:
              - State: active
    """

    class resource_type(TypeInfo):
        # query-framework metadata for reserved instance records
        service = 'rds'
        name = id = 'ReservedDBInstanceId'
        date = 'StartTime'
        enum_spec = (
            'describe_reserved_db_instances', 'ReservedDBInstances', None)
        filter_name = 'ReservedDBInstances'
        filter_type = 'list'
        arn_type = "ri"
        arn = "ReservedDBInstanceArn"
        permissions_enum = ('rds:DescribeReservedDBInstances',)
        universal_taggable = object()

    # hydrate tags via the resource groups tagging api
    augment = universal_augment
# register the shared consecutive-aws-backups filter on the rds resource
RDS.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter)
@filters.register('consecutive-snapshots')
class ConsecutiveSnapshots(Filter):
    """Returns instances where number of consecutive daily snapshots is
    equal to/or greater than n days.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-daily-snapshot-count
            resource: rds
            filters:
              - type: consecutive-snapshots
                days: 7
    """
    schema = type_schema('consecutive-snapshots', days={'type': 'number', 'minimum': 1},
        required=['days'])
    permissions = ('rds:DescribeDBSnapshots', 'rds:DescribeDBInstances')
    annotation = 'c7n:DBSnapshots'

    def process_resource_set(self, client, resources):
        # annotate each instance with all its snapshots via one batched
        # describe call, keyed by db-instance-id
        rds_instances = [r['DBInstanceIdentifier'] for r in resources]
        paginator = client.get_paginator('describe_db_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        db_snapshots = paginator.paginate(Filters=[{'Name': 'db-instance-id',
            'Values': rds_instances}]).build_full_result().get('DBSnapshots', [])

        inst_map = {}
        for snapshot in db_snapshots:
            inst_map.setdefault(snapshot['DBInstanceIdentifier'], []).append(snapshot)
        for r in resources:
            r[self.annotation] = inst_map.get(r['DBInstanceIdentifier'], [])

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')
        results = []
        retention = self.data.get('days')
        utcnow = datetime.datetime.utcnow()
        # the calendar dates (yesterday back through `days` ago) that
        # must each be covered by at least one available snapshot
        expected_dates = set()
        for days in range(1, retention + 1):
            expected_dates.add((utcnow - timedelta(days=days)).strftime('%Y-%m-%d'))

        # only fetch snapshots for instances not already annotated
        for resource_set in chunks(
                [r for r in resources if self.annotation not in r], 50):
            self.process_resource_set(client, resource_set)

        for r in resources:
            snapshot_dates = set()
            for snapshot in r[self.annotation]:
                if snapshot['Status'] == 'available':
                    snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d'))
            if expected_dates.issubset(snapshot_dates):
                results.append(r)
        return results
@filters.register('engine')
class EngineFilter(ValueFilter):
    """
    Filter a rds resource based on its Engine Metadata

    :example:

    .. code-block:: yaml

        policies:
          - name: find-deprecated-versions
            resource: aws.rds
            filters:
              - type: engine
                key: Status
                value: deprecated
    """

    schema = type_schema('engine', rinherit=ValueFilter.schema)

    permissions = ("rds:DescribeDBEngineVersions", )

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')

        # gather the distinct engine/version pairs in use so the describe
        # call below can be filtered server side
        engines = set()
        engine_versions = set()
        for r in resources:
            engines.add(r['Engine'])
            engine_versions.add(r['EngineVersion'])

        paginator = client.get_paginator('describe_db_engine_versions')
        response = paginator.paginate(
            Filters=[
                {'Name': 'engine', 'Values': list(engines)},
                {'Name': 'engine-version', 'Values': list(engine_versions)}
            ],
            IncludeAll=True,
        )
        all_versions = {}
        matched = []
        for page in response:
            for e in page['DBEngineVersions']:
                all_versions.setdefault(e['Engine'], {})
                all_versions[e['Engine']][e['EngineVersion']] = e
        for r in resources:
            # NOTE(review): raises KeyError if the API response omits an
            # engine/version present on an instance -- confirm acceptable
            v = all_versions[r['Engine']][r['EngineVersion']]
            if self.match(v):
                # annotate the matched engine metadata for later use
                r['c7n:Engine'] = v
                matched.append(r)
        return matched
class DescribeDBProxy(DescribeSource):
    """Describe source applying universal tag augmentation to proxies."""
    def augment(self, resources):
        return universal_augment(self.manager, resources)
@resources.register('rds-proxy')
class RDSProxy(QueryResourceManager):
    """Resource Manager for RDS DB Proxies

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-proxy-tls-check
            resource: rds-proxy
            filters:
              - type: value
                key: RequireTLS
                value: false
    """

    class resource_type(TypeInfo):
        # query-framework metadata for DB proxies
        service = 'rds'
        name = id = 'DBProxyName'
        date = 'CreatedDate'
        enum_spec = ('describe_db_proxies', 'DBProxies', None)
        arn = 'DBProxyArn'
        arn_type = 'db-proxy'
        cfn_type = 'AWS::RDS::DBProxy'
        permissions_enum = ('rds:DescribeDBProxies',)
        universal_taggable = object()

    source_mapping = {
        'describe': DescribeDBProxy,
        'config': ConfigSource
    }
@RDSProxy.action_registry.register('delete')
class DeleteRDSProxy(BaseAction):
    """
    Deletes a RDS Proxy

    :example:

    .. code-block:: yaml

      policies:
        - name: delete-rds-proxy
          resource: aws.rds-proxy
          filters:
            - type: value
              key: "DBProxyName"
              op: eq
              value: "proxy-test-1"
          actions:
            - type: delete
    """

    schema = type_schema('delete')

    permissions = ('rds:DeleteDBProxy',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # already-deleted or transitioning proxies are silently ignored
        for proxy in resources:
            self.manager.retry(
                client.delete_db_proxy,
                DBProxyName=proxy['DBProxyName'],
                ignore_err_codes=(
                    'DBProxyNotFoundFault', 'InvalidDBProxyStateFault'))
@RDSProxy.filter_registry.register('subnet')
class RDSProxySubnetFilter(net_filters.SubnetFilter):
    # subnet ids of the proxy's VPC attachment
    RelatedIdsExpression = "VpcSubnetIds[]"
@RDSProxy.filter_registry.register('security-group')
class RDSProxySecurityGroupFilter(net_filters.SecurityGroupFilter):
    # security group ids attached to the proxy
    RelatedIdsExpression = "VpcSecurityGroupIds[]"
@RDSProxy.filter_registry.register('vpc')
class RDSProxyVpcFilter(net_filters.VpcFilter):
    # VPC the proxy is deployed into
    RelatedIdsExpression = "VpcId"
@filters.register('db-option-groups')
class DbOptionGroups(ValueFilter):
    """This filter describes RDS option groups for associated RDS instances.
    Use this filter in conjunction with jmespath and value filter operators
    to filter RDS instance based on their option groups

    :example:

    .. code-block:: yaml

      policies:
        - name: rds-data-in-transit-encrypted
          resource: aws.rds
          filters:
            - type: db-option-groups
              key: Options[].OptionName
              op: intersect
              value:
                - SSL
                - NATIVE_NETWORK_ENCRYPTION

    :example:

    .. code-block:: yaml

      policies:
        - name: rds-oracle-encryption-in-transit
          resource: aws.rds
          filters:
            - Engine: oracle-ee
            - type: db-option-groups
              key: Options[].OptionSettings[?Name == 'SQLNET.ENCRYPTION_SERVER'].Value[]
              value:
                - REQUIRED
    """

    schema = type_schema('db-option-groups', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeOptionGroups', )
    policy_annotation = 'c7n:MatchedDBOptionGroups'

    def handle_optiongroup_cache(self, client, paginator, option_groups):
        """Return a mapping of option group name -> described option group.

        Results are stored in the policy cache keyed by region/account/group
        name so repeated runs avoid re-describing the same groups.
        """
        ogcache = {}
        cache = self.manager._cache

        with cache:
            for og in option_groups:
                cache_key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-pg': og}
                og_values = cache.get(cache_key)
                if og_values is not None:
                    ogcache[og] = og_values
                    continue
                option_groups_list = list(itertools.chain(*[p['OptionGroupsList']
                    for p in paginator.paginate(OptionGroupName=og)]))

                # Describing by name yields at most one group; fall back to
                # an empty dict when the group no longer exists.
                ogcache[og] = {}
                for option_group in option_groups_list:
                    ogcache[og] = option_group

                cache.save(cache_key, ogcache[og])

        return ogcache

    def process(self, resources, event=None):
        results = []
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_option_groups')
        # Collect the group name of *every* membership (deduplicated), not
        # just the first one per instance; the match loop below iterates all
        # memberships and previously raised KeyError on the second one.
        # Using .get() also tolerates instances with no memberships at all
        # (the old [0] indexing raised IndexError).
        option_groups = list({
            og['OptionGroupName']
            for db in resources
            for og in db.get('OptionGroupMemberships', ())})
        optioncache = self.handle_optiongroup_cache(client, paginator, option_groups)

        for resource in resources:
            for og in resource.get('OptionGroupMemberships', ()):
                og_values = optioncache.get(og['OptionGroupName'], {})
                if self.match(og_values):
                    resource.setdefault(self.policy_annotation, []).append({
                        k: jmespath_search(k, og_values)
                        for k in {'OptionGroupName', self.data.get('key')}
                    })
                    results.append(resource)
                    break

        return results
@filters.register('pending-maintenance')
class PendingMaintenance(Filter):
    """Scan DB instances for those with pending maintenance

    :example:

    .. code-block:: yaml

      policies:
        - name: rds-pending-maintenance
          resource: aws.rds
          filters:
            - pending-maintenance
            - type: value
              key: '"c7n:PendingMaintenance"[].PendingMaintenanceActionDetails[].Action'
              op: intersect
              value:
                - system-update
    """

    annotation_key = 'c7n:PendingMaintenance'
    schema = type_schema('pending-maintenance')
    permissions = ('rds:DescribePendingMaintenanceActions',)

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')

        # Index every pending action by the ARN it applies to, then keep
        # only the instances that have at least one action, annotating them.
        pending_by_arn = {}
        pager = client.get_paginator('describe_pending_maintenance_actions')
        for page in pager.paginate():
            for entry in page['PendingMaintenanceActions']:
                pending_by_arn.setdefault(
                    entry['ResourceIdentifier'], []).append(entry)

        matched = []
        for db in resources:
            actions = pending_by_arn.get(db['DBInstanceArn'], [])
            if actions:
                db[self.annotation_key] = actions
                matched.append(db)

        return matched