Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/s3.py: 23%
# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
"""S3 Resource Manager

Filters:

The generic value filter (jmespath expression) and the Or filter are
available for all resources, including buckets; we also include several
additional pieces of bucket data (Tags, Replication, Acl, Policy) as keys
within a bucket representation.

Actions:

 encrypt-keys

   Scan all keys in a bucket and optionally encrypt them in place.

 global-grants

   Check bucket acls for global grants.

 encryption-policy

   Attach an encryption-required policy to a bucket; this will break
   applications that are not using encryption, including aws log
   delivery.
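
A minimal policy sketch exercising one of these actions (the policy name
below is illustrative)::

    policies:
      - name: s3-encrypt-objects
        resource: s3
        actions:
          - encrypt-keys
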
28"""
29import copy
30import functools
31import json
32import logging
33import math
34import os
35import time
36import threading
37import ssl
39from botocore.client import Config
40from botocore.exceptions import ClientError
42from collections import defaultdict
43from concurrent.futures import as_completed
45try:
46 from urllib3.exceptions import SSLError
47except ImportError:
48 from botocore.vendored.requests.packages.urllib3.exceptions import SSLError
51from c7n import deprecated
52from c7n.actions import (
53 ActionRegistry, BaseAction, PutMetric, RemovePolicyBase, remove_statements)
54from c7n.exceptions import PolicyValidationError, PolicyExecutionError
55from c7n.filters import (
56 FilterRegistry, Filter, CrossAccountAccessFilter, MetricsFilter,
57 ValueFilter, ListItemFilter)
58from .aws import shape_validate
59from c7n.filters.policystatement import HasStatementFilter
60from c7n.manager import resources
61from c7n.output import NullBlobOutput
62from c7n import query
63from c7n.resources.securityhub import PostFinding
64from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction
65from c7n.utils import (
66 chunks, local_session, set_annotation, type_schema, filter_empty,
67 dumps, format_string_values, get_account_alias_from_sts)
68from c7n.resources.aws import inspect_bucket_region
71log = logging.getLogger('custodian.s3')
73filters = FilterRegistry('s3.filters')
74actions = ActionRegistry('s3.actions')
75filters.register('marked-for-op', TagActionFilter)
76actions.register('put-metric', PutMetric)
MAX_COPY_SIZE = 1024 * 1024 * 1024 * 2  # 2 GiB


class DescribeS3(query.DescribeSource):

    def augment(self, buckets):
        assembler = BucketAssembly(self.manager)
        assembler.initialize()

        with self.manager.executor_factory(
                max_workers=min((10, len(buckets) + 1))) as w:
            results = w.map(assembler.assemble, buckets)
            results = list(filter(None, results))
            return results


class ConfigS3(query.ConfigSource):

    # normalize config's janky idiosyncratic bespoke formatting to the
    # standard describe api responses.

    def get_query_params(self, query):
        q = super(ConfigS3, self).get_query_params(query)
        if 'expr' in q:
            q['expr'] = q['expr'].replace('select ', 'select awsRegion, ')
        return q

    def load_resource(self, item):
        resource = super(ConfigS3, self).load_resource(item)
        cfg = item['supplementaryConfiguration']
        # aka standard
        if 'awsRegion' in item and item['awsRegion'] != 'us-east-1':
            resource['Location'] = {'LocationConstraint': item['awsRegion']}
        else:
            resource['Location'] = {}

        # owner is under acl per describe
        resource.pop('Owner', None)

        for k, null_value in S3_CONFIG_SUPPLEMENT_NULL_MAP.items():
            if k not in cfg:
                continue
            if cfg.get(k) == null_value:
                continue
            method = getattr(self, "handle_%s" % k, None)
            if method is None:
                raise ValueError("unhandled supplementary config %s" % k)
            v = cfg[k]
            if isinstance(cfg[k], str):
                v = json.loads(cfg[k])
            method(resource, v)

        for el in S3_AUGMENT_TABLE:
            if el[1] not in resource:
                resource[el[1]] = el[2]
        return resource

    PERMISSION_MAP = {
        'FullControl': 'FULL_CONTROL',
        'Write': 'WRITE',
        'WriteAcp': 'WRITE_ACP',
        'Read': 'READ',
        'ReadAcp': 'READ_ACP'}

    GRANTEE_MAP = {
        'AllUsers': "http://acs.amazonaws.com/groups/global/AllUsers",
        'AuthenticatedUsers': "http://acs.amazonaws.com/groups/global/AuthenticatedUsers",
        'LogDelivery': 'http://acs.amazonaws.com/groups/s3/LogDelivery'}

    def handle_AccessControlList(self, resource, item_value):
        # double serialized in config for some reason
        if isinstance(item_value, str):
            item_value = json.loads(item_value)

        resource['Acl'] = {}
        resource['Acl']['Owner'] = {'ID': item_value['owner']['id']}
        if item_value['owner']['displayName']:
            resource['Acl']['Owner']['DisplayName'] = item_value[
                'owner']['displayName']
        resource['Acl']['Grants'] = grants = []

        for g in (item_value.get('grantList') or ()):
            if 'id' not in g['grantee']:
                assert g['grantee'] in self.GRANTEE_MAP, "unknown grantee %s" % g
                rg = {'Type': 'Group', 'URI': self.GRANTEE_MAP[g['grantee']]}
            else:
                rg = {'ID': g['grantee']['id'], 'Type': 'CanonicalUser'}

            if 'displayName' in g:
                rg['DisplayName'] = g['displayName']

            grants.append({
                'Permission': self.PERMISSION_MAP[g['permission']],
                'Grantee': rg,
            })

    def handle_BucketAccelerateConfiguration(self, resource, item_value):
        # not currently auto-augmented by custodian
        return

    def handle_BucketLoggingConfiguration(self, resource, item_value):
        if ('destinationBucketName' not in item_value or
                item_value['destinationBucketName'] is None):
            resource[u'Logging'] = {}
            return
        resource[u'Logging'] = {
            'TargetBucket': item_value['destinationBucketName'],
            'TargetPrefix': item_value['logFilePrefix']}

    def handle_BucketLifecycleConfiguration(self, resource, item_value):
        rules = []
        for r in item_value.get('rules'):
            rr = {}
            rules.append(rr)
            expiry = {}
            for ek, ck in (
                    ('Date', 'expirationDate'),
                    ('ExpiredObjectDeleteMarker', 'expiredObjectDeleteMarker'),
                    ('Days', 'expirationInDays')):
                if ck in r and r[ck] and r[ck] != -1:
                    expiry[ek] = r[ck]
            if expiry:
                rr['Expiration'] = expiry

            transitions = []
            for t in (r.get('transitions') or ()):
                tr = {}
                for k in ('date', 'days', 'storageClass'):
                    if t.get(k):
                        tr["%s%s" % (k[0].upper(), k[1:])] = t[k]
                transitions.append(tr)
            if transitions:
                rr['Transitions'] = transitions

            if r.get('abortIncompleteMultipartUpload'):
                rr['AbortIncompleteMultipartUpload'] = {
                    'DaysAfterInitiation': r[
                        'abortIncompleteMultipartUpload']['daysAfterInitiation']}
            if r.get('noncurrentVersionExpirationInDays'):
                rr['NoncurrentVersionExpiration'] = {
                    'NoncurrentDays': r['noncurrentVersionExpirationInDays']}

            nonc_transitions = []
            for t in (r.get('noncurrentVersionTransitions') or ()):
                nonc_transitions.append({
                    'NoncurrentDays': t['days'],
                    'StorageClass': t['storageClass']})
            if nonc_transitions:
                rr['NoncurrentVersionTransitions'] = nonc_transitions

            rr['Status'] = r['status']
            rr['ID'] = r['id']
            if r.get('prefix'):
                rr['Prefix'] = r['prefix']
            if 'filter' not in r or not r['filter']:
                continue

            if r['filter']['predicate']:
                rr['Filter'] = self.convertLifePredicate(r['filter']['predicate'])

        resource['Lifecycle'] = {'Rules': rules}

    def convertLifePredicate(self, p):
        if p['type'] == 'LifecyclePrefixPredicate':
            return {'Prefix': p['prefix']}
        if p['type'] == 'LifecycleTagPredicate':
            return {'Tags': [{'Key': p['tag']['key'], 'Value': p['tag']['value']}]}
        if p['type'] == 'LifecycleAndOperator':
            n = {}
            for o in p['operands']:
                ot = self.convertLifePredicate(o)
                if 'Tags' in n and 'Tags' in ot:
                    n['Tags'].extend(ot['Tags'])
                else:
                    n.update(ot)
            return {'And': n}

        raise ValueError("unknown predicate: %s" % p)

    NotifyTypeMap = {
        'QueueConfiguration': 'QueueConfigurations',
        'LambdaConfiguration': 'LambdaFunctionConfigurations',
        'CloudFunctionConfiguration': 'LambdaFunctionConfigurations',
        'TopicConfiguration': 'TopicConfigurations'}

    def handle_BucketNotificationConfiguration(self, resource, item_value):
        d = {}
        for nid, n in item_value['configurations'].items():
            ninfo = {}
            d.setdefault(self.NotifyTypeMap[n['type']], []).append(ninfo)
            if n['type'] == 'QueueConfiguration':
                ninfo['QueueArn'] = n['queueARN']
            elif n['type'] == 'TopicConfiguration':
                ninfo['TopicArn'] = n['topicARN']
            elif n['type'] == 'LambdaConfiguration':
                ninfo['LambdaFunctionArn'] = n['functionARN']
            ninfo['Id'] = nid
            ninfo['Events'] = n['events']
            rules = []
            if n['filter']:
                for r in n['filter'].get('s3KeyFilter', {}).get('filterRules', []):
                    rules.append({'Name': r['name'], 'Value': r['value']})
            if rules:
                ninfo['Filter'] = {'Key': {'FilterRules': rules}}
        resource['Notification'] = d

    def handle_BucketReplicationConfiguration(self, resource, item_value):
        d = {'Role': item_value['roleARN'], 'Rules': []}
        for rid, r in item_value['rules'].items():
            rule = {
                'ID': rid,
                'Status': r.get('status', ''),
                'Prefix': r.get('prefix', ''),
                'Destination': {
                    'Bucket': r['destinationConfig']['bucketARN']}
            }
            if 'Account' in r['destinationConfig']:
                rule['Destination']['Account'] = r['destinationConfig']['Account']
            if r['destinationConfig'].get('storageClass'):
                rule['Destination']['StorageClass'] = r['destinationConfig']['storageClass']
            d['Rules'].append(rule)
        resource['Replication'] = {'ReplicationConfiguration': d}

    def handle_BucketPolicy(self, resource, item_value):
        resource['Policy'] = item_value.get('policyText')

    def handle_BucketTaggingConfiguration(self, resource, item_value):
        resource['Tags'] = [
            {"Key": k, "Value": v} for k, v in item_value['tagSets'][0]['tags'].items()]

    def handle_BucketVersioningConfiguration(self, resource, item_value):
        # Config defaults versioning to 'Off' for a null value
        if item_value['status'] not in ('Enabled', 'Suspended'):
            resource['Versioning'] = {}
            return
        resource['Versioning'] = {'Status': item_value['status']}
        # `isMfaDeleteEnabled` is an optional boolean property - the key may be absent,
        # present with a null value, or present with a boolean value.
        # Mirror the describe source by populating Versioning.MFADelete only in the
        # boolean case.
        mfa_delete = item_value.get('isMfaDeleteEnabled')
        if mfa_delete is None:
            return
        resource['Versioning']['MFADelete'] = (
            'Enabled' if mfa_delete else 'Disabled'
        )

    def handle_BucketWebsiteConfiguration(self, resource, item_value):
        website = {}
        if item_value['indexDocumentSuffix']:
            website['IndexDocument'] = {
                'Suffix': item_value['indexDocumentSuffix']}
        if item_value['errorDocument']:
            website['ErrorDocument'] = {
                'Key': item_value['errorDocument']}
        if item_value['redirectAllRequestsTo']:
            website['RedirectAllRequestsTo'] = {
                'HostName': item_value['redirectAllRequestsTo']['hostName'],
                'Protocol': item_value['redirectAllRequestsTo']['protocol']}
        for r in item_value['routingRules']:
            redirect = {}
            rule = {'Redirect': redirect}
            website.setdefault('RoutingRules', []).append(rule)
            if 'condition' in r:
                cond = {}
                for ck, rk in (
                        ('keyPrefixEquals', 'KeyPrefixEquals'),
                        ('httpErrorCodeReturnedEquals',
                         'HttpErrorCodeReturnedEquals')):
                    if r['condition'][ck]:
                        cond[rk] = r['condition'][ck]
                rule['Condition'] = cond
            for ck, rk in (
                    ('protocol', 'Protocol'),
                    ('hostName', 'HostName'),
                    ('replaceKeyPrefixWith', 'ReplaceKeyPrefixWith'),
                    ('replaceKeyWith', 'ReplaceKeyWith'),
                    ('httpRedirectCode', 'HttpRedirectCode')):
                if r['redirect'][ck]:
                    redirect[rk] = r['redirect'][ck]
        resource['Website'] = website


@resources.register('s3')
class S3(query.QueryResourceManager):
    """Amazon's Simple Storage Service Buckets.

    By default, and for historical compatibility, cloud custodian will
    fetch a number of subdocuments (Acl, Policy, Tagging, Versioning,
    Website, Notification, Lifecycle, and Replication) for each bucket
    to allow policy authors to target common bucket configurations.

    This behavior can be customized to avoid extraneous api calls if a
    particular sub document is not needed for a policy, by setting the
    `augment-keys` parameter in a query block of the policy.

    i.e. if we only care about bucket website and replication
    configuration, we can minimize the api calls needed to fetch a
    bucket by setting up augment-keys as follows.

    :example:

    .. code-block:: yaml

        policies:
          - name: check-website-replication
            resource: s3
            query:
              - augment-keys: ['Website', 'Replication']
            filters:
              - Website.ErrorDocument: not-null
              - Replication.ReplicationConfiguration.Rules: not-null

    It also supports an automatic detection mode, where the use of a
    subdocument in a filter is detected automatically, via the
    augment-keys value of 'detect'.

    :example:

    .. code-block:: yaml

        policies:
          - name: check-website-replication
            resource: s3
            query:
              - augment-keys: 'detect'
            filters:
              - Website.ErrorDocument: not-null
              - Replication.ReplicationConfiguration.Rules: not-null

    The default value for augment-keys is `all` to preserve historical
    compatibility. `augment-keys` also supports the value of 'none' to
    disable all subdocument fetching except Location and Tags.
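
    For instance, a tag-only policy could use 'none' (the policy name and
    tag key below are illustrative):

    .. code-block:: yaml

        policies:
          - name: check-bucket-tags
            resource: s3
            query:
              - augment-keys: 'none'
            filters:
              - "tag:owner": absent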

    Note certain actions may implicitly depend on the corresponding
    subdocument being present.

    """

    class resource_type(query.TypeInfo):
        service = 's3'
        arn_type = ''
        enum_spec = ('list_buckets', 'Buckets[]', None)
        # not used but we want some consistency on the metadata
        detail_spec = ('get_bucket_location', 'Bucket', 'Name', 'LocationConstraint')
        permissions_augment = (
            "s3:GetBucketAcl",
            "s3:GetBucketLocation",
            "s3:GetBucketPolicy",
            "s3:GetBucketTagging",
            "s3:GetBucketVersioning",
            "s3:GetBucketLogging",
            "s3:GetBucketNotification",
            "s3:GetBucketWebsite",
            "s3:GetLifecycleConfiguration",
            "s3:GetReplicationConfiguration"
        )
        name = id = 'Name'
        date = 'CreationDate'
        dimension = 'BucketName'
        cfn_type = config_type = 'AWS::S3::Bucket'

    filter_registry = filters
    action_registry = actions
    source_mapping = {
        'describe': DescribeS3,
        'config': ConfigS3
    }

    def validate(self):
        super().validate()
        BucketAssembly(self).validate()

    def get_arns(self, resources):
        return ["arn:aws:s3:::{}".format(r["Name"]) for r in resources]

    @classmethod
    def get_permissions(cls):
        perms = ["s3:ListAllMyBuckets"]
        perms.extend([n[-1] for n in S3_AUGMENT_TABLE])
        return perms


S3_CONFIG_SUPPLEMENT_NULL_MAP = {
    'BucketLoggingConfiguration': u'{"destinationBucketName":null,"logFilePrefix":null}',
    'BucketPolicy': u'{"policyText":null}',
    'BucketVersioningConfiguration': u'{"status":"Off","isMfaDeleteEnabled":null}',
    'BucketAccelerateConfiguration': u'{"status":null}',
    'BucketNotificationConfiguration': u'{"configurations":{}}',
    'BucketLifecycleConfiguration': None,
    'AccessControlList': None,
    'BucketTaggingConfiguration': None,
    'BucketWebsiteConfiguration': None,
    'BucketReplicationConfiguration': None
}

# each row: (client method, bucket key, default value, response selector, iam permission)
S3_AUGMENT_TABLE = (
    ('get_bucket_location', 'Location', {}, None, 's3:GetBucketLocation'),
    ('get_bucket_tagging', 'Tags', [], 'TagSet', 's3:GetBucketTagging'),
    ('get_bucket_policy', 'Policy', None, 'Policy', 's3:GetBucketPolicy'),
    ('get_bucket_acl', 'Acl', None, None, 's3:GetBucketAcl'),
    ('get_bucket_replication',
     'Replication', None, None, 's3:GetReplicationConfiguration'),
    ('get_bucket_versioning', 'Versioning', None, None, 's3:GetBucketVersioning'),
    ('get_bucket_website', 'Website', None, None, 's3:GetBucketWebsite'),
    ('get_bucket_logging', 'Logging', None, 'LoggingEnabled', 's3:GetBucketLogging'),
    ('get_bucket_notification_configuration',
     'Notification', None, None, 's3:GetBucketNotification'),
    ('get_bucket_lifecycle_configuration',
     'Lifecycle', None, None, 's3:GetLifecycleConfiguration'),
    # ('get_bucket_cors', 'Cors'),
)


class BucketAssembly:

    def __init__(self, manager):
        self.manager = manager
        self.default_region = None
        self.region_clients = {}
        self.session = None
        self.session_lock = None
        self.augment_fields = []

    def initialize(self):
        # construct a default boto3 client, using the current session region.
        self.session = local_session(self.manager.session_factory)
        self.session_lock = threading.RLock()
        self.default_region = self.manager.config.region
        self.region_clients[self.default_region] = self.session.client('s3')
        self.augment_fields = set(self.detect_augment_fields())
        # location is required for client construction
        self.augment_fields.add('Location')
        # custodian always returns tags
        self.augment_fields.add('Tags')

    def validate(self):
        config = self.get_augment_config()
        if isinstance(config, str) and config not in ('all', 'detect', 'none'):
            raise PolicyValidationError(
                "augment-keys supports 'all', 'detect', 'none' or list of keys found: %s" % config)
        elif isinstance(config, list):
            delta = set(config).difference([row[1] for row in S3_AUGMENT_TABLE])
            if delta:
                raise PolicyValidationError("augment-keys - found invalid keys: %s" % (list(delta)))
        if not isinstance(config, (list, str)):
            raise PolicyValidationError(
                "augment-keys supports 'all', 'detect', 'none' or list of keys found: %s" % config)

    def get_augment_config(self):
        augment_config = None
        for option in self.manager.data.get('query', []):
            if option and option.get('augment-keys') is not None:
                augment_config = option['augment-keys']
        if augment_config is None:
            augment_config = 'all'
        return augment_config

    def detect_augment_fields(self):
        # try to detect augment fields required for the policy execution
        # we want to avoid extraneous api calls unless they are being used by the policy.

        detected_keys = []
        augment_keys = [row[1] for row in S3_AUGMENT_TABLE]
        augment_config = self.get_augment_config()

        if augment_config == 'all':
            return augment_keys
        elif augment_config == 'none':
            return []
        elif isinstance(augment_config, list):
            return augment_config

        for f in self.manager.iter_filters():
            fkey = None
            if not isinstance(f, ValueFilter):
                continue

            f = f.data
            # type: value
            if f.get('type', '') == 'value':
                fkey = f.get('key')
            # k: v dict
            elif len(f) == 1:
                fkey = list(f.keys())[0]
            if fkey is None:  # pragma: no cover
                continue

            # remove any jmespath expressions
            fkey = fkey.split('.', 1)[0]

            # tags have explicit handling in value filters.
            if fkey.startswith('tag:'):
                fkey = 'Tags'

            # denied methods checks get all keys
            if fkey.startswith('c7n:DeniedMethods'):
                return augment_keys

            if fkey in augment_keys:
                detected_keys.append(fkey)

        return detected_keys

    def get_client(self, region):
        if region in self.region_clients:
            return self.region_clients[region]
        with self.session_lock:
            self.region_clients[region] = self.session.client('s3', region_name=region)
        return self.region_clients[region]

    def assemble(self, bucket):

        client = self.get_client(self.default_region)
        augments = list(S3_AUGMENT_TABLE)

        for info in augments:
            # we use the offset, as tests manipulate the augments table
            method_name, key, default, select = info[:4]
            if key not in self.augment_fields:
                continue

            method = getattr(client, method_name)

            try:
                response = method(Bucket=bucket['Name'])
                # This is here as exception handling will change to defaults if not present
                response.pop('ResponseMetadata', None)
                value = response
                if select and select in value:
                    value = value[select]
            except (ssl.SSLError, SSLError) as e:
                # Proxy issue most likely
                log.warning(
                    "Bucket ssl error %s: %s %s",
                    bucket['Name'], bucket.get('Location', 'unknown'), e)
                continue
            except ClientError as e:
                code = e.response['Error']['Code']
                if code.startswith("NoSuch") or "NotFound" in code:
                    value = default
                elif code == 'PermanentRedirect':  # pragma: no cover
                    # (09/2025) - it's not clear how we get here given a client region switch post
                    # location detection.
                    #
                    # change client region
                    client = self.get_client(get_region(bucket))
                    # requeue now that we have correct region
                    augments.append((method_name, key, default, select))
                    continue
                else:
                    # for auth errors record as attribute and move on
                    if e.response['Error']['Code'] == 'AccessDenied':
                        bucket.setdefault('c7n:DeniedMethods', []).append(method_name)
                        continue
                    # else log and raise
                    log.warning(
                        "Bucket:%s unable to invoke method:%s error:%s ",
                        bucket['Name'], method_name, e.response['Error']['Message'])
                    raise

            # for historical reasons we normalize EU to eu-west-1 on the bucket
            # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html
            if key == 'Location' and value and value.get('LocationConstraint', '') == 'EU':
                value['LocationConstraint'] = 'eu-west-1'

            bucket[key] = value

            # For all subsequent attributes after location, use a client that is targeted to
            # the bucket's regional s3 endpoint.
            if key == 'Location' and get_region(bucket) != client.meta.region_name:
                client = self.get_client(get_region(bucket))
        return bucket


def bucket_client(session, b, kms=False):
    region = get_region(b)

    if kms:
        # Need v4 signature for aws:kms crypto, else let the sdk decide
        # based on region support.
        config = Config(
            signature_version='s3v4',
            read_timeout=200, connect_timeout=120)
    else:
        config = Config(read_timeout=200, connect_timeout=120)
    return session.client('s3', region_name=region, config=config)

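
# Illustrative usage of modify_bucket_tags (defined below); the tag key and
# value are hypothetical:
#
#   modify_bucket_tags(
#       session_factory, buckets,
#       add_tags=[{'Key': 'owner', 'Value': 'data-eng'}],
#       remove_tags=('legacy-owner',))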
def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()):
    for bucket in buckets:
        client = bucket_client(local_session(session_factory), bucket)
        # Bucket tags are set atomically for the set/document, we want
        # to refetch against current to guard against any staleness in
        # our cached representation across multiple policies or concurrent
        # modifications.

        if 'get_bucket_tagging' in bucket.get('c7n:DeniedMethods', []):
            # avoid the additional API call if we already know that it's going
            # to result in AccessDenied. The chances that the resource's perms
            # would have changed between fetching the resource and acting on it
            # here are pretty low-- so the check here should suffice.
            log.warning(
                "Unable to get new set of bucket tags needed to modify tags, "
                "skipping tag action for bucket: %s" % bucket["Name"])
            continue

        try:
            bucket['Tags'] = client.get_bucket_tagging(
                Bucket=bucket['Name']).get('TagSet', [])
        except ClientError as e:
            if e.response['Error']['Code'] != 'NoSuchTagSet':
                raise
            bucket['Tags'] = []

        new_tags = {t['Key']: t['Value'] for t in add_tags}
        for t in bucket.get('Tags', ()):
            if (t['Key'] not in new_tags and t['Key'] not in remove_tags):
                new_tags[t['Key']] = t['Value']
        tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()]

        try:
            client.put_bucket_tagging(
                Bucket=bucket['Name'], Tagging={'TagSet': tag_set})
        except ClientError as e:
            log.exception(
                'Exception tagging bucket %s: %s', bucket['Name'], e)
            continue


def get_region(b):
    """Tries to get the bucket region from Location.LocationConstraint

    Special cases:
        LocationConstraint EU defaults to eu-west-1
        LocationConstraint null defaults to us-east-1

    Args:
        b (object): A bucket object

    Returns:
        string: an aws region string
    """
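    # e.g. (values illustrative):
    #   get_region({'Location': {}}) -> 'us-east-1'
    #   get_region({'Location': {'LocationConstraint': 'EU'}}) -> 'eu-west-1'
    #   get_region({'Location': {'LocationConstraint': 'ap-south-1'}}) -> 'ap-south-1'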
    remap = {None: 'us-east-1', 'EU': 'eu-west-1'}
    region = b.get('Location', {}).get('LocationConstraint')
    return remap.get(region, region)


@filters.register('metrics')
class S3Metrics(MetricsFilter):
    """S3 CW Metrics need special handling for attribute/dimension
    mismatch, and additional required dimension.
    """

    def get_dimensions(self, resource):
        dims = [{'Name': 'BucketName', 'Value': resource['Name']}]
        if (self.data['name'] == 'NumberOfObjects' and
                'dimensions' not in self.data):
            dims.append(
                {'Name': 'StorageType', 'Value': 'AllStorageTypes'})
        return dims


@filters.register('cross-account')
class S3CrossAccountFilter(CrossAccountAccessFilter):
    """Filters cross-account access to S3 buckets

    :example:

    .. code-block:: yaml

        policies:
          - name: s3-acl
            resource: s3
            region: us-east-1
            filters:
              - type: cross-account
    """
    permissions = ('s3:GetBucketPolicy',)

    def get_accounts(self):
        """add in elb access by default

        ELB Accounts by region
        https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html

        Redshift Accounts by region
        https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#db-auditing-manage-log-files
        https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#rs-db-auditing-cloud-trail-rs-acct-ids

        Cloudtrail Accounts by region
        https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-supported-regions.html
        """
        accounts = super(S3CrossAccountFilter, self).get_accounts()
        return accounts.union(
            [
                # ELB accounts
                '127311923021',  # us-east-1
                '033677994240',  # us-east-2
                '027434742980',  # us-west-1
                '797873946194',  # us-west-2
                '098369216593',  # af-south-1
                '985666609251',  # ca-central-1
                '054676820928',  # eu-central-1
                '897822967062',  # eu-north-1
                '635631232127',  # eu-south-1
                '156460612806',  # eu-west-1
                '652711504416',  # eu-west-2
                '009996457667',  # eu-west-3
                '754344448648',  # ap-east-1
                '582318560864',  # ap-northeast-1
                '600734575887',  # ap-northeast-2
                '383597477331',  # ap-northeast-3
                '114774131450',  # ap-southeast-1
                '783225319266',  # ap-southeast-2
                '718504428378',  # ap-south-1
                '076674570225',  # me-south-1
                '507241528517',  # sa-east-1
                '048591011584',  # us-gov-west-1 or gov-cloud-1
                '190560391635',  # us-gov-east-1
                '638102146993',  # cn-north-1
                '037604701340',  # cn-northwest-1

                # Redshift audit logging
                '193672423079',  # us-east-1
                '391106570357',  # us-east-2
                '262260360010',  # us-west-1
                '902366379725',  # us-west-2
                '365689465814',  # af-south-1
                '313564881002',  # ap-east-1
                '865932855811',  # ap-south-1
                '090321488786',  # ap-northeast-3
                '760740231472',  # ap-northeast-2
                '361669875840',  # ap-southeast-1
                '762762565011',  # ap-southeast-2
                '404641285394',  # ap-northeast-1
                '907379612154',  # ca-central-1
                '053454850223',  # eu-central-1
                '210876761215',  # eu-west-1
                '307160386991',  # eu-west-2
                '945612479654',  # eu-south-1
                '915173422425',  # eu-west-3
                '729911121831',  # eu-north-1
                '013126148197',  # me-south-1
                '075028567923',  # sa-east-1

                # Cloudtrail accounts (psa. folks should be using
                # cloudtrail service in bucket policies)
                '086441151436',  # us-east-1
                '475085895292',  # us-west-2
                '388731089494',  # us-west-1
                '113285607260',  # us-west-2
                '819402241893',  # ca-central-1
                '977081816279',  # ap-south-1
                '492519147666',  # ap-northeast-2
                '903692715234',  # ap-southeast-1
                '284668455005',  # ap-southeast-2
                '216624486486',  # ap-northeast-1
                '035351147821',  # eu-central-1
                '859597730677',  # eu-west-1
                '282025262664',  # eu-west-2
                '814480443879',  # sa-east-1
            ])


@filters.register('global-grants')
class GlobalGrantsFilter(Filter):
    """Filters for all S3 buckets that have global-grants

    *Note* by default this filter allows for read access
    if the bucket has been configured as a website. This
    can be disabled per the example below.

    :example:

    .. code-block:: yaml

        policies:
          - name: remove-global-grants
            resource: s3
            filters:
              - type: global-grants
                allow_website: false
            actions:
              - delete-global-grants

    """

    schema = type_schema(
        'global-grants',
        allow_website={'type': 'boolean'},
        operator={'type': 'string', 'enum': ['or', 'and']},
        permissions={
            'type': 'array', 'items': {
                'type': 'string', 'enum': [
                    'READ', 'WRITE', 'WRITE_ACP', 'READ_ACP', 'FULL_CONTROL']}})

    GLOBAL_ALL = "http://acs.amazonaws.com/groups/global/AllUsers"
    AUTH_ALL = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"

    def process(self, buckets, event=None):
        with self.executor_factory(max_workers=5) as w:
            results = w.map(self.process_bucket, buckets)
            results = list(filter(None, list(results)))
            return results

    def process_bucket(self, b):
        acl = b.get('Acl', {'Grants': []})
        if not acl or not acl['Grants']:
            return

        results = []
        allow_website = self.data.get('allow_website', True)
        perms = self.data.get('permissions', [])

        for grant in acl['Grants']:
            if 'URI' not in grant.get("Grantee", {}):
                continue
            if grant['Grantee']['URI'] not in [self.AUTH_ALL, self.GLOBAL_ALL]:
                continue
            if allow_website and grant['Permission'] == 'READ' and b['Website']:
                continue
            if not perms or (perms and grant['Permission'] in perms):
                results.append(grant['Permission'])

        if results:
            set_annotation(b, 'GlobalPermissions', results)
            return b


class BucketActionBase(BaseAction):

    def get_permissions(self):
        return self.permissions

    def get_std_format_args(self, bucket):
        return {
            'account_id': self.manager.config.account_id,
            'region': self.manager.config.region,
            'bucket_name': bucket['Name'],
            'bucket_region': get_region(bucket)
        }

    def process(self, buckets):
        return self._process_with_futures(buckets)

    def _process_with_futures(self, buckets, *args, max_workers=3, **kwargs):
        errors = 0
        results = []
        with self.executor_factory(max_workers=max_workers) as w:
            futures = {}
            for b in buckets:
                futures[w.submit(self.process_bucket, b, *args, **kwargs)] = b
            for f in as_completed(futures):
                if f.exception():
                    b = futures[f]
                    self.log.error(
                        'error modifying bucket: policy:%s action:%s bucket:%s error:%s',
                        self.manager.data.get('name'), self.name, b['Name'], f.exception()
                    )
                    errors += 1
                    continue
                results += filter(None, [f.result()])
            if errors:
                self.log.error('encountered %d errors while processing %s', errors, self.name)
                raise PolicyExecutionError('%d resources failed' % errors)
            return results


class BucketFilterBase(Filter):
    def get_std_format_args(self, bucket):
        return {
            'account_id': self.manager.config.account_id,
            'region': self.manager.config.region,
            'bucket_name': bucket['Name'],
            'bucket_region': get_region(bucket)
        }


@S3.action_registry.register("post-finding")
class BucketFinding(PostFinding):

    resource_type = 'AwsS3Bucket'

    def format_resource(self, r):
        owner = r.get("Acl", {}).get("Owner", {})
        resource = {
            "Type": self.resource_type,
            "Id": "arn:aws:s3:::{}".format(r["Name"]),
            "Region": get_region(r),
            "Tags": {t["Key"]: t["Value"] for t in r.get("Tags", [])},
            "Details": {self.resource_type: {
                "OwnerId": owner.get('ID', 'Unknown')}}
        }

        if "DisplayName" in owner:
            resource["Details"]["AwsS3Bucket"]["OwnerName"] = owner['DisplayName']

        return filter_empty(resource)


@S3.filter_registry.register('has-statement')
class S3HasStatementFilter(HasStatementFilter):
    def get_std_format_args(self, bucket):
        return {
            'account_id': self.manager.config.account_id,
            'region': self.manager.config.region,
            'bucket_name': bucket['Name'],
            'bucket_region': get_region(bucket)
        }


@S3.filter_registry.register('lock-configuration')
class S3LockConfigurationFilter(ValueFilter):
    """
    Filter S3 buckets based on their object lock configurations

    :example:

    Get all buckets where lock configuration mode is COMPLIANCE

    .. code-block:: yaml

        policies:
          - name: lock-configuration-compliance
            resource: aws.s3
            filters:
              - type: lock-configuration
                key: Rule.DefaultRetention.Mode
                value: COMPLIANCE

    """
    schema = type_schema('lock-configuration', rinherit=ValueFilter.schema)
    permissions = ('s3:GetBucketObjectLockConfiguration',)
    annotate = True
    annotation_key = 'c7n:ObjectLockConfiguration'

    def _process_resource(self, client, resource):
        try:
            config = client.get_object_lock_configuration(
                Bucket=resource['Name']
            )['ObjectLockConfiguration']
        except ClientError as e:
            if e.response['Error']['Code'] == 'ObjectLockConfigurationNotFoundError':
                config = None
            else:
                raise
        resource[self.annotation_key] = config

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('s3')
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for res in resources:
                if self.annotation_key in res:
                    continue
                futures.append(w.submit(self._process_resource, client, res))
            for f in as_completed(futures):
                exc = f.exception()
                if exc:
                    self.log.error(
                        "Exception getting bucket lock configuration \n %s" % (
                            exc))
        return super().process(resources, event)

    def __call__(self, r):
        return super().__call__(r.setdefault(self.annotation_key, None))


ENCRYPTION_STATEMENT_GLOB = {
    'Effect': 'Deny',
    'Principal': '*',
    'Action': 's3:PutObject',
    "Condition": {
        "StringNotEquals": {
            "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}


@filters.register('no-encryption-statement')
class EncryptionEnabledFilter(Filter):
    """Find buckets with missing encryption policy statements.

    :example:

    .. code-block:: yaml

        policies:
          - name: s3-bucket-not-encrypted
            resource: s3
            filters:
              - type: no-encryption-statement
    """
    schema = type_schema(
        'no-encryption-statement')

    def get_permissions(self):
        perms = self.manager.get_resource_manager('s3').get_permissions()
        return perms

    def process(self, buckets, event=None):
        return list(filter(None, map(self.process_bucket, buckets)))

    def process_bucket(self, b):
        p = b.get('Policy')
        if p is None:
            return b
        p = json.loads(p)
        encryption_statement = dict(ENCRYPTION_STATEMENT_GLOB)

        statements = p.get('Statement', [])
        check = False
        for s in list(statements):
            if 'Sid' in s:
                encryption_statement["Sid"] = s["Sid"]
            if 'Resource' in s:
                encryption_statement["Resource"] = s["Resource"]
            if s == encryption_statement:
                check = True
                break
        if check:
            return None
        else:
            return b


@filters.register('missing-statement')
@filters.register('missing-policy-statement')
class MissingPolicyStatementFilter(Filter):
    """Find buckets missing a set of named policy statements.

    :example:

    .. code-block:: yaml

        policies:
          - name: s3-bucket-missing-statement
            resource: s3
            filters:
              - type: missing-statement
                statement_ids:
                  - RequiredEncryptedPutObject
    """

    schema = type_schema(
        'missing-policy-statement',
        aliases=('missing-statement',),
        statement_ids={'type': 'array', 'items': {'type': 'string'}})

    def __call__(self, b):
        p = b.get('Policy')
        if p is None:
            return b

        p = json.loads(p)

        required = list(self.data.get('statement_ids', []))
        statements = p.get('Statement', [])
        for s in list(statements):
            if s.get('Sid') in required:
                required.remove(s['Sid'])
        if not required:
            return False
        return True


@filters.register('bucket-notification')
class BucketNotificationFilter(ValueFilter):
    """Filter based on bucket notification configuration.

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-incorrect-notification
            resource: s3
            filters:
              - type: bucket-notification
                kind: lambda
                key: Id
                value: "IncorrectLambda"
                op: eq
            actions:
              - type: delete-bucket-notification
                statement_ids: matched
    """

    schema = type_schema(
        'bucket-notification',
        required=['kind'],
        kind={'type': 'string', 'enum': ['lambda', 'sns', 'sqs']},
        rinherit=ValueFilter.schema)
    schema_alias = False
    annotation_key = 'c7n:MatchedNotificationConfigurationIds'

    permissions = ('s3:GetBucketNotification',)

    FIELDS = {
        'lambda': 'LambdaFunctionConfigurations',
        'sns': 'TopicConfigurations',
        'sqs': 'QueueConfigurations'
    }

    def process(self, buckets, event=None):
        return super(BucketNotificationFilter, self).process(buckets, event)

    def __call__(self, bucket):

        field = self.FIELDS[self.data['kind']]
        found = False
        for config in bucket.get('Notification', {}).get(field, []):
            if self.match(config):
                set_annotation(
                    bucket,
                    BucketNotificationFilter.annotation_key,
                    config['Id'])
                found = True
        return found


@filters.register('bucket-logging')
class BucketLoggingFilter(BucketFilterBase):
    """Filter based on bucket logging configuration.

    :example:

    .. code-block:: yaml

        policies:
          - name: add-bucket-logging-if-missing
            resource: s3
            filters:
              - type: bucket-logging
                op: disabled
            actions:
              - type: toggle-logging
                target_bucket: "{account_id}-{region}-s3-logs"
                target_prefix: "{source_bucket_name}/"

        policies:
          - name: update-incorrect-or-missing-logging
            resource: s3
            filters:
              - type: bucket-logging
                op: not-equal
                target_bucket: "{account_id}-{region}-s3-logs"
                target_prefix: "{account}/{source_bucket_name}/"
            actions:
              - type: toggle-logging
                target_bucket: "{account_id}-{region}-s3-logs"
                target_prefix: "{account}/{source_bucket_name}/"
    """

    schema = type_schema(
        'bucket-logging',
        op={'enum': ['enabled', 'disabled', 'equal', 'not-equal', 'eq', 'ne']},
        required=['op'],
        target_bucket={'type': 'string'},
        target_prefix={'type': 'string'})
    schema_alias = False
    account_name = None

    permissions = ("s3:GetBucketLogging", "iam:ListAccountAliases")

    def process(self, buckets, event=None):
        return list(filter(None, map(self.process_bucket, buckets)))

    def process_bucket(self, b):
        if self.match_bucket(b):
            return b

    def match_bucket(self, b):
        op = self.data.get('op')

        logging = b.get('Logging', {})
        if op == 'disabled':
            return logging == {}
        elif op == 'enabled':
            return logging != {}

        if self.account_name is None:
            session = local_session(self.manager.session_factory)
            self.account_name = get_account_alias_from_sts(session)

        variables = self.get_std_format_args(b)
        variables.update({
            'account': self.account_name,
            'source_bucket_name': b['Name'],
            'source_bucket_region': get_region(b),
            'target_bucket_name': self.data.get('target_bucket'),
            'target_prefix': self.data.get('target_prefix'),
        })
        data = format_string_values(self.data, **variables)
        target_bucket = data.get('target_bucket')
        target_prefix = data.get('target_prefix', b['Name'] + '/')

        target_config = {
            "TargetBucket": target_bucket,
            "TargetPrefix": target_prefix
        } if target_bucket else {}

        if op in ('not-equal', 'ne'):
            return logging != target_config
        else:
            return logging == target_config


@actions.register('delete-bucket-notification')
class DeleteBucketNotification(BucketActionBase):
    """Action to delete S3 bucket notification configurations"""

    schema = type_schema(
        'delete-bucket-notification',
        required=['statement_ids'],
        statement_ids={'oneOf': [
            {'enum': ['matched']},
            {'type': 'array', 'items': {'type': 'string'}}]})

    permissions = ('s3:PutBucketNotification',)

    def process_bucket(self, bucket):
        n = bucket['Notification']
        if not n:
            return

        statement_ids = self.data.get('statement_ids')
        if statement_ids == 'matched':
            statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ())
        if not statement_ids:
            return

        cfg = defaultdict(list)

        for t in BucketNotificationFilter.FIELDS.values():
            for c in n.get(t, []):
                if c['Id'] not in statement_ids:
                    cfg[t].append(c)

        client = bucket_client(local_session(self.manager.session_factory), bucket)
        client.put_bucket_notification_configuration(
            Bucket=bucket['Name'],
            NotificationConfiguration=cfg)


@actions.register('no-op')
class NoOp(BucketActionBase):
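    # Takes no action on matched buckets; process returns no results.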

    schema = type_schema('no-op')
    permissions = ('s3:ListAllMyBuckets',)

    def process(self, buckets):
        return None


@actions.register('set-statements')
class SetPolicyStatement(BucketActionBase):
    """Action to add or update policy statements to S3 buckets

    :example:

    Remove all existing policies, and block insecure HTTP requests.

    .. code-block:: yaml

        policies:
          - name: force-s3-https
            resource: s3
            actions:
              - type: set-statements
                remove: "*"
                statements:
                  - Sid: "DenyHttp"
                    Effect: "Deny"
                    Action: "s3:GetObject"
                    Principal:
                      AWS: "*"
                    Resource: "arn:aws:s3:::{bucket_name}/*"
                    Condition:
                      Bool:
                        "aws:SecureTransport": false

    :example:

    Remove existing statements that grant cross-account access, and
    block insecure HTTP requests.

    .. code-block:: yaml

        policies:
          - name: s3-tighten-bucket-policy
            resource: aws.s3
            filters:
              - type: cross-account
            actions:
              - type: set-statements
                remove: "matched"
                statements:
                  - Sid: "DenyHttp"
                    Effect: "Deny"
                    Action: "s3:GetObject"
                    Principal:
                      AWS: "*"
                    Resource: "arn:aws:s3:::{bucket_name}/*"
                    Condition:
                      Bool:
                        "aws:SecureTransport": false

    """

    permissions = ('s3:PutBucketPolicy', 's3:DeleteBucketPolicy')

    schema = type_schema(
        'set-statements',
        **{
            'remove': {'oneOf': [
                {'enum': ['matched', "*"]},
                {'type': 'array', 'items': {'type': 'string'}}]},
            'statements': {
                'type': 'array',
                'items': {
                    'type': 'object',
                    'properties': {
                        'Sid': {'type': 'string'},
                        'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']},
                        'Principal': {'anyOf': [{'type': 'string'},
                                                {'type': 'object'}, {'type': 'array'}]},
                        'NotPrincipal': {'anyOf': [{'type': 'object'}, {'type': 'array'}]},
                        'Action': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'NotAction': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'Resource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'NotResource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'Condition': {'type': 'object'}
                    },
                    'required': ['Sid', 'Effect'],
                    'oneOf': [
                        {'required': ['Principal', 'Action', 'Resource']},
                        {'required': ['NotPrincipal', 'Action', 'Resource']},
                        {'required': ['Principal', 'NotAction', 'Resource']},
                        {'required': ['NotPrincipal', 'NotAction', 'Resource']},
                        {'required': ['Principal', 'Action', 'NotResource']},
                        {'required': ['NotPrincipal', 'Action', 'NotResource']},
                        {'required': ['Principal', 'NotAction', 'NotResource']},
                        {'required': ['NotPrincipal', 'NotAction', 'NotResource']}
                    ]
                }
            }
        }
    )

    def process_bucket_remove(self, policy, bucket):
        statements = policy.get('Statement', [])
        resource_statements = bucket.get(CrossAccountAccessFilter.annotation_key, ())

        statements, found = remove_statements(
            self.data.get('remove', []), statements, resource_statements)

        return statements, found

    def process_bucket_add(self, policy, bucket):
        target_statements = format_string_values(
            copy.deepcopy({s['Sid']: s for s in self.data.get('statements', [])}),
            **self.get_std_format_args(bucket))

        bucket_statements = policy.setdefault('Statement', [])

        for s in bucket_statements:
            if s.get('Sid') not in target_statements:
                continue
            if s == target_statements[s['Sid']]:
                target_statements.pop(s['Sid'])

        if not target_statements:
            return False

        bucket_statements.extend(target_statements.values())
        return True

    def process_bucket(self, bucket):
        policy = bucket.get('Policy') or '{}'
        policy = json.loads(policy)

        statements, found = self.process_bucket_remove(policy, bucket)
        modified = self.process_bucket_add(policy, bucket)

        s3 = bucket_client(local_session(self.manager.session_factory), bucket)

        if not modified and not found:
            return

        policy = json.dumps(policy)

        if not statements and found and not modified:
            s3.delete_bucket_policy(Bucket=bucket['Name'])
            return {'Name': bucket['Name'], 'Policy': policy}

        s3.put_bucket_policy(Bucket=bucket['Name'], Policy=policy)
        return {'Name': bucket['Name'], 'Policy': policy}


@actions.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
    """Action to remove policy statements from S3 buckets

    This action has been deprecated. Please use the 'set-statements' action
    with the 'remove' attribute to remove policy statements from S3 buckets.

    :example:

    .. code-block:: yaml

        policies:
          - name: s3-remove-encrypt-put
            resource: s3
            filters:
              - type: has-statement
                statement_ids:
                  - RequireEncryptedPutObject
            actions:
              - type: remove-statements
                statement_ids:
                  - RequiredEncryptedPutObject
    """

    deprecations = (
        deprecated.filter("use the 'set-statements' action with 'remove' attribute"),
    )

    permissions = ("s3:PutBucketPolicy", "s3:DeleteBucketPolicy")

    def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            futures = {}
            results = []
            for b in buckets:
                futures[w.submit(self.process_bucket, b)] = b
            for f in as_completed(futures):
                if f.exception():
                    b = futures[f]
                    self.log.error('error modifying bucket:%s\n%s',
                                   b['Name'], f.exception())
                results += filter(None, [f.result()])
            return results

    def process_bucket(self, bucket):
        p = bucket.get('Policy')
        if p is None:
            return

        p = json.loads(p)

        statements, found = self.process_policy(
            p, bucket, CrossAccountAccessFilter.annotation_key)

        if not found:
            return

        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        if not statements:
            s3.delete_bucket_policy(Bucket=bucket['Name'])
        else:
            s3.put_bucket_policy(Bucket=bucket['Name'], Policy=json.dumps(p))
        return {'Name': bucket['Name'], 'State': 'PolicyRemoved', 'Statements': found}


@actions.register('set-replication')
class SetBucketReplicationConfig(BucketActionBase):
    """Action to add or remove replication configuration statement from S3 buckets

    :example:

    .. code-block:: yaml

        policies:
          - name: s3-unapproved-account-replication
            resource: s3
            filters:
              - type: value
                key: Replication.ReplicationConfiguration.Rules[].Destination.Account
                value: present
              - type: value
                key: Replication.ReplicationConfiguration.Rules[].Destination.Account
                value_from:
                  url: 's3:///path/to/file.json'
                  format: json
                  expr: "approved_accounts.*"
                op: ni
            actions:
              - type: set-replication
                state: enable
    """
    schema = type_schema(
        'set-replication',
        state={'type': 'string', 'enum': ['enable', 'disable', 'remove']})
    permissions = ("s3:GetReplicationConfiguration", "s3:PutReplicationConfiguration")

    def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
            errors = []
            for future in as_completed(futures):
                bucket = futures[future]
                try:
                    future.result()
                except ClientError as e:
                    errors.append("Message: %s Bucket: %s" % (e, bucket['Name']))
            if errors:
                raise Exception('\n'.join(map(str, errors)))

    def process_bucket(self, bucket):
        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        state = self.data.get('state')
        if state is not None:
            if state == 'remove':
                s3.delete_bucket_replication(Bucket=bucket['Name'])
                return {'Name': bucket['Name'], 'State': 'ReplicationConfigRemoved'}
            if state in ('enable', 'disable'):
                config = s3.get_bucket_replication(Bucket=bucket['Name'])
                for rule in config['ReplicationConfiguration']['Rules']:
                    rule['Status'] = 'Enabled' if state == 'enable' else 'Disabled'
                s3.put_bucket_replication(
                    Bucket=bucket['Name'],
                    ReplicationConfiguration=config['ReplicationConfiguration']
                )
                return {'Name': bucket['Name'], 'State': 'ReplicationConfigUpdated'}


@filters.register('check-public-block')
class FilterPublicBlock(Filter):
    """Filter for s3 bucket public blocks

    If no filter parameters are provided it checks to see if any are unset or False.

    If parameters are provided only the provided ones are checked.

    :example:

    .. code-block:: yaml

        policies:
          - name: CheckForPublicAclBlock-Off
            resource: s3
            region: us-east-1
            filters:
              - type: check-public-block
                BlockPublicAcls: true
                BlockPublicPolicy: true
    """

    schema = type_schema(
        'check-public-block',
        BlockPublicAcls={'type': 'boolean'},
        IgnorePublicAcls={'type': 'boolean'},
        BlockPublicPolicy={'type': 'boolean'},
        RestrictPublicBuckets={'type': 'boolean'})
    permissions = ("s3:GetBucketPublicAccessBlock",)
    keys = (
        'BlockPublicPolicy', 'BlockPublicAcls', 'IgnorePublicAcls', 'RestrictPublicBuckets')
    annotation_key = 'c7n:PublicAccessBlock'

    def process(self, buckets, event=None):
        results = []
        with self.executor_factory(max_workers=2) as w:
            futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
            for f in as_completed(futures):
                if f.result():
                    results.append(futures[f])
        return results

    def process_bucket(self, bucket):
        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
        if self.annotation_key not in bucket:
            try:
                config = s3.get_public_access_block(
                    Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
            except ClientError as e:
                error_code = e.response['Error']['Code']
                if error_code == 'NoSuchPublicAccessBlockConfiguration':
                    pass
                elif error_code == 'AccessDenied':
                    # Follow the same logic as `assemble_bucket` - log and continue on access
                    # denied errors rather than halting a policy altogether
                    method = 'GetPublicAccessBlock'
                    log.warning(
                        "Bucket:%s unable to invoke method:%s error:%s ",
                        bucket['Name'], method, e.response['Error']['Message']
                    )
                    bucket.setdefault('c7n:DeniedMethods', []).append(method)
                else:
                    raise
            bucket[self.annotation_key] = config
        return self.matches_filter(config)

    def matches_filter(self, config):
        key_set = [key for key in self.keys if key in self.data]
        if key_set:
            return all([self.data.get(key) is config[key] for key in key_set])
        else:
            return not all(config.values())


@actions.register('set-public-block')
class SetPublicBlock(BucketActionBase):
    """Action to update Public Access blocks on S3 buckets

    If no action parameters are provided all settings will be set to the `state`, which defaults
    to True.

    If action parameters are provided, those will be set and other extant values preserved.

    :example:

    .. code-block:: yaml

        policies:
          - name: s3-public-block-enable-all
            resource: s3
            filters:
              - type: check-public-block
            actions:
              - type: set-public-block

        policies:
          - name: s3-public-block-disable-all
            resource: s3
            filters:
              - type: check-public-block
            actions:
              - type: set-public-block
                state: false

        policies:
          - name: s3-public-block-enable-some
            resource: s3
            filters:
              - or:
                - type: check-public-block
                  BlockPublicAcls: false
                - type: check-public-block
                  BlockPublicPolicy: false
            actions:
              - type: set-public-block
                BlockPublicAcls: true
                BlockPublicPolicy: true

    """

    schema = type_schema(
        'set-public-block',
        state={'type': 'boolean', 'default': True},
        BlockPublicAcls={'type': 'boolean'},
        IgnorePublicAcls={'type': 'boolean'},
        BlockPublicPolicy={'type': 'boolean'},
        RestrictPublicBuckets={'type': 'boolean'})
    permissions = ("s3:GetBucketPublicAccessBlock", "s3:PutBucketPublicAccessBlock")
    keys = FilterPublicBlock.keys
    annotation_key = FilterPublicBlock.annotation_key

    def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
            for future in as_completed(futures):
                future.result()

    def process_bucket(self, bucket):
        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
        if self.annotation_key not in bucket:
            try:
                config = s3.get_public_access_block(
                    Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
            except ClientError as e:
                if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration':
                    raise

        key_set = [key for key in self.keys if key in self.data]
        if key_set:
            for key in key_set:
                config[key] = self.data.get(key)
        else:
            for key in self.keys:
                config[key] = self.data.get('state', True)
        s3.put_public_access_block(
            Bucket=bucket['Name'], PublicAccessBlockConfiguration=config)


@actions.register('toggle-versioning')
class ToggleVersioning(BucketActionBase):
    """Action to enable/suspend versioning on a S3 bucket

    Note versioning can never be disabled, only suspended.

    :example:

    .. code-block:: yaml

        policies:
          - name: s3-enable-versioning
            resource: s3
            filters:
              - or:
                - type: value
                  key: Versioning.Status
                  value: Suspended
                - type: value
                  key: Versioning.Status
                  value: absent
            actions:
              - type: toggle-versioning
                enabled: true
    """

    schema = type_schema(
        'toggle-versioning',
        enabled={'type': 'boolean'})
    permissions = ("s3:PutBucketVersioning",)

    def process_versioning(self, resource, state):
        client = bucket_client(
            local_session(self.manager.session_factory), resource)
        try:
            client.put_bucket_versioning(
                Bucket=resource['Name'],
                VersioningConfiguration={
                    'Status': state})
        except ClientError as e:
            if e.response['Error']['Code'] != 'AccessDenied':
                log.error(
                    "Unable to put bucket versioning on bucket %s: %s" % (resource['Name'], e))
                raise
            log.warning(
                "Access Denied Bucket:%s while put bucket versioning" % resource['Name'])

    # mfa delete enablement looks like it needs the serial and a current token.
    def process(self, resources):
        enabled = self.data.get('enabled', True)
        for r in resources:
            if 'Versioning' not in r or not r['Versioning']:
                r['Versioning'] = {'Status': 'Suspended'}
            if enabled and (
                    r['Versioning']['Status'] == 'Suspended'):
                self.process_versioning(r, 'Enabled')
            if not enabled and r['Versioning']['Status'] == 'Enabled':
                self.process_versioning(r, 'Suspended')


@actions.register('toggle-logging')
class ToggleLogging(BucketActionBase):
    """Action to enable/disable logging on a S3 bucket.

    Target bucket ACL must allow for WRITE and READ_ACP permissions.
    Not specifying a target_prefix will default to the current bucket name.
    https://docs.aws.amazon.com/AmazonS3/latest/dev/enable-logging-programming.html

    :example:

    .. code-block:: yaml

        policies:
          - name: s3-enable-logging
            resource: s3
            filters:
              - "tag:Testing": present
            actions:
              - type: toggle-logging
                target_bucket: log-bucket
                target_prefix: logs123/

        policies:
          - name: s3-force-standard-logging
            resource: s3
            filters:
              - type: bucket-logging
                op: not-equal
                target_bucket: "{account_id}-{region}-s3-logs"
                target_prefix: "{account}/{source_bucket_name}/"
            actions:
              - type: toggle-logging
                target_bucket: "{account_id}-{region}-s3-logs"
                target_prefix: "{account}/{source_bucket_name}/"
    """
    schema = type_schema(
        'toggle-logging',
        enabled={'type': 'boolean'},
        target_bucket={'type': 'string'},
        target_prefix={'type': 'string'})

    permissions = ("s3:PutBucketLogging", "iam:ListAccountAliases")

    def validate(self):
        if self.data.get('enabled', True):
            if not self.data.get('target_bucket'):
                raise PolicyValidationError(
                    "target_bucket must be specified on %s" % (
                        self.manager.data,))
        return self

    def process(self, resources):
        session = local_session(self.manager.session_factory)
        kwargs = {
            "enabled": self.data.get('enabled', True),
            "session": session,
            "account_name": get_account_alias_from_sts(session),
        }

        return self._process_with_futures(resources, **kwargs)

    def process_bucket(self, r, enabled=None, session=None, account_name=None):
        client = bucket_client(session, r)
        is_logging = bool(r.get('Logging'))

        if enabled:
            variables = self.get_std_format_args(r)
            variables.update({
                'account': account_name,
                'source_bucket_name': r['Name'],
                'source_bucket_region': get_region(r),
                'target_bucket_name': self.data.get('target_bucket'),
                'target_prefix': self.data.get('target_prefix'),
            })
            data = format_string_values(self.data, **variables)
            config = {
                'TargetBucket': data.get('target_bucket'),
                'TargetPrefix': data.get('target_prefix', r['Name'] + '/')
            }
            if not is_logging or r.get('Logging') != config:
                client.put_bucket_logging(
                    Bucket=r['Name'],
                    BucketLoggingStatus={'LoggingEnabled': config}
                )
                r['Logging'] = config

        elif not enabled and is_logging:
            client.put_bucket_logging(
                Bucket=r['Name'], BucketLoggingStatus={})
            r['Logging'] = {}
1918@actions.register('attach-encrypt')
1919class AttachLambdaEncrypt(BucketActionBase):
1920 """Action attaches a lambda encryption function to an S3 bucket.
1921 Supports attachment via a lambda bucket notification or an sns notification
1922 to invoke the lambda. A special topic value of `default` will utilize an
1923 extant notification or create one matching the bucket name.
1925 :example:
1928 .. code-block:: yaml
1931 policies:
1932 - name: attach-lambda-encrypt
1933 resource: s3
1934 filters:
1935 - type: missing-policy-statement
1936 actions:
1937 - type: attach-encrypt
1938 role: arn:aws:iam::123456789012:role/my-role
1940 """
1941 schema = type_schema(
1942 'attach-encrypt',
1943 role={'type': 'string'},
1944 tags={'type': 'object'},
1945 topic={'type': 'string'})
1947 permissions = (
1948 "s3:PutBucketNotification", "s3:GetBucketNotification",
1949 # lambda manager uses quite a few perms to provision lambdas
1950 # and event sources, hard to disambiguate, punt for now.
1951 "lambda:*",
1952 )
1954 def __init__(self, data=None, manager=None):
1955 self.data = data or {}
1956 self.manager = manager
1958 def validate(self):
1959 if (not getattr(self.manager.config, 'dryrun', True) and
1960 not self.data.get('role', self.manager.config.assume_role)):
1961 raise PolicyValidationError(
1962 "attach-encrypt: role must be specified either "
1963 "via assume or in config on %s" % (self.manager.data,))
1965 return self
1967 def process(self, buckets):
1968 from c7n.mu import LambdaManager
1969 from c7n.ufuncs.s3crypt import get_function
1971 account_id = self.manager.config.account_id
1972 topic_arn = self.data.get('topic')
1974 func = get_function(
1975 None, self.data.get('role', self.manager.config.assume_role),
1976 account_id=account_id, tags=self.data.get('tags'))
1978 regions = {get_region(b) for b in buckets}
1980 # session managers by region
1981 region_sessions = {}
1982 for r in regions:
1983 region_sessions[r] = functools.partial(
1984 self.manager.session_factory, region=r)
1986 # Publish the function to all of our buckets' regions
1987 region_funcs = {}
1989 for r in regions:
1990 lambda_mgr = LambdaManager(region_sessions[r])
1991 lambda_mgr.publish(func)
1992 region_funcs[r] = func
1994 with self.executor_factory(max_workers=3) as w:
1995 results = []
1996 futures = []
1997 for b in buckets:
1998 region = get_region(b)
1999 futures.append(
2000 w.submit(
2001 self.process_bucket,
2002 region_funcs[region],
2003 b,
2004 topic_arn,
2005 account_id,
2006 region_sessions[region]
2007 ))
2008 for f in as_completed(futures):
2009 if f.exception():
2010 log.exception(
2011 "Error attaching lambda-encrypt %s" % (f.exception()))
2012 results.append(f.result())
2013 return list(filter(None, results))
2015 def process_bucket(self, func, bucket, topic, account_id, session_factory):
2016 from c7n.mu import BucketSNSNotification, BucketLambdaNotification
2017 if topic:
2018 topic = None if topic == 'default' else topic
2019 source = BucketSNSNotification(session_factory, bucket, topic)
2020 else:
2021 source = BucketLambdaNotification(
2022 {'account_s3': account_id}, session_factory, bucket)
2023 return source.add(func, None)
2026@actions.register('encryption-policy')
2027class EncryptionRequiredPolicy(BucketActionBase):
2028 """Action to apply an encryption policy to S3 buckets
2031 :example:
2033 .. code-block:: yaml
2035 policies:
2036 - name: s3-enforce-encryption
2037 resource: s3
2038 mode:
2039 type: cloudtrail
2040 events:
2041 - CreateBucket
2042 actions:
2043 - encryption-policy
2044 """
2046 permissions = ("s3:GetBucketPolicy", "s3:PutBucketPolicy")
2047 schema = type_schema('encryption-policy')
2049 def __init__(self, data=None, manager=None):
2050 self.data = data or {}
2051 self.manager = manager
2053 def process(self, buckets):
2054 with self.executor_factory(max_workers=3) as w:
2055 results = w.map(self.process_bucket, buckets)
2056 results = list(filter(None, list(results)))
2057 return results
2059 def process_bucket(self, b):
2060 p = b['Policy']
2061 if p is None:
2062 log.info("No policy found, creating new")
2063 p = {'Version': "2012-10-17", "Statement": []}
2064 else:
2065 p = json.loads(p)
2067 encryption_sid = "RequiredEncryptedPutObject"
2068 encryption_statement = {
2069 'Sid': encryption_sid,
2070 'Effect': 'Deny',
2071 'Principal': '*',
2072 'Action': 's3:PutObject',
2073 "Resource": "arn:aws:s3:::%s/*" % b['Name'],
2074 "Condition": {
2075 # AWS Managed Keys or KMS keys, note policy language
2076 # does not support custom kms (todo add issue)
2077 "StringNotEquals": {
2078 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}
2080 statements = p.get('Statement', [])
2081 for s in list(statements):
2082 if s.get('Sid', '') == encryption_sid:
2083 log.debug("Bucket:%s Found extant encrypt policy", b['Name'])
2084 if s != encryption_statement:
2085 log.info(
2086 "Bucket:%s updating extant encrypt policy", b['Name'])
2087 statements.remove(s)
2088 else:
2089 return
2091 session = self.manager.session_factory()
2092 s3 = bucket_client(session, b)
2093 statements.append(encryption_statement)
2094 p['Statement'] = statements
2095 log.info('Bucket:%s attached encryption policy' % b['Name'])
2097 try:
2098 s3.put_bucket_policy(
2099 Bucket=b['Name'],
2100 Policy=json.dumps(p))
2101 except ClientError as e:
2102 if e.response['Error']['Code'] == 'NoSuchBucket':
2103 return
2104 self.log.exception(
2105 "Error on bucket:%s putting policy\n%s error:%s",
2106 b['Name'],
2107 json.dumps(statements, indent=2), e)
2108 raise
2109 return {'Name': b['Name'], 'State': 'PolicyAttached'}
2112class BucketScanLog:
2113 """Offload remediated key ids to a disk file in batches
2115 A bucket keyspace is effectively infinite, so we need to store partial
2116 results out of memory; this class provides a json log on disk
2117 with partial write support.
2119 json output format:
2120 - [list_of_serialized_keys],
2121 - [] # Empty list of keys at end when we close the buffer
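
    For example (illustrative), two batches of remediated keys followed by
    closing the log would produce a file like::

        [
        ["key1", "key2"],
        ["key3"],
        []
        ]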
2123 """
2125 def __init__(self, log_dir, name):
2126 self.log_dir = log_dir
2127 self.name = name
2128 self.fh = None
2129 self.count = 0
2131 @property
2132 def path(self):
2133 return os.path.join(self.log_dir, "%s.json" % self.name)
2135 def __enter__(self):
2136 # Don't require output directories
2137 if self.log_dir is None:
2138 return
2140 self.fh = open(self.path, 'w')
2141 self.fh.write("[\n")
2142 return self
2144 def __exit__(self, exc_type=None, exc_value=None, exc_frame=None):
2145 if self.fh is None:
2146 return
2147 # we need an empty marker list at end to avoid trailing commas
2148 self.fh.write("[]")
2149 # and close the surrounding list
2150 self.fh.write("\n]")
2151 self.fh.close()
2152 if not self.count:
2153 os.remove(self.fh.name)
2154 self.fh = None
2155 return False
2157 def add(self, keys):
2158 self.count += len(keys)
2159 if self.fh is None:
2160 return
2161 self.fh.write(dumps(keys))
2162 self.fh.write(",\n")
2165class ScanBucket(BucketActionBase):
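    """Base action that visits every key (or object version) in a bucket.

    A rough sketch of the flow implemented below: paginate the bucket with
    the appropriate list operation, hand batches of keys to process_chunk in
    a thread pool, and record remediated keys through BucketScanLog;
    subclasses provide the per-key processing.
    """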
2167 permissions = ("s3:ListBucket",)
2169 bucket_ops = {
2170 'standard': {
2171 'iterator': 'list_objects',
2172 'contents_key': ['Contents'],
2173 'key_processor': 'process_key'
2174 },
2175 'versioned': {
2176 'iterator': 'list_object_versions',
2177 'contents_key': ['Versions'],
2178 'key_processor': 'process_version'
2179 }
2180 }
2182 def __init__(self, data, manager=None):
2183 super(ScanBucket, self).__init__(data, manager)
2184 self.denied_buckets = set()
2186 def get_bucket_style(self, b):
2187 return (
2188 b.get('Versioning', {'Status': ''}).get('Status') in (
2189 'Enabled', 'Suspended') and 'versioned' or 'standard')
2191 def get_bucket_op(self, b, op_name):
2192 bucket_style = self.get_bucket_style(b)
2193 op = self.bucket_ops[bucket_style][op_name]
2194 if op_name == 'key_processor':
2195 return getattr(self, op)
2196 return op
2198 def get_keys(self, b, key_set):
2199 content_keys = self.get_bucket_op(b, 'contents_key')
2200 keys = []
2201 for ck in content_keys:
2202 keys.extend(key_set.get(ck, []))
2203 return keys
2205 def process(self, buckets):
2206 results = self._process_with_futures(self.process_bucket, buckets)
2207 self.write_denied_buckets_file()
2208 return results
2210 def _process_with_futures(self, helper, buckets, max_workers=3):
2211 results = []
2212 with self.executor_factory(max_workers) as w:
2213 futures = {}
2214 for b in buckets:
2215 futures[w.submit(helper, b)] = b
2216 for f in as_completed(futures):
2217 if f.exception():
2218 b = futures[f]
2219 self.log.error(
2220 "Error on bucket:%s region:%s policy:%s error: %s",
2221 b['Name'], b.get('Location', 'unknown'),
2222 self.manager.data.get('name'), f.exception())
2223 self.denied_buckets.add(b['Name'])
2224 continue
2225 result = f.result()
2226 if result:
2227 results.append(result)
2228 return results
2230 def write_denied_buckets_file(self):
2231 if (self.denied_buckets and
2232 self.manager.ctx.log_dir and
2233 not isinstance(self.manager.ctx.output, NullBlobOutput)):
2234 with open(
2235 os.path.join(
2236 self.manager.ctx.log_dir, 'denied.json'), 'w') as fh:
2237 json.dump(list(self.denied_buckets), fh, indent=2)
2238 self.denied_buckets = set()
2240 def process_bucket(self, b):
2241 log.info(
2242 "Scanning bucket:%s visitor:%s style:%s" % (
2243 b['Name'], self.__class__.__name__, self.get_bucket_style(b)))
2245 s = self.manager.session_factory()
2246 s3 = bucket_client(s, b)
2248 # The bulk of the _process_bucket function executes inline in the
2249 # calling thread/worker context; neither the paginator nor the
2250 # bucketscan log should be used across a worker boundary.
2251 p = s3.get_paginator(
2252 self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name'])
2254 with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log:
2255 with self.executor_factory(max_workers=10) as w:
2256 try:
2257 return self._process_bucket(b, p, key_log, w)
2258 except ClientError as e:
2259 if e.response['Error']['Code'] == 'NoSuchBucket':
2260 log.warning(
2261 "Bucket:%s removed while scanning" % b['Name'])
2262 return
2263 if e.response['Error']['Code'] == 'AccessDenied':
2264 log.warning(
2265 "Access Denied Bucket:%s while scanning" % b['Name'])
2266 self.denied_buckets.add(b['Name'])
2267 return
2268 log.exception(
2269 "Error processing bucket:%s paginator:%s" % (
2270 b['Name'], p))
2272 __call__ = process_bucket
2274 def _process_bucket(self, b, p, key_log, w):
2275 count = 0
2277 for key_set in p:
2278 keys = self.get_keys(b, key_set)
2279 count += len(keys)
2280 futures = []
2282 for batch in chunks(keys, size=100):
2283 if not batch:
2284 continue
2285 futures.append(w.submit(self.process_chunk, batch, b))
2287 for f in as_completed(futures):
2288 if f.exception():
2289 log.exception("Exception Processing bucket:%s key batch %s" % (
2290 b['Name'], f.exception()))
2291 continue
2292 r = f.result()
2293 if r:
2294 key_log.add(r)
2296 # Log completion at info level, progress at debug level
2297 if key_set['IsTruncated']:
2298 log.debug('Scan progress bucket:%s keys:%d remediated:%d ...',
2299 b['Name'], count, key_log.count)
2300 else:
2301 log.info('Scan Complete bucket:%s keys:%d remediated:%d',
2302 b['Name'], count, key_log.count)
2304 b['KeyScanCount'] = count
2305 b['KeyRemediated'] = key_log.count
2306 return {
2307 'Bucket': b['Name'], 'Remediated': key_log.count, 'Count': count}
2309 def process_chunk(self, batch, bucket):
2310 raise NotImplementedError()
2312 def process_key(self, s3, key, bucket_name, info=None):
2313 raise NotImplementedError()
2315 def process_version(self, s3, bucket, key):
2316 raise NotImplementedError()
2319@actions.register('encrypt-keys')
2320class EncryptExtantKeys(ScanBucket):
2321 """Action to encrypt unencrypted S3 objects
2323 :example:
2325 .. code-block:: yaml
2327 policies:
2328 - name: s3-encrypt-objects
2329 resource: s3
2330 actions:
2331 - type: encrypt-keys
2332 crypto: aws:kms
2333 key-id: 9c3983be-c6cf-11e6-9d9d-cec0c932ce01
2334 """
2336 permissions = (
2337 "s3:GetObject",
2338 "s3:PutObject",
2339 "s3:DeleteObjectVersion",
2340 "s3:RestoreObject",
2341 ) + ScanBucket.permissions
2343 schema = {
2344 'type': 'object',
2345 'additionalProperties': False,
2346 'properties': {
2347 'type': {'enum': ['encrypt-keys']},
2348 'report-only': {'type': 'boolean'},
2349 'glacier': {'type': 'boolean'},
2350 'large': {'type': 'boolean'},
2351 'crypto': {'enum': ['AES256', 'aws:kms']},
2352 'key-id': {'type': 'string'}
2353 },
2354 'dependencies': {
2355 'key-id': {
2356 'properties': {
2357 'crypto': {'pattern': 'aws:kms'}
2358 },
2359 'required': ['crypto']
2360 }
2361 }
2362 }
2364 metrics = [
2365 ('Total Keys', {'Scope': 'Account'}),
2366 ('Unencrypted', {'Scope': 'Account'})]
2368 def __init__(self, data, manager=None):
2369 super(EncryptExtantKeys, self).__init__(data, manager)
2370 self.kms_id = self.data.get('key-id')
2372 def get_permissions(self):
2373 perms = ("s3:GetObject", "s3:GetObjectVersion")
2374 if self.data.get('report-only'):
2375 perms += ('s3:DeleteObject', 's3:DeleteObjectVersion',
2376 's3:PutObject',
2377 's3:AbortMultipartUpload',
2378 's3:ListBucket',
2379 's3:ListBucketVersions')
2380 return perms
2382 def process(self, buckets):
2384 t = time.time()
2385 results = super(EncryptExtantKeys, self).process(buckets)
2386 run_time = time.time() - t
2387 remediated_count = object_count = 0
2389 for r in results:
2390 object_count += r['Count']
2391 remediated_count += r['Remediated']
2392 self.manager.ctx.metrics.put_metric(
2393 "Unencrypted", r['Remediated'], "Count", Scope=r['Bucket'],
2394 buffer=True)
2396 self.manager.ctx.metrics.put_metric(
2397 "Unencrypted", remediated_count, "Count", Scope="Account",
2398 buffer=True
2399 )
2400 self.manager.ctx.metrics.put_metric(
2401 "Total Keys", object_count, "Count", Scope="Account",
2402 buffer=True
2403 )
2404 self.manager.ctx.metrics.flush()
2406 log.info(
2407 ("EncryptExtant Complete keys:%d "
2408 "remediated:%d rate:%0.2f/s time:%0.2fs"),
2409 object_count,
2410 remediated_count,
2411 float(object_count) / run_time if run_time else 0,
2412 run_time)
2413 return results
2415 def process_chunk(self, batch, bucket):
2416 crypto_method = self.data.get('crypto', 'AES256')
2417 s3 = bucket_client(
2418 local_session(self.manager.session_factory), bucket,
2419 kms=(crypto_method == 'aws:kms'))
2420 b = bucket['Name']
2421 results = []
2422 key_processor = self.get_bucket_op(bucket, 'key_processor')
2423 for key in batch:
2424 r = key_processor(s3, key, b)
2425 if r:
2426 results.append(r)
2427 return results
2429 def process_key(self, s3, key, bucket_name, info=None):
2430 k = key['Key']
2431 if info is None:
2432 info = s3.head_object(Bucket=bucket_name, Key=k)
2434 # If the data is already encrypted with AES256 and this request is also
2435 # for AES256 then we don't need to do anything
2436 if info.get('ServerSideEncryption') == 'AES256' and not self.kms_id:
2437 return False
2439 if info.get('ServerSideEncryption') == 'aws:kms':
2440 # If we're not looking for a specific key any key will do.
2441 if not self.kms_id:
2442 return False
2443 # If we're configured to use a specific key and the key matches,
2444 # skip re-encryption; note this is not a strict equality match.
2445 if self.kms_id in info.get('SSEKMSKeyId', ''):
2446 return False
2448 if self.data.get('report-only'):
2449 return k
2451 storage_class = info.get('StorageClass', 'STANDARD')
2453 if storage_class == 'GLACIER':
2454 if not self.data.get('glacier'):
2455 return False
2456 if 'Restore' not in info:
2457 # This takes multiple hours, we let the next c7n
2458 # run take care of followups.
2459 s3.restore_object(
2460 Bucket=bucket_name,
2461 Key=k,
2462 RestoreRequest={'Days': 30})
2463 return False
2464 elif not restore_complete(info['Restore']):
2465 return False
2467 storage_class = 'STANDARD'
2469 crypto_method = self.data.get('crypto', 'AES256')
2470 key_id = self.data.get('key-id')
2471 # Note on copy we lose individual object acl grants
2472 params = {'Bucket': bucket_name,
2473 'Key': k,
2474 'CopySource': "/%s/%s" % (bucket_name, k),
2475 'MetadataDirective': 'COPY',
2476 'StorageClass': storage_class,
2477 'ServerSideEncryption': crypto_method}
2479 if key_id and crypto_method == 'aws:kms':
2480 params['SSEKMSKeyId'] = key_id
2482 if info['ContentLength'] > MAX_COPY_SIZE and self.data.get(
2483 'large', True):
2484 return self.process_large_file(s3, bucket_name, key, info, params)
2486 s3.copy_object(**params)
2487 return k
2489 def process_version(self, s3, key, bucket_name):
2490 info = s3.head_object(
2491 Bucket=bucket_name,
2492 Key=key['Key'],
2493 VersionId=key['VersionId'])
2495 if 'ServerSideEncryption' in info:
2496 return False
2498 if self.data.get('report-only'):
2499 return key['Key'], key['VersionId']
2501 if key['IsLatest']:
2502 r = self.process_key(s3, key, bucket_name, info)
2503 # Glacier request processing, wait till we have the restored object
2504 if not r:
2505 return r
2506 s3.delete_object(
2507 Bucket=bucket_name,
2508 Key=key['Key'],
2509 VersionId=key['VersionId'])
2510 return key['Key'], key['VersionId']
2512 def process_large_file(self, s3, bucket_name, key, info, params):
2513 """For objects over 5 GB, use multipart upload to copy"""
2514 part_size = MAX_COPY_SIZE - (1024 ** 2)
2515 num_parts = int(math.ceil(info['ContentLength'] / part_size))
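        # Illustrative sizing: part_size is just under MAX_COPY_SIZE (2 GiB),
        # so e.g. a 5 GiB object is copied in 3 parts; upload_part below
        # clamps the final part's byte range to ContentLength - 1.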
2516 source = params.pop('CopySource')
2518 params.pop('MetadataDirective')
2519 if 'Metadata' in info:
2520 params['Metadata'] = info['Metadata']
2522 upload_id = s3.create_multipart_upload(**params)['UploadId']
2524 params = {'Bucket': bucket_name,
2525 'Key': key['Key'],
2526 'UploadId': upload_id,
2527 'CopySource': source,
2528 'CopySourceIfMatch': info['ETag']}
2530 def upload_part(part_num):
2531 part_params = dict(params)
2532 part_params['CopySourceRange'] = "bytes=%d-%d" % (
2533 part_size * (part_num - 1),
2534 min(part_size * part_num - 1, info['ContentLength'] - 1))
2535 part_params['PartNumber'] = part_num
2536 response = s3.upload_part_copy(**part_params)
2537 return {'ETag': response['CopyPartResult']['ETag'],
2538 'PartNumber': part_num}
2540 try:
2541 with self.executor_factory(max_workers=2) as w:
2542 parts = list(w.map(upload_part, range(1, num_parts + 1)))
2543 except Exception:
2544 log.warning(
2545 "Error during large key copy bucket: %s key: %s, "
2546 "aborting upload", bucket_name, key, exc_info=True)
2547 s3.abort_multipart_upload(
2548 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id)
2549 raise
2550 s3.complete_multipart_upload(
2551 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id,
2552 MultipartUpload={'Parts': parts})
2553 return key['Key']
2556def restore_complete(restore):
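    """Check whether a Glacier restore has completed.

    The Restore value returned by head_object is a header-style string,
    e.g. (illustrative) 'ongoing-request="true"' or
    'ongoing-request="false", expiry-date="Fri, 21 Dec 2012 00:00:00 GMT"';
    the restore is complete once ongoing-request is false.
    """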
2557 if ',' in restore:
2558 ongoing, _ = restore.split(',', 1)
2559 else:
2560 ongoing = restore
2561 return 'false' in ongoing
2564@filters.register('is-log-target')
2565class LogTarget(Filter):
2566 """Filter and return buckets that are log destinations.
2568 Not suitable for use in lambda on large accounts. This is an api
2569 heavy process that scans all possible log sources.
2571 Sources:
2572 - elb (Access Log)
2573 - s3 (Access Log)
2574 - cfn (Template writes)
2575 - cloudtrail
2577 :example:
2579 .. code-block:: yaml
2581 policies:
2582 - name: s3-log-bucket
2583 resource: s3
2584 filters:
2585 - type: is-log-target
2586 """
2588 schema = type_schema(
2589 'is-log-target',
2590 services={'type': 'array', 'items': {'enum': [
2591 's3', 'elb', 'cloudtrail']}},
2592 self={'type': 'boolean'},
2593 value={'type': 'boolean'})
2595 def get_permissions(self):
2596 perms = self.manager.get_resource_manager('elb').get_permissions()
2597 perms += ('elasticloadbalancing:DescribeLoadBalancerAttributes',)
2598 return perms
2600 def process(self, buckets, event=None):
2601 log_buckets = set()
2602 count = 0
2604 services = self.data.get('services', ['elb', 's3', 'cloudtrail'])
2605 self_log = self.data.get('self', False)
2607 if 'elb' in services and not self_log:
2608 for bucket, _ in self.get_elb_bucket_locations():
2609 log_buckets.add(bucket)
2610 count += 1
2611 self.log.debug("Found %d elb log targets" % count)
2613 if 's3' in services:
2614 count = 0
2615 for bucket, _ in self.get_s3_bucket_locations(buckets, self_log):
2616 count += 1
2617 log_buckets.add(bucket)
2618 self.log.debug('Found %d s3 log targets' % count)
2620 if 'cloudtrail' in services and not self_log:
2621 for bucket, _ in self.get_cloud_trail_locations(buckets):
2622 log_buckets.add(bucket)
2624 self.log.info("Found %d log targets for %d buckets" % (
2625 len(log_buckets), len(buckets)))
2626 if self.data.get('value', True):
2627 return [b for b in buckets if b['Name'] in log_buckets]
2628 else:
2629 return [b for b in buckets if b['Name'] not in log_buckets]
2631 @staticmethod
2632 def get_s3_bucket_locations(buckets, self_log=False):
2633 """return (bucket_name, prefix) for all s3 logging targets"""
2634 for b in buckets:
2635 if b.get('Logging'):
2636 if self_log:
2637 if b['Name'] != b['Logging']['TargetBucket']:
2638 continue
2639 yield (b['Logging']['TargetBucket'],
2640 b['Logging']['TargetPrefix'])
2641 if not self_log and b['Name'].startswith('cf-templates-'):
2642 yield (b['Name'], '')
2644 def get_cloud_trail_locations(self, buckets):
2645 session = local_session(self.manager.session_factory)
2646 client = session.client('cloudtrail')
2647 names = {b['Name'] for b in buckets}
2648 for t in client.describe_trails().get('trailList', ()):
2649 if t.get('S3BucketName') in names:
2650 yield (t['S3BucketName'], t.get('S3KeyPrefix', ''))
2652 def get_elb_bucket_locations(self):
2653 elbs = self.manager.get_resource_manager('elb').resources()
2654 get_elb_attrs = functools.partial(
2655 _query_elb_attrs, self.manager.session_factory)
2657 with self.executor_factory(max_workers=2) as w:
2658 futures = []
2659 for elb_set in chunks(elbs, 100):
2660 futures.append(w.submit(get_elb_attrs, elb_set))
2661 for f in as_completed(futures):
2662 if f.exception():
2663 log.error("Error while scanning elb log targets: %s" % (
2664 f.exception()))
2665 continue
2666 for tgt in f.result():
2667 yield tgt
2670def _query_elb_attrs(session_factory, elb_set):
2671 session = local_session(session_factory)
2672 client = session.client('elb')
2673 log_targets = []
2674 for e in elb_set:
2675 try:
2676 attrs = client.describe_load_balancer_attributes(
2677 LoadBalancerName=e['LoadBalancerName'])[
2678 'LoadBalancerAttributes']
2679 if 'AccessLog' in attrs and attrs['AccessLog']['Enabled']:
2680 log_targets.append((
2681 attrs['AccessLog']['S3BucketName'],
2682 attrs['AccessLog']['S3BucketPrefix']))
2683 except Exception as err:
2684 log.warning(
2685 "Could not retrieve load balancer %s: %s" % (
2686 e['LoadBalancerName'], err))
2687 return log_targets
2690@actions.register('remove-website-hosting')
2691class RemoveWebsiteHosting(BucketActionBase):
2692 """Action that removes website hosting configuration.
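
    An illustrative policy (a minimal sketch; in practice this is usually
    paired with a filter selecting buckets that have a Website configuration):

    .. code-block:: yaml

        policies:
          - name: s3-remove-website-hosting
            resource: s3
            actions:
              - remove-website-hosting
    """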
2694 schema = type_schema('remove-website-hosting')
2696 permissions = ('s3:DeleteBucketWebsite',)
2698 def process(self, buckets):
2699 session = local_session(self.manager.session_factory)
2700 for bucket in buckets:
2701 client = bucket_client(session, bucket)
2702 client.delete_bucket_website(Bucket=bucket['Name'])
2705@actions.register('delete-global-grants')
2706class DeleteGlobalGrants(BucketActionBase):
2707 """Deletes global grants associated with an S3 bucket
2709 :example:
2711 .. code-block:: yaml
2713 policies:
2714 - name: s3-delete-global-grants
2715 resource: s3
2716 filters:
2717 - type: global-grants
2718 actions:
2719 - delete-global-grants
2720 """
2722 schema = type_schema(
2723 'delete-global-grants',
2724 grantees={'type': 'array', 'items': {'type': 'string'}})
2726 permissions = ('s3:PutBucketAcl',)
2728 def process(self, buckets):
2729 with self.executor_factory(max_workers=5) as w:
2730 return list(filter(None, list(w.map(self.process_bucket, buckets))))
2732 def process_bucket(self, b):
2733 grantees = self.data.get(
2734 'grantees', [
2735 GlobalGrantsFilter.AUTH_ALL, GlobalGrantsFilter.GLOBAL_ALL])
2737 log.info(b)
2739 acl = b.get('Acl', {'Grants': []})
2740 if not acl or not acl['Grants']:
2741 return
2742 new_grants = []
2743 for grant in acl['Grants']:
2744 grantee = grant.get('Grantee', {})
2745 if not grantee:
2746 continue
2747 # Yuck, 'get_bucket_acl' doesn't return the grantee type.
2748 if 'URI' in grantee:
2749 grantee['Type'] = 'Group'
2750 else:
2751 grantee['Type'] = 'CanonicalUser'
2752 if ('URI' in grantee and
2753 grantee['URI'] in grantees and not
2754 (grant['Permission'] == 'READ' and b['Website'])):
2755 # Remove this grantee.
2756 pass
2757 else:
2758 new_grants.append(grant)
2760 log.info({'Owner': acl['Owner'], 'Grants': new_grants})
2762 c = bucket_client(self.manager.session_factory(), b)
2763 try:
2764 c.put_bucket_acl(
2765 Bucket=b['Name'],
2766 AccessControlPolicy={
2767 'Owner': acl['Owner'], 'Grants': new_grants})
2768 except ClientError as e:
2769 if e.response['Error']['Code'] == 'NoSuchBucket':
2770 return
2771 return b
2774@actions.register('tag')
2775class BucketTag(Tag):
2776 """Action to create tags on an S3 bucket
2778 :example:
2780 .. code-block:: yaml
2782 policies:
2783 - name: s3-tag-region
2784 resource: s3
2785 region: us-east-1
2786 filters:
2787 - "tag:RegionName": absent
2788 actions:
2789 - type: tag
2790 key: RegionName
2791 value: us-east-1
2792 """
2794 def process_resource_set(self, client, resource_set, tags):
2795 modify_bucket_tags(self.manager.session_factory, resource_set, tags)
2798@actions.register('mark-for-op')
2799class MarkBucketForOp(TagDelayedAction):
2800 """Action schedules custodian to perform an action at a certain date
2802 :example:
2804 .. code-block:: yaml
2806 policies:
2807 - name: s3-encrypt
2808 resource: s3
2809 filters:
2810 - type: missing-statement
2811 statement_ids:
2812 - RequiredEncryptedPutObject
2813 actions:
2814 - type: mark-for-op
2815 op: attach-encrypt
2816 days: 7
2817 """
2819 schema = type_schema(
2820 'mark-for-op', rinherit=TagDelayedAction.schema)
2823@actions.register('unmark')
2824@actions.register('remove-tag')
2825class RemoveBucketTag(RemoveTag):
2826 """Removes tag/tags from an S3 bucket
2828 :example:
2830 .. code-block:: yaml
2832 policies:
2833 - name: s3-remove-owner-tag
2834 resource: s3
2835 filters:
2836 - "tag:BucketOwner": present
2837 actions:
2838 - type: remove-tag
2839 tags: ['BucketOwner']
2840 """
2842 def process_resource_set(self, client, resource_set, tags):
2843 modify_bucket_tags(
2844 self.manager.session_factory, resource_set, remove_tags=tags)
2847@filters.register('data-events')
2848class DataEvents(Filter):
2849 """Find buckets for which CloudTrail is logging data events.
2851 Note that this filter only examines trails that are defined in the
2852 current account.
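
    An illustrative policy (the policy name is arbitrary) matching buckets
    with no data event trail:

    .. code-block:: yaml

        policies:
          - name: s3-missing-data-events
            resource: s3
            filters:
              - type: data-events
                state: absent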
2853 """
2855 schema = type_schema('data-events', state={'enum': ['present', 'absent']})
2856 permissions = (
2857 'cloudtrail:DescribeTrails',
2858 'cloudtrail:GetEventSelectors')
2860 def get_event_buckets(self, session, trails):
2861 """Return a mapping of bucket name to cloudtrail.
2863 For wildcard trails the bucket name is ''.
2864 """
2865 regions = {t.get('HomeRegion') for t in trails}
2866 clients = {}
2867 for region in regions:
2868 clients[region] = session.client('cloudtrail', region_name=region)
2870 event_buckets = {}
2871 for t in trails:
2872 for events in clients[t.get('HomeRegion')].get_event_selectors(
2873 TrailName=t['Name']).get('EventSelectors', ()):
2874 if 'DataResources' not in events:
2875 continue
2876 for data_events in events['DataResources']:
2877 if data_events['Type'] != 'AWS::S3::Object':
2878 continue
2879 for b in data_events['Values']:
2880 event_buckets[b.rsplit(':')[-1].strip('/')] = t['Name']
2881 return event_buckets
2883 def process(self, resources, event=None):
2884 trails = self.manager.get_resource_manager('cloudtrail').resources()
2885 local_trails = self.filter_resources(
2886 trails,
2887 "split(':', TrailARN)[4]", (self.manager.account_id,)
2888 )
2889 session = local_session(self.manager.session_factory)
2890 event_buckets = self.get_event_buckets(session, local_trails)
2891 ops = {
2892 'present': lambda x: (
2893 x['Name'] in event_buckets or '' in event_buckets),
2894 'absent': (
2895 lambda x: x['Name'] not in event_buckets and ''
2896 not in event_buckets)}
2898 op = ops[self.data.get('state', 'present')]
2899 results = []
2900 for b in resources:
2901 if op(b):
2902 results.append(b)
2903 return results
2906@filters.register('inventory')
2907class Inventory(ValueFilter):
2908 """Filter S3 buckets by their inventory configurations.
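
    A bucket matches if any of its inventory configurations satisfies the
    value expression. An illustrative policy (key names follow the inventory
    configuration shape used by set-inventory below):

    .. code-block:: yaml

        policies:
          - name: s3-inventory-enabled
            resource: s3
            filters:
              - type: inventory
                key: IsEnabled
                value: true
    """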
2909 schema = type_schema('inventory', rinherit=ValueFilter.schema)
2910 schema_alias = False
2911 permissions = ('s3:GetInventoryConfiguration',)
2913 def process(self, buckets, event=None):
2914 results = []
2915 with self.executor_factory(max_workers=2) as w:
2916 futures = {}
2917 for b in buckets:
2918 futures[w.submit(self.process_bucket, b)] = b
2920 for f in as_completed(futures):
2921 b = futures[f]
2922 if f.exception():
2923 b.setdefault('c7n:DeniedMethods', []).append('GetInventoryConfiguration')
2924 self.log.error(
2925 "Error processing bucket: %s error: %s",
2926 b['Name'], f.exception())
2927 continue
2928 if f.result():
2929 results.append(b)
2930 return results
2932 def process_bucket(self, b):
2933 if 'c7n:inventories' not in b:
2934 client = bucket_client(local_session(self.manager.session_factory), b)
2935 inventories = client.list_bucket_inventory_configurations(
2936 Bucket=b['Name']).get('InventoryConfigurationList', [])
2937 b['c7n:inventories'] = inventories
2939 for i in b['c7n:inventories']:
2940 if self.match(i):
2941 return True
2944@actions.register('set-inventory')
2945class SetInventory(BucketActionBase):
2946 """Configure bucket inventories for an S3 bucket.
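
    An illustrative policy (the inventory name and destination bucket are
    placeholders) configuring a weekly CSV inventory:

    .. code-block:: yaml

        policies:
          - name: s3-set-inventory
            resource: s3
            actions:
              - type: set-inventory
                name: c7n-inventory
                destination: my-inventory-destination-bucket
                schedule: Weekly
                format: CSV
                fields:
                  - Size
                  - LastModifiedDate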
2947 """
2948 schema = type_schema(
2949 'set-inventory',
2950 required=['name', 'destination'],
2951 state={'enum': ['enabled', 'disabled', 'absent']},
2952 name={'type': 'string', 'description': 'Name of inventory'},
2953 destination={'type': 'string', 'description': 'Name of destination bucket'},
2954 prefix={'type': 'string', 'description': 'Destination prefix'},
2955 encryption={'enum': ['SSES3', 'SSEKMS']},
2956 key_id={'type': 'string', 'description': 'Optional Customer KMS KeyId for SSE-KMS'},
2957 versions={'enum': ['All', 'Current']},
2958 schedule={'enum': ['Daily', 'Weekly']},
2959 format={'enum': ['CSV', 'ORC', 'Parquet']},
2960 fields={'type': 'array', 'items': {'enum': [
2961 'Size', 'LastModifiedDate', 'StorageClass', 'ETag',
2962 'IsMultipartUploaded', 'ReplicationStatus', 'EncryptionStatus',
2963 'ObjectLockRetainUntilDate', 'ObjectLockMode', 'ObjectLockLegalHoldStatus',
2964 'IntelligentTieringAccessTier', 'BucketKeyStatus', 'ChecksumAlgorithm',
2965 'ObjectAccessControlList', 'ObjectOwner']}})
2967 permissions = ('s3:PutInventoryConfiguration', 's3:GetInventoryConfiguration')
2969 def process(self, buckets):
2970 with self.executor_factory(max_workers=2) as w:
2971 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
2972 for future in as_completed(futures):
2973 bucket = futures[future]
2974 try:
2975 future.result()
2976 except Exception as e:
2977 self.log.error('Message: %s Bucket: %s', e, bucket['Name'])
2979 def process_bucket(self, b):
2980 inventory_name = self.data.get('name')
2981 destination = self.data.get('destination')
2982 prefix = self.data.get('prefix', '')
2983 schedule = self.data.get('schedule', 'Daily')
2984 fields = self.data.get('fields', ['LastModifiedDate', 'Size'])
2985 versions = self.data.get('versions', 'Current')
2986 state = self.data.get('state', 'enabled')
2987 encryption = self.data.get('encryption')
2988 inventory_format = self.data.get('format', 'CSV')
2990 if not prefix:
2991 prefix = "Inventories/%s" % (self.manager.config.account_id)
2993 client = bucket_client(local_session(self.manager.session_factory), b)
2994 if state == 'absent':
2995 try:
2996 client.delete_bucket_inventory_configuration(
2997 Bucket=b['Name'], Id=inventory_name)
2998 except ClientError as e:
2999 if e.response['Error']['Code'] != 'NoSuchConfiguration':
3000 raise
3001 return
3003 bucket = {
3004 'Bucket': "arn:aws:s3:::%s" % destination,
3005 'Format': inventory_format
3006 }
3008 inventory = {
3009 'Destination': {
3010 'S3BucketDestination': bucket
3011 },
3012 'IsEnabled': state == 'enabled' and True or False,
3013 'Id': inventory_name,
3014 'OptionalFields': fields,
3015 'IncludedObjectVersions': versions,
3016 'Schedule': {
3017 'Frequency': schedule
3018 }
3019 }
3021 if prefix:
3022 bucket['Prefix'] = prefix
3024 if encryption:
3025 bucket['Encryption'] = {encryption: {}}
3026 if encryption == 'SSEKMS' and self.data.get('key_id'):
3027 bucket['Encryption'] = {encryption: {
3028 'KeyId': self.data['key_id']
3029 }}
3031 found = self.get_inventory_delta(client, inventory, b)
3032 if found:
3033 return
3034 if found is False:
3035 self.log.debug("updating bucket:%s inventory configuration id:%s",
3036 b['Name'], inventory_name)
3037 client.put_bucket_inventory_configuration(
3038 Bucket=b['Name'], Id=inventory_name, InventoryConfiguration=inventory)
3040 def get_inventory_delta(self, client, inventory, b):
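        """Compare the desired inventory configuration with the existing one.

        Returns None if no configuration with a matching Id exists, True if
        the existing configuration already matches, and False if it exists
        but differs and needs updating.
        """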
3041 inventories = client.list_bucket_inventory_configurations(Bucket=b['Name'])
3042 found = None
3043 for i in inventories.get('InventoryConfigurationList', []):
3044 if i['Id'] != inventory['Id']:
3045 continue
3046 found = True
3047 for k, v in inventory.items():
3048 if k not in i:
3049 found = False
3050 continue
3051 if isinstance(v, list):
3052 v.sort()
3053 i[k].sort()
3054 if i[k] != v:
3055 found = False
3056 return found
3059@filters.register('intelligent-tiering')
3060class IntelligentTiering(ListItemFilter):
3061 """Filter for S3 buckets to look at intelligent tiering configurations
3063 The schema to supply to the attrs follows the schema here:
3064 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_bucket_intelligent_tiering_configurations.html
3066 :example:
3068 .. code-block:: yaml
3070 policies:
3071 - name: s3-intelligent-tiering-configuration
3072 resource: s3
3073 filters:
3074 - type: intelligent-tiering
3075 attrs:
3076 - Status: Enabled
3077 - Filter:
3078 And:
3079 Prefix: test
3080 Tags:
3081 - Key: Owner
3082 Value: c7n
3083 - Tierings:
3084 - Days: 100
3085 - AccessTier: ARCHIVE_ACCESS
3087 """
3088 schema = type_schema(
3089 'intelligent-tiering',
3090 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
3091 count={'type': 'number'},
3092 count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
3093 )
3094 permissions = ('s3:GetIntelligentTieringConfiguration',)
3095 annotation_key = "c7n:IntelligentTiering"
3096 annotate_items = True
3098 def __init__(self, data, manager=None):
3099 super().__init__(data, manager)
3100 self.data['key'] = self.annotation_key
3102 def process(self, buckets, event=None):
3103 with self.executor_factory(max_workers=2) as w:
3104 futures = {w.submit(self.get_item_values, b): b for b in buckets}
3105 for future in as_completed(futures):
3106 b = futures[future]
3107 if future.exception():
3108 self.log.error("Message: %s Bucket: %s", future.exception(), b['Name'])
3109 continue
3110 return super().process(buckets, event)
3112 def get_item_values(self, b):
3113 if self.annotation_key not in b:
3114 client = bucket_client(local_session(self.manager.session_factory), b)
3115 try:
3116 int_tier_config = client.list_bucket_intelligent_tiering_configurations(
3117 Bucket=b['Name'])
3118 b[self.annotation_key] = int_tier_config.get(
3119 'IntelligentTieringConfigurationList', [])
3120 except ClientError as e:
3121 if e.response['Error']['Code'] == 'AccessDenied':
3122 method = 'list_bucket_intelligent_tiering_configurations'
3123 log.warning(
3124 "Bucket:%s unable to invoke method:%s error:%s ",
3125 b['Name'], method, e.response['Error']['Message'])
3126 b.setdefault('c7n:DeniedMethods', []).append(method)
3127 return b.get(self.annotation_key)
3130@actions.register('set-intelligent-tiering')
3131class ConfigureIntelligentTiering(BucketActionBase):
3132 """Action applies an intelligent tiering configuration to an S3 bucket
3134 The schema to supply to the configuration follows the schema here:
3135 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_bucket_intelligent_tiering_configuration.html
3137 To delete a configuration, supply State: delete along with either a specific Id or Id: matched
3139 :example:
3141 .. code-block:: yaml
3143 policies:
3144 - name: s3-apply-intelligent-tiering-config
3145 resource: aws.s3
3146 filters:
3147 - not:
3148 - type: intelligent-tiering
3149 attrs:
3150 - Status: Enabled
3151 - Filter:
3152 And:
3153 Prefix: helloworld
3154 Tags:
3155 - Key: Hello
3156 Value: World
3157 - Tierings:
3158 - Days: 123
3159 AccessTier: ARCHIVE_ACCESS
3160 actions:
3161 - type: set-intelligent-tiering
3162 Id: c7n-default
3163 IntelligentTieringConfiguration:
3164 Id: c7n-default
3165 Status: Enabled
3166 Tierings:
3167 - Days: 149
3168 AccessTier: ARCHIVE_ACCESS
3170 - name: s3-delete-intelligent-tiering-configuration
3171 resource: aws.s3
3172 filters:
3173 - type: intelligent-tiering
3174 attrs:
3175 - Status: Enabled
3176 - Id: test-config
3177 actions:
3178 - type: set-intelligent-tiering
3179 Id: test-config
3180 State: delete
3182 - name: s3-delete-intelligent-tiering-matched-configs
3183 resource: aws.s3
3184 filters:
3185 - type: intelligent-tiering
3186 attrs:
3187 - Status: Enabled
3188 - Id: test-config
3189 actions:
3190 - type: set-intelligent-tiering
3191 Id: matched
3192 State: delete
3194 """
3196 annotation_key = 'c7n:ListItemMatches'
3197 shape = 'PutBucketIntelligentTieringConfigurationRequest'
3198 schema = {
3199 'type': 'object',
3200 'additionalProperties': False,
3201 'oneOf': [
3202 {'required': ['type', 'Id', 'IntelligentTieringConfiguration']},
3203 {'required': ['type', 'Id', 'State']}],
3204 'properties': {
3205 'type': {'enum': ['set-intelligent-tiering']},
3206 'Id': {'type': 'string'},
3207 # delete intelligent tier configurations via state: delete
3208 'State': {'type': 'string', 'enum': ['delete']},
3209 'IntelligentTieringConfiguration': {'type': 'object'}
3210 },
3211 }
3213 permissions = ('s3:PutIntelligentTieringConfiguration',)
3215 def validate(self):
3216 # You can have up to 1,000 S3 Intelligent-Tiering configurations per bucket.
3217 # Hence, always use it with a filter
3218 found = False
3219 for f in self.manager.iter_filters():
3220 if isinstance(f, IntelligentTiering):
3221 found = True
3222 break
3223 if not found:
3224 raise PolicyValidationError(
3225 '`set-intelligent-tiering` may only be used in '
3226 'conjunction with `intelligent-tiering` filter on %s' % (self.manager.data,))
3227 cfg = dict(self.data)
3228 if 'IntelligentTieringConfiguration' in cfg:
3229 cfg['Bucket'] = 'bucket'
3230 cfg.pop('type')
3231 return shape_validate(
3232 cfg, self.shape, self.manager.resource_type.service)
3234 def process(self, buckets):
3235 with self.executor_factory(max_workers=3) as w:
3236 futures = {}
3238 for b in buckets:
3239 futures[w.submit(self.process_bucket, b)] = b
3241 for future in as_completed(futures):
3242 if future.exception():
3243 bucket = futures[future]
3244 self.log.error(
3245 'error modifying bucket intelligent tiering configuration: %s\n%s',
3246 bucket['Name'], future.exception())
3247 continue
3249 def process_bucket(self, bucket):
3250 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3252 if 'list_bucket_intelligent_tiering_configurations' in bucket.get(
3253 'c7n:DeniedMethods', []):
3254 log.warning("Access Denied Bucket:%s while reading intelligent tiering configurations"
3255 % bucket['Name'])
3256 return
3258 if self.data.get('Id') and self.data.get('IntelligentTieringConfiguration'):
3259 try:
3260 s3.put_bucket_intelligent_tiering_configuration(
3261 Bucket=bucket['Name'], Id=self.data.get(
3262 'Id'), IntelligentTieringConfiguration=self.data.get(
3263 'IntelligentTieringConfiguration'))
3264 except ClientError as e:
3265 if e.response['Error']['Code'] == 'AccessDenied':
3266 log.warning(
3267 "Access Denied Bucket:%s while applying intelligent tiering configuration"
3268 % bucket['Name'])
3269 if self.data.get('State'):
3270 if self.data.get('Id') == 'matched':
3271 for config in bucket.get(self.annotation_key):
3272 self.delete_intelligent_tiering_configurations(s3, config.get('Id'), bucket)
3273 else:
3274 self.delete_intelligent_tiering_configurations(s3, self.data.get('Id'), bucket)
3276 def delete_intelligent_tiering_configurations(self, s3_client, id, bucket):
3277 try:
3278 s3_client.delete_bucket_intelligent_tiering_configuration(Bucket=bucket['Name'], Id=id)
3279 except ClientError as e:
3280 if e.response['Error']['Code'] == 'AccessDenied':
3281 log.warning(
3282 "Access Denied Bucket:%s while deleting intelligent tiering configuration"
3283 % bucket['Name'])
3284 elif e.response['Error']['Code'] == 'NoSuchConfiguration':
3285 log.warning(
3286 "No such configuration found:%s while deleting intelligent tiering configuration"
3287 % bucket['Name'])
3290@actions.register('delete')
3291class DeleteBucket(ScanBucket):
3292 """Action deletes an S3 bucket
3294 :example:
3296 .. code-block:: yaml
3298 policies:
3299 - name: delete-unencrypted-buckets
3300 resource: s3
3301 filters:
3302 - type: missing-statement
3303 statement_ids:
3304 - RequiredEncryptedPutObject
3305 actions:
3306 - type: delete
3307 remove-contents: true
3308 """
3310 schema = type_schema('delete', **{'remove-contents': {'type': 'boolean'}})
3312 permissions = ('s3:*',)
3314 bucket_ops = {
3315 'standard': {
3316 'iterator': 'list_objects',
3317 'contents_key': ['Contents'],
3318 'key_processor': 'process_key'
3319 },
3320 'versioned': {
3321 'iterator': 'list_object_versions',
3322 'contents_key': ['Versions', 'DeleteMarkers'],
3323 'key_processor': 'process_version'
3324 }
3325 }
3327 def process_delete_enablement(self, b):
3328 """Prep a bucket for deletion.
3330 Clear out any pending multi-part uploads.
3332 Disable versioning on the bucket, so deletes don't
3333 generate fresh deletion markers.
3334 """
3335 client = bucket_client(
3336 local_session(self.manager.session_factory), b)
3338 # Stop replication so we can suspend versioning
3339 if b.get('Replication') is not None:
3340 client.delete_bucket_replication(Bucket=b['Name'])
3342 # Suspend versioning, so we don't get new delete markers
3343 # as we walk and delete versions
3344 if (self.get_bucket_style(b) == 'versioned' and b['Versioning']['Status'] == 'Enabled' and
3345 self.data.get('remove-contents', True)):
3346 client.put_bucket_versioning(
3347 Bucket=b['Name'],
3348 VersioningConfiguration={'Status': 'Suspended'})
3350 # Clear our multi-part uploads
3351 uploads = client.get_paginator('list_multipart_uploads')
3352 for p in uploads.paginate(Bucket=b['Name']):
3353 for u in p.get('Uploads', ()):
3354 client.abort_multipart_upload(
3355 Bucket=b['Name'],
3356 Key=u['Key'],
3357 UploadId=u['UploadId'])
3359 def process(self, buckets):
3360 # might be worth sanity checking all our permissions
3361 # on the bucket up front before disabling versioning/replication.
3362 if self.data.get('remove-contents', True):
3363 self._process_with_futures(self.process_delete_enablement, buckets)
3364 self.empty_buckets(buckets)
3366 results = self._process_with_futures(self.delete_bucket, buckets)
3367 self.write_denied_buckets_file()
3368 return results
3370 def delete_bucket(self, b):
3371 s3 = bucket_client(self.manager.session_factory(), b)
3372 try:
3373 self._run_api(s3.delete_bucket, Bucket=b['Name'])
3374 except ClientError as e:
3375 if e.response['Error']['Code'] == 'BucketNotEmpty':
3376 self.log.error(
3377 "Error while deleting bucket %s, bucket not empty" % (
3378 b['Name']))
3379 else:
3380 raise e
3382 def empty_buckets(self, buckets):
3383 t = time.time()
3384 results = super(DeleteBucket, self).process(buckets)
3385 run_time = time.time() - t
3386 object_count = 0
3388 for r in results:
3389 object_count += r['Count']
3390 self.manager.ctx.metrics.put_metric(
3391 "Total Keys", object_count, "Count", Scope=r['Bucket'],
3392 buffer=True)
3393 self.manager.ctx.metrics.put_metric(
3394 "Total Keys", object_count, "Count", Scope="Account", buffer=True)
3395 self.manager.ctx.metrics.flush()
3397 log.info(
3398 "EmptyBucket buckets:%d Complete keys:%d rate:%0.2f/s time:%0.2fs",
3399 len(buckets), object_count,
3400 float(object_count) / run_time if run_time else 0, run_time)
3401 return results
3403 def process_chunk(self, batch, bucket):
3404 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3405 objects = []
3406 for key in batch:
3407 obj = {'Key': key['Key']}
3408 if 'VersionId' in key:
3409 obj['VersionId'] = key['VersionId']
3410 objects.append(obj)
3411 results = s3.delete_objects(
3412 Bucket=bucket['Name'], Delete={'Objects': objects}).get('Deleted', ())
3413 if self.get_bucket_style(bucket) != 'versioned':
3414 return results
3417@actions.register('configure-lifecycle')
3418class Lifecycle(BucketActionBase):
3419 """Action applies a lifecycle policy to versioned S3 buckets
3421 The schema to supply to the rule follows the schema here:
3422 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_lifecycle_configuration
3424 To delete a lifecycle rule, supply Status=absent
3426 :example:
3428 .. code-block:: yaml
3430 policies:
3431 - name: s3-apply-lifecycle
3432 resource: s3
3433 actions:
3434 - type: configure-lifecycle
3435 rules:
3436 - ID: my-lifecycle-id
3437 Status: Enabled
3438 Prefix: foo/
3439 Transitions:
3440 - Days: 60
3441 StorageClass: GLACIER
3443 """
3445 schema = type_schema(
3446 'configure-lifecycle',
3447 **{
3448 'rules': {
3449 'type': 'array',
3450 'items': {
3451 'type': 'object',
3452 'required': ['ID', 'Status'],
3453 'additionalProperties': False,
3454 'properties': {
3455 'ID': {'type': 'string'},
3456 # c7n intercepts `absent`
3457 'Status': {'enum': ['Enabled', 'Disabled', 'absent']},
3458 'Prefix': {'type': 'string'},
3459 'Expiration': {
3460 'type': 'object',
3461 'additionalProperties': False,
3462 'properties': {
3463 'Date': {'type': 'string'}, # Date
3464 'Days': {'type': 'integer'},
3465 'ExpiredObjectDeleteMarker': {'type': 'boolean'},
3466 },
3467 },
3468 'Filter': {
3469 'type': 'object',
3470 'minProperties': 1,
3471 'maxProperties': 1,
3472 'additionalProperties': False,
3473 'properties': {
3474 'Prefix': {'type': 'string'},
3475 'ObjectSizeGreaterThan': {'type': 'integer'},
3476 'ObjectSizeLessThan': {'type': 'integer'},
3477 'Tag': {
3478 'type': 'object',
3479 'required': ['Key', 'Value'],
3480 'additionalProperties': False,
3481 'properties': {
3482 'Key': {'type': 'string'},
3483 'Value': {'type': 'string'},
3484 },
3485 },
3486 'And': {
3487 'type': 'object',
3488 'additionalProperties': False,
3489 'properties': {
3490 'Prefix': {'type': 'string'},
3491 'ObjectSizeGreaterThan': {'type': 'integer'},
3492 'ObjectSizeLessThan': {'type': 'integer'},
3493 'Tags': {
3494 'type': 'array',
3495 'items': {
3496 'type': 'object',
3497 'required': ['Key', 'Value'],
3498 'additionalProperties': False,
3499 'properties': {
3500 'Key': {'type': 'string'},
3501 'Value': {'type': 'string'},
3502 },
3503 },
3504 },
3505 },
3506 },
3507 },
3508 },
3509 'Transitions': {
3510 'type': 'array',
3511 'items': {
3512 'type': 'object',
3513 'additionalProperties': False,
3514 'properties': {
3515 'Date': {'type': 'string'}, # Date
3516 'Days': {'type': 'integer'},
3517 'StorageClass': {'type': 'string'},
3518 },
3519 },
3520 },
3521 'NoncurrentVersionTransitions': {
3522 'type': 'array',
3523 'items': {
3524 'type': 'object',
3525 'additionalProperties': False,
3526 'properties': {
3527 'NoncurrentDays': {'type': 'integer'},
3528 'NewerNoncurrentVersions': {'type': 'integer'},
3529 'StorageClass': {'type': 'string'},
3530 },
3531 },
3532 },
3533 'NoncurrentVersionExpiration': {
3534 'type': 'object',
3535 'additionalProperties': False,
3536 'properties': {
3537 'NoncurrentDays': {'type': 'integer'},
3538 'NewerNoncurrentVersions': {'type': 'integer'}
3539 },
3540 },
3541 'AbortIncompleteMultipartUpload': {
3542 'type': 'object',
3543 'additionalProperties': False,
3544 'properties': {
3545 'DaysAfterInitiation': {'type': 'integer'},
3546 },
3547 },
3548 },
3549 },
3550 },
3551 }
3552 )
3554 permissions = ('s3:GetLifecycleConfiguration', 's3:PutLifecycleConfiguration')
3556 def process(self, buckets):
3557 with self.executor_factory(max_workers=3) as w:
3558 futures = {}
3559 results = []
3561 for b in buckets:
3562 futures[w.submit(self.process_bucket, b)] = b
3564 for future in as_completed(futures):
3565 if future.exception():
3566 bucket = futures[future]
3567 self.log.error('error modifying bucket lifecycle: %s\n%s',
3568 bucket['Name'], future.exception())
3569 results += filter(None, [future.result()])
3571 return results
3573 def process_bucket(self, bucket):
3574 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3576 if 'get_bucket_lifecycle_configuration' in bucket.get('c7n:DeniedMethods', []):
3577 log.warning("Access Denied Bucket:%s while reading lifecycle" % bucket['Name'])
3578 return
3580 # Adjust the existing lifecycle by adding/deleting/overwriting rules as necessary
3581 config = (bucket.get('Lifecycle') or {}).get('Rules', [])
3582 for rule in self.data['rules']:
3583 for index, existing_rule in enumerate(config):
3584 if not existing_rule:
3585 continue
3586 if rule['ID'] == existing_rule['ID']:
3587 if rule['Status'] == 'absent':
3588 config[index] = None
3589 else:
3590 config[index] = rule
3591 break
3592 else:
3593 if rule['Status'] != 'absent':
3594 config.append(rule)
3596 # The extra `list` conversion is required for python3
3597 config = list(filter(None, config))
3599 try:
3600 if not config:
3601 s3.delete_bucket_lifecycle(Bucket=bucket['Name'])
3602 else:
3603 s3.put_bucket_lifecycle_configuration(
3604 Bucket=bucket['Name'], LifecycleConfiguration={'Rules': config})
3605 except ClientError as e:
3606 if e.response['Error']['Code'] == 'AccessDenied':
3607 log.warning("Access Denied Bucket:%s while applying lifecycle" % bucket['Name'])
3608 else:
3609 raise e
3612class KMSKeyResolverMixin:
3613 """Builds a dictionary of region-specific key ARNs.
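
    After resolve_keys runs, self.arns maps each bucket region to the
    resolved key metadata; an illustrative entry (values are placeholders)::

        {'us-east-1': {
            'KeyId': '1234abcd-12ab-34cd-56ef-1234567890ab',
            'Arn': 'arn:aws:kms:us-east-1:123456789012:key/1234abcd',
            'KeyManager': 'CUSTOMER',
            'Description': 'example key',
            'Aliases': ['arn:aws:kms:us-east-1:123456789012:alias/example',
                        'alias/example']}}
    """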
3615 def __init__(self, data, manager=None):
3616 self.arns = dict()
3617 self.data = data
3618 self.manager = manager
3620 def resolve_keys(self, buckets):
3621 key = self.data.get('key')
3622 if not key:
3623 return None
3625 regions = {get_region(b) for b in buckets}
3626 for r in regions:
3627 client = local_session(self.manager.session_factory).client('kms', region_name=r)
3628 try:
3629 key_meta = client.describe_key(
3630 KeyId=key
3631 ).get('KeyMetadata', {})
3632 key_id = key_meta.get('KeyId')
3634 # We need a complete set of alias identifiers (names and ARNs)
3635 # to fully evaluate bucket encryption filters.
3636 key_aliases = client.list_aliases(
3637 KeyId=key_id
3638 ).get('Aliases', [])
3640 self.arns[r] = {
3641 'KeyId': key_id,
3642 'Arn': key_meta.get('Arn'),
3643 'KeyManager': key_meta.get('KeyManager'),
3644 'Description': key_meta.get('Description'),
3645 'Aliases': [
3646 alias[attr]
3647 for alias in key_aliases
3648 for attr in ('AliasArn', 'AliasName')
3649 ],
3650 }
3652 except ClientError as e:
3653 self.log.error('Error resolving kms ARNs for set-bucket-encryption: %s key: %s' % (
3654 e, self.data.get('key')))
3656 def get_key(self, bucket):
3657 if 'key' not in self.data:
3658 return None
3659 region = get_region(bucket)
3660 key = self.arns.get(region)
3661 if not key:
3662 self.log.warning('Unable to resolve key %s for bucket %s in region %s',
3663 self.data['key'], bucket.get('Name'), region)
3664 return key
3667@filters.register('bucket-encryption')
3668class BucketEncryption(KMSKeyResolverMixin, Filter):
3669 """Filters for S3 buckets that have bucket-encryption
3671 :example:
3673 .. code-block:: yaml
3675 policies:
3676 - name: s3-bucket-encryption-AES256
3677 resource: s3
3678 region: us-east-1
3679 filters:
3680 - type: bucket-encryption
3681 state: True
3682 crypto: AES256
3683 - name: s3-bucket-encryption-KMS
3684 resource: s3
3685 region: us-east-1
3686 filters:
3687 - type: bucket-encryption
3688 state: True
3689 crypto: aws:kms
3690 key: alias/some/alias/key
3691 - name: s3-bucket-encryption-off
3692 resource: s3
3693 region: us-east-1
3694 filters:
3695 - type: bucket-encryption
3696 state: False
3697 - name: s3-bucket-test-bucket-key-enabled
3698 resource: s3
3699 region: us-east-1
3700 filters:
3701 - type: bucket-encryption
3702 bucket_key_enabled: True
3703 """
3704 schema = type_schema('bucket-encryption',
3705 state={'type': 'boolean'},
3706 crypto={'type': 'string', 'enum': ['AES256', 'aws:kms']},
3707 key={'type': 'string'},
3708 bucket_key_enabled={'type': 'boolean'})
3710 permissions = ('s3:GetEncryptionConfiguration', 'kms:DescribeKey', 'kms:ListAliases')
3711 annotation_key = 'c7n:bucket-encryption'
3713 def validate(self):
3714 if self.data.get('bucket_key_enabled') is not None and self.data.get('key') is not None:
3715 raise PolicyValidationError(
3716 f'key and bucket_key_enabled attributes cannot both be set: {self.data}'
3717 )
3719 def process(self, buckets, event=None):
3720 self.resolve_keys(buckets)
3721 results = []
3722 with self.executor_factory(max_workers=2) as w:
3723 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3724 for future in as_completed(futures):
3725 b = futures[future]
3726 if future.exception():
3727 self.log.error("Message: %s Bucket: %s", future.exception(),
3728 b['Name'])
3729 continue
3730 if future.result():
3731 results.append(b)
3732 return results
3734 def process_bucket(self, b):
3736 client = bucket_client(local_session(self.manager.session_factory), b)
3737 rules = []
3738 if self.annotation_key not in b:
3739 try:
3740 be = client.get_bucket_encryption(Bucket=b['Name'])
3741 be.pop('ResponseMetadata', None)
3742 except ClientError as e:
3743 if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError':
3744 raise
3745 be = {}
3746 b[self.annotation_key] = be
3747 else:
3748 be = b[self.annotation_key]
3750 rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', [])
3751 # default `state` to True as previous impl assumed state == True
3752 # to preserve backwards compatibility
3753 if self.data.get('bucket_key_enabled'):
3754 for rule in rules:
3755 return self.filter_bucket_key_enabled(rule)
3756 elif self.data.get('bucket_key_enabled') is False:
3757 for rule in rules:
3758 return not self.filter_bucket_key_enabled(rule)
3760 if self.data.get('state', True):
3761 for sse in rules:
3762 return self.filter_bucket(b, sse)
3763 return False
3764 else:
3765 for sse in rules:
3766 return not self.filter_bucket(b, sse)
3767 return True
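# Illustrative sketch, not upstream code: the cached annotation is the raw
# GetBucketEncryption response minus ResponseMetadata, so the rules iterated
# above typically look like (values hypothetical):
#
#   b['c7n:bucket-encryption'] = {
#       'ServerSideEncryptionConfiguration': {
#           'Rules': [{
#               'ApplyServerSideEncryptionByDefault': {
#                   'SSEAlgorithm': 'aws:kms',
#                   'KMSMasterKeyID': 'alias/some/alias/key',  # omitted for AES256
#               },
#               'BucketKeyEnabled': True,
#           }],
#       },
#   }
#
# An empty dict is cached instead when the bucket has no default encryption
# configuration, in which case the `state: False` branch above returns True.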
3769 def filter_bucket(self, b, sse):
3770 allowed = ['AES256', 'aws:kms']
3771 key = self.get_key(b)
3772 crypto = self.data.get('crypto')
3773 rule = sse.get('ApplyServerSideEncryptionByDefault')
3775 if not rule:
3776 return False
3777 algo = rule.get('SSEAlgorithm')
3779 if not crypto and algo in allowed:
3780 return True
3782 if crypto == 'AES256' and algo == 'AES256':
3783 return True
3784 elif crypto == 'aws:kms' and algo == 'aws:kms':
3785 if not key:
3786 # There are two broad reasons to have an empty value for
3787 # the regional key here:
3788 #
3789 # * The policy did not specify a key, in which case this
3790 # filter should match _all_ buckets with a KMS default
3791 # encryption rule.
3792 #
3793 # * The policy specified a key that could not be
3794 # resolved, in which case this filter shouldn't match
3795 # any buckets.
3796 return 'key' not in self.data
3798 # The default encryption rule can specify a key ID,
3799 # key ARN, alias name or alias ARN. Match against any of
3800 # those attributes. A rule specifying KMS with no master key
3801 # implies the AWS-managed key.
3802 key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']}
3803 return rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids
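# Worked example (a sketch; all identifiers below are hypothetical): if the
# policy key 'alias/some/alias/key' resolved to
#
#   key_ids = {'1234abcd-12ab-34cd-56ef-1234567890ab',
#              'arn:aws:kms:us-east-1:123456789012:key/1234abcd-12ab-34cd-56ef-1234567890ab',
#              'alias/some/alias/key',
#              'arn:aws:kms:us-east-1:123456789012:alias/some/alias/key'}
#
# then a rule whose KMSMasterKeyID is any one of those values matches, while a
# rule that omits KMSMasterKeyID is treated as 'alias/aws/s3' and only matches
# when that is the key the policy resolved.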
3805 def filter_bucket_key_enabled(self, rule) -> bool:
3806 if not rule:
3807 return False
3808 return rule.get('BucketKeyEnabled')
3811@actions.register('set-bucket-encryption')
3812class SetBucketEncryption(KMSKeyResolverMixin, BucketActionBase):
3813 """Action enables default encryption on S3 buckets
3815 `enabled`: boolean Optional: Defaults to True
3817 `crypto`: aws:kms | AES256 Optional: Defaults to AES256
3819 `key`: KMS key ARN, alias, or key id
3821 `bucket-key`: boolean Optional:
3822 Defaults to True.
3823 Reduces the amount of API traffic from Amazon S3 to KMS and can reduce KMS request
3824 costs by up to 99 percent. Requires kms:Decrypt permissions for copy and upload
3825 on the AWS KMS Key Policy.
3827 Bucket Key Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html
3829 :example:
3831 .. code-block:: yaml
3833 policies:
3834 - name: s3-enable-default-encryption-kms
3835 resource: s3
3836 actions:
3837 - type: set-bucket-encryption
3838 # enabled: true <------ optional (true by default)
3839 crypto: aws:kms
3840 key: 1234abcd-12ab-34cd-56ef-1234567890ab
3841 bucket-key: true
3843 - name: s3-enable-default-encryption-kms-alias
3844 resource: s3
3845 actions:
3846 - type: set-bucket-encryption
3847 # enabled: true <------ optional (true by default)
3848 crypto: aws:kms
3849 key: alias/some/alias/key
3850 bucket-key: true
3852 - name: s3-enable-default-encryption-aes256
3853 resource: s3
3854 actions:
3855 - type: set-bucket-encryption
3856 # bucket-key: true <--- optional (true by default for AWS SSE)
3857 # crypto: AES256 <----- optional (AES256 by default)
3858 # enabled: true <------ optional (true by default)
3860 - name: s3-disable-default-encryption
3861 resource: s3
3862 actions:
3863 - type: set-bucket-encryption
3864 enabled: false
3865 """
3867 schema = {
3868 'type': 'object',
3869 'additionalProperties': False,
3870 'properties': {
3871 'type': {'enum': ['set-bucket-encryption']},
3872 'enabled': {'type': 'boolean'},
3873 'crypto': {'enum': ['aws:kms', 'AES256']},
3874 'key': {'type': 'string'},
3875 'bucket-key': {'type': 'boolean'}
3876 },
3877 'dependencies': {
3878 'key': {
3879 'properties': {
3880 'crypto': {'pattern': 'aws:kms'}
3881 },
3882 'required': ['crypto']
3883 }
3884 }
3885 }
3887 permissions = ('s3:PutEncryptionConfiguration', 's3:GetEncryptionConfiguration',
3888 'kms:ListAliases', 'kms:DescribeKey')
3890 def process(self, buckets):
3891 if self.data.get('enabled', True):
3892 self.resolve_keys(buckets)
3894 with self.executor_factory(max_workers=3) as w:
3895 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3896 for future in as_completed(futures):
3897 if future.exception():
3898 self.log.error('Message: %s Bucket: %s', future.exception(),
3899 futures[future]['Name'])
3901 def process_bucket(self, bucket):
3902 default_key_desc = 'Default master key that protects my S3 objects when no other key is defined' # noqa
3903 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3904 if not self.data.get('enabled', True):
3905 s3.delete_bucket_encryption(Bucket=bucket['Name'])
3906 return
3907 algo = self.data.get('crypto', 'AES256')
3909 # bucket key defaults to True for alias/aws/s3 and AES256 (Amazon SSE)
3910 # and ignores False values for that crypto
3911 bucket_key = self.data.get('bucket-key', True)
3912 config = {
3913 'Rules': [
3914 {
3915 'ApplyServerSideEncryptionByDefault': {
3916 'SSEAlgorithm': algo,
3917 },
3918 'BucketKeyEnabled': bucket_key
3919 }
3920 ]
3921 }
3923 if algo == 'aws:kms':
3924 key = self.get_key(bucket)
3925 if not key:
3926 raise Exception('A valid KMS key is required but could not be resolved')
3928 config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = key['Arn']
3929 s3.put_bucket_encryption(
3930 Bucket=bucket['Name'],
3931 ServerSideEncryptionConfiguration=config
3932 )
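# Illustrative sketch, not upstream code: for the aws:kms branch the call above
# sends a payload shaped like (bucket name and key ARN hypothetical):
#
#   s3.put_bucket_encryption(
#       Bucket='example-bucket',
#       ServerSideEncryptionConfiguration={
#           'Rules': [{
#               'ApplyServerSideEncryptionByDefault': {
#                   'SSEAlgorithm': 'aws:kms',
#                   'KMSMasterKeyID': ('arn:aws:kms:us-east-1:123456789012:key/'
#                                      '1234abcd-12ab-34cd-56ef-1234567890ab'),
#               },
#               'BucketKeyEnabled': True,
#           }],
#       },
#   )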
3935OWNERSHIP_CONTROLS = ['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter']
3936VALUE_FILTER_MAGIC_VALUES = ['absent', 'present', 'not-null', 'empty']
3939@filters.register('ownership')
3940class BucketOwnershipControls(BucketFilterBase, ValueFilter):
3941 """Filter for object ownership controls
3943 Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html
3945 :example:
3947 Find buckets with ACLs disabled
3949 .. code-block:: yaml
3951 policies:
3952 - name: s3-bucket-acls-disabled
3953 resource: aws.s3
3954 region: us-east-1
3955 filters:
3956 - type: ownership
3957 value: BucketOwnerEnforced
3959 :example:
3961 Find buckets with object ownership preferred or enforced
3963 .. code-block:: yaml
3965 policies:
3966 - name: s3-bucket-ownership-preferred
3967 resource: aws.s3
3968 region: us-east-1
3969 filters:
3970 - type: ownership
3971 op: in
3972 value:
3973 - BucketOwnerEnforced
3974 - BucketOwnerPreferred
3976 :example:
3978 Find buckets with no object ownership controls
3980 .. code-block:: yaml
3982 policies:
3983 - name: s3-bucket-no-ownership-controls
3984 resource: aws.s3
3985 region: us-east-1
3986 filters:
3987 - type: ownership
3988 value: empty
3989 """
3990 schema = type_schema('ownership', rinherit=ValueFilter.schema, value={'oneOf': [
3991 {'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES},
3992 {'type': 'array', 'items': {
3993 'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}}]})
3994 permissions = ('s3:GetBucketOwnershipControls',)
3995 annotation_key = 'c7n:ownership'
3997 def __init__(self, data, manager=None):
3998 super(BucketOwnershipControls, self).__init__(data, manager)
4000 # Ownership controls appear as an array of rules. There can only be one
4001 # ObjectOwnership rule defined for a bucket, so we can automatically
4002 # match against that if it exists.
4003 self.data['key'] = f'("{self.annotation_key}".Rules[].ObjectOwnership)[0]'
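# Illustrative sketch, not upstream code: the annotation populated by
# process_bucket() below looks like (value hypothetical)
#
#   b['c7n:ownership'] = {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]}
#
# so the computed JMESPath key evaluates to 'BucketOwnerEnforced', or to null
# when the bucket has no ownership controls, which is what the 'empty' magic
# value matches.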
4005 def process(self, buckets, event=None):
4006 with self.executor_factory(max_workers=2) as w:
4007 futures = {w.submit(self.process_bucket, b): b for b in buckets}
4008 for future in as_completed(futures):
4009 b = futures[future]
4010 if future.exception():
4011 self.log.error("Message: %s Bucket: %s", future.exception(),
4012 b['Name'])
4013 continue
4014 return super(BucketOwnershipControls, self).process(buckets, event)
4016 def process_bucket(self, b):
4017 if self.annotation_key in b:
4018 return
4019 client = bucket_client(local_session(self.manager.session_factory), b)
4020 try:
4021 controls = client.get_bucket_ownership_controls(Bucket=b['Name'])
4022 controls.pop('ResponseMetadata', None)
4023 except ClientError as e:
4024 if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError':
4025 raise
4026 controls = {}
4027 b[self.annotation_key] = controls.get('OwnershipControls')
4030@filters.register('bucket-replication')
4031class BucketReplication(ListItemFilter):
4032 """Filter for S3 buckets to look at bucket replication configurations
4034 The schema to supply to the attrs follows the schema here:
4035 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_replication.html
4037 :example:
4039 .. code-block:: yaml
4041 policies:
4042 - name: s3-bucket-replication
4043 resource: s3
4044 filters:
4045 - type: bucket-replication
4046 attrs:
4047 - Status: Enabled
4048 - Filter:
4049 And:
4050 Prefix: test
4051 Tags:
4052 - Key: Owner
4053 Value: c7n
4054 - ExistingObjectReplication: Enabled
4056 """
4057 schema = type_schema(
4058 'bucket-replication',
4059 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
4060 count={'type': 'number'},
4061 count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
4062 )
4064 permissions = ("s3:GetReplicationConfiguration",)
4065 annotation_key = 'Replication'
4066 annotate_items = True
4068 def __init__(self, data, manager=None):
4069 super().__init__(data, manager)
4070 self.data['key'] = self.annotation_key
4072 def get_item_values(self, b):
4073 client = bucket_client(local_session(self.manager.session_factory), b)
4074 # the replication configuration is fetched via S3_AUGMENT_TABLE augmentation:
4075 bucket_replication = b.get(self.annotation_key)
4077 rules = []
4078 if bucket_replication is not None:
4079 rules = bucket_replication.get('ReplicationConfiguration', {}).get('Rules', [])
4080 for replication in rules:
4081 self.augment_bucket_replication(b, replication, client)
4083 return rules
4085 def augment_bucket_replication(self, b, replication, client):
4086 destination_bucket = replication.get('Destination').get('Bucket').split(':')[5]
4087 try:
4088 destination_region = inspect_bucket_region(destination_bucket, client.meta.endpoint_url)
4089 except ValueError:
4090 replication['DestinationBucketAvailable'] = False
4091 return
4092 source_region = get_region(b)
4093 replication['DestinationBucketAvailable'] = True
4094 replication['DestinationRegion'] = destination_region
4095 replication['CrossRegion'] = destination_region != source_region
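# Worked example (a sketch with hypothetical values): a rule whose
# Destination.Bucket is 'arn:aws:s3:::dest-bucket' yields
# 'arn:aws:s3:::dest-bucket'.split(':')[5] == 'dest-bucket'; if that bucket
# resolves to us-west-2 while the source bucket lives in us-east-1, the rule is
# annotated with DestinationBucketAvailable=True, DestinationRegion='us-west-2'
# and CrossRegion=True.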
4098@resources.register('s3-directory')
4099class S3Directory(query.QueryResourceManager):
4101 class resource_type(query.TypeInfo):
4102 service = 's3'
4103 permission_prefix = "s3express"
4104 arn_service = "s3express"
4105 arn_type = 'bucket'
4106 enum_spec = ('list_directory_buckets', 'Buckets[]', None)
4107 name = id = 'Name'
4108 date = 'CreationDate'
4109 dimension = 'BucketName'
4110 cfn_type = 'AWS::S3Express::DirectoryBucket'
4111 permissions_enum = ("s3express:ListAllMyDirectoryBuckets",)
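# Sketch of how the enum_spec above is interpreted (an assumption about c7n's
# query resource conventions, not upstream code; pagination handling omitted):
#
#   client = local_session(self.session_factory).client('s3')
#   buckets = client.list_directory_buckets().get('Buckets', [])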