Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/s3.py: 23%


1673 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3"""S3 Resource Manager 

4 

5Filters: 

6 

7The generic value filter (a jmespath expression) and the Or filter are 

8available with all resources, including buckets; we include several 

9additional pieces of bucket data (Tags, Replication, Acl, Policy) as keys 

10within a bucket's representation. 

11 
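For example, a value filter can match directly on that augmented data
(the policy name below is illustrative only)::

    policies:
      - name: s3-versioning-enabled
        resource: s3
        filters:
          - type: value
            key: Versioning.Status
            value: Enabled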

12Actions: 

13 

14 encrypt-keys 

15 

16 Scan all keys in a bucket and optionally encrypt them in place. 

17 

18 global-grants 

19 

20 Check bucket acls for global grants 

21 

22 encryption-policy 

23 

24 Attach an encryption required policy to a bucket, this will break 

25 applications that are not using encryption, including aws log 

26 delivery. 

27 

28""" 

29import copy 

30import functools 

31import json 

32import itertools 

33import logging 

34import math 

35import os 

36import time 

37import ssl 

38 

39from botocore.client import Config 

40from botocore.exceptions import ClientError 

41 

42from collections import defaultdict 

43from concurrent.futures import as_completed 

44 

45try: 

46 from urllib3.exceptions import SSLError 

47except ImportError: 

48 from botocore.vendored.requests.packages.urllib3.exceptions import SSLError 

49 

50 

51from c7n.actions import ( 

52 ActionRegistry, BaseAction, PutMetric, RemovePolicyBase) 

53from c7n.exceptions import PolicyValidationError, PolicyExecutionError 

54from c7n.filters import ( 

55 FilterRegistry, Filter, CrossAccountAccessFilter, MetricsFilter, 

56 ValueFilter, ListItemFilter) 

57from .aws import shape_validate 

58import c7n.filters.policystatement as polstmt_filter 

59from c7n.manager import resources 

60from c7n.output import NullBlobOutput 

61from c7n import query 

62from c7n.resources.securityhub import PostFinding 

63from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction 

64from c7n.utils import ( 

65 chunks, local_session, set_annotation, type_schema, filter_empty, 

66 dumps, format_string_values, get_account_alias_from_sts) 

67from c7n.resources.aws import inspect_bucket_region 

68 

69 

70log = logging.getLogger('custodian.s3') 

71 

72filters = FilterRegistry('s3.filters') 

73actions = ActionRegistry('s3.actions') 

74filters.register('marked-for-op', TagActionFilter) 

75actions.register('put-metric', PutMetric) 

76 
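# 1024 ** 3 * 2 bytes, i.e. 2 GiB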

77MAX_COPY_SIZE = 1024 * 1024 * 1024 * 2 

78 

79 

80class DescribeS3(query.DescribeSource): 

81 

82 def augment(self, buckets): 

83 with self.manager.executor_factory( 

84 max_workers=min((10, len(buckets) + 1))) as w: 

85 results = w.map( 

86 assemble_bucket, 

87 zip(itertools.repeat(self.manager.session_factory), buckets)) 

88 results = list(filter(None, results)) 

89 return results 

90 

91 

92class ConfigS3(query.ConfigSource): 

93 

 94 # normalize config's janky idiosyncratic bespoke formatting to the 

95 # standard describe api responses. 

96 

97 def get_query_params(self, query): 

98 q = super(ConfigS3, self).get_query_params(query) 

99 if 'expr' in q: 

100 q['expr'] = q['expr'].replace('select ', 'select awsRegion, ') 

101 return q 

102 

103 def load_resource(self, item): 

104 resource = super(ConfigS3, self).load_resource(item) 

105 cfg = item['supplementaryConfiguration'] 

106 # aka standard 

107 if 'awsRegion' in item and item['awsRegion'] != 'us-east-1': 

108 resource['Location'] = {'LocationConstraint': item['awsRegion']} 

109 else: 

110 resource['Location'] = {} 

111 

112 # owner is under acl per describe 

113 resource.pop('Owner', None) 

114 

115 for k, null_value in S3_CONFIG_SUPPLEMENT_NULL_MAP.items(): 

116 if k not in cfg: 

117 continue 

118 if cfg.get(k) == null_value: 

119 continue 

120 method = getattr(self, "handle_%s" % k, None) 

121 if method is None: 

 122 raise ValueError("unhandled supplementary config %s" % k) 

124 v = cfg[k] 

125 if isinstance(cfg[k], str): 

126 v = json.loads(cfg[k]) 

127 method(resource, v) 

128 

129 for el in S3_AUGMENT_TABLE: 

130 if el[1] not in resource: 

131 resource[el[1]] = el[2] 

132 return resource 

133 
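    # Config reports ACLs with camel-cased permission names and bare group names;
    # map them to the enum values and grantee URIs the describe API returns.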

134 PERMISSION_MAP = { 

135 'FullControl': 'FULL_CONTROL', 

136 'Write': 'WRITE', 

137 'WriteAcp': 'WRITE_ACP', 

138 'Read': 'READ', 

139 'ReadAcp': 'READ_ACP'} 

140 

141 GRANTEE_MAP = { 

142 'AllUsers': "http://acs.amazonaws.com/groups/global/AllUsers", 

143 'AuthenticatedUsers': "http://acs.amazonaws.com/groups/global/AuthenticatedUsers", 

144 'LogDelivery': 'http://acs.amazonaws.com/groups/s3/LogDelivery'} 

145 

146 def handle_AccessControlList(self, resource, item_value): 

147 # double serialized in config for some reason 

148 if isinstance(item_value, str): 

149 item_value = json.loads(item_value) 

150 

151 resource['Acl'] = {} 

152 resource['Acl']['Owner'] = {'ID': item_value['owner']['id']} 

153 if item_value['owner']['displayName']: 

154 resource['Acl']['Owner']['DisplayName'] = item_value[ 

155 'owner']['displayName'] 

156 resource['Acl']['Grants'] = grants = [] 

157 

158 for g in (item_value.get('grantList') or ()): 

159 if 'id' not in g['grantee']: 

160 assert g['grantee'] in self.GRANTEE_MAP, "unknown grantee %s" % g 

161 rg = {'Type': 'Group', 'URI': self.GRANTEE_MAP[g['grantee']]} 

162 else: 

163 rg = {'ID': g['grantee']['id'], 'Type': 'CanonicalUser'} 

164 

165 if 'displayName' in g: 

166 rg['DisplayName'] = g['displayName'] 

167 

168 grants.append({ 

169 'Permission': self.PERMISSION_MAP[g['permission']], 

170 'Grantee': rg, 

171 }) 

172 

173 def handle_BucketAccelerateConfiguration(self, resource, item_value): 

174 # not currently auto-augmented by custodian 

175 return 

176 

177 def handle_BucketLoggingConfiguration(self, resource, item_value): 

178 if ('destinationBucketName' not in item_value or 

179 item_value['destinationBucketName'] is None): 

180 resource[u'Logging'] = {} 

181 return 

182 resource[u'Logging'] = { 

183 'TargetBucket': item_value['destinationBucketName'], 

184 'TargetPrefix': item_value['logFilePrefix']} 

185 

186 def handle_BucketLifecycleConfiguration(self, resource, item_value): 

187 rules = [] 

188 for r in item_value.get('rules'): 

189 rr = {} 

190 rules.append(rr) 

191 expiry = {} 

192 for ek, ck in ( 

193 ('Date', 'expirationDate'), 

194 ('ExpiredObjectDeleteMarker', 'expiredObjectDeleteMarker'), 

195 ('Days', 'expirationInDays')): 

196 if ck in r and r[ck] and r[ck] != -1: 

197 expiry[ek] = r[ck] 

198 if expiry: 

199 rr['Expiration'] = expiry 

200 

201 transitions = [] 

202 for t in (r.get('transitions') or ()): 

203 tr = {} 

204 for k in ('date', 'days', 'storageClass'): 

205 if t.get(k): 

206 tr["%s%s" % (k[0].upper(), k[1:])] = t[k] 

207 transitions.append(tr) 

208 if transitions: 

209 rr['Transitions'] = transitions 

210 

211 if r.get('abortIncompleteMultipartUpload'): 

212 rr['AbortIncompleteMultipartUpload'] = { 

213 'DaysAfterInitiation': r[ 

214 'abortIncompleteMultipartUpload']['daysAfterInitiation']} 

215 if r.get('noncurrentVersionExpirationInDays'): 

216 rr['NoncurrentVersionExpiration'] = { 

217 'NoncurrentDays': r['noncurrentVersionExpirationInDays']} 

218 

219 nonc_transitions = [] 

220 for t in (r.get('noncurrentVersionTransitions') or ()): 

221 nonc_transitions.append({ 

222 'NoncurrentDays': t['days'], 

223 'StorageClass': t['storageClass']}) 

224 if nonc_transitions: 

225 rr['NoncurrentVersionTransitions'] = nonc_transitions 

226 

227 rr['Status'] = r['status'] 

228 rr['ID'] = r['id'] 

229 if r.get('prefix'): 

230 rr['Prefix'] = r['prefix'] 

231 if 'filter' not in r or not r['filter']: 

232 continue 

233 

234 if r['filter']['predicate']: 

235 rr['Filter'] = self.convertLifePredicate(r['filter']['predicate']) 

236 

237 resource['Lifecycle'] = {'Rules': rules} 

238 

239 def convertLifePredicate(self, p): 

240 if p['type'] == 'LifecyclePrefixPredicate': 

241 return {'Prefix': p['prefix']} 

242 if p['type'] == 'LifecycleTagPredicate': 

243 return {'Tags': [{'Key': p['tag']['key'], 'Value': p['tag']['value']}]} 

244 if p['type'] == 'LifecycleAndOperator': 

245 n = {} 

246 for o in p['operands']: 

247 ot = self.convertLifePredicate(o) 

248 if 'Tags' in n and 'Tags' in ot: 

249 n['Tags'].extend(ot['Tags']) 

250 else: 

251 n.update(ot) 

252 return {'And': n} 

253 

254 raise ValueError("unknown predicate: %s" % p) 

255 
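    # Map Config's singular notification type names to the plural keys the
    # describe API uses for each configuration list.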

256 NotifyTypeMap = { 

257 'QueueConfiguration': 'QueueConfigurations', 

258 'LambdaConfiguration': 'LambdaFunctionConfigurations', 

259 'CloudFunctionConfiguration': 'LambdaFunctionConfigurations', 

260 'TopicConfiguration': 'TopicConfigurations'} 

261 

262 def handle_BucketNotificationConfiguration(self, resource, item_value): 

263 d = {} 

264 for nid, n in item_value['configurations'].items(): 

265 ninfo = {} 

266 d.setdefault(self.NotifyTypeMap[n['type']], []).append(ninfo) 

267 if n['type'] == 'QueueConfiguration': 

268 ninfo['QueueArn'] = n['queueARN'] 

269 elif n['type'] == 'TopicConfiguration': 

270 ninfo['TopicArn'] = n['topicARN'] 

271 elif n['type'] == 'LambdaConfiguration': 

272 ninfo['LambdaFunctionArn'] = n['functionARN'] 

273 ninfo['Id'] = nid 

274 ninfo['Events'] = n['events'] 

275 rules = [] 

276 if n['filter']: 

277 for r in n['filter'].get('s3KeyFilter', {}).get('filterRules', []): 

278 rules.append({'Name': r['name'], 'Value': r['value']}) 

279 if rules: 

280 ninfo['Filter'] = {'Key': {'FilterRules': rules}} 

281 resource['Notification'] = d 

282 

283 def handle_BucketReplicationConfiguration(self, resource, item_value): 

284 d = {'Role': item_value['roleARN'], 'Rules': []} 

285 for rid, r in item_value['rules'].items(): 

286 rule = { 

287 'ID': rid, 

288 'Status': r.get('status', ''), 

289 'Prefix': r.get('prefix', ''), 

290 'Destination': { 

291 'Bucket': r['destinationConfig']['bucketARN']} 

292 } 

293 if 'Account' in r['destinationConfig']: 

294 rule['Destination']['Account'] = r['destinationConfig']['Account'] 

295 if r['destinationConfig'].get('storageClass'): 

296 rule['Destination']['StorageClass'] = r['destinationConfig']['storageClass'] 

297 d['Rules'].append(rule) 

298 resource['Replication'] = {'ReplicationConfiguration': d} 

299 

300 def handle_BucketPolicy(self, resource, item_value): 

301 resource['Policy'] = item_value.get('policyText') 

302 

303 def handle_BucketTaggingConfiguration(self, resource, item_value): 

304 resource['Tags'] = [ 

305 {"Key": k, "Value": v} for k, v in item_value['tagSets'][0]['tags'].items()] 

306 

307 def handle_BucketVersioningConfiguration(self, resource, item_value): 

308 # Config defaults versioning to 'Off' for a null value 

309 if item_value['status'] not in ('Enabled', 'Suspended'): 

310 resource['Versioning'] = {} 

311 return 

312 resource['Versioning'] = {'Status': item_value['status']} 

313 # `isMfaDeleteEnabled` is an optional boolean property - the key may be absent, 

314 # present with a null value, or present with a boolean value. 

315 # Mirror the describe source by populating Versioning.MFADelete only in the 

316 # boolean case. 

317 mfa_delete = item_value.get('isMfaDeleteEnabled') 

318 if mfa_delete is None: 

319 return 

320 resource['Versioning']['MFADelete'] = ( 

321 'Enabled' if mfa_delete else 'Disabled' 

322 ) 

323 

324 def handle_BucketWebsiteConfiguration(self, resource, item_value): 

325 website = {} 

326 if item_value['indexDocumentSuffix']: 

327 website['IndexDocument'] = { 

328 'Suffix': item_value['indexDocumentSuffix']} 

329 if item_value['errorDocument']: 

330 website['ErrorDocument'] = { 

331 'Key': item_value['errorDocument']} 

332 if item_value['redirectAllRequestsTo']: 

333 website['RedirectAllRequestsTo'] = { 

334 'HostName': item_value['redirectAllRequestsTo']['hostName'], 

335 'Protocol': item_value['redirectAllRequestsTo']['protocol']} 

336 for r in item_value['routingRules']: 

337 redirect = {} 

338 rule = {'Redirect': redirect} 

339 website.setdefault('RoutingRules', []).append(rule) 

340 if 'condition' in r: 

341 cond = {} 

342 for ck, rk in ( 

343 ('keyPrefixEquals', 'KeyPrefixEquals'), 

344 ('httpErrorCodeReturnedEquals', 

345 'HttpErrorCodeReturnedEquals')): 

346 if r['condition'][ck]: 

347 cond[rk] = r['condition'][ck] 

348 rule['Condition'] = cond 

349 for ck, rk in ( 

350 ('protocol', 'Protocol'), 

351 ('hostName', 'HostName'), 

352 ('replaceKeyPrefixWith', 'ReplaceKeyPrefixWith'), 

353 ('replaceKeyWith', 'ReplaceKeyWith'), 

354 ('httpRedirectCode', 'HttpRedirectCode')): 

355 if r['redirect'][ck]: 

356 redirect[rk] = r['redirect'][ck] 

357 resource['Website'] = website 

358 

359 

360@resources.register('s3') 

361class S3(query.QueryResourceManager): 

362 

363 class resource_type(query.TypeInfo): 

364 service = 's3' 

365 arn_type = '' 

366 enum_spec = ('list_buckets', 'Buckets[]', None) 

367 # not used but we want some consistency on the metadata 

368 detail_spec = ('get_bucket_location', 'Bucket', 'Name', 'LocationConstraint') 

369 permissions_augment = ( 

370 "s3:GetBucketAcl", 

371 "s3:GetBucketLocation", 

372 "s3:GetBucketPolicy", 

373 "s3:GetBucketTagging", 

374 "s3:GetBucketVersioning", 

375 "s3:GetBucketLogging", 

376 "s3:GetBucketNotification", 

377 "s3:GetBucketWebsite", 

378 "s3:GetLifecycleConfiguration", 

379 "s3:GetReplicationConfiguration" 

380 ) 

381 name = id = 'Name' 

382 date = 'CreationDate' 

383 dimension = 'BucketName' 

384 cfn_type = config_type = 'AWS::S3::Bucket' 

385 

386 filter_registry = filters 

387 action_registry = actions 

388 source_mapping = { 

389 'describe': DescribeS3, 

390 'config': ConfigS3 

391 } 

392 

393 def get_arns(self, resources): 

394 return ["arn:aws:s3:::{}".format(r["Name"]) for r in resources] 

395 

396 @classmethod 

397 def get_permissions(cls): 

398 perms = ["s3:ListAllMyBuckets"] 

399 perms.extend([n[-1] for n in S3_AUGMENT_TABLE]) 

400 return perms 

401 

402 
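# The value AWS Config reports for a supplementary configuration key when the
# corresponding bucket feature is unset; ConfigS3.load_resource skips keys whose
# raw value matches the null form listed here.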

403S3_CONFIG_SUPPLEMENT_NULL_MAP = { 

404 'BucketLoggingConfiguration': u'{"destinationBucketName":null,"logFilePrefix":null}', 

405 'BucketPolicy': u'{"policyText":null}', 

406 'BucketVersioningConfiguration': u'{"status":"Off","isMfaDeleteEnabled":null}', 

407 'BucketAccelerateConfiguration': u'{"status":null}', 

408 'BucketNotificationConfiguration': u'{"configurations":{}}', 

409 'BucketLifecycleConfiguration': None, 

410 'AccessControlList': None, 

411 'BucketTaggingConfiguration': None, 

412 'BucketWebsiteConfiguration': None, 

413 'BucketReplicationConfiguration': None 

414} 

415 
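# Each entry is (client method, bucket key, default value, response sub-key,
# IAM permission); assemble_bucket consumes the first four fields and
# S3.get_permissions reads the last.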

416S3_AUGMENT_TABLE = ( 

417 ('get_bucket_location', 'Location', {}, None, 's3:GetBucketLocation'), 

418 ('get_bucket_tagging', 'Tags', [], 'TagSet', 's3:GetBucketTagging'), 

419 ('get_bucket_policy', 'Policy', None, 'Policy', 's3:GetBucketPolicy'), 

420 ('get_bucket_acl', 'Acl', None, None, 's3:GetBucketAcl'), 

421 ('get_bucket_replication', 

422 'Replication', None, None, 's3:GetReplicationConfiguration'), 

423 ('get_bucket_versioning', 'Versioning', None, None, 's3:GetBucketVersioning'), 

424 ('get_bucket_website', 'Website', None, None, 's3:GetBucketWebsite'), 

425 ('get_bucket_logging', 'Logging', None, 'LoggingEnabled', 's3:GetBucketLogging'), 

426 ('get_bucket_notification_configuration', 

427 'Notification', None, None, 's3:GetBucketNotification'), 

428 ('get_bucket_lifecycle_configuration', 

429 'Lifecycle', None, None, 's3:GetLifecycleConfiguration'), 

430 # ('get_bucket_cors', 'Cors'), 

431) 

432 

433 

434def assemble_bucket(item): 

435 """Assemble a document representing all the config state around a bucket. 

436 

437 TODO: Refactor this, the logic here feels quite muddled. 

438 """ 

439 factory, b = item 

440 s = factory() 

441 c = s.client('s3') 

442 # Bucket Location, Current Client Location, Default Location 

443 b_location = c_location = location = "us-east-1" 

444 methods = list(S3_AUGMENT_TABLE) 

445 for minfo in methods: 

446 m, k, default, select = minfo[:4] 

447 try: 

448 method = getattr(c, m) 

449 v = method(Bucket=b['Name']) 

450 v.pop('ResponseMetadata') 

451 if select is not None and select in v: 

452 v = v[select] 

453 except (ssl.SSLError, SSLError) as e: 

 454 # Proxy issues? I assume 

455 log.warning("Bucket ssl error %s: %s %s", 

456 b['Name'], b.get('Location', 'unknown'), 

457 e) 

458 continue 

459 except ClientError as e: 

460 code = e.response['Error']['Code'] 

461 if code.startswith("NoSuch") or "NotFound" in code: 

462 v = default 

463 elif code == 'PermanentRedirect': 

464 s = factory() 

465 c = bucket_client(s, b) 

466 # Requeue with the correct region given location constraint 

467 methods.append((m, k, default, select)) 

468 continue 

469 else: 

470 log.warning( 

471 "Bucket:%s unable to invoke method:%s error:%s ", 

472 b['Name'], m, e.response['Error']['Message']) 

473 # For auth failures, we don't bail out, continue processing if we can. 

474 # Note this can lead to missing data, but in general is cleaner than 

475 # failing hard, due to the common use of locked down s3 bucket policies 

476 # that may cause issues fetching information across a fleet of buckets. 

477 

 478 # This does mean s3 policies that depend on augments should check the 

 479 # denied-methods annotation; generally, though, lacking get access to an 

 480 # augment means they won't have write access either. 

481 

482 # For other error types we raise and bail policy execution. 

483 if e.response['Error']['Code'] == 'AccessDenied': 

484 b.setdefault('c7n:DeniedMethods', []).append(m) 

485 continue 

486 raise 

487 # As soon as we learn location (which generally works) 

488 if k == 'Location' and v is not None: 

489 b_location = v.get('LocationConstraint') 

490 # Location == region for all cases but EU 

491 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html 

492 if b_location is None: 

493 b_location = "us-east-1" 

494 elif b_location == 'EU': 

495 b_location = "eu-west-1" 

496 v['LocationConstraint'] = 'eu-west-1' 

497 if v and v != c_location: 

498 c = s.client('s3', region_name=b_location) 

499 elif c_location != location: 

500 c = s.client('s3', region_name=location) 

501 b[k] = v 

502 return b 

503 

504 

505def bucket_client(session, b, kms=False): 

506 region = get_region(b) 

507 

508 if kms: 

509 # Need v4 signature for aws:kms crypto, else let the sdk decide 

510 # based on region support. 

511 config = Config( 

512 signature_version='s3v4', 

513 read_timeout=200, connect_timeout=120) 

514 else: 

515 config = Config(read_timeout=200, connect_timeout=120) 

516 return session.client('s3', region_name=region, config=config) 

517 

518 

519def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()): 

520 for bucket in buckets: 

521 client = bucket_client(local_session(session_factory), bucket) 

522 # Bucket tags are set atomically for the set/document, we want 

523 # to refetch against current to guard against any staleness in 

524 # our cached representation across multiple policies or concurrent 

525 # modifications. 

526 

527 if 'get_bucket_tagging' in bucket.get('c7n:DeniedMethods', []): 

528 # avoid the additional API call if we already know that it's going 

529 # to result in AccessDenied. The chances that the resource's perms 

530 # would have changed between fetching the resource and acting on it 

531 # here are pretty low-- so the check here should suffice. 

532 log.warning( 

533 "Unable to get new set of bucket tags needed to modify tags," 

534 "skipping tag action for bucket: %s" % bucket["Name"]) 

535 continue 

536 

537 try: 

538 bucket['Tags'] = client.get_bucket_tagging( 

539 Bucket=bucket['Name']).get('TagSet', []) 

540 except ClientError as e: 

541 if e.response['Error']['Code'] != 'NoSuchTagSet': 

542 raise 

543 bucket['Tags'] = [] 

544 

545 new_tags = {t['Key']: t['Value'] for t in add_tags} 

546 for t in bucket.get('Tags', ()): 

547 if (t['Key'] not in new_tags and t['Key'] not in remove_tags): 

548 new_tags[t['Key']] = t['Value'] 

549 tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()] 

550 

551 try: 

552 client.put_bucket_tagging( 

553 Bucket=bucket['Name'], Tagging={'TagSet': tag_set}) 

554 except ClientError as e: 

555 log.exception( 

556 'Exception tagging bucket %s: %s', bucket['Name'], e) 

557 continue 

558 

559 

560def get_region(b): 

561 """Tries to get the bucket region from Location.LocationConstraint 

562 

563 Special cases: 

564 LocationConstraint EU defaults to eu-west-1 

565 LocationConstraint null defaults to us-east-1 

566 

567 Args: 

568 b (object): A bucket object 

569 

570 Returns: 

571 string: an aws region string 
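
    Illustrative behavior for the special cases above::

        >>> get_region({'Location': {'LocationConstraint': 'EU'}})
        'eu-west-1'
        >>> get_region({'Location': {}})
        'us-east-1'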

572 """ 

573 remap = {None: 'us-east-1', 'EU': 'eu-west-1'} 

574 region = b.get('Location', {}).get('LocationConstraint') 

575 return remap.get(region, region) 

576 

577 

578@filters.register('metrics') 

579class S3Metrics(MetricsFilter): 

580 """S3 CW Metrics need special handling for attribute/dimension 

 581 mismatch, and an additional required dimension. 
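
    :example:

    An illustrative policy (the metric threshold and day window are examples only):

    .. code-block:: yaml

        policies:
          - name: s3-bucket-object-count
            resource: s3
            filters:
              - type: metrics
                name: NumberOfObjects
                days: 7
                value: 1000
                op: greater-than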

582 """ 

583 

584 def get_dimensions(self, resource): 

585 dims = [{'Name': 'BucketName', 'Value': resource['Name']}] 

586 if (self.data['name'] == 'NumberOfObjects' and 

587 'dimensions' not in self.data): 

588 dims.append( 

589 {'Name': 'StorageType', 'Value': 'AllStorageTypes'}) 

590 return dims 

591 

592 

593@filters.register('cross-account') 

594class S3CrossAccountFilter(CrossAccountAccessFilter): 

595 """Filters cross-account access to S3 buckets 

596 

597 :example: 

598 

599 .. code-block:: yaml 

600 

601 policies: 

602 - name: s3-acl 

603 resource: s3 

604 region: us-east-1 

605 filters: 

606 - type: cross-account 

607 """ 

608 permissions = ('s3:GetBucketPolicy',) 

609 

610 def get_accounts(self): 

611 """add in elb access by default 

612 

613 ELB Accounts by region 

614 https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html 

615 

616 Redshift Accounts by region 

617 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#db-auditing-manage-log-files 

618 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#rs-db-auditing-cloud-trail-rs-acct-ids 

619 

620 Cloudtrail Accounts by region 

621 https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-supported-regions.html 

622 """ 

623 accounts = super(S3CrossAccountFilter, self).get_accounts() 

624 return accounts.union( 

625 [ 

626 # ELB accounts 

627 '127311923021', # us-east-1 

628 '033677994240', # us-east-2 

629 '027434742980', # us-west-1 

630 '797873946194', # us-west-2 

631 '098369216593', # af-south-1 

632 '985666609251', # ca-central-1 

633 '054676820928', # eu-central-1 

634 '897822967062', # eu-north-1 

635 '635631232127', # eu-south-1 

636 '156460612806', # eu-west-1 

637 '652711504416', # eu-west-2 

638 '009996457667', # eu-west-3 

639 '754344448648', # ap-east-1 

640 '582318560864', # ap-northeast-1 

641 '600734575887', # ap-northeast-2 

642 '383597477331', # ap-northeast-3 

643 '114774131450', # ap-southeast-1 

644 '783225319266', # ap-southeast-2 

645 '718504428378', # ap-south-1 

646 '076674570225', # me-south-1 

647 '507241528517', # sa-east-1 

648 '048591011584', # us-gov-west-1 or gov-cloud-1 

649 '190560391635', # us-gov-east-1 

650 '638102146993', # cn-north-1 

651 '037604701340', # cn-northwest-1 

652 

653 # Redshift audit logging 

654 '193672423079', # us-east-1 

655 '391106570357', # us-east-2 

656 '262260360010', # us-west-1 

657 '902366379725', # us-west-2 

658 '365689465814', # af-south-1 

659 '313564881002', # ap-east-1 

660 '865932855811', # ap-south-1 

661 '090321488786', # ap-northeast-3 

662 '760740231472', # ap-northeast-2 

663 '361669875840', # ap-southeast-1 

664 '762762565011', # ap-southeast-2 

665 '404641285394', # ap-northeast-1 

666 '907379612154', # ca-central-1 

667 '053454850223', # eu-central-1 

668 '210876761215', # eu-west-1 

669 '307160386991', # eu-west-2 

670 '945612479654', # eu-south-1 

671 '915173422425', # eu-west-3 

672 '729911121831', # eu-north-1 

673 '013126148197', # me-south-1 

674 '075028567923', # sa-east-1 

675 

676 # Cloudtrail accounts (psa. folks should be using 

677 # cloudtrail service in bucket policies) 

678 '086441151436', # us-east-1 

 679 '475085895292', # us-east-2 

680 '388731089494', # us-west-1 

681 '113285607260', # us-west-2 

682 '819402241893', # ca-central-1 

683 '977081816279', # ap-south-1 

684 '492519147666', # ap-northeast-2 

685 '903692715234', # ap-southeast-1 

686 '284668455005', # ap-southeast-2 

687 '216624486486', # ap-northeast-1 

688 '035351147821', # eu-central-1 

689 '859597730677', # eu-west-1 

690 '282025262664', # eu-west-2 

691 '814480443879', # sa-east-1 

692 ]) 

693 

694 

695@filters.register('global-grants') 

696class GlobalGrantsFilter(Filter): 

697 """Filters for all S3 buckets that have global-grants 

698 

699 *Note* by default this filter allows for read access 

700 if the bucket has been configured as a website. This 

701 can be disabled per the example below. 

702 

703 :example: 

704 

705 .. code-block:: yaml 

706 

707 policies: 

708 - name: remove-global-grants 

709 resource: s3 

710 filters: 

711 - type: global-grants 

712 allow_website: false 

713 actions: 

714 - delete-global-grants 

715 

716 """ 

717 

718 schema = type_schema( 

719 'global-grants', 

720 allow_website={'type': 'boolean'}, 

721 operator={'type': 'string', 'enum': ['or', 'and']}, 

722 permissions={ 

723 'type': 'array', 'items': { 

724 'type': 'string', 'enum': [ 

725 'READ', 'WRITE', 'WRITE_ACP', 'READ_ACP', 'FULL_CONTROL']}}) 

726 

727 GLOBAL_ALL = "http://acs.amazonaws.com/groups/global/AllUsers" 

728 AUTH_ALL = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" 

729 

730 def process(self, buckets, event=None): 

731 with self.executor_factory(max_workers=5) as w: 

732 results = w.map(self.process_bucket, buckets) 

733 results = list(filter(None, list(results))) 

734 return results 

735 

736 def process_bucket(self, b): 

737 acl = b.get('Acl', {'Grants': []}) 

738 if not acl or not acl['Grants']: 

739 return 

740 

741 results = [] 

742 allow_website = self.data.get('allow_website', True) 

743 perms = self.data.get('permissions', []) 

744 

745 for grant in acl['Grants']: 

746 if 'URI' not in grant.get("Grantee", {}): 

747 continue 

748 if grant['Grantee']['URI'] not in [self.AUTH_ALL, self.GLOBAL_ALL]: 

749 continue 

750 if allow_website and grant['Permission'] == 'READ' and b['Website']: 

751 continue 

752 if not perms or (perms and grant['Permission'] in perms): 

753 results.append(grant['Permission']) 

754 

755 if results: 

756 set_annotation(b, 'GlobalPermissions', results) 

757 return b 

758 

759 

760class BucketActionBase(BaseAction): 

761 

762 def get_permissions(self): 

763 return self.permissions 

764 

765 def get_std_format_args(self, bucket): 

766 return { 

767 'account_id': self.manager.config.account_id, 

768 'region': self.manager.config.region, 

769 'bucket_name': bucket['Name'], 

770 'bucket_region': get_region(bucket) 

771 } 

772 

773 def process(self, buckets): 

774 return self._process_with_futures(buckets) 

775 

776 def _process_with_futures(self, buckets, *args, max_workers=3, **kwargs): 

777 errors = 0 

778 results = [] 

779 with self.executor_factory(max_workers=max_workers) as w: 

780 futures = {} 

781 for b in buckets: 

782 futures[w.submit(self.process_bucket, b, *args, **kwargs)] = b 

783 for f in as_completed(futures): 

784 if f.exception(): 

785 b = futures[f] 

786 self.log.error( 

787 'error modifying bucket: policy:%s action:%s bucket:%s error:%s', 

788 self.manager.data.get('name'), self.name, b['Name'], f.exception() 

789 ) 

790 errors += 1 

791 continue 

792 results += filter(None, [f.result()]) 

793 if errors: 

794 self.log.error('encountered %d errors while processing %s', errors, self.name) 

 795 raise PolicyExecutionError('%d resources failed' % errors) 

796 return results 

797 

798 

799class BucketFilterBase(Filter): 

800 def get_std_format_args(self, bucket): 

801 return { 

802 'account_id': self.manager.config.account_id, 

803 'region': self.manager.config.region, 

804 'bucket_name': bucket['Name'], 

805 'bucket_region': get_region(bucket) 

806 } 

807 

808 

809@S3.action_registry.register("post-finding") 

810class BucketFinding(PostFinding): 

811 

812 resource_type = 'AwsS3Bucket' 

813 

814 def format_resource(self, r): 

815 owner = r.get("Acl", {}).get("Owner", {}) 

816 resource = { 

817 "Type": self.resource_type, 

818 "Id": "arn:aws:s3:::{}".format(r["Name"]), 

819 "Region": get_region(r), 

820 "Tags": {t["Key"]: t["Value"] for t in r.get("Tags", [])}, 

821 "Details": {self.resource_type: { 

822 "OwnerId": owner.get('ID', 'Unknown')}} 

823 } 

824 

825 if "DisplayName" in owner: 

826 resource["Details"]["AwsS3Bucket"]["OwnerName"] = owner['DisplayName'] 

827 

828 return filter_empty(resource) 

829 

830 

831@S3.filter_registry.register('has-statement') 

832class HasStatementFilter(polstmt_filter.HasStatementFilter): 
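    # Subclassed only to provide bucket-aware format arguments (bucket_name,
    # bucket_region) for the parent has-statement filter's statement templating.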

833 def get_std_format_args(self, bucket): 

834 return { 

835 'account_id': self.manager.config.account_id, 

836 'region': self.manager.config.region, 

837 'bucket_name': bucket['Name'], 

838 'bucket_region': get_region(bucket) 

839 } 

840 

841 

842@S3.filter_registry.register('lock-configuration') 

843class S3LockConfigurationFilter(ValueFilter): 

844 """ 

845 Filter S3 buckets based on their object lock configurations 

846 

847 :example: 

848 

849 Get all buckets where lock configuration mode is COMPLIANCE 

850 

851 .. code-block:: yaml 

852 

853 policies: 

854 - name: lock-configuration-compliance 

855 resource: aws.s3 

856 filters: 

857 - type: lock-configuration 

858 key: Rule.DefaultRetention.Mode 

859 value: COMPLIANCE 

860 

861 """ 

862 schema = type_schema('lock-configuration', rinherit=ValueFilter.schema) 

863 permissions = ('s3:GetBucketObjectLockConfiguration',) 

864 annotate = True 

865 annotation_key = 'c7n:ObjectLockConfiguration' 

866 

867 def _process_resource(self, client, resource): 

868 try: 

869 config = client.get_object_lock_configuration( 

870 Bucket=resource['Name'] 

871 )['ObjectLockConfiguration'] 

872 except ClientError as e: 

873 if e.response['Error']['Code'] == 'ObjectLockConfigurationNotFoundError': 

874 config = None 

875 else: 

876 raise 

877 resource[self.annotation_key] = config 

878 

879 def process(self, resources, event=None): 

880 client = local_session(self.manager.session_factory).client('s3') 

881 with self.executor_factory(max_workers=3) as w: 

882 futures = [] 

883 for res in resources: 

884 if self.annotation_key in res: 

885 continue 

886 futures.append(w.submit(self._process_resource, client, res)) 

887 for f in as_completed(futures): 

888 exc = f.exception() 

889 if exc: 

890 self.log.error( 

891 "Exception getting bucket lock configuration \n %s" % ( 

892 exc)) 

893 return super().process(resources, event) 

894 

895 def __call__(self, r): 

896 return super().__call__(r.setdefault(self.annotation_key, None)) 

897 

898 
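# Statement shape the no-encryption-statement filter looks for; Sid and Resource
# are copied from each candidate statement before comparison (see
# EncryptionEnabledFilter.process_bucket below).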

899ENCRYPTION_STATEMENT_GLOB = { 

900 'Effect': 'Deny', 

901 'Principal': '*', 

902 'Action': 's3:PutObject', 

903 "Condition": { 

904 "StringNotEquals": { 

905 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}} 

906 

907 

908@filters.register('no-encryption-statement') 

909class EncryptionEnabledFilter(Filter): 

910 """Find buckets with missing encryption policy statements. 

911 

912 :example: 

913 

914 .. code-block:: yaml 

915 

916 policies: 

917 - name: s3-bucket-not-encrypted 

918 resource: s3 

919 filters: 

920 - type: no-encryption-statement 

921 """ 

922 schema = type_schema( 

923 'no-encryption-statement') 

924 

925 def get_permissions(self): 

926 perms = self.manager.get_resource_manager('s3').get_permissions() 

927 return perms 

928 

929 def process(self, buckets, event=None): 

930 return list(filter(None, map(self.process_bucket, buckets))) 

931 

932 def process_bucket(self, b): 

933 p = b.get('Policy') 

934 if p is None: 

935 return b 

936 p = json.loads(p) 

937 encryption_statement = dict(ENCRYPTION_STATEMENT_GLOB) 

938 

939 statements = p.get('Statement', []) 

940 check = False 

941 for s in list(statements): 

942 if 'Sid' in s: 

943 encryption_statement["Sid"] = s["Sid"] 

944 if 'Resource' in s: 

945 encryption_statement["Resource"] = s["Resource"] 

946 if s == encryption_statement: 

947 check = True 

948 break 

949 if check: 

950 return None 

951 else: 

952 return b 

953 

954 

955@filters.register('missing-statement') 

956@filters.register('missing-policy-statement') 

957class MissingPolicyStatementFilter(Filter): 

958 """Find buckets missing a set of named policy statements. 

959 

960 :example: 

961 

962 .. code-block:: yaml 

963 

964 policies: 

965 - name: s3-bucket-missing-statement 

966 resource: s3 

967 filters: 

968 - type: missing-statement 

969 statement_ids: 

970 - RequiredEncryptedPutObject 

971 """ 

972 

973 schema = type_schema( 

974 'missing-policy-statement', 

975 aliases=('missing-statement',), 

976 statement_ids={'type': 'array', 'items': {'type': 'string'}}) 

977 

978 def __call__(self, b): 

979 p = b.get('Policy') 

980 if p is None: 

981 return b 

982 

983 p = json.loads(p) 

984 

985 required = list(self.data.get('statement_ids', [])) 

986 statements = p.get('Statement', []) 

987 for s in list(statements): 

988 if s.get('Sid') in required: 

989 required.remove(s['Sid']) 

990 if not required: 

991 return False 

992 return True 

993 

994 

995@filters.register('bucket-notification') 

996class BucketNotificationFilter(ValueFilter): 

997 """Filter based on bucket notification configuration. 

998 

999 :example: 

1000 

1001 .. code-block:: yaml 

1002 

1003 policies: 

1004 - name: delete-incorrect-notification 

1005 resource: s3 

1006 filters: 

1007 - type: bucket-notification 

1008 kind: lambda 

1009 key: Id 

1010 value: "IncorrectLambda" 

1011 op: eq 

1012 actions: 

1013 - type: delete-bucket-notification 

1014 statement_ids: matched 

1015 """ 

1016 

1017 schema = type_schema( 

1018 'bucket-notification', 

1019 required=['kind'], 

1020 kind={'type': 'string', 'enum': ['lambda', 'sns', 'sqs']}, 

1021 rinherit=ValueFilter.schema) 

1022 schema_alias = False 

1023 annotation_key = 'c7n:MatchedNotificationConfigurationIds' 

1024 

1025 permissions = ('s3:GetBucketNotification',) 

1026 

1027 FIELDS = { 

1028 'lambda': 'LambdaFunctionConfigurations', 

1029 'sns': 'TopicConfigurations', 

1030 'sqs': 'QueueConfigurations' 

1031 } 

1032 

1033 def process(self, buckets, event=None): 

1034 return super(BucketNotificationFilter, self).process(buckets, event) 

1035 

1036 def __call__(self, bucket): 

1037 

1038 field = self.FIELDS[self.data['kind']] 

1039 found = False 

1040 for config in bucket.get('Notification', {}).get(field, []): 

1041 if self.match(config): 

1042 set_annotation( 

1043 bucket, 

1044 BucketNotificationFilter.annotation_key, 

1045 config['Id']) 

1046 found = True 

1047 return found 

1048 

1049 

1050@filters.register('bucket-logging') 

1051class BucketLoggingFilter(BucketFilterBase): 

1052 """Filter based on bucket logging configuration. 

1053 

1054 :example: 

1055 

1056 .. code-block:: yaml 

1057 

1058 policies: 

1059 - name: add-bucket-logging-if-missing 

1060 resource: s3 

1061 filters: 

1062 - type: bucket-logging 

1063 op: disabled 

1064 actions: 

1065 - type: toggle-logging 

1066 target_bucket: "{account_id}-{region}-s3-logs" 

1067 target_prefix: "{source_bucket_name}/" 

1068 

1069 policies: 

1070 - name: update-incorrect-or-missing-logging 

1071 resource: s3 

1072 filters: 

1073 - type: bucket-logging 

1074 op: not-equal 

1075 target_bucket: "{account_id}-{region}-s3-logs" 

1076 target_prefix: "{account}/{source_bucket_name}/" 

1077 actions: 

1078 - type: toggle-logging 

1079 target_bucket: "{account_id}-{region}-s3-logs" 

1080 target_prefix: "{account}/{source_bucket_name}/" 

1081 """ 

1082 

1083 schema = type_schema( 

1084 'bucket-logging', 

1085 op={'enum': ['enabled', 'disabled', 'equal', 'not-equal', 'eq', 'ne']}, 

1086 required=['op'], 

1087 target_bucket={'type': 'string'}, 

1088 target_prefix={'type': 'string'}) 

1089 schema_alias = False 

1090 account_name = None 

1091 

1092 permissions = ("s3:GetBucketLogging", "iam:ListAccountAliases") 

1093 

1094 def process(self, buckets, event=None): 

1095 return list(filter(None, map(self.process_bucket, buckets))) 

1096 

1097 def process_bucket(self, b): 

1098 if self.match_bucket(b): 

1099 return b 

1100 

1101 def match_bucket(self, b): 

1102 op = self.data.get('op') 

1103 

1104 logging = b.get('Logging', {}) 

1105 if op == 'disabled': 

1106 return logging == {} 

1107 elif op == 'enabled': 

1108 return logging != {} 

1109 

1110 if self.account_name is None: 

1111 session = local_session(self.manager.session_factory) 

1112 self.account_name = get_account_alias_from_sts(session) 

1113 

1114 variables = self.get_std_format_args(b) 

1115 variables.update({ 

1116 'account': self.account_name, 

1117 'source_bucket_name': b['Name'], 

1118 'source_bucket_region': get_region(b), 

1119 'target_bucket_name': self.data.get('target_bucket'), 

1120 'target_prefix': self.data.get('target_prefix'), 

1121 }) 

1122 data = format_string_values(self.data, **variables) 

1123 target_bucket = data.get('target_bucket') 

1124 target_prefix = data.get('target_prefix', b['Name'] + '/') 

1125 

1126 target_config = { 

1127 "TargetBucket": target_bucket, 

1128 "TargetPrefix": target_prefix 

1129 } if target_bucket else {} 

1130 

1131 if op in ('not-equal', 'ne'): 

1132 return logging != target_config 

1133 else: 

1134 return logging == target_config 

1135 

1136 

1137@actions.register('delete-bucket-notification') 

1138class DeleteBucketNotification(BucketActionBase): 

1139 """Action to delete S3 bucket notification configurations""" 

1140 

1141 schema = type_schema( 

1142 'delete-bucket-notification', 

1143 required=['statement_ids'], 

1144 statement_ids={'oneOf': [ 

1145 {'enum': ['matched']}, 

1146 {'type': 'array', 'items': {'type': 'string'}}]}) 

1147 

1148 permissions = ('s3:PutBucketNotification',) 

1149 

1150 def process_bucket(self, bucket): 

1151 n = bucket['Notification'] 

1152 if not n: 

1153 return 

1154 

1155 statement_ids = self.data.get('statement_ids') 

1156 if statement_ids == 'matched': 

1157 statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ()) 

1158 if not statement_ids: 

1159 return 

1160 

1161 cfg = defaultdict(list) 

1162 

1163 for t in BucketNotificationFilter.FIELDS.values(): 

1164 for c in n.get(t, []): 

1165 if c['Id'] not in statement_ids: 

1166 cfg[t].append(c) 

1167 

1168 client = bucket_client(local_session(self.manager.session_factory), bucket) 

1169 client.put_bucket_notification_configuration( 

1170 Bucket=bucket['Name'], 

1171 NotificationConfiguration=cfg) 

1172 

1173 

1174@actions.register('no-op') 

1175class NoOp(BucketActionBase): 

1176 

1177 schema = type_schema('no-op') 

1178 permissions = ('s3:ListAllMyBuckets',) 

1179 

1180 def process(self, buckets): 

1181 return None 

1182 

1183 

1184@actions.register('set-statements') 

1185class SetPolicyStatement(BucketActionBase): 

1186 """Action to add or update policy statements to S3 buckets 

1187 

1188 :example: 

1189 

1190 .. code-block:: yaml 

1191 

1192 policies: 

1193 - name: force-s3-https 

1194 resource: s3 

1195 actions: 

1196 - type: set-statements 

1197 statements: 

1198 - Sid: "DenyHttp" 

1199 Effect: "Deny" 

1200 Action: "s3:GetObject" 

1201 Principal: 

1202 AWS: "*" 

1203 Resource: "arn:aws:s3:::{bucket_name}/*" 

1204 Condition: 

1205 Bool: 

1206 "aws:SecureTransport": false 

1207 """ 

1208 

1209 permissions = ('s3:PutBucketPolicy',) 

1210 

1211 schema = type_schema( 

1212 'set-statements', 

1213 **{ 

1214 'statements': { 

1215 'type': 'array', 

1216 'items': { 

1217 'type': 'object', 

1218 'properties': { 

1219 'Sid': {'type': 'string'}, 

1220 'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']}, 

1221 'Principal': {'anyOf': [{'type': 'string'}, 

1222 {'type': 'object'}, {'type': 'array'}]}, 

1223 'NotPrincipal': {'anyOf': [{'type': 'object'}, {'type': 'array'}]}, 

1224 'Action': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1225 'NotAction': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1226 'Resource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1227 'NotResource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1228 'Condition': {'type': 'object'} 

1229 }, 

1230 'required': ['Sid', 'Effect'], 

1231 'oneOf': [ 

1232 {'required': ['Principal', 'Action', 'Resource']}, 

1233 {'required': ['NotPrincipal', 'Action', 'Resource']}, 

1234 {'required': ['Principal', 'NotAction', 'Resource']}, 

1235 {'required': ['NotPrincipal', 'NotAction', 'Resource']}, 

1236 {'required': ['Principal', 'Action', 'NotResource']}, 

1237 {'required': ['NotPrincipal', 'Action', 'NotResource']}, 

1238 {'required': ['Principal', 'NotAction', 'NotResource']}, 

1239 {'required': ['NotPrincipal', 'NotAction', 'NotResource']} 

1240 ] 

1241 } 

1242 } 

1243 } 

1244 ) 

1245 

1246 def process_bucket(self, bucket): 

1247 policy = bucket.get('Policy') or '{}' 

1248 

1249 target_statements = format_string_values( 

1250 copy.deepcopy({s['Sid']: s for s in self.data.get('statements', [])}), 

1251 **self.get_std_format_args(bucket)) 

1252 

1253 policy = json.loads(policy) 

1254 bucket_statements = policy.setdefault('Statement', []) 

1255 

1256 for s in bucket_statements: 

1257 if s.get('Sid') not in target_statements: 

1258 continue 

1259 if s == target_statements[s['Sid']]: 

1260 target_statements.pop(s['Sid']) 

1261 

1262 if not target_statements: 

1263 return 

1264 

1265 bucket_statements.extend(target_statements.values()) 

1266 policy = json.dumps(policy) 

1267 

1268 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1269 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=policy) 

1270 return {'Name': bucket['Name'], 'Policy': policy} 

1271 

1272 

1273@actions.register('remove-statements') 

1274class RemovePolicyStatement(RemovePolicyBase): 

1275 """Action to remove policy statements from S3 buckets 

1276 

1277 :example: 

1278 

1279 .. code-block:: yaml 

1280 

1281 policies: 

1282 - name: s3-remove-encrypt-put 

1283 resource: s3 

1284 filters: 

1285 - type: has-statement 

1286 statement_ids: 

1287 - RequireEncryptedPutObject 

1288 actions: 

1289 - type: remove-statements 

1290 statement_ids: 

1291 - RequiredEncryptedPutObject 

1292 """ 

1293 

1294 permissions = ("s3:PutBucketPolicy", "s3:DeleteBucketPolicy") 

1295 

1296 def process(self, buckets): 

1297 with self.executor_factory(max_workers=3) as w: 

1298 futures = {} 

1299 results = [] 

1300 for b in buckets: 

1301 futures[w.submit(self.process_bucket, b)] = b 

1302 for f in as_completed(futures): 

1303 if f.exception(): 

1304 b = futures[f] 

1305 self.log.error('error modifying bucket:%s\n%s', 

1306 b['Name'], f.exception()) 

1307 results += filter(None, [f.result()]) 

1308 return results 

1309 

1310 def process_bucket(self, bucket): 

1311 p = bucket.get('Policy') 

1312 if p is None: 

1313 return 

1314 

1315 p = json.loads(p) 

1316 

1317 statements, found = self.process_policy( 

1318 p, bucket, CrossAccountAccessFilter.annotation_key) 

1319 

1320 if not found: 

1321 return 

1322 

1323 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1324 

1325 if not statements: 

1326 s3.delete_bucket_policy(Bucket=bucket['Name']) 

1327 else: 

1328 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=json.dumps(p)) 

1329 return {'Name': bucket['Name'], 'State': 'PolicyRemoved', 'Statements': found} 

1330 

1331 

1332@actions.register('set-replication') 

1333class SetBucketReplicationConfig(BucketActionBase): 

1334 """Action to add or remove replication configuration statement from S3 buckets 

1335 

1336 :example: 

1337 

1338 .. code-block:: yaml 

1339 

1340 policies: 

1341 - name: s3-unapproved-account-replication 

1342 resource: s3 

1343 filters: 

1344 - type: value 

1345 key: Replication.ReplicationConfiguration.Rules[].Destination.Account 

1346 value: present 

1347 - type: value 

1348 key: Replication.ReplicationConfiguration.Rules[].Destination.Account 

1349 value_from: 

1350 url: 's3:///path/to/file.json' 

1351 format: json 

1352 expr: "approved_accounts.*" 

1353 op: ni 

1354 actions: 

1355 - type: set-replication 

1356 state: enable 

1357 """ 

1358 schema = type_schema( 

1359 'set-replication', 

1360 state={'type': 'string', 'enum': ['enable', 'disable', 'remove']}) 

1361 permissions = ("s3:GetReplicationConfiguration", "s3:PutReplicationConfiguration") 

1362 

1363 def process(self, buckets): 

1364 with self.executor_factory(max_workers=3) as w: 

1365 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1366 errors = [] 

1367 for future in as_completed(futures): 

1368 bucket = futures[future] 

1369 try: 

1370 future.result() 

1371 except ClientError as e: 

 1372 errors.append("Message: %s Bucket: %s" % (e, bucket['Name'])) 

1373 if errors: 

1374 raise Exception('\n'.join(map(str, errors))) 

1375 

1376 def process_bucket(self, bucket): 

1377 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1378 state = self.data.get('state') 

1379 if state is not None: 

1380 if state == 'remove': 

1381 s3.delete_bucket_replication(Bucket=bucket['Name']) 

1382 return {'Name': bucket['Name'], 'State': 'ReplicationConfigRemoved'} 

1383 if state in ('enable', 'disable'): 

1384 config = s3.get_bucket_replication(Bucket=bucket['Name']) 

1385 for rule in config['ReplicationConfiguration']['Rules']: 

1386 rule['Status'] = 'Enabled' if state == 'enable' else 'Disabled' 

1387 s3.put_bucket_replication( 

1388 Bucket=bucket['Name'], 

1389 ReplicationConfiguration=config['ReplicationConfiguration'] 

1390 ) 

1391 return {'Name': bucket['Name'], 'State': 'ReplicationConfigUpdated'} 

1392 

1393 

1394@filters.register('check-public-block') 

1395class FilterPublicBlock(Filter): 

1396 """Filter for s3 bucket public blocks 

1397 

 1398 If no filter parameters are provided it checks to see if any are unset or False. 

1399 

1400 If parameters are provided only the provided ones are checked. 

1401 

1402 :example: 

1403 

1404 .. code-block:: yaml 

1405 

1406 policies: 

1407 - name: CheckForPublicAclBlock-Off 

1408 resource: s3 

1409 region: us-east-1 

1410 filters: 

1411 - type: check-public-block 

1412 BlockPublicAcls: true 

1413 BlockPublicPolicy: true 

1414 """ 

1415 

1416 schema = type_schema( 

1417 'check-public-block', 

1418 BlockPublicAcls={'type': 'boolean'}, 

1419 IgnorePublicAcls={'type': 'boolean'}, 

1420 BlockPublicPolicy={'type': 'boolean'}, 

1421 RestrictPublicBuckets={'type': 'boolean'}) 

1422 permissions = ("s3:GetBucketPublicAccessBlock",) 

1423 keys = ( 

1424 'BlockPublicPolicy', 'BlockPublicAcls', 'IgnorePublicAcls', 'RestrictPublicBuckets') 

1425 annotation_key = 'c7n:PublicAccessBlock' 

1426 

1427 def process(self, buckets, event=None): 

1428 results = [] 

1429 with self.executor_factory(max_workers=2) as w: 

1430 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1431 for f in as_completed(futures): 

1432 if f.result(): 

1433 results.append(futures[f]) 

1434 return results 

1435 

1436 def process_bucket(self, bucket): 

1437 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1438 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys})) 

1439 if self.annotation_key not in bucket: 

1440 try: 

1441 config = s3.get_public_access_block( 

1442 Bucket=bucket['Name'])['PublicAccessBlockConfiguration'] 

1443 except ClientError as e: 

1444 error_code = e.response['Error']['Code'] 

1445 if error_code == 'NoSuchPublicAccessBlockConfiguration': 

1446 pass 

1447 elif error_code == 'AccessDenied': 

1448 # Follow the same logic as `assemble_bucket` - log and continue on access 

1449 # denied errors rather than halting a policy altogether 

1450 method = 'GetPublicAccessBlock' 

1451 log.warning( 

1452 "Bucket:%s unable to invoke method:%s error:%s ", 

1453 bucket['Name'], method, e.response['Error']['Message'] 

1454 ) 

1455 bucket.setdefault('c7n:DeniedMethods', []).append(method) 

1456 else: 

1457 raise 

1458 bucket[self.annotation_key] = config 

1459 return self.matches_filter(config) 

1460 

1461 def matches_filter(self, config): 

1462 key_set = [key for key in self.keys if key in self.data] 

1463 if key_set: 

1464 return all([self.data.get(key) is config[key] for key in key_set]) 

1465 else: 

1466 return not all(config.values()) 

1467 

1468 

1469@actions.register('set-public-block') 

1470class SetPublicBlock(BucketActionBase): 

1471 """Action to update Public Access blocks on S3 buckets 

1472 

 1473 If no action parameters are provided, all settings will be set to the `state`, which defaults to ``true``. 

1474 

1475 If action parameters are provided, those will be set and other extant values preserved. 

1476 

1477 :example: 

1478 

1479 .. code-block:: yaml 

1480 

1481 policies: 

1482 - name: s3-public-block-enable-all 

1483 resource: s3 

1484 filters: 

1485 - type: check-public-block 

1486 actions: 

1487 - type: set-public-block 

1488 

1489 policies: 

1490 - name: s3-public-block-disable-all 

1491 resource: s3 

1492 filters: 

1493 - type: check-public-block 

1494 actions: 

1495 - type: set-public-block 

1496 state: false 

1497 

1498 policies: 

1499 - name: s3-public-block-enable-some 

1500 resource: s3 

1501 filters: 

1502 - or: 

1503 - type: check-public-block 

1504 BlockPublicAcls: false 

1505 - type: check-public-block 

1506 BlockPublicPolicy: false 

1507 actions: 

1508 - type: set-public-block 

1509 BlockPublicAcls: true 

1510 BlockPublicPolicy: true 

1511 

1512 """ 

1513 

1514 schema = type_schema( 

1515 'set-public-block', 

1516 state={'type': 'boolean', 'default': True}, 

1517 BlockPublicAcls={'type': 'boolean'}, 

1518 IgnorePublicAcls={'type': 'boolean'}, 

1519 BlockPublicPolicy={'type': 'boolean'}, 

1520 RestrictPublicBuckets={'type': 'boolean'}) 

1521 permissions = ("s3:GetBucketPublicAccessBlock", "s3:PutBucketPublicAccessBlock") 

1522 keys = FilterPublicBlock.keys 

1523 annotation_key = FilterPublicBlock.annotation_key 

1524 

1525 def process(self, buckets): 

1526 with self.executor_factory(max_workers=3) as w: 

1527 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1528 for future in as_completed(futures): 

1529 future.result() 

1530 

1531 def process_bucket(self, bucket): 

1532 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1533 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys})) 

1534 if self.annotation_key not in bucket: 

1535 try: 

1536 config = s3.get_public_access_block( 

1537 Bucket=bucket['Name'])['PublicAccessBlockConfiguration'] 

1538 except ClientError as e: 

1539 if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration': 

1540 raise 

1541 

1542 key_set = [key for key in self.keys if key in self.data] 

1543 if key_set: 

1544 for key in key_set: 

1545 config[key] = self.data.get(key) 

1546 else: 

1547 for key in self.keys: 

1548 config[key] = self.data.get('state', True) 

1549 s3.put_public_access_block( 

1550 Bucket=bucket['Name'], PublicAccessBlockConfiguration=config) 

1551 

1552 

1553@actions.register('toggle-versioning') 

1554class ToggleVersioning(BucketActionBase): 

1555 """Action to enable/suspend versioning on a S3 bucket 

1556 

1557 Note versioning can never be disabled only suspended. 

1558 

1559 :example: 

1560 

1561 .. code-block:: yaml 

1562 

1563 policies: 

1564 - name: s3-enable-versioning 

1565 resource: s3 

1566 filters: 

1567 - or: 

1568 - type: value 

1569 key: Versioning.Status 

1570 value: Suspended 

1571 - type: value 

1572 key: Versioning.Status 

1573 value: absent 

1574 actions: 

1575 - type: toggle-versioning 

1576 enabled: true 

1577 """ 

1578 

1579 schema = type_schema( 

1580 'toggle-versioning', 

1581 enabled={'type': 'boolean'}) 

1582 permissions = ("s3:PutBucketVersioning",) 

1583 

1584 def process_versioning(self, resource, state): 

1585 client = bucket_client( 

1586 local_session(self.manager.session_factory), resource) 

1587 try: 

1588 client.put_bucket_versioning( 

1589 Bucket=resource['Name'], 

1590 VersioningConfiguration={ 

1591 'Status': state}) 

1592 except ClientError as e: 

1593 if e.response['Error']['Code'] != 'AccessDenied': 

1594 log.error( 

1595 "Unable to put bucket versioning on bucket %s: %s" % (resource['Name'], e)) 

1596 raise 

1597 log.warning( 

1598 "Access Denied Bucket:%s while put bucket versioning" % resource['Name']) 

1599 

1600 # mfa delete enablement looks like it needs the serial and a current token. 

1601 def process(self, resources): 

1602 enabled = self.data.get('enabled', True) 

1603 for r in resources: 

1604 if 'Versioning' not in r or not r['Versioning']: 

1605 r['Versioning'] = {'Status': 'Suspended'} 

1606 if enabled and ( 

1607 r['Versioning']['Status'] == 'Suspended'): 

1608 self.process_versioning(r, 'Enabled') 

1609 if not enabled and r['Versioning']['Status'] == 'Enabled': 

1610 self.process_versioning(r, 'Suspended') 

1611 

1612 

1613@actions.register('toggle-logging') 

1614class ToggleLogging(BucketActionBase): 

1615 """Action to enable/disable logging on a S3 bucket. 

1616 

1617 Target bucket ACL must allow for WRITE and READ_ACP Permissions 

1618 Not specifying a target_prefix will default to the current bucket name. 

1619 https://docs.aws.amazon.com/AmazonS3/latest/dev/enable-logging-programming.html 

1620 

1621 :example: 

1622 

1623 .. code-block:: yaml 

1624 

1625 policies: 

1626 - name: s3-enable-logging 

1627 resource: s3 

1628 filters: 

1629 - "tag:Testing": present 

1630 actions: 

1631 - type: toggle-logging 

1632 target_bucket: log-bucket 

1633 target_prefix: logs123/ 

1634 

1635 policies: 

1636 - name: s3-force-standard-logging 

1637 resource: s3 

1638 filters: 

1639 - type: bucket-logging 

1640 op: not-equal 

1641 target_bucket: "{account_id}-{region}-s3-logs" 

1642 target_prefix: "{account}/{source_bucket_name}/" 

1643 actions: 

1644 - type: toggle-logging 

1645 target_bucket: "{account_id}-{region}-s3-logs" 

1646 target_prefix: "{account}/{source_bucket_name}/" 

1647 """ 

1648 schema = type_schema( 

1649 'toggle-logging', 

1650 enabled={'type': 'boolean'}, 

1651 target_bucket={'type': 'string'}, 

1652 target_prefix={'type': 'string'}) 

1653 

1654 permissions = ("s3:PutBucketLogging", "iam:ListAccountAliases") 

1655 

1656 def validate(self): 

1657 if self.data.get('enabled', True): 

1658 if not self.data.get('target_bucket'): 

1659 raise PolicyValidationError( 

1660 "target_bucket must be specified on %s" % ( 

1661 self.manager.data,)) 

1662 return self 

1663 

1664 def process(self, resources): 

1665 session = local_session(self.manager.session_factory) 

1666 kwargs = { 

1667 "enabled": self.data.get('enabled', True), 

1668 "session": session, 

1669 "account_name": get_account_alias_from_sts(session), 

1670 } 

1671 

1672 return self._process_with_futures(resources, **kwargs) 

1673 

1674 def process_bucket(self, r, enabled=None, session=None, account_name=None): 

1675 client = bucket_client(session, r) 

1676 is_logging = bool(r.get('Logging')) 

1677 

1678 if enabled: 

1679 variables = self.get_std_format_args(r) 

1680 variables.update({ 

1681 'account': account_name, 

1682 'source_bucket_name': r['Name'], 

1683 'source_bucket_region': get_region(r), 

1684 'target_bucket_name': self.data.get('target_bucket'), 

1685 'target_prefix': self.data.get('target_prefix'), 

1686 }) 

1687 data = format_string_values(self.data, **variables) 

1688 config = { 

1689 'TargetBucket': data.get('target_bucket'), 

1690 'TargetPrefix': data.get('target_prefix', r['Name'] + '/') 

1691 } 

1692 if not is_logging or r.get('Logging') != config: 

1693 client.put_bucket_logging( 

1694 Bucket=r['Name'], 

1695 BucketLoggingStatus={'LoggingEnabled': config} 

1696 ) 

1697 r['Logging'] = config 

1698 

1699 elif not enabled and is_logging: 

1700 client.put_bucket_logging( 

1701 Bucket=r['Name'], BucketLoggingStatus={}) 

1702 r['Logging'] = {} 

1703 

1704 

1705@actions.register('attach-encrypt') 

1706class AttachLambdaEncrypt(BucketActionBase): 

1707 """Action attaches lambda encryption policy to S3 bucket 

1708 supports attachment via lambda bucket notification or sns notification 

1709 to invoke lambda. a special topic value of `default` will utilize an 

1710 extant notification or create one matching the bucket name. 

1711 

1712 :example: 

1713 

1714 

1715 .. code-block:: yaml 

1716 

1717 

1718 policies: 

1719 - name: attach-lambda-encrypt 

1720 resource: s3 

1721 filters: 

1722 - type: missing-policy-statement 

1723 actions: 

1724 - type: attach-encrypt 

1725 role: arn:aws:iam::123456789012:role/my-role 

1726 
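        A hypothetical variant (illustrative only) that routes the bucket
        notification through an SNS topic; the special ``default`` value reuses
        an existing topic or creates one matching the bucket name:

        .. code-block:: yaml

            policies:
              - name: attach-lambda-encrypt-via-sns
                resource: s3
                actions:
                  - type: attach-encrypt
                    role: arn:aws:iam::123456789012:role/my-role
                    topic: default
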

1727 """ 

1728 schema = type_schema( 

1729 'attach-encrypt', 

1730 role={'type': 'string'}, 

1731 tags={'type': 'object'}, 

1732 topic={'type': 'string'}) 

1733 

1734 permissions = ( 

1735 "s3:PutBucketNotification", "s3:GetBucketNotification", 

1736 # lambda manager uses quite a few perms to provision lambdas 

1737 # and event sources; hard to disambiguate, so punt for now. 

1738 "lambda:*", 

1739 ) 

1740 

1741 def __init__(self, data=None, manager=None): 

1742 self.data = data or {} 

1743 self.manager = manager 

1744 

1745 def validate(self): 

1746 if (not getattr(self.manager.config, 'dryrun', True) and 

1747 not self.data.get('role', self.manager.config.assume_role)): 

1748 raise PolicyValidationError( 

1749 "attach-encrypt: role must be specified either " 

1750 "via assume or in config on %s" % (self.manager.data,)) 

1751 

1752 return self 

1753 

1754 def process(self, buckets): 

1755 from c7n.mu import LambdaManager 

1756 from c7n.ufuncs.s3crypt import get_function 

1757 

1758 account_id = self.manager.config.account_id 

1759 topic_arn = self.data.get('topic') 

1760 

1761 func = get_function( 

1762 None, self.data.get('role', self.manager.config.assume_role), 

1763 account_id=account_id, tags=self.data.get('tags')) 

1764 

1765 regions = {get_region(b) for b in buckets} 

1766 

1767 # session managers by region 

1768 region_sessions = {} 

1769 for r in regions: 

1770 region_sessions[r] = functools.partial( 

1771 self.manager.session_factory, region=r) 

1772 

1773 # Publish the function to all of our buckets' regions 

1774 region_funcs = {} 

1775 

1776 for r in regions: 

1777 lambda_mgr = LambdaManager(region_sessions[r]) 

1778 lambda_mgr.publish(func) 

1779 region_funcs[r] = func 

1780 

1781 with self.executor_factory(max_workers=3) as w: 

1782 results = [] 

1783 futures = [] 

1784 for b in buckets: 

1785 region = get_region(b) 

1786 futures.append( 

1787 w.submit( 

1788 self.process_bucket, 

1789 region_funcs[region], 

1790 b, 

1791 topic_arn, 

1792 account_id, 

1793 region_sessions[region] 

1794 )) 

1795 for f in as_completed(futures): 

1796 if f.exception(): 

1797 log.exception( 

1798 "Error attaching lambda-encrypt %s" % (f.exception())) 

1799 results.append(f.result()) 

1800 return list(filter(None, results)) 

1801 

1802 def process_bucket(self, func, bucket, topic, account_id, session_factory): 

1803 from c7n.mu import BucketSNSNotification, BucketLambdaNotification 

1804 if topic: 

1805 topic = None if topic == 'default' else topic 

1806 source = BucketSNSNotification(session_factory, bucket, topic) 

1807 else: 

1808 source = BucketLambdaNotification( 

1809 {'account_s3': account_id}, session_factory, bucket) 

1810 return source.add(func, None) 

1811 

1812 

1813@actions.register('encryption-policy') 

1814class EncryptionRequiredPolicy(BucketActionBase): 

1815 """Action to apply an encryption policy to S3 buckets 

1816 

1817 

1818 :example: 

1819 

1820 .. code-block:: yaml 

1821 

1822 policies: 

1823 - name: s3-enforce-encryption 

1824 resource: s3 

1825 mode: 

1826 type: cloudtrail 

1827 events: 

1828 - CreateBucket 

1829 actions: 

1830 - encryption-policy 

1831 """ 

1832 

1833 permissions = ("s3:GetBucketPolicy", "s3:PutBucketPolicy") 

1834 schema = type_schema('encryption-policy') 

1835 

1836 def __init__(self, data=None, manager=None): 

1837 self.data = data or {} 

1838 self.manager = manager 

1839 

1840 def process(self, buckets): 

1841 with self.executor_factory(max_workers=3) as w: 

1842 results = w.map(self.process_bucket, buckets) 

1843 results = list(filter(None, list(results))) 

1844 return results 

1845 

1846 def process_bucket(self, b): 

1847 p = b['Policy'] 

1848 if p is None: 

1849 log.info("No policy found, creating new") 

1850 p = {'Version': "2012-10-17", "Statement": []} 

1851 else: 

1852 p = json.loads(p) 

1853 

1854 encryption_sid = "RequiredEncryptedPutObject" 

1855 encryption_statement = { 

1856 'Sid': encryption_sid, 

1857 'Effect': 'Deny', 

1858 'Principal': '*', 

1859 'Action': 's3:PutObject', 

1860 "Resource": "arn:aws:s3:::%s/*" % b['Name'], 

1861 "Condition": { 

1862 # AWS Managed Keys or KMS keys, note policy language 

1863 # does not support custom kms (todo add issue) 

1864 "StringNotEquals": { 

1865 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}} 

1866 

1867 statements = p.get('Statement', []) 

1868 for s in list(statements): 

1869 if s.get('Sid', '') == encryption_sid: 

1870 log.debug("Bucket:%s Found extant encrypt policy", b['Name']) 

1871 if s != encryption_statement: 

1872 log.info( 

1873 "Bucket:%s updating extant encrypt policy", b['Name']) 

1874 statements.remove(s) 

1875 else: 

1876 return 

1877 

1878 session = self.manager.session_factory() 

1879 s3 = bucket_client(session, b) 

1880 statements.append(encryption_statement) 

1881 p['Statement'] = statements 

1882 log.info('Bucket:%s attached encryption policy' % b['Name']) 

1883 

1884 try: 

1885 s3.put_bucket_policy( 

1886 Bucket=b['Name'], 

1887 Policy=json.dumps(p)) 

1888 except ClientError as e: 

1889 if e.response['Error']['Code'] == 'NoSuchBucket': 

1890 return 

1891 self.log.exception( 

1892 "Error on bucket:%s putting policy\n%s error:%s", 

1893 b['Name'], 

1894 json.dumps(statements, indent=2), e) 

1895 raise 

1896 return {'Name': b['Name'], 'State': 'PolicyAttached'} 

1897 

1898 

1899class BucketScanLog: 

1900 """Offload remediated key ids to a disk file in batches 

1901 

1902 A bucket keyspace is effectively infinite, so we need to store partial 

1903 results out of memory; this class provides a JSON log on disk 

1904 with partial-write support. 

1905 

1906 json output format: 

1907 - [list_of_serialized_keys], 

1908 - [] # Empty list of keys at end when we close the buffer 

1909 

1910 """ 

1911 

1912 def __init__(self, log_dir, name): 

1913 self.log_dir = log_dir 

1914 self.name = name 

1915 self.fh = None 

1916 self.count = 0 

1917 

1918 @property 

1919 def path(self): 

1920 return os.path.join(self.log_dir, "%s.json" % self.name) 

1921 

1922 def __enter__(self): 

1923 # Don't require output directories 

1924 if self.log_dir is None: 

1925 return 

1926 

1927 self.fh = open(self.path, 'w') 

1928 self.fh.write("[\n") 

1929 return self 

1930 

1931 def __exit__(self, exc_type=None, exc_value=None, exc_frame=None): 

1932 if self.fh is None: 

1933 return 

1934 # we need an empty marker list at end to avoid trailing commas 

1935 self.fh.write("[]") 

1936 # and close the surrounding list 

1937 self.fh.write("\n]") 

1938 self.fh.close() 

1939 if not self.count: 

1940 os.remove(self.fh.name) 

1941 self.fh = None 

1942 return False 

1943 

1944 def add(self, keys): 

1945 self.count += len(keys) 

1946 if self.fh is None: 

1947 return 

1948 self.fh.write(dumps(keys)) 

1949 self.fh.write(",\n") 

1950 

1951 

1952class ScanBucket(BucketActionBase): 

1953 

1954 permissions = ("s3:ListBucket",) 

1955 

1956 bucket_ops = { 

1957 'standard': { 

1958 'iterator': 'list_objects', 

1959 'contents_key': ['Contents'], 

1960 'key_processor': 'process_key' 

1961 }, 

1962 'versioned': { 

1963 'iterator': 'list_object_versions', 

1964 'contents_key': ['Versions'], 

1965 'key_processor': 'process_version' 

1966 } 

1967 } 

1968 

1969 def __init__(self, data, manager=None): 

1970 super(ScanBucket, self).__init__(data, manager) 

1971 self.denied_buckets = set() 

1972 

1973 def get_bucket_style(self, b): 
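        # Treat a bucket as 'versioned' if versioning is Enabled or Suspended
        # (either way it may hold object versions), otherwise 'standard'.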

1974 return ( 

1975 b.get('Versioning', {'Status': ''}).get('Status') in ( 

1976 'Enabled', 'Suspended') and 'versioned' or 'standard') 

1977 

1978 def get_bucket_op(self, b, op_name): 

1979 bucket_style = self.get_bucket_style(b) 

1980 op = self.bucket_ops[bucket_style][op_name] 

1981 if op_name == 'key_processor': 

1982 return getattr(self, op) 

1983 return op 

1984 

1985 def get_keys(self, b, key_set): 

1986 content_keys = self.get_bucket_op(b, 'contents_key') 

1987 keys = [] 

1988 for ck in content_keys: 

1989 keys.extend(key_set.get(ck, [])) 

1990 return keys 

1991 

1992 def process(self, buckets): 

1993 results = self._process_with_futures(self.process_bucket, buckets) 

1994 self.write_denied_buckets_file() 

1995 return results 

1996 

1997 def _process_with_futures(self, helper, buckets, max_workers=3): 

1998 results = [] 

1999 with self.executor_factory(max_workers) as w: 

2000 futures = {} 

2001 for b in buckets: 

2002 futures[w.submit(helper, b)] = b 

2003 for f in as_completed(futures): 

2004 if f.exception(): 

2005 b = futures[f] 

2006 self.log.error( 

2007 "Error on bucket:%s region:%s policy:%s error: %s", 

2008 b['Name'], b.get('Location', 'unknown'), 

2009 self.manager.data.get('name'), f.exception()) 

2010 self.denied_buckets.add(b['Name']) 

2011 continue 

2012 result = f.result() 

2013 if result: 

2014 results.append(result) 

2015 return results 

2016 

2017 def write_denied_buckets_file(self): 

2018 if (self.denied_buckets and 

2019 self.manager.ctx.log_dir and 

2020 not isinstance(self.manager.ctx.output, NullBlobOutput)): 

2021 with open( 

2022 os.path.join( 

2023 self.manager.ctx.log_dir, 'denied.json'), 'w') as fh: 

2024 json.dump(list(self.denied_buckets), fh, indent=2) 

2025 self.denied_buckets = set() 

2026 

2027 def process_bucket(self, b): 

2028 log.info( 

2029 "Scanning bucket:%s visitor:%s style:%s" % ( 

2030 b['Name'], self.__class__.__name__, self.get_bucket_style(b))) 

2031 

2032 s = self.manager.session_factory() 

2033 s3 = bucket_client(s, b) 

2034 

2035 # The bulk of the _process_bucket function executes inline in the 

2036 # calling thread/worker context; neither the paginator nor the 

2037 # bucket scan log should be used across a worker boundary. 

2038 p = s3.get_paginator( 

2039 self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name']) 

2040 

2041 with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log: 

2042 with self.executor_factory(max_workers=10) as w: 

2043 try: 

2044 return self._process_bucket(b, p, key_log, w) 

2045 except ClientError as e: 

2046 if e.response['Error']['Code'] == 'NoSuchBucket': 

2047 log.warning( 

2048 "Bucket:%s removed while scanning" % b['Name']) 

2049 return 

2050 if e.response['Error']['Code'] == 'AccessDenied': 

2051 log.warning( 

2052 "Access Denied Bucket:%s while scanning" % b['Name']) 

2053 self.denied_buckets.add(b['Name']) 

2054 return 

2055 log.exception( 

2056 "Error processing bucket:%s paginator:%s" % ( 

2057 b['Name'], p)) 

2058 

2059 __call__ = process_bucket 

2060 

2061 def _process_bucket(self, b, p, key_log, w): 

2062 count = 0 

2063 

2064 for key_set in p: 

2065 keys = self.get_keys(b, key_set) 

2066 count += len(keys) 

2067 futures = [] 

2068 

2069 for batch in chunks(keys, size=100): 

2070 if not batch: 

2071 continue 

2072 futures.append(w.submit(self.process_chunk, batch, b)) 

2073 

2074 for f in as_completed(futures): 

2075 if f.exception(): 

2076 log.exception("Exception Processing bucket:%s key batch %s" % ( 

2077 b['Name'], f.exception())) 

2078 continue 

2079 r = f.result() 

2080 if r: 

2081 key_log.add(r) 

2082 

2083 # Log completion at info level, progress at debug level 

2084 if key_set['IsTruncated']: 

2085 log.debug('Scan progress bucket:%s keys:%d remediated:%d ...', 

2086 b['Name'], count, key_log.count) 

2087 else: 

2088 log.info('Scan Complete bucket:%s keys:%d remediated:%d', 

2089 b['Name'], count, key_log.count) 

2090 

2091 b['KeyScanCount'] = count 

2092 b['KeyRemediated'] = key_log.count 

2093 return { 

2094 'Bucket': b['Name'], 'Remediated': key_log.count, 'Count': count} 

2095 

2096 def process_chunk(self, batch, bucket): 

2097 raise NotImplementedError() 

2098 

2099 def process_key(self, s3, key, bucket_name, info=None): 

2100 raise NotImplementedError() 

2101 

2102 def process_version(self, s3, bucket, key): 

2103 raise NotImplementedError() 

2104 

2105 

2106@actions.register('encrypt-keys') 

2107class EncryptExtantKeys(ScanBucket): 

2108 """Action to encrypt unencrypted S3 objects 

2109 

2110 :example: 

2111 

2112 .. code-block:: yaml 

2113 

2114 policies: 

2115 - name: s3-encrypt-objects 

2116 resource: s3 

2117 actions: 

2118 - type: encrypt-keys 

2119 crypto: aws:kms 

2120 key-id: 9c3983be-c6cf-11e6-9d9d-cec0c932ce01 

2121 """ 

2122 

2123 permissions = ( 

2124 "s3:GetObject", 

2125 "s3:PutObject", 

2126 "s3:DeleteObjectVersion", 

2127 "s3:RestoreObject", 

2128 ) + ScanBucket.permissions 

2129 

2130 schema = { 

2131 'type': 'object', 

2132 'additionalProperties': False, 

2133 'properties': { 

2134 'type': {'enum': ['encrypt-keys']}, 

2135 'report-only': {'type': 'boolean'}, 

2136 'glacier': {'type': 'boolean'}, 

2137 'large': {'type': 'boolean'}, 

2138 'crypto': {'enum': ['AES256', 'aws:kms']}, 

2139 'key-id': {'type': 'string'} 

2140 }, 

2141 'dependencies': { 

2142 'key-id': { 

2143 'properties': { 

2144 'crypto': {'pattern': 'aws:kms'} 

2145 }, 

2146 'required': ['crypto'] 

2147 } 

2148 } 

2149 } 

2150 

2151 metrics = [ 

2152 ('Total Keys', {'Scope': 'Account'}), 

2153 ('Unencrypted', {'Scope': 'Account'})] 

2154 

2155 def __init__(self, data, manager=None): 

2156 super(EncryptExtantKeys, self).__init__(data, manager) 

2157 self.kms_id = self.data.get('key-id') 

2158 

2159 def get_permissions(self): 

2160 perms = ("s3:GetObject", "s3:GetObjectVersion") 

2161 if self.data.get('report-only'): 

2162 perms += ('s3:DeleteObject', 's3:DeleteObjectVersion', 

2163 's3:PutObject', 

2164 's3:AbortMultipartUpload', 

2165 's3:ListBucket', 

2166 's3:ListBucketVersions') 

2167 return perms 

2168 

2169 def process(self, buckets): 

2170 

2171 t = time.time() 

2172 results = super(EncryptExtantKeys, self).process(buckets) 

2173 run_time = time.time() - t 

2174 remediated_count = object_count = 0 

2175 

2176 for r in results: 

2177 object_count += r['Count'] 

2178 remediated_count += r['Remediated'] 

2179 self.manager.ctx.metrics.put_metric( 

2180 "Unencrypted", r['Remediated'], "Count", Scope=r['Bucket'], 

2181 buffer=True) 

2182 

2183 self.manager.ctx.metrics.put_metric( 

2184 "Unencrypted", remediated_count, "Count", Scope="Account", 

2185 buffer=True 

2186 ) 

2187 self.manager.ctx.metrics.put_metric( 

2188 "Total Keys", object_count, "Count", Scope="Account", 

2189 buffer=True 

2190 ) 

2191 self.manager.ctx.metrics.flush() 

2192 

2193 log.info( 

2194 ("EncryptExtant Complete keys:%d " 

2195 "remediated:%d rate:%0.2f/s time:%0.2fs"), 

2196 object_count, 

2197 remediated_count, 

2198 float(object_count) / run_time if run_time else 0, 

2199 run_time) 

2200 return results 

2201 

2202 def process_chunk(self, batch, bucket): 

2203 crypto_method = self.data.get('crypto', 'AES256') 

2204 s3 = bucket_client( 

2205 local_session(self.manager.session_factory), bucket, 

2206 kms=(crypto_method == 'aws:kms')) 

2207 b = bucket['Name'] 

2208 results = [] 

2209 key_processor = self.get_bucket_op(bucket, 'key_processor') 

2210 for key in batch: 

2211 r = key_processor(s3, key, b) 

2212 if r: 

2213 results.append(r) 

2214 return results 

2215 

2216 def process_key(self, s3, key, bucket_name, info=None): 

2217 k = key['Key'] 

2218 if info is None: 

2219 info = s3.head_object(Bucket=bucket_name, Key=k) 

2220 

2221 # If the data is already encrypted with AES256 and this request is also 

2222 # for AES256 then we don't need to do anything 

2223 if info.get('ServerSideEncryption') == 'AES256' and not self.kms_id: 

2224 return False 

2225 

2226 if info.get('ServerSideEncryption') == 'aws:kms': 

2227 # If we're not looking for a specific key any key will do. 

2228 if not self.kms_id: 

2229 return False 

2230 # If we're configured to use a specific key and the key matches 

2231 # note this is not a strict equality match. 

2232 if self.kms_id in info.get('SSEKMSKeyId', ''): 

2233 return False 

2234 

2235 if self.data.get('report-only'): 

2236 return k 

2237 

2238 storage_class = info.get('StorageClass', 'STANDARD') 

2239 

2240 if storage_class == 'GLACIER': 

2241 if not self.data.get('glacier'): 

2242 return False 

2243 if 'Restore' not in info: 

2244 # This takes multiple hours; we let the next c7n 

2245 # run take care of follow-ups. 

2246 s3.restore_object( 

2247 Bucket=bucket_name, 

2248 Key=k, 

2249 RestoreRequest={'Days': 30}) 

2250 return False 

2251 elif not restore_complete(info['Restore']): 

2252 return False 

2253 

2254 storage_class = 'STANDARD' 

2255 

2256 crypto_method = self.data.get('crypto', 'AES256') 

2257 key_id = self.data.get('key-id') 

2258 # Note: on copy we lose individual object ACL grants 

2259 params = {'Bucket': bucket_name, 

2260 'Key': k, 

2261 'CopySource': "/%s/%s" % (bucket_name, k), 

2262 'MetadataDirective': 'COPY', 

2263 'StorageClass': storage_class, 

2264 'ServerSideEncryption': crypto_method} 

2265 

2266 if key_id and crypto_method == 'aws:kms': 

2267 params['SSEKMSKeyId'] = key_id 

2268 

2269 if info['ContentLength'] > MAX_COPY_SIZE and self.data.get( 

2270 'large', True): 

2271 return self.process_large_file(s3, bucket_name, key, info, params) 

2272 

2273 s3.copy_object(**params) 

2274 return k 

2275 

2276 def process_version(self, s3, key, bucket_name): 

2277 info = s3.head_object( 

2278 Bucket=bucket_name, 

2279 Key=key['Key'], 

2280 VersionId=key['VersionId']) 

2281 

2282 if 'ServerSideEncryption' in info: 

2283 return False 

2284 

2285 if self.data.get('report-only'): 

2286 return key['Key'], key['VersionId'] 

2287 

2288 if key['IsLatest']: 

2289 r = self.process_key(s3, key, bucket_name, info) 

2290 # Glacier request processing, wait till we have the restored object 

2291 if not r: 

2292 return r 

2293 s3.delete_object( 

2294 Bucket=bucket_name, 

2295 Key=key['Key'], 

2296 VersionId=key['VersionId']) 

2297 return key['Key'], key['VersionId'] 

2298 

2299 def process_large_file(self, s3, bucket_name, key, info, params): 

2300 """For objects over 5gb, use multipart upload to copy""" 

2301 part_size = MAX_COPY_SIZE - (1024 ** 2) 

2302 num_parts = int(math.ceil(info['ContentLength'] / part_size)) 
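        # Illustrative arithmetic: with part_size just under 2 GiB, a
        # hypothetical 5 GiB object yields ceil(5 GiB / part_size) == 3 parts.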

2303 source = params.pop('CopySource') 

2304 

2305 params.pop('MetadataDirective') 

2306 if 'Metadata' in info: 

2307 params['Metadata'] = info['Metadata'] 

2308 

2309 upload_id = s3.create_multipart_upload(**params)['UploadId'] 

2310 

2311 params = {'Bucket': bucket_name, 

2312 'Key': key['Key'], 

2313 'UploadId': upload_id, 

2314 'CopySource': source, 

2315 'CopySourceIfMatch': info['ETag']} 

2316 

2317 def upload_part(part_num): 

2318 part_params = dict(params) 

2319 part_params['CopySourceRange'] = "bytes=%d-%d" % ( 

2320 part_size * (part_num - 1), 

2321 min(part_size * part_num - 1, info['ContentLength'] - 1)) 

2322 part_params['PartNumber'] = part_num 

2323 response = s3.upload_part_copy(**part_params) 

2324 return {'ETag': response['CopyPartResult']['ETag'], 

2325 'PartNumber': part_num} 

2326 

2327 try: 

2328 with self.executor_factory(max_workers=2) as w: 

2329 parts = list(w.map(upload_part, range(1, num_parts + 1))) 

2330 except Exception: 

2331 log.warning( 

2332 "Error during large key copy bucket: %s key: %s, " 

2333 "aborting upload", bucket_name, key, exc_info=True) 

2334 s3.abort_multipart_upload( 

2335 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id) 

2336 raise 

2337 s3.complete_multipart_upload( 

2338 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id, 

2339 MultipartUpload={'Parts': parts}) 

2340 return key['Key'] 

2341 

2342 

2343def restore_complete(restore): 
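    # The Restore value from head_object looks roughly like (illustrative):
    #   'ongoing-request="false", expiry-date="Fri, 21 Dec 2030 00:00:00 GMT"'
    # The restored copy is usable once the ongoing-request portion is "false".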

2344 if ',' in restore: 

2345 ongoing, _ = restore.split(',', 1) 

2346 else: 

2347 ongoing = restore 

2348 return 'false' in ongoing 

2349 

2350 

2351@filters.register('is-log-target') 

2352class LogTarget(Filter): 

2353 """Filter and return buckets are log destinations. 

2354 

2355 Not suitable for use in lambda on large accounts, This is a api 

2356 heavy process to detect scan all possible log sources. 

2357 

2358 Sources: 

2359 - elb (Access Log) 

2360 - s3 (Access Log) 

2361 - cfn (Template writes) 

2362 - cloudtrail 

2363 

2364 :example: 

2365 

2366 .. code-block:: yaml 

2367 

2368 policies: 

2369 - name: s3-log-bucket 

2370 resource: s3 

2371 filters: 

2372 - type: is-log-target 

2373 """ 

2374 

2375 schema = type_schema( 

2376 'is-log-target', 

2377 services={'type': 'array', 'items': {'enum': [ 

2378 's3', 'elb', 'cloudtrail']}}, 

2379 self={'type': 'boolean'}, 

2380 value={'type': 'boolean'}) 

2381 

2382 def get_permissions(self): 

2383 perms = self.manager.get_resource_manager('elb').get_permissions() 

2384 perms += ('elasticloadbalancing:DescribeLoadBalancerAttributes',) 

2385 return perms 

2386 

2387 def process(self, buckets, event=None): 

2388 log_buckets = set() 

2389 count = 0 

2390 

2391 services = self.data.get('services', ['elb', 's3', 'cloudtrail']) 

2392 self_log = self.data.get('self', False) 

2393 

2394 if 'elb' in services and not self_log: 

2395 for bucket, _ in self.get_elb_bucket_locations(): 

2396 log_buckets.add(bucket) 

2397 count += 1 

2398 self.log.debug("Found %d elb log targets" % count) 

2399 

2400 if 's3' in services: 

2401 count = 0 

2402 for bucket, _ in self.get_s3_bucket_locations(buckets, self_log): 

2403 count += 1 

2404 log_buckets.add(bucket) 

2405 self.log.debug('Found %d s3 log targets' % count) 

2406 

2407 if 'cloudtrail' in services and not self_log: 

2408 for bucket, _ in self.get_cloud_trail_locations(buckets): 

2409 log_buckets.add(bucket) 

2410 

2411 self.log.info("Found %d log targets for %d buckets" % ( 

2412 len(log_buckets), len(buckets))) 

2413 if self.data.get('value', True): 

2414 return [b for b in buckets if b['Name'] in log_buckets] 

2415 else: 

2416 return [b for b in buckets if b['Name'] not in log_buckets] 

2417 

2418 @staticmethod 

2419 def get_s3_bucket_locations(buckets, self_log=False): 

2420 """return (bucket_name, prefix) for all s3 logging targets""" 

2421 for b in buckets: 

2422 if b.get('Logging'): 

2423 if self_log: 

2424 if b['Name'] != b['Logging']['TargetBucket']: 

2425 continue 

2426 yield (b['Logging']['TargetBucket'], 

2427 b['Logging']['TargetPrefix']) 

2428 if not self_log and b['Name'].startswith('cf-templates-'): 

2429 yield (b['Name'], '') 

2430 

2431 def get_cloud_trail_locations(self, buckets): 

2432 session = local_session(self.manager.session_factory) 

2433 client = session.client('cloudtrail') 

2434 names = {b['Name'] for b in buckets} 

2435 for t in client.describe_trails().get('trailList', ()): 

2436 if t.get('S3BucketName') in names: 

2437 yield (t['S3BucketName'], t.get('S3KeyPrefix', '')) 

2438 

2439 def get_elb_bucket_locations(self): 

2440 elbs = self.manager.get_resource_manager('elb').resources() 

2441 get_elb_attrs = functools.partial( 

2442 _query_elb_attrs, self.manager.session_factory) 

2443 

2444 with self.executor_factory(max_workers=2) as w: 

2445 futures = [] 

2446 for elb_set in chunks(elbs, 100): 

2447 futures.append(w.submit(get_elb_attrs, elb_set)) 

2448 for f in as_completed(futures): 

2449 if f.exception(): 

2450 log.error("Error while scanning elb log targets: %s" % ( 

2451 f.exception())) 

2452 continue 

2453 for tgt in f.result(): 

2454 yield tgt 

2455 

2456 

2457def _query_elb_attrs(session_factory, elb_set): 

2458 session = local_session(session_factory) 

2459 client = session.client('elb') 

2460 log_targets = [] 

2461 for e in elb_set: 

2462 try: 

2463 attrs = client.describe_load_balancer_attributes( 

2464 LoadBalancerName=e['LoadBalancerName'])[ 

2465 'LoadBalancerAttributes'] 

2466 if 'AccessLog' in attrs and attrs['AccessLog']['Enabled']: 

2467 log_targets.append(( 

2468 attrs['AccessLog']['S3BucketName'], 

2469 attrs['AccessLog']['S3BucketPrefix'])) 

2470 except Exception as err: 

2471 log.warning( 

2472 "Could not retrieve load balancer %s: %s" % ( 

2473 e['LoadBalancerName'], err)) 

2474 return log_targets 

2475 

2476 

2477@actions.register('remove-website-hosting') 

2478class RemoveWebsiteHosting(BucketActionBase): 

2479 """Action that removes website hosting configuration.""" 

2480 

2481 schema = type_schema('remove-website-hosting') 

2482 

2483 permissions = ('s3:DeleteBucketWebsite',) 

2484 

2485 def process(self, buckets): 

2486 session = local_session(self.manager.session_factory) 

2487 for bucket in buckets: 

2488 client = bucket_client(session, bucket) 

2489 client.delete_bucket_website(Bucket=bucket['Name']) 

2490 

2491 

2492@actions.register('delete-global-grants') 

2493class DeleteGlobalGrants(BucketActionBase): 

2494 """Deletes global grants associated to a S3 bucket 

2495 

2496 :example: 

2497 

2498 .. code-block:: yaml 

2499 

2500 policies: 

2501 - name: s3-delete-global-grants 

2502 resource: s3 

2503 filters: 

2504 - type: global-grants 

2505 actions: 

2506 - delete-global-grants 

2507 """ 

2508 

2509 schema = type_schema( 

2510 'delete-global-grants', 

2511 grantees={'type': 'array', 'items': {'type': 'string'}}) 

2512 

2513 permissions = ('s3:PutBucketAcl',) 

2514 

2515 def process(self, buckets): 

2516 with self.executor_factory(max_workers=5) as w: 

2517 return list(filter(None, list(w.map(self.process_bucket, buckets)))) 

2518 

2519 def process_bucket(self, b): 

2520 grantees = self.data.get( 

2521 'grantees', [ 

2522 GlobalGrantsFilter.AUTH_ALL, GlobalGrantsFilter.GLOBAL_ALL]) 

2523 

2524 log.info(b) 

2525 

2526 acl = b.get('Acl', {'Grants': []}) 

2527 if not acl or not acl['Grants']: 

2528 return 

2529 new_grants = [] 

2530 for grant in acl['Grants']: 

2531 grantee = grant.get('Grantee', {}) 

2532 if not grantee: 

2533 continue 

2534 # Yuck, 'get_bucket_acl' doesn't return the grantee type. 

2535 if 'URI' in grantee: 

2536 grantee['Type'] = 'Group' 

2537 else: 

2538 grantee['Type'] = 'CanonicalUser' 

2539 if ('URI' in grantee and 

2540 grantee['URI'] in grantees and not 

2541 (grant['Permission'] == 'READ' and b['Website'])): 

2542 # Remove this grantee. 

2543 pass 

2544 else: 

2545 new_grants.append(grant) 

2546 

2547 log.info({'Owner': acl['Owner'], 'Grants': new_grants}) 

2548 

2549 c = bucket_client(self.manager.session_factory(), b) 

2550 try: 

2551 c.put_bucket_acl( 

2552 Bucket=b['Name'], 

2553 AccessControlPolicy={ 

2554 'Owner': acl['Owner'], 'Grants': new_grants}) 

2555 except ClientError as e: 

2556 if e.response['Error']['Code'] == 'NoSuchBucket': 

2557 return 

2558 return b 

2559 

2560 

2561@actions.register('tag') 

2562class BucketTag(Tag): 

2563 """Action to create tags on a S3 bucket 

2564 

2565 :example: 

2566 

2567 .. code-block:: yaml 

2568 

2569 policies: 

2570 - name: s3-tag-region 

2571 resource: s3 

2572 region: us-east-1 

2573 filters: 

2574 - "tag:RegionName": absent 

2575 actions: 

2576 - type: tag 

2577 key: RegionName 

2578 value: us-east-1 

2579 """ 

2580 

2581 def process_resource_set(self, client, resource_set, tags): 

2582 modify_bucket_tags(self.manager.session_factory, resource_set, tags) 

2583 

2584 

2585@actions.register('mark-for-op') 

2586class MarkBucketForOp(TagDelayedAction): 

2587 """Action schedules custodian to perform an action at a certain date 

2588 

2589 :example: 

2590 

2591 .. code-block:: yaml 

2592 

2593 policies: 

2594 - name: s3-encrypt 

2595 resource: s3 

2596 filters: 

2597 - type: missing-statement 

2598 statement_ids: 

2599 - RequiredEncryptedPutObject 

2600 actions: 

2601 - type: mark-for-op 

2602 op: attach-encrypt 

2603 days: 7 

2604 """ 

2605 

2606 schema = type_schema( 

2607 'mark-for-op', rinherit=TagDelayedAction.schema) 

2608 

2609 

2610@actions.register('unmark') 

2611@actions.register('remove-tag') 

2612class RemoveBucketTag(RemoveTag): 

2613 """Removes tag/tags from a S3 object 

2614 

2615 :example: 

2616 

2617 .. code-block:: yaml 

2618 

2619 policies: 

2620 - name: s3-remove-owner-tag 

2621 resource: s3 

2622 filters: 

2623 - "tag:BucketOwner": present 

2624 actions: 

2625 - type: remove-tag 

2626 tags: ['BucketOwner'] 

2627 """ 

2628 

2629 def process_resource_set(self, client, resource_set, tags): 

2630 modify_bucket_tags( 

2631 self.manager.session_factory, resource_set, remove_tags=tags) 

2632 

2633 

2634@filters.register('data-events') 

2635class DataEvents(Filter): 

2636 """Find buckets for which CloudTrail is logging data events. 

2637 

2638 Note that this filter only examines trails that are defined in the 

2639 current account. 
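
    :example:

    A hypothetical policy (name is illustrative) flagging buckets with no
    data-event trail coverage in the current account:

    .. code-block:: yaml

        policies:
          - name: s3-missing-data-events
            resource: s3
            filters:
              - type: data-events
                state: absent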

2640 """ 

2641 

2642 schema = type_schema('data-events', state={'enum': ['present', 'absent']}) 

2643 permissions = ( 

2644 'cloudtrail:DescribeTrails', 

2645 'cloudtrail:GetEventSelectors') 

2646 

2647 def get_event_buckets(self, session, trails): 

2648 """Return a mapping of bucket name to cloudtrail. 

2649 

2650 For wildcard trails the bucket name is ''. 

2651 """ 

2652 regions = {t.get('HomeRegion') for t in trails} 

2653 clients = {} 

2654 for region in regions: 

2655 clients[region] = session.client('cloudtrail', region_name=region) 

2656 

2657 event_buckets = {} 

2658 for t in trails: 

2659 for events in clients[t.get('HomeRegion')].get_event_selectors( 

2660 TrailName=t['Name']).get('EventSelectors', ()): 

2661 if 'DataResources' not in events: 

2662 continue 

2663 for data_events in events['DataResources']: 

2664 if data_events['Type'] != 'AWS::S3::Object': 

2665 continue 
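                    # Values are S3 ARNs; e.g. an illustrative
                    # 'arn:aws:s3:::my-bucket/' reduces to 'my-bucket' below,
                    # while a bucket-less wildcard ARN reduces to ''.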

2666 for b in data_events['Values']: 

2667 event_buckets[b.rsplit(':')[-1].strip('/')] = t['Name'] 

2668 return event_buckets 

2669 

2670 def process(self, resources, event=None): 

2671 trails = self.manager.get_resource_manager('cloudtrail').resources() 

2672 local_trails = self.filter_resources( 

2673 trails, 

2674 "split(':', TrailARN)[4]", (self.manager.account_id,) 

2675 ) 

2676 session = local_session(self.manager.session_factory) 

2677 event_buckets = self.get_event_buckets(session, local_trails) 

2678 ops = { 

2679 'present': lambda x: ( 

2680 x['Name'] in event_buckets or '' in event_buckets), 

2681 'absent': ( 

2682 lambda x: x['Name'] not in event_buckets and '' 

2683 not in event_buckets)} 

2684 

2685 op = ops[self.data.get('state', 'present')] 

2686 results = [] 

2687 for b in resources: 

2688 if op(b): 

2689 results.append(b) 

2690 return results 

2691 

2692 

2693@filters.register('inventory') 

2694class Inventory(ValueFilter): 

2695 """Filter inventories for a bucket""" 

2696 schema = type_schema('inventory', rinherit=ValueFilter.schema) 

2697 schema_alias = False 

2698 permissions = ('s3:GetInventoryConfiguration',) 

2699 

2700 def process(self, buckets, event=None): 

2701 results = [] 

2702 with self.executor_factory(max_workers=2) as w: 

2703 futures = {} 

2704 for b in buckets: 

2705 futures[w.submit(self.process_bucket, b)] = b 

2706 

2707 for f in as_completed(futures): 

2708 b = futures[f] 

2709 if f.exception(): 

2710 b.setdefault('c7n:DeniedMethods', []).append('GetInventoryConfiguration') 

2711 self.log.error( 

2712 "Error processing bucket: %s error: %s", 

2713 b['Name'], f.exception()) 

2714 continue 

2715 if f.result(): 

2716 results.append(b) 

2717 return results 

2718 

2719 def process_bucket(self, b): 

2720 if 'c7n:inventories' not in b: 

2721 client = bucket_client(local_session(self.manager.session_factory), b) 

2722 inventories = client.list_bucket_inventory_configurations( 

2723 Bucket=b['Name']).get('InventoryConfigurationList', []) 

2724 b['c7n:inventories'] = inventories 

2725 

2726 for i in b['c7n:inventories']: 

2727 if self.match(i): 

2728 return True 

2729 

2730 

2731@actions.register('set-inventory') 

2732class SetInventory(BucketActionBase): 

2733 """Configure bucket inventories for an s3 bucket. 

2734 """ 

2735 schema = type_schema( 

2736 'set-inventory', 

2737 required=['name', 'destination'], 

2738 state={'enum': ['enabled', 'disabled', 'absent']}, 

2739 name={'type': 'string', 'description': 'Name of inventory'}, 

2740 destination={'type': 'string', 'description': 'Name of destination bucket'}, 

2741 prefix={'type': 'string', 'description': 'Destination prefix'}, 

2742 encryption={'enum': ['SSES3', 'SSEKMS']}, 

2743 key_id={'type': 'string', 'description': 'Optional Customer KMS KeyId for SSE-KMS'}, 

2744 versions={'enum': ['All', 'Current']}, 

2745 schedule={'enum': ['Daily', 'Weekly']}, 

2746 format={'enum': ['CSV', 'ORC', 'Parquet']}, 

2747 fields={'type': 'array', 'items': {'enum': [ 

2748 'Size', 'LastModifiedDate', 'StorageClass', 'ETag', 

2749 'IsMultipartUploaded', 'ReplicationStatus', 'EncryptionStatus', 

2750 'ObjectLockRetainUntilDate', 'ObjectLockMode', 'ObjectLockLegalHoldStatus', 

2751 'IntelligentTieringAccessTier', 'BucketKeyStatus', 'ChecksumAlgorithm']}}) 

2752 

2753 permissions = ('s3:PutInventoryConfiguration', 's3:GetInventoryConfiguration') 

2754 

2755 def process(self, buckets): 

2756 with self.executor_factory(max_workers=2) as w: 

2757 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

2758 for future in as_completed(futures): 

2759 bucket = futures[future] 

2760 try: 

2761 future.result() 

2762 except Exception as e: 

2763 self.log.error('Message: %s Bucket: %s', e, bucket['Name']) 

2764 

2765 def process_bucket(self, b): 

2766 inventory_name = self.data.get('name') 

2767 destination = self.data.get('destination') 

2768 prefix = self.data.get('prefix', '') 

2769 schedule = self.data.get('schedule', 'Daily') 

2770 fields = self.data.get('fields', ['LastModifiedDate', 'Size']) 

2771 versions = self.data.get('versions', 'Current') 

2772 state = self.data.get('state', 'enabled') 

2773 encryption = self.data.get('encryption') 

2774 inventory_format = self.data.get('format', 'CSV') 

2775 

2776 if not prefix: 

2777 prefix = "Inventories/%s" % (self.manager.config.account_id) 

2778 

2779 client = bucket_client(local_session(self.manager.session_factory), b) 

2780 if state == 'absent': 

2781 try: 

2782 client.delete_bucket_inventory_configuration( 

2783 Bucket=b['Name'], Id=inventory_name) 

2784 except ClientError as e: 

2785 if e.response['Error']['Code'] != 'NoSuchConfiguration': 

2786 raise 

2787 return 

2788 

2789 bucket = { 

2790 'Bucket': "arn:aws:s3:::%s" % destination, 

2791 'Format': inventory_format 

2792 } 

2793 

2794 inventory = { 

2795 'Destination': { 

2796 'S3BucketDestination': bucket 

2797 }, 

2798 'IsEnabled': state == 'enabled', 

2799 'Id': inventory_name, 

2800 'OptionalFields': fields, 

2801 'IncludedObjectVersions': versions, 

2802 'Schedule': { 

2803 'Frequency': schedule 

2804 } 

2805 } 

2806 

2807 if prefix: 

2808 bucket['Prefix'] = prefix 

2809 

2810 if encryption: 

2811 bucket['Encryption'] = {encryption: {}} 

2812 if encryption == 'SSEKMS' and self.data.get('key_id'): 

2813 bucket['Encryption'] = {encryption: { 

2814 'KeyId': self.data['key_id'] 

2815 }} 

2816 

2817 found = self.get_inventory_delta(client, inventory, b) 

2818 if found: 

2819 return 

2820 if found is False: 

2821 self.log.debug("updating bucket:%s inventory configuration id:%s", 

2822 b['Name'], inventory_name) 

2823 client.put_bucket_inventory_configuration( 

2824 Bucket=b['Name'], Id=inventory_name, InventoryConfiguration=inventory) 

2825 

2826 def get_inventory_delta(self, client, inventory, b): 
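        # Return semantics: None if no inventory with this Id exists (caller
        # creates it), True if a matching configuration already exists (caller
        # skips the put), False if it exists but differs (caller overwrites it).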

2827 inventories = client.list_bucket_inventory_configurations(Bucket=b['Name']) 

2828 found = None 

2829 for i in inventories.get('InventoryConfigurationList', []): 

2830 if i['Id'] != inventory['Id']: 

2831 continue 

2832 found = True 

2833 for k, v in inventory.items(): 

2834 if k not in i: 

2835 found = False 

2836 continue 

2837 if isinstance(v, list): 

2838 v.sort() 

2839 i[k].sort() 

2840 if i[k] != v: 

2841 found = False 

2842 return found 

2843 

2844 

2845@filters.register('intelligent-tiering') 

2846class IntelligentTiering(ListItemFilter): 

2847 """Filter for S3 buckets to look at intelligent tiering configurations 

2848 

2849 The schema to supply to the attrs follows the schema here: 

2850 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_bucket_intelligent_tiering_configurations.html 

2851 

2852 :example: 

2853 

2854 .. code-block:: yaml 

2855 

2856 policies: 

2857 - name: s3-intelligent-tiering-configuration 

2858 resource: s3 

2859 filters: 

2860 - type: intelligent-tiering 

2861 attrs: 

2862 - Status: Enabled 

2863 - Filter: 

2864 And: 

2865 Prefix: test 

2866 Tags: 

2867 - Key: Owner 

2868 Value: c7n 

2869 - Tierings: 

2870 - Days: 100 

2871 - AccessTier: ARCHIVE_ACCESS 

2872 

2873 """ 

2874 schema = type_schema( 

2875 'intelligent-tiering', 

2876 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'}, 

2877 count={'type': 'number'}, 

2878 count_op={'$ref': '#/definitions/filters_common/comparison_operators'} 

2879 ) 

2880 permissions = ('s3:GetIntelligentTieringConfiguration',) 

2881 annotation_key = "c7n:IntelligentTiering" 

2882 annotate_items = True 

2883 

2884 def __init__(self, data, manager=None): 

2885 super().__init__(data, manager) 

2886 self.data['key'] = self.annotation_key 

2887 

2888 def process(self, buckets, event=None): 

2889 with self.executor_factory(max_workers=2) as w: 

2890 futures = {w.submit(self.get_item_values, b): b for b in buckets} 

2891 for future in as_completed(futures): 

2892 b = futures[future] 

2893 if future.exception(): 

2894 self.log.error("Message: %s Bucket: %s", future.exception(), b['Name']) 

2895 continue 

2896 return super().process(buckets, event) 

2897 

2898 def get_item_values(self, b): 

2899 if self.annotation_key not in b: 

2900 client = bucket_client(local_session(self.manager.session_factory), b) 

2901 try: 

2902 int_tier_config = client.list_bucket_intelligent_tiering_configurations( 

2903 Bucket=b['Name']) 

2904 b[self.annotation_key] = int_tier_config.get( 

2905 'IntelligentTieringConfigurationList', []) 

2906 except ClientError as e: 

2907 if e.response['Error']['Code'] == 'AccessDenied': 

2908 method = 'list_bucket_intelligent_tiering_configurations' 

2909 log.warning( 

2910 "Bucket:%s unable to invoke method:%s error:%s ", 

2911 b['Name'], method, e.response['Error']['Message']) 

2912 b.setdefault('c7n:DeniedMethods', []).append(method) 

2913 return b.get(self.annotation_key) 

2914 

2915 

2916@actions.register('set-intelligent-tiering') 

2917class ConfigureIntelligentTiering(BucketActionBase): 

2918 """Action applies an intelligent tiering configuration to a S3 bucket 

2919 

2920 The schema to supply to the configuration follows the schema here: 

2921 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_bucket_intelligent_tiering_configuration.html 

2922 

2923 To delete a configuration, supply State: delete with either the configuration Id or Id: matched 

2924 

2925 :example: 

2926 

2927 .. code-block:: yaml 

2928 

2929 policies: 

2930 - name: s3-apply-intelligent-tiering-config 

2931 resource: aws.s3 

2932 filters: 

2933 - not: 

2934 - type: intelligent-tiering 

2935 attrs: 

2936 - Status: Enabled 

2937 - Filter: 

2938 And: 

2939 Prefix: helloworld 

2940 Tags: 

2941 - Key: Hello 

2942 Value: World 

2943 - Tierings: 

2944 - Days: 123 

2945 AccessTier: ARCHIVE_ACCESS 

2946 actions: 

2947 - type: set-intelligent-tiering 

2948 Id: c7n-default 

2949 IntelligentTieringConfiguration: 

2950 Id: c7n-default 

2951 Status: Enabled 

2952 Tierings: 

2953 - Days: 149 

2954 AccessTier: ARCHIVE_ACCESS 

2955 

2956 - name: s3-delete-intelligent-tiering-configuration 

2957 resource: aws.s3 

2958 filters: 

2959 - type: intelligent-tiering 

2960 attrs: 

2961 - Status: Enabled 

2962 - Id: test-config 

2963 actions: 

2964 - type: set-intelligent-tiering 

2965 Id: test-config 

2966 State: delete 

2967 

2968 - name: s3-delete-intelligent-tiering-matched-configs 

2969 resource: aws.s3 

2970 filters: 

2971 - type: intelligent-tiering 

2972 attrs: 

2973 - Status: Enabled 

2974 - Id: test-config 

2975 actions: 

2976 - type: set-intelligent-tiering 

2977 Id: matched 

2978 State: delete 

2979 

2980 """ 

2981 

2982 annotation_key = 'c7n:ListItemMatches' 

2983 shape = 'PutBucketIntelligentTieringConfigurationRequest' 

2984 schema = { 

2985 'type': 'object', 

2986 'additionalProperties': False, 

2987 'oneOf': [ 

2988 {'required': ['type', 'Id', 'IntelligentTieringConfiguration']}, 

2989 {'required': ['type', 'Id', 'State']}], 

2990 'properties': { 

2991 'type': {'enum': ['set-intelligent-tiering']}, 

2992 'Id': {'type': 'string'}, 

2993 # delete intelligent tier configurations via state: delete 

2994 'State': {'type': 'string', 'enum': ['delete']}, 

2995 'IntelligentTieringConfiguration': {'type': 'object'} 

2996 }, 

2997 } 

2998 

2999 permissions = ('s3:PutIntelligentTieringConfiguration',) 

3000 

3001 def validate(self): 

3002 # You can have up to 1,000 S3 Intelligent-Tiering configurations per bucket. 

3003 # Hence, always use it with a filter 

3004 found = False 

3005 for f in self.manager.iter_filters(): 

3006 if isinstance(f, IntelligentTiering): 

3007 found = True 

3008 break 

3009 if not found: 

3010 raise PolicyValidationError( 

3011 '`set-intelligent-tiering` may only be used in ' 

3012 'conjunction with `intelligent-tiering` filter on %s' % (self.manager.data,)) 

3013 cfg = dict(self.data) 

3014 if 'IntelligentTieringConfiguration' in cfg: 

3015 cfg['Bucket'] = 'bucket' 

3016 cfg.pop('type') 

3017 return shape_validate( 

3018 cfg, self.shape, self.manager.resource_type.service) 

3019 

3020 def process(self, buckets): 

3021 with self.executor_factory(max_workers=3) as w: 

3022 futures = {} 

3023 

3024 for b in buckets: 

3025 futures[w.submit(self.process_bucket, b)] = b 

3026 

3027 for future in as_completed(futures): 

3028 if future.exception(): 

3029 bucket = futures[future] 

3030 self.log.error( 

3031 'error modifying bucket intelligent tiering configuration: %s\n%s', 

3032 bucket['Name'], future.exception()) 

3033 continue 

3034 

3035 def process_bucket(self, bucket): 

3036 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3037 

3038 if 'list_bucket_intelligent_tiering_configurations' in bucket.get( 

3039 'c7n:DeniedMethods', []): 

3040 log.warning("Access Denied Bucket:%s while reading intelligent tiering configurations" 

3041 % bucket['Name']) 

3042 return 

3043 

3044 if self.data.get('Id') and self.data.get('IntelligentTieringConfiguration'): 

3045 try: 

3046 s3.put_bucket_intelligent_tiering_configuration( 

3047 Bucket=bucket['Name'], Id=self.data.get( 

3048 'Id'), IntelligentTieringConfiguration=self.data.get( 

3049 'IntelligentTieringConfiguration')) 

3050 except ClientError as e: 

3051 if e.response['Error']['Code'] == 'AccessDenied': 

3052 log.warning( 

3053 "Access Denied Bucket:%s while applying intelligent tiering configuration" 

3054 % bucket['Name']) 

3055 if self.data.get('State'): 

3056 if self.data.get('Id') == 'matched': 

3057 for config in bucket.get(self.annotation_key): 

3058 self.delete_intelligent_tiering_configurations(s3, config.get('Id'), bucket) 

3059 else: 

3060 self.delete_intelligent_tiering_configurations(s3, self.data.get('Id'), bucket) 

3061 

3062 def delete_intelligent_tiering_configurations(self, s3_client, id, bucket): 

3063 try: 

3064 s3_client.delete_bucket_intelligent_tiering_configuration(Bucket=bucket['Name'], Id=id) 

3065 except ClientError as e: 

3066 if e.response['Error']['Code'] == 'AccessDenied': 

3067 log.warning( 

3068 "Access Denied Bucket:%s while deleting intelligent tiering configuration" 

3069 % bucket['Name']) 

3070 elif e.response['Error']['Code'] == 'NoSuchConfiguration': 

3071 log.warning( 

3072 "No such configuration found:%s while deleting intelligent tiering configuration" 

3073 % bucket['Name']) 

3074 

3075 

3076@actions.register('delete') 

3077class DeleteBucket(ScanBucket): 

3078 """Action deletes a S3 bucket 

3079 

3080 :example: 

3081 

3082 .. code-block:: yaml 

3083 

3084 policies: 

3085 - name: delete-unencrypted-buckets 

3086 resource: s3 

3087 filters: 

3088 - type: missing-statement 

3089 statement_ids: 

3090 - RequiredEncryptedPutObject 

3091 actions: 

3092 - type: delete 

3093 remove-contents: true 

3094 """ 

3095 

3096 schema = type_schema('delete', **{'remove-contents': {'type': 'boolean'}}) 

3097 

3098 permissions = ('s3:*',) 

3099 

3100 bucket_ops = { 

3101 'standard': { 

3102 'iterator': 'list_objects', 

3103 'contents_key': ['Contents'], 

3104 'key_processor': 'process_key' 

3105 }, 

3106 'versioned': { 

3107 'iterator': 'list_object_versions', 

3108 'contents_key': ['Versions', 'DeleteMarkers'], 

3109 'key_processor': 'process_version' 

3110 } 

3111 } 

3112 

3113 def process_delete_enablement(self, b): 

3114 """Prep a bucket for deletion. 

3115 

3116 Clear out any pending multi-part uploads. 

3117 

3118 Disable versioning on the bucket, so deletes don't 

3119 generate fresh deletion markers. 

3120 """ 

3121 client = bucket_client( 

3122 local_session(self.manager.session_factory), b) 

3123 

3124 # Stop replication so we can suspend versioning 

3125 if b.get('Replication') is not None: 

3126 client.delete_bucket_replication(Bucket=b['Name']) 

3127 

3128 # Suspend versioning, so we don't get new delete markers 

3129 # as we walk and delete versions 

3130 if (self.get_bucket_style(b) == 'versioned' and b['Versioning']['Status'] == 'Enabled' and 

3131 self.data.get('remove-contents', True)): 

3132 client.put_bucket_versioning( 

3133 Bucket=b['Name'], 

3134 VersioningConfiguration={'Status': 'Suspended'}) 

3135 

3136 # Clear our multi-part uploads 

3137 uploads = client.get_paginator('list_multipart_uploads') 

3138 for p in uploads.paginate(Bucket=b['Name']): 

3139 for u in p.get('Uploads', ()): 

3140 client.abort_multipart_upload( 

3141 Bucket=b['Name'], 

3142 Key=u['Key'], 

3143 UploadId=u['UploadId']) 

3144 

3145 def process(self, buckets): 

3146 # might be worth sanity checking all our permissions 

3147 # on the bucket up front before disabling versioning/replication. 

3148 if self.data.get('remove-contents', True): 

3149 self._process_with_futures(self.process_delete_enablement, buckets) 

3150 self.empty_buckets(buckets) 

3151 

3152 results = self._process_with_futures(self.delete_bucket, buckets) 

3153 self.write_denied_buckets_file() 

3154 return results 

3155 

3156 def delete_bucket(self, b): 

3157 s3 = bucket_client(self.manager.session_factory(), b) 

3158 try: 

3159 self._run_api(s3.delete_bucket, Bucket=b['Name']) 

3160 except ClientError as e: 

3161 if e.response['Error']['Code'] == 'BucketNotEmpty': 

3162 self.log.error( 

3163 "Error while deleting bucket %s, bucket not empty" % ( 

3164 b['Name'])) 

3165 else: 

3166 raise e 

3167 

3168 def empty_buckets(self, buckets): 

3169 t = time.time() 

3170 results = super(DeleteBucket, self).process(buckets) 

3171 run_time = time.time() - t 

3172 object_count = 0 

3173 

3174 for r in results: 

3175 object_count += r['Count'] 

3176 self.manager.ctx.metrics.put_metric( 

3177 "Total Keys", object_count, "Count", Scope=r['Bucket'], 

3178 buffer=True) 

3179 self.manager.ctx.metrics.put_metric( 

3180 "Total Keys", object_count, "Count", Scope="Account", buffer=True) 

3181 self.manager.ctx.metrics.flush() 

3182 

3183 log.info( 

3184 "EmptyBucket buckets:%d Complete keys:%d rate:%0.2f/s time:%0.2fs", 

3185 len(buckets), object_count, 

3186 float(object_count) / run_time if run_time else 0, run_time) 

3187 return results 

3188 

3189 def process_chunk(self, batch, bucket): 

3190 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3191 objects = [] 

3192 for key in batch: 

3193 obj = {'Key': key['Key']} 

3194 if 'VersionId' in key: 

3195 obj['VersionId'] = key['VersionId'] 

3196 objects.append(obj) 

3197 results = s3.delete_objects( 

3198 Bucket=bucket['Name'], Delete={'Objects': objects}).get('Deleted', ()) 

3199 if self.get_bucket_style(bucket) != 'versioned': 

3200 return results 

3201 

3202 

3203@actions.register('configure-lifecycle') 

3204class Lifecycle(BucketActionBase): 

3205 """Action applies a lifecycle policy to versioned S3 buckets 

3206 

3207 The schema to supply to the rule follows the schema here: 

3208 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_lifecycle_configuration 

3209 

3210 To delete a lifecycle rule, supply Status=absent 

3211 

3212 :example: 

3213 

3214 .. code-block:: yaml 

3215 

3216 policies: 

3217 - name: s3-apply-lifecycle 

3218 resource: s3 

3219 actions: 

3220 - type: configure-lifecycle 

3221 rules: 

3222 - ID: my-lifecycle-id 

3223 Status: Enabled 

3224 Prefix: foo/ 

3225 Transitions: 

3226 - Days: 60 

3227 StorageClass: GLACIER 

3228 
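        A hypothetical follow-on policy that removes the same rule; per the
        note above, only ID and a Status of ``absent`` are needed:

        .. code-block:: yaml

            policies:
              - name: s3-remove-lifecycle-rule
                resource: s3
                actions:
                  - type: configure-lifecycle
                    rules:
                      - ID: my-lifecycle-id
                        Status: absent
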

3229 """ 

3230 

3231 schema = type_schema( 

3232 'configure-lifecycle', 

3233 **{ 

3234 'rules': { 

3235 'type': 'array', 

3236 'items': { 

3237 'type': 'object', 

3238 'required': ['ID', 'Status'], 

3239 'additionalProperties': False, 

3240 'properties': { 

3241 'ID': {'type': 'string'}, 

3242 # c7n intercepts `absent` 

3243 'Status': {'enum': ['Enabled', 'Disabled', 'absent']}, 

3244 'Prefix': {'type': 'string'}, 

3245 'Expiration': { 

3246 'type': 'object', 

3247 'additionalProperties': False, 

3248 'properties': { 

3249 'Date': {'type': 'string'}, # Date 

3250 'Days': {'type': 'integer'}, 

3251 'ExpiredObjectDeleteMarker': {'type': 'boolean'}, 

3252 }, 

3253 }, 

3254 'Filter': { 

3255 'type': 'object', 

3256 'minProperties': 1, 

3257 'maxProperties': 1, 

3258 'additionalProperties': False, 

3259 'properties': { 

3260 'Prefix': {'type': 'string'}, 

3261 'ObjectSizeGreaterThan': {'type': 'integer'}, 

3262 'ObjectSizeLessThan': {'type': 'integer'}, 

3263 'Tag': { 

3264 'type': 'object', 

3265 'required': ['Key', 'Value'], 

3266 'additionalProperties': False, 

3267 'properties': { 

3268 'Key': {'type': 'string'}, 

3269 'Value': {'type': 'string'}, 

3270 }, 

3271 }, 

3272 'And': { 

3273 'type': 'object', 

3274 'additionalProperties': False, 

3275 'properties': { 

3276 'Prefix': {'type': 'string'}, 

3277 'ObjectSizeGreaterThan': {'type': 'integer'}, 

3278 'ObjectSizeLessThan': {'type': 'integer'}, 

3279 'Tags': { 

3280 'type': 'array', 

3281 'items': { 

3282 'type': 'object', 

3283 'required': ['Key', 'Value'], 

3284 'additionalProperties': False, 

3285 'properties': { 

3286 'Key': {'type': 'string'}, 

3287 'Value': {'type': 'string'}, 

3288 }, 

3289 }, 

3290 }, 

3291 }, 

3292 }, 

3293 }, 

3294 }, 

3295 'Transitions': { 

3296 'type': 'array', 

3297 'items': { 

3298 'type': 'object', 

3299 'additionalProperties': False, 

3300 'properties': { 

3301 'Date': {'type': 'string'}, # Date 

3302 'Days': {'type': 'integer'}, 

3303 'StorageClass': {'type': 'string'}, 

3304 }, 

3305 }, 

3306 }, 

3307 'NoncurrentVersionTransitions': { 

3308 'type': 'array', 

3309 'items': { 

3310 'type': 'object', 

3311 'additionalProperties': False, 

3312 'properties': { 

3313 'NoncurrentDays': {'type': 'integer'}, 

3314 'NewerNoncurrentVersions': {'type': 'integer'}, 

3315 'StorageClass': {'type': 'string'}, 

3316 }, 

3317 }, 

3318 }, 

3319 'NoncurrentVersionExpiration': { 

3320 'type': 'object', 

3321 'additionalProperties': False, 

3322 'properties': { 

3323 'NoncurrentDays': {'type': 'integer'}, 

3324 'NewerNoncurrentVersions': {'type': 'integer'} 

3325 }, 

3326 }, 

3327 'AbortIncompleteMultipartUpload': { 

3328 'type': 'object', 

3329 'additionalProperties': False, 

3330 'properties': { 

3331 'DaysAfterInitiation': {'type': 'integer'}, 

3332 }, 

3333 }, 

3334 }, 

3335 }, 

3336 }, 

3337 } 

3338 ) 

3339 

3340 permissions = ('s3:GetLifecycleConfiguration', 's3:PutLifecycleConfiguration') 

3341 

3342 def process(self, buckets): 

3343 with self.executor_factory(max_workers=3) as w: 

3344 futures = {} 

3345 results = [] 

3346 

3347 for b in buckets: 

3348 futures[w.submit(self.process_bucket, b)] = b 

3349 

3350 for future in as_completed(futures): 

3351 if future.exception(): 

3352 bucket = futures[future] 

3353 self.log.error('error modifying bucket lifecycle: %s\n%s', 

3354 bucket['Name'], future.exception()) 

3355 results += filter(None, [future.result()]) 

3356 

3357 return results 

3358 

3359 def process_bucket(self, bucket): 

3360 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3361 

3362 if 'get_bucket_lifecycle_configuration' in bucket.get('c7n:DeniedMethods', []): 

3363 log.warning("Access Denied Bucket:%s while reading lifecycle" % bucket['Name']) 

3364 return 

3365 

3366 # Adjust the existing lifecycle by adding/deleting/overwriting rules as necessary 

3367 config = (bucket.get('Lifecycle') or {}).get('Rules', []) 

3368 for rule in self.data['rules']: 

3369 for index, existing_rule in enumerate(config): 

3370 if not existing_rule: 

3371 continue 

3372 if rule['ID'] == existing_rule['ID']: 

3373 if rule['Status'] == 'absent': 

3374 config[index] = None 

3375 else: 

3376 config[index] = rule 

3377 break 

3378 else: 

3379 if rule['Status'] != 'absent': 

3380 config.append(rule) 

3381 

3382 # The extra `list` conversion is required for python3 

3383 config = list(filter(None, config)) 

3384 

3385 try: 

3386 if not config: 

3387 s3.delete_bucket_lifecycle(Bucket=bucket['Name']) 

3388 else: 

3389 s3.put_bucket_lifecycle_configuration( 

3390 Bucket=bucket['Name'], LifecycleConfiguration={'Rules': config}) 

3391 except ClientError as e: 

3392 if e.response['Error']['Code'] == 'AccessDenied': 

3393 log.warning("Access Denied Bucket:%s while applying lifecycle" % bucket['Name']) 

3394 else: 

3395 raise e 

3396 

3397 

3398class KMSKeyResolverMixin: 

3399 """Builds a dictionary of region specific ARNs""" 

3400 

3401 def __init__(self, data, manager=None): 

3402 self.arns = dict() 

3403 self.data = data 

3404 self.manager = manager 

3405 

3406 def resolve_keys(self, buckets): 

3407 key = self.data.get('key') 

3408 if not key: 

3409 return None 

3410 

3411 regions = {get_region(b) for b in buckets} 

3412 for r in regions: 

3413 client = local_session(self.manager.session_factory).client('kms', region_name=r) 

3414 try: 

3415 key_meta = client.describe_key( 

3416 KeyId=key 

3417 ).get('KeyMetadata', {}) 

3418 key_id = key_meta.get('KeyId') 

3419 

3420 # We need a complete set of alias identifiers (names and ARNs) 

3421 # to fully evaluate bucket encryption filters. 

3422 key_aliases = client.list_aliases( 

3423 KeyId=key_id 

3424 ).get('Aliases', []) 

3425 

3426 self.arns[r] = { 

3427 'KeyId': key_id, 

3428 'Arn': key_meta.get('Arn'), 

3429 'KeyManager': key_meta.get('KeyManager'), 

3430 'Description': key_meta.get('Description'), 

3431 'Aliases': [ 

3432 alias[attr] 

3433 for alias in key_aliases 

3434 for attr in ('AliasArn', 'AliasName') 

3435 ], 

3436 } 

3437 

3438 except ClientError as e: 

3439 self.log.error('Error resolving kms ARNs for set-bucket-encryption: %s key: %s' % ( 

3440 e, self.data.get('key'))) 

3441 

3442 def get_key(self, bucket): 

3443 if 'key' not in self.data: 

3444 return None 

3445 region = get_region(bucket) 

3446 key = self.arns.get(region) 

3447 if not key: 

3448 self.log.warning('Unable to resolve key %s for bucket %s in region %s', 

3449 self.data['key'], bucket.get('Name'), region) 

3450 return key 

3451 

3452 
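# Illustrative sketch (not part of s3.py): the shape of the per-region mapping
# that resolve_keys() above builds in self.arns. The account id, key id, and
# aliases below are hypothetical placeholders, not real key material.
example_arns = {
    'us-east-1': {
        'KeyId': '1234abcd-12ab-34cd-56ef-1234567890ab',
        'Arn': 'arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab',
        'KeyManager': 'CUSTOMER',
        'Description': 'example key',
        'Aliases': [
            'arn:aws:kms:us-east-1:111122223333:alias/some/alias/key',
            'alias/some/alias/key',
        ],
    },
}
# get_key(bucket) then simply looks this mapping up by the bucket's region.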

3453@filters.register('bucket-encryption') 

3454class BucketEncryption(KMSKeyResolverMixin, Filter): 

3455 """Filters for S3 buckets that have bucket-encryption 

3456 

3457 :example: 

3458 

3459 .. code-block:: yaml 

3460 

3461 policies: 

3462 - name: s3-bucket-encryption-AES256 

3463 resource: s3 

3464 region: us-east-1 

3465 filters: 

3466 - type: bucket-encryption 

3467 state: True 

3468 crypto: AES256 

3469 - name: s3-bucket-encryption-KMS 

3470 resource: s3 

3471 region: us-east-1 

3472 filters: 

3473 - type: bucket-encryption 

3474 state: True 

3475 crypto: aws:kms 

3476 key: alias/some/alias/key 

3477 - name: s3-bucket-encryption-off 

3478 resource: s3 

3479 region: us-east-1 

3480 filters: 

3481 - type: bucket-encryption 

3482 state: False 

3483 - name: s3-bucket-test-bucket-key-enabled 

3484 resource: s3 

3485 region: us-east-1 

3486 filters: 

3487 - type: bucket-encryption 

3488 bucket_key_enabled: True 

3489 """ 

3490 schema = type_schema('bucket-encryption', 

3491 state={'type': 'boolean'}, 

3492 crypto={'type': 'string', 'enum': ['AES256', 'aws:kms']}, 

3493 key={'type': 'string'}, 

3494 bucket_key_enabled={'type': 'boolean'}) 

3495 

3496 permissions = ('s3:GetEncryptionConfiguration', 'kms:DescribeKey', 'kms:ListAliases') 

3497 annotation_key = 'c7n:bucket-encryption' 

3498 

3499 def validate(self): 

3500 if self.data.get('bucket_key_enabled') is not None and self.data.get('key') is not None: 

3501 raise PolicyValidationError( 

3502 f'key and bucket_key_enabled attributes cannot both be set: {self.data}' 

3503 ) 

3504 

3505 def process(self, buckets, event=None): 

3506 self.resolve_keys(buckets) 

3507 results = [] 

3508 with self.executor_factory(max_workers=2) as w: 

3509 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3510 for future in as_completed(futures): 

3511 b = futures[future] 

3512 if future.exception(): 

3513 self.log.error("Message: %s Bucket: %s", future.exception(), 

3514 b['Name']) 

3515 continue 

3516 if future.result(): 

3517 results.append(b) 

3518 return results 

3519 

3520 def process_bucket(self, b): 

3521 

3522 client = bucket_client(local_session(self.manager.session_factory), b) 

3523 rules = [] 

3524 if self.annotation_key not in b: 

3525 try: 

3526 be = client.get_bucket_encryption(Bucket=b['Name']) 

3527 be.pop('ResponseMetadata', None) 

3528 except ClientError as e: 

3529 if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError': 

3530 raise 

3531 be = {} 

3532 b[self.annotation_key] = be 

3533 else: 

3534 be = b[self.annotation_key] 

3535 

3536 rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', []) 

3537 # default `state` to True, as the previous implementation assumed state == True, 

3538 # to preserve backwards compatibility 

3539 if self.data.get('bucket_key_enabled'): 

3540 for rule in rules: 

3541 return self.filter_bucket_key_enabled(rule) 

3542 elif self.data.get('bucket_key_enabled') is False: 

3543 for rule in rules: 

3544 return not self.filter_bucket_key_enabled(rule) 

3545 

3546 if self.data.get('state', True): 

3547 for sse in rules: 

3548 return self.filter_bucket(b, sse) 

3549 return False 

3550 else: 

3551 for sse in rules: 

3552 return not self.filter_bucket(b, sse) 

3553 return True 

3554 

3555 def filter_bucket(self, b, sse): 

3556 allowed = ['AES256', 'aws:kms'] 

3557 key = self.get_key(b) 

3558 crypto = self.data.get('crypto') 

3559 rule = sse.get('ApplyServerSideEncryptionByDefault') 

3560 

3561 if not rule: 

3562 return False 

3563 algo = rule.get('SSEAlgorithm') 

3564 

3565 if not crypto and algo in allowed: 

3566 return True 

3567 

3568 if crypto == 'AES256' and algo == 'AES256': 

3569 return True 

3570 elif crypto == 'aws:kms' and algo == 'aws:kms': 

3571 if not key: 

3572 # There are two broad reasons to have an empty value for 

3573 # the regional key here: 

3574 # 

3575 # * The policy did not specify a key, in which case this 

3576 # filter should match _all_ buckets with a KMS default 

3577 # encryption rule. 

3578 # 

3579 # * The policy specified a key that could not be 

3580 # resolved, in which case this filter shouldn't match 

3581 # any buckets. 

3582 return 'key' not in self.data 

3583 

3584 # The default encryption rule can specify a key ID, 

3585 # key ARN, alias name or alias ARN. Match against any of 

3586 # those attributes. A rule specifying KMS with no master key 

3587 # implies the AWS-managed key. 

3588 key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']} 

3589 return rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids 

3590 

3591 def filter_bucket_key_enabled(self, rule) -> bool: 

3592 if not rule: 

3593 return False 

3594 return rule.get('BucketKeyEnabled') 

3595 

3596 
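# Illustrative sketch (not part of s3.py): how filter_bucket above matches a
# bucket's default-encryption KMS key against the resolved key metadata. The
# key id, ARNs, and alias are hypothetical.
resolved_key = {
    'KeyId': '1234abcd-12ab-34cd-56ef-1234567890ab',
    'Arn': 'arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab',
    'Aliases': ['alias/some/alias/key',
                'arn:aws:kms:us-east-1:111122223333:alias/some/alias/key'],
}
encryption_rule = {
    'SSEAlgorithm': 'aws:kms',
    'KMSMasterKeyID': 'alias/some/alias/key',
}
key_ids = {resolved_key.get('Arn'), resolved_key.get('KeyId'), *resolved_key['Aliases']}
# A rule with no KMSMasterKeyID implies the AWS-managed key, alias/aws/s3.
matched = encryption_rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids
assert matched is True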

3597@actions.register('set-bucket-encryption') 

3598class SetBucketEncryption(KMSKeyResolverMixin, BucketActionBase): 

3599 """Action enables default encryption on S3 buckets 

3600 

3601 `enabled`: boolean Optional: Defaults to True 

3602 

3603 `crypto`: `aws:kms` | `AES256` Optional: Defaults to AES256 

3604 

3605 `key`: KMS key ARN, alias, or key id 

3606 

3607 `bucket-key`: boolean Optional: 

3608 Defaults to True. 

3609 Reduces the amount of API traffic from Amazon S3 to AWS KMS and can reduce KMS request 

3610 costs by up to 99 percent. Requires kms:Decrypt permissions for copy and upload 

3611 on the AWS KMS Key Policy. 

3612 

3613 Bucket Key Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html 

3614 

3615 :example: 

3616 

3617 .. code-block:: yaml 

3618 

3619 policies: 

3620 - name: s3-enable-default-encryption-kms 

3621 resource: s3 

3622 actions: 

3623 - type: set-bucket-encryption 

3624 # enabled: true <------ optional (true by default) 

3625 crypto: aws:kms 

3626 key: 1234abcd-12ab-34cd-56ef-1234567890ab 

3627 bucket-key: true 

3628 

3629 - name: s3-enable-default-encryption-kms-alias 

3630 resource: s3 

3631 actions: 

3632 - type: set-bucket-encryption 

3633 # enabled: true <------ optional (true by default) 

3634 crypto: aws:kms 

3635 key: alias/some/alias/key 

3636 bucket-key: true 

3637 

3638 - name: s3-enable-default-encryption-aes256 

3639 resource: s3 

3640 actions: 

3641 - type: set-bucket-encryption 

3642 # bucket-key: true <--- optional (true by default for AWS SSE) 

3643 # crypto: AES256 <----- optional (AES256 by default) 

3644 # enabled: true <------ optional (true by default) 

3645 

3646 - name: s3-disable-default-encryption 

3647 resource: s3 

3648 actions: 

3649 - type: set-bucket-encryption 

3650 enabled: false 

3651 """ 

3652 

3653 schema = { 

3654 'type': 'object', 

3655 'additionalProperties': False, 

3656 'properties': { 

3657 'type': {'enum': ['set-bucket-encryption']}, 

3658 'enabled': {'type': 'boolean'}, 

3659 'crypto': {'enum': ['aws:kms', 'AES256']}, 

3660 'key': {'type': 'string'}, 

3661 'bucket-key': {'type': 'boolean'} 

3662 }, 

3663 'dependencies': { 

3664 'key': { 

3665 'properties': { 

3666 'crypto': {'pattern': 'aws:kms'} 

3667 }, 

3668 'required': ['crypto'] 

3669 } 

3670 } 

3671 } 

3672 

3673 permissions = ('s3:PutEncryptionConfiguration', 's3:GetEncryptionConfiguration', 

3674 'kms:ListAliases', 'kms:DescribeKey') 

3675 

3676 def process(self, buckets): 

3677 if self.data.get('enabled', True): 

3678 self.resolve_keys(buckets) 

3679 

3680 with self.executor_factory(max_workers=3) as w: 

3681 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3682 for future in as_completed(futures): 

3683 if future.exception(): 

3684 self.log.error('Message: %s Bucket: %s', future.exception(), 

3685 futures[future]['Name']) 

3686 

3687 def process_bucket(self, bucket): 

3688 default_key_desc = 'Default master key that protects my S3 objects when no other key is defined' # noqa 

3689 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3690 if not self.data.get('enabled', True): 

3691 s3.delete_bucket_encryption(Bucket=bucket['Name']) 

3692 return 

3693 algo = self.data.get('crypto', 'AES256') 

3694 

3695 # `bucket-key` defaults to True and is applied to the default encryption 

3696 # rule for both aws:kms and AES256 (Amazon SSE-S3) 

3697 bucket_key = self.data.get('bucket-key', True) 

3698 config = { 

3699 'Rules': [ 

3700 { 

3701 'ApplyServerSideEncryptionByDefault': { 

3702 'SSEAlgorithm': algo, 

3703 }, 

3704 'BucketKeyEnabled': bucket_key 

3705 } 

3706 ] 

3707 } 

3708 

3709 if algo == 'aws:kms': 

3710 key = self.get_key(bucket) 

3711 if not key: 

3712 raise Exception('Valid KMS Key required but does not exist') 

3713 

3714 config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = key['Arn'] 

3715 s3.put_bucket_encryption( 

3716 Bucket=bucket['Name'], 

3717 ServerSideEncryptionConfiguration=config 

3718 ) 

3719 

3720 
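# Illustrative sketch (not part of s3.py): the ServerSideEncryptionConfiguration
# that process_bucket above sends to put_bucket_encryption when crypto is
# aws:kms. The key ARN is a hypothetical placeholder.
example_sse_config = {
    'Rules': [
        {
            'ApplyServerSideEncryptionByDefault': {
                'SSEAlgorithm': 'aws:kms',
                'KMSMasterKeyID': (
                    'arn:aws:kms:us-east-1:111122223333:'
                    'key/1234abcd-12ab-34cd-56ef-1234567890ab'),
            },
            'BucketKeyEnabled': True,
        }
    ]
}
# s3.put_bucket_encryption(Bucket=..., ServerSideEncryptionConfiguration=example_sse_config)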

3721OWNERSHIP_CONTROLS = ['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter'] 

3722VALUE_FILTER_MAGIC_VALUES = ['absent', 'present', 'not-null', 'empty'] 

3723 

3724 

3725@filters.register('ownership') 

3726class BucketOwnershipControls(BucketFilterBase, ValueFilter): 

3727 """Filter for object ownership controls 

3728 

3729 Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html 

3730 

3731 :example: 

3732 

3733 Find buckets with ACLs disabled 

3734 

3735 .. code-block:: yaml 

3736 

3737 policies: 

3738 - name: s3-bucket-acls-disabled 

3739 resource: aws.s3 

3740 region: us-east-1 

3741 filters: 

3742 - type: ownership 

3743 value: BucketOwnerEnforced 

3744 

3745 :example: 

3746 

3747 Find buckets with object ownership preferred or enforced 

3748 

3749 .. code-block:: yaml 

3750 

3751 policies: 

3752 - name: s3-bucket-ownership-preferred 

3753 resource: aws.s3 

3754 region: us-east-1 

3755 filters: 

3756 - type: ownership 

3757 op: in 

3758 value: 

3759 - BucketOwnerEnforced 

3760 - BucketOwnerPreferred 

3761 

3762 :example: 

3763 

3764 Find buckets with no object ownership controls 

3765 

3766 .. code-block:: yaml 

3767 

3768 policies: 

3769 - name: s3-bucket-no-ownership-controls 

3770 resource: aws.s3 

3771 region: us-east-1 

3772 filters: 

3773 - type: ownership 

3774 value: empty 

3775 """ 

3776 schema = type_schema('ownership', rinherit=ValueFilter.schema, value={'oneOf': [ 

3777 {'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}, 

3778 {'type': 'array', 'items': { 

3779 'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}}]}) 

3780 permissions = ('s3:GetBucketOwnershipControls',) 

3781 annotation_key = 'c7n:ownership' 

3782 

3783 def __init__(self, data, manager=None): 

3784 super(BucketOwnershipControls, self).__init__(data, manager) 

3785 

3786 # Ownership controls appear as an array of rules. There can only be one 

3787 # ObjectOwnership rule defined for a bucket, so we can automatically 

3788 # match against that if it exists. 

3789 self.data['key'] = f'("{self.annotation_key}".Rules[].ObjectOwnership)[0]' 

3790 

3791 def process(self, buckets, event=None): 

3792 with self.executor_factory(max_workers=2) as w: 

3793 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3794 for future in as_completed(futures): 

3795 b = futures[future] 

3796 if future.exception(): 

3797 self.log.error("Message: %s Bucket: %s", future.exception(), 

3798 b['Name']) 

3799 continue 

3800 return super(BucketOwnershipControls, self).process(buckets, event) 

3801 

3802 def process_bucket(self, b): 

3803 if self.annotation_key in b: 

3804 return 

3805 client = bucket_client(local_session(self.manager.session_factory), b) 

3806 try: 

3807 controls = client.get_bucket_ownership_controls(Bucket=b['Name']) 

3808 controls.pop('ResponseMetadata', None) 

3809 except ClientError as e: 

3810 if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError': 

3811 raise 

3812 controls = {} 

3813 b[self.annotation_key] = controls.get('OwnershipControls') 

3814 

3815 
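# Illustrative sketch (not part of s3.py): how the value-filter key built in
# __init__ above resolves against the annotation written by process_bucket.
# The bucket payload below is hypothetical.
import jmespath

annotated_bucket = {
    'Name': 'example-bucket',
    'c7n:ownership': {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]},
}
expr = '("c7n:ownership".Rules[].ObjectOwnership)[0]'
assert jmespath.search(expr, annotated_bucket) == 'BucketOwnerEnforced'
# A bucket with no ownership controls is annotated with None, so the expression
# yields null, which the `empty` magic value matches.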

3816@filters.register('bucket-replication') 

3817class BucketReplication(ListItemFilter): 

3818 """Filter for S3 buckets to look at bucket replication configurations 

3819 

3820 The schema to supply to `attrs` follows the response schema documented here: 

3821 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_replication.html 

3822 

3823 :example: 

3824 

3825 .. code-block:: yaml 

3826 

3827 policies: 

3828 - name: s3-bucket-replication 

3829 resource: s3 

3830 filters: 

3831 - type: bucket-replication 

3832 attrs: 

3833 - Status: Enabled 

3834 - Filter: 

3835 And: 

3836 Prefix: test 

3837 Tags: 

3838 - Key: Owner 

3839 Value: c7n 

3840 - ExistingObjectReplication: Enabled 

3841 

3842 """ 

3843 schema = type_schema( 

3844 'bucket-replication', 

3845 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'}, 

3846 count={'type': 'number'}, 

3847 count_op={'$ref': '#/definitions/filters_common/comparison_operators'} 

3848 ) 

3849 

3850 permissions = ("s3:GetReplicationConfiguration",) 

3851 annotation_key = 'Replication' 

3852 annotate_items = True 

3853 

3854 def __init__(self, data, manager=None): 

3855 super().__init__(data, manager) 

3856 self.data['key'] = self.annotation_key 

3857 

3858 def get_item_values(self, b): 

3859 client = bucket_client(local_session(self.manager.session_factory), b) 

3860 # the replication configuration is fetched during augmentation (S3_AUGMENT_TABLE): 

3861 bucket_replication = b.get(self.annotation_key) 

3862 

3863 rules = [] 

3864 if bucket_replication is not None: 

3865 rules = bucket_replication.get('ReplicationConfiguration', {}).get('Rules', []) 

3866 for replication in rules: 

3867 self.augment_bucket_replication(b, replication, client) 

3868 

3869 return rules 

3870 

3871 def augment_bucket_replication(self, b, replication, client): 

3872 destination_bucket = replication.get('Destination').get('Bucket').split(':')[5] 

3873 destination_region = inspect_bucket_region(destination_bucket, client.meta.endpoint_url) 

3874 source_region = get_region(b) 

3875 replication['DestinationRegion'] = destination_region 

3876 replication['CrossRegion'] = destination_region != source_region 

3877 

3878 
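# Illustrative sketch (not part of s3.py): how augment_bucket_replication above
# derives the destination bucket name and the CrossRegion flag. The ARN and
# regions are hypothetical; inspect_bucket_region normally resolves the real
# destination region.
destination_arn = 'arn:aws:s3:::replication-target-bucket'
destination_bucket = destination_arn.split(':')[5]    # -> 'replication-target-bucket'
source_region = 'us-east-1'
destination_region = 'eu-west-1'                       # hypothetical resolved value
cross_region = destination_region != source_region     # -> True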

3879@resources.register('s3-directory') 

3880class S3Directory(query.QueryResourceManager): 

3881 

3882 class resource_type(query.TypeInfo): 

3883 service = 's3' 

3884 permission_prefix = "s3express" 

3885 arn_service = "s3express" 

3886 arn_type = 'bucket' 

3887 enum_spec = ('list_directory_buckets', 'Buckets[]', None) 

3888 name = id = 'Name' 

3889 date = 'CreationDate' 

3890 dimension = 'BucketName' 

3891 cfn_type = 'AWS::S3Express::DirectoryBucket' 

3892 permissions_enum = ("s3express:ListAllMyDirectoryBuckets",)