1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import json
4import itertools
5
6from botocore.exceptions import ClientError
7from concurrent.futures import as_completed
8
9from c7n.actions import BaseAction, ModifyVpcSecurityGroupsAction
10from c7n.exceptions import PolicyValidationError
11from c7n.filters import (
12 ValueFilter, AgeFilter, CrossAccountAccessFilter, Filter)
13import c7n.filters.vpc as net_filters
14from c7n.filters.kms import KmsRelatedFilter
15from c7n.filters.offhours import OffHour, OnHour
16from c7n.manager import resources
17from c7n.resolver import ValuesFrom
18from c7n.query import QueryResourceManager, TypeInfo, RetryPageIterator
19from c7n import tags
20from c7n.utils import (
21 type_schema, local_session, chunks, snapshot_identifier, jmespath_search)
22from .aws import shape_validate
23from datetime import datetime, timedelta
24from c7n.filters.backup import ConsecutiveAwsBackupsFilter
25
26
@resources.register('redshift')
class Redshift(QueryResourceManager):
    """Resource manager for Redshift clusters."""

    class resource_type(TypeInfo):
        # Service client and the call/result key used to enumerate clusters.
        service = 'redshift'
        enum_spec = ('describe_clusters', 'Clusters', None)

        # The cluster identifier is used as id, name, filter and dimension.
        id = 'ClusterIdentifier'
        name = 'ClusterIdentifier'
        filter_name = 'ClusterIdentifier'
        filter_type = 'scalar'
        dimension = 'ClusterIdentifier'
        date = 'ClusterCreateTime'

        # ARN layout for a cluster.
        arn_type = 'cluster'
        arn_separator = ":"

        # The same type name is used for both config and cloudformation.
        config_type = "AWS::Redshift::Cluster"
        cfn_type = "AWS::Redshift::Cluster"
41
42
# Attach filters implemented in other modules (tags, vpc, offhours, backup)
# to the redshift resource under their policy-facing names.
Redshift.filter_registry.register('marked-for-op', tags.TagActionFilter)
Redshift.filter_registry.register('network-location', net_filters.NetworkLocation)
Redshift.filter_registry.register('offhour', OffHour)
Redshift.filter_registry.register('onhour', OnHour)
Redshift.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter)
48
49
@Redshift.filter_registry.register('default-vpc')
class DefaultVpc(net_filters.DefaultVpcBase):
    """Matches redshift clusters that live in the default vpc.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-default-vpc
            resource: redshift
            filters:
              - default-vpc
    """

    schema = type_schema('default-vpc')

    def __call__(self, redshift):
        """Return the vpc match result, or False when there is no vpc."""
        vpc_id = redshift.get('VpcId')
        if not vpc_id:
            # Cluster carries no vpc id, so it cannot be in the default vpc.
            return False
        # Normalize any falsy match result to a plain False.
        return self.match(vpc_id) or False
70
71
@Redshift.filter_registry.register('logging')
class LoggingFilter(ValueFilter):
    """Checks Redshift logging status and attributes.

    The logging status of each cluster is fetched once and cached on the
    resource under the ``c7n:logging`` annotation; the value filter is then
    evaluated against that annotation.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-logging-bucket-and-prefix-test
            resource: redshift
            filters:
              - type: logging
                key: LoggingEnabled
                value: true
              - type: logging
                key: S3KeyPrefix
                value: "accounts/{account_id}"
              - type: logging
                key: BucketName
                value: "redshiftlogs"
    """
    permissions = ("redshift:DescribeLoggingStatus",)
    schema = type_schema('logging', rinherit=ValueFilter.schema)
    annotation_key = 'c7n:logging'

    def process(self, clusters, event=None):
        """Return the clusters whose logging status matches the filter.

        Clusters that no longer exist when their logging status is looked
        up are skipped.
        """
        client = local_session(self.manager.session_factory).client('redshift')
        results = []
        for cluster in clusters:
            if self.annotation_key not in cluster:
                try:
                    result = client.describe_logging_status(
                        ClusterIdentifier=cluster['ClusterIdentifier'])
                except client.exceptions.ClusterNotFoundFault:
                    # The modeled exception is named after the API shape
                    # (``ClusterNotFoundFault``), matching the other
                    # handlers in this module.
                    continue
                # Drop transport metadata so only logging attributes are
                # annotated; tolerate responses that do not carry it.
                result.pop('ResponseMetadata', None)
                cluster[self.annotation_key] = result

            if self.match(cluster[self.annotation_key]):
                results.append(cluster)
        return results
118
119
@Redshift.action_registry.register('pause')
class Pause(BaseAction):
    """Pause redshift clusters whose ``ClusterStatus`` is ``available``."""

    schema = type_schema('pause')
    permissions = ('redshift:PauseCluster',)

    def process(self, resources):
        session = local_session(self.manager.session_factory)
        client = session.client('redshift')
        # Only clusters in the 'available' state are candidates.
        candidates = self.filter_resources(
            resources, 'ClusterStatus', ('available',))
        for cluster in candidates:
            cluster_id = cluster['ClusterIdentifier']
            try:
                client.pause_cluster(ClusterIdentifier=cluster_id)
            except (client.exceptions.ClusterNotFoundFault,
                    client.exceptions.InvalidClusterStateFault):
                # These faults are propagated unchanged to the caller.
                raise
136
137
@Redshift.action_registry.register('resume')
class Resume(BaseAction):
    """Resume redshift clusters whose ``ClusterStatus`` is ``paused``."""

    schema = type_schema('resume')
    permissions = ('redshift:ResumeCluster',)

    def process(self, resources):
        session = local_session(self.manager.session_factory)
        client = session.client('redshift')
        # Only clusters in the 'paused' state are candidates.
        candidates = self.filter_resources(
            resources, 'ClusterStatus', ('paused',))
        for cluster in candidates:
            cluster_id = cluster['ClusterIdentifier']
            try:
                client.resume_cluster(ClusterIdentifier=cluster_id)
            except (client.exceptions.ClusterNotFoundFault,
                    client.exceptions.InvalidClusterStateFault):
                # These faults are propagated unchanged to the caller.
                raise
154
155
@Redshift.action_registry.register('set-logging')
class SetRedshiftLogging(BaseAction):
    """Action to enable/disable Redshift logging for a Redshift Cluster.

    When ``state`` is ``enabled`` a ``bucket`` must be given; ``prefix`` is
    only sent to the API when it is present in the policy.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-test
            resource: redshift
            filters:
              - type: logging
                key: LoggingEnabled
                value: false
            actions:
              - type: set-logging
                bucket: redshiftlogtest
                prefix: redshiftlogs
                state: enabled
    """
    schema = type_schema(
        'set-logging',
        state={'enum': ['enabled', 'disabled']},
        bucket={'type': 'string'},
        prefix={'type': 'string'},
        required=('state',))

    def get_permissions(self):
        """Return the IAM permission needed for the configured state."""
        perms = ('redshift:EnableLogging',)
        if self.data.get('state') == 'disabled':
            return ('redshift:DisableLogging',)
        return perms

    def validate(self):
        """Require a bucket when logging is being enabled.

        :raises PolicyValidationError: ``state`` is ``enabled`` and no
            ``bucket`` was specified.
        """
        if self.data.get('state') == 'enabled':
            if 'bucket' not in self.data:
                raise PolicyValidationError((
                    "redshift logging enablement requires `bucket` "
                    "and `prefix` specification on %s" % (self.manager.data,)))
        return self

    def process(self, resources):
        """Enable or disable logging on each cluster, with retries."""
        client = local_session(self.manager.session_factory).client('redshift')
        state = self.data.get('state')
        for redshift in resources:
            redshift_id = redshift['ClusterIdentifier']

            if state == 'enabled':
                params = {
                    'ClusterIdentifier': redshift_id,
                    'BucketName': self.data.get('bucket')}
                # validate() does not require a prefix, so only pass it
                # when configured; sending None fails parameter validation.
                prefix = self.data.get('prefix')
                if prefix is not None:
                    params['S3KeyPrefix'] = prefix
                self.manager.retry(client.enable_logging, **params)

            elif state == 'disabled':
                self.manager.retry(
                    client.disable_logging,
                    ClusterIdentifier=redshift_id)
217
218
@Redshift.filter_registry.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):

    # Expression selecting the vpc security group ids from a cluster record.
    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"
223
224
@Redshift.filter_registry.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Subnet filter that resolves subnets through cluster subnet groups."""

    # Unused: related ids are computed in get_related_ids instead.
    RelatedIdsExpression = ""

    def get_permissions(self):
        # Permissions are those of the subnet group resource manager.
        subnet_group_manager = RedshiftSubnetGroup(self.manager.ctx, {})
        return subnet_group_manager.get_permissions()

    def get_related_ids(self, resources):
        """Return the set of subnet ids of the clusters' subnet groups."""
        return {
            subnet['SubnetIdentifier']
            for cluster in resources
            for subnet in self.groups[cluster['ClusterSubnetGroupName']]['Subnets']}

    def process(self, resources, event=None):
        # Index every subnet group by name before delegating to the base
        # filter, which calls back into get_related_ids.
        subnet_groups = RedshiftSubnetGroup(self.manager.ctx, {}).resources()
        self.groups = {
            group['ClusterSubnetGroupName']: group for group in subnet_groups}
        return super().process(resources, event)
245
246
@Redshift.filter_registry.register('param')
class Parameter(ValueFilter):
    """Filter redshift clusters based on parameter values

    The parameters of every parameter group attached to a cluster are
    merged into one mapping which the value filter is evaluated against.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-param-ssl
            resource: redshift
            filters:
              - type: param
                key: require_ssl
                value: false
                op: eq
    """

    schema = type_schema('param', rinherit=ValueFilter.schema)
    schema_alias = False
    group_params = ()

    permissions = ("redshift:DescribeClusterParameters",)

    def process(self, clusters, event=None):
        # Map each referenced parameter group to the clusters using it.
        groups = {}
        for cluster in clusters:
            for attached in cluster['ClusterParameterGroups']:
                members = groups.setdefault(attached['ParameterGroupName'], [])
                members.append(cluster['ClusterIdentifier'])

        def get_params(group_name):
            """Fetch one parameter group as a name -> value mapping."""
            client = local_session(self.manager.session_factory).client('redshift')
            pages = client.get_paginator(
                'describe_cluster_parameters').paginate(ParameterGroupName=group_name)
            # Materialize every page before decoding any value.
            entries = [entry for page in pages for entry in page['Parameters']]
            decoded = {}
            for entry in entries:
                value = entry['ParameterValue']
                if value != 'default' and entry['DataType'] in ('integer', 'boolean'):
                    # Integer and boolean values arrive as strings.
                    value = json.loads(value)
                decoded[entry['ParameterName']] = value
            return decoded

        with self.executor_factory(max_workers=3) as w:
            names = groups.keys()
            fetched = w.map(get_params, names)
            self.group_params = dict(zip(names, fetched))
        return super().process(clusters, event)

    def __call__(self, db):
        # Later parameter groups override earlier ones on key collisions.
        merged = {}
        for attached in db['ClusterParameterGroups']:
            merged.update(self.group_params[attached['ParameterGroupName']])
        return self.match(merged)
303
304
@Redshift.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):

    # Expression selecting the kms key id from a cluster record.
    RelatedIdsExpression = 'KmsKeyId'
309
310
@Redshift.action_registry.register('delete')
class Delete(BaseAction):
    """Action to delete a redshift cluster

    To prevent unwanted deletion of redshift clusters, it is recommended to
    apply a filter to the rule. Unless ``skip-snapshot`` is set, a final
    snapshot is requested as part of the deletion.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-no-ssl
            resource: redshift
            filters:
              - type: param
                key: require_ssl
                value: false
                op: eq
            actions:
              - type: delete
    """

    schema = type_schema(
        'delete', **{'skip-snapshot': {'type': 'boolean'}})

    permissions = ('redshift:DeleteCluster',)

    def process(self, clusters):
        """Delete clusters in batches of five on a small worker pool."""
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_db_set, batch)
                for batch in chunks(clusters, size=5)]
            for future in as_completed(futures):
                error = future.exception()
                if error:
                    self.log.error(
                        "Exception deleting redshift set \n %s",
                        error)

    def process_db_set(self, db_set):
        """Delete every cluster in ``db_set``.

        Clusters rejected with ``InvalidClusterState`` are logged and
        skipped; any other client error is re-raised.
        """
        skip_snapshot = self.data.get('skip-snapshot', False)
        client = local_session(self.manager.session_factory).client('redshift')
        for db in db_set:
            request = {'ClusterIdentifier': db['ClusterIdentifier']}
            if skip_snapshot:
                request['SkipFinalClusterSnapshot'] = True
            else:
                request['FinalClusterSnapshotIdentifier'] = snapshot_identifier(
                    'Final', db['ClusterIdentifier'])
            try:
                client.delete_cluster(**request)
            except ClientError as e:
                if e.response['Error']['Code'] != "InvalidClusterState":
                    raise
                self.log.warning(
                    "Cannot delete cluster when not 'Available' state: %s",
                    db['ClusterIdentifier'])
370
371
@Redshift.action_registry.register('retention')
class RetentionWindow(BaseAction):
    """Action to set the snapshot retention period (in days)

    The period is only ever raised: clusters whose current retention is
    already at or above ``days`` are left untouched.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-snapshot-retention
            resource: redshift
            filters:
              - type: value
                key: AutomatedSnapshotRetentionPeriod
                value: 21
                op: ne
            actions:
              - type: retention
                days: 21
    """

    date_attribute = 'AutomatedSnapshotRetentionPeriod'
    schema = type_schema('retention', days={'type': 'number'})
    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_snapshot_retention, cluster)
                for cluster in clusters]
            for future in as_completed(futures):
                error = future.exception()
                if error:
                    self.log.error(
                        "Exception setting Redshift retention  \n %s",
                        error)

    def process_snapshot_retention(self, cluster):
        """Raise the cluster's retention to ``days`` if it is lower."""
        existing = int(cluster.get(self.date_attribute, 0))
        requested = self.data['days']

        if existing < requested:
            # Inside this branch the requested value is always the larger.
            self.set_retention_window(cluster, requested)
            return cluster

    def set_retention_window(self, cluster, retention):
        client = local_session(self.manager.session_factory).client('redshift')
        client.modify_cluster(
            AutomatedSnapshotRetentionPeriod=retention,
            ClusterIdentifier=cluster['ClusterIdentifier'])
427
428
@Redshift.action_registry.register('snapshot')
class Snapshot(BaseAction):
    """Action to take a snapshot of a redshift cluster

    The cluster's tags, when present on the resource, are copied onto the
    snapshot.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-snapshot
            resource: redshift
            filters:
              - type: value
                key: ClusterStatus
                value: available
                op: eq
            actions:
              - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('redshift:CreateClusterSnapshot',)

    def process(self, clusters):
        """Snapshot each cluster on a worker pool; failures are logged."""
        client = local_session(self.manager.session_factory).client('redshift')
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for cluster in clusters:
                futures.append(w.submit(
                    self.process_cluster_snapshot,
                    client, cluster))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception creating Redshift snapshot  \n %s",
                        f.exception())
        return clusters

    def process_cluster_snapshot(self, client, cluster):
        """Create a ``Backup`` snapshot of a single cluster."""
        cluster_id = cluster['ClusterIdentifier']
        params = {
            'SnapshotIdentifier': snapshot_identifier('Backup', cluster_id),
            'ClusterIdentifier': cluster_id}
        # Only forward tags when the resource has them; passing None for
        # Tags is rejected by client-side parameter validation.
        cluster_tags = cluster.get('Tags')
        if cluster_tags is not None:
            params['Tags'] = cluster_tags
        client.create_cluster_snapshot(**params)
475
476
@Redshift.action_registry.register('enable-vpc-routing')
class EnhancedVpcRoutine(BaseAction):
    """Action to enable enhanced vpc routing on a redshift cluster

    ``value`` defaults to true; clusters already in the requested state
    are not modified.

    More: https://docs.aws.amazon.com/redshift/latest/mgmt/enhanced-vpc-routing.html

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-enable-enhanced-routing
            resource: redshift
            filters:
              - type: value
                key: EnhancedVpcRouting
                value: false
                op: eq
            actions:
              - type: enable-vpc-routing
                value: true
    """

    schema = type_schema(
        'enable-vpc-routing',
        value={'type': 'boolean'})
    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_vpc_routing, cluster)
                for cluster in clusters]
            for future in as_completed(futures):
                error = future.exception()
                if error:
                    self.log.error(
                        "Exception changing Redshift VPC routing  \n %s",
                        error)
        return clusters

    def process_vpc_routing(self, cluster):
        """Switch the cluster's routing flag when it differs from ``value``."""
        existing = bool(cluster.get('EnhancedVpcRouting', False))
        requested = self.data.get('value', True)

        if existing == requested:
            return
        client = local_session(self.manager.session_factory).client('redshift')
        client.modify_cluster(
            EnhancedVpcRouting=requested,
            ClusterIdentifier=cluster['ClusterIdentifier'])
528
529
@Redshift.action_registry.register('set-public-access')
class RedshiftSetPublicAccess(BaseAction):
    """
    Action to set the 'PubliclyAccessible' setting on a redshift cluster

    ``state`` defaults to false when omitted.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-set-public-access
            resource: redshift
            filters:
              - PubliclyAccessible: true
            actions:
              - type: set-public-access
                state: false
    """

    schema = type_schema(
        'set-public-access',
        state={'type': 'boolean'})
    permissions = ('redshift:ModifyCluster',)

    def set_access(self, c):
        session = local_session(self.manager.session_factory)
        session.client('redshift').modify_cluster(
            PubliclyAccessible=self.data.get('state', False),
            ClusterIdentifier=c['ClusterIdentifier'])

    def process(self, clusters):
        with self.executor_factory(max_workers=2) as w:
            # Remember which cluster each future belongs to for error logs.
            pending = {}
            for cluster in clusters:
                pending[w.submit(self.set_access, cluster)] = cluster
            for future in as_completed(pending):
                error = future.exception()
                if not error:
                    continue
                self.log.error(
                    "Exception setting Redshift public access on %s  \n %s",
                    pending[future]['ClusterIdentifier'], error)
        return clusters
569
570
@Redshift.action_registry.register('set-attributes')
class RedshiftSetAttributes(BaseAction):
    """
    Action to modify Redshift clusters

    Only attributes whose requested value differs from the cluster's
    current (or pending) value are sent to the API; if nothing differs,
    no call is made.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-modify-cluster
            resource: redshift
            filters:
              - type: value
                key: AllowVersionUpgrade
                value: false
            actions:
              - type: set-attributes
                attributes:
                  AllowVersionUpgrade: true
    """

    schema = type_schema('set-attributes',
                        attributes={"type": "object"},
                        required=('attributes',))

    permissions = ('redshift:ModifyCluster',)

    # Request parameters whose current value sits under a different path
    # in the cluster record than the parameter name.
    cluster_mapping = {
        'ElasticIp': 'ElasticIpStatus.ElasticIp',
        'ClusterSecurityGroups': 'ClusterSecurityGroups[].ClusterSecurityGroupName',
        'VpcSecurityGroupIds': 'VpcSecurityGroups[].VpcSecurityGroupId',
        'HsmClientCertificateIdentifier': 'HsmStatus.HsmClientCertificateIdentifier',
        'HsmConfigurationIdentifier': 'HsmStatus.HsmConfigurationIdentifier'
    }

    shape = 'ModifyClusterMessage'

    def validate(self):
        """Validate the attributes against the modify-cluster request shape.

        :raises PolicyValidationError: ``ClusterIdentifier`` is among the
            attributes to update.
        """
        attrs = dict(self.data.get('attributes'))
        if attrs.get('ClusterIdentifier'):
            raise PolicyValidationError('ClusterIdentifier field cannot be updated')
        # Placeholder so the shape's required identifier is present.
        attrs["ClusterIdentifier"] = ""
        return shape_validate(attrs, self.shape, 'redshift')

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client(
            self.manager.get_model().service)
        for cluster in clusters:
            self.process_cluster(client, cluster)

    def process_cluster(self, client, cluster):
        """Apply the attributes that differ to a single cluster.

        A cluster that no longer exists is silently skipped; other client
        errors are logged and re-raised.
        """
        pending = cluster.get('PendingModifiedValues', {})
        modify = {}
        for k, v in self.data.get('attributes').items():
            if k in self.cluster_mapping:
                # Mapped parameters are compared at their mapped path only;
                # the raw key is absent or differently shaped on the record.
                current = jmespath_search(self.cluster_mapping[k], cluster)
            else:
                # A pending modification takes precedence over the current
                # value.
                current = pending.get(k, cluster.get(k))
            if v != current:
                modify[k] = v
        if not modify:
            return

        # Address the cluster by its pending identifier when one is set.
        modify['ClusterIdentifier'] = (
            pending.get('ClusterIdentifier') or cluster.get('ClusterIdentifier'))
        try:
            client.modify_cluster(**modify)
        except client.exceptions.ClusterNotFoundFault:
            return
        except ClientError as e:
            self.log.warning(
                "Exception trying to modify cluster: %s error: %s",
                cluster['ClusterIdentifier'], e)
            raise
644
645
@Redshift.action_registry.register('mark-for-op')
class TagDelayedAction(tags.TagDelayedAction):
    """Action to tag a redshift cluster with an operation to be performed
    at a later time

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-terminate-unencrypted
            resource: redshift
            filters:
              - "tag:custodian_cleanup": absent
              - type: value
                key: Encrypted
                value: false
                op: eq
            actions:
              - type: mark-for-op
                tag: custodian_cleanup
                op: delete
                days: 5
                msg: "Unencrypted Redshift cluster: {op}@{action_date}"
    """
670
671
@Redshift.action_registry.register('tag')
class Tag(tags.Tag):
    """Action to add tag/tags to a redshift cluster

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-tag
            resource: redshift
            filters:
              - "tag:RedshiftTag": absent
            actions:
              - type: tag
                key: RedshiftTag
                value: "Redshift Tag Value"
    """

    concurrency = 2
    batch_size = 5
    permissions = ('redshift:CreateTags',)

    def process_resource_set(self, client, resources, tags):
        # Tags are applied per cluster, addressed by its arn.
        arns = self.manager.get_arns(resources)
        for arn, _ in zip(arns, resources):
            client.create_tags(Tags=tags, ResourceName=arn)
698
699
@Redshift.action_registry.register('unmark')
@Redshift.action_registry.register('remove-tag')
class RemoveTag(tags.RemoveTag):
    """Action to remove tag/tags from a redshift cluster

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-remove-tag
            resource: redshift
            filters:
              - "tag:RedshiftTag": present
            actions:
              - type: remove-tag
                tags: ["RedshiftTags"]
    """

    concurrency = 2
    batch_size = 5
    permissions = ('redshift:DeleteTags',)

    def process_resource_set(self, client, resources, tag_keys):
        # Tag keys are removed per cluster, addressed by its arn.
        arns = self.manager.get_arns(resources)
        for arn, _ in zip(arns, resources):
            client.delete_tags(TagKeys=tag_keys, ResourceName=arn)
726
727
@Redshift.action_registry.register('tag-trim')
class TagTrim(tags.TagTrim):
    """Action to remove tags from a redshift cluster

    This can be used to prevent reaching the ceiling limit of tags on a
    resource

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-tag-trim
            resource: redshift
            filters:
              - type: value
                key: "length(Tags)"
                op: ge
                value: 10
            actions:
              - type: tag-trim
                space: 1
                preserve:
                  - RequiredTag1
                  - RequiredTag2
    """

    max_tag_count = 10
    permissions = ('redshift:DeleteTags',)

    def process_tag_removal(self, client, resource, candidates):
        """Delete the ``candidates`` tag keys from a single cluster."""
        # Redshift clusters are keyed by ClusterIdentifier (the id declared
        # on this resource type), not the RDS-style DBInstanceIdentifier.
        arn = self.manager.generate_arn(resource['ClusterIdentifier'])
        client.delete_tags(ResourceName=arn, TagKeys=candidates)
761
762
@Redshift.action_registry.register('modify-security-groups')
class RedshiftModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify security groups on a Redshift cluster"""

    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        session = local_session(self.manager.session_factory)
        client = session.client('redshift')
        # One resolved security group list per cluster, in cluster order.
        resolved = super().get_groups(clusters)

        for position, cluster in enumerate(clusters):
            client.modify_cluster(
                VpcSecurityGroupIds=resolved[position],
                ClusterIdentifier=cluster['ClusterIdentifier'])
778
779
@resources.register('redshift-subnet-group')
class RedshiftSubnetGroup(QueryResourceManager):
    """Redshift subnet group."""

    class resource_type(TypeInfo):
        # Service client and the call/result key used for enumeration.
        service = 'redshift'
        enum_spec = (
            'describe_cluster_subnet_groups', 'ClusterSubnetGroups', None)

        # The subnet group name is used as id, name and filter.
        id = 'ClusterSubnetGroupName'
        name = 'ClusterSubnetGroupName'
        filter_name = 'ClusterSubnetGroupName'
        filter_type = 'scalar'

        # ARN layout for a subnet group.
        arn_type = 'subnetgroup'
        arn_separator = ':'

        # The same type name is used for both config and cloudformation.
        config_type = "AWS::Redshift::ClusterSubnetGroup"
        cfn_type = "AWS::Redshift::ClusterSubnetGroup"
        universal_taggable = object()
795
796
@resources.register('redshift-snapshot')
class RedshiftSnapshot(QueryResourceManager):
    """Resource manager for Redshift snapshots.
    """

    class resource_type(TypeInfo):
        # Service client and the call/result key used for enumeration.
        service = 'redshift'
        enum_spec = ('describe_cluster_snapshots', 'Snapshots', None)

        # The snapshot identifier is used as both id and name.
        id = 'SnapshotIdentifier'
        name = 'SnapshotIdentifier'
        date = 'SnapshotCreateTime'

        # ARN layout for a snapshot.
        arn_type = 'snapshot'
        arn_separator = ':'

        config_type = "AWS::Redshift::ClusterSnapshot"
        universal_taggable = True

    def get_arns(self, resources):
        """Return one arn per snapshot, built from ``<cluster>/<snapshot>``."""
        return [
            self.generate_arn(
                snap['ClusterIdentifier'] + '/' + snap[self.get_model().id])
            for snap in resources]
817
818
@RedshiftSnapshot.filter_registry.register('age')
class RedshiftSnapshotAge(AgeFilter):
    """Filters redshift snapshots based on age (in days)

    The age is measured from the snapshot's ``SnapshotCreateTime``.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-old-snapshots
            resource: redshift-snapshot
            filters:
              - type: age
                days: 21
                op: gt
    """

    # Snapshot attribute the age is computed from.
    date_attribute = 'SnapshotCreateTime'

    schema = type_schema(
        'age',
        op={'$ref': '#/definitions/filters_common/comparison_operators'},
        days={'type': 'number'})
841
842
@RedshiftSnapshot.filter_registry.register('cross-account')
class RedshiftSnapshotCrossAccount(CrossAccountAccessFilter):
    """Filter snapshots that grant restore access to non-whitelisted accounts

    Matching snapshots are annotated with ``c7n:CrossAccountViolations``,
    the list of offending account ids.
    """
    permissions = ('redshift:DescribeClusterSnapshots',)
    schema = type_schema(
        'cross-account',
        whitelist={'type': 'array', 'items': {'type': 'string'}},
        whitelist_from=ValuesFrom.schema)

    def process(self, snapshots, event=None):
        allowed = self.get_accounts()
        matched = []
        for snap in snapshots:
            grants = snap.get('AccountsWithRestoreAccess')
            if not grants:
                # Snapshot is not shared with any account.
                continue
            violations = {g.get('AccountId') for g in grants}.difference(allowed)
            if not violations:
                continue
            snap['c7n:CrossAccountViolations'] = list(violations)
            matched.append(snap)
        return matched
865
866
@RedshiftSnapshot.action_registry.register('delete')
class RedshiftSnapshotDelete(BaseAction):
    """Action to delete redshift snapshots

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-delete-old-snapshots
            resource: redshift-snapshot
            filters:
              - type: age
                days: 21
                op: gt
            actions:
              - delete
    """

    schema = type_schema('delete')
    permissions = ('redshift:DeleteClusterSnapshot',)

    def process(self, snapshots):
        """Delete snapshots in batches of fifty, in reverse input order."""
        self.log.info("Deleting %d Redshift snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_snapshot_set, batch)
                for batch in chunks(reversed(snapshots), size=50)]
            for future in as_completed(futures):
                error = future.exception()
                if error:
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        error)
        return snapshots

    def process_snapshot_set(self, snapshots_set):
        client = local_session(self.manager.session_factory).client('redshift')
        for snap in snapshots_set:
            client.delete_cluster_snapshot(
                SnapshotClusterIdentifier=snap['ClusterIdentifier'],
                SnapshotIdentifier=snap['SnapshotIdentifier'])
909
910
@RedshiftSnapshot.action_registry.register('revoke-access')
class RedshiftSnapshotRevokeAccess(BaseAction):
    """Revokes ability of accounts to restore a snapshot

    Access is revoked for the accounts recorded in the
    ``c7n:CrossAccountViolations`` annotation, so the policy must also use
    the ``cross-account`` filter.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-snapshot-revoke-access
            resource: redshift-snapshot
            filters:
              - type: cross-account
                whitelist:
                  - "012345678910"
            actions:
              - type: revoke-access
    """
    permissions = ('redshift:RevokeSnapshotAccess',)
    schema = type_schema('revoke-access')

    def validate(self):
        """Require a ``cross-account`` filter on the policy.

        :raises PolicyValidationError: no ``cross-account`` filter is
            configured.
        """
        for f in self.manager.iter_filters():
            if isinstance(f, RedshiftSnapshotCrossAccount):
                return self
        raise PolicyValidationError(
            '`revoke-access` may only be used in '
            'conjunction with `cross-account` filter on %s' % (self.manager.data,))

    def process_snapshot_set(self, client, snapshot_set):
        """Revoke restore access for each violating account, per snapshot.

        Snapshots that have disappeared are skipped; other client errors
        are re-raised.
        """
        for s in snapshot_set:
            for a in s.get('c7n:CrossAccountViolations', []):
                try:
                    self.manager.retry(
                        client.revoke_snapshot_access,
                        SnapshotIdentifier=s['SnapshotIdentifier'],
                        AccountWithRestoreAccess=a)
                except ClientError as e:
                    if e.response['Error']['Code'] == 'ClusterSnapshotNotFound':
                        continue
                    raise

    def process(self, snapshots):
        """Revoke access in batches of 25 snapshots on a worker pool."""
        client = local_session(self.manager.session_factory).client('redshift')
        with self.executor_factory(max_workers=2) as w:
            futures = {}
            for snapshot_set in chunks(snapshots, 25):
                futures[w.submit(
                    self.process_snapshot_set, client, snapshot_set)
                ] = snapshot_set
            for f in as_completed(futures):
                error = f.exception()
                if error:
                    # Not inside an exception handler, so log.exception
                    # would attach an empty traceback; log the error value
                    # with lazy formatting instead.
                    self.log.error(
                        'Exception while revoking access on %s: %s',
                        ', '.join(
                            [s['SnapshotIdentifier'] for s in futures[f]]),
                        error)
968
969
@resources.register('redshift-reserved')
class ReservedNode(QueryResourceManager):
    """Resource manager for Redshift reserved nodes."""

    class resource_type(TypeInfo):
        # Service client and the call/result key used for enumeration.
        service = 'redshift'
        enum_spec = (
            'describe_reserved_nodes', 'ReservedNodes', None)
        permissions_enum = ('redshift:DescribeReservedNodes',)

        # The reserved node id is used as both id and name.
        id = 'ReservedNodeId'
        name = 'ReservedNodeId'
        date = 'StartTime'

        filter_name = 'ReservedNodes'
        filter_type = 'list'
        arn_type = "reserved-nodes"
983
984
@Redshift.filter_registry.register('consecutive-snapshots')
class ClusterConsecutiveSnapshots(Filter):
    """Returns Clusters where number of consecutive daily backups is
    equal to/or greater than n days.

    A cluster matches when, for each of the last ``count`` periods, it has
    at least one snapshot in the given ``status`` created in that period.

    :example:

    .. code-block:: yaml

        policies:
          - name: redshift-daily-snapshot-count
            resource: redshift
            filters:
              - type: consecutive-snapshots
                count: 7
                period: days
                status: available
    """
    schema = type_schema(
        'consecutive-snapshots',
        count={'type': 'number', 'minimum': 1},
        period={'enum': ['hours', 'days', 'weeks']},
        status={'enum': ['available', 'creating', 'final snapshot', 'failed']},
        required=['count', 'period', 'status'])
    permissions = ('redshift:DescribeClusterSnapshots', 'redshift:DescribeClusters', )
    annotation = 'c7n:RedshiftSnapshots'

    def process_resource_set(self, client, resources, lbdate):
        """Annotate each resource with its snapshots created since ``lbdate``."""
        paginator = client.get_paginator('describe_cluster_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        full_result = paginator.paginate(StartTime=lbdate).build_full_result()

        # Group the snapshots by the cluster they belong to.
        by_cluster = {}
        for snap in full_result.get('Snapshots', []):
            by_cluster.setdefault(snap['ClusterIdentifier'], []).append(snap)
        for resource in resources:
            resource[self.annotation] = by_cluster.get(
                resource['ClusterIdentifier'], [])

    def get_date(self, time):
        """Return the UTC timestamp ``time`` periods ago as a string.

        Hour periods are formatted down to the hour; day and week periods
        down to the day.
        """
        period = self.data.get('period')
        if period == 'weeks':
            offset, layout = timedelta(weeks=time), '%Y-%m-%d'
        elif period == 'hours':
            offset, layout = timedelta(hours=time), '%Y-%m-%d-%H'
        else:
            offset, layout = timedelta(days=time), '%Y-%m-%d'
        return (datetime.utcnow() - offset).strftime(layout)

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('redshift')
        retention = self.data.get('count')
        lbdate = self.get_date(retention)
        # One expected stamp per period in the look-back window.
        expected_dates = {
            self.get_date(step) for step in range(1, retention + 1)}

        # Only resources without a cached annotation need fetching.
        missing = [r for r in resources if self.annotation not in r]
        for resource_set in chunks(missing, 50):
            self.process_resource_set(client, resource_set, lbdate)

        results = []
        for resource in resources:
            snapshot_dates = set()
            for snapshot in resource[self.annotation]:
                if snapshot['Status'] != self.data.get('status'):
                    continue
                if self.data.get('period') == 'hours':
                    layout = '%Y-%m-%d-%H'
                else:
                    layout = '%Y-%m-%d'
                snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime(layout))
            if expected_dates.issubset(snapshot_dates):
                results.append(resource)
        return results