Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/s3.py: 23%
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""S3 Resource Manager
5Filters:
7The generic Value filter (jmespath expressions) and Or filter are
8available with all resources, including buckets. We include several
9additional pieces of bucket data (Tags, Replication, Acl, Policy) as keys within
10a bucket representation.
12Actions:
14 encrypt-keys
16 Scan all keys in a bucket and optionally encrypt them in place.
18 global-grants
20 Check bucket acls for global grants
22 encryption-policy
24 Attach an encryption required policy to a bucket; this will break
25 applications that are not using encryption, including aws log
26 delivery.
28"""
29import copy
30import functools
31import json
32import logging
33import math
34import os
35import time
36import threading
37import ssl
39from botocore.client import Config
40from botocore.exceptions import ClientError
42from collections import defaultdict
43from concurrent.futures import as_completed
45try:
46 from urllib3.exceptions import SSLError
47except ImportError:
48 from botocore.vendored.requests.packages.urllib3.exceptions import SSLError
51from c7n.actions import (
52 ActionRegistry, BaseAction, PutMetric, RemovePolicyBase)
53from c7n.exceptions import PolicyValidationError, PolicyExecutionError
54from c7n.filters import (
55 FilterRegistry, Filter, CrossAccountAccessFilter, MetricsFilter,
56 ValueFilter, ListItemFilter)
57from .aws import shape_validate
58from c7n.filters.policystatement import HasStatementFilter
59from c7n.manager import resources
60from c7n.output import NullBlobOutput
61from c7n import query
62from c7n.resources.securityhub import PostFinding
63from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction
64from c7n.utils import (
65 chunks, local_session, set_annotation, type_schema, filter_empty,
66 dumps, format_string_values, get_account_alias_from_sts)
67from c7n.resources.aws import inspect_bucket_region
70log = logging.getLogger('custodian.s3')
72filters = FilterRegistry('s3.filters')
73actions = ActionRegistry('s3.actions')
74filters.register('marked-for-op', TagActionFilter)
75actions.register('put-metric', PutMetric)
77MAX_COPY_SIZE = 1024 * 1024 * 1024 * 2
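# MAX_COPY_SIZE above is 2 GiB; it is assumed (the consuming encrypt-keys code is
# outside this excerpt) to be the threshold above which in-place key copies fall
# back to a multipart copy.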
80class DescribeS3(query.DescribeSource):
82 def augment(self, buckets):
83 assembler = BucketAssembly(self.manager)
84 assembler.initialize()
86 with self.manager.executor_factory(
87 max_workers=min((10, len(buckets) + 1))) as w:
88 results = w.map(assembler.assemble, buckets)
89 results = list(filter(None, results))
90 return results
93class ConfigS3(query.ConfigSource):
95 # normalize config's janky idiosyncratic bespoke formatting to the
96 # standard describe api responses.
98 def get_query_params(self, query):
99 q = super(ConfigS3, self).get_query_params(query)
100 if 'expr' in q:
101 q['expr'] = q['expr'].replace('select ', 'select awsRegion, ')
102 return q
104 def load_resource(self, item):
105 resource = super(ConfigS3, self).load_resource(item)
106 cfg = item['supplementaryConfiguration']
107 # aka standard
108 if 'awsRegion' in item and item['awsRegion'] != 'us-east-1':
109 resource['Location'] = {'LocationConstraint': item['awsRegion']}
110 else:
111 resource['Location'] = {}
113 # owner is under acl per describe
114 resource.pop('Owner', None)
116 for k, null_value in S3_CONFIG_SUPPLEMENT_NULL_MAP.items():
117 if k not in cfg:
118 continue
119 if cfg.get(k) == null_value:
120 continue
121 method = getattr(self, "handle_%s" % k, None)
122 if method is None:
123 raise ValueError("unhandled supplementary config %s" % k)
125 v = cfg[k]
126 if isinstance(cfg[k], str):
127 v = json.loads(cfg[k])
128 method(resource, v)
130 for el in S3_AUGMENT_TABLE:
131 if el[1] not in resource:
132 resource[el[1]] = el[2]
133 return resource
135 PERMISSION_MAP = {
136 'FullControl': 'FULL_CONTROL',
137 'Write': 'WRITE',
138 'WriteAcp': 'WRITE_ACP',
139 'Read': 'READ',
140 'ReadAcp': 'READ_ACP'}
142 GRANTEE_MAP = {
143 'AllUsers': "http://acs.amazonaws.com/groups/global/AllUsers",
144 'AuthenticatedUsers': "http://acs.amazonaws.com/groups/global/AuthenticatedUsers",
145 'LogDelivery': 'http://acs.amazonaws.com/groups/s3/LogDelivery'}
147 def handle_AccessControlList(self, resource, item_value):
148 # double serialized in config for some reason
149 if isinstance(item_value, str):
150 item_value = json.loads(item_value)
152 resource['Acl'] = {}
153 resource['Acl']['Owner'] = {'ID': item_value['owner']['id']}
154 if item_value['owner']['displayName']:
155 resource['Acl']['Owner']['DisplayName'] = item_value[
156 'owner']['displayName']
157 resource['Acl']['Grants'] = grants = []
159 for g in (item_value.get('grantList') or ()):
160 if 'id' not in g['grantee']:
161 assert g['grantee'] in self.GRANTEE_MAP, "unknown grantee %s" % g
162 rg = {'Type': 'Group', 'URI': self.GRANTEE_MAP[g['grantee']]}
163 else:
164 rg = {'ID': g['grantee']['id'], 'Type': 'CanonicalUser'}
166 if 'displayName' in g:
167 rg['DisplayName'] = g['displayName']
169 grants.append({
170 'Permission': self.PERMISSION_MAP[g['permission']],
171 'Grantee': rg,
172 })
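    # Illustrative sketch of the normalization above (example values only):
    #
    #   config grantList entry: {'grantee': 'AllUsers', 'permission': 'Read'}
    #   describe-style result:  {'Permission': 'READ',
    #                            'Grantee': {'Type': 'Group',
    #                                        'URI': GRANTEE_MAP['AllUsers']}}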
174 def handle_BucketAccelerateConfiguration(self, resource, item_value):
175 # not currently auto-augmented by custodian
176 return
178 def handle_BucketLoggingConfiguration(self, resource, item_value):
179 if ('destinationBucketName' not in item_value or
180 item_value['destinationBucketName'] is None):
181 resource[u'Logging'] = {}
182 return
183 resource[u'Logging'] = {
184 'TargetBucket': item_value['destinationBucketName'],
185 'TargetPrefix': item_value['logFilePrefix']}
187 def handle_BucketLifecycleConfiguration(self, resource, item_value):
188 rules = []
189 for r in item_value.get('rules'):
190 rr = {}
191 rules.append(rr)
192 expiry = {}
193 for ek, ck in (
194 ('Date', 'expirationDate'),
195 ('ExpiredObjectDeleteMarker', 'expiredObjectDeleteMarker'),
196 ('Days', 'expirationInDays')):
197 if ck in r and r[ck] and r[ck] != -1:
198 expiry[ek] = r[ck]
199 if expiry:
200 rr['Expiration'] = expiry
202 transitions = []
203 for t in (r.get('transitions') or ()):
204 tr = {}
205 for k in ('date', 'days', 'storageClass'):
206 if t.get(k):
207 tr["%s%s" % (k[0].upper(), k[1:])] = t[k]
208 transitions.append(tr)
209 if transitions:
210 rr['Transitions'] = transitions
212 if r.get('abortIncompleteMultipartUpload'):
213 rr['AbortIncompleteMultipartUpload'] = {
214 'DaysAfterInitiation': r[
215 'abortIncompleteMultipartUpload']['daysAfterInitiation']}
216 if r.get('noncurrentVersionExpirationInDays'):
217 rr['NoncurrentVersionExpiration'] = {
218 'NoncurrentDays': r['noncurrentVersionExpirationInDays']}
220 nonc_transitions = []
221 for t in (r.get('noncurrentVersionTransitions') or ()):
222 nonc_transitions.append({
223 'NoncurrentDays': t['days'],
224 'StorageClass': t['storageClass']})
225 if nonc_transitions:
226 rr['NoncurrentVersionTransitions'] = nonc_transitions
228 rr['Status'] = r['status']
229 rr['ID'] = r['id']
230 if r.get('prefix'):
231 rr['Prefix'] = r['prefix']
232 if 'filter' not in r or not r['filter']:
233 continue
235 if r['filter']['predicate']:
236 rr['Filter'] = self.convertLifePredicate(r['filter']['predicate'])
238 resource['Lifecycle'] = {'Rules': rules}
240 def convertLifePredicate(self, p):
241 if p['type'] == 'LifecyclePrefixPredicate':
242 return {'Prefix': p['prefix']}
243 if p['type'] == 'LifecycleTagPredicate':
244 return {'Tags': [{'Key': p['tag']['key'], 'Value': p['tag']['value']}]}
245 if p['type'] == 'LifecycleAndOperator':
246 n = {}
247 for o in p['operands']:
248 ot = self.convertLifePredicate(o)
249 if 'Tags' in n and 'Tags' in ot:
250 n['Tags'].extend(ot['Tags'])
251 else:
252 n.update(ot)
253 return {'And': n}
255 raise ValueError("unknown predicate: %s" % p)
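    # Illustrative sketch of convertLifePredicate above (example values only):
    #
    #   {'type': 'LifecycleAndOperator',
    #    'operands': [
    #        {'type': 'LifecyclePrefixPredicate', 'prefix': 'logs/'},
    #        {'type': 'LifecycleTagPredicate',
    #         'tag': {'key': 'archive', 'value': 'true'}}]}
    #
    #   converts to the describe-style filter:
    #
    #   {'And': {'Prefix': 'logs/',
    #            'Tags': [{'Key': 'archive', 'Value': 'true'}]}}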
257 NotifyTypeMap = {
258 'QueueConfiguration': 'QueueConfigurations',
259 'LambdaConfiguration': 'LambdaFunctionConfigurations',
260 'CloudFunctionConfiguration': 'LambdaFunctionConfigurations',
261 'TopicConfiguration': 'TopicConfigurations'}
263 def handle_BucketNotificationConfiguration(self, resource, item_value):
264 d = {}
265 for nid, n in item_value['configurations'].items():
266 ninfo = {}
267 d.setdefault(self.NotifyTypeMap[n['type']], []).append(ninfo)
268 if n['type'] == 'QueueConfiguration':
269 ninfo['QueueArn'] = n['queueARN']
270 elif n['type'] == 'TopicConfiguration':
271 ninfo['TopicArn'] = n['topicARN']
272 elif n['type'] == 'LambdaConfiguration':
273 ninfo['LambdaFunctionArn'] = n['functionARN']
274 ninfo['Id'] = nid
275 ninfo['Events'] = n['events']
276 rules = []
277 if n['filter']:
278 for r in n['filter'].get('s3KeyFilter', {}).get('filterRules', []):
279 rules.append({'Name': r['name'], 'Value': r['value']})
280 if rules:
281 ninfo['Filter'] = {'Key': {'FilterRules': rules}}
282 resource['Notification'] = d
284 def handle_BucketReplicationConfiguration(self, resource, item_value):
285 d = {'Role': item_value['roleARN'], 'Rules': []}
286 for rid, r in item_value['rules'].items():
287 rule = {
288 'ID': rid,
289 'Status': r.get('status', ''),
290 'Prefix': r.get('prefix', ''),
291 'Destination': {
292 'Bucket': r['destinationConfig']['bucketARN']}
293 }
294 if 'Account' in r['destinationConfig']:
295 rule['Destination']['Account'] = r['destinationConfig']['Account']
296 if r['destinationConfig'].get('storageClass'):
297 rule['Destination']['StorageClass'] = r['destinationConfig']['storageClass']
298 d['Rules'].append(rule)
299 resource['Replication'] = {'ReplicationConfiguration': d}
301 def handle_BucketPolicy(self, resource, item_value):
302 resource['Policy'] = item_value.get('policyText')
304 def handle_BucketTaggingConfiguration(self, resource, item_value):
305 resource['Tags'] = [
306 {"Key": k, "Value": v} for k, v in item_value['tagSets'][0]['tags'].items()]
308 def handle_BucketVersioningConfiguration(self, resource, item_value):
309 # Config defaults versioning to 'Off' for a null value
310 if item_value['status'] not in ('Enabled', 'Suspended'):
311 resource['Versioning'] = {}
312 return
313 resource['Versioning'] = {'Status': item_value['status']}
314 # `isMfaDeleteEnabled` is an optional boolean property - the key may be absent,
315 # present with a null value, or present with a boolean value.
316 # Mirror the describe source by populating Versioning.MFADelete only in the
317 # boolean case.
318 mfa_delete = item_value.get('isMfaDeleteEnabled')
319 if mfa_delete is None:
320 return
321 resource['Versioning']['MFADelete'] = (
322 'Enabled' if mfa_delete else 'Disabled'
323 )
325 def handle_BucketWebsiteConfiguration(self, resource, item_value):
326 website = {}
327 if item_value['indexDocumentSuffix']:
328 website['IndexDocument'] = {
329 'Suffix': item_value['indexDocumentSuffix']}
330 if item_value['errorDocument']:
331 website['ErrorDocument'] = {
332 'Key': item_value['errorDocument']}
333 if item_value['redirectAllRequestsTo']:
334 website['RedirectAllRequestsTo'] = {
335 'HostName': item_value['redirectAllRequestsTo']['hostName'],
336 'Protocol': item_value['redirectAllRequestsTo']['protocol']}
337 for r in item_value['routingRules']:
338 redirect = {}
339 rule = {'Redirect': redirect}
340 website.setdefault('RoutingRules', []).append(rule)
341 if 'condition' in r:
342 cond = {}
343 for ck, rk in (
344 ('keyPrefixEquals', 'KeyPrefixEquals'),
345 ('httpErrorCodeReturnedEquals',
346 'HttpErrorCodeReturnedEquals')):
347 if r['condition'][ck]:
348 cond[rk] = r['condition'][ck]
349 rule['Condition'] = cond
350 for ck, rk in (
351 ('protocol', 'Protocol'),
352 ('hostName', 'HostName'),
353 ('replaceKeyPrefixWith', 'ReplaceKeyPrefixWith'),
354 ('replaceKeyWith', 'ReplaceKeyWith'),
355 ('httpRedirectCode', 'HttpRedirectCode')):
356 if r['redirect'][ck]:
357 redirect[rk] = r['redirect'][ck]
358 resource['Website'] = website
361@resources.register('s3')
362class S3(query.QueryResourceManager):
363 """Amazon's Simple Storage Service Buckets.
366 By default, and due to historical compatibility, Cloud Custodian will
367 fetch a number of subdocuments (Acl, Policy, Tagging, Versioning,
368 Website, Notification, Lifecycle, and Replication) for each bucket
369 to allow policy authors to target common bucket
370 configurations.
372 This behavior can be customized to avoid extraneous api calls if a
373 particular sub document is not needed for a policy, by setting the
374 `augment-keys` parameter in a query block of the policy.
376 i.e. if we only care about bucket website and replication
377 configuration, we can minimize the api calls needed to fetch a
378 bucket by setting up augment-keys as follows.
380 :example:
382 .. code-block:: yaml
384 policies:
385 - name: check-website-replication
386 resource: s3
387 query:
388 - augment-keys: ['Website', 'Replication']
389 filters:
390 - Website.ErrorDocument: not-null
391 - Replication.ReplicationConfiguration.Rules: not-null
393 It also supports an automatic detection mode, where the use of a subdocument
394 in a filter is detected automatically, via the augment-keys value of 'detect'.
396 :example:
398 .. code-block:: yaml
400 policies:
401 - name: check-website-replication
402 resource: s3
403 query:
404 - augment-keys: 'detect'
405 filters:
406 - Website.ErrorDocument: not-null
407 - Replication.ReplicationConfiguration.Rules: not-null
409 The default value for augment-keys is `all` to preserve historical
410 compatibility. `augment-keys` also supports the value of 'none' to
411 disable all subdocument fetching except Location and Tags.
413 Note certain actions may implicitly depend on the corresponding
414 subdocument being present.
416 """
418 class resource_type(query.TypeInfo):
419 service = 's3'
420 arn_type = ''
421 enum_spec = ('list_buckets', 'Buckets[]', None)
422 # not used but we want some consistency on the metadata
423 detail_spec = ('get_bucket_location', 'Bucket', 'Name', 'LocationConstraint')
424 permissions_augment = (
425 "s3:GetBucketAcl",
426 "s3:GetBucketLocation",
427 "s3:GetBucketPolicy",
428 "s3:GetBucketTagging",
429 "s3:GetBucketVersioning",
430 "s3:GetBucketLogging",
431 "s3:GetBucketNotification",
432 "s3:GetBucketWebsite",
433 "s3:GetLifecycleConfiguration",
434 "s3:GetReplicationConfiguration"
435 )
436 name = id = 'Name'
437 date = 'CreationDate'
438 dimension = 'BucketName'
439 cfn_type = config_type = 'AWS::S3::Bucket'
441 filter_registry = filters
442 action_registry = actions
443 source_mapping = {
444 'describe': DescribeS3,
445 'config': ConfigS3
446 }
448 def validate(self):
449 super().validate()
450 BucketAssembly(self).validate()
452 def get_arns(self, resources):
453 return ["arn:aws:s3:::{}".format(r["Name"]) for r in resources]
455 @classmethod
456 def get_permissions(cls):
457 perms = ["s3:ListAllMyBuckets"]
458 perms.extend([n[-1] for n in S3_AUGMENT_TABLE])
459 return perms
462S3_CONFIG_SUPPLEMENT_NULL_MAP = {
463 'BucketLoggingConfiguration': u'{"destinationBucketName":null,"logFilePrefix":null}',
464 'BucketPolicy': u'{"policyText":null}',
465 'BucketVersioningConfiguration': u'{"status":"Off","isMfaDeleteEnabled":null}',
466 'BucketAccelerateConfiguration': u'{"status":null}',
467 'BucketNotificationConfiguration': u'{"configurations":{}}',
468 'BucketLifecycleConfiguration': None,
469 'AccessControlList': None,
470 'BucketTaggingConfiguration': None,
471 'BucketWebsiteConfiguration': None,
472 'BucketReplicationConfiguration': None
473}
475S3_AUGMENT_TABLE = (
476 ('get_bucket_location', 'Location', {}, None, 's3:GetBucketLocation'),
477 ('get_bucket_tagging', 'Tags', [], 'TagSet', 's3:GetBucketTagging'),
478 ('get_bucket_policy', 'Policy', None, 'Policy', 's3:GetBucketPolicy'),
479 ('get_bucket_acl', 'Acl', None, None, 's3:GetBucketAcl'),
480 ('get_bucket_replication',
481 'Replication', None, None, 's3:GetReplicationConfiguration'),
482 ('get_bucket_versioning', 'Versioning', None, None, 's3:GetBucketVersioning'),
483 ('get_bucket_website', 'Website', None, None, 's3:GetBucketWebsite'),
484 ('get_bucket_logging', 'Logging', None, 'LoggingEnabled', 's3:GetBucketLogging'),
485 ('get_bucket_notification_configuration',
486 'Notification', None, None, 's3:GetBucketNotification'),
487 ('get_bucket_lifecycle_configuration',
488 'Lifecycle', None, None, 's3:GetLifecycleConfiguration'),
489 # ('get_bucket_cors', 'Cors'),
490)
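# Each S3_AUGMENT_TABLE row is read as:
#   (client method name, bucket key to populate, default value when the
#    configuration is absent, response sub-key to select or None,
#    IAM permission required)
# see BucketAssembly.assemble and S3.get_permissions for how the columns
# are consumed.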
493class BucketAssembly:
495 def __init__(self, manager):
496 self.manager = manager
497 self.default_region = None
498 self.region_clients = {}
499 self.session = None
500 self.session_lock = None
501 self.augment_fields = []
503 def initialize(self):
504 # construct a default boto3 client, using the current session region.
505 self.session = local_session(self.manager.session_factory)
506 self.session_lock = threading.RLock()
507 self.default_region = self.manager.config.region
508 self.region_clients[self.default_region] = self.session.client('s3')
509 self.augment_fields = set(self.detect_augment_fields())
510 # location is required for client construction
511 self.augment_fields.add('Location')
512 # custodian always returns tags
513 self.augment_fields.add('Tags')
515 def validate(self):
516 config = self.get_augment_config()
517 if isinstance(config, str) and config not in ('all', 'detect', 'none'):
518 raise PolicyValidationError(
519 "augment-keys supports 'all', 'detect', 'none' or list of keys found: %s" % config)
520 elif isinstance(config, list):
521 delta = set(config).difference([row[1] for row in S3_AUGMENT_TABLE])
522 if delta:
523 raise PolicyValidationError("augment-keys - found invalid keys: %s" % (list(delta)))
524 if not isinstance(config, (list, str)):
525 raise PolicyValidationError(
526 "augment-keys supports 'all', 'detect', 'none' or list of keys found: %s" % config)
528 def get_augment_config(self):
529 augment_config = None
530 for option in self.manager.data.get('query', []):
531 if option and option.get('augment-keys') is not None:
532 augment_config = option['augment-keys']
533 if augment_config is None:
534 augment_config = 'all'
535 return augment_config
537 def detect_augment_fields(self):
538 # try to detect augment fields required for the policy execution
539 # we want to avoid extraneous api calls unless they are being used by the policy.
541 detected_keys = []
542 augment_keys = [row[1] for row in S3_AUGMENT_TABLE]
543 augment_config = self.get_augment_config()
545 if augment_config == 'all':
546 return augment_keys
547 elif augment_config == 'none':
548 return []
549 elif isinstance(augment_config, list):
550 return augment_config
552 for f in self.manager.iter_filters():
553 fkey = None
554 if not isinstance(f, ValueFilter):
555 continue
557 f = f.data
558 # type: value
559 if f.get('type', '') == 'value':
560 fkey = f.get('key')
561 # k: v dict
562 elif len(f) == 1:
563 fkey = list(f.keys())[0]
564 if fkey is None: # pragma: no cover
565 continue
567 # remove any jmespath expressions
568 fkey = fkey.split('.', 1)[0]
570 # tags have explicit handling in value filters.
571 if fkey.startswith('tag:'):
572 fkey = 'Tags'
574 # denied methods checks get all keys
575 if fkey.startswith('c7n:DeniedMethods'):
576 return augment_keys
578 if fkey in augment_keys:
579 detected_keys.append(fkey)
581 return detected_keys
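    # Illustrative sketch of the detection above, assuming a hypothetical
    # policy's filters (keys come from S3_AUGMENT_TABLE):
    #
    #   - Website.ErrorDocument: not-null            -> detects 'Website'
    #   - type: value
    #     key: Replication.ReplicationConfiguration.Rules
    #     value: not-null                            -> detects 'Replication'
    #   - "tag:Owner": absent                        -> detects 'Tags'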
583 def get_client(self, region):
584 if region in self.region_clients:
585 return self.region_clients[region]
586 with self.session_lock:
587 self.region_clients[region] = self.session.client('s3', region_name=region)
588 return self.region_clients[region]
590 def assemble(self, bucket):
592 client = self.get_client(self.default_region)
593 augments = list(S3_AUGMENT_TABLE)
595 for info in augments:
596 # we use the offset, as tests manipulate the augments table
597 method_name, key, default, select = info[:4]
598 if key not in self.augment_fields:
599 continue
601 method = getattr(client, method_name)
603 try:
604 response = method(Bucket=bucket['Name'])
605 # This is here as exception handling will change to defaults if not present
606 response.pop('ResponseMetadata', None)
607 value = response
608 if select and select in value:
609 value = value[select]
610 except (ssl.SSLError, SSLError) as e:
611 # Proxy issue most likely
612 log.warning(
613 "Bucket ssl error %s: %s %s",
614 bucket['Name'], bucket.get('Location', 'unknown'), e)
615 continue
616 except ClientError as e:
617 code = e.response['Error']['Code']
618 if code.startswith("NoSuch") or "NotFound" in code:
619 value = default
620 elif code == 'PermanentRedirect': # pragma: no cover
621 # (09/2025) - it's not clear how we get here given a client region switch post
622 # location detection.
623 #
624 # change client region
625 client = self.get_client(get_region(bucket))
626 # requeue now that we have correct region
627 augments.append((method_name, key, default, select))
628 continue
629 else:
630 # for auth errors record as attribute and move on
631 if e.response['Error']['Code'] == 'AccessDenied':
632 bucket.setdefault('c7n:DeniedMethods', []).append(method_name)
633 continue
634 # else log and raise
635 log.warning(
636 "Bucket:%s unable to invoke method:%s error:%s ",
637 bucket['Name'], method_name, e.response['Error']['Message'])
638 raise
640 # for historical reasons we normalize EU to eu-west-1 on the bucket
641 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html
642 if key == 'Location' and value and value.get('LocationConstraint', '') == 'EU':
643 value['LocationConstraint'] = 'eu-west-1'
645 bucket[key] = value
647 # For all subsequent attributes after location, use a client that is targeted to
648 # the bucket's regional s3 endpoint.
649 if key == 'Location' and get_region(bucket) != client.meta.region_name:
650 client = self.get_client(get_region(bucket))
651 return bucket
654def bucket_client(session, b, kms=False):
655 region = get_region(b)
657 if kms:
658 # Need v4 signature for aws:kms crypto, else let the sdk decide
659 # based on region support.
660 config = Config(
661 signature_version='s3v4',
662 read_timeout=200, connect_timeout=120)
663 else:
664 config = Config(read_timeout=200, connect_timeout=120)
665 return session.client('s3', region_name=region, config=config)
668def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()):
669 for bucket in buckets:
670 client = bucket_client(local_session(session_factory), bucket)
671 # Bucket tags are set atomically for the whole set/document; we want
672 # to refetch against current state to guard against any staleness in
673 # our cached representation across multiple policies or concurrent
674 # modifications.
676 if 'get_bucket_tagging' in bucket.get('c7n:DeniedMethods', []):
677 # avoid the additional API call if we already know that it's going
678 # to result in AccessDenied. The chances that the resource's perms
679 # would have changed between fetching the resource and acting on it
680 # here are pretty low-- so the check here should suffice.
681 log.warning(
682 "Unable to get new set of bucket tags needed to modify tags,"
683 "skipping tag action for bucket: %s" % bucket["Name"])
684 continue
686 try:
687 bucket['Tags'] = client.get_bucket_tagging(
688 Bucket=bucket['Name']).get('TagSet', [])
689 except ClientError as e:
690 if e.response['Error']['Code'] != 'NoSuchTagSet':
691 raise
692 bucket['Tags'] = []
694 new_tags = {t['Key']: t['Value'] for t in add_tags}
695 for t in bucket.get('Tags', ()):
696 if (t['Key'] not in new_tags and t['Key'] not in remove_tags):
697 new_tags[t['Key']] = t['Value']
698 tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()]
700 try:
701 client.put_bucket_tagging(
702 Bucket=bucket['Name'], Tagging={'TagSet': tag_set})
703 except ClientError as e:
704 log.exception(
705 'Exception tagging bucket %s: %s', bucket['Name'], e)
706 continue
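# Illustrative sketch of the tag merge in modify_bucket_tags (example values only):
#   existing bucket tags: [{'Key': 'env', 'Value': 'dev'}, {'Key': 'owner', 'Value': 'a'}]
#   add_tags:             ({'Key': 'env', 'Value': 'prod'},)
#   remove_tags:          ('owner',)
#   resulting TagSet:     [{'Key': 'env', 'Value': 'prod'}]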
709def get_region(b):
710 """Tries to get the bucket region from Location.LocationConstraint
712 Special cases:
713 LocationConstraint EU defaults to eu-west-1
714 LocationConstraint null defaults to us-east-1
716 Args:
717 b (object): A bucket object
719 Returns:
720 string: an aws region string
721 """
722 remap = {None: 'us-east-1', 'EU': 'eu-west-1'}
723 region = b.get('Location', {}).get('LocationConstraint')
724 return remap.get(region, region)
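# Illustrative sketch of get_region (example values only):
#   get_region({'Location': {}})                                   -> 'us-east-1'
#   get_region({'Location': {'LocationConstraint': 'EU'}})         -> 'eu-west-1'
#   get_region({'Location': {'LocationConstraint': 'ap-south-1'}}) -> 'ap-south-1'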
727@filters.register('metrics')
728class S3Metrics(MetricsFilter):
729 """S3 CW Metrics need special handling for attribute/dimension
730 mismatch, and an additional required dimension.
731 """
733 def get_dimensions(self, resource):
734 dims = [{'Name': 'BucketName', 'Value': resource['Name']}]
735 if (self.data['name'] == 'NumberOfObjects' and
736 'dimensions' not in self.data):
737 dims.append(
738 {'Name': 'StorageType', 'Value': 'AllStorageTypes'})
739 return dims
742@filters.register('cross-account')
743class S3CrossAccountFilter(CrossAccountAccessFilter):
744 """Filters cross-account access to S3 buckets
746 :example:
748 .. code-block:: yaml
750 policies:
751 - name: s3-acl
752 resource: s3
753 region: us-east-1
754 filters:
755 - type: cross-account
756 """
757 permissions = ('s3:GetBucketPolicy',)
759 def get_accounts(self):
760 """add in elb access by default
762 ELB Accounts by region
763 https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html
765 Redshift Accounts by region
766 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#db-auditing-manage-log-files
767 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#rs-db-auditing-cloud-trail-rs-acct-ids
769 Cloudtrail Accounts by region
770 https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-supported-regions.html
771 """
772 accounts = super(S3CrossAccountFilter, self).get_accounts()
773 return accounts.union(
774 [
775 # ELB accounts
776 '127311923021', # us-east-1
777 '033677994240', # us-east-2
778 '027434742980', # us-west-1
779 '797873946194', # us-west-2
780 '098369216593', # af-south-1
781 '985666609251', # ca-central-1
782 '054676820928', # eu-central-1
783 '897822967062', # eu-north-1
784 '635631232127', # eu-south-1
785 '156460612806', # eu-west-1
786 '652711504416', # eu-west-2
787 '009996457667', # eu-west-3
788 '754344448648', # ap-east-1
789 '582318560864', # ap-northeast-1
790 '600734575887', # ap-northeast-2
791 '383597477331', # ap-northeast-3
792 '114774131450', # ap-southeast-1
793 '783225319266', # ap-southeast-2
794 '718504428378', # ap-south-1
795 '076674570225', # me-south-1
796 '507241528517', # sa-east-1
797 '048591011584', # us-gov-west-1 or gov-cloud-1
798 '190560391635', # us-gov-east-1
799 '638102146993', # cn-north-1
800 '037604701340', # cn-northwest-1
802 # Redshift audit logging
803 '193672423079', # us-east-1
804 '391106570357', # us-east-2
805 '262260360010', # us-west-1
806 '902366379725', # us-west-2
807 '365689465814', # af-south-1
808 '313564881002', # ap-east-1
809 '865932855811', # ap-south-1
810 '090321488786', # ap-northeast-3
811 '760740231472', # ap-northeast-2
812 '361669875840', # ap-southeast-1
813 '762762565011', # ap-southeast-2
814 '404641285394', # ap-northeast-1
815 '907379612154', # ca-central-1
816 '053454850223', # eu-central-1
817 '210876761215', # eu-west-1
818 '307160386991', # eu-west-2
819 '945612479654', # eu-south-1
820 '915173422425', # eu-west-3
821 '729911121831', # eu-north-1
822 '013126148197', # me-south-1
823 '075028567923', # sa-east-1
825 # Cloudtrail accounts (psa. folks should be using
826 # cloudtrail service in bucket policies)
827 '086441151436', # us-east-1
828 '475085895292', # us-west-2
829 '388731089494', # us-west-1
830 '113285607260', # us-west-2
831 '819402241893', # ca-central-1
832 '977081816279', # ap-south-1
833 '492519147666', # ap-northeast-2
834 '903692715234', # ap-southeast-1
835 '284668455005', # ap-southeast-2
836 '216624486486', # ap-northeast-1
837 '035351147821', # eu-central-1
838 '859597730677', # eu-west-1
839 '282025262664', # eu-west-2
840 '814480443879', # sa-east-1
841 ])
844@filters.register('global-grants')
845class GlobalGrantsFilter(Filter):
846 """Filters for all S3 buckets that have global-grants
848 *Note* by default this filter allows for read access
849 if the bucket has been configured as a website. This
850 can be disabled per the example below.
852 :example:
854 .. code-block:: yaml
856 policies:
857 - name: remove-global-grants
858 resource: s3
859 filters:
860 - type: global-grants
861 allow_website: false
862 actions:
863 - delete-global-grants
865 """
867 schema = type_schema(
868 'global-grants',
869 allow_website={'type': 'boolean'},
870 operator={'type': 'string', 'enum': ['or', 'and']},
871 permissions={
872 'type': 'array', 'items': {
873 'type': 'string', 'enum': [
874 'READ', 'WRITE', 'WRITE_ACP', 'READ_ACP', 'FULL_CONTROL']}})
876 GLOBAL_ALL = "http://acs.amazonaws.com/groups/global/AllUsers"
877 AUTH_ALL = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
879 def process(self, buckets, event=None):
880 with self.executor_factory(max_workers=5) as w:
881 results = w.map(self.process_bucket, buckets)
882 results = list(filter(None, list(results)))
883 return results
885 def process_bucket(self, b):
886 acl = b.get('Acl', {'Grants': []})
887 if not acl or not acl['Grants']:
888 return
890 results = []
891 allow_website = self.data.get('allow_website', True)
892 perms = self.data.get('permissions', [])
894 for grant in acl['Grants']:
895 if 'URI' not in grant.get("Grantee", {}):
896 continue
897 if grant['Grantee']['URI'] not in [self.AUTH_ALL, self.GLOBAL_ALL]:
898 continue
899 if allow_website and grant['Permission'] == 'READ' and b['Website']:
900 continue
901 if not perms or (perms and grant['Permission'] in perms):
902 results.append(grant['Permission'])
904 if results:
905 set_annotation(b, 'GlobalPermissions', results)
906 return b
909class BucketActionBase(BaseAction):
911 def get_permissions(self):
912 return self.permissions
914 def get_std_format_args(self, bucket):
915 return {
916 'account_id': self.manager.config.account_id,
917 'region': self.manager.config.region,
918 'bucket_name': bucket['Name'],
919 'bucket_region': get_region(bucket)
920 }
922 def process(self, buckets):
923 return self._process_with_futures(buckets)
925 def _process_with_futures(self, buckets, *args, max_workers=3, **kwargs):
926 errors = 0
927 results = []
928 with self.executor_factory(max_workers=max_workers) as w:
929 futures = {}
930 for b in buckets:
931 futures[w.submit(self.process_bucket, b, *args, **kwargs)] = b
932 for f in as_completed(futures):
933 if f.exception():
934 b = futures[f]
935 self.log.error(
936 'error modifying bucket: policy:%s action:%s bucket:%s error:%s',
937 self.manager.data.get('name'), self.name, b['Name'], f.exception()
938 )
939 errors += 1
940 continue
941 results += filter(None, [f.result()])
942 if errors:
943 self.log.error('encountered %d errors while processing %s', errors, self.name)
944 raise PolicyExecutionError('%d resources failed' % errors)
945 return results
948class BucketFilterBase(Filter):
949 def get_std_format_args(self, bucket):
950 return {
951 'account_id': self.manager.config.account_id,
952 'region': self.manager.config.region,
953 'bucket_name': bucket['Name'],
954 'bucket_region': get_region(bucket)
955 }
958@S3.action_registry.register("post-finding")
959class BucketFinding(PostFinding):
961 resource_type = 'AwsS3Bucket'
963 def format_resource(self, r):
964 owner = r.get("Acl", {}).get("Owner", {})
965 resource = {
966 "Type": self.resource_type,
967 "Id": "arn:aws:s3:::{}".format(r["Name"]),
968 "Region": get_region(r),
969 "Tags": {t["Key"]: t["Value"] for t in r.get("Tags", [])},
970 "Details": {self.resource_type: {
971 "OwnerId": owner.get('ID', 'Unknown')}}
972 }
974 if "DisplayName" in owner:
975 resource["Details"]["AwsS3Bucket"]["OwnerName"] = owner['DisplayName']
977 return filter_empty(resource)
980@S3.filter_registry.register('has-statement')
981class S3HasStatementFilter(HasStatementFilter):
982 def get_std_format_args(self, bucket):
983 return {
984 'account_id': self.manager.config.account_id,
985 'region': self.manager.config.region,
986 'bucket_name': bucket['Name'],
987 'bucket_region': get_region(bucket)
988 }
991@S3.filter_registry.register('lock-configuration')
992class S3LockConfigurationFilter(ValueFilter):
993 """
994 Filter S3 buckets based on their object lock configurations
996 :example:
998 Get all buckets where lock configuration mode is COMPLIANCE
1000 .. code-block:: yaml
1002 policies:
1003 - name: lock-configuration-compliance
1004 resource: aws.s3
1005 filters:
1006 - type: lock-configuration
1007 key: Rule.DefaultRetention.Mode
1008 value: COMPLIANCE
1010 """
1011 schema = type_schema('lock-configuration', rinherit=ValueFilter.schema)
1012 permissions = ('s3:GetBucketObjectLockConfiguration',)
1013 annotate = True
1014 annotation_key = 'c7n:ObjectLockConfiguration'
1016 def _process_resource(self, client, resource):
1017 try:
1018 config = client.get_object_lock_configuration(
1019 Bucket=resource['Name']
1020 )['ObjectLockConfiguration']
1021 except ClientError as e:
1022 if e.response['Error']['Code'] == 'ObjectLockConfigurationNotFoundError':
1023 config = None
1024 else:
1025 raise
1026 resource[self.annotation_key] = config
1028 def process(self, resources, event=None):
1029 client = local_session(self.manager.session_factory).client('s3')
1030 with self.executor_factory(max_workers=3) as w:
1031 futures = []
1032 for res in resources:
1033 if self.annotation_key in res:
1034 continue
1035 futures.append(w.submit(self._process_resource, client, res))
1036 for f in as_completed(futures):
1037 exc = f.exception()
1038 if exc:
1039 self.log.error(
1040 "Exception getting bucket lock configuration \n %s" % (
1041 exc))
1042 return super().process(resources, event)
1044 def __call__(self, r):
1045 return super().__call__(r.setdefault(self.annotation_key, None))
1048ENCRYPTION_STATEMENT_GLOB = {
1049 'Effect': 'Deny',
1050 'Principal': '*',
1051 'Action': 's3:PutObject',
1052 "Condition": {
1053 "StringNotEquals": {
1054 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}
1057@filters.register('no-encryption-statement')
1058class EncryptionEnabledFilter(Filter):
1059 """Find buckets with missing encryption policy statements.
1061 :example:
1063 .. code-block:: yaml
1065 policies:
1066 - name: s3-bucket-not-encrypted
1067 resource: s3
1068 filters:
1069 - type: no-encryption-statement
1070 """
1071 schema = type_schema(
1072 'no-encryption-statement')
1074 def get_permissions(self):
1075 perms = self.manager.get_resource_manager('s3').get_permissions()
1076 return perms
1078 def process(self, buckets, event=None):
1079 return list(filter(None, map(self.process_bucket, buckets)))
1081 def process_bucket(self, b):
1082 p = b.get('Policy')
1083 if p is None:
1084 return b
1085 p = json.loads(p)
1086 encryption_statement = dict(ENCRYPTION_STATEMENT_GLOB)
1088 statements = p.get('Statement', [])
1089 check = False
1090 for s in list(statements):
1091 if 'Sid' in s:
1092 encryption_statement["Sid"] = s["Sid"]
1093 if 'Resource' in s:
1094 encryption_statement["Resource"] = s["Resource"]
1095 if s == encryption_statement:
1096 check = True
1097 break
1098 if check:
1099 return None
1100 else:
1101 return b
1104@filters.register('missing-statement')
1105@filters.register('missing-policy-statement')
1106class MissingPolicyStatementFilter(Filter):
1107 """Find buckets missing a set of named policy statements.
1109 :example:
1111 .. code-block:: yaml
1113 policies:
1114 - name: s3-bucket-missing-statement
1115 resource: s3
1116 filters:
1117 - type: missing-statement
1118 statement_ids:
1119 - RequiredEncryptedPutObject
1120 """
1122 schema = type_schema(
1123 'missing-policy-statement',
1124 aliases=('missing-statement',),
1125 statement_ids={'type': 'array', 'items': {'type': 'string'}})
1127 def __call__(self, b):
1128 p = b.get('Policy')
1129 if p is None:
1130 return b
1132 p = json.loads(p)
1134 required = list(self.data.get('statement_ids', []))
1135 statements = p.get('Statement', [])
1136 for s in list(statements):
1137 if s.get('Sid') in required:
1138 required.remove(s['Sid'])
1139 if not required:
1140 return False
1141 return True
1144@filters.register('bucket-notification')
1145class BucketNotificationFilter(ValueFilter):
1146 """Filter based on bucket notification configuration.
1148 :example:
1150 .. code-block:: yaml
1152 policies:
1153 - name: delete-incorrect-notification
1154 resource: s3
1155 filters:
1156 - type: bucket-notification
1157 kind: lambda
1158 key: Id
1159 value: "IncorrectLambda"
1160 op: eq
1161 actions:
1162 - type: delete-bucket-notification
1163 statement_ids: matched
1164 """
1166 schema = type_schema(
1167 'bucket-notification',
1168 required=['kind'],
1169 kind={'type': 'string', 'enum': ['lambda', 'sns', 'sqs']},
1170 rinherit=ValueFilter.schema)
1171 schema_alias = False
1172 annotation_key = 'c7n:MatchedNotificationConfigurationIds'
1174 permissions = ('s3:GetBucketNotification',)
1176 FIELDS = {
1177 'lambda': 'LambdaFunctionConfigurations',
1178 'sns': 'TopicConfigurations',
1179 'sqs': 'QueueConfigurations'
1180 }
1182 def process(self, buckets, event=None):
1183 return super(BucketNotificationFilter, self).process(buckets, event)
1185 def __call__(self, bucket):
1187 field = self.FIELDS[self.data['kind']]
1188 found = False
1189 for config in bucket.get('Notification', {}).get(field, []):
1190 if self.match(config):
1191 set_annotation(
1192 bucket,
1193 BucketNotificationFilter.annotation_key,
1194 config['Id'])
1195 found = True
1196 return found
1199@filters.register('bucket-logging')
1200class BucketLoggingFilter(BucketFilterBase):
1201 """Filter based on bucket logging configuration.
1203 :example:
1205 .. code-block:: yaml
1207 policies:
1208 - name: add-bucket-logging-if-missing
1209 resource: s3
1210 filters:
1211 - type: bucket-logging
1212 op: disabled
1213 actions:
1214 - type: toggle-logging
1215 target_bucket: "{account_id}-{region}-s3-logs"
1216 target_prefix: "{source_bucket_name}/"
1218 policies:
1219 - name: update-incorrect-or-missing-logging
1220 resource: s3
1221 filters:
1222 - type: bucket-logging
1223 op: not-equal
1224 target_bucket: "{account_id}-{region}-s3-logs"
1225 target_prefix: "{account}/{source_bucket_name}/"
1226 actions:
1227 - type: toggle-logging
1228 target_bucket: "{account_id}-{region}-s3-logs"
1229 target_prefix: "{account}/{source_bucket_name}/"
1230 """
1232 schema = type_schema(
1233 'bucket-logging',
1234 op={'enum': ['enabled', 'disabled', 'equal', 'not-equal', 'eq', 'ne']},
1235 required=['op'],
1236 target_bucket={'type': 'string'},
1237 target_prefix={'type': 'string'})
1238 schema_alias = False
1239 account_name = None
1241 permissions = ("s3:GetBucketLogging", "iam:ListAccountAliases")
1243 def process(self, buckets, event=None):
1244 return list(filter(None, map(self.process_bucket, buckets)))
1246 def process_bucket(self, b):
1247 if self.match_bucket(b):
1248 return b
1250 def match_bucket(self, b):
1251 op = self.data.get('op')
1253 logging = b.get('Logging', {})
1254 if op == 'disabled':
1255 return logging == {}
1256 elif op == 'enabled':
1257 return logging != {}
1259 if self.account_name is None:
1260 session = local_session(self.manager.session_factory)
1261 self.account_name = get_account_alias_from_sts(session)
1263 variables = self.get_std_format_args(b)
1264 variables.update({
1265 'account': self.account_name,
1266 'source_bucket_name': b['Name'],
1267 'source_bucket_region': get_region(b),
1268 'target_bucket_name': self.data.get('target_bucket'),
1269 'target_prefix': self.data.get('target_prefix'),
1270 })
1271 data = format_string_values(self.data, **variables)
1272 target_bucket = data.get('target_bucket')
1273 target_prefix = data.get('target_prefix', b['Name'] + '/')
1275 target_config = {
1276 "TargetBucket": target_bucket,
1277 "TargetPrefix": target_prefix
1278 } if target_bucket else {}
1280 if op in ('not-equal', 'ne'):
1281 return logging != target_config
1282 else:
1283 return logging == target_config
1286@actions.register('delete-bucket-notification')
1287class DeleteBucketNotification(BucketActionBase):
1288 """Action to delete S3 bucket notification configurations"""
1290 schema = type_schema(
1291 'delete-bucket-notification',
1292 required=['statement_ids'],
1293 statement_ids={'oneOf': [
1294 {'enum': ['matched']},
1295 {'type': 'array', 'items': {'type': 'string'}}]})
1297 permissions = ('s3:PutBucketNotification',)
1299 def process_bucket(self, bucket):
1300 n = bucket['Notification']
1301 if not n:
1302 return
1304 statement_ids = self.data.get('statement_ids')
1305 if statement_ids == 'matched':
1306 statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ())
1307 if not statement_ids:
1308 return
1310 cfg = defaultdict(list)
1312 for t in BucketNotificationFilter.FIELDS.values():
1313 for c in n.get(t, []):
1314 if c['Id'] not in statement_ids:
1315 cfg[t].append(c)
1317 client = bucket_client(local_session(self.manager.session_factory), bucket)
1318 client.put_bucket_notification_configuration(
1319 Bucket=bucket['Name'],
1320 NotificationConfiguration=cfg)
1323@actions.register('no-op')
1324class NoOp(BucketActionBase):
1326 schema = type_schema('no-op')
1327 permissions = ('s3:ListAllMyBuckets',)
1329 def process(self, buckets):
1330 return None
1333@actions.register('set-statements')
1334class SetPolicyStatement(BucketActionBase):
1335 """Action to add or update policy statements to S3 buckets
1337 :example:
1339 .. code-block:: yaml
1341 policies:
1342 - name: force-s3-https
1343 resource: s3
1344 actions:
1345 - type: set-statements
1346 statements:
1347 - Sid: "DenyHttp"
1348 Effect: "Deny"
1349 Action: "s3:GetObject"
1350 Principal:
1351 AWS: "*"
1352 Resource: "arn:aws:s3:::{bucket_name}/*"
1353 Condition:
1354 Bool:
1355 "aws:SecureTransport": false
1356 """
1358 permissions = ('s3:PutBucketPolicy',)
1360 schema = type_schema(
1361 'set-statements',
1362 **{
1363 'statements': {
1364 'type': 'array',
1365 'items': {
1366 'type': 'object',
1367 'properties': {
1368 'Sid': {'type': 'string'},
1369 'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']},
1370 'Principal': {'anyOf': [{'type': 'string'},
1371 {'type': 'object'}, {'type': 'array'}]},
1372 'NotPrincipal': {'anyOf': [{'type': 'object'}, {'type': 'array'}]},
1373 'Action': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1374 'NotAction': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1375 'Resource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1376 'NotResource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1377 'Condition': {'type': 'object'}
1378 },
1379 'required': ['Sid', 'Effect'],
1380 'oneOf': [
1381 {'required': ['Principal', 'Action', 'Resource']},
1382 {'required': ['NotPrincipal', 'Action', 'Resource']},
1383 {'required': ['Principal', 'NotAction', 'Resource']},
1384 {'required': ['NotPrincipal', 'NotAction', 'Resource']},
1385 {'required': ['Principal', 'Action', 'NotResource']},
1386 {'required': ['NotPrincipal', 'Action', 'NotResource']},
1387 {'required': ['Principal', 'NotAction', 'NotResource']},
1388 {'required': ['NotPrincipal', 'NotAction', 'NotResource']}
1389 ]
1390 }
1391 }
1392 }
1393 )
1395 def process_bucket(self, bucket):
1396 policy = bucket.get('Policy') or '{}'
1398 target_statements = format_string_values(
1399 copy.deepcopy({s['Sid']: s for s in self.data.get('statements', [])}),
1400 **self.get_std_format_args(bucket))
1402 policy = json.loads(policy)
1403 bucket_statements = policy.setdefault('Statement', [])
1405 for s in bucket_statements:
1406 if s.get('Sid') not in target_statements:
1407 continue
1408 if s == target_statements[s['Sid']]:
1409 target_statements.pop(s['Sid'])
1411 if not target_statements:
1412 return
1414 bucket_statements.extend(target_statements.values())
1415 policy = json.dumps(policy)
1417 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1418 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=policy)
1419 return {'Name': bucket['Name'], 'Policy': policy}
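    # A short note on the merge above (sketch, not authoritative): configured
    # statements are keyed by Sid; an existing bucket statement that exactly
    # matches its configured counterpart is dropped from the work set, and any
    # remaining configured statements are appended before the policy is put back.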
1422@actions.register('remove-statements')
1423class RemovePolicyStatement(RemovePolicyBase):
1424 """Action to remove policy statements from S3 buckets
1426 :example:
1428 .. code-block:: yaml
1430 policies:
1431 - name: s3-remove-encrypt-put
1432 resource: s3
1433 filters:
1434 - type: has-statement
1435 statement_ids:
1436 - RequireEncryptedPutObject
1437 actions:
1438 - type: remove-statements
1439 statement_ids:
1440 - RequiredEncryptedPutObject
1441 """
1443 permissions = ("s3:PutBucketPolicy", "s3:DeleteBucketPolicy")
1445 def process(self, buckets):
1446 with self.executor_factory(max_workers=3) as w:
1447 futures = {}
1448 results = []
1449 for b in buckets:
1450 futures[w.submit(self.process_bucket, b)] = b
1451 for f in as_completed(futures):
1452 if f.exception():
1453 b = futures[f]
1454 self.log.error('error modifying bucket:%s\n%s',
1455 b['Name'], f.exception())
1456 results += filter(None, [f.result()])
1457 return results
1459 def process_bucket(self, bucket):
1460 p = bucket.get('Policy')
1461 if p is None:
1462 return
1464 p = json.loads(p)
1466 statements, found = self.process_policy(
1467 p, bucket, CrossAccountAccessFilter.annotation_key)
1469 if not found:
1470 return
1472 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1474 if not statements:
1475 s3.delete_bucket_policy(Bucket=bucket['Name'])
1476 else:
1477 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=json.dumps(p))
1478 return {'Name': bucket['Name'], 'State': 'PolicyRemoved', 'Statements': found}
1481@actions.register('set-replication')
1482class SetBucketReplicationConfig(BucketActionBase):
1483 """Action to add or remove replication configuration statement from S3 buckets
1485 :example:
1487 .. code-block:: yaml
1489 policies:
1490 - name: s3-unapproved-account-replication
1491 resource: s3
1492 filters:
1493 - type: value
1494 key: Replication.ReplicationConfiguration.Rules[].Destination.Account
1495 value: present
1496 - type: value
1497 key: Replication.ReplicationConfiguration.Rules[].Destination.Account
1498 value_from:
1499 url: 's3:///path/to/file.json'
1500 format: json
1501 expr: "approved_accounts.*"
1502 op: ni
1503 actions:
1504 - type: set-replication
1505 state: enable
1506 """
1507 schema = type_schema(
1508 'set-replication',
1509 state={'type': 'string', 'enum': ['enable', 'disable', 'remove']})
1510 permissions = ("s3:GetReplicationConfiguration", "s3:PutReplicationConfiguration")
1512 def process(self, buckets):
1513 with self.executor_factory(max_workers=3) as w:
1514 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
1515 errors = []
1516 for future in as_completed(futures):
1517 bucket = futures[future]
1518 try:
1519 future.result()
1520 except ClientError as e:
1521 errors.append("Message: %s Bucket: %s" % (e, bucket['Name']))
1522 if errors:
1523 raise Exception('\n'.join(map(str, errors)))
1525 def process_bucket(self, bucket):
1526 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1527 state = self.data.get('state')
1528 if state is not None:
1529 if state == 'remove':
1530 s3.delete_bucket_replication(Bucket=bucket['Name'])
1531 return {'Name': bucket['Name'], 'State': 'ReplicationConfigRemoved'}
1532 if state in ('enable', 'disable'):
1533 config = s3.get_bucket_replication(Bucket=bucket['Name'])
1534 for rule in config['ReplicationConfiguration']['Rules']:
1535 rule['Status'] = 'Enabled' if state == 'enable' else 'Disabled'
1536 s3.put_bucket_replication(
1537 Bucket=bucket['Name'],
1538 ReplicationConfiguration=config['ReplicationConfiguration']
1539 )
1540 return {'Name': bucket['Name'], 'State': 'ReplicationConfigUpdated'}
1543@filters.register('check-public-block')
1544class FilterPublicBlock(Filter):
1545 """Filter for s3 bucket public blocks
1547 If no filter parameters are provided, it checks to see if any are unset or False.
1549 If parameters are provided only the provided ones are checked.
1551 :example:
1553 .. code-block:: yaml
1555 policies:
1556 - name: CheckForPublicAclBlock-Off
1557 resource: s3
1558 region: us-east-1
1559 filters:
1560 - type: check-public-block
1561 BlockPublicAcls: true
1562 BlockPublicPolicy: true
1563 """
1565 schema = type_schema(
1566 'check-public-block',
1567 BlockPublicAcls={'type': 'boolean'},
1568 IgnorePublicAcls={'type': 'boolean'},
1569 BlockPublicPolicy={'type': 'boolean'},
1570 RestrictPublicBuckets={'type': 'boolean'})
1571 permissions = ("s3:GetBucketPublicAccessBlock",)
1572 keys = (
1573 'BlockPublicPolicy', 'BlockPublicAcls', 'IgnorePublicAcls', 'RestrictPublicBuckets')
1574 annotation_key = 'c7n:PublicAccessBlock'
1576 def process(self, buckets, event=None):
1577 results = []
1578 with self.executor_factory(max_workers=2) as w:
1579 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
1580 for f in as_completed(futures):
1581 if f.result():
1582 results.append(futures[f])
1583 return results
1585 def process_bucket(self, bucket):
1586 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1587 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
1588 if self.annotation_key not in bucket:
1589 try:
1590 config = s3.get_public_access_block(
1591 Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
1592 except ClientError as e:
1593 error_code = e.response['Error']['Code']
1594 if error_code == 'NoSuchPublicAccessBlockConfiguration':
1595 pass
1596 elif error_code == 'AccessDenied':
1597 # Follow the same logic as `BucketAssembly.assemble` - log and continue on access
1598 # denied errors rather than halting a policy altogether
1599 method = 'GetPublicAccessBlock'
1600 log.warning(
1601 "Bucket:%s unable to invoke method:%s error:%s ",
1602 bucket['Name'], method, e.response['Error']['Message']
1603 )
1604 bucket.setdefault('c7n:DeniedMethods', []).append(method)
1605 else:
1606 raise
1607 bucket[self.annotation_key] = config
1608 return self.matches_filter(config)
1610 def matches_filter(self, config):
1611 key_set = [key for key in self.keys if key in self.data]
1612 if key_set:
1613 return all([self.data.get(key) is config[key] for key in key_set])
1614 else:
1615 return not all(config.values())
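    # Illustrative semantics of matches_filter above (example values only):
    #   data {'BlockPublicAcls': True}  -> bucket matches only when BlockPublicAcls is True
    #   no keys supplied                -> bucket matches when any of the four settings is False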
1618@actions.register('set-public-block')
1619class SetPublicBlock(BucketActionBase):
1620 """Action to update Public Access blocks on S3 buckets
1622 If no action parameters are provided, all settings will be set to the `state`, which defaults to `True`.
1624 If action parameters are provided, those will be set and other extant values preserved.
1626 :example:
1628 .. code-block:: yaml
1630 policies:
1631 - name: s3-public-block-enable-all
1632 resource: s3
1633 filters:
1634 - type: check-public-block
1635 actions:
1636 - type: set-public-block
1638 policies:
1639 - name: s3-public-block-disable-all
1640 resource: s3
1641 filters:
1642 - type: check-public-block
1643 actions:
1644 - type: set-public-block
1645 state: false
1647 policies:
1648 - name: s3-public-block-enable-some
1649 resource: s3
1650 filters:
1651 - or:
1652 - type: check-public-block
1653 BlockPublicAcls: false
1654 - type: check-public-block
1655 BlockPublicPolicy: false
1656 actions:
1657 - type: set-public-block
1658 BlockPublicAcls: true
1659 BlockPublicPolicy: true
1661 """
1663 schema = type_schema(
1664 'set-public-block',
1665 state={'type': 'boolean', 'default': True},
1666 BlockPublicAcls={'type': 'boolean'},
1667 IgnorePublicAcls={'type': 'boolean'},
1668 BlockPublicPolicy={'type': 'boolean'},
1669 RestrictPublicBuckets={'type': 'boolean'})
1670 permissions = ("s3:GetBucketPublicAccessBlock", "s3:PutBucketPublicAccessBlock")
1671 keys = FilterPublicBlock.keys
1672 annotation_key = FilterPublicBlock.annotation_key
1674 def process(self, buckets):
1675 with self.executor_factory(max_workers=3) as w:
1676 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
1677 for future in as_completed(futures):
1678 future.result()
1680 def process_bucket(self, bucket):
1681 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1682 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
1683 if self.annotation_key not in bucket:
1684 try:
1685 config = s3.get_public_access_block(
1686 Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
1687 except ClientError as e:
1688 if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration':
1689 raise
1691 key_set = [key for key in self.keys if key in self.data]
1692 if key_set:
1693 for key in key_set:
1694 config[key] = self.data.get(key)
1695 else:
1696 for key in self.keys:
1697 config[key] = self.data.get('state', True)
1698 s3.put_public_access_block(
1699 Bucket=bucket['Name'], PublicAccessBlockConfiguration=config)
1702@actions.register('toggle-versioning')
1703class ToggleVersioning(BucketActionBase):
1704 """Action to enable/suspend versioning on a S3 bucket
1706 Note versioning can never be disabled only suspended.
1708 :example:
1710 .. code-block:: yaml
1712 policies:
1713 - name: s3-enable-versioning
1714 resource: s3
1715 filters:
1716 - or:
1717 - type: value
1718 key: Versioning.Status
1719 value: Suspended
1720 - type: value
1721 key: Versioning.Status
1722 value: absent
1723 actions:
1724 - type: toggle-versioning
1725 enabled: true
1726 """
1728 schema = type_schema(
1729 'toggle-versioning',
1730 enabled={'type': 'boolean'})
1731 permissions = ("s3:PutBucketVersioning",)
1733 def process_versioning(self, resource, state):
1734 client = bucket_client(
1735 local_session(self.manager.session_factory), resource)
1736 try:
1737 client.put_bucket_versioning(
1738 Bucket=resource['Name'],
1739 VersioningConfiguration={
1740 'Status': state})
1741 except ClientError as e:
1742 if e.response['Error']['Code'] != 'AccessDenied':
1743 log.error(
1744 "Unable to put bucket versioning on bucket %s: %s" % (resource['Name'], e))
1745 raise
1746 log.warning(
1747 "Access Denied Bucket:%s while put bucket versioning" % resource['Name'])
1749 # mfa delete enablement looks like it needs the serial and a current token.
1750 def process(self, resources):
1751 enabled = self.data.get('enabled', True)
1752 for r in resources:
1753 if 'Versioning' not in r or not r['Versioning']:
1754 r['Versioning'] = {'Status': 'Suspended'}
1755 if enabled and (
1756 r['Versioning']['Status'] == 'Suspended'):
1757 self.process_versioning(r, 'Enabled')
1758 if not enabled and r['Versioning']['Status'] == 'Enabled':
1759 self.process_versioning(r, 'Suspended')
1762@actions.register('toggle-logging')
1763class ToggleLogging(BucketActionBase):
1764 """Action to enable/disable logging on a S3 bucket.
1766 Target bucket ACL must allow for WRITE and READ_ACP Permissions
1767 Not specifying a target_prefix will default to the current bucket name.
1768 https://docs.aws.amazon.com/AmazonS3/latest/dev/enable-logging-programming.html
1770 :example:
1772 .. code-block:: yaml
1774 policies:
1775 - name: s3-enable-logging
1776 resource: s3
1777 filters:
1778 - "tag:Testing": present
1779 actions:
1780 - type: toggle-logging
1781 target_bucket: log-bucket
1782 target_prefix: logs123/
1784 policies:
1785 - name: s3-force-standard-logging
1786 resource: s3
1787 filters:
1788 - type: bucket-logging
1789 op: not-equal
1790 target_bucket: "{account_id}-{region}-s3-logs"
1791 target_prefix: "{account}/{source_bucket_name}/"
1792 actions:
1793 - type: toggle-logging
1794 target_bucket: "{account_id}-{region}-s3-logs"
1795 target_prefix: "{account}/{source_bucket_name}/"
1796 """
1797 schema = type_schema(
1798 'toggle-logging',
1799 enabled={'type': 'boolean'},
1800 target_bucket={'type': 'string'},
1801 target_prefix={'type': 'string'})
1803 permissions = ("s3:PutBucketLogging", "iam:ListAccountAliases")
1805 def validate(self):
1806 if self.data.get('enabled', True):
1807 if not self.data.get('target_bucket'):
1808 raise PolicyValidationError(
1809 "target_bucket must be specified on %s" % (
1810 self.manager.data,))
1811 return self
1813 def process(self, resources):
1814 session = local_session(self.manager.session_factory)
1815 kwargs = {
1816 "enabled": self.data.get('enabled', True),
1817 "session": session,
1818 "account_name": get_account_alias_from_sts(session),
1819 }
1821 return self._process_with_futures(resources, **kwargs)
1823 def process_bucket(self, r, enabled=None, session=None, account_name=None):
1824 client = bucket_client(session, r)
1825 is_logging = bool(r.get('Logging'))
1827 if enabled:
1828 variables = self.get_std_format_args(r)
1829 variables.update({
1830 'account': account_name,
1831 'source_bucket_name': r['Name'],
1832 'source_bucket_region': get_region(r),
1833 'target_bucket_name': self.data.get('target_bucket'),
1834 'target_prefix': self.data.get('target_prefix'),
1835 })
1836 data = format_string_values(self.data, **variables)
1837 config = {
1838 'TargetBucket': data.get('target_bucket'),
1839 'TargetPrefix': data.get('target_prefix', r['Name'] + '/')
1840 }
1841 if not is_logging or r.get('Logging') != config:
1842 client.put_bucket_logging(
1843 Bucket=r['Name'],
1844 BucketLoggingStatus={'LoggingEnabled': config}
1845 )
1846 r['Logging'] = config
1848 elif not enabled and is_logging:
1849 client.put_bucket_logging(
1850 Bucket=r['Name'], BucketLoggingStatus={})
1851 r['Logging'] = {}
1854@actions.register('attach-encrypt')
1855class AttachLambdaEncrypt(BucketActionBase):
1856 """Action attaches lambda encryption policy to S3 bucket
1857 supports attachment via lambda bucket notification or sns notification
1858 to invoke lambda. a special topic value of `default` will utilize an
1859 extant notification or create one matching the bucket name.
1861 :example:
1864 .. code-block:: yaml
1867 policies:
1868 - name: attach-lambda-encrypt
1869 resource: s3
1870 filters:
1871 - type: missing-policy-statement
1872 actions:
1873 - type: attach-encrypt
1874 role: arn:aws:iam::123456789012:role/my-role
1876 """
1877 schema = type_schema(
1878 'attach-encrypt',
1879 role={'type': 'string'},
1880 tags={'type': 'object'},
1881 topic={'type': 'string'})
1883 permissions = (
1884 "s3:PutBucketNotification", "s3:GetBucketNotification",
1885 # lambda manager uses quite a few perms to provision lambdas
1886 # and event sources; hard to disambiguate, punt for now.
1887 "lambda:*",
1888 )
1890 def __init__(self, data=None, manager=None):
1891 self.data = data or {}
1892 self.manager = manager
1894 def validate(self):
1895 if (not getattr(self.manager.config, 'dryrun', True) and
1896 not self.data.get('role', self.manager.config.assume_role)):
1897 raise PolicyValidationError(
1898 "attach-encrypt: role must be specified either "
1899 "via assume or in config on %s" % (self.manager.data,))
1901 return self
1903 def process(self, buckets):
1904 from c7n.mu import LambdaManager
1905 from c7n.ufuncs.s3crypt import get_function
1907 account_id = self.manager.config.account_id
1908 topic_arn = self.data.get('topic')
1910 func = get_function(
1911 None, self.data.get('role', self.manager.config.assume_role),
1912 account_id=account_id, tags=self.data.get('tags'))
1914 regions = {get_region(b) for b in buckets}
1916 # session factories by region
1917 region_sessions = {}
1918 for r in regions:
1919 region_sessions[r] = functools.partial(
1920 self.manager.session_factory, region=r)
1922 # Publish the function to all of our buckets' regions
1923 region_funcs = {}
1925 for r in regions:
1926 lambda_mgr = LambdaManager(region_sessions[r])
1927 lambda_mgr.publish(func)
1928 region_funcs[r] = func
1930 with self.executor_factory(max_workers=3) as w:
1931 results = []
1932 futures = []
1933 for b in buckets:
1934 region = get_region(b)
1935 futures.append(
1936 w.submit(
1937 self.process_bucket,
1938 region_funcs[region],
1939 b,
1940 topic_arn,
1941 account_id,
1942 region_sessions[region]
1943 ))
1944 for f in as_completed(futures):
1945 if f.exception():
1946 log.exception(
1947 "Error attaching lambda-encrypt %s" % (f.exception()))
1948 results.append(f.result())
1949 return list(filter(None, results))
1951 def process_bucket(self, func, bucket, topic, account_id, session_factory):
1952 from c7n.mu import BucketSNSNotification, BucketLambdaNotification
1953 if topic:
1954 topic = None if topic == 'default' else topic
1955 source = BucketSNSNotification(session_factory, bucket, topic)
1956 else:
1957 source = BucketLambdaNotification(
1958 {'account_s3': account_id}, session_factory, bucket)
1959 return source.add(func, None)
1962@actions.register('encryption-policy')
1963class EncryptionRequiredPolicy(BucketActionBase):
1964 """Action to apply an encryption policy to S3 buckets
1967 :example:
1969 .. code-block:: yaml
1971 policies:
1972 - name: s3-enforce-encryption
1973 resource: s3
1974 mode:
1975 type: cloudtrail
1976 events:
1977 - CreateBucket
1978 actions:
1979 - encryption-policy
1980 """
1982 permissions = ("s3:GetBucketPolicy", "s3:PutBucketPolicy")
1983 schema = type_schema('encryption-policy')
1985 def __init__(self, data=None, manager=None):
1986 self.data = data or {}
1987 self.manager = manager
1989 def process(self, buckets):
1990 with self.executor_factory(max_workers=3) as w:
1991 results = w.map(self.process_bucket, buckets)
1992 results = list(filter(None, list(results)))
1993 return results
1995 def process_bucket(self, b):
1996 p = b['Policy']
1997 if p is None:
1998 log.info("No policy found, creating new")
1999 p = {'Version': "2012-10-17", "Statement": []}
2000 else:
2001 p = json.loads(p)
2003 encryption_sid = "RequiredEncryptedPutObject"
2004 encryption_statement = {
2005 'Sid': encryption_sid,
2006 'Effect': 'Deny',
2007 'Principal': '*',
2008 'Action': 's3:PutObject',
2009 "Resource": "arn:aws:s3:::%s/*" % b['Name'],
2010 "Condition": {
2011 # AWS Managed Keys or KMS keys, note policy language
2012 # does not support custom kms (todo add issue)
2013 "StringNotEquals": {
2014 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}
2016 statements = p.get('Statement', [])
2017 for s in list(statements):
2018 if s.get('Sid', '') == encryption_sid:
2019 log.debug("Bucket:%s Found extant encrypt policy", b['Name'])
2020 if s != encryption_statement:
2021 log.info(
2022 "Bucket:%s updating extant encrypt policy", b['Name'])
2023 statements.remove(s)
2024 else:
2025 return
2027 session = self.manager.session_factory()
2028 s3 = bucket_client(session, b)
2029 statements.append(encryption_statement)
2030 p['Statement'] = statements
2031 log.info('Bucket:%s attached encryption policy' % b['Name'])
2033 try:
2034 s3.put_bucket_policy(
2035 Bucket=b['Name'],
2036 Policy=json.dumps(p))
2037 except ClientError as e:
2038 if e.response['Error']['Code'] == 'NoSuchBucket':
2039 return
2040 self.log.exception(
2041 "Error on bucket:%s putting policy\n%s error:%s",
2042 b['Name'],
2043 json.dumps(statements, indent=2), e)
2044 raise
2045 return {'Name': b['Name'], 'State': 'PolicyAttached'}
2048class BucketScanLog:
2049 """Offload remediated key ids to a disk file in batches
2051 A bucket keyspace is effectively infinite, we need to store partial
2052 results out of memory, this class provides for a json log on disk
2053 with partial write support.
2055 json output format:
2056 - [list_of_serialized_keys],
2057 - [] # Empty list of keys at end when we close the buffer
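 For example, a completed log for two batches might contain (illustrative key names):
     [
     ["key-1", "key-2"],
     ["key-3"],
     []
     ]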
2059 """
2061 def __init__(self, log_dir, name):
2062 self.log_dir = log_dir
2063 self.name = name
2064 self.fh = None
2065 self.count = 0
2067 @property
2068 def path(self):
2069 return os.path.join(self.log_dir, "%s.json" % self.name)
2071 def __enter__(self):
2072 # Don't require output directories
2073 if self.log_dir is None:
2074 return
2076 self.fh = open(self.path, 'w')
2077 self.fh.write("[\n")
2078 return self
2080 def __exit__(self, exc_type=None, exc_value=None, exc_frame=None):
2081 if self.fh is None:
2082 return
2083 # we need an empty marker list at end to avoid trailing commas
2084 self.fh.write("[]")
2085 # and close the surrounding list
2086 self.fh.write("\n]")
2087 self.fh.close()
2088 if not self.count:
2089 os.remove(self.fh.name)
2090 self.fh = None
2091 return False
2093 def add(self, keys):
2094 self.count += len(keys)
2095 if self.fh is None:
2096 return
2097 self.fh.write(dumps(keys))
2098 self.fh.write(",\n")
2101class ScanBucket(BucketActionBase):
2103 permissions = ("s3:ListBucket",)
2105 bucket_ops = {
2106 'standard': {
2107 'iterator': 'list_objects',
2108 'contents_key': ['Contents'],
2109 'key_processor': 'process_key'
2110 },
2111 'versioned': {
2112 'iterator': 'list_object_versions',
2113 'contents_key': ['Versions'],
2114 'key_processor': 'process_version'
2115 }
2116 }
2118 def __init__(self, data, manager=None):
2119 super(ScanBucket, self).__init__(data, manager)
2120 self.denied_buckets = set()
2122 def get_bucket_style(self, b):
2123 return (
2124 b.get('Versioning', {'Status': ''}).get('Status') in (
2125 'Enabled', 'Suspended') and 'versioned' or 'standard')
2127 def get_bucket_op(self, b, op_name):
2128 bucket_style = self.get_bucket_style(b)
2129 op = self.bucket_ops[bucket_style][op_name]
2130 if op_name == 'key_processor':
2131 return getattr(self, op)
2132 return op
2134 def get_keys(self, b, key_set):
2135 content_keys = self.get_bucket_op(b, 'contents_key')
2136 keys = []
2137 for ck in content_keys:
2138 keys.extend(key_set.get(ck, []))
2139 return keys
2141 def process(self, buckets):
2142 results = self._process_with_futures(self.process_bucket, buckets)
2143 self.write_denied_buckets_file()
2144 return results
2146 def _process_with_futures(self, helper, buckets, max_workers=3):
2147 results = []
2148 with self.executor_factory(max_workers) as w:
2149 futures = {}
2150 for b in buckets:
2151 futures[w.submit(helper, b)] = b
2152 for f in as_completed(futures):
2153 if f.exception():
2154 b = futures[f]
2155 self.log.error(
2156 "Error on bucket:%s region:%s policy:%s error: %s",
2157 b['Name'], b.get('Location', 'unknown'),
2158 self.manager.data.get('name'), f.exception())
2159 self.denied_buckets.add(b['Name'])
2160 continue
2161 result = f.result()
2162 if result:
2163 results.append(result)
2164 return results
2166 def write_denied_buckets_file(self):
2167 if (self.denied_buckets and
2168 self.manager.ctx.log_dir and
2169 not isinstance(self.manager.ctx.output, NullBlobOutput)):
2170 with open(
2171 os.path.join(
2172 self.manager.ctx.log_dir, 'denied.json'), 'w') as fh:
2173 json.dump(list(self.denied_buckets), fh, indent=2)
2174 self.denied_buckets = set()
2176 def process_bucket(self, b):
2177 log.info(
2178 "Scanning bucket:%s visitor:%s style:%s" % (
2179 b['Name'], self.__class__.__name__, self.get_bucket_style(b)))
2181 s = self.manager.session_factory()
2182 s3 = bucket_client(s, b)
2184 # The bulk of the _process_bucket function executes inline in the
2185 # calling thread/worker context; neither the paginator nor the
2186 # bucket scan log should be used across a worker boundary.
2187 p = s3.get_paginator(
2188 self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name'])
2190 with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log:
2191 with self.executor_factory(max_workers=10) as w:
2192 try:
2193 return self._process_bucket(b, p, key_log, w)
2194 except ClientError as e:
2195 if e.response['Error']['Code'] == 'NoSuchBucket':
2196 log.warning(
2197 "Bucket:%s removed while scanning" % b['Name'])
2198 return
2199 if e.response['Error']['Code'] == 'AccessDenied':
2200 log.warning(
2201 "Access Denied Bucket:%s while scanning" % b['Name'])
2202 self.denied_buckets.add(b['Name'])
2203 return
2204 log.exception(
2205 "Error processing bucket:%s paginator:%s" % (
2206 b['Name'], p))
2208 __call__ = process_bucket
2210 def _process_bucket(self, b, p, key_log, w):
2211 count = 0
2213 for key_set in p:
2214 keys = self.get_keys(b, key_set)
2215 count += len(keys)
2216 futures = []
2218 for batch in chunks(keys, size=100):
2219 if not batch:
2220 continue
2221 futures.append(w.submit(self.process_chunk, batch, b))
2223 for f in as_completed(futures):
2224 if f.exception():
2225 log.exception("Exception Processing bucket:%s key batch %s" % (
2226 b['Name'], f.exception()))
2227 continue
2228 r = f.result()
2229 if r:
2230 key_log.add(r)
2232 # Log completion at info level, progress at debug level
2233 if key_set['IsTruncated']:
2234 log.debug('Scan progress bucket:%s keys:%d remediated:%d ...',
2235 b['Name'], count, key_log.count)
2236 else:
2237 log.info('Scan Complete bucket:%s keys:%d remediated:%d',
2238 b['Name'], count, key_log.count)
2240 b['KeyScanCount'] = count
2241 b['KeyRemediated'] = key_log.count
2242 return {
2243 'Bucket': b['Name'], 'Remediated': key_log.count, 'Count': count}
2245 def process_chunk(self, batch, bucket):
2246 raise NotImplementedError()
2248 def process_key(self, s3, key, bucket_name, info=None):
2249 raise NotImplementedError()
2251 def process_version(self, s3, bucket, key):
2252 raise NotImplementedError()
2255@actions.register('encrypt-keys')
2256class EncryptExtantKeys(ScanBucket):
2257 """Action to encrypt unencrypted S3 objects
2259 :example:
2261 .. code-block:: yaml
2263 policies:
2264 - name: s3-encrypt-objects
2265 resource: s3
2266 actions:
2267 - type: encrypt-keys
2268 crypto: aws:kms
2269 key-id: 9c3983be-c6cf-11e6-9d9d-cec0c932ce01
2270 """
2272 permissions = (
2273 "s3:GetObject",
2274 "s3:PutObject",
2275 "s3:DeleteObjectVersion",
2276 "s3:RestoreObject",
2277 ) + ScanBucket.permissions
2279 schema = {
2280 'type': 'object',
2281 'additionalProperties': False,
2282 'properties': {
2283 'type': {'enum': ['encrypt-keys']},
2284 'report-only': {'type': 'boolean'},
2285 'glacier': {'type': 'boolean'},
2286 'large': {'type': 'boolean'},
2287 'crypto': {'enum': ['AES256', 'aws:kms']},
2288 'key-id': {'type': 'string'}
2289 },
2290 'dependencies': {
2291 'key-id': {
2292 'properties': {
2293 'crypto': {'pattern': 'aws:kms'}
2294 },
2295 'required': ['crypto']
2296 }
2297 }
2298 }
2300 metrics = [
2301 ('Total Keys', {'Scope': 'Account'}),
2302 ('Unencrypted', {'Scope': 'Account'})]
2304 def __init__(self, data, manager=None):
2305 super(EncryptExtantKeys, self).__init__(data, manager)
2306 self.kms_id = self.data.get('key-id')
2308 def get_permissions(self):
2309 perms = ("s3:GetObject", "s3:GetObjectVersion")
2310 if self.data.get('report-only'):
2311 perms += ('s3:DeleteObject', 's3:DeleteObjectVersion',
2312 's3:PutObject',
2313 's3:AbortMultipartUpload',
2314 's3:ListBucket',
2315 's3:ListBucketVersions')
2316 return perms
2318 def process(self, buckets):
2320 t = time.time()
2321 results = super(EncryptExtantKeys, self).process(buckets)
2322 run_time = time.time() - t
2323 remediated_count = object_count = 0
2325 for r in results:
2326 object_count += r['Count']
2327 remediated_count += r['Remediated']
2328 self.manager.ctx.metrics.put_metric(
2329 "Unencrypted", r['Remediated'], "Count", Scope=r['Bucket'],
2330 buffer=True)
2332 self.manager.ctx.metrics.put_metric(
2333 "Unencrypted", remediated_count, "Count", Scope="Account",
2334 buffer=True
2335 )
2336 self.manager.ctx.metrics.put_metric(
2337 "Total Keys", object_count, "Count", Scope="Account",
2338 buffer=True
2339 )
2340 self.manager.ctx.metrics.flush()
2342 log.info(
2343 ("EncryptExtant Complete keys:%d "
2344 "remediated:%d rate:%0.2f/s time:%0.2fs"),
2345 object_count,
2346 remediated_count,
2347 float(object_count) / run_time if run_time else 0,
2348 run_time)
2349 return results
2351 def process_chunk(self, batch, bucket):
2352 crypto_method = self.data.get('crypto', 'AES256')
2353 s3 = bucket_client(
2354 local_session(self.manager.session_factory), bucket,
2355 kms=(crypto_method == 'aws:kms'))
2356 b = bucket['Name']
2357 results = []
2358 key_processor = self.get_bucket_op(bucket, 'key_processor')
2359 for key in batch:
2360 r = key_processor(s3, key, b)
2361 if r:
2362 results.append(r)
2363 return results
2365 def process_key(self, s3, key, bucket_name, info=None):
2366 k = key['Key']
2367 if info is None:
2368 info = s3.head_object(Bucket=bucket_name, Key=k)
2370 # If the data is already encrypted with AES256 and this request is also
2371 # for AES256 then we don't need to do anything
2372 if info.get('ServerSideEncryption') == 'AES256' and not self.kms_id:
2373 return False
2375 if info.get('ServerSideEncryption') == 'aws:kms':
2376 # If we're not looking for a specific key any key will do.
2377 if not self.kms_id:
2378 return False
2379 # If we're configured to use a specific key and the key matches
2380 # note this is not a strict equality match.
2381 if self.kms_id in info.get('SSEKMSKeyId', ''):
2382 return False
2384 if self.data.get('report-only'):
2385 return k
2387 storage_class = info.get('StorageClass', 'STANDARD')
2389 if storage_class == 'GLACIER':
2390 if not self.data.get('glacier'):
2391 return False
2392 if 'Restore' not in info:
2393 # This takes multiple hours; we let the next c7n
2394 # run take care of follow-ups.
2395 s3.restore_object(
2396 Bucket=bucket_name,
2397 Key=k,
2398 RestoreRequest={'Days': 30})
2399 return False
2400 elif not restore_complete(info['Restore']):
2401 return False
2403 storage_class = 'STANDARD'
2405 crypto_method = self.data.get('crypto', 'AES256')
2406 key_id = self.data.get('key-id')
2407 # Note: on copy we lose individual object ACL grants
2408 params = {'Bucket': bucket_name,
2409 'Key': k,
2410 'CopySource': "/%s/%s" % (bucket_name, k),
2411 'MetadataDirective': 'COPY',
2412 'StorageClass': storage_class,
2413 'ServerSideEncryption': crypto_method}
2415 if key_id and crypto_method == 'aws:kms':
2416 params['SSEKMSKeyId'] = key_id
2418 if info['ContentLength'] > MAX_COPY_SIZE and self.data.get(
2419 'large', True):
2420 return self.process_large_file(s3, bucket_name, key, info, params)
2422 s3.copy_object(**params)
2423 return k
2425 def process_version(self, s3, key, bucket_name):
2426 info = s3.head_object(
2427 Bucket=bucket_name,
2428 Key=key['Key'],
2429 VersionId=key['VersionId'])
2431 if 'ServerSideEncryption' in info:
2432 return False
2434 if self.data.get('report-only'):
2435 return key['Key'], key['VersionId']
2437 if key['IsLatest']:
2438 r = self.process_key(s3, key, bucket_name, info)
2439 # Glacier request processing, wait till we have the restored object
2440 if not r:
2441 return r
2442 s3.delete_object(
2443 Bucket=bucket_name,
2444 Key=key['Key'],
2445 VersionId=key['VersionId'])
2446 return key['Key'], key['VersionId']
2448 def process_large_file(self, s3, bucket_name, key, info, params):
2449 """For objects over 5gb, use multipart upload to copy"""
2450 part_size = MAX_COPY_SIZE - (1024 ** 2)
2451 num_parts = int(math.ceil(info['ContentLength'] / part_size))
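 # copy_object can copy at most 5 GB in a single request; above MAX_COPY_SIZE
 # (2 GB here) we copy in ranged parts of just under 2 GB each, e.g. an
 # (illustrative) 5 GiB object yields num_parts == 3.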
2452 source = params.pop('CopySource')
2454 params.pop('MetadataDirective')
2455 if 'Metadata' in info:
2456 params['Metadata'] = info['Metadata']
2458 upload_id = s3.create_multipart_upload(**params)['UploadId']
2460 params = {'Bucket': bucket_name,
2461 'Key': key['Key'],
2462 'UploadId': upload_id,
2463 'CopySource': source,
2464 'CopySourceIfMatch': info['ETag']}
2466 def upload_part(part_num):
2467 part_params = dict(params)
2468 part_params['CopySourceRange'] = "bytes=%d-%d" % (
2469 part_size * (part_num - 1),
2470 min(part_size * part_num - 1, info['ContentLength'] - 1))
2471 part_params['PartNumber'] = part_num
2472 response = s3.upload_part_copy(**part_params)
2473 return {'ETag': response['CopyPartResult']['ETag'],
2474 'PartNumber': part_num}
2476 try:
2477 with self.executor_factory(max_workers=2) as w:
2478 parts = list(w.map(upload_part, range(1, num_parts + 1)))
2479 except Exception:
2480 log.warning(
2481 "Error during large key copy bucket: %s key: %s, "
2482 "aborting upload", bucket_name, key, exc_info=True)
2483 s3.abort_multipart_upload(
2484 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id)
2485 raise
2486 s3.complete_multipart_upload(
2487 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id,
2488 MultipartUpload={'Parts': parts})
2489 return key['Key']
2492def restore_complete(restore):
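 # `restore` is the object's Restore header value, e.g. (illustrative):
 #   'ongoing-request="false", expiry-date="Fri, 21 Dec 2030 00:00:00 GMT"'
 # Restoration is complete once ongoing-request reads "false".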
2493 if ',' in restore:
2494 ongoing, _ = restore.split(',', 1)
2495 else:
2496 ongoing = restore
2497 return 'false' in ongoing
2500@filters.register('is-log-target')
2501class LogTarget(Filter):
2502 """Filter and return buckets are log destinations.
2504 Not suitable for use in lambda on large accounts, This is a api
2505 heavy process to detect scan all possible log sources.
2507 Sources:
2508 - elb (Access Log)
2509 - s3 (Access Log)
2510 - cfn (Template writes)
2511 - cloudtrail
2513 :example:
2515 .. code-block:: yaml
2517 policies:
2518 - name: s3-log-bucket
2519 resource: s3
2520 filters:
2521 - type: is-log-target
2522 """
2524 schema = type_schema(
2525 'is-log-target',
2526 services={'type': 'array', 'items': {'enum': [
2527 's3', 'elb', 'cloudtrail']}},
2528 self={'type': 'boolean'},
2529 value={'type': 'boolean'})
2531 def get_permissions(self):
2532 perms = self.manager.get_resource_manager('elb').get_permissions()
2533 perms += ('elasticloadbalancing:DescribeLoadBalancerAttributes',)
2534 return perms
2536 def process(self, buckets, event=None):
2537 log_buckets = set()
2538 count = 0
2540 services = self.data.get('services', ['elb', 's3', 'cloudtrail'])
2541 self_log = self.data.get('self', False)
2543 if 'elb' in services and not self_log:
2544 for bucket, _ in self.get_elb_bucket_locations():
2545 log_buckets.add(bucket)
2546 count += 1
2547 self.log.debug("Found %d elb log targets" % count)
2549 if 's3' in services:
2550 count = 0
2551 for bucket, _ in self.get_s3_bucket_locations(buckets, self_log):
2552 count += 1
2553 log_buckets.add(bucket)
2554 self.log.debug('Found %d s3 log targets' % count)
2556 if 'cloudtrail' in services and not self_log:
2557 for bucket, _ in self.get_cloud_trail_locations(buckets):
2558 log_buckets.add(bucket)
2560 self.log.info("Found %d log targets for %d buckets" % (
2561 len(log_buckets), len(buckets)))
2562 if self.data.get('value', True):
2563 return [b for b in buckets if b['Name'] in log_buckets]
2564 else:
2565 return [b for b in buckets if b['Name'] not in log_buckets]
2567 @staticmethod
2568 def get_s3_bucket_locations(buckets, self_log=False):
2569 """return (bucket_name, prefix) for all s3 logging targets"""
2570 for b in buckets:
2571 if b.get('Logging'):
2572 if self_log:
2573 if b['Name'] != b['Logging']['TargetBucket']:
2574 continue
2575 yield (b['Logging']['TargetBucket'],
2576 b['Logging']['TargetPrefix'])
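 # CloudFormation writes packaged templates into cf-templates-* buckets,
 # so treat those as log/artifact targets too (the "cfn" source in the class docstring).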
2577 if not self_log and b['Name'].startswith('cf-templates-'):
2578 yield (b['Name'], '')
2580 def get_cloud_trail_locations(self, buckets):
2581 session = local_session(self.manager.session_factory)
2582 client = session.client('cloudtrail')
2583 names = {b['Name'] for b in buckets}
2584 for t in client.describe_trails().get('trailList', ()):
2585 if t.get('S3BucketName') in names:
2586 yield (t['S3BucketName'], t.get('S3KeyPrefix', ''))
2588 def get_elb_bucket_locations(self):
2589 elbs = self.manager.get_resource_manager('elb').resources()
2590 get_elb_attrs = functools.partial(
2591 _query_elb_attrs, self.manager.session_factory)
2593 with self.executor_factory(max_workers=2) as w:
2594 futures = []
2595 for elb_set in chunks(elbs, 100):
2596 futures.append(w.submit(get_elb_attrs, elb_set))
2597 for f in as_completed(futures):
2598 if f.exception():
2599 log.error("Error while scanning elb log targets: %s" % (
2600 f.exception()))
2601 continue
2602 for tgt in f.result():
2603 yield tgt
2606def _query_elb_attrs(session_factory, elb_set):
2607 session = local_session(session_factory)
2608 client = session.client('elb')
2609 log_targets = []
2610 for e in elb_set:
2611 try:
2612 attrs = client.describe_load_balancer_attributes(
2613 LoadBalancerName=e['LoadBalancerName'])[
2614 'LoadBalancerAttributes']
2615 if 'AccessLog' in attrs and attrs['AccessLog']['Enabled']:
2616 log_targets.append((
2617 attrs['AccessLog']['S3BucketName'],
2618 attrs['AccessLog']['S3BucketPrefix']))
2619 except Exception as err:
2620 log.warning(
2621 "Could not retrieve load balancer %s: %s" % (
2622 e['LoadBalancerName'], err))
2623 return log_targets
2626@actions.register('remove-website-hosting')
2627class RemoveWebsiteHosting(BucketActionBase):
2628 """Action that removes website hosting configuration."""
2630 schema = type_schema('remove-website-hosting')
2632 permissions = ('s3:DeleteBucketWebsite',)
2634 def process(self, buckets):
2635 session = local_session(self.manager.session_factory)
2636 for bucket in buckets:
2637 client = bucket_client(session, bucket)
2638 client.delete_bucket_website(Bucket=bucket['Name'])
2641@actions.register('delete-global-grants')
2642class DeleteGlobalGrants(BucketActionBase):
2643 """Deletes global grants associated to a S3 bucket
2645 :example:
2647 .. code-block:: yaml
2649 policies:
2650 - name: s3-delete-global-grants
2651 resource: s3
2652 filters:
2653 - type: global-grants
2654 actions:
2655 - delete-global-grants
2656 """
2658 schema = type_schema(
2659 'delete-global-grants',
2660 grantees={'type': 'array', 'items': {'type': 'string'}})
2662 permissions = ('s3:PutBucketAcl',)
2664 def process(self, buckets):
2665 with self.executor_factory(max_workers=5) as w:
2666 return list(filter(None, list(w.map(self.process_bucket, buckets))))
2668 def process_bucket(self, b):
2669 grantees = self.data.get(
2670 'grantees', [
2671 GlobalGrantsFilter.AUTH_ALL, GlobalGrantsFilter.GLOBAL_ALL])
2673 log.info(b)
2675 acl = b.get('Acl', {'Grants': []})
2676 if not acl or not acl['Grants']:
2677 return
2678 new_grants = []
2679 for grant in acl['Grants']:
2680 grantee = grant.get('Grantee', {})
2681 if not grantee:
2682 continue
2683 # Yuck, 'get_bucket_acl' doesn't return the grantee type.
2684 if 'URI' in grantee:
2685 grantee['Type'] = 'Group'
2686 else:
2687 grantee['Type'] = 'CanonicalUser'
2688 if ('URI' in grantee and
2689 grantee['URI'] in grantees and not
2690 (grant['Permission'] == 'READ' and b['Website'])):
2691 # Remove this grantee.
2692 pass
2693 else:
2694 new_grants.append(grant)
2696 log.info({'Owner': acl['Owner'], 'Grants': new_grants})
2698 c = bucket_client(self.manager.session_factory(), b)
2699 try:
2700 c.put_bucket_acl(
2701 Bucket=b['Name'],
2702 AccessControlPolicy={
2703 'Owner': acl['Owner'], 'Grants': new_grants})
2704 except ClientError as e:
2705 if e.response['Error']['Code'] == 'NoSuchBucket':
2706 return
2707 return b
2710@actions.register('tag')
2711class BucketTag(Tag):
2712 """Action to create tags on a S3 bucket
2714 :example:
2716 .. code-block:: yaml
2718 policies:
2719 - name: s3-tag-region
2720 resource: s3
2721 region: us-east-1
2722 filters:
2723 - "tag:RegionName": absent
2724 actions:
2725 - type: tag
2726 key: RegionName
2727 value: us-east-1
2728 """
2730 def process_resource_set(self, client, resource_set, tags):
2731 modify_bucket_tags(self.manager.session_factory, resource_set, tags)
2734@actions.register('mark-for-op')
2735class MarkBucketForOp(TagDelayedAction):
2736 """Action schedules custodian to perform an action at a certain date
2738 :example:
2740 .. code-block:: yaml
2742 policies:
2743 - name: s3-encrypt
2744 resource: s3
2745 filters:
2746 - type: missing-statement
2747 statement_ids:
2748 - RequiredEncryptedPutObject
2749 actions:
2750 - type: mark-for-op
2751 op: attach-encrypt
2752 days: 7
2753 """
2755 schema = type_schema(
2756 'mark-for-op', rinherit=TagDelayedAction.schema)
2759@actions.register('unmark')
2760@actions.register('remove-tag')
2761class RemoveBucketTag(RemoveTag):
2762 """Removes tag/tags from a S3 object
2764 :example:
2766 .. code-block:: yaml
2768 policies:
2769 - name: s3-remove-owner-tag
2770 resource: s3
2771 filters:
2772 - "tag:BucketOwner": present
2773 actions:
2774 - type: remove-tag
2775 tags: ['BucketOwner']
2776 """
2778 def process_resource_set(self, client, resource_set, tags):
2779 modify_bucket_tags(
2780 self.manager.session_factory, resource_set, remove_tags=tags)
2783@filters.register('data-events')
2784class DataEvents(Filter):
2785 """Find buckets for which CloudTrail is logging data events.
2787 Note that this filter only examines trails that are defined in the
2788 current account.
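 An illustrative policy (name is a placeholder) that reports buckets for
 which no local trail logs data events:
 :example:
 .. code-block:: yaml
    policies:
      - name: s3-missing-data-events
        resource: s3
        filters:
          - type: data-events
            state: absent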
2789 """
2791 schema = type_schema('data-events', state={'enum': ['present', 'absent']})
2792 permissions = (
2793 'cloudtrail:DescribeTrails',
2794 'cloudtrail:GetEventSelectors')
2796 def get_event_buckets(self, session, trails):
2797 """Return a mapping of bucket name to cloudtrail.
2799 For wildcard trails the bucket name is ''.
2800 """
2801 regions = {t.get('HomeRegion') for t in trails}
2802 clients = {}
2803 for region in regions:
2804 clients[region] = session.client('cloudtrail', region_name=region)
2806 event_buckets = {}
2807 for t in trails:
2808 for events in clients[t.get('HomeRegion')].get_event_selectors(
2809 TrailName=t['Name']).get('EventSelectors', ()):
2810 if 'DataResources' not in events:
2811 continue
2812 for data_events in events['DataResources']:
2813 if data_events['Type'] != 'AWS::S3::Object':
2814 continue
2815 for b in data_events['Values']:
2816 event_buckets[b.rsplit(':')[-1].strip('/')] = t['Name']
2817 return event_buckets
2819 def process(self, resources, event=None):
2820 trails = self.manager.get_resource_manager('cloudtrail').resources()
2821 local_trails = self.filter_resources(
2822 trails,
2823 "split(':', TrailARN)[4]", (self.manager.account_id,)
2824 )
2825 session = local_session(self.manager.session_factory)
2826 event_buckets = self.get_event_buckets(session, local_trails)
2827 ops = {
2828 'present': lambda x: (
2829 x['Name'] in event_buckets or '' in event_buckets),
2830 'absent': (
2831 lambda x: x['Name'] not in event_buckets and ''
2832 not in event_buckets)}
2834 op = ops[self.data.get('state', 'present')]
2835 results = []
2836 for b in resources:
2837 if op(b):
2838 results.append(b)
2839 return results
2842@filters.register('inventory')
2843class Inventory(ValueFilter):
2844 """Filter inventories for a bucket"""
2845 schema = type_schema('inventory', rinherit=ValueFilter.schema)
2846 schema_alias = False
2847 permissions = ('s3:GetInventoryConfiguration',)
2849 def process(self, buckets, event=None):
2850 results = []
2851 with self.executor_factory(max_workers=2) as w:
2852 futures = {}
2853 for b in buckets:
2854 futures[w.submit(self.process_bucket, b)] = b
2856 for f in as_completed(futures):
2857 b = futures[f]
2858 if f.exception():
2859 b.setdefault('c7n:DeniedMethods', []).append('GetInventoryConfiguration')
2860 self.log.error(
2861 "Error processing bucket: %s error: %s",
2862 b['Name'], f.exception())
2863 continue
2864 if f.result():
2865 results.append(b)
2866 return results
2868 def process_bucket(self, b):
2869 if 'c7n:inventories' not in b:
2870 client = bucket_client(local_session(self.manager.session_factory), b)
2871 inventories = client.list_bucket_inventory_configurations(
2872 Bucket=b['Name']).get('InventoryConfigurationList', [])
2873 b['c7n:inventories'] = inventories
2875 for i in b['c7n:inventories']:
2876 if self.match(i):
2877 return True
2880@actions.register('set-inventory')
2881class SetInventory(BucketActionBase):
2882 """Configure bucket inventories for an s3 bucket.
2883 """
2884 schema = type_schema(
2885 'set-inventory',
2886 required=['name', 'destination'],
2887 state={'enum': ['enabled', 'disabled', 'absent']},
2888 name={'type': 'string', 'description': 'Name of inventory'},
2889 destination={'type': 'string', 'description': 'Name of destination bucket'},
2890 prefix={'type': 'string', 'description': 'Destination prefix'},
2891 encryption={'enum': ['SSES3', 'SSEKMS']},
2892 key_id={'type': 'string', 'description': 'Optional Customer KMS KeyId for SSE-KMS'},
2893 versions={'enum': ['All', 'Current']},
2894 schedule={'enum': ['Daily', 'Weekly']},
2895 format={'enum': ['CSV', 'ORC', 'Parquet']},
2896 fields={'type': 'array', 'items': {'enum': [
2897 'Size', 'LastModifiedDate', 'StorageClass', 'ETag',
2898 'IsMultipartUploaded', 'ReplicationStatus', 'EncryptionStatus',
2899 'ObjectLockRetainUntilDate', 'ObjectLockMode', 'ObjectLockLegalHoldStatus',
2900 'IntelligentTieringAccessTier', 'BucketKeyStatus', 'ChecksumAlgorithm',
2901 'ObjectAccessControlList', 'ObjectOwner']}})
2903 permissions = ('s3:PutInventoryConfiguration', 's3:GetInventoryConfiguration')
2905 def process(self, buckets):
2906 with self.executor_factory(max_workers=2) as w:
2907 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
2908 for future in as_completed(futures):
2909 bucket = futures[future]
2910 try:
2911 future.result()
2912 except Exception as e:
2913 self.log.error('Message: %s Bucket: %s', e, bucket['Name'])
2915 def process_bucket(self, b):
2916 inventory_name = self.data.get('name')
2917 destination = self.data.get('destination')
2918 prefix = self.data.get('prefix', '')
2919 schedule = self.data.get('schedule', 'Daily')
2920 fields = self.data.get('fields', ['LastModifiedDate', 'Size'])
2921 versions = self.data.get('versions', 'Current')
2922 state = self.data.get('state', 'enabled')
2923 encryption = self.data.get('encryption')
2924 inventory_format = self.data.get('format', 'CSV')
2926 if not prefix:
2927 prefix = "Inventories/%s" % (self.manager.config.account_id)
2929 client = bucket_client(local_session(self.manager.session_factory), b)
2930 if state == 'absent':
2931 try:
2932 client.delete_bucket_inventory_configuration(
2933 Bucket=b['Name'], Id=inventory_name)
2934 except ClientError as e:
2935 if e.response['Error']['Code'] != 'NoSuchConfiguration':
2936 raise
2937 return
2939 bucket = {
2940 'Bucket': "arn:aws:s3:::%s" % destination,
2941 'Format': inventory_format
2942 }
2944 inventory = {
2945 'Destination': {
2946 'S3BucketDestination': bucket
2947 },
2948 'IsEnabled': state == 'enabled' and True or False,
2949 'Id': inventory_name,
2950 'OptionalFields': fields,
2951 'IncludedObjectVersions': versions,
2952 'Schedule': {
2953 'Frequency': schedule
2954 }
2955 }
2957 if prefix:
2958 bucket['Prefix'] = prefix
2960 if encryption:
2961 bucket['Encryption'] = {encryption: {}}
2962 if encryption == 'SSEKMS' and self.data.get('key_id'):
2963 bucket['Encryption'] = {encryption: {
2964 'KeyId': self.data['key_id']
2965 }}
2967 found = self.get_inventory_delta(client, inventory, b)
2968 if found:
2969 return
2970 if found is False:
2971 self.log.debug("updating bucket:%s inventory configuration id:%s",
2972 b['Name'], inventory_name)
2973 client.put_bucket_inventory_configuration(
2974 Bucket=b['Name'], Id=inventory_name, InventoryConfiguration=inventory)
2976 def get_inventory_delta(self, client, inventory, b):
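 # Returns None when no configuration with this Id exists, True when an
 # identical configuration exists, and False when one exists but differs
 # (and so needs to be updated).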
2977 inventories = client.list_bucket_inventory_configurations(Bucket=b['Name'])
2978 found = None
2979 for i in inventories.get('InventoryConfigurationList', []):
2980 if i['Id'] != inventory['Id']:
2981 continue
2982 found = True
2983 for k, v in inventory.items():
2984 if k not in i:
2985 found = False
2986 continue
2987 if isinstance(v, list):
2988 v.sort()
2989 i[k].sort()
2990 if i[k] != v:
2991 found = False
2992 return found
2995@filters.register('intelligent-tiering')
2996class IntelligentTiering(ListItemFilter):
2997 """Filter for S3 buckets to look at intelligent tiering configurations
2999 The schema to supply to the attrs follows the schema here:
3000 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_bucket_intelligent_tiering_configurations.html
3002 :example:
3004 .. code-block:: yaml
3006 policies:
3007 - name: s3-intelligent-tiering-configuration
3008 resource: s3
3009 filters:
3010 - type: intelligent-tiering
3011 attrs:
3012 - Status: Enabled
3013 - Filter:
3014 And:
3015 Prefix: test
3016 Tags:
3017 - Key: Owner
3018 Value: c7n
3019 - Tierings:
3020 - Days: 100
3021 - AccessTier: ARCHIVE_ACCESS
3023 """
3024 schema = type_schema(
3025 'intelligent-tiering',
3026 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
3027 count={'type': 'number'},
3028 count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
3029 )
3030 permissions = ('s3:GetIntelligentTieringConfiguration',)
3031 annotation_key = "c7n:IntelligentTiering"
3032 annotate_items = True
3034 def __init__(self, data, manager=None):
3035 super().__init__(data, manager)
3036 self.data['key'] = self.annotation_key
3038 def process(self, buckets, event=None):
3039 with self.executor_factory(max_workers=2) as w:
3040 futures = {w.submit(self.get_item_values, b): b for b in buckets}
3041 for future in as_completed(futures):
3042 b = futures[future]
3043 if future.exception():
3044 self.log.error("Message: %s Bucket: %s", future.exception(), b['Name'])
3045 continue
3046 return super().process(buckets, event)
3048 def get_item_values(self, b):
3049 if self.annotation_key not in b:
3050 client = bucket_client(local_session(self.manager.session_factory), b)
3051 try:
3052 int_tier_config = client.list_bucket_intelligent_tiering_configurations(
3053 Bucket=b['Name'])
3054 b[self.annotation_key] = int_tier_config.get(
3055 'IntelligentTieringConfigurationList', [])
3056 except ClientError as e:
3057 if e.response['Error']['Code'] == 'AccessDenied':
3058 method = 'list_bucket_intelligent_tiering_configurations'
3059 log.warning(
3060 "Bucket:%s unable to invoke method:%s error:%s ",
3061 b['Name'], method, e.response['Error']['Message'])
3062 b.setdefault('c7n:DeniedMethods', []).append(method)
3063 return b.get(self.annotation_key)
3066@actions.register('set-intelligent-tiering')
3067class ConfigureIntelligentTiering(BucketActionBase):
3068 """Action applies an intelligent tiering configuration to a S3 bucket
3070 The schema to supply to the configuration follows the schema here:
3071 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_bucket_intelligent_tiering_configuration.html
3073 To delete a configuration, supply State: delete together with either the configuration's Id or Id: matched
3075 :example:
3077 .. code-block:: yaml
3079 policies:
3080 - name: s3-apply-intelligent-tiering-config
3081 resource: aws.s3
3082 filters:
3083 - not:
3084 - type: intelligent-tiering
3085 attrs:
3086 - Status: Enabled
3087 - Filter:
3088 And:
3089 Prefix: helloworld
3090 Tags:
3091 - Key: Hello
3092 Value: World
3093 - Tierings:
3094 - Days: 123
3095 AccessTier: ARCHIVE_ACCESS
3096 actions:
3097 - type: set-intelligent-tiering
3098 Id: c7n-default
3099 IntelligentTieringConfiguration:
3100 Id: c7n-default
3101 Status: Enabled
3102 Tierings:
3103 - Days: 149
3104 AccessTier: ARCHIVE_ACCESS
3106 - name: s3-delete-intelligent-tiering-configuration
3107 resource: aws.s3
3108 filters:
3109 - type: intelligent-tiering
3110 attrs:
3111 - Status: Enabled
3112 - Id: test-config
3113 actions:
3114 - type: set-intelligent-tiering
3115 Id: test-config
3116 State: delete
3118 - name: s3-delete-intelligent-tiering-matched-configs
3119 resource: aws.s3
3120 filters:
3121 - type: intelligent-tiering
3122 attrs:
3123 - Status: Enabled
3124 - Id: test-config
3125 actions:
3126 - type: set-intelligent-tiering
3127 Id: matched
3128 State: delete
3130 """
3132 annotation_key = 'c7n:ListItemMatches'
3133 shape = 'PutBucketIntelligentTieringConfigurationRequest'
3134 schema = {
3135 'type': 'object',
3136 'additionalProperties': False,
3137 'oneOf': [
3138 {'required': ['type', 'Id', 'IntelligentTieringConfiguration']},
3139 {'required': ['type', 'Id', 'State']}],
3140 'properties': {
3141 'type': {'enum': ['set-intelligent-tiering']},
3142 'Id': {'type': 'string'},
3143 # delete intelligent tier configurations via state: delete
3144 'State': {'type': 'string', 'enum': ['delete']},
3145 'IntelligentTieringConfiguration': {'type': 'object'}
3146 },
3147 }
3149 permissions = ('s3:PutIntelligentTieringConfiguration',)
3151 def validate(self):
3152 # You can have up to 1,000 S3 Intelligent-Tiering configurations per bucket.
3153 # Hence, always use it with a filter
3154 found = False
3155 for f in self.manager.iter_filters():
3156 if isinstance(f, IntelligentTiering):
3157 found = True
3158 break
3159 if not found:
3160 raise PolicyValidationError(
3161 '`set-intelligent-tiering` may only be used in '
3162 'conjunction with `intelligent-tiering` filter on %s' % (self.manager.data,))
3163 cfg = dict(self.data)
3164 if 'IntelligentTieringConfiguration' in cfg:
3165 cfg['Bucket'] = 'bucket'
3166 cfg.pop('type')
3167 return shape_validate(
3168 cfg, self.shape, self.manager.resource_type.service)
3170 def process(self, buckets):
3171 with self.executor_factory(max_workers=3) as w:
3172 futures = {}
3174 for b in buckets:
3175 futures[w.submit(self.process_bucket, b)] = b
3177 for future in as_completed(futures):
3178 if future.exception():
3179 bucket = futures[future]
3180 self.log.error(
3181 'error modifying bucket intelligent tiering configuration: %s\n%s',
3182 bucket['Name'], future.exception())
3183 continue
3185 def process_bucket(self, bucket):
3186 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3188 if 'list_bucket_intelligent_tiering_configurations' in bucket.get(
3189 'c7n:DeniedMethods', []):
3190 log.warning("Access Denied Bucket:%s while reading intelligent tiering configurations"
3191 % bucket['Name'])
3192 return
3194 if self.data.get('Id') and self.data.get('IntelligentTieringConfiguration'):
3195 try:
3196 s3.put_bucket_intelligent_tiering_configuration(
3197 Bucket=bucket['Name'], Id=self.data.get(
3198 'Id'), IntelligentTieringConfiguration=self.data.get(
3199 'IntelligentTieringConfiguration'))
3200 except ClientError as e:
3201 if e.response['Error']['Code'] == 'AccessDenied':
3202 log.warning(
3203 "Access Denied Bucket:%s while applying intelligent tiering configuration"
3204 % bucket['Name'])
3205 if self.data.get('State'):
3206 if self.data.get('Id') == 'matched':
3207 for config in bucket.get(self.annotation_key):
3208 self.delete_intelligent_tiering_configurations(s3, config.get('Id'), bucket)
3209 else:
3210 self.delete_intelligent_tiering_configurations(s3, self.data.get('Id'), bucket)
3212 def delete_intelligent_tiering_configurations(self, s3_client, id, bucket):
3213 try:
3214 s3_client.delete_bucket_intelligent_tiering_configuration(Bucket=bucket['Name'], Id=id)
3215 except ClientError as e:
3216 if e.response['Error']['Code'] == 'AccessDenied':
3217 log.warning(
3218 "Access Denied Bucket:%s while deleting intelligent tiering configuration"
3219 % bucket['Name'])
3220 elif e.response['Error']['Code'] == 'NoSuchConfiguration':
3221 log.warning(
3222 "No such configuration found:%s while deleting intelligent tiering configuration"
3223 % bucket['Name'])
3226@actions.register('delete')
3227class DeleteBucket(ScanBucket):
3228 """Action deletes a S3 bucket
3230 :example:
3232 .. code-block:: yaml
3234 policies:
3235 - name: delete-unencrypted-buckets
3236 resource: s3
3237 filters:
3238 - type: missing-statement
3239 statement_ids:
3240 - RequiredEncryptedPutObject
3241 actions:
3242 - type: delete
3243 remove-contents: true
3244 """
3246 schema = type_schema('delete', **{'remove-contents': {'type': 'boolean'}})
3248 permissions = ('s3:*',)
3250 bucket_ops = {
3251 'standard': {
3252 'iterator': 'list_objects',
3253 'contents_key': ['Contents'],
3254 'key_processor': 'process_key'
3255 },
3256 'versioned': {
3257 'iterator': 'list_object_versions',
3258 'contents_key': ['Versions', 'DeleteMarkers'],
3259 'key_processor': 'process_version'
3260 }
3261 }
3263 def process_delete_enablement(self, b):
3264 """Prep a bucket for deletion.
3266 Clear out any pending multi-part uploads.
3268 Disable versioning on the bucket, so deletes don't
3269 generate fresh deletion markers.
3270 """
3271 client = bucket_client(
3272 local_session(self.manager.session_factory), b)
3274 # Stop replication so we can suspend versioning
3275 if b.get('Replication') is not None:
3276 client.delete_bucket_replication(Bucket=b['Name'])
3278 # Suspend versioning, so we don't get new delete markers
3279 # as we walk and delete versions
3280 if (self.get_bucket_style(b) == 'versioned' and b['Versioning']['Status'] == 'Enabled' and
3281 self.data.get('remove-contents', True)):
3282 client.put_bucket_versioning(
3283 Bucket=b['Name'],
3284 VersioningConfiguration={'Status': 'Suspended'})
3286 # Clear our multi-part uploads
3287 uploads = client.get_paginator('list_multipart_uploads')
3288 for p in uploads.paginate(Bucket=b['Name']):
3289 for u in p.get('Uploads', ()):
3290 client.abort_multipart_upload(
3291 Bucket=b['Name'],
3292 Key=u['Key'],
3293 UploadId=u['UploadId'])
3295 def process(self, buckets):
3296 # might be worth sanity checking all our permissions
3297 # on the bucket up front before disabling versioning/replication.
3298 if self.data.get('remove-contents', True):
3299 self._process_with_futures(self.process_delete_enablement, buckets)
3300 self.empty_buckets(buckets)
3302 results = self._process_with_futures(self.delete_bucket, buckets)
3303 self.write_denied_buckets_file()
3304 return results
3306 def delete_bucket(self, b):
3307 s3 = bucket_client(self.manager.session_factory(), b)
3308 try:
3309 self._run_api(s3.delete_bucket, Bucket=b['Name'])
3310 except ClientError as e:
3311 if e.response['Error']['Code'] == 'BucketNotEmpty':
3312 self.log.error(
3313 "Error while deleting bucket %s, bucket not empty" % (
3314 b['Name']))
3315 else:
3316 raise e
3318 def empty_buckets(self, buckets):
3319 t = time.time()
3320 results = super(DeleteBucket, self).process(buckets)
3321 run_time = time.time() - t
3322 object_count = 0
3324 for r in results:
3325 object_count += r['Count']
3326 self.manager.ctx.metrics.put_metric(
3327 "Total Keys", object_count, "Count", Scope=r['Bucket'],
3328 buffer=True)
3329 self.manager.ctx.metrics.put_metric(
3330 "Total Keys", object_count, "Count", Scope="Account", buffer=True)
3331 self.manager.ctx.metrics.flush()
3333 log.info(
3334 "EmptyBucket buckets:%d Complete keys:%d rate:%0.2f/s time:%0.2fs",
3335 len(buckets), object_count,
3336 float(object_count) / run_time if run_time else 0, run_time)
3337 return results
3339 def process_chunk(self, batch, bucket):
3340 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3341 objects = []
3342 for key in batch:
3343 obj = {'Key': key['Key']}
3344 if 'VersionId' in key:
3345 obj['VersionId'] = key['VersionId']
3346 objects.append(obj)
3347 results = s3.delete_objects(
3348 Bucket=bucket['Name'], Delete={'Objects': objects}).get('Deleted', ())
3349 if self.get_bucket_style(bucket) != 'versioned':
3350 return results
3353@actions.register('configure-lifecycle')
3354class Lifecycle(BucketActionBase):
3355 """Action applies a lifecycle policy to versioned S3 buckets
3357 The schema to supply to the rule follows the schema here:
3358 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_lifecycle_configuration
3360 To delete a lifecycle rule, supply Status=absent
3362 :example:
3364 .. code-block:: yaml
3366 policies:
3367 - name: s3-apply-lifecycle
3368 resource: s3
3369 actions:
3370 - type: configure-lifecycle
3371 rules:
3372 - ID: my-lifecycle-id
3373 Status: Enabled
3374 Prefix: foo/
3375 Transitions:
3376 - Days: 60
3377 StorageClass: GLACIER
3379 """
3381 schema = type_schema(
3382 'configure-lifecycle',
3383 **{
3384 'rules': {
3385 'type': 'array',
3386 'items': {
3387 'type': 'object',
3388 'required': ['ID', 'Status'],
3389 'additionalProperties': False,
3390 'properties': {
3391 'ID': {'type': 'string'},
3392 # c7n intercepts `absent`
3393 'Status': {'enum': ['Enabled', 'Disabled', 'absent']},
3394 'Prefix': {'type': 'string'},
3395 'Expiration': {
3396 'type': 'object',
3397 'additionalProperties': False,
3398 'properties': {
3399 'Date': {'type': 'string'}, # Date
3400 'Days': {'type': 'integer'},
3401 'ExpiredObjectDeleteMarker': {'type': 'boolean'},
3402 },
3403 },
3404 'Filter': {
3405 'type': 'object',
3406 'minProperties': 1,
3407 'maxProperties': 1,
3408 'additionalProperties': False,
3409 'properties': {
3410 'Prefix': {'type': 'string'},
3411 'ObjectSizeGreaterThan': {'type': 'integer'},
3412 'ObjectSizeLessThan': {'type': 'integer'},
3413 'Tag': {
3414 'type': 'object',
3415 'required': ['Key', 'Value'],
3416 'additionalProperties': False,
3417 'properties': {
3418 'Key': {'type': 'string'},
3419 'Value': {'type': 'string'},
3420 },
3421 },
3422 'And': {
3423 'type': 'object',
3424 'additionalProperties': False,
3425 'properties': {
3426 'Prefix': {'type': 'string'},
3427 'ObjectSizeGreaterThan': {'type': 'integer'},
3428 'ObjectSizeLessThan': {'type': 'integer'},
3429 'Tags': {
3430 'type': 'array',
3431 'items': {
3432 'type': 'object',
3433 'required': ['Key', 'Value'],
3434 'additionalProperties': False,
3435 'properties': {
3436 'Key': {'type': 'string'},
3437 'Value': {'type': 'string'},
3438 },
3439 },
3440 },
3441 },
3442 },
3443 },
3444 },
3445 'Transitions': {
3446 'type': 'array',
3447 'items': {
3448 'type': 'object',
3449 'additionalProperties': False,
3450 'properties': {
3451 'Date': {'type': 'string'}, # Date
3452 'Days': {'type': 'integer'},
3453 'StorageClass': {'type': 'string'},
3454 },
3455 },
3456 },
3457 'NoncurrentVersionTransitions': {
3458 'type': 'array',
3459 'items': {
3460 'type': 'object',
3461 'additionalProperties': False,
3462 'properties': {
3463 'NoncurrentDays': {'type': 'integer'},
3464 'NewerNoncurrentVersions': {'type': 'integer'},
3465 'StorageClass': {'type': 'string'},
3466 },
3467 },
3468 },
3469 'NoncurrentVersionExpiration': {
3470 'type': 'object',
3471 'additionalProperties': False,
3472 'properties': {
3473 'NoncurrentDays': {'type': 'integer'},
3474 'NewerNoncurrentVersions': {'type': 'integer'}
3475 },
3476 },
3477 'AbortIncompleteMultipartUpload': {
3478 'type': 'object',
3479 'additionalProperties': False,
3480 'properties': {
3481 'DaysAfterInitiation': {'type': 'integer'},
3482 },
3483 },
3484 },
3485 },
3486 },
3487 }
3488 )
3490 permissions = ('s3:GetLifecycleConfiguration', 's3:PutLifecycleConfiguration')
3492 def process(self, buckets):
3493 with self.executor_factory(max_workers=3) as w:
3494 futures = {}
3495 results = []
3497 for b in buckets:
3498 futures[w.submit(self.process_bucket, b)] = b
3500 for future in as_completed(futures):
3501 if future.exception():
3502 bucket = futures[future]
3503 self.log.error('error modifying bucket lifecycle: %s\n%s',
3504 bucket['Name'], future.exception())
3505 results += filter(None, [future.result()])
3507 return results
3509 def process_bucket(self, bucket):
3510 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3512 if 'get_bucket_lifecycle_configuration' in bucket.get('c7n:DeniedMethods', []):
3513 log.warning("Access Denied Bucket:%s while reading lifecycle" % bucket['Name'])
3514 return
3516 # Adjust the existing lifecycle by adding/deleting/overwriting rules as necessary
3517 config = (bucket.get('Lifecycle') or {}).get('Rules', [])
3518 for rule in self.data['rules']:
3519 for index, existing_rule in enumerate(config):
3520 if not existing_rule:
3521 continue
3522 if rule['ID'] == existing_rule['ID']:
3523 if rule['Status'] == 'absent':
3524 config[index] = None
3525 else:
3526 config[index] = rule
3527 break
3528 else:
3529 if rule['Status'] != 'absent':
3530 config.append(rule)
3532 # The extra `list` conversion is required for python3
3533 config = list(filter(None, config))
3535 try:
3536 if not config:
3537 s3.delete_bucket_lifecycle(Bucket=bucket['Name'])
3538 else:
3539 s3.put_bucket_lifecycle_configuration(
3540 Bucket=bucket['Name'], LifecycleConfiguration={'Rules': config})
3541 except ClientError as e:
3542 if e.response['Error']['Code'] == 'AccessDenied':
3543 log.warning("Access Denied Bucket:%s while applying lifecycle" % bucket['Name'])
3544 else:
3545 raise e
3548class KMSKeyResolverMixin:
3549 """Builds a dictionary of region specific ARNs"""
3551 def __init__(self, data, manager=None):
3552 self.arns = dict()
3553 self.data = data
3554 self.manager = manager
3556 def resolve_keys(self, buckets):
3557 key = self.data.get('key')
3558 if not key:
3559 return None
3561 regions = {get_region(b) for b in buckets}
3562 for r in regions:
3563 client = local_session(self.manager.session_factory).client('kms', region_name=r)
3564 try:
3565 key_meta = client.describe_key(
3566 KeyId=key
3567 ).get('KeyMetadata', {})
3568 key_id = key_meta.get('KeyId')
3570 # We need a complete set of alias identifiers (names and ARNs)
3571 # to fully evaluate bucket encryption filters.
3572 key_aliases = client.list_aliases(
3573 KeyId=key_id
3574 ).get('Aliases', [])
3576 self.arns[r] = {
3577 'KeyId': key_id,
3578 'Arn': key_meta.get('Arn'),
3579 'KeyManager': key_meta.get('KeyManager'),
3580 'Description': key_meta.get('Description'),
3581 'Aliases': [
3582 alias[attr]
3583 for alias in key_aliases
3584 for attr in ('AliasArn', 'AliasName')
3585 ],
3586 }
3588 except ClientError as e:
3589 self.log.error('Error resolving kms ARNs for set-bucket-encryption: %s key: %s' % (
3590 e, self.data.get('key')))
3592 def get_key(self, bucket):
3593 if 'key' not in self.data:
3594 return None
3595 region = get_region(bucket)
3596 key = self.arns.get(region)
3597 if not key:
3598 self.log.warning('Unable to resolve key %s for bucket %s in region %s',
3599 self.data['key'], bucket.get('Name'), region)
3600 return key
3603@filters.register('bucket-encryption')
3604class BucketEncryption(KMSKeyResolverMixin, Filter):
3605 """Filters for S3 buckets that have bucket-encryption
3607 :example:
3609 .. code-block:: yaml
3611 policies:
3612 - name: s3-bucket-encryption-AES256
3613 resource: s3
3614 region: us-east-1
3615 filters:
3616 - type: bucket-encryption
3617 state: True
3618 crypto: AES256
3619 - name: s3-bucket-encryption-KMS
3620 resource: s3
3621 region: us-east-1
3622 filters:
3623 - type: bucket-encryption
3624 state: True
3625 crypto: aws:kms
3626 key: alias/some/alias/key
3627 - name: s3-bucket-encryption-off
3628 resource: s3
3629 region: us-east-1
3630 filters:
3631 - type: bucket-encryption
3632 state: False
3633 - name: s3-bucket-test-bucket-key-enabled
3634 resource: s3
3635 region: us-east-1
3636 filters:
3637 - type: bucket-encryption
3638 bucket_key_enabled: True
3639 """
3640 schema = type_schema('bucket-encryption',
3641 state={'type': 'boolean'},
3642 crypto={'type': 'string', 'enum': ['AES256', 'aws:kms']},
3643 key={'type': 'string'},
3644 bucket_key_enabled={'type': 'boolean'})
3646 permissions = ('s3:GetEncryptionConfiguration', 'kms:DescribeKey', 'kms:ListAliases')
3647 annotation_key = 'c7n:bucket-encryption'
3649 def validate(self):
3650 if self.data.get('bucket_key_enabled') is not None and self.data.get('key') is not None:
3651 raise PolicyValidationError(
3652 f'key and bucket_key_enabled attributes cannot both be set: {self.data}'
3653 )
3655 def process(self, buckets, event=None):
3656 self.resolve_keys(buckets)
3657 results = []
3658 with self.executor_factory(max_workers=2) as w:
3659 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3660 for future in as_completed(futures):
3661 b = futures[future]
3662 if future.exception():
3663 self.log.error("Message: %s Bucket: %s", future.exception(),
3664 b['Name'])
3665 continue
3666 if future.result():
3667 results.append(b)
3668 return results
3670 def process_bucket(self, b):
3672 client = bucket_client(local_session(self.manager.session_factory), b)
3673 rules = []
3674 if self.annotation_key not in b:
3675 try:
3676 be = client.get_bucket_encryption(Bucket=b['Name'])
3677 be.pop('ResponseMetadata', None)
3678 except ClientError as e:
3679 if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError':
3680 raise
3681 be = {}
3682 b[self.annotation_key] = be
3683 else:
3684 be = b[self.annotation_key]
3686 rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', [])
3687 # default `state` to True as previous impl assumed state == True
3688 # to preserve backwards compatibility
3689 if self.data.get('bucket_key_enabled'):
3690 for rule in rules:
3691 return self.filter_bucket_key_enabled(rule)
3692 elif self.data.get('bucket_key_enabled') is False:
3693 for rule in rules:
3694 return not self.filter_bucket_key_enabled(rule)
3696 if self.data.get('state', True):
3697 for sse in rules:
3698 return self.filter_bucket(b, sse)
3699 return False
3700 else:
3701 for sse in rules:
3702 return not self.filter_bucket(b, sse)
3703 return True
3705 def filter_bucket(self, b, sse):
3706 allowed = ['AES256', 'aws:kms']
3707 key = self.get_key(b)
3708 crypto = self.data.get('crypto')
3709 rule = sse.get('ApplyServerSideEncryptionByDefault')
3711 if not rule:
3712 return False
3713 algo = rule.get('SSEAlgorithm')
3715 if not crypto and algo in allowed:
3716 return True
3718 if crypto == 'AES256' and algo == 'AES256':
3719 return True
3720 elif crypto == 'aws:kms' and algo == 'aws:kms':
3721 if not key:
3722 # There are two broad reasons to have an empty value for
3723 # the regional key here:
3724 #
3725 # * The policy did not specify a key, in which case this
3726 # filter should match _all_ buckets with a KMS default
3727 # encryption rule.
3728 #
3729 # * The policy specified a key that could not be
3730 # resolved, in which case this filter shouldn't match
3731 # any buckets.
3732 return 'key' not in self.data
3734 # The default encryption rule can specify a key ID,
3735 # key ARN, alias name or alias ARN. Match against any of
3736 # those attributes. A rule specifying KMS with no master key
3737 # implies the AWS-managed key.
3738 key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']}
3739 return rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids
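# For illustration, with hypothetical values: if the policy's key resolves to
#   {'Arn': 'arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab',
#    'KeyId': '1234abcd-12ab-34cd-56ef-1234567890ab', 'Aliases': ['alias/my-s3-key']}
# then a default-encryption rule whose KMSMasterKeyID is the key ARN, the key
# id, or the alias will all match, since key_ids holds every identifier form.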
3741 def filter_bucket_key_enabled(self, rule) -> bool:
3742 if not rule:
3743 return False
3744 return rule.get('BucketKeyEnabled')
3747@actions.register('set-bucket-encryption')
3748class SetBucketEncryption(KMSKeyResolverMixin, BucketActionBase):
3749 """Action enables default encryption on S3 buckets
3751 `enabled`: boolean Optional: Defaults to True
3753 `crypto`: aws:kms | AES256 Optional: Defaults to AES256
3755 `key`: arn, alias, or kms id key
3757 `bucket-key`: boolean Optional:
3758 Defaults to True.
3759 Reduces the amount of API traffic from Amazon S3 to KMS and can reduce KMS
3760 request costs by up to 99 percent. Requires kms:Decrypt permissions for copy and
3761 upload on the AWS KMS Key Policy.
3763 Bucket Key Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html
3765 :example:
3767 .. code-block:: yaml
3769 policies:
3770 - name: s3-enable-default-encryption-kms
3771 resource: s3
3772 actions:
3773 - type: set-bucket-encryption
3774 # enabled: true <------ optional (true by default)
3775 crypto: aws:kms
3776 key: 1234abcd-12ab-34cd-56ef-1234567890ab
3777 bucket-key: true
3779 - name: s3-enable-default-encryption-kms-alias
3780 resource: s3
3781 actions:
3782 - type: set-bucket-encryption
3783 # enabled: true <------ optional (true by default)
3784 crypto: aws:kms
3785 key: alias/some/alias/key
3786 bucket-key: true
3788 - name: s3-enable-default-encryption-aes256
3789 resource: s3
3790 actions:
3791 - type: set-bucket-encryption
3792 # bucket-key: true <--- optional (true by default for AWS SSE)
3793 # crypto: AES256 <----- optional (AES256 by default)
3794 # enabled: true <------ optional (true by default)
3796 - name: s3-disable-default-encryption
3797 resource: s3
3798 actions:
3799 - type: set-bucket-encryption
3800 enabled: false
3801 """
3803 schema = {
3804 'type': 'object',
3805 'additionalProperties': False,
3806 'properties': {
3807 'type': {'enum': ['set-bucket-encryption']},
3808 'enabled': {'type': 'boolean'},
3809 'crypto': {'enum': ['aws:kms', 'AES256']},
3810 'key': {'type': 'string'},
3811 'bucket-key': {'type': 'boolean'}
3812 },
3813 'dependencies': {
3814 'key': {
3815 'properties': {
3816 'crypto': {'pattern': 'aws:kms'}
3817 },
3818 'required': ['crypto']
3819 }
3820 }
3821 }
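# Note on the schema above: the JSON Schema 'dependencies' clause means a
# policy that supplies `key` must also set `crypto: aws:kms`. For example,
# this hypothetical action body validates:
#   {'type': 'set-bucket-encryption', 'crypto': 'aws:kms',
#    'key': 'alias/my-s3-key', 'bucket-key': True}
# while the same body without `crypto` is rejected at policy validation time.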
3823 permissions = ('s3:PutEncryptionConfiguration', 's3:GetEncryptionConfiguration',
3824 'kms:ListAliases', 'kms:DescribeKey')
3826 def process(self, buckets):
3827 if self.data.get('enabled', True):
3828 self.resolve_keys(buckets)
3830 with self.executor_factory(max_workers=3) as w:
3831 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3832 for future in as_completed(futures):
3833 if future.exception():
3834 self.log.error('Message: %s Bucket: %s', future.exception(),
3835 futures[future]['Name'])
3837 def process_bucket(self, bucket):
3838 default_key_desc = 'Default master key that protects my S3 objects when no other key is defined' # noqa
3839 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3840 if not self.data.get('enabled', True):
3841 s3.delete_bucket_encryption(Bucket=bucket['Name'])
3842 return
3843 algo = self.data.get('crypto', 'AES256')
3845 # bucket key defaults to True for alias/aws/s3 and AES256 (Amazon SSE)
3846 # and ignores False values for that crypto
3847 bucket_key = self.data.get('bucket-key', True)
3848 config = {
3849 'Rules': [
3850 {
3851 'ApplyServerSideEncryptionByDefault': {
3852 'SSEAlgorithm': algo,
3853 },
3854 'BucketKeyEnabled': bucket_key
3855 }
3856 ]
3857 }
3859 if algo == 'aws:kms':
3860 key = self.get_key(bucket)
3861 if not key:
3862 raise Exception('Valid KMS Key required but does not exist')
3864 config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = key['Arn']
3865 s3.put_bucket_encryption(
3866 Bucket=bucket['Name'],
3867 ServerSideEncryptionConfiguration=config
3868 )
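# Sketch of the payload built above for `crypto: aws:kms` with the default
# bucket key and a hypothetical resolved key ARN:
#   ServerSideEncryptionConfiguration={'Rules': [{
#       'ApplyServerSideEncryptionByDefault': {
#           'SSEAlgorithm': 'aws:kms',
#           'KMSMasterKeyID': 'arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab'},
#       'BucketKeyEnabled': True}]}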
3871OWNERSHIP_CONTROLS = ['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter']
3872VALUE_FILTER_MAGIC_VALUES = ['absent', 'present', 'not-null', 'empty']
3875@filters.register('ownership')
3876class BucketOwnershipControls(BucketFilterBase, ValueFilter):
3877 """Filter for object ownership controls
3879 Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html
3881 :example:
3883 Find buckets with ACLs disabled
3885 .. code-block:: yaml
3887 policies:
3888 - name: s3-bucket-acls-disabled
3889 resource: aws.s3
3890 region: us-east-1
3891 filters:
3892 - type: ownership
3893 value: BucketOwnerEnforced
3895 :example:
3897 Find buckets with object ownership preferred or enforced
3899 .. code-block:: yaml
3901 policies:
3902 - name: s3-bucket-ownership-preferred
3903 resource: aws.s3
3904 region: us-east-1
3905 filters:
3906 - type: ownership
3907 op: in
3908 value:
3909 - BucketOwnerEnforced
3910 - BucketOwnerPreferred
3912 :example:
3914 Find buckets with no object ownership controls
3916 .. code-block:: yaml
3918 policies:
3919 - name: s3-bucket-no-ownership-controls
3920 resource: aws.s3
3921 region: us-east-1
3922 filters:
3923 - type: ownership
3924 value: empty
3925 """
3926 schema = type_schema('ownership', rinherit=ValueFilter.schema, value={'oneOf': [
3927 {'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES},
3928 {'type': 'array', 'items': {
3929 'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}}]})
3930 permissions = ('s3:GetBucketOwnershipControls',)
3931 annotation_key = 'c7n:ownership'
3933 def __init__(self, data, manager=None):
3934 super(BucketOwnershipControls, self).__init__(data, manager)
3936 # Ownership controls appear as an array of rules. There can only be one
3937 # ObjectOwnership rule defined for a bucket, so we can automatically
3938 # match against that if it exists.
3939 self.data['key'] = f'("{self.annotation_key}".Rules[].ObjectOwnership)[0]'
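# For example (sample API shape): get_bucket_ownership_controls typically
# returns {'OwnershipControls': {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]}};
# process_bucket() below stores the inner dict under the annotation key, so
# the JMESPath above resolves to 'BucketOwnerEnforced' for the value filter.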
3941 def process(self, buckets, event=None):
3942 with self.executor_factory(max_workers=2) as w:
3943 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3944 for future in as_completed(futures):
3945 b = futures[future]
3946 if future.exception():
3947 self.log.error("Message: %s Bucket: %s", future.exception(),
3948 b['Name'])
3949 continue
3950 return super(BucketOwnershipControls, self).process(buckets, event)
3952 def process_bucket(self, b):
3953 if self.annotation_key in b:
3954 return
3955 client = bucket_client(local_session(self.manager.session_factory), b)
3956 try:
3957 controls = client.get_bucket_ownership_controls(Bucket=b['Name'])
3958 controls.pop('ResponseMetadata', None)
3959 except ClientError as e:
3960 if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError':
3961 raise
3962 controls = {}
3963 b[self.annotation_key] = controls.get('OwnershipControls')
3966@filters.register('bucket-replication')
3967class BucketReplication(ListItemFilter):
3968 """Filter for S3 buckets to look at bucket replication configurations
3970 The schema to supply to the attrs follows the schema here:
3971 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_replication.html
3973 :example:
3975 .. code-block:: yaml
3977 policies:
3978 - name: s3-bucket-replication
3979 resource: s3
3980 filters:
3981 - type: bucket-replication
3982 attrs:
3983 - Status: Enabled
3984 - Filter:
3985 And:
3986 Prefix: test
3987 Tags:
3988 - Key: Owner
3989 Value: c7n
3990 - ExistingObjectReplication: Enabled
3992 """
3993 schema = type_schema(
3994 'bucket-replication',
3995 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
3996 count={'type': 'number'},
3997 count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
3998 )
4000 permissions = ("s3:GetReplicationConfiguration",)
4001 annotation_key = 'Replication'
4002 annotate_items = True
4004 def __init__(self, data, manager=None):
4005 super().__init__(data, manager)
4006 self.data['key'] = self.annotation_key
4008 def get_item_values(self, b):
4009 client = bucket_client(local_session(self.manager.session_factory), b)
4010 # the replication configuration is fetched during bucket augmentation (S3_AUGMENT_TABLE)
4011 bucket_replication = b.get(self.annotation_key)
4013 rules = []
4014 if bucket_replication is not None:
4015 rules = bucket_replication.get('ReplicationConfiguration', {}).get('Rules', [])
4016 for replication in rules:
4017 self.augment_bucket_replication(b, replication, client)
4019 return rules
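# For illustration: each rule yielded here is the raw Rule dict from
# get_bucket_replication (Status, Filter, Destination, ...) plus the keys
# added by augment_bucket_replication below, so an `attrs` entry such as
# `- Status: Enabled` in the docstring example matches rules whose Status
# is 'Enabled'.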
4021 def augment_bucket_replication(self, b, replication, client):
4022 destination_bucket = replication.get('Destination').get('Bucket').split(':')[5]
4023 try:
4024 destination_region = inspect_bucket_region(destination_bucket, client.meta.endpoint_url)
4025 except ValueError:
4026 replication['DestinationBucketAvailable'] = False
4027 return
4028 source_region = get_region(b)
4029 replication['DestinationBucketAvailable'] = True
4030 replication['DestinationRegion'] = destination_region
4031 replication['CrossRegion'] = destination_region != source_region
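# Sketch of the augmentation above with a hypothetical destination ARN:
#   'arn:aws:s3:::replica-bucket'.split(':')[5]  ->  'replica-bucket'
# inspect_bucket_region() then resolves the replica bucket's region, and each
# rule gains DestinationBucketAvailable, DestinationRegion and CrossRegion
# keys (or only DestinationBucketAvailable=False when the lookup fails).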
4034@resources.register('s3-directory')
4035class S3Directory(query.QueryResourceManager):
4037 class resource_type(query.TypeInfo):
4038 service = 's3'
4039 permission_prefix = "s3express"
4040 arn_service = "s3express"
4041 arn_type = 'bucket'
4042 enum_spec = ('list_directory_buckets', 'Buckets[]', None)
4043 name = id = 'Name'
4044 date = 'CreationDate'
4045 dimension = 'BucketName'
4046 cfn_type = 'AWS::S3Express::DirectoryBucket'
4047 permissions_enum = ("s3express:ListAllMyDirectoryBuckets",)
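# Hedged example of the enum_spec above: the s3 client's list_directory_buckets()
# returns a page shaped roughly like
#   {'Buckets': [{'Name': 'my-base--use1-az4--x-s3', 'CreationDate': <datetime>}]}
# (bucket name is hypothetical), so 'Buckets[]' yields one resource dict per
# directory bucket, keyed by 'Name'.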