Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/redshift.py: 46%

465 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import json 

4import itertools 

5 

6from botocore.exceptions import ClientError 

7from concurrent.futures import as_completed 

8 

9from c7n.actions import BaseAction, ModifyVpcSecurityGroupsAction 

10from c7n.exceptions import PolicyValidationError 

11from c7n.filters import ( 

12 ValueFilter, AgeFilter, CrossAccountAccessFilter, Filter) 

13import c7n.filters.vpc as net_filters 

14from c7n.filters.kms import KmsRelatedFilter 

15from c7n.filters.offhours import OffHour, OnHour 

16from c7n.manager import resources 

17from c7n.resolver import ValuesFrom 

18from c7n.query import QueryResourceManager, TypeInfo, RetryPageIterator 

19from c7n import tags 

20from c7n.utils import ( 

21 type_schema, local_session, chunks, snapshot_identifier, jmespath_search) 

22from .aws import shape_validate 

23from datetime import datetime, timedelta 

24from c7n.filters.backup import ConsecutiveAwsBackupsFilter 

25 

26 

@resources.register('redshift')
class Redshift(QueryResourceManager):
    """Resource manager for Redshift clusters.

    Enumerates clusters via the ``describe_clusters`` API call; clusters
    are identified (name, id, CloudWatch dimension, server-side filter)
    by ``ClusterIdentifier``.
    """

    class resource_type(TypeInfo):
        # Enumeration/identification metadata consumed by QueryResourceManager.
        service = 'redshift'
        arn_type = 'cluster'
        arn_separator = ":"
        enum_spec = ('describe_clusters', 'Clusters', None)
        name = id = 'ClusterIdentifier'
        filter_name = 'ClusterIdentifier'
        filter_type = 'scalar'
        date = 'ClusterCreateTime'
        dimension = 'ClusterIdentifier'
        cfn_type = config_type = "AWS::Redshift::Cluster"

41 

42 

# Attach generic, shared filters (tag ops, network location, scheduling,
# AWS Backup) to the redshift resource.
Redshift.filter_registry.register('marked-for-op', tags.TagActionFilter)
Redshift.filter_registry.register('network-location', net_filters.NetworkLocation)
Redshift.filter_registry.register('offhour', OffHour)
Redshift.filter_registry.register('onhour', OnHour)
Redshift.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter)

48 

49 

@Redshift.filter_registry.register('default-vpc')
class DefaultVpc(net_filters.DefaultVpcBase):
    """Matches redshift clusters deployed into the account's default VPC.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-default-vpc
                resource: redshift
                filters:
                  - default-vpc
    """

    schema = type_schema('default-vpc')

    def __call__(self, redshift):
        # Classic (non-VPC) clusters have no VpcId and never match.
        vpc_id = redshift.get('VpcId')
        if not vpc_id:
            return False
        return self.match(vpc_id) or False

70 

71 

@Redshift.filter_registry.register('logging')
class LoggingFilter(ValueFilter):
    """Match clusters on their audit-logging status attributes.

    The per-cluster result of ``describe_logging_status`` (keys such as
    ``LoggingEnabled``, ``BucketName``, ``S3KeyPrefix``) is cached on the
    resource and evaluated with standard value-filter semantics.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-logging-bucket-and-prefix-test
                resource: redshift
                filters:
                  - type: logging
                    key: LoggingEnabled
                    value: true
                  - type: logging
                    key: S3KeyPrefix
                    value: "accounts/{account_id}"
                  - type: logging
                    key: BucketName
                    value: "redshiftlogs"
    """
    permissions = ("redshift:DescribeLoggingStatus",)
    schema = type_schema('logging', rinherit=ValueFilter.schema)
    annotation_key = 'c7n:logging'

    def process(self, clusters, event=None):
        client = local_session(self.manager.session_factory).client('redshift')
        matched = []
        for cluster in clusters:
            # Fetch logging status once per cluster; later `logging`
            # filters in the same policy reuse the annotation.
            if self.annotation_key not in cluster:
                try:
                    status = client.describe_logging_status(
                        ClusterIdentifier=cluster['ClusterIdentifier'])
                except client.exceptions.ClusterNotFound:
                    # Cluster disappeared between enumeration and now.
                    continue
                status.pop('ResponseMetadata')
                cluster[self.annotation_key] = status
            if self.match(cluster[self.annotation_key]):
                matched.append(cluster)
        return matched

118 

119 

@Redshift.action_registry.register('pause')
class Pause(BaseAction):
    """Pause redshift clusters that are currently available."""

    schema = type_schema('pause')
    permissions = ('redshift:PauseCluster',)

    def process(self, resources):
        client = local_session(
            self.manager.session_factory).client('redshift')
        # Only 'available' clusters are eligible to be paused.
        eligible = self.filter_resources(
            resources, 'ClusterStatus', ('available',))
        for cluster in eligible:
            try:
                client.pause_cluster(
                    ClusterIdentifier=cluster['ClusterIdentifier'])
            except (client.exceptions.ClusterNotFoundFault,
                    client.exceptions.InvalidClusterStateFault):
                # Propagate the service error to the caller unchanged.
                raise

136 

137 

@Redshift.action_registry.register('resume')
class Resume(BaseAction):
    """Resume redshift clusters that are currently paused."""

    schema = type_schema('resume')
    permissions = ('redshift:ResumeCluster',)

    def process(self, resources):
        client = local_session(
            self.manager.session_factory).client('redshift')
        # Only 'paused' clusters are eligible to be resumed.
        eligible = self.filter_resources(
            resources, 'ClusterStatus', ('paused',))
        for cluster in eligible:
            try:
                client.resume_cluster(
                    ClusterIdentifier=cluster['ClusterIdentifier'])
            except (client.exceptions.ClusterNotFoundFault,
                    client.exceptions.InvalidClusterStateFault):
                # Propagate the service error to the caller unchanged.
                raise

154 

155 

@Redshift.action_registry.register('set-logging')
class SetRedshiftLogging(BaseAction):
    """Enable or disable audit logging on redshift clusters.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-test
                resource: redshift
                filters:
                  - type: logging
                    key: LoggingEnabled
                    value: false
                actions:
                  - type: set-logging
                    bucket: redshiftlogtest
                    prefix: redshiftlogs
                    state: enabled
    """
    schema = type_schema(
        'set-logging',
        state={'enum': ['enabled', 'disabled']},
        bucket={'type': 'string'},
        prefix={'type': 'string'},
        required=('state',))

    def get_permissions(self):
        # Permission needed depends on the requested direction.
        if self.data.get('state') == 'disabled':
            return ('redshift:DisableLogging',)
        return ('redshift:EnableLogging',)

    def validate(self):
        # NOTE(review): the error text mentions `prefix` but only `bucket`
        # is actually enforced — prefix appears optional; confirm intent.
        if self.data.get('state') == 'enabled' and 'bucket' not in self.data:
            raise PolicyValidationError((
                "redshift logging enablement requires `bucket` "
                "and `prefix` specification on %s" % (self.manager.data,)))
        return self

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('redshift')
        state = self.data.get('state')
        for cluster in resources:
            cluster_id = cluster['ClusterIdentifier']
            if state == 'enabled':
                self.manager.retry(
                    client.enable_logging,
                    ClusterIdentifier=cluster_id,
                    BucketName=self.data.get('bucket'),
                    S3KeyPrefix=self.data.get('prefix'))
            elif state == 'disabled':
                self.manager.retry(
                    client.disable_logging,
                    ClusterIdentifier=cluster_id)

217 

218 

@Redshift.filter_registry.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    # Related security groups come from the cluster's VPC group list.
    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"

223 

224 

@Redshift.filter_registry.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Filter clusters by attributes of the subnets in their subnet group."""

    # Subnet ids are resolved indirectly through the cluster's subnet
    # group, not via a jmespath expression on the cluster record.
    RelatedIdsExpression = ""

    def get_permissions(self):
        return RedshiftSubnetGroup(self.manager.ctx, {}).get_permissions()

    def get_related_ids(self, resources):
        subnet_ids = set()
        for resource in resources:
            group = self.groups[resource['ClusterSubnetGroupName']]
            subnet_ids.update(
                subnet['SubnetIdentifier'] for subnet in group['Subnets'])
        return subnet_ids

    def process(self, resources, event=None):
        # Index subnet groups by name for get_related_ids lookups.
        self.groups = {
            group['ClusterSubnetGroupName']: group
            for group in RedshiftSubnetGroup(self.manager.ctx, {}).resources()}
        return super(SubnetFilter, self).process(resources, event)

245 

246 

@Redshift.filter_registry.register('param')
class Parameter(ValueFilter):
    """Filter redshift clusters based on parameter values

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-param-ssl
                resource: redshift
                filters:
                  - type: param
                    key: require_ssl
                    value: false
                    op: eq
    """

    schema = type_schema('param', rinherit=ValueFilter.schema)
    schema_alias = False
    group_params = ()

    permissions = ("redshift:DescribeClusterParameters",)

    def process(self, clusters, event=None):
        # Collect the distinct parameter groups in use across clusters.
        groups = {}
        for cluster in clusters:
            for pg in cluster['ClusterParameterGroups']:
                groups.setdefault(pg['ParameterGroupName'], []).append(
                    cluster['ClusterIdentifier'])

        def fetch_group_params(group_name):
            # One name -> value map per parameter group.
            client = local_session(self.manager.session_factory).client('redshift')
            paginator = client.get_paginator('describe_cluster_parameters')
            params = {}
            for page in paginator.paginate(ParameterGroupName=group_name):
                for p in page['Parameters']:
                    value = p['ParameterValue']
                    # Coerce numeric/boolean parameters to native types so
                    # value-filter comparisons behave naturally.
                    if value != 'default' and p['DataType'] in ('integer', 'boolean'):
                        value = json.loads(value)
                    params[p['ParameterName']] = value
            return params

        with self.executor_factory(max_workers=3) as w:
            names = groups.keys()
            self.group_params = dict(zip(names, w.map(fetch_group_params, names)))
        return super(Parameter, self).process(clusters, event)

    def __call__(self, db):
        # Merge the effective parameters across all of the cluster's
        # parameter groups (later groups win on key collisions).
        merged = {}
        for pg in db['ClusterParameterGroups']:
            merged.update(self.group_params[pg['ParameterGroupName']])
        return self.match(merged)

303 

304 

@Redshift.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    # Match clusters by attributes of the KMS key referenced in KmsKeyId.
    RelatedIdsExpression = 'KmsKeyId'

309 

310 

@Redshift.action_registry.register('delete')
class Delete(BaseAction):
    """Delete redshift clusters, taking a final snapshot by default.

    To prevent unwanted deletion of redshift clusters, it is recommended
    to apply a filter to the rule. Set ``skip-snapshot: true`` to delete
    without a final snapshot.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-no-ssl
                resource: redshift
                filters:
                  - type: param
                    key: require_ssl
                    value: false
                    op: eq
                actions:
                  - type: delete
    """

    schema = type_schema(
        'delete', **{'skip-snapshot': {'type': 'boolean'}})

    permissions = ('redshift:DeleteCluster',)

    def process(self, clusters):
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_db_set, batch)
                for batch in chunks(clusters, size=5)]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception deleting redshift set \n %s",
                        f.exception())

    def process_db_set(self, db_set):
        skip = self.data.get('skip-snapshot', False)
        client = local_session(self.manager.session_factory).client('redshift')
        for db in db_set:
            cluster_id = db['ClusterIdentifier']
            params = {'ClusterIdentifier': cluster_id}
            if skip:
                params['SkipFinalClusterSnapshot'] = True
            else:
                params['FinalClusterSnapshotIdentifier'] = snapshot_identifier(
                    'Final', cluster_id)
            try:
                client.delete_cluster(**params)
            except ClientError as e:
                # Clusters mid-transition can't be deleted; warn and move on.
                if e.response['Error']['Code'] != "InvalidClusterState":
                    raise
                self.log.warning(
                    "Cannot delete cluster when not 'Available' state: %s",
                    cluster_id)

370 

371 

@Redshift.action_registry.register('retention')
class RetentionWindow(BaseAction):
    """Set the automated snapshot retention period (in days).

    Only ever extends the window — clusters whose current retention
    already meets or exceeds ``days`` are left untouched.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-snapshot-retention
                resource: redshift
                filters:
                  - type: value
                    key: AutomatedSnapshotRetentionPeriod
                    value: 21
                    op: ne
                actions:
                  - type: retention
                    days: 21
    """

    date_attribute = 'AutomatedSnapshotRetentionPeriod'
    schema = type_schema(
        'retention',
        **{'days': {'type': 'number'}})
    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_snapshot_retention, cluster)
                for cluster in clusters]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting Redshift retention \n %s",
                        f.exception())

    def process_snapshot_retention(self, cluster):
        current = int(cluster.get(self.date_attribute, 0))
        target = self.data['days']
        if current < target:
            self.set_retention_window(cluster, max(current, target))
            return cluster

    def set_retention_window(self, cluster, retention):
        client = local_session(self.manager.session_factory).client('redshift')
        client.modify_cluster(
            ClusterIdentifier=cluster['ClusterIdentifier'],
            AutomatedSnapshotRetentionPeriod=retention)

427 

428 

@Redshift.action_registry.register('snapshot')
class Snapshot(BaseAction):
    """Action to take a snapshot of a redshift cluster

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-snapshot
                resource: redshift
                filters:
                  - type: value
                    key: ClusterStatus
                    value: available
                    op: eq
                actions:
                  - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('redshift:CreateClusterSnapshot',)

    def process(self, clusters):
        # Snapshot clusters concurrently; failures are logged, not raised,
        # so one bad cluster doesn't block the rest.
        client = local_session(self.manager.session_factory).client('redshift')
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for cluster in clusters:
                futures.append(w.submit(
                    self.process_cluster_snapshot,
                    client, cluster))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception creating Redshift snapshot \n %s",
                        f.exception())
        return clusters

    def process_cluster_snapshot(self, client, cluster):
        """Create a 'Backup-*' snapshot carrying over the cluster's tags.

        Defaults missing Tags to an empty list: the previous
        ``cluster.get('Tags')`` yielded ``None`` for untagged clusters,
        and ``Tags=None`` fails botocore parameter validation (Tags must
        be a list).
        """
        client.create_cluster_snapshot(
            SnapshotIdentifier=snapshot_identifier(
                'Backup',
                cluster['ClusterIdentifier']),
            ClusterIdentifier=cluster['ClusterIdentifier'],
            Tags=cluster.get('Tags', []))

475 

476 

@Redshift.action_registry.register('enable-vpc-routing')
class EnhancedVpcRoutine(BaseAction):
    """Toggle enhanced VPC routing on redshift clusters.

    More: https://docs.aws.amazon.com/redshift/latest/mgmt/enhanced-vpc-routing.html

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-enable-enhanced-routing
                resource: redshift
                filters:
                  - type: value
                    key: EnhancedVpcRouting
                    value: false
                    op: eq
                actions:
                  - type: enable-vpc-routing
                    value: true
    """

    schema = type_schema(
        'enable-vpc-routing',
        value={'type': 'boolean'})
    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_vpc_routing, cluster)
                for cluster in clusters]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception changing Redshift VPC routing \n %s",
                        f.exception())
        return clusters

    def process_vpc_routing(self, cluster):
        desired = self.data.get('value', True)
        # Skip the API call when the cluster already matches the target.
        if bool(cluster.get('EnhancedVpcRouting', False)) == desired:
            return
        client = local_session(self.manager.session_factory).client('redshift')
        client.modify_cluster(
            ClusterIdentifier=cluster['ClusterIdentifier'],
            EnhancedVpcRouting=desired)

528 

529 

@Redshift.action_registry.register('set-public-access')
class RedshiftSetPublicAccess(BaseAction):
    """
    Set the 'PubliclyAccessible' flag on redshift clusters.

    Defaults to ``state: false`` (make private) when not specified.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-set-public-access
                resource: redshift
                filters:
                  - PubliclyAccessible: true
                actions:
                  - type: set-public-access
                    state: false
    """

    schema = type_schema(
        'set-public-access',
        state={'type': 'boolean'})
    permissions = ('redshift:ModifyCluster',)

    def set_access(self, c):
        client = local_session(self.manager.session_factory).client('redshift')
        client.modify_cluster(
            ClusterIdentifier=c['ClusterIdentifier'],
            PubliclyAccessible=self.data.get('state', False))

    def process(self, clusters):
        with self.executor_factory(max_workers=2) as w:
            # Keep future -> cluster mapping so failures can be attributed.
            pending = {w.submit(self.set_access, c): c for c in clusters}
            for f in as_completed(pending):
                if f.exception():
                    self.log.error(
                        "Exception setting Redshift public access on %s \n %s",
                        pending[f]['ClusterIdentifier'], f.exception())
        return clusters

569 

570 

@Redshift.action_registry.register('set-attributes')
class RedshiftSetAttributes(BaseAction):
    """
    Action to modify Redshift clusters

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-modify-cluster
                resource: redshift
                filters:
                  - type: value
                    key: AllowVersionUpgrade
                    value: false
                actions:
                  - type: set-attributes
                    attributes:
                      AllowVersionUpgrade: true
    """

    schema = type_schema('set-attributes',
                         attributes={"type": "object"},
                         required=('attributes',))

    permissions = ('redshift:ModifyCluster',)
    # Maps ModifyCluster request keys to the jmespath location of the
    # corresponding current value on a DescribeClusters record, for keys
    # whose request name differs from the describe-record layout.
    cluster_mapping = {
        'ElasticIp': 'ElasticIpStatus.ElasticIp',
        'ClusterSecurityGroups': 'ClusterSecurityGroups[].ClusterSecurityGroupName',
        'VpcSecurityGroupIds': 'VpcSecurityGroups[].ClusterSecurityGroupName',
        'HsmClientCertificateIdentifier': 'HsmStatus.HsmClientCertificateIdentifier',
        'HsmConfigurationIdentifier': 'HsmStatus.HsmConfigurationIdentifier'
    }

    # API shape used to validate the policy's attributes at load time.
    shape = 'ModifyClusterMessage'

    def validate(self):
        """Reject cluster-id rewrites and shape-check the remaining attributes."""
        attrs = dict(self.data.get('attributes'))
        if attrs.get('ClusterIdentifier'):
            raise PolicyValidationError('ClusterIdentifier field cannot be updated')
        # Placeholder id so the request validates against the API shape,
        # which requires ClusterIdentifier to be present.
        attrs["ClusterIdentifier"] = ""
        return shape_validate(attrs, self.shape, 'redshift')

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client(
            self.manager.get_model().service)
        for cluster in clusters:
            self.process_cluster(client, cluster)

    def process_cluster(self, client, cluster):
        """Apply only the attributes that differ from the cluster's
        current (or pending) values; skip the API call when nothing changes."""
        try:
            config = dict(self.data.get('attributes'))
            modify = {}
            for k, v in config.items():
                # A key is applied when it differs from its mapped current
                # value, or from the pending/current value on the record.
                if ((k in self.cluster_mapping and
                     v != jmespath_search(self.cluster_mapping[k], cluster)) or
                        v != cluster.get('PendingModifiedValues', {}).get(k, cluster.get(k))):
                    modify[k] = v
            if not modify:
                return

            # Prefer a pending rename's identifier if one is in flight.
            modify['ClusterIdentifier'] = (cluster.get('PendingModifiedValues', {})
                                           .get('ClusterIdentifier')
                                           or cluster.get('ClusterIdentifier'))
            client.modify_cluster(**modify)
        except (client.exceptions.ClusterNotFoundFault):
            # Cluster disappeared between enumeration and modification.
            return
        except ClientError as e:
            self.log.warning(
                "Exception trying to modify cluster: %s error: %s",
                cluster['ClusterIdentifier'], e)
            raise

644 

645 

# Implementation fully inherited from tags.TagDelayedAction; this class
# exists to register the action for redshift and document its usage.
@Redshift.action_registry.register('mark-for-op')
class TagDelayedAction(tags.TagDelayedAction):
    """Action to create an action to be performed at a later time

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-terminate-unencrypted
                resource: redshift
                filters:
                  - "tag:custodian_cleanup": absent
                  - type: value
                    key: Encrypted
                    value: false
                    op: eq
                actions:
                  - type: mark-for-op
                    tag: custodian_cleanup
                    op: delete
                    days: 5
                    msg: "Unencrypted Redshift cluster: {op}@{action_date}"
    """

670 

671 

@Redshift.action_registry.register('tag')
class Tag(tags.Tag):
    """Add tag(s) to redshift clusters.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-tag
                resource: redshift
                filters:
                  - "tag:RedshiftTag": absent
                actions:
                  - type: tag
                    key: RedshiftTag
                    value: "Redshift Tag Value"
    """

    concurrency = 2
    batch_size = 5
    permissions = ('redshift:CreateTags',)

    def process_resource_set(self, client, resources, tags):
        # Tag each cluster individually by ARN.
        for arn in self.manager.get_arns(resources):
            client.create_tags(ResourceName=arn, Tags=tags)

698 

699 

@Redshift.action_registry.register('unmark')
@Redshift.action_registry.register('remove-tag')
class RemoveTag(tags.RemoveTag):
    """Remove tag(s) from redshift clusters.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-remove-tag
                resource: redshift
                filters:
                  - "tag:RedshiftTag": present
                actions:
                  - type: remove-tag
                    tags: ["RedshiftTags"]
    """

    concurrency = 2
    batch_size = 5
    permissions = ('redshift:DeleteTags',)

    def process_resource_set(self, client, resources, tag_keys):
        # Remove the given keys from each cluster individually by ARN.
        for arn in self.manager.get_arns(resources):
            client.delete_tags(ResourceName=arn, TagKeys=tag_keys)

726 

727 

@Redshift.action_registry.register('tag-trim')
class TagTrim(tags.TagTrim):
    """Action to remove tags from a redshift cluster

    This can be used to prevent reaching the ceiling limit of tags on a
    resource

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-tag-trim
                resource: redshift
                filters:
                  - type: value
                    key: "length(Tags)"
                    op: ge
                    value: 10
                actions:
                  - type: tag-trim
                    space: 1
                    preserve:
                      - RequiredTag1
                      - RequiredTag2
    """

    max_tag_count = 10
    permissions = ('redshift:DeleteTags',)

    def process_tag_removal(self, client, resource, candidates):
        """Delete candidate tag keys from one cluster.

        Bug fix: redshift cluster records are keyed by 'ClusterIdentifier'
        (the resource_type id above); 'DBInstanceIdentifier' is an RDS
        field that does not exist on a cluster record and raised KeyError.
        """
        arn = self.manager.generate_arn(resource['ClusterIdentifier'])
        client.delete_tags(ResourceName=arn, TagKeys=candidates)

761 

762 

@Redshift.action_registry.register('modify-security-groups')
class RedshiftModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify security groups on a Redshift cluster"""

    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('redshift')
        # get_groups returns the resolved group list positionally aligned
        # with the input clusters.
        groups = super(
            RedshiftModifyVpcSecurityGroups, self).get_groups(clusters)

        for cluster, group_ids in zip(clusters, groups):
            client.modify_cluster(
                ClusterIdentifier=cluster['ClusterIdentifier'],
                VpcSecurityGroupIds=group_ids)

778 

779 

@resources.register('redshift-subnet-group')
class RedshiftSubnetGroup(QueryResourceManager):
    """Redshift subnet group."""

    class resource_type(TypeInfo):
        service = 'redshift'
        arn_type = 'subnetgroup'
        arn_separator = ':'
        id = name = 'ClusterSubnetGroupName'
        enum_spec = (
            'describe_cluster_subnet_groups', 'ClusterSubnetGroups', None)
        filter_name = 'ClusterSubnetGroupName'
        filter_type = 'scalar'
        cfn_type = config_type = "AWS::Redshift::ClusterSubnetGroup"
        # Opt in to the universal tag actions/filters.
        universal_taggable = object()

795 

796 

@resources.register('redshift-snapshot')
class RedshiftSnapshot(QueryResourceManager):
    """Resource manager for Redshift snapshots.
    """

    class resource_type(TypeInfo):
        service = 'redshift'
        arn_type = 'snapshot'
        arn_separator = ':'
        enum_spec = ('describe_cluster_snapshots', 'Snapshots', None)
        name = id = 'SnapshotIdentifier'
        date = 'SnapshotCreateTime'
        config_type = "AWS::Redshift::ClusterSnapshot"
        universal_taggable = True

    def get_arns(self, resources):
        # Snapshot ARNs embed the owning cluster:
        # ...:snapshot:<cluster-id>/<snapshot-id>
        rid = self.get_model().id
        return [
            self.generate_arn(r['ClusterIdentifier'] + '/' + r[rid])
            for r in resources]

817 

818 

@RedshiftSnapshot.filter_registry.register('age')
class RedshiftSnapshotAge(AgeFilter):
    """Filters redshift snapshots based on age (in days)

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-old-snapshots
                resource: redshift-snapshot
                filters:
                  - type: age
                    days: 21
                    op: gt
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    # Age is measured from the snapshot's creation timestamp.
    date_attribute = 'SnapshotCreateTime'

841 

842 

@RedshiftSnapshot.filter_registry.register('cross-account')
class RedshiftSnapshotCrossAccount(CrossAccountAccessFilter):
    """Filter all accounts that allow access to non-whitelisted accounts
    """
    permissions = ('redshift:DescribeClusterSnapshots',)
    schema = type_schema(
        'cross-account',
        whitelist={'type': 'array', 'items': {'type': 'string'}},
        whitelist_from=ValuesFrom.schema)

    def process(self, snapshots, event=None):
        allowed = self.get_accounts()
        matched = []
        for snap in snapshots:
            grants = snap.get('AccountsWithRestoreAccess')
            # Snapshots without restore grants can't violate.
            if not grants:
                continue
            outside = {g.get('AccountId') for g in grants} - allowed
            if outside:
                # Annotate the offending accounts for downstream actions.
                snap['c7n:CrossAccountViolations'] = list(outside)
                matched.append(snap)
        return matched

865 

866 

@RedshiftSnapshot.action_registry.register('delete')
class RedshiftSnapshotDelete(BaseAction):
    """Deletes redshift snapshots.

    Note: the original docstring ("Filters redshift snapshots based on
    age") was copied from the age filter; this is a delete action.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-delete-old-snapshots
                resource: redshift-snapshot
                filters:
                  - type: age
                    days: 21
                    op: gt
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('redshift:DeleteClusterSnapshot',)

    def process(self, snapshots):
        self.log.info("Deleting %d Redshift snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            futures = []
            # Reversed so the newest snapshots are deleted last.
            for snapshot_set in chunks(reversed(snapshots), size=50):
                futures.append(
                    w.submit(self.process_snapshot_set, snapshot_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        f.exception())
        return snapshots

    def process_snapshot_set(self, snapshots_set):
        c = local_session(self.manager.session_factory).client('redshift')
        for s in snapshots_set:
            c.delete_cluster_snapshot(
                SnapshotIdentifier=s['SnapshotIdentifier'],
                SnapshotClusterIdentifier=s['ClusterIdentifier'])

909 

910 

@RedshiftSnapshot.action_registry.register('revoke-access')
class RedshiftSnapshotRevokeAccess(BaseAction):
    """Revokes ability of accounts to restore a snapshot

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-snapshot-revoke-access
                resource: redshift-snapshot
                filters:
                  - type: cross-account
                    whitelist:
                      - 012345678910
                actions:
                  - type: revoke-access
    """
    permissions = ('redshift:RevokeSnapshotAccess',)
    schema = type_schema('revoke-access')

    def validate(self):
        # Requires the cross-account filter, which produces the
        # c7n:CrossAccountViolations annotation consumed below.
        if not any(isinstance(f, RedshiftSnapshotCrossAccount)
                   for f in self.manager.iter_filters()):
            raise PolicyValidationError(
                '`revoke-access` may only be used in '
                'conjunction with `cross-account` filter on %s' % (self.manager.data,))
        return self

    def process_snapshot_set(self, client, snapshot_set):
        for snap in snapshot_set:
            for account in snap.get('c7n:CrossAccountViolations', []):
                try:
                    self.manager.retry(
                        client.revoke_snapshot_access,
                        SnapshotIdentifier=snap['SnapshotIdentifier'],
                        AccountWithRestoreAccess=account)
                except ClientError as e:
                    # Snapshot already gone; nothing to revoke.
                    if e.response['Error']['Code'] == 'ClusterSnapshotNotFound':
                        continue
                    raise

    def process(self, snapshots):
        client = local_session(self.manager.session_factory).client('redshift')
        with self.executor_factory(max_workers=2) as w:
            pending = {}
            for snapshot_set in chunks(snapshots, 25):
                pending[w.submit(
                    self.process_snapshot_set, client, snapshot_set)
                ] = snapshot_set
            for f in as_completed(pending):
                if f.exception():
                    self.log.exception(
                        'Exception while revoking access on %s: %s' % (
                            ', '.join(
                                [s['SnapshotIdentifier'] for s in pending[f]]),
                            f.exception()))

968 

969 

@resources.register('redshift-reserved')
class ReservedNode(QueryResourceManager):
    """Resource manager for Redshift reserved nodes."""

    class resource_type(TypeInfo):
        service = 'redshift'
        name = id = 'ReservedNodeId'
        date = 'StartTime'
        enum_spec = (
            'describe_reserved_nodes', 'ReservedNodes', None)
        filter_name = 'ReservedNodes'
        filter_type = 'list'
        arn_type = "reserved-nodes"
        permissions_enum = ('redshift:DescribeReservedNodes',)

983 

984 

@Redshift.filter_registry.register('consecutive-snapshots')
class ClusterConsecutiveSnapshots(Filter):
    """Returns Clusters where number of consective daily backups is
    equal to/or greater than n days.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-daily-snapshot-count
                resource: redshift
                filters:
                  - type: consecutive-snapshots
                    count: 7
                    period: days
                    status: available
    """
    schema = type_schema('consecutive-snapshots', count={'type': 'number', 'minimum': 1},
                         period={'enum': ['hours', 'days', 'weeks']},
                         status={'enum': ['available', 'creating', 'final snapshot', 'failed']},
                         required=['count', 'period', 'status'])
    permissions = ('redshift:DescribeClusterSnapshots', 'redshift:DescribeClusters', )
    annotation = 'c7n:RedshiftSnapshots'

    def process_resource_set(self, client, resources, lbdate):
        """Annotate each cluster with its snapshots created since lbdate."""
        paginator = client.get_paginator('describe_cluster_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        rs_snapshots = paginator.paginate(StartTime=lbdate).build_full_result().get(
            'Snapshots', [])

        # Group snapshots by owning cluster for single-pass annotation.
        cluster_map = {}
        for snap in rs_snapshots:
            cluster_map.setdefault(snap['ClusterIdentifier'], []).append(snap)
        for r in resources:
            r[self.annotation] = cluster_map.get(r['ClusterIdentifier'], [])

    def get_date(self, time):
        """Return the UTC date string `time` periods in the past.

        Hours use an hour-granular format ('%Y-%m-%d-%H'); days and weeks
        both use day granularity ('%Y-%m-%d').
        """
        period = self.data.get('period')
        if period == 'weeks':
            date = (datetime.utcnow() - timedelta(weeks=time)).strftime('%Y-%m-%d')
        elif period == 'hours':
            date = (datetime.utcnow() - timedelta(hours=time)).strftime('%Y-%m-%d-%H')
        else:
            date = (datetime.utcnow() - timedelta(days=time)).strftime('%Y-%m-%d')
        return date

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('redshift')
        results = []
        retention = self.data.get('count')
        # Oldest boundary of the lookback window; also the StartTime for
        # the snapshot listing.
        lbdate = self.get_date(retention)
        # One expected date per period step in the window.
        expected_dates = set()
        for time in range(1, retention + 1):
            expected_dates.add(self.get_date(time))

        # Only fetch snapshots for clusters not yet annotated.
        for resource_set in chunks(
                [r for r in resources if self.annotation not in r], 50):
            self.process_resource_set(client, resource_set, lbdate)

        for r in resources:
            snapshot_dates = set()
            for snapshot in r[self.annotation]:
                # Only snapshots in the requested status count toward
                # the consecutive-backup requirement.
                if snapshot['Status'] == self.data.get('status'):
                    if self.data.get('period') == 'hours':
                        snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d-%H'))
                    else:
                        snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d'))
            # A cluster matches when every expected period has a snapshot.
            if expected_dates.issubset(snapshot_dates):
                results.append(r)
        return results