Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/s3.py: 24%

1678 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3"""S3 Resource Manager 

4 

5Filters: 

6 

7The generic Value filter (jmespath expression) and Or filter are 

8available with all resources, including buckets. We include several 

9additional pieces of bucket data (Tags, Replication, Acl, Policy) as keys 

10within a bucket representation. 

11 

12Actions: 

13 

14 encrypt-keys 

15 

16 Scan all keys in a bucket and optionally encrypt them in place. 

17 

18 global-grants 

19 

20 Check bucket acls for global grants 

21 

22 encryption-policy 

23 

24 Attach an encryption-required policy to a bucket; this will break 

25 applications that are not using encryption, including AWS log 

26 delivery. 

27 
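A minimal illustrative policy (the policy name is made up) using the generic
value filter against one of those augmented keys:

.. code-block:: yaml

    policies:
      - name: s3-no-bucket-policy
        resource: s3
        filters:
          - type: value
            key: Policy
            value: absent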

28""" 

29import copy 

30import functools 

31import json 

32import itertools 

33import logging 

34import math 

35import os 

36import time 

37import ssl 

38 

39from botocore.client import Config 

40from botocore.exceptions import ClientError 

41 

42from collections import defaultdict 

43from concurrent.futures import as_completed 

44 

45try: 

46 from urllib3.exceptions import SSLError 

47except ImportError: 

48 from botocore.vendored.requests.packages.urllib3.exceptions import SSLError 

49 

50 

51from c7n.actions import ( 

52 ActionRegistry, BaseAction, PutMetric, RemovePolicyBase) 

53from c7n.exceptions import PolicyValidationError, PolicyExecutionError 

54from c7n.filters import ( 

55 FilterRegistry, Filter, CrossAccountAccessFilter, MetricsFilter, 

56 ValueFilter, ListItemFilter) 

57from .aws import shape_validate 

58from c7n.filters.policystatement import HasStatementFilter 

59from c7n.manager import resources 

60from c7n.output import NullBlobOutput 

61from c7n import query 

62from c7n.resources.securityhub import PostFinding 

63from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction 

64from c7n.utils import ( 

65 chunks, local_session, set_annotation, type_schema, filter_empty, 

66 dumps, format_string_values, get_account_alias_from_sts) 

67from c7n.resources.aws import inspect_bucket_region 

68 

69 

70log = logging.getLogger('custodian.s3') 

71 

72filters = FilterRegistry('s3.filters') 

73actions = ActionRegistry('s3.actions') 

74filters.register('marked-for-op', TagActionFilter) 

75actions.register('put-metric', PutMetric) 

76 

77MAX_COPY_SIZE = 1024 * 1024 * 1024 * 2  # 2 GiB 

78 

79 

80class DescribeS3(query.DescribeSource): 

81 

82 def augment(self, buckets): 

83 with self.manager.executor_factory( 

84 max_workers=min((10, len(buckets) + 1))) as w: 

85 results = w.map( 

86 assemble_bucket, 

87 zip(itertools.repeat(self.manager.session_factory), buckets)) 

88 results = list(filter(None, results)) 

89 return results 

90 

91 

92class ConfigS3(query.ConfigSource): 

93 

94 # normalize config's janky idiosyncratic bespoke formatting to the 

95 # standard describe API responses. 

96 

97 def get_query_params(self, query): 

98 q = super(ConfigS3, self).get_query_params(query) 

99 if 'expr' in q: 

100 q['expr'] = q['expr'].replace('select ', 'select awsRegion, ') 

101 return q 

102 

103 def load_resource(self, item): 

104 resource = super(ConfigS3, self).load_resource(item) 

105 cfg = item['supplementaryConfiguration'] 

106 # aka standard 

107 if 'awsRegion' in item and item['awsRegion'] != 'us-east-1': 

108 resource['Location'] = {'LocationConstraint': item['awsRegion']} 

109 else: 

110 resource['Location'] = {} 

111 

112 # owner is under acl per describe 

113 resource.pop('Owner', None) 

114 

115 for k, null_value in S3_CONFIG_SUPPLEMENT_NULL_MAP.items(): 

116 if k not in cfg: 

117 continue 

118 if cfg.get(k) == null_value: 

119 continue 

120 method = getattr(self, "handle_%s" % k, None) 

121 if method is None: 

122 raise ValueError("unhandled supplementary config %s" % k) 

124 v = cfg[k] 

125 if isinstance(cfg[k], str): 

126 v = json.loads(cfg[k]) 

127 method(resource, v) 

128 

129 for el in S3_AUGMENT_TABLE: 

130 if el[1] not in resource: 

131 resource[el[1]] = el[2] 

132 return resource 

133 

134 PERMISSION_MAP = { 

135 'FullControl': 'FULL_CONTROL', 

136 'Write': 'WRITE', 

137 'WriteAcp': 'WRITE_ACP', 

138 'Read': 'READ', 

139 'ReadAcp': 'READ_ACP'} 

140 

141 GRANTEE_MAP = { 

142 'AllUsers': "http://acs.amazonaws.com/groups/global/AllUsers", 

143 'AuthenticatedUsers': "http://acs.amazonaws.com/groups/global/AuthenticatedUsers", 

144 'LogDelivery': 'http://acs.amazonaws.com/groups/s3/LogDelivery'} 

145 

146 def handle_AccessControlList(self, resource, item_value): 

147 # double serialized in config for some reason 

148 if isinstance(item_value, str): 

149 item_value = json.loads(item_value) 

150 

151 resource['Acl'] = {} 

152 resource['Acl']['Owner'] = {'ID': item_value['owner']['id']} 

153 if item_value['owner']['displayName']: 

154 resource['Acl']['Owner']['DisplayName'] = item_value[ 

155 'owner']['displayName'] 

156 resource['Acl']['Grants'] = grants = [] 

157 

158 for g in (item_value.get('grantList') or ()): 

159 if 'id' not in g['grantee']: 

160 assert g['grantee'] in self.GRANTEE_MAP, "unknown grantee %s" % g 

161 rg = {'Type': 'Group', 'URI': self.GRANTEE_MAP[g['grantee']]} 

162 else: 

163 rg = {'ID': g['grantee']['id'], 'Type': 'CanonicalUser'} 

164 

165 if 'displayName' in g: 

166 rg['DisplayName'] = g['displayName'] 

167 

168 grants.append({ 

169 'Permission': self.PERMISSION_MAP[g['permission']], 

170 'Grantee': rg, 

171 }) 

172 

173 def handle_BucketAccelerateConfiguration(self, resource, item_value): 

174 # not currently auto-augmented by custodian 

175 return 

176 

177 def handle_BucketLoggingConfiguration(self, resource, item_value): 

178 if ('destinationBucketName' not in item_value or 

179 item_value['destinationBucketName'] is None): 

180 resource[u'Logging'] = {} 

181 return 

182 resource[u'Logging'] = { 

183 'TargetBucket': item_value['destinationBucketName'], 

184 'TargetPrefix': item_value['logFilePrefix']} 

185 

186 def handle_BucketLifecycleConfiguration(self, resource, item_value): 

187 rules = [] 

188 for r in item_value.get('rules'): 

189 rr = {} 

190 rules.append(rr) 

191 expiry = {} 

192 for ek, ck in ( 

193 ('Date', 'expirationDate'), 

194 ('ExpiredObjectDeleteMarker', 'expiredObjectDeleteMarker'), 

195 ('Days', 'expirationInDays')): 

196 if ck in r and r[ck] and r[ck] != -1: 

197 expiry[ek] = r[ck] 

198 if expiry: 

199 rr['Expiration'] = expiry 

200 

201 transitions = [] 

202 for t in (r.get('transitions') or ()): 

203 tr = {} 

204 for k in ('date', 'days', 'storageClass'): 

205 if t.get(k): 

206 tr["%s%s" % (k[0].upper(), k[1:])] = t[k] 

207 transitions.append(tr) 

208 if transitions: 

209 rr['Transitions'] = transitions 

210 

211 if r.get('abortIncompleteMultipartUpload'): 

212 rr['AbortIncompleteMultipartUpload'] = { 

213 'DaysAfterInitiation': r[ 

214 'abortIncompleteMultipartUpload']['daysAfterInitiation']} 

215 if r.get('noncurrentVersionExpirationInDays'): 

216 rr['NoncurrentVersionExpiration'] = { 

217 'NoncurrentDays': r['noncurrentVersionExpirationInDays']} 

218 

219 nonc_transitions = [] 

220 for t in (r.get('noncurrentVersionTransitions') or ()): 

221 nonc_transitions.append({ 

222 'NoncurrentDays': t['days'], 

223 'StorageClass': t['storageClass']}) 

224 if nonc_transitions: 

225 rr['NoncurrentVersionTransitions'] = nonc_transitions 

226 

227 rr['Status'] = r['status'] 

228 rr['ID'] = r['id'] 

229 if r.get('prefix'): 

230 rr['Prefix'] = r['prefix'] 

231 if 'filter' not in r or not r['filter']: 

232 continue 

233 

234 if r['filter']['predicate']: 

235 rr['Filter'] = self.convertLifePredicate(r['filter']['predicate']) 

236 

237 resource['Lifecycle'] = {'Rules': rules} 

238 

239 def convertLifePredicate(self, p): 

240 if p['type'] == 'LifecyclePrefixPredicate': 

241 return {'Prefix': p['prefix']} 

242 if p['type'] == 'LifecycleTagPredicate': 

243 return {'Tags': [{'Key': p['tag']['key'], 'Value': p['tag']['value']}]} 

244 if p['type'] == 'LifecycleAndOperator': 

245 n = {} 

246 for o in p['operands']: 

247 ot = self.convertLifePredicate(o) 

248 if 'Tags' in n and 'Tags' in ot: 

249 n['Tags'].extend(ot['Tags']) 

250 else: 

251 n.update(ot) 

252 return {'And': n} 

253 

254 raise ValueError("unknown predicate: %s" % p) 

255 

256 NotifyTypeMap = { 

257 'QueueConfiguration': 'QueueConfigurations', 

258 'LambdaConfiguration': 'LambdaFunctionConfigurations', 

259 'CloudFunctionConfiguration': 'LambdaFunctionConfigurations', 

260 'TopicConfiguration': 'TopicConfigurations'} 

261 

262 def handle_BucketNotificationConfiguration(self, resource, item_value): 

263 d = {} 

264 for nid, n in item_value['configurations'].items(): 

265 ninfo = {} 

266 d.setdefault(self.NotifyTypeMap[n['type']], []).append(ninfo) 

267 if n['type'] == 'QueueConfiguration': 

268 ninfo['QueueArn'] = n['queueARN'] 

269 elif n['type'] == 'TopicConfiguration': 

270 ninfo['TopicArn'] = n['topicARN'] 

271 elif n['type'] == 'LambdaConfiguration': 

272 ninfo['LambdaFunctionArn'] = n['functionARN'] 

273 ninfo['Id'] = nid 

274 ninfo['Events'] = n['events'] 

275 rules = [] 

276 if n['filter']: 

277 for r in n['filter'].get('s3KeyFilter', {}).get('filterRules', []): 

278 rules.append({'Name': r['name'], 'Value': r['value']}) 

279 if rules: 

280 ninfo['Filter'] = {'Key': {'FilterRules': rules}} 

281 resource['Notification'] = d 

282 

283 def handle_BucketReplicationConfiguration(self, resource, item_value): 

284 d = {'Role': item_value['roleARN'], 'Rules': []} 

285 for rid, r in item_value['rules'].items(): 

286 rule = { 

287 'ID': rid, 

288 'Status': r.get('status', ''), 

289 'Prefix': r.get('prefix', ''), 

290 'Destination': { 

291 'Bucket': r['destinationConfig']['bucketARN']} 

292 } 

293 if 'Account' in r['destinationConfig']: 

294 rule['Destination']['Account'] = r['destinationConfig']['Account'] 

295 if r['destinationConfig'].get('storageClass'): 

296 rule['Destination']['StorageClass'] = r['destinationConfig']['storageClass'] 

297 d['Rules'].append(rule) 

298 resource['Replication'] = {'ReplicationConfiguration': d} 

299 

300 def handle_BucketPolicy(self, resource, item_value): 

301 resource['Policy'] = item_value.get('policyText') 

302 

303 def handle_BucketTaggingConfiguration(self, resource, item_value): 

304 resource['Tags'] = [ 

305 {"Key": k, "Value": v} for k, v in item_value['tagSets'][0]['tags'].items()] 

306 

307 def handle_BucketVersioningConfiguration(self, resource, item_value): 

308 # Config defaults versioning to 'Off' for a null value 

309 if item_value['status'] not in ('Enabled', 'Suspended'): 

310 resource['Versioning'] = {} 

311 return 

312 resource['Versioning'] = {'Status': item_value['status']} 

313 # `isMfaDeleteEnabled` is an optional boolean property - the key may be absent, 

314 # present with a null value, or present with a boolean value. 

315 # Mirror the describe source by populating Versioning.MFADelete only in the 

316 # boolean case. 

317 mfa_delete = item_value.get('isMfaDeleteEnabled') 

318 if mfa_delete is None: 

319 return 

320 resource['Versioning']['MFADelete'] = ( 

321 'Enabled' if mfa_delete else 'Disabled' 

322 ) 

323 

324 def handle_BucketWebsiteConfiguration(self, resource, item_value): 

325 website = {} 

326 if item_value['indexDocumentSuffix']: 

327 website['IndexDocument'] = { 

328 'Suffix': item_value['indexDocumentSuffix']} 

329 if item_value['errorDocument']: 

330 website['ErrorDocument'] = { 

331 'Key': item_value['errorDocument']} 

332 if item_value['redirectAllRequestsTo']: 

333 website['RedirectAllRequestsTo'] = { 

334 'HostName': item_value['redirectAllRequestsTo']['hostName'], 

335 'Protocol': item_value['redirectAllRequestsTo']['protocol']} 

336 for r in item_value['routingRules']: 

337 redirect = {} 

338 rule = {'Redirect': redirect} 

339 website.setdefault('RoutingRules', []).append(rule) 

340 if 'condition' in r: 

341 cond = {} 

342 for ck, rk in ( 

343 ('keyPrefixEquals', 'KeyPrefixEquals'), 

344 ('httpErrorCodeReturnedEquals', 

345 'HttpErrorCodeReturnedEquals')): 

346 if r['condition'][ck]: 

347 cond[rk] = r['condition'][ck] 

348 rule['Condition'] = cond 

349 for ck, rk in ( 

350 ('protocol', 'Protocol'), 

351 ('hostName', 'HostName'), 

352 ('replaceKeyPrefixWith', 'ReplaceKeyPrefixWith'), 

353 ('replaceKeyWith', 'ReplaceKeyWith'), 

354 ('httpRedirectCode', 'HttpRedirectCode')): 

355 if r['redirect'][ck]: 

356 redirect[rk] = r['redirect'][ck] 

357 resource['Website'] = website 

358 

359 

360@resources.register('s3') 

361class S3(query.QueryResourceManager): 
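    """AWS S3 bucket resource.

    Bucket records are augmented with extra metadata (see S3_AUGMENT_TABLE),
    so generic filters can match on keys such as Tags, Policy, Acl,
    Versioning, Logging, Notification, Lifecycle, Replication, and Website.
    """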

362 

363 class resource_type(query.TypeInfo): 

364 service = 's3' 

365 arn_type = '' 

366 enum_spec = ('list_buckets', 'Buckets[]', None) 

367 # not used but we want some consistency on the metadata 

368 detail_spec = ('get_bucket_location', 'Bucket', 'Name', 'LocationConstraint') 

369 permissions_augment = ( 

370 "s3:GetBucketAcl", 

371 "s3:GetBucketLocation", 

372 "s3:GetBucketPolicy", 

373 "s3:GetBucketTagging", 

374 "s3:GetBucketVersioning", 

375 "s3:GetBucketLogging", 

376 "s3:GetBucketNotification", 

377 "s3:GetBucketWebsite", 

378 "s3:GetLifecycleConfiguration", 

379 "s3:GetReplicationConfiguration" 

380 ) 

381 name = id = 'Name' 

382 date = 'CreationDate' 

383 dimension = 'BucketName' 

384 cfn_type = config_type = 'AWS::S3::Bucket' 

385 

386 filter_registry = filters 

387 action_registry = actions 

388 source_mapping = { 

389 'describe': DescribeS3, 

390 'config': ConfigS3 

391 } 

392 

393 def get_arns(self, resources): 

394 return ["arn:aws:s3:::{}".format(r["Name"]) for r in resources] 

395 

396 @classmethod 

397 def get_permissions(cls): 

398 perms = ["s3:ListAllMyBuckets"] 

399 perms.extend([n[-1] for n in S3_AUGMENT_TABLE]) 

400 return perms 

401 

402 

403S3_CONFIG_SUPPLEMENT_NULL_MAP = { 

404 'BucketLoggingConfiguration': u'{"destinationBucketName":null,"logFilePrefix":null}', 

405 'BucketPolicy': u'{"policyText":null}', 

406 'BucketVersioningConfiguration': u'{"status":"Off","isMfaDeleteEnabled":null}', 

407 'BucketAccelerateConfiguration': u'{"status":null}', 

408 'BucketNotificationConfiguration': u'{"configurations":{}}', 

409 'BucketLifecycleConfiguration': None, 

410 'AccessControlList': None, 

411 'BucketTaggingConfiguration': None, 

412 'BucketWebsiteConfiguration': None, 

413 'BucketReplicationConfiguration': None 

414} 

415 
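# Each entry below is (client method, annotation key, default value,
# response selector, IAM permission); assemble_bucket consumes the first
# four and S3.get_permissions the last.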

416S3_AUGMENT_TABLE = ( 

417 ('get_bucket_location', 'Location', {}, None, 's3:GetBucketLocation'), 

418 ('get_bucket_tagging', 'Tags', [], 'TagSet', 's3:GetBucketTagging'), 

419 ('get_bucket_policy', 'Policy', None, 'Policy', 's3:GetBucketPolicy'), 

420 ('get_bucket_acl', 'Acl', None, None, 's3:GetBucketAcl'), 

421 ('get_bucket_replication', 

422 'Replication', None, None, 's3:GetReplicationConfiguration'), 

423 ('get_bucket_versioning', 'Versioning', None, None, 's3:GetBucketVersioning'), 

424 ('get_bucket_website', 'Website', None, None, 's3:GetBucketWebsite'), 

425 ('get_bucket_logging', 'Logging', None, 'LoggingEnabled', 's3:GetBucketLogging'), 

426 ('get_bucket_notification_configuration', 

427 'Notification', None, None, 's3:GetBucketNotification'), 

428 ('get_bucket_lifecycle_configuration', 

429 'Lifecycle', None, None, 's3:GetLifecycleConfiguration'), 

430 # ('get_bucket_cors', 'Cors'), 

431) 

432 

433 

434def assemble_bucket(item): 

435 """Assemble a document representing all the config state around a bucket. 

436 

437 TODO: Refactor this, the logic here feels quite muddled. 
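    Roughly, the returned bucket dict is the ``list_buckets`` entry augmented
    with one key per S3_AUGMENT_TABLE row, plus a ``c7n:DeniedMethods`` list
    when calls were denied. A sketch (values elided):

        {'Name': ..., 'Location': {...}, 'Tags': [...], 'Policy': ...,
         'Acl': {...}, 'Replication': ..., 'Versioning': ..., 'Website': ...,
         'Logging': ..., 'Notification': ..., 'Lifecycle': ...,
         'c7n:DeniedMethods': [...]}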

438 """ 

439 factory, b = item 

440 s = factory() 

441 c = s.client('s3') 

442 # Bucket Location, Current Client Location, Default Location 

443 b_location = c_location = location = "us-east-1" 

444 methods = list(S3_AUGMENT_TABLE) 

445 for minfo in methods: 

446 m, k, default, select = minfo[:4] 

447 try: 

448 method = getattr(c, m) 

449 v = method(Bucket=b['Name']) 

450 v.pop('ResponseMetadata') 

451 if select is not None and select in v: 

452 v = v[select] 

453 except (ssl.SSLError, SSLError) as e: 

454 # Proxy issues, presumably 

455 log.warning("Bucket ssl error %s: %s %s", 

456 b['Name'], b.get('Location', 'unknown'), 

457 e) 

458 continue 

459 except ClientError as e: 

460 code = e.response['Error']['Code'] 

461 if code.startswith("NoSuch") or "NotFound" in code: 

462 v = default 

463 elif code == 'PermanentRedirect': 

464 s = factory() 

465 c = bucket_client(s, b) 

466 # Requeue with the correct region given location constraint 

467 methods.append((m, k, default, select)) 

468 continue 

469 else: 

470 log.warning( 

471 "Bucket:%s unable to invoke method:%s error:%s ", 

472 b['Name'], m, e.response['Error']['Message']) 

473 # For auth failures, we don't bail out; we continue processing if we can. 

474 # Note this can lead to missing data, but in general is cleaner than 

475 # failing hard, due to the common use of locked down s3 bucket policies 

476 # that may cause issues fetching information across a fleet of buckets. 

477 

478 # This does mean s3 policies depending on augments should check the denied 

479 # methods annotation; generally though, lacking get access to an augment means 

480 # they won't have write access either. 

481 

482 # For other error types we raise and bail policy execution. 

483 if e.response['Error']['Code'] == 'AccessDenied': 

484 b.setdefault('c7n:DeniedMethods', []).append(m) 

485 continue 

486 raise 

487 # As soon as we learn location (which generally works) 

488 if k == 'Location' and v is not None: 

489 b_location = v.get('LocationConstraint') 

490 # Location == region for all cases but EU 

491 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html 

492 if b_location is None: 

493 b_location = "us-east-1" 

494 elif b_location == 'EU': 

495 b_location = "eu-west-1" 

496 v['LocationConstraint'] = 'eu-west-1' 

497 if v and v != c_location: 

498 c = s.client('s3', region_name=b_location) 

499 elif c_location != location: 

500 c = s.client('s3', region_name=location) 

501 b[k] = v 

502 return b 

503 

504 

505def bucket_client(session, b, kms=False): 

506 region = get_region(b) 

507 

508 if kms: 

509 # Need v4 signature for aws:kms crypto, else let the sdk decide 

510 # based on region support. 

511 config = Config( 

512 signature_version='s3v4', 

513 read_timeout=200, connect_timeout=120) 

514 else: 

515 config = Config(read_timeout=200, connect_timeout=120) 

516 return session.client('s3', region_name=region, config=config) 

517 

518 

519def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()): 

520 for bucket in buckets: 

521 client = bucket_client(local_session(session_factory), bucket) 

522 # Bucket tags are set atomically for the set/document, we want 

523 # to refetch against current to guard against any staleness in 

524 # our cached representation across multiple policies or concurrent 

525 # modifications. 

526 

527 if 'get_bucket_tagging' in bucket.get('c7n:DeniedMethods', []): 

528 # avoid the additional API call if we already know that it's going 

529 # to result in AccessDenied. The chances that the resource's perms 

530 # would have changed between fetching the resource and acting on it 

531 # here are pretty low-- so the check here should suffice. 

532 log.warning( 

533 "Unable to get new set of bucket tags needed to modify tags," 

534 "skipping tag action for bucket: %s" % bucket["Name"]) 

535 continue 

536 

537 try: 

538 bucket['Tags'] = client.get_bucket_tagging( 

539 Bucket=bucket['Name']).get('TagSet', []) 

540 except ClientError as e: 

541 if e.response['Error']['Code'] != 'NoSuchTagSet': 

542 raise 

543 bucket['Tags'] = [] 

544 

545 new_tags = {t['Key']: t['Value'] for t in add_tags} 

546 for t in bucket.get('Tags', ()): 

547 if (t['Key'] not in new_tags and t['Key'] not in remove_tags): 

548 new_tags[t['Key']] = t['Value'] 

549 tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()] 

550 

551 try: 

552 client.put_bucket_tagging( 

553 Bucket=bucket['Name'], Tagging={'TagSet': tag_set}) 

554 except ClientError as e: 

555 log.exception( 

556 'Exception tagging bucket %s: %s', bucket['Name'], e) 

557 continue 

558 

559 

560def get_region(b): 

561 """Tries to get the bucket region from Location.LocationConstraint 

562 

563 Special cases: 

564 LocationConstraint EU defaults to eu-west-1 

565 LocationConstraint null defaults to us-east-1 

566 

567 Args: 

568 b (object): A bucket object 

569 

570 Returns: 

571 string: an aws region string 
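    Example (per the remap below)::

        >>> get_region({'Location': {'LocationConstraint': 'EU'}})
        'eu-west-1'
        >>> get_region({'Location': {}})
        'us-east-1'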

572 """ 

573 remap = {None: 'us-east-1', 'EU': 'eu-west-1'} 

574 region = b.get('Location', {}).get('LocationConstraint') 

575 return remap.get(region, region) 

576 

577 

578@filters.register('metrics') 

579class S3Metrics(MetricsFilter): 

580 """S3 CW Metrics need special handling for attribute/dimension 

581 mismatch, and additional required dimension. 
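    A policy sketch (threshold and window are illustrative) using this filter
    to find buckets that have held fewer than one object over the past week:

    .. code-block:: yaml

        policies:
          - name: s3-empty-buckets
            resource: s3
            filters:
              - type: metrics
                name: NumberOfObjects
                days: 7
                value: 1
                op: less-than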

582 """ 

583 

584 def get_dimensions(self, resource): 

585 dims = [{'Name': 'BucketName', 'Value': resource['Name']}] 

586 if (self.data['name'] == 'NumberOfObjects' and 

587 'dimensions' not in self.data): 

588 dims.append( 

589 {'Name': 'StorageType', 'Value': 'AllStorageTypes'}) 

590 return dims 

591 

592 

593@filters.register('cross-account') 

594class S3CrossAccountFilter(CrossAccountAccessFilter): 

595 """Filters cross-account access to S3 buckets 

596 

597 :example: 

598 

599 .. code-block:: yaml 

600 

601 policies: 

602 - name: s3-acl 

603 resource: s3 

604 region: us-east-1 

605 filters: 

606 - type: cross-account 

607 """ 

608 permissions = ('s3:GetBucketPolicy',) 

609 

610 def get_accounts(self): 

611 """add in elb access by default 

612 

613 ELB Accounts by region 

614 https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html 

615 

616 Redshift Accounts by region 

617 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#db-auditing-manage-log-files 

618 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#rs-db-auditing-cloud-trail-rs-acct-ids 

619 

620 Cloudtrail Accounts by region 

621 https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-supported-regions.html 

622 """ 

623 accounts = super(S3CrossAccountFilter, self).get_accounts() 

624 return accounts.union( 

625 [ 

626 # ELB accounts 

627 '127311923021', # us-east-1 

628 '033677994240', # us-east-2 

629 '027434742980', # us-west-1 

630 '797873946194', # us-west-2 

631 '098369216593', # af-south-1 

632 '985666609251', # ca-central-1 

633 '054676820928', # eu-central-1 

634 '897822967062', # eu-north-1 

635 '635631232127', # eu-south-1 

636 '156460612806', # eu-west-1 

637 '652711504416', # eu-west-2 

638 '009996457667', # eu-west-3 

639 '754344448648', # ap-east-1 

640 '582318560864', # ap-northeast-1 

641 '600734575887', # ap-northeast-2 

642 '383597477331', # ap-northeast-3 

643 '114774131450', # ap-southeast-1 

644 '783225319266', # ap-southeast-2 

645 '718504428378', # ap-south-1 

646 '076674570225', # me-south-1 

647 '507241528517', # sa-east-1 

648 '048591011584', # us-gov-west-1 or gov-cloud-1 

649 '190560391635', # us-gov-east-1 

650 '638102146993', # cn-north-1 

651 '037604701340', # cn-northwest-1 

652 

653 # Redshift audit logging 

654 '193672423079', # us-east-1 

655 '391106570357', # us-east-2 

656 '262260360010', # us-west-1 

657 '902366379725', # us-west-2 

658 '365689465814', # af-south-1 

659 '313564881002', # ap-east-1 

660 '865932855811', # ap-south-1 

661 '090321488786', # ap-northeast-3 

662 '760740231472', # ap-northeast-2 

663 '361669875840', # ap-southeast-1 

664 '762762565011', # ap-southeast-2 

665 '404641285394', # ap-northeast-1 

666 '907379612154', # ca-central-1 

667 '053454850223', # eu-central-1 

668 '210876761215', # eu-west-1 

669 '307160386991', # eu-west-2 

670 '945612479654', # eu-south-1 

671 '915173422425', # eu-west-3 

672 '729911121831', # eu-north-1 

673 '013126148197', # me-south-1 

674 '075028567923', # sa-east-1 

675 

676 # CloudTrail accounts (PSA: folks should be using the 

677 # cloudtrail service principal in bucket policies) 

678 '086441151436', # us-east-1 

679 '475085895292', # us-east-2 

680 '388731089494', # us-west-1 

681 '113285607260', # us-west-2 

682 '819402241893', # ca-central-1 

683 '977081816279', # ap-south-1 

684 '492519147666', # ap-northeast-2 

685 '903692715234', # ap-southeast-1 

686 '284668455005', # ap-southeast-2 

687 '216624486486', # ap-northeast-1 

688 '035351147821', # eu-central-1 

689 '859597730677', # eu-west-1 

690 '282025262664', # eu-west-2 

691 '814480443879', # sa-east-1 

692 ]) 

693 

694 

695@filters.register('global-grants') 

696class GlobalGrantsFilter(Filter): 

697 """Filters for all S3 buckets that have global-grants 

698 

699 *Note* by default this filter allows for read access 

700 if the bucket has been configured as a website. This 

701 can be disabled per the example below. 

702 

703 :example: 

704 

705 .. code-block:: yaml 

706 

707 policies: 

708 - name: remove-global-grants 

709 resource: s3 

710 filters: 

711 - type: global-grants 

712 allow_website: false 

713 actions: 

714 - delete-global-grants 

715 

716 """ 

717 

718 schema = type_schema( 

719 'global-grants', 

720 allow_website={'type': 'boolean'}, 

721 operator={'type': 'string', 'enum': ['or', 'and']}, 

722 permissions={ 

723 'type': 'array', 'items': { 

724 'type': 'string', 'enum': [ 

725 'READ', 'WRITE', 'WRITE_ACP', 'READ_ACP', 'FULL_CONTROL']}}) 

726 

727 GLOBAL_ALL = "http://acs.amazonaws.com/groups/global/AllUsers" 

728 AUTH_ALL = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" 

729 

730 def process(self, buckets, event=None): 

731 with self.executor_factory(max_workers=5) as w: 

732 results = w.map(self.process_bucket, buckets) 

733 results = list(filter(None, list(results))) 

734 return results 

735 

736 def process_bucket(self, b): 

737 acl = b.get('Acl', {'Grants': []}) 

738 if not acl or not acl['Grants']: 

739 return 

740 

741 results = [] 

742 allow_website = self.data.get('allow_website', True) 

743 perms = self.data.get('permissions', []) 

744 

745 for grant in acl['Grants']: 

746 if 'URI' not in grant.get("Grantee", {}): 

747 continue 

748 if grant['Grantee']['URI'] not in [self.AUTH_ALL, self.GLOBAL_ALL]: 

749 continue 

750 if allow_website and grant['Permission'] == 'READ' and b['Website']: 

751 continue 

752 if not perms or (perms and grant['Permission'] in perms): 

753 results.append(grant['Permission']) 

754 

755 if results: 

756 set_annotation(b, 'GlobalPermissions', results) 

757 return b 

758 

759 

760class BucketActionBase(BaseAction): 

761 

762 def get_permissions(self): 

763 return self.permissions 

764 

765 def get_std_format_args(self, bucket): 

766 return { 

767 'account_id': self.manager.config.account_id, 

768 'region': self.manager.config.region, 

769 'bucket_name': bucket['Name'], 

770 'bucket_region': get_region(bucket) 

771 } 

772 

773 def process(self, buckets): 

774 return self._process_with_futures(buckets) 

775 

776 def _process_with_futures(self, buckets, *args, max_workers=3, **kwargs): 

777 errors = 0 

778 results = [] 

779 with self.executor_factory(max_workers=max_workers) as w: 

780 futures = {} 

781 for b in buckets: 

782 futures[w.submit(self.process_bucket, b, *args, **kwargs)] = b 

783 for f in as_completed(futures): 

784 if f.exception(): 

785 b = futures[f] 

786 self.log.error( 

787 'error modifying bucket: policy:%s action:%s bucket:%s error:%s', 

788 self.manager.data.get('name'), self.name, b['Name'], f.exception() 

789 ) 

790 errors += 1 

791 continue 

792 results += filter(None, [f.result()]) 

793 if errors: 

794 self.log.error('encountered %d errors while processing %s', errors, self.name) 

795 raise PolicyExecutionError('%d resources failed' % errors) 

796 return results 

797 

798 

799class BucketFilterBase(Filter): 

800 def get_std_format_args(self, bucket): 

801 return { 

802 'account_id': self.manager.config.account_id, 

803 'region': self.manager.config.region, 

804 'bucket_name': bucket['Name'], 

805 'bucket_region': get_region(bucket) 

806 } 

807 

808 

809@S3.action_registry.register("post-finding") 

810class BucketFinding(PostFinding): 
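    """Format an S3 bucket as a Security Hub finding (post-finding action).

    A minimal policy sketch (the finding type string is illustrative):

    .. code-block:: yaml

        policies:
          - name: s3-global-grants-finding
            resource: s3
            filters:
              - type: global-grants
            actions:
              - type: post-finding
                types:
                  - "Software and Configuration Checks/AWS Security Best Practices"
    """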

811 

812 resource_type = 'AwsS3Bucket' 

813 

814 def format_resource(self, r): 

815 owner = r.get("Acl", {}).get("Owner", {}) 

816 resource = { 

817 "Type": self.resource_type, 

818 "Id": "arn:aws:s3:::{}".format(r["Name"]), 

819 "Region": get_region(r), 

820 "Tags": {t["Key"]: t["Value"] for t in r.get("Tags", [])}, 

821 "Details": {self.resource_type: { 

822 "OwnerId": owner.get('ID', 'Unknown')}} 

823 } 

824 

825 if "DisplayName" in owner: 

826 resource["Details"]["AwsS3Bucket"]["OwnerName"] = owner['DisplayName'] 

827 

828 return filter_empty(resource) 

829 

830 

831@S3.filter_registry.register('has-statement') 

832class S3HasStatementFilter(HasStatementFilter): 
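    """Find buckets whose bucket policy contains matching statements.

    A minimal policy sketch (the statement id is illustrative):

    .. code-block:: yaml

        policies:
          - name: s3-has-deny-http-statement
            resource: s3
            filters:
              - type: has-statement
                statement_ids:
                  - DenyHttp
    """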

833 def get_std_format_args(self, bucket): 

834 return { 

835 'account_id': self.manager.config.account_id, 

836 'region': self.manager.config.region, 

837 'bucket_name': bucket['Name'], 

838 'bucket_region': get_region(bucket) 

839 } 

840 

841 

842@S3.filter_registry.register('lock-configuration') 

843class S3LockConfigurationFilter(ValueFilter): 

844 """ 

845 Filter S3 buckets based on their object lock configurations 

846 

847 :example: 

848 

849 Get all buckets where lock configuration mode is COMPLIANCE 

850 

851 .. code-block:: yaml 

852 

853 policies: 

854 - name: lock-configuration-compliance 

855 resource: aws.s3 

856 filters: 

857 - type: lock-configuration 

858 key: Rule.DefaultRetention.Mode 

859 value: COMPLIANCE 

860 

861 """ 

862 schema = type_schema('lock-configuration', rinherit=ValueFilter.schema) 

863 permissions = ('s3:GetBucketObjectLockConfiguration',) 

864 annotate = True 

865 annotation_key = 'c7n:ObjectLockConfiguration' 

866 

867 def _process_resource(self, client, resource): 

868 try: 

869 config = client.get_object_lock_configuration( 

870 Bucket=resource['Name'] 

871 )['ObjectLockConfiguration'] 

872 except ClientError as e: 

873 if e.response['Error']['Code'] == 'ObjectLockConfigurationNotFoundError': 

874 config = None 

875 else: 

876 raise 

877 resource[self.annotation_key] = config 

878 

879 def process(self, resources, event=None): 

880 client = local_session(self.manager.session_factory).client('s3') 

881 with self.executor_factory(max_workers=3) as w: 

882 futures = [] 

883 for res in resources: 

884 if self.annotation_key in res: 

885 continue 

886 futures.append(w.submit(self._process_resource, client, res)) 

887 for f in as_completed(futures): 

888 exc = f.exception() 

889 if exc: 

890 self.log.error( 

891 "Exception getting bucket lock configuration \n %s" % ( 

892 exc)) 

893 return super().process(resources, event) 

894 

895 def __call__(self, r): 

896 return super().__call__(r.setdefault(self.annotation_key, None)) 

897 

898 

899ENCRYPTION_STATEMENT_GLOB = { 

900 'Effect': 'Deny', 

901 'Principal': '*', 

902 'Action': 's3:PutObject', 

903 "Condition": { 

904 "StringNotEquals": { 

905 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}} 

906 

907 

908@filters.register('no-encryption-statement') 

909class EncryptionEnabledFilter(Filter): 

910 """Find buckets with missing encryption policy statements. 

911 

912 :example: 

913 

914 .. code-block:: yaml 

915 

916 policies: 

917 - name: s3-bucket-not-encrypted 

918 resource: s3 

919 filters: 

920 - type: no-encryption-statement 

921 """ 

922 schema = type_schema( 

923 'no-encryption-statement') 

924 

925 def get_permissions(self): 

926 perms = self.manager.get_resource_manager('s3').get_permissions() 

927 return perms 

928 

929 def process(self, buckets, event=None): 

930 return list(filter(None, map(self.process_bucket, buckets))) 

931 

932 def process_bucket(self, b): 

933 p = b.get('Policy') 

934 if p is None: 

935 return b 

936 p = json.loads(p) 

937 encryption_statement = dict(ENCRYPTION_STATEMENT_GLOB) 

938 

939 statements = p.get('Statement', []) 

940 check = False 

941 for s in list(statements): 

942 if 'Sid' in s: 

943 encryption_statement["Sid"] = s["Sid"] 

944 if 'Resource' in s: 

945 encryption_statement["Resource"] = s["Resource"] 

946 if s == encryption_statement: 

947 check = True 

948 break 

949 if check: 

950 return None 

951 else: 

952 return b 

953 

954 

955@filters.register('missing-statement') 

956@filters.register('missing-policy-statement') 

957class MissingPolicyStatementFilter(Filter): 

958 """Find buckets missing a set of named policy statements. 

959 

960 :example: 

961 

962 .. code-block:: yaml 

963 

964 policies: 

965 - name: s3-bucket-missing-statement 

966 resource: s3 

967 filters: 

968 - type: missing-statement 

969 statement_ids: 

970 - RequiredEncryptedPutObject 

971 """ 

972 

973 schema = type_schema( 

974 'missing-policy-statement', 

975 aliases=('missing-statement',), 

976 statement_ids={'type': 'array', 'items': {'type': 'string'}}) 

977 

978 def __call__(self, b): 

979 p = b.get('Policy') 

980 if p is None: 

981 return b 

982 

983 p = json.loads(p) 

984 

985 required = list(self.data.get('statement_ids', [])) 

986 statements = p.get('Statement', []) 

987 for s in list(statements): 

988 if s.get('Sid') in required: 

989 required.remove(s['Sid']) 

990 if not required: 

991 return False 

992 return True 

993 

994 

995@filters.register('bucket-notification') 

996class BucketNotificationFilter(ValueFilter): 

997 """Filter based on bucket notification configuration. 

998 

999 :example: 

1000 

1001 .. code-block:: yaml 

1002 

1003 policies: 

1004 - name: delete-incorrect-notification 

1005 resource: s3 

1006 filters: 

1007 - type: bucket-notification 

1008 kind: lambda 

1009 key: Id 

1010 value: "IncorrectLambda" 

1011 op: eq 

1012 actions: 

1013 - type: delete-bucket-notification 

1014 statement_ids: matched 

1015 """ 

1016 

1017 schema = type_schema( 

1018 'bucket-notification', 

1019 required=['kind'], 

1020 kind={'type': 'string', 'enum': ['lambda', 'sns', 'sqs']}, 

1021 rinherit=ValueFilter.schema) 

1022 schema_alias = False 

1023 annotation_key = 'c7n:MatchedNotificationConfigurationIds' 

1024 

1025 permissions = ('s3:GetBucketNotification',) 

1026 

1027 FIELDS = { 

1028 'lambda': 'LambdaFunctionConfigurations', 

1029 'sns': 'TopicConfigurations', 

1030 'sqs': 'QueueConfigurations' 

1031 } 

1032 

1033 def process(self, buckets, event=None): 

1034 return super(BucketNotificationFilter, self).process(buckets, event) 

1035 

1036 def __call__(self, bucket): 

1037 

1038 field = self.FIELDS[self.data['kind']] 

1039 found = False 

1040 for config in bucket.get('Notification', {}).get(field, []): 

1041 if self.match(config): 

1042 set_annotation( 

1043 bucket, 

1044 BucketNotificationFilter.annotation_key, 

1045 config['Id']) 

1046 found = True 

1047 return found 

1048 

1049 

1050@filters.register('bucket-logging') 

1051class BucketLoggingFilter(BucketFilterBase): 

1052 """Filter based on bucket logging configuration. 

1053 

1054 :example: 

1055 

1056 .. code-block:: yaml 

1057 

1058 policies: 

1059 - name: add-bucket-logging-if-missing 

1060 resource: s3 

1061 filters: 

1062 - type: bucket-logging 

1063 op: disabled 

1064 actions: 

1065 - type: toggle-logging 

1066 target_bucket: "{account_id}-{region}-s3-logs" 

1067 target_prefix: "{source_bucket_name}/" 

1068 

1069 policies: 

1070 - name: update-incorrect-or-missing-logging 

1071 resource: s3 

1072 filters: 

1073 - type: bucket-logging 

1074 op: not-equal 

1075 target_bucket: "{account_id}-{region}-s3-logs" 

1076 target_prefix: "{account}/{source_bucket_name}/" 

1077 actions: 

1078 - type: toggle-logging 

1079 target_bucket: "{account_id}-{region}-s3-logs" 

1080 target_prefix: "{account}/{source_bucket_name}/" 

1081 """ 

1082 

1083 schema = type_schema( 

1084 'bucket-logging', 

1085 op={'enum': ['enabled', 'disabled', 'equal', 'not-equal', 'eq', 'ne']}, 

1086 required=['op'], 

1087 target_bucket={'type': 'string'}, 

1088 target_prefix={'type': 'string'}) 

1089 schema_alias = False 

1090 account_name = None 

1091 

1092 permissions = ("s3:GetBucketLogging", "iam:ListAccountAliases") 

1093 

1094 def process(self, buckets, event=None): 

1095 return list(filter(None, map(self.process_bucket, buckets))) 

1096 

1097 def process_bucket(self, b): 

1098 if self.match_bucket(b): 

1099 return b 

1100 

1101 def match_bucket(self, b): 

1102 op = self.data.get('op') 

1103 

1104 logging = b.get('Logging', {}) 

1105 if op == 'disabled': 

1106 return logging == {} 

1107 elif op == 'enabled': 

1108 return logging != {} 

1109 

1110 if self.account_name is None: 

1111 session = local_session(self.manager.session_factory) 

1112 self.account_name = get_account_alias_from_sts(session) 

1113 

1114 variables = self.get_std_format_args(b) 

1115 variables.update({ 

1116 'account': self.account_name, 

1117 'source_bucket_name': b['Name'], 

1118 'source_bucket_region': get_region(b), 

1119 'target_bucket_name': self.data.get('target_bucket'), 

1120 'target_prefix': self.data.get('target_prefix'), 

1121 }) 

1122 data = format_string_values(self.data, **variables) 

1123 target_bucket = data.get('target_bucket') 

1124 target_prefix = data.get('target_prefix', b['Name'] + '/') 

1125 

1126 target_config = { 

1127 "TargetBucket": target_bucket, 

1128 "TargetPrefix": target_prefix 

1129 } if target_bucket else {} 

1130 

1131 if op in ('not-equal', 'ne'): 

1132 return logging != target_config 

1133 else: 

1134 return logging == target_config 

1135 

1136 

1137@actions.register('delete-bucket-notification') 

1138class DeleteBucketNotification(BucketActionBase): 

1139 """Action to delete S3 bucket notification configurations""" 

1140 

1141 schema = type_schema( 

1142 'delete-bucket-notification', 

1143 required=['statement_ids'], 

1144 statement_ids={'oneOf': [ 

1145 {'enum': ['matched']}, 

1146 {'type': 'array', 'items': {'type': 'string'}}]}) 

1147 

1148 permissions = ('s3:PutBucketNotification',) 

1149 

1150 def process_bucket(self, bucket): 

1151 n = bucket['Notification'] 

1152 if not n: 

1153 return 

1154 

1155 statement_ids = self.data.get('statement_ids') 

1156 if statement_ids == 'matched': 

1157 statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ()) 

1158 if not statement_ids: 

1159 return 

1160 

1161 cfg = defaultdict(list) 

1162 

1163 for t in BucketNotificationFilter.FIELDS.values(): 

1164 for c in n.get(t, []): 

1165 if c['Id'] not in statement_ids: 

1166 cfg[t].append(c) 

1167 

1168 client = bucket_client(local_session(self.manager.session_factory), bucket) 

1169 client.put_bucket_notification_configuration( 

1170 Bucket=bucket['Name'], 

1171 NotificationConfiguration=cfg) 

1172 

1173 

1174@actions.register('no-op') 

1175class NoOp(BucketActionBase): 

1176 

1177 schema = type_schema('no-op') 

1178 permissions = ('s3:ListAllMyBuckets',) 

1179 

1180 def process(self, buckets): 

1181 return None 

1182 

1183 

1184@actions.register('set-statements') 

1185class SetPolicyStatement(BucketActionBase): 

1186 """Action to add or update policy statements to S3 buckets 

1187 

1188 :example: 

1189 

1190 .. code-block:: yaml 

1191 

1192 policies: 

1193 - name: force-s3-https 

1194 resource: s3 

1195 actions: 

1196 - type: set-statements 

1197 statements: 

1198 - Sid: "DenyHttp" 

1199 Effect: "Deny" 

1200 Action: "s3:GetObject" 

1201 Principal: 

1202 AWS: "*" 

1203 Resource: "arn:aws:s3:::{bucket_name}/*" 

1204 Condition: 

1205 Bool: 

1206 "aws:SecureTransport": false 

1207 """ 

1208 

1209 permissions = ('s3:PutBucketPolicy',) 

1210 

1211 schema = type_schema( 

1212 'set-statements', 

1213 **{ 

1214 'statements': { 

1215 'type': 'array', 

1216 'items': { 

1217 'type': 'object', 

1218 'properties': { 

1219 'Sid': {'type': 'string'}, 

1220 'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']}, 

1221 'Principal': {'anyOf': [{'type': 'string'}, 

1222 {'type': 'object'}, {'type': 'array'}]}, 

1223 'NotPrincipal': {'anyOf': [{'type': 'object'}, {'type': 'array'}]}, 

1224 'Action': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1225 'NotAction': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1226 'Resource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1227 'NotResource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1228 'Condition': {'type': 'object'} 

1229 }, 

1230 'required': ['Sid', 'Effect'], 

1231 'oneOf': [ 

1232 {'required': ['Principal', 'Action', 'Resource']}, 

1233 {'required': ['NotPrincipal', 'Action', 'Resource']}, 

1234 {'required': ['Principal', 'NotAction', 'Resource']}, 

1235 {'required': ['NotPrincipal', 'NotAction', 'Resource']}, 

1236 {'required': ['Principal', 'Action', 'NotResource']}, 

1237 {'required': ['NotPrincipal', 'Action', 'NotResource']}, 

1238 {'required': ['Principal', 'NotAction', 'NotResource']}, 

1239 {'required': ['NotPrincipal', 'NotAction', 'NotResource']} 

1240 ] 

1241 } 

1242 } 

1243 } 

1244 ) 

1245 

1246 def process_bucket(self, bucket): 

1247 policy = bucket.get('Policy') or '{}' 

1248 

1249 target_statements = format_string_values( 

1250 copy.deepcopy({s['Sid']: s for s in self.data.get('statements', [])}), 

1251 **self.get_std_format_args(bucket)) 

1252 

1253 policy = json.loads(policy) 

1254 bucket_statements = policy.setdefault('Statement', []) 

1255 

1256 for s in bucket_statements: 

1257 if s.get('Sid') not in target_statements: 

1258 continue 

1259 if s == target_statements[s['Sid']]: 

1260 target_statements.pop(s['Sid']) 

1261 

1262 if not target_statements: 

1263 return 

1264 

1265 bucket_statements.extend(target_statements.values()) 

1266 policy = json.dumps(policy) 

1267 

1268 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1269 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=policy) 

1270 return {'Name': bucket['Name'], 'Policy': policy} 

1271 

1272 

1273@actions.register('remove-statements') 

1274class RemovePolicyStatement(RemovePolicyBase): 

1275 """Action to remove policy statements from S3 buckets 

1276 

1277 :example: 

1278 

1279 .. code-block:: yaml 

1280 

1281 policies: 

1282 - name: s3-remove-encrypt-put 

1283 resource: s3 

1284 filters: 

1285 - type: has-statement 

1286 statement_ids: 

1287 - RequireEncryptedPutObject 

1288 actions: 

1289 - type: remove-statements 

1290 statement_ids: 

1291 - RequireEncryptedPutObject 

1292 """ 

1293 

1294 permissions = ("s3:PutBucketPolicy", "s3:DeleteBucketPolicy") 

1295 

1296 def process(self, buckets): 

1297 with self.executor_factory(max_workers=3) as w: 

1298 futures = {} 

1299 results = [] 

1300 for b in buckets: 

1301 futures[w.submit(self.process_bucket, b)] = b 

1302 for f in as_completed(futures): 

1303 if f.exception(): 

1304 b = futures[f] 

1305 self.log.error('error modifying bucket:%s\n%s', 

1306 b['Name'], f.exception()) 

1307 results += filter(None, [f.result()]) 

1308 return results 

1309 

1310 def process_bucket(self, bucket): 

1311 p = bucket.get('Policy') 

1312 if p is None: 

1313 return 

1314 

1315 p = json.loads(p) 

1316 

1317 statements, found = self.process_policy( 

1318 p, bucket, CrossAccountAccessFilter.annotation_key) 

1319 

1320 if not found: 

1321 return 

1322 

1323 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1324 

1325 if not statements: 

1326 s3.delete_bucket_policy(Bucket=bucket['Name']) 

1327 else: 

1328 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=json.dumps(p)) 

1329 return {'Name': bucket['Name'], 'State': 'PolicyRemoved', 'Statements': found} 

1330 

1331 

1332@actions.register('set-replication') 

1333class SetBucketReplicationConfig(BucketActionBase): 

1334 """Action to add or remove replication configuration statement from S3 buckets 

1335 

1336 :example: 

1337 

1338 .. code-block:: yaml 

1339 

1340 policies: 

1341 - name: s3-unapproved-account-replication 

1342 resource: s3 

1343 filters: 

1344 - type: value 

1345 key: Replication.ReplicationConfiguration.Rules[].Destination.Account 

1346 value: present 

1347 - type: value 

1348 key: Replication.ReplicationConfiguration.Rules[].Destination.Account 

1349 value_from: 

1350 url: 's3:///path/to/file.json' 

1351 format: json 

1352 expr: "approved_accounts.*" 

1353 op: ni 

1354 actions: 

1355 - type: set-replication 

1356 state: enable 

1357 """ 

1358 schema = type_schema( 

1359 'set-replication', 

1360 state={'type': 'string', 'enum': ['enable', 'disable', 'remove']}) 

1361 permissions = ("s3:GetReplicationConfiguration", "s3:PutReplicationConfiguration") 

1362 

1363 def process(self, buckets): 

1364 with self.executor_factory(max_workers=3) as w: 

1365 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1366 errors = [] 

1367 for future in as_completed(futures): 

1368 bucket = futures[future] 

1369 try: 

1370 future.result() 

1371 except ClientError as e: 

1372 errors.append("Message: %s Bucket: %s" % (e, bucket['Name'])) 

1373 if errors: 

1374 raise Exception('\n'.join(map(str, errors))) 

1375 

1376 def process_bucket(self, bucket): 

1377 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1378 state = self.data.get('state') 

1379 if state is not None: 

1380 if state == 'remove': 

1381 s3.delete_bucket_replication(Bucket=bucket['Name']) 

1382 return {'Name': bucket['Name'], 'State': 'ReplicationConfigRemoved'} 

1383 if state in ('enable', 'disable'): 

1384 config = s3.get_bucket_replication(Bucket=bucket['Name']) 

1385 for rule in config['ReplicationConfiguration']['Rules']: 

1386 rule['Status'] = 'Enabled' if state == 'enable' else 'Disabled' 

1387 s3.put_bucket_replication( 

1388 Bucket=bucket['Name'], 

1389 ReplicationConfiguration=config['ReplicationConfiguration'] 

1390 ) 

1391 return {'Name': bucket['Name'], 'State': 'ReplicationConfigUpdated'} 

1392 

1393 

1394@filters.register('check-public-block') 

1395class FilterPublicBlock(Filter): 

1396 """Filter for s3 bucket public blocks 

1397 

1398 If no filter parameters are provided, it checks to see if any are unset or False. 

1399 

1400 If parameters are provided only the provided ones are checked. 

1401 

1402 :example: 

1403 

1404 .. code-block:: yaml 

1405 

1406 policies: 

1407 - name: CheckForPublicAclBlock-Off 

1408 resource: s3 

1409 region: us-east-1 

1410 filters: 

1411 - type: check-public-block 

1412 BlockPublicAcls: true 

1413 BlockPublicPolicy: true 

1414 """ 

1415 

1416 schema = type_schema( 

1417 'check-public-block', 

1418 BlockPublicAcls={'type': 'boolean'}, 

1419 IgnorePublicAcls={'type': 'boolean'}, 

1420 BlockPublicPolicy={'type': 'boolean'}, 

1421 RestrictPublicBuckets={'type': 'boolean'}) 

1422 permissions = ("s3:GetBucketPublicAccessBlock",) 

1423 keys = ( 

1424 'BlockPublicPolicy', 'BlockPublicAcls', 'IgnorePublicAcls', 'RestrictPublicBuckets') 

1425 annotation_key = 'c7n:PublicAccessBlock' 

1426 

1427 def process(self, buckets, event=None): 

1428 results = [] 

1429 with self.executor_factory(max_workers=2) as w: 

1430 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1431 for f in as_completed(futures): 

1432 if f.result(): 

1433 results.append(futures[f]) 

1434 return results 

1435 

1436 def process_bucket(self, bucket): 

1437 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1438 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys})) 

1439 if self.annotation_key not in bucket: 

1440 try: 

1441 config = s3.get_public_access_block( 

1442 Bucket=bucket['Name'])['PublicAccessBlockConfiguration'] 

1443 except ClientError as e: 

1444 error_code = e.response['Error']['Code'] 

1445 if error_code == 'NoSuchPublicAccessBlockConfiguration': 

1446 pass 

1447 elif error_code == 'AccessDenied': 

1448 # Follow the same logic as `assemble_bucket` - log and continue on access 

1449 # denied errors rather than halting a policy altogether 

1450 method = 'GetPublicAccessBlock' 

1451 log.warning( 

1452 "Bucket:%s unable to invoke method:%s error:%s ", 

1453 bucket['Name'], method, e.response['Error']['Message'] 

1454 ) 

1455 bucket.setdefault('c7n:DeniedMethods', []).append(method) 

1456 else: 

1457 raise 

1458 bucket[self.annotation_key] = config 

1459 return self.matches_filter(config) 

1460 

1461 def matches_filter(self, config): 

1462 key_set = [key for key in self.keys if key in self.data] 

1463 if key_set: 

1464 return all([self.data.get(key) is config[key] for key in key_set]) 

1465 else: 

1466 return not all(config.values()) 

1467 

1468 

1469@actions.register('set-public-block') 

1470class SetPublicBlock(BucketActionBase): 

1471 """Action to update Public Access blocks on S3 buckets 

1472 

1473 If no action parameters are provided, all settings will be set to the `state`, which defaults to True. 

1474 

1475 If action parameters are provided, those will be set and other extant values preserved. 

1476 

1477 :example: 

1478 

1479 .. code-block:: yaml 

1480 

1481 policies: 

1482 - name: s3-public-block-enable-all 

1483 resource: s3 

1484 filters: 

1485 - type: check-public-block 

1486 actions: 

1487 - type: set-public-block 

1488 

1489 policies: 

1490 - name: s3-public-block-disable-all 

1491 resource: s3 

1492 filters: 

1493 - type: check-public-block 

1494 actions: 

1495 - type: set-public-block 

1496 state: false 

1497 

1498 policies: 

1499 - name: s3-public-block-enable-some 

1500 resource: s3 

1501 filters: 

1502 - or: 

1503 - type: check-public-block 

1504 BlockPublicAcls: false 

1505 - type: check-public-block 

1506 BlockPublicPolicy: false 

1507 actions: 

1508 - type: set-public-block 

1509 BlockPublicAcls: true 

1510 BlockPublicPolicy: true 

1511 

1512 """ 

1513 

1514 schema = type_schema( 

1515 'set-public-block', 

1516 state={'type': 'boolean', 'default': True}, 

1517 BlockPublicAcls={'type': 'boolean'}, 

1518 IgnorePublicAcls={'type': 'boolean'}, 

1519 BlockPublicPolicy={'type': 'boolean'}, 

1520 RestrictPublicBuckets={'type': 'boolean'}) 

1521 permissions = ("s3:GetBucketPublicAccessBlock", "s3:PutBucketPublicAccessBlock") 

1522 keys = FilterPublicBlock.keys 

1523 annotation_key = FilterPublicBlock.annotation_key 

1524 

1525 def process(self, buckets): 

1526 with self.executor_factory(max_workers=3) as w: 

1527 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1528 for future in as_completed(futures): 

1529 future.result() 

1530 

1531 def process_bucket(self, bucket): 

1532 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1533 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys})) 

1534 if self.annotation_key not in bucket: 

1535 try: 

1536 config = s3.get_public_access_block( 

1537 Bucket=bucket['Name'])['PublicAccessBlockConfiguration'] 

1538 except ClientError as e: 

1539 if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration': 

1540 raise 

1541 

1542 key_set = [key for key in self.keys if key in self.data] 

1543 if key_set: 

1544 for key in key_set: 

1545 config[key] = self.data.get(key) 

1546 else: 

1547 for key in self.keys: 

1548 config[key] = self.data.get('state', True) 

1549 s3.put_public_access_block( 

1550 Bucket=bucket['Name'], PublicAccessBlockConfiguration=config) 

1551 

1552 

1553@actions.register('toggle-versioning') 

1554class ToggleVersioning(BucketActionBase): 

1555 """Action to enable/suspend versioning on a S3 bucket 

1556 

1557 Note versioning can never be disabled, only suspended. 

1558 

1559 :example: 

1560 

1561 .. code-block:: yaml 

1562 

1563 policies: 

1564 - name: s3-enable-versioning 

1565 resource: s3 

1566 filters: 

1567 - or: 

1568 - type: value 

1569 key: Versioning.Status 

1570 value: Suspended 

1571 - type: value 

1572 key: Versioning.Status 

1573 value: absent 

1574 actions: 

1575 - type: toggle-versioning 

1576 enabled: true 

1577 """ 

1578 

1579 schema = type_schema( 

1580 'toggle-versioning', 

1581 enabled={'type': 'boolean'}) 

1582 permissions = ("s3:PutBucketVersioning",) 

1583 

1584 def process_versioning(self, resource, state): 

1585 client = bucket_client( 

1586 local_session(self.manager.session_factory), resource) 

1587 try: 

1588 client.put_bucket_versioning( 

1589 Bucket=resource['Name'], 

1590 VersioningConfiguration={ 

1591 'Status': state}) 

1592 except ClientError as e: 

1593 if e.response['Error']['Code'] != 'AccessDenied': 

1594 log.error( 

1595 "Unable to put bucket versioning on bucket %s: %s" % (resource['Name'], e)) 

1596 raise 

1597 log.warning( 

1598 "Access Denied Bucket:%s while put bucket versioning" % resource['Name']) 

1599 

1600 # mfa delete enablement looks like it needs the serial and a current token. 

1601 def process(self, resources): 

1602 enabled = self.data.get('enabled', True) 

1603 for r in resources: 

1604 if 'Versioning' not in r or not r['Versioning']: 

1605 r['Versioning'] = {'Status': 'Suspended'} 

1606 if enabled and ( 

1607 r['Versioning']['Status'] == 'Suspended'): 

1608 self.process_versioning(r, 'Enabled') 

1609 if not enabled and r['Versioning']['Status'] == 'Enabled': 

1610 self.process_versioning(r, 'Suspended') 

1611 

1612 

1613@actions.register('toggle-logging') 

1614class ToggleLogging(BucketActionBase): 

1615 """Action to enable/disable logging on a S3 bucket. 

1616 

1617 Target bucket ACL must allow for WRITE and READ_ACP permissions. 

1618 Not specifying a target_prefix will default to the current bucket name. 

1619 https://docs.aws.amazon.com/AmazonS3/latest/dev/enable-logging-programming.html 

1620 

1621 :example: 

1622 

1623 .. code-block:: yaml 

1624 

1625 policies: 

1626 - name: s3-enable-logging 

1627 resource: s3 

1628 filters: 

1629 - "tag:Testing": present 

1630 actions: 

1631 - type: toggle-logging 

1632 target_bucket: log-bucket 

1633 target_prefix: logs123/ 

1634 

1635 policies: 

1636 - name: s3-force-standard-logging 

1637 resource: s3 

1638 filters: 

1639 - type: bucket-logging 

1640 op: not-equal 

1641 target_bucket: "{account_id}-{region}-s3-logs" 

1642 target_prefix: "{account}/{source_bucket_name}/" 

1643 actions: 

1644 - type: toggle-logging 

1645 target_bucket: "{account_id}-{region}-s3-logs" 

1646 target_prefix: "{account}/{source_bucket_name}/" 

1647 """ 

1648 schema = type_schema( 

1649 'toggle-logging', 

1650 enabled={'type': 'boolean'}, 

1651 target_bucket={'type': 'string'}, 

1652 target_prefix={'type': 'string'}) 

1653 

1654 permissions = ("s3:PutBucketLogging", "iam:ListAccountAliases") 

1655 

1656 def validate(self): 

1657 if self.data.get('enabled', True): 

1658 if not self.data.get('target_bucket'): 

1659 raise PolicyValidationError( 

1660 "target_bucket must be specified on %s" % ( 

1661 self.manager.data,)) 

1662 return self 

1663 

1664 def process(self, resources): 

1665 session = local_session(self.manager.session_factory) 

1666 kwargs = { 

1667 "enabled": self.data.get('enabled', True), 

1668 "session": session, 

1669 "account_name": get_account_alias_from_sts(session), 

1670 } 

1671 

1672 return self._process_with_futures(resources, **kwargs) 

1673 

1674 def process_bucket(self, r, enabled=None, session=None, account_name=None): 

1675 client = bucket_client(session, r) 

1676 is_logging = bool(r.get('Logging')) 

1677 

1678 if enabled: 

1679 variables = self.get_std_format_args(r) 

1680 variables.update({ 

1681 'account': account_name, 

1682 'source_bucket_name': r['Name'], 

1683 'source_bucket_region': get_region(r), 

1684 'target_bucket_name': self.data.get('target_bucket'), 

1685 'target_prefix': self.data.get('target_prefix'), 

1686 }) 

1687 data = format_string_values(self.data, **variables) 

1688 config = { 

1689 'TargetBucket': data.get('target_bucket'), 

1690 'TargetPrefix': data.get('target_prefix', r['Name'] + '/') 

1691 } 

1692 if not is_logging or r.get('Logging') != config: 

1693 client.put_bucket_logging( 

1694 Bucket=r['Name'], 

1695 BucketLoggingStatus={'LoggingEnabled': config} 

1696 ) 

1697 r['Logging'] = config 

1698 

1699 elif not enabled and is_logging: 

1700 client.put_bucket_logging( 

1701 Bucket=r['Name'], BucketLoggingStatus={}) 

1702 r['Logging'] = {} 

1703 
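# Illustrative sketch (not part of the original source): how the toggle-logging
# target templates above expand. The real action uses c7n.utils.format_string_values
# over the policy data; this standalone approximation only uses str.format with the
# same variable names, and the values below are hypothetical. Note the real action
# defaults target_prefix to '<bucket-name>/' when unset; here '' keeps the helper simple.
def _expand_logging_target(action_data, variables):
    # Substitute {account_id}, {source_bucket_name}, etc. into the configured
    # target_bucket / target_prefix strings.
    return {
        'TargetBucket': action_data['target_bucket'].format(**variables),
        'TargetPrefix': action_data.get('target_prefix', '').format(**variables),
    }

assert _expand_logging_target(
    {'target_bucket': '{account_id}-{region}-s3-logs',
     'target_prefix': '{account}/{source_bucket_name}/'},
    {'account_id': '123456789012', 'region': 'us-east-1',
     'account': 'my-alias', 'source_bucket_name': 'app-data'}
) == {'TargetBucket': '123456789012-us-east-1-s3-logs',
      'TargetPrefix': 'my-alias/app-data/'}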

1704 

1705@actions.register('attach-encrypt') 

1706class AttachLambdaEncrypt(BucketActionBase): 

1707 """Action attaches lambda encryption policy to S3 bucket 

1708 supports attachment via lambda bucket notification or sns notification 

1709 to invoke lambda. a special topic value of `default` will utilize an 

1710 extant notification or create one matching the bucket name. 

1711 

1712 :example: 

1713 

1714 

1715 .. code-block:: yaml 

1716 

1717 

1718 policies: 

1719 - name: attach-lambda-encrypt 

1720 resource: s3 

1721 filters: 

1722 - type: missing-policy-statement 

1723 actions: 

1724 - type: attach-encrypt 

1725 role: arn:aws:iam::123456789012:role/my-role 

1726 

1727 """ 

1728 schema = type_schema( 

1729 'attach-encrypt', 

1730 role={'type': 'string'}, 

1731 tags={'type': 'object'}, 

1732 topic={'type': 'string'}) 

1733 

1734 permissions = ( 

1735 "s3:PutBucketNotification", "s3:GetBucketNotification", 

1736 # lambda manager uses quite a few perms to provision lambdas 

1737 # and event sources, hard to disambiguate; punt for now. 

1738 "lambda:*", 

1739 ) 

1740 

1741 def __init__(self, data=None, manager=None): 

1742 self.data = data or {} 

1743 self.manager = manager 

1744 

1745 def validate(self): 

1746 if (not getattr(self.manager.config, 'dryrun', True) and 

1747 not self.data.get('role', self.manager.config.assume_role)): 

1748 raise PolicyValidationError( 

1749 "attach-encrypt: role must be specified either " 

1750 "via assume or in config on %s" % (self.manager.data,)) 

1751 

1752 return self 

1753 

1754 def process(self, buckets): 

1755 from c7n.mu import LambdaManager 

1756 from c7n.ufuncs.s3crypt import get_function 

1757 

1758 account_id = self.manager.config.account_id 

1759 topic_arn = self.data.get('topic') 

1760 

1761 func = get_function( 

1762 None, self.data.get('role', self.manager.config.assume_role), 

1763 account_id=account_id, tags=self.data.get('tags')) 

1764 

1765 regions = {get_region(b) for b in buckets} 

1766 

1767 # session managers by region 

1768 region_sessions = {} 

1769 for r in regions: 

1770 region_sessions[r] = functools.partial( 

1771 self.manager.session_factory, region=r) 

1772 

1773 # Publish the function to all of our buckets' regions 

1774 region_funcs = {} 

1775 

1776 for r in regions: 

1777 lambda_mgr = LambdaManager(region_sessions[r]) 

1778 lambda_mgr.publish(func) 

1779 region_funcs[r] = func 

1780 

1781 with self.executor_factory(max_workers=3) as w: 

1782 results = [] 

1783 futures = [] 

1784 for b in buckets: 

1785 region = get_region(b) 

1786 futures.append( 

1787 w.submit( 

1788 self.process_bucket, 

1789 region_funcs[region], 

1790 b, 

1791 topic_arn, 

1792 account_id, 

1793 region_sessions[region] 

1794 )) 

1795 for f in as_completed(futures): 

1796 if f.exception(): 

1797 log.exception( 

1798 "Error attaching lambda-encrypt %s" % (f.exception())) 

1799 results.append(f.result()) 

1800 return list(filter(None, results)) 

1801 

1802 def process_bucket(self, func, bucket, topic, account_id, session_factory): 

1803 from c7n.mu import BucketSNSNotification, BucketLambdaNotification 

1804 if topic: 

1805 topic = None if topic == 'default' else topic 

1806 source = BucketSNSNotification(session_factory, bucket, topic) 

1807 else: 

1808 source = BucketLambdaNotification( 

1809 {'account_s3': account_id}, session_factory, bucket) 

1810 return source.add(func, None) 

1811 

1812 

1813@actions.register('encryption-policy') 

1814class EncryptionRequiredPolicy(BucketActionBase): 

1815 """Action to apply an encryption policy to S3 buckets 

1816 

1817 

1818 :example: 

1819 

1820 .. code-block:: yaml 

1821 

1822 policies: 

1823 - name: s3-enforce-encryption 

1824 resource: s3 

1825 mode: 

1826 type: cloudtrail 

1827 events: 

1828 - CreateBucket 

1829 actions: 

1830 - encryption-policy 

1831 """ 

1832 

1833 permissions = ("s3:GetBucketPolicy", "s3:PutBucketPolicy") 

1834 schema = type_schema('encryption-policy') 

1835 

1836 def __init__(self, data=None, manager=None): 

1837 self.data = data or {} 

1838 self.manager = manager 

1839 

1840 def process(self, buckets): 

1841 with self.executor_factory(max_workers=3) as w: 

1842 results = w.map(self.process_bucket, buckets) 

1843 results = list(filter(None, list(results))) 

1844 return results 

1845 

1846 def process_bucket(self, b): 

1847 p = b['Policy'] 

1848 if p is None: 

1849 log.info("No policy found, creating new") 

1850 p = {'Version': "2012-10-17", "Statement": []} 

1851 else: 

1852 p = json.loads(p) 

1853 

1854 encryption_sid = "RequiredEncryptedPutObject" 

1855 encryption_statement = { 

1856 'Sid': encryption_sid, 

1857 'Effect': 'Deny', 

1858 'Principal': '*', 

1859 'Action': 's3:PutObject', 

1860 "Resource": "arn:aws:s3:::%s/*" % b['Name'], 

1861 "Condition": { 

1862 # AWS Managed Keys or KMS keys, note policy language 

1863 # does not support custom kms (todo add issue) 

1864 "StringNotEquals": { 

1865 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}} 

1866 

1867 statements = p.get('Statement', []) 

1868 for s in list(statements): 

1869 if s.get('Sid', '') == encryption_sid: 

1870 log.debug("Bucket:%s Found extant encrypt policy", b['Name']) 

1871 if s != encryption_statement: 

1872 log.info( 

1873 "Bucket:%s updating extant encrypt policy", b['Name']) 

1874 statements.remove(s) 

1875 else: 

1876 return 

1877 

1878 session = self.manager.session_factory() 

1879 s3 = bucket_client(session, b) 

1880 statements.append(encryption_statement) 

1881 p['Statement'] = statements 

1882 log.info('Bucket:%s attached encryption policy' % b['Name']) 

1883 

1884 try: 

1885 s3.put_bucket_policy( 

1886 Bucket=b['Name'], 

1887 Policy=json.dumps(p)) 

1888 except ClientError as e: 

1889 if e.response['Error']['Code'] == 'NoSuchBucket': 

1890 return 

1891 self.log.exception( 

1892 "Error on bucket:%s putting policy\n%s error:%s", 

1893 b['Name'], 

1894 json.dumps(statements, indent=2), e) 

1895 raise 

1896 return {'Name': b['Name'], 'State': 'PolicyAttached'} 

1897 

1898 

1899class BucketScanLog: 

1900 """Offload remediated key ids to a disk file in batches 

1901 

1902 A bucket keyspace is effectively infinite, so we need to store partial 

1903 results out of memory; this class provides a json log on disk 

1904 with partial write support. 

1905 

1906 json output format: 

1907 - [list_of_serialized_keys], 

1908 - [] # Empty list of keys at end when we close the buffer 

1909 

1910 """ 

1911 

1912 def __init__(self, log_dir, name): 

1913 self.log_dir = log_dir 

1914 self.name = name 

1915 self.fh = None 

1916 self.count = 0 

1917 

1918 @property 

1919 def path(self): 

1920 return os.path.join(self.log_dir, "%s.json" % self.name) 

1921 

1922 def __enter__(self): 

1923 # Don't require output directories 

1924 if self.log_dir is None: 

1925 return 

1926 

1927 self.fh = open(self.path, 'w') 

1928 self.fh.write("[\n") 

1929 return self 

1930 

1931 def __exit__(self, exc_type=None, exc_value=None, exc_frame=None): 

1932 if self.fh is None: 

1933 return 

1934 # we need an empty marker list at end to avoid trailing commas 

1935 self.fh.write("[]") 

1936 # and close the surrounding list 

1937 self.fh.write("\n]") 

1938 self.fh.close() 

1939 if not self.count: 

1940 os.remove(self.fh.name) 

1941 self.fh = None 

1942 return False 

1943 

1944 def add(self, keys): 

1945 self.count += len(keys) 

1946 if self.fh is None: 

1947 return 

1948 self.fh.write(dumps(keys)) 

1949 self.fh.write(",\n") 

1950 
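# Illustrative sketch (not part of the original source): what the on-disk JSON
# produced by BucketScanLog above looks like. Uses a temporary directory and
# hypothetical key names so the example is self-contained; json and tempfile
# imports are repeated here for the same reason.
import json
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    with BucketScanLog(tmp, 'example-bucket') as scan_log:
        scan_log.add(['key-1', 'key-2'])   # first remediated batch
        scan_log.add(['key-3'])            # second batch
    # The file is a JSON list of batches, terminated by the empty batch
    # written on close:
    # [
    #   ["key-1", "key-2"],
    #   ["key-3"],
    #   []
    # ]
    with open(scan_log.path) as fh:
        batches = json.load(fh)
    assert batches == [['key-1', 'key-2'], ['key-3'], []]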

1951 

1952class ScanBucket(BucketActionBase): 

1953 

1954 permissions = ("s3:ListBucket",) 

1955 

1956 bucket_ops = { 

1957 'standard': { 

1958 'iterator': 'list_objects', 

1959 'contents_key': ['Contents'], 

1960 'key_processor': 'process_key' 

1961 }, 

1962 'versioned': { 

1963 'iterator': 'list_object_versions', 

1964 'contents_key': ['Versions'], 

1965 'key_processor': 'process_version' 

1966 } 

1967 } 

1968 

1969 def __init__(self, data, manager=None): 

1970 super(ScanBucket, self).__init__(data, manager) 

1971 self.denied_buckets = set() 

1972 

1973 def get_bucket_style(self, b): 

1974 return ( 

1975 b.get('Versioning', {'Status': ''}).get('Status') in ( 

1976 'Enabled', 'Suspended') and 'versioned' or 'standard') 

1977 

1978 def get_bucket_op(self, b, op_name): 

1979 bucket_style = self.get_bucket_style(b) 

1980 op = self.bucket_ops[bucket_style][op_name] 

1981 if op_name == 'key_processor': 

1982 return getattr(self, op) 

1983 return op 

1984 

1985 def get_keys(self, b, key_set): 

1986 content_keys = self.get_bucket_op(b, 'contents_key') 

1987 keys = [] 

1988 for ck in content_keys: 

1989 keys.extend(key_set.get(ck, [])) 

1990 return keys 

1991 

1992 def process(self, buckets): 

1993 results = self._process_with_futures(self.process_bucket, buckets) 

1994 self.write_denied_buckets_file() 

1995 return results 

1996 

1997 def _process_with_futures(self, helper, buckets, max_workers=3): 

1998 results = [] 

1999 with self.executor_factory(max_workers) as w: 

2000 futures = {} 

2001 for b in buckets: 

2002 futures[w.submit(helper, b)] = b 

2003 for f in as_completed(futures): 

2004 if f.exception(): 

2005 b = futures[f] 

2006 self.log.error( 

2007 "Error on bucket:%s region:%s policy:%s error: %s", 

2008 b['Name'], b.get('Location', 'unknown'), 

2009 self.manager.data.get('name'), f.exception()) 

2010 self.denied_buckets.add(b['Name']) 

2011 continue 

2012 result = f.result() 

2013 if result: 

2014 results.append(result) 

2015 return results 

2016 

2017 def write_denied_buckets_file(self): 

2018 if (self.denied_buckets and 

2019 self.manager.ctx.log_dir and 

2020 not isinstance(self.manager.ctx.output, NullBlobOutput)): 

2021 with open( 

2022 os.path.join( 

2023 self.manager.ctx.log_dir, 'denied.json'), 'w') as fh: 

2024 json.dump(list(self.denied_buckets), fh, indent=2) 

2025 self.denied_buckets = set() 

2026 

2027 def process_bucket(self, b): 

2028 log.info( 

2029 "Scanning bucket:%s visitor:%s style:%s" % ( 

2030 b['Name'], self.__class__.__name__, self.get_bucket_style(b))) 

2031 

2032 s = self.manager.session_factory() 

2033 s3 = bucket_client(s, b) 

2034 

2035 # The bulk of the _process_bucket function executes inline in the 

2036 # calling thread/worker context; neither the paginator nor the 

2037 # bucket scan log should be used across a worker boundary. 

2038 p = s3.get_paginator( 

2039 self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name']) 

2040 

2041 with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log: 

2042 with self.executor_factory(max_workers=10) as w: 

2043 try: 

2044 return self._process_bucket(b, p, key_log, w) 

2045 except ClientError as e: 

2046 if e.response['Error']['Code'] == 'NoSuchBucket': 

2047 log.warning( 

2048 "Bucket:%s removed while scanning" % b['Name']) 

2049 return 

2050 if e.response['Error']['Code'] == 'AccessDenied': 

2051 log.warning( 

2052 "Access Denied Bucket:%s while scanning" % b['Name']) 

2053 self.denied_buckets.add(b['Name']) 

2054 return 

2055 log.exception( 

2056 "Error processing bucket:%s paginator:%s" % ( 

2057 b['Name'], p)) 

2058 

2059 __call__ = process_bucket 

2060 

2061 def _process_bucket(self, b, p, key_log, w): 

2062 count = 0 

2063 

2064 for key_set in p: 

2065 keys = self.get_keys(b, key_set) 

2066 count += len(keys) 

2067 futures = [] 

2068 

2069 for batch in chunks(keys, size=100): 

2070 if not batch: 

2071 continue 

2072 futures.append(w.submit(self.process_chunk, batch, b)) 

2073 

2074 for f in as_completed(futures): 

2075 if f.exception(): 

2076 log.exception("Exception Processing bucket:%s key batch %s" % ( 

2077 b['Name'], f.exception())) 

2078 continue 

2079 r = f.result() 

2080 if r: 

2081 key_log.add(r) 

2082 

2083 # Log completion at info level, progress at debug level 

2084 if key_set['IsTruncated']: 

2085 log.debug('Scan progress bucket:%s keys:%d remediated:%d ...', 

2086 b['Name'], count, key_log.count) 

2087 else: 

2088 log.info('Scan Complete bucket:%s keys:%d remediated:%d', 

2089 b['Name'], count, key_log.count) 

2090 

2091 b['KeyScanCount'] = count 

2092 b['KeyRemediated'] = key_log.count 

2093 return { 

2094 'Bucket': b['Name'], 'Remediated': key_log.count, 'Count': count} 

2095 

2096 def process_chunk(self, batch, bucket): 

2097 raise NotImplementedError() 

2098 

2099 def process_key(self, s3, key, bucket_name, info=None): 

2100 raise NotImplementedError() 

2101 

2102 def process_version(self, s3, bucket, key): 

2103 raise NotImplementedError() 

2104 
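# Illustrative sketch (not part of the original source): how ScanBucket above
# picks its pagination strategy from a bucket's Versioning status. Buckets with
# versioning Enabled or Suspended are walked with list_object_versions;
# everything else uses list_objects. The bucket records are hypothetical.
_example_buckets = [
    {'Name': 'plain-bucket'},                                          # no Versioning key
    {'Name': 'suspended-bucket', 'Versioning': {'Status': 'Suspended'}},
    {'Name': 'versioned-bucket', 'Versioning': {'Status': 'Enabled'}},
]

def _example_style(b):
    # Same expression as ScanBucket.get_bucket_style above.
    status = b.get('Versioning', {'Status': ''}).get('Status')
    return 'versioned' if status in ('Enabled', 'Suspended') else 'standard'

assert [_example_style(b) for b in _example_buckets] == [
    'standard', 'versioned', 'versioned']
# The style then selects the paginator and contents key, e.g.
# ScanBucket.bucket_ops['versioned']['iterator'] == 'list_object_versions'.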

2105 

2106@actions.register('encrypt-keys') 

2107class EncryptExtantKeys(ScanBucket): 

2108 """Action to encrypt unencrypted S3 objects 

2109 

2110 :example: 

2111 

2112 .. code-block:: yaml 

2113 

2114 policies: 

2115 - name: s3-encrypt-objects 

2116 resource: s3 

2117 actions: 

2118 - type: encrypt-keys 

2119 crypto: aws:kms 

2120 key-id: 9c3983be-c6cf-11e6-9d9d-cec0c932ce01 

2121 """ 

2122 

2123 permissions = ( 

2124 "s3:GetObject", 

2125 "s3:PutObject", 

2126 "s3:DeleteObjectVersion", 

2127 "s3:RestoreObject", 

2128 ) + ScanBucket.permissions 

2129 

2130 schema = { 

2131 'type': 'object', 

2132 'additionalProperties': False, 

2133 'properties': { 

2134 'type': {'enum': ['encrypt-keys']}, 

2135 'report-only': {'type': 'boolean'}, 

2136 'glacier': {'type': 'boolean'}, 

2137 'large': {'type': 'boolean'}, 

2138 'crypto': {'enum': ['AES256', 'aws:kms']}, 

2139 'key-id': {'type': 'string'} 

2140 }, 

2141 'dependencies': { 

2142 'key-id': { 

2143 'properties': { 

2144 'crypto': {'pattern': 'aws:kms'} 

2145 }, 

2146 'required': ['crypto'] 

2147 } 

2148 } 

2149 } 

2150 

2151 metrics = [ 

2152 ('Total Keys', {'Scope': 'Account'}), 

2153 ('Unencrypted', {'Scope': 'Account'})] 

2154 

2155 def __init__(self, data, manager=None): 

2156 super(EncryptExtantKeys, self).__init__(data, manager) 

2157 self.kms_id = self.data.get('key-id') 

2158 

2159 def get_permissions(self): 

2160 perms = ("s3:GetObject", "s3:GetObjectVersion") 

2161 if self.data.get('report-only'): 

2162 perms += ('s3:DeleteObject', 's3:DeleteObjectVersion', 

2163 's3:PutObject', 

2164 's3:AbortMultipartUpload', 

2165 's3:ListBucket', 

2166 's3:ListBucketVersions') 

2167 return perms 

2168 

2169 def process(self, buckets): 

2170 

2171 t = time.time() 

2172 results = super(EncryptExtantKeys, self).process(buckets) 

2173 run_time = time.time() - t 

2174 remediated_count = object_count = 0 

2175 

2176 for r in results: 

2177 object_count += r['Count'] 

2178 remediated_count += r['Remediated'] 

2179 self.manager.ctx.metrics.put_metric( 

2180 "Unencrypted", r['Remediated'], "Count", Scope=r['Bucket'], 

2181 buffer=True) 

2182 

2183 self.manager.ctx.metrics.put_metric( 

2184 "Unencrypted", remediated_count, "Count", Scope="Account", 

2185 buffer=True 

2186 ) 

2187 self.manager.ctx.metrics.put_metric( 

2188 "Total Keys", object_count, "Count", Scope="Account", 

2189 buffer=True 

2190 ) 

2191 self.manager.ctx.metrics.flush() 

2192 

2193 log.info( 

2194 ("EncryptExtant Complete keys:%d " 

2195 "remediated:%d rate:%0.2f/s time:%0.2fs"), 

2196 object_count, 

2197 remediated_count, 

2198 float(object_count) / run_time if run_time else 0, 

2199 run_time) 

2200 return results 

2201 

2202 def process_chunk(self, batch, bucket): 

2203 crypto_method = self.data.get('crypto', 'AES256') 

2204 s3 = bucket_client( 

2205 local_session(self.manager.session_factory), bucket, 

2206 kms=(crypto_method == 'aws:kms')) 

2207 b = bucket['Name'] 

2208 results = [] 

2209 key_processor = self.get_bucket_op(bucket, 'key_processor') 

2210 for key in batch: 

2211 r = key_processor(s3, key, b) 

2212 if r: 

2213 results.append(r) 

2214 return results 

2215 

2216 def process_key(self, s3, key, bucket_name, info=None): 

2217 k = key['Key'] 

2218 if info is None: 

2219 info = s3.head_object(Bucket=bucket_name, Key=k) 

2220 

2221 # If the data is already encrypted with AES256 and this request is also 

2222 # for AES256 then we don't need to do anything 

2223 if info.get('ServerSideEncryption') == 'AES256' and not self.kms_id: 

2224 return False 

2225 

2226 if info.get('ServerSideEncryption') == 'aws:kms': 

2227 # If we're not looking for a specific key any key will do. 

2228 if not self.kms_id: 

2229 return False 

2230 # If we're configured to use a specific key and the key matches 

2231 # note this is not a strict equality match. 

2232 if self.kms_id in info.get('SSEKMSKeyId', ''): 

2233 return False 

2234 

2235 if self.data.get('report-only'): 

2236 return k 

2237 

2238 storage_class = info.get('StorageClass', 'STANDARD') 

2239 

2240 if storage_class == 'GLACIER': 

2241 if not self.data.get('glacier'): 

2242 return False 

2243 if 'Restore' not in info: 

2244 # This takes multiple hours; we let the next c7n 

2245 # run take care of follow-ups. 

2246 s3.restore_object( 

2247 Bucket=bucket_name, 

2248 Key=k, 

2249 RestoreRequest={'Days': 30}) 

2250 return False 

2251 elif not restore_complete(info['Restore']): 

2252 return False 

2253 

2254 storage_class = 'STANDARD' 

2255 

2256 crypto_method = self.data.get('crypto', 'AES256') 

2257 key_id = self.data.get('key-id') 

2258 # Note on copy we lose individual object acl grants 

2259 params = {'Bucket': bucket_name, 

2260 'Key': k, 

2261 'CopySource': "/%s/%s" % (bucket_name, k), 

2262 'MetadataDirective': 'COPY', 

2263 'StorageClass': storage_class, 

2264 'ServerSideEncryption': crypto_method} 

2265 

2266 if key_id and crypto_method == 'aws:kms': 

2267 params['SSEKMSKeyId'] = key_id 

2268 

2269 if info['ContentLength'] > MAX_COPY_SIZE and self.data.get( 

2270 'large', True): 

2271 return self.process_large_file(s3, bucket_name, key, info, params) 

2272 

2273 s3.copy_object(**params) 

2274 return k 

2275 

2276 def process_version(self, s3, key, bucket_name): 

2277 info = s3.head_object( 

2278 Bucket=bucket_name, 

2279 Key=key['Key'], 

2280 VersionId=key['VersionId']) 

2281 

2282 if 'ServerSideEncryption' in info: 

2283 return False 

2284 

2285 if self.data.get('report-only'): 

2286 return key['Key'], key['VersionId'] 

2287 

2288 if key['IsLatest']: 

2289 r = self.process_key(s3, key, bucket_name, info) 

2290 # Glacier request processing, wait till we have the restored object 

2291 if not r: 

2292 return r 

2293 s3.delete_object( 

2294 Bucket=bucket_name, 

2295 Key=key['Key'], 

2296 VersionId=key['VersionId']) 

2297 return key['Key'], key['VersionId'] 

2298 

2299 def process_large_file(self, s3, bucket_name, key, info, params): 

2300 """For objects over 5gb, use multipart upload to copy""" 

2301 part_size = MAX_COPY_SIZE - (1024 ** 2) 

2302 num_parts = int(math.ceil(info['ContentLength'] / part_size)) 

2303 source = params.pop('CopySource') 

2304 

2305 params.pop('MetadataDirective') 

2306 if 'Metadata' in info: 

2307 params['Metadata'] = info['Metadata'] 

2308 

2309 upload_id = s3.create_multipart_upload(**params)['UploadId'] 

2310 

2311 params = {'Bucket': bucket_name, 

2312 'Key': key['Key'], 

2313 'UploadId': upload_id, 

2314 'CopySource': source, 

2315 'CopySourceIfMatch': info['ETag']} 

2316 

2317 def upload_part(part_num): 

2318 part_params = dict(params) 

2319 part_params['CopySourceRange'] = "bytes=%d-%d" % ( 

2320 part_size * (part_num - 1), 

2321 min(part_size * part_num - 1, info['ContentLength'] - 1)) 

2322 part_params['PartNumber'] = part_num 

2323 response = s3.upload_part_copy(**part_params) 

2324 return {'ETag': response['CopyPartResult']['ETag'], 

2325 'PartNumber': part_num} 

2326 

2327 try: 

2328 with self.executor_factory(max_workers=2) as w: 

2329 parts = list(w.map(upload_part, range(1, num_parts + 1))) 

2330 except Exception: 

2331 log.warning( 

2332 "Error during large key copy bucket: %s key: %s, " 

2333 "aborting upload", bucket_name, key, exc_info=True) 

2334 s3.abort_multipart_upload( 

2335 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id) 

2336 raise 

2337 s3.complete_multipart_upload( 

2338 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id, 

2339 MultipartUpload={'Parts': parts}) 

2340 return key['Key'] 

2341 
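# Illustrative sketch (not part of the original source): the byte-range
# arithmetic used by process_large_file above for multipart copies.
# MAX_COPY_SIZE is the module-level 2 GiB constant defined near the top of
# this file; the 5 GiB object size below is hypothetical.
import math

def _example_copy_ranges(content_length):
    part_size = MAX_COPY_SIZE - (1024 ** 2)          # just under 2 GiB per part
    num_parts = int(math.ceil(content_length / part_size))
    return [
        "bytes=%d-%d" % (
            part_size * (n - 1),
            min(part_size * n - 1, content_length - 1))
        for n in range(1, num_parts + 1)]

# A 5 GiB object is split into three ranged copy parts, the last one
# covering only the remaining tail bytes.
_ranges = _example_copy_ranges(5 * 1024 ** 3)
assert len(_ranges) == 3
assert _ranges[0].startswith("bytes=0-")
assert _ranges[-1].endswith(str(5 * 1024 ** 3 - 1))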

2342 

2343def restore_complete(restore): 

2344 if ',' in restore: 

2345 ongoing, _ = restore.split(',', 1) 

2346 else: 

2347 ongoing = restore 

2348 return 'false' in ongoing 

2349 
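# Illustrative sketch (not part of the original source): example values of the
# S3 `Restore` response header and how restore_complete above reads them.
# While a Glacier restore is still running the header reports
# ongoing-request="true"; once the restored copy is available it flips to
# "false" with an expiry date appended.
assert restore_complete('ongoing-request="true"') is False
assert restore_complete(
    'ongoing-request="false", expiry-date="Fri, 21 Dec 2030 00:00:00 GMT"') is True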

2350 

2351@filters.register('is-log-target') 

2352class LogTarget(Filter): 

2353 """Filter and return buckets are log destinations. 

2354 

2355 Not suitable for use in lambda on large accounts; this is an 

2356 API-heavy process that scans all possible log sources. 

2357 

2358 Sources: 

2359 - elb (Access Log) 

2360 - s3 (Access Log) 

2361 - cfn (Template writes) 

2362 - cloudtrail 

2363 

2364 :example: 

2365 

2366 .. code-block:: yaml 

2367 

2368 policies: 

2369 - name: s3-log-bucket 

2370 resource: s3 

2371 filters: 

2372 - type: is-log-target 

2373 """ 

2374 

2375 schema = type_schema( 

2376 'is-log-target', 

2377 services={'type': 'array', 'items': {'enum': [ 

2378 's3', 'elb', 'cloudtrail']}}, 

2379 self={'type': 'boolean'}, 

2380 value={'type': 'boolean'}) 

2381 

2382 def get_permissions(self): 

2383 perms = self.manager.get_resource_manager('elb').get_permissions() 

2384 perms += ('elasticloadbalancing:DescribeLoadBalancerAttributes',) 

2385 return perms 

2386 

2387 def process(self, buckets, event=None): 

2388 log_buckets = set() 

2389 count = 0 

2390 

2391 services = self.data.get('services', ['elb', 's3', 'cloudtrail']) 

2392 self_log = self.data.get('self', False) 

2393 

2394 if 'elb' in services and not self_log: 

2395 for bucket, _ in self.get_elb_bucket_locations(): 

2396 log_buckets.add(bucket) 

2397 count += 1 

2398 self.log.debug("Found %d elb log targets" % count) 

2399 

2400 if 's3' in services: 

2401 count = 0 

2402 for bucket, _ in self.get_s3_bucket_locations(buckets, self_log): 

2403 count += 1 

2404 log_buckets.add(bucket) 

2405 self.log.debug('Found %d s3 log targets' % count) 

2406 

2407 if 'cloudtrail' in services and not self_log: 

2408 for bucket, _ in self.get_cloud_trail_locations(buckets): 

2409 log_buckets.add(bucket) 

2410 

2411 self.log.info("Found %d log targets for %d buckets" % ( 

2412 len(log_buckets), len(buckets))) 

2413 if self.data.get('value', True): 

2414 return [b for b in buckets if b['Name'] in log_buckets] 

2415 else: 

2416 return [b for b in buckets if b['Name'] not in log_buckets] 

2417 

2418 @staticmethod 

2419 def get_s3_bucket_locations(buckets, self_log=False): 

2420 """return (bucket_name, prefix) for all s3 logging targets""" 

2421 for b in buckets: 

2422 if b.get('Logging'): 

2423 if self_log: 

2424 if b['Name'] != b['Logging']['TargetBucket']: 

2425 continue 

2426 yield (b['Logging']['TargetBucket'], 

2427 b['Logging']['TargetPrefix']) 

2428 if not self_log and b['Name'].startswith('cf-templates-'): 

2429 yield (b['Name'], '') 

2430 

2431 def get_cloud_trail_locations(self, buckets): 

2432 session = local_session(self.manager.session_factory) 

2433 client = session.client('cloudtrail') 

2434 names = {b['Name'] for b in buckets} 

2435 for t in client.describe_trails().get('trailList', ()): 

2436 if t.get('S3BucketName') in names: 

2437 yield (t['S3BucketName'], t.get('S3KeyPrefix', '')) 

2438 

2439 def get_elb_bucket_locations(self): 

2440 elbs = self.manager.get_resource_manager('elb').resources() 

2441 get_elb_attrs = functools.partial( 

2442 _query_elb_attrs, self.manager.session_factory) 

2443 

2444 with self.executor_factory(max_workers=2) as w: 

2445 futures = [] 

2446 for elb_set in chunks(elbs, 100): 

2447 futures.append(w.submit(get_elb_attrs, elb_set)) 

2448 for f in as_completed(futures): 

2449 if f.exception(): 

2450 log.error("Error while scanning elb log targets: %s" % ( 

2451 f.exception())) 

2452 continue 

2453 for tgt in f.result(): 

2454 yield tgt 

2455 

2456 

2457def _query_elb_attrs(session_factory, elb_set): 

2458 session = local_session(session_factory) 

2459 client = session.client('elb') 

2460 log_targets = [] 

2461 for e in elb_set: 

2462 try: 

2463 attrs = client.describe_load_balancer_attributes( 

2464 LoadBalancerName=e['LoadBalancerName'])[ 

2465 'LoadBalancerAttributes'] 

2466 if 'AccessLog' in attrs and attrs['AccessLog']['Enabled']: 

2467 log_targets.append(( 

2468 attrs['AccessLog']['S3BucketName'], 

2469 attrs['AccessLog']['S3BucketPrefix'])) 

2470 except Exception as err: 

2471 log.warning( 

2472 "Could not retrieve load balancer %s: %s" % ( 

2473 e['LoadBalancerName'], err)) 

2474 return log_targets 

2475 

2476 

2477@actions.register('remove-website-hosting') 

2478class RemoveWebsiteHosting(BucketActionBase): 

2479 """Action that removes website hosting configuration.""" 

2480 

2481 schema = type_schema('remove-website-hosting') 

2482 

2483 permissions = ('s3:DeleteBucketWebsite',) 

2484 

2485 def process(self, buckets): 

2486 session = local_session(self.manager.session_factory) 

2487 for bucket in buckets: 

2488 client = bucket_client(session, bucket) 

2489 client.delete_bucket_website(Bucket=bucket['Name']) 

2490 

2491 

2492@actions.register('delete-global-grants') 

2493class DeleteGlobalGrants(BucketActionBase): 

2494 """Deletes global grants associated to a S3 bucket 

2495 

2496 :example: 

2497 

2498 .. code-block:: yaml 

2499 

2500 policies: 

2501 - name: s3-delete-global-grants 

2502 resource: s3 

2503 filters: 

2504 - type: global-grants 

2505 actions: 

2506 - delete-global-grants 

2507 """ 

2508 

2509 schema = type_schema( 

2510 'delete-global-grants', 

2511 grantees={'type': 'array', 'items': {'type': 'string'}}) 

2512 

2513 permissions = ('s3:PutBucketAcl',) 

2514 

2515 def process(self, buckets): 

2516 with self.executor_factory(max_workers=5) as w: 

2517 return list(filter(None, list(w.map(self.process_bucket, buckets)))) 

2518 

2519 def process_bucket(self, b): 

2520 grantees = self.data.get( 

2521 'grantees', [ 

2522 GlobalGrantsFilter.AUTH_ALL, GlobalGrantsFilter.GLOBAL_ALL]) 

2523 

2524 log.info(b) 

2525 

2526 acl = b.get('Acl', {'Grants': []}) 

2527 if not acl or not acl['Grants']: 

2528 return 

2529 new_grants = [] 

2530 for grant in acl['Grants']: 

2531 grantee = grant.get('Grantee', {}) 

2532 if not grantee: 

2533 continue 

2534 # Yuck, 'get_bucket_acl' doesn't return the grantee type. 

2535 if 'URI' in grantee: 

2536 grantee['Type'] = 'Group' 

2537 else: 

2538 grantee['Type'] = 'CanonicalUser' 

2539 if ('URI' in grantee and 

2540 grantee['URI'] in grantees and not 

2541 (grant['Permission'] == 'READ' and b['Website'])): 

2542 # Remove this grantee. 

2543 pass 

2544 else: 

2545 new_grants.append(grant) 

2546 

2547 log.info({'Owner': acl['Owner'], 'Grants': new_grants}) 

2548 

2549 c = bucket_client(self.manager.session_factory(), b) 

2550 try: 

2551 c.put_bucket_acl( 

2552 Bucket=b['Name'], 

2553 AccessControlPolicy={ 

2554 'Owner': acl['Owner'], 'Grants': new_grants}) 

2555 except ClientError as e: 

2556 if e.response['Error']['Code'] == 'NoSuchBucket': 

2557 return 

2558 return b 

2559 

2560 

2561@actions.register('tag') 

2562class BucketTag(Tag): 

2563 """Action to create tags on a S3 bucket 

2564 

2565 :example: 

2566 

2567 .. code-block:: yaml 

2568 

2569 policies: 

2570 - name: s3-tag-region 

2571 resource: s3 

2572 region: us-east-1 

2573 filters: 

2574 - "tag:RegionName": absent 

2575 actions: 

2576 - type: tag 

2577 key: RegionName 

2578 value: us-east-1 

2579 """ 

2580 

2581 def process_resource_set(self, client, resource_set, tags): 

2582 modify_bucket_tags(self.manager.session_factory, resource_set, tags) 

2583 

2584 

2585@actions.register('mark-for-op') 

2586class MarkBucketForOp(TagDelayedAction): 

2587 """Action schedules custodian to perform an action at a certain date 

2588 

2589 :example: 

2590 

2591 .. code-block:: yaml 

2592 

2593 policies: 

2594 - name: s3-encrypt 

2595 resource: s3 

2596 filters: 

2597 - type: missing-statement 

2598 statement_ids: 

2599 - RequiredEncryptedPutObject 

2600 actions: 

2601 - type: mark-for-op 

2602 op: attach-encrypt 

2603 days: 7 

2604 """ 

2605 

2606 schema = type_schema( 

2607 'mark-for-op', rinherit=TagDelayedAction.schema) 

2608 

2609 

2610@actions.register('unmark') 

2611@actions.register('remove-tag') 

2612class RemoveBucketTag(RemoveTag): 

2613 """Removes tag/tags from a S3 object 

2614 

2615 :example: 

2616 

2617 .. code-block:: yaml 

2618 

2619 policies: 

2620 - name: s3-remove-owner-tag 

2621 resource: s3 

2622 filters: 

2623 - "tag:BucketOwner": present 

2624 actions: 

2625 - type: remove-tag 

2626 tags: ['BucketOwner'] 

2627 """ 

2628 

2629 def process_resource_set(self, client, resource_set, tags): 

2630 modify_bucket_tags( 

2631 self.manager.session_factory, resource_set, remove_tags=tags) 

2632 

2633 

2634@filters.register('data-events') 

2635class DataEvents(Filter): 

2636 """Find buckets for which CloudTrail is logging data events. 

2637 

2638 Note that this filter only examines trails that are defined in the 

2639 current account. 

2640 """ 

2641 

2642 schema = type_schema('data-events', state={'enum': ['present', 'absent']}) 

2643 permissions = ( 

2644 'cloudtrail:DescribeTrails', 

2645 'cloudtrail:GetEventSelectors') 

2646 

2647 def get_event_buckets(self, session, trails): 

2648 """Return a mapping of bucket name to cloudtrail. 

2649 

2650 For wildcard trails the bucket name is ''. 

2651 """ 

2652 regions = {t.get('HomeRegion') for t in trails} 

2653 clients = {} 

2654 for region in regions: 

2655 clients[region] = session.client('cloudtrail', region_name=region) 

2656 

2657 event_buckets = {} 

2658 for t in trails: 

2659 for events in clients[t.get('HomeRegion')].get_event_selectors( 

2660 TrailName=t['Name']).get('EventSelectors', ()): 

2661 if 'DataResources' not in events: 

2662 continue 

2663 for data_events in events['DataResources']: 

2664 if data_events['Type'] != 'AWS::S3::Object': 

2665 continue 

2666 for b in data_events['Values']: 

2667 event_buckets[b.rsplit(':')[-1].strip('/')] = t['Name'] 

2668 return event_buckets 

2669 

2670 def process(self, resources, event=None): 

2671 trails = self.manager.get_resource_manager('cloudtrail').resources() 

2672 local_trails = self.filter_resources( 

2673 trails, 

2674 "split(':', TrailARN)[4]", (self.manager.account_id,) 

2675 ) 

2676 session = local_session(self.manager.session_factory) 

2677 event_buckets = self.get_event_buckets(session, local_trails) 

2678 ops = { 

2679 'present': lambda x: ( 

2680 x['Name'] in event_buckets or '' in event_buckets), 

2681 'absent': ( 

2682 lambda x: x['Name'] not in event_buckets and '' 

2683 not in event_buckets)} 

2684 

2685 op = ops[self.data.get('state', 'present')] 

2686 results = [] 

2687 for b in resources: 

2688 if op(b): 

2689 results.append(b) 

2690 return results 

2691 
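# Illustrative sketch (not part of the original source): how get_event_buckets
# above turns CloudTrail data-resource ARN values into bucket-name keys.
# 'arn:aws:s3:::' (the assumed form of a wildcard selector covering every
# bucket) collapses to the empty string, which is why the present/absent
# lambdas in process() also check for '' in event_buckets.
def _example_bucket_key(arn_value):
    # Same expression used inside get_event_buckets.
    return arn_value.rsplit(':')[-1].strip('/')

assert _example_bucket_key('arn:aws:s3:::my-data-bucket/') == 'my-data-bucket'
assert _example_bucket_key('arn:aws:s3:::') == ''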

2692 

2693@filters.register('inventory') 

2694class Inventory(ValueFilter): 

2695 """Filter inventories for a bucket""" 

2696 schema = type_schema('inventory', rinherit=ValueFilter.schema) 

2697 schema_alias = False 

2698 permissions = ('s3:GetInventoryConfiguration',) 

2699 

2700 def process(self, buckets, event=None): 

2701 results = [] 

2702 with self.executor_factory(max_workers=2) as w: 

2703 futures = {} 

2704 for b in buckets: 

2705 futures[w.submit(self.process_bucket, b)] = b 

2706 

2707 for f in as_completed(futures): 

2708 b = futures[f] 

2709 if f.exception(): 

2710 b.setdefault('c7n:DeniedMethods', []).append('GetInventoryConfiguration') 

2711 self.log.error( 

2712 "Error processing bucket: %s error: %s", 

2713 b['Name'], f.exception()) 

2714 continue 

2715 if f.result(): 

2716 results.append(b) 

2717 return results 

2718 

2719 def process_bucket(self, b): 

2720 if 'c7n:inventories' not in b: 

2721 client = bucket_client(local_session(self.manager.session_factory), b) 

2722 inventories = client.list_bucket_inventory_configurations( 

2723 Bucket=b['Name']).get('InventoryConfigurationList', []) 

2724 b['c7n:inventories'] = inventories 

2725 

2726 for i in b['c7n:inventories']: 

2727 if self.match(i): 

2728 return True 

2729 

2730 

2731@actions.register('set-inventory') 

2732class SetInventory(BucketActionBase): 

2733 """Configure bucket inventories for an s3 bucket. 

2734 """ 

2735 schema = type_schema( 

2736 'set-inventory', 

2737 required=['name', 'destination'], 

2738 state={'enum': ['enabled', 'disabled', 'absent']}, 

2739 name={'type': 'string', 'description': 'Name of inventory'}, 

2740 destination={'type': 'string', 'description': 'Name of destination bucket'}, 

2741 prefix={'type': 'string', 'description': 'Destination prefix'}, 

2742 encryption={'enum': ['SSES3', 'SSEKMS']}, 

2743 key_id={'type': 'string', 'description': 'Optional Customer KMS KeyId for SSE-KMS'}, 

2744 versions={'enum': ['All', 'Current']}, 

2745 schedule={'enum': ['Daily', 'Weekly']}, 

2746 format={'enum': ['CSV', 'ORC', 'Parquet']}, 

2747 fields={'type': 'array', 'items': {'enum': [ 

2748 'Size', 'LastModifiedDate', 'StorageClass', 'ETag', 

2749 'IsMultipartUploaded', 'ReplicationStatus', 'EncryptionStatus', 

2750 'ObjectLockRetainUntilDate', 'ObjectLockMode', 'ObjectLockLegalHoldStatus', 

2751 'IntelligentTieringAccessTier', 'BucketKeyStatus', 'ChecksumAlgorithm', 

2752 'ObjectAccessControlList', 'ObjectOwner']}}) 

2753 

2754 permissions = ('s3:PutInventoryConfiguration', 's3:GetInventoryConfiguration') 

2755 

2756 def process(self, buckets): 

2757 with self.executor_factory(max_workers=2) as w: 

2758 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

2759 for future in as_completed(futures): 

2760 bucket = futures[future] 

2761 try: 

2762 future.result() 

2763 except Exception as e: 

2764 self.log.error('Message: %s Bucket: %s', e, bucket['Name']) 

2765 

2766 def process_bucket(self, b): 

2767 inventory_name = self.data.get('name') 

2768 destination = self.data.get('destination') 

2769 prefix = self.data.get('prefix', '') 

2770 schedule = self.data.get('schedule', 'Daily') 

2771 fields = self.data.get('fields', ['LastModifiedDate', 'Size']) 

2772 versions = self.data.get('versions', 'Current') 

2773 state = self.data.get('state', 'enabled') 

2774 encryption = self.data.get('encryption') 

2775 inventory_format = self.data.get('format', 'CSV') 

2776 

2777 if not prefix: 

2778 prefix = "Inventories/%s" % (self.manager.config.account_id) 

2779 

2780 client = bucket_client(local_session(self.manager.session_factory), b) 

2781 if state == 'absent': 

2782 try: 

2783 client.delete_bucket_inventory_configuration( 

2784 Bucket=b['Name'], Id=inventory_name) 

2785 except ClientError as e: 

2786 if e.response['Error']['Code'] != 'NoSuchConfiguration': 

2787 raise 

2788 return 

2789 

2790 bucket = { 

2791 'Bucket': "arn:aws:s3:::%s" % destination, 

2792 'Format': inventory_format 

2793 } 

2794 

2795 inventory = { 

2796 'Destination': { 

2797 'S3BucketDestination': bucket 

2798 }, 

2799 'IsEnabled': state == 'enabled' and True or False, 

2800 'Id': inventory_name, 

2801 'OptionalFields': fields, 

2802 'IncludedObjectVersions': versions, 

2803 'Schedule': { 

2804 'Frequency': schedule 

2805 } 

2806 } 

2807 

2808 if prefix: 

2809 bucket['Prefix'] = prefix 

2810 

2811 if encryption: 

2812 bucket['Encryption'] = {encryption: {}} 

2813 if encryption == 'SSEKMS' and self.data.get('key_id'): 

2814 bucket['Encryption'] = {encryption: { 

2815 'KeyId': self.data['key_id'] 

2816 }} 

2817 

2818 found = self.get_inventory_delta(client, inventory, b) 

2819 if found: 

2820 return 

2821 if found is False: 

2822 self.log.debug("updating bucket:%s inventory configuration id:%s", 

2823 b['Name'], inventory_name) 

2824 client.put_bucket_inventory_configuration( 

2825 Bucket=b['Name'], Id=inventory_name, InventoryConfiguration=inventory) 

2826 

2827 def get_inventory_delta(self, client, inventory, b): 

2828 inventories = client.list_bucket_inventory_configurations(Bucket=b['Name']) 

2829 found = None 

2830 for i in inventories.get('InventoryConfigurationList', []): 

2831 if i['Id'] != inventory['Id']: 

2832 continue 

2833 found = True 

2834 for k, v in inventory.items(): 

2835 if k not in i: 

2836 found = False 

2837 continue 

2838 if isinstance(v, list): 

2839 v.sort() 

2840 i[k].sort() 

2841 if i[k] != v: 

2842 found = False 

2843 return found 

2844 
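# Illustrative sketch (not part of the original source): the three outcomes of
# get_inventory_delta above and how process_bucket reacts to them.
#   None  -> no configuration with this Id exists; a new one is created.
#   True  -> an identical configuration exists; the action is a no-op.
#   False -> a configuration with this Id exists but differs; it is rewritten.
# A hypothetical, minimal version of the comparison (the real one also sorts
# list values before comparing):
def _example_delta(existing_configs, desired):
    found = None
    for cfg in existing_configs:
        if cfg['Id'] != desired['Id']:
            continue
        found = True
        for k, v in desired.items():
            if cfg.get(k) != v:
                found = False
    return found

_desired = {'Id': 'c7n-default', 'IsEnabled': True}
assert _example_delta([], _desired) is None
assert _example_delta([{'Id': 'c7n-default', 'IsEnabled': True}], _desired) is True
assert _example_delta([{'Id': 'c7n-default', 'IsEnabled': False}], _desired) is False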

2845 

2846@filters.register('intelligent-tiering') 

2847class IntelligentTiering(ListItemFilter): 

2848 """Filter for S3 buckets to look at intelligent tiering configurations 

2849 

2850 The schema to supply to the attrs follows the schema here: 

2851 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_bucket_intelligent_tiering_configurations.html 

2852 

2853 :example: 

2854 

2855 .. code-block:: yaml 

2856 

2857 policies: 

2858 - name: s3-intelligent-tiering-configuration 

2859 resource: s3 

2860 filters: 

2861 - type: intelligent-tiering 

2862 attrs: 

2863 - Status: Enabled 

2864 - Filter: 

2865 And: 

2866 Prefix: test 

2867 Tags: 

2868 - Key: Owner 

2869 Value: c7n 

2870 - Tierings: 

2871 - Days: 100 

2872 - AccessTier: ARCHIVE_ACCESS 

2873 

2874 """ 

2875 schema = type_schema( 

2876 'intelligent-tiering', 

2877 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'}, 

2878 count={'type': 'number'}, 

2879 count_op={'$ref': '#/definitions/filters_common/comparison_operators'} 

2880 ) 

2881 permissions = ('s3:GetIntelligentTieringConfiguration',) 

2882 annotation_key = "c7n:IntelligentTiering" 

2883 annotate_items = True 

2884 

2885 def __init__(self, data, manager=None): 

2886 super().__init__(data, manager) 

2887 self.data['key'] = self.annotation_key 

2888 

2889 def process(self, buckets, event=None): 

2890 with self.executor_factory(max_workers=2) as w: 

2891 futures = {w.submit(self.get_item_values, b): b for b in buckets} 

2892 for future in as_completed(futures): 

2893 b = futures[future] 

2894 if future.exception(): 

2895 self.log.error("Message: %s Bucket: %s", future.exception(), b['Name']) 

2896 continue 

2897 return super().process(buckets, event) 

2898 

2899 def get_item_values(self, b): 

2900 if self.annotation_key not in b: 

2901 client = bucket_client(local_session(self.manager.session_factory), b) 

2902 try: 

2903 int_tier_config = client.list_bucket_intelligent_tiering_configurations( 

2904 Bucket=b['Name']) 

2905 b[self.annotation_key] = int_tier_config.get( 

2906 'IntelligentTieringConfigurationList', []) 

2907 except ClientError as e: 

2908 if e.response['Error']['Code'] == 'AccessDenied': 

2909 method = 'list_bucket_intelligent_tiering_configurations' 

2910 log.warning( 

2911 "Bucket:%s unable to invoke method:%s error:%s ", 

2912 b['Name'], method, e.response['Error']['Message']) 

2913 b.setdefault('c7n:DeniedMethods', []).append(method) 

2914 return b.get(self.annotation_key) 

2915 

2916 

2917@actions.register('set-intelligent-tiering') 

2918class ConfigureIntelligentTiering(BucketActionBase): 

2919 """Action applies an intelligent tiering configuration to a S3 bucket 

2920 

2921 The schema to supply to the configuration follows the schema here: 

2922 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_bucket_intelligent_tiering_configuration.html 

2923 

2924 To delete a configuration, supply State=delete with either the Id or Id: matched 

2925 

2926 :example: 

2927 

2928 .. code-block:: yaml 

2929 

2930 policies: 

2931 - name: s3-apply-intelligent-tiering-config 

2932 resource: aws.s3 

2933 filters: 

2934 - not: 

2935 - type: intelligent-tiering 

2936 attrs: 

2937 - Status: Enabled 

2938 - Filter: 

2939 And: 

2940 Prefix: helloworld 

2941 Tags: 

2942 - Key: Hello 

2943 Value: World 

2944 - Tierings: 

2945 - Days: 123 

2946 AccessTier: ARCHIVE_ACCESS 

2947 actions: 

2948 - type: set-intelligent-tiering 

2949 Id: c7n-default 

2950 IntelligentTieringConfiguration: 

2951 Id: c7n-default 

2952 Status: Enabled 

2953 Tierings: 

2954 - Days: 149 

2955 AccessTier: ARCHIVE_ACCESS 

2956 

2957 - name: s3-delete-intelligent-tiering-configuration 

2958 resource: aws.s3 

2959 filters: 

2960 - type: intelligent-tiering 

2961 attrs: 

2962 - Status: Enabled 

2963 - Id: test-config 

2964 actions: 

2965 - type: set-intelligent-tiering 

2966 Id: test-config 

2967 State: delete 

2968 

2969 - name: s3-delete-intelligent-tiering-matched-configs 

2970 resource: aws.s3 

2971 filters: 

2972 - type: intelligent-tiering 

2973 attrs: 

2974 - Status: Enabled 

2975 - Id: test-config 

2976 actions: 

2977 - type: set-intelligent-tiering 

2978 Id: matched 

2979 State: delete 

2980 

2981 """ 

2982 

2983 annotation_key = 'c7n:ListItemMatches' 

2984 shape = 'PutBucketIntelligentTieringConfigurationRequest' 

2985 schema = { 

2986 'type': 'object', 

2987 'additionalProperties': False, 

2988 'oneOf': [ 

2989 {'required': ['type', 'Id', 'IntelligentTieringConfiguration']}, 

2990 {'required': ['type', 'Id', 'State']}], 

2991 'properties': { 

2992 'type': {'enum': ['set-intelligent-tiering']}, 

2993 'Id': {'type': 'string'}, 

2994 # delete intelligent tier configurations via state: delete 

2995 'State': {'type': 'string', 'enum': ['delete']}, 

2996 'IntelligentTieringConfiguration': {'type': 'object'} 

2997 }, 

2998 } 

2999 

3000 permissions = ('s3:PutIntelligentTieringConfiguration',) 

3001 

3002 def validate(self): 

3003 # You can have up to 1,000 S3 Intelligent-Tiering configurations per bucket. 

3004 # Hence, always use it with a filter 

3005 found = False 

3006 for f in self.manager.iter_filters(): 

3007 if isinstance(f, IntelligentTiering): 

3008 found = True 

3009 break 

3010 if not found: 

3011 raise PolicyValidationError( 

3012 '`set-intelligent-tiering` may only be used in ' 

3013 'conjunction with `intelligent-tiering` filter on %s' % (self.manager.data,)) 

3014 cfg = dict(self.data) 

3015 if 'IntelligentTieringConfiguration' in cfg: 

3016 cfg['Bucket'] = 'bucket' 

3017 cfg.pop('type') 

3018 return shape_validate( 

3019 cfg, self.shape, self.manager.resource_type.service) 

3020 

3021 def process(self, buckets): 

3022 with self.executor_factory(max_workers=3) as w: 

3023 futures = {} 

3024 

3025 for b in buckets: 

3026 futures[w.submit(self.process_bucket, b)] = b 

3027 

3028 for future in as_completed(futures): 

3029 if future.exception(): 

3030 bucket = futures[future] 

3031 self.log.error( 

3032 'error modifying bucket intelligent tiering configuration: %s\n%s', 

3033 bucket['Name'], future.exception()) 

3034 continue 

3035 

3036 def process_bucket(self, bucket): 

3037 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3038 

3039 if 'list_bucket_intelligent_tiering_configurations' in bucket.get( 

3040 'c7n:DeniedMethods', []): 

3041 log.warning("Access Denied Bucket:%s while reading intelligent tiering configurations" 

3042 % bucket['Name']) 

3043 return 

3044 

3045 if self.data.get('Id') and self.data.get('IntelligentTieringConfiguration'): 

3046 try: 

3047 s3.put_bucket_intelligent_tiering_configuration( 

3048 Bucket=bucket['Name'], Id=self.data.get( 

3049 'Id'), IntelligentTieringConfiguration=self.data.get( 

3050 'IntelligentTieringConfiguration')) 

3051 except ClientError as e: 

3052 if e.response['Error']['Code'] == 'AccessDenied': 

3053 log.warning( 

3054 "Access Denied Bucket:%s while applying intelligent tiering configuration" 

3055 % bucket['Name']) 

3056 if self.data.get('State'): 

3057 if self.data.get('Id') == 'matched': 

3058 for config in bucket.get(self.annotation_key): 

3059 self.delete_intelligent_tiering_configurations(s3, config.get('Id'), bucket) 

3060 else: 

3061 self.delete_intelligent_tiering_configurations(s3, self.data.get('Id'), bucket) 

3062 

3063 def delete_intelligent_tiering_configurations(self, s3_client, id, bucket): 

3064 try: 

3065 s3_client.delete_bucket_intelligent_tiering_configuration(Bucket=bucket['Name'], Id=id) 

3066 except ClientError as e: 

3067 if e.response['Error']['Code'] == 'AccessDenied': 

3068 log.warning( 

3069 "Access Denied Bucket:%s while deleting intelligent tiering configuration" 

3070 % bucket['Name']) 

3071 elif e.response['Error']['Code'] == 'NoSuchConfiguration': 

3072 log.warning( 

3073 "No such configuration found:%s while deleting intelligent tiering configuration" 

3074 % bucket['Name']) 

3075 

3076 

3077@actions.register('delete') 

3078class DeleteBucket(ScanBucket): 

3079 """Action deletes a S3 bucket 

3080 

3081 :example: 

3082 

3083 .. code-block:: yaml 

3084 

3085 policies: 

3086 - name: delete-unencrypted-buckets 

3087 resource: s3 

3088 filters: 

3089 - type: missing-statement 

3090 statement_ids: 

3091 - RequiredEncryptedPutObject 

3092 actions: 

3093 - type: delete 

3094 remove-contents: true 

3095 """ 

3096 

3097 schema = type_schema('delete', **{'remove-contents': {'type': 'boolean'}}) 

3098 

3099 permissions = ('s3:*',) 

3100 

3101 bucket_ops = { 

3102 'standard': { 

3103 'iterator': 'list_objects', 

3104 'contents_key': ['Contents'], 

3105 'key_processor': 'process_key' 

3106 }, 

3107 'versioned': { 

3108 'iterator': 'list_object_versions', 

3109 'contents_key': ['Versions', 'DeleteMarkers'], 

3110 'key_processor': 'process_version' 

3111 } 

3112 } 

3113 

3114 def process_delete_enablement(self, b): 

3115 """Prep a bucket for deletion. 

3116 

3117 Clear out any pending multi-part uploads. 

3118 

3119 Disable versioning on the bucket, so deletes don't 

3120 generate fresh deletion markers. 

3121 """ 

3122 client = bucket_client( 

3123 local_session(self.manager.session_factory), b) 

3124 

3125 # Stop replication so we can suspend versioning 

3126 if b.get('Replication') is not None: 

3127 client.delete_bucket_replication(Bucket=b['Name']) 

3128 

3129 # Suspend versioning, so we don't get new delete markers 

3130 # as we walk and delete versions 

3131 if (self.get_bucket_style(b) == 'versioned' and b['Versioning']['Status'] == 'Enabled' and 

3132 self.data.get('remove-contents', True)): 

3133 client.put_bucket_versioning( 

3134 Bucket=b['Name'], 

3135 VersioningConfiguration={'Status': 'Suspended'}) 

3136 

3137 # Clear our multi-part uploads 

3138 uploads = client.get_paginator('list_multipart_uploads') 

3139 for p in uploads.paginate(Bucket=b['Name']): 

3140 for u in p.get('Uploads', ()): 

3141 client.abort_multipart_upload( 

3142 Bucket=b['Name'], 

3143 Key=u['Key'], 

3144 UploadId=u['UploadId']) 

3145 

3146 def process(self, buckets): 

3147 # might be worth sanity checking all our permissions 

3148 # on the bucket up front before disabling versioning/replication. 

3149 if self.data.get('remove-contents', True): 

3150 self._process_with_futures(self.process_delete_enablement, buckets) 

3151 self.empty_buckets(buckets) 

3152 

3153 results = self._process_with_futures(self.delete_bucket, buckets) 

3154 self.write_denied_buckets_file() 

3155 return results 

3156 

3157 def delete_bucket(self, b): 

3158 s3 = bucket_client(self.manager.session_factory(), b) 

3159 try: 

3160 self._run_api(s3.delete_bucket, Bucket=b['Name']) 

3161 except ClientError as e: 

3162 if e.response['Error']['Code'] == 'BucketNotEmpty': 

3163 self.log.error( 

3164 "Error while deleting bucket %s, bucket not empty" % ( 

3165 b['Name'])) 

3166 else: 

3167 raise e 

3168 

3169 def empty_buckets(self, buckets): 

3170 t = time.time() 

3171 results = super(DeleteBucket, self).process(buckets) 

3172 run_time = time.time() - t 

3173 object_count = 0 

3174 

3175 for r in results: 

3176 object_count += r['Count'] 

3177 self.manager.ctx.metrics.put_metric( 

3178 "Total Keys", object_count, "Count", Scope=r['Bucket'], 

3179 buffer=True) 

3180 self.manager.ctx.metrics.put_metric( 

3181 "Total Keys", object_count, "Count", Scope="Account", buffer=True) 

3182 self.manager.ctx.metrics.flush() 

3183 

3184 log.info( 

3185 "EmptyBucket buckets:%d Complete keys:%d rate:%0.2f/s time:%0.2fs", 

3186 len(buckets), object_count, 

3187 float(object_count) / run_time if run_time else 0, run_time) 

3188 return results 

3189 

3190 def process_chunk(self, batch, bucket): 

3191 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3192 objects = [] 

3193 for key in batch: 

3194 obj = {'Key': key['Key']} 

3195 if 'VersionId' in key: 

3196 obj['VersionId'] = key['VersionId'] 

3197 objects.append(obj) 

3198 results = s3.delete_objects( 

3199 Bucket=bucket['Name'], Delete={'Objects': objects}).get('Deleted', ()) 

3200 if self.get_bucket_style(bucket) != 'versioned': 

3201 return results 

3202 

3203 

3204@actions.register('configure-lifecycle') 

3205class Lifecycle(BucketActionBase): 

3206 """Action applies a lifecycle policy to versioned S3 buckets 

3207 

3208 The schema to supply to the rule follows the schema here: 

3209 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_lifecycle_configuration 

3210 

3211 To delete a lifecycle rule, supply Status=absent 

3212 

3213 :example: 

3214 

3215 .. code-block:: yaml 

3216 

3217 policies: 

3218 - name: s3-apply-lifecycle 

3219 resource: s3 

3220 actions: 

3221 - type: configure-lifecycle 

3222 rules: 

3223 - ID: my-lifecycle-id 

3224 Status: Enabled 

3225 Prefix: foo/ 

3226 Transitions: 

3227 - Days: 60 

3228 StorageClass: GLACIER 

3229 

3230 """ 

3231 

3232 schema = type_schema( 

3233 'configure-lifecycle', 

3234 **{ 

3235 'rules': { 

3236 'type': 'array', 

3237 'items': { 

3238 'type': 'object', 

3239 'required': ['ID', 'Status'], 

3240 'additionalProperties': False, 

3241 'properties': { 

3242 'ID': {'type': 'string'}, 

3243 # c7n intercepts `absent` 

3244 'Status': {'enum': ['Enabled', 'Disabled', 'absent']}, 

3245 'Prefix': {'type': 'string'}, 

3246 'Expiration': { 

3247 'type': 'object', 

3248 'additionalProperties': False, 

3249 'properties': { 

3250 'Date': {'type': 'string'}, # Date 

3251 'Days': {'type': 'integer'}, 

3252 'ExpiredObjectDeleteMarker': {'type': 'boolean'}, 

3253 }, 

3254 }, 

3255 'Filter': { 

3256 'type': 'object', 

3257 'minProperties': 1, 

3258 'maxProperties': 1, 

3259 'additionalProperties': False, 

3260 'properties': { 

3261 'Prefix': {'type': 'string'}, 

3262 'ObjectSizeGreaterThan': {'type': 'integer'}, 

3263 'ObjectSizeLessThan': {'type': 'integer'}, 

3264 'Tag': { 

3265 'type': 'object', 

3266 'required': ['Key', 'Value'], 

3267 'additionalProperties': False, 

3268 'properties': { 

3269 'Key': {'type': 'string'}, 

3270 'Value': {'type': 'string'}, 

3271 }, 

3272 }, 

3273 'And': { 

3274 'type': 'object', 

3275 'additionalProperties': False, 

3276 'properties': { 

3277 'Prefix': {'type': 'string'}, 

3278 'ObjectSizeGreaterThan': {'type': 'integer'}, 

3279 'ObjectSizeLessThan': {'type': 'integer'}, 

3280 'Tags': { 

3281 'type': 'array', 

3282 'items': { 

3283 'type': 'object', 

3284 'required': ['Key', 'Value'], 

3285 'additionalProperties': False, 

3286 'properties': { 

3287 'Key': {'type': 'string'}, 

3288 'Value': {'type': 'string'}, 

3289 }, 

3290 }, 

3291 }, 

3292 }, 

3293 }, 

3294 }, 

3295 }, 

3296 'Transitions': { 

3297 'type': 'array', 

3298 'items': { 

3299 'type': 'object', 

3300 'additionalProperties': False, 

3301 'properties': { 

3302 'Date': {'type': 'string'}, # Date 

3303 'Days': {'type': 'integer'}, 

3304 'StorageClass': {'type': 'string'}, 

3305 }, 

3306 }, 

3307 }, 

3308 'NoncurrentVersionTransitions': { 

3309 'type': 'array', 

3310 'items': { 

3311 'type': 'object', 

3312 'additionalProperties': False, 

3313 'properties': { 

3314 'NoncurrentDays': {'type': 'integer'}, 

3315 'NewerNoncurrentVersions': {'type': 'integer'}, 

3316 'StorageClass': {'type': 'string'}, 

3317 }, 

3318 }, 

3319 }, 

3320 'NoncurrentVersionExpiration': { 

3321 'type': 'object', 

3322 'additionalProperties': False, 

3323 'properties': { 

3324 'NoncurrentDays': {'type': 'integer'}, 

3325 'NewerNoncurrentVersions': {'type': 'integer'} 

3326 }, 

3327 }, 

3328 'AbortIncompleteMultipartUpload': { 

3329 'type': 'object', 

3330 'additionalProperties': False, 

3331 'properties': { 

3332 'DaysAfterInitiation': {'type': 'integer'}, 

3333 }, 

3334 }, 

3335 }, 

3336 }, 

3337 }, 

3338 } 

3339 ) 

3340 

3341 permissions = ('s3:GetLifecycleConfiguration', 's3:PutLifecycleConfiguration') 

3342 

3343 def process(self, buckets): 

3344 with self.executor_factory(max_workers=3) as w: 

3345 futures = {} 

3346 results = [] 

3347 

3348 for b in buckets: 

3349 futures[w.submit(self.process_bucket, b)] = b 

3350 

3351 for future in as_completed(futures): 

3352 if future.exception(): 

3353 bucket = futures[future] 

3354 self.log.error('error modifying bucket lifecycle: %s\n%s', 

3355 bucket['Name'], future.exception()) 

3356 results += filter(None, [future.result()]) 

3357 

3358 return results 

3359 

3360 def process_bucket(self, bucket): 

3361 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3362 

3363 if 'get_bucket_lifecycle_configuration' in bucket.get('c7n:DeniedMethods', []): 

3364 log.warning("Access Denied Bucket:%s while reading lifecycle" % bucket['Name']) 

3365 return 

3366 

3367 # Adjust the existing lifecycle by adding/deleting/overwriting rules as necessary 

3368 config = (bucket.get('Lifecycle') or {}).get('Rules', []) 

3369 for rule in self.data['rules']: 

3370 for index, existing_rule in enumerate(config): 

3371 if not existing_rule: 

3372 continue 

3373 if rule['ID'] == existing_rule['ID']: 

3374 if rule['Status'] == 'absent': 

3375 config[index] = None 

3376 else: 

3377 config[index] = rule 

3378 break 

3379 else: 

3380 if rule['Status'] != 'absent': 

3381 config.append(rule) 

3382 

3383 # The extra `list` conversion is required for python3 

3384 config = list(filter(None, config)) 

3385 

3386 try: 

3387 if not config: 

3388 s3.delete_bucket_lifecycle(Bucket=bucket['Name']) 

3389 else: 

3390 s3.put_bucket_lifecycle_configuration( 

3391 Bucket=bucket['Name'], LifecycleConfiguration={'Rules': config}) 

3392 except ClientError as e: 

3393 if e.response['Error']['Code'] == 'AccessDenied': 

3394 log.warning("Access Denied Bucket:%s while applying lifecycle" % bucket['Name']) 

3395 else: 

3396 raise e 

3397 
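# Illustrative sketch (not part of the original source): the rule-merge
# behavior of Lifecycle.process_bucket above. A policy rule replaces an
# existing rule with the same ID, Status: absent removes it, and unmatched
# rules are appended. Rule IDs below are hypothetical.
def _example_merge(existing_rules, policy_rules):
    config = list(existing_rules)
    for rule in policy_rules:
        for index, existing in enumerate(config):
            if not existing:
                continue
            if rule['ID'] == existing['ID']:
                config[index] = None if rule['Status'] == 'absent' else rule
                break
        else:
            if rule['Status'] != 'absent':
                config.append(rule)
    return [r for r in config if r]

_existing = [{'ID': 'old-rule', 'Status': 'Enabled'},
             {'ID': 'stale-rule', 'Status': 'Enabled'}]
_policy = [{'ID': 'old-rule', 'Status': 'Disabled'},     # overwrite
           {'ID': 'stale-rule', 'Status': 'absent'},     # delete
           {'ID': 'new-rule', 'Status': 'Enabled'}]      # append
assert _example_merge(_existing, _policy) == [
    {'ID': 'old-rule', 'Status': 'Disabled'},
    {'ID': 'new-rule', 'Status': 'Enabled'}]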

3398 

3399class KMSKeyResolverMixin: 

3400 """Builds a dictionary of region specific ARNs""" 

3401 

3402 def __init__(self, data, manager=None): 

3403 self.arns = dict() 

3404 self.data = data 

3405 self.manager = manager 

3406 

3407 def resolve_keys(self, buckets): 

3408 key = self.data.get('key') 

3409 if not key: 

3410 return None 

3411 

3412 regions = {get_region(b) for b in buckets} 

3413 for r in regions: 

3414 client = local_session(self.manager.session_factory).client('kms', region_name=r) 

3415 try: 

3416 key_meta = client.describe_key( 

3417 KeyId=key 

3418 ).get('KeyMetadata', {}) 

3419 key_id = key_meta.get('KeyId') 

3420 

3421 # We need a complete set of alias identifiers (names and ARNs) 

3422 # to fully evaluate bucket encryption filters. 

3423 key_aliases = client.list_aliases( 

3424 KeyId=key_id 

3425 ).get('Aliases', []) 

3426 

3427 self.arns[r] = { 

3428 'KeyId': key_id, 

3429 'Arn': key_meta.get('Arn'), 

3430 'KeyManager': key_meta.get('KeyManager'), 

3431 'Description': key_meta.get('Description'), 

3432 'Aliases': [ 

3433 alias[attr] 

3434 for alias in key_aliases 

3435 for attr in ('AliasArn', 'AliasName') 

3436 ], 

3437 } 

3438 

3439 except ClientError as e: 

3440 self.log.error('Error resolving kms ARNs for set-bucket-encryption: %s key: %s' % ( 

3441 e, self.data.get('key'))) 

3442 

3443 def get_key(self, bucket): 

3444 if 'key' not in self.data: 

3445 return None 

3446 region = get_region(bucket) 

3447 key = self.arns.get(region) 

3448 if not key: 

3449 self.log.warning('Unable to resolve key %s for bucket %s in region %s', 

3450 self.data['key'], bucket.get('Name'), region) 

3451 return key 

3452 

3453 
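# [editor's note] A sketch, not upstream code, of the per-region mapping that
# KMSKeyResolverMixin.resolve_keys builds in self.arns; every value below is a
# made-up placeholder. get_key() returns the entry for the bucket's region.
_example_resolved_kms_arns = {
    'us-east-1': {
        'KeyId': '1234abcd-12ab-34cd-56ef-1234567890ab',
        'Arn': 'arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab',
        'KeyManager': 'CUSTOMER',
        'Description': 'example customer managed key',
        # list_aliases results are flattened to both AliasArn and AliasName
        'Aliases': [
            'arn:aws:kms:us-east-1:111122223333:alias/example',
            'alias/example',
        ],
    },
}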

3454@filters.register('bucket-encryption') 

3455class BucketEncryption(KMSKeyResolverMixin, Filter): 

3456 """Filters for S3 buckets that have bucket-encryption 

3457 

3458 :example: 

3459 

3460 .. code-block:: yaml 

3461 

3462 policies: 

3463 - name: s3-bucket-encryption-AES256 

3464 resource: s3 

3465 region: us-east-1 

3466 filters: 

3467 - type: bucket-encryption 

3468 state: True 

3469 crypto: AES256 

3470 - name: s3-bucket-encryption-KMS 

3471 resource: s3 

3472 region: us-east-1 

3473 filters: 

3474 - type: bucket-encryption 

3475 state: True 

3476 crypto: aws:kms 

3477 key: alias/some/alias/key 

3478 - name: s3-bucket-encryption-off 

3479 resource: s3 

3480 region: us-east-1 

3481 filters: 

3482 - type: bucket-encryption 

3483 state: False 

3484 - name: s3-bucket-test-bucket-key-enabled 

3485 resource: s3 

3486 region: us-east-1 

3487 filters: 

3488 - type: bucket-encryption 

3489 bucket_key_enabled: True 

3490 """ 

3491 schema = type_schema('bucket-encryption', 

3492 state={'type': 'boolean'}, 

3493 crypto={'type': 'string', 'enum': ['AES256', 'aws:kms']}, 

3494 key={'type': 'string'}, 

3495 bucket_key_enabled={'type': 'boolean'}) 

3496 

3497 permissions = ('s3:GetEncryptionConfiguration', 'kms:DescribeKey', 'kms:ListAliases') 

3498 annotation_key = 'c7n:bucket-encryption' 

3499 

3500 def validate(self): 

3501 if self.data.get('bucket_key_enabled') is not None and self.data.get('key') is not None: 

3502 raise PolicyValidationError( 

3503 f'key and bucket_key_enabled attributes cannot both be set: {self.data}' 

3504 ) 

3505 

3506 def process(self, buckets, event=None): 

3507 self.resolve_keys(buckets) 

3508 results = [] 

3509 with self.executor_factory(max_workers=2) as w: 

3510 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3511 for future in as_completed(futures): 

3512 b = futures[future] 

3513 if future.exception(): 

3514 self.log.error("Message: %s Bucket: %s", future.exception(), 

3515 b['Name']) 

3516 continue 

3517 if future.result(): 

3518 results.append(b) 

3519 return results 

3520 

3521 def process_bucket(self, b): 

3522 

3523 client = bucket_client(local_session(self.manager.session_factory), b) 

3524 rules = [] 

3525 if self.annotation_key not in b: 

3526 try: 

3527 be = client.get_bucket_encryption(Bucket=b['Name']) 

3528 be.pop('ResponseMetadata', None) 

3529 except ClientError as e: 

3530 if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError': 

3531 raise 

3532 be = {} 

3533 b[self.annotation_key] = be 

3534 else: 

3535 be = b[self.annotation_key] 

3536 

3537 rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', []) 

3538 # default `state` to True as previous impl assumed state == True 

3539 # to preserve backwards compatibility 

3540 if self.data.get('bucket_key_enabled'): 

3541 for rule in rules: 

3542 return self.filter_bucket_key_enabled(rule) 

3543 elif self.data.get('bucket_key_enabled') is False: 

3544 for rule in rules: 

3545 return not self.filter_bucket_key_enabled(rule) 

3546 

3547 if self.data.get('state', True): 

3548 for sse in rules: 

3549 return self.filter_bucket(b, sse) 

3550 return False 

3551 else: 

3552 for sse in rules: 

3553 return not self.filter_bucket(b, sse) 

3554 return True 

3555 

3556 def filter_bucket(self, b, sse): 

3557 allowed = ['AES256', 'aws:kms'] 

3558 key = self.get_key(b) 

3559 crypto = self.data.get('crypto') 

3560 rule = sse.get('ApplyServerSideEncryptionByDefault') 

3561 

3562 if not rule: 

3563 return False 

3564 algo = rule.get('SSEAlgorithm') 

3565 

3566 if not crypto and algo in allowed: 

3567 return True 

3568 

3569 if crypto == 'AES256' and algo == 'AES256': 

3570 return True 

3571 elif crypto == 'aws:kms' and algo == 'aws:kms': 

3572 if not key: 

3573 # There are two broad reasons to have an empty value for 

3574 # the regional key here: 

3575 # 

3576 # * The policy did not specify a key, in which case this 

3577 # filter should match _all_ buckets with a KMS default 

3578 # encryption rule. 

3579 # 

3580 # * The policy specified a key that could not be 

3581 # resolved, in which case this filter shouldn't match 

3582 # any buckets. 

3583 return 'key' not in self.data 

3584 

3585 # The default encryption rule can specify a key ID, 

3586 # key ARN, alias name or alias ARN. Match against any of 

3587 # those attributes. A rule specifying KMS with no master key 

3588 # implies the AWS-managed key. 

3589 key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']} 

3590 return rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids 

3591 

3592 def filter_bucket_key_enabled(self, rule) -> bool: 

3593 if not rule: 

3594 return False 

3595 return rule.get('BucketKeyEnabled') 

3596 

3597 
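# [editor's note] A sketch, not upstream code, of the 'c7n:bucket-encryption'
# annotation inspected by the bucket-encryption filter above; it mirrors the
# shape of the S3 GetBucketEncryption response, with a made-up key ARN.
_example_bucket_encryption_annotation = {
    'ServerSideEncryptionConfiguration': {
        'Rules': [
            {
                'ApplyServerSideEncryptionByDefault': {
                    'SSEAlgorithm': 'aws:kms',
                    'KMSMasterKeyID': 'arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab',
                },
                'BucketKeyEnabled': True,
            },
        ],
    },
}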

3598@actions.register('set-bucket-encryption') 

3599class SetBucketEncryption(KMSKeyResolverMixin, BucketActionBase): 

3600 """Action enables default encryption on S3 buckets 

3601 

3602 `enabled`: boolean Optional: Defaults to True 

3603 

3604 `crypto`: `aws:kms` | `AES256` Optional: Defaults to AES256 

3605 

3606 `key`: KMS key ARN, alias, or key id 

3607 

3608 `bucket-key`: boolean Optional: 

3609 Defaults to True. 

3610 Reduces the amount of API traffic from Amazon S3 to KMS and can reduce KMS request 

3611 costs by up to 99 percent. Requires kms:Decrypt permissions for copy and upload 

3612 on the AWS KMS Key Policy. 

3613 

3614 Bucket Key Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html 

3615 

3616 :example: 

3617 

3618 .. code-block:: yaml 

3619 

3620 policies: 

3621 - name: s3-enable-default-encryption-kms 

3622 resource: s3 

3623 actions: 

3624 - type: set-bucket-encryption 

3625 # enabled: true <------ optional (true by default) 

3626 crypto: aws:kms 

3627 key: 1234abcd-12ab-34cd-56ef-1234567890ab 

3628 bucket-key: true 

3629 

3630 - name: s3-enable-default-encryption-kms-alias 

3631 resource: s3 

3632 actions: 

3633 - type: set-bucket-encryption 

3634 # enabled: true <------ optional (true by default) 

3635 crypto: aws:kms 

3636 key: alias/some/alias/key 

3637 bucket-key: true 

3638 

3639 - name: s3-enable-default-encryption-aes256 

3640 resource: s3 

3641 actions: 

3642 - type: set-bucket-encryption 

3643 # bucket-key: true <--- optional (true by default for AWS SSE) 

3644 # crypto: AES256 <----- optional (AES256 by default) 

3645 # enabled: true <------ optional (true by default) 

3646 

3647 - name: s3-disable-default-encryption 

3648 resource: s3 

3649 actions: 

3650 - type: set-bucket-encryption 

3651 enabled: false 

3652 """ 

3653 

3654 schema = { 

3655 'type': 'object', 

3656 'additionalProperties': False, 

3657 'properties': { 

3658 'type': {'enum': ['set-bucket-encryption']}, 

3659 'enabled': {'type': 'boolean'}, 

3660 'crypto': {'enum': ['aws:kms', 'AES256']}, 

3661 'key': {'type': 'string'}, 

3662 'bucket-key': {'type': 'boolean'} 

3663 }, 

3664 'dependencies': { 

3665 'key': { 

3666 'properties': { 

3667 'crypto': {'pattern': 'aws:kms'} 

3668 }, 

3669 'required': ['crypto'] 

3670 } 

3671 } 

3672 } 

3673 

3674 permissions = ('s3:PutEncryptionConfiguration', 's3:GetEncryptionConfiguration', 

3675 'kms:ListAliases', 'kms:DescribeKey') 

3676 

3677 def process(self, buckets): 

3678 if self.data.get('enabled', True): 

3679 self.resolve_keys(buckets) 

3680 

3681 with self.executor_factory(max_workers=3) as w: 

3682 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3683 for future in as_completed(futures): 

3684 if future.exception(): 

3685 self.log.error('Message: %s Bucket: %s', future.exception(), 

3686 futures[future]['Name']) 

3687 

3688 def process_bucket(self, bucket): 

3689 default_key_desc = 'Default master key that protects my S3 objects when no other key is defined' # noqa 

3690 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3691 if not self.data.get('enabled', True): 

3692 s3.delete_bucket_encryption(Bucket=bucket['Name']) 

3693 return 

3694 algo = self.data.get('crypto', 'AES256') 

3695 

3696 # bucket key defaults to True for alias/aws/s3 and AES256 (Amazon SSE) 

3697 # and ignores False values for that crypto 

3698 bucket_key = self.data.get('bucket-key', True) 

3699 config = { 

3700 'Rules': [ 

3701 { 

3702 'ApplyServerSideEncryptionByDefault': { 

3703 'SSEAlgorithm': algo, 

3704 }, 

3705 'BucketKeyEnabled': bucket_key 

3706 } 

3707 ] 

3708 } 

3709 

3710 if algo == 'aws:kms': 

3711 key = self.get_key(bucket) 

3712 if not key: 

3713 raise Exception('Valid KMS Key required but does not exist') 

3714 

3715 config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = key['Arn'] 

3716 s3.put_bucket_encryption( 

3717 Bucket=bucket['Name'], 

3718 ServerSideEncryptionConfiguration=config 

3719 ) 

3720 

3721 

3722OWNERSHIP_CONTROLS = ['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter'] 

3723VALUE_FILTER_MAGIC_VALUES = ['absent', 'present', 'not-null', 'empty'] 

3724 

3725 

3726@filters.register('ownership') 

3727class BucketOwnershipControls(BucketFilterBase, ValueFilter): 

3728 """Filter for object ownership controls 

3729 

3730 Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html 

3731 

3732 :example: 

3733 

3734 Find buckets with ACLs disabled 

3735 

3736 .. code-block:: yaml 

3737 

3738 policies: 

3739 - name: s3-bucket-acls-disabled 

3740 resource: aws.s3 

3741 region: us-east-1 

3742 filters: 

3743 - type: ownership 

3744 value: BucketOwnerEnforced 

3745 

3746 :example: 

3747 

3748 Find buckets with object ownership preferred or enforced 

3749 

3750 .. code-block:: yaml 

3751 

3752 policies: 

3753 - name: s3-bucket-ownership-preferred 

3754 resource: aws.s3 

3755 region: us-east-1 

3756 filters: 

3757 - type: ownership 

3758 op: in 

3759 value: 

3760 - BucketOwnerEnforced 

3761 - BucketOwnerPreferred 

3762 

3763 :example: 

3764 

3765 Find buckets with no object ownership controls 

3766 

3767 .. code-block:: yaml 

3768 

3769 policies: 

3770 - name: s3-bucket-no-ownership-controls 

3771 resource: aws.s3 

3772 region: us-east-1 

3773 filters: 

3774 - type: ownership 

3775 value: empty 

3776 """ 

3777 schema = type_schema('ownership', rinherit=ValueFilter.schema, value={'oneOf': [ 

3778 {'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}, 

3779 {'type': 'array', 'items': { 

3780 'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}}]}) 

3781 permissions = ('s3:GetBucketOwnershipControls',) 

3782 annotation_key = 'c7n:ownership' 

3783 

3784 def __init__(self, data, manager=None): 

3785 super(BucketOwnershipControls, self).__init__(data, manager) 

3786 

3787 # Ownership controls appear as an array of rules. There can only be one 

3788 # ObjectOwnership rule defined for a bucket, so we can automatically 

3789 # match against that if it exists. 

3790 self.data['key'] = f'("{self.annotation_key}".Rules[].ObjectOwnership)[0]' 

3791 

3792 def process(self, buckets, event=None): 

3793 with self.executor_factory(max_workers=2) as w: 

3794 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3795 for future in as_completed(futures): 

3796 b = futures[future] 

3797 if future.exception(): 

3798 self.log.error("Message: %s Bucket: %s", future.exception(), 

3799 b['Name']) 

3800 continue 

3801 return super(BucketOwnershipControls, self).process(buckets, event) 

3802 

3803 def process_bucket(self, b): 

3804 if self.annotation_key in b: 

3805 return 

3806 client = bucket_client(local_session(self.manager.session_factory), b) 

3807 try: 

3808 controls = client.get_bucket_ownership_controls(Bucket=b['Name']) 

3809 controls.pop('ResponseMetadata', None) 

3810 except ClientError as e: 

3811 if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError': 

3812 raise 

3813 controls = {} 

3814 b[self.annotation_key] = controls.get('OwnershipControls') 

3815 

3816 
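# [editor's note] A sketch, not upstream code, of the annotation evaluated by
# the ownership filter above; the bucket name and ownership value are made-up
# placeholders. The filter's JMESPath key
# ("c7n:ownership".Rules[].ObjectOwnership)[0] resolves to
# 'BucketOwnerEnforced' for a bucket annotated like this.
_example_ownership_annotated_bucket = {
    'Name': 'example-bucket',
    'c7n:ownership': {
        'Rules': [
            {'ObjectOwnership': 'BucketOwnerEnforced'},
        ],
    },
}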

3817@filters.register('bucket-replication') 

3818class BucketReplication(ListItemFilter): 

3819 """Filter for S3 buckets to look at bucket replication configurations 

3820 

3821 The schema to supply to the attrs follows the schema here: 

3822 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_replication.html 

3823 

3824 :example: 

3825 

3826 .. code-block:: yaml 

3827 

3828 policies: 

3829 - name: s3-bucket-replication 

3830 resource: s3 

3831 filters: 

3832 - type: bucket-replication 

3833 attrs: 

3834 - Status: Enabled 

3835 - Filter: 

3836 And: 

3837 Prefix: test 

3838 Tags: 

3839 - Key: Owner 

3840 Value: c7n 

3841 - ExistingObjectReplication: Enabled 

3842 

3843 """ 

3844 schema = type_schema( 

3845 'bucket-replication', 

3846 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'}, 

3847 count={'type': 'number'}, 

3848 count_op={'$ref': '#/definitions/filters_common/comparison_operators'} 

3849 ) 

3850 

3851 permissions = ("s3:GetReplicationConfiguration",) 

3852 annotation_key = 'Replication' 

3853 annotate_items = True 

3854 

3855 def __init__(self, data, manager=None): 

3856 super().__init__(data, manager) 

3857 self.data['key'] = self.annotation_key 

3858 

3859 def get_item_values(self, b): 

3860 client = bucket_client(local_session(self.manager.session_factory), b) 

3861 # the replication configuration is fetched during augment via S3_AUGMENT_TABLE: 

3862 bucket_replication = b.get(self.annotation_key) 

3863 

3864 rules = [] 

3865 if bucket_replication is not None: 

3866 rules = bucket_replication.get('ReplicationConfiguration', {}).get('Rules', []) 

3867 for replication in rules: 

3868 self.augment_bucket_replication(b, replication, client) 

3869 

3870 return rules 

3871 

3872 def augment_bucket_replication(self, b, replication, client): 

3873 destination_bucket = replication.get('Destination').get('Bucket').split(':')[5] 

3874 try: 

3875 destination_region = inspect_bucket_region(destination_bucket, client.meta.endpoint_url) 

3876 except ValueError: 

3877 replication['DestinationBucketAvailable'] = False 

3878 return 

3879 source_region = get_region(b) 

3880 replication['DestinationBucketAvailable'] = True 

3881 replication['DestinationRegion'] = destination_region 

3882 replication['CrossRegion'] = destination_region != source_region 

3883 

3884 
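# [editor's note] A minimal sketch, not upstream code, of how
# augment_bucket_replication above derives the destination bucket: the bucket
# name is the sixth colon-separated field of the destination ARN, and
# CrossRegion is set by comparing that bucket's region with the source
# bucket's region. The ARN below is a made-up placeholder.
_example_destination_arn = 'arn:aws:s3:::replica-bucket'
_example_destination_bucket = _example_destination_arn.split(':')[5]
# -> 'replica-bucket'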

3885@resources.register('s3-directory') 

3886class S3Directory(query.QueryResourceManager): 

3887 

3888 class resource_type(query.TypeInfo): 

3889 service = 's3' 

3890 permission_prefix = "s3express" 

3891 arn_service = "s3express" 

3892 arn_type = 'bucket' 

3893 enum_spec = ('list_directory_buckets', 'Buckets[]', None) 

3894 name = id = 'Name' 

3895 date = 'CreationDate' 

3896 dimension = 'BucketName' 

3897 cfn_type = 'AWS::S3Express::DirectoryBucket' 

3898 permissions_enum = ("s3express:ListAllMyDirectoryBuckets",)