Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/s3.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1754 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3"""S3 Resource Manager 

4 

5Filters: 

6 

7The generic Values filters (jmespath) expression and Or filter are 

8available with all resources, including buckets, we include several 

9additonal bucket data (Tags, Replication, Acl, Policy) as keys within 

10a bucket representation. 

11 

12Actions: 

13 

14 encrypt-keys 

15 

16 Scan all keys in a bucket and optionally encrypt them in place. 

17 

18 global-grants 

19 

20 Check bucket acls for global grants 

21 

22 encryption-policy 

23 

24 Attach an encryption required policy to a bucket, this will break 

25 applications that are not using encryption, including aws log 

26 delivery. 

27 

28""" 

29import copy 

30import functools 

31import json 

32import logging 

33import math 

34import os 

35import time 

36import threading 

37import ssl 

38 

39from botocore.client import Config 

40from botocore.exceptions import ClientError 

41 

42from collections import defaultdict 

43from concurrent.futures import as_completed 

44 

45try: 

46 from urllib3.exceptions import SSLError 

47except ImportError: 

48 from botocore.vendored.requests.packages.urllib3.exceptions import SSLError 

49 

50 

51from c7n import deprecated 

52from c7n.actions import ( 

53 ActionRegistry, BaseAction, PutMetric, RemovePolicyBase, remove_statements) 

54from c7n.exceptions import PolicyValidationError, PolicyExecutionError 

55from c7n.filters import ( 

56 FilterRegistry, Filter, CrossAccountAccessFilter, MetricsFilter, 

57 ValueFilter, ListItemFilter) 

58from .aws import shape_validate 

59from c7n.filters.policystatement import HasStatementFilter 

60from c7n.manager import resources 

61from c7n.output import NullBlobOutput 

62from c7n import query 

63from c7n.resources.securityhub import PostFinding 

64from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction 

65from c7n.utils import ( 

66 chunks, local_session, set_annotation, type_schema, filter_empty, 

67 dumps, format_string_values, get_account_alias_from_sts) 

68from c7n.resources.aws import inspect_bucket_region 

69 

70 

71log = logging.getLogger('custodian.s3') 

72 

73filters = FilterRegistry('s3.filters') 

74actions = ActionRegistry('s3.actions') 

75filters.register('marked-for-op', TagActionFilter) 

76actions.register('put-metric', PutMetric) 

77 

78MAX_COPY_SIZE = 1024 * 1024 * 1024 * 2 

79 

80 

81class DescribeS3(query.DescribeSource): 

82 

83 def augment(self, buckets): 

84 assembler = BucketAssembly(self.manager) 

85 assembler.initialize() 

86 

87 with self.manager.executor_factory( 

88 max_workers=min((10, len(buckets) + 1))) as w: 

89 results = w.map(assembler.assemble, buckets) 

90 results = list(filter(None, results)) 

91 return results 

92 

93 

94class ConfigS3(query.ConfigSource): 

95 

96 # normalize config's janky idiosyncratic bespoke formating to the 

97 # standard describe api responses. 

98 

99 def get_query_params(self, query): 

100 q = super(ConfigS3, self).get_query_params(query) 

101 if 'expr' in q: 

102 q['expr'] = q['expr'].replace('select ', 'select awsRegion, ') 

103 return q 

104 

105 def load_resource(self, item): 

106 resource = super(ConfigS3, self).load_resource(item) 

107 cfg = item['supplementaryConfiguration'] 

108 # aka standard 

109 if 'awsRegion' in item and item['awsRegion'] != 'us-east-1': 

110 resource['Location'] = {'LocationConstraint': item['awsRegion']} 

111 else: 

112 resource['Location'] = {} 

113 

114 # owner is under acl per describe 

115 resource.pop('Owner', None) 

116 

117 for k, null_value in S3_CONFIG_SUPPLEMENT_NULL_MAP.items(): 

118 if k not in cfg: 

119 continue 

120 if cfg.get(k) == null_value: 

121 continue 

122 method = getattr(self, "handle_%s" % k, None) 

123 if method is None: 

124 raise ValueError("unhandled supplementary config %s", k) 

125 continue 

126 v = cfg[k] 

127 if isinstance(cfg[k], str): 

128 v = json.loads(cfg[k]) 

129 method(resource, v) 

130 

131 for el in S3_AUGMENT_TABLE: 

132 if el[1] not in resource: 

133 resource[el[1]] = el[2] 

134 return resource 

135 

136 PERMISSION_MAP = { 

137 'FullControl': 'FULL_CONTROL', 

138 'Write': 'WRITE', 

139 'WriteAcp': 'WRITE_ACP', 

140 'Read': 'READ', 

141 'ReadAcp': 'READ_ACP'} 

142 

143 GRANTEE_MAP = { 

144 'AllUsers': "http://acs.amazonaws.com/groups/global/AllUsers", 

145 'AuthenticatedUsers': "http://acs.amazonaws.com/groups/global/AuthenticatedUsers", 

146 'LogDelivery': 'http://acs.amazonaws.com/groups/s3/LogDelivery'} 

147 

148 def handle_AccessControlList(self, resource, item_value): 

149 # double serialized in config for some reason 

150 if isinstance(item_value, str): 

151 item_value = json.loads(item_value) 

152 

153 resource['Acl'] = {} 

154 resource['Acl']['Owner'] = {'ID': item_value['owner']['id']} 

155 if item_value['owner']['displayName']: 

156 resource['Acl']['Owner']['DisplayName'] = item_value[ 

157 'owner']['displayName'] 

158 resource['Acl']['Grants'] = grants = [] 

159 

160 for g in (item_value.get('grantList') or ()): 

161 if 'id' not in g['grantee']: 

162 assert g['grantee'] in self.GRANTEE_MAP, "unknown grantee %s" % g 

163 rg = {'Type': 'Group', 'URI': self.GRANTEE_MAP[g['grantee']]} 

164 else: 

165 rg = {'ID': g['grantee']['id'], 'Type': 'CanonicalUser'} 

166 

167 if 'displayName' in g: 

168 rg['DisplayName'] = g['displayName'] 

169 

170 grants.append({ 

171 'Permission': self.PERMISSION_MAP[g['permission']], 

172 'Grantee': rg, 

173 }) 

174 

175 def handle_BucketAccelerateConfiguration(self, resource, item_value): 

176 # not currently auto-augmented by custodian 

177 return 

178 

179 def handle_BucketLoggingConfiguration(self, resource, item_value): 

180 if ('destinationBucketName' not in item_value or 

181 item_value['destinationBucketName'] is None): 

182 resource[u'Logging'] = {} 

183 return 

184 resource[u'Logging'] = { 

185 'TargetBucket': item_value['destinationBucketName'], 

186 'TargetPrefix': item_value['logFilePrefix']} 

187 

188 def handle_BucketLifecycleConfiguration(self, resource, item_value): 

189 rules = [] 

190 for r in item_value.get('rules'): 

191 rr = {} 

192 rules.append(rr) 

193 expiry = {} 

194 for ek, ck in ( 

195 ('Date', 'expirationDate'), 

196 ('ExpiredObjectDeleteMarker', 'expiredObjectDeleteMarker'), 

197 ('Days', 'expirationInDays')): 

198 if ck in r and r[ck] and r[ck] != -1: 

199 expiry[ek] = r[ck] 

200 if expiry: 

201 rr['Expiration'] = expiry 

202 

203 transitions = [] 

204 for t in (r.get('transitions') or ()): 

205 tr = {} 

206 for k in ('date', 'days', 'storageClass'): 

207 if t.get(k): 

208 tr["%s%s" % (k[0].upper(), k[1:])] = t[k] 

209 transitions.append(tr) 

210 if transitions: 

211 rr['Transitions'] = transitions 

212 

213 if r.get('abortIncompleteMultipartUpload'): 

214 rr['AbortIncompleteMultipartUpload'] = { 

215 'DaysAfterInitiation': r[ 

216 'abortIncompleteMultipartUpload']['daysAfterInitiation']} 

217 if r.get('noncurrentVersionExpirationInDays'): 

218 rr['NoncurrentVersionExpiration'] = { 

219 'NoncurrentDays': r['noncurrentVersionExpirationInDays']} 

220 

221 nonc_transitions = [] 

222 for t in (r.get('noncurrentVersionTransitions') or ()): 

223 nonc_transitions.append({ 

224 'NoncurrentDays': t['days'], 

225 'StorageClass': t['storageClass']}) 

226 if nonc_transitions: 

227 rr['NoncurrentVersionTransitions'] = nonc_transitions 

228 

229 rr['Status'] = r['status'] 

230 rr['ID'] = r['id'] 

231 if r.get('prefix'): 

232 rr['Prefix'] = r['prefix'] 

233 if 'filter' not in r or not r['filter']: 

234 continue 

235 

236 if r['filter']['predicate']: 

237 rr['Filter'] = self.convertLifePredicate(r['filter']['predicate']) 

238 

239 resource['Lifecycle'] = {'Rules': rules} 

240 

241 def convertLifePredicate(self, p): 

242 if p['type'] == 'LifecyclePrefixPredicate': 

243 return {'Prefix': p['prefix']} 

244 if p['type'] == 'LifecycleTagPredicate': 

245 return {'Tags': [{'Key': p['tag']['key'], 'Value': p['tag']['value']}]} 

246 if p['type'] == 'LifecycleAndOperator': 

247 n = {} 

248 for o in p['operands']: 

249 ot = self.convertLifePredicate(o) 

250 if 'Tags' in n and 'Tags' in ot: 

251 n['Tags'].extend(ot['Tags']) 

252 else: 

253 n.update(ot) 

254 return {'And': n} 

255 

256 raise ValueError("unknown predicate: %s" % p) 

257 

258 NotifyTypeMap = { 

259 'QueueConfiguration': 'QueueConfigurations', 

260 'LambdaConfiguration': 'LambdaFunctionConfigurations', 

261 'CloudFunctionConfiguration': 'LambdaFunctionConfigurations', 

262 'TopicConfiguration': 'TopicConfigurations'} 

263 

264 def handle_BucketNotificationConfiguration(self, resource, item_value): 

265 d = {} 

266 for nid, n in item_value['configurations'].items(): 

267 ninfo = {} 

268 d.setdefault(self.NotifyTypeMap[n['type']], []).append(ninfo) 

269 if n['type'] == 'QueueConfiguration': 

270 ninfo['QueueArn'] = n['queueARN'] 

271 elif n['type'] == 'TopicConfiguration': 

272 ninfo['TopicArn'] = n['topicARN'] 

273 elif n['type'] == 'LambdaConfiguration': 

274 ninfo['LambdaFunctionArn'] = n['functionARN'] 

275 ninfo['Id'] = nid 

276 ninfo['Events'] = n['events'] 

277 rules = [] 

278 if n['filter']: 

279 for r in n['filter'].get('s3KeyFilter', {}).get('filterRules', []): 

280 rules.append({'Name': r['name'], 'Value': r['value']}) 

281 if rules: 

282 ninfo['Filter'] = {'Key': {'FilterRules': rules}} 

283 resource['Notification'] = d 

284 

285 def handle_BucketReplicationConfiguration(self, resource, item_value): 

286 d = {'Role': item_value['roleARN'], 'Rules': []} 

287 for rid, r in item_value['rules'].items(): 

288 rule = { 

289 'ID': rid, 

290 'Status': r.get('status', ''), 

291 'Prefix': r.get('prefix', ''), 

292 'Destination': { 

293 'Bucket': r['destinationConfig']['bucketARN']} 

294 } 

295 if 'Account' in r['destinationConfig']: 

296 rule['Destination']['Account'] = r['destinationConfig']['Account'] 

297 if r['destinationConfig'].get('storageClass'): 

298 rule['Destination']['StorageClass'] = r['destinationConfig']['storageClass'] 

299 d['Rules'].append(rule) 

300 resource['Replication'] = {'ReplicationConfiguration': d} 

301 

302 def handle_BucketPolicy(self, resource, item_value): 

303 resource['Policy'] = item_value.get('policyText') 

304 

305 def handle_BucketTaggingConfiguration(self, resource, item_value): 

306 resource['Tags'] = [ 

307 {"Key": k, "Value": v} for k, v in item_value['tagSets'][0]['tags'].items()] 

308 

309 def handle_BucketVersioningConfiguration(self, resource, item_value): 

310 # Config defaults versioning to 'Off' for a null value 

311 if item_value['status'] not in ('Enabled', 'Suspended'): 

312 resource['Versioning'] = {} 

313 return 

314 resource['Versioning'] = {'Status': item_value['status']} 

315 # `isMfaDeleteEnabled` is an optional boolean property - the key may be absent, 

316 # present with a null value, or present with a boolean value. 

317 # Mirror the describe source by populating Versioning.MFADelete only in the 

318 # boolean case. 

319 mfa_delete = item_value.get('isMfaDeleteEnabled') 

320 if mfa_delete is None: 

321 return 

322 resource['Versioning']['MFADelete'] = ( 

323 'Enabled' if mfa_delete else 'Disabled' 

324 ) 

325 

326 def handle_BucketWebsiteConfiguration(self, resource, item_value): 

327 website = {} 

328 if item_value['indexDocumentSuffix']: 

329 website['IndexDocument'] = { 

330 'Suffix': item_value['indexDocumentSuffix']} 

331 if item_value['errorDocument']: 

332 website['ErrorDocument'] = { 

333 'Key': item_value['errorDocument']} 

334 if item_value['redirectAllRequestsTo']: 

335 website['RedirectAllRequestsTo'] = { 

336 'HostName': item_value['redirectAllRequestsTo']['hostName'], 

337 'Protocol': item_value['redirectAllRequestsTo']['protocol']} 

338 for r in item_value['routingRules']: 

339 redirect = {} 

340 rule = {'Redirect': redirect} 

341 website.setdefault('RoutingRules', []).append(rule) 

342 if 'condition' in r: 

343 cond = {} 

344 for ck, rk in ( 

345 ('keyPrefixEquals', 'KeyPrefixEquals'), 

346 ('httpErrorCodeReturnedEquals', 

347 'HttpErrorCodeReturnedEquals')): 

348 if r['condition'][ck]: 

349 cond[rk] = r['condition'][ck] 

350 rule['Condition'] = cond 

351 for ck, rk in ( 

352 ('protocol', 'Protocol'), 

353 ('hostName', 'HostName'), 

354 ('replaceKeyPrefixWith', 'ReplaceKeyPrefixWith'), 

355 ('replaceKeyWith', 'ReplaceKeyWith'), 

356 ('httpRedirectCode', 'HttpRedirectCode')): 

357 if r['redirect'][ck]: 

358 redirect[rk] = r['redirect'][ck] 

359 resource['Website'] = website 

360 

361 

362@resources.register('s3') 

363class S3(query.QueryResourceManager): 

364 """Amazon's Simple Storage Service Buckets. 

365 

366 

367 By default and due to historical compatiblity cloud custodian will 

368 fetch a number of subdocuments (Acl, Policy, Tagging, Versioning, 

369 Website, Notification, Lifecycle, and Replication) for each bucket 

370 to allow policies authors's to target common bucket 

371 configurations. 

372 

373 This behavior can be customized to avoid extraneous api calls if a 

374 particular sub document is not needed for a policy, by setting the 

375 `augment-keys` parameter in a query block of the policy. 

376 

377 ie if we only care about bucket website and replication 

378 configuration, we can minimize the api calls needed to fetch a 

379 bucket by setting up augment-keys as follows. 

380 

381 :example: 

382 

383 .. code-block:: yaml 

384 

385 policies: 

386 - name: check-website-replication 

387 resource: s3 

388 query: 

389 - augment-keys: ['Website', 'Replication'] 

390 filters: 

391 - Website.ErrorDocument: not-null 

392 - Replication.ReplicationConfiguration.Rules: not-null 

393 

394 It also supports an automatic detection mode where the use of a subdocument 

395 in a filter is automically with the augment-keys value of 'detect'. 

396 

397 :example: 

398 

399 .. code-block:: yaml 

400 

401 policies: 

402 - name: check-website-replication 

403 resource: s3 

404 query: 

405 - augment-keys: 'detect' 

406 filters: 

407 - Website.ErrorDocument: not-null 

408 - Replication.ReplicationConfiguration.Rules: not-null 

409 

410 The default value for augment-keys is `all` to preserve historical 

411 compatiblity. `augment-keys` also supports the value of 'none' to 

412 disable all subdocument fetching except Location and Tags. 

413 

414 Note certain actions may implicitly depend on the corresponding 

415 subdocument being present. 

416 

417 """ 

418 

419 class resource_type(query.TypeInfo): 

420 service = 's3' 

421 arn_type = '' 

422 enum_spec = ('list_buckets', 'Buckets[]', None) 

423 # not used but we want some consistency on the metadata 

424 detail_spec = ('get_bucket_location', 'Bucket', 'Name', 'LocationConstraint') 

425 permissions_augment = ( 

426 "s3:GetBucketAcl", 

427 "s3:GetBucketLocation", 

428 "s3:GetBucketPolicy", 

429 "s3:GetBucketTagging", 

430 "s3:GetBucketVersioning", 

431 "s3:GetBucketLogging", 

432 "s3:GetBucketNotification", 

433 "s3:GetBucketWebsite", 

434 "s3:GetLifecycleConfiguration", 

435 "s3:GetReplicationConfiguration" 

436 ) 

437 name = id = 'Name' 

438 date = 'CreationDate' 

439 dimension = 'BucketName' 

440 cfn_type = config_type = 'AWS::S3::Bucket' 

441 

442 filter_registry = filters 

443 action_registry = actions 

444 source_mapping = { 

445 'describe': DescribeS3, 

446 'config': ConfigS3 

447 } 

448 

449 def validate(self): 

450 super().validate() 

451 BucketAssembly(self).validate() 

452 

453 def get_arns(self, resources): 

454 return ["arn:aws:s3:::{}".format(r["Name"]) for r in resources] 

455 

456 @classmethod 

457 def get_permissions(cls): 

458 perms = ["s3:ListAllMyBuckets"] 

459 perms.extend([n[-1] for n in S3_AUGMENT_TABLE]) 

460 return perms 

461 

462 

463S3_CONFIG_SUPPLEMENT_NULL_MAP = { 

464 'BucketLoggingConfiguration': u'{"destinationBucketName":null,"logFilePrefix":null}', 

465 'BucketPolicy': u'{"policyText":null}', 

466 'BucketVersioningConfiguration': u'{"status":"Off","isMfaDeleteEnabled":null}', 

467 'BucketAccelerateConfiguration': u'{"status":null}', 

468 'BucketNotificationConfiguration': u'{"configurations":{}}', 

469 'BucketLifecycleConfiguration': None, 

470 'AccessControlList': None, 

471 'BucketTaggingConfiguration': None, 

472 'BucketWebsiteConfiguration': None, 

473 'BucketReplicationConfiguration': None 

474} 

475 

476S3_AUGMENT_TABLE = ( 

477 ('get_bucket_location', 'Location', {}, None, 's3:GetBucketLocation'), 

478 ('get_bucket_tagging', 'Tags', [], 'TagSet', 's3:GetBucketTagging'), 

479 ('get_bucket_policy', 'Policy', None, 'Policy', 's3:GetBucketPolicy'), 

480 ('get_bucket_acl', 'Acl', None, None, 's3:GetBucketAcl'), 

481 ('get_bucket_replication', 

482 'Replication', None, None, 's3:GetReplicationConfiguration'), 

483 ('get_bucket_versioning', 'Versioning', None, None, 's3:GetBucketVersioning'), 

484 ('get_bucket_website', 'Website', None, None, 's3:GetBucketWebsite'), 

485 ('get_bucket_logging', 'Logging', None, 'LoggingEnabled', 's3:GetBucketLogging'), 

486 ('get_bucket_notification_configuration', 

487 'Notification', None, None, 's3:GetBucketNotification'), 

488 ('get_bucket_lifecycle_configuration', 

489 'Lifecycle', None, None, 's3:GetLifecycleConfiguration'), 

490 # ('get_bucket_cors', 'Cors'), 

491) 

492 

493 

494class BucketAssembly: 

495 

496 def __init__(self, manager): 

497 self.manager = manager 

498 self.default_region = None 

499 self.region_clients = {} 

500 self.session = None 

501 self.session_lock = None 

502 self.augment_fields = [] 

503 

504 def initialize(self): 

505 # construct a default boto3 client, using the current session region. 

506 self.session = local_session(self.manager.session_factory) 

507 self.session_lock = threading.RLock() 

508 self.default_region = self.manager.config.region 

509 self.region_clients[self.default_region] = self.session.client('s3') 

510 self.augment_fields = set(self.detect_augment_fields()) 

511 # location is required for client construction 

512 self.augment_fields.add('Location') 

513 # custodian always returns tags 

514 self.augment_fields.add('Tags') 

515 

516 def validate(self): 

517 config = self.get_augment_config() 

518 if isinstance(config, str) and config not in ('all', 'detect', 'none'): 

519 raise PolicyValidationError( 

520 "augment-keys supports 'all', 'detect', 'none' or list of keys found: %s" % config) 

521 elif isinstance(config, list): 

522 delta = set(config).difference([row[1] for row in S3_AUGMENT_TABLE]) 

523 if delta: 

524 raise PolicyValidationError("augment-keys - found invalid keys: %s" % (list(delta))) 

525 if not isinstance(config, (list, str)): 

526 raise PolicyValidationError( 

527 "augment-keys supports 'all', 'detect', 'none' or list of keys found: %s" % config) 

528 

529 def get_augment_config(self): 

530 augment_config = None 

531 for option in self.manager.data.get('query', []): 

532 if option and option.get('augment-keys') is not None: 

533 augment_config = option['augment-keys'] 

534 if augment_config is None: 

535 augment_config = 'all' 

536 return augment_config 

537 

538 def detect_augment_fields(self): 

539 # try to detect augment fields required for the policy execution 

540 # we want to avoid extraneous api calls unless they are being used by the policy. 

541 

542 detected_keys = [] 

543 augment_keys = [row[1] for row in S3_AUGMENT_TABLE] 

544 augment_config = self.get_augment_config() 

545 

546 if augment_config == 'all': 

547 return augment_keys 

548 elif augment_config == 'none': 

549 return [] 

550 elif isinstance(augment_config, list): 

551 return augment_config 

552 

553 for f in self.manager.iter_filters(): 

554 fkey = None 

555 if not isinstance(f, ValueFilter): 

556 continue 

557 

558 f = f.data 

559 # type: value 

560 if f.get('type', '') == 'value': 

561 fkey = f.get('key') 

562 # k: v dict 

563 elif len(f) == 1: 

564 fkey = list(f.keys())[0] 

565 if fkey is None: # pragma: no cover 

566 continue 

567 

568 # remove any jmespath expressions 

569 fkey = fkey.split('.', 1)[0] 

570 

571 # tags have explicit handling in value filters. 

572 if fkey.startswith('tag:'): 

573 fkey = 'Tags' 

574 

575 # denied methods checks get all keys 

576 if fkey.startswith('c7n:DeniedMethods'): 

577 return augment_keys 

578 

579 if fkey in augment_keys: 

580 detected_keys.append(fkey) 

581 

582 return detected_keys 

583 

584 def get_client(self, region): 

585 if region in self.region_clients: 

586 return self.region_clients[region] 

587 with self.session_lock: 

588 self.region_clients[region] = self.session.client('s3', region_name=region) 

589 return self.region_clients[region] 

590 

591 def assemble(self, bucket): 

592 

593 client = self.get_client(self.default_region) 

594 augments = list(S3_AUGMENT_TABLE) 

595 

596 for info in augments: 

597 # we use the offset, as tests manipulate the augments table 

598 method_name, key, default, select = info[:4] 

599 if key not in self.augment_fields: 

600 continue 

601 

602 method = getattr(client, method_name) 

603 

604 try: 

605 response = method(Bucket=bucket['Name']) 

606 # This is here as exception handling will change to defaults if not present 

607 response.pop('ResponseMetadata', None) 

608 value = response 

609 if select and select in value: 

610 value = value[select] 

611 except (ssl.SSLError, SSLError) as e: 

612 # Proxy issue most likely 

613 log.warning( 

614 "Bucket ssl error %s: %s %s", 

615 bucket['Name'], bucket.get('Location', 'unknown'), e) 

616 continue 

617 except ClientError as e: 

618 code = e.response['Error']['Code'] 

619 if code.startswith("NoSuch") or "NotFound" in code: 

620 value = default 

621 elif code == 'PermanentRedirect': # pragma: no cover 

622 # (09/2025)- its not clear how we get here given a client region switch post 

623 # location detection. 

624 # 

625 # change client region 

626 client = self.get_client(get_region(bucket)) 

627 # requeue now that we have correct region 

628 augments.append((method_name, key, default, select)) 

629 continue 

630 else: 

631 # for auth errors record as attribute and move on 

632 if e.response['Error']['Code'] == 'AccessDenied': 

633 bucket.setdefault('c7n:DeniedMethods', []).append(method_name) 

634 continue 

635 # else log and raise 

636 log.warning( 

637 "Bucket:%s unable to invoke method:%s error:%s ", 

638 bucket['Name'], method_name, e.response['Error']['Message']) 

639 raise 

640 

641 # for historical reasons we normalize EU to eu-west-1 on the bucket 

642 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html 

643 if key == 'Location' and value and value.get('LocationConstraint', '') == 'EU': 

644 value['LocationConstraint'] = 'eu-west-1' 

645 

646 bucket[key] = value 

647 

648 # For all subsequent attributes after location, use a client that is targeted to 

649 # the bucket's regional s3 endpoint. 

650 if key == 'Location' and get_region(bucket) != client.meta.region_name: 

651 client = self.get_client(get_region(bucket)) 

652 return bucket 

653 

654 

655def bucket_client(session, b, kms=False): 

656 region = get_region(b) 

657 

658 if kms: 

659 # Need v4 signature for aws:kms crypto, else let the sdk decide 

660 # based on region support. 

661 config = Config( 

662 signature_version='s3v4', 

663 read_timeout=200, connect_timeout=120) 

664 else: 

665 config = Config(read_timeout=200, connect_timeout=120) 

666 return session.client('s3', region_name=region, config=config) 

667 

668 

669def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()): 

670 for bucket in buckets: 

671 client = bucket_client(local_session(session_factory), bucket) 

672 # Bucket tags are set atomically for the set/document, we want 

673 # to refetch against current to guard against any staleness in 

674 # our cached representation across multiple policies or concurrent 

675 # modifications. 

676 

677 if 'get_bucket_tagging' in bucket.get('c7n:DeniedMethods', []): 

678 # avoid the additional API call if we already know that it's going 

679 # to result in AccessDenied. The chances that the resource's perms 

680 # would have changed between fetching the resource and acting on it 

681 # here are pretty low-- so the check here should suffice. 

682 log.warning( 

683 "Unable to get new set of bucket tags needed to modify tags," 

684 "skipping tag action for bucket: %s" % bucket["Name"]) 

685 continue 

686 

687 try: 

688 bucket['Tags'] = client.get_bucket_tagging( 

689 Bucket=bucket['Name']).get('TagSet', []) 

690 except ClientError as e: 

691 if e.response['Error']['Code'] != 'NoSuchTagSet': 

692 raise 

693 bucket['Tags'] = [] 

694 

695 new_tags = {t['Key']: t['Value'] for t in add_tags} 

696 for t in bucket.get('Tags', ()): 

697 if (t['Key'] not in new_tags and t['Key'] not in remove_tags): 

698 new_tags[t['Key']] = t['Value'] 

699 tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()] 

700 

701 try: 

702 client.put_bucket_tagging( 

703 Bucket=bucket['Name'], Tagging={'TagSet': tag_set}) 

704 except ClientError as e: 

705 log.exception( 

706 'Exception tagging bucket %s: %s', bucket['Name'], e) 

707 continue 

708 

709 

710def get_region(b): 

711 """Tries to get the bucket region from Location.LocationConstraint 

712 

713 Special cases: 

714 LocationConstraint EU defaults to eu-west-1 

715 LocationConstraint null defaults to us-east-1 

716 

717 Args: 

718 b (object): A bucket object 

719 

720 Returns: 

721 string: an aws region string 

722 """ 

723 remap = {None: 'us-east-1', 'EU': 'eu-west-1'} 

724 region = b.get('Location', {}).get('LocationConstraint') 

725 return remap.get(region, region) 

726 

727 

728@filters.register('metrics') 

729class S3Metrics(MetricsFilter): 

730 """S3 CW Metrics need special handling for attribute/dimension 

731 mismatch, and additional required dimension. 

732 """ 

733 

734 def get_dimensions(self, resource): 

735 dims = [{'Name': 'BucketName', 'Value': resource['Name']}] 

736 if (self.data['name'] == 'NumberOfObjects' and 

737 'dimensions' not in self.data): 

738 dims.append( 

739 {'Name': 'StorageType', 'Value': 'AllStorageTypes'}) 

740 return dims 

741 

742 

743@filters.register('cross-account') 

744class S3CrossAccountFilter(CrossAccountAccessFilter): 

745 """Filters cross-account access to S3 buckets 

746 

747 :example: 

748 

749 .. code-block:: yaml 

750 

751 policies: 

752 - name: s3-acl 

753 resource: s3 

754 region: us-east-1 

755 filters: 

756 - type: cross-account 

757 """ 

758 permissions = ('s3:GetBucketPolicy',) 

759 

760 def get_accounts(self): 

761 """add in elb access by default 

762 

763 ELB Accounts by region 

764 https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html 

765 

766 Redshift Accounts by region 

767 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#db-auditing-manage-log-files 

768 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#rs-db-auditing-cloud-trail-rs-acct-ids 

769 

770 Cloudtrail Accounts by region 

771 https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-supported-regions.html 

772 """ 

773 accounts = super(S3CrossAccountFilter, self).get_accounts() 

774 return accounts.union( 

775 [ 

776 # ELB accounts 

777 '127311923021', # us-east-1 

778 '033677994240', # us-east-2 

779 '027434742980', # us-west-1 

780 '797873946194', # us-west-2 

781 '098369216593', # af-south-1 

782 '985666609251', # ca-central-1 

783 '054676820928', # eu-central-1 

784 '897822967062', # eu-north-1 

785 '635631232127', # eu-south-1 

786 '156460612806', # eu-west-1 

787 '652711504416', # eu-west-2 

788 '009996457667', # eu-west-3 

789 '754344448648', # ap-east-1 

790 '582318560864', # ap-northeast-1 

791 '600734575887', # ap-northeast-2 

792 '383597477331', # ap-northeast-3 

793 '114774131450', # ap-southeast-1 

794 '783225319266', # ap-southeast-2 

795 '718504428378', # ap-south-1 

796 '076674570225', # me-south-1 

797 '507241528517', # sa-east-1 

798 '048591011584', # us-gov-west-1 or gov-cloud-1 

799 '190560391635', # us-gov-east-1 

800 '638102146993', # cn-north-1 

801 '037604701340', # cn-northwest-1 

802 

803 # Redshift audit logging 

804 '193672423079', # us-east-1 

805 '391106570357', # us-east-2 

806 '262260360010', # us-west-1 

807 '902366379725', # us-west-2 

808 '365689465814', # af-south-1 

809 '313564881002', # ap-east-1 

810 '865932855811', # ap-south-1 

811 '090321488786', # ap-northeast-3 

812 '760740231472', # ap-northeast-2 

813 '361669875840', # ap-southeast-1 

814 '762762565011', # ap-southeast-2 

815 '404641285394', # ap-northeast-1 

816 '907379612154', # ca-central-1 

817 '053454850223', # eu-central-1 

818 '210876761215', # eu-west-1 

819 '307160386991', # eu-west-2 

820 '945612479654', # eu-south-1 

821 '915173422425', # eu-west-3 

822 '729911121831', # eu-north-1 

823 '013126148197', # me-south-1 

824 '075028567923', # sa-east-1 

825 

826 # Cloudtrail accounts (psa. folks should be using 

827 # cloudtrail service in bucket policies) 

828 '086441151436', # us-east-1 

829 '475085895292', # us-west-2 

830 '388731089494', # us-west-1 

831 '113285607260', # us-west-2 

832 '819402241893', # ca-central-1 

833 '977081816279', # ap-south-1 

834 '492519147666', # ap-northeast-2 

835 '903692715234', # ap-southeast-1 

836 '284668455005', # ap-southeast-2 

837 '216624486486', # ap-northeast-1 

838 '035351147821', # eu-central-1 

839 '859597730677', # eu-west-1 

840 '282025262664', # eu-west-2 

841 '814480443879', # sa-east-1 

842 ]) 

843 

844 

845@filters.register('global-grants') 

846class GlobalGrantsFilter(Filter): 

847 """Filters for all S3 buckets that have global-grants 

848 

849 *Note* by default this filter allows for read access 

850 if the bucket has been configured as a website. This 

851 can be disabled per the example below. 

852 

853 :example: 

854 

855 .. code-block:: yaml 

856 

857 policies: 

858 - name: remove-global-grants 

859 resource: s3 

860 filters: 

861 - type: global-grants 

862 allow_website: false 

863 actions: 

864 - delete-global-grants 

865 

866 """ 

867 

868 schema = type_schema( 

869 'global-grants', 

870 allow_website={'type': 'boolean'}, 

871 operator={'type': 'string', 'enum': ['or', 'and']}, 

872 permissions={ 

873 'type': 'array', 'items': { 

874 'type': 'string', 'enum': [ 

875 'READ', 'WRITE', 'WRITE_ACP', 'READ_ACP', 'FULL_CONTROL']}}) 

876 

877 GLOBAL_ALL = "http://acs.amazonaws.com/groups/global/AllUsers" 

878 AUTH_ALL = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" 

879 

880 def process(self, buckets, event=None): 

881 with self.executor_factory(max_workers=5) as w: 

882 results = w.map(self.process_bucket, buckets) 

883 results = list(filter(None, list(results))) 

884 return results 

885 

886 def process_bucket(self, b): 

887 acl = b.get('Acl', {'Grants': []}) 

888 if not acl or not acl['Grants']: 

889 return 

890 

891 results = [] 

892 allow_website = self.data.get('allow_website', True) 

893 perms = self.data.get('permissions', []) 

894 

895 for grant in acl['Grants']: 

896 if 'URI' not in grant.get("Grantee", {}): 

897 continue 

898 if grant['Grantee']['URI'] not in [self.AUTH_ALL, self.GLOBAL_ALL]: 

899 continue 

900 if allow_website and grant['Permission'] == 'READ' and b['Website']: 

901 continue 

902 if not perms or (perms and grant['Permission'] in perms): 

903 results.append(grant['Permission']) 

904 

905 if results: 

906 set_annotation(b, 'GlobalPermissions', results) 

907 return b 

908 

909 

910class BucketActionBase(BaseAction): 

911 

912 def get_permissions(self): 

913 return self.permissions 

914 

915 def get_std_format_args(self, bucket): 

916 return { 

917 'account_id': self.manager.config.account_id, 

918 'region': self.manager.config.region, 

919 'bucket_name': bucket['Name'], 

920 'bucket_region': get_region(bucket) 

921 } 

922 

923 def process(self, buckets): 

924 return self._process_with_futures(buckets) 

925 

926 def _process_with_futures(self, buckets, *args, max_workers=3, **kwargs): 

927 errors = 0 

928 results = [] 

929 with self.executor_factory(max_workers=max_workers) as w: 

930 futures = {} 

931 for b in buckets: 

932 futures[w.submit(self.process_bucket, b, *args, **kwargs)] = b 

933 for f in as_completed(futures): 

934 if f.exception(): 

935 b = futures[f] 

936 self.log.error( 

937 'error modifying bucket: policy:%s action:%s bucket:%s error:%s', 

938 self.manager.data.get('name'), self.name, b['Name'], f.exception() 

939 ) 

940 errors += 1 

941 continue 

942 results += filter(None, [f.result()]) 

943 if errors: 

944 self.log.error('encountered %d errors while processing %s', errors, self.name) 

945 raise PolicyExecutionError('%d resources failed', errors) 

946 return results 

947 

948 

949class BucketFilterBase(Filter): 

950 def get_std_format_args(self, bucket): 

951 return { 

952 'account_id': self.manager.config.account_id, 

953 'region': self.manager.config.region, 

954 'bucket_name': bucket['Name'], 

955 'bucket_region': get_region(bucket) 

956 } 

957 

958 

959@S3.action_registry.register("post-finding") 

960class BucketFinding(PostFinding): 

961 

962 resource_type = 'AwsS3Bucket' 

963 

964 def format_resource(self, r): 

965 owner = r.get("Acl", {}).get("Owner", {}) 

966 resource = { 

967 "Type": self.resource_type, 

968 "Id": "arn:aws:s3:::{}".format(r["Name"]), 

969 "Region": get_region(r), 

970 "Tags": {t["Key"]: t["Value"] for t in r.get("Tags", [])}, 

971 "Details": {self.resource_type: { 

972 "OwnerId": owner.get('ID', 'Unknown')}} 

973 } 

974 

975 if "DisplayName" in owner: 

976 resource["Details"]["AwsS3Bucket"]["OwnerName"] = owner['DisplayName'] 

977 

978 return filter_empty(resource) 

979 

980 

981@S3.filter_registry.register('has-statement') 

982class S3HasStatementFilter(HasStatementFilter): 

983 def get_std_format_args(self, bucket): 

984 return { 

985 'account_id': self.manager.config.account_id, 

986 'region': self.manager.config.region, 

987 'bucket_name': bucket['Name'], 

988 'bucket_region': get_region(bucket) 

989 } 

990 

991 

992@S3.filter_registry.register('lock-configuration') 

993class S3LockConfigurationFilter(ValueFilter): 

994 """ 

995 Filter S3 buckets based on their object lock configurations 

996 

997 :example: 

998 

999 Get all buckets where lock configuration mode is COMPLIANCE 

1000 

1001 .. code-block:: yaml 

1002 

1003 policies: 

1004 - name: lock-configuration-compliance 

1005 resource: aws.s3 

1006 filters: 

1007 - type: lock-configuration 

1008 key: Rule.DefaultRetention.Mode 

1009 value: COMPLIANCE 

1010 

1011 """ 

1012 schema = type_schema('lock-configuration', rinherit=ValueFilter.schema) 

1013 permissions = ('s3:GetBucketObjectLockConfiguration',) 

1014 annotate = True 

1015 annotation_key = 'c7n:ObjectLockConfiguration' 

1016 

1017 def _process_resource(self, client, resource): 

1018 try: 

1019 config = client.get_object_lock_configuration( 

1020 Bucket=resource['Name'] 

1021 )['ObjectLockConfiguration'] 

1022 except ClientError as e: 

1023 if e.response['Error']['Code'] == 'ObjectLockConfigurationNotFoundError': 

1024 config = None 

1025 else: 

1026 raise 

1027 resource[self.annotation_key] = config 

1028 

1029 def process(self, resources, event=None): 

1030 client = local_session(self.manager.session_factory).client('s3') 

1031 with self.executor_factory(max_workers=3) as w: 

1032 futures = [] 

1033 for res in resources: 

1034 if self.annotation_key in res: 

1035 continue 

1036 futures.append(w.submit(self._process_resource, client, res)) 

1037 for f in as_completed(futures): 

1038 exc = f.exception() 

1039 if exc: 

1040 self.log.error( 

1041 "Exception getting bucket lock configuration \n %s" % ( 

1042 exc)) 

1043 return super().process(resources, event) 

1044 

1045 def __call__(self, r): 

1046 return super().__call__(r.setdefault(self.annotation_key, None)) 

1047 

1048 

1049ENCRYPTION_STATEMENT_GLOB = { 

1050 'Effect': 'Deny', 

1051 'Principal': '*', 

1052 'Action': 's3:PutObject', 

1053 "Condition": { 

1054 "StringNotEquals": { 

1055 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}} 

1056 

1057 

1058@filters.register('no-encryption-statement') 

1059class EncryptionEnabledFilter(Filter): 

1060 """Find buckets with missing encryption policy statements. 

1061 

1062 :example: 

1063 

1064 .. code-block:: yaml 

1065 

1066 policies: 

1067 - name: s3-bucket-not-encrypted 

1068 resource: s3 

1069 filters: 

1070 - type: no-encryption-statement 

1071 """ 

1072 schema = type_schema( 

1073 'no-encryption-statement') 

1074 

1075 def get_permissions(self): 

1076 perms = self.manager.get_resource_manager('s3').get_permissions() 

1077 return perms 

1078 

1079 def process(self, buckets, event=None): 

1080 return list(filter(None, map(self.process_bucket, buckets))) 

1081 

1082 def process_bucket(self, b): 

1083 p = b.get('Policy') 

1084 if p is None: 

1085 return b 

1086 p = json.loads(p) 

1087 encryption_statement = dict(ENCRYPTION_STATEMENT_GLOB) 

1088 

1089 statements = p.get('Statement', []) 

1090 check = False 

1091 for s in list(statements): 

1092 if 'Sid' in s: 

1093 encryption_statement["Sid"] = s["Sid"] 

1094 if 'Resource' in s: 

1095 encryption_statement["Resource"] = s["Resource"] 

1096 if s == encryption_statement: 

1097 check = True 

1098 break 

1099 if check: 

1100 return None 

1101 else: 

1102 return b 

1103 

1104 

1105@filters.register('missing-statement') 

1106@filters.register('missing-policy-statement') 

1107class MissingPolicyStatementFilter(Filter): 

1108 """Find buckets missing a set of named policy statements. 

1109 

1110 :example: 

1111 

1112 .. code-block:: yaml 

1113 

1114 policies: 

1115 - name: s3-bucket-missing-statement 

1116 resource: s3 

1117 filters: 

1118 - type: missing-statement 

1119 statement_ids: 

1120 - RequiredEncryptedPutObject 

1121 """ 

1122 

1123 schema = type_schema( 

1124 'missing-policy-statement', 

1125 aliases=('missing-statement',), 

1126 statement_ids={'type': 'array', 'items': {'type': 'string'}}) 

1127 

1128 def __call__(self, b): 

1129 p = b.get('Policy') 

1130 if p is None: 

1131 return b 

1132 

1133 p = json.loads(p) 

1134 

1135 required = list(self.data.get('statement_ids', [])) 

1136 statements = p.get('Statement', []) 

1137 for s in list(statements): 

1138 if s.get('Sid') in required: 

1139 required.remove(s['Sid']) 

1140 if not required: 

1141 return False 

1142 return True 

1143 

1144 

1145@filters.register('bucket-notification') 

1146class BucketNotificationFilter(ValueFilter): 

1147 """Filter based on bucket notification configuration. 

1148 

1149 :example: 

1150 

1151 .. code-block:: yaml 

1152 

1153 policies: 

1154 - name: delete-incorrect-notification 

1155 resource: s3 

1156 filters: 

1157 - type: bucket-notification 

1158 kind: lambda 

1159 key: Id 

1160 value: "IncorrectLambda" 

1161 op: eq 

1162 actions: 

1163 - type: delete-bucket-notification 

1164 statement_ids: matched 

1165 """ 

1166 

1167 schema = type_schema( 

1168 'bucket-notification', 

1169 required=['kind'], 

1170 kind={'type': 'string', 'enum': ['lambda', 'sns', 'sqs']}, 

1171 rinherit=ValueFilter.schema) 

1172 schema_alias = False 

1173 annotation_key = 'c7n:MatchedNotificationConfigurationIds' 

1174 

1175 permissions = ('s3:GetBucketNotification',) 

1176 

1177 FIELDS = { 

1178 'lambda': 'LambdaFunctionConfigurations', 

1179 'sns': 'TopicConfigurations', 

1180 'sqs': 'QueueConfigurations' 

1181 } 

1182 

1183 def process(self, buckets, event=None): 

1184 return super(BucketNotificationFilter, self).process(buckets, event) 

1185 

1186 def __call__(self, bucket): 

1187 

1188 field = self.FIELDS[self.data['kind']] 

1189 found = False 

1190 for config in bucket.get('Notification', {}).get(field, []): 

1191 if self.match(config): 

1192 set_annotation( 

1193 bucket, 

1194 BucketNotificationFilter.annotation_key, 

1195 config['Id']) 

1196 found = True 

1197 return found 

1198 

1199 

1200@filters.register('bucket-logging') 

1201class BucketLoggingFilter(BucketFilterBase): 

1202 """Filter based on bucket logging configuration. 

1203 

1204 :example: 

1205 

1206 .. code-block:: yaml 

1207 

1208 policies: 

1209 - name: add-bucket-logging-if-missing 

1210 resource: s3 

1211 filters: 

1212 - type: bucket-logging 

1213 op: disabled 

1214 actions: 

1215 - type: toggle-logging 

1216 target_bucket: "{account_id}-{region}-s3-logs" 

1217 target_prefix: "{source_bucket_name}/" 

1218 

1219 policies: 

1220 - name: update-incorrect-or-missing-logging 

1221 resource: s3 

1222 filters: 

1223 - type: bucket-logging 

1224 op: not-equal 

1225 target_bucket: "{account_id}-{region}-s3-logs" 

1226 target_prefix: "{account}/{source_bucket_name}/" 

1227 actions: 

1228 - type: toggle-logging 

1229 target_bucket: "{account_id}-{region}-s3-logs" 

1230 target_prefix: "{account}/{source_bucket_name}/" 

1231 """ 

1232 

1233 schema = type_schema( 

1234 'bucket-logging', 

1235 op={'enum': ['enabled', 'disabled', 'equal', 'not-equal', 'eq', 'ne']}, 

1236 required=['op'], 

1237 target_bucket={'type': 'string'}, 

1238 target_prefix={'type': 'string'}) 

1239 schema_alias = False 

1240 account_name = None 

1241 

1242 permissions = ("s3:GetBucketLogging", "iam:ListAccountAliases") 

1243 

1244 def process(self, buckets, event=None): 

1245 return list(filter(None, map(self.process_bucket, buckets))) 

1246 

1247 def process_bucket(self, b): 

1248 if self.match_bucket(b): 

1249 return b 

1250 

1251 def match_bucket(self, b): 

1252 op = self.data.get('op') 

1253 

1254 logging = b.get('Logging', {}) 

1255 if op == 'disabled': 

1256 return logging == {} 

1257 elif op == 'enabled': 

1258 return logging != {} 

1259 

1260 if self.account_name is None: 

1261 session = local_session(self.manager.session_factory) 

1262 self.account_name = get_account_alias_from_sts(session) 

1263 

1264 variables = self.get_std_format_args(b) 

1265 variables.update({ 

1266 'account': self.account_name, 

1267 'source_bucket_name': b['Name'], 

1268 'source_bucket_region': get_region(b), 

1269 'target_bucket_name': self.data.get('target_bucket'), 

1270 'target_prefix': self.data.get('target_prefix'), 

1271 }) 

1272 data = format_string_values(self.data, **variables) 

1273 target_bucket = data.get('target_bucket') 

1274 target_prefix = data.get('target_prefix', b['Name'] + '/') 

1275 

1276 target_config = { 

1277 "TargetBucket": target_bucket, 

1278 "TargetPrefix": target_prefix 

1279 } if target_bucket else {} 

1280 

1281 if op in ('not-equal', 'ne'): 

1282 return logging != target_config 

1283 else: 

1284 return logging == target_config 

1285 

1286 

1287@actions.register('delete-bucket-notification') 

1288class DeleteBucketNotification(BucketActionBase): 

1289 """Action to delete S3 bucket notification configurations""" 

1290 

1291 schema = type_schema( 

1292 'delete-bucket-notification', 

1293 required=['statement_ids'], 

1294 statement_ids={'oneOf': [ 

1295 {'enum': ['matched']}, 

1296 {'type': 'array', 'items': {'type': 'string'}}]}) 

1297 

1298 permissions = ('s3:PutBucketNotification',) 

1299 

1300 def process_bucket(self, bucket): 

1301 n = bucket['Notification'] 

1302 if not n: 

1303 return 

1304 

1305 statement_ids = self.data.get('statement_ids') 

1306 if statement_ids == 'matched': 

1307 statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ()) 

1308 if not statement_ids: 

1309 return 

1310 

1311 cfg = defaultdict(list) 

1312 

1313 for t in BucketNotificationFilter.FIELDS.values(): 

1314 for c in n.get(t, []): 

1315 if c['Id'] not in statement_ids: 

1316 cfg[t].append(c) 

1317 

1318 client = bucket_client(local_session(self.manager.session_factory), bucket) 

1319 client.put_bucket_notification_configuration( 

1320 Bucket=bucket['Name'], 

1321 NotificationConfiguration=cfg) 

1322 

1323 

1324@actions.register('no-op') 

1325class NoOp(BucketActionBase): 

1326 

1327 schema = type_schema('no-op') 

1328 permissions = ('s3:ListAllMyBuckets',) 

1329 

1330 def process(self, buckets): 

1331 return None 

1332 

1333 

1334@actions.register('set-statements') 

1335class SetPolicyStatement(BucketActionBase): 

1336 """Action to add or update policy statements to S3 buckets 

1337 

1338 :example: 

1339 

1340 Remove all existing policies, and block insecure HTTP requests. 

1341 

1342 .. code-block:: yaml 

1343 

1344 policies: 

1345 - name: force-s3-https 

1346 resource: s3 

1347 actions: 

1348 - type: set-statements 

1349 remove: "*" 

1350 statements: 

1351 - Sid: "DenyHttp" 

1352 Effect: "Deny" 

1353 Action: "s3:GetObject" 

1354 Principal: 

1355 AWS: "*" 

1356 Resource: "arn:aws:s3:::{bucket_name}/*" 

1357 Condition: 

1358 Bool: 

1359 "aws:SecureTransport": false 

1360 

1361 :example: 

1362 

1363 Remove existing statements that grant cross-account access, and 

1364 block insecure HTTP requests. 

1365 

1366 .. code-block:: yaml 

1367 

1368 policies: 

1369 - name: s3-tighten-bucket-policy 

1370 resource: aws.s3 

1371 filters: 

1372 - type: cross-account 

1373 actions: 

1374 - type: set-statements 

1375 remove: "matched" 

1376 statements: 

1377 - Sid: "DenyHttp" 

1378 Effect: "Deny" 

1379 Action: "s3:GetObject" 

1380 Principal: 

1381 AWS: "*" 

1382 Resource: "arn:aws:s3:::{bucket_name}/*" 

1383 Condition: 

1384 Bool: 

1385 "aws:SecureTransport": false 

1386 

1387 """ 

1388 

1389 permissions = ('s3:PutBucketPolicy', 's3:DeleteBucketPolicy') 

1390 

1391 schema = type_schema( 

1392 'set-statements', 

1393 **{ 

1394 'remove': {'oneOf': [ 

1395 {'enum': ['matched', "*"]}, 

1396 {'type': 'array', 'items': {'type': 'string'}}]}, 

1397 'statements': { 

1398 'type': 'array', 

1399 'items': { 

1400 'type': 'object', 

1401 'properties': { 

1402 'Sid': {'type': 'string'}, 

1403 'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']}, 

1404 'Principal': {'anyOf': [{'type': 'string'}, 

1405 {'type': 'object'}, {'type': 'array'}]}, 

1406 'NotPrincipal': {'anyOf': [{'type': 'object'}, {'type': 'array'}]}, 

1407 'Action': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1408 'NotAction': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1409 'Resource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1410 'NotResource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1411 'Condition': {'type': 'object'} 

1412 }, 

1413 'required': ['Sid', 'Effect'], 

1414 'oneOf': [ 

1415 {'required': ['Principal', 'Action', 'Resource']}, 

1416 {'required': ['NotPrincipal', 'Action', 'Resource']}, 

1417 {'required': ['Principal', 'NotAction', 'Resource']}, 

1418 {'required': ['NotPrincipal', 'NotAction', 'Resource']}, 

1419 {'required': ['Principal', 'Action', 'NotResource']}, 

1420 {'required': ['NotPrincipal', 'Action', 'NotResource']}, 

1421 {'required': ['Principal', 'NotAction', 'NotResource']}, 

1422 {'required': ['NotPrincipal', 'NotAction', 'NotResource']} 

1423 ] 

1424 } 

1425 } 

1426 } 

1427 ) 

1428 

1429 def process_bucket_remove(self, policy, bucket): 

1430 statements = policy.get('Statement', []) 

1431 resource_statements = bucket.get(CrossAccountAccessFilter.annotation_key, ()) 

1432 

1433 statements, found = remove_statements( 

1434 self.data.get('remove', []), statements, resource_statements) 

1435 

1436 return statements, found 

1437 

1438 def process_bucket_add(self, policy, bucket): 

1439 target_statements = format_string_values( 

1440 copy.deepcopy({s['Sid']: s for s in self.data.get('statements', [])}), 

1441 **self.get_std_format_args(bucket)) 

1442 

1443 bucket_statements = policy.setdefault('Statement', []) 

1444 

1445 for s in bucket_statements: 

1446 if s.get('Sid') not in target_statements: 

1447 continue 

1448 if s == target_statements[s['Sid']]: 

1449 target_statements.pop(s['Sid']) 

1450 

1451 if not target_statements: 

1452 return False 

1453 

1454 bucket_statements.extend(target_statements.values()) 

1455 return True 

1456 

1457 def process_bucket(self, bucket): 

1458 policy = bucket.get('Policy') or '{}' 

1459 policy = json.loads(policy) 

1460 

1461 statements, found = self.process_bucket_remove(policy, bucket) 

1462 modified = self.process_bucket_add(policy, bucket) 

1463 

1464 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1465 

1466 if not modified and not found: 

1467 return 

1468 

1469 policy = json.dumps(policy) 

1470 

1471 if not statements and found and not modified: 

1472 s3.delete_bucket_policy(Bucket=bucket['Name']) 

1473 return {'Name': bucket['Name'], 'Policy': policy} 

1474 

1475 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=policy) 

1476 return {'Name': bucket['Name'], 'Policy': policy} 

1477 

1478 

1479@actions.register('remove-statements') 

1480class RemovePolicyStatement(RemovePolicyBase): 

1481 """Action to remove policy statements from S3 buckets 

1482 

1483 This action has been deprecated. Please use the 'set-statements' action 

1484 with the 'remove' attribute to remove policy statements from S3 buckets. 

1485 

1486 :example: 

1487 

1488 .. code-block:: yaml 

1489 

1490 policies: 

1491 - name: s3-remove-encrypt-put 

1492 resource: s3 

1493 filters: 

1494 - type: has-statement 

1495 statement_ids: 

1496 - RequireEncryptedPutObject 

1497 actions: 

1498 - type: remove-statements 

1499 statement_ids: 

1500 - RequiredEncryptedPutObject 

1501 """ 

1502 

1503 deprecations = ( 

1504 deprecated.filter("use the 'set-statements' action with 'remove' attribute"), 

1505 ) 

1506 

1507 permissions = ("s3:PutBucketPolicy", "s3:DeleteBucketPolicy") 

1508 

1509 def process(self, buckets): 

1510 with self.executor_factory(max_workers=3) as w: 

1511 futures = {} 

1512 results = [] 

1513 for b in buckets: 

1514 futures[w.submit(self.process_bucket, b)] = b 

1515 for f in as_completed(futures): 

1516 if f.exception(): 

1517 b = futures[f] 

1518 self.log.error('error modifying bucket:%s\n%s', 

1519 b['Name'], f.exception()) 

1520 results += filter(None, [f.result()]) 

1521 return results 

1522 

1523 def process_bucket(self, bucket): 

1524 p = bucket.get('Policy') 

1525 if p is None: 

1526 return 

1527 

1528 p = json.loads(p) 

1529 

1530 statements, found = self.process_policy( 

1531 p, bucket, CrossAccountAccessFilter.annotation_key) 

1532 

1533 if not found: 

1534 return 

1535 

1536 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1537 

1538 if not statements: 

1539 s3.delete_bucket_policy(Bucket=bucket['Name']) 

1540 else: 

1541 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=json.dumps(p)) 

1542 return {'Name': bucket['Name'], 'State': 'PolicyRemoved', 'Statements': found} 

1543 

1544 

1545@actions.register('set-replication') 

1546class SetBucketReplicationConfig(BucketActionBase): 

1547 """Action to add or remove replication configuration statement from S3 buckets 

1548 

1549 :example: 

1550 

1551 .. code-block:: yaml 

1552 

1553 policies: 

1554 - name: s3-unapproved-account-replication 

1555 resource: s3 

1556 filters: 

1557 - type: value 

1558 key: Replication.ReplicationConfiguration.Rules[].Destination.Account 

1559 value: present 

1560 - type: value 

1561 key: Replication.ReplicationConfiguration.Rules[].Destination.Account 

1562 value_from: 

1563 url: 's3:///path/to/file.json' 

1564 format: json 

1565 expr: "approved_accounts.*" 

1566 op: ni 

1567 actions: 

1568 - type: set-replication 

1569 state: enable 

1570 """ 

1571 schema = type_schema( 

1572 'set-replication', 

1573 state={'type': 'string', 'enum': ['enable', 'disable', 'remove']}) 

1574 permissions = ("s3:GetReplicationConfiguration", "s3:PutReplicationConfiguration") 

1575 

1576 def process(self, buckets): 

1577 with self.executor_factory(max_workers=3) as w: 

1578 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1579 errors = [] 

1580 for future in as_completed(futures): 

1581 bucket = futures[future] 

1582 try: 

1583 future.result() 

1584 except ClientError as e: 

1585 errors.append("Message: %s Bucket: %s", e, bucket['Name']) 

1586 if errors: 

1587 raise Exception('\n'.join(map(str, errors))) 

1588 

1589 def process_bucket(self, bucket): 

1590 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1591 state = self.data.get('state') 

1592 if state is not None: 

1593 if state == 'remove': 

1594 s3.delete_bucket_replication(Bucket=bucket['Name']) 

1595 return {'Name': bucket['Name'], 'State': 'ReplicationConfigRemoved'} 

1596 if state in ('enable', 'disable'): 

1597 config = s3.get_bucket_replication(Bucket=bucket['Name']) 

1598 for rule in config['ReplicationConfiguration']['Rules']: 

1599 rule['Status'] = 'Enabled' if state == 'enable' else 'Disabled' 

1600 s3.put_bucket_replication( 

1601 Bucket=bucket['Name'], 

1602 ReplicationConfiguration=config['ReplicationConfiguration'] 

1603 ) 

1604 return {'Name': bucket['Name'], 'State': 'ReplicationConfigUpdated'} 

1605 

1606 

1607@filters.register('check-public-block') 

1608class FilterPublicBlock(Filter): 

1609 """Filter for s3 bucket public blocks 

1610 

1611 If no filter paramaters are provided it checks to see if any are unset or False. 

1612 

1613 If parameters are provided only the provided ones are checked. 

1614 

1615 :example: 

1616 

1617 .. code-block:: yaml 

1618 

1619 policies: 

1620 - name: CheckForPublicAclBlock-Off 

1621 resource: s3 

1622 region: us-east-1 

1623 filters: 

1624 - type: check-public-block 

1625 BlockPublicAcls: true 

1626 BlockPublicPolicy: true 

1627 """ 

1628 

1629 schema = type_schema( 

1630 'check-public-block', 

1631 BlockPublicAcls={'type': 'boolean'}, 

1632 IgnorePublicAcls={'type': 'boolean'}, 

1633 BlockPublicPolicy={'type': 'boolean'}, 

1634 RestrictPublicBuckets={'type': 'boolean'}) 

1635 permissions = ("s3:GetBucketPublicAccessBlock",) 

1636 keys = ( 

1637 'BlockPublicPolicy', 'BlockPublicAcls', 'IgnorePublicAcls', 'RestrictPublicBuckets') 

1638 annotation_key = 'c7n:PublicAccessBlock' 

1639 

1640 def process(self, buckets, event=None): 

1641 results = [] 

1642 with self.executor_factory(max_workers=2) as w: 

1643 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1644 for f in as_completed(futures): 

1645 if f.result(): 

1646 results.append(futures[f]) 

1647 return results 

1648 

1649 def process_bucket(self, bucket): 

1650 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1651 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys})) 

1652 if self.annotation_key not in bucket: 

1653 try: 

1654 config = s3.get_public_access_block( 

1655 Bucket=bucket['Name'])['PublicAccessBlockConfiguration'] 

1656 except ClientError as e: 

1657 error_code = e.response['Error']['Code'] 

1658 if error_code == 'NoSuchPublicAccessBlockConfiguration': 

1659 pass 

1660 elif error_code == 'AccessDenied': 

1661 # Follow the same logic as `assemble_bucket` - log and continue on access 

1662 # denied errors rather than halting a policy altogether 

1663 method = 'GetPublicAccessBlock' 

1664 log.warning( 

1665 "Bucket:%s unable to invoke method:%s error:%s ", 

1666 bucket['Name'], method, e.response['Error']['Message'] 

1667 ) 

1668 bucket.setdefault('c7n:DeniedMethods', []).append(method) 

1669 else: 

1670 raise 

1671 bucket[self.annotation_key] = config 

1672 return self.matches_filter(config) 

1673 

1674 def matches_filter(self, config): 

1675 key_set = [key for key in self.keys if key in self.data] 

1676 if key_set: 

1677 return all([self.data.get(key) is config[key] for key in key_set]) 

1678 else: 

1679 return not all(config.values()) 

1680 

1681 

1682@actions.register('set-public-block') 

1683class SetPublicBlock(BucketActionBase): 

1684 """Action to update Public Access blocks on S3 buckets 

1685 

1686 If no action parameters are provided all settings will be set to the `state`, which defaults 

1687 

1688 If action parameters are provided, those will be set and other extant values preserved. 

1689 

1690 :example: 

1691 

1692 .. code-block:: yaml 

1693 

1694 policies: 

1695 - name: s3-public-block-enable-all 

1696 resource: s3 

1697 filters: 

1698 - type: check-public-block 

1699 actions: 

1700 - type: set-public-block 

1701 

1702 policies: 

1703 - name: s3-public-block-disable-all 

1704 resource: s3 

1705 filters: 

1706 - type: check-public-block 

1707 actions: 

1708 - type: set-public-block 

1709 state: false 

1710 

1711 policies: 

1712 - name: s3-public-block-enable-some 

1713 resource: s3 

1714 filters: 

1715 - or: 

1716 - type: check-public-block 

1717 BlockPublicAcls: false 

1718 - type: check-public-block 

1719 BlockPublicPolicy: false 

1720 actions: 

1721 - type: set-public-block 

1722 BlockPublicAcls: true 

1723 BlockPublicPolicy: true 

1724 

1725 """ 

1726 

1727 schema = type_schema( 

1728 'set-public-block', 

1729 state={'type': 'boolean', 'default': True}, 

1730 BlockPublicAcls={'type': 'boolean'}, 

1731 IgnorePublicAcls={'type': 'boolean'}, 

1732 BlockPublicPolicy={'type': 'boolean'}, 

1733 RestrictPublicBuckets={'type': 'boolean'}) 

1734 permissions = ("s3:GetBucketPublicAccessBlock", "s3:PutBucketPublicAccessBlock") 

1735 keys = FilterPublicBlock.keys 

1736 annotation_key = FilterPublicBlock.annotation_key 

1737 

1738 def process(self, buckets): 

1739 with self.executor_factory(max_workers=3) as w: 

1740 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1741 for future in as_completed(futures): 

1742 future.result() 

1743 

1744 def process_bucket(self, bucket): 

1745 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1746 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys})) 

1747 if self.annotation_key not in bucket: 

1748 try: 

1749 config = s3.get_public_access_block( 

1750 Bucket=bucket['Name'])['PublicAccessBlockConfiguration'] 

1751 except ClientError as e: 

1752 if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration': 

1753 raise 

1754 

1755 key_set = [key for key in self.keys if key in self.data] 

1756 if key_set: 

1757 for key in key_set: 

1758 config[key] = self.data.get(key) 

1759 else: 

1760 for key in self.keys: 

1761 config[key] = self.data.get('state', True) 

1762 s3.put_public_access_block( 

1763 Bucket=bucket['Name'], PublicAccessBlockConfiguration=config) 

1764 

1765 

1766@actions.register('toggle-versioning') 

1767class ToggleVersioning(BucketActionBase): 

1768 """Action to enable/suspend versioning on a S3 bucket 

1769 

1770 Note versioning can never be disabled only suspended. 

1771 

1772 :example: 

1773 

1774 .. code-block:: yaml 

1775 

1776 policies: 

1777 - name: s3-enable-versioning 

1778 resource: s3 

1779 filters: 

1780 - or: 

1781 - type: value 

1782 key: Versioning.Status 

1783 value: Suspended 

1784 - type: value 

1785 key: Versioning.Status 

1786 value: absent 

1787 actions: 

1788 - type: toggle-versioning 

1789 enabled: true 

1790 """ 

1791 

1792 schema = type_schema( 

1793 'toggle-versioning', 

1794 enabled={'type': 'boolean'}) 

1795 permissions = ("s3:PutBucketVersioning",) 

1796 

1797 def process_versioning(self, resource, state): 

1798 client = bucket_client( 

1799 local_session(self.manager.session_factory), resource) 

1800 try: 

1801 client.put_bucket_versioning( 

1802 Bucket=resource['Name'], 

1803 VersioningConfiguration={ 

1804 'Status': state}) 

1805 except ClientError as e: 

1806 if e.response['Error']['Code'] != 'AccessDenied': 

1807 log.error( 

1808 "Unable to put bucket versioning on bucket %s: %s" % (resource['Name'], e)) 

1809 raise 

1810 log.warning( 

1811 "Access Denied Bucket:%s while put bucket versioning" % resource['Name']) 

1812 

1813 # mfa delete enablement looks like it needs the serial and a current token. 

1814 def process(self, resources): 

1815 enabled = self.data.get('enabled', True) 

1816 for r in resources: 

1817 if 'Versioning' not in r or not r['Versioning']: 

1818 r['Versioning'] = {'Status': 'Suspended'} 

1819 if enabled and ( 

1820 r['Versioning']['Status'] == 'Suspended'): 

1821 self.process_versioning(r, 'Enabled') 

1822 if not enabled and r['Versioning']['Status'] == 'Enabled': 

1823 self.process_versioning(r, 'Suspended') 

1824 

1825 

1826@actions.register('toggle-logging') 

1827class ToggleLogging(BucketActionBase): 

1828 """Action to enable/disable logging on a S3 bucket. 

1829 

1830 Target bucket ACL must allow for WRITE and READ_ACP Permissions 

1831 Not specifying a target_prefix will default to the current bucket name. 

1832 https://docs.aws.amazon.com/AmazonS3/latest/dev/enable-logging-programming.html 

1833 

1834 :example: 

1835 

1836 .. code-block:: yaml 

1837 

1838 policies: 

1839 - name: s3-enable-logging 

1840 resource: s3 

1841 filters: 

1842 - "tag:Testing": present 

1843 actions: 

1844 - type: toggle-logging 

1845 target_bucket: log-bucket 

1846 target_prefix: logs123/ 

1847 

1848 policies: 

1849 - name: s3-force-standard-logging 

1850 resource: s3 

1851 filters: 

1852 - type: bucket-logging 

1853 op: not-equal 

1854 target_bucket: "{account_id}-{region}-s3-logs" 

1855 target_prefix: "{account}/{source_bucket_name}/" 

1856 actions: 

1857 - type: toggle-logging 

1858 target_bucket: "{account_id}-{region}-s3-logs" 

1859 target_prefix: "{account}/{source_bucket_name}/" 

1860 """ 

1861 schema = type_schema( 

1862 'toggle-logging', 

1863 enabled={'type': 'boolean'}, 

1864 target_bucket={'type': 'string'}, 

1865 target_prefix={'type': 'string'}) 

1866 

1867 permissions = ("s3:PutBucketLogging", "iam:ListAccountAliases") 

1868 

1869 def validate(self): 

1870 if self.data.get('enabled', True): 

1871 if not self.data.get('target_bucket'): 

1872 raise PolicyValidationError( 

1873 "target_bucket must be specified on %s" % ( 

1874 self.manager.data,)) 

1875 return self 

1876 

1877 def process(self, resources): 

1878 session = local_session(self.manager.session_factory) 

1879 kwargs = { 

1880 "enabled": self.data.get('enabled', True), 

1881 "session": session, 

1882 "account_name": get_account_alias_from_sts(session), 

1883 } 

1884 

1885 return self._process_with_futures(resources, **kwargs) 

1886 

1887 def process_bucket(self, r, enabled=None, session=None, account_name=None): 

1888 client = bucket_client(session, r) 

1889 is_logging = bool(r.get('Logging')) 

1890 

1891 if enabled: 

1892 variables = self.get_std_format_args(r) 

1893 variables.update({ 

1894 'account': account_name, 

1895 'source_bucket_name': r['Name'], 

1896 'source_bucket_region': get_region(r), 

1897 'target_bucket_name': self.data.get('target_bucket'), 

1898 'target_prefix': self.data.get('target_prefix'), 

1899 }) 

1900 data = format_string_values(self.data, **variables) 

1901 config = { 

1902 'TargetBucket': data.get('target_bucket'), 

1903 'TargetPrefix': data.get('target_prefix', r['Name'] + '/') 

1904 } 

1905 if not is_logging or r.get('Logging') != config: 

1906 client.put_bucket_logging( 

1907 Bucket=r['Name'], 

1908 BucketLoggingStatus={'LoggingEnabled': config} 

1909 ) 

1910 r['Logging'] = config 

1911 

1912 elif not enabled and is_logging: 

1913 client.put_bucket_logging( 

1914 Bucket=r['Name'], BucketLoggingStatus={}) 

1915 r['Logging'] = {} 

1916 

1917 

1918@actions.register('attach-encrypt') 

1919class AttachLambdaEncrypt(BucketActionBase): 

1920 """Action attaches lambda encryption policy to S3 bucket 

1921 supports attachment via lambda bucket notification or sns notification 

1922 to invoke lambda. a special topic value of `default` will utilize an 

1923 extant notification or create one matching the bucket name. 

1924 

1925 :example: 

1926 

1927 

1928 .. code-block:: yaml 

1929 

1930 

1931 policies: 

1932 - name: attach-lambda-encrypt 

1933 resource: s3 

1934 filters: 

1935 - type: missing-policy-statement 

1936 actions: 

1937 - type: attach-encrypt 

1938 role: arn:aws:iam::123456789012:role/my-role 

1939 

1940 """ 

1941 schema = type_schema( 

1942 'attach-encrypt', 

1943 role={'type': 'string'}, 

1944 tags={'type': 'object'}, 

1945 topic={'type': 'string'}) 

1946 

1947 permissions = ( 

1948 "s3:PutBucketNotification", "s3:GetBucketNotification", 

1949 # lambda manager uses quite a few perms to provision lambdas 

1950 # and event sources, hard to disamgibuate punt for now. 

1951 "lambda:*", 

1952 ) 

1953 

1954 def __init__(self, data=None, manager=None): 

1955 self.data = data or {} 

1956 self.manager = manager 

1957 

1958 def validate(self): 

1959 if (not getattr(self.manager.config, 'dryrun', True) and 

1960 not self.data.get('role', self.manager.config.assume_role)): 

1961 raise PolicyValidationError( 

1962 "attach-encrypt: role must be specified either " 

1963 "via assume or in config on %s" % (self.manager.data,)) 

1964 

1965 return self 

1966 

1967 def process(self, buckets): 

1968 from c7n.mu import LambdaManager 

1969 from c7n.ufuncs.s3crypt import get_function 

1970 

1971 account_id = self.manager.config.account_id 

1972 topic_arn = self.data.get('topic') 

1973 

1974 func = get_function( 

1975 None, self.data.get('role', self.manager.config.assume_role), 

1976 account_id=account_id, tags=self.data.get('tags')) 

1977 

1978 regions = {get_region(b) for b in buckets} 

1979 

1980 # session managers by region 

1981 region_sessions = {} 

1982 for r in regions: 

1983 region_sessions[r] = functools.partial( 

1984 self.manager.session_factory, region=r) 

1985 

1986 # Publish function to all of our buckets regions 

1987 region_funcs = {} 

1988 

1989 for r in regions: 

1990 lambda_mgr = LambdaManager(region_sessions[r]) 

1991 lambda_mgr.publish(func) 

1992 region_funcs[r] = func 

1993 

1994 with self.executor_factory(max_workers=3) as w: 

1995 results = [] 

1996 futures = [] 

1997 for b in buckets: 

1998 region = get_region(b) 

1999 futures.append( 

2000 w.submit( 

2001 self.process_bucket, 

2002 region_funcs[region], 

2003 b, 

2004 topic_arn, 

2005 account_id, 

2006 region_sessions[region] 

2007 )) 

2008 for f in as_completed(futures): 

2009 if f.exception(): 

2010 log.exception( 

2011 "Error attaching lambda-encrypt %s" % (f.exception())) 

2012 results.append(f.result()) 

2013 return list(filter(None, results)) 

2014 

2015 def process_bucket(self, func, bucket, topic, account_id, session_factory): 

2016 from c7n.mu import BucketSNSNotification, BucketLambdaNotification 

2017 if topic: 

2018 topic = None if topic == 'default' else topic 

2019 source = BucketSNSNotification(session_factory, bucket, topic) 

2020 else: 

2021 source = BucketLambdaNotification( 

2022 {'account_s3': account_id}, session_factory, bucket) 

2023 return source.add(func, None) 

2024 

2025 

2026@actions.register('encryption-policy') 

2027class EncryptionRequiredPolicy(BucketActionBase): 

2028 """Action to apply an encryption policy to S3 buckets 

2029 

2030 

2031 :example: 

2032 

2033 .. code-block:: yaml 

2034 

2035 policies: 

2036 - name: s3-enforce-encryption 

2037 resource: s3 

2038 mode: 

2039 type: cloudtrail 

2040 events: 

2041 - CreateBucket 

2042 actions: 

2043 - encryption-policy 

2044 """ 

2045 

2046 permissions = ("s3:GetBucketPolicy", "s3:PutBucketPolicy") 

2047 schema = type_schema('encryption-policy') 

2048 

2049 def __init__(self, data=None, manager=None): 

2050 self.data = data or {} 

2051 self.manager = manager 

2052 

2053 def process(self, buckets): 

2054 with self.executor_factory(max_workers=3) as w: 

2055 results = w.map(self.process_bucket, buckets) 

2056 results = list(filter(None, list(results))) 

2057 return results 

2058 

2059 def process_bucket(self, b): 

2060 p = b['Policy'] 

2061 if p is None: 

2062 log.info("No policy found, creating new") 

2063 p = {'Version': "2012-10-17", "Statement": []} 

2064 else: 

2065 p = json.loads(p) 

2066 

2067 encryption_sid = "RequiredEncryptedPutObject" 

2068 encryption_statement = { 

2069 'Sid': encryption_sid, 

2070 'Effect': 'Deny', 

2071 'Principal': '*', 

2072 'Action': 's3:PutObject', 

2073 "Resource": "arn:aws:s3:::%s/*" % b['Name'], 

2074 "Condition": { 

2075 # AWS Managed Keys or KMS keys, note policy language 

2076 # does not support custom kms (todo add issue) 

2077 "StringNotEquals": { 

2078 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}} 

2079 

2080 statements = p.get('Statement', []) 

2081 for s in list(statements): 

2082 if s.get('Sid', '') == encryption_sid: 

2083 log.debug("Bucket:%s Found extant encrypt policy", b['Name']) 

2084 if s != encryption_statement: 

2085 log.info( 

2086 "Bucket:%s updating extant encrypt policy", b['Name']) 

2087 statements.remove(s) 

2088 else: 

2089 return 

2090 

2091 session = self.manager.session_factory() 

2092 s3 = bucket_client(session, b) 

2093 statements.append(encryption_statement) 

2094 p['Statement'] = statements 

2095 log.info('Bucket:%s attached encryption policy' % b['Name']) 

2096 

2097 try: 

2098 s3.put_bucket_policy( 

2099 Bucket=b['Name'], 

2100 Policy=json.dumps(p)) 

2101 except ClientError as e: 

2102 if e.response['Error']['Code'] == 'NoSuchBucket': 

2103 return 

2104 self.log.exception( 

2105 "Error on bucket:%s putting policy\n%s error:%s", 

2106 b['Name'], 

2107 json.dumps(statements, indent=2), e) 

2108 raise 

2109 return {'Name': b['Name'], 'State': 'PolicyAttached'} 

2110 

2111 

2112class BucketScanLog: 

2113 """Offload remediated key ids to a disk file in batches 

2114 

2115 A bucket keyspace is effectively infinite, we need to store partial 

2116 results out of memory, this class provides for a json log on disk 

2117 with partial write support. 

2118 

2119 json output format: 

2120 - [list_of_serialized_keys], 

2121 - [] # Empty list of keys at end when we close the buffer 

2122 

2123 """ 

2124 

2125 def __init__(self, log_dir, name): 

2126 self.log_dir = log_dir 

2127 self.name = name 

2128 self.fh = None 

2129 self.count = 0 

2130 

2131 @property 

2132 def path(self): 

2133 return os.path.join(self.log_dir, "%s.json" % self.name) 

2134 

2135 def __enter__(self): 

2136 # Don't require output directories 

2137 if self.log_dir is None: 

2138 return 

2139 

2140 self.fh = open(self.path, 'w') 

2141 self.fh.write("[\n") 

2142 return self 

2143 

2144 def __exit__(self, exc_type=None, exc_value=None, exc_frame=None): 

2145 if self.fh is None: 

2146 return 

2147 # we need an empty marker list at end to avoid trailing commas 

2148 self.fh.write("[]") 

2149 # and close the surrounding list 

2150 self.fh.write("\n]") 

2151 self.fh.close() 

2152 if not self.count: 

2153 os.remove(self.fh.name) 

2154 self.fh = None 

2155 return False 

2156 

2157 def add(self, keys): 

2158 self.count += len(keys) 

2159 if self.fh is None: 

2160 return 

2161 self.fh.write(dumps(keys)) 

2162 self.fh.write(",\n") 

2163 

2164 

2165class ScanBucket(BucketActionBase): 

2166 

2167 permissions = ("s3:ListBucket",) 

2168 

2169 bucket_ops = { 

2170 'standard': { 

2171 'iterator': 'list_objects', 

2172 'contents_key': ['Contents'], 

2173 'key_processor': 'process_key' 

2174 }, 

2175 'versioned': { 

2176 'iterator': 'list_object_versions', 

2177 'contents_key': ['Versions'], 

2178 'key_processor': 'process_version' 

2179 } 

2180 } 

2181 

2182 def __init__(self, data, manager=None): 

2183 super(ScanBucket, self).__init__(data, manager) 

2184 self.denied_buckets = set() 

2185 

2186 def get_bucket_style(self, b): 

2187 return ( 

2188 b.get('Versioning', {'Status': ''}).get('Status') in ( 

2189 'Enabled', 'Suspended') and 'versioned' or 'standard') 

2190 

2191 def get_bucket_op(self, b, op_name): 

2192 bucket_style = self.get_bucket_style(b) 

2193 op = self.bucket_ops[bucket_style][op_name] 

2194 if op_name == 'key_processor': 

2195 return getattr(self, op) 

2196 return op 

2197 

2198 def get_keys(self, b, key_set): 

2199 content_keys = self.get_bucket_op(b, 'contents_key') 

2200 keys = [] 

2201 for ck in content_keys: 

2202 keys.extend(key_set.get(ck, [])) 

2203 return keys 

2204 

2205 def process(self, buckets): 

2206 results = self._process_with_futures(self.process_bucket, buckets) 

2207 self.write_denied_buckets_file() 

2208 return results 

2209 

2210 def _process_with_futures(self, helper, buckets, max_workers=3): 

2211 results = [] 

2212 with self.executor_factory(max_workers) as w: 

2213 futures = {} 

2214 for b in buckets: 

2215 futures[w.submit(helper, b)] = b 

2216 for f in as_completed(futures): 

2217 if f.exception(): 

2218 b = futures[f] 

2219 self.log.error( 

2220 "Error on bucket:%s region:%s policy:%s error: %s", 

2221 b['Name'], b.get('Location', 'unknown'), 

2222 self.manager.data.get('name'), f.exception()) 

2223 self.denied_buckets.add(b['Name']) 

2224 continue 

2225 result = f.result() 

2226 if result: 

2227 results.append(result) 

2228 return results 

2229 

2230 def write_denied_buckets_file(self): 

2231 if (self.denied_buckets and 

2232 self.manager.ctx.log_dir and 

2233 not isinstance(self.manager.ctx.output, NullBlobOutput)): 

2234 with open( 

2235 os.path.join( 

2236 self.manager.ctx.log_dir, 'denied.json'), 'w') as fh: 

2237 json.dump(list(self.denied_buckets), fh, indent=2) 

2238 self.denied_buckets = set() 

2239 

2240 def process_bucket(self, b): 

2241 log.info( 

2242 "Scanning bucket:%s visitor:%s style:%s" % ( 

2243 b['Name'], self.__class__.__name__, self.get_bucket_style(b))) 

2244 

2245 s = self.manager.session_factory() 

2246 s3 = bucket_client(s, b) 

2247 

2248 # The bulk of _process_bucket function executes inline in 

2249 # calling thread/worker context, neither paginator nor 

2250 # bucketscan log should be used across worker boundary. 

2251 p = s3.get_paginator( 

2252 self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name']) 

2253 

2254 with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log: 

2255 with self.executor_factory(max_workers=10) as w: 

2256 try: 

2257 return self._process_bucket(b, p, key_log, w) 

2258 except ClientError as e: 

2259 if e.response['Error']['Code'] == 'NoSuchBucket': 

2260 log.warning( 

2261 "Bucket:%s removed while scanning" % b['Name']) 

2262 return 

2263 if e.response['Error']['Code'] == 'AccessDenied': 

2264 log.warning( 

2265 "Access Denied Bucket:%s while scanning" % b['Name']) 

2266 self.denied_buckets.add(b['Name']) 

2267 return 

2268 log.exception( 

2269 "Error processing bucket:%s paginator:%s" % ( 

2270 b['Name'], p)) 

2271 

2272 __call__ = process_bucket 

2273 

2274 def _process_bucket(self, b, p, key_log, w): 

2275 count = 0 

2276 

2277 for key_set in p: 

2278 keys = self.get_keys(b, key_set) 

2279 count += len(keys) 

2280 futures = [] 

2281 

2282 for batch in chunks(keys, size=100): 

2283 if not batch: 

2284 continue 

2285 futures.append(w.submit(self.process_chunk, batch, b)) 

2286 

2287 for f in as_completed(futures): 

2288 if f.exception(): 

2289 log.exception("Exception Processing bucket:%s key batch %s" % ( 

2290 b['Name'], f.exception())) 

2291 continue 

2292 r = f.result() 

2293 if r: 

2294 key_log.add(r) 

2295 

2296 # Log completion at info level, progress at debug level 

2297 if key_set['IsTruncated']: 

2298 log.debug('Scan progress bucket:%s keys:%d remediated:%d ...', 

2299 b['Name'], count, key_log.count) 

2300 else: 

2301 log.info('Scan Complete bucket:%s keys:%d remediated:%d', 

2302 b['Name'], count, key_log.count) 

2303 

2304 b['KeyScanCount'] = count 

2305 b['KeyRemediated'] = key_log.count 

2306 return { 

2307 'Bucket': b['Name'], 'Remediated': key_log.count, 'Count': count} 

2308 

2309 def process_chunk(self, batch, bucket): 

2310 raise NotImplementedError() 

2311 

2312 def process_key(self, s3, key, bucket_name, info=None): 

2313 raise NotImplementedError() 

2314 

2315 def process_version(self, s3, bucket, key): 

2316 raise NotImplementedError() 

2317 

2318 

2319@actions.register('encrypt-keys') 

2320class EncryptExtantKeys(ScanBucket): 

2321 """Action to encrypt unencrypted S3 objects 

2322 

2323 :example: 

2324 

2325 .. code-block:: yaml 

2326 

2327 policies: 

2328 - name: s3-encrypt-objects 

2329 resource: s3 

2330 actions: 

2331 - type: encrypt-keys 

2332 crypto: aws:kms 

2333 key-id: 9c3983be-c6cf-11e6-9d9d-cec0c932ce01 

2334 """ 

2335 

2336 permissions = ( 

2337 "s3:GetObject", 

2338 "s3:PutObject", 

2339 "s3:DeleteObjectVersion", 

2340 "s3:RestoreObject", 

2341 ) + ScanBucket.permissions 

2342 

2343 schema = { 

2344 'type': 'object', 

2345 'additionalProperties': False, 

2346 'properties': { 

2347 'type': {'enum': ['encrypt-keys']}, 

2348 'report-only': {'type': 'boolean'}, 

2349 'glacier': {'type': 'boolean'}, 

2350 'large': {'type': 'boolean'}, 

2351 'crypto': {'enum': ['AES256', 'aws:kms']}, 

2352 'key-id': {'type': 'string'} 

2353 }, 

2354 'dependencies': { 

2355 'key-id': { 

2356 'properties': { 

2357 'crypto': {'pattern': 'aws:kms'} 

2358 }, 

2359 'required': ['crypto'] 

2360 } 

2361 } 

2362 } 

2363 

2364 metrics = [ 

2365 ('Total Keys', {'Scope': 'Account'}), 

2366 ('Unencrypted', {'Scope': 'Account'})] 

2367 

2368 def __init__(self, data, manager=None): 

2369 super(EncryptExtantKeys, self).__init__(data, manager) 

2370 self.kms_id = self.data.get('key-id') 

2371 

2372 def get_permissions(self): 

2373 perms = ("s3:GetObject", "s3:GetObjectVersion") 

2374 if self.data.get('report-only'): 

2375 perms += ('s3:DeleteObject', 's3:DeleteObjectVersion', 

2376 's3:PutObject', 

2377 's3:AbortMultipartUpload', 

2378 's3:ListBucket', 

2379 's3:ListBucketVersions') 

2380 return perms 

2381 

2382 def process(self, buckets): 

2383 

2384 t = time.time() 

2385 results = super(EncryptExtantKeys, self).process(buckets) 

2386 run_time = time.time() - t 

2387 remediated_count = object_count = 0 

2388 

2389 for r in results: 

2390 object_count += r['Count'] 

2391 remediated_count += r['Remediated'] 

2392 self.manager.ctx.metrics.put_metric( 

2393 "Unencrypted", r['Remediated'], "Count", Scope=r['Bucket'], 

2394 buffer=True) 

2395 

2396 self.manager.ctx.metrics.put_metric( 

2397 "Unencrypted", remediated_count, "Count", Scope="Account", 

2398 buffer=True 

2399 ) 

2400 self.manager.ctx.metrics.put_metric( 

2401 "Total Keys", object_count, "Count", Scope="Account", 

2402 buffer=True 

2403 ) 

2404 self.manager.ctx.metrics.flush() 

2405 

2406 log.info( 

2407 ("EncryptExtant Complete keys:%d " 

2408 "remediated:%d rate:%0.2f/s time:%0.2fs"), 

2409 object_count, 

2410 remediated_count, 

2411 float(object_count) / run_time if run_time else 0, 

2412 run_time) 

2413 return results 

2414 

2415 def process_chunk(self, batch, bucket): 

2416 crypto_method = self.data.get('crypto', 'AES256') 

2417 s3 = bucket_client( 

2418 local_session(self.manager.session_factory), bucket, 

2419 kms=(crypto_method == 'aws:kms')) 

2420 b = bucket['Name'] 

2421 results = [] 

2422 key_processor = self.get_bucket_op(bucket, 'key_processor') 

2423 for key in batch: 

2424 r = key_processor(s3, key, b) 

2425 if r: 

2426 results.append(r) 

2427 return results 

2428 

2429 def process_key(self, s3, key, bucket_name, info=None): 

2430 k = key['Key'] 

2431 if info is None: 

2432 info = s3.head_object(Bucket=bucket_name, Key=k) 

2433 

2434 # If the data is already encrypted with AES256 and this request is also 

2435 # for AES256 then we don't need to do anything 

2436 if info.get('ServerSideEncryption') == 'AES256' and not self.kms_id: 

2437 return False 

2438 

2439 if info.get('ServerSideEncryption') == 'aws:kms': 

2440 # If we're not looking for a specific key any key will do. 

2441 if not self.kms_id: 

2442 return False 

2443 # If we're configured to use a specific key and the key matches 

2444 # note this is not a strict equality match. 

2445 if self.kms_id in info.get('SSEKMSKeyId', ''): 

2446 return False 

2447 

2448 if self.data.get('report-only'): 

2449 return k 

2450 

2451 storage_class = info.get('StorageClass', 'STANDARD') 

2452 

2453 if storage_class == 'GLACIER': 

2454 if not self.data.get('glacier'): 

2455 return False 

2456 if 'Restore' not in info: 

2457 # This takes multiple hours, we let the next c7n 

2458 # run take care of followups. 

2459 s3.restore_object( 

2460 Bucket=bucket_name, 

2461 Key=k, 

2462 RestoreRequest={'Days': 30}) 

2463 return False 

2464 elif not restore_complete(info['Restore']): 

2465 return False 

2466 

2467 storage_class = 'STANDARD' 

2468 

2469 crypto_method = self.data.get('crypto', 'AES256') 

2470 key_id = self.data.get('key-id') 

2471 # Note on copy we lose individual object acl grants 

2472 params = {'Bucket': bucket_name, 

2473 'Key': k, 

2474 'CopySource': "/%s/%s" % (bucket_name, k), 

2475 'MetadataDirective': 'COPY', 

2476 'StorageClass': storage_class, 

2477 'ServerSideEncryption': crypto_method} 

2478 

2479 if key_id and crypto_method == 'aws:kms': 

2480 params['SSEKMSKeyId'] = key_id 

2481 

2482 if info['ContentLength'] > MAX_COPY_SIZE and self.data.get( 

2483 'large', True): 

2484 return self.process_large_file(s3, bucket_name, key, info, params) 

2485 

2486 s3.copy_object(**params) 

2487 return k 

2488 

2489 def process_version(self, s3, key, bucket_name): 

2490 info = s3.head_object( 

2491 Bucket=bucket_name, 

2492 Key=key['Key'], 

2493 VersionId=key['VersionId']) 

2494 

2495 if 'ServerSideEncryption' in info: 

2496 return False 

2497 

2498 if self.data.get('report-only'): 

2499 return key['Key'], key['VersionId'] 

2500 

2501 if key['IsLatest']: 

2502 r = self.process_key(s3, key, bucket_name, info) 

2503 # Glacier request processing, wait till we have the restored object 

2504 if not r: 

2505 return r 

2506 s3.delete_object( 

2507 Bucket=bucket_name, 

2508 Key=key['Key'], 

2509 VersionId=key['VersionId']) 

2510 return key['Key'], key['VersionId'] 

2511 

2512 def process_large_file(self, s3, bucket_name, key, info, params): 

2513 """For objects over 5gb, use multipart upload to copy""" 

2514 part_size = MAX_COPY_SIZE - (1024 ** 2) 

2515 num_parts = int(math.ceil(info['ContentLength'] / part_size)) 

2516 source = params.pop('CopySource') 

2517 

2518 params.pop('MetadataDirective') 

2519 if 'Metadata' in info: 

2520 params['Metadata'] = info['Metadata'] 

2521 

2522 upload_id = s3.create_multipart_upload(**params)['UploadId'] 

2523 

2524 params = {'Bucket': bucket_name, 

2525 'Key': key['Key'], 

2526 'UploadId': upload_id, 

2527 'CopySource': source, 

2528 'CopySourceIfMatch': info['ETag']} 

2529 

2530 def upload_part(part_num): 

2531 part_params = dict(params) 

2532 part_params['CopySourceRange'] = "bytes=%d-%d" % ( 

2533 part_size * (part_num - 1), 

2534 min(part_size * part_num - 1, info['ContentLength'] - 1)) 

2535 part_params['PartNumber'] = part_num 

2536 response = s3.upload_part_copy(**part_params) 

2537 return {'ETag': response['CopyPartResult']['ETag'], 

2538 'PartNumber': part_num} 

2539 

2540 try: 

2541 with self.executor_factory(max_workers=2) as w: 

2542 parts = list(w.map(upload_part, range(1, num_parts + 1))) 

2543 except Exception: 

2544 log.warning( 

2545 "Error during large key copy bucket: %s key: %s, " 

2546 "aborting upload", bucket_name, key, exc_info=True) 

2547 s3.abort_multipart_upload( 

2548 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id) 

2549 raise 

2550 s3.complete_multipart_upload( 

2551 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id, 

2552 MultipartUpload={'Parts': parts}) 

2553 return key['Key'] 

2554 

2555 

2556def restore_complete(restore): 

2557 if ',' in restore: 

2558 ongoing, _ = restore.split(',', 1) 

2559 else: 

2560 ongoing = restore 

2561 return 'false' in ongoing 

2562 

2563 

2564@filters.register('is-log-target') 

2565class LogTarget(Filter): 

2566 """Filter and return buckets are log destinations. 

2567 

2568 Not suitable for use in lambda on large accounts, This is a api 

2569 heavy process to detect scan all possible log sources. 

2570 

2571 Sources: 

2572 - elb (Access Log) 

2573 - s3 (Access Log) 

2574 - cfn (Template writes) 

2575 - cloudtrail 

2576 

2577 :example: 

2578 

2579 .. code-block:: yaml 

2580 

2581 policies: 

2582 - name: s3-log-bucket 

2583 resource: s3 

2584 filters: 

2585 - type: is-log-target 

2586 """ 

2587 

2588 schema = type_schema( 

2589 'is-log-target', 

2590 services={'type': 'array', 'items': {'enum': [ 

2591 's3', 'elb', 'cloudtrail']}}, 

2592 self={'type': 'boolean'}, 

2593 value={'type': 'boolean'}) 

2594 

2595 def get_permissions(self): 

2596 perms = self.manager.get_resource_manager('elb').get_permissions() 

2597 perms += ('elasticloadbalancing:DescribeLoadBalancerAttributes',) 

2598 return perms 

2599 

2600 def process(self, buckets, event=None): 

2601 log_buckets = set() 

2602 count = 0 

2603 

2604 services = self.data.get('services', ['elb', 's3', 'cloudtrail']) 

2605 self_log = self.data.get('self', False) 

2606 

2607 if 'elb' in services and not self_log: 

2608 for bucket, _ in self.get_elb_bucket_locations(): 

2609 log_buckets.add(bucket) 

2610 count += 1 

2611 self.log.debug("Found %d elb log targets" % count) 

2612 

2613 if 's3' in services: 

2614 count = 0 

2615 for bucket, _ in self.get_s3_bucket_locations(buckets, self_log): 

2616 count += 1 

2617 log_buckets.add(bucket) 

2618 self.log.debug('Found %d s3 log targets' % count) 

2619 

2620 if 'cloudtrail' in services and not self_log: 

2621 for bucket, _ in self.get_cloud_trail_locations(buckets): 

2622 log_buckets.add(bucket) 

2623 

2624 self.log.info("Found %d log targets for %d buckets" % ( 

2625 len(log_buckets), len(buckets))) 

2626 if self.data.get('value', True): 

2627 return [b for b in buckets if b['Name'] in log_buckets] 

2628 else: 

2629 return [b for b in buckets if b['Name'] not in log_buckets] 

2630 

2631 @staticmethod 

2632 def get_s3_bucket_locations(buckets, self_log=False): 

2633 """return (bucket_name, prefix) for all s3 logging targets""" 

2634 for b in buckets: 

2635 if b.get('Logging'): 

2636 if self_log: 

2637 if b['Name'] != b['Logging']['TargetBucket']: 

2638 continue 

2639 yield (b['Logging']['TargetBucket'], 

2640 b['Logging']['TargetPrefix']) 

2641 if not self_log and b['Name'].startswith('cf-templates-'): 

2642 yield (b['Name'], '') 

2643 

2644 def get_cloud_trail_locations(self, buckets): 

2645 session = local_session(self.manager.session_factory) 

2646 client = session.client('cloudtrail') 

2647 names = {b['Name'] for b in buckets} 

2648 for t in client.describe_trails().get('trailList', ()): 

2649 if t.get('S3BucketName') in names: 

2650 yield (t['S3BucketName'], t.get('S3KeyPrefix', '')) 

2651 

2652 def get_elb_bucket_locations(self): 

2653 elbs = self.manager.get_resource_manager('elb').resources() 

2654 get_elb_attrs = functools.partial( 

2655 _query_elb_attrs, self.manager.session_factory) 

2656 

2657 with self.executor_factory(max_workers=2) as w: 

2658 futures = [] 

2659 for elb_set in chunks(elbs, 100): 

2660 futures.append(w.submit(get_elb_attrs, elb_set)) 

2661 for f in as_completed(futures): 

2662 if f.exception(): 

2663 log.error("Error while scanning elb log targets: %s" % ( 

2664 f.exception())) 

2665 continue 

2666 for tgt in f.result(): 

2667 yield tgt 

2668 

2669 

2670def _query_elb_attrs(session_factory, elb_set): 

2671 session = local_session(session_factory) 

2672 client = session.client('elb') 

2673 log_targets = [] 

2674 for e in elb_set: 

2675 try: 

2676 attrs = client.describe_load_balancer_attributes( 

2677 LoadBalancerName=e['LoadBalancerName'])[ 

2678 'LoadBalancerAttributes'] 

2679 if 'AccessLog' in attrs and attrs['AccessLog']['Enabled']: 

2680 log_targets.append(( 

2681 attrs['AccessLog']['S3BucketName'], 

2682 attrs['AccessLog']['S3BucketPrefix'])) 

2683 except Exception as err: 

2684 log.warning( 

2685 "Could not retrieve load balancer %s: %s" % ( 

2686 e['LoadBalancerName'], err)) 

2687 return log_targets 

2688 

2689 

2690@actions.register('remove-website-hosting') 

2691class RemoveWebsiteHosting(BucketActionBase): 

2692 """Action that removes website hosting configuration.""" 

2693 

2694 schema = type_schema('remove-website-hosting') 

2695 

2696 permissions = ('s3:DeleteBucketWebsite',) 

2697 

2698 def process(self, buckets): 

2699 session = local_session(self.manager.session_factory) 

2700 for bucket in buckets: 

2701 client = bucket_client(session, bucket) 

2702 client.delete_bucket_website(Bucket=bucket['Name']) 

2703 

2704 

2705@actions.register('delete-global-grants') 

2706class DeleteGlobalGrants(BucketActionBase): 

2707 """Deletes global grants associated to a S3 bucket 

2708 

2709 :example: 

2710 

2711 .. code-block:: yaml 

2712 

2713 policies: 

2714 - name: s3-delete-global-grants 

2715 resource: s3 

2716 filters: 

2717 - type: global-grants 

2718 actions: 

2719 - delete-global-grants 

2720 """ 

2721 

2722 schema = type_schema( 

2723 'delete-global-grants', 

2724 grantees={'type': 'array', 'items': {'type': 'string'}}) 

2725 

2726 permissions = ('s3:PutBucketAcl',) 

2727 

2728 def process(self, buckets): 

2729 with self.executor_factory(max_workers=5) as w: 

2730 return list(filter(None, list(w.map(self.process_bucket, buckets)))) 

2731 

2732 def process_bucket(self, b): 

2733 grantees = self.data.get( 

2734 'grantees', [ 

2735 GlobalGrantsFilter.AUTH_ALL, GlobalGrantsFilter.GLOBAL_ALL]) 

2736 

2737 log.info(b) 

2738 

2739 acl = b.get('Acl', {'Grants': []}) 

2740 if not acl or not acl['Grants']: 

2741 return 

2742 new_grants = [] 

2743 for grant in acl['Grants']: 

2744 grantee = grant.get('Grantee', {}) 

2745 if not grantee: 

2746 continue 

2747 # Yuck, 'get_bucket_acl' doesn't return the grantee type. 

2748 if 'URI' in grantee: 

2749 grantee['Type'] = 'Group' 

2750 else: 

2751 grantee['Type'] = 'CanonicalUser' 

2752 if ('URI' in grantee and 

2753 grantee['URI'] in grantees and not 

2754 (grant['Permission'] == 'READ' and b['Website'])): 

2755 # Remove this grantee. 

2756 pass 

2757 else: 

2758 new_grants.append(grant) 

2759 

2760 log.info({'Owner': acl['Owner'], 'Grants': new_grants}) 

2761 

2762 c = bucket_client(self.manager.session_factory(), b) 

2763 try: 

2764 c.put_bucket_acl( 

2765 Bucket=b['Name'], 

2766 AccessControlPolicy={ 

2767 'Owner': acl['Owner'], 'Grants': new_grants}) 

2768 except ClientError as e: 

2769 if e.response['Error']['Code'] == 'NoSuchBucket': 

2770 return 

2771 return b 

2772 

2773 

2774@actions.register('tag') 

2775class BucketTag(Tag): 

2776 """Action to create tags on a S3 bucket 

2777 

2778 :example: 

2779 

2780 .. code-block:: yaml 

2781 

2782 policies: 

2783 - name: s3-tag-region 

2784 resource: s3 

2785 region: us-east-1 

2786 filters: 

2787 - "tag:RegionName": absent 

2788 actions: 

2789 - type: tag 

2790 key: RegionName 

2791 value: us-east-1 

2792 """ 

2793 

2794 def process_resource_set(self, client, resource_set, tags): 

2795 modify_bucket_tags(self.manager.session_factory, resource_set, tags) 

2796 

2797 

2798@actions.register('mark-for-op') 

2799class MarkBucketForOp(TagDelayedAction): 

2800 """Action schedules custodian to perform an action at a certain date 

2801 

2802 :example: 

2803 

2804 .. code-block:: yaml 

2805 

2806 policies: 

2807 - name: s3-encrypt 

2808 resource: s3 

2809 filters: 

2810 - type: missing-statement 

2811 statement_ids: 

2812 - RequiredEncryptedPutObject 

2813 actions: 

2814 - type: mark-for-op 

2815 op: attach-encrypt 

2816 days: 7 

2817 """ 

2818 

2819 schema = type_schema( 

2820 'mark-for-op', rinherit=TagDelayedAction.schema) 

2821 

2822 

2823@actions.register('unmark') 

2824@actions.register('remove-tag') 

2825class RemoveBucketTag(RemoveTag): 

2826 """Removes tag/tags from a S3 object 

2827 

2828 :example: 

2829 

2830 .. code-block:: yaml 

2831 

2832 policies: 

2833 - name: s3-remove-owner-tag 

2834 resource: s3 

2835 filters: 

2836 - "tag:BucketOwner": present 

2837 actions: 

2838 - type: remove-tag 

2839 tags: ['BucketOwner'] 

2840 """ 

2841 

2842 def process_resource_set(self, client, resource_set, tags): 

2843 modify_bucket_tags( 

2844 self.manager.session_factory, resource_set, remove_tags=tags) 

2845 

2846 

2847@filters.register('data-events') 

2848class DataEvents(Filter): 

2849 """Find buckets for which CloudTrail is logging data events. 

2850 

2851 Note that this filter only examines trails that are defined in the 

2852 current account. 

2853 """ 

2854 

2855 schema = type_schema('data-events', state={'enum': ['present', 'absent']}) 

2856 permissions = ( 

2857 'cloudtrail:DescribeTrails', 

2858 'cloudtrail:GetEventSelectors') 

2859 

2860 def get_event_buckets(self, session, trails): 

2861 """Return a mapping of bucket name to cloudtrail. 

2862 

2863 For wildcard trails the bucket name is ''. 

2864 """ 

2865 regions = {t.get('HomeRegion') for t in trails} 

2866 clients = {} 

2867 for region in regions: 

2868 clients[region] = session.client('cloudtrail', region_name=region) 

2869 

2870 event_buckets = {} 

2871 for t in trails: 

2872 for events in clients[t.get('HomeRegion')].get_event_selectors( 

2873 TrailName=t['Name']).get('EventSelectors', ()): 

2874 if 'DataResources' not in events: 

2875 continue 

2876 for data_events in events['DataResources']: 

2877 if data_events['Type'] != 'AWS::S3::Object': 

2878 continue 

2879 for b in data_events['Values']: 

2880 event_buckets[b.rsplit(':')[-1].strip('/')] = t['Name'] 

2881 return event_buckets 

2882 

2883 def process(self, resources, event=None): 

2884 trails = self.manager.get_resource_manager('cloudtrail').resources() 

2885 local_trails = self.filter_resources( 

2886 trails, 

2887 "split(':', TrailARN)[4]", (self.manager.account_id,) 

2888 ) 

2889 session = local_session(self.manager.session_factory) 

2890 event_buckets = self.get_event_buckets(session, local_trails) 

2891 ops = { 

2892 'present': lambda x: ( 

2893 x['Name'] in event_buckets or '' in event_buckets), 

2894 'absent': ( 

2895 lambda x: x['Name'] not in event_buckets and '' 

2896 not in event_buckets)} 

2897 

2898 op = ops[self.data.get('state', 'present')] 

2899 results = [] 

2900 for b in resources: 

2901 if op(b): 

2902 results.append(b) 

2903 return results 

2904 

2905 

2906@filters.register('inventory') 

2907class Inventory(ValueFilter): 

2908 """Filter inventories for a bucket""" 

2909 schema = type_schema('inventory', rinherit=ValueFilter.schema) 

2910 schema_alias = False 

2911 permissions = ('s3:GetInventoryConfiguration',) 

2912 

2913 def process(self, buckets, event=None): 

2914 results = [] 

2915 with self.executor_factory(max_workers=2) as w: 

2916 futures = {} 

2917 for b in buckets: 

2918 futures[w.submit(self.process_bucket, b)] = b 

2919 

2920 for f in as_completed(futures): 

2921 b = futures[f] 

2922 if f.exception(): 

2923 b.setdefault('c7n:DeniedMethods', []).append('GetInventoryConfiguration') 

2924 self.log.error( 

2925 "Error processing bucket: %s error: %s", 

2926 b['Name'], f.exception()) 

2927 continue 

2928 if f.result(): 

2929 results.append(b) 

2930 return results 

2931 

2932 def process_bucket(self, b): 

2933 if 'c7n:inventories' not in b: 

2934 client = bucket_client(local_session(self.manager.session_factory), b) 

2935 inventories = client.list_bucket_inventory_configurations( 

2936 Bucket=b['Name']).get('InventoryConfigurationList', []) 

2937 b['c7n:inventories'] = inventories 

2938 

2939 for i in b['c7n:inventories']: 

2940 if self.match(i): 

2941 return True 

2942 

2943 

2944@actions.register('set-inventory') 

2945class SetInventory(BucketActionBase): 

2946 """Configure bucket inventories for an s3 bucket. 

2947 """ 

2948 schema = type_schema( 

2949 'set-inventory', 

2950 required=['name', 'destination'], 

2951 state={'enum': ['enabled', 'disabled', 'absent']}, 

2952 name={'type': 'string', 'description': 'Name of inventory'}, 

2953 destination={'type': 'string', 'description': 'Name of destination bucket'}, 

2954 prefix={'type': 'string', 'description': 'Destination prefix'}, 

2955 encryption={'enum': ['SSES3', 'SSEKMS']}, 

2956 key_id={'type': 'string', 'description': 'Optional Customer KMS KeyId for SSE-KMS'}, 

2957 versions={'enum': ['All', 'Current']}, 

2958 schedule={'enum': ['Daily', 'Weekly']}, 

2959 format={'enum': ['CSV', 'ORC', 'Parquet']}, 

2960 fields={'type': 'array', 'items': {'enum': [ 

2961 'Size', 'LastModifiedDate', 'StorageClass', 'ETag', 

2962 'IsMultipartUploaded', 'ReplicationStatus', 'EncryptionStatus', 

2963 'ObjectLockRetainUntilDate', 'ObjectLockMode', 'ObjectLockLegalHoldStatus', 

2964 'IntelligentTieringAccessTier', 'BucketKeyStatus', 'ChecksumAlgorithm', 

2965 'ObjectAccessControlList', 'ObjectOwner']}}) 

2966 

2967 permissions = ('s3:PutInventoryConfiguration', 's3:GetInventoryConfiguration') 

2968 

2969 def process(self, buckets): 

2970 with self.executor_factory(max_workers=2) as w: 

2971 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

2972 for future in as_completed(futures): 

2973 bucket = futures[future] 

2974 try: 

2975 future.result() 

2976 except Exception as e: 

2977 self.log.error('Message: %s Bucket: %s', e, bucket['Name']) 

2978 

2979 def process_bucket(self, b): 

2980 inventory_name = self.data.get('name') 

2981 destination = self.data.get('destination') 

2982 prefix = self.data.get('prefix', '') 

2983 schedule = self.data.get('schedule', 'Daily') 

2984 fields = self.data.get('fields', ['LastModifiedDate', 'Size']) 

2985 versions = self.data.get('versions', 'Current') 

2986 state = self.data.get('state', 'enabled') 

2987 encryption = self.data.get('encryption') 

2988 inventory_format = self.data.get('format', 'CSV') 

2989 

2990 if not prefix: 

2991 prefix = "Inventories/%s" % (self.manager.config.account_id) 

2992 

2993 client = bucket_client(local_session(self.manager.session_factory), b) 

2994 if state == 'absent': 

2995 try: 

2996 client.delete_bucket_inventory_configuration( 

2997 Bucket=b['Name'], Id=inventory_name) 

2998 except ClientError as e: 

2999 if e.response['Error']['Code'] != 'NoSuchConfiguration': 

3000 raise 

3001 return 

3002 

3003 bucket = { 

3004 'Bucket': "arn:aws:s3:::%s" % destination, 

3005 'Format': inventory_format 

3006 } 

3007 

3008 inventory = { 

3009 'Destination': { 

3010 'S3BucketDestination': bucket 

3011 }, 

3012 'IsEnabled': state == 'enabled' and True or False, 

3013 'Id': inventory_name, 

3014 'OptionalFields': fields, 

3015 'IncludedObjectVersions': versions, 

3016 'Schedule': { 

3017 'Frequency': schedule 

3018 } 

3019 } 

3020 

3021 if prefix: 

3022 bucket['Prefix'] = prefix 

3023 

3024 if encryption: 

3025 bucket['Encryption'] = {encryption: {}} 

3026 if encryption == 'SSEKMS' and self.data.get('key_id'): 

3027 bucket['Encryption'] = {encryption: { 

3028 'KeyId': self.data['key_id'] 

3029 }} 

3030 

3031 found = self.get_inventory_delta(client, inventory, b) 

3032 if found: 

3033 return 

3034 if found is False: 

3035 self.log.debug("updating bucket:%s inventory configuration id:%s", 

3036 b['Name'], inventory_name) 

3037 client.put_bucket_inventory_configuration( 

3038 Bucket=b['Name'], Id=inventory_name, InventoryConfiguration=inventory) 

3039 

3040 def get_inventory_delta(self, client, inventory, b): 

3041 inventories = client.list_bucket_inventory_configurations(Bucket=b['Name']) 

3042 found = None 

3043 for i in inventories.get('InventoryConfigurationList', []): 

3044 if i['Id'] != inventory['Id']: 

3045 continue 

3046 found = True 

3047 for k, v in inventory.items(): 

3048 if k not in i: 

3049 found = False 

3050 continue 

3051 if isinstance(v, list): 

3052 v.sort() 

3053 i[k].sort() 

3054 if i[k] != v: 

3055 found = False 

3056 return found 

3057 

3058 

3059@filters.register('intelligent-tiering') 

3060class IntelligentTiering(ListItemFilter): 

3061 """Filter for S3 buckets to look at intelligent tiering configurations 

3062 

3063 The schema to supply to the attrs follows the schema here: 

3064 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_bucket_intelligent_tiering_configurations.html 

3065 

3066 :example: 

3067 

3068 .. code-block:: yaml 

3069 

3070 policies: 

3071 - name: s3-intelligent-tiering-configuration 

3072 resource: s3 

3073 filters: 

3074 - type: intelligent-tiering 

3075 attrs: 

3076 - Status: Enabled 

3077 - Filter: 

3078 And: 

3079 Prefix: test 

3080 Tags: 

3081 - Key: Owner 

3082 Value: c7n 

3083 - Tierings: 

3084 - Days: 100 

3085 - AccessTier: ARCHIVE_ACCESS 

3086 

3087 """ 

3088 schema = type_schema( 

3089 'intelligent-tiering', 

3090 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'}, 

3091 count={'type': 'number'}, 

3092 count_op={'$ref': '#/definitions/filters_common/comparison_operators'} 

3093 ) 

3094 permissions = ('s3:GetIntelligentTieringConfiguration',) 

3095 annotation_key = "c7n:IntelligentTiering" 

3096 annotate_items = True 

3097 

3098 def __init__(self, data, manager=None): 

3099 super().__init__(data, manager) 

3100 self.data['key'] = self.annotation_key 

3101 

3102 def process(self, buckets, event=None): 

3103 with self.executor_factory(max_workers=2) as w: 

3104 futures = {w.submit(self.get_item_values, b): b for b in buckets} 

3105 for future in as_completed(futures): 

3106 b = futures[future] 

3107 if future.exception(): 

3108 self.log.error("Message: %s Bucket: %s", future.exception(), b['Name']) 

3109 continue 

3110 return super().process(buckets, event) 

3111 

3112 def get_item_values(self, b): 

3113 if self.annotation_key not in b: 

3114 client = bucket_client(local_session(self.manager.session_factory), b) 

3115 try: 

3116 int_tier_config = client.list_bucket_intelligent_tiering_configurations( 

3117 Bucket=b['Name']) 

3118 b[self.annotation_key] = int_tier_config.get( 

3119 'IntelligentTieringConfigurationList', []) 

3120 except ClientError as e: 

3121 if e.response['Error']['Code'] == 'AccessDenied': 

3122 method = 'list_bucket_intelligent_tiering_configurations' 

3123 log.warning( 

3124 "Bucket:%s unable to invoke method:%s error:%s ", 

3125 b['Name'], method, e.response['Error']['Message']) 

3126 b.setdefault('c7n:DeniedMethods', []).append(method) 

3127 return b.get(self.annotation_key) 

3128 

3129 

3130@actions.register('set-intelligent-tiering') 

3131class ConfigureIntelligentTiering(BucketActionBase): 

3132 """Action applies an intelligent tiering configuration to a S3 bucket 

3133 

3134 The schema to supply to the configuration follows the schema here: 

3135 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_bucket_intelligent_tiering_configuration.html 

3136 

3137 To delete a configuration, supply Status=delete with the either the Id or Id: matched 

3138 

3139 :example: 

3140 

3141 .. code-block:: yaml 

3142 

3143 policies: 

3144 - name: s3-apply-intelligent-tiering-config 

3145 resource: aws.s3 

3146 filters: 

3147 - not: 

3148 - type: intelligent-tiering 

3149 attrs: 

3150 - Status: Enabled 

3151 - Filter: 

3152 And: 

3153 Prefix: helloworld 

3154 Tags: 

3155 - Key: Hello 

3156 Value: World 

3157 - Tierings: 

3158 - Days: 123 

3159 AccessTier: ARCHIVE_ACCESS 

3160 actions: 

3161 - type: set-intelligent-tiering 

3162 Id: c7n-default 

3163 IntelligentTieringConfiguration: 

3164 Id: c7n-default 

3165 Status: Enabled 

3166 Tierings: 

3167 - Days: 149 

3168 AccessTier: ARCHIVE_ACCESS 

3169 

3170 - name: s3-delete-intelligent-tiering-configuration 

3171 resource: aws.s3 

3172 filters: 

3173 - type: intelligent-tiering 

3174 attrs: 

3175 - Status: Enabled 

3176 - Id: test-config 

3177 actions: 

3178 - type: set-intelligent-tiering 

3179 Id: test-config 

3180 State: delete 

3181 

3182 - name: s3-delete-intelligent-tiering-matched-configs 

3183 resource: aws.s3 

3184 filters: 

3185 - type: intelligent-tiering 

3186 attrs: 

3187 - Status: Enabled 

3188 - Id: test-config 

3189 actions: 

3190 - type: set-intelligent-tiering 

3191 Id: matched 

3192 State: delete 

3193 

3194 """ 

3195 

3196 annotation_key = 'c7n:ListItemMatches' 

3197 shape = 'PutBucketIntelligentTieringConfigurationRequest' 

3198 schema = { 

3199 'type': 'object', 

3200 'additionalProperties': False, 

3201 'oneOf': [ 

3202 {'required': ['type', 'Id', 'IntelligentTieringConfiguration']}, 

3203 {'required': ['type', 'Id', 'State']}], 

3204 'properties': { 

3205 'type': {'enum': ['set-intelligent-tiering']}, 

3206 'Id': {'type': 'string'}, 

3207 # delete intelligent tier configurations via state: delete 

3208 'State': {'type': 'string', 'enum': ['delete']}, 

3209 'IntelligentTieringConfiguration': {'type': 'object'} 

3210 }, 

3211 } 

3212 

3213 permissions = ('s3:PutIntelligentTieringConfiguration',) 

3214 

3215 def validate(self): 

3216 # You can have up to 1,000 S3 Intelligent-Tiering configurations per bucket. 

3217 # Hence, always use it with a filter 

3218 found = False 

3219 for f in self.manager.iter_filters(): 

3220 if isinstance(f, IntelligentTiering): 

3221 found = True 

3222 break 

3223 if not found: 

3224 raise PolicyValidationError( 

3225 '`set-intelligent-tiering` may only be used in ' 

3226 'conjunction with `intelligent-tiering` filter on %s' % (self.manager.data,)) 

3227 cfg = dict(self.data) 

3228 if 'IntelligentTieringConfiguration' in cfg: 

3229 cfg['Bucket'] = 'bucket' 

3230 cfg.pop('type') 

3231 return shape_validate( 

3232 cfg, self.shape, self.manager.resource_type.service) 

3233 

3234 def process(self, buckets): 

3235 with self.executor_factory(max_workers=3) as w: 

3236 futures = {} 

3237 

3238 for b in buckets: 

3239 futures[w.submit(self.process_bucket, b)] = b 

3240 

3241 for future in as_completed(futures): 

3242 if future.exception(): 

3243 bucket = futures[future] 

3244 self.log.error( 

3245 'error modifying bucket intelligent tiering configuration: %s\n%s', 

3246 bucket['Name'], future.exception()) 

3247 continue 

3248 

3249 def process_bucket(self, bucket): 

3250 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3251 

3252 if 'list_bucket_intelligent_tiering_configurations' in bucket.get( 

3253 'c7n:DeniedMethods', []): 

3254 log.warning("Access Denied Bucket:%s while reading intelligent tiering configurations" 

3255 % bucket['Name']) 

3256 return 

3257 

3258 if self.data.get('Id') and self.data.get('IntelligentTieringConfiguration'): 

3259 try: 

3260 s3.put_bucket_intelligent_tiering_configuration( 

3261 Bucket=bucket['Name'], Id=self.data.get( 

3262 'Id'), IntelligentTieringConfiguration=self.data.get( 

3263 'IntelligentTieringConfiguration')) 

3264 except ClientError as e: 

3265 if e.response['Error']['Code'] == 'AccessDenied': 

3266 log.warning( 

3267 "Access Denied Bucket:%s while applying intelligent tiering configuration" 

3268 % bucket['Name']) 

3269 if self.data.get('State'): 

3270 if self.data.get('Id') == 'matched': 

3271 for config in bucket.get(self.annotation_key): 

3272 self.delete_intelligent_tiering_configurations(s3, config.get('Id'), bucket) 

3273 else: 

3274 self.delete_intelligent_tiering_configurations(s3, self.data.get('Id'), bucket) 

3275 

3276 def delete_intelligent_tiering_configurations(self, s3_client, id, bucket): 

3277 try: 

3278 s3_client.delete_bucket_intelligent_tiering_configuration(Bucket=bucket['Name'], Id=id) 

3279 except ClientError as e: 

3280 if e.response['Error']['Code'] == 'AccessDenied': 

3281 log.warning( 

3282 "Access Denied Bucket:%s while deleting intelligent tiering configuration" 

3283 % bucket['Name']) 

3284 elif e.response['Error']['Code'] == 'NoSuchConfiguration': 

3285 log.warning( 

3286 "No such configuration found:%s while deleting intelligent tiering configuration" 

3287 % bucket['Name']) 

3288 

3289 

3290@actions.register('delete') 

3291class DeleteBucket(ScanBucket): 

3292 """Action deletes a S3 bucket 

3293 

3294 :example: 

3295 

3296 .. code-block:: yaml 

3297 

3298 policies: 

3299 - name: delete-unencrypted-buckets 

3300 resource: s3 

3301 filters: 

3302 - type: missing-statement 

3303 statement_ids: 

3304 - RequiredEncryptedPutObject 

3305 actions: 

3306 - type: delete 

3307 remove-contents: true 

3308 """ 

3309 

3310 schema = type_schema('delete', **{'remove-contents': {'type': 'boolean'}}) 

3311 

3312 permissions = ('s3:*',) 

3313 

3314 bucket_ops = { 

3315 'standard': { 

3316 'iterator': 'list_objects', 

3317 'contents_key': ['Contents'], 

3318 'key_processor': 'process_key' 

3319 }, 

3320 'versioned': { 

3321 'iterator': 'list_object_versions', 

3322 'contents_key': ['Versions', 'DeleteMarkers'], 

3323 'key_processor': 'process_version' 

3324 } 

3325 } 

3326 

3327 def process_delete_enablement(self, b): 

3328 """Prep a bucket for deletion. 

3329 

3330 Clear out any pending multi-part uploads. 

3331 

3332 Disable versioning on the bucket, so deletes don't 

3333 generate fresh deletion markers. 

3334 """ 

3335 client = bucket_client( 

3336 local_session(self.manager.session_factory), b) 

3337 

3338 # Stop replication so we can suspend versioning 

3339 if b.get('Replication') is not None: 

3340 client.delete_bucket_replication(Bucket=b['Name']) 

3341 

3342 # Suspend versioning, so we don't get new delete markers 

3343 # as we walk and delete versions 

3344 if (self.get_bucket_style(b) == 'versioned' and b['Versioning']['Status'] == 'Enabled' and 

3345 self.data.get('remove-contents', True)): 

3346 client.put_bucket_versioning( 

3347 Bucket=b['Name'], 

3348 VersioningConfiguration={'Status': 'Suspended'}) 

3349 

3350 # Clear our multi-part uploads 

3351 uploads = client.get_paginator('list_multipart_uploads') 

3352 for p in uploads.paginate(Bucket=b['Name']): 

3353 for u in p.get('Uploads', ()): 

3354 client.abort_multipart_upload( 

3355 Bucket=b['Name'], 

3356 Key=u['Key'], 

3357 UploadId=u['UploadId']) 

3358 

3359 def process(self, buckets): 

3360 # might be worth sanity checking all our permissions 

3361 # on the bucket up front before disabling versioning/replication. 

3362 if self.data.get('remove-contents', True): 

3363 self._process_with_futures(self.process_delete_enablement, buckets) 

3364 self.empty_buckets(buckets) 

3365 

3366 results = self._process_with_futures(self.delete_bucket, buckets) 

3367 self.write_denied_buckets_file() 

3368 return results 

3369 

3370 def delete_bucket(self, b): 

3371 s3 = bucket_client(self.manager.session_factory(), b) 

3372 try: 

3373 self._run_api(s3.delete_bucket, Bucket=b['Name']) 

3374 except ClientError as e: 

3375 if e.response['Error']['Code'] == 'BucketNotEmpty': 

3376 self.log.error( 

3377 "Error while deleting bucket %s, bucket not empty" % ( 

3378 b['Name'])) 

3379 else: 

3380 raise e 

3381 

3382 def empty_buckets(self, buckets): 

3383 t = time.time() 

3384 results = super(DeleteBucket, self).process(buckets) 

3385 run_time = time.time() - t 

3386 object_count = 0 

3387 

3388 for r in results: 

3389 object_count += r['Count'] 

3390 self.manager.ctx.metrics.put_metric( 

3391 "Total Keys", object_count, "Count", Scope=r['Bucket'], 

3392 buffer=True) 

3393 self.manager.ctx.metrics.put_metric( 

3394 "Total Keys", object_count, "Count", Scope="Account", buffer=True) 

3395 self.manager.ctx.metrics.flush() 

3396 

3397 log.info( 

3398 "EmptyBucket buckets:%d Complete keys:%d rate:%0.2f/s time:%0.2fs", 

3399 len(buckets), object_count, 

3400 float(object_count) / run_time if run_time else 0, run_time) 

3401 return results 

3402 

3403 def process_chunk(self, batch, bucket): 

3404 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3405 objects = [] 

3406 for key in batch: 

3407 obj = {'Key': key['Key']} 

3408 if 'VersionId' in key: 

3409 obj['VersionId'] = key['VersionId'] 

3410 objects.append(obj) 

3411 results = s3.delete_objects( 

3412 Bucket=bucket['Name'], Delete={'Objects': objects}).get('Deleted', ()) 

3413 if self.get_bucket_style(bucket) != 'versioned': 

3414 return results 

3415 

3416 

3417@actions.register('configure-lifecycle') 

3418class Lifecycle(BucketActionBase): 

3419 """Action applies a lifecycle policy to versioned S3 buckets 

3420 

3421 The schema to supply to the rule follows the schema here: 

3422 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_lifecycle_configuration 

3423 

3424 To delete a lifecycle rule, supply Status=absent 

3425 

3426 :example: 

3427 

3428 .. code-block:: yaml 

3429 

3430 policies: 

3431 - name: s3-apply-lifecycle 

3432 resource: s3 

3433 actions: 

3434 - type: configure-lifecycle 

3435 rules: 

3436 - ID: my-lifecycle-id 

3437 Status: Enabled 

3438 Prefix: foo/ 

3439 Transitions: 

3440 - Days: 60 

3441 StorageClass: GLACIER 

3442 

3443 """ 

3444 

3445 schema = type_schema( 

3446 'configure-lifecycle', 

3447 **{ 

3448 'rules': { 

3449 'type': 'array', 

3450 'items': { 

3451 'type': 'object', 

3452 'required': ['ID', 'Status'], 

3453 'additionalProperties': False, 

3454 'properties': { 

3455 'ID': {'type': 'string'}, 

3456 # c7n intercepts `absent` 

3457 'Status': {'enum': ['Enabled', 'Disabled', 'absent']}, 

3458 'Prefix': {'type': 'string'}, 

3459 'Expiration': { 

3460 'type': 'object', 

3461 'additionalProperties': False, 

3462 'properties': { 

3463 'Date': {'type': 'string'}, # Date 

3464 'Days': {'type': 'integer'}, 

3465 'ExpiredObjectDeleteMarker': {'type': 'boolean'}, 

3466 }, 

3467 }, 

3468 'Filter': { 

3469 'type': 'object', 

3470 'minProperties': 1, 

3471 'maxProperties': 1, 

3472 'additionalProperties': False, 

3473 'properties': { 

3474 'Prefix': {'type': 'string'}, 

3475 'ObjectSizeGreaterThan': {'type': 'integer'}, 

3476 'ObjectSizeLessThan': {'type': 'integer'}, 

3477 'Tag': { 

3478 'type': 'object', 

3479 'required': ['Key', 'Value'], 

3480 'additionalProperties': False, 

3481 'properties': { 

3482 'Key': {'type': 'string'}, 

3483 'Value': {'type': 'string'}, 

3484 }, 

3485 }, 

3486 'And': { 

3487 'type': 'object', 

3488 'additionalProperties': False, 

3489 'properties': { 

3490 'Prefix': {'type': 'string'}, 

3491 'ObjectSizeGreaterThan': {'type': 'integer'}, 

3492 'ObjectSizeLessThan': {'type': 'integer'}, 

3493 'Tags': { 

3494 'type': 'array', 

3495 'items': { 

3496 'type': 'object', 

3497 'required': ['Key', 'Value'], 

3498 'additionalProperties': False, 

3499 'properties': { 

3500 'Key': {'type': 'string'}, 

3501 'Value': {'type': 'string'}, 

3502 }, 

3503 }, 

3504 }, 

3505 }, 

3506 }, 

3507 }, 

3508 }, 

3509 'Transitions': { 

3510 'type': 'array', 

3511 'items': { 

3512 'type': 'object', 

3513 'additionalProperties': False, 

3514 'properties': { 

3515 'Date': {'type': 'string'}, # Date 

3516 'Days': {'type': 'integer'}, 

3517 'StorageClass': {'type': 'string'}, 

3518 }, 

3519 }, 

3520 }, 

3521 'NoncurrentVersionTransitions': { 

3522 'type': 'array', 

3523 'items': { 

3524 'type': 'object', 

3525 'additionalProperties': False, 

3526 'properties': { 

3527 'NoncurrentDays': {'type': 'integer'}, 

3528 'NewerNoncurrentVersions': {'type': 'integer'}, 

3529 'StorageClass': {'type': 'string'}, 

3530 }, 

3531 }, 

3532 }, 

3533 'NoncurrentVersionExpiration': { 

3534 'type': 'object', 

3535 'additionalProperties': False, 

3536 'properties': { 

3537 'NoncurrentDays': {'type': 'integer'}, 

3538 'NewerNoncurrentVersions': {'type': 'integer'} 

3539 }, 

3540 }, 

3541 'AbortIncompleteMultipartUpload': { 

3542 'type': 'object', 

3543 'additionalProperties': False, 

3544 'properties': { 

3545 'DaysAfterInitiation': {'type': 'integer'}, 

3546 }, 

3547 }, 

3548 }, 

3549 }, 

3550 }, 

3551 } 

3552 ) 

3553 

3554 permissions = ('s3:GetLifecycleConfiguration', 's3:PutLifecycleConfiguration') 

3555 

3556 def process(self, buckets): 

3557 with self.executor_factory(max_workers=3) as w: 

3558 futures = {} 

3559 results = [] 

3560 

3561 for b in buckets: 

3562 futures[w.submit(self.process_bucket, b)] = b 

3563 

3564 for future in as_completed(futures): 

3565 if future.exception(): 

3566 bucket = futures[future] 

3567 self.log.error('error modifying bucket lifecycle: %s\n%s', 

3568 bucket['Name'], future.exception()) 

3569 results += filter(None, [future.result()]) 

3570 

3571 return results 

3572 

3573 def process_bucket(self, bucket): 

3574 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3575 

3576 if 'get_bucket_lifecycle_configuration' in bucket.get('c7n:DeniedMethods', []): 

3577 log.warning("Access Denied Bucket:%s while reading lifecycle" % bucket['Name']) 

3578 return 

3579 

3580 # Adjust the existing lifecycle by adding/deleting/overwriting rules as necessary 

3581 config = (bucket.get('Lifecycle') or {}).get('Rules', []) 

3582 for rule in self.data['rules']: 

3583 for index, existing_rule in enumerate(config): 

3584 if not existing_rule: 

3585 continue 

3586 if rule['ID'] == existing_rule['ID']: 

3587 if rule['Status'] == 'absent': 

3588 config[index] = None 

3589 else: 

3590 config[index] = rule 

3591 break 

3592 else: 

3593 if rule['Status'] != 'absent': 

3594 config.append(rule) 

3595 

3596 # The extra `list` conversion is required for python3 

3597 config = list(filter(None, config)) 

3598 

3599 try: 

3600 if not config: 

3601 s3.delete_bucket_lifecycle(Bucket=bucket['Name']) 

3602 else: 

3603 s3.put_bucket_lifecycle_configuration( 

3604 Bucket=bucket['Name'], LifecycleConfiguration={'Rules': config}) 

3605 except ClientError as e: 

3606 if e.response['Error']['Code'] == 'AccessDenied': 

3607 log.warning("Access Denied Bucket:%s while applying lifecycle" % bucket['Name']) 

3608 else: 

3609 raise e 

3610 

3611 

3612class KMSKeyResolverMixin: 

3613 """Builds a dictionary of region specific ARNs""" 

3614 

3615 def __init__(self, data, manager=None): 

3616 self.arns = dict() 

3617 self.data = data 

3618 self.manager = manager 

3619 

3620 def resolve_keys(self, buckets): 

3621 key = self.data.get('key') 

3622 if not key: 

3623 return None 

3624 

3625 regions = {get_region(b) for b in buckets} 

3626 for r in regions: 

3627 client = local_session(self.manager.session_factory).client('kms', region_name=r) 

3628 try: 

3629 key_meta = client.describe_key( 

3630 KeyId=key 

3631 ).get('KeyMetadata', {}) 

3632 key_id = key_meta.get('KeyId') 

3633 

3634 # We need a complete set of alias identifiers (names and ARNs) 

3635 # to fully evaluate bucket encryption filters. 

3636 key_aliases = client.list_aliases( 

3637 KeyId=key_id 

3638 ).get('Aliases', []) 

3639 

3640 self.arns[r] = { 

3641 'KeyId': key_id, 

3642 'Arn': key_meta.get('Arn'), 

3643 'KeyManager': key_meta.get('KeyManager'), 

3644 'Description': key_meta.get('Description'), 

3645 'Aliases': [ 

3646 alias[attr] 

3647 for alias in key_aliases 

3648 for attr in ('AliasArn', 'AliasName') 

3649 ], 

3650 } 

3651 

3652 except ClientError as e: 

3653 self.log.error('Error resolving kms ARNs for set-bucket-encryption: %s key: %s' % ( 

3654 e, self.data.get('key'))) 

3655 

3656 def get_key(self, bucket): 

3657 if 'key' not in self.data: 

3658 return None 

3659 region = get_region(bucket) 

3660 key = self.arns.get(region) 

3661 if not key: 

3662 self.log.warning('Unable to resolve key %s for bucket %s in region %s', 

3663 self.data['key'], bucket.get('Name'), region) 

3664 return key 

3665 

3666 

3667@filters.register('bucket-encryption') 

3668class BucketEncryption(KMSKeyResolverMixin, Filter): 

3669 """Filters for S3 buckets that have bucket-encryption 

3670 

3671 :example 

3672 

3673 .. code-block:: yaml 

3674 

3675 policies: 

3676 - name: s3-bucket-encryption-AES256 

3677 resource: s3 

3678 region: us-east-1 

3679 filters: 

3680 - type: bucket-encryption 

3681 state: True 

3682 crypto: AES256 

3683 - name: s3-bucket-encryption-KMS 

3684 resource: s3 

3685 region: us-east-1 

3686 filters: 

3687 - type: bucket-encryption 

3688 state: True 

3689 crypto: aws:kms 

3690 key: alias/some/alias/key 

3691 - name: s3-bucket-encryption-off 

3692 resource: s3 

3693 region: us-east-1 

3694 filters: 

3695 - type: bucket-encryption 

3696 state: False 

3697 - name: s3-bucket-test-bucket-key-enabled 

3698 resource: s3 

3699 region: us-east-1 

3700 filters: 

3701 - type: bucket-encryption 

3702 bucket_key_enabled: True 

3703 """ 

3704 schema = type_schema('bucket-encryption', 

3705 state={'type': 'boolean'}, 

3706 crypto={'type': 'string', 'enum': ['AES256', 'aws:kms']}, 

3707 key={'type': 'string'}, 

3708 bucket_key_enabled={'type': 'boolean'}) 

3709 

3710 permissions = ('s3:GetEncryptionConfiguration', 'kms:DescribeKey', 'kms:ListAliases') 

3711 annotation_key = 'c7n:bucket-encryption' 

3712 

3713 def validate(self): 

3714 if self.data.get('bucket_key_enabled') is not None and self.data.get('key') is not None: 

3715 raise PolicyValidationError( 

3716 f'key and bucket_key_enabled attributes cannot both be set: {self.data}' 

3717 ) 

3718 

3719 def process(self, buckets, event=None): 

3720 self.resolve_keys(buckets) 

3721 results = [] 

3722 with self.executor_factory(max_workers=2) as w: 

3723 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3724 for future in as_completed(futures): 

3725 b = futures[future] 

3726 if future.exception(): 

3727 self.log.error("Message: %s Bucket: %s", future.exception(), 

3728 b['Name']) 

3729 continue 

3730 if future.result(): 

3731 results.append(b) 

3732 return results 

3733 

3734 def process_bucket(self, b): 

3735 

3736 client = bucket_client(local_session(self.manager.session_factory), b) 

3737 rules = [] 

3738 if self.annotation_key not in b: 

3739 try: 

3740 be = client.get_bucket_encryption(Bucket=b['Name']) 

3741 be.pop('ResponseMetadata', None) 

3742 except ClientError as e: 

3743 if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError': 

3744 raise 

3745 be = {} 

3746 b[self.annotation_key] = be 

3747 else: 

3748 be = b[self.annotation_key] 

3749 

3750 rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', []) 

3751 # default `state` to True as previous impl assumed state == True 

3752 # to preserve backwards compatibility 

3753 if self.data.get('bucket_key_enabled'): 

3754 for rule in rules: 

3755 return self.filter_bucket_key_enabled(rule) 

3756 elif self.data.get('bucket_key_enabled') is False: 

3757 for rule in rules: 

3758 return not self.filter_bucket_key_enabled(rule) 

3759 

3760 if self.data.get('state', True): 

3761 for sse in rules: 

3762 return self.filter_bucket(b, sse) 

3763 return False 

3764 else: 

3765 for sse in rules: 

3766 return not self.filter_bucket(b, sse) 

3767 return True 

3768 

3769 def filter_bucket(self, b, sse): 

3770 allowed = ['AES256', 'aws:kms'] 

3771 key = self.get_key(b) 

3772 crypto = self.data.get('crypto') 

3773 rule = sse.get('ApplyServerSideEncryptionByDefault') 

3774 

3775 if not rule: 

3776 return False 

3777 algo = rule.get('SSEAlgorithm') 

3778 

3779 if not crypto and algo in allowed: 

3780 return True 

3781 

3782 if crypto == 'AES256' and algo == 'AES256': 

3783 return True 

3784 elif crypto == 'aws:kms' and algo == 'aws:kms': 

3785 if not key: 

3786 # There are two broad reasons to have an empty value for 

3787 # the regional key here: 

3788 # 

3789 # * The policy did not specify a key, in which case this 

3790 # filter should match _all_ buckets with a KMS default 

3791 # encryption rule. 

3792 # 

3793 # * The policy specified a key that could not be 

3794 # resolved, in which case this filter shouldn't match 

3795 # any buckets. 

3796 return 'key' not in self.data 

3797 

3798 # The default encryption rule can specify a key ID, 

3799 # key ARN, alias name or alias ARN. Match against any of 

3800 # those attributes. A rule specifying KMS with no master key 

3801 # implies the AWS-managed key. 

3802 key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']} 

3803 return rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids 

3804 

3805 def filter_bucket_key_enabled(self, rule) -> bool: 

3806 if not rule: 

3807 return False 

3808 return rule.get('BucketKeyEnabled') 

3809 

3810 

3811@actions.register('set-bucket-encryption') 

3812class SetBucketEncryption(KMSKeyResolverMixin, BucketActionBase): 

3813 """Action enables default encryption on S3 buckets 

3814 

3815 `enabled`: boolean Optional: Defaults to True 

3816 

3817 `crypto`: aws:kms | AES256` Optional: Defaults to AES256 

3818 

3819 `key`: arn, alias, or kms id key 

3820 

3821 `bucket-key`: boolean Optional: 

3822 Defaults to True. 

3823 Reduces amount of API traffic from Amazon S3 to KMS and can reduce KMS request 

3824 costsby up to 99 percent. Requires kms:Decrypt permissions for copy and upload 

3825 on the AWS KMS Key Policy. 

3826 

3827 Bucket Key Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html 

3828 

3829 :example: 

3830 

3831 .. code-block:: yaml 

3832 

3833 policies: 

3834 - name: s3-enable-default-encryption-kms 

3835 resource: s3 

3836 actions: 

3837 - type: set-bucket-encryption 

3838 # enabled: true <------ optional (true by default) 

3839 crypto: aws:kms 

3840 key: 1234abcd-12ab-34cd-56ef-1234567890ab 

3841 bucket-key: true 

3842 

3843 - name: s3-enable-default-encryption-kms-alias 

3844 resource: s3 

3845 actions: 

3846 - type: set-bucket-encryption 

3847 # enabled: true <------ optional (true by default) 

3848 crypto: aws:kms 

3849 key: alias/some/alias/key 

3850 bucket-key: true 

3851 

3852 - name: s3-enable-default-encryption-aes256 

3853 resource: s3 

3854 actions: 

3855 - type: set-bucket-encryption 

3856 # bucket-key: true <--- optional (true by default for AWS SSE) 

3857 # crypto: AES256 <----- optional (AES256 by default) 

3858 # enabled: true <------ optional (true by default) 

3859 

3860 - name: s3-disable-default-encryption 

3861 resource: s3 

3862 actions: 

3863 - type: set-bucket-encryption 

3864 enabled: false 

3865 """ 

3866 

3867 schema = { 

3868 'type': 'object', 

3869 'additionalProperties': False, 

3870 'properties': { 

3871 'type': {'enum': ['set-bucket-encryption']}, 

3872 'enabled': {'type': 'boolean'}, 

3873 'crypto': {'enum': ['aws:kms', 'AES256']}, 

3874 'key': {'type': 'string'}, 

3875 'bucket-key': {'type': 'boolean'} 

3876 }, 

3877 'dependencies': { 

3878 'key': { 

3879 'properties': { 

3880 'crypto': {'pattern': 'aws:kms'} 

3881 }, 

3882 'required': ['crypto'] 

3883 } 

3884 } 

3885 } 

3886 

3887 permissions = ('s3:PutEncryptionConfiguration', 's3:GetEncryptionConfiguration', 

3888 'kms:ListAliases', 'kms:DescribeKey') 

3889 

3890 def process(self, buckets): 

3891 if self.data.get('enabled', True): 

3892 self.resolve_keys(buckets) 

3893 

3894 with self.executor_factory(max_workers=3) as w: 

3895 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3896 for future in as_completed(futures): 

3897 if future.exception(): 

3898 self.log.error('Message: %s Bucket: %s', future.exception(), 

3899 futures[future]['Name']) 

3900 

3901 def process_bucket(self, bucket): 

3902 default_key_desc = 'Default master key that protects my S3 objects when no other key is defined' # noqa 

3903 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3904 if not self.data.get('enabled', True): 

3905 s3.delete_bucket_encryption(Bucket=bucket['Name']) 

3906 return 

3907 algo = self.data.get('crypto', 'AES256') 

3908 

3909 # bucket key defaults to True for alias/aws/s3 and AES256 (Amazon SSE) 

3910 # and ignores False values for that crypto 

3911 bucket_key = self.data.get('bucket-key', True) 

3912 config = { 

3913 'Rules': [ 

3914 { 

3915 'ApplyServerSideEncryptionByDefault': { 

3916 'SSEAlgorithm': algo, 

3917 }, 

3918 'BucketKeyEnabled': bucket_key 

3919 } 

3920 ] 

3921 } 

3922 

3923 if algo == 'aws:kms': 

3924 key = self.get_key(bucket) 

3925 if not key: 

3926 raise Exception('Valid KMS Key required but does not exist') 

3927 

3928 config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = key['Arn'] 

3929 s3.put_bucket_encryption( 

3930 Bucket=bucket['Name'], 

3931 ServerSideEncryptionConfiguration=config 

3932 ) 

3933 

3934 

3935OWNERSHIP_CONTROLS = ['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter'] 

3936VALUE_FILTER_MAGIC_VALUES = ['absent', 'present', 'not-null', 'empty'] 

3937 

3938 

3939@filters.register('ownership') 

3940class BucketOwnershipControls(BucketFilterBase, ValueFilter): 

3941 """Filter for object ownership controls 

3942 

3943 Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html 

3944 

3945 :example 

3946 

3947 Find buckets with ACLs disabled 

3948 

3949 .. code-block:: yaml 

3950 

3951 policies: 

3952 - name: s3-bucket-acls-disabled 

3953 resource: aws.s3 

3954 region: us-east-1 

3955 filters: 

3956 - type: ownership 

3957 value: BucketOwnerEnforced 

3958 

3959 :example 

3960 

3961 Find buckets with object ownership preferred or enforced 

3962 

3963 .. code-block:: yaml 

3964 

3965 policies: 

3966 - name: s3-bucket-ownership-preferred 

3967 resource: aws.s3 

3968 region: us-east-1 

3969 filters: 

3970 - type: ownership 

3971 op: in 

3972 value: 

3973 - BucketOwnerEnforced 

3974 - BucketOwnerPreferred 

3975 

3976 :example 

3977 

3978 Find buckets with no object ownership controls 

3979 

3980 .. code-block:: yaml 

3981 

3982 policies: 

3983 - name: s3-bucket-no-ownership-controls 

3984 resource: aws.s3 

3985 region: us-east-1 

3986 filters: 

3987 - type: ownership 

3988 value: empty 

3989 """ 

3990 schema = type_schema('ownership', rinherit=ValueFilter.schema, value={'oneOf': [ 

3991 {'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}, 

3992 {'type': 'array', 'items': { 

3993 'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}}]}) 

3994 permissions = ('s3:GetBucketOwnershipControls',) 

3995 annotation_key = 'c7n:ownership' 

3996 

3997 def __init__(self, data, manager=None): 

3998 super(BucketOwnershipControls, self).__init__(data, manager) 

3999 

4000 # Ownership controls appear as an array of rules. There can only be one 

4001 # ObjectOwnership rule defined for a bucket, so we can automatically 

4002 # match against that if it exists. 

4003 self.data['key'] = f'("{self.annotation_key}".Rules[].ObjectOwnership)[0]' 

4004 

4005 def process(self, buckets, event=None): 

4006 with self.executor_factory(max_workers=2) as w: 

4007 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

4008 for future in as_completed(futures): 

4009 b = futures[future] 

4010 if future.exception(): 

4011 self.log.error("Message: %s Bucket: %s", future.exception(), 

4012 b['Name']) 

4013 continue 

4014 return super(BucketOwnershipControls, self).process(buckets, event) 

4015 

4016 def process_bucket(self, b): 

4017 if self.annotation_key in b: 

4018 return 

4019 client = bucket_client(local_session(self.manager.session_factory), b) 

4020 try: 

4021 controls = client.get_bucket_ownership_controls(Bucket=b['Name']) 

4022 controls.pop('ResponseMetadata', None) 

4023 except ClientError as e: 

4024 if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError': 

4025 raise 

4026 controls = {} 

4027 b[self.annotation_key] = controls.get('OwnershipControls') 

4028 

4029 

4030@filters.register('bucket-replication') 

4031class BucketReplication(ListItemFilter): 

4032 """Filter for S3 buckets to look at bucket replication configurations 

4033 

4034 The schema to supply to the attrs follows the schema here: 

4035 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_replication.html 

4036 

4037 :example: 

4038 

4039 .. code-block:: yaml 

4040 

4041 policies: 

4042 - name: s3-bucket-replication 

4043 resource: s3 

4044 filters: 

4045 - type: bucket-replication 

4046 attrs: 

4047 - Status: Enabled 

4048 - Filter: 

4049 And: 

4050 Prefix: test 

4051 Tags: 

4052 - Key: Owner 

4053 Value: c7n 

4054 - ExistingObjectReplication: Enabled 

4055 

4056 """ 

4057 schema = type_schema( 

4058 'bucket-replication', 

4059 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'}, 

4060 count={'type': 'number'}, 

4061 count_op={'$ref': '#/definitions/filters_common/comparison_operators'} 

4062 ) 

4063 

4064 permissions = ("s3:GetReplicationConfiguration",) 

4065 annotation_key = 'Replication' 

4066 annotate_items = True 

4067 

4068 def __init__(self, data, manager=None): 

4069 super().__init__(data, manager) 

4070 self.data['key'] = self.annotation_key 

4071 

4072 def get_item_values(self, b): 

4073 client = bucket_client(local_session(self.manager.session_factory), b) 

4074 # replication configuration is called in S3_AUGMENT_TABLE: 

4075 bucket_replication = b.get(self.annotation_key) 

4076 

4077 rules = [] 

4078 if bucket_replication is not None: 

4079 rules = bucket_replication.get('ReplicationConfiguration', {}).get('Rules', []) 

4080 for replication in rules: 

4081 self.augment_bucket_replication(b, replication, client) 

4082 

4083 return rules 

4084 

4085 def augment_bucket_replication(self, b, replication, client): 

4086 destination_bucket = replication.get('Destination').get('Bucket').split(':')[5] 

4087 try: 

4088 destination_region = inspect_bucket_region(destination_bucket, client.meta.endpoint_url) 

4089 except ValueError: 

4090 replication['DestinationBucketAvailable'] = False 

4091 return 

4092 source_region = get_region(b) 

4093 replication['DestinationBucketAvailable'] = True 

4094 replication['DestinationRegion'] = destination_region 

4095 replication['CrossRegion'] = destination_region != source_region 

4096 

4097 

4098@resources.register('s3-directory') 

4099class S3Directory(query.QueryResourceManager): 

4100 

4101 class resource_type(query.TypeInfo): 

4102 service = 's3' 

4103 permission_prefix = "s3express" 

4104 arn_service = "s3express" 

4105 arn_type = 'bucket' 

4106 enum_spec = ('list_directory_buckets', 'Buckets[]', None) 

4107 name = id = 'Name' 

4108 date = 'CreationDate' 

4109 dimension = 'BucketName' 

4110 cfn_type = 'AWS::S3Express::DirectoryBucket' 

4111 permissions_enum = ("s3express:ListAllMyDirectoryBuckets",)