Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/s3.py: 23%

1630 statements  

coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3"""S3 Resource Manager 

4 

5Filters: 

6 

7The generic value filter (jmespath expression) and Or filter are 

8available with all resources, including buckets; we include several 

9additional pieces of bucket data (Tags, Replication, Acl, Policy) as keys within 

10a bucket representation. 

11 

12Actions: 

13 

14 encrypt-keys 

15 

16 Scan all keys in a bucket and optionally encrypt them in place. 

17 

18 global-grants 

19 

20 Check bucket acls for global grants 

21 

22 encryption-policy 

23 

24 Attach an encryption-required policy to a bucket; this will break 

25 applications that are not using encryption, including AWS log 

26 delivery. 

27 
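Example:

 The generic value filter can match against any of the augmented bucket
 keys; a minimal illustrative sketch (the policy name and tag key below are
 placeholders, not taken from this module):

 .. code-block:: yaml

    policies:
      - name: s3-buckets-missing-owner-tag
        resource: s3
        filters:
          - type: value
            key: "tag:Owner"
            value: absent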

28""" 

29import copy 

30import functools 

31import json 

32import itertools 

33import logging 

34import math 

35import os 

36import time 

37import ssl 

38 

39from botocore.client import Config 

40from botocore.exceptions import ClientError 

41 

42from collections import defaultdict 

43from concurrent.futures import as_completed 

44 

45try: 

46 from urllib3.exceptions import SSLError 

47except ImportError: 

48 from botocore.vendored.requests.packages.urllib3.exceptions import SSLError 

49 

50 

51from c7n.actions import ( 

52 ActionRegistry, BaseAction, PutMetric, RemovePolicyBase) 

53from c7n.exceptions import PolicyValidationError, PolicyExecutionError 

54from c7n.filters import ( 

55 FilterRegistry, Filter, CrossAccountAccessFilter, MetricsFilter, 

56 ValueFilter, ListItemFilter) 

57from .aws import shape_validate 

58import c7n.filters.policystatement as polstmt_filter 

59from c7n.manager import resources 

60from c7n.output import NullBlobOutput 

61from c7n import query 

62from c7n.resources.securityhub import PostFinding 

63from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction 

64from c7n.utils import ( 

65 chunks, local_session, set_annotation, type_schema, filter_empty, 

66 dumps, format_string_values, get_account_alias_from_sts) 

67from c7n.resources.aws import inspect_bucket_region 

68 

69 

70log = logging.getLogger('custodian.s3') 

71 

72filters = FilterRegistry('s3.filters') 

73actions = ActionRegistry('s3.actions') 

74filters.register('marked-for-op', TagActionFilter) 

75actions.register('put-metric', PutMetric) 

76 
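# 2 GiB (1024 * 1024 * 1024 * 2 bytes)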

77MAX_COPY_SIZE = 1024 * 1024 * 1024 * 2 

78 

79 

80class DescribeS3(query.DescribeSource): 

81 

82 def augment(self, buckets): 

83 with self.manager.executor_factory( 

84 max_workers=min((10, len(buckets) + 1))) as w: 

85 results = w.map( 

86 assemble_bucket, 

87 zip(itertools.repeat(self.manager.session_factory), buckets)) 

88 results = list(filter(None, results)) 

89 return results 

90 

91 

92class ConfigS3(query.ConfigSource): 

93 

94 # normalize config's janky idiosyncratic bespoke formatting to the 

95 # standard describe api responses. 

96 

97 def get_query_params(self, query): 

98 q = super(ConfigS3, self).get_query_params(query) 
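        # make sure any custom select expression also returns awsRegion,
        # since load_resource() relies on it to populate the Location key.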

99 if 'expr' in q: 

100 q['expr'] = q['expr'].replace('select ', 'select awsRegion, ') 

101 return q 

102 

103 def load_resource(self, item): 

104 resource = super(ConfigS3, self).load_resource(item) 

105 cfg = item['supplementaryConfiguration'] 

106 # aka standard 

107 if 'awsRegion' in item and item['awsRegion'] != 'us-east-1': 

108 resource['Location'] = {'LocationConstraint': item['awsRegion']} 

109 else: 

110 resource['Location'] = {} 

111 

112 # owner is under acl per describe 

113 resource.pop('Owner', None) 

114 

115 for k, null_value in S3_CONFIG_SUPPLEMENT_NULL_MAP.items(): 

116 if k not in cfg: 

117 continue 

118 if cfg.get(k) == null_value: 

119 continue 

120 method = getattr(self, "handle_%s" % k, None) 

121 if method is None: 

122 raise ValueError("unhandled supplementary config %s" % k) 

124 v = cfg[k] 

125 if isinstance(cfg[k], str): 

126 v = json.loads(cfg[k]) 

127 method(resource, v) 

128 

129 for el in S3_AUGMENT_TABLE: 

130 if el[1] not in resource: 

131 resource[el[1]] = el[2] 

132 return resource 

133 

134 PERMISSION_MAP = { 

135 'FullControl': 'FULL_CONTROL', 

136 'Write': 'WRITE', 

137 'WriteAcp': 'WRITE_ACP', 

138 'Read': 'READ', 

139 'ReadAcp': 'READ_ACP'} 

140 

141 GRANTEE_MAP = { 

142 'AllUsers': "http://acs.amazonaws.com/groups/global/AllUsers", 

143 'AuthenticatedUsers': "http://acs.amazonaws.com/groups/global/AuthenticatedUsers", 

144 'LogDelivery': 'http://acs.amazonaws.com/groups/s3/LogDelivery'} 

145 

146 def handle_AccessControlList(self, resource, item_value): 

147 # double serialized in config for some reason 

148 if isinstance(item_value, str): 

149 item_value = json.loads(item_value) 

150 

151 resource['Acl'] = {} 

152 resource['Acl']['Owner'] = {'ID': item_value['owner']['id']} 

153 if item_value['owner']['displayName']: 

154 resource['Acl']['Owner']['DisplayName'] = item_value[ 

155 'owner']['displayName'] 

156 resource['Acl']['Grants'] = grants = [] 

157 

158 for g in (item_value.get('grantList') or ()): 

159 if 'id' not in g['grantee']: 

160 assert g['grantee'] in self.GRANTEE_MAP, "unknown grantee %s" % g 

161 rg = {'Type': 'Group', 'URI': self.GRANTEE_MAP[g['grantee']]} 

162 else: 

163 rg = {'ID': g['grantee']['id'], 'Type': 'CanonicalUser'} 

164 

165 if 'displayName' in g: 

166 rg['DisplayName'] = g['displayName'] 

167 

168 grants.append({ 

169 'Permission': self.PERMISSION_MAP[g['permission']], 

170 'Grantee': rg, 

171 }) 

172 

173 def handle_BucketAccelerateConfiguration(self, resource, item_value): 

174 # not currently auto-augmented by custodian 

175 return 

176 

177 def handle_BucketLoggingConfiguration(self, resource, item_value): 

178 if ('destinationBucketName' not in item_value or 

179 item_value['destinationBucketName'] is None): 

180 resource[u'Logging'] = {} 

181 return 

182 resource[u'Logging'] = { 

183 'TargetBucket': item_value['destinationBucketName'], 

184 'TargetPrefix': item_value['logFilePrefix']} 

185 

186 def handle_BucketLifecycleConfiguration(self, resource, item_value): 

187 rules = [] 

188 for r in item_value.get('rules'): 

189 rr = {} 

190 rules.append(rr) 

191 expiry = {} 

192 for ek, ck in ( 

193 ('Date', 'expirationDate'), 

194 ('ExpiredObjectDeleteMarker', 'expiredObjectDeleteMarker'), 

195 ('Days', 'expirationInDays')): 

196 if ck in r and r[ck] and r[ck] != -1: 

197 expiry[ek] = r[ck] 

198 if expiry: 

199 rr['Expiration'] = expiry 

200 

201 transitions = [] 

202 for t in (r.get('transitions') or ()): 

203 tr = {} 

204 for k in ('date', 'days', 'storageClass'): 

205 if t.get(k): 

206 tr["%s%s" % (k[0].upper(), k[1:])] = t[k] 

207 transitions.append(tr) 

208 if transitions: 

209 rr['Transitions'] = transitions 

210 

211 if r.get('abortIncompleteMultipartUpload'): 

212 rr['AbortIncompleteMultipartUpload'] = { 

213 'DaysAfterInitiation': r[ 

214 'abortIncompleteMultipartUpload']['daysAfterInitiation']} 

215 if r.get('noncurrentVersionExpirationInDays'): 

216 rr['NoncurrentVersionExpiration'] = { 

217 'NoncurrentDays': r['noncurrentVersionExpirationInDays']} 

218 

219 nonc_transitions = [] 

220 for t in (r.get('noncurrentVersionTransitions') or ()): 

221 nonc_transitions.append({ 

222 'NoncurrentDays': t['days'], 

223 'StorageClass': t['storageClass']}) 

224 if nonc_transitions: 

225 rr['NoncurrentVersionTransitions'] = nonc_transitions 

226 

227 rr['Status'] = r['status'] 

228 rr['ID'] = r['id'] 

229 if r.get('prefix'): 

230 rr['Prefix'] = r['prefix'] 

231 if 'filter' not in r or not r['filter']: 

232 continue 

233 

234 if r['filter']['predicate']: 

235 rr['Filter'] = self.convertLifePredicate(r['filter']['predicate']) 

236 

237 resource['Lifecycle'] = {'Rules': rules} 

238 

239 def convertLifePredicate(self, p): 

240 if p['type'] == 'LifecyclePrefixPredicate': 

241 return {'Prefix': p['prefix']} 

242 if p['type'] == 'LifecycleTagPredicate': 

243 return {'Tags': [{'Key': p['tag']['key'], 'Value': p['tag']['value']}]} 

244 if p['type'] == 'LifecycleAndOperator': 

245 n = {} 

246 for o in p['operands']: 

247 ot = self.convertLifePredicate(o) 

248 if 'Tags' in n and 'Tags' in ot: 

249 n['Tags'].extend(ot['Tags']) 

250 else: 

251 n.update(ot) 

252 return {'And': n} 

253 

254 raise ValueError("unknown predicate: %s" % p) 

255 

256 NotifyTypeMap = { 

257 'QueueConfiguration': 'QueueConfigurations', 

258 'LambdaConfiguration': 'LambdaFunctionConfigurations', 

259 'CloudFunctionConfiguration': 'LambdaFunctionConfigurations', 

260 'TopicConfiguration': 'TopicConfigurations'} 

261 

262 def handle_BucketNotificationConfiguration(self, resource, item_value): 

263 d = {} 

264 for nid, n in item_value['configurations'].items(): 

265 ninfo = {} 

266 d.setdefault(self.NotifyTypeMap[n['type']], []).append(ninfo) 

267 if n['type'] == 'QueueConfiguration': 

268 ninfo['QueueArn'] = n['queueARN'] 

269 elif n['type'] == 'TopicConfiguration': 

270 ninfo['TopicArn'] = n['topicARN'] 

271 elif n['type'] == 'LambdaConfiguration': 

272 ninfo['LambdaFunctionArn'] = n['functionARN'] 

273 ninfo['Id'] = nid 

274 ninfo['Events'] = n['events'] 

275 rules = [] 

276 if n['filter']: 

277 for r in n['filter'].get('s3KeyFilter', {}).get('filterRules', []): 

278 rules.append({'Name': r['name'], 'Value': r['value']}) 

279 if rules: 

280 ninfo['Filter'] = {'Key': {'FilterRules': rules}} 

281 resource['Notification'] = d 

282 

283 def handle_BucketReplicationConfiguration(self, resource, item_value): 

284 d = {'Role': item_value['roleARN'], 'Rules': []} 

285 for rid, r in item_value['rules'].items(): 

286 rule = { 

287 'ID': rid, 

288 'Status': r.get('status', ''), 

289 'Prefix': r.get('prefix', ''), 

290 'Destination': { 

291 'Bucket': r['destinationConfig']['bucketARN']} 

292 } 

293 if 'Account' in r['destinationConfig']: 

294 rule['Destination']['Account'] = r['destinationConfig']['Account'] 

295 if r['destinationConfig'].get('storageClass'): 

296 rule['Destination']['StorageClass'] = r['destinationConfig']['storageClass'] 

297 d['Rules'].append(rule) 

298 resource['Replication'] = {'ReplicationConfiguration': d} 

299 

300 def handle_BucketPolicy(self, resource, item_value): 

301 resource['Policy'] = item_value.get('policyText') 

302 

303 def handle_BucketTaggingConfiguration(self, resource, item_value): 

304 resource['Tags'] = [ 

305 {"Key": k, "Value": v} for k, v in item_value['tagSets'][0]['tags'].items()] 

306 

307 def handle_BucketVersioningConfiguration(self, resource, item_value): 

308 # Config defaults versioning to 'Off' for a null value 

309 if item_value['status'] not in ('Enabled', 'Suspended'): 

310 resource['Versioning'] = {} 

311 return 

312 resource['Versioning'] = {'Status': item_value['status']} 

313 # `isMfaDeleteEnabled` is an optional boolean property - the key may be absent, 

314 # present with a null value, or present with a boolean value. 

315 # Mirror the describe source by populating Versioning.MFADelete only in the 

316 # boolean case. 

317 mfa_delete = item_value.get('isMfaDeleteEnabled') 

318 if mfa_delete is None: 

319 return 

320 resource['Versioning']['MFADelete'] = ( 

321 'Enabled' if mfa_delete else 'Disabled' 

322 ) 

323 

324 def handle_BucketWebsiteConfiguration(self, resource, item_value): 

325 website = {} 

326 if item_value['indexDocumentSuffix']: 

327 website['IndexDocument'] = { 

328 'Suffix': item_value['indexDocumentSuffix']} 

329 if item_value['errorDocument']: 

330 website['ErrorDocument'] = { 

331 'Key': item_value['errorDocument']} 

332 if item_value['redirectAllRequestsTo']: 

333 website['RedirectAllRequestsTo'] = { 

334 'HostName': item_value['redirectAllRequestsTo']['hostName'], 

335 'Protocol': item_value['redirectAllRequestsTo']['protocol']} 

336 for r in item_value['routingRules']: 

337 redirect = {} 

338 rule = {'Redirect': redirect} 

339 website.setdefault('RoutingRules', []).append(rule) 

340 if 'condition' in r: 

341 cond = {} 

342 for ck, rk in ( 

343 ('keyPrefixEquals', 'KeyPrefixEquals'), 

344 ('httpErrorCodeReturnedEquals', 

345 'HttpErrorCodeReturnedEquals')): 

346 if r['condition'][ck]: 

347 cond[rk] = r['condition'][ck] 

348 rule['Condition'] = cond 

349 for ck, rk in ( 

350 ('protocol', 'Protocol'), 

351 ('hostName', 'HostName'), 

352 ('replaceKeyPrefixWith', 'ReplaceKeyPrefixWith'), 

353 ('replaceKeyWith', 'ReplaceKeyWith'), 

354 ('httpRedirectCode', 'HttpRedirectCode')): 

355 if r['redirect'][ck]: 

356 redirect[rk] = r['redirect'][ck] 

357 resource['Website'] = website 

358 

359 

360@resources.register('s3') 

361class S3(query.QueryResourceManager): 

362 

363 class resource_type(query.TypeInfo): 

364 service = 's3' 

365 arn_type = '' 

366 enum_spec = ('list_buckets', 'Buckets[]', None) 

367 # not used but we want some consistency on the metadata 

368 detail_spec = ('get_bucket_location', 'Bucket', 'Name', 'LocationConstraint') 

369 name = id = 'Name' 

370 date = 'CreationDate' 

371 dimension = 'BucketName' 

372 cfn_type = config_type = 'AWS::S3::Bucket' 

373 

374 filter_registry = filters 

375 action_registry = actions 

376 source_mapping = { 

377 'describe': DescribeS3, 

378 'config': ConfigS3 

379 } 

380 

381 def get_arns(self, resources): 

382 return ["arn:aws:s3:::{}".format(r["Name"]) for r in resources] 

383 

384 @classmethod 

385 def get_permissions(cls): 

386 perms = ["s3:ListAllMyBuckets"] 

387 perms.extend([n[-1] for n in S3_AUGMENT_TABLE]) 

388 return perms 

389 

390 

391S3_CONFIG_SUPPLEMENT_NULL_MAP = { 

392 'BucketLoggingConfiguration': u'{"destinationBucketName":null,"logFilePrefix":null}', 

393 'BucketPolicy': u'{"policyText":null}', 

394 'BucketVersioningConfiguration': u'{"status":"Off","isMfaDeleteEnabled":null}', 

395 'BucketAccelerateConfiguration': u'{"status":null}', 

396 'BucketNotificationConfiguration': u'{"configurations":{}}', 

397 'BucketLifecycleConfiguration': None, 

398 'AccessControlList': None, 

399 'BucketTaggingConfiguration': None, 

400 'BucketWebsiteConfiguration': None, 

401 'BucketReplicationConfiguration': None 

402} 

403 
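# Each entry is: (client method, annotation key, default value used when the
# call returns NoSuch*/NotFound, sub-key to select from the response, IAM
# permission). assemble_bucket() consumes the first four elements and
# get_permissions() reads the last.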

404S3_AUGMENT_TABLE = ( 

405 ('get_bucket_location', 'Location', {}, None, 's3:GetBucketLocation'), 

406 ('get_bucket_tagging', 'Tags', [], 'TagSet', 's3:GetBucketTagging'), 

407 ('get_bucket_policy', 'Policy', None, 'Policy', 's3:GetBucketPolicy'), 

408 ('get_bucket_acl', 'Acl', None, None, 's3:GetBucketAcl'), 

409 ('get_bucket_replication', 

410 'Replication', None, None, 's3:GetReplicationConfiguration'), 

411 ('get_bucket_versioning', 'Versioning', None, None, 's3:GetBucketVersioning'), 

412 ('get_bucket_website', 'Website', None, None, 's3:GetBucketWebsite'), 

413 ('get_bucket_logging', 'Logging', None, 'LoggingEnabled', 's3:GetBucketLogging'), 

414 ('get_bucket_notification_configuration', 

415 'Notification', None, None, 's3:GetBucketNotification'), 

416 ('get_bucket_lifecycle_configuration', 

417 'Lifecycle', None, None, 's3:GetLifecycleConfiguration'), 

418 # ('get_bucket_cors', 'Cors'), 

419) 

420 

421 

422def assemble_bucket(item): 

423 """Assemble a document representing all the config state around a bucket. 

424 

425 TODO: Refactor this, the logic here feels quite muddled. 

426 """ 
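    # Rough flow: walk S3_AUGMENT_TABLE, invoke each describe call against the
    # bucket and attach the (optionally selected) response under the table's
    # annotation key. NoSuch*/NotFound responses fall back to the table default,
    # PermanentRedirect requeues the call with a region-correct client, and
    # AccessDenied is recorded under 'c7n:DeniedMethods' rather than failing.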

427 factory, b = item 

428 s = factory() 

429 c = s.client('s3') 

430 # Bucket Location, Current Client Location, Default Location 

431 b_location = c_location = location = "us-east-1" 

432 methods = list(S3_AUGMENT_TABLE) 

433 for minfo in methods: 

434 m, k, default, select = minfo[:4] 

435 try: 

436 method = getattr(c, m) 

437 v = method(Bucket=b['Name']) 

438 v.pop('ResponseMetadata') 

439 if select is not None and select in v: 

440 v = v[select] 

441 except (ssl.SSLError, SSLError) as e: 

442 # Proxy issues? I assume 

443 log.warning("Bucket ssl error %s: %s %s", 

444 b['Name'], b.get('Location', 'unknown'), 

445 e) 

446 continue 

447 except ClientError as e: 

448 code = e.response['Error']['Code'] 

449 if code.startswith("NoSuch") or "NotFound" in code: 

450 v = default 

451 elif code == 'PermanentRedirect': 

452 s = factory() 

453 c = bucket_client(s, b) 

454 # Requeue with the correct region given location constraint 

455 methods.append((m, k, default, select)) 

456 continue 

457 else: 

458 log.warning( 

459 "Bucket:%s unable to invoke method:%s error:%s ", 

460 b['Name'], m, e.response['Error']['Message']) 

461 # For auth failures, we don't bail out; we continue processing if we can. 

462 # Note this can lead to missing data, but in general is cleaner than 

463 # failing hard, due to the common use of locked down s3 bucket policies 

464 # that may cause issues fetching information across a fleet of buckets. 

465 

466 # This does mean s3 policies depending on augments should check the denied 

467 # methods annotation; generally though, lacking get access to an augment means 

468 # they won't have write access either. 

469 

470 # For other error types we raise and bail policy execution. 

471 if e.response['Error']['Code'] == 'AccessDenied': 

472 b.setdefault('c7n:DeniedMethods', []).append(m) 

473 continue 

474 raise 

475 # As soon as we learn location (which generally works) 

476 if k == 'Location' and v is not None: 

477 b_location = v.get('LocationConstraint') 

478 # Location == region for all cases but EU 

479 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html 

480 if b_location is None: 

481 b_location = "us-east-1" 

482 elif b_location == 'EU': 

483 b_location = "eu-west-1" 

484 v['LocationConstraint'] = 'eu-west-1' 

485 if v and v != c_location: 

486 c = s.client('s3', region_name=b_location) 

487 elif c_location != location: 

488 c = s.client('s3', region_name=location) 

489 b[k] = v 

490 return b 

491 

492 

493def bucket_client(session, b, kms=False): 

494 region = get_region(b) 

495 

496 if kms: 

497 # Need v4 signature for aws:kms crypto, else let the sdk decide 

498 # based on region support. 

499 config = Config( 

500 signature_version='s3v4', 

501 read_timeout=200, connect_timeout=120) 

502 else: 

503 config = Config(read_timeout=200, connect_timeout=120) 

504 return session.client('s3', region_name=region, config=config) 

505 

506 

507def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()): 

508 for bucket in buckets: 

509 client = bucket_client(local_session(session_factory), bucket) 

510 # Bucket tags are set atomically for the set/document, we want 

511 # to refetch against current to guard against any staleness in 

512 # our cached representation across multiple policies or concurrent 

513 # modifications. 

514 

515 if 'get_bucket_tagging' in bucket.get('c7n:DeniedMethods', []): 

516 # avoid the additional API call if we already know that it's going 

517 # to result in AccessDenied. The chances that the resource's perms 

518 # would have changed between fetching the resource and acting on it 

519 # here are pretty low, so the check here should suffice. 

520 log.warning( 

521 "Unable to get new set of bucket tags needed to modify tags, " 

522 "skipping tag action for bucket: %s" % bucket["Name"]) 

523 continue 

524 

525 try: 

526 bucket['Tags'] = client.get_bucket_tagging( 

527 Bucket=bucket['Name']).get('TagSet', []) 

528 except ClientError as e: 

529 if e.response['Error']['Code'] != 'NoSuchTagSet': 

530 raise 

531 bucket['Tags'] = [] 

532 

533 new_tags = {t['Key']: t['Value'] for t in add_tags} 

534 for t in bucket.get('Tags', ()): 

535 if (t['Key'] not in new_tags and t['Key'] not in remove_tags): 

536 new_tags[t['Key']] = t['Value'] 

537 tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()] 

538 

539 try: 

540 client.put_bucket_tagging( 

541 Bucket=bucket['Name'], Tagging={'TagSet': tag_set}) 

542 except ClientError as e: 

543 log.exception( 

544 'Exception tagging bucket %s: %s', bucket['Name'], e) 

545 continue 

546 

547 

548def get_region(b): 

549 """Tries to get the bucket region from Location.LocationConstraint 

550 

551 Special cases: 

552 LocationConstraint EU defaults to eu-west-1 

553 LocationConstraint null defaults to us-east-1 

554 

555 Args: 

556 b (object): A bucket object 

557 

558 Returns: 

559 string: an aws region string 
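
    Examples (LocationConstraint remapping)::

        >>> get_region({'Location': {'LocationConstraint': 'EU'}})
        'eu-west-1'
        >>> get_region({'Location': {}})
        'us-east-1'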

560 """ 

561 remap = {None: 'us-east-1', 'EU': 'eu-west-1'} 

562 region = b.get('Location', {}).get('LocationConstraint') 

563 return remap.get(region, region) 

564 

565 

566@filters.register('metrics') 

567class S3Metrics(MetricsFilter): 

568 """S3 CW Metrics need special handling for attribute/dimension 

569 mismatch, and an additional required dimension. 
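
    An illustrative policy sketch (metric threshold and lookback below are
    placeholder values):

    .. code-block:: yaml

        policies:
          - name: s3-find-empty-buckets
            resource: s3
            filters:
              - type: metrics
                name: NumberOfObjects
                days: 7
                value: 0
                op: eq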

570 """ 

571 

572 def get_dimensions(self, resource): 

573 dims = [{'Name': 'BucketName', 'Value': resource['Name']}] 

574 if (self.data['name'] == 'NumberOfObjects' and 

575 'dimensions' not in self.data): 

576 dims.append( 

577 {'Name': 'StorageType', 'Value': 'AllStorageTypes'}) 

578 return dims 

579 

580 

581@filters.register('cross-account') 

582class S3CrossAccountFilter(CrossAccountAccessFilter): 

583 """Filters cross-account access to S3 buckets 

584 

585 :example: 

586 

587 .. code-block:: yaml 

588 

589 policies: 

590 - name: s3-acl 

591 resource: s3 

592 region: us-east-1 

593 filters: 

594 - type: cross-account 

595 """ 

596 permissions = ('s3:GetBucketPolicy',) 

597 

598 def get_accounts(self): 

599 """add in elb access by default 

600 

601 ELB Accounts by region 

602 https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html 

603 

604 Redshift Accounts by region 

605 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#db-auditing-manage-log-files 

606 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#rs-db-auditing-cloud-trail-rs-acct-ids 

607 

608 Cloudtrail Accounts by region 

609 https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-supported-regions.html 

610 """ 

611 accounts = super(S3CrossAccountFilter, self).get_accounts() 

612 return accounts.union( 

613 [ 

614 # ELB accounts 

615 '127311923021', # us-east-1 

616 '033677994240', # us-east-2 

617 '027434742980', # us-west-1 

618 '797873946194', # us-west-2 

619 '098369216593', # af-south-1 

620 '985666609251', # ca-central-1 

621 '054676820928', # eu-central-1 

622 '897822967062', # eu-north-1 

623 '635631232127', # eu-south-1 

624 '156460612806', # eu-west-1 

625 '652711504416', # eu-west-2 

626 '009996457667', # eu-west-3 

627 '754344448648', # ap-east-1 

628 '582318560864', # ap-northeast-1 

629 '600734575887', # ap-northeast-2 

630 '383597477331', # ap-northeast-3 

631 '114774131450', # ap-southeast-1 

632 '783225319266', # ap-southeast-2 

633 '718504428378', # ap-south-1 

634 '076674570225', # me-south-1 

635 '507241528517', # sa-east-1 

636 '048591011584', # us-gov-west-1 or gov-cloud-1 

637 '190560391635', # us-gov-east-1 

638 '638102146993', # cn-north-1 

639 '037604701340', # cn-northwest-1 

640 

641 # Redshift audit logging 

642 '193672423079', # us-east-1 

643 '391106570357', # us-east-2 

644 '262260360010', # us-west-1 

645 '902366379725', # us-west-2 

646 '365689465814', # af-south-1 

647 '313564881002', # ap-east-1 

648 '865932855811', # ap-south-1 

649 '090321488786', # ap-northeast-3 

650 '760740231472', # ap-northeast-2 

651 '361669875840', # ap-southeast-1 

652 '762762565011', # ap-southeast-2 

653 '404641285394', # ap-northeast-1 

654 '907379612154', # ca-central-1 

655 '053454850223', # eu-central-1 

656 '210876761215', # eu-west-1 

657 '307160386991', # eu-west-2 

658 '945612479654', # eu-south-1 

659 '915173422425', # eu-west-3 

660 '729911121831', # eu-north-1 

661 '013126148197', # me-south-1 

662 '075028567923', # sa-east-1 

663 

664 # Cloudtrail accounts (PSA: folks should be using the 

665 # cloudtrail service principal in bucket policies) 

666 '086441151436', # us-east-1 

667 '475085895292', # us-west-2 

668 '388731089494', # us-west-1 

669 '113285607260', # us-west-2 

670 '819402241893', # ca-central-1 

671 '977081816279', # ap-south-1 

672 '492519147666', # ap-northeast-2 

673 '903692715234', # ap-southeast-1 

674 '284668455005', # ap-southeast-2 

675 '216624486486', # ap-northeast-1 

676 '035351147821', # eu-central-1 

677 '859597730677', # eu-west-1 

678 '282025262664', # eu-west-2 

679 '814480443879', # sa-east-1 

680 ]) 

681 

682 

683@filters.register('global-grants') 

684class GlobalGrantsFilter(Filter): 

685 """Filters for all S3 buckets that have global-grants 

686 

687 *Note* by default this filter allows for read access 

688 if the bucket has been configured as a website. This 

689 can be disabled per the example below. 

690 

691 :example: 

692 

693 .. code-block:: yaml 

694 

695 policies: 

696 - name: remove-global-grants 

697 resource: s3 

698 filters: 

699 - type: global-grants 

700 allow_website: false 

701 actions: 

702 - delete-global-grants 

703 

704 """ 

705 

706 schema = type_schema( 

707 'global-grants', 

708 allow_website={'type': 'boolean'}, 

709 operator={'type': 'string', 'enum': ['or', 'and']}, 

710 permissions={ 

711 'type': 'array', 'items': { 

712 'type': 'string', 'enum': [ 

713 'READ', 'WRITE', 'WRITE_ACP', 'READ_ACP', 'FULL_CONTROL']}}) 

714 

715 GLOBAL_ALL = "http://acs.amazonaws.com/groups/global/AllUsers" 

716 AUTH_ALL = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" 

717 

718 def process(self, buckets, event=None): 

719 with self.executor_factory(max_workers=5) as w: 

720 results = w.map(self.process_bucket, buckets) 

721 results = list(filter(None, list(results))) 

722 return results 

723 

724 def process_bucket(self, b): 

725 acl = b.get('Acl', {'Grants': []}) 

726 if not acl or not acl['Grants']: 

727 return 

728 

729 results = [] 

730 allow_website = self.data.get('allow_website', True) 

731 perms = self.data.get('permissions', []) 

732 

733 for grant in acl['Grants']: 

734 if 'URI' not in grant.get("Grantee", {}): 

735 continue 

736 if grant['Grantee']['URI'] not in [self.AUTH_ALL, self.GLOBAL_ALL]: 

737 continue 

738 if allow_website and grant['Permission'] == 'READ' and b['Website']: 

739 continue 

740 if not perms or (perms and grant['Permission'] in perms): 

741 results.append(grant['Permission']) 

742 

743 if results: 

744 set_annotation(b, 'GlobalPermissions', results) 

745 return b 

746 

747 

748class BucketActionBase(BaseAction): 

749 

750 def get_permissions(self): 

751 return self.permissions 

752 

753 def get_std_format_args(self, bucket): 

754 return { 

755 'account_id': self.manager.config.account_id, 

756 'region': self.manager.config.region, 

757 'bucket_name': bucket['Name'], 

758 'bucket_region': get_region(bucket) 

759 } 

760 

761 def process(self, buckets): 

762 return self._process_with_futures(buckets) 

763 

764 def _process_with_futures(self, buckets, *args, max_workers=3, **kwargs): 

765 errors = 0 

766 results = [] 

767 with self.executor_factory(max_workers=max_workers) as w: 

768 futures = {} 

769 for b in buckets: 

770 futures[w.submit(self.process_bucket, b, *args, **kwargs)] = b 

771 for f in as_completed(futures): 

772 if f.exception(): 

773 b = futures[f] 

774 self.log.error( 

775 'error modifying bucket: policy:%s action:%s bucket:%s error:%s', 

776 self.manager.data.get('name'), self.name, b['Name'], f.exception() 

777 ) 

778 errors += 1 

779 continue 

780 results += filter(None, [f.result()]) 

781 if errors: 

782 self.log.error('encountered %d errors while processing %s', errors, self.name) 

783 raise PolicyExecutionError('%d resources failed' % errors) 

784 return results 

785 

786 

787class BucketFilterBase(Filter): 

788 def get_std_format_args(self, bucket): 

789 return { 

790 'account_id': self.manager.config.account_id, 

791 'region': self.manager.config.region, 

792 'bucket_name': bucket['Name'], 

793 'bucket_region': get_region(bucket) 

794 } 

795 

796 

797@S3.action_registry.register("post-finding") 

798class BucketFinding(PostFinding): 

799 

800 resource_type = 'AwsS3Bucket' 

801 

802 def format_resource(self, r): 

803 owner = r.get("Acl", {}).get("Owner", {}) 

804 resource = { 

805 "Type": self.resource_type, 

806 "Id": "arn:aws:s3:::{}".format(r["Name"]), 

807 "Region": get_region(r), 

808 "Tags": {t["Key"]: t["Value"] for t in r.get("Tags", [])}, 

809 "Details": {self.resource_type: { 

810 "OwnerId": owner.get('ID', 'Unknown')}} 

811 } 

812 

813 if "DisplayName" in owner: 

814 resource["Details"]["AwsS3Bucket"]["OwnerName"] = owner['DisplayName'] 

815 

816 return filter_empty(resource) 

817 

818 

819@S3.filter_registry.register('has-statement') 

820class HasStatementFilter(polstmt_filter.HasStatementFilter): 

821 def get_std_format_args(self, bucket): 

822 return { 

823 'account_id': self.manager.config.account_id, 

824 'region': self.manager.config.region, 

825 'bucket_name': bucket['Name'], 

826 'bucket_region': get_region(bucket) 

827 } 

828 

829 

830ENCRYPTION_STATEMENT_GLOB = { 

831 'Effect': 'Deny', 

832 'Principal': '*', 

833 'Action': 's3:PutObject', 

834 "Condition": { 

835 "StringNotEquals": { 

836 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}} 
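# Template statement for the no-encryption-statement filter below; when
# comparing, the filter copies each bucket statement's Sid and Resource into
# this template, so those two fields act as wildcards here.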

837 

838 

839@filters.register('no-encryption-statement') 

840class EncryptionEnabledFilter(Filter): 

841 """Find buckets with missing encryption policy statements. 

842 

843 :example: 

844 

845 .. code-block:: yaml 

846 

847 policies: 

848 - name: s3-bucket-not-encrypted 

849 resource: s3 

850 filters: 

851 - type: no-encryption-statement 

852 """ 

853 schema = type_schema( 

854 'no-encryption-statement') 

855 

856 def get_permissions(self): 

857 perms = self.manager.get_resource_manager('s3').get_permissions() 

858 return perms 

859 

860 def process(self, buckets, event=None): 

861 return list(filter(None, map(self.process_bucket, buckets))) 

862 

863 def process_bucket(self, b): 

864 p = b.get('Policy') 

865 if p is None: 

866 return b 

867 p = json.loads(p) 

868 encryption_statement = dict(ENCRYPTION_STATEMENT_GLOB) 

869 

870 statements = p.get('Statement', []) 

871 check = False 

872 for s in list(statements): 

873 if 'Sid' in s: 

874 encryption_statement["Sid"] = s["Sid"] 

875 if 'Resource' in s: 

876 encryption_statement["Resource"] = s["Resource"] 

877 if s == encryption_statement: 

878 check = True 

879 break 

880 if check: 

881 return None 

882 else: 

883 return b 

884 

885 

886@filters.register('missing-statement') 

887@filters.register('missing-policy-statement') 

888class MissingPolicyStatementFilter(Filter): 

889 """Find buckets missing a set of named policy statements. 

890 

891 :example: 

892 

893 .. code-block:: yaml 

894 

895 policies: 

896 - name: s3-bucket-missing-statement 

897 resource: s3 

898 filters: 

899 - type: missing-statement 

900 statement_ids: 

901 - RequiredEncryptedPutObject 

902 """ 

903 

904 schema = type_schema( 

905 'missing-policy-statement', 

906 aliases=('missing-statement',), 

907 statement_ids={'type': 'array', 'items': {'type': 'string'}}) 

908 

909 def __call__(self, b): 

910 p = b.get('Policy') 

911 if p is None: 

912 return b 

913 

914 p = json.loads(p) 

915 

916 required = list(self.data.get('statement_ids', [])) 

917 statements = p.get('Statement', []) 

918 for s in list(statements): 

919 if s.get('Sid') in required: 

920 required.remove(s['Sid']) 

921 if not required: 

922 return False 

923 return True 

924 

925 

926@filters.register('bucket-notification') 

927class BucketNotificationFilter(ValueFilter): 

928 """Filter based on bucket notification configuration. 

929 

930 :example: 

931 

932 .. code-block:: yaml 

933 

934 policies: 

935 - name: delete-incorrect-notification 

936 resource: s3 

937 filters: 

938 - type: bucket-notification 

939 kind: lambda 

940 key: Id 

941 value: "IncorrectLambda" 

942 op: eq 

943 actions: 

944 - type: delete-bucket-notification 

945 statement_ids: matched 

946 """ 

947 

948 schema = type_schema( 

949 'bucket-notification', 

950 required=['kind'], 

951 kind={'type': 'string', 'enum': ['lambda', 'sns', 'sqs']}, 

952 rinherit=ValueFilter.schema) 

953 schema_alias = False 

954 annotation_key = 'c7n:MatchedNotificationConfigurationIds' 

955 

956 permissions = ('s3:GetBucketNotification',) 

957 

958 FIELDS = { 

959 'lambda': 'LambdaFunctionConfigurations', 

960 'sns': 'TopicConfigurations', 

961 'sqs': 'QueueConfigurations' 

962 } 

963 

964 def process(self, buckets, event=None): 

965 return super(BucketNotificationFilter, self).process(buckets, event) 

966 

967 def __call__(self, bucket): 

968 

969 field = self.FIELDS[self.data['kind']] 

970 found = False 

971 for config in bucket.get('Notification', {}).get(field, []): 

972 if self.match(config): 

973 set_annotation( 

974 bucket, 

975 BucketNotificationFilter.annotation_key, 

976 config['Id']) 

977 found = True 

978 return found 

979 

980 

981@filters.register('bucket-logging') 

982class BucketLoggingFilter(BucketFilterBase): 

983 """Filter based on bucket logging configuration. 

984 

985 :example: 

986 

987 .. code-block:: yaml 

988 

989 policies: 

990 - name: add-bucket-logging-if-missing 

991 resource: s3 

992 filters: 

993 - type: bucket-logging 

994 op: disabled 

995 actions: 

996 - type: toggle-logging 

997 target_bucket: "{account_id}-{region}-s3-logs" 

998 target_prefix: "{source_bucket_name}/" 

999 

1000 policies: 

1001 - name: update-incorrect-or-missing-logging 

1002 resource: s3 

1003 filters: 

1004 - type: bucket-logging 

1005 op: not-equal 

1006 target_bucket: "{account_id}-{region}-s3-logs" 

1007 target_prefix: "{account}/{source_bucket_name}/" 

1008 actions: 

1009 - type: toggle-logging 

1010 target_bucket: "{account_id}-{region}-s3-logs" 

1011 target_prefix: "{account}/{source_bucket_name}/" 

1012 """ 

1013 

1014 schema = type_schema( 

1015 'bucket-logging', 

1016 op={'enum': ['enabled', 'disabled', 'equal', 'not-equal', 'eq', 'ne']}, 

1017 required=['op'], 

1018 target_bucket={'type': 'string'}, 

1019 target_prefix={'type': 'string'}) 

1020 schema_alias = False 

1021 account_name = None 

1022 

1023 permissions = ("s3:GetBucketLogging", "iam:ListAccountAliases") 

1024 

1025 def process(self, buckets, event=None): 

1026 return list(filter(None, map(self.process_bucket, buckets))) 

1027 

1028 def process_bucket(self, b): 

1029 if self.match_bucket(b): 

1030 return b 

1031 

1032 def match_bucket(self, b): 

1033 op = self.data.get('op') 

1034 

1035 logging = b.get('Logging', {}) 

1036 if op == 'disabled': 

1037 return logging == {} 

1038 elif op == 'enabled': 

1039 return logging != {} 

1040 

1041 if self.account_name is None: 

1042 session = local_session(self.manager.session_factory) 

1043 self.account_name = get_account_alias_from_sts(session) 

1044 

1045 variables = self.get_std_format_args(b) 

1046 variables.update({ 

1047 'account': self.account_name, 

1048 'source_bucket_name': b['Name'], 

1049 'source_bucket_region': get_region(b), 

1050 'target_bucket_name': self.data.get('target_bucket'), 

1051 'target_prefix': self.data.get('target_prefix'), 

1052 }) 

1053 data = format_string_values(self.data, **variables) 

1054 target_bucket = data.get('target_bucket') 

1055 target_prefix = data.get('target_prefix', b['Name'] + '/') 

1056 

1057 target_config = { 

1058 "TargetBucket": target_bucket, 

1059 "TargetPrefix": target_prefix 

1060 } if target_bucket else {} 

1061 

1062 if op in ('not-equal', 'ne'): 

1063 return logging != target_config 

1064 else: 

1065 return logging == target_config 

1066 

1067 

1068@actions.register('delete-bucket-notification') 

1069class DeleteBucketNotification(BucketActionBase): 

1070 """Action to delete S3 bucket notification configurations""" 

1071 

1072 schema = type_schema( 

1073 'delete-bucket-notification', 

1074 required=['statement_ids'], 

1075 statement_ids={'oneOf': [ 

1076 {'enum': ['matched']}, 

1077 {'type': 'array', 'items': {'type': 'string'}}]}) 

1078 

1079 permissions = ('s3:PutBucketNotification',) 

1080 

1081 def process_bucket(self, bucket): 

1082 n = bucket['Notification'] 

1083 if not n: 

1084 return 

1085 

1086 statement_ids = self.data.get('statement_ids') 

1087 if statement_ids == 'matched': 

1088 statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ()) 

1089 if not statement_ids: 

1090 return 

1091 

1092 cfg = defaultdict(list) 

1093 
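        # rebuild the notification configuration, keeping only entries whose
        # Id is not among the targeted statement_ids; the pruned set is then
        # written back as a full replacement.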

1094 for t in BucketNotificationFilter.FIELDS.values(): 

1095 for c in n.get(t, []): 

1096 if c['Id'] not in statement_ids: 

1097 cfg[t].append(c) 

1098 

1099 client = bucket_client(local_session(self.manager.session_factory), bucket) 

1100 client.put_bucket_notification_configuration( 

1101 Bucket=bucket['Name'], 

1102 NotificationConfiguration=cfg) 

1103 

1104 

1105@actions.register('no-op') 

1106class NoOp(BucketActionBase): 

1107 

1108 schema = type_schema('no-op') 

1109 permissions = ('s3:ListAllMyBuckets',) 

1110 

1111 def process(self, buckets): 

1112 return None 

1113 

1114 

1115@actions.register('set-statements') 

1116class SetPolicyStatement(BucketActionBase): 

1117 """Action to add or update policy statements to S3 buckets 

1118 

1119 :example: 

1120 

1121 .. code-block:: yaml 

1122 

1123 policies: 

1124 - name: force-s3-https 

1125 resource: s3 

1126 actions: 

1127 - type: set-statements 

1128 statements: 

1129 - Sid: "DenyHttp" 

1130 Effect: "Deny" 

1131 Action: "s3:GetObject" 

1132 Principal: 

1133 AWS: "*" 

1134 Resource: "arn:aws:s3:::{bucket_name}/*" 

1135 Condition: 

1136 Bool: 

1137 "aws:SecureTransport": false 

1138 """ 

1139 

1140 permissions = ('s3:PutBucketPolicy',) 

1141 

1142 schema = type_schema( 

1143 'set-statements', 

1144 **{ 

1145 'statements': { 

1146 'type': 'array', 

1147 'items': { 

1148 'type': 'object', 

1149 'properties': { 

1150 'Sid': {'type': 'string'}, 

1151 'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']}, 

1152 'Principal': {'anyOf': [{'type': 'string'}, 

1153 {'type': 'object'}, {'type': 'array'}]}, 

1154 'NotPrincipal': {'anyOf': [{'type': 'object'}, {'type': 'array'}]}, 

1155 'Action': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1156 'NotAction': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1157 'Resource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1158 'NotResource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]}, 

1159 'Condition': {'type': 'object'} 

1160 }, 

1161 'required': ['Sid', 'Effect'], 

1162 'oneOf': [ 

1163 {'required': ['Principal', 'Action', 'Resource']}, 

1164 {'required': ['NotPrincipal', 'Action', 'Resource']}, 

1165 {'required': ['Principal', 'NotAction', 'Resource']}, 

1166 {'required': ['NotPrincipal', 'NotAction', 'Resource']}, 

1167 {'required': ['Principal', 'Action', 'NotResource']}, 

1168 {'required': ['NotPrincipal', 'Action', 'NotResource']}, 

1169 {'required': ['Principal', 'NotAction', 'NotResource']}, 

1170 {'required': ['NotPrincipal', 'NotAction', 'NotResource']} 

1171 ] 

1172 } 

1173 } 

1174 } 

1175 ) 

1176 

1177 def process_bucket(self, bucket): 

1178 policy = bucket.get('Policy') or '{}' 

1179 

1180 target_statements = format_string_values( 

1181 copy.deepcopy({s['Sid']: s for s in self.data.get('statements', [])}), 

1182 **self.get_std_format_args(bucket)) 

1183 

1184 policy = json.loads(policy) 

1185 bucket_statements = policy.setdefault('Statement', []) 

1186 
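        # drop any target statement that already exists verbatim on the bucket;
        # whatever remains is appended, and the policy is only rewritten when
        # there is something left to add.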

1187 for s in bucket_statements: 

1188 if s.get('Sid') not in target_statements: 

1189 continue 

1190 if s == target_statements[s['Sid']]: 

1191 target_statements.pop(s['Sid']) 

1192 

1193 if not target_statements: 

1194 return 

1195 

1196 bucket_statements.extend(target_statements.values()) 

1197 policy = json.dumps(policy) 

1198 

1199 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1200 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=policy) 

1201 return {'Name': bucket['Name'], 'Policy': policy} 

1202 

1203 

1204@actions.register('remove-statements') 

1205class RemovePolicyStatement(RemovePolicyBase): 

1206 """Action to remove policy statements from S3 buckets 

1207 

1208 :example: 

1209 

1210 .. code-block:: yaml 

1211 

1212 policies: 

1213 - name: s3-remove-encrypt-put 

1214 resource: s3 

1215 filters: 

1216 - type: has-statement 

1217 statement_ids: 

1218 - RequiredEncryptedPutObject 

1219 actions: 

1220 - type: remove-statements 

1221 statement_ids: 

1222 - RequiredEncryptedPutObject 

1223 """ 

1224 

1225 permissions = ("s3:PutBucketPolicy", "s3:DeleteBucketPolicy") 

1226 

1227 def process(self, buckets): 

1228 with self.executor_factory(max_workers=3) as w: 

1229 futures = {} 

1230 results = [] 

1231 for b in buckets: 

1232 futures[w.submit(self.process_bucket, b)] = b 

1233 for f in as_completed(futures): 

1234 if f.exception(): 

1235 b = futures[f] 

1236 self.log.error('error modifying bucket:%s\n%s', 

1237 b['Name'], f.exception()) 

1238 results += filter(None, [f.result()]) 

1239 return results 

1240 

1241 def process_bucket(self, bucket): 

1242 p = bucket.get('Policy') 

1243 if p is None: 

1244 return 

1245 

1246 p = json.loads(p) 

1247 

1248 statements, found = self.process_policy( 

1249 p, bucket, CrossAccountAccessFilter.annotation_key) 

1250 

1251 if not found: 

1252 return 

1253 

1254 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1255 

1256 if not statements: 

1257 s3.delete_bucket_policy(Bucket=bucket['Name']) 

1258 else: 

1259 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=json.dumps(p)) 

1260 return {'Name': bucket['Name'], 'State': 'PolicyRemoved', 'Statements': found} 

1261 

1262 

1263@actions.register('set-replication') 

1264class SetBucketReplicationConfig(BucketActionBase): 

1265 """Action to add or remove replication configuration statement from S3 buckets 

1266 

1267 :example: 

1268 

1269 .. code-block:: yaml 

1270 

1271 policies: 

1272 - name: s3-unapproved-account-replication 

1273 resource: s3 

1274 filters: 

1275 - type: value 

1276 key: Replication.ReplicationConfiguration.Rules[].Destination.Account 

1277 value: present 

1278 - type: value 

1279 key: Replication.ReplicationConfiguration.Rules[].Destination.Account 

1280 value_from: 

1281 url: 's3:///path/to/file.json' 

1282 format: json 

1283 expr: "approved_accounts.*" 

1284 op: ni 

1285 actions: 

1286 - type: set-replication 

1287 state: enable 

1288 """ 

1289 schema = type_schema( 

1290 'set-replication', 

1291 state={'type': 'string', 'enum': ['enable', 'disable', 'remove']}) 

1292 permissions = ("s3:GetReplicationConfiguration", "s3:PutReplicationConfiguration") 

1293 

1294 def process(self, buckets): 

1295 with self.executor_factory(max_workers=3) as w: 

1296 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1297 errors = [] 

1298 for future in as_completed(futures): 

1299 bucket = futures[future] 

1300 try: 

1301 future.result() 

1302 except ClientError as e: 

1303 errors.append("Message: %s Bucket: %s" % (e, bucket['Name'])) 

1304 if errors: 

1305 raise Exception('\n'.join(map(str, errors))) 

1306 

1307 def process_bucket(self, bucket): 

1308 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1309 state = self.data.get('state') 

1310 if state is not None: 

1311 if state == 'remove': 

1312 s3.delete_bucket_replication(Bucket=bucket['Name']) 

1313 return {'Name': bucket['Name'], 'State': 'ReplicationConfigRemoved'} 

1314 if state in ('enable', 'disable'): 

1315 config = s3.get_bucket_replication(Bucket=bucket['Name']) 

1316 for rule in config['ReplicationConfiguration']['Rules']: 

1317 rule['Status'] = 'Enabled' if state == 'enable' else 'Disabled' 

1318 s3.put_bucket_replication( 

1319 Bucket=bucket['Name'], 

1320 ReplicationConfiguration=config['ReplicationConfiguration'] 

1321 ) 

1322 return {'Name': bucket['Name'], 'State': 'ReplicationConfigUpdated'} 

1323 

1324 

1325@filters.register('check-public-block') 

1326class FilterPublicBlock(Filter): 

1327 """Filter for s3 bucket public blocks 

1328 

1329 If no filter parameters are provided, it checks to see if any are unset or False. 

1330 

1331 If parameters are provided only the provided ones are checked. 

1332 

1333 :example: 

1334 

1335 .. code-block:: yaml 

1336 

1337 policies: 

1338 - name: CheckForPublicAclBlock-Off 

1339 resource: s3 

1340 region: us-east-1 

1341 filters: 

1342 - type: check-public-block 

1343 BlockPublicAcls: true 

1344 BlockPublicPolicy: true 

1345 """ 

1346 

1347 schema = type_schema( 

1348 'check-public-block', 

1349 BlockPublicAcls={'type': 'boolean'}, 

1350 IgnorePublicAcls={'type': 'boolean'}, 

1351 BlockPublicPolicy={'type': 'boolean'}, 

1352 RestrictPublicBuckets={'type': 'boolean'}) 

1353 permissions = ("s3:GetBucketPublicAccessBlock",) 

1354 keys = ( 

1355 'BlockPublicPolicy', 'BlockPublicAcls', 'IgnorePublicAcls', 'RestrictPublicBuckets') 

1356 annotation_key = 'c7n:PublicAccessBlock' 

1357 

1358 def process(self, buckets, event=None): 

1359 results = [] 

1360 with self.executor_factory(max_workers=2) as w: 

1361 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1362 for f in as_completed(futures): 

1363 if f.result(): 

1364 results.append(futures[f]) 

1365 return results 

1366 

1367 def process_bucket(self, bucket): 

1368 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1369 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys})) 
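        # if no prior filter/action cached the annotation, refresh from the API;
        # a missing public access block configuration is treated as all-False.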

1370 if self.annotation_key not in bucket: 

1371 try: 

1372 config = s3.get_public_access_block( 

1373 Bucket=bucket['Name'])['PublicAccessBlockConfiguration'] 

1374 except ClientError as e: 

1375 error_code = e.response['Error']['Code'] 

1376 if error_code == 'NoSuchPublicAccessBlockConfiguration': 

1377 pass 

1378 elif error_code == 'AccessDenied': 

1379 # Follow the same logic as `assemble_bucket` - log and continue on access 

1380 # denied errors rather than halting a policy altogether 

1381 method = 'GetPublicAccessBlock' 

1382 log.warning( 

1383 "Bucket:%s unable to invoke method:%s error:%s ", 

1384 bucket['Name'], method, e.response['Error']['Message'] 

1385 ) 

1386 bucket.setdefault('c7n:DeniedMethods', []).append(method) 

1387 else: 

1388 raise 

1389 bucket[self.annotation_key] = config 

1390 return self.matches_filter(config) 

1391 

1392 def matches_filter(self, config): 

1393 key_set = [key for key in self.keys if key in self.data] 

1394 if key_set: 

1395 return all([self.data.get(key) is config[key] for key in key_set]) 

1396 else: 

1397 return not all(config.values()) 

1398 

1399 

1400@actions.register('set-public-block') 

1401class SetPublicBlock(BucketActionBase): 

1402 """Action to update Public Access blocks on S3 buckets 

1403 

1404 If no action parameters are provided, all settings will be set to the `state`, which defaults to true. 

1405 

1406 If action parameters are provided, those will be set and other extant values preserved. 

1407 

1408 :example: 

1409 

1410 .. code-block:: yaml 

1411 

1412 policies: 

1413 - name: s3-public-block-enable-all 

1414 resource: s3 

1415 filters: 

1416 - type: check-public-block 

1417 actions: 

1418 - type: set-public-block 

1419 

1420 policies: 

1421 - name: s3-public-block-disable-all 

1422 resource: s3 

1423 filters: 

1424 - type: check-public-block 

1425 actions: 

1426 - type: set-public-block 

1427 state: false 

1428 

1429 policies: 

1430 - name: s3-public-block-enable-some 

1431 resource: s3 

1432 filters: 

1433 - or: 

1434 - type: check-public-block 

1435 BlockPublicAcls: false 

1436 - type: check-public-block 

1437 BlockPublicPolicy: false 

1438 actions: 

1439 - type: set-public-block 

1440 BlockPublicAcls: true 

1441 BlockPublicPolicy: true 

1442 

1443 """ 

1444 

1445 schema = type_schema( 

1446 'set-public-block', 

1447 state={'type': 'boolean', 'default': True}, 

1448 BlockPublicAcls={'type': 'boolean'}, 

1449 IgnorePublicAcls={'type': 'boolean'}, 

1450 BlockPublicPolicy={'type': 'boolean'}, 

1451 RestrictPublicBuckets={'type': 'boolean'}) 

1452 permissions = ("s3:GetBucketPublicAccessBlock", "s3:PutBucketPublicAccessBlock") 

1453 keys = FilterPublicBlock.keys 

1454 annotation_key = FilterPublicBlock.annotation_key 

1455 

1456 def process(self, buckets): 

1457 with self.executor_factory(max_workers=3) as w: 

1458 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

1459 for future in as_completed(futures): 

1460 future.result() 

1461 

1462 def process_bucket(self, bucket): 

1463 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

1464 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys})) 

1465 if self.annotation_key not in bucket: 

1466 try: 

1467 config = s3.get_public_access_block( 

1468 Bucket=bucket['Name'])['PublicAccessBlockConfiguration'] 

1469 except ClientError as e: 

1470 if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration': 

1471 raise 

1472 

1473 key_set = [key for key in self.keys if key in self.data] 

1474 if key_set: 

1475 for key in key_set: 

1476 config[key] = self.data.get(key) 

1477 else: 

1478 for key in self.keys: 

1479 config[key] = self.data.get('state', True) 

1480 s3.put_public_access_block( 

1481 Bucket=bucket['Name'], PublicAccessBlockConfiguration=config) 

1482 

1483 

1484@actions.register('toggle-versioning') 

1485class ToggleVersioning(BucketActionBase): 

1486 """Action to enable/suspend versioning on a S3 bucket 

1487 

1488 Note versioning can never be disabled, only suspended. 

1489 

1490 :example: 

1491 

1492 .. code-block:: yaml 

1493 

1494 policies: 

1495 - name: s3-enable-versioning 

1496 resource: s3 

1497 filters: 

1498 - or: 

1499 - type: value 

1500 key: Versioning.Status 

1501 value: Suspended 

1502 - type: value 

1503 key: Versioning.Status 

1504 value: absent 

1505 actions: 

1506 - type: toggle-versioning 

1507 enabled: true 

1508 """ 

1509 

1510 schema = type_schema( 

1511 'toggle-versioning', 

1512 enabled={'type': 'boolean'}) 

1513 permissions = ("s3:PutBucketVersioning",) 

1514 

1515 def process_versioning(self, resource, state): 

1516 client = bucket_client( 

1517 local_session(self.manager.session_factory), resource) 

1518 try: 

1519 client.put_bucket_versioning( 

1520 Bucket=resource['Name'], 

1521 VersioningConfiguration={ 

1522 'Status': state}) 

1523 except ClientError as e: 

1524 if e.response['Error']['Code'] != 'AccessDenied': 

1525 log.error( 

1526 "Unable to put bucket versioning on bucket %s: %s" % (resource['Name'], e)) 

1527 raise 

1528 log.warning( 

1529 "Access Denied Bucket:%s while put bucket versioning" % resource['Name']) 

1530 

1531 # mfa delete enablement looks like it needs the serial and a current token. 

1532 def process(self, resources): 

1533 enabled = self.data.get('enabled', True) 

1534 for r in resources: 

1535 if 'Versioning' not in r or not r['Versioning']: 

1536 r['Versioning'] = {'Status': 'Suspended'} 

1537 if enabled and ( 

1538 r['Versioning']['Status'] == 'Suspended'): 

1539 self.process_versioning(r, 'Enabled') 

1540 if not enabled and r['Versioning']['Status'] == 'Enabled': 

1541 self.process_versioning(r, 'Suspended') 

1542 

1543 

1544@actions.register('toggle-logging') 

1545class ToggleLogging(BucketActionBase): 

1546 """Action to enable/disable logging on a S3 bucket. 

1547 

1548 Target bucket ACL must allow for WRITE and READ_ACP permissions. 

1549 Not specifying a target_prefix will default to the current bucket name. 

1550 https://docs.aws.amazon.com/AmazonS3/latest/dev/enable-logging-programming.html 

1551 

1552 :example: 

1553 

1554 .. code-block:: yaml 

1555 

1556 policies: 

1557 - name: s3-enable-logging 

1558 resource: s3 

1559 filters: 

1560 - "tag:Testing": present 

1561 actions: 

1562 - type: toggle-logging 

1563 target_bucket: log-bucket 

1564 target_prefix: logs123/ 

1565 

1566 policies: 

1567 - name: s3-force-standard-logging 

1568 resource: s3 

1569 filters: 

1570 - type: bucket-logging 

1571 op: not-equal 

1572 target_bucket: "{account_id}-{region}-s3-logs" 

1573 target_prefix: "{account}/{source_bucket_name}/" 

1574 actions: 

1575 - type: toggle-logging 

1576 target_bucket: "{account_id}-{region}-s3-logs" 

1577 target_prefix: "{account}/{source_bucket_name}/" 

1578 """ 

1579 schema = type_schema( 

1580 'toggle-logging', 

1581 enabled={'type': 'boolean'}, 

1582 target_bucket={'type': 'string'}, 

1583 target_prefix={'type': 'string'}) 

1584 

1585 permissions = ("s3:PutBucketLogging", "iam:ListAccountAliases") 

1586 

1587 def validate(self): 

1588 if self.data.get('enabled', True): 

1589 if not self.data.get('target_bucket'): 

1590 raise PolicyValidationError( 

1591 "target_bucket must be specified on %s" % ( 

1592 self.manager.data,)) 

1593 return self 

1594 

1595 def process(self, resources): 

1596 session = local_session(self.manager.session_factory) 

1597 kwargs = { 

1598 "enabled": self.data.get('enabled', True), 

1599 "session": session, 

1600 "account_name": get_account_alias_from_sts(session), 

1601 } 

1602 

1603 return self._process_with_futures(resources, **kwargs) 

1604 

1605 def process_bucket(self, r, enabled=None, session=None, account_name=None): 

1606 client = bucket_client(session, r) 

1607 is_logging = bool(r.get('Logging')) 

1608 

1609 if enabled: 

1610 variables = self.get_std_format_args(r) 

1611 variables.update({ 

1612 'account': account_name, 

1613 'source_bucket_name': r['Name'], 

1614 'source_bucket_region': get_region(r), 

1615 'target_bucket_name': self.data.get('target_bucket'), 

1616 'target_prefix': self.data.get('target_prefix'), 

1617 }) 

1618 data = format_string_values(self.data, **variables) 

1619 config = { 

1620 'TargetBucket': data.get('target_bucket'), 

1621 'TargetPrefix': data.get('target_prefix', r['Name'] + '/') 

1622 } 

1623 if not is_logging or r.get('Logging') != config: 

1624 client.put_bucket_logging( 

1625 Bucket=r['Name'], 

1626 BucketLoggingStatus={'LoggingEnabled': config} 

1627 ) 

1628 r['Logging'] = config 

1629 

1630 elif not enabled and is_logging: 

1631 client.put_bucket_logging( 

1632 Bucket=r['Name'], BucketLoggingStatus={}) 

1633 r['Logging'] = {} 

1634 

1635 

1636@actions.register('attach-encrypt') 

1637class AttachLambdaEncrypt(BucketActionBase): 

1638 """Action attaches a lambda encryption policy to an S3 bucket. 

1639 Supports attachment via lambda bucket notification or SNS notification 

1640 to invoke lambda. A special topic value of `default` will utilize an 

1641 extant notification or create one matching the bucket name. 

1642 

1643 :example: 

1644 

1645 

1646 .. code-block:: yaml 

1647 

1648 

1649 policies: 

1650 - name: attach-lambda-encrypt 

1651 resource: s3 

1652 filters: 

1653 - type: missing-policy-statement 

1654 actions: 

1655 - type: attach-encrypt 

1656 role: arn:aws:iam::123456789012:role/my-role 

1657 

1658 """ 

1659 schema = type_schema( 

1660 'attach-encrypt', 

1661 role={'type': 'string'}, 

1662 tags={'type': 'object'}, 

1663 topic={'type': 'string'}) 

1664 

1665 permissions = ( 

1666 "s3:PutBucketNotification", "s3:GetBucketNotification", 

1667 # lambda manager uses quite a few perms to provision lambdas 

1668 # and event sources; hard to disambiguate, so punt for now. 

1669 "lambda:*", 

1670 ) 

1671 

1672 def __init__(self, data=None, manager=None): 

1673 self.data = data or {} 

1674 self.manager = manager 

1675 

1676 def validate(self): 

1677 if (not getattr(self.manager.config, 'dryrun', True) and 

1678 not self.data.get('role', self.manager.config.assume_role)): 

1679 raise PolicyValidationError( 

1680 "attach-encrypt: role must be specified either " 

1681 "via assume or in config on %s" % (self.manager.data,)) 

1682 

1683 return self 

1684 

1685 def process(self, buckets): 

1686 from c7n.mu import LambdaManager 

1687 from c7n.ufuncs.s3crypt import get_function 

1688 

1689 account_id = self.manager.config.account_id 

1690 topic_arn = self.data.get('topic') 

1691 

1692 func = get_function( 

1693 None, self.data.get('role', self.manager.config.assume_role), 

1694 account_id=account_id, tags=self.data.get('tags')) 

1695 

1696 regions = {get_region(b) for b in buckets} 

1697 

1698 # session managers by region 

1699 region_sessions = {} 

1700 for r in regions: 

1701 region_sessions[r] = functools.partial( 

1702 self.manager.session_factory, region=r) 

1703 

1704 # Publish function to all of our buckets regions 

1705 region_funcs = {} 

1706 

1707 for r in regions: 

1708 lambda_mgr = LambdaManager(region_sessions[r]) 

1709 lambda_mgr.publish(func) 

1710 region_funcs[r] = func 

1711 

1712 with self.executor_factory(max_workers=3) as w: 

1713 results = [] 

1714 futures = [] 

1715 for b in buckets: 

1716 region = get_region(b) 

1717 futures.append( 

1718 w.submit( 

1719 self.process_bucket, 

1720 region_funcs[region], 

1721 b, 

1722 topic_arn, 

1723 account_id, 

1724 region_sessions[region] 

1725 )) 

1726 for f in as_completed(futures): 

1727 if f.exception(): 

1728 log.exception( 

1729 "Error attaching lambda-encrypt %s" % (f.exception())) 

1730 results.append(f.result()) 

1731 return list(filter(None, results)) 

1732 

1733 def process_bucket(self, func, bucket, topic, account_id, session_factory): 

1734 from c7n.mu import BucketSNSNotification, BucketLambdaNotification 

1735 if topic: 

1736 topic = None if topic == 'default' else topic 

1737 source = BucketSNSNotification(session_factory, bucket, topic) 

1738 else: 

1739 source = BucketLambdaNotification( 

1740 {'account_s3': account_id}, session_factory, bucket) 

1741 return source.add(func) 

1742 

1743 

1744@actions.register('encryption-policy') 

1745class EncryptionRequiredPolicy(BucketActionBase): 

1746 """Action to apply an encryption policy to S3 buckets 

1747 

1748 

1749 :example: 

1750 

1751 .. code-block:: yaml 

1752 

1753 policies: 

1754 - name: s3-enforce-encryption 

1755 resource: s3 

1756 mode: 

1757 type: cloudtrail 

1758 events: 

1759 - CreateBucket 

1760 actions: 

1761 - encryption-policy 

1762 """ 

1763 

1764 permissions = ("s3:GetBucketPolicy", "s3:PutBucketPolicy") 

1765 schema = type_schema('encryption-policy') 

1766 

1767 def __init__(self, data=None, manager=None): 

1768 self.data = data or {} 

1769 self.manager = manager 

1770 

1771 def process(self, buckets): 

1772 with self.executor_factory(max_workers=3) as w: 

1773 results = w.map(self.process_bucket, buckets) 

1774 results = list(filter(None, list(results))) 

1775 return results 

1776 

1777 def process_bucket(self, b): 

1778 p = b['Policy'] 

1779 if p is None: 

1780 log.info("No policy found, creating new") 

1781 p = {'Version': "2012-10-17", "Statement": []} 

1782 else: 

1783 p = json.loads(p) 

1784 

1785 encryption_sid = "RequiredEncryptedPutObject" 

1786 encryption_statement = { 

1787 'Sid': encryption_sid, 

1788 'Effect': 'Deny', 

1789 'Principal': '*', 

1790 'Action': 's3:PutObject', 

1791 "Resource": "arn:aws:s3:::%s/*" % b['Name'], 

1792 "Condition": { 

1793 # AWS Managed Keys or KMS keys, note policy language 

1794 # does not support custom kms (todo add issue) 

1795 "StringNotEquals": { 

1796 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}} 

1797 

1798 statements = p.get('Statement', []) 

1799 for s in list(statements): 

1800 if s.get('Sid', '') == encryption_sid: 

1801 log.debug("Bucket:%s Found extant encrypt policy", b['Name']) 

1802 if s != encryption_statement: 

1803 log.info( 

1804 "Bucket:%s updating extant encrypt policy", b['Name']) 

1805 statements.remove(s) 

1806 else: 

1807 return 

1808 

1809 session = self.manager.session_factory() 

1810 s3 = bucket_client(session, b) 

1811 statements.append(encryption_statement) 

1812 p['Statement'] = statements 

1813 log.info('Bucket:%s attached encryption policy' % b['Name']) 

1814 

1815 try: 

1816 s3.put_bucket_policy( 

1817 Bucket=b['Name'], 

1818 Policy=json.dumps(p)) 

1819 except ClientError as e: 

1820 if e.response['Error']['Code'] == 'NoSuchBucket': 

1821 return 

1822 self.log.exception( 

1823 "Error on bucket:%s putting policy\n%s error:%s", 

1824 b['Name'], 

1825 json.dumps(statements, indent=2), e) 

1826 raise 

1827 return {'Name': b['Name'], 'State': 'PolicyAttached'} 

1828 

1829 

1830class BucketScanLog: 

1831 """Offload remediated key ids to a disk file in batches 

1832 

1833 A bucket keyspace is effectively infinite, so we need to store partial 

1834 results out of memory; this class provides a json log on disk 

1835 with partial write support. 

1836 

1837 json output format: 

1838 - [list_of_serialized_keys], 

1839 - [] # Empty list of keys at end when we close the buffer 

1840 

1841 """ 

1842 

1843 def __init__(self, log_dir, name): 

1844 self.log_dir = log_dir 

1845 self.name = name 

1846 self.fh = None 

1847 self.count = 0 

1848 

1849 @property 

1850 def path(self): 

1851 return os.path.join(self.log_dir, "%s.json" % self.name) 

1852 

1853 def __enter__(self): 

1854 # Don't require output directories 

1855 if self.log_dir is None: 

1856 return 

1857 

1858 self.fh = open(self.path, 'w') 

1859 self.fh.write("[\n") 

1860 return self 

1861 

1862 def __exit__(self, exc_type=None, exc_value=None, exc_frame=None): 

1863 if self.fh is None: 

1864 return 

1865 # we need an empty marker list at end to avoid trailing commas 

1866 self.fh.write("[]") 

1867 # and close the surrounding list 

1868 self.fh.write("\n]") 

1869 self.fh.close() 

1870 if not self.count: 

1871 os.remove(self.fh.name) 

1872 self.fh = None 

1873 return False 

1874 

1875 def add(self, keys): 

1876 self.count += len(keys) 

1877 if self.fh is None: 

1878 return 

1879 self.fh.write(dumps(keys)) 

1880 self.fh.write(",\n") 

1881 

1882 

1883class ScanBucket(BucketActionBase): 

1884 

1885 permissions = ("s3:ListBucket",) 

1886 

1887 bucket_ops = { 

1888 'standard': { 

1889 'iterator': 'list_objects', 

1890 'contents_key': ['Contents'], 

1891 'key_processor': 'process_key' 

1892 }, 

1893 'versioned': { 

1894 'iterator': 'list_object_versions', 

1895 'contents_key': ['Versions'], 

1896 'key_processor': 'process_version' 

1897 } 

1898 } 
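    # e.g. a bucket whose versioning status is Enabled or Suspended is walked
    # with the 'list_object_versions' paginator and each key is handled by
    # process_version; all other buckets use 'list_objects' and process_key.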

1899 

1900 def __init__(self, data, manager=None): 

1901 super(ScanBucket, self).__init__(data, manager) 

1902 self.denied_buckets = set() 

1903 

1904 def get_bucket_style(self, b): 

1905 return ('versioned' if b.get( 

1906 'Versioning', {'Status': ''}).get('Status') in ( 

1907 'Enabled', 'Suspended') else 'standard') 

1908 

1909 def get_bucket_op(self, b, op_name): 

1910 bucket_style = self.get_bucket_style(b) 

1911 op = self.bucket_ops[bucket_style][op_name] 

1912 if op_name == 'key_processor': 

1913 return getattr(self, op) 

1914 return op 

1915 

1916 def get_keys(self, b, key_set): 

1917 content_keys = self.get_bucket_op(b, 'contents_key') 

1918 keys = [] 

1919 for ck in content_keys: 

1920 keys.extend(key_set.get(ck, [])) 

1921 return keys 

1922 

1923 def process(self, buckets): 

1924 results = self._process_with_futures(self.process_bucket, buckets) 

1925 self.write_denied_buckets_file() 

1926 return results 

1927 

1928 def _process_with_futures(self, helper, buckets, max_workers=3): 

1929 results = [] 

1930 with self.executor_factory(max_workers) as w: 

1931 futures = {} 

1932 for b in buckets: 

1933 futures[w.submit(helper, b)] = b 

1934 for f in as_completed(futures): 

1935 if f.exception(): 

1936 b = futures[f] 

1937 self.log.error( 

1938 "Error on bucket:%s region:%s policy:%s error: %s", 

1939 b['Name'], b.get('Location', 'unknown'), 

1940 self.manager.data.get('name'), f.exception()) 

1941 self.denied_buckets.add(b['Name']) 

1942 continue 

1943 result = f.result() 

1944 if result: 

1945 results.append(result) 

1946 return results 

1947 

1948 def write_denied_buckets_file(self): 

1949 if (self.denied_buckets and 

1950 self.manager.ctx.log_dir and 

1951 not isinstance(self.manager.ctx.output, NullBlobOutput)): 

1952 with open( 

1953 os.path.join( 

1954 self.manager.ctx.log_dir, 'denied.json'), 'w') as fh: 

1955 json.dump(list(self.denied_buckets), fh, indent=2) 

1956 self.denied_buckets = set() 

1957 

1958 def process_bucket(self, b): 

1959 log.info( 

1960 "Scanning bucket:%s visitor:%s style:%s" % ( 

1961 b['Name'], self.__class__.__name__, self.get_bucket_style(b))) 

1962 

1963 s = self.manager.session_factory() 

1964 s3 = bucket_client(s, b) 

1965 

1966 # The bulk of the _process_bucket function executes inline in the 

1967 # calling thread/worker context; neither the paginator nor the 

1968 # bucket scan log should be used across a worker boundary. 

1969 p = s3.get_paginator( 

1970 self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name']) 

1971 

1972 with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log: 

1973 with self.executor_factory(max_workers=10) as w: 

1974 try: 

1975 return self._process_bucket(b, p, key_log, w) 

1976 except ClientError as e: 

1977 if e.response['Error']['Code'] == 'NoSuchBucket': 

1978 log.warning( 

1979 "Bucket:%s removed while scanning" % b['Name']) 

1980 return 

1981 if e.response['Error']['Code'] == 'AccessDenied': 

1982 log.warning( 

1983 "Access Denied Bucket:%s while scanning" % b['Name']) 

1984 self.denied_buckets.add(b['Name']) 

1985 return 

1986 log.exception( 

1987 "Error processing bucket:%s paginator:%s" % ( 

1988 b['Name'], p)) 

1989 

1990 __call__ = process_bucket 

1991 

1992 def _process_bucket(self, b, p, key_log, w): 

1993 count = 0 

1994 

1995 for key_set in p: 

1996 keys = self.get_keys(b, key_set) 

1997 count += len(keys) 

1998 futures = [] 

1999 

2000 for batch in chunks(keys, size=100): 

2001 if not batch: 

2002 continue 

2003 futures.append(w.submit(self.process_chunk, batch, b)) 

2004 

2005 for f in as_completed(futures): 

2006 if f.exception(): 

2007 log.exception("Exception Processing bucket:%s key batch %s" % ( 

2008 b['Name'], f.exception())) 

2009 continue 

2010 r = f.result() 

2011 if r: 

2012 key_log.add(r) 

2013 

2014 # Log completion at info level, progress at debug level 

2015 if key_set['IsTruncated']: 

2016 log.debug('Scan progress bucket:%s keys:%d remediated:%d ...', 

2017 b['Name'], count, key_log.count) 

2018 else: 

2019 log.info('Scan Complete bucket:%s keys:%d remediated:%d', 

2020 b['Name'], count, key_log.count) 

2021 

2022 b['KeyScanCount'] = count 

2023 b['KeyRemediated'] = key_log.count 

2024 return { 

2025 'Bucket': b['Name'], 'Remediated': key_log.count, 'Count': count} 

2026 

2027 def process_chunk(self, batch, bucket): 

2028 raise NotImplementedError() 

2029 

2030 def process_key(self, s3, key, bucket_name, info=None): 

2031 raise NotImplementedError() 

2032 

2033 def process_version(self, s3, bucket, key): 

2034 raise NotImplementedError() 

2035 

2036 

2037@actions.register('encrypt-keys') 

2038class EncryptExtantKeys(ScanBucket): 

2039 """Action to encrypt unencrypted S3 objects 

2040 

2041 :example: 

2042 

2043 .. code-block:: yaml 

2044 

2045 policies: 

2046 - name: s3-encrypt-objects 

2047 resource: s3 

2048 actions: 

2049 - type: encrypt-keys 

2050 crypto: aws:kms 

2051 key-id: 9c3983be-c6cf-11e6-9d9d-cec0c932ce01 

2052 """ 

2053 

2054 permissions = ( 

2055 "s3:GetObject", 

2056 "s3:PutObject", 

2057 "s3:DeleteObjectVersion", 

2058 "s3:RestoreObject", 

2059 ) + ScanBucket.permissions 

2060 

2061 schema = { 

2062 'type': 'object', 

2063 'additionalProperties': False, 

2064 'properties': { 

2065 'type': {'enum': ['encrypt-keys']}, 

2066 'report-only': {'type': 'boolean'}, 

2067 'glacier': {'type': 'boolean'}, 

2068 'large': {'type': 'boolean'}, 

2069 'crypto': {'enum': ['AES256', 'aws:kms']}, 

2070 'key-id': {'type': 'string'} 

2071 }, 

2072 'dependencies': { 

2073 'key-id': { 

2074 'properties': { 

2075 'crypto': {'pattern': 'aws:kms'} 

2076 }, 

2077 'required': ['crypto'] 

2078 } 

2079 } 

2080 } 

2081 

2082 metrics = [ 

2083 ('Total Keys', {'Scope': 'Account'}), 

2084 ('Unencrypted', {'Scope': 'Account'})] 

2085 

2086 def __init__(self, data, manager=None): 

2087 super(EncryptExtantKeys, self).__init__(data, manager) 

2088 self.kms_id = self.data.get('key-id') 

2089 

2090 def get_permissions(self): 

2091 perms = ("s3:GetObject", "s3:GetObjectVersion") 

2092 if self.data.get('report-only'): 

2093 perms += ('s3:DeleteObject', 's3:DeleteObjectVersion', 

2094 's3:PutObject', 

2095 's3:AbortMultipartUpload', 

2096 's3:ListBucket', 

2097 's3:ListBucketVersions') 

2098 return perms 

2099 

2100 def process(self, buckets): 

2101 

2102 t = time.time() 

2103 results = super(EncryptExtantKeys, self).process(buckets) 

2104 run_time = time.time() - t 

2105 remediated_count = object_count = 0 

2106 

2107 for r in results: 

2108 object_count += r['Count'] 

2109 remediated_count += r['Remediated'] 

2110 self.manager.ctx.metrics.put_metric( 

2111 "Unencrypted", r['Remediated'], "Count", Scope=r['Bucket'], 

2112 buffer=True) 

2113 

2114 self.manager.ctx.metrics.put_metric( 

2115 "Unencrypted", remediated_count, "Count", Scope="Account", 

2116 buffer=True 

2117 ) 

2118 self.manager.ctx.metrics.put_metric( 

2119 "Total Keys", object_count, "Count", Scope="Account", 

2120 buffer=True 

2121 ) 

2122 self.manager.ctx.metrics.flush() 

2123 

2124 log.info( 

2125 ("EncryptExtant Complete keys:%d " 

2126 "remediated:%d rate:%0.2f/s time:%0.2fs"), 

2127 object_count, 

2128 remediated_count, 

2129 float(object_count) / run_time if run_time else 0, 

2130 run_time) 

2131 return results 

2132 

2133 def process_chunk(self, batch, bucket): 

2134 crypto_method = self.data.get('crypto', 'AES256') 

2135 s3 = bucket_client( 

2136 local_session(self.manager.session_factory), bucket, 

2137 kms=(crypto_method == 'aws:kms')) 

2138 b = bucket['Name'] 

2139 results = [] 

2140 key_processor = self.get_bucket_op(bucket, 'key_processor') 

2141 for key in batch: 

2142 r = key_processor(s3, key, b) 

2143 if r: 

2144 results.append(r) 

2145 return results 

2146 

2147 def process_key(self, s3, key, bucket_name, info=None): 

2148 k = key['Key'] 

2149 if info is None: 

2150 info = s3.head_object(Bucket=bucket_name, Key=k) 

2151 

2152 # If the data is already encrypted with AES256 and this request is also 

2153 # for AES256 then we don't need to do anything 

2154 if info.get('ServerSideEncryption') == 'AES256' and not self.kms_id: 

2155 return False 

2156 

2157 if info.get('ServerSideEncryption') == 'aws:kms': 

2158 # If we're not looking for a specific key any key will do. 

2159 if not self.kms_id: 

2160 return False 

2161 # If we're configured to use a specific key and the key matches 

2162 # note this is not a strict equality match. 

2163 if self.kms_id in info.get('SSEKMSKeyId', ''): 

2164 return False 

2165 

2166 if self.data.get('report-only'): 

2167 return k 

2168 

2169 storage_class = info.get('StorageClass', 'STANDARD') 

2170 

2171 if storage_class == 'GLACIER': 

2172 if not self.data.get('glacier'): 

2173 return False 

2174 if 'Restore' not in info: 

2175 # This takes multiple hours, we let the next c7n 

2176 # run take care of followups. 

2177 s3.restore_object( 

2178 Bucket=bucket_name, 

2179 Key=k, 

2180 RestoreRequest={'Days': 30}) 

2181 return False 

2182 elif not restore_complete(info['Restore']): 

2183 return False 

2184 

2185 storage_class = 'STANDARD' 

2186 

2187 crypto_method = self.data.get('crypto', 'AES256') 

2188 key_id = self.data.get('key-id') 

2189 # Note on copy we lose individual object acl grants 

2190 params = {'Bucket': bucket_name, 

2191 'Key': k, 

2192 'CopySource': "/%s/%s" % (bucket_name, k), 

2193 'MetadataDirective': 'COPY', 

2194 'StorageClass': storage_class, 

2195 'ServerSideEncryption': crypto_method} 

2196 

2197 if key_id and crypto_method == 'aws:kms': 

2198 params['SSEKMSKeyId'] = key_id 

2199 

2200 if info['ContentLength'] > MAX_COPY_SIZE and self.data.get( 

2201 'large', True): 

2202 return self.process_large_file(s3, bucket_name, key, info, params) 

2203 

2204 s3.copy_object(**params) 

2205 return k 

2206 

2207 def process_version(self, s3, key, bucket_name): 

2208 info = s3.head_object( 

2209 Bucket=bucket_name, 

2210 Key=key['Key'], 

2211 VersionId=key['VersionId']) 

2212 

2213 if 'ServerSideEncryption' in info: 

2214 return False 

2215 

2216 if self.data.get('report-only'): 

2217 return key['Key'], key['VersionId'] 

2218 

2219 if key['IsLatest']: 

2220 r = self.process_key(s3, key, bucket_name, info) 

2221 # Glacier request processing, wait till we have the restored object 

2222 if not r: 

2223 return r 

2224 s3.delete_object( 

2225 Bucket=bucket_name, 

2226 Key=key['Key'], 

2227 VersionId=key['VersionId']) 

2228 return key['Key'], key['VersionId'] 

2229 

2230 def process_large_file(self, s3, bucket_name, key, info, params): 

2231 """For objects over 5gb, use multipart upload to copy""" 

2232 part_size = MAX_COPY_SIZE - (1024 ** 2) 

2233 num_parts = int(math.ceil(info['ContentLength'] / part_size)) 
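        # Worked example with a hypothetical object size: with MAX_COPY_SIZE
        # of 2 GiB, part_size is 2 GiB - 1 MiB, so a 5 GiB object needs
        # ceil(5 GiB / part_size) = 3 part copies covering contiguous byte
        # ranges of the source object.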

2234 source = params.pop('CopySource') 

2235 

2236 params.pop('MetadataDirective') 

2237 if 'Metadata' in info: 

2238 params['Metadata'] = info['Metadata'] 

2239 

2240 upload_id = s3.create_multipart_upload(**params)['UploadId'] 

2241 

2242 params = {'Bucket': bucket_name, 

2243 'Key': key['Key'], 

2244 'UploadId': upload_id, 

2245 'CopySource': source, 

2246 'CopySourceIfMatch': info['ETag']} 

2247 

2248 def upload_part(part_num): 

2249 part_params = dict(params) 

2250 part_params['CopySourceRange'] = "bytes=%d-%d" % ( 

2251 part_size * (part_num - 1), 

2252 min(part_size * part_num - 1, info['ContentLength'] - 1)) 

2253 part_params['PartNumber'] = part_num 

2254 response = s3.upload_part_copy(**part_params) 

2255 return {'ETag': response['CopyPartResult']['ETag'], 

2256 'PartNumber': part_num} 

2257 

2258 try: 

2259 with self.executor_factory(max_workers=2) as w: 

2260 parts = list(w.map(upload_part, range(1, num_parts + 1))) 

2261 except Exception: 

2262 log.warning( 

2263 "Error during large key copy bucket: %s key: %s, " 

2264 "aborting upload", bucket_name, key, exc_info=True) 

2265 s3.abort_multipart_upload( 

2266 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id) 

2267 raise 

2268 s3.complete_multipart_upload( 

2269 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id, 

2270 MultipartUpload={'Parts': parts}) 

2271 return key['Key'] 

2272 

2273 

2274def restore_complete(restore): 
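    # The head_object 'Restore' field is a string that typically looks like
    #   'ongoing-request="false", expiry-date="Fri, 21 Dec 2012 00:00:00 GMT"'
    # so a finished restore is detected by 'false' in the ongoing portion.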

2275 if ',' in restore: 

2276 ongoing, avail = restore.split(',', 1) 

2277 else: 

2278 ongoing = restore 

2279 return 'false' in ongoing 

2280 

2281 

2282@filters.register('is-log-target') 

2283class LogTarget(Filter): 

2284 """Filter and return buckets are log destinations. 

2285 

2286 Not suitable for use in lambda on large accounts, This is a api 

2287 heavy process to detect scan all possible log sources. 

2288 

2289 Sources: 

2290 - elb (Access Log) 

2291 - s3 (Access Log) 

2292 - cfn (Template writes) 

2293 - cloudtrail 

2294 

2295 :example: 

2296 

2297 .. code-block:: yaml 

2298 

2299 policies: 

2300 - name: s3-log-bucket 

2301 resource: s3 

2302 filters: 

2303 - type: is-log-target 

2304 """ 

2305 

2306 schema = type_schema( 

2307 'is-log-target', 

2308 services={'type': 'array', 'items': {'enum': [ 

2309 's3', 'elb', 'cloudtrail']}}, 

2310 self={'type': 'boolean'}, 

2311 value={'type': 'boolean'}) 

2312 

2313 def get_permissions(self): 

2314 perms = self.manager.get_resource_manager('elb').get_permissions() 

2315 perms += ('elasticloadbalancing:DescribeLoadBalancerAttributes',) 

2316 return perms 

2317 

2318 def process(self, buckets, event=None): 

2319 log_buckets = set() 

2320 count = 0 

2321 

2322 services = self.data.get('services', ['elb', 's3', 'cloudtrail']) 

2323 self_log = self.data.get('self', False) 

2324 

2325 if 'elb' in services and not self_log: 

2326 for bucket, _ in self.get_elb_bucket_locations(): 

2327 log_buckets.add(bucket) 

2328 count += 1 

2329 self.log.debug("Found %d elb log targets" % count) 

2330 

2331 if 's3' in services: 

2332 count = 0 

2333 for bucket, _ in self.get_s3_bucket_locations(buckets, self_log): 

2334 count += 1 

2335 log_buckets.add(bucket) 

2336 self.log.debug('Found %d s3 log targets' % count) 

2337 

2338 if 'cloudtrail' in services and not self_log: 

2339 for bucket, _ in self.get_cloud_trail_locations(buckets): 

2340 log_buckets.add(bucket) 

2341 

2342 self.log.info("Found %d log targets for %d buckets" % ( 

2343 len(log_buckets), len(buckets))) 

2344 if self.data.get('value', True): 

2345 return [b for b in buckets if b['Name'] in log_buckets] 

2346 else: 

2347 return [b for b in buckets if b['Name'] not in log_buckets] 

2348 

2349 @staticmethod 

2350 def get_s3_bucket_locations(buckets, self_log=False): 

2351 """return (bucket_name, prefix) for all s3 logging targets""" 

2352 for b in buckets: 

2353 if b.get('Logging'): 

2354 if self_log: 

2355 if b['Name'] != b['Logging']['TargetBucket']: 

2356 continue 

2357 yield (b['Logging']['TargetBucket'], 

2358 b['Logging']['TargetPrefix']) 

2359 if not self_log and b['Name'].startswith('cf-templates-'): 

2360 yield (b['Name'], '') 

2361 

2362 def get_cloud_trail_locations(self, buckets): 

2363 session = local_session(self.manager.session_factory) 

2364 client = session.client('cloudtrail') 

2365 names = {b['Name'] for b in buckets} 

2366 for t in client.describe_trails().get('trailList', ()): 

2367 if t.get('S3BucketName') in names: 

2368 yield (t['S3BucketName'], t.get('S3KeyPrefix', '')) 

2369 

2370 def get_elb_bucket_locations(self): 

2371 elbs = self.manager.get_resource_manager('elb').resources() 

2372 get_elb_attrs = functools.partial( 

2373 _query_elb_attrs, self.manager.session_factory) 

2374 

2375 with self.executor_factory(max_workers=2) as w: 

2376 futures = [] 

2377 for elb_set in chunks(elbs, 100): 

2378 futures.append(w.submit(get_elb_attrs, elb_set)) 

2379 for f in as_completed(futures): 

2380 if f.exception(): 

2381 log.error("Error while scanning elb log targets: %s" % ( 

2382 f.exception())) 

2383 continue 

2384 for tgt in f.result(): 

2385 yield tgt 

2386 

2387 

2388def _query_elb_attrs(session_factory, elb_set): 

2389 session = local_session(session_factory) 

2390 client = session.client('elb') 

2391 log_targets = [] 

2392 for e in elb_set: 

2393 try: 

2394 attrs = client.describe_load_balancer_attributes( 

2395 LoadBalancerName=e['LoadBalancerName'])[ 

2396 'LoadBalancerAttributes'] 

2397 if 'AccessLog' in attrs and attrs['AccessLog']['Enabled']: 

2398 log_targets.append(( 

2399 attrs['AccessLog']['S3BucketName'], 

2400 attrs['AccessLog']['S3BucketPrefix'])) 

2401 except Exception as err: 

2402 log.warning( 

2403 "Could not retrieve load balancer %s: %s" % ( 

2404 e['LoadBalancerName'], err)) 

2405 return log_targets 

2406 

2407 

2408@actions.register('remove-website-hosting') 

2409class RemoveWebsiteHosting(BucketActionBase): 

2410 """Action that removes website hosting configuration.""" 

2411 

2412 schema = type_schema('remove-website-hosting') 

2413 

2414 permissions = ('s3:DeleteBucketWebsite',) 

2415 

2416 def process(self, buckets): 

2417 session = local_session(self.manager.session_factory) 

2418 for bucket in buckets: 

2419 client = bucket_client(session, bucket) 

2420 client.delete_bucket_website(Bucket=bucket['Name']) 

2421 

2422 

2423@actions.register('delete-global-grants') 

2424class DeleteGlobalGrants(BucketActionBase): 

2425 """Deletes global grants associated to a S3 bucket 

2426 

2427 :example: 

2428 

2429 .. code-block:: yaml 

2430 

2431 policies: 

2432 - name: s3-delete-global-grants 

2433 resource: s3 

2434 filters: 

2435 - type: global-grants 

2436 actions: 

2437 - delete-global-grants 

2438 """ 

2439 

2440 schema = type_schema( 

2441 'delete-global-grants', 

2442 grantees={'type': 'array', 'items': {'type': 'string'}}) 

2443 

2444 permissions = ('s3:PutBucketAcl',) 

2445 

2446 def process(self, buckets): 

2447 with self.executor_factory(max_workers=5) as w: 

2448 return list(filter(None, list(w.map(self.process_bucket, buckets)))) 

2449 

2450 def process_bucket(self, b): 

2451 grantees = self.data.get( 

2452 'grantees', [ 

2453 GlobalGrantsFilter.AUTH_ALL, GlobalGrantsFilter.GLOBAL_ALL]) 

2454 

2455 log.info(b) 

2456 

2457 acl = b.get('Acl', {'Grants': []}) 

2458 if not acl or not acl['Grants']: 

2459 return 

2460 new_grants = [] 

2461 for grant in acl['Grants']: 

2462 grantee = grant.get('Grantee', {}) 

2463 if not grantee: 

2464 continue 

2465 # Yuck, 'get_bucket_acl' doesn't return the grantee type. 

2466 if 'URI' in grantee: 

2467 grantee['Type'] = 'Group' 

2468 else: 

2469 grantee['Type'] = 'CanonicalUser' 

2470 if ('URI' in grantee and 

2471 grantee['URI'] in grantees and not 

2472 (grant['Permission'] == 'READ' and b['Website'])): 

2473 # Remove this grantee. 

2474 pass 

2475 else: 

2476 new_grants.append(grant) 

2477 

2478 log.info({'Owner': acl['Owner'], 'Grants': new_grants}) 

2479 

2480 c = bucket_client(self.manager.session_factory(), b) 

2481 try: 

2482 c.put_bucket_acl( 

2483 Bucket=b['Name'], 

2484 AccessControlPolicy={ 

2485 'Owner': acl['Owner'], 'Grants': new_grants}) 

2486 except ClientError as e: 

2487 if e.response['Error']['Code'] == 'NoSuchBucket': 

2488 return 

2489 return b 

2490 

2491 

2492@actions.register('tag') 

2493class BucketTag(Tag): 

2494 """Action to create tags on a S3 bucket 

2495 

2496 :example: 

2497 

2498 .. code-block:: yaml 

2499 

2500 policies: 

2501 - name: s3-tag-region 

2502 resource: s3 

2503 region: us-east-1 

2504 filters: 

2505 - "tag:RegionName": absent 

2506 actions: 

2507 - type: tag 

2508 key: RegionName 

2509 value: us-east-1 

2510 """ 

2511 

2512 def process_resource_set(self, client, resource_set, tags): 

2513 modify_bucket_tags(self.manager.session_factory, resource_set, tags) 

2514 

2515 

2516@actions.register('mark-for-op') 

2517class MarkBucketForOp(TagDelayedAction): 

2518 """Action schedules custodian to perform an action at a certain date 

2519 

2520 :example: 

2521 

2522 .. code-block:: yaml 

2523 

2524 policies: 

2525 - name: s3-encrypt 

2526 resource: s3 

2527 filters: 

2528 - type: missing-statement 

2529 statement_ids: 

2530 - RequiredEncryptedPutObject 

2531 actions: 

2532 - type: mark-for-op 

2533 op: attach-encrypt 

2534 days: 7 

2535 """ 

2536 

2537 schema = type_schema( 

2538 'mark-for-op', rinherit=TagDelayedAction.schema) 

2539 

2540 

2541@actions.register('unmark') 

2542@actions.register('remove-tag') 

2543class RemoveBucketTag(RemoveTag): 

2544 """Removes tag/tags from a S3 object 

2545 

2546 :example: 

2547 

2548 .. code-block:: yaml 

2549 

2550 policies: 

2551 - name: s3-remove-owner-tag 

2552 resource: s3 

2553 filters: 

2554 - "tag:BucketOwner": present 

2555 actions: 

2556 - type: remove-tag 

2557 tags: ['BucketOwner'] 

2558 """ 

2559 

2560 def process_resource_set(self, client, resource_set, tags): 

2561 modify_bucket_tags( 

2562 self.manager.session_factory, resource_set, remove_tags=tags) 

2563 

2564 

2565@filters.register('data-events') 

2566class DataEvents(Filter): 

2567 """Find buckets for which CloudTrail is logging data events. 

2568 

2569 Note that this filter only examines trails that are defined in the 

2570 current account. 

2571 """ 

2572 

2573 schema = type_schema('data-events', state={'enum': ['present', 'absent']}) 

2574 permissions = ( 

2575 'cloudtrail:DescribeTrails', 

2576 'cloudtrail:GetEventSelectors') 

2577 

2578 def get_event_buckets(self, session, trails): 

2579 """Return a mapping of bucket name to cloudtrail. 

2580 

2581 For wildcard trails the bucket name is ''. 

2582 """ 

2583 regions = {t.get('HomeRegion') for t in trails} 

2584 clients = {} 

2585 for region in regions: 

2586 clients[region] = session.client('cloudtrail', region_name=region) 

2587 

2588 event_buckets = {} 

2589 for t in trails: 

2590 for events in clients[t.get('HomeRegion')].get_event_selectors( 

2591 TrailName=t['Name']).get('EventSelectors', ()): 

2592 if 'DataResources' not in events: 

2593 continue 

2594 for data_events in events['DataResources']: 

2595 if data_events['Type'] != 'AWS::S3::Object': 

2596 continue 

2597 for b in data_events['Values']: 

2598 event_buckets[b.rsplit(':')[-1].strip('/')] = t['Name'] 

2599 return event_buckets 

2600 

2601 def process(self, resources, event=None): 

2602 trails = self.manager.get_resource_manager('cloudtrail').resources() 

2603 local_trails = self.filter_resources( 

2604 trails, 

2605 "split(':', TrailARN)[4]", (self.manager.account_id,) 

2606 ) 

2607 session = local_session(self.manager.session_factory) 

2608 event_buckets = self.get_event_buckets(session, local_trails) 

2609 ops = { 

2610 'present': lambda x: ( 

2611 x['Name'] in event_buckets or '' in event_buckets), 

2612 'absent': ( 

2613 lambda x: x['Name'] not in event_buckets and '' 

2614 not in event_buckets)} 

2615 

2616 op = ops[self.data.get('state', 'present')] 

2617 results = [] 

2618 for b in resources: 

2619 if op(b): 

2620 results.append(b) 

2621 return results 

2622 

2623 

2624@filters.register('inventory') 

2625class Inventory(ValueFilter): 

2626 """Filter inventories for a bucket""" 

2627 schema = type_schema('inventory', rinherit=ValueFilter.schema) 

2628 schema_alias = False 

2629 permissions = ('s3:GetInventoryConfiguration',) 

2630 

2631 def process(self, buckets, event=None): 

2632 results = [] 

2633 with self.executor_factory(max_workers=2) as w: 

2634 futures = {} 

2635 for b in buckets: 

2636 futures[w.submit(self.process_bucket, b)] = b 

2637 

2638 for f in as_completed(futures): 

2639 b = futures[f] 

2640 if f.exception(): 

2641 b.setdefault('c7n:DeniedMethods', []).append('GetInventoryConfiguration') 

2642 self.log.error( 

2643 "Error processing bucket: %s error: %s", 

2644 b['Name'], f.exception()) 

2645 continue 

2646 if f.result(): 

2647 results.append(b) 

2648 return results 

2649 

2650 def process_bucket(self, b): 

2651 if 'c7n:inventories' not in b: 

2652 client = bucket_client(local_session(self.manager.session_factory), b) 

2653 inventories = client.list_bucket_inventory_configurations( 

2654 Bucket=b['Name']).get('InventoryConfigurationList', []) 

2655 b['c7n:inventories'] = inventories 

2656 

2657 for i in b['c7n:inventories']: 

2658 if self.match(i): 

2659 return True 

2660 

2661 

2662@actions.register('set-inventory') 

2663class SetInventory(BucketActionBase): 

2664 """Configure bucket inventories for an s3 bucket. 

2665 """ 

2666 schema = type_schema( 

2667 'set-inventory', 

2668 required=['name', 'destination'], 

2669 state={'enum': ['enabled', 'disabled', 'absent']}, 

2670 name={'type': 'string', 'description': 'Name of inventory'}, 

2671 destination={'type': 'string', 'description': 'Name of destination bucket'}, 

2672 prefix={'type': 'string', 'description': 'Destination prefix'}, 

2673 encryption={'enum': ['SSES3', 'SSEKMS']}, 

2674 key_id={'type': 'string', 'description': 'Optional Customer KMS KeyId for SSE-KMS'}, 

2675 versions={'enum': ['All', 'Current']}, 

2676 schedule={'enum': ['Daily', 'Weekly']}, 

2677 format={'enum': ['CSV', 'ORC', 'Parquet']}, 

2678 fields={'type': 'array', 'items': {'enum': [ 

2679 'Size', 'LastModifiedDate', 'StorageClass', 'ETag', 

2680 'IsMultipartUploaded', 'ReplicationStatus', 'EncryptionStatus', 

2681 'ObjectLockRetainUntilDate', 'ObjectLockMode', 'ObjectLockLegalHoldStatus', 

2682 'IntelligentTieringAccessTier', 'BucketKeyStatus', 'ChecksumAlgorithm']}}) 

2683 

2684 permissions = ('s3:PutInventoryConfiguration', 's3:GetInventoryConfiguration') 

2685 

2686 def process(self, buckets): 

2687 with self.executor_factory(max_workers=2) as w: 

2688 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets} 

2689 for future in as_completed(futures): 

2690 bucket = futures[future] 

2691 try: 

2692 future.result() 

2693 except Exception as e: 

2694 self.log.error('Message: %s Bucket: %s', e, bucket['Name']) 

2695 

2696 def process_bucket(self, b): 

2697 inventory_name = self.data.get('name') 

2698 destination = self.data.get('destination') 

2699 prefix = self.data.get('prefix', '') 

2700 schedule = self.data.get('schedule', 'Daily') 

2701 fields = self.data.get('fields', ['LastModifiedDate', 'Size']) 

2702 versions = self.data.get('versions', 'Current') 

2703 state = self.data.get('state', 'enabled') 

2704 encryption = self.data.get('encryption') 

2705 inventory_format = self.data.get('format', 'CSV') 

2706 

2707 if not prefix: 

2708 prefix = "Inventories/%s" % (self.manager.config.account_id) 

2709 

2710 client = bucket_client(local_session(self.manager.session_factory), b) 

2711 if state == 'absent': 

2712 try: 

2713 client.delete_bucket_inventory_configuration( 

2714 Bucket=b['Name'], Id=inventory_name) 

2715 except ClientError as e: 

2716 if e.response['Error']['Code'] != 'NoSuchConfiguration': 

2717 raise 

2718 return 

2719 

2720 bucket = { 

2721 'Bucket': "arn:aws:s3:::%s" % destination, 

2722 'Format': inventory_format 

2723 } 

2724 

2725 inventory = { 

2726 'Destination': { 

2727 'S3BucketDestination': bucket 

2728 }, 

2729 'IsEnabled': state == 'enabled', 

2730 'Id': inventory_name, 

2731 'OptionalFields': fields, 

2732 'IncludedObjectVersions': versions, 

2733 'Schedule': { 

2734 'Frequency': schedule 

2735 } 

2736 } 

2737 

2738 if prefix: 

2739 bucket['Prefix'] = prefix 

2740 

2741 if encryption: 

2742 bucket['Encryption'] = {encryption: {}} 

2743 if encryption == 'SSEKMS' and self.data.get('key_id'): 

2744 bucket['Encryption'] = {encryption: { 

2745 'KeyId': self.data['key_id'] 

2746 }} 

2747 

2748 found = self.get_inventory_delta(client, inventory, b) 

2749 if found: 

2750 return 

2751 if found is False: 

2752 self.log.debug("updating bucket:%s inventory configuration id:%s", 

2753 b['Name'], inventory_name) 

2754 client.put_bucket_inventory_configuration( 

2755 Bucket=b['Name'], Id=inventory_name, InventoryConfiguration=inventory) 

2756 

2757 def get_inventory_delta(self, client, inventory, b): 
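        # Returns None when no inventory with this Id exists (the caller
        # creates it), True when an identical configuration is already
        # present (no-op), and False when the configuration exists but
        # differs (the caller updates it).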

2758 inventories = client.list_bucket_inventory_configurations(Bucket=b['Name']) 

2759 found = None 

2760 for i in inventories.get('InventoryConfigurationList', []): 

2761 if i['Id'] != inventory['Id']: 

2762 continue 

2763 found = True 

2764 for k, v in inventory.items(): 

2765 if k not in i: 

2766 found = False 

2767 continue 

2768 if isinstance(v, list): 

2769 v.sort() 

2770 i[k].sort() 

2771 if i[k] != v: 

2772 found = False 

2773 return found 

2774 

2775 

2776@filters.register('intelligent-tiering') 

2777class IntelligentTiering(ListItemFilter): 

2778 """Filter for S3 buckets to look at intelligent tiering configurations 

2779 

2780 The schema to supply to the attrs follows the schema here: 

2781 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_bucket_intelligent_tiering_configurations.html 

2782 

2783 :example: 

2784 

2785 .. code-block:: yaml 

2786 

2787 policies: 

2788 - name: s3-intelligent-tiering-configuration 

2789 resource: s3 

2790 filters: 

2791 - type: intelligent-tiering 

2792 attrs: 

2793 - Status: Enabled 

2794 - Filter: 

2795 And: 

2796 Prefix: test 

2797 Tags: 

2798 - Key: Owner 

2799 Value: c7n 

2800 - Tierings: 

2801 - Days: 100 

2802 - AccessTier: ARCHIVE_ACCESS 

2803 

2804 """ 

2805 schema = type_schema( 

2806 'intelligent-tiering', 

2807 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'}, 

2808 count={'type': 'number'}, 

2809 count_op={'$ref': '#/definitions/filters_common/comparison_operators'} 

2810 ) 

2811 permissions = ('s3:GetIntelligentTieringConfiguration',) 

2812 annotation_key = "c7n:IntelligentTiering" 

2813 annotate_items = True 

2814 

2815 def __init__(self, data, manager=None): 

2816 super().__init__(data, manager) 

2817 self.data['key'] = self.annotation_key 

2818 

2819 def process(self, buckets, event=None): 

2820 with self.executor_factory(max_workers=2) as w: 

2821 futures = {w.submit(self.get_item_values, b): b for b in buckets} 

2822 for future in as_completed(futures): 

2823 b = futures[future] 

2824 if future.exception(): 

2825 self.log.error("Message: %s Bucket: %s", future.exception(), b['Name']) 

2826 continue 

2827 return super().process(buckets, event) 

2828 

2829 def get_item_values(self, b): 

2830 if self.annotation_key not in b: 

2831 client = bucket_client(local_session(self.manager.session_factory), b) 

2832 try: 

2833 int_tier_config = client.list_bucket_intelligent_tiering_configurations( 

2834 Bucket=b['Name']) 

2835 b[self.annotation_key] = int_tier_config.get( 

2836 'IntelligentTieringConfigurationList', []) 

2837 except ClientError as e: 

2838 if e.response['Error']['Code'] == 'AccessDenied': 

2839 method = 'list_bucket_intelligent_tiering_configurations' 

2840 log.warning( 

2841 "Bucket:%s unable to invoke method:%s error:%s ", 

2842 b['Name'], method, e.response['Error']['Message']) 

2843 b.setdefault('c7n:DeniedMethods', []).append(method) 

2844 return b.get(self.annotation_key) 

2845 

2846 

2847@actions.register('set-intelligent-tiering') 

2848class ConfigureIntelligentTiering(BucketActionBase): 

2849 """Action applies an intelligent tiering configuration to a S3 bucket 

2850 

2851 The schema to supply to the configuration follows the schema here: 

2852 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_bucket_intelligent_tiering_configuration.html 

2853 

2854 To delete a configuration, supply State=delete along with either the configuration Id or Id: matched 

2855 

2856 :example: 

2857 

2858 .. code-block:: yaml 

2859 

2860 policies: 

2861 - name: s3-apply-intelligent-tiering-config 

2862 resource: aws.s3 

2863 filters: 

2864 - not: 

2865 - type: intelligent-tiering 

2866 attrs: 

2867 - Status: Enabled 

2868 - Filter: 

2869 And: 

2870 Prefix: helloworld 

2871 Tags: 

2872 - Key: Hello 

2873 Value: World 

2874 - Tierings: 

2875 - Days: 123 

2876 AccessTier: ARCHIVE_ACCESS 

2877 actions: 

2878 - type: set-intelligent-tiering 

2879 Id: c7n-default 

2880 IntelligentTieringConfiguration: 

2881 Id: c7n-default 

2882 Status: Enabled 

2883 Tierings: 

2884 - Days: 149 

2885 AccessTier: ARCHIVE_ACCESS 

2886 

2887 - name: s3-delete-intelligent-tiering-configuration 

2888 resource: aws.s3 

2889 filters: 

2890 - type: intelligent-tiering 

2891 attrs: 

2892 - Status: Enabled 

2893 - Id: test-config 

2894 actions: 

2895 - type: set-intelligent-tiering 

2896 Id: test-config 

2897 State: delete 

2898 

2899 - name: s3-delete-intelligent-tiering-matched-configs 

2900 resource: aws.s3 

2901 filters: 

2902 - type: intelligent-tiering 

2903 attrs: 

2904 - Status: Enabled 

2905 - Id: test-config 

2906 actions: 

2907 - type: set-intelligent-tiering 

2908 Id: matched 

2909 State: delete 

2910 

2911 """ 

2912 

2913 annotation_key = 'c7n:ListItemMatches' 

2914 shape = 'PutBucketIntelligentTieringConfigurationRequest' 

2915 schema = { 

2916 'type': 'object', 

2917 'additionalProperties': False, 

2918 'oneOf': [ 

2919 {'required': ['type', 'Id', 'IntelligentTieringConfiguration']}, 

2920 {'required': ['type', 'Id', 'State']}], 

2921 'properties': { 

2922 'type': {'enum': ['set-intelligent-tiering']}, 

2923 'Id': {'type': 'string'}, 

2924 # delete intelligent tier configurations via state: delete 

2925 'State': {'type': 'string', 'enum': ['delete']}, 

2926 'IntelligentTieringConfiguration': {'type': 'object'} 

2927 }, 

2928 } 

2929 

2930 permissions = ('s3:PutIntelligentTieringConfiguration',) 

2931 

2932 def validate(self): 

2933 # You can have up to 1,000 S3 Intelligent-Tiering configurations per bucket. 

2934 # Hence, always use it with a filter 

2935 found = False 

2936 for f in self.manager.iter_filters(): 

2937 if isinstance(f, IntelligentTiering): 

2938 found = True 

2939 break 

2940 if not found: 

2941 raise PolicyValidationError( 

2942 '`set-intelligent-tiering` may only be used in ' 

2943 'conjunction with `intelligent-tiering` filter on %s' % (self.manager.data,)) 

2944 cfg = dict(self.data) 

2945 if 'IntelligentTieringConfiguration' in cfg: 

2946 cfg['Bucket'] = 'bucket' 

2947 cfg.pop('type') 

2948 return shape_validate( 

2949 cfg, self.shape, self.manager.resource_type.service) 

2950 

2951 def process(self, buckets): 

2952 with self.executor_factory(max_workers=3) as w: 

2953 futures = {} 

2954 

2955 for b in buckets: 

2956 futures[w.submit(self.process_bucket, b)] = b 

2957 

2958 for future in as_completed(futures): 

2959 if future.exception(): 

2960 bucket = futures[future] 

2961 self.log.error( 

2962 'error modifying bucket intelligent tiering configuration: %s\n%s', 

2963 bucket['Name'], future.exception()) 

2964 continue 

2965 

2966 def process_bucket(self, bucket): 

2967 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

2968 

2969 if 'list_bucket_intelligent_tiering_configurations' in bucket.get( 

2970 'c7n:DeniedMethods', []): 

2971 log.warning("Access Denied Bucket:%s while reading intelligent tiering configurations" 

2972 % bucket['Name']) 

2973 return 

2974 

2975 if self.data.get('Id') and self.data.get('IntelligentTieringConfiguration'): 

2976 try: 

2977 s3.put_bucket_intelligent_tiering_configuration( 

2978 Bucket=bucket['Name'], Id=self.data.get( 

2979 'Id'), IntelligentTieringConfiguration=self.data.get( 

2980 'IntelligentTieringConfiguration')) 

2981 except ClientError as e: 

2982 if e.response['Error']['Code'] == 'AccessDenied': 

2983 log.warning( 

2984 "Access Denied Bucket:%s while applying intelligent tiering configuration" 

2985 % bucket['Name']) 

2986 if self.data.get('State'): 

2987 if self.data.get('Id') == 'matched': 

2988 for config in bucket.get(self.annotation_key): 

2989 self.delete_intelligent_tiering_configurations(s3, config.get('Id'), bucket) 

2990 else: 

2991 self.delete_intelligent_tiering_configurations(s3, self.data.get('Id'), bucket) 

2992 

2993 def delete_intelligent_tiering_configurations(self, s3_client, id, bucket): 

2994 try: 

2995 s3_client.delete_bucket_intelligent_tiering_configuration(Bucket=bucket['Name'], Id=id) 

2996 except ClientError as e: 

2997 if e.response['Error']['Code'] == 'AccessDenied': 

2998 log.warning( 

2999 "Access Denied Bucket:%s while deleting intelligent tiering configuration" 

3000 % bucket['Name']) 

3001 elif e.response['Error']['Code'] == 'NoSuchConfiguration': 

3002 log.warning( 

3003 "No such configuration found:%s while deleting intelligent tiering configuration" 

3004 % bucket['Name']) 

3005 

3006 

3007@actions.register('delete') 

3008class DeleteBucket(ScanBucket): 

3009 """Action deletes a S3 bucket 

3010 

3011 :example: 

3012 

3013 .. code-block:: yaml 

3014 

3015 policies: 

3016 - name: delete-unencrypted-buckets 

3017 resource: s3 

3018 filters: 

3019 - type: missing-statement 

3020 statement_ids: 

3021 - RequiredEncryptedPutObject 

3022 actions: 

3023 - type: delete 

3024 remove-contents: true 

3025 """ 

3026 

3027 schema = type_schema('delete', **{'remove-contents': {'type': 'boolean'}}) 

3028 

3029 permissions = ('s3:*',) 

3030 

3031 bucket_ops = { 

3032 'standard': { 

3033 'iterator': 'list_objects', 

3034 'contents_key': ['Contents'], 

3035 'key_processor': 'process_key' 

3036 }, 

3037 'versioned': { 

3038 'iterator': 'list_object_versions', 

3039 'contents_key': ['Versions', 'DeleteMarkers'], 

3040 'key_processor': 'process_version' 

3041 } 

3042 } 

3043 

3044 def process_delete_enablement(self, b): 

3045 """Prep a bucket for deletion. 

3046 

3047 Clear out any pending multi-part uploads. 

3048 

3049 Disable versioning on the bucket, so deletes don't 

3050 generate fresh deletion markers. 

3051 """ 

3052 client = bucket_client( 

3053 local_session(self.manager.session_factory), b) 

3054 

3055 # Stop replication so we can suspend versioning 

3056 if b.get('Replication') is not None: 

3057 client.delete_bucket_replication(Bucket=b['Name']) 

3058 

3059 # Suspend versioning, so we don't get new delete markers 

3060 # as we walk and delete versions 

3061 if (self.get_bucket_style(b) == 'versioned' and b['Versioning']['Status'] == 'Enabled' and 

3062 self.data.get('remove-contents', True)): 

3063 client.put_bucket_versioning( 

3064 Bucket=b['Name'], 

3065 VersioningConfiguration={'Status': 'Suspended'}) 

3066 

3067 # Clear our multi-part uploads 

3068 uploads = client.get_paginator('list_multipart_uploads') 

3069 for p in uploads.paginate(Bucket=b['Name']): 

3070 for u in p.get('Uploads', ()): 

3071 client.abort_multipart_upload( 

3072 Bucket=b['Name'], 

3073 Key=u['Key'], 

3074 UploadId=u['UploadId']) 

3075 

3076 def process(self, buckets): 

3077 # might be worth sanity checking all our permissions 

3078 # on the bucket up front before disabling versioning/replication. 

3079 if self.data.get('remove-contents', True): 

3080 self._process_with_futures(self.process_delete_enablement, buckets) 

3081 self.empty_buckets(buckets) 

3082 

3083 results = self._process_with_futures(self.delete_bucket, buckets) 

3084 self.write_denied_buckets_file() 

3085 return results 

3086 

3087 def delete_bucket(self, b): 

3088 s3 = bucket_client(self.manager.session_factory(), b) 

3089 try: 

3090 self._run_api(s3.delete_bucket, Bucket=b['Name']) 

3091 except ClientError as e: 

3092 if e.response['Error']['Code'] == 'BucketNotEmpty': 

3093 self.log.error( 

3094 "Error while deleting bucket %s, bucket not empty" % ( 

3095 b['Name'])) 

3096 else: 

3097 raise e 

3098 

3099 def empty_buckets(self, buckets): 

3100 t = time.time() 

3101 results = super(DeleteBucket, self).process(buckets) 

3102 run_time = time.time() - t 

3103 object_count = 0 

3104 

3105 for r in results: 

3106 object_count += r['Count'] 

3107 self.manager.ctx.metrics.put_metric( 

3108 "Total Keys", object_count, "Count", Scope=r['Bucket'], 

3109 buffer=True) 

3110 self.manager.ctx.metrics.put_metric( 

3111 "Total Keys", object_count, "Count", Scope="Account", buffer=True) 

3112 self.manager.ctx.metrics.flush() 

3113 

3114 log.info( 

3115 "EmptyBucket buckets:%d Complete keys:%d rate:%0.2f/s time:%0.2fs", 

3116 len(buckets), object_count, 

3117 float(object_count) / run_time if run_time else 0, run_time) 

3118 return results 

3119 

3120 def process_chunk(self, batch, bucket): 

3121 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3122 objects = [] 

3123 for key in batch: 

3124 obj = {'Key': key['Key']} 

3125 if 'VersionId' in key: 

3126 obj['VersionId'] = key['VersionId'] 

3127 objects.append(obj) 

3128 results = s3.delete_objects( 

3129 Bucket=bucket['Name'], Delete={'Objects': objects}).get('Deleted', ()) 

3130 if self.get_bucket_style(bucket) != 'versioned': 

3131 return results 

3132 

3133 

3134@actions.register('configure-lifecycle') 

3135class Lifecycle(BucketActionBase): 

3136 """Action applies a lifecycle policy to versioned S3 buckets 

3137 

3138 The schema to supply to the rule follows the schema here: 

3139 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_lifecycle_configuration 

3140 

3141 To delete a lifecycle rule, supply Status=absent 

3142 

3143 :example: 

3144 

3145 .. code-block:: yaml 

3146 

3147 policies: 

3148 - name: s3-apply-lifecycle 

3149 resource: s3 

3150 actions: 

3151 - type: configure-lifecycle 

3152 rules: 

3153 - ID: my-lifecycle-id 

3154 Status: Enabled 

3155 Prefix: foo/ 

3156 Transitions: 

3157 - Days: 60 

3158 StorageClass: GLACIER 

3159 

3160 """ 

3161 

3162 schema = type_schema( 

3163 'configure-lifecycle', 

3164 **{ 

3165 'rules': { 

3166 'type': 'array', 

3167 'items': { 

3168 'type': 'object', 

3169 'required': ['ID', 'Status'], 

3170 'additionalProperties': False, 

3171 'properties': { 

3172 'ID': {'type': 'string'}, 

3173 # c7n intercepts `absent` 

3174 'Status': {'enum': ['Enabled', 'Disabled', 'absent']}, 

3175 'Prefix': {'type': 'string'}, 

3176 'Expiration': { 

3177 'type': 'object', 

3178 'additionalProperties': False, 

3179 'properties': { 

3180 'Date': {'type': 'string'}, # Date 

3181 'Days': {'type': 'integer'}, 

3182 'ExpiredObjectDeleteMarker': {'type': 'boolean'}, 

3183 }, 

3184 }, 

3185 'Filter': { 

3186 'type': 'object', 

3187 'minProperties': 1, 

3188 'maxProperties': 1, 

3189 'additionalProperties': False, 

3190 'properties': { 

3191 'Prefix': {'type': 'string'}, 

3192 'ObjectSizeGreaterThan': {'type': 'integer'}, 

3193 'ObjectSizeLessThan': {'type': 'integer'}, 

3194 'Tag': { 

3195 'type': 'object', 

3196 'required': ['Key', 'Value'], 

3197 'additionalProperties': False, 

3198 'properties': { 

3199 'Key': {'type': 'string'}, 

3200 'Value': {'type': 'string'}, 

3201 }, 

3202 }, 

3203 'And': { 

3204 'type': 'object', 

3205 'additionalProperties': False, 

3206 'properties': { 

3207 'Prefix': {'type': 'string'}, 

3208 'ObjectSizeGreaterThan': {'type': 'integer'}, 

3209 'ObjectSizeLessThan': {'type': 'integer'}, 

3210 'Tags': { 

3211 'type': 'array', 

3212 'items': { 

3213 'type': 'object', 

3214 'required': ['Key', 'Value'], 

3215 'additionalProperties': False, 

3216 'properties': { 

3217 'Key': {'type': 'string'}, 

3218 'Value': {'type': 'string'}, 

3219 }, 

3220 }, 

3221 }, 

3222 }, 

3223 }, 

3224 }, 

3225 }, 

3226 'Transitions': { 

3227 'type': 'array', 

3228 'items': { 

3229 'type': 'object', 

3230 'additionalProperties': False, 

3231 'properties': { 

3232 'Date': {'type': 'string'}, # Date 

3233 'Days': {'type': 'integer'}, 

3234 'StorageClass': {'type': 'string'}, 

3235 }, 

3236 }, 

3237 }, 

3238 'NoncurrentVersionTransitions': { 

3239 'type': 'array', 

3240 'items': { 

3241 'type': 'object', 

3242 'additionalProperties': False, 

3243 'properties': { 

3244 'NoncurrentDays': {'type': 'integer'}, 

3245 'NewerNoncurrentVersions': {'type': 'integer'}, 

3246 'StorageClass': {'type': 'string'}, 

3247 }, 

3248 }, 

3249 }, 

3250 'NoncurrentVersionExpiration': { 

3251 'type': 'object', 

3252 'additionalProperties': False, 

3253 'properties': { 

3254 'NoncurrentDays': {'type': 'integer'}, 

3255 'NewerNoncurrentVersions': {'type': 'integer'} 

3256 }, 

3257 }, 

3258 'AbortIncompleteMultipartUpload': { 

3259 'type': 'object', 

3260 'additionalProperties': False, 

3261 'properties': { 

3262 'DaysAfterInitiation': {'type': 'integer'}, 

3263 }, 

3264 }, 

3265 }, 

3266 }, 

3267 }, 

3268 } 

3269 ) 

3270 

3271 permissions = ('s3:GetLifecycleConfiguration', 's3:PutLifecycleConfiguration') 

3272 

3273 def process(self, buckets): 

3274 with self.executor_factory(max_workers=3) as w: 

3275 futures = {} 

3276 results = [] 

3277 

3278 for b in buckets: 

3279 futures[w.submit(self.process_bucket, b)] = b 

3280 

3281 for future in as_completed(futures): 

3282 if future.exception(): 

3283 bucket = futures[future] 

3284 self.log.error('error modifying bucket lifecycle: %s\n%s', 

3285 bucket['Name'], future.exception()) 
                    continue 

3286 results += filter(None, [future.result()]) 

3287 

3288 return results 

3289 

3290 def process_bucket(self, bucket): 

3291 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3292 

3293 if 'get_bucket_lifecycle_configuration' in bucket.get('c7n:DeniedMethods', []): 

3294 log.warning("Access Denied Bucket:%s while reading lifecycle" % bucket['Name']) 

3295 return 

3296 

3297 # Adjust the existing lifecycle by adding/deleting/overwriting rules as necessary 

3298 config = (bucket.get('Lifecycle') or {}).get('Rules', []) 

3299 for rule in self.data['rules']: 

3300 for index, existing_rule in enumerate(config): 

3301 if not existing_rule: 

3302 continue 

3303 if rule['ID'] == existing_rule['ID']: 

3304 if rule['Status'] == 'absent': 

3305 config[index] = None 

3306 else: 

3307 config[index] = rule 

3308 break 

3309 else: 

3310 if rule['Status'] != 'absent': 

3311 config.append(rule) 

3312 

3313 # The extra `list` conversion is required for python3 

3314 config = list(filter(None, config)) 

3315 

3316 try: 

3317 if not config: 

3318 s3.delete_bucket_lifecycle(Bucket=bucket['Name']) 

3319 else: 

3320 s3.put_bucket_lifecycle_configuration( 

3321 Bucket=bucket['Name'], LifecycleConfiguration={'Rules': config}) 

3322 except ClientError as e: 

3323 if e.response['Error']['Code'] == 'AccessDenied': 

3324 log.warning("Access Denied Bucket:%s while applying lifecycle" % bucket['Name']) 

3325 else: 

3326 raise e 

3327 

3328 

3329class KMSKeyResolverMixin: 

3330 """Builds a dictionary of region specific ARNs""" 

3331 

3332 def __init__(self, data, manager=None): 

3333 self.arns = dict() 

3334 self.data = data 

3335 self.manager = manager 

3336 

3337 def resolve_keys(self, buckets): 

3338 key = self.data.get('key') 

3339 if not key: 

3340 return None 

3341 

3342 regions = {get_region(b) for b in buckets} 

3343 for r in regions: 

3344 client = local_session(self.manager.session_factory).client('kms', region_name=r) 

3345 try: 

3346 key_meta = client.describe_key( 

3347 KeyId=key 

3348 ).get('KeyMetadata', {}) 

3349 key_id = key_meta.get('KeyId') 

3350 

3351 # We need a complete set of alias identifiers (names and ARNs) 

3352 # to fully evaluate bucket encryption filters. 

3353 key_aliases = client.list_aliases( 

3354 KeyId=key_id 

3355 ).get('Aliases', []) 

3356 

3357 self.arns[r] = { 

3358 'KeyId': key_id, 

3359 'Arn': key_meta.get('Arn'), 

3360 'KeyManager': key_meta.get('KeyManager'), 

3361 'Description': key_meta.get('Description'), 

3362 'Aliases': [ 

3363 alias[attr] 

3364 for alias in key_aliases 

3365 for attr in ('AliasArn', 'AliasName') 

3366 ], 

3367 } 

3368 

3369 except ClientError as e: 

3370 self.log.error('Error resolving kms ARNs for set-bucket-encryption: %s key: %s' % ( 

3371 e, self.data.get('key'))) 

3372 

3373 def get_key(self, bucket): 

3374 if 'key' not in self.data: 

3375 return None 

3376 region = get_region(bucket) 

3377 key = self.arns.get(region) 

3378 if not key: 

3379 self.log.warning('Unable to resolve key %s for bucket %s in region %s', 

3380 self.data['key'], bucket.get('Name'), region) 

3381 return key 

3382 

3383 

3384@filters.register('bucket-encryption') 

3385class BucketEncryption(KMSKeyResolverMixin, Filter): 

3386 """Filters for S3 buckets that have bucket-encryption 

3387 

3388 :example: 

3389 

3390 .. code-block:: yaml 

3391 

3392 policies: 

3393 - name: s3-bucket-encryption-AES256 

3394 resource: s3 

3395 region: us-east-1 

3396 filters: 

3397 - type: bucket-encryption 

3398 state: True 

3399 crypto: AES256 

3400 - name: s3-bucket-encryption-KMS 

3401 resource: s3 

3402 region: us-east-1 

3403 filters: 

3404 - type: bucket-encryption 

3405 state: True 

3406 crypto: aws:kms 

3407 key: alias/some/alias/key 

3408 - name: s3-bucket-encryption-off 

3409 resource: s3 

3410 region: us-east-1 

3411 filters: 

3412 - type: bucket-encryption 

3413 state: False 

3414 - name: s3-bucket-test-bucket-key-enabled 

3415 resource: s3 

3416 region: us-east-1 

3417 filters: 

3418 - type: bucket-encryption 

3419 bucket_key_enabled: True 

3420 """ 

3421 schema = type_schema('bucket-encryption', 

3422 state={'type': 'boolean'}, 

3423 crypto={'type': 'string', 'enum': ['AES256', 'aws:kms']}, 

3424 key={'type': 'string'}, 

3425 bucket_key_enabled={'type': 'boolean'}) 

3426 

3427 permissions = ('s3:GetEncryptionConfiguration', 'kms:DescribeKey', 'kms:ListAliases') 

3428 annotation_key = 'c7n:bucket-encryption' 

3429 

3430 def validate(self): 

3431 if self.data.get('bucket_key_enabled') is not None and self.data.get('key') is not None: 

3432 raise PolicyValidationError( 

3433 f'key and bucket_key_enabled attributes cannot both be set: {self.data}' 

3434 ) 

3435 

3436 def process(self, buckets, event=None): 

3437 self.resolve_keys(buckets) 

3438 results = [] 

3439 with self.executor_factory(max_workers=2) as w: 

3440 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3441 for future in as_completed(futures): 

3442 b = futures[future] 

3443 if future.exception(): 

3444 self.log.error("Message: %s Bucket: %s", future.exception(), 

3445 b['Name']) 

3446 continue 

3447 if future.result(): 

3448 results.append(b) 

3449 return results 

3450 

3451 def process_bucket(self, b): 

3452 

3453 client = bucket_client(local_session(self.manager.session_factory), b) 

3454 rules = [] 

3455 if self.annotation_key not in b: 

3456 try: 

3457 be = client.get_bucket_encryption(Bucket=b['Name']) 

3458 be.pop('ResponseMetadata', None) 

3459 except ClientError as e: 

3460 if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError': 

3461 raise 

3462 be = {} 

3463 b[self.annotation_key] = be 

3464 else: 

3465 be = b[self.annotation_key] 

3466 

3467 rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', []) 

3468 # default `state` to True as previous impl assumed state == True 

3469 # to preserve backwards compatibility 

3470 if self.data.get('bucket_key_enabled'): 

3471 for rule in rules: 

3472 return self.filter_bucket_key_enabled(rule) 

3473 elif self.data.get('bucket_key_enabled') is False: 

3474 for rule in rules: 

3475 return not self.filter_bucket_key_enabled(rule) 

3476 

3477 if self.data.get('state', True): 

3478 for sse in rules: 

3479 return self.filter_bucket(b, sse) 

3480 return False 

3481 else: 

3482 for sse in rules: 

3483 return not self.filter_bucket(b, sse) 

3484 return True 

3485 

3486 def filter_bucket(self, b, sse): 

3487 allowed = ['AES256', 'aws:kms'] 

3488 key = self.get_key(b) 

3489 crypto = self.data.get('crypto') 

3490 rule = sse.get('ApplyServerSideEncryptionByDefault') 

3491 

3492 if not rule: 

3493 return False 

3494 algo = rule.get('SSEAlgorithm') 

3495 

3496 if not crypto and algo in allowed: 

3497 return True 

3498 

3499 if crypto == 'AES256' and algo == 'AES256': 

3500 return True 

3501 elif crypto == 'aws:kms' and algo == 'aws:kms': 

3502 if not key: 

3503 # There are two broad reasons to have an empty value for 

3504 # the regional key here: 

3505 # 

3506 # * The policy did not specify a key, in which case this 

3507 # filter should match _all_ buckets with a KMS default 

3508 # encryption rule. 

3509 # 

3510 # * The policy specified a key that could not be 

3511 # resolved, in which case this filter shouldn't match 

3512 # any buckets. 

3513 return 'key' not in self.data 

3514 

3515 # The default encryption rule can specify a key ID, 

3516 # key ARN, alias name or alias ARN. Match against any of 

3517 # those attributes. A rule specifying KMS with no master key 

3518 # implies the AWS-managed key. 

3519 key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']} 

3520 return rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids 

3521 

3522 def filter_bucket_key_enabled(self, rule) -> bool: 

3523 if not rule: 

3524 return False 

3525 return rule.get('BucketKeyEnabled') 

3526 

3527 
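# Editor's sketch (not part of the original module): the response shape that
# filter_bucket() above evaluates. ClientError is the module-level botocore
# import; the bucket name is an illustrative assumption.
def _example_bucket_uses_kms(s3_client, bucket_name='example-bucket'):
    try:
        be = s3_client.get_bucket_encryption(Bucket=bucket_name)
    except ClientError:
        # S3 raises ServerSideEncryptionConfigurationNotFoundError when no
        # default encryption is configured.
        return False
    rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', [])
    for rule in rules:
        default = rule.get('ApplyServerSideEncryptionByDefault', {})
        # SSEAlgorithm is 'AES256' or 'aws:kms'; KMSMasterKeyID (key id, key
        # ARN, alias name or alias ARN) is only present for KMS-backed rules.
        if default.get('SSEAlgorithm') == 'aws:kms':
            return True
    return False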

3528@actions.register('set-bucket-encryption') 

3529class SetBucketEncryption(KMSKeyResolverMixin, BucketActionBase): 

3530 """Action enables default encryption on S3 buckets 

3531 

3532 `enabled`: boolean Optional: Defaults to True 

3533 

3534 `crypto`: `aws:kms` | `AES256` Optional: Defaults to AES256 

3535 

3536 `key`: arn, alias, or kms id key 

3537 

3538 `bucket-key`: boolean Optional: 

3539 Defaults to True. 

3540 Reduces the amount of API traffic from Amazon S3 to KMS and can reduce KMS request 

3541 costs by up to 99 percent. Requires kms:Decrypt permissions for copy and upload 

3542 on the AWS KMS Key Policy. 

3543 

3544 Bucket Key Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html 

3545 

3546 :example: 

3547 

3548 .. code-block:: yaml 

3549 

3550 policies: 

3551 - name: s3-enable-default-encryption-kms 

3552 resource: s3 

3553 actions: 

3554 - type: set-bucket-encryption 

3555 # enabled: true <------ optional (true by default) 

3556 crypto: aws:kms 

3557 key: 1234abcd-12ab-34cd-56ef-1234567890ab 

3558 bucket-key: true 

3559 

3560 - name: s3-enable-default-encryption-kms-alias 

3561 resource: s3 

3562 actions: 

3563 - type: set-bucket-encryption 

3564 # enabled: true <------ optional (true by default) 

3565 crypto: aws:kms 

3566 key: alias/some/alias/key 

3567 bucket-key: true 

3568 

3569 - name: s3-enable-default-encryption-aes256 

3570 resource: s3 

3571 actions: 

3572 - type: set-bucket-encryption 

3573 # bucket-key: true <--- optional (true by default for AWS SSE) 

3574 # crypto: AES256 <----- optional (AES256 by default) 

3575 # enabled: true <------ optional (true by default) 

3576 

3577 - name: s3-disable-default-encryption 

3578 resource: s3 

3579 actions: 

3580 - type: set-bucket-encryption 

3581 enabled: false 

3582 """ 

3583 

3584 schema = { 

3585 'type': 'object', 

3586 'additionalProperties': False, 

3587 'properties': { 

3588 'type': {'enum': ['set-bucket-encryption']}, 

3589 'enabled': {'type': 'boolean'}, 

3590 'crypto': {'enum': ['aws:kms', 'AES256']}, 

3591 'key': {'type': 'string'}, 

3592 'bucket-key': {'type': 'boolean'} 

3593 }, 

3594 'dependencies': { 

3595 'key': { 

3596 'properties': { 

3597 'crypto': {'pattern': 'aws:kms'} 

3598 }, 

3599 'required': ['crypto'] 

3600 } 

3601 } 

3602 } 

3603 

3604 permissions = ('s3:PutEncryptionConfiguration', 's3:GetEncryptionConfiguration', 

3605 'kms:ListAliases', 'kms:DescribeKey') 

3606 

3607 def process(self, buckets): 

3608 if self.data.get('enabled', True): 

3609 self.resolve_keys(buckets) 

3610 

3611 with self.executor_factory(max_workers=3) as w: 

3612 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3613 for future in as_completed(futures): 

3614 if future.exception(): 

3615 self.log.error('Message: %s Bucket: %s', future.exception(), 

3616 futures[future]['Name']) 

3617 

3618 def process_bucket(self, bucket): 

3619 default_key_desc = 'Default master key that protects my S3 objects when no other key is defined' # noqa 

3620 s3 = bucket_client(local_session(self.manager.session_factory), bucket) 

3621 if not self.data.get('enabled', True): 

3622 s3.delete_bucket_encryption(Bucket=bucket['Name']) 

3623 return 

3624 algo = self.data.get('crypto', 'AES256') 

3625 

3626 # bucket key defaults to True for alias/aws/s3 and AES256 (Amazon SSE) 

3627 # and ignores False values for that crypto 

3628 bucket_key = self.data.get('bucket-key', True) 

3629 config = { 

3630 'Rules': [ 

3631 { 

3632 'ApplyServerSideEncryptionByDefault': { 

3633 'SSEAlgorithm': algo, 

3634 }, 

3635 'BucketKeyEnabled': bucket_key 

3636 } 

3637 ] 

3638 } 

3639 

3640 if algo == 'aws:kms': 

3641 key = self.get_key(bucket) 

3642 if not key: 

3643 raise Exception('Valid KMS Key required but does not exist') 

3644 

3645 config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = key['Arn'] 

3646 s3.put_bucket_encryption( 

3647 Bucket=bucket['Name'], 

3648 ServerSideEncryptionConfiguration=config 

3649 ) 

3650 

3651 
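# Editor's sketch (not part of the original module): the payload that
# process_bucket() above sends to put_bucket_encryption when crypto is aws:kms.
# The bucket name and key ARN are illustrative assumptions.
def _example_put_kms_default_encryption(
        s3_client, bucket_name='example-bucket',
        key_arn='arn:aws:kms:us-east-1:111122223333:key/example'):
    s3_client.put_bucket_encryption(
        Bucket=bucket_name,
        ServerSideEncryptionConfiguration={
            'Rules': [{
                'ApplyServerSideEncryptionByDefault': {
                    'SSEAlgorithm': 'aws:kms',
                    'KMSMasterKeyID': key_arn,
                },
                # Bucket Keys reduce the volume of KMS requests for SSE-KMS.
                'BucketKeyEnabled': True,
            }]
        })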

3652OWNERSHIP_CONTROLS = ['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter'] 

3653VALUE_FILTER_MAGIC_VALUES = ['absent', 'present', 'not-null', 'empty'] 

3654 

3655 

3656@filters.register('ownership') 

3657class BucketOwnershipControls(BucketFilterBase, ValueFilter): 

3658 """Filter for object ownership controls 

3659 

3660 Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html 

3661 

3662 :example: 

3663 

3664 Find buckets with ACLs disabled 

3665 

3666 .. code-block:: yaml 

3667 

3668 policies: 

3669 - name: s3-bucket-acls-disabled 

3670 resource: aws.s3 

3671 region: us-east-1 

3672 filters: 

3673 - type: ownership 

3674 value: BucketOwnerEnforced 

3675 

3676 :example: 

3677 

3678 Find buckets with object ownership preferred or enforced 

3679 

3680 .. code-block:: yaml 

3681 

3682 policies: 

3683 - name: s3-bucket-ownership-preferred 

3684 resource: aws.s3 

3685 region: us-east-1 

3686 filters: 

3687 - type: ownership 

3688 op: in 

3689 value: 

3690 - BucketOwnerEnforced 

3691 - BucketOwnerPreferred 

3692 

3693 :example: 

3694 

3695 Find buckets with no object ownership controls 

3696 

3697 .. code-block:: yaml 

3698 

3699 policies: 

3700 - name: s3-bucket-no-ownership-controls 

3701 resource: aws.s3 

3702 region: us-east-1 

3703 filters: 

3704 - type: ownership 

3705 value: empty 

3706 """ 

3707 schema = type_schema('ownership', rinherit=ValueFilter.schema, value={'oneOf': [ 

3708 {'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}, 

3709 {'type': 'array', 'items': { 

3710 'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}}]}) 

3711 permissions = ('s3:GetBucketOwnershipControls',) 

3712 annotation_key = 'c7n:ownership' 

3713 

3714 def __init__(self, data, manager=None): 

3715 super(BucketOwnershipControls, self).__init__(data, manager) 

3716 

3717 # Ownership controls appear as an array of rules. There can only be one 

3718 # ObjectOwnership rule defined for a bucket, so we can automatically 

3719 # match against that if it exists. 

3720 self.data['key'] = f'("{self.annotation_key}".Rules[].ObjectOwnership)[0]' 

3721 

3722 def process(self, buckets, event=None): 

3723 with self.executor_factory(max_workers=2) as w: 

3724 futures = {w.submit(self.process_bucket, b): b for b in buckets} 

3725 for future in as_completed(futures): 

3726 b = futures[future] 

3727 if future.exception(): 

3728 self.log.error("Message: %s Bucket: %s", future.exception(), 

3729 b['Name']) 

3730 continue 

3731 return super(BucketOwnershipControls, self).process(buckets, event) 

3732 

3733 def process_bucket(self, b): 

3734 if self.annotation_key in b: 

3735 return 

3736 client = bucket_client(local_session(self.manager.session_factory), b) 

3737 try: 

3738 controls = client.get_bucket_ownership_controls(Bucket=b['Name']) 

3739 controls.pop('ResponseMetadata', None) 

3740 except ClientError as e: 

3741 if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError': 

3742 raise 

3743 controls = {} 

3744 b[self.annotation_key] = controls.get('OwnershipControls') 

3745 

3746 
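# Editor's sketch (not part of the original module): the lookup the ownership
# filter above caches on each bucket. OwnershipControls carries a single rule
# whose ObjectOwnership is one of the OWNERSHIP_CONTROLS values defined above.
# ClientError is the module-level botocore import; the bucket name is an
# illustrative assumption.
def _example_object_ownership(s3_client, bucket_name='example-bucket'):
    try:
        controls = s3_client.get_bucket_ownership_controls(Bucket=bucket_name)
    except ClientError as e:
        if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError':
            raise
        return None  # ACL-era bucket with no ownership controls configured
    rules = controls.get('OwnershipControls', {}).get('Rules', [])
    return rules[0].get('ObjectOwnership') if rules else None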

3747@filters.register('bucket-replication') 

3748class BucketReplication(ListItemFilter): 

3749 """Filter for S3 buckets to look at bucket replication configurations 

3750 

3751 The schema to supply to ``attrs`` follows the response schema documented here: 

3752 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_replication.html 

3753 

3754 :example: 

3755 

3756 .. code-block:: yaml 

3757 

3758 policies: 

3759 - name: s3-bucket-replication 

3760 resource: s3 

3761 filters: 

3762 - type: bucket-replication 

3763 attrs: 

3764 - Status: Enabled 

3765 - Filter: 

3766 And: 

3767 Prefix: test 

3768 Tags: 

3769 - Key: Owner 

3770 Value: c7n 

3771 - ExistingObjectReplication: Enabled 

3772 

3773 """ 

3774 schema = type_schema( 

3775 'bucket-replication', 

3776 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'}, 

3777 count={'type': 'number'}, 

3778 count_op={'$ref': '#/definitions/filters_common/comparison_operators'} 

3779 ) 

3780 

3781 permissions = ("s3:GetReplicationConfiguration",) 

3782 annotation_key = 'Replication' 

3783 annotate_items = True 

3784 

3785 def __init__(self, data, manager=None): 

3786 super().__init__(data, manager) 

3787 self.data['key'] = self.annotation_key 

3788 

3789 def get_item_values(self, b): 

3790 client = bucket_client(local_session(self.manager.session_factory), b) 

3791 # replication configuration is called in S3_AUGMENT_TABLE: 

3792 bucket_replication = b[self.annotation_key] 

3793 

3794 rules = [] 

3795 if bucket_replication is not None: 

3796 rules = bucket_replication.get('ReplicationConfiguration', {}).get('Rules', []) 

3797 for replication in rules: 

3798 self.augment_bucket_replication(b, replication, client) 

3799 

3800 return rules 

3801 

3802 def augment_bucket_replication(self, b, replication, client): 

3803 destination_bucket = replication.get('Destination').get('Bucket').split(':')[5] 

3804 destination_region = inspect_bucket_region(destination_bucket, client.meta.endpoint_url) 

3805 source_region = get_region(b) 

3806 replication['DestinationRegion'] = destination_region 

3807 replication['CrossRegion'] = destination_region != source_region
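# Editor's sketch (not part of the original module): how
# augment_bucket_replication() above derives the destination bucket name from
# the rule's Destination ARN (arn:aws:s3:::bucket-name -> split(':')[5]) and
# flags cross-region rules. The rule and region arguments are illustrative
# assumptions; c7n resolves the destination region via inspect_bucket_region(),
# which is passed in here to keep the sketch self-contained.
def _example_annotate_replication_rule(rule, source_region='us-east-1',
                                       destination_region='eu-west-1'):
    destination_bucket = rule['Destination']['Bucket'].split(':')[5]
    rule['DestinationRegion'] = destination_region
    rule['CrossRegion'] = destination_region != source_region
    return destination_bucket

# e.g. _example_annotate_replication_rule(
#          {'Destination': {'Bucket': 'arn:aws:s3:::replica-bucket'}})
#      -> 'replica-bucket', with the rule annotated as CrossRegion: True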