1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4RDS Resource Manager
5====================
6
7Example Policies
8----------------
9
10Find rds instances that are publicly available
11
12.. code-block:: yaml
13
14 policies:
15 - name: rds-public
16 resource: rds
17 filters:
18 - PubliclyAccessible: true
19
20Find rds instances that are not encrypted
21
22.. code-block:: yaml
23
24 policies:
25 - name: rds-non-encrypted
26 resource: rds
27 filters:
28 - type: value
29 key: StorageEncrypted
30 value: true
31 op: ne
32
33"""
34import functools
35import itertools
36import logging
37import operator
38import re
39import datetime
40
41from datetime import timedelta
42
43from decimal import Decimal as D, ROUND_HALF_UP
44
45from c7n.vendored.distutils.version import LooseVersion
46from botocore.exceptions import ClientError
47from concurrent.futures import as_completed
48
49from c7n.actions import (
50 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction)
51
52from c7n.exceptions import PolicyValidationError
53from c7n.filters import (
54 CrossAccountAccessFilter, FilterRegistry, Filter, ValueFilter, AgeFilter)
55from c7n.filters.offhours import OffHour, OnHour
56from c7n.filters import related
57import c7n.filters.vpc as net_filters
58from c7n.manager import resources
59from c7n.query import (
60 QueryResourceManager, DescribeSource, ConfigSource, TypeInfo, RetryPageIterator)
61from c7n import deprecated, tags
62from c7n.tags import universal_augment
63
64from c7n.utils import (
65 local_session, type_schema, get_retry, chunks, snapshot_identifier,
66 merge_dict_list, filter_empty, jmespath_search)
67from c7n.resources.kms import ResourceKmsKeyAlias
68from c7n.resources.securityhub import PostFinding
69from c7n.filters.backup import ConsecutiveAwsBackupsFilter
70
71log = logging.getLogger('custodian.rds')
72
73filters = FilterRegistry('rds.filters')
74actions = ActionRegistry('rds.actions')
75
76
77class DescribeRDS(DescribeSource):
78
79 def augment(self, dbs):
80 for d in dbs:
81 d['Tags'] = d.pop('TagList', ())
82 return dbs
83
84
85class ConfigRDS(ConfigSource):
86
87 def load_resource(self, item):
88 resource = super().load_resource(item)
89 for k in list(resource.keys()):
90 if k.startswith('Db'):
91 resource["DB%s" % k[2:]] = resource[k]
92 return resource
93
94
95@resources.register('rds')
96class RDS(QueryResourceManager):
97 """Resource manager for RDS DB instances.
98 """
99
100 class resource_type(TypeInfo):
101 service = 'rds'
102 arn_type = 'db'
103 arn_separator = ':'
104 enum_spec = ('describe_db_instances', 'DBInstances', None)
105 id = 'DBInstanceIdentifier'
106 config_id = 'DbiResourceId'
107 name = 'Endpoint.Address'
108 filter_name = 'DBInstanceIdentifier'
109 filter_type = 'scalar'
110 date = 'InstanceCreateTime'
111 dimension = 'DBInstanceIdentifier'
112 cfn_type = config_type = 'AWS::RDS::DBInstance'
113 arn = 'DBInstanceArn'
114 universal_taggable = True
115 default_report_fields = (
116 'DBInstanceIdentifier',
117 'DBName',
118 'Engine',
119 'EngineVersion',
120 'MultiAZ',
121 'AllocatedStorage',
122 'StorageEncrypted',
123 'PubliclyAccessible',
124 'InstanceCreateTime',
125 )
126 permissions_enum = ('rds:DescribeDBInstances',)
127
128 filter_registry = filters
129 action_registry = actions
130
131 def resources(self, query=None):
132 if query is None and 'query' in self.data:
133 query = merge_dict_list(self.data['query'])
134 elif query is None:
135 query = {}
136 return super(RDS, self).resources(query=query)
137
138 source_mapping = {
139 'describe': DescribeRDS,
140 'config': ConfigRDS
141 }
142
143
144def _db_instance_eligible_for_backup(resource):
145 db_instance_id = resource['DBInstanceIdentifier']
146
147 # Database instance is not in available state
148 if resource.get('DBInstanceStatus', '') != 'available':
149 log.debug(
150 "DB instance %s is not in available state",
151 db_instance_id)
152 return False
153 # The specified DB Instance is a member of a cluster and its
154 # backup retention should not be modified directly. Instead,
155 # modify the backup retention of the cluster using the
156 # ModifyDbCluster API
157 if resource.get('DBClusterIdentifier', ''):
158 log.debug(
159 "DB instance %s is a cluster member",
160 db_instance_id)
161 return False
162 # DB Backups not supported on a read replica for engine postgres
163 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
164 resource.get('Engine', '') == 'postgres'):
165 log.debug(
166 "DB instance %s is a postgres read-replica",
167 db_instance_id)
168 return False
169 # DB Backups not supported on a read replica running a mysql
170 # version before 5.6
171 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
172 resource.get('Engine', '') == 'mysql'):
173 engine_version = resource.get('EngineVersion', '')
174 # Assume "<major>.<minor>.<whatever>"
175 match = re.match(r'(?P<major>\d+)\.(?P<minor>\d+)\..*', engine_version)
176 if (match and int(match.group('major')) < 5 or
177 (int(match.group('major')) == 5 and int(match.group('minor')) < 6)):
178 log.debug(
179 "DB instance %s is a version %s mysql read-replica",
180 db_instance_id,
181 engine_version)
182 return False
183 return True
184
185
186def _db_instance_eligible_for_final_snapshot(resource):
187 status = resource.get('DBInstanceStatus', '')
188 # If the DB instance you are deleting has a status of "Creating,"
189 # you will not be able to have a final DB snapshot taken
190 # If the DB instance is in a failure state with a status of "failed,"
191 # "incompatible-restore," or "incompatible-network," you can only delete
192 # the instance when the SkipFinalSnapshot parameter is set to "true."
193 eligible_for_final_snapshot = True
194 if status in ['creating', 'failed', 'incompatible-restore', 'incompatible-network']:
195 eligible_for_final_snapshot = False
196
197 # FinalDBSnapshotIdentifier can not be specified when deleting a
198 # replica instance
199 if resource.get('ReadReplicaSourceDBInstanceIdentifier', ''):
200 eligible_for_final_snapshot = False
201
202 # if it's a rds-cluster, don't try to run the rds instance snapshot api call
203 if resource.get('DBClusterIdentifier', False):
204 eligible_for_final_snapshot = False
205
206 if not eligible_for_final_snapshot:
207 log.debug('DB instance is not eligible for a snapshot:/n %s', resource)
208 return eligible_for_final_snapshot
209
210
211def _get_available_engine_upgrades(client, major=False):
212 """Returns all extant rds engine upgrades.
213
214 As a nested mapping of engine type to known versions
215 and their upgrades.
216
217 Defaults to minor upgrades, but configurable to major.
218
219 Example::
220
221 >>> _get_available_engine_upgrades(client)
222 {
223 'oracle-se2': {'12.1.0.2.v2': '12.1.0.2.v5',
224 '12.1.0.2.v3': '12.1.0.2.v5'},
225 'postgres': {'9.3.1': '9.3.14',
226 '9.3.10': '9.3.14',
227 '9.3.12': '9.3.14',
228 '9.3.2': '9.3.14'}
229 }
230 """
231 results = {}
232 paginator = client.get_paginator('describe_db_engine_versions')
233 for page in paginator.paginate():
234 engine_versions = page['DBEngineVersions']
235 for v in engine_versions:
236 if v['Engine'] not in results:
237 results[v['Engine']] = {}
238 if 'ValidUpgradeTarget' not in v or len(v['ValidUpgradeTarget']) == 0:
239 continue
240 for t in v['ValidUpgradeTarget']:
241 if not major and t['IsMajorVersionUpgrade']:
242 continue
243 if LooseVersion(t['EngineVersion']) > LooseVersion(
244 results[v['Engine']].get(v['EngineVersion'], '0.0.0')):
245 results[v['Engine']][v['EngineVersion']] = t['EngineVersion']
246 return results
247
248
249filters.register('offhour', OffHour)
250filters.register('onhour', OnHour)
251
252
253@filters.register('default-vpc')
254class DefaultVpc(net_filters.DefaultVpcBase):
255 """ Matches if an rds database is in the default vpc
256
257 :example:
258
259 .. code-block:: yaml
260
261 policies:
262 - name: default-vpc-rds
263 resource: rds
264 filters:
265 - type: default-vpc
266 """
267 schema = type_schema('default-vpc')
268
269 def __call__(self, rdb):
270 return self.match(rdb['DBSubnetGroup']['VpcId'])
271
272
273@filters.register('security-group')
274class SecurityGroupFilter(net_filters.SecurityGroupFilter):
275
276 RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"
277
278
279@filters.register('subnet')
280class SubnetFilter(net_filters.SubnetFilter):
281
282 RelatedIdsExpression = "DBSubnetGroup.Subnets[].SubnetIdentifier"
283
284
285@filters.register('vpc')
286class VpcFilter(net_filters.VpcFilter):
287
288 RelatedIdsExpression = "DBSubnetGroup.VpcId"
289
290
291filters.register('network-location', net_filters.NetworkLocation)
292
293
294@filters.register('kms-alias')
295class KmsKeyAlias(ResourceKmsKeyAlias):
296
297 def process(self, dbs, event=None):
298 return self.get_matching_aliases(dbs)
299
300
301@actions.register('auto-patch')
302class AutoPatch(BaseAction):
303 """Toggle AutoMinorUpgrade flag on RDS instance
304
305 'window' parameter needs to be in the format 'ddd:hh:mm-ddd:hh:mm' and
306 have at least 30 minutes between start & end time.
307 If 'window' is not specified, AWS will assign a random maintenance window
308 to each instance selected.
309
310 :example:
311
312 .. code-block:: yaml
313
314 policies:
315 - name: enable-rds-autopatch
316 resource: rds
317 filters:
318 - AutoMinorVersionUpgrade: false
319 actions:
320 - type: auto-patch
321 minor: true
322 window: Mon:23:00-Tue:01:00
323 """
324
325 schema = type_schema(
326 'auto-patch',
327 minor={'type': 'boolean'}, window={'type': 'string'})
328 permissions = ('rds:ModifyDBInstance',)
329
330 def process(self, dbs):
331 client = local_session(
332 self.manager.session_factory).client('rds')
333
334 params = {'AutoMinorVersionUpgrade': self.data.get('minor', True)}
335 if self.data.get('window'):
336 params['PreferredMaintenanceWindow'] = self.data['window']
337
338 for db in dbs:
339 client.modify_db_instance(
340 DBInstanceIdentifier=db['DBInstanceIdentifier'],
341 **params)
342
343
344@filters.register('upgrade-available')
345class UpgradeAvailable(Filter):
346 """ Scan DB instances for available engine upgrades
347
348 This will pull DB instances & check their specific engine for any
349 engine version with higher release numbers than the current one
350
351 This will also annotate the rds instance with 'target_engine' which is
352 the most recent version of the engine available
353
354 :example:
355
356 .. code-block:: yaml
357
358 policies:
359 - name: rds-upgrade-available
360 resource: rds
361 filters:
362 - type: upgrade-available
363 major: False
364
365 """
366
367 schema = type_schema('upgrade-available',
368 major={'type': 'boolean'},
369 value={'type': 'boolean'})
370 permissions = ('rds:DescribeDBEngineVersions',)
371
372 def process(self, resources, event=None):
373 client = local_session(self.manager.session_factory).client('rds')
374 check_upgrade_extant = self.data.get('value', True)
375 check_major = self.data.get('major', False)
376 engine_upgrades = _get_available_engine_upgrades(
377 client, major=check_major)
378 results = []
379
380 for r in resources:
381 target_upgrade = engine_upgrades.get(
382 r['Engine'], {}).get(r['EngineVersion'])
383 if target_upgrade is None:
384 if check_upgrade_extant is False:
385 results.append(r)
386 continue
387 r['c7n-rds-engine-upgrade'] = target_upgrade
388 results.append(r)
389 return results
390
391
392@actions.register('upgrade')
393class UpgradeMinor(BaseAction):
394 """Upgrades a RDS instance to the latest major/minor version available
395
396 Use of the 'immediate' flag (default False) will automatically upgrade
397 the RDS engine disregarding the existing maintenance window.
398
399 :example:
400
401 .. code-block:: yaml
402
403 policies:
404 - name: upgrade-rds-minor
405 resource: rds
406 actions:
407 - type: upgrade
408 major: False
409 immediate: False
410
411 """
412
413 schema = type_schema(
414 'upgrade',
415 major={'type': 'boolean'},
416 immediate={'type': 'boolean'})
417 permissions = ('rds:ModifyDBInstance',)
418
419 def process(self, resources):
420 client = local_session(self.manager.session_factory).client('rds')
421 engine_upgrades = None
422 for r in resources:
423 if 'EngineVersion' in r['PendingModifiedValues']:
424 # Upgrade has already been scheduled
425 continue
426 if 'c7n-rds-engine-upgrade' not in r:
427 if engine_upgrades is None:
428 engine_upgrades = _get_available_engine_upgrades(
429 client, major=self.data.get('major', False))
430 target = engine_upgrades.get(
431 r['Engine'], {}).get(r['EngineVersion'])
432 if target is None:
433 log.debug(
434 "implicit filter no upgrade on %s",
435 r['DBInstanceIdentifier'])
436 continue
437 r['c7n-rds-engine-upgrade'] = target
438 client.modify_db_instance(
439 DBInstanceIdentifier=r['DBInstanceIdentifier'],
440 EngineVersion=r['c7n-rds-engine-upgrade'],
441 ApplyImmediately=self.data.get('immediate', False))
442
443
444@actions.register('tag-trim')
445class TagTrim(tags.TagTrim):
446
447 permissions = ('rds:RemoveTagsFromResource',)
448
449 def process_tag_removal(self, client, resource, candidates):
450 client.remove_tags_from_resource(ResourceName=resource['DBInstanceArn'], TagKeys=candidates)
451
452
453START_STOP_ELIGIBLE_ENGINES = {
454 'postgres', 'sqlserver-ee',
455 'oracle-se2', 'mariadb', 'oracle-ee',
456 'sqlserver-ex', 'sqlserver-se', 'oracle-se',
457 'mysql', 'oracle-se1', 'sqlserver-web',
458 'db2-ae', 'db2-se', 'oracle-ee-cdb',
459 'sqlserver-ee', 'oracle-se2-cdb'}
460
461
462def _eligible_start_stop(db, state="available"):
463 # See conditions noted here
464 # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
465 # Note that this doesn't really specify what happens for all the nosql engines
466 # that are available as rds engines.
467 if db.get('DBInstanceStatus') != state:
468 return False
469
470 if db.get('MultiAZ') and db['Engine'].startswith('sqlserver-'):
471 return False
472
473 if db['Engine'] not in START_STOP_ELIGIBLE_ENGINES:
474 return False
475
476 if db.get('ReadReplicaDBInstanceIdentifiers'):
477 return False
478
479 if db.get('ReadReplicaSourceDBInstanceIdentifier'):
480 return False
481
482 # TODO is SQL Server mirror is detectable.
483 return True
484
485
486@actions.register('stop')
487class Stop(BaseAction):
488 """Stop an rds instance.
489
490 https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
491 """
492
493 schema = type_schema('stop')
494
495 permissions = ("rds:StopDBInstance",)
496
497 def process(self, resources):
498 client = local_session(self.manager.session_factory).client('rds')
499 for r in filter(_eligible_start_stop, resources):
500 try:
501 client.stop_db_instance(
502 DBInstanceIdentifier=r['DBInstanceIdentifier'])
503 except ClientError as e:
504 log.exception(
505 "Error stopping db instance:%s err:%s",
506 r['DBInstanceIdentifier'], e)
507
508
509@actions.register('start')
510class Start(BaseAction):
511 """Start an rds instance.
512 """
513
514 schema = type_schema('start')
515
516 permissions = ("rds:StartDBInstance",)
517
518 def process(self, resources):
519 client = local_session(self.manager.session_factory).client('rds')
520 start_filter = functools.partial(_eligible_start_stop, state='stopped')
521 for r in filter(start_filter, resources):
522 try:
523 client.start_db_instance(
524 DBInstanceIdentifier=r['DBInstanceIdentifier'])
525 except ClientError as e:
526 log.exception(
527 "Error starting db instance:%s err:%s",
528 r['DBInstanceIdentifier'], e)
529
530
531@actions.register('delete')
532class Delete(BaseAction):
533 """Deletes selected RDS instances
534
535 This will delete RDS instances. It is recommended to apply with a filter
536 to avoid deleting all RDS instances in the account.
537
538 :example:
539
540 .. code-block:: yaml
541
542 policies:
543 - name: rds-delete
544 resource: rds
545 filters:
546 - default-vpc
547 actions:
548 - type: delete
549 skip-snapshot: true
550 """
551
552 schema = type_schema('delete', **{
553 'skip-snapshot': {'type': 'boolean'},
554 'copy-restore-info': {'type': 'boolean'}
555 })
556
557 permissions = ('rds:DeleteDBInstance', 'rds:AddTagsToResource')
558
559 def validate(self):
560 if self.data.get('skip-snapshot', False) and self.data.get(
561 'copy-restore-info'):
562 raise PolicyValidationError(
563 "skip-snapshot cannot be specified with copy-restore-info on %s" % (
564 self.manager.data,))
565 return self
566
567 def process(self, dbs):
568 skip = self.data.get('skip-snapshot', False)
569 # Can't delete an instance in an aurora cluster, use a policy on the cluster
570 dbs = [r for r in dbs if not r.get('DBClusterIdentifier')]
571 # Concurrency feels like overkill here.
572 client = local_session(self.manager.session_factory).client('rds')
573 for db in dbs:
574 params = dict(
575 DBInstanceIdentifier=db['DBInstanceIdentifier'])
576 if skip or not _db_instance_eligible_for_final_snapshot(db):
577 params['SkipFinalSnapshot'] = True
578 else:
579 params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
580 'Final', db['DBInstanceIdentifier'])
581 if self.data.get('copy-restore-info', False):
582 self.copy_restore_info(client, db)
583 if not db['CopyTagsToSnapshot']:
584 client.modify_db_instance(
585 DBInstanceIdentifier=db['DBInstanceIdentifier'],
586 CopyTagsToSnapshot=True)
587 self.log.info(
588 "Deleting rds: %s snapshot: %s",
589 db['DBInstanceIdentifier'],
590 params.get('FinalDBSnapshotIdentifier', False))
591
592 try:
593 client.delete_db_instance(**params)
594 except ClientError as e:
595 if e.response['Error']['Code'] == "InvalidDBInstanceState":
596 continue
597 raise
598
599 return dbs
600
601 def copy_restore_info(self, client, instance):
602 tags = []
603 tags.append({
604 'Key': 'VPCSecurityGroups',
605 'Value': ''.join([
606 g['VpcSecurityGroupId'] for g in instance['VpcSecurityGroups']
607 ])})
608 tags.append({
609 'Key': 'OptionGroupName',
610 'Value': instance['OptionGroupMemberships'][0]['OptionGroupName']})
611 tags.append({
612 'Key': 'ParameterGroupName',
613 'Value': instance['DBParameterGroups'][0]['DBParameterGroupName']})
614 tags.append({
615 'Key': 'InstanceClass',
616 'Value': instance['DBInstanceClass']})
617 tags.append({
618 'Key': 'StorageType',
619 'Value': instance['StorageType']})
620 tags.append({
621 'Key': 'MultiAZ',
622 'Value': str(instance['MultiAZ'])})
623 tags.append({
624 'Key': 'DBSubnetGroupName',
625 'Value': instance['DBSubnetGroup']['DBSubnetGroupName']})
626 client.add_tags_to_resource(
627 ResourceName=self.manager.generate_arn(
628 instance['DBInstanceIdentifier']),
629 Tags=tags)
630
631
632@actions.register('set-snapshot-copy-tags')
633class CopySnapshotTags(BaseAction):
634 """Enables copying tags from rds instance to snapshot
635
636 DEPRECATED - use modify-db instead with `CopyTagsToSnapshot`
637
638 :example:
639
640 .. code-block:: yaml
641
642 policies:
643 - name: enable-rds-snapshot-tags
644 resource: rds
645 filters:
646 - type: value
647 key: Engine
648 value: aurora
649 op: eq
650 actions:
651 - type: set-snapshot-copy-tags
652 enable: True
653 """
654 deprecations = (
655 deprecated.action("use modify-db instead with `CopyTagsToSnapshot`"),
656 )
657
658 schema = type_schema(
659 'set-snapshot-copy-tags',
660 enable={'type': 'boolean'})
661 permissions = ('rds:ModifyDBInstance',)
662
663 def process(self, resources):
664 error = None
665 with self.executor_factory(max_workers=2) as w:
666 futures = {}
667 client = local_session(self.manager.session_factory).client('rds')
668 resources = [r for r in resources
669 if r['CopyTagsToSnapshot'] != self.data.get('enable', True)]
670 for r in resources:
671 futures[w.submit(self.set_snapshot_tags, client, r)] = r
672 for f in as_completed(futures):
673 if f.exception():
674 error = f.exception()
675 self.log.error(
676 'error updating rds:%s CopyTagsToSnapshot \n %s',
677 futures[f]['DBInstanceIdentifier'], error)
678 if error:
679 raise error
680 return resources
681
682 def set_snapshot_tags(self, client, r):
683 self.manager.retry(
684 client.modify_db_instance,
685 DBInstanceIdentifier=r['DBInstanceIdentifier'],
686 CopyTagsToSnapshot=self.data.get('enable', True))
687
688
689@RDS.action_registry.register('post-finding')
690class DbInstanceFinding(PostFinding):
691
692 resource_type = 'AwsRdsDbInstance'
693
694 def format_resource(self, r):
695
696 fields = [
697 'AssociatedRoles', 'CACertificateIdentifier', 'DBClusterIdentifier',
698 'DBInstanceIdentifier', 'DBInstanceClass', 'DbInstancePort', 'DbiResourceId',
699 'DBName', 'DeletionProtection', 'Endpoint', 'Engine', 'EngineVersion',
700 'IAMDatabaseAuthenticationEnabled', 'InstanceCreateTime', 'KmsKeyId',
701 'PubliclyAccessible', 'StorageEncrypted',
702 'TdeCredentialArn', 'VpcSecurityGroups', 'MultiAz', 'EnhancedMonitoringResourceArn',
703 'DbInstanceStatus', 'MasterUsername',
704 'AllocatedStorage', 'PreferredBackupWindow', 'BackupRetentionPeriod',
705 'DbSecurityGroups', 'DbParameterGroups',
706 'AvailabilityZone', 'DbSubnetGroup', 'PreferredMaintenanceWindow',
707 'PendingModifiedValues', 'LatestRestorableTime',
708 'AutoMinorVersionUpgrade', 'ReadReplicaSourceDBInstanceIdentifier',
709 'ReadReplicaDBInstanceIdentifiers',
710 'ReadReplicaDBClusterIdentifiers', 'LicenseModel', 'Iops', 'OptionGroupMemberships',
711 'CharacterSetName',
712 'SecondaryAvailabilityZone', 'StatusInfos', 'StorageType', 'DomainMemberships',
713 'CopyTagsToSnapshot',
714 'MonitoringInterval', 'MonitoringRoleArn', 'PromotionTier', 'Timezone',
715 'PerformanceInsightsEnabled',
716 'PerformanceInsightsKmsKeyId', 'PerformanceInsightsRetentionPeriod',
717 'EnabledCloudWatchLogsExports',
718 'ProcessorFeatures', 'ListenerEndpoint', 'MaxAllocatedStorage'
719 ]
720 details = {}
721 for f in fields:
722 if r.get(f):
723 value = r[f]
724 if isinstance(r[f], datetime.datetime):
725 value = r[f].isoformat()
726 details.setdefault(f, value)
727
728 db_instance = {
729 'Type': self.resource_type,
730 'Id': r['DBInstanceArn'],
731 'Region': self.manager.config.region,
732 'Tags': {t['Key']: t['Value'] for t in r.get('Tags', [])},
733 'Details': {self.resource_type: filter_empty(details)},
734 }
735 db_instance = filter_empty(db_instance)
736 return db_instance
737
738
739@actions.register('snapshot')
740class Snapshot(BaseAction):
741 """Creates a manual snapshot of a RDS instance
742
743 :example:
744
745 .. code-block:: yaml
746
747 policies:
748 - name: rds-snapshot
749 resource: rds
750 actions:
751 - snapshot
752 """
753
754 schema = type_schema('snapshot')
755 permissions = ('rds:CreateDBSnapshot',)
756
757 def process(self, dbs):
758 with self.executor_factory(max_workers=3) as w:
759 futures = []
760 for db in dbs:
761 futures.append(w.submit(
762 self.process_rds_snapshot,
763 db))
764 for f in as_completed(futures):
765 if f.exception():
766 self.log.error(
767 "Exception creating rds snapshot \n %s",
768 f.exception())
769 return dbs
770
771 def process_rds_snapshot(self, resource):
772 if not _db_instance_eligible_for_backup(resource):
773 return
774
775 c = local_session(self.manager.session_factory).client('rds')
776 c.create_db_snapshot(
777 DBSnapshotIdentifier=snapshot_identifier(
778 self.data.get('snapshot-prefix', 'Backup'),
779 resource['DBInstanceIdentifier']),
780 DBInstanceIdentifier=resource['DBInstanceIdentifier'])
781
782
783@actions.register('resize')
784class ResizeInstance(BaseAction):
785 """Change the allocated storage of an rds instance.
786
787 :example:
788
789 This will find databases using over 85% of their allocated
790 storage, and resize them to have an additional 30% storage
791 the resize here is async during the next maintenance.
792
793 .. code-block:: yaml
794
795 policies:
796 - name: rds-resize-up
797 resource: rds
798 filters:
799 - type: metrics
800 name: FreeStorageSpace
801 percent-attr: AllocatedStorage
802 attr-multiplier: 1073741824
803 value: 90
804 op: greater-than
805 actions:
806 - type: resize
807 percent: 30
808
809
810 This will find databases using under 20% of their allocated
811 storage, and resize them to be 30% smaller, the resize here
812 is configured to be immediate.
813
814 .. code-block:: yaml
815
816 policies:
817 - name: rds-resize-down
818 resource: rds
819 filters:
820 - type: metrics
821 name: FreeStorageSpace
822 percent-attr: AllocatedStorage
823 attr-multiplier: 1073741824
824 value: 90
825 op: greater-than
826 actions:
827 - type: resize
828 percent: -30
829 immediate: true
830 """
831 schema = type_schema(
832 'resize',
833 percent={'type': 'number'},
834 immediate={'type': 'boolean'})
835
836 permissions = ('rds:ModifyDBInstance',)
837
838 def process(self, resources):
839 c = local_session(self.manager.session_factory).client('rds')
840 for r in resources:
841 old_val = D(r['AllocatedStorage'])
842 _100 = D(100)
843 new_val = ((_100 + D(self.data['percent'])) / _100) * old_val
844 rounded = int(new_val.quantize(D('0'), ROUND_HALF_UP))
845 c.modify_db_instance(
846 DBInstanceIdentifier=r['DBInstanceIdentifier'],
847 AllocatedStorage=rounded,
848 ApplyImmediately=self.data.get('immediate', False))
849
850
851@actions.register('retention')
852class RetentionWindow(BaseAction):
853 """
854 Sets the 'BackupRetentionPeriod' value for automated snapshots,
855 enforce (min, max, exact) sets retention days occordingly.
856 :example:
857
858 .. code-block:: yaml
859
860 policies:
861 - name: rds-snapshot-retention
862 resource: rds
863 filters:
864 - type: value
865 key: BackupRetentionPeriod
866 value: 7
867 op: lt
868 actions:
869 - type: retention
870 days: 7
871 copy-tags: true
872 enforce: exact
873 """
874
875 date_attribute = "BackupRetentionPeriod"
876 schema = type_schema(
877 'retention', **{'days': {'type': 'number'},
878 'copy-tags': {'type': 'boolean'},
879 'enforce': {'type': 'string', 'enum': [
880 'min', 'max', 'exact']}})
881 permissions = ('rds:ModifyDBInstance',)
882
883 def process(self, dbs):
884 with self.executor_factory(max_workers=3) as w:
885 futures = []
886 for db in dbs:
887 futures.append(w.submit(
888 self.process_snapshot_retention,
889 db))
890 for f in as_completed(futures):
891 if f.exception():
892 self.log.error(
893 "Exception setting rds retention \n %s",
894 f.exception())
895 return dbs
896
897 def process_snapshot_retention(self, resource):
898 current_retention = int(resource.get('BackupRetentionPeriod', 0))
899 current_copy_tags = resource['CopyTagsToSnapshot']
900 new_retention = self.data['days']
901 new_copy_tags = self.data.get('copy-tags', True)
902 retention_type = self.data.get('enforce', 'min').lower()
903
904 if ((retention_type == 'min' or
905 current_copy_tags != new_copy_tags) and
906 _db_instance_eligible_for_backup(resource)):
907 self.set_retention_window(
908 resource,
909 max(current_retention, new_retention),
910 new_copy_tags)
911 return resource
912
913 if ((retention_type == 'max' or
914 current_copy_tags != new_copy_tags) and
915 _db_instance_eligible_for_backup(resource)):
916 self.set_retention_window(
917 resource,
918 min(current_retention, new_retention),
919 new_copy_tags)
920 return resource
921
922 if ((retention_type == 'exact' or
923 current_copy_tags != new_copy_tags) and
924 _db_instance_eligible_for_backup(resource)):
925 self.set_retention_window(resource, new_retention, new_copy_tags)
926 return resource
927
928 def set_retention_window(self, resource, retention, copy_tags):
929 c = local_session(self.manager.session_factory).client('rds')
930 c.modify_db_instance(
931 DBInstanceIdentifier=resource['DBInstanceIdentifier'],
932 BackupRetentionPeriod=retention,
933 CopyTagsToSnapshot=copy_tags)
934
935
936@actions.register('set-public-access')
937class RDSSetPublicAvailability(BaseAction):
938 """
939 This action allows for toggling an RDS instance
940 'PubliclyAccessible' flag to true or false
941
942 :example:
943
944 .. code-block:: yaml
945
946 policies:
947 - name: disable-rds-public-accessibility
948 resource: rds
949 filters:
950 - PubliclyAccessible: true
951 actions:
952 - type: set-public-access
953 state: false
954 """
955
956 schema = type_schema(
957 "set-public-access",
958 state={'type': 'boolean'})
959 permissions = ('rds:ModifyDBInstance',)
960
961 def set_accessibility(self, r):
962 client = local_session(self.manager.session_factory).client('rds')
963 client.modify_db_instance(
964 DBInstanceIdentifier=r['DBInstanceIdentifier'],
965 PubliclyAccessible=self.data.get('state', False))
966
967 def process(self, rds):
968 with self.executor_factory(max_workers=2) as w:
969 futures = {w.submit(self.set_accessibility, r): r for r in rds}
970 for f in as_completed(futures):
971 if f.exception():
972 self.log.error(
973 "Exception setting public access on %s \n %s",
974 futures[f]['DBInstanceIdentifier'], f.exception())
975 return rds
976
977
978@resources.register('rds-subscription')
979class RDSSubscription(QueryResourceManager):
980
981 class resource_type(TypeInfo):
982 service = 'rds'
983 arn_type = 'es'
984 cfn_type = 'AWS::RDS::EventSubscription'
985 enum_spec = (
986 'describe_event_subscriptions', 'EventSubscriptionsList', None)
987 name = id = "CustSubscriptionId"
988 arn = 'EventSubscriptionArn'
989 date = "SubscriptionCreateTime"
990 permissions_enum = ('rds:DescribeEventSubscriptions',)
991 universal_taggable = object()
992
993 augment = universal_augment
994
995
996@RDSSubscription.action_registry.register('delete')
997class RDSSubscriptionDelete(BaseAction):
998 """Deletes a RDS snapshot resource
999
1000 :example:
1001
1002 .. code-block:: yaml
1003
1004 policies:
1005 - name: rds-subscription-delete
1006 resource: rds-subscription
1007 filters:
1008 - type: value
1009 key: CustSubscriptionId
1010 value: xyz
1011 actions:
1012 - delete
1013 """
1014
1015 schema = type_schema('delete')
1016 permissions = ('rds:DeleteEventSubscription',)
1017
1018 def process(self, resources):
1019 client = local_session(self.manager.session_factory).client('rds')
1020 for r in resources:
1021 self.manager.retry(
1022 client.delete_event_subscription, SubscriptionName=r['CustSubscriptionId'],
1023 ignore_err_codes=('SubscriptionNotFoundFault',
1024 'InvalidEventSubscriptionStateFault'))
1025
1026
1027class DescribeRDSSnapshot(DescribeSource):
1028
1029 def get_resources(self, ids, cache=True):
1030 super_get = super().get_resources
1031 return list(itertools.chain(*[super_get((i,)) for i in ids]))
1032
1033 def augment(self, snaps):
1034 for s in snaps:
1035 s['Tags'] = s.pop('TagList', ())
1036 return snaps
1037
1038
1039@resources.register('rds-snapshot')
1040class RDSSnapshot(QueryResourceManager):
1041 """Resource manager for RDS DB snapshots.
1042 """
1043
1044 class resource_type(TypeInfo):
1045 service = 'rds'
1046 arn_type = 'snapshot'
1047 arn_separator = ':'
1048 enum_spec = ('describe_db_snapshots', 'DBSnapshots', None)
1049 name = id = 'DBSnapshotIdentifier'
1050 date = 'SnapshotCreateTime'
1051 config_type = "AWS::RDS::DBSnapshot"
1052 filter_name = "DBSnapshotIdentifier"
1053 filter_type = "scalar"
1054 universal_taggable = True
1055 permissions_enum = ('rds:DescribeDBSnapshots',)
1056
1057 source_mapping = {
1058 'describe': DescribeRDSSnapshot,
1059 'config': ConfigSource
1060 }
1061
1062
1063@RDSSnapshot.filter_registry.register('onhour')
1064class RDSSnapshotOnHour(OnHour):
1065 """Scheduled action on rds snapshot."""
1066
1067
1068@RDSSnapshot.filter_registry.register('instance')
1069class SnapshotInstance(related.RelatedResourceFilter):
1070 """Filter snapshots by their database attributes.
1071
1072 :example:
1073
1074 Find snapshots without an extant database
1075
1076 .. code-block:: yaml
1077
1078 policies:
1079 - name: rds-snapshot-orphan
1080 resource: aws.rds-snapshot
1081 filters:
1082 - type: instance
1083 value: 0
1084 value_type: resource_count
1085 """
1086 schema = type_schema(
1087 'instance', rinherit=ValueFilter.schema
1088 )
1089
1090 RelatedResource = "c7n.resources.rds.RDS"
1091 RelatedIdsExpression = "DBInstanceIdentifier"
1092 FetchThreshold = 5
1093
1094
1095@RDSSnapshot.filter_registry.register('latest')
1096class LatestSnapshot(Filter):
1097 """Return the latest snapshot for each database.
1098 """
1099 schema = type_schema('latest', automatic={'type': 'boolean'})
1100 permissions = ('rds:DescribeDBSnapshots',)
1101
1102 def process(self, resources, event=None):
1103 results = []
1104 if not self.data.get('automatic', True):
1105 resources = [r for r in resources if r['SnapshotType'] == 'manual']
1106 for db_identifier, snapshots in itertools.groupby(
1107 resources, operator.itemgetter('DBInstanceIdentifier')):
1108 results.append(
1109 sorted(snapshots,
1110 key=operator.itemgetter('SnapshotCreateTime'))[-1])
1111 return results
1112
1113
1114@RDSSnapshot.filter_registry.register('age')
1115class RDSSnapshotAge(AgeFilter):
1116 """Filters RDS snapshots based on age (in days)
1117
1118 :example:
1119
1120 .. code-block:: yaml
1121
1122 policies:
1123 - name: rds-snapshot-expired
1124 resource: rds-snapshot
1125 filters:
1126 - type: age
1127 days: 28
1128 op: ge
1129 actions:
1130 - delete
1131 """
1132
1133 schema = type_schema(
1134 'age', days={'type': 'number'},
1135 op={'$ref': '#/definitions/filters_common/comparison_operators'})
1136
1137 date_attribute = 'SnapshotCreateTime'
1138
1139 def get_resource_date(self, i):
1140 return i.get('SnapshotCreateTime')
1141
1142
1143@RDSSnapshot.action_registry.register('restore')
1144class RestoreInstance(BaseAction):
1145 """Restore an rds instance from a snapshot.
1146
1147 Note this requires the snapshot or db deletion be taken
1148 with the `copy-restore-info` boolean flag set to true, as
1149 various instance metadata is stored on the snapshot as tags.
1150
1151 additional parameters to restore db instance api call be overriden
1152 via `restore_options` settings. various modify db instance parameters
1153 can be specified via `modify_options` settings.
1154 """
1155
1156 schema = type_schema(
1157 'restore',
1158 restore_options={'type': 'object'},
1159 modify_options={'type': 'object'})
1160
1161 permissions = (
1162 'rds:ModifyDBInstance',
1163 'rds:ModifyDBParameterGroup',
1164 'rds:ModifyOptionGroup',
1165 'rds:RebootDBInstance',
1166 'rds:RestoreDBInstanceFromDBSnapshot')
1167
1168 poll_period = 60
1169 restore_keys = {
1170 'VPCSecurityGroups', 'MultiAZ', 'DBSubnetGroupName',
1171 'InstanceClass', 'StorageType', 'ParameterGroupName',
1172 'OptionGroupName'}
1173
1174 def validate(self):
1175 found = False
1176 for f in self.manager.iter_filters():
1177 if isinstance(f, LatestSnapshot):
1178 found = True
1179 if not found:
1180 # do we really need this...
1181 raise PolicyValidationError(
1182 "must filter by latest to use restore action %s" % (
1183 self.manager.data,))
1184 return self
1185
1186 def process(self, resources):
1187 client = local_session(self.manager.session_factory).client('rds')
1188 # restore up to 10 in parallel, we have to wait on each.
1189 with self.executor_factory(
1190 max_workers=min(10, len(resources) or 1)) as w:
1191 futures = {}
1192 for r in resources:
1193 tags = {t['Key']: t['Value'] for t in r['Tags']}
1194 if not set(tags).issuperset(self.restore_keys):
1195 self.log.warning(
1196 "snapshot:%s missing restore tags",
1197 r['DBSnapshotIdentifier'])
1198 continue
1199 futures[w.submit(self.process_instance, client, r)] = r
1200 for f in as_completed(futures):
1201 r = futures[f]
1202 if f.exception():
1203 self.log.warning(
1204 "Error restoring db:%s from:%s error:\n%s",
1205 r['DBInstanceIdentifier'], r['DBSnapshotIdentifier'],
1206 f.exception())
1207 continue
1208
1209 def process_instance(self, client, r):
1210 params, post_modify = self.get_restore_from_tags(r)
1211 self.manager.retry(
1212 client.restore_db_instance_from_db_snapshot, **params)
1213 waiter = client.get_waiter('db_instance_available')
1214 # wait up to 40m
1215 waiter.config.delay = self.poll_period
1216 waiter.wait(DBInstanceIdentifier=params['DBInstanceIdentifier'])
1217 self.manager.retry(
1218 client.modify_db_instance,
1219 DBInstanceIdentifier=params['DBInstanceIdentifier'],
1220 ApplyImmediately=True,
1221 **post_modify)
1222 self.manager.retry(
1223 client.reboot_db_instance,
1224 DBInstanceIdentifier=params['DBInstanceIdentifier'],
1225 ForceFailover=False)
1226
1227 def get_restore_from_tags(self, snapshot):
1228 params, post_modify = {}, {}
1229 tags = {t['Key']: t['Value'] for t in snapshot['Tags']}
1230
1231 params['DBInstanceIdentifier'] = snapshot['DBInstanceIdentifier']
1232 params['DBSnapshotIdentifier'] = snapshot['DBSnapshotIdentifier']
1233 params['MultiAZ'] = tags['MultiAZ'] == 'True' and True or False
1234 params['DBSubnetGroupName'] = tags['DBSubnetGroupName']
1235 params['DBInstanceClass'] = tags['InstanceClass']
1236 params['CopyTagsToSnapshot'] = True
1237 params['StorageType'] = tags['StorageType']
1238 params['OptionGroupName'] = tags['OptionGroupName']
1239
1240 post_modify['DBParameterGroupName'] = tags['ParameterGroupName']
1241 post_modify['VpcSecurityGroupIds'] = tags['VPCSecurityGroups'].split(',')
1242
1243 params['Tags'] = [
1244 {'Key': k, 'Value': v} for k, v in tags.items()
1245 if k not in self.restore_keys]
1246
1247 params.update(self.data.get('restore_options', {}))
1248 post_modify.update(self.data.get('modify_options', {}))
1249 return params, post_modify
1250
1251
1252@RDSSnapshot.filter_registry.register('cross-account')
1253class CrossAccountAccess(CrossAccountAccessFilter):
1254
1255 permissions = ('rds:DescribeDBSnapshotAttributes',)
1256 attributes_key = 'c7n:attributes'
1257 annotation_key = 'c7n:CrossAccountViolations'
1258
1259 def process(self, resources, event=None):
1260 self.accounts = self.get_accounts()
1261 results = []
1262 with self.executor_factory(max_workers=2) as w:
1263 futures = []
1264 for resource_set in chunks(resources, 20):
1265 futures.append(w.submit(
1266 self.process_resource_set, resource_set))
1267 for f in as_completed(futures):
1268 if f.exception():
1269 self.log.error(
1270 "Exception checking cross account access\n %s" % (
1271 f.exception()))
1272 continue
1273 results.extend(f.result())
1274 return results
1275
1276 def process_resource_set(self, resource_set):
1277 client = local_session(self.manager.session_factory).client('rds')
1278 results = []
1279 for r in resource_set:
1280 attrs = {t['AttributeName']: t['AttributeValues']
1281 for t in self.manager.retry(
1282 client.describe_db_snapshot_attributes,
1283 DBSnapshotIdentifier=r['DBSnapshotIdentifier'])[
1284 'DBSnapshotAttributesResult']['DBSnapshotAttributes']}
1285 r[self.attributes_key] = attrs
1286 shared_accounts = set(attrs.get('restore', []))
1287 delta_accounts = shared_accounts.difference(self.accounts)
1288 if delta_accounts:
1289 r[self.annotation_key] = list(delta_accounts)
1290 results.append(r)
1291 return results
1292
1293
1294@RDSSnapshot.action_registry.register('set-permissions')
1295class SetPermissions(BaseAction):
1296 """Set permissions for copying or restoring an RDS snapshot
1297
1298 Use the 'add' and 'remove' parameters to control which accounts to
1299 add or remove, respectively. The default is to remove any
1300 permissions granted to other AWS accounts.
1301
1302 Use `remove: matched` in combination with the `cross-account` filter
1303 for more flexible removal options such as preserving access for
1304 a set of whitelisted accounts:
1305
1306 :example:
1307
1308 .. code-block:: yaml
1309
1310 policies:
1311 - name: rds-snapshot-remove-cross-account
1312 resource: rds-snapshot
1313 filters:
1314 - type: cross-account
1315 whitelist:
1316 - '112233445566'
1317 actions:
1318 - type: set-permissions
1319 remove: matched
1320 """
1321 schema = type_schema(
1322 'set-permissions',
1323 remove={'oneOf': [
1324 {'enum': ['matched']},
1325 {'type': 'array', 'items': {
1326 'oneOf': [
1327 {'type': 'string', 'minLength': 12, 'maxLength': 12},
1328 {'enum': ['all']},
1329 ],
1330 }}
1331 ]},
1332 add={
1333 'type': 'array', 'items': {
1334 'oneOf': [
1335 {'type': 'string', 'minLength': 12, 'maxLength': 12},
1336 {'enum': ['all']},
1337 ]
1338 }
1339 }
1340 )
1341
1342 permissions = ('rds:ModifyDBSnapshotAttribute',)
1343
1344 def validate(self):
1345 if self.data.get('remove') == 'matched':
1346 found = False
1347 for f in self.manager.iter_filters():
1348 if isinstance(f, CrossAccountAccessFilter):
1349 found = True
1350 break
1351 if not found:
1352 raise PolicyValidationError(
1353 "policy:%s filter:%s with matched requires cross-account filter" % (
1354 self.manager.ctx.policy.name, self.type))
1355
1356 def process(self, snapshots):
1357 client = local_session(self.manager.session_factory).client('rds')
1358 for s in snapshots:
1359 self.process_snapshot(client, s)
1360
1361 def process_snapshot(self, client, snapshot):
1362 add_accounts = self.data.get('add', [])
1363 remove_accounts = self.data.get('remove', [])
1364
1365 if not (add_accounts or remove_accounts):
1366 if CrossAccountAccess.attributes_key not in snapshot:
1367 attrs = {
1368 t['AttributeName']: t['AttributeValues']
1369 for t in self.manager.retry(
1370 client.describe_db_snapshot_attributes,
1371 DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier']
1372 )['DBSnapshotAttributesResult']['DBSnapshotAttributes']
1373 }
1374 snapshot[CrossAccountAccess.attributes_key] = attrs
1375 remove_accounts = snapshot[CrossAccountAccess.attributes_key].get('restore', [])
1376 elif remove_accounts == 'matched':
1377 remove_accounts = snapshot.get(CrossAccountAccess.annotation_key, [])
1378
1379 if add_accounts or remove_accounts:
1380 client.modify_db_snapshot_attribute(
1381 DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'],
1382 AttributeName='restore',
1383 ValuesToRemove=remove_accounts,
1384 ValuesToAdd=add_accounts)
1385
1386
1387@RDSSnapshot.action_registry.register('region-copy')
1388class RegionCopySnapshot(BaseAction):
1389 """Copy a snapshot across regions.
1390
1391 Note there is a max in flight for cross region rds snapshots
1392 of 5 per region. This action will attempt to retry automatically
1393 for an hr.
1394
1395 Example::
1396
1397 - name: copy-encrypted-snapshots
1398 description: |
1399 copy snapshots under 1 day old to dr region with kms
1400 resource: rds-snapshot
1401 region: us-east-1
1402 filters:
1403 - Status: available
1404 - type: value
1405 key: SnapshotCreateTime
1406 value_type: age
1407 value: 1
1408 op: less-than
1409 actions:
1410 - type: region-copy
1411 target_region: us-east-2
1412 target_key: arn:aws:kms:us-east-2:0000:key/cb291f53-c9cf61
1413 copy_tags: true
1414 tags:
1415 OriginRegion: us-east-1
1416 """
1417
1418 schema = type_schema(
1419 'region-copy',
1420 target_region={'type': 'string'},
1421 target_key={'type': 'string'},
1422 copy_tags={'type': 'boolean'},
1423 tags={'type': 'object'},
1424 required=('target_region',))
1425
1426 permissions = ('rds:CopyDBSnapshot',)
1427 min_delay = 120
1428 max_attempts = 30
1429
1430 def validate(self):
1431 if self.data.get('target_region') and self.manager.data.get('mode'):
1432 raise PolicyValidationError(
1433 "cross region snapshot may require waiting for "
1434 "longer then lambda runtime allows %s" % (self.manager.data,))
1435 return self
1436
1437 def process(self, resources):
1438 if self.data['target_region'] == self.manager.config.region:
1439 self.log.warning(
1440 "Source and destination region are the same, skipping copy")
1441 return
1442 for resource_set in chunks(resources, 20):
1443 self.process_resource_set(resource_set)
1444
1445 def process_resource(self, target, key, tags, snapshot):
1446 p = {}
1447 if key:
1448 p['KmsKeyId'] = key
1449 p['TargetDBSnapshotIdentifier'] = snapshot[
1450 'DBSnapshotIdentifier'].replace(':', '-')
1451 p['SourceRegion'] = self.manager.config.region
1452 p['SourceDBSnapshotIdentifier'] = snapshot['DBSnapshotArn']
1453
1454 if self.data.get('copy_tags', True):
1455 p['CopyTags'] = True
1456 if tags:
1457 p['Tags'] = tags
1458
1459 retry = get_retry(
1460 ('SnapshotQuotaExceeded',),
1461 # TODO make this configurable, class defaults to 1hr
1462 min_delay=self.min_delay,
1463 max_attempts=self.max_attempts,
1464 log_retries=logging.DEBUG)
1465 try:
1466 result = retry(target.copy_db_snapshot, **p)
1467 except ClientError as e:
1468 if e.response['Error']['Code'] == 'DBSnapshotAlreadyExists':
1469 self.log.warning(
1470 "Snapshot %s already exists in target region",
1471 snapshot['DBSnapshotIdentifier'])
1472 return
1473 raise
1474 snapshot['c7n:CopiedSnapshot'] = result[
1475 'DBSnapshot']['DBSnapshotArn']
1476
1477 def process_resource_set(self, resource_set):
1478 target_client = self.manager.session_factory(
1479 region=self.data['target_region']).client('rds')
1480 target_key = self.data.get('target_key')
1481 tags = [{'Key': k, 'Value': v} for k, v
1482 in self.data.get('tags', {}).items()]
1483
1484 for snapshot_set in chunks(resource_set, 5):
1485 for r in snapshot_set:
1486 # If tags are supplied, copy tags are ignored, and
1487 # we need to augment the tag set with the original
1488 # resource tags to preserve the common case.
1489 rtags = tags and list(tags) or None
1490 if tags and self.data.get('copy_tags', True):
1491 rtags.extend(r['Tags'])
1492 self.process_resource(target_client, target_key, rtags, r)
1493
1494
1495@RDSSnapshot.action_registry.register('delete')
1496class RDSSnapshotDelete(BaseAction):
1497 """Deletes a RDS snapshot resource
1498
1499 :example:
1500
1501 .. code-block:: yaml
1502
1503 policies:
1504 - name: rds-snapshot-delete-stale
1505 resource: rds-snapshot
1506 filters:
1507 - type: age
1508 days: 28
1509 op: ge
1510 actions:
1511 - delete
1512 """
1513
1514 schema = type_schema('delete')
1515 permissions = ('rds:DeleteDBSnapshot',)
1516
1517 def process(self, snapshots):
1518 snapshots = self.filter_resources(snapshots, 'SnapshotType', ('manual',))
1519 if not snapshots:
1520 return []
1521 log.info("Deleting %d rds snapshots", len(snapshots))
1522 with self.executor_factory(max_workers=3) as w:
1523 futures = []
1524 for snapshot_set in chunks(reversed(snapshots), size=50):
1525 futures.append(
1526 w.submit(self.process_snapshot_set, snapshot_set))
1527 for f in as_completed(futures):
1528 if f.exception():
1529 self.log.error(
1530 "Exception deleting snapshot set \n %s",
1531 f.exception())
1532 return snapshots
1533
1534 def process_snapshot_set(self, snapshots_set):
1535 c = local_session(self.manager.session_factory).client('rds')
1536 for s in snapshots_set:
1537 c.delete_db_snapshot(
1538 DBSnapshotIdentifier=s['DBSnapshotIdentifier'])
1539
1540
1541@actions.register('modify-security-groups')
1542class RDSModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
1543
1544 permissions = ('rds:ModifyDBInstance', 'rds:ModifyDBCluster')
1545 vpc_expr = 'DBSubnetGroup.VpcId'
1546
1547 def process(self, rds_instances):
1548 replication_group_map = {}
1549 client = local_session(self.manager.session_factory).client('rds')
1550 groups = super(RDSModifyVpcSecurityGroups, self).get_groups(
1551 rds_instances)
1552
1553 # either build map for DB cluster or modify DB instance directly
1554 for idx, i in enumerate(rds_instances):
1555 if i.get('DBClusterIdentifier'):
1556 # build map of Replication Groups to Security Groups
1557 replication_group_map[i['DBClusterIdentifier']] = groups[idx]
1558 else:
1559 client.modify_db_instance(
1560 DBInstanceIdentifier=i['DBInstanceIdentifier'],
1561 VpcSecurityGroupIds=groups[idx])
1562
1563 # handle DB cluster, if necessary
1564 for idx, r in enumerate(replication_group_map.keys()):
1565 client.modify_db_cluster(
1566 DBClusterIdentifier=r,
1567 VpcSecurityGroupIds=replication_group_map[r]
1568 )
1569
1570
1571class DescribeSubnetGroup(DescribeSource):
1572
1573 def augment(self, resources):
1574 _db_subnet_group_tags(
1575 resources, self.manager.session_factory,
1576 self.manager.executor_factory, self.manager.retry)
1577 return resources
1578
1579
1580@resources.register('rds-subnet-group')
1581class RDSSubnetGroup(QueryResourceManager):
1582 """RDS subnet group."""
1583
1584 class resource_type(TypeInfo):
1585 service = 'rds'
1586 arn_type = 'subgrp'
1587 id = name = 'DBSubnetGroupName'
1588 arn_separator = ':'
1589 enum_spec = (
1590 'describe_db_subnet_groups', 'DBSubnetGroups', None)
1591 filter_name = 'DBSubnetGroupName'
1592 filter_type = 'scalar'
1593 permissions_enum = ('rds:DescribeDBSubnetGroups',)
1594 cfn_type = config_type = 'AWS::RDS::DBSubnetGroup'
1595 universal_taggable = object()
1596
1597 source_mapping = {
1598 'config': ConfigSource,
1599 'describe': DescribeSubnetGroup
1600 }
1601
1602
1603def _db_subnet_group_tags(subnet_groups, session_factory, executor_factory, retry):
1604 client = local_session(session_factory).client('rds')
1605
1606 def process_tags(g):
1607 try:
1608 g['Tags'] = client.list_tags_for_resource(
1609 ResourceName=g['DBSubnetGroupArn'])['TagList']
1610 return g
1611 except client.exceptions.DBSubnetGroupNotFoundFault:
1612 return None
1613
1614 return list(filter(None, map(process_tags, subnet_groups)))
1615
1616
1617@RDSSubnetGroup.action_registry.register('delete')
1618class RDSSubnetGroupDeleteAction(BaseAction):
1619 """Action to delete RDS Subnet Group
1620
1621 It is recommended to apply a filter to the delete policy to avoid unwanted
1622 deletion of any rds subnet groups.
1623
1624 :example:
1625
1626 .. code-block:: yaml
1627
1628 policies:
1629 - name: rds-subnet-group-delete
1630 resource: rds-subnet-group
1631 filters:
1632 - Instances: []
1633 actions:
1634 - delete
1635 """
1636
1637 schema = type_schema('delete')
1638 permissions = ('rds:DeleteDBSubnetGroup',)
1639
1640 def process(self, subnet_group):
1641 with self.executor_factory(max_workers=2) as w:
1642 list(w.map(self.process_subnetgroup, subnet_group))
1643
1644 def process_subnetgroup(self, subnet_group):
1645 client = local_session(self.manager.session_factory).client('rds')
1646 client.delete_db_subnet_group(DBSubnetGroupName=subnet_group['DBSubnetGroupName'])
1647
1648
1649@RDSSubnetGroup.filter_registry.register('unused')
1650class UnusedRDSSubnetGroup(Filter):
1651 """Filters all launch rds subnet groups that are not in use but exist
1652
1653 :example:
1654
1655 .. code-block:: yaml
1656
1657 policies:
1658 - name: rds-subnet-group-delete-unused
1659 resource: rds-subnet-group
1660 filters:
1661 - unused
1662 """
1663
1664 schema = type_schema('unused')
1665
1666 def get_permissions(self):
1667 return self.manager.get_resource_manager('rds').get_permissions()
1668
1669 def process(self, configs, event=None):
1670 rds = self.manager.get_resource_manager('rds').resources()
1671 self.used = set(jmespath_search('[].DBSubnetGroup.DBSubnetGroupName', rds))
1672 self.used.update(set(jmespath_search('[].DBSubnetGroup.DBSubnetGroupName',
1673 self.manager.get_resource_manager('rds-cluster').resources(augment=False))))
1674 return super(UnusedRDSSubnetGroup, self).process(configs)
1675
1676 def __call__(self, config):
1677 return config['DBSubnetGroupName'] not in self.used
1678
1679
1680@filters.register('db-parameter')
1681class ParameterFilter(ValueFilter):
1682 """
1683 Applies value type filter on set db parameter values.
1684 :example:
1685
1686 .. code-block:: yaml
1687
1688 policies:
1689 - name: rds-pg
1690 resource: rds
1691 filters:
1692 - type: db-parameter
1693 key: someparam
1694 op: eq
1695 value: someval
1696 """
1697
1698 schema = type_schema('db-parameter', rinherit=ValueFilter.schema)
1699 schema_alias = False
1700 permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters', )
1701 policy_annotation = 'c7n:MatchedDBParameter'
1702
1703 @staticmethod
1704 def recast(val, datatype):
1705 """ Re-cast the value based upon an AWS supplied datatype
1706 and treat nulls sensibly.
1707 """
1708 ret_val = val
1709 if datatype == 'string':
1710 ret_val = str(val)
1711 elif datatype == 'boolean':
1712 # AWS returns 1s and 0s for boolean for most of the cases
1713 if val.isdigit():
1714 ret_val = bool(int(val))
1715 # AWS returns 'TRUE,FALSE' for Oracle engine
1716 elif val == 'TRUE':
1717 ret_val = True
1718 elif val == 'FALSE':
1719 ret_val = False
1720 elif datatype == 'integer':
1721 if val.isdigit():
1722 ret_val = int(val)
1723 elif datatype == 'float':
1724 ret_val = float(val) if val else 0.0
1725
1726 return ret_val
1727
1728 # Private method for 'DBParameterGroupName' paginator
1729 def _get_param_list(self, pg):
1730 client = local_session(self.manager.session_factory).client('rds')
1731 paginator = client.get_paginator('describe_db_parameters')
1732 param_list = list(itertools.chain(*[p['Parameters']
1733 for p in paginator.paginate(DBParameterGroupName=pg)]))
1734 return param_list
1735
1736 def handle_paramgroup_cache(self, param_groups):
1737 pgcache = {}
1738 cache = self.manager._cache
1739
1740 with cache:
1741 for pg in param_groups:
1742 cache_key = {
1743 'region': self.manager.config.region,
1744 'account_id': self.manager.config.account_id,
1745 'rds-pg': pg}
1746 pg_values = cache.get(cache_key)
1747 if pg_values is not None:
1748 pgcache[pg] = pg_values
1749 continue
1750 param_list = self._get_param_list(pg)
1751 pgcache[pg] = {
1752 p['ParameterName']: self.recast(p['ParameterValue'], p['DataType'])
1753 for p in param_list if 'ParameterValue' in p}
1754 cache.save(cache_key, pgcache[pg])
1755 return pgcache
1756
1757 def process(self, resources, event=None):
1758 results = []
1759 parameter_group_list = {db['DBParameterGroups'][0]['DBParameterGroupName']
1760 for db in resources}
1761 paramcache = self.handle_paramgroup_cache(parameter_group_list)
1762 for resource in resources:
1763 for pg in resource['DBParameterGroups']:
1764 pg_values = paramcache[pg['DBParameterGroupName']]
1765 if self.match(pg_values):
1766 resource.setdefault(self.policy_annotation, []).append(
1767 self.data.get('key'))
1768 results.append(resource)
1769 break
1770 return results
1771
1772
1773@actions.register('modify-db')
1774class ModifyDb(BaseAction):
1775 """Modifies an RDS instance based on specified parameter
1776 using ModifyDbInstance.
1777
1778 'Update' is an array with with key value pairs that should be set to
1779 the property and value you wish to modify.
1780 'Immediate" determines whether the modification is applied immediately
1781 or not. If 'immediate' is not specified, default is false.
1782
1783 :example:
1784
1785 .. code-block:: yaml
1786
1787 policies:
1788 - name: disable-rds-deletion-protection
1789 resource: rds
1790 filters:
1791 - DeletionProtection: true
1792 - PubliclyAccessible: true
1793 actions:
1794 - type: modify-db
1795 update:
1796 - property: 'DeletionProtection'
1797 value: false
1798 - property: 'PubliclyAccessible'
1799 value: false
1800 immediate: true
1801 """
1802
1803 schema = type_schema(
1804 'modify-db',
1805 immediate={"type": 'boolean'},
1806 update={
1807 'type': 'array',
1808 'items': {
1809 'type': 'object',
1810 'properties': {
1811 'property': {'type': 'string', 'enum': [
1812 'AllocatedStorage',
1813 'DBInstanceClass',
1814 'DBSubnetGroupName',
1815 'DBSecurityGroups',
1816 'VpcSecurityGroupIds',
1817 'MasterUserPassword',
1818 'DBParameterGroupName',
1819 'BackupRetentionPeriod',
1820 'PreferredBackupWindow',
1821 'PreferredMaintenanceWindow',
1822 'MultiAZ',
1823 'EngineVersion',
1824 'AllowMajorVersionUpgrade',
1825 'AutoMinorVersionUpgrade',
1826 'LicenseModel',
1827 'Iops',
1828 'OptionGroupName',
1829 'NewDBInstanceIdentifier',
1830 'StorageType',
1831 'TdeCredentialArn',
1832 'TdeCredentialPassword',
1833 'CACertificateIdentifier',
1834 'Domain',
1835 'CopyTagsToSnapshot',
1836 'MonitoringInterval',
1837 'MonitoringRoleARN',
1838 'DBPortNumber',
1839 'PubliclyAccessible',
1840 'DomainIAMRoleName',
1841 'PromotionTier',
1842 'EnableIAMDatabaseAuthentication',
1843 'EnablePerformanceInsights',
1844 'PerformanceInsightsKMSKeyId',
1845 'PerformanceInsightsRetentionPeriod',
1846 'CloudwatchLogsExportConfiguration',
1847 'ProcessorFeatures',
1848 'UseDefaultProcessorFeatures',
1849 'DeletionProtection',
1850 'MaxAllocatedStorage',
1851 'CertificateRotationRestart']},
1852 'value': {}
1853 },
1854 },
1855 },
1856 required=('update',))
1857
1858 permissions = ('rds:ModifyDBInstance',)
1859 conversion_map = {
1860 'DBSubnetGroupName': 'DBSubnetGroup.DBSubnetGroupName',
1861 'VpcSecurityGroupIds': 'VpcSecurityGroups[].VpcSecurityGroupId',
1862 'DBParameterGroupName': 'DBParameterGroups[].DBParameterGroupName',
1863 'OptionGroupName': 'OptionGroupMemberships[].OptionGroupName',
1864 'NewDBInstanceIdentifier': 'DBInstanceIdentifier',
1865 'Domain': 'DomainMemberships[].DomainName',
1866 'DBPortNumber': 'Endpoint.Port',
1867 'EnablePerformanceInsights': 'PerformanceInsightsEnabled',
1868 'CloudwatchLogsExportConfiguration': 'EnabledCloudwatchLogsExports'
1869 }
1870
1871 def validate(self):
1872 if self.data.get('update'):
1873 update_dict = dict((i['property'], i['value']) for i in self.data.get('update'))
1874 if ('MonitoringInterval' in update_dict and update_dict['MonitoringInterval'] > 0 and
1875 'MonitoringRoleARN' not in update_dict):
1876 raise PolicyValidationError(
1877 "A MonitoringRoleARN value is required \
1878 if you specify a MonitoringInterval value other than 0")
1879 if ('CloudwatchLogsExportConfiguration' in update_dict
1880 and all(
1881 k not in update_dict.get('CloudwatchLogsExportConfiguration')
1882 for k in ('EnableLogTypes', 'DisableLogTypes'))):
1883 raise PolicyValidationError(
1884 "A EnableLogTypes or DisableLogTypes input list is required\
1885 for setting CloudwatchLogsExportConfiguration")
1886 return self
1887
1888 def process(self, resources):
1889 c = local_session(self.manager.session_factory).client('rds')
1890 for r in resources:
1891 param = {
1892 u['property']: u['value'] for u in self.data.get('update')
1893 if r.get(
1894 u['property'],
1895 jmespath_search(
1896 self.conversion_map.get(u['property'], 'None'), r))
1897 != u['value']}
1898 if not param:
1899 continue
1900 param['ApplyImmediately'] = self.data.get('immediate', False)
1901 param['DBInstanceIdentifier'] = r['DBInstanceIdentifier']
1902 try:
1903 c.modify_db_instance(**param)
1904 except c.exceptions.DBInstanceNotFoundFault:
1905 raise
1906
1907
1908@resources.register('rds-reserved')
1909class ReservedRDS(QueryResourceManager):
1910 """Lists all active rds reservations
1911
1912 :example:
1913
1914 .. code-block:: yaml
1915
1916 policies:
1917 - name: existing-rds-reservations
1918 resource: rds-reserved
1919 filters:
1920 - State: active
1921 """
1922
1923 class resource_type(TypeInfo):
1924 service = 'rds'
1925 name = id = 'ReservedDBInstanceId'
1926 date = 'StartTime'
1927 enum_spec = (
1928 'describe_reserved_db_instances', 'ReservedDBInstances', None)
1929 filter_name = 'ReservedDBInstances'
1930 filter_type = 'list'
1931 arn_type = "ri"
1932 arn = "ReservedDBInstanceArn"
1933 permissions_enum = ('rds:DescribeReservedDBInstances',)
1934 universal_taggable = object()
1935
1936 augment = universal_augment
1937
1938
1939RDS.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter)
1940
1941
1942@filters.register('consecutive-snapshots')
1943class ConsecutiveSnapshots(Filter):
1944 """Returns instances where number of consective daily snapshots is
1945 equal to/or greater than n days.
1946
1947 :example:
1948
1949 .. code-block:: yaml
1950
1951 policies:
1952 - name: rds-daily-snapshot-count
1953 resource: rds
1954 filters:
1955 - type: consecutive-snapshots
1956 days: 7
1957 """
1958 schema = type_schema('consecutive-snapshots', days={'type': 'number', 'minimum': 1},
1959 required=['days'])
1960 permissions = ('rds:DescribeDBSnapshots', 'rds:DescribeDBInstances')
1961 annotation = 'c7n:DBSnapshots'
1962
1963 def process_resource_set(self, client, resources):
1964 rds_instances = [r['DBInstanceIdentifier'] for r in resources]
1965 paginator = client.get_paginator('describe_db_snapshots')
1966 paginator.PAGE_ITERATOR_CLS = RetryPageIterator
1967 db_snapshots = paginator.paginate(Filters=[{'Name': 'db-instance-id',
1968 'Values': rds_instances}]).build_full_result().get('DBSnapshots', [])
1969
1970 inst_map = {}
1971 for snapshot in db_snapshots:
1972 inst_map.setdefault(snapshot['DBInstanceIdentifier'], []).append(snapshot)
1973 for r in resources:
1974 r[self.annotation] = inst_map.get(r['DBInstanceIdentifier'], [])
1975
1976 def process(self, resources, event=None):
1977 client = local_session(self.manager.session_factory).client('rds')
1978 results = []
1979 retention = self.data.get('days')
1980 utcnow = datetime.datetime.utcnow()
1981 expected_dates = set()
1982 for days in range(1, retention + 1):
1983 expected_dates.add((utcnow - timedelta(days=days)).strftime('%Y-%m-%d'))
1984
1985 for resource_set in chunks(
1986 [r for r in resources if self.annotation not in r], 50):
1987 self.process_resource_set(client, resource_set)
1988
1989 for r in resources:
1990 snapshot_dates = set()
1991 for snapshot in r[self.annotation]:
1992 if snapshot['Status'] == 'available':
1993 snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d'))
1994 if expected_dates.issubset(snapshot_dates):
1995 results.append(r)
1996 return results
1997
1998
1999@filters.register('engine')
2000class EngineFilter(ValueFilter):
2001 """
2002 Filter a rds resource based on its Engine Metadata
2003
2004 :example:
2005
2006 .. code-block:: yaml
2007
2008 policies:
2009 - name: find-deprecated-versions
2010 resource: aws.rds
2011 filters:
2012 - type: engine
2013 key: Status
2014 value: deprecated
2015 """
2016
2017 schema = type_schema('engine', rinherit=ValueFilter.schema)
2018
2019 permissions = ("rds:DescribeDBEngineVersions", )
2020
2021 def process(self, resources, event=None):
2022 client = local_session(self.manager.session_factory).client('rds')
2023
2024 engines = set()
2025 engine_versions = set()
2026 for r in resources:
2027 engines.add(r['Engine'])
2028 engine_versions.add(r['EngineVersion'])
2029
2030 paginator = client.get_paginator('describe_db_engine_versions')
2031 response = paginator.paginate(
2032 Filters=[
2033 {'Name': 'engine', 'Values': list(engines)},
2034 {'Name': 'engine-version', 'Values': list(engine_versions)}
2035 ],
2036 IncludeAll=True,
2037 )
2038 all_versions = {}
2039 matched = []
2040 for page in response:
2041 for e in page['DBEngineVersions']:
2042 all_versions.setdefault(e['Engine'], {})
2043 all_versions[e['Engine']][e['EngineVersion']] = e
2044 for r in resources:
2045 v = all_versions[r['Engine']][r['EngineVersion']]
2046 if self.match(v):
2047 r['c7n:Engine'] = v
2048 matched.append(r)
2049 return matched
2050
2051
2052class DescribeDBProxy(DescribeSource):
2053 def augment(self, resources):
2054 return universal_augment(self.manager, resources)
2055
2056
2057@resources.register('rds-proxy')
2058class RDSProxy(QueryResourceManager):
2059 """Resource Manager for RDS DB Proxies
2060
2061 :example:
2062
2063 .. code-block:: yaml
2064
2065 policies:
2066 - name: rds-proxy-tls-check
2067 resource: rds-proxy
2068 filters:
2069 - type: value
2070 key: RequireTLS
2071 value: false
2072 """
2073
2074 class resource_type(TypeInfo):
2075 service = 'rds'
2076 name = id = 'DBProxyName'
2077 date = 'CreatedDate'
2078 enum_spec = ('describe_db_proxies', 'DBProxies', None)
2079 arn = 'DBProxyArn'
2080 arn_type = 'db-proxy'
2081 cfn_type = 'AWS::RDS::DBProxy'
2082 permissions_enum = ('rds:DescribeDBProxies',)
2083 universal_taggable = object()
2084
2085 source_mapping = {
2086 'describe': DescribeDBProxy,
2087 'config': ConfigSource
2088 }
2089
2090
2091@RDSProxy.action_registry.register('delete')
2092class DeleteRDSProxy(BaseAction):
2093 """
2094 Deletes a RDS Proxy
2095
2096 :example:
2097
2098 .. code-block:: yaml
2099
2100 policies:
2101 - name: delete-rds-proxy
2102 resource: aws.rds-proxy
2103 filters:
2104 - type: value
2105 key: "DBProxyName"
2106 op: eq
2107 value: "proxy-test-1"
2108 actions:
2109 - type: delete
2110 """
2111
2112 schema = type_schema('delete')
2113
2114 permissions = ('rds:DeleteDBProxy',)
2115
2116 def process(self, resources):
2117 client = local_session(self.manager.session_factory).client('rds')
2118 for r in resources:
2119 self.manager.retry(
2120 client.delete_db_proxy, DBProxyName=r['DBProxyName'],
2121 ignore_err_codes=('DBProxyNotFoundFault',
2122 'InvalidDBProxyStateFault'))
2123
2124
2125@RDSProxy.filter_registry.register('subnet')
2126class RDSProxySubnetFilter(net_filters.SubnetFilter):
2127
2128 RelatedIdsExpression = "VpcSubnetIds[]"
2129
2130
2131@RDSProxy.filter_registry.register('security-group')
2132class RDSProxySecurityGroupFilter(net_filters.SecurityGroupFilter):
2133
2134 RelatedIdsExpression = "VpcSecurityGroupIds[]"
2135
2136
2137@RDSProxy.filter_registry.register('vpc')
2138class RDSProxyVpcFilter(net_filters.VpcFilter):
2139
2140 RelatedIdsExpression = "VpcId"
2141
2142
2143@filters.register('db-option-groups')
2144class DbOptionGroups(ValueFilter):
2145 """This filter describes RDS option groups for associated RDS instances.
2146 Use this filter in conjunction with jmespath and value filter operators
2147 to filter RDS instance based on their option groups
2148
2149 :example:
2150
2151 .. code-block:: yaml
2152
2153 policies:
2154 - name: rds-data-in-transit-encrypted
2155 resource: aws.rds
2156 filters:
2157 - type: db-option-groups
2158 key: Options[].OptionName
2159 op: intersect
2160 value:
2161 - SSL
2162 - NATIVE_NETWORK_ENCRYPTION
2163
2164 :example:
2165
2166 .. code-block:: yaml
2167
2168 policies:
2169 - name: rds-oracle-encryption-in-transit
2170 resource: aws.rds
2171 filters:
2172 - Engine: oracle-ee
2173 - type: db-option-groups
2174 key: Options[].OptionSettings[?Name == 'SQLNET.ENCRYPTION_SERVER'].Value[]
2175 value:
2176 - REQUIRED
2177 """
2178
2179 schema = type_schema('db-option-groups', rinherit=ValueFilter.schema)
2180 schema_alias = False
2181 permissions = ('rds:DescribeDBInstances', 'rds:DescribeOptionGroups', )
2182 policy_annotation = 'c7n:MatchedDBOptionGroups'
2183
2184 def handle_optiongroup_cache(self, client, paginator, option_groups):
2185 ogcache = {}
2186 cache = self.manager._cache
2187
2188 with cache:
2189 for og in option_groups:
2190 cache_key = {
2191 'region': self.manager.config.region,
2192 'account_id': self.manager.config.account_id,
2193 'rds-pg': og}
2194 og_values = cache.get(cache_key)
2195 if og_values is not None:
2196 ogcache[og] = og_values
2197 continue
2198 option_groups_list = list(itertools.chain(*[p['OptionGroupsList']
2199 for p in paginator.paginate(OptionGroupName=og)]))
2200
2201 ogcache[og] = {}
2202 for option_group in option_groups_list:
2203 ogcache[og] = option_group
2204
2205 cache.save(cache_key, ogcache[og])
2206
2207 return ogcache
2208
2209 def process(self, resources, event=None):
2210 results = []
2211 client = local_session(self.manager.session_factory).client('rds')
2212 paginator = client.get_paginator('describe_option_groups')
2213 option_groups = [db['OptionGroupMemberships'][0]['OptionGroupName']
2214 for db in resources]
2215 optioncache = self.handle_optiongroup_cache(client, paginator, option_groups)
2216
2217 for resource in resources:
2218 for og in resource['OptionGroupMemberships']:
2219 og_values = optioncache[og['OptionGroupName']]
2220 if self.match(og_values):
2221 resource.setdefault(self.policy_annotation, []).append({
2222 k: jmespath_search(k, og_values)
2223 for k in {'OptionGroupName', self.data.get('key')}
2224 })
2225 results.append(resource)
2226 break
2227
2228 return results
2229
2230
2231@filters.register('pending-maintenance')
2232class PendingMaintenance(Filter):
2233 """Scan DB instances for those with pending maintenance
2234
2235 :example:
2236
2237 .. code-block:: yaml
2238
2239 policies:
2240 - name: rds-pending-maintenance
2241 resource: aws.rds
2242 filters:
2243 - pending-maintenance
2244 - type: value
2245 key: '"c7n:PendingMaintenance"[].PendingMaintenanceActionDetails[].Action'
2246 op: intersect
2247 value:
2248 - system-update
2249 """
2250
2251 annotation_key = 'c7n:PendingMaintenance'
2252 schema = type_schema('pending-maintenance')
2253 permissions = ('rds:DescribePendingMaintenanceActions',)
2254
2255 def process(self, resources, event=None):
2256 client = local_session(self.manager.session_factory).client('rds')
2257
2258 results = []
2259 resource_maintenances = {}
2260 paginator = client.get_paginator('describe_pending_maintenance_actions')
2261 for page in paginator.paginate():
2262 for action in page['PendingMaintenanceActions']:
2263 resource_maintenances.setdefault(action['ResourceIdentifier'], []).append(action)
2264
2265 for r in resources:
2266 pending_maintenances = resource_maintenances.get(r['DBInstanceArn'], [])
2267 if len(pending_maintenances) > 0:
2268 r[self.annotation_key] = pending_maintenances
2269 results.append(r)
2270
2271 return results