# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
"""S3 Resource Manager

Filters:

The generic Values filters (jmespath) expression and Or filter are
available with all resources, including buckets. We include several
additional pieces of bucket data (Tags, Replication, Acl, Policy) as
keys within a bucket representation.

Actions:

 encrypt-keys

   Scan all keys in a bucket and optionally encrypt them in place.

 global-grants

   Check bucket acls for global grants

 encryption-policy

   Attach an encryption required policy to a bucket, this will break
   applications that are not using encryption, including aws log
   delivery.
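
For example, a value filter can reference the augmented bucket keys
directly (an illustrative policy; the key/value are placeholders):

.. code-block:: yaml

    policies:
      - name: s3-website-enabled-buckets
        resource: s3
        filters:
          - type: value
            key: Website.IndexDocument.Suffix
            value: present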
28"""
import copy
import functools
import json
import itertools
import logging
import math
import os
import time
import ssl

from botocore.client import Config
from botocore.exceptions import ClientError

from collections import defaultdict
from concurrent.futures import as_completed

try:
    from urllib3.exceptions import SSLError
except ImportError:
    from botocore.vendored.requests.packages.urllib3.exceptions import SSLError


from c7n.actions import (
    ActionRegistry, BaseAction, PutMetric, RemovePolicyBase)
from c7n.exceptions import PolicyValidationError, PolicyExecutionError
from c7n.filters import (
    FilterRegistry, Filter, CrossAccountAccessFilter, MetricsFilter,
    ValueFilter, ListItemFilter)
from .aws import shape_validate
import c7n.filters.policystatement as polstmt_filter
from c7n.manager import resources
from c7n.output import NullBlobOutput
from c7n import query
from c7n.resources.securityhub import PostFinding
from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction
from c7n.utils import (
    chunks, local_session, set_annotation, type_schema, filter_empty,
    dumps, format_string_values, get_account_alias_from_sts)
from c7n.resources.aws import inspect_bucket_region


log = logging.getLogger('custodian.s3')

filters = FilterRegistry('s3.filters')
actions = ActionRegistry('s3.actions')
filters.register('marked-for-op', TagActionFilter)
actions.register('put-metric', PutMetric)

MAX_COPY_SIZE = 1024 * 1024 * 1024 * 2


class DescribeS3(query.DescribeSource):
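
    # Augment each listed bucket concurrently with per-bucket API calls
    # (see assemble_bucket); falsy results are filtered out.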
    def augment(self, buckets):
        with self.manager.executor_factory(
                max_workers=min((10, len(buckets) + 1))) as w:
            results = w.map(
                assemble_bucket,
                zip(itertools.repeat(self.manager.session_factory), buckets))
            results = list(filter(None, results))
            return results


class ConfigS3(query.ConfigSource):

    # normalize config's janky idiosyncratic bespoke formatting to the
    # standard describe api responses.

    def get_query_params(self, query):
        q = super(ConfigS3, self).get_query_params(query)
        if 'expr' in q:
            q['expr'] = q['expr'].replace('select ', 'select awsRegion, ')
        return q

    def load_resource(self, item):
        resource = super(ConfigS3, self).load_resource(item)
        cfg = item['supplementaryConfiguration']
        # aka standard
        if 'awsRegion' in item and item['awsRegion'] != 'us-east-1':
            resource['Location'] = {'LocationConstraint': item['awsRegion']}
        else:
            resource['Location'] = {}

        # owner is under acl per describe
        resource.pop('Owner', None)
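
        # Each supplementary configuration is compared to its serialized
        # "empty" form; anything else is decoded and dispatched to the
        # matching handle_<Key> method to mirror the describe API shape.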
        for k, null_value in S3_CONFIG_SUPPLEMENT_NULL_MAP.items():
            if k not in cfg:
                continue
            if cfg.get(k) == null_value:
                continue
            method = getattr(self, "handle_%s" % k, None)
            if method is None:
                raise ValueError("unhandled supplementary config %s" % k)
            v = cfg[k]
            if isinstance(cfg[k], str):
                v = json.loads(cfg[k])
            method(resource, v)

        for el in S3_AUGMENT_TABLE:
            if el[1] not in resource:
                resource[el[1]] = el[2]
        return resource
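
    # Map config-service ACL permission/grantee names onto the values the
    # describe API returns, so both sources yield identical Acl structures.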
    PERMISSION_MAP = {
        'FullControl': 'FULL_CONTROL',
        'Write': 'WRITE',
        'WriteAcp': 'WRITE_ACP',
        'Read': 'READ',
        'ReadAcp': 'READ_ACP'}

    GRANTEE_MAP = {
        'AllUsers': "http://acs.amazonaws.com/groups/global/AllUsers",
        'AuthenticatedUsers': "http://acs.amazonaws.com/groups/global/AuthenticatedUsers",
        'LogDelivery': 'http://acs.amazonaws.com/groups/s3/LogDelivery'}

    def handle_AccessControlList(self, resource, item_value):
        # double serialized in config for some reason
        if isinstance(item_value, str):
            item_value = json.loads(item_value)

        resource['Acl'] = {}
        resource['Acl']['Owner'] = {'ID': item_value['owner']['id']}
        if item_value['owner']['displayName']:
            resource['Acl']['Owner']['DisplayName'] = item_value[
                'owner']['displayName']
        resource['Acl']['Grants'] = grants = []

        for g in (item_value.get('grantList') or ()):
            if 'id' not in g['grantee']:
                assert g['grantee'] in self.GRANTEE_MAP, "unknown grantee %s" % g
                rg = {'Type': 'Group', 'URI': self.GRANTEE_MAP[g['grantee']]}
            else:
                rg = {'ID': g['grantee']['id'], 'Type': 'CanonicalUser'}

            if 'displayName' in g:
                rg['DisplayName'] = g['displayName']

            grants.append({
                'Permission': self.PERMISSION_MAP[g['permission']],
                'Grantee': rg,
            })

    def handle_BucketAccelerateConfiguration(self, resource, item_value):
        # not currently auto-augmented by custodian
        return

    def handle_BucketLoggingConfiguration(self, resource, item_value):
        if ('destinationBucketName' not in item_value or
                item_value['destinationBucketName'] is None):
            resource[u'Logging'] = {}
            return
        resource[u'Logging'] = {
            'TargetBucket': item_value['destinationBucketName'],
            'TargetPrefix': item_value['logFilePrefix']}

    def handle_BucketLifecycleConfiguration(self, resource, item_value):
        rules = []
        for r in item_value.get('rules'):
            rr = {}
            rules.append(rr)
            expiry = {}
            for ek, ck in (
                    ('Date', 'expirationDate'),
                    ('ExpiredObjectDeleteMarker', 'expiredObjectDeleteMarker'),
                    ('Days', 'expirationInDays')):
                if ck in r and r[ck] and r[ck] != -1:
                    expiry[ek] = r[ck]
            if expiry:
                rr['Expiration'] = expiry

            transitions = []
            for t in (r.get('transitions') or ()):
                tr = {}
                for k in ('date', 'days', 'storageClass'):
                    if t.get(k):
                        tr["%s%s" % (k[0].upper(), k[1:])] = t[k]
                transitions.append(tr)
            if transitions:
                rr['Transitions'] = transitions

            if r.get('abortIncompleteMultipartUpload'):
                rr['AbortIncompleteMultipartUpload'] = {
                    'DaysAfterInitiation': r[
                        'abortIncompleteMultipartUpload']['daysAfterInitiation']}
            if r.get('noncurrentVersionExpirationInDays'):
                rr['NoncurrentVersionExpiration'] = {
                    'NoncurrentDays': r['noncurrentVersionExpirationInDays']}

            nonc_transitions = []
            for t in (r.get('noncurrentVersionTransitions') or ()):
                nonc_transitions.append({
                    'NoncurrentDays': t['days'],
                    'StorageClass': t['storageClass']})
            if nonc_transitions:
                rr['NoncurrentVersionTransitions'] = nonc_transitions

            rr['Status'] = r['status']
            rr['ID'] = r['id']
            if r.get('prefix'):
                rr['Prefix'] = r['prefix']
            if 'filter' not in r or not r['filter']:
                continue

            if r['filter']['predicate']:
                rr['Filter'] = self.convertLifePredicate(r['filter']['predicate'])

        resource['Lifecycle'] = {'Rules': rules}

    def convertLifePredicate(self, p):
        if p['type'] == 'LifecyclePrefixPredicate':
            return {'Prefix': p['prefix']}
        if p['type'] == 'LifecycleTagPredicate':
            return {'Tags': [{'Key': p['tag']['key'], 'Value': p['tag']['value']}]}
        if p['type'] == 'LifecycleAndOperator':
            n = {}
            for o in p['operands']:
                ot = self.convertLifePredicate(o)
                if 'Tags' in n and 'Tags' in ot:
                    n['Tags'].extend(ot['Tags'])
                else:
                    n.update(ot)
            return {'And': n}

        raise ValueError("unknown predicate: %s" % p)

    NotifyTypeMap = {
        'QueueConfiguration': 'QueueConfigurations',
        'LambdaConfiguration': 'LambdaFunctionConfigurations',
        'CloudFunctionConfiguration': 'LambdaFunctionConfigurations',
        'TopicConfiguration': 'TopicConfigurations'}

    def handle_BucketNotificationConfiguration(self, resource, item_value):
        d = {}
        for nid, n in item_value['configurations'].items():
            ninfo = {}
            d.setdefault(self.NotifyTypeMap[n['type']], []).append(ninfo)
            if n['type'] == 'QueueConfiguration':
                ninfo['QueueArn'] = n['queueARN']
            elif n['type'] == 'TopicConfiguration':
                ninfo['TopicArn'] = n['topicARN']
            elif n['type'] == 'LambdaConfiguration':
                ninfo['LambdaFunctionArn'] = n['functionARN']
            ninfo['Id'] = nid
            ninfo['Events'] = n['events']
            rules = []
            if n['filter']:
                for r in n['filter'].get('s3KeyFilter', {}).get('filterRules', []):
                    rules.append({'Name': r['name'], 'Value': r['value']})
            if rules:
                ninfo['Filter'] = {'Key': {'FilterRules': rules}}
        resource['Notification'] = d

    def handle_BucketReplicationConfiguration(self, resource, item_value):
        d = {'Role': item_value['roleARN'], 'Rules': []}
        for rid, r in item_value['rules'].items():
            rule = {
                'ID': rid,
                'Status': r.get('status', ''),
                'Prefix': r.get('prefix', ''),
                'Destination': {
                    'Bucket': r['destinationConfig']['bucketARN']}
            }
            if 'Account' in r['destinationConfig']:
                rule['Destination']['Account'] = r['destinationConfig']['Account']
            if r['destinationConfig'].get('storageClass'):
                rule['Destination']['StorageClass'] = r['destinationConfig']['storageClass']
            d['Rules'].append(rule)
        resource['Replication'] = {'ReplicationConfiguration': d}

    def handle_BucketPolicy(self, resource, item_value):
        resource['Policy'] = item_value.get('policyText')

    def handle_BucketTaggingConfiguration(self, resource, item_value):
        resource['Tags'] = [
            {"Key": k, "Value": v} for k, v in item_value['tagSets'][0]['tags'].items()]

    def handle_BucketVersioningConfiguration(self, resource, item_value):
        # Config defaults versioning to 'Off' for a null value
        if item_value['status'] not in ('Enabled', 'Suspended'):
            resource['Versioning'] = {}
            return
        resource['Versioning'] = {'Status': item_value['status']}
        # `isMfaDeleteEnabled` is an optional boolean property - the key may be absent,
        # present with a null value, or present with a boolean value.
        # Mirror the describe source by populating Versioning.MFADelete only in the
        # boolean case.
        mfa_delete = item_value.get('isMfaDeleteEnabled')
        if mfa_delete is None:
            return
        resource['Versioning']['MFADelete'] = (
            'Enabled' if mfa_delete else 'Disabled'
        )

    def handle_BucketWebsiteConfiguration(self, resource, item_value):
        website = {}
        if item_value['indexDocumentSuffix']:
            website['IndexDocument'] = {
                'Suffix': item_value['indexDocumentSuffix']}
        if item_value['errorDocument']:
            website['ErrorDocument'] = {
                'Key': item_value['errorDocument']}
        if item_value['redirectAllRequestsTo']:
            website['RedirectAllRequestsTo'] = {
                'HostName': item_value['redirectAllRequestsTo']['hostName'],
                'Protocol': item_value['redirectAllRequestsTo']['protocol']}
        for r in item_value['routingRules']:
            redirect = {}
            rule = {'Redirect': redirect}
            website.setdefault('RoutingRules', []).append(rule)
            if 'condition' in r:
                cond = {}
                for ck, rk in (
                        ('keyPrefixEquals', 'KeyPrefixEquals'),
                        ('httpErrorCodeReturnedEquals',
                         'HttpErrorCodeReturnedEquals')):
                    if r['condition'][ck]:
                        cond[rk] = r['condition'][ck]
                rule['Condition'] = cond
            for ck, rk in (
                    ('protocol', 'Protocol'),
                    ('hostName', 'HostName'),
                    ('replaceKeyPrefixWith', 'ReplaceKeyPrefixWith'),
                    ('replaceKeyWith', 'ReplaceKeyWith'),
                    ('httpRedirectCode', 'HttpRedirectCode')):
                if r['redirect'][ck]:
                    redirect[rk] = r['redirect'][ck]
        resource['Website'] = website


@resources.register('s3')
class S3(query.QueryResourceManager):

    class resource_type(query.TypeInfo):
        service = 's3'
        arn_type = ''
        enum_spec = ('list_buckets', 'Buckets[]', None)
        # not used but we want some consistency on the metadata
        detail_spec = ('get_bucket_location', 'Bucket', 'Name', 'LocationConstraint')
        permissions_augment = (
            "s3:GetBucketAcl",
            "s3:GetBucketLocation",
            "s3:GetBucketPolicy",
            "s3:GetBucketTagging",
            "s3:GetBucketVersioning",
            "s3:GetBucketLogging",
            "s3:GetBucketNotification",
            "s3:GetBucketWebsite",
            "s3:GetLifecycleConfiguration",
            "s3:GetReplicationConfiguration"
        )
        name = id = 'Name'
        date = 'CreationDate'
        dimension = 'BucketName'
        cfn_type = config_type = 'AWS::S3::Bucket'

    filter_registry = filters
    action_registry = actions
    source_mapping = {
        'describe': DescribeS3,
        'config': ConfigS3
    }

    def get_arns(self, resources):
        return ["arn:aws:s3:::{}".format(r["Name"]) for r in resources]

    @classmethod
    def get_permissions(cls):
        perms = ["s3:ListAllMyBuckets"]
        perms.extend([n[-1] for n in S3_AUGMENT_TABLE])
        return perms
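

# Serialized forms AWS Config uses to represent an unset supplementary
# configuration; matching values are skipped in ConfigS3.load_resource.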
S3_CONFIG_SUPPLEMENT_NULL_MAP = {
    'BucketLoggingConfiguration': u'{"destinationBucketName":null,"logFilePrefix":null}',
    'BucketPolicy': u'{"policyText":null}',
    'BucketVersioningConfiguration': u'{"status":"Off","isMfaDeleteEnabled":null}',
    'BucketAccelerateConfiguration': u'{"status":null}',
    'BucketNotificationConfiguration': u'{"configurations":{}}',
    'BucketLifecycleConfiguration': None,
    'AccessControlList': None,
    'BucketTaggingConfiguration': None,
    'BucketWebsiteConfiguration': None,
    'BucketReplicationConfiguration': None
}
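
# Each row is (client method, bucket key, default, response key to select,
# IAM permission); assemble_bucket iterates the first four columns and
# S3.get_permissions reports the last.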
S3_AUGMENT_TABLE = (
    ('get_bucket_location', 'Location', {}, None, 's3:GetBucketLocation'),
    ('get_bucket_tagging', 'Tags', [], 'TagSet', 's3:GetBucketTagging'),
    ('get_bucket_policy', 'Policy', None, 'Policy', 's3:GetBucketPolicy'),
    ('get_bucket_acl', 'Acl', None, None, 's3:GetBucketAcl'),
    ('get_bucket_replication',
     'Replication', None, None, 's3:GetReplicationConfiguration'),
    ('get_bucket_versioning', 'Versioning', None, None, 's3:GetBucketVersioning'),
    ('get_bucket_website', 'Website', None, None, 's3:GetBucketWebsite'),
    ('get_bucket_logging', 'Logging', None, 'LoggingEnabled', 's3:GetBucketLogging'),
    ('get_bucket_notification_configuration',
     'Notification', None, None, 's3:GetBucketNotification'),
    ('get_bucket_lifecycle_configuration',
     'Lifecycle', None, None, 's3:GetLifecycleConfiguration'),
    # ('get_bucket_cors', 'Cors'),
)


def assemble_bucket(item):
    """Assemble a document representing all the config state around a bucket.

    TODO: Refactor this, the logic here feels quite muddled.
    """
    factory, b = item
    s = factory()
    c = s.client('s3')
    # Bucket Location, Current Client Location, Default Location
    b_location = c_location = location = "us-east-1"
    methods = list(S3_AUGMENT_TABLE)
    for minfo in methods:
        m, k, default, select = minfo[:4]
        try:
            method = getattr(c, m)
            v = method(Bucket=b['Name'])
            v.pop('ResponseMetadata')
            if select is not None and select in v:
                v = v[select]
        except (ssl.SSLError, SSLError) as e:
            # Proxy issues? I assume
            log.warning("Bucket ssl error %s: %s %s",
                        b['Name'], b.get('Location', 'unknown'),
                        e)
            continue
        except ClientError as e:
            code = e.response['Error']['Code']
            if code.startswith("NoSuch") or "NotFound" in code:
                v = default
            elif code == 'PermanentRedirect':
                s = factory()
                c = bucket_client(s, b)
                # Requeue with the correct region given location constraint
                methods.append((m, k, default, select))
                continue
            else:
                log.warning(
                    "Bucket:%s unable to invoke method:%s error:%s ",
                    b['Name'], m, e.response['Error']['Message'])
                # For auth failures, we don't bail out, continue processing if we can.
                # Note this can lead to missing data, but in general is cleaner than
                # failing hard, due to the common use of locked down s3 bucket policies
                # that may cause issues fetching information across a fleet of buckets.

                # This does mean s3 policies depending on augments should check denied
                # methods annotation, generally though lacking get access to an augment means
                # they won't have write access either.

                # For other error types we raise and bail policy execution.
                if e.response['Error']['Code'] == 'AccessDenied':
                    b.setdefault('c7n:DeniedMethods', []).append(m)
                    continue
                raise
        # As soon as we learn location (which generally works)
        if k == 'Location' and v is not None:
            b_location = v.get('LocationConstraint')
            # Location == region for all cases but EU
            # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html
            if b_location is None:
                b_location = "us-east-1"
            elif b_location == 'EU':
                b_location = "eu-west-1"
                v['LocationConstraint'] = 'eu-west-1'
            if v and v != c_location:
                c = s.client('s3', region_name=b_location)
            elif c_location != location:
                c = s.client('s3', region_name=location)
        b[k] = v
    return b


def bucket_client(session, b, kms=False):
    region = get_region(b)

    if kms:
        # Need v4 signature for aws:kms crypto, else let the sdk decide
        # based on region support.
        config = Config(
            signature_version='s3v4',
            read_timeout=200, connect_timeout=120)
    else:
        config = Config(read_timeout=200, connect_timeout=120)
    return session.client('s3', region_name=region, config=config)


def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()):
    for bucket in buckets:
        client = bucket_client(local_session(session_factory), bucket)
        # Bucket tags are set atomically for the set/document, we want
        # to refetch against current to guard against any staleness in
        # our cached representation across multiple policies or concurrent
        # modifications.

        if 'get_bucket_tagging' in bucket.get('c7n:DeniedMethods', []):
            # avoid the additional API call if we already know that it's going
            # to result in AccessDenied. The chances that the resource's perms
            # would have changed between fetching the resource and acting on it
            # here are pretty low-- so the check here should suffice.
            log.warning(
                "Unable to get new set of bucket tags needed to modify tags, "
                "skipping tag action for bucket: %s" % bucket["Name"])
            continue

        try:
            bucket['Tags'] = client.get_bucket_tagging(
                Bucket=bucket['Name']).get('TagSet', [])
        except ClientError as e:
            if e.response['Error']['Code'] != 'NoSuchTagSet':
                raise
            bucket['Tags'] = []

        new_tags = {t['Key']: t['Value'] for t in add_tags}
        for t in bucket.get('Tags', ()):
            if (t['Key'] not in new_tags and t['Key'] not in remove_tags):
                new_tags[t['Key']] = t['Value']
        tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()]

        try:
            client.put_bucket_tagging(
                Bucket=bucket['Name'], Tagging={'TagSet': tag_set})
        except ClientError as e:
            log.exception(
                'Exception tagging bucket %s: %s', bucket['Name'], e)
            continue


def get_region(b):
    """Tries to get the bucket region from Location.LocationConstraint

    Special cases:
        LocationConstraint EU defaults to eu-west-1
        LocationConstraint null defaults to us-east-1

    Args:
        b (object): A bucket object

    Returns:
        string: an aws region string
    """
    remap = {None: 'us-east-1', 'EU': 'eu-west-1'}
    region = b.get('Location', {}).get('LocationConstraint')
    return remap.get(region, region)


@filters.register('metrics')
class S3Metrics(MetricsFilter):
    """S3 CW Metrics need special handling for attribute/dimension
    mismatch, and additional required dimension.
    """

    def get_dimensions(self, resource):
        dims = [{'Name': 'BucketName', 'Value': resource['Name']}]
        if (self.data['name'] == 'NumberOfObjects' and
                'dimensions' not in self.data):
            dims.append(
                {'Name': 'StorageType', 'Value': 'AllStorageTypes'})
        return dims


@filters.register('cross-account')
class S3CrossAccountFilter(CrossAccountAccessFilter):
    """Filters cross-account access to S3 buckets

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-acl
                resource: s3
                region: us-east-1
                filters:
                  - type: cross-account
    """
    permissions = ('s3:GetBucketPolicy',)

    def get_accounts(self):
        """add in elb access by default

        ELB Accounts by region
        https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html

        Redshift Accounts by region
        https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#db-auditing-manage-log-files
        https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#rs-db-auditing-cloud-trail-rs-acct-ids

        Cloudtrail Accounts by region
        https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-supported-regions.html
        """
        accounts = super(S3CrossAccountFilter, self).get_accounts()
        return accounts.union(
            [
                # ELB accounts
                '127311923021',  # us-east-1
                '033677994240',  # us-east-2
                '027434742980',  # us-west-1
                '797873946194',  # us-west-2
                '098369216593',  # af-south-1
                '985666609251',  # ca-central-1
                '054676820928',  # eu-central-1
                '897822967062',  # eu-north-1
                '635631232127',  # eu-south-1
                '156460612806',  # eu-west-1
                '652711504416',  # eu-west-2
                '009996457667',  # eu-west-3
                '754344448648',  # ap-east-1
                '582318560864',  # ap-northeast-1
                '600734575887',  # ap-northeast-2
                '383597477331',  # ap-northeast-3
                '114774131450',  # ap-southeast-1
                '783225319266',  # ap-southeast-2
                '718504428378',  # ap-south-1
                '076674570225',  # me-south-1
                '507241528517',  # sa-east-1
                '048591011584',  # us-gov-west-1 or gov-cloud-1
                '190560391635',  # us-gov-east-1
                '638102146993',  # cn-north-1
                '037604701340',  # cn-northwest-1

                # Redshift audit logging
                '193672423079',  # us-east-1
                '391106570357',  # us-east-2
                '262260360010',  # us-west-1
                '902366379725',  # us-west-2
                '365689465814',  # af-south-1
                '313564881002',  # ap-east-1
                '865932855811',  # ap-south-1
                '090321488786',  # ap-northeast-3
                '760740231472',  # ap-northeast-2
                '361669875840',  # ap-southeast-1
                '762762565011',  # ap-southeast-2
                '404641285394',  # ap-northeast-1
                '907379612154',  # ca-central-1
                '053454850223',  # eu-central-1
                '210876761215',  # eu-west-1
                '307160386991',  # eu-west-2
                '945612479654',  # eu-south-1
                '915173422425',  # eu-west-3
                '729911121831',  # eu-north-1
                '013126148197',  # me-south-1
                '075028567923',  # sa-east-1

                # Cloudtrail accounts (psa. folks should be using
                # cloudtrail service in bucket policies)
                '086441151436',  # us-east-1
                '475085895292',  # us-east-2
                '388731089494',  # us-west-1
                '113285607260',  # us-west-2
                '819402241893',  # ca-central-1
                '977081816279',  # ap-south-1
                '492519147666',  # ap-northeast-2
                '903692715234',  # ap-southeast-1
                '284668455005',  # ap-southeast-2
                '216624486486',  # ap-northeast-1
                '035351147821',  # eu-central-1
                '859597730677',  # eu-west-1
                '282025262664',  # eu-west-2
                '814480443879',  # sa-east-1
            ])


@filters.register('global-grants')
class GlobalGrantsFilter(Filter):
    """Filters for all S3 buckets that have global-grants

    *Note* by default this filter allows for read access
    if the bucket has been configured as a website. This
    can be disabled per the example below.

    :example:

    .. code-block:: yaml

            policies:
              - name: remove-global-grants
                resource: s3
                filters:
                  - type: global-grants
                    allow_website: false
                actions:
                  - delete-global-grants

    """

    schema = type_schema(
        'global-grants',
        allow_website={'type': 'boolean'},
        operator={'type': 'string', 'enum': ['or', 'and']},
        permissions={
            'type': 'array', 'items': {
                'type': 'string', 'enum': [
                    'READ', 'WRITE', 'WRITE_ACP', 'READ_ACP', 'FULL_CONTROL']}})

    GLOBAL_ALL = "http://acs.amazonaws.com/groups/global/AllUsers"
    AUTH_ALL = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"

    def process(self, buckets, event=None):
        with self.executor_factory(max_workers=5) as w:
            results = w.map(self.process_bucket, buckets)
            results = list(filter(None, list(results)))
            return results

    def process_bucket(self, b):
        acl = b.get('Acl', {'Grants': []})
        if not acl or not acl['Grants']:
            return

        results = []
        allow_website = self.data.get('allow_website', True)
        perms = self.data.get('permissions', [])

        for grant in acl['Grants']:
            if 'URI' not in grant.get("Grantee", {}):
                continue
            if grant['Grantee']['URI'] not in [self.AUTH_ALL, self.GLOBAL_ALL]:
                continue
            if allow_website and grant['Permission'] == 'READ' and b['Website']:
                continue
            if not perms or (perms and grant['Permission'] in perms):
                results.append(grant['Permission'])

        if results:
            set_annotation(b, 'GlobalPermissions', results)
            return b


class BucketActionBase(BaseAction):

    def get_permissions(self):
        return self.permissions

    def get_std_format_args(self, bucket):
        return {
            'account_id': self.manager.config.account_id,
            'region': self.manager.config.region,
            'bucket_name': bucket['Name'],
            'bucket_region': get_region(bucket)
        }

    def process(self, buckets):
        return self._process_with_futures(buckets)

    def _process_with_futures(self, buckets, *args, max_workers=3, **kwargs):
        errors = 0
        results = []
        with self.executor_factory(max_workers=max_workers) as w:
            futures = {}
            for b in buckets:
                futures[w.submit(self.process_bucket, b, *args, **kwargs)] = b
            for f in as_completed(futures):
                if f.exception():
                    b = futures[f]
                    self.log.error(
                        'error modifying bucket: policy:%s action:%s bucket:%s error:%s',
                        self.manager.data.get('name'), self.name, b['Name'], f.exception()
                    )
                    errors += 1
                    continue
                results += filter(None, [f.result()])
        if errors:
            self.log.error('encountered %d errors while processing %s', errors, self.name)
            raise PolicyExecutionError('%d resources failed' % errors)
        return results


class BucketFilterBase(Filter):
    def get_std_format_args(self, bucket):
        return {
            'account_id': self.manager.config.account_id,
            'region': self.manager.config.region,
            'bucket_name': bucket['Name'],
            'bucket_region': get_region(bucket)
        }


@S3.action_registry.register("post-finding")
class BucketFinding(PostFinding):

    resource_type = 'AwsS3Bucket'

    def format_resource(self, r):
        owner = r.get("Acl", {}).get("Owner", {})
        resource = {
            "Type": self.resource_type,
            "Id": "arn:aws:s3:::{}".format(r["Name"]),
            "Region": get_region(r),
            "Tags": {t["Key"]: t["Value"] for t in r.get("Tags", [])},
            "Details": {self.resource_type: {
                "OwnerId": owner.get('ID', 'Unknown')}}
        }

        if "DisplayName" in owner:
            resource["Details"]["AwsS3Bucket"]["OwnerName"] = owner['DisplayName']

        return filter_empty(resource)


@S3.filter_registry.register('has-statement')
class HasStatementFilter(polstmt_filter.HasStatementFilter):
    def get_std_format_args(self, bucket):
        return {
            'account_id': self.manager.config.account_id,
            'region': self.manager.config.region,
            'bucket_name': bucket['Name'],
            'bucket_region': get_region(bucket)
        }


@S3.filter_registry.register('lock-configuration')
class S3LockConfigurationFilter(ValueFilter):
    """
    Filter S3 buckets based on their object lock configurations

    :example:

    Get all buckets where lock configuration mode is COMPLIANCE

    .. code-block:: yaml

            policies:
              - name: lock-configuration-compliance
                resource: aws.s3
                filters:
                  - type: lock-configuration
                    key: Rule.DefaultRetention.Mode
                    value: COMPLIANCE

    """
    schema = type_schema('lock-configuration', rinherit=ValueFilter.schema)
    permissions = ('s3:GetBucketObjectLockConfiguration',)
    annotate = True
    annotation_key = 'c7n:ObjectLockConfiguration'

    def _process_resource(self, client, resource):
        try:
            config = client.get_object_lock_configuration(
                Bucket=resource['Name']
            )['ObjectLockConfiguration']
        except ClientError as e:
            if e.response['Error']['Code'] == 'ObjectLockConfigurationNotFoundError':
                config = None
            else:
                raise
        resource[self.annotation_key] = config

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('s3')
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for res in resources:
                if self.annotation_key in res:
                    continue
                futures.append(w.submit(self._process_resource, client, res))
            for f in as_completed(futures):
                exc = f.exception()
                if exc:
                    self.log.error(
                        "Exception getting bucket lock configuration \n %s" % (
                            exc))
        return super().process(resources, event)

    def __call__(self, r):
        return super().__call__(r.setdefault(self.annotation_key, None))
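

# Template statement matched by the no-encryption-statement filter below;
# Sid and Resource are copied in from each candidate statement before comparing.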
ENCRYPTION_STATEMENT_GLOB = {
    'Effect': 'Deny',
    'Principal': '*',
    'Action': 's3:PutObject',
    "Condition": {
        "StringNotEquals": {
            "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}


@filters.register('no-encryption-statement')
class EncryptionEnabledFilter(Filter):
    """Find buckets with missing encryption policy statements.

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-bucket-not-encrypted
                resource: s3
                filters:
                  - type: no-encryption-statement
    """
    schema = type_schema(
        'no-encryption-statement')

    def get_permissions(self):
        perms = self.manager.get_resource_manager('s3').get_permissions()
        return perms

    def process(self, buckets, event=None):
        return list(filter(None, map(self.process_bucket, buckets)))

    def process_bucket(self, b):
        p = b.get('Policy')
        if p is None:
            return b
        p = json.loads(p)
        encryption_statement = dict(ENCRYPTION_STATEMENT_GLOB)

        statements = p.get('Statement', [])
        check = False
        for s in list(statements):
            if 'Sid' in s:
                encryption_statement["Sid"] = s["Sid"]
            if 'Resource' in s:
                encryption_statement["Resource"] = s["Resource"]
            if s == encryption_statement:
                check = True
                break
        if check:
            return None
        else:
            return b


@filters.register('missing-statement')
@filters.register('missing-policy-statement')
class MissingPolicyStatementFilter(Filter):
    """Find buckets missing a set of named policy statements.

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-bucket-missing-statement
                resource: s3
                filters:
                  - type: missing-statement
                    statement_ids:
                      - RequiredEncryptedPutObject
    """

    schema = type_schema(
        'missing-policy-statement',
        aliases=('missing-statement',),
        statement_ids={'type': 'array', 'items': {'type': 'string'}})

    def __call__(self, b):
        p = b.get('Policy')
        if p is None:
            return b

        p = json.loads(p)

        required = list(self.data.get('statement_ids', []))
        statements = p.get('Statement', [])
        for s in list(statements):
            if s.get('Sid') in required:
                required.remove(s['Sid'])
        if not required:
            return False
        return True


@filters.register('bucket-notification')
class BucketNotificationFilter(ValueFilter):
    """Filter based on bucket notification configuration.

    :example:

    .. code-block:: yaml

            policies:
              - name: delete-incorrect-notification
                resource: s3
                filters:
                  - type: bucket-notification
                    kind: lambda
                    key: Id
                    value: "IncorrectLambda"
                    op: eq
                actions:
                  - type: delete-bucket-notification
                    statement_ids: matched
    """

    schema = type_schema(
        'bucket-notification',
        required=['kind'],
        kind={'type': 'string', 'enum': ['lambda', 'sns', 'sqs']},
        rinherit=ValueFilter.schema)
    schema_alias = False
    annotation_key = 'c7n:MatchedNotificationConfigurationIds'

    permissions = ('s3:GetBucketNotification',)

    FIELDS = {
        'lambda': 'LambdaFunctionConfigurations',
        'sns': 'TopicConfigurations',
        'sqs': 'QueueConfigurations'
    }

    def process(self, buckets, event=None):
        return super(BucketNotificationFilter, self).process(buckets, event)

    def __call__(self, bucket):

        field = self.FIELDS[self.data['kind']]
        found = False
        for config in bucket.get('Notification', {}).get(field, []):
            if self.match(config):
                set_annotation(
                    bucket,
                    BucketNotificationFilter.annotation_key,
                    config['Id'])
                found = True
        return found


@filters.register('bucket-logging')
class BucketLoggingFilter(BucketFilterBase):
    """Filter based on bucket logging configuration.

    :example:

    .. code-block:: yaml

            policies:
              - name: add-bucket-logging-if-missing
                resource: s3
                filters:
                  - type: bucket-logging
                    op: disabled
                actions:
                  - type: toggle-logging
                    target_bucket: "{account_id}-{region}-s3-logs"
                    target_prefix: "{source_bucket_name}/"

            policies:
              - name: update-incorrect-or-missing-logging
                resource: s3
                filters:
                  - type: bucket-logging
                    op: not-equal
                    target_bucket: "{account_id}-{region}-s3-logs"
                    target_prefix: "{account}/{source_bucket_name}/"
                actions:
                  - type: toggle-logging
                    target_bucket: "{account_id}-{region}-s3-logs"
                    target_prefix: "{account}/{source_bucket_name}/"
    """

    schema = type_schema(
        'bucket-logging',
        op={'enum': ['enabled', 'disabled', 'equal', 'not-equal', 'eq', 'ne']},
        required=['op'],
        target_bucket={'type': 'string'},
        target_prefix={'type': 'string'})
    schema_alias = False
    account_name = None

    permissions = ("s3:GetBucketLogging", "iam:ListAccountAliases")

    def process(self, buckets, event=None):
        return list(filter(None, map(self.process_bucket, buckets)))

    def process_bucket(self, b):
        if self.match_bucket(b):
            return b

    def match_bucket(self, b):
        op = self.data.get('op')

        logging = b.get('Logging', {})
        if op == 'disabled':
            return logging == {}
        elif op == 'enabled':
            return logging != {}

        if self.account_name is None:
            session = local_session(self.manager.session_factory)
            self.account_name = get_account_alias_from_sts(session)

        variables = self.get_std_format_args(b)
        variables.update({
            'account': self.account_name,
            'source_bucket_name': b['Name'],
            'source_bucket_region': get_region(b),
            'target_bucket_name': self.data.get('target_bucket'),
            'target_prefix': self.data.get('target_prefix'),
        })
        data = format_string_values(self.data, **variables)
        target_bucket = data.get('target_bucket')
        target_prefix = data.get('target_prefix', b['Name'] + '/')

        target_config = {
            "TargetBucket": target_bucket,
            "TargetPrefix": target_prefix
        } if target_bucket else {}

        if op in ('not-equal', 'ne'):
            return logging != target_config
        else:
            return logging == target_config


@actions.register('delete-bucket-notification')
class DeleteBucketNotification(BucketActionBase):
    """Action to delete S3 bucket notification configurations"""

    schema = type_schema(
        'delete-bucket-notification',
        required=['statement_ids'],
        statement_ids={'oneOf': [
            {'enum': ['matched']},
            {'type': 'array', 'items': {'type': 'string'}}]})

    permissions = ('s3:PutBucketNotification',)

    def process_bucket(self, bucket):
        n = bucket['Notification']
        if not n:
            return

        statement_ids = self.data.get('statement_ids')
        if statement_ids == 'matched':
            statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ())
        if not statement_ids:
            return
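
        # Rebuild the notification configuration keeping only entries whose
        # Id is not targeted, then write the pruned configuration back.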
        cfg = defaultdict(list)

        for t in BucketNotificationFilter.FIELDS.values():
            for c in n.get(t, []):
                if c['Id'] not in statement_ids:
                    cfg[t].append(c)

        client = bucket_client(local_session(self.manager.session_factory), bucket)
        client.put_bucket_notification_configuration(
            Bucket=bucket['Name'],
            NotificationConfiguration=cfg)


@actions.register('no-op')
class NoOp(BucketActionBase):

    schema = type_schema('no-op')
    permissions = ('s3:ListAllMyBuckets',)

    def process(self, buckets):
        return None


@actions.register('set-statements')
class SetPolicyStatement(BucketActionBase):
    """Action to add or update policy statements to S3 buckets

    :example:

    .. code-block:: yaml

            policies:
              - name: force-s3-https
                resource: s3
                actions:
                  - type: set-statements
                    statements:
                      - Sid: "DenyHttp"
                        Effect: "Deny"
                        Action: "s3:GetObject"
                        Principal:
                          AWS: "*"
                        Resource: "arn:aws:s3:::{bucket_name}/*"
                        Condition:
                          Bool:
                            "aws:SecureTransport": false
    """

    permissions = ('s3:PutBucketPolicy',)

    schema = type_schema(
        'set-statements',
        **{
            'statements': {
                'type': 'array',
                'items': {
                    'type': 'object',
                    'properties': {
                        'Sid': {'type': 'string'},
                        'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']},
                        'Principal': {'anyOf': [{'type': 'string'},
                            {'type': 'object'}, {'type': 'array'}]},
                        'NotPrincipal': {'anyOf': [{'type': 'object'}, {'type': 'array'}]},
                        'Action': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'NotAction': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'Resource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'NotResource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'Condition': {'type': 'object'}
                    },
                    'required': ['Sid', 'Effect'],
                    'oneOf': [
                        {'required': ['Principal', 'Action', 'Resource']},
                        {'required': ['NotPrincipal', 'Action', 'Resource']},
                        {'required': ['Principal', 'NotAction', 'Resource']},
                        {'required': ['NotPrincipal', 'NotAction', 'Resource']},
                        {'required': ['Principal', 'Action', 'NotResource']},
                        {'required': ['NotPrincipal', 'Action', 'NotResource']},
                        {'required': ['Principal', 'NotAction', 'NotResource']},
                        {'required': ['NotPrincipal', 'NotAction', 'NotResource']}
                    ]
                }
            }
        }
    )

    def process_bucket(self, bucket):
        policy = bucket.get('Policy') or '{}'

        target_statements = format_string_values(
            copy.deepcopy({s['Sid']: s for s in self.data.get('statements', [])}),
            **self.get_std_format_args(bucket))

        policy = json.loads(policy)
        bucket_statements = policy.setdefault('Statement', [])
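
        # Drop target statements that already exist verbatim on the bucket;
        # whatever remains gets appended and the policy rewritten.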
        for s in bucket_statements:
            if s.get('Sid') not in target_statements:
                continue
            if s == target_statements[s['Sid']]:
                target_statements.pop(s['Sid'])

        if not target_statements:
            return

        bucket_statements.extend(target_statements.values())
        policy = json.dumps(policy)

        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        s3.put_bucket_policy(Bucket=bucket['Name'], Policy=policy)
        return {'Name': bucket['Name'], 'Policy': policy}


@actions.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
    """Action to remove policy statements from S3 buckets

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-remove-encrypt-put
                resource: s3
                filters:
                  - type: has-statement
                    statement_ids:
                      - RequiredEncryptedPutObject
                actions:
                  - type: remove-statements
                    statement_ids:
                      - RequiredEncryptedPutObject
    """

    permissions = ("s3:PutBucketPolicy", "s3:DeleteBucketPolicy")

    def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            futures = {}
            results = []
            for b in buckets:
                futures[w.submit(self.process_bucket, b)] = b
            for f in as_completed(futures):
                if f.exception():
                    b = futures[f]
                    self.log.error('error modifying bucket:%s\n%s',
                                   b['Name'], f.exception())
                    continue
                results += filter(None, [f.result()])
            return results

    def process_bucket(self, bucket):
        p = bucket.get('Policy')
        if p is None:
            return

        p = json.loads(p)

        statements, found = self.process_policy(
            p, bucket, CrossAccountAccessFilter.annotation_key)

        if not found:
            return

        s3 = bucket_client(local_session(self.manager.session_factory), bucket)

        if not statements:
            s3.delete_bucket_policy(Bucket=bucket['Name'])
        else:
            s3.put_bucket_policy(Bucket=bucket['Name'], Policy=json.dumps(p))
        return {'Name': bucket['Name'], 'State': 'PolicyRemoved', 'Statements': found}


@actions.register('set-replication')
class SetBucketReplicationConfig(BucketActionBase):
    """Action to add or remove replication configuration statement from S3 buckets

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-unapproved-account-replication
                resource: s3
                filters:
                  - type: value
                    key: Replication.ReplicationConfiguration.Rules[].Destination.Account
                    value: present
                  - type: value
                    key: Replication.ReplicationConfiguration.Rules[].Destination.Account
                    value_from:
                      url: 's3:///path/to/file.json'
                      format: json
                      expr: "approved_accounts.*"
                    op: ni
                actions:
                  - type: set-replication
                    state: enable
    """
    schema = type_schema(
        'set-replication',
        state={'type': 'string', 'enum': ['enable', 'disable', 'remove']})
    permissions = ("s3:GetReplicationConfiguration", "s3:PutReplicationConfiguration")

    def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
            errors = []
            for future in as_completed(futures):
                bucket = futures[future]
                try:
                    future.result()
                except ClientError as e:
                    errors.append(
                        "Message: %s Bucket: %s" % (e, bucket['Name']))
            if errors:
                raise Exception('\n'.join(map(str, errors)))

    def process_bucket(self, bucket):
        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        state = self.data.get('state')
        if state is not None:
            if state == 'remove':
                s3.delete_bucket_replication(Bucket=bucket['Name'])
                return {'Name': bucket['Name'], 'State': 'ReplicationConfigRemoved'}
            if state in ('enable', 'disable'):
                config = s3.get_bucket_replication(Bucket=bucket['Name'])
                for rule in config['ReplicationConfiguration']['Rules']:
                    rule['Status'] = 'Enabled' if state == 'enable' else 'Disabled'
                s3.put_bucket_replication(
                    Bucket=bucket['Name'],
                    ReplicationConfiguration=config['ReplicationConfiguration']
                )
                return {'Name': bucket['Name'], 'State': 'ReplicationConfigUpdated'}


@filters.register('check-public-block')
class FilterPublicBlock(Filter):
    """Filter for s3 bucket public blocks

    If no filter parameters are provided it checks to see if any are unset or False.

    If parameters are provided only the provided ones are checked.

    :example:

    .. code-block:: yaml

            policies:
              - name: CheckForPublicAclBlock-Off
                resource: s3
                region: us-east-1
                filters:
                  - type: check-public-block
                    BlockPublicAcls: true
                    BlockPublicPolicy: true
    """

    schema = type_schema(
        'check-public-block',
        BlockPublicAcls={'type': 'boolean'},
        IgnorePublicAcls={'type': 'boolean'},
        BlockPublicPolicy={'type': 'boolean'},
        RestrictPublicBuckets={'type': 'boolean'})
    permissions = ("s3:GetBucketPublicAccessBlock",)
    keys = (
        'BlockPublicPolicy', 'BlockPublicAcls', 'IgnorePublicAcls', 'RestrictPublicBuckets')
    annotation_key = 'c7n:PublicAccessBlock'

    def process(self, buckets, event=None):
        results = []
        with self.executor_factory(max_workers=2) as w:
            futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
            for f in as_completed(futures):
                if f.result():
                    results.append(futures[f])
        return results

    def process_bucket(self, bucket):
        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
        if self.annotation_key not in bucket:
            try:
                config = s3.get_public_access_block(
                    Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
            except ClientError as e:
                error_code = e.response['Error']['Code']
                if error_code == 'NoSuchPublicAccessBlockConfiguration':
                    pass
                elif error_code == 'AccessDenied':
                    # Follow the same logic as `assemble_bucket` - log and continue on access
                    # denied errors rather than halting a policy altogether
                    method = 'GetPublicAccessBlock'
                    log.warning(
                        "Bucket:%s unable to invoke method:%s error:%s ",
                        bucket['Name'], method, e.response['Error']['Message']
                    )
                    bucket.setdefault('c7n:DeniedMethods', []).append(method)
                else:
                    raise
            bucket[self.annotation_key] = config
        return self.matches_filter(config)
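
    # With explicit parameters every provided key must match the bucket's
    # setting exactly; with no parameters, match buckets with any setting off.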
    def matches_filter(self, config):
        key_set = [key for key in self.keys if key in self.data]
        if key_set:
            return all([self.data.get(key) is config[key] for key in key_set])
        else:
            return not all(config.values())


@actions.register('set-public-block')
class SetPublicBlock(BucketActionBase):
    """Action to update Public Access blocks on S3 buckets

    If no action parameters are provided all settings will be set to the `state`, which defaults
    to ``True``.

    If action parameters are provided, those will be set and other extant values preserved.

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-public-block-enable-all
                resource: s3
                filters:
                  - type: check-public-block
                actions:
                  - type: set-public-block

            policies:
              - name: s3-public-block-disable-all
                resource: s3
                filters:
                  - type: check-public-block
                actions:
                  - type: set-public-block
                    state: false

            policies:
              - name: s3-public-block-enable-some
                resource: s3
                filters:
                  - or:
                    - type: check-public-block
                      BlockPublicAcls: false
                    - type: check-public-block
                      BlockPublicPolicy: false
                actions:
                  - type: set-public-block
                    BlockPublicAcls: true
                    BlockPublicPolicy: true

    """

    schema = type_schema(
        'set-public-block',
        state={'type': 'boolean', 'default': True},
        BlockPublicAcls={'type': 'boolean'},
        IgnorePublicAcls={'type': 'boolean'},
        BlockPublicPolicy={'type': 'boolean'},
        RestrictPublicBuckets={'type': 'boolean'})
    permissions = ("s3:GetBucketPublicAccessBlock", "s3:PutBucketPublicAccessBlock")
    keys = FilterPublicBlock.keys
    annotation_key = FilterPublicBlock.annotation_key

    def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
            for future in as_completed(futures):
                future.result()

    def process_bucket(self, bucket):
        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
        if self.annotation_key not in bucket:
            try:
                config = s3.get_public_access_block(
                    Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
            except ClientError as e:
                if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration':
                    raise

        key_set = [key for key in self.keys if key in self.data]
        if key_set:
            for key in key_set:
                config[key] = self.data.get(key)
        else:
            for key in self.keys:
                config[key] = self.data.get('state', True)
        s3.put_public_access_block(
            Bucket=bucket['Name'], PublicAccessBlockConfiguration=config)


@actions.register('toggle-versioning')
class ToggleVersioning(BucketActionBase):
    """Action to enable/suspend versioning on a S3 bucket

    Note versioning can never be disabled, only suspended.

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-enable-versioning
                resource: s3
                filters:
                  - or:
                    - type: value
                      key: Versioning.Status
                      value: Suspended
                    - type: value
                      key: Versioning.Status
                      value: absent
                actions:
                  - type: toggle-versioning
                    enabled: true
    """

    schema = type_schema(
        'toggle-versioning',
        enabled={'type': 'boolean'})
    permissions = ("s3:PutBucketVersioning",)

    def process_versioning(self, resource, state):
        client = bucket_client(
            local_session(self.manager.session_factory), resource)
        try:
            client.put_bucket_versioning(
                Bucket=resource['Name'],
                VersioningConfiguration={
                    'Status': state})
        except ClientError as e:
            if e.response['Error']['Code'] != 'AccessDenied':
                log.error(
                    "Unable to put bucket versioning on bucket %s: %s" % (resource['Name'], e))
                raise
            log.warning(
                "Access Denied Bucket:%s while put bucket versioning" % resource['Name'])

    # mfa delete enablement looks like it needs the serial and a current token.
    def process(self, resources):
        enabled = self.data.get('enabled', True)
        for r in resources:
            if 'Versioning' not in r or not r['Versioning']:
                r['Versioning'] = {'Status': 'Suspended'}
            if enabled and (
                    r['Versioning']['Status'] == 'Suspended'):
                self.process_versioning(r, 'Enabled')
            if not enabled and r['Versioning']['Status'] == 'Enabled':
                self.process_versioning(r, 'Suspended')


@actions.register('toggle-logging')
class ToggleLogging(BucketActionBase):
    """Action to enable/disable logging on a S3 bucket.

    Target bucket ACL must allow for WRITE and READ_ACP Permissions
    Not specifying a target_prefix will default to the current bucket name.
    https://docs.aws.amazon.com/AmazonS3/latest/dev/enable-logging-programming.html

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-enable-logging
                resource: s3
                filters:
                  - "tag:Testing": present
                actions:
                  - type: toggle-logging
                    target_bucket: log-bucket
                    target_prefix: logs123/

            policies:
              - name: s3-force-standard-logging
                resource: s3
                filters:
                  - type: bucket-logging
                    op: not-equal
                    target_bucket: "{account_id}-{region}-s3-logs"
                    target_prefix: "{account}/{source_bucket_name}/"
                actions:
                  - type: toggle-logging
                    target_bucket: "{account_id}-{region}-s3-logs"
                    target_prefix: "{account}/{source_bucket_name}/"
    """
    schema = type_schema(
        'toggle-logging',
        enabled={'type': 'boolean'},
        target_bucket={'type': 'string'},
        target_prefix={'type': 'string'})

    permissions = ("s3:PutBucketLogging", "iam:ListAccountAliases")

    def validate(self):
        if self.data.get('enabled', True):
            if not self.data.get('target_bucket'):
                raise PolicyValidationError(
                    "target_bucket must be specified on %s" % (
                        self.manager.data,))
        return self

    def process(self, resources):
        session = local_session(self.manager.session_factory)
        kwargs = {
            "enabled": self.data.get('enabled', True),
            "session": session,
            "account_name": get_account_alias_from_sts(session),
        }

        return self._process_with_futures(resources, **kwargs)

    def process_bucket(self, r, enabled=None, session=None, account_name=None):
        client = bucket_client(session, r)
        is_logging = bool(r.get('Logging'))

        if enabled:
            variables = self.get_std_format_args(r)
            variables.update({
                'account': account_name,
                'source_bucket_name': r['Name'],
                'source_bucket_region': get_region(r),
                'target_bucket_name': self.data.get('target_bucket'),
                'target_prefix': self.data.get('target_prefix'),
            })
            data = format_string_values(self.data, **variables)
            config = {
                'TargetBucket': data.get('target_bucket'),
                'TargetPrefix': data.get('target_prefix', r['Name'] + '/')
            }
            if not is_logging or r.get('Logging') != config:
                client.put_bucket_logging(
                    Bucket=r['Name'],
                    BucketLoggingStatus={'LoggingEnabled': config}
                )
                r['Logging'] = config

        elif not enabled and is_logging:
            client.put_bucket_logging(
                Bucket=r['Name'], BucketLoggingStatus={})
            r['Logging'] = {}


@actions.register('attach-encrypt')
class AttachLambdaEncrypt(BucketActionBase):
    """Action attaches lambda encryption policy to S3 bucket

    Supports attachment via lambda bucket notification or sns notification
    to invoke lambda. A special topic value of `default` will utilize an
    extant notification or create one matching the bucket name.

    :example:

    .. code-block:: yaml

            policies:
              - name: attach-lambda-encrypt
                resource: s3
                filters:
                  - type: missing-policy-statement
                actions:
                  - type: attach-encrypt
                    role: arn:aws:iam::123456789012:role/my-role

    """
    schema = type_schema(
        'attach-encrypt',
        role={'type': 'string'},
        tags={'type': 'object'},
        topic={'type': 'string'})

    permissions = (
        "s3:PutBucketNotification", "s3:GetBucketNotification",
        # lambda manager uses quite a few perms to provision lambdas
        # and event sources, hard to disambiguate, punt for now.
        "lambda:*",
    )

    def __init__(self, data=None, manager=None):
        self.data = data or {}
        self.manager = manager

    def validate(self):
        if (not getattr(self.manager.config, 'dryrun', True) and
                not self.data.get('role', self.manager.config.assume_role)):
            raise PolicyValidationError(
                "attach-encrypt: role must be specified either "
                "via assume or in config on %s" % (self.manager.data,))

        return self

    def process(self, buckets):
        from c7n.mu import LambdaManager
        from c7n.ufuncs.s3crypt import get_function

        account_id = self.manager.config.account_id
        topic_arn = self.data.get('topic')

        func = get_function(
            None, self.data.get('role', self.manager.config.assume_role),
            account_id=account_id, tags=self.data.get('tags'))

        regions = {get_region(b) for b in buckets}

        # session managers by region
        region_sessions = {}
        for r in regions:
            region_sessions[r] = functools.partial(
                self.manager.session_factory, region=r)

        # Publish function to all of our buckets regions
        region_funcs = {}

        for r in regions:
            lambda_mgr = LambdaManager(region_sessions[r])
            lambda_mgr.publish(func)
            region_funcs[r] = func

        with self.executor_factory(max_workers=3) as w:
            results = []
            futures = []
            for b in buckets:
                region = get_region(b)
                futures.append(
                    w.submit(
                        self.process_bucket,
                        region_funcs[region],
                        b,
                        topic_arn,
                        account_id,
                        region_sessions[region]
                    ))
            for f in as_completed(futures):
                if f.exception():
                    log.exception(
                        "Error attaching lambda-encrypt %s" % (f.exception()))
                    continue
                results.append(f.result())
            return list(filter(None, results))

    def process_bucket(self, func, bucket, topic, account_id, session_factory):
        from c7n.mu import BucketSNSNotification, BucketLambdaNotification
        if topic:
            topic = None if topic == 'default' else topic
            source = BucketSNSNotification(session_factory, bucket, topic)
        else:
            source = BucketLambdaNotification(
                {'account_s3': account_id}, session_factory, bucket)
        return source.add(func, None)


@actions.register('encryption-policy')
class EncryptionRequiredPolicy(BucketActionBase):
    """Action to apply an encryption policy to S3 buckets

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-enforce-encryption
                resource: s3
                mode:
                  type: cloudtrail
                  events:
                    - CreateBucket
                actions:
                  - encryption-policy
    """

    permissions = ("s3:GetBucketPolicy", "s3:PutBucketPolicy")
    schema = type_schema('encryption-policy')

    def __init__(self, data=None, manager=None):
        self.data = data or {}
        self.manager = manager

    def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            results = w.map(self.process_bucket, buckets)
            results = list(filter(None, list(results)))
            return results

    def process_bucket(self, b):
        p = b['Policy']
        if p is None:
            log.info("No policy found, creating new")
            p = {'Version': "2012-10-17", "Statement": []}
        else:
            p = json.loads(p)

        encryption_sid = "RequiredEncryptedPutObject"
        encryption_statement = {
            'Sid': encryption_sid,
            'Effect': 'Deny',
            'Principal': '*',
            'Action': 's3:PutObject',
            "Resource": "arn:aws:s3:::%s/*" % b['Name'],
            "Condition": {
                # AWS Managed Keys or KMS keys, note policy language
                # does not support custom kms (todo add issue)
                "StringNotEquals": {
                    "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}

        statements = p.get('Statement', [])
        for s in list(statements):
            if s.get('Sid', '') == encryption_sid:
                log.debug("Bucket:%s Found extant encrypt policy", b['Name'])
                if s != encryption_statement:
                    log.info(
                        "Bucket:%s updating extant encrypt policy", b['Name'])
                    statements.remove(s)
                else:
                    return

        session = self.manager.session_factory()
        s3 = bucket_client(session, b)
        statements.append(encryption_statement)
        p['Statement'] = statements
        log.info('Bucket:%s attached encryption policy' % b['Name'])

        try:
            s3.put_bucket_policy(
                Bucket=b['Name'],
                Policy=json.dumps(p))
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchBucket':
                return
            self.log.exception(
                "Error on bucket:%s putting policy\n%s error:%s",
                b['Name'],
                json.dumps(statements, indent=2), e)
            raise
        return {'Name': b['Name'], 'State': 'PolicyAttached'}
1899class BucketScanLog:
1900 """Offload remediated key ids to a disk file in batches
1902 A bucket keyspace is effectively infinite; we need to store partial
1903 results out of memory. This class provides a JSON log on disk
1904 with partial write support.
1906 json output format:
1907 - [list_of_serialized_keys],
1908 - [] # Empty list of keys at end when we close the buffer
1910 """
1912 def __init__(self, log_dir, name):
1913 self.log_dir = log_dir
1914 self.name = name
1915 self.fh = None
1916 self.count = 0
1918 @property
1919 def path(self):
1920 return os.path.join(self.log_dir, "%s.json" % self.name)
1922 def __enter__(self):
1923 # Don't require output directories
1924 if self.log_dir is None:
1925 return
1927 self.fh = open(self.path, 'w')
1928 self.fh.write("[\n")
1929 return self
1931 def __exit__(self, exc_type=None, exc_value=None, exc_frame=None):
1932 if self.fh is None:
1933 return
1934 # we need an empty marker list at end to avoid trailing commas
1935 self.fh.write("[]")
1936 # and close the surrounding list
1937 self.fh.write("\n]")
1938 self.fh.close()
1939 if not self.count:
1940 os.remove(self.fh.name)
1941 self.fh = None
1942 return False
1944 def add(self, keys):
1945 self.count += len(keys)
1946 if self.fh is None:
1947 return
1948 self.fh.write(dumps(keys))
1949 self.fh.write(",\n")
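# Illustrative sketch (not part of the original module): exercising
# BucketScanLog shows the on-disk format described in its docstring, a JSON
# list of key batches terminated by the empty marker list. The directory and
# key names are hypothetical and this helper is never called by custodian.
def _example_bucket_scan_log(log_dir='/tmp/c7n-example'):
    os.makedirs(log_dir, exist_ok=True)
    with BucketScanLog(log_dir, 'example-bucket') as key_log:
        key_log.add([{'Key': 'a.txt'}, {'Key': 'b.txt'}])
        key_log.add([{'Key': 'c.txt'}])
    # The resulting example-bucket.json reads:
    # [
    # [{"Key": "a.txt"}, {"Key": "b.txt"}],
    # [{"Key": "c.txt"}],
    # []
    # ]
    return os.path.join(log_dir, 'example-bucket.json')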
1952class ScanBucket(BucketActionBase):
1954 permissions = ("s3:ListBucket",)
1956 bucket_ops = {
1957 'standard': {
1958 'iterator': 'list_objects',
1959 'contents_key': ['Contents'],
1960 'key_processor': 'process_key'
1961 },
1962 'versioned': {
1963 'iterator': 'list_object_versions',
1964 'contents_key': ['Versions'],
1965 'key_processor': 'process_version'
1966 }
1967 }
1969 def __init__(self, data, manager=None):
1970 super(ScanBucket, self).__init__(data, manager)
1971 self.denied_buckets = set()
1973 def get_bucket_style(self, b):
1974 return (
1975 b.get('Versioning', {'Status': ''}).get('Status') in (
1976 'Enabled', 'Suspended') and 'versioned' or 'standard')
1978 def get_bucket_op(self, b, op_name):
1979 bucket_style = self.get_bucket_style(b)
1980 op = self.bucket_ops[bucket_style][op_name]
1981 if op_name == 'key_processor':
1982 return getattr(self, op)
1983 return op
1985 def get_keys(self, b, key_set):
1986 content_keys = self.get_bucket_op(b, 'contents_key')
1987 keys = []
1988 for ck in content_keys:
1989 keys.extend(key_set.get(ck, []))
1990 return keys
1992 def process(self, buckets):
1993 results = self._process_with_futures(self.process_bucket, buckets)
1994 self.write_denied_buckets_file()
1995 return results
1997 def _process_with_futures(self, helper, buckets, max_workers=3):
1998 results = []
1999 with self.executor_factory(max_workers) as w:
2000 futures = {}
2001 for b in buckets:
2002 futures[w.submit(helper, b)] = b
2003 for f in as_completed(futures):
2004 if f.exception():
2005 b = futures[f]
2006 self.log.error(
2007 "Error on bucket:%s region:%s policy:%s error: %s",
2008 b['Name'], b.get('Location', 'unknown'),
2009 self.manager.data.get('name'), f.exception())
2010 self.denied_buckets.add(b['Name'])
2011 continue
2012 result = f.result()
2013 if result:
2014 results.append(result)
2015 return results
2017 def write_denied_buckets_file(self):
2018 if (self.denied_buckets and
2019 self.manager.ctx.log_dir and
2020 not isinstance(self.manager.ctx.output, NullBlobOutput)):
2021 with open(
2022 os.path.join(
2023 self.manager.ctx.log_dir, 'denied.json'), 'w') as fh:
2024 json.dump(list(self.denied_buckets), fh, indent=2)
2025 self.denied_buckets = set()
2027 def process_bucket(self, b):
2028 log.info(
2029 "Scanning bucket:%s visitor:%s style:%s" % (
2030 b['Name'], self.__class__.__name__, self.get_bucket_style(b)))
2032 s = self.manager.session_factory()
2033 s3 = bucket_client(s, b)
2035 # The bulk of the _process_bucket function executes inline in
2036 # calling thread/worker context, neither paginator nor
2037 # bucketscan log should be used across worker boundary.
2038 p = s3.get_paginator(
2039 self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name'])
2041 with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log:
2042 with self.executor_factory(max_workers=10) as w:
2043 try:
2044 return self._process_bucket(b, p, key_log, w)
2045 except ClientError as e:
2046 if e.response['Error']['Code'] == 'NoSuchBucket':
2047 log.warning(
2048 "Bucket:%s removed while scanning" % b['Name'])
2049 return
2050 if e.response['Error']['Code'] == 'AccessDenied':
2051 log.warning(
2052 "Access Denied Bucket:%s while scanning" % b['Name'])
2053 self.denied_buckets.add(b['Name'])
2054 return
2055 log.exception(
2056 "Error processing bucket:%s paginator:%s" % (
2057 b['Name'], p))
2059 __call__ = process_bucket
2061 def _process_bucket(self, b, p, key_log, w):
2062 count = 0
2064 for key_set in p:
2065 keys = self.get_keys(b, key_set)
2066 count += len(keys)
2067 futures = []
2069 for batch in chunks(keys, size=100):
2070 if not batch:
2071 continue
2072 futures.append(w.submit(self.process_chunk, batch, b))
2074 for f in as_completed(futures):
2075 if f.exception():
2076 log.exception("Exception Processing bucket:%s key batch %s" % (
2077 b['Name'], f.exception()))
2078 continue
2079 r = f.result()
2080 if r:
2081 key_log.add(r)
2083 # Log completion at info level, progress at debug level
2084 if key_set['IsTruncated']:
2085 log.debug('Scan progress bucket:%s keys:%d remediated:%d ...',
2086 b['Name'], count, key_log.count)
2087 else:
2088 log.info('Scan Complete bucket:%s keys:%d remediated:%d',
2089 b['Name'], count, key_log.count)
2091 b['KeyScanCount'] = count
2092 b['KeyRemediated'] = key_log.count
2093 return {
2094 'Bucket': b['Name'], 'Remediated': key_log.count, 'Count': count}
2096 def process_chunk(self, batch, bucket):
2097 raise NotImplementedError()
2099 def process_key(self, s3, key, bucket_name, info=None):
2100 raise NotImplementedError()
2102 def process_version(self, s3, bucket, key):
2103 raise NotImplementedError()
2106@actions.register('encrypt-keys')
2107class EncryptExtantKeys(ScanBucket):
2108 """Action to encrypt unencrypted S3 objects
2110 :example:
2112 .. code-block:: yaml
2114 policies:
2115 - name: s3-encrypt-objects
2116 resource: s3
2117 actions:
2118 - type: encrypt-keys
2119 crypto: aws:kms
2120 key-id: 9c3983be-c6cf-11e6-9d9d-cec0c932ce01
2121 """
2123 permissions = (
2124 "s3:GetObject",
2125 "s3:PutObject",
2126 "s3:DeleteObjectVersion",
2127 "s3:RestoreObject",
2128 ) + ScanBucket.permissions
2130 schema = {
2131 'type': 'object',
2132 'additionalProperties': False,
2133 'properties': {
2134 'type': {'enum': ['encrypt-keys']},
2135 'report-only': {'type': 'boolean'},
2136 'glacier': {'type': 'boolean'},
2137 'large': {'type': 'boolean'},
2138 'crypto': {'enum': ['AES256', 'aws:kms']},
2139 'key-id': {'type': 'string'}
2140 },
2141 'dependencies': {
2142 'key-id': {
2143 'properties': {
2144 'crypto': {'pattern': 'aws:kms'}
2145 },
2146 'required': ['crypto']
2147 }
2148 }
2149 }
2151 metrics = [
2152 ('Total Keys', {'Scope': 'Account'}),
2153 ('Unencrypted', {'Scope': 'Account'})]
2155 def __init__(self, data, manager=None):
2156 super(EncryptExtantKeys, self).__init__(data, manager)
2157 self.kms_id = self.data.get('key-id')
2159 def get_permissions(self):
2160 perms = ("s3:GetObject", "s3:GetObjectVersion")
2161 if self.data.get('report-only'):
2162 perms += ('s3:DeleteObject', 's3:DeleteObjectVersion',
2163 's3:PutObject',
2164 's3:AbortMultipartUpload',
2165 's3:ListBucket',
2166 's3:ListBucketVersions')
2167 return perms
2169 def process(self, buckets):
2171 t = time.time()
2172 results = super(EncryptExtantKeys, self).process(buckets)
2173 run_time = time.time() - t
2174 remediated_count = object_count = 0
2176 for r in results:
2177 object_count += r['Count']
2178 remediated_count += r['Remediated']
2179 self.manager.ctx.metrics.put_metric(
2180 "Unencrypted", r['Remediated'], "Count", Scope=r['Bucket'],
2181 buffer=True)
2183 self.manager.ctx.metrics.put_metric(
2184 "Unencrypted", remediated_count, "Count", Scope="Account",
2185 buffer=True
2186 )
2187 self.manager.ctx.metrics.put_metric(
2188 "Total Keys", object_count, "Count", Scope="Account",
2189 buffer=True
2190 )
2191 self.manager.ctx.metrics.flush()
2193 log.info(
2194 ("EncryptExtant Complete keys:%d "
2195 "remediated:%d rate:%0.2f/s time:%0.2fs"),
2196 object_count,
2197 remediated_count,
2198 float(object_count) / run_time if run_time else 0,
2199 run_time)
2200 return results
2202 def process_chunk(self, batch, bucket):
2203 crypto_method = self.data.get('crypto', 'AES256')
2204 s3 = bucket_client(
2205 local_session(self.manager.session_factory), bucket,
2206 kms=(crypto_method == 'aws:kms'))
2207 b = bucket['Name']
2208 results = []
2209 key_processor = self.get_bucket_op(bucket, 'key_processor')
2210 for key in batch:
2211 r = key_processor(s3, key, b)
2212 if r:
2213 results.append(r)
2214 return results
2216 def process_key(self, s3, key, bucket_name, info=None):
2217 k = key['Key']
2218 if info is None:
2219 info = s3.head_object(Bucket=bucket_name, Key=k)
2221 # If the data is already encrypted with AES256 and this request is also
2222 # for AES256 then we don't need to do anything
2223 if info.get('ServerSideEncryption') == 'AES256' and not self.kms_id:
2224 return False
2226 if info.get('ServerSideEncryption') == 'aws:kms':
2227 # If we're not looking for a specific key, any key will do.
2228 if not self.kms_id:
2229 return False
2230 # If we're configured to use a specific key and the key matches
2231 # note this is not a strict equality match.
2232 if self.kms_id in info.get('SSEKMSKeyId', ''):
2233 return False
2235 if self.data.get('report-only'):
2236 return k
2238 storage_class = info.get('StorageClass', 'STANDARD')
2240 if storage_class == 'GLACIER':
2241 if not self.data.get('glacier'):
2242 return False
2243 if 'Restore' not in info:
2244 # This takes multiple hours; we let the next c7n
2245 # run take care of follow-ups.
2246 s3.restore_object(
2247 Bucket=bucket_name,
2248 Key=k,
2249 RestoreRequest={'Days': 30})
2250 return False
2251 elif not restore_complete(info['Restore']):
2252 return False
2254 storage_class = 'STANDARD'
2256 crypto_method = self.data.get('crypto', 'AES256')
2257 key_id = self.data.get('key-id')
2258 # Note on copy we lose individual object acl grants
2259 params = {'Bucket': bucket_name,
2260 'Key': k,
2261 'CopySource': "/%s/%s" % (bucket_name, k),
2262 'MetadataDirective': 'COPY',
2263 'StorageClass': storage_class,
2264 'ServerSideEncryption': crypto_method}
2266 if key_id and crypto_method == 'aws:kms':
2267 params['SSEKMSKeyId'] = key_id
2269 if info['ContentLength'] > MAX_COPY_SIZE and self.data.get(
2270 'large', True):
2271 return self.process_large_file(s3, bucket_name, key, info, params)
2273 s3.copy_object(**params)
2274 return k
2276 def process_version(self, s3, key, bucket_name):
2277 info = s3.head_object(
2278 Bucket=bucket_name,
2279 Key=key['Key'],
2280 VersionId=key['VersionId'])
2282 if 'ServerSideEncryption' in info:
2283 return False
2285 if self.data.get('report-only'):
2286 return key['Key'], key['VersionId']
2288 if key['IsLatest']:
2289 r = self.process_key(s3, key, bucket_name, info)
2290 # Glacier request processing, wait till we have the restored object
2291 if not r:
2292 return r
2293 s3.delete_object(
2294 Bucket=bucket_name,
2295 Key=key['Key'],
2296 VersionId=key['VersionId'])
2297 return key['Key'], key['VersionId']
2299 def process_large_file(self, s3, bucket_name, key, info, params):
2300 """For objects over 5gb, use multipart upload to copy"""
2301 part_size = MAX_COPY_SIZE - (1024 ** 2)
2302 num_parts = int(math.ceil(info['ContentLength'] / part_size))
2303 source = params.pop('CopySource')
2305 params.pop('MetadataDirective')
2306 if 'Metadata' in info:
2307 params['Metadata'] = info['Metadata']
2309 upload_id = s3.create_multipart_upload(**params)['UploadId']
2311 params = {'Bucket': bucket_name,
2312 'Key': key['Key'],
2313 'UploadId': upload_id,
2314 'CopySource': source,
2315 'CopySourceIfMatch': info['ETag']}
2317 def upload_part(part_num):
2318 part_params = dict(params)
2319 part_params['CopySourceRange'] = "bytes=%d-%d" % (
2320 part_size * (part_num - 1),
2321 min(part_size * part_num - 1, info['ContentLength'] - 1))
2322 part_params['PartNumber'] = part_num
2323 response = s3.upload_part_copy(**part_params)
2324 return {'ETag': response['CopyPartResult']['ETag'],
2325 'PartNumber': part_num}
2327 try:
2328 with self.executor_factory(max_workers=2) as w:
2329 parts = list(w.map(upload_part, range(1, num_parts + 1)))
2330 except Exception:
2331 log.warning(
2332 "Error during large key copy bucket: %s key: %s, "
2333 "aborting upload", bucket_name, key, exc_info=True)
2334 s3.abort_multipart_upload(
2335 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id)
2336 raise
2337 s3.complete_multipart_upload(
2338 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id,
2339 MultipartUpload={'Parts': parts})
2340 return key['Key']
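# Illustrative arithmetic (not part of the original module): with MAX_COPY_SIZE
# at 2 GiB, each ranged copy part is MAX_COPY_SIZE - 1 MiB bytes, so a
# hypothetical 6 GiB object splits into ceil(6 GiB / part_size) = 4 parts.
def _example_multipart_part_count(content_length=6 * 1024 ** 3):
    part_size = MAX_COPY_SIZE - (1024 ** 2)
    return int(math.ceil(content_length / part_size))  # -> 4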
2343def restore_complete(restore):
2344 if ',' in restore:
2345 ongoing, _ = restore.split(',', 1)
2346 else:
2347 ongoing = restore
2348 return 'false' in ongoing
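# Illustrative sketch (not part of the original module): head_object reports a
# Restore header along the lines of
#   'ongoing-request="false", expiry-date="Fri, 21 Dec 2012 00:00:00 GMT"'
# once a glacier restore has finished, and 'ongoing-request="true"' while it
# is still in flight, which is what restore_complete() distinguishes.
def _example_restore_states():
    done = 'ongoing-request="false", expiry-date="Fri, 21 Dec 2012 00:00:00 GMT"'
    pending = 'ongoing-request="true"'
    return restore_complete(done), restore_complete(pending)  # -> (True, False)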
2351@filters.register('is-log-target')
2352class LogTarget(Filter):
2353 """Filter and return buckets are log destinations.
2355 Not suitable for use in lambda on large accounts, This is a api
2356 heavy process to detect scan all possible log sources.
2358 Sources:
2359 - elb (Access Log)
2360 - s3 (Access Log)
2361 - cfn (Template writes)
2362 - cloudtrail
2364 :example:
2366 .. code-block:: yaml
2368 policies:
2369 - name: s3-log-bucket
2370 resource: s3
2371 filters:
2372 - type: is-log-target
2373 """
2375 schema = type_schema(
2376 'is-log-target',
2377 services={'type': 'array', 'items': {'enum': [
2378 's3', 'elb', 'cloudtrail']}},
2379 self={'type': 'boolean'},
2380 value={'type': 'boolean'})
2382 def get_permissions(self):
2383 perms = self.manager.get_resource_manager('elb').get_permissions()
2384 perms += ('elasticloadbalancing:DescribeLoadBalancerAttributes',)
2385 return perms
2387 def process(self, buckets, event=None):
2388 log_buckets = set()
2389 count = 0
2391 services = self.data.get('services', ['elb', 's3', 'cloudtrail'])
2392 self_log = self.data.get('self', False)
2394 if 'elb' in services and not self_log:
2395 for bucket, _ in self.get_elb_bucket_locations():
2396 log_buckets.add(bucket)
2397 count += 1
2398 self.log.debug("Found %d elb log targets" % count)
2400 if 's3' in services:
2401 count = 0
2402 for bucket, _ in self.get_s3_bucket_locations(buckets, self_log):
2403 count += 1
2404 log_buckets.add(bucket)
2405 self.log.debug('Found %d s3 log targets' % count)
2407 if 'cloudtrail' in services and not self_log:
2408 for bucket, _ in self.get_cloud_trail_locations(buckets):
2409 log_buckets.add(bucket)
2411 self.log.info("Found %d log targets for %d buckets" % (
2412 len(log_buckets), len(buckets)))
2413 if self.data.get('value', True):
2414 return [b for b in buckets if b['Name'] in log_buckets]
2415 else:
2416 return [b for b in buckets if b['Name'] not in log_buckets]
2418 @staticmethod
2419 def get_s3_bucket_locations(buckets, self_log=False):
2420 """return (bucket_name, prefix) for all s3 logging targets"""
2421 for b in buckets:
2422 if b.get('Logging'):
2423 if self_log:
2424 if b['Name'] != b['Logging']['TargetBucket']:
2425 continue
2426 yield (b['Logging']['TargetBucket'],
2427 b['Logging']['TargetPrefix'])
2428 if not self_log and b['Name'].startswith('cf-templates-'):
2429 yield (b['Name'], '')
2431 def get_cloud_trail_locations(self, buckets):
2432 session = local_session(self.manager.session_factory)
2433 client = session.client('cloudtrail')
2434 names = {b['Name'] for b in buckets}
2435 for t in client.describe_trails().get('trailList', ()):
2436 if t.get('S3BucketName') in names:
2437 yield (t['S3BucketName'], t.get('S3KeyPrefix', ''))
2439 def get_elb_bucket_locations(self):
2440 elbs = self.manager.get_resource_manager('elb').resources()
2441 get_elb_attrs = functools.partial(
2442 _query_elb_attrs, self.manager.session_factory)
2444 with self.executor_factory(max_workers=2) as w:
2445 futures = []
2446 for elb_set in chunks(elbs, 100):
2447 futures.append(w.submit(get_elb_attrs, elb_set))
2448 for f in as_completed(futures):
2449 if f.exception():
2450 log.error("Error while scanning elb log targets: %s" % (
2451 f.exception()))
2452 continue
2453 for tgt in f.result():
2454 yield tgt
2457def _query_elb_attrs(session_factory, elb_set):
2458 session = local_session(session_factory)
2459 client = session.client('elb')
2460 log_targets = []
2461 for e in elb_set:
2462 try:
2463 attrs = client.describe_load_balancer_attributes(
2464 LoadBalancerName=e['LoadBalancerName'])[
2465 'LoadBalancerAttributes']
2466 if 'AccessLog' in attrs and attrs['AccessLog']['Enabled']:
2467 log_targets.append((
2468 attrs['AccessLog']['S3BucketName'],
2469 attrs['AccessLog']['S3BucketPrefix']))
2470 except Exception as err:
2471 log.warning(
2472 "Could not retrieve load balancer %s: %s" % (
2473 e['LoadBalancerName'], err))
2474 return log_targets
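# Illustrative note (not part of the original module): for an ELB with access
# logging enabled, describe_load_balancer_attributes typically returns an
# AccessLog attribute shaped like
#   {'Enabled': True, 'S3BucketName': 'my-elb-logs', 'S3BucketPrefix': 'prod',
#    'EmitInterval': 60}
# so each yielded target is the ('my-elb-logs', 'prod') pair consumed by the
# is-log-target filter above. Bucket name and prefix here are hypothetical.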
2477@actions.register('remove-website-hosting')
2478class RemoveWebsiteHosting(BucketActionBase):
2479 """Action that removes website hosting configuration."""
2481 schema = type_schema('remove-website-hosting')
2483 permissions = ('s3:DeleteBucketWebsite',)
2485 def process(self, buckets):
2486 session = local_session(self.manager.session_factory)
2487 for bucket in buckets:
2488 client = bucket_client(session, bucket)
2489 client.delete_bucket_website(Bucket=bucket['Name'])
2492@actions.register('delete-global-grants')
2493class DeleteGlobalGrants(BucketActionBase):
2494 """Deletes global grants associated to a S3 bucket
2496 :example:
2498 .. code-block:: yaml
2500 policies:
2501 - name: s3-delete-global-grants
2502 resource: s3
2503 filters:
2504 - type: global-grants
2505 actions:
2506 - delete-global-grants
2507 """
2509 schema = type_schema(
2510 'delete-global-grants',
2511 grantees={'type': 'array', 'items': {'type': 'string'}})
2513 permissions = ('s3:PutBucketAcl',)
2515 def process(self, buckets):
2516 with self.executor_factory(max_workers=5) as w:
2517 return list(filter(None, list(w.map(self.process_bucket, buckets))))
2519 def process_bucket(self, b):
2520 grantees = self.data.get(
2521 'grantees', [
2522 GlobalGrantsFilter.AUTH_ALL, GlobalGrantsFilter.GLOBAL_ALL])
2524 log.info(b)
2526 acl = b.get('Acl', {'Grants': []})
2527 if not acl or not acl['Grants']:
2528 return
2529 new_grants = []
2530 for grant in acl['Grants']:
2531 grantee = grant.get('Grantee', {})
2532 if not grantee:
2533 continue
2534 # Yuck, 'get_bucket_acl' doesn't return the grantee type.
2535 if 'URI' in grantee:
2536 grantee['Type'] = 'Group'
2537 else:
2538 grantee['Type'] = 'CanonicalUser'
2539 if ('URI' in grantee and
2540 grantee['URI'] in grantees and not
2541 (grant['Permission'] == 'READ' and b['Website'])):
2542 # Remove this grantee.
2543 pass
2544 else:
2545 new_grants.append(grant)
2547 log.info({'Owner': acl['Owner'], 'Grants': new_grants})
2549 c = bucket_client(self.manager.session_factory(), b)
2550 try:
2551 c.put_bucket_acl(
2552 Bucket=b['Name'],
2553 AccessControlPolicy={
2554 'Owner': acl['Owner'], 'Grants': new_grants})
2555 except ClientError as e:
2556 if e.response['Error']['Code'] == 'NoSuchBucket':
2557 return
2558 return b
2561@actions.register('tag')
2562class BucketTag(Tag):
2563 """Action to create tags on a S3 bucket
2565 :example:
2567 .. code-block:: yaml
2569 policies:
2570 - name: s3-tag-region
2571 resource: s3
2572 region: us-east-1
2573 filters:
2574 - "tag:RegionName": absent
2575 actions:
2576 - type: tag
2577 key: RegionName
2578 value: us-east-1
2579 """
2581 def process_resource_set(self, client, resource_set, tags):
2582 modify_bucket_tags(self.manager.session_factory, resource_set, tags)
2585@actions.register('mark-for-op')
2586class MarkBucketForOp(TagDelayedAction):
2587 """Action schedules custodian to perform an action at a certain date
2589 :example:
2591 .. code-block:: yaml
2593 policies:
2594 - name: s3-encrypt
2595 resource: s3
2596 filters:
2597 - type: missing-statement
2598 statement_ids:
2599 - RequiredEncryptedPutObject
2600 actions:
2601 - type: mark-for-op
2602 op: attach-encrypt
2603 days: 7
2604 """
2606 schema = type_schema(
2607 'mark-for-op', rinherit=TagDelayedAction.schema)
2610@actions.register('unmark')
2611@actions.register('remove-tag')
2612class RemoveBucketTag(RemoveTag):
2613 """Removes tag/tags from a S3 object
2615 :example:
2617 .. code-block:: yaml
2619 policies:
2620 - name: s3-remove-owner-tag
2621 resource: s3
2622 filters:
2623 - "tag:BucketOwner": present
2624 actions:
2625 - type: remove-tag
2626 tags: ['BucketOwner']
2627 """
2629 def process_resource_set(self, client, resource_set, tags):
2630 modify_bucket_tags(
2631 self.manager.session_factory, resource_set, remove_tags=tags)
2634@filters.register('data-events')
2635class DataEvents(Filter):
2636 """Find buckets for which CloudTrail is logging data events.
2638 Note that this filter only examines trails that are defined in the
2639 current account.
2640 """
2642 schema = type_schema('data-events', state={'enum': ['present', 'absent']})
2643 permissions = (
2644 'cloudtrail:DescribeTrails',
2645 'cloudtrail:GetEventSelectors')
2647 def get_event_buckets(self, session, trails):
2648 """Return a mapping of bucket name to cloudtrail.
2650 For wildcard trails the bucket name is ''.
2651 """
2652 regions = {t.get('HomeRegion') for t in trails}
2653 clients = {}
2654 for region in regions:
2655 clients[region] = session.client('cloudtrail', region_name=region)
2657 event_buckets = {}
2658 for t in trails:
2659 for events in clients[t.get('HomeRegion')].get_event_selectors(
2660 TrailName=t['Name']).get('EventSelectors', ()):
2661 if 'DataResources' not in events:
2662 continue
2663 for data_events in events['DataResources']:
2664 if data_events['Type'] != 'AWS::S3::Object':
2665 continue
2666 for b in data_events['Values']:
2667 event_buckets[b.rsplit(':')[-1].strip('/')] = t['Name']
2668 return event_buckets
2670 def process(self, resources, event=None):
2671 trails = self.manager.get_resource_manager('cloudtrail').resources()
2672 local_trails = self.filter_resources(
2673 trails,
2674 "split(':', TrailARN)[4]", (self.manager.account_id,)
2675 )
2676 session = local_session(self.manager.session_factory)
2677 event_buckets = self.get_event_buckets(session, local_trails)
2678 ops = {
2679 'present': lambda x: (
2680 x['Name'] in event_buckets or '' in event_buckets),
2681 'absent': (
2682 lambda x: x['Name'] not in event_buckets and ''
2683 not in event_buckets)}
2685 op = ops[self.data.get('state', 'present')]
2686 results = []
2687 for b in resources:
2688 if op(b):
2689 results.append(b)
2690 return results
2693@filters.register('inventory')
2694class Inventory(ValueFilter):
2695 """Filter inventories for a bucket"""
2696 schema = type_schema('inventory', rinherit=ValueFilter.schema)
2697 schema_alias = False
2698 permissions = ('s3:GetInventoryConfiguration',)
2700 def process(self, buckets, event=None):
2701 results = []
2702 with self.executor_factory(max_workers=2) as w:
2703 futures = {}
2704 for b in buckets:
2705 futures[w.submit(self.process_bucket, b)] = b
2707 for f in as_completed(futures):
2708 b = futures[f]
2709 if f.exception():
2710 b.setdefault('c7n:DeniedMethods', []).append('GetInventoryConfiguration')
2711 self.log.error(
2712 "Error processing bucket: %s error: %s",
2713 b['Name'], f.exception())
2714 continue
2715 if f.result():
2716 results.append(b)
2717 return results
2719 def process_bucket(self, b):
2720 if 'c7n:inventories' not in b:
2721 client = bucket_client(local_session(self.manager.session_factory), b)
2722 inventories = client.list_bucket_inventory_configurations(
2723 Bucket=b['Name']).get('InventoryConfigurationList', [])
2724 b['c7n:inventories'] = inventories
2726 for i in b['c7n:inventories']:
2727 if self.match(i):
2728 return True
2731@actions.register('set-inventory')
2732class SetInventory(BucketActionBase):
2733 """Configure bucket inventories for an s3 bucket.
2734 """
2735 schema = type_schema(
2736 'set-inventory',
2737 required=['name', 'destination'],
2738 state={'enum': ['enabled', 'disabled', 'absent']},
2739 name={'type': 'string', 'description': 'Name of inventory'},
2740 destination={'type': 'string', 'description': 'Name of destination bucket'},
2741 prefix={'type': 'string', 'description': 'Destination prefix'},
2742 encryption={'enum': ['SSES3', 'SSEKMS']},
2743 key_id={'type': 'string', 'description': 'Optional Customer KMS KeyId for SSE-KMS'},
2744 versions={'enum': ['All', 'Current']},
2745 schedule={'enum': ['Daily', 'Weekly']},
2746 format={'enum': ['CSV', 'ORC', 'Parquet']},
2747 fields={'type': 'array', 'items': {'enum': [
2748 'Size', 'LastModifiedDate', 'StorageClass', 'ETag',
2749 'IsMultipartUploaded', 'ReplicationStatus', 'EncryptionStatus',
2750 'ObjectLockRetainUntilDate', 'ObjectLockMode', 'ObjectLockLegalHoldStatus',
2751 'IntelligentTieringAccessTier', 'BucketKeyStatus', 'ChecksumAlgorithm']}})
2753 permissions = ('s3:PutInventoryConfiguration', 's3:GetInventoryConfiguration')
2755 def process(self, buckets):
2756 with self.executor_factory(max_workers=2) as w:
2757 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
2758 for future in as_completed(futures):
2759 bucket = futures[future]
2760 try:
2761 future.result()
2762 except Exception as e:
2763 self.log.error('Message: %s Bucket: %s', e, bucket['Name'])
2765 def process_bucket(self, b):
2766 inventory_name = self.data.get('name')
2767 destination = self.data.get('destination')
2768 prefix = self.data.get('prefix', '')
2769 schedule = self.data.get('schedule', 'Daily')
2770 fields = self.data.get('fields', ['LastModifiedDate', 'Size'])
2771 versions = self.data.get('versions', 'Current')
2772 state = self.data.get('state', 'enabled')
2773 encryption = self.data.get('encryption')
2774 inventory_format = self.data.get('format', 'CSV')
2776 if not prefix:
2777 prefix = "Inventories/%s" % (self.manager.config.account_id)
2779 client = bucket_client(local_session(self.manager.session_factory), b)
2780 if state == 'absent':
2781 try:
2782 client.delete_bucket_inventory_configuration(
2783 Bucket=b['Name'], Id=inventory_name)
2784 except ClientError as e:
2785 if e.response['Error']['Code'] != 'NoSuchConfiguration':
2786 raise
2787 return
2789 bucket = {
2790 'Bucket': "arn:aws:s3:::%s" % destination,
2791 'Format': inventory_format
2792 }
2794 inventory = {
2795 'Destination': {
2796 'S3BucketDestination': bucket
2797 },
2798 'IsEnabled': state == 'enabled' and True or False,
2799 'Id': inventory_name,
2800 'OptionalFields': fields,
2801 'IncludedObjectVersions': versions,
2802 'Schedule': {
2803 'Frequency': schedule
2804 }
2805 }
2807 if prefix:
2808 bucket['Prefix'] = prefix
2810 if encryption:
2811 bucket['Encryption'] = {encryption: {}}
2812 if encryption == 'SSEKMS' and self.data.get('key_id'):
2813 bucket['Encryption'] = {encryption: {
2814 'KeyId': self.data['key_id']
2815 }}
2817 found = self.get_inventory_delta(client, inventory, b)
2818 if found:
2819 return
2820 if found is False:
2821 self.log.debug("updating bucket:%s inventory configuration id:%s",
2822 b['Name'], inventory_name)
2823 client.put_bucket_inventory_configuration(
2824 Bucket=b['Name'], Id=inventory_name, InventoryConfiguration=inventory)
2826 def get_inventory_delta(self, client, inventory, b):
2827 inventories = client.list_bucket_inventory_configurations(Bucket=b['Name'])
2828 found = None
2829 for i in inventories.get('InventoryConfigurationList', []):
2830 if i['Id'] != inventory['Id']:
2831 continue
2832 found = True
2833 for k, v in inventory.items():
2834 if k not in i:
2835 found = False
2836 continue
2837 if isinstance(v, list):
2838 v.sort()
2839 i[k].sort()
2840 if i[k] != v:
2841 found = False
2842 return found
2845@filters.register('intelligent-tiering')
2846class IntelligentTiering(ListItemFilter):
2847 """Filter for S3 buckets to look at intelligent tiering configurations
2849 The schema to supply to the attrs follows the schema here:
2850 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_bucket_intelligent_tiering_configurations.html
2852 :example:
2854 .. code-block:: yaml
2856 policies:
2857 - name: s3-intelligent-tiering-configuration
2858 resource: s3
2859 filters:
2860 - type: intelligent-tiering
2861 attrs:
2862 - Status: Enabled
2863 - Filter:
2864 And:
2865 Prefix: test
2866 Tags:
2867 - Key: Owner
2868 Value: c7n
2869 - Tierings:
2870 - Days: 100
2871 - AccessTier: ARCHIVE_ACCESS
2873 """
2874 schema = type_schema(
2875 'intelligent-tiering',
2876 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
2877 count={'type': 'number'},
2878 count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
2879 )
2880 permissions = ('s3:GetIntelligentTieringConfiguration',)
2881 annotation_key = "c7n:IntelligentTiering"
2882 annotate_items = True
2884 def __init__(self, data, manager=None):
2885 super().__init__(data, manager)
2886 self.data['key'] = self.annotation_key
2888 def process(self, buckets, event=None):
2889 with self.executor_factory(max_workers=2) as w:
2890 futures = {w.submit(self.get_item_values, b): b for b in buckets}
2891 for future in as_completed(futures):
2892 b = futures[future]
2893 if future.exception():
2894 self.log.error("Message: %s Bucket: %s", future.exception(), b['Name'])
2895 continue
2896 return super().process(buckets, event)
2898 def get_item_values(self, b):
2899 if self.annotation_key not in b:
2900 client = bucket_client(local_session(self.manager.session_factory), b)
2901 try:
2902 int_tier_config = client.list_bucket_intelligent_tiering_configurations(
2903 Bucket=b['Name'])
2904 b[self.annotation_key] = int_tier_config.get(
2905 'IntelligentTieringConfigurationList', [])
2906 except ClientError as e:
2907 if e.response['Error']['Code'] == 'AccessDenied':
2908 method = 'list_bucket_intelligent_tiering_configurations'
2909 log.warning(
2910 "Bucket:%s unable to invoke method:%s error:%s ",
2911 b['Name'], method, e.response['Error']['Message'])
2912 b.setdefault('c7n:DeniedMethods', []).append(method)
2913 return b.get(self.annotation_key)
2916@actions.register('set-intelligent-tiering')
2917class ConfigureIntelligentTiering(BucketActionBase):
2918 """Action applies an intelligent tiering configuration to a S3 bucket
2920 The schema to supply to the configuration follows the schema here:
2921 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_bucket_intelligent_tiering_configuration.html
2923 To delete a configuration, supply State: delete along with either the specific Id or Id: matched
2925 :example:
2927 .. code-block:: yaml
2929 policies:
2930 - name: s3-apply-intelligent-tiering-config
2931 resource: aws.s3
2932 filters:
2933 - not:
2934 - type: intelligent-tiering
2935 attrs:
2936 - Status: Enabled
2937 - Filter:
2938 And:
2939 Prefix: helloworld
2940 Tags:
2941 - Key: Hello
2942 Value: World
2943 - Tierings:
2944 - Days: 123
2945 AccessTier: ARCHIVE_ACCESS
2946 actions:
2947 - type: set-intelligent-tiering
2948 Id: c7n-default
2949 IntelligentTieringConfiguration:
2950 Id: c7n-default
2951 Status: Enabled
2952 Tierings:
2953 - Days: 149
2954 AccessTier: ARCHIVE_ACCESS
2956 - name: s3-delete-intelligent-tiering-configuration
2957 resource: aws.s3
2958 filters:
2959 - type: intelligent-tiering
2960 attrs:
2961 - Status: Enabled
2962 - Id: test-config
2963 actions:
2964 - type: set-intelligent-tiering
2965 Id: test-config
2966 State: delete
2968 - name: s3-delete-intelligent-tiering-matched-configs
2969 resource: aws.s3
2970 filters:
2971 - type: intelligent-tiering
2972 attrs:
2973 - Status: Enabled
2974 - Id: test-config
2975 actions:
2976 - type: set-intelligent-tiering
2977 Id: matched
2978 State: delete
2980 """
2982 annotation_key = 'c7n:ListItemMatches'
2983 shape = 'PutBucketIntelligentTieringConfigurationRequest'
2984 schema = {
2985 'type': 'object',
2986 'additionalProperties': False,
2987 'oneOf': [
2988 {'required': ['type', 'Id', 'IntelligentTieringConfiguration']},
2989 {'required': ['type', 'Id', 'State']}],
2990 'properties': {
2991 'type': {'enum': ['set-intelligent-tiering']},
2992 'Id': {'type': 'string'},
2993 # delete intelligent tier configurations via state: delete
2994 'State': {'type': 'string', 'enum': ['delete']},
2995 'IntelligentTieringConfiguration': {'type': 'object'}
2996 },
2997 }
2999 permissions = ('s3:PutIntelligentTieringConfiguration',)
3001 def validate(self):
3002 # You can have up to 1,000 S3 Intelligent-Tiering configurations per bucket.
3003 # Hence, always use it with a filter
3004 found = False
3005 for f in self.manager.iter_filters():
3006 if isinstance(f, IntelligentTiering):
3007 found = True
3008 break
3009 if not found:
3010 raise PolicyValidationError(
3011 '`set-intelligent-tiering` may only be used in '
3012 'conjunction with `intelligent-tiering` filter on %s' % (self.manager.data,))
3013 cfg = dict(self.data)
3014 if 'IntelligentTieringConfiguration' in cfg:
3015 cfg['Bucket'] = 'bucket'
3016 cfg.pop('type')
3017 return shape_validate(
3018 cfg, self.shape, self.manager.resource_type.service)
3020 def process(self, buckets):
3021 with self.executor_factory(max_workers=3) as w:
3022 futures = {}
3024 for b in buckets:
3025 futures[w.submit(self.process_bucket, b)] = b
3027 for future in as_completed(futures):
3028 if future.exception():
3029 bucket = futures[future]
3030 self.log.error(
3031 'error modifying bucket intelligent tiering configuration: %s\n%s',
3032 bucket['Name'], future.exception())
3033 continue
3035 def process_bucket(self, bucket):
3036 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3038 if 'list_bucket_intelligent_tiering_configurations' in bucket.get(
3039 'c7n:DeniedMethods', []):
3040 log.warning("Access Denied Bucket:%s while reading intelligent tiering configurations"
3041 % bucket['Name'])
3042 return
3044 if self.data.get('Id') and self.data.get('IntelligentTieringConfiguration'):
3045 try:
3046 s3.put_bucket_intelligent_tiering_configuration(
3047 Bucket=bucket['Name'], Id=self.data.get(
3048 'Id'), IntelligentTieringConfiguration=self.data.get(
3049 'IntelligentTieringConfiguration'))
3050 except ClientError as e:
3051 if e.response['Error']['Code'] == 'AccessDenied':
3052 log.warning(
3053 "Access Denied Bucket:%s while applying intelligent tiering configuration"
3054 % bucket['Name'])
3055 if self.data.get('State'):
3056 if self.data.get('Id') == 'matched':
3057 for config in bucket.get(self.annotation_key):
3058 self.delete_intelligent_tiering_configurations(s3, config.get('Id'), bucket)
3059 else:
3060 self.delete_intelligent_tiering_configurations(s3, self.data.get('Id'), bucket)
3062 def delete_intelligent_tiering_configurations(self, s3_client, id, bucket):
3063 try:
3064 s3_client.delete_bucket_intelligent_tiering_configuration(Bucket=bucket['Name'], Id=id)
3065 except ClientError as e:
3066 if e.response['Error']['Code'] == 'AccessDenied':
3067 log.warning(
3068 "Access Denied Bucket:%s while deleting intelligent tiering configuration"
3069 % bucket['Name'])
3070 elif e.response['Error']['Code'] == 'NoSuchConfiguration':
3071 log.warning(
3072 "No such configuration found:%s while deleting intelligent tiering configuration"
3073 % bucket['Name'])
3076@actions.register('delete')
3077class DeleteBucket(ScanBucket):
3078 """Action deletes a S3 bucket
3080 :example:
3082 .. code-block:: yaml
3084 policies:
3085 - name: delete-unencrypted-buckets
3086 resource: s3
3087 filters:
3088 - type: missing-statement
3089 statement_ids:
3090 - RequiredEncryptedPutObject
3091 actions:
3092 - type: delete
3093 remove-contents: true
3094 """
3096 schema = type_schema('delete', **{'remove-contents': {'type': 'boolean'}})
3098 permissions = ('s3:*',)
3100 bucket_ops = {
3101 'standard': {
3102 'iterator': 'list_objects',
3103 'contents_key': ['Contents'],
3104 'key_processor': 'process_key'
3105 },
3106 'versioned': {
3107 'iterator': 'list_object_versions',
3108 'contents_key': ['Versions', 'DeleteMarkers'],
3109 'key_processor': 'process_version'
3110 }
3111 }
3113 def process_delete_enablement(self, b):
3114 """Prep a bucket for deletion.
3116 Clear out any pending multi-part uploads.
3118 Disable versioning on the bucket, so deletes don't
3119 generate fresh deletion markers.
3120 """
3121 client = bucket_client(
3122 local_session(self.manager.session_factory), b)
3124 # Stop replication so we can suspend versioning
3125 if b.get('Replication') is not None:
3126 client.delete_bucket_replication(Bucket=b['Name'])
3128 # Suspend versioning, so we don't get new delete markers
3129 # as we walk and delete versions
3130 if (self.get_bucket_style(b) == 'versioned' and b['Versioning']['Status'] == 'Enabled' and
3131 self.data.get('remove-contents', True)):
3132 client.put_bucket_versioning(
3133 Bucket=b['Name'],
3134 VersioningConfiguration={'Status': 'Suspended'})
3136 # Clear our multi-part uploads
3137 uploads = client.get_paginator('list_multipart_uploads')
3138 for p in uploads.paginate(Bucket=b['Name']):
3139 for u in p.get('Uploads', ()):
3140 client.abort_multipart_upload(
3141 Bucket=b['Name'],
3142 Key=u['Key'],
3143 UploadId=u['UploadId'])
3145 def process(self, buckets):
3146 # might be worth sanity checking all our permissions
3147 # on the bucket up front before disabling versioning/replication.
3148 if self.data.get('remove-contents', True):
3149 self._process_with_futures(self.process_delete_enablement, buckets)
3150 self.empty_buckets(buckets)
3152 results = self._process_with_futures(self.delete_bucket, buckets)
3153 self.write_denied_buckets_file()
3154 return results
3156 def delete_bucket(self, b):
3157 s3 = bucket_client(self.manager.session_factory(), b)
3158 try:
3159 self._run_api(s3.delete_bucket, Bucket=b['Name'])
3160 except ClientError as e:
3161 if e.response['Error']['Code'] == 'BucketNotEmpty':
3162 self.log.error(
3163 "Error while deleting bucket %s, bucket not empty" % (
3164 b['Name']))
3165 else:
3166 raise e
3168 def empty_buckets(self, buckets):
3169 t = time.time()
3170 results = super(DeleteBucket, self).process(buckets)
3171 run_time = time.time() - t
3172 object_count = 0
3174 for r in results:
3175 object_count += r['Count']
3176 self.manager.ctx.metrics.put_metric(
3177 "Total Keys", object_count, "Count", Scope=r['Bucket'],
3178 buffer=True)
3179 self.manager.ctx.metrics.put_metric(
3180 "Total Keys", object_count, "Count", Scope="Account", buffer=True)
3181 self.manager.ctx.metrics.flush()
3183 log.info(
3184 "EmptyBucket buckets:%d Complete keys:%d rate:%0.2f/s time:%0.2fs",
3185 len(buckets), object_count,
3186 float(object_count) / run_time if run_time else 0, run_time)
3187 return results
3189 def process_chunk(self, batch, bucket):
3190 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3191 objects = []
3192 for key in batch:
3193 obj = {'Key': key['Key']}
3194 if 'VersionId' in key:
3195 obj['VersionId'] = key['VersionId']
3196 objects.append(obj)
3197 results = s3.delete_objects(
3198 Bucket=bucket['Name'], Delete={'Objects': objects}).get('Deleted', ())
3199 if self.get_bucket_style(bucket) != 'versioned':
3200 return results
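# Illustrative sketch (not part of the original module): each batch handed to
# delete_objects carries up to 100 keys; on versioned buckets every entry also
# names the specific VersionId so object versions and delete markers are
# removed. Key names and the version id below are hypothetical.
def _example_delete_payload():
    return {'Objects': [
        {'Key': 'reports/2020/a.csv'},
        {'Key': 'reports/2020/b.csv', 'VersionId': '3HL4kqCxf3vjVBH40Nrjfkd'},
    ]}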
3203@actions.register('configure-lifecycle')
3204class Lifecycle(BucketActionBase):
3205 """Action applies a lifecycle policy to versioned S3 buckets
3207 The schema to supply to the rule follows the schema here:
3208 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_lifecycle_configuration
3210 To delete a lifecycle rule, supply Status=absent
3212 :example:
3214 .. code-block:: yaml
3216 policies:
3217 - name: s3-apply-lifecycle
3218 resource: s3
3219 actions:
3220 - type: configure-lifecycle
3221 rules:
3222 - ID: my-lifecycle-id
3223 Status: Enabled
3224 Prefix: foo/
3225 Transitions:
3226 - Days: 60
3227 StorageClass: GLACIER
3229 """
3231 schema = type_schema(
3232 'configure-lifecycle',
3233 **{
3234 'rules': {
3235 'type': 'array',
3236 'items': {
3237 'type': 'object',
3238 'required': ['ID', 'Status'],
3239 'additionalProperties': False,
3240 'properties': {
3241 'ID': {'type': 'string'},
3242 # c7n intercepts `absent`
3243 'Status': {'enum': ['Enabled', 'Disabled', 'absent']},
3244 'Prefix': {'type': 'string'},
3245 'Expiration': {
3246 'type': 'object',
3247 'additionalProperties': False,
3248 'properties': {
3249 'Date': {'type': 'string'}, # Date
3250 'Days': {'type': 'integer'},
3251 'ExpiredObjectDeleteMarker': {'type': 'boolean'},
3252 },
3253 },
3254 'Filter': {
3255 'type': 'object',
3256 'minProperties': 1,
3257 'maxProperties': 1,
3258 'additionalProperties': False,
3259 'properties': {
3260 'Prefix': {'type': 'string'},
3261 'ObjectSizeGreaterThan': {'type': 'integer'},
3262 'ObjectSizeLessThan': {'type': 'integer'},
3263 'Tag': {
3264 'type': 'object',
3265 'required': ['Key', 'Value'],
3266 'additionalProperties': False,
3267 'properties': {
3268 'Key': {'type': 'string'},
3269 'Value': {'type': 'string'},
3270 },
3271 },
3272 'And': {
3273 'type': 'object',
3274 'additionalProperties': False,
3275 'properties': {
3276 'Prefix': {'type': 'string'},
3277 'ObjectSizeGreaterThan': {'type': 'integer'},
3278 'ObjectSizeLessThan': {'type': 'integer'},
3279 'Tags': {
3280 'type': 'array',
3281 'items': {
3282 'type': 'object',
3283 'required': ['Key', 'Value'],
3284 'additionalProperties': False,
3285 'properties': {
3286 'Key': {'type': 'string'},
3287 'Value': {'type': 'string'},
3288 },
3289 },
3290 },
3291 },
3292 },
3293 },
3294 },
3295 'Transitions': {
3296 'type': 'array',
3297 'items': {
3298 'type': 'object',
3299 'additionalProperties': False,
3300 'properties': {
3301 'Date': {'type': 'string'}, # Date
3302 'Days': {'type': 'integer'},
3303 'StorageClass': {'type': 'string'},
3304 },
3305 },
3306 },
3307 'NoncurrentVersionTransitions': {
3308 'type': 'array',
3309 'items': {
3310 'type': 'object',
3311 'additionalProperties': False,
3312 'properties': {
3313 'NoncurrentDays': {'type': 'integer'},
3314 'NewerNoncurrentVersions': {'type': 'integer'},
3315 'StorageClass': {'type': 'string'},
3316 },
3317 },
3318 },
3319 'NoncurrentVersionExpiration': {
3320 'type': 'object',
3321 'additionalProperties': False,
3322 'properties': {
3323 'NoncurrentDays': {'type': 'integer'},
3324 'NewerNoncurrentVersions': {'type': 'integer'}
3325 },
3326 },
3327 'AbortIncompleteMultipartUpload': {
3328 'type': 'object',
3329 'additionalProperties': False,
3330 'properties': {
3331 'DaysAfterInitiation': {'type': 'integer'},
3332 },
3333 },
3334 },
3335 },
3336 },
3337 }
3338 )
3340 permissions = ('s3:GetLifecycleConfiguration', 's3:PutLifecycleConfiguration')
3342 def process(self, buckets):
3343 with self.executor_factory(max_workers=3) as w:
3344 futures = {}
3345 results = []
3347 for b in buckets:
3348 futures[w.submit(self.process_bucket, b)] = b
3350 for future in as_completed(futures):
3351 if future.exception():
3352 bucket = futures[future]
3353 self.log.error('error modifying bucket lifecycle: %s\n%s',
3354 bucket['Name'], future.exception())
3355 results += filter(None, [future.result()])
3357 return results
3359 def process_bucket(self, bucket):
3360 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3362 if 'get_bucket_lifecycle_configuration' in bucket.get('c7n:DeniedMethods', []):
3363 log.warning("Access Denied Bucket:%s while reading lifecycle" % bucket['Name'])
3364 return
3366 # Adjust the existing lifecycle by adding/deleting/overwriting rules as necessary
3367 config = (bucket.get('Lifecycle') or {}).get('Rules', [])
3368 for rule in self.data['rules']:
3369 for index, existing_rule in enumerate(config):
3370 if not existing_rule:
3371 continue
3372 if rule['ID'] == existing_rule['ID']:
3373 if rule['Status'] == 'absent':
3374 config[index] = None
3375 else:
3376 config[index] = rule
3377 break
3378 else:
3379 if rule['Status'] != 'absent':
3380 config.append(rule)
3382 # The extra `list` conversion is required for python3
3383 config = list(filter(None, config))
3385 try:
3386 if not config:
3387 s3.delete_bucket_lifecycle(Bucket=bucket['Name'])
3388 else:
3389 s3.put_bucket_lifecycle_configuration(
3390 Bucket=bucket['Name'], LifecycleConfiguration={'Rules': config})
3391 except ClientError as e:
3392 if e.response['Error']['Code'] == 'AccessDenied':
3393 log.warning("Access Denied Bucket:%s while applying lifecycle" % bucket['Name'])
3394 else:
3395 raise e
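# Illustrative sketch (not part of the original module): the rule merge above
# matches on ID. An existing rule with the same ID is overwritten, a rule
# marked Status: absent is dropped, and anything else is appended. The rule
# ids below are hypothetical.
def _example_lifecycle_merge():
    config = [{'ID': 'keep-me', 'Status': 'Enabled'},
              {'ID': 'retire-me', 'Status': 'Enabled'}]
    desired = [{'ID': 'retire-me', 'Status': 'absent'},
               {'ID': 'new-rule', 'Status': 'Enabled', 'Prefix': 'foo/'}]
    for rule in desired:
        for index, existing_rule in enumerate(config):
            if existing_rule and rule['ID'] == existing_rule['ID']:
                config[index] = None if rule['Status'] == 'absent' else rule
                break
        else:
            if rule['Status'] != 'absent':
                config.append(rule)
    # -> [{'ID': 'keep-me', ...}, {'ID': 'new-rule', ...}]
    return list(filter(None, config))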
3398class KMSKeyResolverMixin:
3399 """Builds a dictionary of region specific ARNs"""
3401 def __init__(self, data, manager=None):
3402 self.arns = dict()
3403 self.data = data
3404 self.manager = manager
3406 def resolve_keys(self, buckets):
3407 key = self.data.get('key')
3408 if not key:
3409 return None
3411 regions = {get_region(b) for b in buckets}
3412 for r in regions:
3413 client = local_session(self.manager.session_factory).client('kms', region_name=r)
3414 try:
3415 key_meta = client.describe_key(
3416 KeyId=key
3417 ).get('KeyMetadata', {})
3418 key_id = key_meta.get('KeyId')
3420 # We need a complete set of alias identifiers (names and ARNs)
3421 # to fully evaluate bucket encryption filters.
3422 key_aliases = client.list_aliases(
3423 KeyId=key_id
3424 ).get('Aliases', [])
3426 self.arns[r] = {
3427 'KeyId': key_id,
3428 'Arn': key_meta.get('Arn'),
3429 'KeyManager': key_meta.get('KeyManager'),
3430 'Description': key_meta.get('Description'),
3431 'Aliases': [
3432 alias[attr]
3433 for alias in key_aliases
3434 for attr in ('AliasArn', 'AliasName')
3435 ],
3436 }
3438 except ClientError as e:
3439 self.log.error('Error resolving kms ARNs for set-bucket-encryption: %s key: %s' % (
3440 e, self.data.get('key')))
3442 def get_key(self, bucket):
3443 if 'key' not in self.data:
3444 return None
3445 region = get_region(bucket)
3446 key = self.arns.get(region)
3447 if not key:
3448 self.log.warning('Unable to resolve key %s for bucket %s in region %s',
3449 self.data['key'], bucket.get('Name'), region)
3450 return key
3453@filters.register('bucket-encryption')
3454class BucketEncryption(KMSKeyResolverMixin, Filter):
3455 """Filters for S3 buckets that have bucket-encryption
3457 :example:
3459 .. code-block:: yaml
3461 policies:
3462 - name: s3-bucket-encryption-AES256
3463 resource: s3
3464 region: us-east-1
3465 filters:
3466 - type: bucket-encryption
3467 state: True
3468 crypto: AES256
3469 - name: s3-bucket-encryption-KMS
3470 resource: s3
3471 region: us-east-1
3472 filters:
3473 - type: bucket-encryption
3474 state: True
3475 crypto: aws:kms
3476 key: alias/some/alias/key
3477 - name: s3-bucket-encryption-off
3478 resource: s3
3479 region: us-east-1
3480 filters:
3481 - type: bucket-encryption
3482 state: False
3483 - name: s3-bucket-test-bucket-key-enabled
3484 resource: s3
3485 region: us-east-1
3486 filters:
3487 - type: bucket-encryption
3488 bucket_key_enabled: True
3489 """
3490 schema = type_schema('bucket-encryption',
3491 state={'type': 'boolean'},
3492 crypto={'type': 'string', 'enum': ['AES256', 'aws:kms']},
3493 key={'type': 'string'},
3494 bucket_key_enabled={'type': 'boolean'})
3496 permissions = ('s3:GetEncryptionConfiguration', 'kms:DescribeKey', 'kms:ListAliases')
3497 annotation_key = 'c7n:bucket-encryption'
3499 def validate(self):
3500 if self.data.get('bucket_key_enabled') is not None and self.data.get('key') is not None:
3501 raise PolicyValidationError(
3502 f'key and bucket_key_enabled attributes cannot both be set: {self.data}'
3503 )
3505 def process(self, buckets, event=None):
3506 self.resolve_keys(buckets)
3507 results = []
3508 with self.executor_factory(max_workers=2) as w:
3509 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3510 for future in as_completed(futures):
3511 b = futures[future]
3512 if future.exception():
3513 self.log.error("Message: %s Bucket: %s", future.exception(),
3514 b['Name'])
3515 continue
3516 if future.result():
3517 results.append(b)
3518 return results
3520 def process_bucket(self, b):
3522 client = bucket_client(local_session(self.manager.session_factory), b)
3523 rules = []
3524 if self.annotation_key not in b:
3525 try:
3526 be = client.get_bucket_encryption(Bucket=b['Name'])
3527 be.pop('ResponseMetadata', None)
3528 except ClientError as e:
3529 if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError':
3530 raise
3531 be = {}
3532 b[self.annotation_key] = be
3533 else:
3534 be = b[self.annotation_key]
3536 rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', [])
3537 # default `state` to True as previous impl assumed state == True
3538 # to preserve backwards compatibility
3539 if self.data.get('bucket_key_enabled'):
3540 for rule in rules:
3541 return self.filter_bucket_key_enabled(rule)
3542 elif self.data.get('bucket_key_enabled') is False:
3543 for rule in rules:
3544 return not self.filter_bucket_key_enabled(rule)
3546 if self.data.get('state', True):
3547 for sse in rules:
3548 return self.filter_bucket(b, sse)
3549 return False
3550 else:
3551 for sse in rules:
3552 return not self.filter_bucket(b, sse)
3553 return True
3555 def filter_bucket(self, b, sse):
3556 allowed = ['AES256', 'aws:kms']
3557 key = self.get_key(b)
3558 crypto = self.data.get('crypto')
3559 rule = sse.get('ApplyServerSideEncryptionByDefault')
3561 if not rule:
3562 return False
3563 algo = rule.get('SSEAlgorithm')
3565 if not crypto and algo in allowed:
3566 return True
3568 if crypto == 'AES256' and algo == 'AES256':
3569 return True
3570 elif crypto == 'aws:kms' and algo == 'aws:kms':
3571 if not key:
3572 # There are two broad reasons to have an empty value for
3573 # the regional key here:
3574 #
3575 # * The policy did not specify a key, in which case this
3576 # filter should match _all_ buckets with a KMS default
3577 # encryption rule.
3578 #
3579 # * The policy specified a key that could not be
3580 # resolved, in which case this filter shouldn't match
3581 # any buckets.
3582 return 'key' not in self.data
3584 # The default encryption rule can specify a key ID,
3585 # key ARN, alias name or alias ARN. Match against any of
3586 # those attributes. A rule specifying KMS with no master key
3587 # implies the AWS-managed key.
3588 key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']}
3589 return rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids
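# Illustrative sketch (not part of the original module): a resolved key entry,
# as built by KMSKeyResolverMixin.resolve_keys, matches whether the bucket's
# default-encryption rule names the key id, the key ARN, or any alias form.
# All identifiers below are hypothetical.
def _example_kms_rule_match():
    key = {
        'KeyId': '1234abcd-12ab-34cd-56ef-1234567890ab',
        'Arn': ('arn:aws:kms:us-east-1:123456789012:'
                'key/1234abcd-12ab-34cd-56ef-1234567890ab'),
        'Aliases': ['alias/example',
                    'arn:aws:kms:us-east-1:123456789012:alias/example'],
    }
    key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']}
    return 'alias/example' in key_ids  # -> True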
3591 def filter_bucket_key_enabled(self, rule) -> bool:
3592 if not rule:
3593 return False
3594 return rule.get('BucketKeyEnabled')
3597@actions.register('set-bucket-encryption')
3598class SetBucketEncryption(KMSKeyResolverMixin, BucketActionBase):
3599 """Action enables default encryption on S3 buckets
3601 `enabled`: boolean Optional: Defaults to True
3603 `crypto`: aws:kms | AES256` Optional: Defaults to AES256
3605 `key`: arn, alias, or kms id key
3607 `bucket-key`: boolean Optional:
3608 Defaults to True.
3609 Reduces the amount of API traffic from Amazon S3 to KMS and can reduce KMS request
3610 costs by up to 99 percent. Requires kms:Decrypt permissions for copy and upload
3611 on the AWS KMS Key Policy.
3613 Bucket Key Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html
3615 :example:
3617 .. code-block:: yaml
3619 policies:
3620 - name: s3-enable-default-encryption-kms
3621 resource: s3
3622 actions:
3623 - type: set-bucket-encryption
3624 # enabled: true <------ optional (true by default)
3625 crypto: aws:kms
3626 key: 1234abcd-12ab-34cd-56ef-1234567890ab
3627 bucket-key: true
3629 - name: s3-enable-default-encryption-kms-alias
3630 resource: s3
3631 actions:
3632 - type: set-bucket-encryption
3633 # enabled: true <------ optional (true by default)
3634 crypto: aws:kms
3635 key: alias/some/alias/key
3636 bucket-key: true
3638 - name: s3-enable-default-encryption-aes256
3639 resource: s3
3640 actions:
3641 - type: set-bucket-encryption
3642 # bucket-key: true <--- optional (true by default for AWS SSE)
3643 # crypto: AES256 <----- optional (AES256 by default)
3644 # enabled: true <------ optional (true by default)
3646 - name: s3-disable-default-encryption
3647 resource: s3
3648 actions:
3649 - type: set-bucket-encryption
3650 enabled: false
3651 """
3653 schema = {
3654 'type': 'object',
3655 'additionalProperties': False,
3656 'properties': {
3657 'type': {'enum': ['set-bucket-encryption']},
3658 'enabled': {'type': 'boolean'},
3659 'crypto': {'enum': ['aws:kms', 'AES256']},
3660 'key': {'type': 'string'},
3661 'bucket-key': {'type': 'boolean'}
3662 },
3663 'dependencies': {
3664 'key': {
3665 'properties': {
3666 'crypto': {'pattern': 'aws:kms'}
3667 },
3668 'required': ['crypto']
3669 }
3670 }
3671 }
3673 permissions = ('s3:PutEncryptionConfiguration', 's3:GetEncryptionConfiguration',
3674 'kms:ListAliases', 'kms:DescribeKey')
3676 def process(self, buckets):
3677 if self.data.get('enabled', True):
3678 self.resolve_keys(buckets)
3680 with self.executor_factory(max_workers=3) as w:
3681 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3682 for future in as_completed(futures):
3683 if future.exception():
3684 self.log.error('Message: %s Bucket: %s', future.exception(),
3685 futures[future]['Name'])
3687 def process_bucket(self, bucket):
3688 default_key_desc = 'Default master key that protects my S3 objects when no other key is defined' # noqa
3689 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3690 if not self.data.get('enabled', True):
3691 s3.delete_bucket_encryption(Bucket=bucket['Name'])
3692 return
3693 algo = self.data.get('crypto', 'AES256')
3695 # bucket key defaults to True for alias/aws/s3 and AES256 (Amazon SSE)
3696 # and ignores False values for that crypto
3697 bucket_key = self.data.get('bucket-key', True)
3698 config = {
3699 'Rules': [
3700 {
3701 'ApplyServerSideEncryptionByDefault': {
3702 'SSEAlgorithm': algo,
3703 },
3704 'BucketKeyEnabled': bucket_key
3705 }
3706 ]
3707 }
3709 if algo == 'aws:kms':
3710 key = self.get_key(bucket)
3711 if not key:
3712 raise Exception('Valid KMS Key required but does not exist')
3714 config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = key['Arn']
3715 s3.put_bucket_encryption(
3716 Bucket=bucket['Name'],
3717 ServerSideEncryptionConfiguration=config
3718 )
3721OWNERSHIP_CONTROLS = ['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter']
3722VALUE_FILTER_MAGIC_VALUES = ['absent', 'present', 'not-null', 'empty']
3725@filters.register('ownership')
3726class BucketOwnershipControls(BucketFilterBase, ValueFilter):
3727 """Filter for object ownership controls
3729 Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html
3731 :example:
3733 Find buckets with ACLs disabled
3735 .. code-block:: yaml
3737 policies:
3738 - name: s3-bucket-acls-disabled
3739 resource: aws.s3
3740 region: us-east-1
3741 filters:
3742 - type: ownership
3743 value: BucketOwnerEnforced
3745 :example:
3747 Find buckets with object ownership preferred or enforced
3749 .. code-block:: yaml
3751 policies:
3752 - name: s3-bucket-ownership-preferred
3753 resource: aws.s3
3754 region: us-east-1
3755 filters:
3756 - type: ownership
3757 op: in
3758 value:
3759 - BucketOwnerEnforced
3760 - BucketOwnerPreferred
3762 :example:
3764 Find buckets with no object ownership controls
3766 .. code-block:: yaml
3768 policies:
3769 - name: s3-bucket-no-ownership-controls
3770 resource: aws.s3
3771 region: us-east-1
3772 filters:
3773 - type: ownership
3774 value: empty
3775 """
3776 schema = type_schema('ownership', rinherit=ValueFilter.schema, value={'oneOf': [
3777 {'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES},
3778 {'type': 'array', 'items': {
3779 'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}}]})
3780 permissions = ('s3:GetBucketOwnershipControls',)
3781 annotation_key = 'c7n:ownership'
3783 def __init__(self, data, manager=None):
3784 super(BucketOwnershipControls, self).__init__(data, manager)
3786 # Ownership controls appear as an array of rules. There can only be one
3787 # ObjectOwnership rule defined for a bucket, so we can automatically
3788 # match against that if it exists.
3789 self.data['key'] = f'("{self.annotation_key}".Rules[].ObjectOwnership)[0]'
3791 def process(self, buckets, event=None):
3792 with self.executor_factory(max_workers=2) as w:
3793 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3794 for future in as_completed(futures):
3795 b = futures[future]
3796 if future.exception():
3797 self.log.error("Message: %s Bucket: %s", future.exception(),
3798 b['Name'])
3799 continue
3800 return super(BucketOwnershipControls, self).process(buckets, event)
3802 def process_bucket(self, b):
3803 if self.annotation_key in b:
3804 return
3805 client = bucket_client(local_session(self.manager.session_factory), b)
3806 try:
3807 controls = client.get_bucket_ownership_controls(Bucket=b['Name'])
3808 controls.pop('ResponseMetadata', None)
3809 except ClientError as e:
3810 if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError':
3811 raise
3812 controls = {}
3813 b[self.annotation_key] = controls.get('OwnershipControls')
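# --- Editorial note (not part of the original module) -----------------------
# A minimal sketch of the annotation produced by process_bucket() above and of
# how the filter's jmespath key resolves against it. The bucket dict and value
# are hypothetical; GetBucketOwnershipControls returns at most one
# ObjectOwnership rule, which is why the key takes the first element.
def _example_ownership_annotation_sketch():
    """Illustrative only."""
    import jmespath  # available in c7n's environment

    bucket = {
        'Name': 'example-bucket',
        'c7n:ownership': {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]},
    }
    # Mirrors the key set in BucketOwnershipControls.__init__
    expr = '("c7n:ownership".Rules[].ObjectOwnership)[0]'
    return jmespath.search(expr, bucket)  # -> 'BucketOwnerEnforced'
# -----------------------------------------------------------------------------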
3816@filters.register('bucket-replication')
3817class BucketReplication(ListItemFilter):
3818 """Filter S3 buckets on their replication configurations
3820 The schema supplied to ``attrs`` follows the replication rule schema documented here:
3821 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_replication.html
3823 :example:
3825 .. code-block:: yaml
3827 policies:
3828 - name: s3-bucket-replication
3829 resource: s3
3830 filters:
3831 - type: bucket-replication
3832 attrs:
3833 - Status: Enabled
3834 - Filter:
3835 And:
3836 Prefix: test
3837 Tags:
3838 - Key: Owner
3839 Value: c7n
3840 - ExistingObjectReplication: Enabled
3842 """
3843 schema = type_schema(
3844 'bucket-replication',
3845 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
3846 count={'type': 'number'},
3847 count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
3848 )
3850 permissions = ("s3:GetReplicationConfiguration",)
3851 annotation_key = 'Replication'
3852 annotate_items = True
3854 def __init__(self, data, manager=None):
3855 super().__init__(data, manager)
3856 self.data['key'] = self.annotation_key
3858 def get_item_values(self, b):
3859 client = bucket_client(local_session(self.manager.session_factory), b)
3860 # the replication configuration is pre-fetched during bucket augmentation (S3_AUGMENT_TABLE)
3861 bucket_replication = b.get(self.annotation_key)
3863 rules = []
3864 if bucket_replication is not None:
3865 rules = bucket_replication.get('ReplicationConfiguration', {}).get('Rules', [])
3866 for replication in rules:
3867 self.augment_bucket_replication(b, replication, client)
3869 return rules
3871 def augment_bucket_replication(self, b, replication, client):
3872 destination_bucket = replication.get('Destination').get('Bucket').split(':')[5]
3873 destination_region = inspect_bucket_region(destination_bucket, client.meta.endpoint_url)
3874 source_region = get_region(b)
3875 replication['DestinationRegion'] = destination_region
3876 replication['CrossRegion'] = destination_region != source_region
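# --- Editorial note (not part of the original module) -----------------------
# A worked example of the ARN handling above, with hypothetical values: the
# replication destination is an ARN such as 'arn:aws:s3:::dest-bucket', so
# splitting on ':' and taking index 5 yields the bare bucket name, and the
# CrossRegion flag is a simple region comparison.
def _example_replication_destination_sketch():
    """Illustrative only."""
    destination_arn = 'arn:aws:s3:::dest-bucket'         # hypothetical
    destination_bucket = destination_arn.split(':')[5]   # -> 'dest-bucket'
    destination_region = 'us-west-2'    # as returned by inspect_bucket_region()
    source_region = 'us-east-1'         # as returned by get_region(bucket)
    return destination_bucket, destination_region != source_region  # -> ('dest-bucket', True)
# -----------------------------------------------------------------------------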
3879@resources.register('s3-directory')
3880class S3Directory(query.QueryResourceManager):
3882 class resource_type(query.TypeInfo):
3883 service = 's3'
3884 permission_prefix = "s3express"
3885 arn_service = "s3express"
3886 arn_type = 'bucket'
3887 enum_spec = ('list_directory_buckets', 'Buckets[]', None)
3888 name = id = 'Name'
3889 date = 'CreationDate'
3890 dimension = 'BucketName'
3891 cfn_type = 'AWS::S3Express::DirectoryBucket'
3892 permissions_enum = ("s3express:ListAllMyDirectoryBuckets",)
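# --- Editorial note (not part of the original module) -----------------------
# A minimal sketch of the API call behind the enum_spec above: directory
# buckets are enumerated with the S3 Express ListDirectoryBuckets operation,
# and the 'Buckets[]' path of the response supplies the resources. The client
# construction and region here are assumptions for illustration.
def _example_list_directory_buckets_sketch(region='us-east-1'):
    """Illustrative only."""
    import boto3

    client = boto3.client('s3', region_name=region)
    resp = client.list_directory_buckets()
    # Each entry carries the 'Name' (id) and 'CreationDate' fields used by the
    # resource_type declaration above.
    return resp.get('Buckets', [])
# -----------------------------------------------------------------------------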