1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""S3 Resource Manager
4
5Filters:
6
The generic Values filter (jmespath expression) and Or filter are
available for all resources, including buckets. Several additional
pieces of bucket data (Tags, Replication, Acl, Policy) are included
as keys within a bucket representation.
11
12Actions:
13
14 encrypt-keys
15
16 Scan all keys in a bucket and optionally encrypt them in place.
17
18 global-grants
19
20 Check bucket acls for global grants
21
22 encryption-policy
23
24 Attach an encryption required policy to a bucket, this will break
25 applications that are not using encryption, including aws log
26 delivery.
27
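For illustration only (the policy name and key are placeholders), a minimal
policy using the generic value filter against one of the augmented keys
might look like:

.. code-block:: yaml

    policies:
      - name: s3-buckets-without-replication
        resource: s3
        filters:
          - type: value
            key: Replication
            value: absent
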
28"""
29import copy
30import functools
31import json
32import itertools
33import logging
34import math
35import os
36import time
37import ssl
38
39from botocore.client import Config
40from botocore.exceptions import ClientError
41
42from collections import defaultdict
43from concurrent.futures import as_completed
44
45try:
46 from urllib3.exceptions import SSLError
47except ImportError:
48 from botocore.vendored.requests.packages.urllib3.exceptions import SSLError
49
50
51from c7n.actions import (
52 ActionRegistry, BaseAction, PutMetric, RemovePolicyBase)
53from c7n.exceptions import PolicyValidationError, PolicyExecutionError
54from c7n.filters import (
55 FilterRegistry, Filter, CrossAccountAccessFilter, MetricsFilter,
56 ValueFilter, ListItemFilter)
57from .aws import shape_validate
58from c7n.filters.policystatement import HasStatementFilter
59from c7n.manager import resources
60from c7n.output import NullBlobOutput
61from c7n import query
62from c7n.resources.securityhub import PostFinding
63from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction
64from c7n.utils import (
65 chunks, local_session, set_annotation, type_schema, filter_empty,
66 dumps, format_string_values, get_account_alias_from_sts)
67from c7n.resources.aws import inspect_bucket_region
68
69
70log = logging.getLogger('custodian.s3')
71
72filters = FilterRegistry('s3.filters')
73actions = ActionRegistry('s3.actions')
74filters.register('marked-for-op', TagActionFilter)
75actions.register('put-metric', PutMetric)
76
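# 2 GiB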
77MAX_COPY_SIZE = 1024 * 1024 * 1024 * 2
78
79
80class DescribeS3(query.DescribeSource):
81
82 def augment(self, buckets):
83 with self.manager.executor_factory(
84 max_workers=min((10, len(buckets) + 1))) as w:
85 results = w.map(
86 assemble_bucket,
87 zip(itertools.repeat(self.manager.session_factory), buckets))
88 results = list(filter(None, results))
89 return results
90
91
92class ConfigS3(query.ConfigSource):
93
    # normalize config's janky idiosyncratic bespoke formatting to the
    # standard describe api responses.
96
97 def get_query_params(self, query):
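        # make sure awsRegion is selected in config queries so load_resource
        # can derive the bucket's Location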
98 q = super(ConfigS3, self).get_query_params(query)
99 if 'expr' in q:
100 q['expr'] = q['expr'].replace('select ', 'select awsRegion, ')
101 return q
102
103 def load_resource(self, item):
104 resource = super(ConfigS3, self).load_resource(item)
105 cfg = item['supplementaryConfiguration']
106 # aka standard
107 if 'awsRegion' in item and item['awsRegion'] != 'us-east-1':
108 resource['Location'] = {'LocationConstraint': item['awsRegion']}
109 else:
110 resource['Location'] = {}
111
112 # owner is under acl per describe
113 resource.pop('Owner', None)
114
115 for k, null_value in S3_CONFIG_SUPPLEMENT_NULL_MAP.items():
116 if k not in cfg:
117 continue
118 if cfg.get(k) == null_value:
119 continue
            method = getattr(self, "handle_%s" % k, None)
            if method is None:
                raise ValueError("unhandled supplementary config %s" % k)
124 v = cfg[k]
125 if isinstance(cfg[k], str):
126 v = json.loads(cfg[k])
127 method(resource, v)
128
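        # backfill any augment keys config did not supply so the resource shape matches describe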
129 for el in S3_AUGMENT_TABLE:
130 if el[1] not in resource:
131 resource[el[1]] = el[2]
132 return resource
133
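    # map config's ACL permission and grantee names onto the forms used by the
    # describe API (see handle_AccessControlList below)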
134 PERMISSION_MAP = {
135 'FullControl': 'FULL_CONTROL',
136 'Write': 'WRITE',
137 'WriteAcp': 'WRITE_ACP',
138 'Read': 'READ',
139 'ReadAcp': 'READ_ACP'}
140
141 GRANTEE_MAP = {
142 'AllUsers': "http://acs.amazonaws.com/groups/global/AllUsers",
143 'AuthenticatedUsers': "http://acs.amazonaws.com/groups/global/AuthenticatedUsers",
144 'LogDelivery': 'http://acs.amazonaws.com/groups/s3/LogDelivery'}
145
146 def handle_AccessControlList(self, resource, item_value):
147 # double serialized in config for some reason
148 if isinstance(item_value, str):
149 item_value = json.loads(item_value)
150
151 resource['Acl'] = {}
152 resource['Acl']['Owner'] = {'ID': item_value['owner']['id']}
153 if item_value['owner']['displayName']:
154 resource['Acl']['Owner']['DisplayName'] = item_value[
155 'owner']['displayName']
156 resource['Acl']['Grants'] = grants = []
157
158 for g in (item_value.get('grantList') or ()):
159 if 'id' not in g['grantee']:
160 assert g['grantee'] in self.GRANTEE_MAP, "unknown grantee %s" % g
161 rg = {'Type': 'Group', 'URI': self.GRANTEE_MAP[g['grantee']]}
162 else:
163 rg = {'ID': g['grantee']['id'], 'Type': 'CanonicalUser'}
164
165 if 'displayName' in g:
166 rg['DisplayName'] = g['displayName']
167
168 grants.append({
169 'Permission': self.PERMISSION_MAP[g['permission']],
170 'Grantee': rg,
171 })
172
173 def handle_BucketAccelerateConfiguration(self, resource, item_value):
174 # not currently auto-augmented by custodian
175 return
176
177 def handle_BucketLoggingConfiguration(self, resource, item_value):
178 if ('destinationBucketName' not in item_value or
179 item_value['destinationBucketName'] is None):
180 resource[u'Logging'] = {}
181 return
182 resource[u'Logging'] = {
183 'TargetBucket': item_value['destinationBucketName'],
184 'TargetPrefix': item_value['logFilePrefix']}
185
186 def handle_BucketLifecycleConfiguration(self, resource, item_value):
187 rules = []
188 for r in item_value.get('rules'):
189 rr = {}
190 rules.append(rr)
191 expiry = {}
192 for ek, ck in (
193 ('Date', 'expirationDate'),
194 ('ExpiredObjectDeleteMarker', 'expiredObjectDeleteMarker'),
195 ('Days', 'expirationInDays')):
196 if ck in r and r[ck] and r[ck] != -1:
197 expiry[ek] = r[ck]
198 if expiry:
199 rr['Expiration'] = expiry
200
201 transitions = []
202 for t in (r.get('transitions') or ()):
203 tr = {}
204 for k in ('date', 'days', 'storageClass'):
205 if t.get(k):
206 tr["%s%s" % (k[0].upper(), k[1:])] = t[k]
207 transitions.append(tr)
208 if transitions:
209 rr['Transitions'] = transitions
210
211 if r.get('abortIncompleteMultipartUpload'):
212 rr['AbortIncompleteMultipartUpload'] = {
213 'DaysAfterInitiation': r[
214 'abortIncompleteMultipartUpload']['daysAfterInitiation']}
215 if r.get('noncurrentVersionExpirationInDays'):
216 rr['NoncurrentVersionExpiration'] = {
217 'NoncurrentDays': r['noncurrentVersionExpirationInDays']}
218
219 nonc_transitions = []
220 for t in (r.get('noncurrentVersionTransitions') or ()):
221 nonc_transitions.append({
222 'NoncurrentDays': t['days'],
223 'StorageClass': t['storageClass']})
224 if nonc_transitions:
225 rr['NoncurrentVersionTransitions'] = nonc_transitions
226
227 rr['Status'] = r['status']
228 rr['ID'] = r['id']
229 if r.get('prefix'):
230 rr['Prefix'] = r['prefix']
231 if 'filter' not in r or not r['filter']:
232 continue
233
234 if r['filter']['predicate']:
235 rr['Filter'] = self.convertLifePredicate(r['filter']['predicate'])
236
237 resource['Lifecycle'] = {'Rules': rules}
238
239 def convertLifePredicate(self, p):
240 if p['type'] == 'LifecyclePrefixPredicate':
241 return {'Prefix': p['prefix']}
242 if p['type'] == 'LifecycleTagPredicate':
243 return {'Tags': [{'Key': p['tag']['key'], 'Value': p['tag']['value']}]}
244 if p['type'] == 'LifecycleAndOperator':
245 n = {}
246 for o in p['operands']:
247 ot = self.convertLifePredicate(o)
248 if 'Tags' in n and 'Tags' in ot:
249 n['Tags'].extend(ot['Tags'])
250 else:
251 n.update(ot)
252 return {'And': n}
253
254 raise ValueError("unknown predicate: %s" % p)
255
256 NotifyTypeMap = {
257 'QueueConfiguration': 'QueueConfigurations',
258 'LambdaConfiguration': 'LambdaFunctionConfigurations',
259 'CloudFunctionConfiguration': 'LambdaFunctionConfigurations',
260 'TopicConfiguration': 'TopicConfigurations'}
261
262 def handle_BucketNotificationConfiguration(self, resource, item_value):
263 d = {}
264 for nid, n in item_value['configurations'].items():
265 ninfo = {}
266 d.setdefault(self.NotifyTypeMap[n['type']], []).append(ninfo)
267 if n['type'] == 'QueueConfiguration':
268 ninfo['QueueArn'] = n['queueARN']
269 elif n['type'] == 'TopicConfiguration':
270 ninfo['TopicArn'] = n['topicARN']
271 elif n['type'] == 'LambdaConfiguration':
272 ninfo['LambdaFunctionArn'] = n['functionARN']
273 ninfo['Id'] = nid
274 ninfo['Events'] = n['events']
275 rules = []
276 if n['filter']:
277 for r in n['filter'].get('s3KeyFilter', {}).get('filterRules', []):
278 rules.append({'Name': r['name'], 'Value': r['value']})
279 if rules:
280 ninfo['Filter'] = {'Key': {'FilterRules': rules}}
281 resource['Notification'] = d
282
283 def handle_BucketReplicationConfiguration(self, resource, item_value):
284 d = {'Role': item_value['roleARN'], 'Rules': []}
285 for rid, r in item_value['rules'].items():
286 rule = {
287 'ID': rid,
288 'Status': r.get('status', ''),
289 'Prefix': r.get('prefix', ''),
290 'Destination': {
291 'Bucket': r['destinationConfig']['bucketARN']}
292 }
293 if 'Account' in r['destinationConfig']:
294 rule['Destination']['Account'] = r['destinationConfig']['Account']
295 if r['destinationConfig'].get('storageClass'):
296 rule['Destination']['StorageClass'] = r['destinationConfig']['storageClass']
297 d['Rules'].append(rule)
298 resource['Replication'] = {'ReplicationConfiguration': d}
299
300 def handle_BucketPolicy(self, resource, item_value):
301 resource['Policy'] = item_value.get('policyText')
302
303 def handle_BucketTaggingConfiguration(self, resource, item_value):
304 resource['Tags'] = [
305 {"Key": k, "Value": v} for k, v in item_value['tagSets'][0]['tags'].items()]
306
307 def handle_BucketVersioningConfiguration(self, resource, item_value):
308 # Config defaults versioning to 'Off' for a null value
309 if item_value['status'] not in ('Enabled', 'Suspended'):
310 resource['Versioning'] = {}
311 return
312 resource['Versioning'] = {'Status': item_value['status']}
313 # `isMfaDeleteEnabled` is an optional boolean property - the key may be absent,
314 # present with a null value, or present with a boolean value.
315 # Mirror the describe source by populating Versioning.MFADelete only in the
316 # boolean case.
317 mfa_delete = item_value.get('isMfaDeleteEnabled')
318 if mfa_delete is None:
319 return
320 resource['Versioning']['MFADelete'] = (
321 'Enabled' if mfa_delete else 'Disabled'
322 )
323
324 def handle_BucketWebsiteConfiguration(self, resource, item_value):
325 website = {}
326 if item_value['indexDocumentSuffix']:
327 website['IndexDocument'] = {
328 'Suffix': item_value['indexDocumentSuffix']}
329 if item_value['errorDocument']:
330 website['ErrorDocument'] = {
331 'Key': item_value['errorDocument']}
332 if item_value['redirectAllRequestsTo']:
333 website['RedirectAllRequestsTo'] = {
334 'HostName': item_value['redirectAllRequestsTo']['hostName'],
335 'Protocol': item_value['redirectAllRequestsTo']['protocol']}
336 for r in item_value['routingRules']:
337 redirect = {}
338 rule = {'Redirect': redirect}
339 website.setdefault('RoutingRules', []).append(rule)
340 if 'condition' in r:
341 cond = {}
342 for ck, rk in (
343 ('keyPrefixEquals', 'KeyPrefixEquals'),
344 ('httpErrorCodeReturnedEquals',
345 'HttpErrorCodeReturnedEquals')):
346 if r['condition'][ck]:
347 cond[rk] = r['condition'][ck]
348 rule['Condition'] = cond
349 for ck, rk in (
350 ('protocol', 'Protocol'),
351 ('hostName', 'HostName'),
352 ('replaceKeyPrefixWith', 'ReplaceKeyPrefixWith'),
353 ('replaceKeyWith', 'ReplaceKeyWith'),
354 ('httpRedirectCode', 'HttpRedirectCode')):
355 if r['redirect'][ck]:
356 redirect[rk] = r['redirect'][ck]
357 resource['Website'] = website
358
359
360@resources.register('s3')
361class S3(query.QueryResourceManager):
362
363 class resource_type(query.TypeInfo):
364 service = 's3'
365 arn_type = ''
366 enum_spec = ('list_buckets', 'Buckets[]', None)
367 # not used but we want some consistency on the metadata
368 detail_spec = ('get_bucket_location', 'Bucket', 'Name', 'LocationConstraint')
369 permissions_augment = (
370 "s3:GetBucketAcl",
371 "s3:GetBucketLocation",
372 "s3:GetBucketPolicy",
373 "s3:GetBucketTagging",
374 "s3:GetBucketVersioning",
375 "s3:GetBucketLogging",
376 "s3:GetBucketNotification",
377 "s3:GetBucketWebsite",
378 "s3:GetLifecycleConfiguration",
379 "s3:GetReplicationConfiguration"
380 )
381 name = id = 'Name'
382 date = 'CreationDate'
383 dimension = 'BucketName'
384 cfn_type = config_type = 'AWS::S3::Bucket'
385
386 filter_registry = filters
387 action_registry = actions
388 source_mapping = {
389 'describe': DescribeS3,
390 'config': ConfigS3
391 }
392
393 def get_arns(self, resources):
394 return ["arn:aws:s3:::{}".format(r["Name"]) for r in resources]
395
396 @classmethod
397 def get_permissions(cls):
398 perms = ["s3:ListAllMyBuckets"]
399 perms.extend([n[-1] for n in S3_AUGMENT_TABLE])
400 return perms
401
402
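# The value AWS Config reports for a supplementary configuration key when the
# corresponding bucket feature is unset; ConfigS3.load_resource skips entries
# matching these so the handlers only see real configuration.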
403S3_CONFIG_SUPPLEMENT_NULL_MAP = {
404 'BucketLoggingConfiguration': u'{"destinationBucketName":null,"logFilePrefix":null}',
405 'BucketPolicy': u'{"policyText":null}',
406 'BucketVersioningConfiguration': u'{"status":"Off","isMfaDeleteEnabled":null}',
407 'BucketAccelerateConfiguration': u'{"status":null}',
408 'BucketNotificationConfiguration': u'{"configurations":{}}',
409 'BucketLifecycleConfiguration': None,
410 'AccessControlList': None,
411 'BucketTaggingConfiguration': None,
412 'BucketWebsiteConfiguration': None,
413 'BucketReplicationConfiguration': None
414}
415
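# Each entry is (client method, annotation key, default value, response selector,
# IAM permission); see assemble_bucket and S3.get_permissions for how they are used.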
416S3_AUGMENT_TABLE = (
417 ('get_bucket_location', 'Location', {}, None, 's3:GetBucketLocation'),
418 ('get_bucket_tagging', 'Tags', [], 'TagSet', 's3:GetBucketTagging'),
419 ('get_bucket_policy', 'Policy', None, 'Policy', 's3:GetBucketPolicy'),
420 ('get_bucket_acl', 'Acl', None, None, 's3:GetBucketAcl'),
421 ('get_bucket_replication',
422 'Replication', None, None, 's3:GetReplicationConfiguration'),
423 ('get_bucket_versioning', 'Versioning', None, None, 's3:GetBucketVersioning'),
424 ('get_bucket_website', 'Website', None, None, 's3:GetBucketWebsite'),
425 ('get_bucket_logging', 'Logging', None, 'LoggingEnabled', 's3:GetBucketLogging'),
426 ('get_bucket_notification_configuration',
427 'Notification', None, None, 's3:GetBucketNotification'),
428 ('get_bucket_lifecycle_configuration',
429 'Lifecycle', None, None, 's3:GetLifecycleConfiguration'),
430 # ('get_bucket_cors', 'Cors'),
431)
432
433
434def assemble_bucket(item):
435 """Assemble a document representing all the config state around a bucket.
436
437 TODO: Refactor this, the logic here feels quite muddled.
438 """
439 factory, b = item
440 s = factory()
441 c = s.client('s3')
442 # Bucket Location, Current Client Location, Default Location
443 b_location = c_location = location = "us-east-1"
444 methods = list(S3_AUGMENT_TABLE)
445 for minfo in methods:
446 m, k, default, select = minfo[:4]
447 try:
448 method = getattr(c, m)
449 v = method(Bucket=b['Name'])
450 v.pop('ResponseMetadata')
451 if select is not None and select in v:
452 v = v[select]
453 except (ssl.SSLError, SSLError) as e:
            # Proxy issues? I assume
455 log.warning("Bucket ssl error %s: %s %s",
456 b['Name'], b.get('Location', 'unknown'),
457 e)
458 continue
459 except ClientError as e:
460 code = e.response['Error']['Code']
461 if code.startswith("NoSuch") or "NotFound" in code:
462 v = default
463 elif code == 'PermanentRedirect':
464 s = factory()
465 c = bucket_client(s, b)
466 # Requeue with the correct region given location constraint
467 methods.append((m, k, default, select))
468 continue
469 else:
470 log.warning(
471 "Bucket:%s unable to invoke method:%s error:%s ",
472 b['Name'], m, e.response['Error']['Message'])
                # For auth failures, we don't bail out; we continue processing if we can.
                # Note this can lead to missing data, but in general that is cleaner than
                # failing hard, due to the common use of locked-down s3 bucket policies
                # that may cause issues fetching information across a fleet of buckets.

                # This does mean s3 policies depending on augments should check the denied
                # methods annotation; generally though, lacking get access to an augment means
                # they won't have write access either.
481
482 # For other error types we raise and bail policy execution.
483 if e.response['Error']['Code'] == 'AccessDenied':
484 b.setdefault('c7n:DeniedMethods', []).append(m)
485 continue
486 raise
487 # As soon as we learn location (which generally works)
488 if k == 'Location' and v is not None:
489 b_location = v.get('LocationConstraint')
490 # Location == region for all cases but EU
491 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html
492 if b_location is None:
493 b_location = "us-east-1"
494 elif b_location == 'EU':
495 b_location = "eu-west-1"
496 v['LocationConstraint'] = 'eu-west-1'
497 if v and v != c_location:
498 c = s.client('s3', region_name=b_location)
499 elif c_location != location:
500 c = s.client('s3', region_name=location)
501 b[k] = v
502 return b
503
504
505def bucket_client(session, b, kms=False):
506 region = get_region(b)
507
508 if kms:
509 # Need v4 signature for aws:kms crypto, else let the sdk decide
510 # based on region support.
511 config = Config(
512 signature_version='s3v4',
513 read_timeout=200, connect_timeout=120)
514 else:
515 config = Config(read_timeout=200, connect_timeout=120)
516 return session.client('s3', region_name=region, config=config)
517
518
519def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()):
520 for bucket in buckets:
521 client = bucket_client(local_session(session_factory), bucket)
        # Bucket tags are set atomically as a whole set/document, so we want
        # to refetch against the current state to guard against any staleness in
        # our cached representation across multiple policies or concurrent
        # modifications.
526
527 if 'get_bucket_tagging' in bucket.get('c7n:DeniedMethods', []):
528 # avoid the additional API call if we already know that it's going
529 # to result in AccessDenied. The chances that the resource's perms
530 # would have changed between fetching the resource and acting on it
531 # here are pretty low-- so the check here should suffice.
532 log.warning(
533 "Unable to get new set of bucket tags needed to modify tags,"
534 "skipping tag action for bucket: %s" % bucket["Name"])
535 continue
536
537 try:
538 bucket['Tags'] = client.get_bucket_tagging(
539 Bucket=bucket['Name']).get('TagSet', [])
540 except ClientError as e:
541 if e.response['Error']['Code'] != 'NoSuchTagSet':
542 raise
543 bucket['Tags'] = []
544
545 new_tags = {t['Key']: t['Value'] for t in add_tags}
546 for t in bucket.get('Tags', ()):
547 if (t['Key'] not in new_tags and t['Key'] not in remove_tags):
548 new_tags[t['Key']] = t['Value']
549 tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()]
550
551 try:
552 client.put_bucket_tagging(
553 Bucket=bucket['Name'], Tagging={'TagSet': tag_set})
554 except ClientError as e:
555 log.exception(
556 'Exception tagging bucket %s: %s', bucket['Name'], e)
557 continue
558
559
560def get_region(b):
561 """Tries to get the bucket region from Location.LocationConstraint
562
563 Special cases:
564 LocationConstraint EU defaults to eu-west-1
565 LocationConstraint null defaults to us-east-1
566
567 Args:
568 b (object): A bucket object
569
570 Returns:
571 string: an aws region string
572 """
573 remap = {None: 'us-east-1', 'EU': 'eu-west-1'}
574 region = b.get('Location', {}).get('LocationConstraint')
575 return remap.get(region, region)
576
577
578@filters.register('metrics')
579class S3Metrics(MetricsFilter):
580 """S3 CW Metrics need special handling for attribute/dimension
581 mismatch, and additional required dimension.
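
    For illustration only (the threshold values are placeholders), a policy
    matching buckets with a low object count over the last week might look like:

    .. code-block:: yaml

        policies:
          - name: s3-low-object-count
            resource: s3
            filters:
              - type: metrics
                name: NumberOfObjects
                days: 7
                value: 100
                op: less-than
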
582 """
583
584 def get_dimensions(self, resource):
585 dims = [{'Name': 'BucketName', 'Value': resource['Name']}]
586 if (self.data['name'] == 'NumberOfObjects' and
587 'dimensions' not in self.data):
588 dims.append(
589 {'Name': 'StorageType', 'Value': 'AllStorageTypes'})
590 return dims
591
592
593@filters.register('cross-account')
594class S3CrossAccountFilter(CrossAccountAccessFilter):
595 """Filters cross-account access to S3 buckets
596
597 :example:
598
599 .. code-block:: yaml
600
601 policies:
602 - name: s3-acl
603 resource: s3
604 region: us-east-1
605 filters:
606 - type: cross-account
607 """
608 permissions = ('s3:GetBucketPolicy',)
609
610 def get_accounts(self):
611 """add in elb access by default
612
613 ELB Accounts by region
614 https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html
615
616 Redshift Accounts by region
617 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#db-auditing-manage-log-files
618 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#rs-db-auditing-cloud-trail-rs-acct-ids
619
620 Cloudtrail Accounts by region
621 https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-supported-regions.html
622 """
623 accounts = super(S3CrossAccountFilter, self).get_accounts()
624 return accounts.union(
625 [
626 # ELB accounts
627 '127311923021', # us-east-1
628 '033677994240', # us-east-2
629 '027434742980', # us-west-1
630 '797873946194', # us-west-2
631 '098369216593', # af-south-1
632 '985666609251', # ca-central-1
633 '054676820928', # eu-central-1
634 '897822967062', # eu-north-1
635 '635631232127', # eu-south-1
636 '156460612806', # eu-west-1
637 '652711504416', # eu-west-2
638 '009996457667', # eu-west-3
639 '754344448648', # ap-east-1
640 '582318560864', # ap-northeast-1
641 '600734575887', # ap-northeast-2
642 '383597477331', # ap-northeast-3
643 '114774131450', # ap-southeast-1
644 '783225319266', # ap-southeast-2
645 '718504428378', # ap-south-1
646 '076674570225', # me-south-1
647 '507241528517', # sa-east-1
648 '048591011584', # us-gov-west-1 or gov-cloud-1
649 '190560391635', # us-gov-east-1
650 '638102146993', # cn-north-1
651 '037604701340', # cn-northwest-1
652
653 # Redshift audit logging
654 '193672423079', # us-east-1
655 '391106570357', # us-east-2
656 '262260360010', # us-west-1
657 '902366379725', # us-west-2
658 '365689465814', # af-south-1
659 '313564881002', # ap-east-1
660 '865932855811', # ap-south-1
661 '090321488786', # ap-northeast-3
662 '760740231472', # ap-northeast-2
663 '361669875840', # ap-southeast-1
664 '762762565011', # ap-southeast-2
665 '404641285394', # ap-northeast-1
666 '907379612154', # ca-central-1
667 '053454850223', # eu-central-1
668 '210876761215', # eu-west-1
669 '307160386991', # eu-west-2
670 '945612479654', # eu-south-1
671 '915173422425', # eu-west-3
672 '729911121831', # eu-north-1
673 '013126148197', # me-south-1
674 '075028567923', # sa-east-1
675
676 # Cloudtrail accounts (psa. folks should be using
677 # cloudtrail service in bucket policies)
678 '086441151436', # us-east-1
                '475085895292',  # us-east-2
680 '388731089494', # us-west-1
681 '113285607260', # us-west-2
682 '819402241893', # ca-central-1
683 '977081816279', # ap-south-1
684 '492519147666', # ap-northeast-2
685 '903692715234', # ap-southeast-1
686 '284668455005', # ap-southeast-2
687 '216624486486', # ap-northeast-1
688 '035351147821', # eu-central-1
689 '859597730677', # eu-west-1
690 '282025262664', # eu-west-2
691 '814480443879', # sa-east-1
692 ])
693
694
695@filters.register('global-grants')
696class GlobalGrantsFilter(Filter):
697 """Filters for all S3 buckets that have global-grants
698
699 *Note* by default this filter allows for read access
700 if the bucket has been configured as a website. This
701 can be disabled per the example below.
702
703 :example:
704
705 .. code-block:: yaml
706
707 policies:
708 - name: remove-global-grants
709 resource: s3
710 filters:
711 - type: global-grants
712 allow_website: false
713 actions:
714 - delete-global-grants
715
716 """
717
718 schema = type_schema(
719 'global-grants',
720 allow_website={'type': 'boolean'},
721 operator={'type': 'string', 'enum': ['or', 'and']},
722 permissions={
723 'type': 'array', 'items': {
724 'type': 'string', 'enum': [
725 'READ', 'WRITE', 'WRITE_ACP', 'READ_ACP', 'FULL_CONTROL']}})
726
727 GLOBAL_ALL = "http://acs.amazonaws.com/groups/global/AllUsers"
728 AUTH_ALL = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
729
730 def process(self, buckets, event=None):
731 with self.executor_factory(max_workers=5) as w:
732 results = w.map(self.process_bucket, buckets)
733 results = list(filter(None, list(results)))
734 return results
735
736 def process_bucket(self, b):
737 acl = b.get('Acl', {'Grants': []})
738 if not acl or not acl['Grants']:
739 return
740
741 results = []
742 allow_website = self.data.get('allow_website', True)
743 perms = self.data.get('permissions', [])
744
745 for grant in acl['Grants']:
746 if 'URI' not in grant.get("Grantee", {}):
747 continue
748 if grant['Grantee']['URI'] not in [self.AUTH_ALL, self.GLOBAL_ALL]:
749 continue
750 if allow_website and grant['Permission'] == 'READ' and b['Website']:
751 continue
752 if not perms or (perms and grant['Permission'] in perms):
753 results.append(grant['Permission'])
754
755 if results:
756 set_annotation(b, 'GlobalPermissions', results)
757 return b
758
759
760class BucketActionBase(BaseAction):
761
762 def get_permissions(self):
763 return self.permissions
764
765 def get_std_format_args(self, bucket):
766 return {
767 'account_id': self.manager.config.account_id,
768 'region': self.manager.config.region,
769 'bucket_name': bucket['Name'],
770 'bucket_region': get_region(bucket)
771 }
772
773 def process(self, buckets):
774 return self._process_with_futures(buckets)
775
776 def _process_with_futures(self, buckets, *args, max_workers=3, **kwargs):
777 errors = 0
778 results = []
779 with self.executor_factory(max_workers=max_workers) as w:
780 futures = {}
781 for b in buckets:
782 futures[w.submit(self.process_bucket, b, *args, **kwargs)] = b
783 for f in as_completed(futures):
784 if f.exception():
785 b = futures[f]
786 self.log.error(
787 'error modifying bucket: policy:%s action:%s bucket:%s error:%s',
788 self.manager.data.get('name'), self.name, b['Name'], f.exception()
789 )
790 errors += 1
791 continue
792 results += filter(None, [f.result()])
793 if errors:
794 self.log.error('encountered %d errors while processing %s', errors, self.name)
            raise PolicyExecutionError('%d resources failed' % errors)
796 return results
797
798
799class BucketFilterBase(Filter):
800 def get_std_format_args(self, bucket):
801 return {
802 'account_id': self.manager.config.account_id,
803 'region': self.manager.config.region,
804 'bucket_name': bucket['Name'],
805 'bucket_region': get_region(bucket)
806 }
807
808
809@S3.action_registry.register("post-finding")
810class BucketFinding(PostFinding):
811
812 resource_type = 'AwsS3Bucket'
813
814 def format_resource(self, r):
815 owner = r.get("Acl", {}).get("Owner", {})
816 resource = {
817 "Type": self.resource_type,
818 "Id": "arn:aws:s3:::{}".format(r["Name"]),
819 "Region": get_region(r),
820 "Tags": {t["Key"]: t["Value"] for t in r.get("Tags", [])},
821 "Details": {self.resource_type: {
822 "OwnerId": owner.get('ID', 'Unknown')}}
823 }
824
825 if "DisplayName" in owner:
826 resource["Details"]["AwsS3Bucket"]["OwnerName"] = owner['DisplayName']
827
828 return filter_empty(resource)
829
830
831@S3.filter_registry.register('has-statement')
832class S3HasStatementFilter(HasStatementFilter):
833 def get_std_format_args(self, bucket):
834 return {
835 'account_id': self.manager.config.account_id,
836 'region': self.manager.config.region,
837 'bucket_name': bucket['Name'],
838 'bucket_region': get_region(bucket)
839 }
840
841
842@S3.filter_registry.register('lock-configuration')
843class S3LockConfigurationFilter(ValueFilter):
844 """
845 Filter S3 buckets based on their object lock configurations
846
847 :example:
848
849 Get all buckets where lock configuration mode is COMPLIANCE
850
851 .. code-block:: yaml
852
853 policies:
854 - name: lock-configuration-compliance
855 resource: aws.s3
856 filters:
857 - type: lock-configuration
858 key: Rule.DefaultRetention.Mode
859 value: COMPLIANCE
860
861 """
862 schema = type_schema('lock-configuration', rinherit=ValueFilter.schema)
863 permissions = ('s3:GetBucketObjectLockConfiguration',)
864 annotate = True
865 annotation_key = 'c7n:ObjectLockConfiguration'
866
867 def _process_resource(self, client, resource):
868 try:
869 config = client.get_object_lock_configuration(
870 Bucket=resource['Name']
871 )['ObjectLockConfiguration']
872 except ClientError as e:
873 if e.response['Error']['Code'] == 'ObjectLockConfigurationNotFoundError':
874 config = None
875 else:
876 raise
877 resource[self.annotation_key] = config
878
879 def process(self, resources, event=None):
880 client = local_session(self.manager.session_factory).client('s3')
881 with self.executor_factory(max_workers=3) as w:
882 futures = []
883 for res in resources:
884 if self.annotation_key in res:
885 continue
886 futures.append(w.submit(self._process_resource, client, res))
887 for f in as_completed(futures):
888 exc = f.exception()
889 if exc:
890 self.log.error(
891 "Exception getting bucket lock configuration \n %s" % (
892 exc))
893 return super().process(resources, event)
894
895 def __call__(self, r):
896 return super().__call__(r.setdefault(self.annotation_key, None))
897
898
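# Template statement the no-encryption-statement filter below compares against;
# each candidate statement's own Sid and Resource are copied in before comparison.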
899ENCRYPTION_STATEMENT_GLOB = {
900 'Effect': 'Deny',
901 'Principal': '*',
902 'Action': 's3:PutObject',
903 "Condition": {
904 "StringNotEquals": {
905 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}
906
907
908@filters.register('no-encryption-statement')
909class EncryptionEnabledFilter(Filter):
910 """Find buckets with missing encryption policy statements.
911
912 :example:
913
914 .. code-block:: yaml
915
916 policies:
917 - name: s3-bucket-not-encrypted
918 resource: s3
919 filters:
920 - type: no-encryption-statement
921 """
922 schema = type_schema(
923 'no-encryption-statement')
924
925 def get_permissions(self):
926 perms = self.manager.get_resource_manager('s3').get_permissions()
927 return perms
928
929 def process(self, buckets, event=None):
930 return list(filter(None, map(self.process_bucket, buckets)))
931
932 def process_bucket(self, b):
933 p = b.get('Policy')
934 if p is None:
935 return b
936 p = json.loads(p)
937 encryption_statement = dict(ENCRYPTION_STATEMENT_GLOB)
938
939 statements = p.get('Statement', [])
940 check = False
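        # adopt each candidate statement's Sid/Resource so only the remaining fields need to match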
941 for s in list(statements):
942 if 'Sid' in s:
943 encryption_statement["Sid"] = s["Sid"]
944 if 'Resource' in s:
945 encryption_statement["Resource"] = s["Resource"]
946 if s == encryption_statement:
947 check = True
948 break
949 if check:
950 return None
951 else:
952 return b
953
954
955@filters.register('missing-statement')
956@filters.register('missing-policy-statement')
957class MissingPolicyStatementFilter(Filter):
958 """Find buckets missing a set of named policy statements.
959
960 :example:
961
962 .. code-block:: yaml
963
964 policies:
965 - name: s3-bucket-missing-statement
966 resource: s3
967 filters:
968 - type: missing-statement
969 statement_ids:
970 - RequiredEncryptedPutObject
971 """
972
973 schema = type_schema(
974 'missing-policy-statement',
975 aliases=('missing-statement',),
976 statement_ids={'type': 'array', 'items': {'type': 'string'}})
977
978 def __call__(self, b):
979 p = b.get('Policy')
980 if p is None:
981 return b
982
983 p = json.loads(p)
984
985 required = list(self.data.get('statement_ids', []))
986 statements = p.get('Statement', [])
987 for s in list(statements):
988 if s.get('Sid') in required:
989 required.remove(s['Sid'])
990 if not required:
991 return False
992 return True
993
994
995@filters.register('bucket-notification')
996class BucketNotificationFilter(ValueFilter):
997 """Filter based on bucket notification configuration.
998
999 :example:
1000
1001 .. code-block:: yaml
1002
1003 policies:
1004 - name: delete-incorrect-notification
1005 resource: s3
1006 filters:
1007 - type: bucket-notification
1008 kind: lambda
1009 key: Id
1010 value: "IncorrectLambda"
1011 op: eq
1012 actions:
1013 - type: delete-bucket-notification
1014 statement_ids: matched
1015 """
1016
1017 schema = type_schema(
1018 'bucket-notification',
1019 required=['kind'],
1020 kind={'type': 'string', 'enum': ['lambda', 'sns', 'sqs']},
1021 rinherit=ValueFilter.schema)
1022 schema_alias = False
1023 annotation_key = 'c7n:MatchedNotificationConfigurationIds'
1024
1025 permissions = ('s3:GetBucketNotification',)
1026
1027 FIELDS = {
1028 'lambda': 'LambdaFunctionConfigurations',
1029 'sns': 'TopicConfigurations',
1030 'sqs': 'QueueConfigurations'
1031 }
1032
1033 def process(self, buckets, event=None):
1034 return super(BucketNotificationFilter, self).process(buckets, event)
1035
1036 def __call__(self, bucket):
1037
1038 field = self.FIELDS[self.data['kind']]
1039 found = False
1040 for config in bucket.get('Notification', {}).get(field, []):
1041 if self.match(config):
1042 set_annotation(
1043 bucket,
1044 BucketNotificationFilter.annotation_key,
1045 config['Id'])
1046 found = True
1047 return found
1048
1049
1050@filters.register('bucket-logging')
1051class BucketLoggingFilter(BucketFilterBase):
1052 """Filter based on bucket logging configuration.
1053
1054 :example:
1055
1056 .. code-block:: yaml
1057
1058 policies:
1059 - name: add-bucket-logging-if-missing
1060 resource: s3
1061 filters:
1062 - type: bucket-logging
1063 op: disabled
1064 actions:
1065 - type: toggle-logging
1066 target_bucket: "{account_id}-{region}-s3-logs"
1067 target_prefix: "{source_bucket_name}/"
1068
1069 policies:
1070 - name: update-incorrect-or-missing-logging
1071 resource: s3
1072 filters:
1073 - type: bucket-logging
1074 op: not-equal
1075 target_bucket: "{account_id}-{region}-s3-logs"
1076 target_prefix: "{account}/{source_bucket_name}/"
1077 actions:
1078 - type: toggle-logging
1079 target_bucket: "{account_id}-{region}-s3-logs"
1080 target_prefix: "{account}/{source_bucket_name}/"
1081 """
1082
1083 schema = type_schema(
1084 'bucket-logging',
1085 op={'enum': ['enabled', 'disabled', 'equal', 'not-equal', 'eq', 'ne']},
1086 required=['op'],
1087 target_bucket={'type': 'string'},
1088 target_prefix={'type': 'string'})
1089 schema_alias = False
1090 account_name = None
1091
1092 permissions = ("s3:GetBucketLogging", "iam:ListAccountAliases")
1093
1094 def process(self, buckets, event=None):
1095 return list(filter(None, map(self.process_bucket, buckets)))
1096
1097 def process_bucket(self, b):
1098 if self.match_bucket(b):
1099 return b
1100
1101 def match_bucket(self, b):
1102 op = self.data.get('op')
1103
1104 logging = b.get('Logging', {})
1105 if op == 'disabled':
1106 return logging == {}
1107 elif op == 'enabled':
1108 return logging != {}
1109
1110 if self.account_name is None:
1111 session = local_session(self.manager.session_factory)
1112 self.account_name = get_account_alias_from_sts(session)
1113
1114 variables = self.get_std_format_args(b)
1115 variables.update({
1116 'account': self.account_name,
1117 'source_bucket_name': b['Name'],
1118 'source_bucket_region': get_region(b),
1119 'target_bucket_name': self.data.get('target_bucket'),
1120 'target_prefix': self.data.get('target_prefix'),
1121 })
1122 data = format_string_values(self.data, **variables)
1123 target_bucket = data.get('target_bucket')
1124 target_prefix = data.get('target_prefix', b['Name'] + '/')
1125
1126 target_config = {
1127 "TargetBucket": target_bucket,
1128 "TargetPrefix": target_prefix
1129 } if target_bucket else {}
1130
1131 if op in ('not-equal', 'ne'):
1132 return logging != target_config
1133 else:
1134 return logging == target_config
1135
1136
1137@actions.register('delete-bucket-notification')
1138class DeleteBucketNotification(BucketActionBase):
1139 """Action to delete S3 bucket notification configurations"""
1140
1141 schema = type_schema(
1142 'delete-bucket-notification',
1143 required=['statement_ids'],
1144 statement_ids={'oneOf': [
1145 {'enum': ['matched']},
1146 {'type': 'array', 'items': {'type': 'string'}}]})
1147
1148 permissions = ('s3:PutBucketNotification',)
1149
1150 def process_bucket(self, bucket):
1151 n = bucket['Notification']
1152 if not n:
1153 return
1154
1155 statement_ids = self.data.get('statement_ids')
1156 if statement_ids == 'matched':
1157 statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ())
1158 if not statement_ids:
1159 return
1160
1161 cfg = defaultdict(list)
1162
1163 for t in BucketNotificationFilter.FIELDS.values():
1164 for c in n.get(t, []):
1165 if c['Id'] not in statement_ids:
1166 cfg[t].append(c)
1167
1168 client = bucket_client(local_session(self.manager.session_factory), bucket)
1169 client.put_bucket_notification_configuration(
1170 Bucket=bucket['Name'],
1171 NotificationConfiguration=cfg)
1172
1173
1174@actions.register('no-op')
1175class NoOp(BucketActionBase):
1176
1177 schema = type_schema('no-op')
1178 permissions = ('s3:ListAllMyBuckets',)
1179
1180 def process(self, buckets):
1181 return None
1182
1183
1184@actions.register('set-statements')
1185class SetPolicyStatement(BucketActionBase):
1186 """Action to add or update policy statements to S3 buckets
1187
1188 :example:
1189
1190 .. code-block:: yaml
1191
1192 policies:
1193 - name: force-s3-https
1194 resource: s3
1195 actions:
1196 - type: set-statements
1197 statements:
1198 - Sid: "DenyHttp"
1199 Effect: "Deny"
1200 Action: "s3:GetObject"
1201 Principal:
1202 AWS: "*"
1203 Resource: "arn:aws:s3:::{bucket_name}/*"
1204 Condition:
1205 Bool:
1206 "aws:SecureTransport": false
1207 """
1208
1209 permissions = ('s3:PutBucketPolicy',)
1210
1211 schema = type_schema(
1212 'set-statements',
1213 **{
1214 'statements': {
1215 'type': 'array',
1216 'items': {
1217 'type': 'object',
1218 'properties': {
1219 'Sid': {'type': 'string'},
1220 'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']},
1221 'Principal': {'anyOf': [{'type': 'string'},
1222 {'type': 'object'}, {'type': 'array'}]},
1223 'NotPrincipal': {'anyOf': [{'type': 'object'}, {'type': 'array'}]},
1224 'Action': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1225 'NotAction': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1226 'Resource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1227 'NotResource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1228 'Condition': {'type': 'object'}
1229 },
1230 'required': ['Sid', 'Effect'],
1231 'oneOf': [
1232 {'required': ['Principal', 'Action', 'Resource']},
1233 {'required': ['NotPrincipal', 'Action', 'Resource']},
1234 {'required': ['Principal', 'NotAction', 'Resource']},
1235 {'required': ['NotPrincipal', 'NotAction', 'Resource']},
1236 {'required': ['Principal', 'Action', 'NotResource']},
1237 {'required': ['NotPrincipal', 'Action', 'NotResource']},
1238 {'required': ['Principal', 'NotAction', 'NotResource']},
1239 {'required': ['NotPrincipal', 'NotAction', 'NotResource']}
1240 ]
1241 }
1242 }
1243 }
1244 )
1245
1246 def process_bucket(self, bucket):
1247 policy = bucket.get('Policy') or '{}'
1248
1249 target_statements = format_string_values(
1250 copy.deepcopy({s['Sid']: s for s in self.data.get('statements', [])}),
1251 **self.get_std_format_args(bucket))
1252
1253 policy = json.loads(policy)
1254 bucket_statements = policy.setdefault('Statement', [])
1255
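        # drop target statements already on the bucket verbatim; anything left is appended below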
1256 for s in bucket_statements:
1257 if s.get('Sid') not in target_statements:
1258 continue
1259 if s == target_statements[s['Sid']]:
1260 target_statements.pop(s['Sid'])
1261
1262 if not target_statements:
1263 return
1264
1265 bucket_statements.extend(target_statements.values())
1266 policy = json.dumps(policy)
1267
1268 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1269 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=policy)
1270 return {'Name': bucket['Name'], 'Policy': policy}
1271
1272
1273@actions.register('remove-statements')
1274class RemovePolicyStatement(RemovePolicyBase):
1275 """Action to remove policy statements from S3 buckets
1276
1277 :example:
1278
1279 .. code-block:: yaml
1280
1281 policies:
1282 - name: s3-remove-encrypt-put
1283 resource: s3
1284 filters:
1285 - type: has-statement
1286 statement_ids:
1287 - RequireEncryptedPutObject
1288 actions:
1289 - type: remove-statements
1290 statement_ids:
1291 - RequiredEncryptedPutObject
1292 """
1293
1294 permissions = ("s3:PutBucketPolicy", "s3:DeleteBucketPolicy")
1295
1296 def process(self, buckets):
1297 with self.executor_factory(max_workers=3) as w:
1298 futures = {}
1299 results = []
1300 for b in buckets:
1301 futures[w.submit(self.process_bucket, b)] = b
1302 for f in as_completed(futures):
1303 if f.exception():
1304 b = futures[f]
1305 self.log.error('error modifying bucket:%s\n%s',
1306 b['Name'], f.exception())
1307 results += filter(None, [f.result()])
1308 return results
1309
1310 def process_bucket(self, bucket):
1311 p = bucket.get('Policy')
1312 if p is None:
1313 return
1314
1315 p = json.loads(p)
1316
1317 statements, found = self.process_policy(
1318 p, bucket, CrossAccountAccessFilter.annotation_key)
1319
1320 if not found:
1321 return
1322
1323 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1324
1325 if not statements:
1326 s3.delete_bucket_policy(Bucket=bucket['Name'])
1327 else:
1328 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=json.dumps(p))
1329 return {'Name': bucket['Name'], 'State': 'PolicyRemoved', 'Statements': found}
1330
1331
1332@actions.register('set-replication')
1333class SetBucketReplicationConfig(BucketActionBase):
1334 """Action to add or remove replication configuration statement from S3 buckets
1335
1336 :example:
1337
1338 .. code-block:: yaml
1339
1340 policies:
1341 - name: s3-unapproved-account-replication
1342 resource: s3
1343 filters:
1344 - type: value
1345 key: Replication.ReplicationConfiguration.Rules[].Destination.Account
1346 value: present
1347 - type: value
1348 key: Replication.ReplicationConfiguration.Rules[].Destination.Account
1349 value_from:
1350 url: 's3:///path/to/file.json'
1351 format: json
1352 expr: "approved_accounts.*"
1353 op: ni
1354 actions:
1355 - type: set-replication
1356 state: enable
1357 """
1358 schema = type_schema(
1359 'set-replication',
1360 state={'type': 'string', 'enum': ['enable', 'disable', 'remove']})
1361 permissions = ("s3:GetReplicationConfiguration", "s3:PutReplicationConfiguration")
1362
1363 def process(self, buckets):
1364 with self.executor_factory(max_workers=3) as w:
1365 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
1366 errors = []
1367 for future in as_completed(futures):
1368 bucket = futures[future]
1369 try:
1370 future.result()
1371 except ClientError as e:
                    errors.append("Message: %s Bucket: %s" % (e, bucket['Name']))
1373 if errors:
1374 raise Exception('\n'.join(map(str, errors)))
1375
1376 def process_bucket(self, bucket):
1377 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1378 state = self.data.get('state')
1379 if state is not None:
1380 if state == 'remove':
1381 s3.delete_bucket_replication(Bucket=bucket['Name'])
1382 return {'Name': bucket['Name'], 'State': 'ReplicationConfigRemoved'}
1383 if state in ('enable', 'disable'):
1384 config = s3.get_bucket_replication(Bucket=bucket['Name'])
1385 for rule in config['ReplicationConfiguration']['Rules']:
1386 rule['Status'] = 'Enabled' if state == 'enable' else 'Disabled'
1387 s3.put_bucket_replication(
1388 Bucket=bucket['Name'],
1389 ReplicationConfiguration=config['ReplicationConfiguration']
1390 )
1391 return {'Name': bucket['Name'], 'State': 'ReplicationConfigUpdated'}
1392
1393
1394@filters.register('check-public-block')
1395class FilterPublicBlock(Filter):
1396 """Filter for s3 bucket public blocks
1397
1398 If no filter paramaters are provided it checks to see if any are unset or False.
1399
1400 If parameters are provided only the provided ones are checked.
1401
1402 :example:
1403
1404 .. code-block:: yaml
1405
1406 policies:
1407 - name: CheckForPublicAclBlock-Off
1408 resource: s3
1409 region: us-east-1
1410 filters:
1411 - type: check-public-block
1412 BlockPublicAcls: true
1413 BlockPublicPolicy: true
1414 """
1415
1416 schema = type_schema(
1417 'check-public-block',
1418 BlockPublicAcls={'type': 'boolean'},
1419 IgnorePublicAcls={'type': 'boolean'},
1420 BlockPublicPolicy={'type': 'boolean'},
1421 RestrictPublicBuckets={'type': 'boolean'})
1422 permissions = ("s3:GetBucketPublicAccessBlock",)
1423 keys = (
1424 'BlockPublicPolicy', 'BlockPublicAcls', 'IgnorePublicAcls', 'RestrictPublicBuckets')
1425 annotation_key = 'c7n:PublicAccessBlock'
1426
1427 def process(self, buckets, event=None):
1428 results = []
1429 with self.executor_factory(max_workers=2) as w:
1430 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
1431 for f in as_completed(futures):
1432 if f.result():
1433 results.append(futures[f])
1434 return results
1435
1436 def process_bucket(self, bucket):
1437 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1438 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
1439 if self.annotation_key not in bucket:
1440 try:
1441 config = s3.get_public_access_block(
1442 Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
1443 except ClientError as e:
1444 error_code = e.response['Error']['Code']
1445 if error_code == 'NoSuchPublicAccessBlockConfiguration':
1446 pass
1447 elif error_code == 'AccessDenied':
1448 # Follow the same logic as `assemble_bucket` - log and continue on access
1449 # denied errors rather than halting a policy altogether
1450 method = 'GetPublicAccessBlock'
1451 log.warning(
1452 "Bucket:%s unable to invoke method:%s error:%s ",
1453 bucket['Name'], method, e.response['Error']['Message']
1454 )
1455 bucket.setdefault('c7n:DeniedMethods', []).append(method)
1456 else:
1457 raise
1458 bucket[self.annotation_key] = config
1459 return self.matches_filter(config)
1460
1461 def matches_filter(self, config):
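        # with explicit keys, all must match; otherwise match when any setting is off or unset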
1462 key_set = [key for key in self.keys if key in self.data]
1463 if key_set:
1464 return all([self.data.get(key) is config[key] for key in key_set])
1465 else:
1466 return not all(config.values())
1467
1468
1469@actions.register('set-public-block')
1470class SetPublicBlock(BucketActionBase):
1471 """Action to update Public Access blocks on S3 buckets
1472
    If no action parameters are provided, all settings will be set to the `state`
    value, which defaults to True.
1474
1475 If action parameters are provided, those will be set and other extant values preserved.
1476
1477 :example:
1478
1479 .. code-block:: yaml
1480
1481 policies:
1482 - name: s3-public-block-enable-all
1483 resource: s3
1484 filters:
1485 - type: check-public-block
1486 actions:
1487 - type: set-public-block
1488
1489 policies:
1490 - name: s3-public-block-disable-all
1491 resource: s3
1492 filters:
1493 - type: check-public-block
1494 actions:
1495 - type: set-public-block
1496 state: false
1497
1498 policies:
1499 - name: s3-public-block-enable-some
1500 resource: s3
1501 filters:
1502 - or:
1503 - type: check-public-block
1504 BlockPublicAcls: false
1505 - type: check-public-block
1506 BlockPublicPolicy: false
1507 actions:
1508 - type: set-public-block
1509 BlockPublicAcls: true
1510 BlockPublicPolicy: true
1511
1512 """
1513
1514 schema = type_schema(
1515 'set-public-block',
1516 state={'type': 'boolean', 'default': True},
1517 BlockPublicAcls={'type': 'boolean'},
1518 IgnorePublicAcls={'type': 'boolean'},
1519 BlockPublicPolicy={'type': 'boolean'},
1520 RestrictPublicBuckets={'type': 'boolean'})
1521 permissions = ("s3:GetBucketPublicAccessBlock", "s3:PutBucketPublicAccessBlock")
1522 keys = FilterPublicBlock.keys
1523 annotation_key = FilterPublicBlock.annotation_key
1524
1525 def process(self, buckets):
1526 with self.executor_factory(max_workers=3) as w:
1527 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
1528 for future in as_completed(futures):
1529 future.result()
1530
1531 def process_bucket(self, bucket):
1532 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1533 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
1534 if self.annotation_key not in bucket:
1535 try:
1536 config = s3.get_public_access_block(
1537 Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
1538 except ClientError as e:
1539 if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration':
1540 raise
1541
1542 key_set = [key for key in self.keys if key in self.data]
1543 if key_set:
1544 for key in key_set:
1545 config[key] = self.data.get(key)
1546 else:
1547 for key in self.keys:
1548 config[key] = self.data.get('state', True)
1549 s3.put_public_access_block(
1550 Bucket=bucket['Name'], PublicAccessBlockConfiguration=config)
1551
1552
1553@actions.register('toggle-versioning')
1554class ToggleVersioning(BucketActionBase):
1555 """Action to enable/suspend versioning on a S3 bucket
1556
1557 Note versioning can never be disabled only suspended.
1558
1559 :example:
1560
1561 .. code-block:: yaml
1562
1563 policies:
1564 - name: s3-enable-versioning
1565 resource: s3
1566 filters:
1567 - or:
1568 - type: value
1569 key: Versioning.Status
1570 value: Suspended
1571 - type: value
1572 key: Versioning.Status
1573 value: absent
1574 actions:
1575 - type: toggle-versioning
1576 enabled: true
1577 """
1578
1579 schema = type_schema(
1580 'toggle-versioning',
1581 enabled={'type': 'boolean'})
1582 permissions = ("s3:PutBucketVersioning",)
1583
1584 def process_versioning(self, resource, state):
1585 client = bucket_client(
1586 local_session(self.manager.session_factory), resource)
1587 try:
1588 client.put_bucket_versioning(
1589 Bucket=resource['Name'],
1590 VersioningConfiguration={
1591 'Status': state})
1592 except ClientError as e:
1593 if e.response['Error']['Code'] != 'AccessDenied':
1594 log.error(
1595 "Unable to put bucket versioning on bucket %s: %s" % (resource['Name'], e))
1596 raise
1597 log.warning(
1598 "Access Denied Bucket:%s while put bucket versioning" % resource['Name'])
1599
1600 # mfa delete enablement looks like it needs the serial and a current token.
1601 def process(self, resources):
1602 enabled = self.data.get('enabled', True)
1603 for r in resources:
1604 if 'Versioning' not in r or not r['Versioning']:
1605 r['Versioning'] = {'Status': 'Suspended'}
1606 if enabled and (
1607 r['Versioning']['Status'] == 'Suspended'):
1608 self.process_versioning(r, 'Enabled')
1609 if not enabled and r['Versioning']['Status'] == 'Enabled':
1610 self.process_versioning(r, 'Suspended')
1611
1612
1613@actions.register('toggle-logging')
1614class ToggleLogging(BucketActionBase):
1615 """Action to enable/disable logging on a S3 bucket.
1616
1617 Target bucket ACL must allow for WRITE and READ_ACP Permissions
1618 Not specifying a target_prefix will default to the current bucket name.
1619 https://docs.aws.amazon.com/AmazonS3/latest/dev/enable-logging-programming.html
1620
1621 :example:
1622
1623 .. code-block:: yaml
1624
1625 policies:
1626 - name: s3-enable-logging
1627 resource: s3
1628 filters:
1629 - "tag:Testing": present
1630 actions:
1631 - type: toggle-logging
1632 target_bucket: log-bucket
1633 target_prefix: logs123/
1634
1635 policies:
1636 - name: s3-force-standard-logging
1637 resource: s3
1638 filters:
1639 - type: bucket-logging
1640 op: not-equal
1641 target_bucket: "{account_id}-{region}-s3-logs"
1642 target_prefix: "{account}/{source_bucket_name}/"
1643 actions:
1644 - type: toggle-logging
1645 target_bucket: "{account_id}-{region}-s3-logs"
1646 target_prefix: "{account}/{source_bucket_name}/"
1647 """
1648 schema = type_schema(
1649 'toggle-logging',
1650 enabled={'type': 'boolean'},
1651 target_bucket={'type': 'string'},
1652 target_prefix={'type': 'string'})
1653
1654 permissions = ("s3:PutBucketLogging", "iam:ListAccountAliases")
1655
1656 def validate(self):
1657 if self.data.get('enabled', True):
1658 if not self.data.get('target_bucket'):
1659 raise PolicyValidationError(
1660 "target_bucket must be specified on %s" % (
1661 self.manager.data,))
1662 return self
1663
1664 def process(self, resources):
1665 session = local_session(self.manager.session_factory)
1666 kwargs = {
1667 "enabled": self.data.get('enabled', True),
1668 "session": session,
1669 "account_name": get_account_alias_from_sts(session),
1670 }
1671
1672 return self._process_with_futures(resources, **kwargs)
1673
1674 def process_bucket(self, r, enabled=None, session=None, account_name=None):
1675 client = bucket_client(session, r)
1676 is_logging = bool(r.get('Logging'))
1677
1678 if enabled:
1679 variables = self.get_std_format_args(r)
1680 variables.update({
1681 'account': account_name,
1682 'source_bucket_name': r['Name'],
1683 'source_bucket_region': get_region(r),
1684 'target_bucket_name': self.data.get('target_bucket'),
1685 'target_prefix': self.data.get('target_prefix'),
1686 })
1687 data = format_string_values(self.data, **variables)
1688 config = {
1689 'TargetBucket': data.get('target_bucket'),
1690 'TargetPrefix': data.get('target_prefix', r['Name'] + '/')
1691 }
1692 if not is_logging or r.get('Logging') != config:
1693 client.put_bucket_logging(
1694 Bucket=r['Name'],
1695 BucketLoggingStatus={'LoggingEnabled': config}
1696 )
1697 r['Logging'] = config
1698
1699 elif not enabled and is_logging:
1700 client.put_bucket_logging(
1701 Bucket=r['Name'], BucketLoggingStatus={})
1702 r['Logging'] = {}
1703
1704
1705@actions.register('attach-encrypt')
1706class AttachLambdaEncrypt(BucketActionBase):
1707 """Action attaches lambda encryption policy to S3 bucket
1708 supports attachment via lambda bucket notification or sns notification
1709 to invoke lambda. a special topic value of `default` will utilize an
1710 extant notification or create one matching the bucket name.
1711
1712 :example:
1713
1714
1715 .. code-block:: yaml
1716
1717
1718 policies:
1719 - name: attach-lambda-encrypt
1720 resource: s3
1721 filters:
1722 - type: missing-policy-statement
1723 actions:
1724 - type: attach-encrypt
1725 role: arn:aws:iam::123456789012:role/my-role
1726
1727 """
1728 schema = type_schema(
1729 'attach-encrypt',
1730 role={'type': 'string'},
1731 tags={'type': 'object'},
1732 topic={'type': 'string'})
1733
1734 permissions = (
1735 "s3:PutBucketNotification", "s3:GetBucketNotification",
1736 # lambda manager uses quite a few perms to provision lambdas
        # and event sources, hard to disambiguate, punt for now.
1738 "lambda:*",
1739 )
1740
1741 def __init__(self, data=None, manager=None):
1742 self.data = data or {}
1743 self.manager = manager
1744
1745 def validate(self):
1746 if (not getattr(self.manager.config, 'dryrun', True) and
1747 not self.data.get('role', self.manager.config.assume_role)):
1748 raise PolicyValidationError(
1749 "attach-encrypt: role must be specified either "
1750 "via assume or in config on %s" % (self.manager.data,))
1751
1752 return self
1753
1754 def process(self, buckets):
1755 from c7n.mu import LambdaManager
1756 from c7n.ufuncs.s3crypt import get_function
1757
1758 account_id = self.manager.config.account_id
1759 topic_arn = self.data.get('topic')
1760
1761 func = get_function(
1762 None, self.data.get('role', self.manager.config.assume_role),
1763 account_id=account_id, tags=self.data.get('tags'))
1764
1765 regions = {get_region(b) for b in buckets}
1766
1767 # session managers by region
1768 region_sessions = {}
1769 for r in regions:
1770 region_sessions[r] = functools.partial(
1771 self.manager.session_factory, region=r)
1772
1773 # Publish function to all of our buckets regions
1774 region_funcs = {}
1775
1776 for r in regions:
1777 lambda_mgr = LambdaManager(region_sessions[r])
1778 lambda_mgr.publish(func)
1779 region_funcs[r] = func
1780
1781 with self.executor_factory(max_workers=3) as w:
1782 results = []
1783 futures = []
1784 for b in buckets:
1785 region = get_region(b)
1786 futures.append(
1787 w.submit(
1788 self.process_bucket,
1789 region_funcs[region],
1790 b,
1791 topic_arn,
1792 account_id,
1793 region_sessions[region]
1794 ))
1795 for f in as_completed(futures):
1796 if f.exception():
1797 log.exception(
1798 "Error attaching lambda-encrypt %s" % (f.exception()))
1799 results.append(f.result())
1800 return list(filter(None, results))
1801
1802 def process_bucket(self, func, bucket, topic, account_id, session_factory):
1803 from c7n.mu import BucketSNSNotification, BucketLambdaNotification
1804 if topic:
1805 topic = None if topic == 'default' else topic
1806 source = BucketSNSNotification(session_factory, bucket, topic)
1807 else:
1808 source = BucketLambdaNotification(
1809 {'account_s3': account_id}, session_factory, bucket)
1810 return source.add(func, None)
1811
1812
1813@actions.register('encryption-policy')
1814class EncryptionRequiredPolicy(BucketActionBase):
1815 """Action to apply an encryption policy to S3 buckets
1816
1817
1818 :example:
1819
1820 .. code-block:: yaml
1821
1822 policies:
1823 - name: s3-enforce-encryption
1824 resource: s3
1825 mode:
1826 type: cloudtrail
1827 events:
1828 - CreateBucket
1829 actions:
1830 - encryption-policy
1831 """
1832
1833 permissions = ("s3:GetBucketPolicy", "s3:PutBucketPolicy")
1834 schema = type_schema('encryption-policy')
1835
1836 def __init__(self, data=None, manager=None):
1837 self.data = data or {}
1838 self.manager = manager
1839
1840 def process(self, buckets):
1841 with self.executor_factory(max_workers=3) as w:
1842 results = w.map(self.process_bucket, buckets)
1843 results = list(filter(None, list(results)))
1844 return results
1845
1846 def process_bucket(self, b):
1847 p = b['Policy']
1848 if p is None:
1849 log.info("No policy found, creating new")
1850 p = {'Version': "2012-10-17", "Statement": []}
1851 else:
1852 p = json.loads(p)
1853
1854 encryption_sid = "RequiredEncryptedPutObject"
1855 encryption_statement = {
1856 'Sid': encryption_sid,
1857 'Effect': 'Deny',
1858 'Principal': '*',
1859 'Action': 's3:PutObject',
1860 "Resource": "arn:aws:s3:::%s/*" % b['Name'],
1861 "Condition": {
1862 # AWS Managed Keys or KMS keys, note policy language
1863 # does not support custom kms (todo add issue)
1864 "StringNotEquals": {
1865 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}
1866
1867 statements = p.get('Statement', [])
1868 for s in list(statements):
1869 if s.get('Sid', '') == encryption_sid:
1870 log.debug("Bucket:%s Found extant encrypt policy", b['Name'])
1871 if s != encryption_statement:
1872 log.info(
1873 "Bucket:%s updating extant encrypt policy", b['Name'])
1874 statements.remove(s)
1875 else:
1876 return
1877
1878 session = self.manager.session_factory()
1879 s3 = bucket_client(session, b)
1880 statements.append(encryption_statement)
1881 p['Statement'] = statements
1882 log.info('Bucket:%s attached encryption policy' % b['Name'])
1883
1884 try:
1885 s3.put_bucket_policy(
1886 Bucket=b['Name'],
1887 Policy=json.dumps(p))
1888 except ClientError as e:
1889 if e.response['Error']['Code'] == 'NoSuchBucket':
1890 return
1891 self.log.exception(
1892 "Error on bucket:%s putting policy\n%s error:%s",
1893 b['Name'],
1894 json.dumps(statements, indent=2), e)
1895 raise
1896 return {'Name': b['Name'], 'State': 'PolicyAttached'}
1897
1898
1899class BucketScanLog:
1900 """Offload remediated key ids to a disk file in batches
1901
    A bucket keyspace is effectively infinite, so we need to store partial
    results out of memory; this class provides a json log on disk with
    partial write support.
1905
1906 json output format:
1907 - [list_of_serialized_keys],
1908 - [] # Empty list of keys at end when we close the buffer
1909
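    For illustration, a completed log with two batches of remediated key
    names would read:

    .. code-block:: json

        [
        ["key1", "key2"],
        ["key3"],
        []
        ]
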
1910 """
1911
1912 def __init__(self, log_dir, name):
1913 self.log_dir = log_dir
1914 self.name = name
1915 self.fh = None
1916 self.count = 0
1917
1918 @property
1919 def path(self):
1920 return os.path.join(self.log_dir, "%s.json" % self.name)
1921
1922 def __enter__(self):
1923 # Don't require output directories
1924 if self.log_dir is None:
1925 return
1926
1927 self.fh = open(self.path, 'w')
1928 self.fh.write("[\n")
1929 return self
1930
1931 def __exit__(self, exc_type=None, exc_value=None, exc_frame=None):
1932 if self.fh is None:
1933 return
1934 # we need an empty marker list at end to avoid trailing commas
1935 self.fh.write("[]")
1936 # and close the surrounding list
1937 self.fh.write("\n]")
1938 self.fh.close()
1939 if not self.count:
1940 os.remove(self.fh.name)
1941 self.fh = None
1942 return False
1943
1944 def add(self, keys):
1945 self.count += len(keys)
1946 if self.fh is None:
1947 return
1948 self.fh.write(dumps(keys))
1949 self.fh.write(",\n")
1950
1951
1952class ScanBucket(BucketActionBase):
1953
1954 permissions = ("s3:ListBucket",)
1955
1956 bucket_ops = {
1957 'standard': {
1958 'iterator': 'list_objects',
1959 'contents_key': ['Contents'],
1960 'key_processor': 'process_key'
1961 },
1962 'versioned': {
1963 'iterator': 'list_object_versions',
1964 'contents_key': ['Versions'],
1965 'key_processor': 'process_version'
1966 }
1967 }
1968
1969 def __init__(self, data, manager=None):
1970 super(ScanBucket, self).__init__(data, manager)
1971 self.denied_buckets = set()
1972
1973 def get_bucket_style(self, b):
        return (
            'versioned'
            if b.get('Versioning', {'Status': ''}).get('Status') in (
                'Enabled', 'Suspended')
            else 'standard')
1977
1978 def get_bucket_op(self, b, op_name):
1979 bucket_style = self.get_bucket_style(b)
1980 op = self.bucket_ops[bucket_style][op_name]
1981 if op_name == 'key_processor':
1982 return getattr(self, op)
1983 return op
1984
1985 def get_keys(self, b, key_set):
1986 content_keys = self.get_bucket_op(b, 'contents_key')
1987 keys = []
1988 for ck in content_keys:
1989 keys.extend(key_set.get(ck, []))
1990 return keys
1991
1992 def process(self, buckets):
1993 results = self._process_with_futures(self.process_bucket, buckets)
1994 self.write_denied_buckets_file()
1995 return results
1996
1997 def _process_with_futures(self, helper, buckets, max_workers=3):
1998 results = []
1999 with self.executor_factory(max_workers) as w:
2000 futures = {}
2001 for b in buckets:
2002 futures[w.submit(helper, b)] = b
2003 for f in as_completed(futures):
2004 if f.exception():
2005 b = futures[f]
2006 self.log.error(
2007 "Error on bucket:%s region:%s policy:%s error: %s",
2008 b['Name'], b.get('Location', 'unknown'),
2009 self.manager.data.get('name'), f.exception())
2010 self.denied_buckets.add(b['Name'])
2011 continue
2012 result = f.result()
2013 if result:
2014 results.append(result)
2015 return results
2016
2017 def write_denied_buckets_file(self):
2018 if (self.denied_buckets and
2019 self.manager.ctx.log_dir and
2020 not isinstance(self.manager.ctx.output, NullBlobOutput)):
2021 with open(
2022 os.path.join(
2023 self.manager.ctx.log_dir, 'denied.json'), 'w') as fh:
2024 json.dump(list(self.denied_buckets), fh, indent=2)
2025 self.denied_buckets = set()
2026
2027 def process_bucket(self, b):
2028 log.info(
2029 "Scanning bucket:%s visitor:%s style:%s" % (
2030 b['Name'], self.__class__.__name__, self.get_bucket_style(b)))
2031
2032 s = self.manager.session_factory()
2033 s3 = bucket_client(s, b)
2034
2035 # The bulk of _process_bucket function executes inline in
2036 # calling thread/worker context, neither paginator nor
2037 # bucketscan log should be used across worker boundary.
2038 p = s3.get_paginator(
2039 self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name'])
2040
2041 with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log:
2042 with self.executor_factory(max_workers=10) as w:
2043 try:
2044 return self._process_bucket(b, p, key_log, w)
2045 except ClientError as e:
2046 if e.response['Error']['Code'] == 'NoSuchBucket':
2047 log.warning(
2048 "Bucket:%s removed while scanning" % b['Name'])
2049 return
2050 if e.response['Error']['Code'] == 'AccessDenied':
2051 log.warning(
2052 "Access Denied Bucket:%s while scanning" % b['Name'])
2053 self.denied_buckets.add(b['Name'])
2054 return
2055 log.exception(
2056 "Error processing bucket:%s paginator:%s" % (
2057 b['Name'], p))
2058
2059 __call__ = process_bucket
2060
2061 def _process_bucket(self, b, p, key_log, w):
2062 count = 0
2063
2064 for key_set in p:
2065 keys = self.get_keys(b, key_set)
2066 count += len(keys)
2067 futures = []
2068
2069 for batch in chunks(keys, size=100):
2070 if not batch:
2071 continue
2072 futures.append(w.submit(self.process_chunk, batch, b))
2073
2074 for f in as_completed(futures):
2075 if f.exception():
2076 log.exception("Exception Processing bucket:%s key batch %s" % (
2077 b['Name'], f.exception()))
2078 continue
2079 r = f.result()
2080 if r:
2081 key_log.add(r)
2082
2083 # Log completion at info level, progress at debug level
2084 if key_set['IsTruncated']:
2085 log.debug('Scan progress bucket:%s keys:%d remediated:%d ...',
2086 b['Name'], count, key_log.count)
2087 else:
2088 log.info('Scan Complete bucket:%s keys:%d remediated:%d',
2089 b['Name'], count, key_log.count)
2090
2091 b['KeyScanCount'] = count
2092 b['KeyRemediated'] = key_log.count
2093 return {
2094 'Bucket': b['Name'], 'Remediated': key_log.count, 'Count': count}
2095
2096 def process_chunk(self, batch, bucket):
2097 raise NotImplementedError()
2098
2099 def process_key(self, s3, key, bucket_name, info=None):
2100 raise NotImplementedError()
2101
    def process_version(self, s3, key, bucket_name):
2103 raise NotImplementedError()
2104
2105
2106@actions.register('encrypt-keys')
2107class EncryptExtantKeys(ScanBucket):
2108 """Action to encrypt unencrypted S3 objects
2109
2110 :example:
2111
2112 .. code-block:: yaml
2113
2114 policies:
2115 - name: s3-encrypt-objects
2116 resource: s3
2117 actions:
2118 - type: encrypt-keys
2119 crypto: aws:kms
2120 key-id: 9c3983be-c6cf-11e6-9d9d-cec0c932ce01
2121 """
2122
2123 permissions = (
2124 "s3:GetObject",
2125 "s3:PutObject",
2126 "s3:DeleteObjectVersion",
2127 "s3:RestoreObject",
2128 ) + ScanBucket.permissions
2129
2130 schema = {
2131 'type': 'object',
2132 'additionalProperties': False,
2133 'properties': {
2134 'type': {'enum': ['encrypt-keys']},
2135 'report-only': {'type': 'boolean'},
2136 'glacier': {'type': 'boolean'},
2137 'large': {'type': 'boolean'},
2138 'crypto': {'enum': ['AES256', 'aws:kms']},
2139 'key-id': {'type': 'string'}
2140 },
2141 'dependencies': {
2142 'key-id': {
2143 'properties': {
2144 'crypto': {'pattern': 'aws:kms'}
2145 },
2146 'required': ['crypto']
2147 }
2148 }
2149 }
2150
2151 metrics = [
2152 ('Total Keys', {'Scope': 'Account'}),
2153 ('Unencrypted', {'Scope': 'Account'})]
2154
2155 def __init__(self, data, manager=None):
2156 super(EncryptExtantKeys, self).__init__(data, manager)
2157 self.kms_id = self.data.get('key-id')
2158
2159 def get_permissions(self):
2160 perms = ("s3:GetObject", "s3:GetObjectVersion")
2161 if self.data.get('report-only'):
2162 perms += ('s3:DeleteObject', 's3:DeleteObjectVersion',
2163 's3:PutObject',
2164 's3:AbortMultipartUpload',
2165 's3:ListBucket',
2166 's3:ListBucketVersions')
2167 return perms
2168
2169 def process(self, buckets):
2170
2171 t = time.time()
2172 results = super(EncryptExtantKeys, self).process(buckets)
2173 run_time = time.time() - t
2174 remediated_count = object_count = 0
2175
2176 for r in results:
2177 object_count += r['Count']
2178 remediated_count += r['Remediated']
2179 self.manager.ctx.metrics.put_metric(
2180 "Unencrypted", r['Remediated'], "Count", Scope=r['Bucket'],
2181 buffer=True)
2182
2183 self.manager.ctx.metrics.put_metric(
2184 "Unencrypted", remediated_count, "Count", Scope="Account",
2185 buffer=True
2186 )
2187 self.manager.ctx.metrics.put_metric(
2188 "Total Keys", object_count, "Count", Scope="Account",
2189 buffer=True
2190 )
2191 self.manager.ctx.metrics.flush()
2192
2193 log.info(
2194 ("EncryptExtant Complete keys:%d "
2195 "remediated:%d rate:%0.2f/s time:%0.2fs"),
2196 object_count,
2197 remediated_count,
2198 float(object_count) / run_time if run_time else 0,
2199 run_time)
2200 return results
2201
2202 def process_chunk(self, batch, bucket):
2203 crypto_method = self.data.get('crypto', 'AES256')
2204 s3 = bucket_client(
2205 local_session(self.manager.session_factory), bucket,
2206 kms=(crypto_method == 'aws:kms'))
2207 b = bucket['Name']
2208 results = []
2209 key_processor = self.get_bucket_op(bucket, 'key_processor')
2210 for key in batch:
2211 r = key_processor(s3, key, b)
2212 if r:
2213 results.append(r)
2214 return results
2215
2216 def process_key(self, s3, key, bucket_name, info=None):
2217 k = key['Key']
2218 if info is None:
2219 info = s3.head_object(Bucket=bucket_name, Key=k)
2220
2221 # If the data is already encrypted with AES256 and this request is also
2222 # for AES256 then we don't need to do anything
2223 if info.get('ServerSideEncryption') == 'AES256' and not self.kms_id:
2224 return False
2225
2226 if info.get('ServerSideEncryption') == 'aws:kms':
2227 # If we're not looking for a specific key any key will do.
2228 if not self.kms_id:
2229 return False
            # If we're configured to use a specific key and that key already
            # matches, skip; note this is a substring match, not strict equality.
2232 if self.kms_id in info.get('SSEKMSKeyId', ''):
2233 return False
2234
2235 if self.data.get('report-only'):
2236 return k
2237
2238 storage_class = info.get('StorageClass', 'STANDARD')
2239
2240 if storage_class == 'GLACIER':
2241 if not self.data.get('glacier'):
2242 return False
2243 if 'Restore' not in info:
2244 # This takes multiple hours, we let the next c7n
2245 # run take care of followups.
2246 s3.restore_object(
2247 Bucket=bucket_name,
2248 Key=k,
2249 RestoreRequest={'Days': 30})
2250 return False
2251 elif not restore_complete(info['Restore']):
2252 return False
2253
2254 storage_class = 'STANDARD'
2255
2256 crypto_method = self.data.get('crypto', 'AES256')
2257 key_id = self.data.get('key-id')
2258 # Note on copy we lose individual object acl grants
2259 params = {'Bucket': bucket_name,
2260 'Key': k,
2261 'CopySource': "/%s/%s" % (bucket_name, k),
2262 'MetadataDirective': 'COPY',
2263 'StorageClass': storage_class,
2264 'ServerSideEncryption': crypto_method}
2265
2266 if key_id and crypto_method == 'aws:kms':
2267 params['SSEKMSKeyId'] = key_id
2268
2269 if info['ContentLength'] > MAX_COPY_SIZE and self.data.get(
2270 'large', True):
2271 return self.process_large_file(s3, bucket_name, key, info, params)
2272
2273 s3.copy_object(**params)
2274 return k
2275
2276 def process_version(self, s3, key, bucket_name):
2277 info = s3.head_object(
2278 Bucket=bucket_name,
2279 Key=key['Key'],
2280 VersionId=key['VersionId'])
2281
2282 if 'ServerSideEncryption' in info:
2283 return False
2284
2285 if self.data.get('report-only'):
2286 return key['Key'], key['VersionId']
2287
2288 if key['IsLatest']:
2289 r = self.process_key(s3, key, bucket_name, info)
2290 # Glacier request processing, wait till we have the restored object
2291 if not r:
2292 return r
2293 s3.delete_object(
2294 Bucket=bucket_name,
2295 Key=key['Key'],
2296 VersionId=key['VersionId'])
2297 return key['Key'], key['VersionId']
2298
2299 def process_large_file(self, s3, bucket_name, key, info, params):
        """For objects over MAX_COPY_SIZE, fall back to multipart upload
        (a single copy_object request is limited to 5 GB)."""
2301 part_size = MAX_COPY_SIZE - (1024 ** 2)
2302 num_parts = int(math.ceil(info['ContentLength'] / part_size))
2303 source = params.pop('CopySource')
2304
2305 params.pop('MetadataDirective')
2306 if 'Metadata' in info:
2307 params['Metadata'] = info['Metadata']
2308
2309 upload_id = s3.create_multipart_upload(**params)['UploadId']
2310
2311 params = {'Bucket': bucket_name,
2312 'Key': key['Key'],
2313 'UploadId': upload_id,
2314 'CopySource': source,
2315 'CopySourceIfMatch': info['ETag']}
2316
2317 def upload_part(part_num):
2318 part_params = dict(params)
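            # byte ranges are inclusive on both ends, hence the -1 on the end offset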
2319 part_params['CopySourceRange'] = "bytes=%d-%d" % (
2320 part_size * (part_num - 1),
2321 min(part_size * part_num - 1, info['ContentLength'] - 1))
2322 part_params['PartNumber'] = part_num
2323 response = s3.upload_part_copy(**part_params)
2324 return {'ETag': response['CopyPartResult']['ETag'],
2325 'PartNumber': part_num}
2326
2327 try:
2328 with self.executor_factory(max_workers=2) as w:
2329 parts = list(w.map(upload_part, range(1, num_parts + 1)))
2330 except Exception:
2331 log.warning(
2332 "Error during large key copy bucket: %s key: %s, "
2333 "aborting upload", bucket_name, key, exc_info=True)
2334 s3.abort_multipart_upload(
2335 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id)
2336 raise
2337 s3.complete_multipart_upload(
2338 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id,
2339 MultipartUpload={'Parts': parts})
2340 return key['Key']
2341
2342
2343def restore_complete(restore):
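    # The x-amz-restore header looks like:
    #   ongoing-request="false", expiry-date="Fri, 21 Dec 2012 00:00:00 GMT"
    # restoration has completed once ongoing-request is "false".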
2344 if ',' in restore:
2345 ongoing, _ = restore.split(',', 1)
2346 else:
2347 ongoing = restore
2348 return 'false' in ongoing
2349
2350
2351@filters.register('is-log-target')
2352class LogTarget(Filter):
    """Filter and return buckets that are log destinations.

    Not suitable for use in lambda on large accounts; this is an API
    heavy process that scans all possible log sources.
2357
2358 Sources:
2359 - elb (Access Log)
2360 - s3 (Access Log)
2361 - cfn (Template writes)
2362 - cloudtrail
2363
2364 :example:
2365
2366 .. code-block:: yaml
2367
2368 policies:
2369 - name: s3-log-bucket
2370 resource: s3
2371 filters:
2372 - type: is-log-target
2373 """
2374
2375 schema = type_schema(
2376 'is-log-target',
2377 services={'type': 'array', 'items': {'enum': [
2378 's3', 'elb', 'cloudtrail']}},
2379 self={'type': 'boolean'},
2380 value={'type': 'boolean'})
2381
2382 def get_permissions(self):
2383 perms = self.manager.get_resource_manager('elb').get_permissions()
2384 perms += ('elasticloadbalancing:DescribeLoadBalancerAttributes',)
2385 return perms
2386
2387 def process(self, buckets, event=None):
2388 log_buckets = set()
2389 count = 0
2390
2391 services = self.data.get('services', ['elb', 's3', 'cloudtrail'])
2392 self_log = self.data.get('self', False)
2393
2394 if 'elb' in services and not self_log:
2395 for bucket, _ in self.get_elb_bucket_locations():
2396 log_buckets.add(bucket)
2397 count += 1
2398 self.log.debug("Found %d elb log targets" % count)
2399
2400 if 's3' in services:
2401 count = 0
2402 for bucket, _ in self.get_s3_bucket_locations(buckets, self_log):
2403 count += 1
2404 log_buckets.add(bucket)
2405 self.log.debug('Found %d s3 log targets' % count)
2406
2407 if 'cloudtrail' in services and not self_log:
2408 for bucket, _ in self.get_cloud_trail_locations(buckets):
2409 log_buckets.add(bucket)
2410
2411 self.log.info("Found %d log targets for %d buckets" % (
2412 len(log_buckets), len(buckets)))
2413 if self.data.get('value', True):
2414 return [b for b in buckets if b['Name'] in log_buckets]
2415 else:
2416 return [b for b in buckets if b['Name'] not in log_buckets]
2417
2418 @staticmethod
2419 def get_s3_bucket_locations(buckets, self_log=False):
2420 """return (bucket_name, prefix) for all s3 logging targets"""
2421 for b in buckets:
2422 if b.get('Logging'):
2423 if self_log:
2424 if b['Name'] != b['Logging']['TargetBucket']:
2425 continue
2426 yield (b['Logging']['TargetBucket'],
2427 b['Logging']['TargetPrefix'])
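            # the cloudformation console/CLI writes uploaded templates to
            # auto-created buckets named cf-templates-<hash>-<region>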
2428 if not self_log and b['Name'].startswith('cf-templates-'):
2429 yield (b['Name'], '')
2430
2431 def get_cloud_trail_locations(self, buckets):
2432 session = local_session(self.manager.session_factory)
2433 client = session.client('cloudtrail')
2434 names = {b['Name'] for b in buckets}
2435 for t in client.describe_trails().get('trailList', ()):
2436 if t.get('S3BucketName') in names:
2437 yield (t['S3BucketName'], t.get('S3KeyPrefix', ''))
2438
2439 def get_elb_bucket_locations(self):
2440 elbs = self.manager.get_resource_manager('elb').resources()
2441 get_elb_attrs = functools.partial(
2442 _query_elb_attrs, self.manager.session_factory)
2443
2444 with self.executor_factory(max_workers=2) as w:
2445 futures = []
2446 for elb_set in chunks(elbs, 100):
2447 futures.append(w.submit(get_elb_attrs, elb_set))
2448 for f in as_completed(futures):
2449 if f.exception():
2450 log.error("Error while scanning elb log targets: %s" % (
2451 f.exception()))
2452 continue
2453 for tgt in f.result():
2454 yield tgt
2455
2456
2457def _query_elb_attrs(session_factory, elb_set):
2458 session = local_session(session_factory)
2459 client = session.client('elb')
2460 log_targets = []
2461 for e in elb_set:
2462 try:
2463 attrs = client.describe_load_balancer_attributes(
2464 LoadBalancerName=e['LoadBalancerName'])[
2465 'LoadBalancerAttributes']
2466 if 'AccessLog' in attrs and attrs['AccessLog']['Enabled']:
2467 log_targets.append((
2468 attrs['AccessLog']['S3BucketName'],
2469 attrs['AccessLog']['S3BucketPrefix']))
2470 except Exception as err:
2471 log.warning(
2472 "Could not retrieve load balancer %s: %s" % (
2473 e['LoadBalancerName'], err))
2474 return log_targets
2475
2476
2477@actions.register('remove-website-hosting')
2478class RemoveWebsiteHosting(BucketActionBase):
2479 """Action that removes website hosting configuration."""
2480
2481 schema = type_schema('remove-website-hosting')
2482
2483 permissions = ('s3:DeleteBucketWebsite',)
2484
2485 def process(self, buckets):
2486 session = local_session(self.manager.session_factory)
2487 for bucket in buckets:
2488 client = bucket_client(session, bucket)
2489 client.delete_bucket_website(Bucket=bucket['Name'])
2490
2491
2492@actions.register('delete-global-grants')
2493class DeleteGlobalGrants(BucketActionBase):
    """Deletes global grants associated with an S3 bucket
2495
2496 :example:
2497
2498 .. code-block:: yaml
2499
2500 policies:
2501 - name: s3-delete-global-grants
2502 resource: s3
2503 filters:
2504 - type: global-grants
2505 actions:
2506 - delete-global-grants
2507 """
2508
2509 schema = type_schema(
2510 'delete-global-grants',
2511 grantees={'type': 'array', 'items': {'type': 'string'}})
2512
2513 permissions = ('s3:PutBucketAcl',)
2514
2515 def process(self, buckets):
2516 with self.executor_factory(max_workers=5) as w:
2517 return list(filter(None, list(w.map(self.process_bucket, buckets))))
2518
2519 def process_bucket(self, b):
2520 grantees = self.data.get(
2521 'grantees', [
2522 GlobalGrantsFilter.AUTH_ALL, GlobalGrantsFilter.GLOBAL_ALL])
2523
2524 log.info(b)
2525
2526 acl = b.get('Acl', {'Grants': []})
2527 if not acl or not acl['Grants']:
2528 return
2529 new_grants = []
2530 for grant in acl['Grants']:
2531 grantee = grant.get('Grantee', {})
2532 if not grantee:
2533 continue
2534 # Yuck, 'get_bucket_acl' doesn't return the grantee type.
2535 if 'URI' in grantee:
2536 grantee['Type'] = 'Group'
2537 else:
2538 grantee['Type'] = 'CanonicalUser'
2539 if ('URI' in grantee and
2540 grantee['URI'] in grantees and not
2541 (grant['Permission'] == 'READ' and b['Website'])):
2542 # Remove this grantee.
2543 pass
2544 else:
2545 new_grants.append(grant)
2546
2547 log.info({'Owner': acl['Owner'], 'Grants': new_grants})
2548
2549 c = bucket_client(self.manager.session_factory(), b)
2550 try:
2551 c.put_bucket_acl(
2552 Bucket=b['Name'],
2553 AccessControlPolicy={
2554 'Owner': acl['Owner'], 'Grants': new_grants})
2555 except ClientError as e:
2556 if e.response['Error']['Code'] == 'NoSuchBucket':
2557 return
2558 return b
2559
2560
2561@actions.register('tag')
2562class BucketTag(Tag):
    """Action to create tags on an S3 bucket
2564
2565 :example:
2566
2567 .. code-block:: yaml
2568
2569 policies:
2570 - name: s3-tag-region
2571 resource: s3
2572 region: us-east-1
2573 filters:
2574 - "tag:RegionName": absent
2575 actions:
2576 - type: tag
2577 key: RegionName
2578 value: us-east-1
2579 """
2580
2581 def process_resource_set(self, client, resource_set, tags):
2582 modify_bucket_tags(self.manager.session_factory, resource_set, tags)
2583
2584
2585@actions.register('mark-for-op')
2586class MarkBucketForOp(TagDelayedAction):
2587 """Action schedules custodian to perform an action at a certain date
2588
2589 :example:
2590
2591 .. code-block:: yaml
2592
2593 policies:
2594 - name: s3-encrypt
2595 resource: s3
2596 filters:
2597 - type: missing-statement
2598 statement_ids:
2599 - RequiredEncryptedPutObject
2600 actions:
2601 - type: mark-for-op
2602 op: attach-encrypt
2603 days: 7
2604 """
2605
2606 schema = type_schema(
2607 'mark-for-op', rinherit=TagDelayedAction.schema)
2608
2609
2610@actions.register('unmark')
2611@actions.register('remove-tag')
2612class RemoveBucketTag(RemoveTag):
    """Removes tag/tags from an S3 bucket
2614
2615 :example:
2616
2617 .. code-block:: yaml
2618
2619 policies:
2620 - name: s3-remove-owner-tag
2621 resource: s3
2622 filters:
2623 - "tag:BucketOwner": present
2624 actions:
2625 - type: remove-tag
2626 tags: ['BucketOwner']
2627 """
2628
2629 def process_resource_set(self, client, resource_set, tags):
2630 modify_bucket_tags(
2631 self.manager.session_factory, resource_set, remove_tags=tags)
2632
2633
2634@filters.register('data-events')
2635class DataEvents(Filter):
2636 """Find buckets for which CloudTrail is logging data events.
2637
2638 Note that this filter only examines trails that are defined in the
2639 current account.
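
    :example:

    Find buckets that have CloudTrail data event logging configured
    (the policy name is illustrative):

    .. code-block:: yaml

            policies:
              - name: s3-has-data-events
                resource: s3
                filters:
                  - type: data-events
                    state: present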
2640 """
2641
2642 schema = type_schema('data-events', state={'enum': ['present', 'absent']})
2643 permissions = (
2644 'cloudtrail:DescribeTrails',
2645 'cloudtrail:GetEventSelectors')
2646
2647 def get_event_buckets(self, session, trails):
2648 """Return a mapping of bucket name to cloudtrail.
2649
2650 For wildcard trails the bucket name is ''.
2651 """
2652 regions = {t.get('HomeRegion') for t in trails}
2653 clients = {}
2654 for region in regions:
2655 clients[region] = session.client('cloudtrail', region_name=region)
2656
2657 event_buckets = {}
2658 for t in trails:
2659 for events in clients[t.get('HomeRegion')].get_event_selectors(
2660 TrailName=t['Name']).get('EventSelectors', ()):
2661 if 'DataResources' not in events:
2662 continue
2663 for data_events in events['DataResources']:
2664 if data_events['Type'] != 'AWS::S3::Object':
2665 continue
2666 for b in data_events['Values']:
2667 event_buckets[b.rsplit(':')[-1].strip('/')] = t['Name']
2668 return event_buckets
2669
2670 def process(self, resources, event=None):
2671 trails = self.manager.get_resource_manager('cloudtrail').resources()
2672 local_trails = self.filter_resources(
2673 trails,
2674 "split(':', TrailARN)[4]", (self.manager.account_id,)
2675 )
2676 session = local_session(self.manager.session_factory)
2677 event_buckets = self.get_event_buckets(session, local_trails)
2678 ops = {
2679 'present': lambda x: (
2680 x['Name'] in event_buckets or '' in event_buckets),
2681 'absent': (
2682 lambda x: x['Name'] not in event_buckets and ''
2683 not in event_buckets)}
2684
2685 op = ops[self.data.get('state', 'present')]
2686 results = []
2687 for b in resources:
2688 if op(b):
2689 results.append(b)
2690 return results
2691
2692
2693@filters.register('inventory')
2694class Inventory(ValueFilter):
    """Filter inventories for a bucket
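
    :example:

    A minimal example, matching buckets that have at least one enabled
    inventory configuration (the policy name is illustrative):

    .. code-block:: yaml

            policies:
              - name: s3-inventory-enabled
                resource: s3
                filters:
                  - type: inventory
                    key: IsEnabled
                    value: true
    """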
2696 schema = type_schema('inventory', rinherit=ValueFilter.schema)
2697 schema_alias = False
2698 permissions = ('s3:GetInventoryConfiguration',)
2699
2700 def process(self, buckets, event=None):
2701 results = []
2702 with self.executor_factory(max_workers=2) as w:
2703 futures = {}
2704 for b in buckets:
2705 futures[w.submit(self.process_bucket, b)] = b
2706
2707 for f in as_completed(futures):
2708 b = futures[f]
2709 if f.exception():
2710 b.setdefault('c7n:DeniedMethods', []).append('GetInventoryConfiguration')
2711 self.log.error(
2712 "Error processing bucket: %s error: %s",
2713 b['Name'], f.exception())
2714 continue
2715 if f.result():
2716 results.append(b)
2717 return results
2718
2719 def process_bucket(self, b):
2720 if 'c7n:inventories' not in b:
2721 client = bucket_client(local_session(self.manager.session_factory), b)
2722 inventories = client.list_bucket_inventory_configurations(
2723 Bucket=b['Name']).get('InventoryConfigurationList', [])
2724 b['c7n:inventories'] = inventories
2725
2726 for i in b['c7n:inventories']:
2727 if self.match(i):
2728 return True
2729
2730
2731@actions.register('set-inventory')
2732class SetInventory(BucketActionBase):
2733 """Configure bucket inventories for an s3 bucket.
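
    :example:

    A minimal example that delivers a weekly CSV inventory to a destination
    bucket (bucket and inventory names here are illustrative):

    .. code-block:: yaml

            policies:
              - name: s3-set-inventory
                resource: s3
                actions:
                  - type: set-inventory
                    name: c7n-inventory
                    destination: example-inventory-destination
                    format: CSV
                    schedule: Weekly
                    versions: Current
                    fields:
                      - Size
                      - LastModifiedDate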
2734 """
2735 schema = type_schema(
2736 'set-inventory',
2737 required=['name', 'destination'],
2738 state={'enum': ['enabled', 'disabled', 'absent']},
2739 name={'type': 'string', 'description': 'Name of inventory'},
2740 destination={'type': 'string', 'description': 'Name of destination bucket'},
2741 prefix={'type': 'string', 'description': 'Destination prefix'},
2742 encryption={'enum': ['SSES3', 'SSEKMS']},
2743 key_id={'type': 'string', 'description': 'Optional Customer KMS KeyId for SSE-KMS'},
2744 versions={'enum': ['All', 'Current']},
2745 schedule={'enum': ['Daily', 'Weekly']},
2746 format={'enum': ['CSV', 'ORC', 'Parquet']},
2747 fields={'type': 'array', 'items': {'enum': [
2748 'Size', 'LastModifiedDate', 'StorageClass', 'ETag',
2749 'IsMultipartUploaded', 'ReplicationStatus', 'EncryptionStatus',
2750 'ObjectLockRetainUntilDate', 'ObjectLockMode', 'ObjectLockLegalHoldStatus',
2751 'IntelligentTieringAccessTier', 'BucketKeyStatus', 'ChecksumAlgorithm',
2752 'ObjectAccessControlList', 'ObjectOwner']}})
2753
2754 permissions = ('s3:PutInventoryConfiguration', 's3:GetInventoryConfiguration')
2755
2756 def process(self, buckets):
2757 with self.executor_factory(max_workers=2) as w:
2758 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
2759 for future in as_completed(futures):
2760 bucket = futures[future]
2761 try:
2762 future.result()
2763 except Exception as e:
2764 self.log.error('Message: %s Bucket: %s', e, bucket['Name'])
2765
2766 def process_bucket(self, b):
2767 inventory_name = self.data.get('name')
2768 destination = self.data.get('destination')
2769 prefix = self.data.get('prefix', '')
2770 schedule = self.data.get('schedule', 'Daily')
2771 fields = self.data.get('fields', ['LastModifiedDate', 'Size'])
2772 versions = self.data.get('versions', 'Current')
2773 state = self.data.get('state', 'enabled')
2774 encryption = self.data.get('encryption')
2775 inventory_format = self.data.get('format', 'CSV')
2776
2777 if not prefix:
2778 prefix = "Inventories/%s" % (self.manager.config.account_id)
2779
2780 client = bucket_client(local_session(self.manager.session_factory), b)
2781 if state == 'absent':
2782 try:
2783 client.delete_bucket_inventory_configuration(
2784 Bucket=b['Name'], Id=inventory_name)
2785 except ClientError as e:
2786 if e.response['Error']['Code'] != 'NoSuchConfiguration':
2787 raise
2788 return
2789
2790 bucket = {
2791 'Bucket': "arn:aws:s3:::%s" % destination,
2792 'Format': inventory_format
2793 }
2794
2795 inventory = {
2796 'Destination': {
2797 'S3BucketDestination': bucket
2798 },
2799 'IsEnabled': state == 'enabled' and True or False,
2800 'Id': inventory_name,
2801 'OptionalFields': fields,
2802 'IncludedObjectVersions': versions,
2803 'Schedule': {
2804 'Frequency': schedule
2805 }
2806 }
2807
2808 if prefix:
2809 bucket['Prefix'] = prefix
2810
2811 if encryption:
2812 bucket['Encryption'] = {encryption: {}}
2813 if encryption == 'SSEKMS' and self.data.get('key_id'):
2814 bucket['Encryption'] = {encryption: {
2815 'KeyId': self.data['key_id']
2816 }}
2817
2818 found = self.get_inventory_delta(client, inventory, b)
2819 if found:
2820 return
2821 if found is False:
2822 self.log.debug("updating bucket:%s inventory configuration id:%s",
2823 b['Name'], inventory_name)
2824 client.put_bucket_inventory_configuration(
2825 Bucket=b['Name'], Id=inventory_name, InventoryConfiguration=inventory)
2826
2827 def get_inventory_delta(self, client, inventory, b):
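        # Returns None when no configuration with a matching Id exists, True
        # when the existing configuration already matches the desired one, and
        # False when it exists but differs and needs to be updated.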
2828 inventories = client.list_bucket_inventory_configurations(Bucket=b['Name'])
2829 found = None
2830 for i in inventories.get('InventoryConfigurationList', []):
2831 if i['Id'] != inventory['Id']:
2832 continue
2833 found = True
2834 for k, v in inventory.items():
2835 if k not in i:
2836 found = False
2837 continue
2838 if isinstance(v, list):
2839 v.sort()
2840 i[k].sort()
2841 if i[k] != v:
2842 found = False
2843 return found
2844
2845
2846@filters.register('intelligent-tiering')
2847class IntelligentTiering(ListItemFilter):
2848 """Filter for S3 buckets to look at intelligent tiering configurations
2849
2850 The schema to supply to the attrs follows the schema here:
2851 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_bucket_intelligent_tiering_configurations.html
2852
2853 :example:
2854
2855 .. code-block:: yaml
2856
2857 policies:
2858 - name: s3-intelligent-tiering-configuration
2859 resource: s3
2860 filters:
2861 - type: intelligent-tiering
2862 attrs:
2863 - Status: Enabled
2864 - Filter:
2865 And:
2866 Prefix: test
2867 Tags:
2868 - Key: Owner
2869 Value: c7n
2870 - Tierings:
2871 - Days: 100
2872 - AccessTier: ARCHIVE_ACCESS
2873
2874 """
2875 schema = type_schema(
2876 'intelligent-tiering',
2877 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
2878 count={'type': 'number'},
2879 count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
2880 )
2881 permissions = ('s3:GetIntelligentTieringConfiguration',)
2882 annotation_key = "c7n:IntelligentTiering"
2883 annotate_items = True
2884
2885 def __init__(self, data, manager=None):
2886 super().__init__(data, manager)
2887 self.data['key'] = self.annotation_key
2888
2889 def process(self, buckets, event=None):
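        # Pre-fetch each bucket's tiering configurations concurrently;
        # get_item_values caches them on the bucket annotation so the base
        # ListItemFilter can evaluate attrs without further API calls.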
2890 with self.executor_factory(max_workers=2) as w:
2891 futures = {w.submit(self.get_item_values, b): b for b in buckets}
2892 for future in as_completed(futures):
2893 b = futures[future]
2894 if future.exception():
2895 self.log.error("Message: %s Bucket: %s", future.exception(), b['Name'])
2896 continue
2897 return super().process(buckets, event)
2898
2899 def get_item_values(self, b):
2900 if self.annotation_key not in b:
2901 client = bucket_client(local_session(self.manager.session_factory), b)
2902 try:
2903 int_tier_config = client.list_bucket_intelligent_tiering_configurations(
2904 Bucket=b['Name'])
2905 b[self.annotation_key] = int_tier_config.get(
2906 'IntelligentTieringConfigurationList', [])
2907 except ClientError as e:
2908 if e.response['Error']['Code'] == 'AccessDenied':
2909 method = 'list_bucket_intelligent_tiering_configurations'
2910 log.warning(
2911 "Bucket:%s unable to invoke method:%s error:%s ",
2912 b['Name'], method, e.response['Error']['Message'])
2913 b.setdefault('c7n:DeniedMethods', []).append(method)
2914 return b.get(self.annotation_key)
2915
2916
2917@actions.register('set-intelligent-tiering')
2918class ConfigureIntelligentTiering(BucketActionBase):
    """Action applies an intelligent tiering configuration to an S3 bucket
2920
2921 The schema to supply to the configuration follows the schema here:
2922 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_bucket_intelligent_tiering_configuration.html
2923
    To delete a configuration, supply State: delete along with either the
    configuration's Id, or Id: matched to delete every configuration matched
    by the intelligent-tiering filter.
2925
2926 :example:
2927
2928 .. code-block:: yaml
2929
2930 policies:
2931 - name: s3-apply-intelligent-tiering-config
2932 resource: aws.s3
2933 filters:
2934 - not:
2935 - type: intelligent-tiering
2936 attrs:
2937 - Status: Enabled
2938 - Filter:
2939 And:
2940 Prefix: helloworld
2941 Tags:
2942 - Key: Hello
2943 Value: World
2944 - Tierings:
2945 - Days: 123
2946 AccessTier: ARCHIVE_ACCESS
2947 actions:
2948 - type: set-intelligent-tiering
2949 Id: c7n-default
2950 IntelligentTieringConfiguration:
2951 Id: c7n-default
2952 Status: Enabled
2953 Tierings:
2954 - Days: 149
2955 AccessTier: ARCHIVE_ACCESS
2956
2957 - name: s3-delete-intelligent-tiering-configuration
2958 resource: aws.s3
2959 filters:
2960 - type: intelligent-tiering
2961 attrs:
2962 - Status: Enabled
2963 - Id: test-config
2964 actions:
2965 - type: set-intelligent-tiering
2966 Id: test-config
2967 State: delete
2968
2969 - name: s3-delete-intelligent-tiering-matched-configs
2970 resource: aws.s3
2971 filters:
2972 - type: intelligent-tiering
2973 attrs:
2974 - Status: Enabled
2975 - Id: test-config
2976 actions:
2977 - type: set-intelligent-tiering
2978 Id: matched
2979 State: delete
2980
2981 """
2982
2983 annotation_key = 'c7n:ListItemMatches'
2984 shape = 'PutBucketIntelligentTieringConfigurationRequest'
2985 schema = {
2986 'type': 'object',
2987 'additionalProperties': False,
2988 'oneOf': [
2989 {'required': ['type', 'Id', 'IntelligentTieringConfiguration']},
2990 {'required': ['type', 'Id', 'State']}],
2991 'properties': {
2992 'type': {'enum': ['set-intelligent-tiering']},
2993 'Id': {'type': 'string'},
2994 # delete intelligent tier configurations via state: delete
2995 'State': {'type': 'string', 'enum': ['delete']},
2996 'IntelligentTieringConfiguration': {'type': 'object'}
2997 },
2998 }
2999
3000 permissions = ('s3:PutIntelligentTieringConfiguration',)
3001
3002 def validate(self):
3003 # You can have up to 1,000 S3 Intelligent-Tiering configurations per bucket.
3004 # Hence, always use it with a filter
3005 found = False
3006 for f in self.manager.iter_filters():
3007 if isinstance(f, IntelligentTiering):
3008 found = True
3009 break
3010 if not found:
3011 raise PolicyValidationError(
3012 '`set-intelligent-tiering` may only be used in '
3013 'conjunction with `intelligent-tiering` filter on %s' % (self.manager.data,))
3014 cfg = dict(self.data)
3015 if 'IntelligentTieringConfiguration' in cfg:
3016 cfg['Bucket'] = 'bucket'
3017 cfg.pop('type')
3018 return shape_validate(
3019 cfg, self.shape, self.manager.resource_type.service)
3020
3021 def process(self, buckets):
3022 with self.executor_factory(max_workers=3) as w:
3023 futures = {}
3024
3025 for b in buckets:
3026 futures[w.submit(self.process_bucket, b)] = b
3027
3028 for future in as_completed(futures):
3029 if future.exception():
3030 bucket = futures[future]
3031 self.log.error(
3032 'error modifying bucket intelligent tiering configuration: %s\n%s',
3033 bucket['Name'], future.exception())
3034 continue
3035
3036 def process_bucket(self, bucket):
3037 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3038
3039 if 'list_bucket_intelligent_tiering_configurations' in bucket.get(
3040 'c7n:DeniedMethods', []):
3041 log.warning("Access Denied Bucket:%s while reading intelligent tiering configurations"
3042 % bucket['Name'])
3043 return
3044
3045 if self.data.get('Id') and self.data.get('IntelligentTieringConfiguration'):
3046 try:
3047 s3.put_bucket_intelligent_tiering_configuration(
3048 Bucket=bucket['Name'], Id=self.data.get(
3049 'Id'), IntelligentTieringConfiguration=self.data.get(
3050 'IntelligentTieringConfiguration'))
3051 except ClientError as e:
3052 if e.response['Error']['Code'] == 'AccessDenied':
3053 log.warning(
3054 "Access Denied Bucket:%s while applying intelligent tiering configuration"
3055 % bucket['Name'])
3056 if self.data.get('State'):
3057 if self.data.get('Id') == 'matched':
3058 for config in bucket.get(self.annotation_key):
3059 self.delete_intelligent_tiering_configurations(s3, config.get('Id'), bucket)
3060 else:
3061 self.delete_intelligent_tiering_configurations(s3, self.data.get('Id'), bucket)
3062
3063 def delete_intelligent_tiering_configurations(self, s3_client, id, bucket):
3064 try:
3065 s3_client.delete_bucket_intelligent_tiering_configuration(Bucket=bucket['Name'], Id=id)
3066 except ClientError as e:
3067 if e.response['Error']['Code'] == 'AccessDenied':
3068 log.warning(
3069 "Access Denied Bucket:%s while deleting intelligent tiering configuration"
3070 % bucket['Name'])
3071 elif e.response['Error']['Code'] == 'NoSuchConfiguration':
3072 log.warning(
3073 "No such configuration found:%s while deleting intelligent tiering configuration"
3074 % bucket['Name'])
3075
3076
3077@actions.register('delete')
3078class DeleteBucket(ScanBucket):
    """Action deletes an S3 bucket
3080
3081 :example:
3082
3083 .. code-block:: yaml
3084
3085 policies:
3086 - name: delete-unencrypted-buckets
3087 resource: s3
3088 filters:
3089 - type: missing-statement
3090 statement_ids:
3091 - RequiredEncryptedPutObject
3092 actions:
3093 - type: delete
3094 remove-contents: true
3095 """
3096
3097 schema = type_schema('delete', **{'remove-contents': {'type': 'boolean'}})
3098
3099 permissions = ('s3:*',)
3100
3101 bucket_ops = {
3102 'standard': {
3103 'iterator': 'list_objects',
3104 'contents_key': ['Contents'],
3105 'key_processor': 'process_key'
3106 },
3107 'versioned': {
3108 'iterator': 'list_object_versions',
3109 'contents_key': ['Versions', 'DeleteMarkers'],
3110 'key_processor': 'process_version'
3111 }
3112 }
3113
3114 def process_delete_enablement(self, b):
3115 """Prep a bucket for deletion.
3116
3117 Clear out any pending multi-part uploads.
3118
3119 Disable versioning on the bucket, so deletes don't
3120 generate fresh deletion markers.
3121 """
3122 client = bucket_client(
3123 local_session(self.manager.session_factory), b)
3124
3125 # Stop replication so we can suspend versioning
3126 if b.get('Replication') is not None:
3127 client.delete_bucket_replication(Bucket=b['Name'])
3128
3129 # Suspend versioning, so we don't get new delete markers
3130 # as we walk and delete versions
3131 if (self.get_bucket_style(b) == 'versioned' and b['Versioning']['Status'] == 'Enabled' and
3132 self.data.get('remove-contents', True)):
3133 client.put_bucket_versioning(
3134 Bucket=b['Name'],
3135 VersioningConfiguration={'Status': 'Suspended'})
3136
3137 # Clear our multi-part uploads
3138 uploads = client.get_paginator('list_multipart_uploads')
3139 for p in uploads.paginate(Bucket=b['Name']):
3140 for u in p.get('Uploads', ()):
3141 client.abort_multipart_upload(
3142 Bucket=b['Name'],
3143 Key=u['Key'],
3144 UploadId=u['UploadId'])
3145
3146 def process(self, buckets):
3147 # might be worth sanity checking all our permissions
3148 # on the bucket up front before disabling versioning/replication.
3149 if self.data.get('remove-contents', True):
3150 self._process_with_futures(self.process_delete_enablement, buckets)
3151 self.empty_buckets(buckets)
3152
3153 results = self._process_with_futures(self.delete_bucket, buckets)
3154 self.write_denied_buckets_file()
3155 return results
3156
3157 def delete_bucket(self, b):
3158 s3 = bucket_client(self.manager.session_factory(), b)
3159 try:
3160 self._run_api(s3.delete_bucket, Bucket=b['Name'])
3161 except ClientError as e:
3162 if e.response['Error']['Code'] == 'BucketNotEmpty':
3163 self.log.error(
3164 "Error while deleting bucket %s, bucket not empty" % (
3165 b['Name']))
3166 else:
3167 raise e
3168
3169 def empty_buckets(self, buckets):
3170 t = time.time()
3171 results = super(DeleteBucket, self).process(buckets)
3172 run_time = time.time() - t
3173 object_count = 0
3174
3175 for r in results:
3176 object_count += r['Count']
3177 self.manager.ctx.metrics.put_metric(
3178 "Total Keys", object_count, "Count", Scope=r['Bucket'],
3179 buffer=True)
3180 self.manager.ctx.metrics.put_metric(
3181 "Total Keys", object_count, "Count", Scope="Account", buffer=True)
3182 self.manager.ctx.metrics.flush()
3183
3184 log.info(
3185 "EmptyBucket buckets:%d Complete keys:%d rate:%0.2f/s time:%0.2fs",
3186 len(buckets), object_count,
3187 float(object_count) / run_time if run_time else 0, run_time)
3188 return results
3189
3190 def process_chunk(self, batch, bucket):
3191 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3192 objects = []
3193 for key in batch:
3194 obj = {'Key': key['Key']}
3195 if 'VersionId' in key:
3196 obj['VersionId'] = key['VersionId']
3197 objects.append(obj)
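        # delete_objects accepts up to 1000 keys per request; batches arrive
        # here already chunked (100 keys each) by ScanBucket._process_bucket.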
3198 results = s3.delete_objects(
3199 Bucket=bucket['Name'], Delete={'Objects': objects}).get('Deleted', ())
3200 if self.get_bucket_style(bucket) != 'versioned':
3201 return results
3202
3203
3204@actions.register('configure-lifecycle')
3205class Lifecycle(BucketActionBase):
3206 """Action applies a lifecycle policy to versioned S3 buckets
3207
3208 The schema to supply to the rule follows the schema here:
3209 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_lifecycle_configuration
3210
3211 To delete a lifecycle rule, supply Status=absent
3212
3213 :example:
3214
3215 .. code-block:: yaml
3216
3217 policies:
3218 - name: s3-apply-lifecycle
3219 resource: s3
3220 actions:
3221 - type: configure-lifecycle
3222 rules:
3223 - ID: my-lifecycle-id
3224 Status: Enabled
3225 Prefix: foo/
3226 Transitions:
3227 - Days: 60
3228 StorageClass: GLACIER
3229
3230 """
3231
3232 schema = type_schema(
3233 'configure-lifecycle',
3234 **{
3235 'rules': {
3236 'type': 'array',
3237 'items': {
3238 'type': 'object',
3239 'required': ['ID', 'Status'],
3240 'additionalProperties': False,
3241 'properties': {
3242 'ID': {'type': 'string'},
3243 # c7n intercepts `absent`
3244 'Status': {'enum': ['Enabled', 'Disabled', 'absent']},
3245 'Prefix': {'type': 'string'},
3246 'Expiration': {
3247 'type': 'object',
3248 'additionalProperties': False,
3249 'properties': {
3250 'Date': {'type': 'string'}, # Date
3251 'Days': {'type': 'integer'},
3252 'ExpiredObjectDeleteMarker': {'type': 'boolean'},
3253 },
3254 },
3255 'Filter': {
3256 'type': 'object',
3257 'minProperties': 1,
3258 'maxProperties': 1,
3259 'additionalProperties': False,
3260 'properties': {
3261 'Prefix': {'type': 'string'},
3262 'ObjectSizeGreaterThan': {'type': 'integer'},
3263 'ObjectSizeLessThan': {'type': 'integer'},
3264 'Tag': {
3265 'type': 'object',
3266 'required': ['Key', 'Value'],
3267 'additionalProperties': False,
3268 'properties': {
3269 'Key': {'type': 'string'},
3270 'Value': {'type': 'string'},
3271 },
3272 },
3273 'And': {
3274 'type': 'object',
3275 'additionalProperties': False,
3276 'properties': {
3277 'Prefix': {'type': 'string'},
3278 'ObjectSizeGreaterThan': {'type': 'integer'},
3279 'ObjectSizeLessThan': {'type': 'integer'},
3280 'Tags': {
3281 'type': 'array',
3282 'items': {
3283 'type': 'object',
3284 'required': ['Key', 'Value'],
3285 'additionalProperties': False,
3286 'properties': {
3287 'Key': {'type': 'string'},
3288 'Value': {'type': 'string'},
3289 },
3290 },
3291 },
3292 },
3293 },
3294 },
3295 },
3296 'Transitions': {
3297 'type': 'array',
3298 'items': {
3299 'type': 'object',
3300 'additionalProperties': False,
3301 'properties': {
3302 'Date': {'type': 'string'}, # Date
3303 'Days': {'type': 'integer'},
3304 'StorageClass': {'type': 'string'},
3305 },
3306 },
3307 },
3308 'NoncurrentVersionTransitions': {
3309 'type': 'array',
3310 'items': {
3311 'type': 'object',
3312 'additionalProperties': False,
3313 'properties': {
3314 'NoncurrentDays': {'type': 'integer'},
3315 'NewerNoncurrentVersions': {'type': 'integer'},
3316 'StorageClass': {'type': 'string'},
3317 },
3318 },
3319 },
3320 'NoncurrentVersionExpiration': {
3321 'type': 'object',
3322 'additionalProperties': False,
3323 'properties': {
3324 'NoncurrentDays': {'type': 'integer'},
3325 'NewerNoncurrentVersions': {'type': 'integer'}
3326 },
3327 },
3328 'AbortIncompleteMultipartUpload': {
3329 'type': 'object',
3330 'additionalProperties': False,
3331 'properties': {
3332 'DaysAfterInitiation': {'type': 'integer'},
3333 },
3334 },
3335 },
3336 },
3337 },
3338 }
3339 )
3340
3341 permissions = ('s3:GetLifecycleConfiguration', 's3:PutLifecycleConfiguration')
3342
3343 def process(self, buckets):
3344 with self.executor_factory(max_workers=3) as w:
3345 futures = {}
3346 results = []
3347
3348 for b in buckets:
3349 futures[w.submit(self.process_bucket, b)] = b
3350
3351 for future in as_completed(futures):
3352 if future.exception():
3353 bucket = futures[future]
3354 self.log.error('error modifying bucket lifecycle: %s\n%s',
3355 bucket['Name'], future.exception())
3356 results += filter(None, [future.result()])
3357
3358 return results
3359
3360 def process_bucket(self, bucket):
3361 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3362
3363 if 'get_bucket_lifecycle_configuration' in bucket.get('c7n:DeniedMethods', []):
3364 log.warning("Access Denied Bucket:%s while reading lifecycle" % bucket['Name'])
3365 return
3366
3367 # Adjust the existing lifecycle by adding/deleting/overwriting rules as necessary
3368 config = (bucket.get('Lifecycle') or {}).get('Rules', [])
3369 for rule in self.data['rules']:
3370 for index, existing_rule in enumerate(config):
3371 if not existing_rule:
3372 continue
3373 if rule['ID'] == existing_rule['ID']:
3374 if rule['Status'] == 'absent':
3375 config[index] = None
3376 else:
3377 config[index] = rule
3378 break
3379 else:
3380 if rule['Status'] != 'absent':
3381 config.append(rule)
3382
3383 # The extra `list` conversion is required for python3
3384 config = list(filter(None, config))
3385
3386 try:
3387 if not config:
3388 s3.delete_bucket_lifecycle(Bucket=bucket['Name'])
3389 else:
3390 s3.put_bucket_lifecycle_configuration(
3391 Bucket=bucket['Name'], LifecycleConfiguration={'Rules': config})
3392 except ClientError as e:
3393 if e.response['Error']['Code'] == 'AccessDenied':
3394 log.warning("Access Denied Bucket:%s while applying lifecycle" % bucket['Name'])
3395 else:
3396 raise e
3397
3398
3399class KMSKeyResolverMixin:
    """Builds a dictionary of region-specific KMS key ARNs"""
3401
3402 def __init__(self, data, manager=None):
3403 self.arns = dict()
3404 self.data = data
3405 self.manager = manager
3406
3407 def resolve_keys(self, buckets):
3408 key = self.data.get('key')
3409 if not key:
3410 return None
3411
3412 regions = {get_region(b) for b in buckets}
3413 for r in regions:
3414 client = local_session(self.manager.session_factory).client('kms', region_name=r)
3415 try:
3416 key_meta = client.describe_key(
3417 KeyId=key
3418 ).get('KeyMetadata', {})
3419 key_id = key_meta.get('KeyId')
3420
3421 # We need a complete set of alias identifiers (names and ARNs)
3422 # to fully evaluate bucket encryption filters.
3423 key_aliases = client.list_aliases(
3424 KeyId=key_id
3425 ).get('Aliases', [])
3426
3427 self.arns[r] = {
3428 'KeyId': key_id,
3429 'Arn': key_meta.get('Arn'),
3430 'KeyManager': key_meta.get('KeyManager'),
3431 'Description': key_meta.get('Description'),
3432 'Aliases': [
3433 alias[attr]
3434 for alias in key_aliases
3435 for attr in ('AliasArn', 'AliasName')
3436 ],
3437 }
3438
3439 except ClientError as e:
3440 self.log.error('Error resolving kms ARNs for set-bucket-encryption: %s key: %s' % (
3441 e, self.data.get('key')))
3442
3443 def get_key(self, bucket):
3444 if 'key' not in self.data:
3445 return None
3446 region = get_region(bucket)
3447 key = self.arns.get(region)
3448 if not key:
3449 self.log.warning('Unable to resolve key %s for bucket %s in region %s',
3450 self.data['key'], bucket.get('Name'), region)
3451 return key
3452
3453
3454@filters.register('bucket-encryption')
3455class BucketEncryption(KMSKeyResolverMixin, Filter):
3456 """Filters for S3 buckets that have bucket-encryption
3457
    :example:
3459
3460 .. code-block:: yaml
3461
3462 policies:
3463 - name: s3-bucket-encryption-AES256
3464 resource: s3
3465 region: us-east-1
3466 filters:
3467 - type: bucket-encryption
3468 state: True
3469 crypto: AES256
3470 - name: s3-bucket-encryption-KMS
3471 resource: s3
3472 region: us-east-1
3473 filters:
3474 - type: bucket-encryption
3475 state: True
3476 crypto: aws:kms
3477 key: alias/some/alias/key
3478 - name: s3-bucket-encryption-off
3479 resource: s3
3480 region: us-east-1
3481 filters:
3482 - type: bucket-encryption
3483 state: False
3484 - name: s3-bucket-test-bucket-key-enabled
3485 resource: s3
3486 region: us-east-1
3487 filters:
3488 - type: bucket-encryption
3489 bucket_key_enabled: True
3490 """
3491 schema = type_schema('bucket-encryption',
3492 state={'type': 'boolean'},
3493 crypto={'type': 'string', 'enum': ['AES256', 'aws:kms']},
3494 key={'type': 'string'},
3495 bucket_key_enabled={'type': 'boolean'})
3496
3497 permissions = ('s3:GetEncryptionConfiguration', 'kms:DescribeKey', 'kms:ListAliases')
3498 annotation_key = 'c7n:bucket-encryption'
3499
3500 def validate(self):
3501 if self.data.get('bucket_key_enabled') is not None and self.data.get('key') is not None:
3502 raise PolicyValidationError(
3503 f'key and bucket_key_enabled attributes cannot both be set: {self.data}'
3504 )
3505
3506 def process(self, buckets, event=None):
3507 self.resolve_keys(buckets)
3508 results = []
3509 with self.executor_factory(max_workers=2) as w:
3510 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3511 for future in as_completed(futures):
3512 b = futures[future]
3513 if future.exception():
3514 self.log.error("Message: %s Bucket: %s", future.exception(),
3515 b['Name'])
3516 continue
3517 if future.result():
3518 results.append(b)
3519 return results
3520
3521 def process_bucket(self, b):
3522
3523 client = bucket_client(local_session(self.manager.session_factory), b)
3524 rules = []
3525 if self.annotation_key not in b:
3526 try:
3527 be = client.get_bucket_encryption(Bucket=b['Name'])
3528 be.pop('ResponseMetadata', None)
3529 except ClientError as e:
3530 if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError':
3531 raise
3532 be = {}
3533 b[self.annotation_key] = be
3534 else:
3535 be = b[self.annotation_key]
3536
3537 rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', [])
3538 # default `state` to True as previous impl assumed state == True
3539 # to preserve backwards compatibility
3540 if self.data.get('bucket_key_enabled'):
3541 for rule in rules:
3542 return self.filter_bucket_key_enabled(rule)
3543 elif self.data.get('bucket_key_enabled') is False:
3544 for rule in rules:
3545 return not self.filter_bucket_key_enabled(rule)
3546
3547 if self.data.get('state', True):
3548 for sse in rules:
3549 return self.filter_bucket(b, sse)
3550 return False
3551 else:
3552 for sse in rules:
3553 return not self.filter_bucket(b, sse)
3554 return True
3555
3556 def filter_bucket(self, b, sse):
3557 allowed = ['AES256', 'aws:kms']
3558 key = self.get_key(b)
3559 crypto = self.data.get('crypto')
3560 rule = sse.get('ApplyServerSideEncryptionByDefault')
3561
3562 if not rule:
3563 return False
3564 algo = rule.get('SSEAlgorithm')
3565
3566 if not crypto and algo in allowed:
3567 return True
3568
3569 if crypto == 'AES256' and algo == 'AES256':
3570 return True
3571 elif crypto == 'aws:kms' and algo == 'aws:kms':
3572 if not key:
3573 # There are two broad reasons to have an empty value for
3574 # the regional key here:
3575 #
3576 # * The policy did not specify a key, in which case this
3577 # filter should match _all_ buckets with a KMS default
3578 # encryption rule.
3579 #
3580 # * The policy specified a key that could not be
3581 # resolved, in which case this filter shouldn't match
3582 # any buckets.
3583 return 'key' not in self.data
3584
3585 # The default encryption rule can specify a key ID,
3586 # key ARN, alias name or alias ARN. Match against any of
3587 # those attributes. A rule specifying KMS with no master key
3588 # implies the AWS-managed key.
3589 key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']}
3590 return rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids
3591
3592 def filter_bucket_key_enabled(self, rule) -> bool:
3593 if not rule:
3594 return False
3595 return rule.get('BucketKeyEnabled')
3596
3597
3598@actions.register('set-bucket-encryption')
3599class SetBucketEncryption(KMSKeyResolverMixin, BucketActionBase):
3600 """Action enables default encryption on S3 buckets
3601
    `enabled`: boolean Optional: Defaults to True

    `crypto`: aws:kms | AES256 Optional: Defaults to AES256

    `key`: arn, alias, or kms key id

    `bucket-key`: boolean Optional:
    Defaults to True.
    Reduces the amount of API traffic from Amazon S3 to KMS and can reduce
    KMS request costs by up to 99 percent. Requires kms:Decrypt permissions
    for copy and upload on the AWS KMS Key Policy.
3613
3614 Bucket Key Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html
3615
3616 :example:
3617
3618 .. code-block:: yaml
3619
3620 policies:
3621 - name: s3-enable-default-encryption-kms
3622 resource: s3
3623 actions:
3624 - type: set-bucket-encryption
3625 # enabled: true <------ optional (true by default)
3626 crypto: aws:kms
3627 key: 1234abcd-12ab-34cd-56ef-1234567890ab
3628 bucket-key: true
3629
3630 - name: s3-enable-default-encryption-kms-alias
3631 resource: s3
3632 actions:
3633 - type: set-bucket-encryption
3634 # enabled: true <------ optional (true by default)
3635 crypto: aws:kms
3636 key: alias/some/alias/key
3637 bucket-key: true
3638
3639 - name: s3-enable-default-encryption-aes256
3640 resource: s3
3641 actions:
3642 - type: set-bucket-encryption
3643 # bucket-key: true <--- optional (true by default for AWS SSE)
3644 # crypto: AES256 <----- optional (AES256 by default)
3645 # enabled: true <------ optional (true by default)
3646
3647 - name: s3-disable-default-encryption
3648 resource: s3
3649 actions:
3650 - type: set-bucket-encryption
3651 enabled: false
3652 """
3653
3654 schema = {
3655 'type': 'object',
3656 'additionalProperties': False,
3657 'properties': {
3658 'type': {'enum': ['set-bucket-encryption']},
3659 'enabled': {'type': 'boolean'},
3660 'crypto': {'enum': ['aws:kms', 'AES256']},
3661 'key': {'type': 'string'},
3662 'bucket-key': {'type': 'boolean'}
3663 },
        'dependencies': {
            'key': {
                'properties': {
                    'crypto': {'pattern': 'aws:kms'}
                },
                'required': ['crypto']
            }
        }
    }

    permissions = ('s3:PutEncryptionConfiguration', 's3:GetEncryptionConfiguration',
                   'kms:ListAliases', 'kms:DescribeKey')

    def process(self, buckets):
        if self.data.get('enabled', True):
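            # resolve the configured KMS key (if any) in each bucket region
            # up front so the per-bucket workers can reuse the lookups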
            self.resolve_keys(buckets)

        with self.executor_factory(max_workers=3) as w:
            futures = {w.submit(self.process_bucket, b): b for b in buckets}
            for future in as_completed(futures):
                if future.exception():
                    self.log.error('Message: %s Bucket: %s', future.exception(),
                                   futures[future]['Name'])

    def process_bucket(self, bucket):
        default_key_desc = 'Default master key that protects my S3 objects when no other key is defined'  # noqa
        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        if not self.data.get('enabled', True):
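            # deleting the configuration resets the bucket's default
            # encryption to Amazon S3 managed keys (SSE-S3)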
            s3.delete_bucket_encryption(Bucket=bucket['Name'])
            return
        algo = self.data.get('crypto', 'AES256')

        # bucket key defaults to True; S3 only honors bucket keys for
        # SSE-KMS, so the value is effectively ignored for AES256 (Amazon SSE)
        bucket_key = self.data.get('bucket-key', True)
        config = {
            'Rules': [
                {
                    'ApplyServerSideEncryptionByDefault': {
                        'SSEAlgorithm': algo,
                    },
                    'BucketKeyEnabled': bucket_key
                }
            ]
        }

        if algo == 'aws:kms':
            key = self.get_key(bucket)
            if not key:
                raise Exception('Valid KMS Key required but does not exist')

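            # use the resolved key's full ARN; PutBucketEncryption also accepts
            # a key id or alias, but the ARN is unambiguous across accounts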
            config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = key['Arn']
        s3.put_bucket_encryption(
            Bucket=bucket['Name'],
            ServerSideEncryptionConfiguration=config
        )


OWNERSHIP_CONTROLS = ['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter']
VALUE_FILTER_MAGIC_VALUES = ['absent', 'present', 'not-null', 'empty']


@filters.register('ownership')
class BucketOwnershipControls(BucketFilterBase, ValueFilter):
    """Filter for object ownership controls

    Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html

    :example:

    Find buckets with ACLs disabled

    .. code-block:: yaml

            policies:
              - name: s3-bucket-acls-disabled
                resource: aws.s3
                region: us-east-1
                filters:
                  - type: ownership
                    value: BucketOwnerEnforced

    :example:

    Find buckets with object ownership preferred or enforced

    .. code-block:: yaml

            policies:
              - name: s3-bucket-ownership-preferred
                resource: aws.s3
                region: us-east-1
                filters:
                  - type: ownership
                    op: in
                    value:
                      - BucketOwnerEnforced
                      - BucketOwnerPreferred

    :example:

    Find buckets with no object ownership controls

    .. code-block:: yaml

            policies:
              - name: s3-bucket-no-ownership-controls
                resource: aws.s3
                region: us-east-1
                filters:
                  - type: ownership
                    value: empty
    """
    schema = type_schema('ownership', rinherit=ValueFilter.schema, value={'oneOf': [
        {'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES},
        {'type': 'array', 'items': {
            'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}}]})
    permissions = ('s3:GetBucketOwnershipControls',)
    annotation_key = 'c7n:ownership'

    def __init__(self, data, manager=None):
        super(BucketOwnershipControls, self).__init__(data, manager)

        # Ownership controls appear as an array of rules. There can only be one
        # ObjectOwnership rule defined for a bucket, so we can automatically
        # match against that if it exists.
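        # e.g. an annotation of {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]}
        # evaluates to 'BucketOwnerEnforced' under this key expression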
        self.data['key'] = f'("{self.annotation_key}".Rules[].ObjectOwnership)[0]'

    def process(self, buckets, event=None):
        with self.executor_factory(max_workers=2) as w:
            futures = {w.submit(self.process_bucket, b): b for b in buckets}
            for future in as_completed(futures):
                b = futures[future]
                if future.exception():
                    self.log.error("Message: %s Bucket: %s", future.exception(),
                                   b['Name'])
                    continue
        return super(BucketOwnershipControls, self).process(buckets, event)

    def process_bucket(self, b):
        if self.annotation_key in b:
            return
        client = bucket_client(local_session(self.manager.session_factory), b)
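        # buckets with no ownership configuration raise
        # OwnershipControlsNotFoundError; annotate those as None so the
        # `empty` / `absent` value filter magic values match them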
        try:
            controls = client.get_bucket_ownership_controls(Bucket=b['Name'])
            controls.pop('ResponseMetadata', None)
        except ClientError as e:
            if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError':
                raise
            controls = {}
        b[self.annotation_key] = controls.get('OwnershipControls')


@filters.register('bucket-replication')
class BucketReplication(ListItemFilter):
    """Filter S3 buckets based on their bucket replication configurations

    The schema to supply to the attrs follows the response schema documented here:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_replication.html

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-bucket-replication
                resource: s3
                filters:
                  - type: bucket-replication
                    attrs:
                      - Status: Enabled
                      - Filter:
                          And:
                            Prefix: test
                            Tags:
                              - Key: Owner
                                Value: c7n
                      - ExistingObjectReplication: Enabled

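    Each replication rule evaluated by this filter is additionally annotated
    with ``DestinationBucketAvailable`` and, when the destination bucket can be
    resolved, ``DestinationRegion`` and ``CrossRegion``, so those computed
    values can also be matched in ``attrs``. For example (illustrative policy
    name):

    .. code-block:: yaml

            policies:
              - name: s3-bucket-replication-cross-region
                resource: s3
                filters:
                  - type: bucket-replication
                    attrs:
                      - Status: Enabled
                      - CrossRegion: true
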
    """
    schema = type_schema(
        'bucket-replication',
        attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
        count={'type': 'number'},
        count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
    )

    permissions = ("s3:GetReplicationConfiguration",)
    annotation_key = 'Replication'
    annotate_items = True

    def __init__(self, data, manager=None):
        super().__init__(data, manager)
        self.data['key'] = self.annotation_key

    def get_item_values(self, b):
        client = bucket_client(local_session(self.manager.session_factory), b)
        # the replication configuration is fetched during augment via S3_AUGMENT_TABLE
        bucket_replication = b.get(self.annotation_key)

        rules = []
        if bucket_replication is not None:
            rules = bucket_replication.get('ReplicationConfiguration', {}).get('Rules', [])
            for replication in rules:
                self.augment_bucket_replication(b, replication, client)

        return rules

    def augment_bucket_replication(self, b, replication, client):
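        # Destination.Bucket is an ARN of the form arn:aws:s3:::bucket-name;
        # the bucket name is the sixth colon-delimited field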
        destination_bucket = replication.get('Destination').get('Bucket').split(':')[5]
        try:
            destination_region = inspect_bucket_region(destination_bucket, client.meta.endpoint_url)
        except ValueError:
            replication['DestinationBucketAvailable'] = False
            return
        source_region = get_region(b)
        replication['DestinationBucketAvailable'] = True
        replication['DestinationRegion'] = destination_region
        replication['CrossRegion'] = destination_region != source_region


@resources.register('s3-directory')
class S3Directory(query.QueryResourceManager):

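    # directory buckets (S3 Express One Zone) use the separate s3express
    # IAM namespace for permissions and ARNs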
    class resource_type(query.TypeInfo):
        service = 's3'
        permission_prefix = "s3express"
        arn_service = "s3express"
        arn_type = 'bucket'
        enum_spec = ('list_directory_buckets', 'Buckets[]', None)
        name = id = 'Name'
        date = 'CreationDate'
        dimension = 'BucketName'
        cfn_type = 'AWS::S3Express::DirectoryBucket'
        permissions_enum = ("s3express:ListAllMyDirectoryBuckets",)