Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/s3.py: 23%


1738 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3"""S3 Resource Manager 

4 

5Filters: 

6 

7The generic value filter (jmespath expressions) and the or filter are

8available with all resources, including buckets; we include several

9additional bucket data items (Tags, Replication, Acl, Policy) as keys within

10a bucket representation.

11 

12Actions: 

13 

14 encrypt-keys 

15 

16 Scan all keys in a bucket and optionally encrypt them in place. 

17 

18 global-grants 

19 

20 Check bucket acls for global grants 

21 

22 encryption-policy 

23 

24 Attach an encryption-required policy to a bucket; this will break 

25 applications that are not using encryption, including aws log 

26 delivery. 

27 

28""" 

29import copy 

30import functools 

31import json 

32import logging 

33import math 

34import os 

35import time 

36import threading 

37import ssl 

38 

39from botocore.client import Config 

40from botocore.exceptions import ClientError 

41 

42from collections import defaultdict 

43from concurrent.futures import as_completed 

44 

45try: 

46 from urllib3.exceptions import SSLError 

47except ImportError: 

48 from botocore.vendored.requests.packages.urllib3.exceptions import SSLError 

49 

50 

51from c7n.actions import ( 

52 ActionRegistry, BaseAction, PutMetric, RemovePolicyBase) 

53from c7n.exceptions import PolicyValidationError, PolicyExecutionError 

54from c7n.filters import ( 

55 FilterRegistry, Filter, CrossAccountAccessFilter, MetricsFilter, 

56 ValueFilter, ListItemFilter) 

57from .aws import shape_validate 

58from c7n.filters.policystatement import HasStatementFilter 

59from c7n.manager import resources 

60from c7n.output import NullBlobOutput 

61from c7n import query 

62from c7n.resources.securityhub import PostFinding 

63from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction 

64from c7n.utils import ( 

65 chunks, local_session, set_annotation, type_schema, filter_empty, 

66 dumps, format_string_values, get_account_alias_from_sts) 

67from c7n.resources.aws import inspect_bucket_region 

68 

69 

70log = logging.getLogger('custodian.s3') 

71 

72filters = FilterRegistry('s3.filters') 

73actions = ActionRegistry('s3.actions') 

74filters.register('marked-for-op', TagActionFilter) 

75actions.register('put-metric', PutMetric) 

76 

77MAX_COPY_SIZE = 1024 * 1024 * 1024 * 2 
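# i.e. 2 GiB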

78 

79 

80class DescribeS3(query.DescribeSource): 

81 

82 def augment(self, buckets): 

83 assembler = BucketAssembly(self.manager) 

84 assembler.initialize() 

85 

86 with self.manager.executor_factory( 

87 max_workers=min((10, len(buckets) + 1))) as w: 

88 results = w.map(assembler.assemble, buckets) 

89 results = list(filter(None, results)) 

90 return results 

91 

92 

93class ConfigS3(query.ConfigSource): 

94 

95 # normalize config's janky idiosyncratic bespoke formatting to the 

96 # standard describe api responses. 

97 

98 def get_query_params(self, query): 

99 q = super(ConfigS3, self).get_query_params(query) 

100 if 'expr' in q: 

101 q['expr'] = q['expr'].replace('select ', 'select awsRegion, ') 

102 return q 

103 

104 def load_resource(self, item): 

105 resource = super(ConfigS3, self).load_resource(item) 

106 cfg = item['supplementaryConfiguration'] 

107 # aka standard 

108 if 'awsRegion' in item and item['awsRegion'] != 'us-east-1': 

109 resource['Location'] = {'LocationConstraint': item['awsRegion']} 

110 else: 

111 resource['Location'] = {} 

112 

113 # owner is under acl per describe 

114 resource.pop('Owner', None) 

115 

116 for k, null_value in S3_CONFIG_SUPPLEMENT_NULL_MAP.items(): 

117 if k not in cfg: 

118 continue 

119 if cfg.get(k) == null_value: 

120 continue 

121 method = getattr(self, "handle_%s" % k, None) 

122 if method is None: 

123 raise ValueError("unhandled supplementary config %s" % k) 

125 v = cfg[k] 

126 if isinstance(cfg[k], str): 

127 v = json.loads(cfg[k]) 

128 method(resource, v) 

129 

130 for el in S3_AUGMENT_TABLE: 

131 if el[1] not in resource: 

132 resource[el[1]] = el[2] 

133 return resource 

134 

135 PERMISSION_MAP = { 

136 'FullControl': 'FULL_CONTROL', 

137 'Write': 'WRITE', 

138 'WriteAcp': 'WRITE_ACP', 

139 'Read': 'READ', 

140 'ReadAcp': 'READ_ACP'} 

141 

142 GRANTEE_MAP = { 

143 'AllUsers': "http://acs.amazonaws.com/groups/global/AllUsers", 

144 'AuthenticatedUsers': "http://acs.amazonaws.com/groups/global/AuthenticatedUsers", 

145 'LogDelivery': 'http://acs.amazonaws.com/groups/s3/LogDelivery'} 

146 

147 def handle_AccessControlList(self, resource, item_value): 

148 # double serialized in config for some reason 

149 if isinstance(item_value, str): 

150 item_value = json.loads(item_value) 

151 

152 resource['Acl'] = {} 

153 resource['Acl']['Owner'] = {'ID': item_value['owner']['id']} 

154 if item_value['owner']['displayName']: 

155 resource['Acl']['Owner']['DisplayName'] = item_value[ 

156 'owner']['displayName'] 

157 resource['Acl']['Grants'] = grants = [] 

158 

159 for g in (item_value.get('grantList') or ()): 

160 if 'id' not in g['grantee']: 

161 assert g['grantee'] in self.GRANTEE_MAP, "unknown grantee %s" % g 

162 rg = {'Type': 'Group', 'URI': self.GRANTEE_MAP[g['grantee']]} 

163 else: 

164 rg = {'ID': g['grantee']['id'], 'Type': 'CanonicalUser'} 

165 

166 if 'displayName' in g: 

167 rg['DisplayName'] = g['displayName'] 

168 

169 grants.append({ 

170 'Permission': self.PERMISSION_MAP[g['permission']], 

171 'Grantee': rg, 

172 }) 

173 

174 def handle_BucketAccelerateConfiguration(self, resource, item_value): 

175 # not currently auto-augmented by custodian 

176 return 

177 

178 def handle_BucketLoggingConfiguration(self, resource, item_value): 

179 if ('destinationBucketName' not in item_value or 

180 item_value['destinationBucketName'] is None): 

181 resource[u'Logging'] = {} 

182 return 

183 resource[u'Logging'] = { 

184 'TargetBucket': item_value['destinationBucketName'], 

185 'TargetPrefix': item_value['logFilePrefix']} 

186 

187 def handle_BucketLifecycleConfiguration(self, resource, item_value): 

188 rules = [] 

189 for r in item_value.get('rules'): 

190 rr = {} 

191 rules.append(rr) 

192 expiry = {} 

193 for ek, ck in ( 

194 ('Date', 'expirationDate'), 

195 ('ExpiredObjectDeleteMarker', 'expiredObjectDeleteMarker'), 

196 ('Days', 'expirationInDays')): 

197 if ck in r and r[ck] and r[ck] != -1: 

198 expiry[ek] = r[ck] 

199 if expiry: 

200 rr['Expiration'] = expiry 

201 

202 transitions = [] 

203 for t in (r.get('transitions') or ()): 

204 tr = {} 

205 for k in ('date', 'days', 'storageClass'): 

206 if t.get(k): 

207 tr["%s%s" % (k[0].upper(), k[1:])] = t[k] 

208 transitions.append(tr) 

209 if transitions: 

210 rr['Transitions'] = transitions 

211 

212 if r.get('abortIncompleteMultipartUpload'): 

213 rr['AbortIncompleteMultipartUpload'] = { 

214 'DaysAfterInitiation': r[ 

215 'abortIncompleteMultipartUpload']['daysAfterInitiation']} 

216 if r.get('noncurrentVersionExpirationInDays'): 

217 rr['NoncurrentVersionExpiration'] = { 

218 'NoncurrentDays': r['noncurrentVersionExpirationInDays']} 

219 

220 nonc_transitions = [] 

221 for t in (r.get('noncurrentVersionTransitions') or ()): 

222 nonc_transitions.append({ 

223 'NoncurrentDays': t['days'], 

224 'StorageClass': t['storageClass']}) 

225 if nonc_transitions: 

226 rr['NoncurrentVersionTransitions'] = nonc_transitions 

227 

228 rr['Status'] = r['status'] 

229 rr['ID'] = r['id'] 

230 if r.get('prefix'): 

231 rr['Prefix'] = r['prefix'] 

232 if 'filter' not in r or not r['filter']: 

233 continue 

234 

235 if r['filter']['predicate']: 

236 rr['Filter'] = self.convertLifePredicate(r['filter']['predicate']) 

237 

238 resource['Lifecycle'] = {'Rules': rules} 

239 

240 def convertLifePredicate(self, p): 

241 if p['type'] == 'LifecyclePrefixPredicate': 

242 return {'Prefix': p['prefix']} 

243 if p['type'] == 'LifecycleTagPredicate': 

244 return {'Tags': [{'Key': p['tag']['key'], 'Value': p['tag']['value']}]} 

245 if p['type'] == 'LifecycleAndOperator': 

246 n = {} 

247 for o in p['operands']: 

248 ot = self.convertLifePredicate(o) 

249 if 'Tags' in n and 'Tags' in ot: 

250 n['Tags'].extend(ot['Tags']) 

251 else: 

252 n.update(ot) 

253 return {'And': n} 

254 

255 raise ValueError("unknown predicate: %s" % p) 
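        # e.g. a LifecycleAndOperator over a prefix predicate and a tag predicate
        # converts to {'And': {'Prefix': ..., 'Tags': [{'Key': ..., 'Value': ...}]}}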

256 

257 NotifyTypeMap = { 

258 'QueueConfiguration': 'QueueConfigurations', 

259 'LambdaConfiguration': 'LambdaFunctionConfigurations', 

260 'CloudFunctionConfiguration': 'LambdaFunctionConfigurations', 

261 'TopicConfiguration': 'TopicConfigurations'} 

262 

263 def handle_BucketNotificationConfiguration(self, resource, item_value): 

264 d = {} 

265 for nid, n in item_value['configurations'].items(): 

266 ninfo = {} 

267 d.setdefault(self.NotifyTypeMap[n['type']], []).append(ninfo) 

268 if n['type'] == 'QueueConfiguration': 

269 ninfo['QueueArn'] = n['queueARN'] 

270 elif n['type'] == 'TopicConfiguration': 

271 ninfo['TopicArn'] = n['topicARN'] 

272 elif n['type'] == 'LambdaConfiguration': 

273 ninfo['LambdaFunctionArn'] = n['functionARN'] 

274 ninfo['Id'] = nid 

275 ninfo['Events'] = n['events'] 

276 rules = [] 

277 if n['filter']: 

278 for r in n['filter'].get('s3KeyFilter', {}).get('filterRules', []): 

279 rules.append({'Name': r['name'], 'Value': r['value']}) 

280 if rules: 

281 ninfo['Filter'] = {'Key': {'FilterRules': rules}} 

282 resource['Notification'] = d 

283 

284 def handle_BucketReplicationConfiguration(self, resource, item_value): 

285 d = {'Role': item_value['roleARN'], 'Rules': []} 

286 for rid, r in item_value['rules'].items(): 

287 rule = { 

288 'ID': rid, 

289 'Status': r.get('status', ''), 

290 'Prefix': r.get('prefix', ''), 

291 'Destination': { 

292 'Bucket': r['destinationConfig']['bucketARN']} 

293 } 

294 if 'Account' in r['destinationConfig']: 

295 rule['Destination']['Account'] = r['destinationConfig']['Account'] 

296 if r['destinationConfig'].get('storageClass'): 

297 rule['Destination']['StorageClass'] = r['destinationConfig']['storageClass'] 

298 d['Rules'].append(rule) 

299 resource['Replication'] = {'ReplicationConfiguration': d} 

300 

301 def handle_BucketPolicy(self, resource, item_value): 

302 resource['Policy'] = item_value.get('policyText') 

303 

304 def handle_BucketTaggingConfiguration(self, resource, item_value): 

305 resource['Tags'] = [ 

306 {"Key": k, "Value": v} for k, v in item_value['tagSets'][0]['tags'].items()] 

307 

308 def handle_BucketVersioningConfiguration(self, resource, item_value): 

309 # Config defaults versioning to 'Off' for a null value 

310 if item_value['status'] not in ('Enabled', 'Suspended'): 

311 resource['Versioning'] = {} 

312 return 

313 resource['Versioning'] = {'Status': item_value['status']} 

314 # `isMfaDeleteEnabled` is an optional boolean property - the key may be absent, 

315 # present with a null value, or present with a boolean value. 

316 # Mirror the describe source by populating Versioning.MFADelete only in the 

317 # boolean case. 

318 mfa_delete = item_value.get('isMfaDeleteEnabled') 

319 if mfa_delete is None: 

320 return 

321 resource['Versioning']['MFADelete'] = ( 

322 'Enabled' if mfa_delete else 'Disabled' 

323 ) 

324 

325 def handle_BucketWebsiteConfiguration(self, resource, item_value): 

326 website = {} 

327 if item_value['indexDocumentSuffix']: 

328 website['IndexDocument'] = { 

329 'Suffix': item_value['indexDocumentSuffix']} 

330 if item_value['errorDocument']: 

331 website['ErrorDocument'] = { 

332 'Key': item_value['errorDocument']} 

333 if item_value['redirectAllRequestsTo']: 

334 website['RedirectAllRequestsTo'] = { 

335 'HostName': item_value['redirectAllRequestsTo']['hostName'], 

336 'Protocol': item_value['redirectAllRequestsTo']['protocol']} 

337 for r in item_value['routingRules']: 

338 redirect = {} 

339 rule = {'Redirect': redirect} 

340 website.setdefault('RoutingRules', []).append(rule) 

341 if 'condition' in r: 

342 cond = {} 

343 for ck, rk in ( 

344 ('keyPrefixEquals', 'KeyPrefixEquals'), 

345 ('httpErrorCodeReturnedEquals', 

346 'HttpErrorCodeReturnedEquals')): 

347 if r['condition'][ck]: 

348 cond[rk] = r['condition'][ck] 

349 rule['Condition'] = cond 

350 for ck, rk in ( 

351 ('protocol', 'Protocol'), 

352 ('hostName', 'HostName'), 

353 ('replaceKeyPrefixWith', 'ReplaceKeyPrefixWith'), 

354 ('replaceKeyWith', 'ReplaceKeyWith'), 

355 ('httpRedirectCode', 'HttpRedirectCode')): 

356 if r['redirect'][ck]: 

357 redirect[rk] = r['redirect'][ck] 

358 resource['Website'] = website 

359 

360 

361@resources.register('s3') 

362class S3(query.QueryResourceManager): 

363 """Amazon's Simple Storage Service Buckets. 

364 

365 

366 By default and due to historical compatibility cloud custodian will 

367 fetch a number of subdocuments (Acl, Policy, Tagging, Versioning, 

368 Website, Notification, Lifecycle, and Replication) for each bucket 

369 to allow policy authors to target common bucket 

370 configurations. 

371 

372 This behavior can be customized to avoid extraneous api calls if a 

373 particular subdocument is not needed for a policy by setting the 

374 `augment-keys` parameter in a query block of the policy. 

375 

376 i.e. if we only care about bucket website and replication 

377 configuration, we can minimize the api calls needed to fetch a 

378 bucket by setting up augment-keys as follows. 

379 

380 :example: 

381 

382 .. code-block:: yaml 

383 

384 policies: 

385 - name: check-website-replication 

386 resource: s3 

387 query: 

388 - augment-keys: ['Website', 'Replication'] 

389 filters: 

390 - Website.ErrorDocument: not-null 

391 - Replication.ReplicationConfiguration.Rules: not-null 

392 

393 It also supports an automatic detection mode, where the use of a subdocument 

394 in a filter is detected automatically, by setting augment-keys to 'detect'. 

395 

396 :example: 

397 

398 .. code-block:: yaml 

399 

400 policies: 

401 - name: check-website-replication 

402 resource: s3 

403 query: 

404 - augment-keys: 'detect' 

405 filters: 

406 - Website.ErrorDocument: not-null 

407 - Replication.ReplicationConfiguration.Rules: not-null 

408 

409 The default value for augment-keys is `all` to preserve historical 

410 compatibility. `augment-keys` also supports the value of 'none' to 

411 disable all subdocument fetching except Location and Tags. 

412 

413 Note certain actions may implicitly depend on the corresponding 

414 subdocument being present. 

415 

416 """ 

417 

418 class resource_type(query.TypeInfo): 

419 service = 's3' 

420 arn_type = '' 

421 enum_spec = ('list_buckets', 'Buckets[]', None) 

422 # not used but we want some consistency on the metadata 

423 detail_spec = ('get_bucket_location', 'Bucket', 'Name', 'LocationConstraint') 

424 permissions_augment = ( 

425 "s3:GetBucketAcl", 

426 "s3:GetBucketLocation", 

427 "s3:GetBucketPolicy", 

428 "s3:GetBucketTagging", 

429 "s3:GetBucketVersioning", 

430 "s3:GetBucketLogging", 

431 "s3:GetBucketNotification", 

432 "s3:GetBucketWebsite", 

433 "s3:GetLifecycleConfiguration", 

434 "s3:GetReplicationConfiguration" 

435 ) 

436 name = id = 'Name' 

437 date = 'CreationDate' 

438 dimension = 'BucketName' 

439 cfn_type = config_type = 'AWS::S3::Bucket' 

440 

441 filter_registry = filters 

442 action_registry = actions 

443 source_mapping = { 

444 'describe': DescribeS3, 

445 'config': ConfigS3 

446 } 

447 

448 def validate(self): 

449 super().validate() 

450 BucketAssembly(self).validate() 

451 

452 def get_arns(self, resources): 

453 return ["arn:aws:s3:::{}".format(r["Name"]) for r in resources] 

454 

455 @classmethod 

456 def get_permissions(cls): 

457 perms = ["s3:ListAllMyBuckets"] 

458 perms.extend([n[-1] for n in S3_AUGMENT_TABLE]) 

459 return perms 

460 

461 

462S3_CONFIG_SUPPLEMENT_NULL_MAP = { 

463 'BucketLoggingConfiguration': u'{"destinationBucketName":null,"logFilePrefix":null}', 

464 'BucketPolicy': u'{"policyText":null}', 

465 'BucketVersioningConfiguration': u'{"status":"Off","isMfaDeleteEnabled":null}', 

466 'BucketAccelerateConfiguration': u'{"status":null}', 

467 'BucketNotificationConfiguration': u'{"configurations":{}}', 

468 'BucketLifecycleConfiguration': None, 

469 'AccessControlList': None, 

470 'BucketTaggingConfiguration': None, 

471 'BucketWebsiteConfiguration': None, 

472 'BucketReplicationConfiguration': None 

473} 

474 
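# Each S3_AUGMENT_TABLE row below is
#   (client method, bucket key, default value, response select key, IAM permission);
# BucketAssembly.assemble() unpacks the first four and S3.get_permissions() reads the last.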

475S3_AUGMENT_TABLE = ( 

476 ('get_bucket_location', 'Location', {}, None, 's3:GetBucketLocation'), 

477 ('get_bucket_tagging', 'Tags', [], 'TagSet', 's3:GetBucketTagging'), 

478 ('get_bucket_policy', 'Policy', None, 'Policy', 's3:GetBucketPolicy'), 

479 ('get_bucket_acl', 'Acl', None, None, 's3:GetBucketAcl'), 

480 ('get_bucket_replication', 

481 'Replication', None, None, 's3:GetReplicationConfiguration'), 

482 ('get_bucket_versioning', 'Versioning', None, None, 's3:GetBucketVersioning'), 

483 ('get_bucket_website', 'Website', None, None, 's3:GetBucketWebsite'), 

484 ('get_bucket_logging', 'Logging', None, 'LoggingEnabled', 's3:GetBucketLogging'), 

485 ('get_bucket_notification_configuration', 

486 'Notification', None, None, 's3:GetBucketNotification'), 

487 ('get_bucket_lifecycle_configuration', 

488 'Lifecycle', None, None, 's3:GetLifecycleConfiguration'), 

489 # ('get_bucket_cors', 'Cors'), 

490) 

491 

492 

493class BucketAssembly: 

494 

495 def __init__(self, manager): 

496 self.manager = manager 

497 self.default_region = None 

498 self.region_clients = {} 

499 self.session = None 

500 self.session_lock = None 

501 self.augment_fields = [] 

502 

503 def initialize(self): 

504 # construct a default boto3 client, using the current session region. 

505 self.session = local_session(self.manager.session_factory) 

506 self.session_lock = threading.RLock() 

507 self.default_region = self.manager.config.region 

508 self.region_clients[self.default_region] = self.session.client('s3') 

509 self.augment_fields = set(self.detect_augment_fields()) 

510 # location is required for client construction 

511 self.augment_fields.add('Location') 

512 # custodian always returns tags 

513 self.augment_fields.add('Tags') 

514 

515 def validate(self): 

516 config = self.get_augment_config() 

517 if isinstance(config, str) and config not in ('all', 'detect', 'none'): 

518 raise PolicyValidationError( 

519 "augment-keys supports 'all', 'detect', 'none' or list of keys found: %s" % config) 

520 elif isinstance(config, list): 

521 delta = set(config).difference([row[1] for row in S3_AUGMENT_TABLE]) 

522 if delta: 

523 raise PolicyValidationError("augment-keys - found invalid keys: %s" % (list(delta))) 

524 if not isinstance(config, (list, str)): 

525 raise PolicyValidationError( 

526 "augment-keys supports 'all', 'detect', 'none' or list of keys found: %s" % config) 

527 

528 def get_augment_config(self): 

529 augment_config = None 

530 for option in self.manager.data.get('query', []): 

531 if option and option.get('augment-keys') is not None: 

532 augment_config = option['augment-keys'] 

533 if augment_config is None: 

534 augment_config = 'all' 

535 return augment_config 

536 

537 def detect_augment_fields(self): 

538 # try to detect augment fields required for the policy execution 

539 # we want to avoid extraneous api calls unless they are being used by the policy. 
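        # For example, a value filter keyed on 'Website.ErrorDocument' reduces to the
        # augment key 'Website', so only the Website subdocument (plus the always-fetched
        # Location and Tags) is retrieved; a 'tag:' prefixed filter key maps to 'Tags'.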

540 

541 detected_keys = [] 

542 augment_keys = [row[1] for row in S3_AUGMENT_TABLE] 

543 augment_config = self.get_augment_config() 

544 

545 if augment_config == 'all': 

546 return augment_keys 

547 elif augment_config == 'none': 

548 return [] 

549 elif isinstance(augment_config, list): 

550 return augment_config 

551 

552 for f in self.manager.iter_filters(): 

553 fkey = None 

554 if not isinstance(f, ValueFilter): 

555 continue 

556 

557 f = f.data 

558 # type: value 

559 if f.get('type', '') == 'value': 

560 fkey = f.get('key') 

561 # k: v dict 

562 elif len(f) == 1: 

563 fkey = list(f.keys())[0] 

564 if fkey is None: # pragma: no cover 

565 continue 

566 

567 # remove any jmespath expressions 

568 fkey = fkey.split('.', 1)[0] 

569 

570 # tags have explicit handling in value filters. 

571 if fkey.startswith('tag:'): 

572 fkey = 'Tags' 

573 

574 # denied methods checks get all keys 

575 if fkey.startswith('c7n:DeniedMethods'): 

576 return augment_keys 

577 

578 if fkey in augment_keys: 

579 detected_keys.append(fkey) 

580 

581 return detected_keys 

582 

583 def get_client(self, region): 

584 if region in self.region_clients: 

585 return self.region_clients[region] 

586 with self.session_lock: 

587 self.region_clients[region] = self.session.client('s3', region_name=region) 

588 return self.region_clients[region] 

589 

590 def assemble(self, bucket): 

591 

592 client = self.get_client(self.default_region) 

593 augments = list(S3_AUGMENT_TABLE) 

594 

595 for info in augments: 

596 # we use the offset, as tests manipulate the augments table 

597 method_name, key, default, select = info[:4] 

598 if key not in self.augment_fields: 

599 continue 

600 

601 method = getattr(client, method_name) 

602 

603 try: 

604 response = method(Bucket=bucket['Name']) 

605 # This is here as exception handling will change to defaults if not present 

606 response.pop('ResponseMetadata', None) 

607 value = response 

608 if select and select in value: 

609 value = value[select] 

610 except (ssl.SSLError, SSLError) as e: 

611 # Proxy issue most likely 

612 log.warning( 

613 "Bucket ssl error %s: %s %s", 

614 bucket['Name'], bucket.get('Location', 'unknown'), e) 

615 continue 

616 except ClientError as e: 

617 code = e.response['Error']['Code'] 

618 if code.startswith("NoSuch") or "NotFound" in code: 

619 value = default 

620 elif code == 'PermanentRedirect': # pragma: no cover 

621 # (09/2025) - it's not clear how we get here given a client region switch post 

622 # location detection. 

623 # 

624 # change client region 

625 client = self.get_client(get_region(bucket)) 

626 # requeue now that we have correct region 

627 augments.append((method_name, key, default, select)) 

628 continue 

629 else: 

630 # for auth errors record as attribute and move on 

631 if e.response['Error']['Code'] == 'AccessDenied': 

632 bucket.setdefault('c7n:DeniedMethods', []).append(method_name) 

633 continue 

634 # else log and raise 

635 log.warning( 

636 "Bucket:%s unable to invoke method:%s error:%s ", 

637 bucket['Name'], method_name, e.response['Error']['Message']) 

638 raise 

639 

640 # for historical reasons we normalize EU to eu-west-1 on the bucket 

641 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html 

642 if key == 'Location' and value and value.get('LocationConstraint', '') == 'EU': 

643 value['LocationConstraint'] = 'eu-west-1' 

644 

645 bucket[key] = value 

646 

647 # For all subsequent attributes after location, use a client that is targeted to 

648 # the bucket's regional s3 endpoint. 

649 if key == 'Location' and get_region(bucket) != client.meta.region_name: 

650 client = self.get_client(get_region(bucket)) 

651 return bucket 

652 

653 

654def bucket_client(session, b, kms=False): 

655 region = get_region(b) 

656 

657 if kms: 

658 # Need v4 signature for aws:kms crypto, else let the sdk decide 

659 # based on region support. 

660 config = Config( 

661 signature_version='s3v4', 

662 read_timeout=200, connect_timeout=120) 

663 else: 

664 config = Config(read_timeout=200, connect_timeout=120) 

665 return session.client('s3', region_name=region, config=config) 

666 

667 

668def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()): 

669 for bucket in buckets: 

670 client = bucket_client(local_session(session_factory), bucket) 

671 # Bucket tags are set atomically for the whole set/document; we want 

672 # to refetch against current to guard against any staleness in 

673 # our cached representation across multiple policies or concurrent 

674 # modifications. 

675 

676 if 'get_bucket_tagging' in bucket.get('c7n:DeniedMethods', []): 

677 # avoid the additional API call if we already know that it's going 

678 # to result in AccessDenied. The chances that the resource's perms 

679 # would have changed between fetching the resource and acting on it 

680 # here are pretty low, so the check here should suffice. 

681 log.warning( 

682 "Unable to get new set of bucket tags needed to modify tags," 

683 "skipping tag action for bucket: %s" % bucket["Name"]) 

684 continue 

685 

686 try: 

687 bucket['Tags'] = client.get_bucket_tagging( 

688 Bucket=bucket['Name']).get('TagSet', []) 

689 except ClientError as e: 

690 if e.response['Error']['Code'] != 'NoSuchTagSet': 

691 raise 

692 bucket['Tags'] = [] 

693 
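        # Merge semantics: tags named in add_tags win, tags named in remove_tags are
        # dropped, and all other existing bucket tags are carried over unchanged.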

694 new_tags = {t['Key']: t['Value'] for t in add_tags} 

695 for t in bucket.get('Tags', ()): 

696 if (t['Key'] not in new_tags and t['Key'] not in remove_tags): 

697 new_tags[t['Key']] = t['Value'] 

698 tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()] 

699 

700 try: 

701 client.put_bucket_tagging( 

702 Bucket=bucket['Name'], Tagging={'TagSet': tag_set}) 

703 except ClientError as e: 

704 log.exception( 

705 'Exception tagging bucket %s: %s', bucket['Name'], e) 

706 continue 

707 

708 

709def get_region(b): 

710 """Tries to get the bucket region from Location.LocationConstraint 

711 

712 Special cases: 

713 LocationConstraint EU defaults to eu-west-1 

714 LocationConstraint null defaults to us-east-1 

715 

716 Args: 

717 b (object): A bucket object 

718 

719 Returns: 

720 string: an aws region string 

721 """ 

722 remap = {None: 'us-east-1', 'EU': 'eu-west-1'} 

723 region = b.get('Location', {}).get('LocationConstraint') 

724 return remap.get(region, region) 
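# e.g. get_region({'Location': {}}) -> 'us-east-1'
#      get_region({'Location': {'LocationConstraint': 'EU'}}) -> 'eu-west-1'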

725 

726 

727@filters.register('metrics') 

728class S3Metrics(MetricsFilter): 

729 """S3 CW Metrics need special handling for attribute/dimension 

730 mismatch, and an additional required dimension. 

731 """ 

732 

733 def get_dimensions(self, resource): 

734 dims = [{'Name': 'BucketName', 'Value': resource['Name']}] 

735 if (self.data['name'] == 'NumberOfObjects' and 

736 'dimensions' not in self.data): 

737 dims.append( 

738 {'Name': 'StorageType', 'Value': 'AllStorageTypes'}) 

739 return dims 

740 

741 

742@filters.register('cross-account') 

743class S3CrossAccountFilter(CrossAccountAccessFilter): 

744 """Filters cross-account access to S3 buckets 

745 

746 :example: 

747 

748 .. code-block:: yaml 

749 

750 policies: 

751 - name: s3-acl 

752 resource: s3 

753 region: us-east-1 

754 filters: 

755 - type: cross-account 

756 """ 

757 permissions = ('s3:GetBucketPolicy',) 

758 

759 def get_accounts(self): 

760 """add in elb access by default 

761 

762 ELB Accounts by region 

763 https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html 

764 

765 Redshift Accounts by region 

766 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#db-auditing-manage-log-files 

767 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#rs-db-auditing-cloud-trail-rs-acct-ids 

768 

769 Cloudtrail Accounts by region 

770 https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-supported-regions.html 

771 """ 

772 accounts = super(S3CrossAccountFilter, self).get_accounts() 

773 return accounts.union( 

774 [ 

775 # ELB accounts 

776 '127311923021', # us-east-1 

777 '033677994240', # us-east-2 

778 '027434742980', # us-west-1 

779 '797873946194', # us-west-2 

780 '098369216593', # af-south-1 

781 '985666609251', # ca-central-1 

782 '054676820928', # eu-central-1 

783 '897822967062', # eu-north-1 

784 '635631232127', # eu-south-1 

785 '156460612806', # eu-west-1 

786 '652711504416', # eu-west-2 

787 '009996457667', # eu-west-3 

788 '754344448648', # ap-east-1 

789 '582318560864', # ap-northeast-1 

790 '600734575887', # ap-northeast-2 

791 '383597477331', # ap-northeast-3 

792 '114774131450', # ap-southeast-1 

793 '783225319266', # ap-southeast-2 

794 '718504428378', # ap-south-1 

795 '076674570225', # me-south-1 

796 '507241528517', # sa-east-1 

797 '048591011584', # us-gov-west-1 or gov-cloud-1 

798 '190560391635', # us-gov-east-1 

799 '638102146993', # cn-north-1 

800 '037604701340', # cn-northwest-1 

801 

802 # Redshift audit logging 

803 '193672423079', # us-east-1 

804 '391106570357', # us-east-2 

805 '262260360010', # us-west-1 

806 '902366379725', # us-west-2 

807 '365689465814', # af-south-1 

808 '313564881002', # ap-east-1 

809 '865932855811', # ap-south-1 

810 '090321488786', # ap-northeast-3 

811 '760740231472', # ap-northeast-2 

812 '361669875840', # ap-southeast-1 

813 '762762565011', # ap-southeast-2 

814 '404641285394', # ap-northeast-1 

815 '907379612154', # ca-central-1 

816 '053454850223', # eu-central-1 

817 '210876761215', # eu-west-1 

818 '307160386991', # eu-west-2 

819 '945612479654', # eu-south-1 

820 '915173422425', # eu-west-3 

821 '729911121831', # eu-north-1 

822 '013126148197', # me-south-1 

823 '075028567923', # sa-east-1 

824 

825 # Cloudtrail accounts (psa. folks should be using 

826 # cloudtrail service in bucket policies) 

827 '086441151436', # us-east-1 

828 '475085895292', # us-west-2 

829 '388731089494', # us-west-1 

830 '113285607260', # us-west-2 

831 '819402241893', # ca-central-1 

832 '977081816279', # ap-south-1 

833 '492519147666', # ap-northeast-2 

834 '903692715234', # ap-southeast-1 

835 '284668455005', # ap-southeast-2 

836 '216624486486', # ap-northeast-1 

837 '035351147821', # eu-central-1 

838 '859597730677', # eu-west-1 

839 '282025262664', # eu-west-2 

840 '814480443879', # sa-east-1 

841 ]) 

842 

843 

844@filters.register('global-grants') 

845class GlobalGrantsFilter(Filter): 

846 """Filters for all S3 buckets that have global-grants 

847 

848 *Note* by default this filter allows for read access 

849 if the bucket has been configured as a website. This 

850 can be disabled per the example below. 

851 

852 :example: 

853 

854 .. code-block:: yaml 

855 

856 policies: 

857 - name: remove-global-grants 

858 resource: s3 

859 filters: 

860 - type: global-grants 

861 allow_website: false 

862 actions: 

863 - delete-global-grants 

864 

865 """ 

866 

867 schema = type_schema( 

868 'global-grants', 

869 allow_website={'type': 'boolean'}, 

870 operator={'type': 'string', 'enum': ['or', 'and']}, 

871 permissions={ 

872 'type': 'array', 'items': { 

873 'type': 'string', 'enum': [ 

874 'READ', 'WRITE', 'WRITE_ACP', 'READ_ACP', 'FULL_CONTROL']}}) 

875 

876 GLOBAL_ALL = "http://acs.amazonaws.com/groups/global/AllUsers" 

877 AUTH_ALL = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" 

878 

879 def process(self, buckets, event=None): 

880 with self.executor_factory(max_workers=5) as w: 

881 results = w.map(self.process_bucket, buckets) 

882 results = list(filter(None, list(results))) 

883 return results 

884 

885 def process_bucket(self, b): 

886 acl = b.get('Acl', {'Grants': []}) 

887 if not acl or not acl['Grants']: 

888 return 

889 

890 results = [] 

891 allow_website = self.data.get('allow_website', True) 

892 perms = self.data.get('permissions', []) 

893 

894 for grant in acl['Grants']: 

895 if 'URI' not in grant.get("Grantee", {}): 

896 continue 

897 if grant['Grantee']['URI'] not in [self.AUTH_ALL, self.GLOBAL_ALL]: 

898 continue 

899 if allow_website and grant['Permission'] == 'READ' and b['Website']: 

900 continue 

901 if not perms or (perms and grant['Permission'] in perms): 

902 results.append(grant['Permission']) 

903 

904 if results: 

905 set_annotation(b, 'GlobalPermissions', results) 

906 return b 

907 

908 

909class BucketActionBase(BaseAction): 

910 

911 def get_permissions(self): 

912 return self.permissions 

913 

914 def get_std_format_args(self, bucket): 

915 return { 

916 'account_id': self.manager.config.account_id, 

917 'region': self.manager.config.region, 

918 'bucket_name': bucket['Name'], 

919 'bucket_region': get_region(bucket) 

920 } 

921 

922 def process(self, buckets): 

923 return self._process_with_futures(buckets) 

924 

925 def _process_with_futures(self, buckets, *args, max_workers=3, **kwargs): 

926 errors = 0 

927 results = [] 

928 with self.executor_factory(max_workers=max_workers) as w: 

929 futures = {} 

930 for b in buckets: 

931 futures[w.submit(self.process_bucket, b, *args, **kwargs)] = b 

932 for f in as_completed(futures): 

933 if f.exception(): 

934 b = futures[f] 

935 self.log.error( 

936 'error modifying bucket: policy:%s action:%s bucket:%s error:%s', 

937 self.manager.data.get('name'), self.name, b['Name'], f.exception() 

938 ) 

939 errors += 1 

940 continue 

941 results += filter(None, [f.result()]) 

942 if errors: 

943 self.log.error('encountered %d errors while processing %s', errors, self.name) 

944 raise PolicyExecutionError('%d resources failed' % errors) 

945 return results 

946 

947 

948class BucketFilterBase(Filter): 

949 def get_std_format_args(self, bucket): 

950 return { 

951 'account_id': self.manager.config.account_id, 

952 'region': self.manager.config.region, 

953 'bucket_name': bucket['Name'], 

954 'bucket_region': get_region(bucket) 

955 } 

956 

957 

958@S3.action_registry.register("post-finding") 

959class BucketFinding(PostFinding): 

960 

961 resource_type = 'AwsS3Bucket' 

962 

963 def format_resource(self, r): 

964 owner = r.get("Acl", {}).get("Owner", {}) 

965 resource = { 

966 "Type": self.resource_type, 

967 "Id": "arn:aws:s3:::{}".format(r["Name"]), 

968 "Region": get_region(r), 

969 "Tags": {t["Key"]: t["Value"] for t in r.get("Tags", [])}, 

970 "Details": {self.resource_type: { 

971 "OwnerId": owner.get('ID', 'Unknown')}} 

972 } 

973 

974 if "DisplayName" in owner: 

975 resource["Details"]["AwsS3Bucket"]["OwnerName"] = owner['DisplayName'] 

976 

977 return filter_empty(resource) 

978 

979 

980@S3.filter_registry.register('has-statement') 

981class S3HasStatementFilter(HasStatementFilter): 

982 def get_std_format_args(self, bucket): 

983 return { 

984 'account_id': self.manager.config.account_id, 

985 'region': self.manager.config.region, 

986 'bucket_name': bucket['Name'], 

987 'bucket_region': get_region(bucket) 

988 } 

989 

990 

991@S3.filter_registry.register('lock-configuration') 

992class S3LockConfigurationFilter(ValueFilter): 

993 """ 

994 Filter S3 buckets based on their object lock configurations 

995 

996 :example: 

997 

998 Get all buckets where lock configuration mode is COMPLIANCE 

999 

1000 .. code-block:: yaml 

1001 

1002 policies: 

1003 - name: lock-configuration-compliance 

1004 resource: aws.s3 

1005 filters: 

1006 - type: lock-configuration 

1007 key: Rule.DefaultRetention.Mode 

1008 value: COMPLIANCE 

1009 

1010 """ 

1011 schema = type_schema('lock-configuration', rinherit=ValueFilter.schema) 

1012 permissions = ('s3:GetBucketObjectLockConfiguration',) 

1013 annotate = True 

1014 annotation_key = 'c7n:ObjectLockConfiguration' 

1015 

1016 def _process_resource(self, client, resource): 

1017 try: 

1018 config = client.get_object_lock_configuration( 

1019 Bucket=resource['Name'] 

1020 )['ObjectLockConfiguration'] 

1021 except ClientError as e: 

1022 if e.response['Error']['Code'] == 'ObjectLockConfigurationNotFoundError': 

1023 config = None 

1024 else: 

1025 raise 

1026 resource[self.annotation_key] = config 

1027 

1028 def process(self, resources, event=None): 

1029 client = local_session(self.manager.session_factory).client('s3') 

1030 with self.executor_factory(max_workers=3) as w: 

1031 futures = [] 

1032 for res in resources: 

1033 if self.annotation_key in res: 

1034 continue 

1035 futures.append(w.submit(self._process_resource, client, res)) 

1036 for f in as_completed(futures): 

1037 exc = f.exception() 

1038 if exc: 

1039 self.log.error( 

1040 "Exception getting bucket lock configuration \n %s" % ( 

1041 exc)) 

1042 return super().process(resources, event) 

1043 

1044 def __call__(self, r): 

1045 return super().__call__(r.setdefault(self.annotation_key, None)) 

1046 

1047 

1048ENCRYPTION_STATEMENT_GLOB = { 

1049 'Effect': 'Deny', 

1050 'Principal': '*', 

1051 'Action': 's3:PutObject', 

1052 "Condition": { 

1053 "StringNotEquals": { 

1054 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}} 

1055 

1056 

1057@filters.register('no-encryption-statement') 

1058class EncryptionEnabledFilter(Filter): 

1059 """Find buckets with missing encryption policy statements. 

1060 

1061 :example: 

1062 

1063 .. code-block:: yaml 

1064 

1065 policies: 

1066 - name: s3-bucket-not-encrypted 

1067 resource: s3 

1068 filters: 

1069 - type: no-encryption-statement 

1070 """ 

1071 schema = type_schema( 

1072 'no-encryption-statement') 

1073 

1074 def get_permissions(self): 

1075 perms = self.manager.get_resource_manager('s3').get_permissions() 

1076 return perms 

1077 

1078 def process(self, buckets, event=None): 

1079 return list(filter(None, map(self.process_bucket, buckets))) 

1080 

1081 def process_bucket(self, b): 

1082 p = b.get('Policy') 

1083 if p is None: 

1084 return b 

1085 p = json.loads(p) 

1086 encryption_statement = dict(ENCRYPTION_STATEMENT_GLOB) 

1087 

1088 statements = p.get('Statement', []) 

1089 check = False 

1090 for s in list(statements): 

1091 if 'Sid' in s: 

1092 encryption_statement["Sid"] = s["Sid"] 

1093 if 'Resource' in s: 

1094 encryption_statement["Resource"] = s["Resource"] 

1095 if s == encryption_statement: 

1096 check = True 

1097 break 

1098 if check: 

1099 return None 

1100 else: 

1101 return b 

1102 

1103 

1104@filters.register('missing-statement') 

1105@filters.register('missing-policy-statement') 

1106class MissingPolicyStatementFilter(Filter): 

1107 """Find buckets missing a set of named policy statements. 

1108 

1109 :example: 

1110 

1111 .. code-block:: yaml 

1112 

1113 policies: 

1114 - name: s3-bucket-missing-statement 

1115 resource: s3 

1116 filters: 

1117 - type: missing-statement 

1118 statement_ids: 

1119 - RequiredEncryptedPutObject 

1120 """ 

1121 

1122 schema = type_schema( 

1123 'missing-policy-statement', 

1124 aliases=('missing-statement',), 

1125 statement_ids={'type': 'array', 'items': {'type': 'string'}}) 

1126 

1127 def __call__(self, b): 

1128 p = b.get('Policy') 

1129 if p is None: 

1130 return b 

1131 

1132 p = json.loads(p) 

1133 

1134 required = list(self.data.get('statement_ids', [])) 

1135 statements = p.get('Statement', []) 

1136 for s in list(statements): 

1137 if s.get('Sid') in required: 

1138 required.remove(s['Sid']) 

1139 if not required: 

1140 return False 

1141 return True 
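    # A bucket with no policy at all, or with any required Sid absent, matches this
    # filter; a bucket whose policy contains every listed statement_id does not.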

1142 

1143 

1144@filters.register('bucket-notification') 

1145class BucketNotificationFilter(ValueFilter): 

1146 """Filter based on bucket notification configuration. 

1147 

1148 :example: 

1149 

1150 .. code-block:: yaml 

1151 

1152 policies: 

1153 - name: delete-incorrect-notification 

1154 resource: s3 

1155 filters: 

1156 - type: bucket-notification 

1157 kind: lambda 

1158 key: Id 

1159 value: "IncorrectLambda" 

1160 op: eq 

1161 actions: 

1162 - type: delete-bucket-notification 

1163 statement_ids: matched 

1164 """ 

1165 

1166 schema = type_schema( 

1167 'bucket-notification', 

1168 required=['kind'], 

1169 kind={'type': 'string', 'enum': ['lambda', 'sns', 'sqs']}, 

1170 rinherit=ValueFilter.schema) 

1171 schema_alias = False 

1172 annotation_key = 'c7n:MatchedNotificationConfigurationIds' 

1173 

1174 permissions = ('s3:GetBucketNotification',) 

1175 

1176 FIELDS = { 

1177 'lambda': 'LambdaFunctionConfigurations', 

1178 'sns': 'TopicConfigurations', 

1179 'sqs': 'QueueConfigurations' 

1180 } 

1181 

1182 def process(self, buckets, event=None): 

1183 return super(BucketNotificationFilter, self).process(buckets, event) 

1184 

1185 def __call__(self, bucket): 

1186 

1187 field = self.FIELDS[self.data['kind']] 

1188 found = False 

1189 for config in bucket.get('Notification', {}).get(field, []): 

1190 if self.match(config): 

1191 set_annotation( 

1192 bucket, 

1193 BucketNotificationFilter.annotation_key, 

1194 config['Id']) 

1195 found = True 

1196 return found 

1197 

1198 

1199@filters.register('bucket-logging') 

1200class BucketLoggingFilter(BucketFilterBase): 

1201 """Filter based on bucket logging configuration. 

1202 

1203 :example: 

1204 

1205 .. code-block:: yaml 

1206 

1207 policies: 

1208 - name: add-bucket-logging-if-missing 

1209 resource: s3 

1210 filters: 

1211 - type: bucket-logging 

1212 op: disabled 

1213 actions: 

1214 - type: toggle-logging 

1215 target_bucket: "{account_id}-{region}-s3-logs" 

1216 target_prefix: "{source_bucket_name}/" 

1217 

1218 policies: 

1219 - name: update-incorrect-or-missing-logging 

1220 resource: s3 

1221 filters: 

1222 - type: bucket-logging 

1223 op: not-equal 

1224 target_bucket: "{account_id}-{region}-s3-logs" 

1225 target_prefix: "{account}/{source_bucket_name}/" 

1226 actions: 

1227 - type: toggle-logging 

1228 target_bucket: "{account_id}-{region}-s3-logs" 

1229 target_prefix: "{account}/{source_bucket_name}/" 

1230 """ 

1231 

1232 schema = type_schema( 

1233 'bucket-logging', 

1234 op={'enum': ['enabled', 'disabled', 'equal', 'not-equal', 'eq', 'ne']}, 

1235 required=['op'], 

1236 target_bucket={'type': 'string'}, 

1237 target_prefix={'type': 'string'}) 

1238 schema_alias = False 

1239 account_name = None 

1240 

1241 permissions = ("s3:GetBucketLogging", "iam:ListAccountAliases") 

1242 

1243 def process(self, buckets, event=None): 

1244 return list(filter(None, map(self.process_bucket, buckets))) 

1245 

1246 def process_bucket(self, b): 

1247 if self.match_bucket(b): 

1248 return b 

1249 

1250 def match_bucket(self, b): 

1251 op = self.data.get('op') 

1252 

1253 logging = b.get('Logging', {}) 

1254 if op == 'disabled': 

1255 return logging == {} 

1256 elif op == 'enabled': 

1257 return logging != {} 

1258 

1259 if self.account_name is None: 

1260 session = local_session(self.manager.session_factory) 

1261 self.account_name = get_account_alias_from_sts(session) 

1262 

1263 variables = self.get_std_format_args(b) 

1264 variables.update({ 

1265 'account': self.account_name, 

1266 'source_bucket_name': b['Name'], 

1267 'source_bucket_region': get_region(b), 

1268 'target_bucket_name': self.data.get('target_bucket'), 

1269 'target_prefix': self.data.get('target_prefix'), 

1270 }) 

1271 data = format_string_values(self.data, **variables) 

1272 target_bucket = data.get('target_bucket') 

1273 target_prefix = data.get('target_prefix', b['Name'] + '/') 

1274 

1275 target_config = { 

1276 "TargetBucket": target_bucket, 

1277 "TargetPrefix": target_prefix 

1278 } if target_bucket else {} 

1279 

1280 if op in ('not-equal', 'ne'): 

1281 return logging != target_config 

1282 else: 

1283 return logging == target_config 

1284 

1285 

1286@actions.register('delete-bucket-notification') 

1287class DeleteBucketNotification(BucketActionBase): 

1288 """Action to delete S3 bucket notification configurations""" 

1289 

1290 schema = type_schema( 

1291 'delete-bucket-notification', 

1292 required=['statement_ids'], 

1293 statement_ids={'oneOf': [ 

1294 {'enum': ['matched']}, 

1295 {'type': 'array', 'items': {'type': 'string'}}]}) 

1296 

1297 permissions = ('s3:PutBucketNotification',) 

1298 

1299 def process_bucket(self, bucket): 

1300 n = bucket['Notification'] 

1301 if not n: 

1302 return 

1303 

1304 statement_ids = self.data.get('statement_ids') 

1305 if statement_ids == 'matched': 

1306 statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ()) 

1307 if not statement_ids: 

1308 return 

1309 

1310 cfg = defaultdict(list) 

1311 

1312 for t in BucketNotificationFilter.FIELDS.values(): 

1313 for c in n.get(t, []): 

1314 if c['Id'] not in statement_ids: 

1315 cfg[t].append(c) 

1316 

1317 client = bucket_client(local_session(self.manager.session_factory), bucket) 

1318 client.put_bucket_notification_configuration( 

1319 Bucket=bucket['Name'], 

1320 NotificationConfiguration=cfg) 

1321 

1322 

1323@actions.register('no-op') 

1324class NoOp(BucketActionBase): 

1325 

1326 schema = type_schema('no-op') 

1327 permissions = ('s3:ListAllMyBuckets',) 

1328 

1329 def process(self, buckets): 

1330 return None 

1331 

1332 

1333@actions.register('set-statements') 

1334class SetPolicyStatement(BucketActionBase): 

1335 """Action to add or update policy statements to S3 buckets 

1336 

1337 :example: 

1338 

1339 .. code-block:: yaml 

1340 

1341 policies: 

1342 - name: force-s3-https 

1343 resource: s3 

1344 actions: 

1345 - type: set-statements 

1346 statements: 

1347 - Sid: "DenyHttp" 

1348 Effect: "Deny" 

1349 Action: "s3:GetObject" 

1350 Principal: 

1351 AWS: "*" 

1352 Resource: "arn:aws:s3:::{bucket_name}/*" 

1353 Condition: 

1354 Bool: 

1355 "aws:SecureTransport": false 

1356 """ 

1357 

1358 permissions = ('s3:PutBucketPolicy',) 

1359 

1360 schema = type_schema( 

1361 'set-statements', 

1362 **{ 

1363 'statements': { 

1364 'type': 'array', 

1365 'items': { 

1366 'type': 'object', 

1367 'properties': { 

1368 'Sid': {'type': 'string'}, 

1369 'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']}, 

1370 'Principal': {'anyOf': [{'type': 'string'}, 

1371 {'type': 'object'}, {'type': 'array'}]}, 

1372 'NotPrincipal': {'anyOf': [{'type': 'object'}, {'type': 'array'}]}, 

1373 'Action': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1374 'NotAction': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1375 'Resource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1376 'NotResource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1377 'Condition': {'type': 'object'} 

1378 }, 

1379 'required': ['Sid', 'Effect'], 

1380 'oneOf': [ 

1381 {'required': ['Principal', 'Action', 'Resource']}, 

1382 {'required': ['NotPrincipal', 'Action', 'Resource']}, 

1383 {'required': ['Principal', 'NotAction', 'Resource']}, 

1384 {'required': ['NotPrincipal', 'NotAction', 'Resource']}, 

1385 {'required': ['Principal', 'Action', 'NotResource']}, 

1386 {'required': ['NotPrincipal', 'Action', 'NotResource']}, 

1387 {'required': ['Principal', 'NotAction', 'NotResource']}, 

1388 {'required': ['NotPrincipal', 'NotAction', 'NotResource']} 

1389 ] 

1390 } 

1391 } 

1392 } 

1393 ) 

1394 

1395 def process_bucket(self, bucket): 

1396 policy = bucket.get('Policy') or '{}' 

1397 

1398 target_statements = format_string_values( 

1399 copy.deepcopy({s['Sid']: s for s in self.data.get('statements', [])}), 

1400 **self.get_std_format_args(bucket)) 

1401 

1402 policy = json.loads(policy) 

1403 bucket_statements = policy.setdefault('Statement', []) 

1404 

1405 for s in bucket_statements: 

1406 if s.get('Sid') not in target_statements: 

1407 continue 

1408 if s == target_statements[s['Sid']]: 

1409 target_statements.pop(s['Sid']) 

1410 
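        # Statements already present verbatim on the bucket are dropped from the update
        # set; if nothing new or changed remains, no put_bucket_policy call is made.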

1411 if not target_statements: 

1412 return 

1413 

1414 bucket_statements.extend(target_statements.values()) 

1415 policy = json.dumps(policy) 

1416 

1417 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1418 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=policy) 

1419 return {'Name': bucket['Name'], 'Policy': policy} 

1420 

1421 

1422@actions.register('remove-statements') 

1423class RemovePolicyStatement(RemovePolicyBase): 

1424 """Action to remove policy statements from S3 buckets 

1425 

1426 :example: 

1427 

1428 .. code-block:: yaml 

1429 

1430 policies: 

1431 - name: s3-remove-encrypt-put 

1432 resource: s3 

1433 filters: 

1434 - type: has-statement 

1435 statement_ids: 

1436 - RequireEncryptedPutObject 

1437 actions: 

1438 - type: remove-statements 

1439 statement_ids: 

1440 - RequiredEncryptedPutObject 

1441 """ 

1442 

1443 permissions = ("s3:PutBucketPolicy", "s3:DeleteBucketPolicy") 

1444 

1445 def process(self, buckets): 

1446 with self.executor_factory(max_workers=3) as w: 

1447 futures = {} 

1448 results = [] 

1449 for b in buckets: 

1450 futures[w.submit(self.process_bucket, b)] = b 

1451 for f in as_completed(futures): 

1452 if f.exception(): 

1453 b = futures[f] 

1454 self.log.error('error modifying bucket:%s\n%s', 

1455 b['Name'], f.exception()) 

1456 results += filter(None, [f.result()]) 

1457 return results 

1458 

1459 def process_bucket(self, bucket): 

1460 p = bucket.get('Policy') 

1461 if p is None: 

1462 return 

1463 

1464 p = json.loads(p) 

1465 

1466 statements, found = self.process_policy( 

1467 p, bucket, CrossAccountAccessFilter.annotation_key) 

1468 

1469 if not found: 

1470 return 

1471 

1472 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1473 

1474 if not statements: 

1475 s3.delete_bucket_policy(Bucket=bucket['Name']) 

1476 else: 

1477 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=json.dumps(p)) 

1478 return {'Name': bucket['Name'], 'State': 'PolicyRemoved', 'Statements': found} 

1479 

1480 

1481@actions.register('set-replication') 

1482class SetBucketReplicationConfig(BucketActionBase): 

1483 """Action to add or remove replication configuration statement from S3 buckets 

1484 

1485 :example: 

1486 

1487 .. code-block:: yaml 

1488 

1489 policies: 

1490 - name: s3-unapproved-account-replication 

1491 resource: s3 

1492 filters: 

1493 - type: value 

1494 key: Replication.ReplicationConfiguration.Rules[].Destination.Account 

1495 value: present 

1496 - type: value 

1497 key: Replication.ReplicationConfiguration.Rules[].Destination.Account 

1498 value_from: 

1499 url: 's3:///path/to/file.json' 

1500 format: json 

1501 expr: "approved_accounts.*" 

1502 op: ni 

1503 actions: 

1504 - type: set-replication 

1505 state: enable 

1506 """ 

1507 schema = type_schema( 

1508 'set-replication', 

1509 state={'type': 'string', 'enum': ['enable', 'disable', 'remove']}) 

1510 permissions = ("s3:GetReplicationConfiguration", "s3:PutReplicationConfiguration") 

1511 

1512 def process(self, buckets): 

1513 with self.executor_factory(max_workers=3) as w: 

1514 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1515 errors = [] 

1516 for future in as_completed(futures): 

1517 bucket = futures[future] 

1518 try: 

1519 future.result() 

1520 except ClientError as e: 

1521 errors.append("Message: %s Bucket: %s" % (e, bucket['Name'])) 

1522 if errors: 

1523 raise Exception('\n'.join(map(str, errors))) 

1524 

1525 def process_bucket(self, bucket): 

1526 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1527 state = self.data.get('state') 

1528 if state is not None: 

1529 if state == 'remove': 

1530 s3.delete_bucket_replication(Bucket=bucket['Name']) 

1531 return {'Name': bucket['Name'], 'State': 'ReplicationConfigRemoved'} 

1532 if state in ('enable', 'disable'): 

1533 config = s3.get_bucket_replication(Bucket=bucket['Name']) 

1534 for rule in config['ReplicationConfiguration']['Rules']: 

1535 rule['Status'] = 'Enabled' if state == 'enable' else 'Disabled' 

1536 s3.put_bucket_replication( 

1537 Bucket=bucket['Name'], 

1538 ReplicationConfiguration=config['ReplicationConfiguration'] 

1539 ) 

1540 return {'Name': bucket['Name'], 'State': 'ReplicationConfigUpdated'} 

1541 

1542 

1543@filters.register('check-public-block') 

1544class FilterPublicBlock(Filter): 

1545 """Filter for s3 bucket public blocks 

1546 

1547 If no filter parameters are provided, it checks to see if any are unset or False. 

1548 

1549 If parameters are provided, only the provided ones are checked. 

1550 

1551 :example: 

1552 

1553 .. code-block:: yaml 

1554 

1555 policies: 

1556 - name: CheckForPublicAclBlock-Off 

1557 resource: s3 

1558 region: us-east-1 

1559 filters: 

1560 - type: check-public-block 

1561 BlockPublicAcls: true 

1562 BlockPublicPolicy: true 

1563 """ 

1564 

1565 schema = type_schema( 

1566 'check-public-block', 

1567 BlockPublicAcls={'type': 'boolean'}, 

1568 IgnorePublicAcls={'type': 'boolean'}, 

1569 BlockPublicPolicy={'type': 'boolean'}, 

1570 RestrictPublicBuckets={'type': 'boolean'}) 

1571 permissions = ("s3:GetBucketPublicAccessBlock",) 

1572 keys = ( 

1573 'BlockPublicPolicy', 'BlockPublicAcls', 'IgnorePublicAcls', 'RestrictPublicBuckets') 

1574 annotation_key = 'c7n:PublicAccessBlock' 

1575 

1576 def process(self, buckets, event=None): 

1577 results = [] 

1578 with self.executor_factory(max_workers=2) as w: 

1579 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1580 for f in as_completed(futures): 

1581 if f.result(): 

1582 results.append(futures[f]) 

1583 return results 

1584 

1585 def process_bucket(self, bucket): 

1586 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1587 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys})) 

1588 if self.annotation_key not in bucket: 

1589 try: 

1590 config = s3.get_public_access_block( 

1591 Bucket=bucket['Name'])['PublicAccessBlockConfiguration'] 

1592 except ClientError as e: 

1593 error_code = e.response['Error']['Code'] 

1594 if error_code == 'NoSuchPublicAccessBlockConfiguration': 

1595 pass 

1596 elif error_code == 'AccessDenied': 

1597 # Follow the same logic as `assemble_bucket` - log and continue on access 

1598 # denied errors rather than halting a policy altogether 

1599 method = 'GetPublicAccessBlock' 

1600 log.warning( 

1601 "Bucket:%s unable to invoke method:%s error:%s ", 

1602 bucket['Name'], method, e.response['Error']['Message'] 

1603 ) 

1604 bucket.setdefault('c7n:DeniedMethods', []).append(method) 

1605 else: 

1606 raise 

1607 bucket[self.annotation_key] = config 

1608 return self.matches_filter(config) 

1609 

1610 def matches_filter(self, config): 

1611 key_set = [key for key in self.keys if key in self.data] 

1612 if key_set: 

1613 return all([self.data.get(key) is config[key] for key in key_set]) 

1614 else: 

1615 return not all(config.values()) 
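    # e.g. with {'BlockPublicAcls': True} in the filter data a bucket matches only when
    # its BlockPublicAcls setting is True; with no keys supplied it matches whenever any
    # of the four public-block settings is False.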

1616 

1617 

1618@actions.register('set-public-block') 

1619class SetPublicBlock(BucketActionBase): 

1620 """Action to update Public Access blocks on S3 buckets 

1621 

1622 If no action parameters are provided, all settings will be set to the `state`, which defaults to True. 

1623 

1624 If action parameters are provided, those will be set and other extant values preserved. 

1625 

1626 :example: 

1627 

1628 .. code-block:: yaml 

1629 

1630 policies: 

1631 - name: s3-public-block-enable-all 

1632 resource: s3 

1633 filters: 

1634 - type: check-public-block 

1635 actions: 

1636 - type: set-public-block 

1637 

1638 policies: 

1639 - name: s3-public-block-disable-all 

1640 resource: s3 

1641 filters: 

1642 - type: check-public-block 

1643 actions: 

1644 - type: set-public-block 

1645 state: false 

1646 

1647 policies: 

1648 - name: s3-public-block-enable-some 

1649 resource: s3 

1650 filters: 

1651 - or: 

1652 - type: check-public-block 

1653 BlockPublicAcls: false 

1654 - type: check-public-block 

1655 BlockPublicPolicy: false 

1656 actions: 

1657 - type: set-public-block 

1658 BlockPublicAcls: true 

1659 BlockPublicPolicy: true 

1660 

1661 """ 

1662 

1663 schema = type_schema( 

1664 'set-public-block', 

1665 state={'type': 'boolean', 'default': True}, 

1666 BlockPublicAcls={'type': 'boolean'}, 

1667 IgnorePublicAcls={'type': 'boolean'}, 

1668 BlockPublicPolicy={'type': 'boolean'}, 

1669 RestrictPublicBuckets={'type': 'boolean'}) 

1670 permissions = ("s3:GetBucketPublicAccessBlock", "s3:PutBucketPublicAccessBlock") 

1671 keys = FilterPublicBlock.keys 

1672 annotation_key = FilterPublicBlock.annotation_key 

1673 

1674 def process(self, buckets): 

1675 with self.executor_factory(max_workers=3) as w: 

1676 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1677 for future in as_completed(futures): 

1678 future.result() 

1679 

1680 def process_bucket(self, bucket): 

1681 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1682 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys})) 

1683 if self.annotation_key not in bucket: 

1684 try: 

1685 config = s3.get_public_access_block( 

1686 Bucket=bucket['Name'])['PublicAccessBlockConfiguration'] 

1687 except ClientError as e: 

1688 if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration': 

1689 raise 

1690 

1691 key_set = [key for key in self.keys if key in self.data] 

1692 if key_set: 

1693 for key in key_set: 

1694 config[key] = self.data.get(key) 

1695 else: 

1696 for key in self.keys: 

1697 config[key] = self.data.get('state', True) 

1698 s3.put_public_access_block( 

1699 Bucket=bucket['Name'], PublicAccessBlockConfiguration=config) 

1700 

1701 

1702@actions.register('toggle-versioning') 

1703class ToggleVersioning(BucketActionBase): 

1704 """Action to enable/suspend versioning on an S3 bucket

1705 

1706 Note versioning can never be disabled, only suspended.

1707 

1708 :example: 

1709 

1710 .. code-block:: yaml 

1711 

1712 policies: 

1713 - name: s3-enable-versioning 

1714 resource: s3 

1715 filters: 

1716 - or: 

1717 - type: value 

1718 key: Versioning.Status 

1719 value: Suspended 

1720 - type: value 

1721 key: Versioning.Status 

1722 value: absent 

1723 actions: 

1724 - type: toggle-versioning 

1725 enabled: true 

1726 """ 

1727 

1728 schema = type_schema( 

1729 'toggle-versioning', 

1730 enabled={'type': 'boolean'}) 

1731 permissions = ("s3:PutBucketVersioning",) 

1732 

1733 def process_versioning(self, resource, state): 

1734 client = bucket_client( 

1735 local_session(self.manager.session_factory), resource) 

1736 try: 

1737 client.put_bucket_versioning( 

1738 Bucket=resource['Name'], 

1739 VersioningConfiguration={ 

1740 'Status': state}) 

1741 except ClientError as e: 

1742 if e.response['Error']['Code'] != 'AccessDenied': 

1743 log.error( 

1744 "Unable to put bucket versioning on bucket %s: %s" % (resource['Name'], e)) 

1745 raise 

1746 log.warning( 

1747 "Access Denied Bucket:%s while put bucket versioning" % resource['Name']) 

1748 

1749 # mfa delete enablement looks like it needs the serial and a current token. 

1750 def process(self, resources): 

1751 enabled = self.data.get('enabled', True) 

1752 for r in resources: 

1753 if 'Versioning' not in r or not r['Versioning']: 

1754 r['Versioning'] = {'Status': 'Suspended'} 

1755 if enabled and ( 

1756 r['Versioning']['Status'] == 'Suspended'): 

1757 self.process_versioning(r, 'Enabled') 

1758 if not enabled and r['Versioning']['Status'] == 'Enabled': 

1759 self.process_versioning(r, 'Suspended') 

1760 

1761 

1762@actions.register('toggle-logging') 

1763class ToggleLogging(BucketActionBase): 

1764 """Action to enable/disable logging on an S3 bucket.

1765 

1766 Target bucket ACL must allow for WRITE and READ_ACP Permissions 

1767 If target_prefix is not specified, it defaults to the source bucket name followed by a slash.

1768 https://docs.aws.amazon.com/AmazonS3/latest/dev/enable-logging-programming.html 

1769 

1770 :example: 

1771 

1772 .. code-block:: yaml 

1773 

1774 policies: 

1775 - name: s3-enable-logging 

1776 resource: s3 

1777 filters: 

1778 - "tag:Testing": present 

1779 actions: 

1780 - type: toggle-logging 

1781 target_bucket: log-bucket 

1782 target_prefix: logs123/ 

1783 

1784 policies: 

1785 - name: s3-force-standard-logging 

1786 resource: s3 

1787 filters: 

1788 - type: bucket-logging 

1789 op: not-equal 

1790 target_bucket: "{account_id}-{region}-s3-logs" 

1791 target_prefix: "{account}/{source_bucket_name}/" 

1792 actions: 

1793 - type: toggle-logging 

1794 target_bucket: "{account_id}-{region}-s3-logs" 

1795 target_prefix: "{account}/{source_bucket_name}/" 

1796 """ 

1797 schema = type_schema( 

1798 'toggle-logging', 

1799 enabled={'type': 'boolean'}, 

1800 target_bucket={'type': 'string'}, 

1801 target_prefix={'type': 'string'}) 

1802 

1803 permissions = ("s3:PutBucketLogging", "iam:ListAccountAliases") 

1804 

1805 def validate(self): 

1806 if self.data.get('enabled', True): 

1807 if not self.data.get('target_bucket'): 

1808 raise PolicyValidationError( 

1809 "target_bucket must be specified on %s" % ( 

1810 self.manager.data,)) 

1811 return self 

1812 

1813 def process(self, resources): 

1814 session = local_session(self.manager.session_factory) 

1815 kwargs = { 

1816 "enabled": self.data.get('enabled', True), 

1817 "session": session, 

1818 "account_name": get_account_alias_from_sts(session), 

1819 } 

1820 

1821 return self._process_with_futures(resources, **kwargs) 

1822 

1823 def process_bucket(self, r, enabled=None, session=None, account_name=None): 

1824 client = bucket_client(session, r) 

1825 is_logging = bool(r.get('Logging')) 

1826 

1827 if enabled: 

1828 variables = self.get_std_format_args(r) 

1829 variables.update({ 

1830 'account': account_name, 

1831 'source_bucket_name': r['Name'], 

1832 'source_bucket_region': get_region(r), 

1833 'target_bucket_name': self.data.get('target_bucket'), 

1834 'target_prefix': self.data.get('target_prefix'), 

1835 }) 

1836 data = format_string_values(self.data, **variables) 

1837 config = { 

1838 'TargetBucket': data.get('target_bucket'), 

1839 'TargetPrefix': data.get('target_prefix', r['Name'] + '/') 

1840 } 

1841 if not is_logging or r.get('Logging') != config: 

1842 client.put_bucket_logging( 

1843 Bucket=r['Name'], 

1844 BucketLoggingStatus={'LoggingEnabled': config} 

1845 ) 

1846 r['Logging'] = config 

1847 

1848 elif not enabled and is_logging: 

1849 client.put_bucket_logging( 

1850 Bucket=r['Name'], BucketLoggingStatus={}) 

1851 r['Logging'] = {} 

1852 

1853 

1854@actions.register('attach-encrypt') 

1855class AttachLambdaEncrypt(BucketActionBase): 

1856 """Action attaches a lambda encryption function to an S3 bucket;

1857 supports attachment via a lambda bucket notification or an SNS notification

1858 to invoke the lambda. A special topic value of `default` will utilize an

1859 extant notification or create one matching the bucket name.

1860 

1861 :example: 

1862 

1863 

1864 .. code-block:: yaml 

1865 

1866 

1867 policies: 

1868 - name: attach-lambda-encrypt 

1869 resource: s3 

1870 filters: 

1871 - type: missing-policy-statement 

1872 actions: 

1873 - type: attach-encrypt 

1874 role: arn:aws:iam::123456789012:role/my-role 

1875 

1876 """ 

1877 schema = type_schema( 

1878 'attach-encrypt', 

1879 role={'type': 'string'}, 

1880 tags={'type': 'object'}, 

1881 topic={'type': 'string'}) 

1882 

1883 permissions = ( 

1884 "s3:PutBucketNotification", "s3:GetBucketNotification", 

1885 # lambda manager uses quite a few perms to provision lambdas 

1886 # and event sources, hard to disambiguate, punt for now.

1887 "lambda:*", 

1888 ) 

1889 

1890 def __init__(self, data=None, manager=None): 

1891 self.data = data or {} 

1892 self.manager = manager 

1893 

1894 def validate(self): 

1895 if (not getattr(self.manager.config, 'dryrun', True) and 

1896 not self.data.get('role', self.manager.config.assume_role)): 

1897 raise PolicyValidationError( 

1898 "attach-encrypt: role must be specified either " 

1899 "via assume or in config on %s" % (self.manager.data,)) 

1900 

1901 return self 

1902 

1903 def process(self, buckets): 

1904 from c7n.mu import LambdaManager 

1905 from c7n.ufuncs.s3crypt import get_function 

1906 

1907 account_id = self.manager.config.account_id 

1908 topic_arn = self.data.get('topic') 

1909 

1910 func = get_function( 

1911 None, self.data.get('role', self.manager.config.assume_role), 

1912 account_id=account_id, tags=self.data.get('tags')) 

1913 

1914 regions = {get_region(b) for b in buckets} 

1915 

1916 # session managers by region 

1917 region_sessions = {} 

1918 for r in regions: 

1919 region_sessions[r] = functools.partial( 

1920 self.manager.session_factory, region=r) 

1921 

1922 # Publish function to all of our buckets regions 

1923 region_funcs = {} 

1924 

1925 for r in regions: 

1926 lambda_mgr = LambdaManager(region_sessions[r]) 

1927 lambda_mgr.publish(func) 

1928 region_funcs[r] = func 

1929 

1930 with self.executor_factory(max_workers=3) as w: 

1931 results = [] 

1932 futures = [] 

1933 for b in buckets: 

1934 region = get_region(b) 

1935 futures.append( 

1936 w.submit( 

1937 self.process_bucket, 

1938 region_funcs[region], 

1939 b, 

1940 topic_arn, 

1941 account_id, 

1942 region_sessions[region] 

1943 )) 

1944 for f in as_completed(futures): 

1945 if f.exception(): 

1946 log.exception( 

1947 "Error attaching lambda-encrypt %s" % (f.exception())) 

1948 results.append(f.result()) 

1949 return list(filter(None, results)) 

1950 

1951 def process_bucket(self, func, bucket, topic, account_id, session_factory): 

1952 from c7n.mu import BucketSNSNotification, BucketLambdaNotification 

1953 if topic: 

1954 topic = None if topic == 'default' else topic 

1955 source = BucketSNSNotification(session_factory, bucket, topic) 

1956 else: 

1957 source = BucketLambdaNotification( 

1958 {'account_s3': account_id}, session_factory, bucket) 

1959 return source.add(func, None) 

1960 

1961 

1962@actions.register('encryption-policy') 

1963class EncryptionRequiredPolicy(BucketActionBase): 

1964 """Action to apply an encryption policy to S3 buckets 

1965 

1966 

1967 :example: 

1968 

1969 .. code-block:: yaml 

1970 

1971 policies: 

1972 - name: s3-enforce-encryption 

1973 resource: s3 

1974 mode: 

1975 type: cloudtrail 

1976 events: 

1977 - CreateBucket 

1978 actions: 

1979 - encryption-policy 

1980 """ 

1981 

1982 permissions = ("s3:GetBucketPolicy", "s3:PutBucketPolicy") 

1983 schema = type_schema('encryption-policy') 

1984 

1985 def __init__(self, data=None, manager=None): 

1986 self.data = data or {} 

1987 self.manager = manager 

1988 

1989 def process(self, buckets): 

1990 with self.executor_factory(max_workers=3) as w: 

1991 results = w.map(self.process_bucket, buckets) 

1992 results = list(filter(None, list(results))) 

1993 return results 

1994 

1995 def process_bucket(self, b): 

1996 p = b['Policy'] 

1997 if p is None: 

1998 log.info("No policy found, creating new") 

1999 p = {'Version': "2012-10-17", "Statement": []} 

2000 else: 

2001 p = json.loads(p) 

2002 

2003 encryption_sid = "RequiredEncryptedPutObject" 

2004 encryption_statement = { 

2005 'Sid': encryption_sid, 

2006 'Effect': 'Deny', 

2007 'Principal': '*', 

2008 'Action': 's3:PutObject', 

2009 "Resource": "arn:aws:s3:::%s/*" % b['Name'], 

2010 "Condition": { 

2011 # AWS Managed Keys or KMS keys, note policy language 

2012 # does not support custom kms (todo add issue) 

2013 "StringNotEquals": { 

2014 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}} 

2015 

2016 statements = p.get('Statement', []) 

2017 for s in list(statements): 

2018 if s.get('Sid', '') == encryption_sid: 

2019 log.debug("Bucket:%s Found extant encrypt policy", b['Name']) 

2020 if s != encryption_statement: 

2021 log.info( 

2022 "Bucket:%s updating extant encrypt policy", b['Name']) 

2023 statements.remove(s) 

2024 else: 

2025 return 

2026 

2027 session = self.manager.session_factory() 

2028 s3 = bucket_client(session, b) 

2029 statements.append(encryption_statement) 

2030 p['Statement'] = statements 

2031 log.info('Bucket:%s attached encryption policy' % b['Name']) 

2032 

2033 try: 

2034 s3.put_bucket_policy( 

2035 Bucket=b['Name'], 

2036 Policy=json.dumps(p)) 

2037 except ClientError as e: 

2038 if e.response['Error']['Code'] == 'NoSuchBucket': 

2039 return 

2040 self.log.exception( 

2041 "Error on bucket:%s putting policy\n%s error:%s", 

2042 b['Name'], 

2043 json.dumps(statements, indent=2), e) 

2044 raise 

2045 return {'Name': b['Name'], 'State': 'PolicyAttached'} 

2046 

2047 

2048class BucketScanLog: 

2049 """Offload remediated key ids to a disk file in batches 

2050 

2051 A bucket keyspace is effectively infinite, so we need to store partial

2052 results out of memory. This class provides a json log on disk

2053 with partial write support.

2054 

2055 json output format: 

2056 - [list_of_serialized_keys], 

2057 - [] # Empty list of keys at end when we close the buffer 

2058 

2059 """ 
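
    # Usage sketch (hypothetical directory name), assuming log_dir is writable:
    #
    #   with BucketScanLog('/tmp/c7n-keys', 'my-bucket') as key_log:
    #       key_log.add(['key-1', 'key-2'])
    #
    # which produces /tmp/c7n-keys/my-bucket.json containing
    # [ ["key-1", "key-2"], [] ] once the context exits.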

2060 

2061 def __init__(self, log_dir, name): 

2062 self.log_dir = log_dir 

2063 self.name = name 

2064 self.fh = None 

2065 self.count = 0 

2066 

2067 @property 

2068 def path(self): 

2069 return os.path.join(self.log_dir, "%s.json" % self.name) 

2070 

2071 def __enter__(self): 

2072 # Don't require output directories 

2073 if self.log_dir is None: 

2074 return 

2075 

2076 self.fh = open(self.path, 'w') 

2077 self.fh.write("[\n") 

2078 return self 

2079 

2080 def __exit__(self, exc_type=None, exc_value=None, exc_frame=None): 

2081 if self.fh is None: 

2082 return 

2083 # we need an empty marker list at end to avoid trailing commas 

2084 self.fh.write("[]") 

2085 # and close the surrounding list 

2086 self.fh.write("\n]") 

2087 self.fh.close() 

2088 if not self.count: 

2089 os.remove(self.fh.name) 

2090 self.fh = None 

2091 return False 

2092 

2093 def add(self, keys): 

2094 self.count += len(keys) 

2095 if self.fh is None: 

2096 return 

2097 self.fh.write(dumps(keys)) 

2098 self.fh.write(",\n") 

2099 

2100 

2101class ScanBucket(BucketActionBase): 

2102 

2103 permissions = ("s3:ListBucket",) 

2104 

2105 bucket_ops = { 

2106 'standard': { 

2107 'iterator': 'list_objects', 

2108 'contents_key': ['Contents'], 

2109 'key_processor': 'process_key' 

2110 }, 

2111 'versioned': { 

2112 'iterator': 'list_object_versions', 

2113 'contents_key': ['Versions'], 

2114 'key_processor': 'process_version' 

2115 } 

2116 } 

2117 

2118 def __init__(self, data, manager=None): 

2119 super(ScanBucket, self).__init__(data, manager) 

2120 self.denied_buckets = set() 

2121 
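
    # Buckets with versioning Enabled or Suspended are walked with the
    # 'versioned' ops (list_object_versions); all others use 'standard'
    # (list_objects).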

2122 def get_bucket_style(self, b): 

2123 return ( 

2124 b.get('Versioning', {'Status': ''}).get('Status') in ( 

2125 'Enabled', 'Suspended') and 'versioned' or 'standard') 

2126 

2127 def get_bucket_op(self, b, op_name): 

2128 bucket_style = self.get_bucket_style(b) 

2129 op = self.bucket_ops[bucket_style][op_name] 

2130 if op_name == 'key_processor': 

2131 return getattr(self, op) 

2132 return op 

2133 

2134 def get_keys(self, b, key_set): 

2135 content_keys = self.get_bucket_op(b, 'contents_key') 

2136 keys = [] 

2137 for ck in content_keys: 

2138 keys.extend(key_set.get(ck, [])) 

2139 return keys 

2140 

2141 def process(self, buckets): 

2142 results = self._process_with_futures(self.process_bucket, buckets) 

2143 self.write_denied_buckets_file() 

2144 return results 

2145 

2146 def _process_with_futures(self, helper, buckets, max_workers=3): 

2147 results = [] 

2148 with self.executor_factory(max_workers) as w: 

2149 futures = {} 

2150 for b in buckets: 

2151 futures[w.submit(helper, b)] = b 

2152 for f in as_completed(futures): 

2153 if f.exception(): 

2154 b = futures[f] 

2155 self.log.error( 

2156 "Error on bucket:%s region:%s policy:%s error: %s", 

2157 b['Name'], b.get('Location', 'unknown'), 

2158 self.manager.data.get('name'), f.exception()) 

2159 self.denied_buckets.add(b['Name']) 

2160 continue 

2161 result = f.result() 

2162 if result: 

2163 results.append(result) 

2164 return results 

2165 

2166 def write_denied_buckets_file(self): 

2167 if (self.denied_buckets and 

2168 self.manager.ctx.log_dir and 

2169 not isinstance(self.manager.ctx.output, NullBlobOutput)): 

2170 with open( 

2171 os.path.join( 

2172 self.manager.ctx.log_dir, 'denied.json'), 'w') as fh: 

2173 json.dump(list(self.denied_buckets), fh, indent=2) 

2174 self.denied_buckets = set() 

2175 

2176 def process_bucket(self, b): 

2177 log.info( 

2178 "Scanning bucket:%s visitor:%s style:%s" % ( 

2179 b['Name'], self.__class__.__name__, self.get_bucket_style(b))) 

2180 

2181 s = self.manager.session_factory() 

2182 s3 = bucket_client(s, b) 

2183 

2184 # The bulk of the _process_bucket function executes inline in the

2185 # calling thread/worker context; neither the paginator nor the

2186 # bucket scan log should be used across a worker boundary.

2187 p = s3.get_paginator( 

2188 self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name']) 

2189 

2190 with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log: 

2191 with self.executor_factory(max_workers=10) as w: 

2192 try: 

2193 return self._process_bucket(b, p, key_log, w) 

2194 except ClientError as e: 

2195 if e.response['Error']['Code'] == 'NoSuchBucket': 

2196 log.warning( 

2197 "Bucket:%s removed while scanning" % b['Name']) 

2198 return 

2199 if e.response['Error']['Code'] == 'AccessDenied': 

2200 log.warning( 

2201 "Access Denied Bucket:%s while scanning" % b['Name']) 

2202 self.denied_buckets.add(b['Name']) 

2203 return 

2204 log.exception( 

2205 "Error processing bucket:%s paginator:%s" % ( 

2206 b['Name'], p)) 

2207 

2208 __call__ = process_bucket 

2209 

2210 def _process_bucket(self, b, p, key_log, w): 

2211 count = 0 

2212 

2213 for key_set in p: 

2214 keys = self.get_keys(b, key_set) 

2215 count += len(keys) 

2216 futures = [] 

2217 

2218 for batch in chunks(keys, size=100): 

2219 if not batch: 

2220 continue 

2221 futures.append(w.submit(self.process_chunk, batch, b)) 

2222 

2223 for f in as_completed(futures): 

2224 if f.exception(): 

2225 log.exception("Exception Processing bucket:%s key batch %s" % ( 

2226 b['Name'], f.exception())) 

2227 continue 

2228 r = f.result() 

2229 if r: 

2230 key_log.add(r) 

2231 

2232 # Log completion at info level, progress at debug level 

2233 if key_set['IsTruncated']: 

2234 log.debug('Scan progress bucket:%s keys:%d remediated:%d ...', 

2235 b['Name'], count, key_log.count) 

2236 else: 

2237 log.info('Scan Complete bucket:%s keys:%d remediated:%d', 

2238 b['Name'], count, key_log.count) 

2239 

2240 b['KeyScanCount'] = count 

2241 b['KeyRemediated'] = key_log.count 

2242 return { 

2243 'Bucket': b['Name'], 'Remediated': key_log.count, 'Count': count} 

2244 

2245 def process_chunk(self, batch, bucket): 

2246 raise NotImplementedError() 

2247 

2248 def process_key(self, s3, key, bucket_name, info=None): 

2249 raise NotImplementedError() 

2250 

2251 def process_version(self, s3, bucket, key): 

2252 raise NotImplementedError() 

2253 

2254 

2255@actions.register('encrypt-keys') 

2256class EncryptExtantKeys(ScanBucket): 

2257 """Action to encrypt unencrypted S3 objects 

2258 

2259 :example: 

2260 

2261 .. code-block:: yaml 

2262 

2263 policies: 

2264 - name: s3-encrypt-objects 

2265 resource: s3 

2266 actions: 

2267 - type: encrypt-keys 

2268 crypto: aws:kms 

2269 key-id: 9c3983be-c6cf-11e6-9d9d-cec0c932ce01 

2270 """ 

2271 

2272 permissions = ( 

2273 "s3:GetObject", 

2274 "s3:PutObject", 

2275 "s3:DeleteObjectVersion", 

2276 "s3:RestoreObject", 

2277 ) + ScanBucket.permissions 

2278 

2279 schema = { 

2280 'type': 'object', 

2281 'additionalProperties': False, 

2282 'properties': { 

2283 'type': {'enum': ['encrypt-keys']}, 

2284 'report-only': {'type': 'boolean'}, 

2285 'glacier': {'type': 'boolean'}, 

2286 'large': {'type': 'boolean'}, 

2287 'crypto': {'enum': ['AES256', 'aws:kms']}, 

2288 'key-id': {'type': 'string'} 

2289 }, 

2290 'dependencies': { 

2291 'key-id': { 

2292 'properties': { 

2293 'crypto': {'pattern': 'aws:kms'} 

2294 }, 

2295 'required': ['crypto'] 

2296 } 

2297 } 

2298 } 

2299 

2300 metrics = [ 

2301 ('Total Keys', {'Scope': 'Account'}), 

2302 ('Unencrypted', {'Scope': 'Account'})] 

2303 

2304 def __init__(self, data, manager=None): 

2305 super(EncryptExtantKeys, self).__init__(data, manager) 

2306 self.kms_id = self.data.get('key-id') 

2307 

2308 def get_permissions(self): 

2309 perms = ("s3:GetObject", "s3:GetObjectVersion") 

2310 if self.data.get('report-only'): 

2311 perms += ('s3:DeleteObject', 's3:DeleteObjectVersion', 

2312 's3:PutObject', 

2313 's3:AbortMultipartUpload', 

2314 's3:ListBucket', 

2315 's3:ListBucketVersions') 

2316 return perms 

2317 

2318 def process(self, buckets): 

2319 

2320 t = time.time() 

2321 results = super(EncryptExtantKeys, self).process(buckets) 

2322 run_time = time.time() - t 

2323 remediated_count = object_count = 0 

2324 

2325 for r in results: 

2326 object_count += r['Count'] 

2327 remediated_count += r['Remediated'] 

2328 self.manager.ctx.metrics.put_metric( 

2329 "Unencrypted", r['Remediated'], "Count", Scope=r['Bucket'], 

2330 buffer=True) 

2331 

2332 self.manager.ctx.metrics.put_metric( 

2333 "Unencrypted", remediated_count, "Count", Scope="Account", 

2334 buffer=True 

2335 ) 

2336 self.manager.ctx.metrics.put_metric( 

2337 "Total Keys", object_count, "Count", Scope="Account", 

2338 buffer=True 

2339 ) 

2340 self.manager.ctx.metrics.flush() 

2341 

2342 log.info( 

2343 ("EncryptExtant Complete keys:%d " 

2344 "remediated:%d rate:%0.2f/s time:%0.2fs"), 

2345 object_count, 

2346 remediated_count, 

2347 float(object_count) / run_time if run_time else 0, 

2348 run_time) 

2349 return results 

2350 

2351 def process_chunk(self, batch, bucket): 

2352 crypto_method = self.data.get('crypto', 'AES256') 

2353 s3 = bucket_client( 

2354 local_session(self.manager.session_factory), bucket, 

2355 kms=(crypto_method == 'aws:kms')) 

2356 b = bucket['Name'] 

2357 results = [] 

2358 key_processor = self.get_bucket_op(bucket, 'key_processor') 

2359 for key in batch: 

2360 r = key_processor(s3, key, b) 

2361 if r: 

2362 results.append(r) 

2363 return results 

2364 

2365 def process_key(self, s3, key, bucket_name, info=None): 

2366 k = key['Key'] 

2367 if info is None: 

2368 info = s3.head_object(Bucket=bucket_name, Key=k) 

2369 

2370 # If the data is already encrypted with AES256 and this request is also 

2371 # for AES256 then we don't need to do anything 

2372 if info.get('ServerSideEncryption') == 'AES256' and not self.kms_id: 

2373 return False 

2374 

2375 if info.get('ServerSideEncryption') == 'aws:kms': 

2376 # If we're not looking for a specific key any key will do. 

2377 if not self.kms_id: 

2378 return False 

2379 # If we're configured to use a specific key and the key matches, there is

2380 # nothing to do; note this is not a strict equality match.

2381 if self.kms_id in info.get('SSEKMSKeyId', ''): 

2382 return False 

2383 

2384 if self.data.get('report-only'): 

2385 return k 

2386 

2387 storage_class = info.get('StorageClass', 'STANDARD') 

2388 

2389 if storage_class == 'GLACIER': 

2390 if not self.data.get('glacier'): 

2391 return False 

2392 if 'Restore' not in info: 

2393 # This takes multiple hours, we let the next c7n 

2394 # run take care of followups. 

2395 s3.restore_object( 

2396 Bucket=bucket_name, 

2397 Key=k, 

2398 RestoreRequest={'Days': 30}) 

2399 return False 

2400 elif not restore_complete(info['Restore']): 

2401 return False 

2402 

2403 storage_class = 'STANDARD' 

2404 

2405 crypto_method = self.data.get('crypto', 'AES256') 

2406 key_id = self.data.get('key-id') 

2407 # Note on copy we lose individual object acl grants 

2408 params = {'Bucket': bucket_name, 

2409 'Key': k, 

2410 'CopySource': "/%s/%s" % (bucket_name, k), 

2411 'MetadataDirective': 'COPY', 

2412 'StorageClass': storage_class, 

2413 'ServerSideEncryption': crypto_method} 

2414 

2415 if key_id and crypto_method == 'aws:kms': 

2416 params['SSEKMSKeyId'] = key_id 

2417 

2418 if info['ContentLength'] > MAX_COPY_SIZE and self.data.get( 

2419 'large', True): 

2420 return self.process_large_file(s3, bucket_name, key, info, params) 

2421 

2422 s3.copy_object(**params) 

2423 return k 

2424 

2425 def process_version(self, s3, key, bucket_name): 

2426 info = s3.head_object( 

2427 Bucket=bucket_name, 

2428 Key=key['Key'], 

2429 VersionId=key['VersionId']) 

2430 

2431 if 'ServerSideEncryption' in info: 

2432 return False 

2433 

2434 if self.data.get('report-only'): 

2435 return key['Key'], key['VersionId'] 

2436 

2437 if key['IsLatest']: 

2438 r = self.process_key(s3, key, bucket_name, info) 

2439 # Glacier request processing, wait till we have the restored object 

2440 if not r: 

2441 return r 

2442 s3.delete_object( 

2443 Bucket=bucket_name, 

2444 Key=key['Key'], 

2445 VersionId=key['VersionId']) 

2446 return key['Key'], key['VersionId'] 

2447 

2448 def process_large_file(self, s3, bucket_name, key, info, params): 

2449 """For objects over MAX_COPY_SIZE (2 GiB here), use multipart upload to copy"""

2450 part_size = MAX_COPY_SIZE - (1024 ** 2) 

2451 num_parts = int(math.ceil(info['ContentLength'] / part_size)) 

2452 source = params.pop('CopySource') 

2453 

2454 params.pop('MetadataDirective') 

2455 if 'Metadata' in info: 

2456 params['Metadata'] = info['Metadata'] 

2457 

2458 upload_id = s3.create_multipart_upload(**params)['UploadId'] 

2459 

2460 params = {'Bucket': bucket_name, 

2461 'Key': key['Key'], 

2462 'UploadId': upload_id, 

2463 'CopySource': source, 

2464 'CopySourceIfMatch': info['ETag']} 

2465 

2466 def upload_part(part_num): 

2467 part_params = dict(params) 

2468 part_params['CopySourceRange'] = "bytes=%d-%d" % ( 

2469 part_size * (part_num - 1), 

2470 min(part_size * part_num - 1, info['ContentLength'] - 1)) 

2471 part_params['PartNumber'] = part_num 

2472 response = s3.upload_part_copy(**part_params) 

2473 return {'ETag': response['CopyPartResult']['ETag'], 

2474 'PartNumber': part_num} 

2475 
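
        # Worked example (illustrative sizes): with ~2 GiB parts, a 5 GiB object
        # copies as part 1 bytes=0-2146435071, part 2 bytes=2146435072-4292870143,
        # and part 3 the remaining bytes up to ContentLength - 1.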

2476 try: 

2477 with self.executor_factory(max_workers=2) as w: 

2478 parts = list(w.map(upload_part, range(1, num_parts + 1))) 

2479 except Exception: 

2480 log.warning( 

2481 "Error during large key copy bucket: %s key: %s, " 

2482 "aborting upload", bucket_name, key, exc_info=True) 

2483 s3.abort_multipart_upload( 

2484 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id) 

2485 raise 

2486 s3.complete_multipart_upload( 

2487 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id, 

2488 MultipartUpload={'Parts': parts}) 

2489 return key['Key'] 

2490 

2491 

2492def restore_complete(restore): 

2493 if ',' in restore: 

2494 ongoing, _ = restore.split(',', 1) 

2495 else: 

2496 ongoing = restore 

2497 return 'false' in ongoing 
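
# For reference, the head_object Restore field this parses typically looks like
# 'ongoing-request="true"' while a restore is in flight, or
# 'ongoing-request="false", expiry-date="Fri, 21 Dec 2012 00:00:00 GMT"' once
# the object is available, so checking the leading clause for 'false' suffices.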

2498 

2499 

2500@filters.register('is-log-target') 

2501class LogTarget(Filter): 

2502 """Filter and return buckets that are log destinations.

2503 

2504 Not suitable for use in lambda on large accounts; this is an API

2505 heavy process that scans all possible log sources.

2506 

2507 Sources: 

2508 - elb (Access Log) 

2509 - s3 (Access Log) 

2510 - cfn (Template writes) 

2511 - cloudtrail 

2512 

2513 :example: 

2514 

2515 .. code-block:: yaml 

2516 

2517 policies: 

2518 - name: s3-log-bucket 

2519 resource: s3 

2520 filters: 

2521 - type: is-log-target 

2522 """ 

2523 

2524 schema = type_schema( 

2525 'is-log-target', 

2526 services={'type': 'array', 'items': {'enum': [ 

2527 's3', 'elb', 'cloudtrail']}}, 

2528 self={'type': 'boolean'}, 

2529 value={'type': 'boolean'}) 

2530 

2531 def get_permissions(self): 

2532 perms = self.manager.get_resource_manager('elb').get_permissions() 

2533 perms += ('elasticloadbalancing:DescribeLoadBalancerAttributes',) 

2534 return perms 

2535 

2536 def process(self, buckets, event=None): 

2537 log_buckets = set() 

2538 count = 0 

2539 

2540 services = self.data.get('services', ['elb', 's3', 'cloudtrail']) 

2541 self_log = self.data.get('self', False) 

2542 

2543 if 'elb' in services and not self_log: 

2544 for bucket, _ in self.get_elb_bucket_locations(): 

2545 log_buckets.add(bucket) 

2546 count += 1 

2547 self.log.debug("Found %d elb log targets" % count) 

2548 

2549 if 's3' in services: 

2550 count = 0 

2551 for bucket, _ in self.get_s3_bucket_locations(buckets, self_log): 

2552 count += 1 

2553 log_buckets.add(bucket) 

2554 self.log.debug('Found %d s3 log targets' % count) 

2555 

2556 if 'cloudtrail' in services and not self_log: 

2557 for bucket, _ in self.get_cloud_trail_locations(buckets): 

2558 log_buckets.add(bucket) 

2559 

2560 self.log.info("Found %d log targets for %d buckets" % ( 

2561 len(log_buckets), len(buckets))) 

2562 if self.data.get('value', True): 

2563 return [b for b in buckets if b['Name'] in log_buckets] 

2564 else: 

2565 return [b for b in buckets if b['Name'] not in log_buckets] 

2566 

2567 @staticmethod 

2568 def get_s3_bucket_locations(buckets, self_log=False): 

2569 """return (bucket_name, prefix) for all s3 logging targets""" 

2570 for b in buckets: 

2571 if b.get('Logging'): 

2572 if self_log: 

2573 if b['Name'] != b['Logging']['TargetBucket']: 

2574 continue 

2575 yield (b['Logging']['TargetBucket'], 

2576 b['Logging']['TargetPrefix']) 

2577 if not self_log and b['Name'].startswith('cf-templates-'): 

2578 yield (b['Name'], '') 

2579 

2580 def get_cloud_trail_locations(self, buckets): 

2581 session = local_session(self.manager.session_factory) 

2582 client = session.client('cloudtrail') 

2583 names = {b['Name'] for b in buckets} 

2584 for t in client.describe_trails().get('trailList', ()): 

2585 if t.get('S3BucketName') in names: 

2586 yield (t['S3BucketName'], t.get('S3KeyPrefix', '')) 

2587 

2588 def get_elb_bucket_locations(self): 

2589 elbs = self.manager.get_resource_manager('elb').resources() 

2590 get_elb_attrs = functools.partial( 

2591 _query_elb_attrs, self.manager.session_factory) 

2592 

2593 with self.executor_factory(max_workers=2) as w: 

2594 futures = [] 

2595 for elb_set in chunks(elbs, 100): 

2596 futures.append(w.submit(get_elb_attrs, elb_set)) 

2597 for f in as_completed(futures): 

2598 if f.exception(): 

2599 log.error("Error while scanning elb log targets: %s" % ( 

2600 f.exception())) 

2601 continue 

2602 for tgt in f.result(): 

2603 yield tgt 

2604 

2605 

2606def _query_elb_attrs(session_factory, elb_set): 

2607 session = local_session(session_factory) 

2608 client = session.client('elb') 

2609 log_targets = [] 

2610 for e in elb_set: 

2611 try: 

2612 attrs = client.describe_load_balancer_attributes( 

2613 LoadBalancerName=e['LoadBalancerName'])[ 

2614 'LoadBalancerAttributes'] 

2615 if 'AccessLog' in attrs and attrs['AccessLog']['Enabled']: 

2616 log_targets.append(( 

2617 attrs['AccessLog']['S3BucketName'], 

2618 attrs['AccessLog']['S3BucketPrefix'])) 

2619 except Exception as err: 

2620 log.warning( 

2621 "Could not retrieve load balancer %s: %s" % ( 

2622 e['LoadBalancerName'], err)) 

2623 return log_targets 

2624 

2625 

2626@actions.register('remove-website-hosting') 

2627class RemoveWebsiteHosting(BucketActionBase): 

2628 """Action that removes website hosting configuration.""" 

2629 

2630 schema = type_schema('remove-website-hosting') 

2631 

2632 permissions = ('s3:DeleteBucketWebsite',) 

2633 

2634 def process(self, buckets): 

2635 session = local_session(self.manager.session_factory) 

2636 for bucket in buckets: 

2637 client = bucket_client(session, bucket) 

2638 client.delete_bucket_website(Bucket=bucket['Name']) 

2639 

2640 

2641@actions.register('delete-global-grants') 

2642class DeleteGlobalGrants(BucketActionBase): 

2643 """Deletes global grants associated with an S3 bucket

2644 

2645 :example: 

2646 

2647 .. code-block:: yaml 

2648 

2649 policies: 

2650 - name: s3-delete-global-grants 

2651 resource: s3 

2652 filters: 

2653 - type: global-grants 

2654 actions: 

2655 - delete-global-grants 

2656 """ 

2657 

2658 schema = type_schema( 

2659 'delete-global-grants', 

2660 grantees={'type': 'array', 'items': {'type': 'string'}}) 

2661 

2662 permissions = ('s3:PutBucketAcl',) 

2663 

2664 def process(self, buckets): 

2665 with self.executor_factory(max_workers=5) as w: 

2666 return list(filter(None, list(w.map(self.process_bucket, buckets)))) 

2667 

2668 def process_bucket(self, b): 

2669 grantees = self.data.get( 

2670 'grantees', [ 

2671 GlobalGrantsFilter.AUTH_ALL, GlobalGrantsFilter.GLOBAL_ALL]) 

2672 

2673 log.info(b) 

2674 

2675 acl = b.get('Acl', {'Grants': []}) 

2676 if not acl or not acl['Grants']: 

2677 return 

2678 new_grants = [] 

2679 for grant in acl['Grants']: 

2680 grantee = grant.get('Grantee', {}) 

2681 if not grantee: 

2682 continue 

2683 # Yuck, 'get_bucket_acl' doesn't return the grantee type. 

2684 if 'URI' in grantee: 

2685 grantee['Type'] = 'Group' 

2686 else: 

2687 grantee['Type'] = 'CanonicalUser' 

2688 if ('URI' in grantee and 

2689 grantee['URI'] in grantees and not 

2690 (grant['Permission'] == 'READ' and b['Website'])): 

2691 # Remove this grantee. 

2692 pass 

2693 else: 

2694 new_grants.append(grant) 

2695 

2696 log.info({'Owner': acl['Owner'], 'Grants': new_grants}) 

2697 

2698 c = bucket_client(self.manager.session_factory(), b) 

2699 try: 

2700 c.put_bucket_acl( 

2701 Bucket=b['Name'], 

2702 AccessControlPolicy={ 

2703 'Owner': acl['Owner'], 'Grants': new_grants}) 

2704 except ClientError as e: 

2705 if e.response['Error']['Code'] == 'NoSuchBucket': 

2706 return 

2707 return b 

2708 

2709 

2710@actions.register('tag') 

2711class BucketTag(Tag): 

2712 """Action to create tags on an S3 bucket

2713 

2714 :example: 

2715 

2716 .. code-block:: yaml 

2717 

2718 policies: 

2719 - name: s3-tag-region 

2720 resource: s3 

2721 region: us-east-1 

2722 filters: 

2723 - "tag:RegionName": absent 

2724 actions: 

2725 - type: tag 

2726 key: RegionName 

2727 value: us-east-1 

2728 """ 

2729 

2730 def process_resource_set(self, client, resource_set, tags): 

2731 modify_bucket_tags(self.manager.session_factory, resource_set, tags) 

2732 

2733 

2734@actions.register('mark-for-op') 

2735class MarkBucketForOp(TagDelayedAction): 

2736 """Action schedules custodian to perform an action at a certain date 

2737 

2738 :example: 

2739 

2740 .. code-block:: yaml 

2741 

2742 policies: 

2743 - name: s3-encrypt 

2744 resource: s3 

2745 filters: 

2746 - type: missing-statement 

2747 statement_ids: 

2748 - RequiredEncryptedPutObject 

2749 actions: 

2750 - type: mark-for-op 

2751 op: attach-encrypt 

2752 days: 7 

2753 """ 

2754 

2755 schema = type_schema( 

2756 'mark-for-op', rinherit=TagDelayedAction.schema) 

2757 

2758 

2759@actions.register('unmark') 

2760@actions.register('remove-tag') 

2761class RemoveBucketTag(RemoveTag): 

2762 """Removes tag/tags from an S3 bucket

2763 

2764 :example: 

2765 

2766 .. code-block:: yaml 

2767 

2768 policies: 

2769 - name: s3-remove-owner-tag 

2770 resource: s3 

2771 filters: 

2772 - "tag:BucketOwner": present 

2773 actions: 

2774 - type: remove-tag 

2775 tags: ['BucketOwner'] 

2776 """ 

2777 

2778 def process_resource_set(self, client, resource_set, tags): 

2779 modify_bucket_tags( 

2780 self.manager.session_factory, resource_set, remove_tags=tags) 

2781 

2782 

2783@filters.register('data-events') 

2784class DataEvents(Filter): 

2785 """Find buckets for which CloudTrail is logging data events. 

2786 

2787 Note that this filter only examines trails that are defined in the 

2788 current account. 
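
    A minimal illustrative policy matching buckets whose object-level events
    are captured by a trail in this account:

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-data-events-logged
                resource: s3
                filters:
                  - type: data-events
                    state: present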

2789 """ 

2790 

2791 schema = type_schema('data-events', state={'enum': ['present', 'absent']}) 

2792 permissions = ( 

2793 'cloudtrail:DescribeTrails', 

2794 'cloudtrail:GetEventSelectors') 

2795 

2796 def get_event_buckets(self, session, trails): 

2797 """Return a mapping of bucket name to cloudtrail. 

2798 

2799 For wildcard trails the bucket name is ''. 

2800 """ 

2801 regions = {t.get('HomeRegion') for t in trails} 

2802 clients = {} 

2803 for region in regions: 

2804 clients[region] = session.client('cloudtrail', region_name=region) 

2805 

2806 event_buckets = {} 

2807 for t in trails: 

2808 for events in clients[t.get('HomeRegion')].get_event_selectors( 

2809 TrailName=t['Name']).get('EventSelectors', ()): 

2810 if 'DataResources' not in events: 

2811 continue 

2812 for data_events in events['DataResources']: 

2813 if data_events['Type'] != 'AWS::S3::Object': 

2814 continue 

2815 for b in data_events['Values']: 

2816 event_buckets[b.rsplit(':')[-1].strip('/')] = t['Name'] 

2817 return event_buckets 

2818 

2819 def process(self, resources, event=None): 

2820 trails = self.manager.get_resource_manager('cloudtrail').resources() 

2821 local_trails = self.filter_resources( 

2822 trails, 

2823 "split(':', TrailARN)[4]", (self.manager.account_id,) 

2824 ) 

2825 session = local_session(self.manager.session_factory) 

2826 event_buckets = self.get_event_buckets(session, local_trails) 

2827 ops = { 

2828 'present': lambda x: ( 

2829 x['Name'] in event_buckets or '' in event_buckets), 

2830 'absent': ( 

2831 lambda x: x['Name'] not in event_buckets and '' 

2832 not in event_buckets)} 

2833 

2834 op = ops[self.data.get('state', 'present')] 

2835 results = [] 

2836 for b in resources: 

2837 if op(b): 

2838 results.append(b) 

2839 return results 

2840 

2841 

2842@filters.register('inventory') 

2843class Inventory(ValueFilter): 

2844 """Filter inventories for a bucket""" 
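
    # Illustrative usage (the inventory Id below is hypothetical); each
    # configured inventory is evaluated against the value filter expression:
    #
    #   policies:
    #     - name: s3-has-default-inventory
    #       resource: s3
    #       filters:
    #         - type: inventory
    #           key: Id
    #           value: c7n-default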

2845 schema = type_schema('inventory', rinherit=ValueFilter.schema) 

2846 schema_alias = False 

2847 permissions = ('s3:GetInventoryConfiguration',) 

2848 

2849 def process(self, buckets, event=None): 

2850 results = [] 

2851 with self.executor_factory(max_workers=2) as w: 

2852 futures = {} 

2853 for b in buckets: 

2854 futures[w.submit(self.process_bucket, b)] = b 

2855 

2856 for f in as_completed(futures): 

2857 b = futures[f] 

2858 if f.exception(): 

2859 b.setdefault('c7n:DeniedMethods', []).append('GetInventoryConfiguration') 

2860 self.log.error( 

2861 "Error processing bucket: %s error: %s", 

2862 b['Name'], f.exception()) 

2863 continue 

2864 if f.result(): 

2865 results.append(b) 

2866 return results 

2867 

2868 def process_bucket(self, b): 

2869 if 'c7n:inventories' not in b: 

2870 client = bucket_client(local_session(self.manager.session_factory), b) 

2871 inventories = client.list_bucket_inventory_configurations( 

2872 Bucket=b['Name']).get('InventoryConfigurationList', []) 

2873 b['c7n:inventories'] = inventories 

2874 

2875 for i in b['c7n:inventories']: 

2876 if self.match(i): 

2877 return True 

2878 

2879 

2880@actions.register('set-inventory') 

2881class SetInventory(BucketActionBase): 

2882 """Configure bucket inventories for an s3 bucket. 
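
    If no prefix is given, the destination prefix defaults to
    Inventories/{account_id}.

    A minimal illustrative policy (the destination bucket name is hypothetical):

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-set-default-inventory
                resource: s3
                actions:
                  - type: set-inventory
                    name: c7n-default
                    destination: my-inventory-bucket
                    schedule: Weekly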

2883 """ 

2884 schema = type_schema( 

2885 'set-inventory', 

2886 required=['name', 'destination'], 

2887 state={'enum': ['enabled', 'disabled', 'absent']}, 

2888 name={'type': 'string', 'description': 'Name of inventory'}, 

2889 destination={'type': 'string', 'description': 'Name of destination bucket'}, 

2890 prefix={'type': 'string', 'description': 'Destination prefix'}, 

2891 encryption={'enum': ['SSES3', 'SSEKMS']}, 

2892 key_id={'type': 'string', 'description': 'Optional Customer KMS KeyId for SSE-KMS'}, 

2893 versions={'enum': ['All', 'Current']}, 

2894 schedule={'enum': ['Daily', 'Weekly']}, 

2895 format={'enum': ['CSV', 'ORC', 'Parquet']}, 

2896 fields={'type': 'array', 'items': {'enum': [ 

2897 'Size', 'LastModifiedDate', 'StorageClass', 'ETag', 

2898 'IsMultipartUploaded', 'ReplicationStatus', 'EncryptionStatus', 

2899 'ObjectLockRetainUntilDate', 'ObjectLockMode', 'ObjectLockLegalHoldStatus', 

2900 'IntelligentTieringAccessTier', 'BucketKeyStatus', 'ChecksumAlgorithm', 

2901 'ObjectAccessControlList', 'ObjectOwner']}}) 

2902 

2903 permissions = ('s3:PutInventoryConfiguration', 's3:GetInventoryConfiguration') 

2904 

2905 def process(self, buckets): 

2906 with self.executor_factory(max_workers=2) as w: 

2907 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

2908 for future in as_completed(futures): 

2909 bucket = futures[future] 

2910 try: 

2911 future.result() 

2912 except Exception as e: 

2913 self.log.error('Message: %s Bucket: %s', e, bucket['Name']) 

2914 

2915 def process_bucket(self, b): 

2916 inventory_name = self.data.get('name') 

2917 destination = self.data.get('destination') 

2918 prefix = self.data.get('prefix', '') 

2919 schedule = self.data.get('schedule', 'Daily') 

2920 fields = self.data.get('fields', ['LastModifiedDate', 'Size']) 

2921 versions = self.data.get('versions', 'Current') 

2922 state = self.data.get('state', 'enabled') 

2923 encryption = self.data.get('encryption') 

2924 inventory_format = self.data.get('format', 'CSV') 

2925 

2926 if not prefix: 

2927 prefix = "Inventories/%s" % (self.manager.config.account_id) 

2928 

2929 client = bucket_client(local_session(self.manager.session_factory), b) 

2930 if state == 'absent': 

2931 try: 

2932 client.delete_bucket_inventory_configuration( 

2933 Bucket=b['Name'], Id=inventory_name) 

2934 except ClientError as e: 

2935 if e.response['Error']['Code'] != 'NoSuchConfiguration': 

2936 raise 

2937 return 

2938 

2939 bucket = { 

2940 'Bucket': "arn:aws:s3:::%s" % destination, 

2941 'Format': inventory_format 

2942 } 

2943 

2944 inventory = { 

2945 'Destination': { 

2946 'S3BucketDestination': bucket 

2947 }, 

2948 'IsEnabled': state == 'enabled' and True or False, 

2949 'Id': inventory_name, 

2950 'OptionalFields': fields, 

2951 'IncludedObjectVersions': versions, 

2952 'Schedule': { 

2953 'Frequency': schedule 

2954 } 

2955 } 

2956 

2957 if prefix: 

2958 bucket['Prefix'] = prefix 

2959 

2960 if encryption: 

2961 bucket['Encryption'] = {encryption: {}} 

2962 if encryption == 'SSEKMS' and self.data.get('key_id'): 

2963 bucket['Encryption'] = {encryption: { 

2964 'KeyId': self.data['key_id'] 

2965 }} 

2966 

2967 found = self.get_inventory_delta(client, inventory, b) 

2968 if found: 

2969 return 

2970 if found is False: 

2971 self.log.debug("updating bucket:%s inventory configuration id:%s", 

2972 b['Name'], inventory_name) 

2973 client.put_bucket_inventory_configuration( 

2974 Bucket=b['Name'], Id=inventory_name, InventoryConfiguration=inventory) 

2975 

2976 def get_inventory_delta(self, client, inventory, b): 

2977 inventories = client.list_bucket_inventory_configurations(Bucket=b['Name']) 

2978 found = None 

2979 for i in inventories.get('InventoryConfigurationList', []): 

2980 if i['Id'] != inventory['Id']: 

2981 continue 

2982 found = True 

2983 for k, v in inventory.items(): 

2984 if k not in i: 

2985 found = False 

2986 continue 

2987 if isinstance(v, list): 

2988 v.sort() 

2989 i[k].sort() 

2990 if i[k] != v: 

2991 found = False 

2992 return found 

2993 

2994 

2995@filters.register('intelligent-tiering') 

2996class IntelligentTiering(ListItemFilter): 

2997 """Filter for S3 buckets to look at intelligent tiering configurations 

2998 

2999 The schema to supply to the attrs follows the schema here: 

3000 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_bucket_intelligent_tiering_configurations.html 

3001 

3002 :example: 

3003 

3004 .. code-block:: yaml 

3005 

3006 policies: 

3007 - name: s3-intelligent-tiering-configuration 

3008 resource: s3 

3009 filters: 

3010 - type: intelligent-tiering 

3011 attrs: 

3012 - Status: Enabled 

3013 - Filter: 

3014 And: 

3015 Prefix: test 

3016 Tags: 

3017 - Key: Owner 

3018 Value: c7n 

3019 - Tierings: 

3020 - Days: 100 

3021 - AccessTier: ARCHIVE_ACCESS 

3022 

3023 """ 

3024 schema = type_schema( 

3025 'intelligent-tiering', 

3026 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'}, 

3027 count={'type': 'number'}, 

3028 count_op={'$ref': '#/definitions/filters_common/comparison_operators'} 

3029 ) 

3030 permissions = ('s3:GetIntelligentTieringConfiguration',) 

3031 annotation_key = "c7n:IntelligentTiering" 

3032 annotate_items = True 

3033 

3034 def __init__(self, data, manager=None): 

3035 super().__init__(data, manager) 

3036 self.data['key'] = self.annotation_key 

3037 

3038 def process(self, buckets, event=None): 

3039 with self.executor_factory(max_workers=2) as w: 

3040 futures = {w.submit(self.get_item_values, b): b for b in buckets} 

3041 for future in as_completed(futures): 

3042 b = futures[future] 

3043 if future.exception(): 

3044 self.log.error("Message: %s Bucket: %s", future.exception(), b['Name']) 

3045 continue 

3046 return super().process(buckets, event) 

3047 

3048 def get_item_values(self, b): 

3049 if self.annotation_key not in b: 

3050 client = bucket_client(local_session(self.manager.session_factory), b) 

3051 try: 

3052 int_tier_config = client.list_bucket_intelligent_tiering_configurations( 

3053 Bucket=b['Name']) 

3054 b[self.annotation_key] = int_tier_config.get( 

3055 'IntelligentTieringConfigurationList', []) 

3056 except ClientError as e: 

3057 if e.response['Error']['Code'] == 'AccessDenied': 

3058 method = 'list_bucket_intelligent_tiering_configurations' 

3059 log.warning( 

3060 "Bucket:%s unable to invoke method:%s error:%s ", 

3061 b['Name'], method, e.response['Error']['Message']) 

3062 b.setdefault('c7n:DeniedMethods', []).append(method) 

3063 return b.get(self.annotation_key) 

3064 

3065 

3066@actions.register('set-intelligent-tiering') 

3067class ConfigureIntelligentTiering(BucketActionBase): 

3068 """Action applies an intelligent tiering configuration to an S3 bucket

3069 

3070 The schema to supply to the configuration follows the schema here: 

3071 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_bucket_intelligent_tiering_configuration.html 

3072 

3073 To delete a configuration, supply State=delete with either a specific Id or Id: matched

3074 

3075 :example: 

3076 

3077 .. code-block:: yaml 

3078 

3079 policies: 

3080 - name: s3-apply-intelligent-tiering-config 

3081 resource: aws.s3 

3082 filters: 

3083 - not: 

3084 - type: intelligent-tiering 

3085 attrs: 

3086 - Status: Enabled 

3087 - Filter: 

3088 And: 

3089 Prefix: helloworld 

3090 Tags: 

3091 - Key: Hello 

3092 Value: World 

3093 - Tierings: 

3094 - Days: 123 

3095 AccessTier: ARCHIVE_ACCESS 

3096 actions: 

3097 - type: set-intelligent-tiering 

3098 Id: c7n-default 

3099 IntelligentTieringConfiguration: 

3100 Id: c7n-default 

3101 Status: Enabled 

3102 Tierings: 

3103 - Days: 149 

3104 AccessTier: ARCHIVE_ACCESS 

3105 

3106 - name: s3-delete-intelligent-tiering-configuration 

3107 resource: aws.s3 

3108 filters: 

3109 - type: intelligent-tiering 

3110 attrs: 

3111 - Status: Enabled 

3112 - Id: test-config 

3113 actions: 

3114 - type: set-intelligent-tiering 

3115 Id: test-config 

3116 State: delete 

3117 

3118 - name: s3-delete-intelligent-tiering-matched-configs 

3119 resource: aws.s3 

3120 filters: 

3121 - type: intelligent-tiering 

3122 attrs: 

3123 - Status: Enabled 

3124 - Id: test-config 

3125 actions: 

3126 - type: set-intelligent-tiering 

3127 Id: matched 

3128 State: delete 

3129 

3130 """ 

3131 

3132 annotation_key = 'c7n:ListItemMatches' 

3133 shape = 'PutBucketIntelligentTieringConfigurationRequest' 

3134 schema = { 

3135 'type': 'object', 

3136 'additionalProperties': False, 

3137 'oneOf': [ 

3138 {'required': ['type', 'Id', 'IntelligentTieringConfiguration']}, 

3139 {'required': ['type', 'Id', 'State']}], 

3140 'properties': { 

3141 'type': {'enum': ['set-intelligent-tiering']}, 

3142 'Id': {'type': 'string'}, 

3143 # delete intelligent tier configurations via state: delete 

3144 'State': {'type': 'string', 'enum': ['delete']}, 

3145 'IntelligentTieringConfiguration': {'type': 'object'} 

3146 }, 

3147 } 

3148 

3149 permissions = ('s3:PutIntelligentTieringConfiguration',) 

3150 

3151 def validate(self): 

3152 # You can have up to 1,000 S3 Intelligent-Tiering configurations per bucket. 

3153 # Hence, always use it with a filter 

3154 found = False 

3155 for f in self.manager.iter_filters(): 

3156 if isinstance(f, IntelligentTiering): 

3157 found = True 

3158 break 

3159 if not found: 

3160 raise PolicyValidationError( 

3161 '`set-intelligent-tiering` may only be used in ' 

3162 'conjunction with `intelligent-tiering` filter on %s' % (self.manager.data,)) 

3163 cfg = dict(self.data) 

3164 if 'IntelligentTieringConfiguration' in cfg: 

3165 cfg['Bucket'] = 'bucket' 

3166 cfg.pop('type') 

3167 return shape_validate( 

3168 cfg, self.shape, self.manager.resource_type.service) 

3169 

3170 def process(self, buckets): 

3171 with self.executor_factory(max_workers=3) as w: 

3172 futures = {} 

3173 

3174 for b in buckets: 

3175 futures[w.submit(self.process_bucket, b)] = b 

3176 

3177 for future in as_completed(futures): 

3178 if future.exception(): 

3179 bucket = futures[future] 

3180 self.log.error( 

3181 'error modifying bucket intelligent tiering configuration: %s\n%s', 

3182 bucket['Name'], future.exception()) 

3183 continue 

3184 

3185 def process_bucket(self, bucket): 

3186 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3187 

3188 if 'list_bucket_intelligent_tiering_configurations' in bucket.get( 

3189 'c7n:DeniedMethods', []): 

3190 log.warning("Access Denied Bucket:%s while reading intelligent tiering configurations" 

3191 % bucket['Name']) 

3192 return 

3193 

3194 if self.data.get('Id') and self.data.get('IntelligentTieringConfiguration'): 

3195 try: 

3196 s3.put_bucket_intelligent_tiering_configuration( 

3197 Bucket=bucket['Name'], Id=self.data.get( 

3198 'Id'), IntelligentTieringConfiguration=self.data.get( 

3199 'IntelligentTieringConfiguration')) 

3200 except ClientError as e: 

3201 if e.response['Error']['Code'] == 'AccessDenied': 

3202 log.warning( 

3203 "Access Denied Bucket:%s while applying intelligent tiering configuration" 

3204 % bucket['Name']) 

3205 if self.data.get('State'): 

3206 if self.data.get('Id') == 'matched': 

3207 for config in bucket.get(self.annotation_key): 

3208 self.delete_intelligent_tiering_configurations(s3, config.get('Id'), bucket) 

3209 else: 

3210 self.delete_intelligent_tiering_configurations(s3, self.data.get('Id'), bucket) 

3211 

3212 def delete_intelligent_tiering_configurations(self, s3_client, id, bucket): 

3213 try: 

3214 s3_client.delete_bucket_intelligent_tiering_configuration(Bucket=bucket['Name'], Id=id) 

3215 except ClientError as e: 

3216 if e.response['Error']['Code'] == 'AccessDenied': 

3217 log.warning( 

3218 "Access Denied Bucket:%s while deleting intelligent tiering configuration" 

3219 % bucket['Name']) 

3220 elif e.response['Error']['Code'] == 'NoSuchConfiguration': 

3221 log.warning( 

3222 "No such configuration found:%s while deleting intelligent tiering configuration" 

3223 % bucket['Name']) 

3224 

3225 

3226@actions.register('delete') 

3227class DeleteBucket(ScanBucket): 

3228 """Action deletes an S3 bucket

3229 

3230 :example: 

3231 

3232 .. code-block:: yaml 

3233 

3234 policies: 

3235 - name: delete-unencrypted-buckets 

3236 resource: s3 

3237 filters: 

3238 - type: missing-statement 

3239 statement_ids: 

3240 - RequiredEncryptedPutObject 

3241 actions: 

3242 - type: delete 

3243 remove-contents: true 

3244 """ 

3245 

3246 schema = type_schema('delete', **{'remove-contents': {'type': 'boolean'}}) 

3247 

3248 permissions = ('s3:*',) 

3249 

3250 bucket_ops = { 

3251 'standard': { 

3252 'iterator': 'list_objects', 

3253 'contents_key': ['Contents'], 

3254 'key_processor': 'process_key' 

3255 }, 

3256 'versioned': { 

3257 'iterator': 'list_object_versions', 

3258 'contents_key': ['Versions', 'DeleteMarkers'], 

3259 'key_processor': 'process_version' 

3260 } 

3261 } 

3262 

3263 def process_delete_enablement(self, b): 

3264 """Prep a bucket for deletion. 

3265 

3266 Clear out any pending multi-part uploads. 

3267 

3268 Disable versioning on the bucket, so deletes don't 

3269 generate fresh deletion markers. 

3270 """ 

3271 client = bucket_client( 

3272 local_session(self.manager.session_factory), b) 

3273 

3274 # Stop replication so we can suspend versioning 

3275 if b.get('Replication') is not None: 

3276 client.delete_bucket_replication(Bucket=b['Name']) 

3277 

3278 # Suspend versioning, so we don't get new delete markers 

3279 # as we walk and delete versions 

3280 if (self.get_bucket_style(b) == 'versioned' and b['Versioning']['Status'] == 'Enabled' and 

3281 self.data.get('remove-contents', True)): 

3282 client.put_bucket_versioning( 

3283 Bucket=b['Name'], 

3284 VersioningConfiguration={'Status': 'Suspended'}) 

3285 

3286 # Clear our multi-part uploads 

3287 uploads = client.get_paginator('list_multipart_uploads') 

3288 for p in uploads.paginate(Bucket=b['Name']): 

3289 for u in p.get('Uploads', ()): 

3290 client.abort_multipart_upload( 

3291 Bucket=b['Name'], 

3292 Key=u['Key'], 

3293 UploadId=u['UploadId']) 

3294 

3295 def process(self, buckets): 

3296 # might be worth sanity checking all our permissions 

3297 # on the bucket up front before disabling versioning/replication. 

3298 if self.data.get('remove-contents', True): 

3299 self._process_with_futures(self.process_delete_enablement, buckets) 

3300 self.empty_buckets(buckets) 

3301 

3302 results = self._process_with_futures(self.delete_bucket, buckets) 

3303 self.write_denied_buckets_file() 

3304 return results 

3305 

3306 def delete_bucket(self, b): 

3307 s3 = bucket_client(self.manager.session_factory(), b) 

3308 try: 

3309 self._run_api(s3.delete_bucket, Bucket=b['Name']) 

3310 except ClientError as e: 

3311 if e.response['Error']['Code'] == 'BucketNotEmpty': 

3312 self.log.error( 

3313 "Error while deleting bucket %s, bucket not empty" % ( 

3314 b['Name'])) 

3315 else: 

3316 raise e 

3317 

3318 def empty_buckets(self, buckets): 

3319 t = time.time() 

3320 results = super(DeleteBucket, self).process(buckets) 

3321 run_time = time.time() - t 

3322 object_count = 0 

3323 

3324 for r in results: 

3325 object_count += r['Count'] 

3326 self.manager.ctx.metrics.put_metric( 

3327 "Total Keys", object_count, "Count", Scope=r['Bucket'], 

3328 buffer=True) 

3329 self.manager.ctx.metrics.put_metric( 

3330 "Total Keys", object_count, "Count", Scope="Account", buffer=True) 

3331 self.manager.ctx.metrics.flush() 

3332 

3333 log.info( 

3334 "EmptyBucket buckets:%d Complete keys:%d rate:%0.2f/s time:%0.2fs", 

3335 len(buckets), object_count, 

3336 float(object_count) / run_time if run_time else 0, run_time) 

3337 return results 

3338 

3339 def process_chunk(self, batch, bucket): 

3340 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3341 objects = [] 

3342 for key in batch: 

3343 obj = {'Key': key['Key']} 

3344 if 'VersionId' in key: 

3345 obj['VersionId'] = key['VersionId'] 

3346 objects.append(obj) 

3347 results = s3.delete_objects( 

3348 Bucket=bucket['Name'], Delete={'Objects': objects}).get('Deleted', ()) 

3349 if self.get_bucket_style(bucket) != 'versioned': 

3350 return results 

3351 

3352 

3353@actions.register('configure-lifecycle') 

3354class Lifecycle(BucketActionBase): 

3355 """Action applies a lifecycle policy to versioned S3 buckets 

3356 

3357 The schema to supply to the rule follows the schema here: 

3358 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_lifecycle_configuration 

3359 

3360 To delete a lifecycle rule, supply Status=absent 

3361 

3362 :example: 

3363 

3364 .. code-block:: yaml 

3365 

3366 policies: 

3367 - name: s3-apply-lifecycle 

3368 resource: s3 

3369 actions: 

3370 - type: configure-lifecycle 

3371 rules: 

3372 - ID: my-lifecycle-id 

3373 Status: Enabled 

3374 Prefix: foo/ 

3375 Transitions: 

3376 - Days: 60 

3377 StorageClass: GLACIER 

3378 

3379 """ 

3380 

3381 schema = type_schema( 

3382 'configure-lifecycle', 

3383 **{ 

3384 'rules': { 

3385 'type': 'array', 

3386 'items': { 

3387 'type': 'object', 

3388 'required': ['ID', 'Status'], 

3389 'additionalProperties': False, 

3390 'properties': { 

3391 'ID': {'type': 'string'}, 

3392 # c7n intercepts `absent` 

3393 'Status': {'enum': ['Enabled', 'Disabled', 'absent']}, 

3394 'Prefix': {'type': 'string'}, 

3395 'Expiration': { 

3396 'type': 'object', 

3397 'additionalProperties': False, 

3398 'properties': { 

3399 'Date': {'type': 'string'}, # Date 

3400 'Days': {'type': 'integer'}, 

3401 'ExpiredObjectDeleteMarker': {'type': 'boolean'}, 

3402 }, 

3403 }, 

3404 'Filter': { 

3405 'type': 'object', 

3406 'minProperties': 1, 

3407 'maxProperties': 1, 

3408 'additionalProperties': False, 

3409 'properties': { 

3410 'Prefix': {'type': 'string'}, 

3411 'ObjectSizeGreaterThan': {'type': 'integer'}, 

3412 'ObjectSizeLessThan': {'type': 'integer'}, 

3413 'Tag': { 

3414 'type': 'object', 

3415 'required': ['Key', 'Value'], 

3416 'additionalProperties': False, 

3417 'properties': { 

3418 'Key': {'type': 'string'}, 

3419 'Value': {'type': 'string'}, 

3420 }, 

3421 }, 

3422 'And': { 

3423 'type': 'object', 

3424 'additionalProperties': False, 

3425 'properties': { 

3426 'Prefix': {'type': 'string'}, 

3427 'ObjectSizeGreaterThan': {'type': 'integer'}, 

3428 'ObjectSizeLessThan': {'type': 'integer'}, 

3429 'Tags': { 

3430 'type': 'array', 

3431 'items': { 

3432 'type': 'object', 

3433 'required': ['Key', 'Value'], 

3434 'additionalProperties': False, 

3435 'properties': { 

3436 'Key': {'type': 'string'}, 

3437 'Value': {'type': 'string'}, 

3438 }, 

3439 }, 

3440 }, 

3441 }, 

3442 }, 

3443 }, 

3444 }, 

3445 'Transitions': { 

3446 'type': 'array', 

3447 'items': { 

3448 'type': 'object', 

3449 'additionalProperties': False, 

3450 'properties': { 

3451 'Date': {'type': 'string'}, # Date 

3452 'Days': {'type': 'integer'}, 

3453 'StorageClass': {'type': 'string'}, 

3454 }, 

3455 }, 

3456 }, 

3457 'NoncurrentVersionTransitions': { 

3458 'type': 'array', 

3459 'items': { 

3460 'type': 'object', 

3461 'additionalProperties': False, 

3462 'properties': { 

3463 'NoncurrentDays': {'type': 'integer'}, 

3464 'NewerNoncurrentVersions': {'type': 'integer'}, 

3465 'StorageClass': {'type': 'string'}, 

3466 }, 

3467 }, 

3468 }, 

3469 'NoncurrentVersionExpiration': { 

3470 'type': 'object', 

3471 'additionalProperties': False, 

3472 'properties': { 

3473 'NoncurrentDays': {'type': 'integer'}, 

3474 'NewerNoncurrentVersions': {'type': 'integer'} 

3475 }, 

3476 }, 

3477 'AbortIncompleteMultipartUpload': { 

3478 'type': 'object', 

3479 'additionalProperties': False, 

3480 'properties': { 

3481 'DaysAfterInitiation': {'type': 'integer'}, 

3482 }, 

3483 }, 

3484 }, 

3485 }, 

3486 }, 

3487 } 

3488 ) 

3489 

3490 permissions = ('s3:GetLifecycleConfiguration', 's3:PutLifecycleConfiguration') 

3491 

3492 def process(self, buckets): 

3493 with self.executor_factory(max_workers=3) as w: 

3494 futures = {} 

3495 results = [] 

3496 

3497 for b in buckets: 

3498 futures[w.submit(self.process_bucket, b)] = b 

3499 

3500 for future in as_completed(futures): 

3501 if future.exception(): 

3502 bucket = futures[future] 

3503 self.log.error('error modifying bucket lifecycle: %s\n%s', 

3504 bucket['Name'], future.exception()) 

# `continue` added so a single failed bucket does not abort the whole action;
# calling future.result() on a failed future would otherwise re-raise here.
continue 

3505 results += filter(None, [future.result()]) 

3506 

3507 return results 

3508 

3509 def process_bucket(self, bucket): 

3510 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3511 

3512 if 'get_bucket_lifecycle_configuration' in bucket.get('c7n:DeniedMethods', []): 

3513 log.warning("Access Denied Bucket:%s while reading lifecycle" % bucket['Name']) 

3514 return 

3515 

3516 # Adjust the existing lifecycle by adding/deleting/overwriting rules as necessary 

3517 config = (bucket.get('Lifecycle') or {}).get('Rules', []) 

3518 for rule in self.data['rules']: 

3519 for index, existing_rule in enumerate(config): 

3520 if not existing_rule: 

3521 continue 

3522 if rule['ID'] == existing_rule['ID']: 

3523 if rule['Status'] == 'absent': 

3524 config[index] = None 

3525 else: 

3526 config[index] = rule 

3527 break 

3528 else: 

3529 if rule['Status'] != 'absent': 

3530 config.append(rule) 

3531 

3532 # The extra `list` conversion is required for python3 

3533 config = list(filter(None, config)) 
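# Illustrative merge behavior (editor's sketch, not from the module): given
# existing rules [{'ID': 'a', ...}, {'ID': 'b', ...}] and policy rules
# [{'ID': 'b', 'Status': 'absent'}, {'ID': 'c', 'Status': 'Enabled', ...}],
# the resulting configuration keeps 'a' unchanged, drops 'b', and appends 'c'.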

3534 

3535 try: 

3536 if not config: 

3537 s3.delete_bucket_lifecycle(Bucket=bucket['Name']) 

3538 else: 

3539 s3.put_bucket_lifecycle_configuration( 

3540 Bucket=bucket['Name'], LifecycleConfiguration={'Rules': config}) 

3541 except ClientError as e: 

3542 if e.response['Error']['Code'] == 'AccessDenied': 

3543 log.warning("Access Denied Bucket:%s while applying lifecycle" % bucket['Name']) 

3544 else: 

3545 raise e 

3546 

3547 

3548class KMSKeyResolverMixin: 

3549 """Builds a dictionary of region specific ARNs""" 

3550 

3551 def __init__(self, data, manager=None): 

3552 self.arns = dict() 

3553 self.data = data 

3554 self.manager = manager 

3555 

3556 def resolve_keys(self, buckets): 

3557 key = self.data.get('key') 

3558 if not key: 

3559 return None 

3560 

3561 regions = {get_region(b) for b in buckets} 

3562 for r in regions: 

3563 client = local_session(self.manager.session_factory).client('kms', region_name=r) 

3564 try: 

3565 key_meta = client.describe_key( 

3566 KeyId=key 

3567 ).get('KeyMetadata', {}) 

3568 key_id = key_meta.get('KeyId') 

3569 

3570 # We need a complete set of alias identifiers (names and ARNs) 

3571 # to fully evaluate bucket encryption filters. 

3572 key_aliases = client.list_aliases( 

3573 KeyId=key_id 

3574 ).get('Aliases', []) 

3575 

3576 self.arns[r] = { 

3577 'KeyId': key_id, 

3578 'Arn': key_meta.get('Arn'), 

3579 'KeyManager': key_meta.get('KeyManager'), 

3580 'Description': key_meta.get('Description'), 

3581 'Aliases': [ 

3582 alias[attr] 

3583 for alias in key_aliases 

3584 for attr in ('AliasArn', 'AliasName') 

3585 ], 

3586 } 

3587 

3588 except ClientError as e: 

3589 self.log.error('Error resolving kms ARNs for set-bucket-encryption: %s key: %s' % ( 

3590 e, self.data.get('key'))) 

3591 

3592 def get_key(self, bucket): 

3593 if 'key' not in self.data: 

3594 return None 

3595 region = get_region(bucket) 

3596 key = self.arns.get(region) 

3597 if not key: 

3598 self.log.warning('Unable to resolve key %s for bucket %s in region %s', 

3599 self.data['key'], bucket.get('Name'), region) 

3600 return key 
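# Editor's sketch (assumed values, not taken from the module): after
# resolve_keys() runs, self.arns maps each bucket region to the resolved key
# metadata, e.g.:
#   {'us-east-1': {
#       'KeyId': '1234abcd-12ab-34cd-56ef-1234567890ab',
#       'Arn': 'arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab',
#       'KeyManager': 'CUSTOMER',
#       'Description': 'example key',
#       'Aliases': ['arn:aws:kms:us-east-1:111122223333:alias/example',
#                   'alias/example']}}
# get_key() then returns the entry for the bucket's region, or None if the
# key could not be resolved there.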

3601 

3602 

3603@filters.register('bucket-encryption') 

3604class BucketEncryption(KMSKeyResolverMixin, Filter): 

3605 """Filters for S3 buckets that have bucket-encryption 

3606 

3607 :example: 

3608 

3609 .. code-block:: yaml 

3610 

3611 policies: 

3612 - name: s3-bucket-encryption-AES256 

3613 resource: s3 

3614 region: us-east-1 

3615 filters: 

3616 - type: bucket-encryption 

3617 state: True 

3618 crypto: AES256 

3619 - name: s3-bucket-encryption-KMS 

3620 resource: s3 

3621 region: us-east-1 

3622 filters: 

3623 - type: bucket-encryption 

3624 state: True 

3625 crypto: aws:kms 

3626 key: alias/some/alias/key 

3627 - name: s3-bucket-encryption-off 

3628 resource: s3 

3629 region: us-east-1 

3630 filters: 

3631 - type: bucket-encryption 

3632 state: False 

3633 - name: s3-bucket-test-bucket-key-enabled 

3634 resource: s3 

3635 region: us-east-1 

3636 filters: 

3637 - type: bucket-encryption 

3638 bucket_key_enabled: True 

3639 """ 

3640 schema = type_schema('bucket-encryption', 

3641 state={'type': 'boolean'}, 

3642 crypto={'type': 'string', 'enum': ['AES256', 'aws:kms']}, 

3643 key={'type': 'string'}, 

3644 bucket_key_enabled={'type': 'boolean'}) 

3645 

3646 permissions = ('s3:GetEncryptionConfiguration', 'kms:DescribeKey', 'kms:ListAliases') 

3647 annotation_key = 'c7n:bucket-encryption' 

3648 

3649 def validate(self): 

3650 if self.data.get('bucket_key_enabled') is not None and self.data.get('key') is not None: 

3651 raise PolicyValidationError( 

3652 f'key and bucket_key_enabled attributes cannot both be set: {self.data}' 

3653 ) 

3654 

3655 def process(self, buckets, event=None): 

3656 self.resolve_keys(buckets) 

3657 results = [] 

3658 with self.executor_factory(max_workers=2) as w: 

3659 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3660 for future in as_completed(futures): 

3661 b = futures[future] 

3662 if future.exception(): 

3663 self.log.error("Message: %s Bucket: %s", future.exception(), 

3664 b['Name']) 

3665 continue 

3666 if future.result(): 

3667 results.append(b) 

3668 return results 

3669 

3670 def process_bucket(self, b): 

3671 

3672 client = bucket_client(local_session(self.manager.session_factory), b) 

3673 rules = [] 

3674 if self.annotation_key not in b: 

3675 try: 

3676 be = client.get_bucket_encryption(Bucket=b['Name']) 

3677 be.pop('ResponseMetadata', None) 

3678 except ClientError as e: 

3679 if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError': 

3680 raise 

3681 be = {} 

3682 b[self.annotation_key] = be 

3683 else: 

3684 be = b[self.annotation_key] 

3685 

3686 rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', []) 

3687 # default `state` to True as previous impl assumed state == True 

3688 # to preserve backwards compatibility 

3689 if self.data.get('bucket_key_enabled'): 

3690 for rule in rules: 

3691 return self.filter_bucket_key_enabled(rule) 

3692 elif self.data.get('bucket_key_enabled') is False: 

3693 for rule in rules: 

3694 return not self.filter_bucket_key_enabled(rule) 

3695 

3696 if self.data.get('state', True): 

3697 for sse in rules: 

3698 return self.filter_bucket(b, sse) 

3699 return False 

3700 else: 

3701 for sse in rules: 

3702 return not self.filter_bucket(b, sse) 

3703 return True 

3704 

3705 def filter_bucket(self, b, sse): 

3706 allowed = ['AES256', 'aws:kms'] 

3707 key = self.get_key(b) 

3708 crypto = self.data.get('crypto') 

3709 rule = sse.get('ApplyServerSideEncryptionByDefault') 

3710 

3711 if not rule: 

3712 return False 

3713 algo = rule.get('SSEAlgorithm') 

3714 

3715 if not crypto and algo in allowed: 

3716 return True 

3717 

3718 if crypto == 'AES256' and algo == 'AES256': 

3719 return True 

3720 elif crypto == 'aws:kms' and algo == 'aws:kms': 

3721 if not key: 

3722 # There are two broad reasons to have an empty value for 

3723 # the regional key here: 

3724 # 

3725 # * The policy did not specify a key, in which case this 

3726 # filter should match _all_ buckets with a KMS default 

3727 # encryption rule. 

3728 # 

3729 # * The policy specified a key that could not be 

3730 # resolved, in which case this filter shouldn't match 

3731 # any buckets. 

3732 return 'key' not in self.data 

3733 

3734 # The default encryption rule can specify a key ID, 

3735 # key ARN, alias name or alias ARN. Match against any of 

3736 # those attributes. A rule specifying KMS with no master key 

3737 # implies the AWS-managed key. 

3738 key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']} 

3739 return rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids 
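# Editor's example (assumed values): a default-encryption rule such as
#   {'ApplyServerSideEncryptionByDefault': {
#        'SSEAlgorithm': 'aws:kms', 'KMSMasterKeyID': 'alias/example'}}
# matches a policy with crypto: aws:kms and key: alias/example once
# 'alias/example' appears among the resolved key's Aliases built above.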

3740 

3741 def filter_bucket_key_enabled(self, rule) -> bool: 

3742 if not rule: 

3743 return False 

3744 return rule.get('BucketKeyEnabled') 

3745 

3746 

3747@actions.register('set-bucket-encryption') 

3748class SetBucketEncryption(KMSKeyResolverMixin, BucketActionBase): 

3749 """Action enables default encryption on S3 buckets 

3750 

3751 `enabled`: boolean Optional: Defaults to True 

3752 

3753 `crypto`: `aws:kms` | `AES256` Optional: Defaults to AES256 

3754 

3755 `key`: ARN, alias, or KMS key id 

3756 

3757 `bucket-key`: boolean Optional: 

3758 Defaults to True. 

3759 Reduces the amount of API traffic from Amazon S3 to KMS and can reduce KMS request 

3760 costs by up to 99 percent. Requires kms:Decrypt permissions for copy and upload 

3761 on the AWS KMS Key Policy. 

3762 

3763 Bucket Key Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html 

3764 

3765 :example: 

3766 

3767 .. code-block:: yaml 

3768 

3769 policies: 

3770 - name: s3-enable-default-encryption-kms 

3771 resource: s3 

3772 actions: 

3773 - type: set-bucket-encryption 

3774 # enabled: true <------ optional (true by default) 

3775 crypto: aws:kms 

3776 key: 1234abcd-12ab-34cd-56ef-1234567890ab 

3777 bucket-key: true 

3778 

3779 - name: s3-enable-default-encryption-kms-alias 

3780 resource: s3 

3781 actions: 

3782 - type: set-bucket-encryption 

3783 # enabled: true <------ optional (true by default) 

3784 crypto: aws:kms 

3785 key: alias/some/alias/key 

3786 bucket-key: true 

3787 

3788 - name: s3-enable-default-encryption-aes256 

3789 resource: s3 

3790 actions: 

3791 - type: set-bucket-encryption 

3792 # bucket-key: true <--- optional (true by default for AWS SSE) 

3793 # crypto: AES256 <----- optional (AES256 by default) 

3794 # enabled: true <------ optional (true by default) 

3795 

3796 - name: s3-disable-default-encryption 

3797 resource: s3 

3798 actions: 

3799 - type: set-bucket-encryption 

3800 enabled: false 

3801 """ 

3802 

3803 schema = { 

3804 'type': 'object', 

3805 'additionalProperties': False, 

3806 'properties': { 

3807 'type': {'enum': ['set-bucket-encryption']}, 

3808 'enabled': {'type': 'boolean'}, 

3809 'crypto': {'enum': ['aws:kms', 'AES256']}, 

3810 'key': {'type': 'string'}, 

3811 'bucket-key': {'type': 'boolean'} 

3812 }, 

3813 'dependencies': { 

3814 'key': { 

3815 'properties': { 

3816 'crypto': {'pattern': 'aws:kms'} 

3817 }, 

3818 'required': ['crypto'] 

3819 } 

3820 } 

3821 } 
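# Note on the schema above: the JSON Schema 'dependencies' clause means any
# policy that supplies 'key' must also set 'crypto: aws:kms', since a KMS key
# only applies to aws:kms encryption.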

3822 

3823 permissions = ('s3:PutEncryptionConfiguration', 's3:GetEncryptionConfiguration', 

3824 'kms:ListAliases', 'kms:DescribeKey') 

3825 

3826 def process(self, buckets): 

3827 if self.data.get('enabled', True): 

3828 self.resolve_keys(buckets) 

3829 

3830 with self.executor_factory(max_workers=3) as w: 

3831 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3832 for future in as_completed(futures): 

3833 if future.exception(): 

3834 self.log.error('Message: %s Bucket: %s', future.exception(), 

3835 futures[future]['Name']) 

3836 

3837 def process_bucket(self, bucket): 

3838 default_key_desc = 'Default master key that protects my S3 objects when no other key is defined' # noqa 

3839 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3840 if not self.data.get('enabled', True): 

3841 s3.delete_bucket_encryption(Bucket=bucket['Name']) 

3842 return 

3843 algo = self.data.get('crypto', 'AES256') 

3844 

3845 # bucket key defaults to True for alias/aws/s3 and AES256 (Amazon SSE) 

3846 # and ignores False values for that crypto 

3847 bucket_key = self.data.get('bucket-key', True) 

3848 config = { 

3849 'Rules': [ 

3850 { 

3851 'ApplyServerSideEncryptionByDefault': { 

3852 'SSEAlgorithm': algo, 

3853 }, 

3854 'BucketKeyEnabled': bucket_key 

3855 } 

3856 ] 

3857 } 

3858 

3859 if algo == 'aws:kms': 

3860 key = self.get_key(bucket) 

3861 if not key: 

3862 raise Exception('Valid KMS Key required but does not exist') 

3863 

3864 config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = key['Arn'] 

3865 s3.put_bucket_encryption( 

3866 Bucket=bucket['Name'], 

3867 ServerSideEncryptionConfiguration=config 

3868 ) 

3869 

3870 

3871OWNERSHIP_CONTROLS = ['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter'] 

3872VALUE_FILTER_MAGIC_VALUES = ['absent', 'present', 'not-null', 'empty'] 

3873 

3874 

3875@filters.register('ownership') 

3876class BucketOwnershipControls(BucketFilterBase, ValueFilter): 

3877 """Filter for object ownership controls 

3878 

3879 Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html 

3880 

3881 :example: 

3882 

3883 Find buckets with ACLs disabled 

3884 

3885 .. code-block:: yaml 

3886 

3887 policies: 

3888 - name: s3-bucket-acls-disabled 

3889 resource: aws.s3 

3890 region: us-east-1 

3891 filters: 

3892 - type: ownership 

3893 value: BucketOwnerEnforced 

3894 

3895 :example: 

3896 

3897 Find buckets with object ownership preferred or enforced 

3898 

3899 .. code-block:: yaml 

3900 

3901 policies: 

3902 - name: s3-bucket-ownership-preferred 

3903 resource: aws.s3 

3904 region: us-east-1 

3905 filters: 

3906 - type: ownership 

3907 op: in 

3908 value: 

3909 - BucketOwnerEnforced 

3910 - BucketOwnerPreferred 

3911 

3912 :example: 

3913 

3914 Find buckets with no object ownership controls 

3915 

3916 .. code-block:: yaml 

3917 

3918 policies: 

3919 - name: s3-bucket-no-ownership-controls 

3920 resource: aws.s3 

3921 region: us-east-1 

3922 filters: 

3923 - type: ownership 

3924 value: empty 

3925 """ 

3926 schema = type_schema('ownership', rinherit=ValueFilter.schema, value={'oneOf': [ 

3927 {'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}, 

3928 {'type': 'array', 'items': { 

3929 'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}}]}) 

3930 permissions = ('s3:GetBucketOwnershipControls',) 

3931 annotation_key = 'c7n:ownership' 

3932 

3933 def __init__(self, data, manager=None): 

3934 super(BucketOwnershipControls, self).__init__(data, manager) 

3935 

3936 # Ownership controls appear as an array of rules. There can only be one 

3937 # ObjectOwnership rule defined for a bucket, so we can automatically 

3938 # match against that if it exists. 

3939 self.data['key'] = f'("{self.annotation_key}".Rules[].ObjectOwnership)[0]' 
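# Editor's sketch (assumed values): with an ownership annotation of
#   {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]}
# the jmespath key above resolves to 'BucketOwnerEnforced', which the
# inherited ValueFilter then compares against the policy's value.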

3940 

3941 def process(self, buckets, event=None): 

3942 with self.executor_factory(max_workers=2) as w: 

3943 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3944 for future in as_completed(futures): 

3945 b = futures[future] 

3946 if future.exception(): 

3947 self.log.error("Message: %s Bucket: %s", future.exception(), 

3948 b['Name']) 

3949 continue 

3950 return super(BucketOwnershipControls, self).process(buckets, event) 

3951 

3952 def process_bucket(self, b): 

3953 if self.annotation_key in b: 

3954 return 

3955 client = bucket_client(local_session(self.manager.session_factory), b) 

3956 try: 

3957 controls = client.get_bucket_ownership_controls(Bucket=b['Name']) 

3958 controls.pop('ResponseMetadata', None) 

3959 except ClientError as e: 

3960 if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError': 

3961 raise 

3962 controls = {} 

3963 b[self.annotation_key] = controls.get('OwnershipControls') 

3964 

3965 

3966@filters.register('bucket-replication') 

3967class BucketReplication(ListItemFilter): 

3968 """Filter for S3 buckets to look at bucket replication configurations 

3969 

3970 The schema to supply to the attrs follows the schema here: 

3971 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_replication.html 

3972 

3973 :example: 

3974 

3975 .. code-block:: yaml 

3976 

3977 policies: 

3978 - name: s3-bucket-replication 

3979 resource: s3 

3980 filters: 

3981 - type: bucket-replication 

3982 attrs: 

3983 - Status: Enabled 

3984 - Filter: 

3985 And: 

3986 Prefix: test 

3987 Tags: 

3988 - Key: Owner 

3989 Value: c7n 

3990 - ExistingObjectReplication: Enabled 

3991 

3992 """ 

3993 schema = type_schema( 

3994 'bucket-replication', 

3995 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'}, 

3996 count={'type': 'number'}, 

3997 count_op={'$ref': '#/definitions/filters_common/comparison_operators'} 

3998 ) 

3999 

4000 permissions = ("s3:GetReplicationConfiguration",) 

4001 annotation_key = 'Replication' 

4002 annotate_items = True 

4003 

4004 def __init__(self, data, manager=None): 

4005 super().__init__(data, manager) 

4006 self.data['key'] = self.annotation_key 

4007 

4008 def get_item_values(self, b): 

4009 client = bucket_client(local_session(self.manager.session_factory), b) 

4010 # replication configuration is called in S3_AUGMENT_TABLE: 

4011 bucket_replication = b.get(self.annotation_key) 

4012 

4013 rules = [] 

4014 if bucket_replication is not None: 

4015 rules = bucket_replication.get('ReplicationConfiguration', {}).get('Rules', []) 

4016 for replication in rules: 

4017 self.augment_bucket_replication(b, replication, client) 

4018 

4019 return rules 

4020 

4021 def augment_bucket_replication(self, b, replication, client): 

4022 destination_bucket = replication.get('Destination').get('Bucket').split(':')[5] 
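# The Destination Bucket value is an ARN of the form
# 'arn:aws:s3:::destination-bucket-name'; splitting on ':' therefore yields
# the bucket name at index 5.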

4023 try: 

4024 destination_region = inspect_bucket_region(destination_bucket, client.meta.endpoint_url) 

4025 except ValueError: 

4026 replication['DestinationBucketAvailable'] = False 

4027 return 

4028 source_region = get_region(b) 

4029 replication['DestinationBucketAvailable'] = True 

4030 replication['DestinationRegion'] = destination_region 

4031 replication['CrossRegion'] = destination_region != source_region 

4032 

4033 

4034@resources.register('s3-directory') 

4035class S3Directory(query.QueryResourceManager): 

4036 

4037 class resource_type(query.TypeInfo): 

4038 service = 's3' 

4039 permission_prefix = "s3express" 

4040 arn_service = "s3express" 

4041 arn_type = 'bucket' 

4042 enum_spec = ('list_directory_buckets', 'Buckets[]', None) 

4043 name = id = 'Name' 

4044 date = 'CreationDate' 

4045 dimension = 'BucketName' 

4046 cfn_type = 'AWS::S3Express::DirectoryBucket' 

4047 permissions_enum = ("s3express:ListAllMyDirectoryBuckets",)
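# Editor's usage sketch (assumption): the directory-bucket resource registered
# above can be targeted in a policy as, for example:
#
#   policies:
#     - name: list-directory-buckets
#       resource: aws.s3-directory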