# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
"""S3 Resource Manager

Filters:

The generic Values filter (jmespath expression) and Or filter are
available with all resources, including buckets. We include several
additional pieces of bucket data (Tags, Replication, Acl, Policy) as
keys within a bucket representation.

Actions:

 encrypt-keys

   Scan all keys in a bucket and optionally encrypt them in place.

 global-grants

   Check bucket acls for global grants

 encryption-policy

   Attach an encryption required policy to a bucket, this will break
   applications that are not using encryption, including aws log
   delivery.

"""
import copy
import functools
import json
import itertools
import logging
import math
import os
import time
import ssl

from botocore.client import Config
from botocore.exceptions import ClientError

from collections import defaultdict
from concurrent.futures import as_completed

try:
    from urllib3.exceptions import SSLError
except ImportError:
    from botocore.vendored.requests.packages.urllib3.exceptions import SSLError


from c7n.actions import (
    ActionRegistry, BaseAction, PutMetric, RemovePolicyBase)
from c7n.exceptions import PolicyValidationError, PolicyExecutionError
from c7n.filters import (
    FilterRegistry, Filter, CrossAccountAccessFilter, MetricsFilter,
    ValueFilter, ListItemFilter)
from .aws import shape_validate
import c7n.filters.policystatement as polstmt_filter
from c7n.manager import resources
from c7n.output import NullBlobOutput
from c7n import query
from c7n.resources.securityhub import PostFinding
from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction
from c7n.utils import (
    chunks, local_session, set_annotation, type_schema, filter_empty,
    dumps, format_string_values, get_account_alias_from_sts)
from c7n.resources.aws import inspect_bucket_region


log = logging.getLogger('custodian.s3')

filters = FilterRegistry('s3.filters')
actions = ActionRegistry('s3.actions')
filters.register('marked-for-op', TagActionFilter)
actions.register('put-metric', PutMetric)

MAX_COPY_SIZE = 1024 * 1024 * 1024 * 2  # 2 GiB


class DescribeS3(query.DescribeSource):

    def augment(self, buckets):
        with self.manager.executor_factory(
                max_workers=min((10, len(buckets) + 1))) as w:
            results = w.map(
                assemble_bucket,
                zip(itertools.repeat(self.manager.session_factory), buckets))
            results = list(filter(None, results))
            return results


class ConfigS3(query.ConfigSource):

    # normalize config's janky idiosyncratic bespoke formatting to the
    # standard describe api responses.

    def get_query_params(self, query):
        q = super(ConfigS3, self).get_query_params(query)
        if 'expr' in q:
            q['expr'] = q['expr'].replace('select ', 'select awsRegion, ')
        return q

    def load_resource(self, item):
        resource = super(ConfigS3, self).load_resource(item)
        cfg = item['supplementaryConfiguration']
        # aka standard
        if 'awsRegion' in item and item['awsRegion'] != 'us-east-1':
            resource['Location'] = {'LocationConstraint': item['awsRegion']}
        else:
            resource['Location'] = {}

        # owner is under acl per describe
        resource.pop('Owner', None)

        for k, null_value in S3_CONFIG_SUPPLEMENT_NULL_MAP.items():
            if k not in cfg:
                continue
            if cfg.get(k) == null_value:
                continue
            method = getattr(self, "handle_%s" % k, None)
            if method is None:
                raise ValueError("unhandled supplementary config %s" % k)
            v = cfg[k]
            if isinstance(cfg[k], str):
                v = json.loads(cfg[k])
            method(resource, v)

        for el in S3_AUGMENT_TABLE:
            if el[1] not in resource:
                resource[el[1]] = el[2]
        return resource

    PERMISSION_MAP = {
        'FullControl': 'FULL_CONTROL',
        'Write': 'WRITE',
        'WriteAcp': 'WRITE_ACP',
        'Read': 'READ',
        'ReadAcp': 'READ_ACP'}

    GRANTEE_MAP = {
        'AllUsers': "http://acs.amazonaws.com/groups/global/AllUsers",
        'AuthenticatedUsers': "http://acs.amazonaws.com/groups/global/AuthenticatedUsers",
        'LogDelivery': 'http://acs.amazonaws.com/groups/s3/LogDelivery'}

    def handle_AccessControlList(self, resource, item_value):
        # double serialized in config for some reason
        if isinstance(item_value, str):
            item_value = json.loads(item_value)

        resource['Acl'] = {}
        resource['Acl']['Owner'] = {'ID': item_value['owner']['id']}
        if item_value['owner']['displayName']:
            resource['Acl']['Owner']['DisplayName'] = item_value[
                'owner']['displayName']
        resource['Acl']['Grants'] = grants = []

        for g in (item_value.get('grantList') or ()):
            if 'id' not in g['grantee']:
                assert g['grantee'] in self.GRANTEE_MAP, "unknown grantee %s" % g
                rg = {'Type': 'Group', 'URI': self.GRANTEE_MAP[g['grantee']]}
            else:
                rg = {'ID': g['grantee']['id'], 'Type': 'CanonicalUser'}

            if 'displayName' in g:
                rg['DisplayName'] = g['displayName']

            grants.append({
                'Permission': self.PERMISSION_MAP[g['permission']],
                'Grantee': rg,
            })

    def handle_BucketAccelerateConfiguration(self, resource, item_value):
        # not currently auto-augmented by custodian
        return

    def handle_BucketLoggingConfiguration(self, resource, item_value):
        if ('destinationBucketName' not in item_value or
                item_value['destinationBucketName'] is None):
            resource[u'Logging'] = {}
            return
        resource[u'Logging'] = {
            'TargetBucket': item_value['destinationBucketName'],
            'TargetPrefix': item_value['logFilePrefix']}

    def handle_BucketLifecycleConfiguration(self, resource, item_value):
        rules = []
        for r in item_value.get('rules'):
            rr = {}
            rules.append(rr)
            expiry = {}
            for ek, ck in (
                    ('Date', 'expirationDate'),
                    ('ExpiredObjectDeleteMarker', 'expiredObjectDeleteMarker'),
                    ('Days', 'expirationInDays')):
                if ck in r and r[ck] and r[ck] != -1:
                    expiry[ek] = r[ck]
            if expiry:
                rr['Expiration'] = expiry

            transitions = []
            for t in (r.get('transitions') or ()):
                tr = {}
                for k in ('date', 'days', 'storageClass'):
                    if t.get(k):
                        tr["%s%s" % (k[0].upper(), k[1:])] = t[k]
                transitions.append(tr)
            if transitions:
                rr['Transitions'] = transitions

            if r.get('abortIncompleteMultipartUpload'):
                rr['AbortIncompleteMultipartUpload'] = {
                    'DaysAfterInitiation': r[
                        'abortIncompleteMultipartUpload']['daysAfterInitiation']}
            if r.get('noncurrentVersionExpirationInDays'):
                rr['NoncurrentVersionExpiration'] = {
                    'NoncurrentDays': r['noncurrentVersionExpirationInDays']}

            nonc_transitions = []
            for t in (r.get('noncurrentVersionTransitions') or ()):
                nonc_transitions.append({
                    'NoncurrentDays': t['days'],
                    'StorageClass': t['storageClass']})
            if nonc_transitions:
                rr['NoncurrentVersionTransitions'] = nonc_transitions

            rr['Status'] = r['status']
            rr['ID'] = r['id']
            if r.get('prefix'):
                rr['Prefix'] = r['prefix']
            if 'filter' not in r or not r['filter']:
                continue

            if r['filter']['predicate']:
                rr['Filter'] = self.convertLifePredicate(r['filter']['predicate'])

        resource['Lifecycle'] = {'Rules': rules}

    def convertLifePredicate(self, p):
        if p['type'] == 'LifecyclePrefixPredicate':
            return {'Prefix': p['prefix']}
        if p['type'] == 'LifecycleTagPredicate':
            return {'Tags': [{'Key': p['tag']['key'], 'Value': p['tag']['value']}]}
        if p['type'] == 'LifecycleAndOperator':
            n = {}
            for o in p['operands']:
                ot = self.convertLifePredicate(o)
                if 'Tags' in n and 'Tags' in ot:
                    n['Tags'].extend(ot['Tags'])
                else:
                    n.update(ot)
            return {'And': n}

        raise ValueError("unknown predicate: %s" % p)

    NotifyTypeMap = {
        'QueueConfiguration': 'QueueConfigurations',
        'LambdaConfiguration': 'LambdaFunctionConfigurations',
        'CloudFunctionConfiguration': 'LambdaFunctionConfigurations',
        'TopicConfiguration': 'TopicConfigurations'}

    def handle_BucketNotificationConfiguration(self, resource, item_value):
        d = {}
        for nid, n in item_value['configurations'].items():
            ninfo = {}
            d.setdefault(self.NotifyTypeMap[n['type']], []).append(ninfo)
            if n['type'] == 'QueueConfiguration':
                ninfo['QueueArn'] = n['queueARN']
            elif n['type'] == 'TopicConfiguration':
                ninfo['TopicArn'] = n['topicARN']
            elif n['type'] == 'LambdaConfiguration':
                ninfo['LambdaFunctionArn'] = n['functionARN']
            ninfo['Id'] = nid
            ninfo['Events'] = n['events']
            rules = []
            if n['filter']:
                for r in n['filter'].get('s3KeyFilter', {}).get('filterRules', []):
                    rules.append({'Name': r['name'], 'Value': r['value']})
            if rules:
                ninfo['Filter'] = {'Key': {'FilterRules': rules}}
        resource['Notification'] = d

    def handle_BucketReplicationConfiguration(self, resource, item_value):
        d = {'Role': item_value['roleARN'], 'Rules': []}
        for rid, r in item_value['rules'].items():
            rule = {
                'ID': rid,
                'Status': r.get('status', ''),
                'Prefix': r.get('prefix', ''),
                'Destination': {
                    'Bucket': r['destinationConfig']['bucketARN']}
            }
            if 'Account' in r['destinationConfig']:
                rule['Destination']['Account'] = r['destinationConfig']['Account']
            if r['destinationConfig'].get('storageClass'):
                rule['Destination']['StorageClass'] = r['destinationConfig']['storageClass']
            d['Rules'].append(rule)
        resource['Replication'] = {'ReplicationConfiguration': d}

    def handle_BucketPolicy(self, resource, item_value):
        resource['Policy'] = item_value.get('policyText')

    def handle_BucketTaggingConfiguration(self, resource, item_value):
        resource['Tags'] = [
            {"Key": k, "Value": v} for k, v in item_value['tagSets'][0]['tags'].items()]

    def handle_BucketVersioningConfiguration(self, resource, item_value):
        # Config defaults versioning to 'Off' for a null value
        if item_value['status'] not in ('Enabled', 'Suspended'):
            resource['Versioning'] = {}
            return
        resource['Versioning'] = {'Status': item_value['status']}
        # `isMfaDeleteEnabled` is an optional boolean property - the key may be absent,
        # present with a null value, or present with a boolean value.
        # Mirror the describe source by populating Versioning.MFADelete only in the
        # boolean case.
        mfa_delete = item_value.get('isMfaDeleteEnabled')
        if mfa_delete is None:
            return
        resource['Versioning']['MFADelete'] = (
            'Enabled' if mfa_delete else 'Disabled'
        )

    def handle_BucketWebsiteConfiguration(self, resource, item_value):
        website = {}
        if item_value['indexDocumentSuffix']:
            website['IndexDocument'] = {
                'Suffix': item_value['indexDocumentSuffix']}
        if item_value['errorDocument']:
            website['ErrorDocument'] = {
                'Key': item_value['errorDocument']}
        if item_value['redirectAllRequestsTo']:
            website['RedirectAllRequestsTo'] = {
                'HostName': item_value['redirectAllRequestsTo']['hostName'],
                'Protocol': item_value['redirectAllRequestsTo']['protocol']}
        for r in item_value['routingRules']:
            redirect = {}
            rule = {'Redirect': redirect}
            website.setdefault('RoutingRules', []).append(rule)
            if 'condition' in r:
                cond = {}
                for ck, rk in (
                        ('keyPrefixEquals', 'KeyPrefixEquals'),
                        ('httpErrorCodeReturnedEquals',
                         'HttpErrorCodeReturnedEquals')):
                    if r['condition'][ck]:
                        cond[rk] = r['condition'][ck]
                rule['Condition'] = cond
            for ck, rk in (
                    ('protocol', 'Protocol'),
                    ('hostName', 'HostName'),
                    ('replaceKeyPrefixWith', 'ReplaceKeyPrefixWith'),
                    ('replaceKeyWith', 'ReplaceKeyWith'),
                    ('httpRedirectCode', 'HttpRedirectCode')):
                if r['redirect'][ck]:
                    redirect[rk] = r['redirect'][ck]
        resource['Website'] = website


@resources.register('s3')
class S3(query.QueryResourceManager):

    class resource_type(query.TypeInfo):
        service = 's3'
        arn_type = ''
        enum_spec = ('list_buckets', 'Buckets[]', None)
        # not used but we want some consistency on the metadata
        detail_spec = ('get_bucket_location', 'Bucket', 'Name', 'LocationConstraint')
        name = id = 'Name'
        date = 'CreationDate'
        dimension = 'BucketName'
        cfn_type = config_type = 'AWS::S3::Bucket'

    filter_registry = filters
    action_registry = actions
    source_mapping = {
        'describe': DescribeS3,
        'config': ConfigS3
    }

    def get_arns(self, resources):
        return ["arn:aws:s3:::{}".format(r["Name"]) for r in resources]

    @classmethod
    def get_permissions(cls):
        perms = ["s3:ListAllMyBuckets"]
        perms.extend([n[-1] for n in S3_AUGMENT_TABLE])
        return perms


S3_CONFIG_SUPPLEMENT_NULL_MAP = {
    'BucketLoggingConfiguration': u'{"destinationBucketName":null,"logFilePrefix":null}',
    'BucketPolicy': u'{"policyText":null}',
    'BucketVersioningConfiguration': u'{"status":"Off","isMfaDeleteEnabled":null}',
    'BucketAccelerateConfiguration': u'{"status":null}',
    'BucketNotificationConfiguration': u'{"configurations":{}}',
    'BucketLifecycleConfiguration': None,
    'AccessControlList': None,
    'BucketTaggingConfiguration': None,
    'BucketWebsiteConfiguration': None,
    'BucketReplicationConfiguration': None
}

S3_AUGMENT_TABLE = (
    ('get_bucket_location', 'Location', {}, None, 's3:GetBucketLocation'),
    ('get_bucket_tagging', 'Tags', [], 'TagSet', 's3:GetBucketTagging'),
    ('get_bucket_policy', 'Policy', None, 'Policy', 's3:GetBucketPolicy'),
    ('get_bucket_acl', 'Acl', None, None, 's3:GetBucketAcl'),
    ('get_bucket_replication',
     'Replication', None, None, 's3:GetReplicationConfiguration'),
    ('get_bucket_versioning', 'Versioning', None, None, 's3:GetBucketVersioning'),
    ('get_bucket_website', 'Website', None, None, 's3:GetBucketWebsite'),
    ('get_bucket_logging', 'Logging', None, 'LoggingEnabled', 's3:GetBucketLogging'),
    ('get_bucket_notification_configuration',
     'Notification', None, None, 's3:GetBucketNotification'),
    ('get_bucket_lifecycle_configuration',
     'Lifecycle', None, None, 's3:GetLifecycleConfiguration'),
    # ('get_bucket_cors', 'Cors'),
)
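
# Each S3_AUGMENT_TABLE entry is consumed by assemble_bucket below as
# (client_method, annotation_key, default_on_missing, response_select_key,
# iam_permission). Reading the first row as an example: call
# client.get_bucket_location(Bucket=name), store the result under
# b['Location'], fall back to {} on NoSuch*/NotFound errors, take the whole
# response (no select key), and advertise s3:GetBucketLocation as the
# required permission (see S3.get_permissions above).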


def assemble_bucket(item):
    """Assemble a document representing all the config state around a bucket.

    TODO: Refactor this, the logic here feels quite muddled.
    """
    factory, b = item
    s = factory()
    c = s.client('s3')
    # Bucket Location, Current Client Location, Default Location
    b_location = c_location = location = "us-east-1"
    methods = list(S3_AUGMENT_TABLE)
    for minfo in methods:
        m, k, default, select = minfo[:4]
        try:
            method = getattr(c, m)
            v = method(Bucket=b['Name'])
            v.pop('ResponseMetadata')
            if select is not None and select in v:
                v = v[select]
        except (ssl.SSLError, SSLError) as e:
            # Proxy issues? I assume
            log.warning("Bucket ssl error %s: %s %s",
                        b['Name'], b.get('Location', 'unknown'),
                        e)
            continue
        except ClientError as e:
            code = e.response['Error']['Code']
            if code.startswith("NoSuch") or "NotFound" in code:
                v = default
            elif code == 'PermanentRedirect':
                s = factory()
                c = bucket_client(s, b)
                # Requeue with the correct region given location constraint
                methods.append((m, k, default, select))
                continue
            else:
                log.warning(
                    "Bucket:%s unable to invoke method:%s error:%s",
                    b['Name'], m, e.response['Error']['Message'])
                # For auth failures we don't bail out; we continue processing
                # if we can. Note this can lead to missing data, but in general
                # it is cleaner than failing hard, due to the common use of
                # locked down s3 bucket policies that may cause issues fetching
                # information across a fleet of buckets.

                # This does mean s3 policies depending on augments should check
                # the denied methods annotation; generally though, lacking get
                # access to an augment means they won't have write access either.

                # For other error types we raise and bail policy execution.
                if code == 'AccessDenied':
                    b.setdefault('c7n:DeniedMethods', []).append(m)
                    continue
                raise
        # As soon as we learn location (which generally works)
        if k == 'Location' and v is not None:
            b_location = v.get('LocationConstraint')
            # Location == region for all cases but EU
            # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html
            if b_location is None:
                b_location = "us-east-1"
            elif b_location == 'EU':
                b_location = "eu-west-1"
                v['LocationConstraint'] = 'eu-west-1'
            if v and v != c_location:
                c = s.client('s3', region_name=b_location)
            elif c_location != location:
                c = s.client('s3', region_name=location)
        b[k] = v
    return b


def bucket_client(session, b, kms=False):
    region = get_region(b)

    if kms:
        # Need v4 signature for aws:kms crypto, else let the sdk decide
        # based on region support.
        config = Config(
            signature_version='s3v4',
            read_timeout=200, connect_timeout=120)
    else:
        config = Config(read_timeout=200, connect_timeout=120)
    return session.client('s3', region_name=region, config=config)
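
# Usage sketch (hypothetical session/bucket values): callers pass a bucket
# record that already carries its augmented Location, e.g.
#
#   session = local_session(session_factory)
#   client = bucket_client(session, bucket, kms=True)  # kms=True forces s3v4
#   client.head_bucket(Bucket=bucket['Name'])
#
# so the resulting client is pinned to the bucket's own region rather than
# the account default.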


def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()):
    for bucket in buckets:
        client = bucket_client(local_session(session_factory), bucket)
        # Bucket tags are set atomically for the set/document, we want
        # to refetch against current to guard against any staleness in
        # our cached representation across multiple policies or concurrent
        # modifications.

        if 'get_bucket_tagging' in bucket.get('c7n:DeniedMethods', []):
            # avoid the additional API call if we already know that it's going
            # to result in AccessDenied. The chances that the resource's perms
            # would have changed between fetching the resource and acting on it
            # here are pretty low, so the check here should suffice.
            log.warning(
                "Unable to get new set of bucket tags needed to modify tags, "
                "skipping tag action for bucket: %s" % bucket["Name"])
            continue

        try:
            bucket['Tags'] = client.get_bucket_tagging(
                Bucket=bucket['Name']).get('TagSet', [])
        except ClientError as e:
            if e.response['Error']['Code'] != 'NoSuchTagSet':
                raise
            bucket['Tags'] = []

        new_tags = {t['Key']: t['Value'] for t in add_tags}
        for t in bucket.get('Tags', ()):
            if (t['Key'] not in new_tags and t['Key'] not in remove_tags):
                new_tags[t['Key']] = t['Value']
        tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()]

        try:
            client.put_bucket_tagging(
                Bucket=bucket['Name'], Tagging={'TagSet': tag_set})
        except ClientError as e:
            log.exception(
                'Exception tagging bucket %s: %s', bucket['Name'], e)
            continue
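
# Merge semantics, as a worked example (hypothetical values): with existing
# bucket tags {'env': 'dev', 'tmp': '1'}, calling
#
#   modify_bucket_tags(factory, [bucket],
#                      add_tags=[{'Key': 'env', 'Value': 'prod'}],
#                      remove_tags=('tmp',))
#
# writes a TagSet of [{'Key': 'env', 'Value': 'prod'}]: add_tags win on key
# collisions, remove_tags keys are dropped, and other extant tags carry over.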


def get_region(b):
    """Tries to get the bucket region from Location.LocationConstraint

    Special cases:
        LocationConstraint EU defaults to eu-west-1
        LocationConstraint null defaults to us-east-1

    Args:
        b (object): A bucket object

    Returns:
        string: an aws region string
    """
    remap = {None: 'us-east-1', 'EU': 'eu-west-1'}
    region = b.get('Location', {}).get('LocationConstraint')
    return remap.get(region, region)
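
# Example behavior of the legacy location remapping:
#
#   get_region({'Location': {'LocationConstraint': 'EU'}})          # 'eu-west-1'
#   get_region({'Location': {}})                                    # 'us-east-1'
#   get_region({'Location': {'LocationConstraint': 'ap-south-1'}})  # 'ap-south-1'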


@filters.register('metrics')
class S3Metrics(MetricsFilter):
    """S3 CW Metrics need special handling for attribute/dimension
    mismatch, and an additional required dimension.
    """

    def get_dimensions(self, resource):
        dims = [{'Name': 'BucketName', 'Value': resource['Name']}]
        if (self.data['name'] == 'NumberOfObjects' and
                'dimensions' not in self.data):
            dims.append(
                {'Name': 'StorageType', 'Value': 'AllStorageTypes'})
        return dims
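
# A minimal usage sketch (assumed policy values): the filter injects the
# StorageType dimension automatically when querying NumberOfObjects, so a
# policy only needs something like:
#
#   policies:
#     - name: s3-sparse-buckets
#       resource: s3
#       filters:
#         - type: metrics
#           name: NumberOfObjects
#           days: 7
#           value: 10
#           op: less-than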


@filters.register('cross-account')
class S3CrossAccountFilter(CrossAccountAccessFilter):
    """Filters cross-account access to S3 buckets

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-acl
                resource: s3
                region: us-east-1
                filters:
                  - type: cross-account
    """
    permissions = ('s3:GetBucketPolicy',)

    def get_accounts(self):
        """add in elb access by default

        ELB Accounts by region
        https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html

        Redshift Accounts by region
        https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#db-auditing-manage-log-files
        https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#rs-db-auditing-cloud-trail-rs-acct-ids

        Cloudtrail Accounts by region
        https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-supported-regions.html
        """
        accounts = super(S3CrossAccountFilter, self).get_accounts()
        return accounts.union(
            [
                # ELB accounts
                '127311923021',  # us-east-1
                '033677994240',  # us-east-2
                '027434742980',  # us-west-1
                '797873946194',  # us-west-2
                '098369216593',  # af-south-1
                '985666609251',  # ca-central-1
                '054676820928',  # eu-central-1
                '897822967062',  # eu-north-1
                '635631232127',  # eu-south-1
                '156460612806',  # eu-west-1
                '652711504416',  # eu-west-2
                '009996457667',  # eu-west-3
                '754344448648',  # ap-east-1
                '582318560864',  # ap-northeast-1
                '600734575887',  # ap-northeast-2
                '383597477331',  # ap-northeast-3
                '114774131450',  # ap-southeast-1
                '783225319266',  # ap-southeast-2
                '718504428378',  # ap-south-1
                '076674570225',  # me-south-1
                '507241528517',  # sa-east-1
                '048591011584',  # us-gov-west-1 or gov-cloud-1
                '190560391635',  # us-gov-east-1
                '638102146993',  # cn-north-1
                '037604701340',  # cn-northwest-1

                # Redshift audit logging
                '193672423079',  # us-east-1
                '391106570357',  # us-east-2
                '262260360010',  # us-west-1
                '902366379725',  # us-west-2
                '365689465814',  # af-south-1
                '313564881002',  # ap-east-1
                '865932855811',  # ap-south-1
                '090321488786',  # ap-northeast-3
                '760740231472',  # ap-northeast-2
                '361669875840',  # ap-southeast-1
                '762762565011',  # ap-southeast-2
                '404641285394',  # ap-northeast-1
                '907379612154',  # ca-central-1
                '053454850223',  # eu-central-1
                '210876761215',  # eu-west-1
                '307160386991',  # eu-west-2
                '945612479654',  # eu-south-1
                '915173422425',  # eu-west-3
                '729911121831',  # eu-north-1
                '013126148197',  # me-south-1
                '075028567923',  # sa-east-1

                # Cloudtrail accounts (PSA: folks should be using the
                # cloudtrail service principal in bucket policies)
                '086441151436',  # us-east-1
                '475085895292',  # us-east-2
                '388731089494',  # us-west-1
                '113285607260',  # us-west-2
                '819402241893',  # ca-central-1
                '977081816279',  # ap-south-1
                '492519147666',  # ap-northeast-2
                '903692715234',  # ap-southeast-1
                '284668455005',  # ap-southeast-2
                '216624486486',  # ap-northeast-1
                '035351147821',  # eu-central-1
                '859597730677',  # eu-west-1
                '282025262664',  # eu-west-2
                '814480443879',  # sa-east-1
            ])


@filters.register('global-grants')
class GlobalGrantsFilter(Filter):
    """Filters for all S3 buckets that have global-grants

    *Note* by default this filter allows for read access
    if the bucket has been configured as a website. This
    can be disabled per the example below.

    :example:

    .. code-block:: yaml

            policies:
              - name: remove-global-grants
                resource: s3
                filters:
                  - type: global-grants
                    allow_website: false
                actions:
                  - delete-global-grants
    """

    schema = type_schema(
        'global-grants',
        allow_website={'type': 'boolean'},
        operator={'type': 'string', 'enum': ['or', 'and']},
        permissions={
            'type': 'array', 'items': {
                'type': 'string', 'enum': [
                    'READ', 'WRITE', 'WRITE_ACP', 'READ_ACP', 'FULL_CONTROL']}})

    GLOBAL_ALL = "http://acs.amazonaws.com/groups/global/AllUsers"
    AUTH_ALL = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"

    def process(self, buckets, event=None):
        with self.executor_factory(max_workers=5) as w:
            results = w.map(self.process_bucket, buckets)
            results = list(filter(None, list(results)))
            return results

    def process_bucket(self, b):
        acl = b.get('Acl', {'Grants': []})
        if not acl or not acl['Grants']:
            return

        results = []
        allow_website = self.data.get('allow_website', True)
        perms = self.data.get('permissions', [])

        for grant in acl['Grants']:
            if 'URI' not in grant.get("Grantee", {}):
                continue
            if grant['Grantee']['URI'] not in [self.AUTH_ALL, self.GLOBAL_ALL]:
                continue
            if allow_website and grant['Permission'] == 'READ' and b['Website']:
                continue
            if not perms or grant['Permission'] in perms:
                results.append(grant['Permission'])

        if results:
            set_annotation(b, 'GlobalPermissions', results)
            return b
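
# For reference, a grant this filter matches looks like (hypothetical ACL):
#
#   {'Grantee': {'Type': 'Group',
#                'URI': 'http://acs.amazonaws.com/groups/global/AllUsers'},
#    'Permission': 'READ'}
#
# Matched permissions are annotated on the bucket under 'GlobalPermissions'.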


class BucketActionBase(BaseAction):

    def get_permissions(self):
        return self.permissions

    def get_std_format_args(self, bucket):
        return {
            'account_id': self.manager.config.account_id,
            'region': self.manager.config.region,
            'bucket_name': bucket['Name'],
            'bucket_region': get_region(bucket)
        }

    def process(self, buckets):
        return self._process_with_futures(buckets)

    def _process_with_futures(self, buckets, *args, max_workers=3, **kwargs):
        errors = 0
        results = []
        with self.executor_factory(max_workers=max_workers) as w:
            futures = {}
            for b in buckets:
                futures[w.submit(self.process_bucket, b, *args, **kwargs)] = b
            for f in as_completed(futures):
                if f.exception():
                    b = futures[f]
                    self.log.error(
                        'error modifying bucket: policy:%s action:%s bucket:%s error:%s',
                        self.manager.data.get('name'), self.name, b['Name'], f.exception()
                    )
                    errors += 1
                    continue
                results += filter(None, [f.result()])
        if errors:
            self.log.error('encountered %d errors while processing %s', errors, self.name)
            raise PolicyExecutionError('%d resources failed' % errors)
        return results


class BucketFilterBase(Filter):
    def get_std_format_args(self, bucket):
        return {
            'account_id': self.manager.config.account_id,
            'region': self.manager.config.region,
            'bucket_name': bucket['Name'],
            'bucket_region': get_region(bucket)
        }


@S3.action_registry.register("post-finding")
class BucketFinding(PostFinding):

    resource_type = 'AwsS3Bucket'

    def format_resource(self, r):
        owner = r.get("Acl", {}).get("Owner", {})
        resource = {
            "Type": self.resource_type,
            "Id": "arn:aws:s3:::{}".format(r["Name"]),
            "Region": get_region(r),
            "Tags": {t["Key"]: t["Value"] for t in r.get("Tags", [])},
            "Details": {self.resource_type: {
                "OwnerId": owner.get('ID', 'Unknown')}}
        }

        if "DisplayName" in owner:
            resource["Details"]["AwsS3Bucket"]["OwnerName"] = owner['DisplayName']

        return filter_empty(resource)


@S3.filter_registry.register('has-statement')
class HasStatementFilter(polstmt_filter.HasStatementFilter):
    def get_std_format_args(self, bucket):
        return {
            'account_id': self.manager.config.account_id,
            'region': self.manager.config.region,
            'bucket_name': bucket['Name'],
            'bucket_region': get_region(bucket)
        }


ENCRYPTION_STATEMENT_GLOB = {
    'Effect': 'Deny',
    'Principal': '*',
    'Action': 's3:PutObject',
    "Condition": {
        "StringNotEquals": {
            "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}
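
# The no-encryption-statement filter below compares each bucket policy
# statement against this template after copying the statement's own Sid and
# Resource into it; i.e. a bucket passes (is excluded from results) if its
# policy contains a statement equal to, e.g. (hypothetical bucket name):
#
#   {'Sid': 'DenyUnencrypted', 'Effect': 'Deny', 'Principal': '*',
#    'Action': 's3:PutObject', 'Resource': 'arn:aws:s3:::my-bucket/*',
#    'Condition': {'StringNotEquals': {
#        's3:x-amz-server-side-encryption': ['AES256', 'aws:kms']}}}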


@filters.register('no-encryption-statement')
class EncryptionEnabledFilter(Filter):
    """Find buckets with missing encryption policy statements.

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-bucket-not-encrypted
                resource: s3
                filters:
                  - type: no-encryption-statement
    """
    schema = type_schema(
        'no-encryption-statement')

    def get_permissions(self):
        perms = self.manager.get_resource_manager('s3').get_permissions()
        return perms

    def process(self, buckets, event=None):
        return list(filter(None, map(self.process_bucket, buckets)))

    def process_bucket(self, b):
        p = b.get('Policy')
        if p is None:
            return b
        p = json.loads(p)
        encryption_statement = dict(ENCRYPTION_STATEMENT_GLOB)

        statements = p.get('Statement', [])
        check = False
        for s in list(statements):
            if 'Sid' in s:
                encryption_statement["Sid"] = s["Sid"]
            if 'Resource' in s:
                encryption_statement["Resource"] = s["Resource"]
            if s == encryption_statement:
                check = True
                break
        if check:
            return None
        else:
            return b


@filters.register('missing-statement')
@filters.register('missing-policy-statement')
class MissingPolicyStatementFilter(Filter):
    """Find buckets missing a set of named policy statements.

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-bucket-missing-statement
                resource: s3
                filters:
                  - type: missing-statement
                    statement_ids:
                      - RequiredEncryptedPutObject
    """

    schema = type_schema(
        'missing-policy-statement',
        aliases=('missing-statement',),
        statement_ids={'type': 'array', 'items': {'type': 'string'}})

    def __call__(self, b):
        p = b.get('Policy')
        if p is None:
            return b

        p = json.loads(p)

        required = list(self.data.get('statement_ids', []))
        statements = p.get('Statement', [])
        for s in list(statements):
            if s.get('Sid') in required:
                required.remove(s['Sid'])
        if not required:
            return False
        return True


@filters.register('bucket-notification')
class BucketNotificationFilter(ValueFilter):
    """Filter based on bucket notification configuration.

    :example:

    .. code-block:: yaml

            policies:
              - name: delete-incorrect-notification
                resource: s3
                filters:
                  - type: bucket-notification
                    kind: lambda
                    key: Id
                    value: "IncorrectLambda"
                    op: eq
                actions:
                  - type: delete-bucket-notification
                    statement_ids: matched
    """

    schema = type_schema(
        'bucket-notification',
        required=['kind'],
        kind={'type': 'string', 'enum': ['lambda', 'sns', 'sqs']},
        rinherit=ValueFilter.schema)
    schema_alias = False
    annotation_key = 'c7n:MatchedNotificationConfigurationIds'

    permissions = ('s3:GetBucketNotification',)

    FIELDS = {
        'lambda': 'LambdaFunctionConfigurations',
        'sns': 'TopicConfigurations',
        'sqs': 'QueueConfigurations'
    }

    def process(self, buckets, event=None):
        return super(BucketNotificationFilter, self).process(buckets, event)

    def __call__(self, bucket):
        field = self.FIELDS[self.data['kind']]
        found = False
        for config in bucket.get('Notification', {}).get(field, []):
            if self.match(config):
                set_annotation(
                    bucket,
                    BucketNotificationFilter.annotation_key,
                    config['Id'])
                found = True
        return found


@filters.register('bucket-logging')
class BucketLoggingFilter(BucketFilterBase):
    """Filter based on bucket logging configuration.

    :example:

    .. code-block:: yaml

            policies:
              - name: add-bucket-logging-if-missing
                resource: s3
                filters:
                  - type: bucket-logging
                    op: disabled
                actions:
                  - type: toggle-logging
                    target_bucket: "{account_id}-{region}-s3-logs"
                    target_prefix: "{source_bucket_name}/"

            policies:
              - name: update-incorrect-or-missing-logging
                resource: s3
                filters:
                  - type: bucket-logging
                    op: not-equal
                    target_bucket: "{account_id}-{region}-s3-logs"
                    target_prefix: "{account}/{source_bucket_name}/"
                actions:
                  - type: toggle-logging
                    target_bucket: "{account_id}-{region}-s3-logs"
                    target_prefix: "{account}/{source_bucket_name}/"
    """

    schema = type_schema(
        'bucket-logging',
        op={'enum': ['enabled', 'disabled', 'equal', 'not-equal', 'eq', 'ne']},
        required=['op'],
        target_bucket={'type': 'string'},
        target_prefix={'type': 'string'})
    schema_alias = False
    account_name = None

    permissions = ("s3:GetBucketLogging", "iam:ListAccountAliases")

    def process(self, buckets, event=None):
        return list(filter(None, map(self.process_bucket, buckets)))

    def process_bucket(self, b):
        if self.match_bucket(b):
            return b

    def match_bucket(self, b):
        op = self.data.get('op')

        logging = b.get('Logging', {})
        if op == 'disabled':
            return logging == {}
        elif op == 'enabled':
            return logging != {}

        if self.account_name is None:
            session = local_session(self.manager.session_factory)
            self.account_name = get_account_alias_from_sts(session)

        variables = self.get_std_format_args(b)
        variables.update({
            'account': self.account_name,
            'source_bucket_name': b['Name'],
            'source_bucket_region': get_region(b),
            'target_bucket_name': self.data.get('target_bucket'),
            'target_prefix': self.data.get('target_prefix'),
        })
        data = format_string_values(self.data, **variables)
        target_bucket = data.get('target_bucket')
        target_prefix = data.get('target_prefix', b['Name'] + '/')

        target_config = {
            "TargetBucket": target_bucket,
            "TargetPrefix": target_prefix
        } if target_bucket else {}

        if op in ('not-equal', 'ne'):
            return logging != target_config
        else:
            return logging == target_config


@actions.register('delete-bucket-notification')
class DeleteBucketNotification(BucketActionBase):
    """Action to delete S3 bucket notification configurations"""

    schema = type_schema(
        'delete-bucket-notification',
        required=['statement_ids'],
        statement_ids={'oneOf': [
            {'enum': ['matched']},
            {'type': 'array', 'items': {'type': 'string'}}]})

    permissions = ('s3:PutBucketNotification',)

    def process_bucket(self, bucket):
        n = bucket['Notification']
        if not n:
            return

        statement_ids = self.data.get('statement_ids')
        if statement_ids == 'matched':
            statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ())
        if not statement_ids:
            return

        cfg = defaultdict(list)

        for t in BucketNotificationFilter.FIELDS.values():
            for c in n.get(t, []):
                if c['Id'] not in statement_ids:
                    cfg[t].append(c)

        client = bucket_client(local_session(self.manager.session_factory), bucket)
        client.put_bucket_notification_configuration(
            Bucket=bucket['Name'],
            NotificationConfiguration=cfg)


@actions.register('no-op')
class NoOp(BucketActionBase):

    schema = type_schema('no-op')
    permissions = ('s3:ListAllMyBuckets',)

    def process(self, buckets):
        return None


@actions.register('set-statements')
class SetPolicyStatement(BucketActionBase):
    """Action to add or update policy statements to S3 buckets

    :example:

    .. code-block:: yaml

            policies:
              - name: force-s3-https
                resource: s3
                actions:
                  - type: set-statements
                    statements:
                      - Sid: "DenyHttp"
                        Effect: "Deny"
                        Action: "s3:GetObject"
                        Principal:
                          AWS: "*"
                        Resource: "arn:aws:s3:::{bucket_name}/*"
                        Condition:
                          Bool:
                            "aws:SecureTransport": false
    """

    permissions = ('s3:PutBucketPolicy',)

    schema = type_schema(
        'set-statements',
        **{
            'statements': {
                'type': 'array',
                'items': {
                    'type': 'object',
                    'properties': {
                        'Sid': {'type': 'string'},
                        'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']},
                        'Principal': {'anyOf': [{'type': 'string'},
                            {'type': 'object'}, {'type': 'array'}]},
                        'NotPrincipal': {'anyOf': [{'type': 'object'}, {'type': 'array'}]},
                        'Action': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'NotAction': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'Resource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'NotResource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                        'Condition': {'type': 'object'}
                    },
                    'required': ['Sid', 'Effect'],
                    'oneOf': [
                        {'required': ['Principal', 'Action', 'Resource']},
                        {'required': ['NotPrincipal', 'Action', 'Resource']},
                        {'required': ['Principal', 'NotAction', 'Resource']},
                        {'required': ['NotPrincipal', 'NotAction', 'Resource']},
                        {'required': ['Principal', 'Action', 'NotResource']},
                        {'required': ['NotPrincipal', 'Action', 'NotResource']},
                        {'required': ['Principal', 'NotAction', 'NotResource']},
                        {'required': ['NotPrincipal', 'NotAction', 'NotResource']}
                    ]
                }
            }
        }
    )

    def process_bucket(self, bucket):
        policy = bucket.get('Policy') or '{}'

        target_statements = format_string_values(
            copy.deepcopy({s['Sid']: s for s in self.data.get('statements', [])}),
            **self.get_std_format_args(bucket))

        policy = json.loads(policy)
        bucket_statements = policy.setdefault('Statement', [])

        for s in bucket_statements:
            if s.get('Sid') not in target_statements:
                continue
            if s == target_statements[s['Sid']]:
                target_statements.pop(s['Sid'])

        if not target_statements:
            return

        bucket_statements.extend(target_statements.values())
        policy = json.dumps(policy)

        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        s3.put_bucket_policy(Bucket=bucket['Name'], Policy=policy)
        return {'Name': bucket['Name'], 'Policy': policy}
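
# Behavior sketch: target statements are keyed by Sid and formatted with the
# standard bucket variables ({bucket_name}, {region}, ...). Any target that is
# identical to an extant statement is dropped, and if nothing remains the
# bucket policy is left untouched; otherwise the remaining targets are
# appended and the merged policy is written back in one put_bucket_policy call.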


@actions.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
    """Action to remove policy statements from S3 buckets

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-remove-encrypt-put
                resource: s3
                filters:
                  - type: has-statement
                    statement_ids:
                      - RequiredEncryptedPutObject
                actions:
                  - type: remove-statements
                    statement_ids:
                      - RequiredEncryptedPutObject
    """

    permissions = ("s3:PutBucketPolicy", "s3:DeleteBucketPolicy")

    def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            futures = {}
            results = []
            for b in buckets:
                futures[w.submit(self.process_bucket, b)] = b
            for f in as_completed(futures):
                if f.exception():
                    b = futures[f]
                    self.log.error('error modifying bucket:%s\n%s',
                                   b['Name'], f.exception())
                    continue
                results += filter(None, [f.result()])
            return results

    def process_bucket(self, bucket):
        p = bucket.get('Policy')
        if p is None:
            return

        p = json.loads(p)

        statements, found = self.process_policy(
            p, bucket, CrossAccountAccessFilter.annotation_key)

        if not found:
            return

        s3 = bucket_client(local_session(self.manager.session_factory), bucket)

        if not statements:
            s3.delete_bucket_policy(Bucket=bucket['Name'])
        else:
            s3.put_bucket_policy(Bucket=bucket['Name'], Policy=json.dumps(p))
        return {'Name': bucket['Name'], 'State': 'PolicyRemoved', 'Statements': found}


@actions.register('set-replication')
class SetBucketReplicationConfig(BucketActionBase):
    """Action to add or remove replication configuration statement from S3 buckets

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-unapproved-account-replication
                resource: s3
                filters:
                  - type: value
                    key: Replication.ReplicationConfiguration.Rules[].Destination.Account
                    value: present
                  - type: value
                    key: Replication.ReplicationConfiguration.Rules[].Destination.Account
                    value_from:
                      url: 's3:///path/to/file.json'
                      format: json
                      expr: "approved_accounts.*"
                    op: ni
                actions:
                  - type: set-replication
                    state: enable
    """
    schema = type_schema(
        'set-replication',
        state={'type': 'string', 'enum': ['enable', 'disable', 'remove']})
    permissions = ("s3:GetReplicationConfiguration", "s3:PutReplicationConfiguration")

    def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
            errors = []
            for future in as_completed(futures):
                bucket = futures[future]
                try:
                    future.result()
                except ClientError as e:
                    errors.append("Message: %s Bucket: %s" % (e, bucket['Name']))
            if errors:
                raise Exception('\n'.join(errors))

    def process_bucket(self, bucket):
        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        state = self.data.get('state')
        if state is not None:
            if state == 'remove':
                s3.delete_bucket_replication(Bucket=bucket['Name'])
                return {'Name': bucket['Name'], 'State': 'ReplicationConfigRemoved'}
            if state in ('enable', 'disable'):
                config = s3.get_bucket_replication(Bucket=bucket['Name'])
                for rule in config['ReplicationConfiguration']['Rules']:
                    rule['Status'] = 'Enabled' if state == 'enable' else 'Disabled'
                s3.put_bucket_replication(
                    Bucket=bucket['Name'],
                    ReplicationConfiguration=config['ReplicationConfiguration']
                )
                return {'Name': bucket['Name'], 'State': 'ReplicationConfigUpdated'}


@filters.register('check-public-block')
class FilterPublicBlock(Filter):
    """Filter for s3 bucket public blocks

    If no filter parameters are provided, it checks to see if any are unset or False.

    If parameters are provided, only the provided ones are checked.

    :example:

    .. code-block:: yaml

            policies:
              - name: CheckForPublicAclBlock-Off
                resource: s3
                region: us-east-1
                filters:
                  - type: check-public-block
                    BlockPublicAcls: true
                    BlockPublicPolicy: true
    """

    schema = type_schema(
        'check-public-block',
        BlockPublicAcls={'type': 'boolean'},
        IgnorePublicAcls={'type': 'boolean'},
        BlockPublicPolicy={'type': 'boolean'},
        RestrictPublicBuckets={'type': 'boolean'})
    permissions = ("s3:GetBucketPublicAccessBlock",)
    keys = (
        'BlockPublicPolicy', 'BlockPublicAcls', 'IgnorePublicAcls', 'RestrictPublicBuckets')
    annotation_key = 'c7n:PublicAccessBlock'

    def process(self, buckets, event=None):
        results = []
        with self.executor_factory(max_workers=2) as w:
            futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
            for f in as_completed(futures):
                if f.result():
                    results.append(futures[f])
        return results

    def process_bucket(self, bucket):
        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
        if self.annotation_key not in bucket:
            try:
                config = s3.get_public_access_block(
                    Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
            except ClientError as e:
                error_code = e.response['Error']['Code']
                if error_code == 'NoSuchPublicAccessBlockConfiguration':
                    pass
                elif error_code == 'AccessDenied':
                    # Follow the same logic as `assemble_bucket` - log and continue
                    # on access denied errors rather than halting a policy altogether
                    method = 'GetPublicAccessBlock'
                    log.warning(
                        "Bucket:%s unable to invoke method:%s error:%s",
                        bucket['Name'], method, e.response['Error']['Message']
                    )
                    bucket.setdefault('c7n:DeniedMethods', []).append(method)
                else:
                    raise
            bucket[self.annotation_key] = config
        return self.matches_filter(config)

    def matches_filter(self, config):
        key_set = [key for key in self.keys if key in self.data]
        if key_set:
            return all([self.data.get(key) is config[key] for key in key_set])
        else:
            return not all(config.values())
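
# Worked example of the matching semantics (hypothetical config): given
#   config = {'BlockPublicAcls': True, 'IgnorePublicAcls': False,
#             'BlockPublicPolicy': True, 'RestrictPublicBuckets': False}
# - no filter params        -> matches (not all four values are True)
# - BlockPublicAcls: true   -> matches (exact comparison on the given key)
# - IgnorePublicAcls: true  -> does not match (config value is False)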


@actions.register('set-public-block')
class SetPublicBlock(BucketActionBase):
    """Action to update Public Access blocks on S3 buckets

    If no action parameters are provided, all settings will be set to the
    `state`, which defaults to True.

    If action parameters are provided, those will be set and other extant
    values preserved.

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-public-block-enable-all
                resource: s3
                filters:
                  - type: check-public-block
                actions:
                  - type: set-public-block

            policies:
              - name: s3-public-block-disable-all
                resource: s3
                filters:
                  - type: check-public-block
                actions:
                  - type: set-public-block
                    state: false

            policies:
              - name: s3-public-block-enable-some
                resource: s3
                filters:
                  - or:
                    - type: check-public-block
                      BlockPublicAcls: false
                    - type: check-public-block
                      BlockPublicPolicy: false
                actions:
                  - type: set-public-block
                    BlockPublicAcls: true
                    BlockPublicPolicy: true
    """

    schema = type_schema(
        'set-public-block',
        state={'type': 'boolean', 'default': True},
        BlockPublicAcls={'type': 'boolean'},
        IgnorePublicAcls={'type': 'boolean'},
        BlockPublicPolicy={'type': 'boolean'},
        RestrictPublicBuckets={'type': 'boolean'})
    permissions = ("s3:GetBucketPublicAccessBlock", "s3:PutBucketPublicAccessBlock")
    keys = FilterPublicBlock.keys
    annotation_key = FilterPublicBlock.annotation_key

    def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
            for future in as_completed(futures):
                future.result()

    def process_bucket(self, bucket):
        s3 = bucket_client(local_session(self.manager.session_factory), bucket)
        config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
        if self.annotation_key not in bucket:
            try:
                config = s3.get_public_access_block(
                    Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
            except ClientError as e:
                if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration':
                    raise

        key_set = [key for key in self.keys if key in self.data]
        if key_set:
            for key in key_set:
                config[key] = self.data.get(key)
        else:
            for key in self.keys:
                config[key] = self.data.get('state', True)
        s3.put_public_access_block(
            Bucket=bucket['Name'], PublicAccessBlockConfiguration=config)


@actions.register('toggle-versioning')
class ToggleVersioning(BucketActionBase):
    """Action to enable/suspend versioning on an S3 bucket

    Note versioning can never be disabled, only suspended.

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-enable-versioning
                resource: s3
                filters:
                  - or:
                    - type: value
                      key: Versioning.Status
                      value: Suspended
                    - type: value
                      key: Versioning.Status
                      value: absent
                actions:
                  - type: toggle-versioning
                    enabled: true
    """

    schema = type_schema(
        'toggle-versioning',
        enabled={'type': 'boolean'})
    permissions = ("s3:PutBucketVersioning",)

    def process_versioning(self, resource, state):
        client = bucket_client(
            local_session(self.manager.session_factory), resource)
        try:
            client.put_bucket_versioning(
                Bucket=resource['Name'],
                VersioningConfiguration={
                    'Status': state})
        except ClientError as e:
            if e.response['Error']['Code'] != 'AccessDenied':
                log.error(
                    "Unable to put bucket versioning on bucket %s: %s" % (resource['Name'], e))
                raise
            log.warning(
                "Access Denied Bucket:%s while put bucket versioning" % resource['Name'])

    # mfa delete enablement looks like it needs the serial and a current token.
    def process(self, resources):
        enabled = self.data.get('enabled', True)
        for r in resources:
            if 'Versioning' not in r or not r['Versioning']:
                r['Versioning'] = {'Status': 'Suspended'}
            if enabled and (
                    r['Versioning']['Status'] == 'Suspended'):
                self.process_versioning(r, 'Enabled')
            if not enabled and r['Versioning']['Status'] == 'Enabled':
                self.process_versioning(r, 'Suspended')


@actions.register('toggle-logging')
class ToggleLogging(BucketActionBase):
    """Action to enable/disable logging on an S3 bucket.

    Target bucket ACL must allow for WRITE and READ_ACP permissions.
    Not specifying a target_prefix will default to the current bucket name.
    https://docs.aws.amazon.com/AmazonS3/latest/dev/enable-logging-programming.html

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-enable-logging
                resource: s3
                filters:
                  - "tag:Testing": present
                actions:
                  - type: toggle-logging
                    target_bucket: log-bucket
                    target_prefix: logs123/

            policies:
              - name: s3-force-standard-logging
                resource: s3
                filters:
                  - type: bucket-logging
                    op: not-equal
                    target_bucket: "{account_id}-{region}-s3-logs"
                    target_prefix: "{account}/{source_bucket_name}/"
                actions:
                  - type: toggle-logging
                    target_bucket: "{account_id}-{region}-s3-logs"
                    target_prefix: "{account}/{source_bucket_name}/"
    """
    schema = type_schema(
        'toggle-logging',
        enabled={'type': 'boolean'},
        target_bucket={'type': 'string'},
        target_prefix={'type': 'string'})

    permissions = ("s3:PutBucketLogging", "iam:ListAccountAliases")

    def validate(self):
        if self.data.get('enabled', True):
            if not self.data.get('target_bucket'):
                raise PolicyValidationError(
                    "target_bucket must be specified on %s" % (
                        self.manager.data,))
        return self

    def process(self, resources):
        session = local_session(self.manager.session_factory)
        kwargs = {
            "enabled": self.data.get('enabled', True),
            "session": session,
            "account_name": get_account_alias_from_sts(session),
        }

        return self._process_with_futures(resources, **kwargs)

    def process_bucket(self, r, enabled=None, session=None, account_name=None):
        client = bucket_client(session, r)
        is_logging = bool(r.get('Logging'))

        if enabled:
            variables = self.get_std_format_args(r)
            variables.update({
                'account': account_name,
                'source_bucket_name': r['Name'],
                'source_bucket_region': get_region(r),
                'target_bucket_name': self.data.get('target_bucket'),
                'target_prefix': self.data.get('target_prefix'),
            })
            data = format_string_values(self.data, **variables)
            config = {
                'TargetBucket': data.get('target_bucket'),
                'TargetPrefix': data.get('target_prefix', r['Name'] + '/')
            }
            if not is_logging or r.get('Logging') != config:
                client.put_bucket_logging(
                    Bucket=r['Name'],
                    BucketLoggingStatus={'LoggingEnabled': config}
                )
                r['Logging'] = config

        elif not enabled and is_logging:
            client.put_bucket_logging(
                Bucket=r['Name'], BucketLoggingStatus={})
            r['Logging'] = {}


@actions.register('attach-encrypt')
class AttachLambdaEncrypt(BucketActionBase):
    """Action attaches a lambda encryption policy to an S3 bucket,
    supporting attachment via lambda bucket notification or sns
    notification to invoke lambda. A special topic value of `default`
    will utilize an extant notification or create one matching the
    bucket name.

    :example:

    .. code-block:: yaml

            policies:
              - name: attach-lambda-encrypt
                resource: s3
                filters:
                  - type: missing-policy-statement
                actions:
                  - type: attach-encrypt
                    role: arn:aws:iam::123456789012:role/my-role
    """
    schema = type_schema(
        'attach-encrypt',
        role={'type': 'string'},
        tags={'type': 'object'},
        topic={'type': 'string'})

    permissions = (
        "s3:PutBucketNotification", "s3:GetBucketNotification",
        # lambda manager uses quite a few perms to provision lambdas
        # and event sources, hard to disambiguate; punt for now.
        "lambda:*",
    )

    def __init__(self, data=None, manager=None):
        self.data = data or {}
        self.manager = manager

    def validate(self):
        if (not getattr(self.manager.config, 'dryrun', True) and
                not self.data.get('role', self.manager.config.assume_role)):
            raise PolicyValidationError(
                "attach-encrypt: role must be specified either "
                "via assume or in config on %s" % (self.manager.data,))

        return self

    def process(self, buckets):
        from c7n.mu import LambdaManager
        from c7n.ufuncs.s3crypt import get_function

        account_id = self.manager.config.account_id
        topic_arn = self.data.get('topic')

        func = get_function(
            None, self.data.get('role', self.manager.config.assume_role),
            account_id=account_id, tags=self.data.get('tags'))

        regions = {get_region(b) for b in buckets}

        # session managers by region
        region_sessions = {}
        for r in regions:
            region_sessions[r] = functools.partial(
                self.manager.session_factory, region=r)

        # Publish function to all of our buckets' regions
        region_funcs = {}

        for r in regions:
            lambda_mgr = LambdaManager(region_sessions[r])
            lambda_mgr.publish(func)
            region_funcs[r] = func

        with self.executor_factory(max_workers=3) as w:
            results = []
            futures = []
            for b in buckets:
                region = get_region(b)
                futures.append(
                    w.submit(
                        self.process_bucket,
                        region_funcs[region],
                        b,
                        topic_arn,
                        account_id,
                        region_sessions[region]
                    ))
            for f in as_completed(futures):
                if f.exception():
                    log.exception(
                        "Error attaching lambda-encrypt %s" % (f.exception()))
                    continue
                results.append(f.result())
            return list(filter(None, results))

    def process_bucket(self, func, bucket, topic, account_id, session_factory):
        from c7n.mu import BucketSNSNotification, BucketLambdaNotification
        if topic:
            topic = None if topic == 'default' else topic
            source = BucketSNSNotification(session_factory, bucket, topic)
        else:
            source = BucketLambdaNotification(
                {'account_s3': account_id}, session_factory, bucket)
        return source.add(func)


@actions.register('encryption-policy')
class EncryptionRequiredPolicy(BucketActionBase):
    """Action to apply an encryption policy to S3 buckets

    :example:

    .. code-block:: yaml

            policies:
              - name: s3-enforce-encryption
                resource: s3
                mode:
                  type: cloudtrail
                  events:
                    - CreateBucket
                actions:
                  - encryption-policy
    """

    permissions = ("s3:GetBucketPolicy", "s3:PutBucketPolicy")
    schema = type_schema('encryption-policy')

    def __init__(self, data=None, manager=None):
        self.data = data or {}
        self.manager = manager

    def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            results = w.map(self.process_bucket, buckets)
            results = list(filter(None, list(results)))
            return results

    def process_bucket(self, b):
        p = b['Policy']
        if p is None:
            log.info("No policy found, creating new")
            p = {'Version': "2012-10-17", "Statement": []}
        else:
            p = json.loads(p)

        encryption_sid = "RequiredEncryptedPutObject"
        encryption_statement = {
            'Sid': encryption_sid,
            'Effect': 'Deny',
            'Principal': '*',
            'Action': 's3:PutObject',
            "Resource": "arn:aws:s3:::%s/*" % b['Name'],
            "Condition": {
                # AWS Managed Keys or KMS keys, note policy language
                # does not support custom kms (todo add issue)
                "StringNotEquals": {
                    "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}

        statements = p.get('Statement', [])
        for s in list(statements):
            if s.get('Sid', '') == encryption_sid:
                log.debug("Bucket:%s Found extant encrypt policy", b['Name'])
                if s != encryption_statement:
                    log.info(
                        "Bucket:%s updating extant encrypt policy", b['Name'])
                    statements.remove(s)
                else:
                    return

        session = self.manager.session_factory()
        s3 = bucket_client(session, b)
        statements.append(encryption_statement)
        p['Statement'] = statements
        log.info('Bucket:%s attached encryption policy' % b['Name'])

        try:
            s3.put_bucket_policy(
                Bucket=b['Name'],
                Policy=json.dumps(p))
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchBucket':
                return
            self.log.exception(
                "Error on bucket:%s putting policy\n%s error:%s",
                b['Name'],
                json.dumps(statements, indent=2), e)
            raise
        return {'Name': b['Name'], 'State': 'PolicyAttached'}
1830class BucketScanLog:
1831 """Offload remediated key ids to a disk file in batches
1833 A bucket keyspace is effectively infinite, we need to store partial
1834 results out of memory, this class provides for a json log on disk
1835 with partial write support.
1837 json output format:
1838 - [list_of_serialized_keys],
1839 - [] # Empty list of keys at end when we close the buffer
1841 """
1843 def __init__(self, log_dir, name):
1844 self.log_dir = log_dir
1845 self.name = name
1846 self.fh = None
1847 self.count = 0
1849 @property
1850 def path(self):
1851 return os.path.join(self.log_dir, "%s.json" % self.name)
1853 def __enter__(self):
1854 # Don't require output directories
1855 if self.log_dir is None:
1856 return
1858 self.fh = open(self.path, 'w')
1859 self.fh.write("[\n")
1860 return self
1862 def __exit__(self, exc_type=None, exc_value=None, exc_frame=None):
1863 if self.fh is None:
1864 return
1865 # we need an empty marker list at end to avoid trailing commas
1866 self.fh.write("[]")
1867 # and close the surrounding list
1868 self.fh.write("\n]")
1869 self.fh.close()
1870 if not self.count:
1871 os.remove(self.fh.name)
1872 self.fh = None
1873 return False
1875 def add(self, keys):
1876 self.count += len(keys)
1877 if self.fh is None:
1878 return
1879 self.fh.write(dumps(keys))
1880 self.fh.write(",\n")
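# Usage sketch (illustrative, paths hypothetical): BucketScanLog is a context
# manager that streams batches of serialized keys to <log_dir>/<name>.json;
# with log_dir=None no file is opened at all.
def _demo_scan_log():
    with BucketScanLog('/tmp/c7n-keys', 'my-bucket') as key_log:
        key_log.add([{'Key': 'a.txt'}, {'Key': 'b.txt'}])
    # resulting file content:
    # [
    # [{"Key": "a.txt"}, {"Key": "b.txt"}],
    # []
    # ]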
1883class ScanBucket(BucketActionBase):
1885 permissions = ("s3:ListBucket",)
1887 bucket_ops = {
1888 'standard': {
1889 'iterator': 'list_objects',
1890 'contents_key': ['Contents'],
1891 'key_processor': 'process_key'
1892 },
1893 'versioned': {
1894 'iterator': 'list_object_versions',
1895 'contents_key': ['Versions'],
1896 'key_processor': 'process_version'
1897 }
1898 }
1900 def __init__(self, data, manager=None):
1901 super(ScanBucket, self).__init__(data, manager)
1902 self.denied_buckets = set()
1904 def get_bucket_style(self, b):
1905 return (
1906 'versioned' if b.get('Versioning', {'Status': ''}).get('Status')
1907 in ('Enabled', 'Suspended') else 'standard')
1909 def get_bucket_op(self, b, op_name):
1910 bucket_style = self.get_bucket_style(b)
1911 op = self.bucket_ops[bucket_style][op_name]
1912 if op_name == 'key_processor':
1913 return getattr(self, op)
1914 return op
1916 def get_keys(self, b, key_set):
1917 content_keys = self.get_bucket_op(b, 'contents_key')
1918 keys = []
1919 for ck in content_keys:
1920 keys.extend(key_set.get(ck, []))
1921 return keys
1923 def process(self, buckets):
1924 results = self._process_with_futures(self.process_bucket, buckets)
1925 self.write_denied_buckets_file()
1926 return results
1928 def _process_with_futures(self, helper, buckets, max_workers=3):
1929 results = []
1930 with self.executor_factory(max_workers) as w:
1931 futures = {}
1932 for b in buckets:
1933 futures[w.submit(helper, b)] = b
1934 for f in as_completed(futures):
1935 if f.exception():
1936 b = futures[f]
1937 self.log.error(
1938 "Error on bucket:%s region:%s policy:%s error: %s",
1939 b['Name'], b.get('Location', 'unknown'),
1940 self.manager.data.get('name'), f.exception())
1941 self.denied_buckets.add(b['Name'])
1942 continue
1943 result = f.result()
1944 if result:
1945 results.append(result)
1946 return results
1948 def write_denied_buckets_file(self):
1949 if (self.denied_buckets and
1950 self.manager.ctx.log_dir and
1951 not isinstance(self.manager.ctx.output, NullBlobOutput)):
1952 with open(
1953 os.path.join(
1954 self.manager.ctx.log_dir, 'denied.json'), 'w') as fh:
1955 json.dump(list(self.denied_buckets), fh, indent=2)
1956 self.denied_buckets = set()
1958 def process_bucket(self, b):
1959 log.info(
1960 "Scanning bucket:%s visitor:%s style:%s" % (
1961 b['Name'], self.__class__.__name__, self.get_bucket_style(b)))
1963 s = self.manager.session_factory()
1964 s3 = bucket_client(s, b)
1966 # The bulk of the _process_bucket function executes inline in the
1967 # calling thread/worker context; neither the paginator nor the
1968 # bucket scan log should be used across a worker boundary.
1969 p = s3.get_paginator(
1970 self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name'])
1972 with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log:
1973 with self.executor_factory(max_workers=10) as w:
1974 try:
1975 return self._process_bucket(b, p, key_log, w)
1976 except ClientError as e:
1977 if e.response['Error']['Code'] == 'NoSuchBucket':
1978 log.warning(
1979 "Bucket:%s removed while scanning" % b['Name'])
1980 return
1981 if e.response['Error']['Code'] == 'AccessDenied':
1982 log.warning(
1983 "Access Denied Bucket:%s while scanning" % b['Name'])
1984 self.denied_buckets.add(b['Name'])
1985 return
1986 log.exception(
1987 "Error processing bucket:%s paginator:%s" % (
1988 b['Name'], p))
1990 __call__ = process_bucket
1992 def _process_bucket(self, b, p, key_log, w):
1993 count = 0
1995 for key_set in p:
1996 keys = self.get_keys(b, key_set)
1997 count += len(keys)
1998 futures = []
2000 for batch in chunks(keys, size=100):
2001 if not batch:
2002 continue
2003 futures.append(w.submit(self.process_chunk, batch, b))
2005 for f in as_completed(futures):
2006 if f.exception():
2007 log.exception("Exception Processing bucket:%s key batch %s" % (
2008 b['Name'], f.exception()))
2009 continue
2010 r = f.result()
2011 if r:
2012 key_log.add(r)
2014 # Log completion at info level, progress at debug level
2015 if key_set['IsTruncated']:
2016 log.debug('Scan progress bucket:%s keys:%d remediated:%d ...',
2017 b['Name'], count, key_log.count)
2018 else:
2019 log.info('Scan Complete bucket:%s keys:%d remediated:%d',
2020 b['Name'], count, key_log.count)
2022 b['KeyScanCount'] = count
2023 b['KeyRemediated'] = key_log.count
2024 return {
2025 'Bucket': b['Name'], 'Remediated': key_log.count, 'Count': count}
2027 def process_chunk(self, batch, bucket):
2028 raise NotImplementedError()
2030 def process_key(self, s3, key, bucket_name, info=None):
2031 raise NotImplementedError()
2033 def process_version(self, s3, key, bucket_name):
2034 raise NotImplementedError()
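# Dispatch sketch (illustrative): the bucket_ops table drives scanning, e.g.
# for {'Versioning': {'Status': 'Enabled'}} get_bucket_style returns
# 'versioned', so get_bucket_op selects the 'list_object_versions' paginator,
# reads keys from the 'Versions' result key, and routes each batch through
# process_version.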
2037@actions.register('encrypt-keys')
2038class EncryptExtantKeys(ScanBucket):
2039 """Action to encrypt unencrypted S3 objects
2041 :example:
2043 .. code-block:: yaml
2045 policies:
2046 - name: s3-encrypt-objects
2047 resource: s3
2048 actions:
2049 - type: encrypt-keys
2050 crypto: aws:kms
2051 key-id: 9c3983be-c6cf-11e6-9d9d-cec0c932ce01
2052 """
2054 permissions = (
2055 "s3:GetObject",
2056 "s3:PutObject",
2057 "s3:DeleteObjectVersion",
2058 "s3:RestoreObject",
2059 ) + ScanBucket.permissions
2061 schema = {
2062 'type': 'object',
2063 'additionalProperties': False,
2064 'properties': {
2065 'type': {'enum': ['encrypt-keys']},
2066 'report-only': {'type': 'boolean'},
2067 'glacier': {'type': 'boolean'},
2068 'large': {'type': 'boolean'},
2069 'crypto': {'enum': ['AES256', 'aws:kms']},
2070 'key-id': {'type': 'string'}
2071 },
2072 'dependencies': {
2073 'key-id': {
2074 'properties': {
2075 'crypto': {'pattern': 'aws:kms'}
2076 },
2077 'required': ['crypto']
2078 }
2079 }
2080 }
2082 metrics = [
2083 ('Total Keys', {'Scope': 'Account'}),
2084 ('Unencrypted', {'Scope': 'Account'})]
2086 def __init__(self, data, manager=None):
2087 super(EncryptExtantKeys, self).__init__(data, manager)
2088 self.kms_id = self.data.get('key-id')
2090 def get_permissions(self):
2091 perms = ("s3:GetObject", "s3:GetObjectVersion")
2092 if self.data.get('report-only'):
2093 perms += ('s3:DeleteObject', 's3:DeleteObjectVersion',
2094 's3:PutObject',
2095 's3:AbortMultipartUpload',
2096 's3:ListBucket',
2097 's3:ListBucketVersions')
2098 return perms
2100 def process(self, buckets):
2102 t = time.time()
2103 results = super(EncryptExtantKeys, self).process(buckets)
2104 run_time = time.time() - t
2105 remediated_count = object_count = 0
2107 for r in results:
2108 object_count += r['Count']
2109 remediated_count += r['Remediated']
2110 self.manager.ctx.metrics.put_metric(
2111 "Unencrypted", r['Remediated'], "Count", Scope=r['Bucket'],
2112 buffer=True)
2114 self.manager.ctx.metrics.put_metric(
2115 "Unencrypted", remediated_count, "Count", Scope="Account",
2116 buffer=True
2117 )
2118 self.manager.ctx.metrics.put_metric(
2119 "Total Keys", object_count, "Count", Scope="Account",
2120 buffer=True
2121 )
2122 self.manager.ctx.metrics.flush()
2124 log.info(
2125 ("EncryptExtant Complete keys:%d "
2126 "remediated:%d rate:%0.2f/s time:%0.2fs"),
2127 object_count,
2128 remediated_count,
2129 float(object_count) / run_time if run_time else 0,
2130 run_time)
2131 return results
2133 def process_chunk(self, batch, bucket):
2134 crypto_method = self.data.get('crypto', 'AES256')
2135 s3 = bucket_client(
2136 local_session(self.manager.session_factory), bucket,
2137 kms=(crypto_method == 'aws:kms'))
2138 b = bucket['Name']
2139 results = []
2140 key_processor = self.get_bucket_op(bucket, 'key_processor')
2141 for key in batch:
2142 r = key_processor(s3, key, b)
2143 if r:
2144 results.append(r)
2145 return results
2147 def process_key(self, s3, key, bucket_name, info=None):
2148 k = key['Key']
2149 if info is None:
2150 info = s3.head_object(Bucket=bucket_name, Key=k)
2152 # If the data is already encrypted with AES256 and this request is also
2153 # for AES256 then we don't need to do anything
2154 if info.get('ServerSideEncryption') == 'AES256' and not self.kms_id:
2155 return False
2157 if info.get('ServerSideEncryption') == 'aws:kms':
2158 # If we're not looking for a specific key any key will do.
2159 if not self.kms_id:
2160 return False
2161 # If we're configured to use a specific key and the key matches;
2162 # note this is a substring match, not strict equality.
2163 if self.kms_id in info.get('SSEKMSKeyId', ''):
2164 return False
2166 if self.data.get('report-only'):
2167 return k
2169 storage_class = info.get('StorageClass', 'STANDARD')
2171 if storage_class == 'GLACIER':
2172 if not self.data.get('glacier'):
2173 return False
2174 if 'Restore' not in info:
2175 # This takes multiple hours, we let the next c7n
2176 # run take care of followups.
2177 s3.restore_object(
2178 Bucket=bucket_name,
2179 Key=k,
2180 RestoreRequest={'Days': 30})
2181 return False
2182 elif not restore_complete(info['Restore']):
2183 return False
2185 storage_class = 'STANDARD'
2187 crypto_method = self.data.get('crypto', 'AES256')
2188 key_id = self.data.get('key-id')
2189 # Note: on copy we lose individual object ACL grants
2190 params = {'Bucket': bucket_name,
2191 'Key': k,
2192 'CopySource': "/%s/%s" % (bucket_name, k),
2193 'MetadataDirective': 'COPY',
2194 'StorageClass': storage_class,
2195 'ServerSideEncryption': crypto_method}
2197 if key_id and crypto_method == 'aws:kms':
2198 params['SSEKMSKeyId'] = key_id
2200 if info['ContentLength'] > MAX_COPY_SIZE and self.data.get(
2201 'large', True):
2202 return self.process_large_file(s3, bucket_name, key, info, params)
2204 s3.copy_object(**params)
2205 return k
2207 def process_version(self, s3, key, bucket_name):
2208 info = s3.head_object(
2209 Bucket=bucket_name,
2210 Key=key['Key'],
2211 VersionId=key['VersionId'])
2213 if 'ServerSideEncryption' in info:
2214 return False
2216 if self.data.get('report-only'):
2217 return key['Key'], key['VersionId']
2219 if key['IsLatest']:
2220 r = self.process_key(s3, key, bucket_name, info)
2221 # Glacier request processing, wait till we have the restored object
2222 if not r:
2223 return r
2224 s3.delete_object(
2225 Bucket=bucket_name,
2226 Key=key['Key'],
2227 VersionId=key['VersionId'])
2228 return key['Key'], key['VersionId']
2230 def process_large_file(self, s3, bucket_name, key, info, params):
2231 """For objects over 5gb, use multipart upload to copy"""
2232 part_size = MAX_COPY_SIZE - (1024 ** 2)
2233 num_parts = int(math.ceil(info['ContentLength'] / part_size))
2234 source = params.pop('CopySource')
2236 params.pop('MetadataDirective')
2237 if 'Metadata' in info:
2238 params['Metadata'] = info['Metadata']
2240 upload_id = s3.create_multipart_upload(**params)['UploadId']
2242 params = {'Bucket': bucket_name,
2243 'Key': key['Key'],
2244 'UploadId': upload_id,
2245 'CopySource': source,
2246 'CopySourceIfMatch': info['ETag']}
2248 def upload_part(part_num):
2249 part_params = dict(params)
2250 part_params['CopySourceRange'] = "bytes=%d-%d" % (
2251 part_size * (part_num - 1),
2252 min(part_size * part_num - 1, info['ContentLength'] - 1))
2253 part_params['PartNumber'] = part_num
2254 response = s3.upload_part_copy(**part_params)
2255 return {'ETag': response['CopyPartResult']['ETag'],
2256 'PartNumber': part_num}
2258 try:
2259 with self.executor_factory(max_workers=2) as w:
2260 parts = list(w.map(upload_part, range(1, num_parts + 1)))
2261 except Exception:
2262 log.warning(
2263 "Error during large key copy bucket: %s key: %s, "
2264 "aborting upload", bucket_name, key, exc_info=True)
2265 s3.abort_multipart_upload(
2266 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id)
2267 raise
2268 s3.complete_multipart_upload(
2269 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id,
2270 MultipartUpload={'Parts': parts})
2271 return key['Key']
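# Part-range sketch (illustrative): parts are MAX_COPY_SIZE - 1 MiB long, so
# a hypothetical 5 GiB object copies in three ranged upload_part_copy calls.
def _demo_part_ranges(content_length):
    part_size = MAX_COPY_SIZE - (1024 ** 2)
    num_parts = int(math.ceil(content_length / part_size))
    return ["bytes=%d-%d" % (part_size * (n - 1),
                             min(part_size * n - 1, content_length - 1))
            for n in range(1, num_parts + 1)]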
2274def restore_complete(restore):
2275 if ',' in restore:
2276 ongoing, avail = restore.split(',', 1)
2277 else:
2278 ongoing = restore
2279 return 'false' in ongoing
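# Header sketch (illustrative, date hypothetical): S3 reports restore status
# via the Restore header, e.g.
#   'ongoing-request="true"'                                     -> in flight
#   'ongoing-request="false", expiry-date="Fri, 21 Dec 2029 00:00:00 GMT"'
#                                                                -> complete
# so restore_complete keys off 'false' appearing in the first field.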
2282@filters.register('is-log-target')
2283class LogTarget(Filter):
2284 """Filter and return buckets are log destinations.
2286 Not suitable for use in lambda on large accounts, This is a api
2287 heavy process to detect scan all possible log sources.
2289 Sources:
2290 - elb (Access Log)
2291 - s3 (Access Log)
2292 - cfn (Template writes)
2293 - cloudtrail
2295 :example:
2297 .. code-block:: yaml
2299 policies:
2300 - name: s3-log-bucket
2301 resource: s3
2302 filters:
2303 - type: is-log-target
2304 """
2306 schema = type_schema(
2307 'is-log-target',
2308 services={'type': 'array', 'items': {'enum': [
2309 's3', 'elb', 'cloudtrail']}},
2310 self={'type': 'boolean'},
2311 value={'type': 'boolean'})
2313 def get_permissions(self):
2314 perms = self.manager.get_resource_manager('elb').get_permissions()
2315 perms += ('elasticloadbalancing:DescribeLoadBalancerAttributes',)
2316 return perms
2318 def process(self, buckets, event=None):
2319 log_buckets = set()
2320 count = 0
2322 services = self.data.get('services', ['elb', 's3', 'cloudtrail'])
2323 self_log = self.data.get('self', False)
2325 if 'elb' in services and not self_log:
2326 for bucket, _ in self.get_elb_bucket_locations():
2327 log_buckets.add(bucket)
2328 count += 1
2329 self.log.debug("Found %d elb log targets" % count)
2331 if 's3' in services:
2332 count = 0
2333 for bucket, _ in self.get_s3_bucket_locations(buckets, self_log):
2334 count += 1
2335 log_buckets.add(bucket)
2336 self.log.debug('Found %d s3 log targets' % count)
2338 if 'cloudtrail' in services and not self_log:
2339 for bucket, _ in self.get_cloud_trail_locations(buckets):
2340 log_buckets.add(bucket)
2342 self.log.info("Found %d log targets for %d buckets" % (
2343 len(log_buckets), len(buckets)))
2344 if self.data.get('value', True):
2345 return [b for b in buckets if b['Name'] in log_buckets]
2346 else:
2347 return [b for b in buckets if b['Name'] not in log_buckets]
2349 @staticmethod
2350 def get_s3_bucket_locations(buckets, self_log=False):
2351 """return (bucket_name, prefix) for all s3 logging targets"""
2352 for b in buckets:
2353 if b.get('Logging'):
2354 if self_log:
2355 if b['Name'] != b['Logging']['TargetBucket']:
2356 continue
2357 yield (b['Logging']['TargetBucket'],
2358 b['Logging']['TargetPrefix'])
2359 if not self_log and b['Name'].startswith('cf-templates-'):
2360 yield (b['Name'], '')
2362 def get_cloud_trail_locations(self, buckets):
2363 session = local_session(self.manager.session_factory)
2364 client = session.client('cloudtrail')
2365 names = {b['Name'] for b in buckets}
2366 for t in client.describe_trails().get('trailList', ()):
2367 if t.get('S3BucketName') in names:
2368 yield (t['S3BucketName'], t.get('S3KeyPrefix', ''))
2370 def get_elb_bucket_locations(self):
2371 elbs = self.manager.get_resource_manager('elb').resources()
2372 get_elb_attrs = functools.partial(
2373 _query_elb_attrs, self.manager.session_factory)
2375 with self.executor_factory(max_workers=2) as w:
2376 futures = []
2377 for elb_set in chunks(elbs, 100):
2378 futures.append(w.submit(get_elb_attrs, elb_set))
2379 for f in as_completed(futures):
2380 if f.exception():
2381 log.error("Error while scanning elb log targets: %s" % (
2382 f.exception()))
2383 continue
2384 for tgt in f.result():
2385 yield tgt
2388def _query_elb_attrs(session_factory, elb_set):
2389 session = local_session(session_factory)
2390 client = session.client('elb')
2391 log_targets = []
2392 for e in elb_set:
2393 try:
2394 attrs = client.describe_load_balancer_attributes(
2395 LoadBalancerName=e['LoadBalancerName'])[
2396 'LoadBalancerAttributes']
2397 if 'AccessLog' in attrs and attrs['AccessLog']['Enabled']:
2398 log_targets.append((
2399 attrs['AccessLog']['S3BucketName'],
2400 attrs['AccessLog']['S3BucketPrefix']))
2401 except Exception as err:
2402 log.warning(
2403 "Could not retrieve load balancer %s: %s" % (
2404 e['LoadBalancerName'], err))
2405 return log_targets
2408@actions.register('remove-website-hosting')
2409class RemoveWebsiteHosting(BucketActionBase):
2410 """Action that removes website hosting configuration."""
2412 schema = type_schema('remove-website-hosting')
2414 permissions = ('s3:DeleteBucketWebsite',)
2416 def process(self, buckets):
2417 session = local_session(self.manager.session_factory)
2418 for bucket in buckets:
2419 client = bucket_client(session, bucket)
2420 client.delete_bucket_website(Bucket=bucket['Name'])
2423@actions.register('delete-global-grants')
2424class DeleteGlobalGrants(BucketActionBase):
2425 """Deletes global grants associated to a S3 bucket
2427 :example:
2429 .. code-block:: yaml
2431 policies:
2432 - name: s3-delete-global-grants
2433 resource: s3
2434 filters:
2435 - type: global-grants
2436 actions:
2437 - delete-global-grants
2438 """
2440 schema = type_schema(
2441 'delete-global-grants',
2442 grantees={'type': 'array', 'items': {'type': 'string'}})
2444 permissions = ('s3:PutBucketAcl',)
2446 def process(self, buckets):
2447 with self.executor_factory(max_workers=5) as w:
2448 return list(filter(None, list(w.map(self.process_bucket, buckets))))
2450 def process_bucket(self, b):
2451 grantees = self.data.get(
2452 'grantees', [
2453 GlobalGrantsFilter.AUTH_ALL, GlobalGrantsFilter.GLOBAL_ALL])
2455 log.info(b)
2457 acl = b.get('Acl', {'Grants': []})
2458 if not acl or not acl['Grants']:
2459 return
2460 new_grants = []
2461 for grant in acl['Grants']:
2462 grantee = grant.get('Grantee', {})
2463 if not grantee:
2464 continue
2465 # Yuck, 'get_bucket_acl' doesn't return the grantee type.
2466 if 'URI' in grantee:
2467 grantee['Type'] = 'Group'
2468 else:
2469 grantee['Type'] = 'CanonicalUser'
2470 if ('URI' in grantee and
2471 grantee['URI'] in grantees and not
2472 (grant['Permission'] == 'READ' and b['Website'])):
2473 # Remove this grantee.
2474 pass
2475 else:
2476 new_grants.append(grant)
2478 log.info({'Owner': acl['Owner'], 'Grants': new_grants})
2480 c = bucket_client(self.manager.session_factory(), b)
2481 try:
2482 c.put_bucket_acl(
2483 Bucket=b['Name'],
2484 AccessControlPolicy={
2485 'Owner': acl['Owner'], 'Grants': new_grants})
2486 except ClientError as e:
2487 if e.response['Error']['Code'] == 'NoSuchBucket':
2488 return
2489 return b
2492@actions.register('tag')
2493class BucketTag(Tag):
2494 """Action to create tags on a S3 bucket
2496 :example:
2498 .. code-block:: yaml
2500 policies:
2501 - name: s3-tag-region
2502 resource: s3
2503 region: us-east-1
2504 filters:
2505 - "tag:RegionName": absent
2506 actions:
2507 - type: tag
2508 key: RegionName
2509 value: us-east-1
2510 """
2512 def process_resource_set(self, client, resource_set, tags):
2513 modify_bucket_tags(self.manager.session_factory, resource_set, tags)
2516@actions.register('mark-for-op')
2517class MarkBucketForOp(TagDelayedAction):
2518 """Action schedules custodian to perform an action at a certain date
2520 :example:
2522 .. code-block:: yaml
2524 policies:
2525 - name: s3-encrypt
2526 resource: s3
2527 filters:
2528 - type: missing-statement
2529 statement_ids:
2530 - RequiredEncryptedPutObject
2531 actions:
2532 - type: mark-for-op
2533 op: attach-encrypt
2534 days: 7
2535 """
2537 schema = type_schema(
2538 'mark-for-op', rinherit=TagDelayedAction.schema)
2541@actions.register('unmark')
2542@actions.register('remove-tag')
2543class RemoveBucketTag(RemoveTag):
2544 """Removes tag/tags from a S3 object
2546 :example:
2548 .. code-block:: yaml
2550 policies:
2551 - name: s3-remove-owner-tag
2552 resource: s3
2553 filters:
2554 - "tag:BucketOwner": present
2555 actions:
2556 - type: remove-tag
2557 tags: ['BucketOwner']
2558 """
2560 def process_resource_set(self, client, resource_set, tags):
2561 modify_bucket_tags(
2562 self.manager.session_factory, resource_set, remove_tags=tags)
2565@filters.register('data-events')
2566class DataEvents(Filter):
2567 """Find buckets for which CloudTrail is logging data events.
2569 Note that this filter only examines trails that are defined in the
2570 current account.
2571 """
2573 schema = type_schema('data-events', state={'enum': ['present', 'absent']})
2574 permissions = (
2575 'cloudtrail:DescribeTrails',
2576 'cloudtrail:GetEventSelectors')
2578 def get_event_buckets(self, session, trails):
2579 """Return a mapping of bucket name to cloudtrail.
2581 For wildcard trails the bucket name is ''.
2582 """
2583 regions = {t.get('HomeRegion') for t in trails}
2584 clients = {}
2585 for region in regions:
2586 clients[region] = session.client('cloudtrail', region_name=region)
2588 event_buckets = {}
2589 for t in trails:
2590 for events in clients[t.get('HomeRegion')].get_event_selectors(
2591 TrailName=t['Name']).get('EventSelectors', ()):
2592 if 'DataResources' not in events:
2593 continue
2594 for data_events in events['DataResources']:
2595 if data_events['Type'] != 'AWS::S3::Object':
2596 continue
2597 for b in data_events['Values']:
2598 event_buckets[b.rsplit(':')[-1].strip('/')] = t['Name']
2599 return event_buckets
2601 def process(self, resources, event=None):
2602 trails = self.manager.get_resource_manager('cloudtrail').resources()
2603 local_trails = self.filter_resources(
2604 trails,
2605 "split(':', TrailARN)[4]", (self.manager.account_id,)
2606 )
2607 session = local_session(self.manager.session_factory)
2608 event_buckets = self.get_event_buckets(session, local_trails)
2609 ops = {
2610 'present': lambda x: (
2611 x['Name'] in event_buckets or '' in event_buckets),
2612 'absent': (
2613 lambda x: x['Name'] not in event_buckets and ''
2614 not in event_buckets)}
2616 op = ops[self.data.get('state', 'present')]
2617 results = []
2618 for b in resources:
2619 if op(b):
2620 results.append(b)
2621 return results
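# Selector parsing sketch (illustrative): S3 data event selectors carry
# object ARN prefixes, which the rsplit/strip above reduces to bucket names:
#   'arn:aws:s3:::my-bucket/'.rsplit(':')[-1].strip('/')  ->  'my-bucket'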
2624@filters.register('inventory')
2625class Inventory(ValueFilter):
2626 """Filter inventories for a bucket"""
2627 schema = type_schema('inventory', rinherit=ValueFilter.schema)
2628 schema_alias = False
2629 permissions = ('s3:GetInventoryConfiguration',)
2631 def process(self, buckets, event=None):
2632 results = []
2633 with self.executor_factory(max_workers=2) as w:
2634 futures = {}
2635 for b in buckets:
2636 futures[w.submit(self.process_bucket, b)] = b
2638 for f in as_completed(futures):
2639 b = futures[f]
2640 if f.exception():
2641 b.setdefault('c7n:DeniedMethods', []).append('GetInventoryConfiguration')
2642 self.log.error(
2643 "Error processing bucket: %s error: %s",
2644 b['Name'], f.exception())
2645 continue
2646 if f.result():
2647 results.append(b)
2648 return results
2650 def process_bucket(self, b):
2651 if 'c7n:inventories' not in b:
2652 client = bucket_client(local_session(self.manager.session_factory), b)
2653 inventories = client.list_bucket_inventory_configurations(
2654 Bucket=b['Name']).get('InventoryConfigurationList', [])
2655 b['c7n:inventories'] = inventories
2657 for i in b['c7n:inventories']:
2658 if self.match(i):
2659 return True
2662@actions.register('set-inventory')
2663class SetInventory(BucketActionBase):
2664 """Configure bucket inventories for an s3 bucket.
2665 """
2666 schema = type_schema(
2667 'set-inventory',
2668 required=['name', 'destination'],
2669 state={'enum': ['enabled', 'disabled', 'absent']},
2670 name={'type': 'string', 'description': 'Name of inventory'},
2671 destination={'type': 'string', 'description': 'Name of destination bucket'},
2672 prefix={'type': 'string', 'description': 'Destination prefix'},
2673 encryption={'enum': ['SSES3', 'SSEKMS']},
2674 key_id={'type': 'string', 'description': 'Optional Customer KMS KeyId for SSE-KMS'},
2675 versions={'enum': ['All', 'Current']},
2676 schedule={'enum': ['Daily', 'Weekly']},
2677 format={'enum': ['CSV', 'ORC', 'Parquet']},
2678 fields={'type': 'array', 'items': {'enum': [
2679 'Size', 'LastModifiedDate', 'StorageClass', 'ETag',
2680 'IsMultipartUploaded', 'ReplicationStatus', 'EncryptionStatus',
2681 'ObjectLockRetainUntilDate', 'ObjectLockMode', 'ObjectLockLegalHoldStatus',
2682 'IntelligentTieringAccessTier', 'BucketKeyStatus', 'ChecksumAlgorithm']}})
2684 permissions = ('s3:PutInventoryConfiguration', 's3:GetInventoryConfiguration')
2686 def process(self, buckets):
2687 with self.executor_factory(max_workers=2) as w:
2688 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
2689 for future in as_completed(futures):
2690 bucket = futures[future]
2691 try:
2692 future.result()
2693 except Exception as e:
2694 self.log.error('Message: %s Bucket: %s', e, bucket['Name'])
2696 def process_bucket(self, b):
2697 inventory_name = self.data.get('name')
2698 destination = self.data.get('destination')
2699 prefix = self.data.get('prefix', '')
2700 schedule = self.data.get('schedule', 'Daily')
2701 fields = self.data.get('fields', ['LastModifiedDate', 'Size'])
2702 versions = self.data.get('versions', 'Current')
2703 state = self.data.get('state', 'enabled')
2704 encryption = self.data.get('encryption')
2705 inventory_format = self.data.get('format', 'CSV')
2707 if not prefix:
2708 prefix = "Inventories/%s" % (self.manager.config.account_id)
2710 client = bucket_client(local_session(self.manager.session_factory), b)
2711 if state == 'absent':
2712 try:
2713 client.delete_bucket_inventory_configuration(
2714 Bucket=b['Name'], Id=inventory_name)
2715 except ClientError as e:
2716 if e.response['Error']['Code'] != 'NoSuchConfiguration':
2717 raise
2718 return
2720 bucket = {
2721 'Bucket': "arn:aws:s3:::%s" % destination,
2722 'Format': inventory_format
2723 }
2725 inventory = {
2726 'Destination': {
2727 'S3BucketDestination': bucket
2728 },
2729 'IsEnabled': state == 'enabled',
2730 'Id': inventory_name,
2731 'OptionalFields': fields,
2732 'IncludedObjectVersions': versions,
2733 'Schedule': {
2734 'Frequency': schedule
2735 }
2736 }
2738 if prefix:
2739 bucket['Prefix'] = prefix
2741 if encryption:
2742 bucket['Encryption'] = {encryption: {}}
2743 if encryption == 'SSEKMS' and self.data.get('key_id'):
2744 bucket['Encryption'] = {encryption: {
2745 'KeyId': self.data['key_id']
2746 }}
2748 found = self.get_inventory_delta(client, inventory, b)
2749 if found:
2750 return
2751 if found is False:
2752 self.log.debug("updating bucket:%s inventory configuration id:%s",
2753 b['Name'], inventory_name)
2754 client.put_bucket_inventory_configuration(
2755 Bucket=b['Name'], Id=inventory_name, InventoryConfiguration=inventory)
2757 def get_inventory_delta(self, client, inventory, b):
2758 inventories = client.list_bucket_inventory_configurations(Bucket=b['Name'])
2759 found = None
2760 for i in inventories.get('InventoryConfigurationList', []):
2761 if i['Id'] != inventory['Id']:
2762 continue
2763 found = True
2764 for k, v in inventory.items():
2765 if k not in i:
2766 found = False
2767 continue
2768 if isinstance(v, list):
2769 v.sort()
2770 i[k].sort()
2771 if i[k] != v:
2772 found = False
2773 return found
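# Delta sketch (illustrative): get_inventory_delta is tri-state -- None means
# no inventory with that Id exists (so one is created), True means an
# identical configuration already exists (no-op), and False means it exists
# but differs (overwritten in place).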
2776@filters.register('intelligent-tiering')
2777class IntelligentTiering(ListItemFilter):
2778 """Filter for S3 buckets to look at intelligent tiering configurations
2780 The schema to supply to the attrs follows the schema here:
2781 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_bucket_intelligent_tiering_configurations.html
2783 :example:
2785 .. code-block:: yaml
2787 policies:
2788 - name: s3-intelligent-tiering-configuration
2789 resource: s3
2790 filters:
2791 - type: intelligent-tiering
2792 attrs:
2793 - Status: Enabled
2794 - Filter:
2795 And:
2796 Prefix: test
2797 Tags:
2798 - Key: Owner
2799 Value: c7n
2800 - Tierings:
2801 - Days: 100
2802 - AccessTier: ARCHIVE_ACCESS
2804 """
2805 schema = type_schema(
2806 'intelligent-tiering',
2807 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
2808 count={'type': 'number'},
2809 count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
2810 )
2811 permissions = ('s3:GetIntelligentTieringConfiguration',)
2812 annotation_key = "c7n:IntelligentTiering"
2813 annotate_items = True
2815 def __init__(self, data, manager=None):
2816 super().__init__(data, manager)
2817 self.data['key'] = self.annotation_key
2819 def process(self, buckets, event=None):
2820 with self.executor_factory(max_workers=2) as w:
2821 futures = {w.submit(self.get_item_values, b): b for b in buckets}
2822 for future in as_completed(futures):
2823 b = futures[future]
2824 if future.exception():
2825 self.log.error("Message: %s Bucket: %s", future.exception(), b['Name'])
2826 continue
2827 return super().process(buckets, event)
2829 def get_item_values(self, b):
2830 if self.annotation_key not in b:
2831 client = bucket_client(local_session(self.manager.session_factory), b)
2832 try:
2833 int_tier_config = client.list_bucket_intelligent_tiering_configurations(
2834 Bucket=b['Name'])
2835 b[self.annotation_key] = int_tier_config.get(
2836 'IntelligentTieringConfigurationList', [])
2837 except ClientError as e:
2838 if e.response['Error']['Code'] == 'AccessDenied':
2839 method = 'list_bucket_intelligent_tiering_configurations'
2840 log.warning(
2841 "Bucket:%s unable to invoke method:%s error:%s ",
2842 b['Name'], method, e.response['Error']['Message'])
2843 b.setdefault('c7n:DeniedMethods', []).append(method)
2844 return b.get(self.annotation_key)
2847@actions.register('set-intelligent-tiering')
2848class ConfigureIntelligentTiering(BucketActionBase):
2849 """Action applies an intelligent tiering configuration to a S3 bucket
2851 The schema to supply to the configuration follows the schema here:
2852 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_bucket_intelligent_tiering_configuration.html
2854 To delete a configuration, supply State=delete with either a specific Id or Id: matched
2856 :example:
2858 .. code-block:: yaml
2860 policies:
2861 - name: s3-apply-intelligent-tiering-config
2862 resource: aws.s3
2863 filters:
2864 - not:
2865 - type: intelligent-tiering
2866 attrs:
2867 - Status: Enabled
2868 - Filter:
2869 And:
2870 Prefix: helloworld
2871 Tags:
2872 - Key: Hello
2873 Value: World
2874 - Tierings:
2875 - Days: 123
2876 AccessTier: ARCHIVE_ACCESS
2877 actions:
2878 - type: set-intelligent-tiering
2879 Id: c7n-default
2880 IntelligentTieringConfiguration:
2881 Id: c7n-default
2882 Status: Enabled
2883 Tierings:
2884 - Days: 149
2885 AccessTier: ARCHIVE_ACCESS
2887 - name: s3-delete-intelligent-tiering-configuration
2888 resource: aws.s3
2889 filters:
2890 - type: intelligent-tiering
2891 attrs:
2892 - Status: Enabled
2893 - Id: test-config
2894 actions:
2895 - type: set-intelligent-tiering
2896 Id: test-config
2897 State: delete
2899 - name: s3-delete-intelligent-tiering-matched-configs
2900 resource: aws.s3
2901 filters:
2902 - type: intelligent-tiering
2903 attrs:
2904 - Status: Enabled
2905 - Id: test-config
2906 actions:
2907 - type: set-intelligent-tiering
2908 Id: matched
2909 State: delete
2911 """
2913 annotation_key = 'c7n:ListItemMatches'
2914 shape = 'PutBucketIntelligentTieringConfigurationRequest'
2915 schema = {
2916 'type': 'object',
2917 'additionalProperties': False,
2918 'oneOf': [
2919 {'required': ['type', 'Id', 'IntelligentTieringConfiguration']},
2920 {'required': ['type', 'Id', 'State']}],
2921 'properties': {
2922 'type': {'enum': ['set-intelligent-tiering']},
2923 'Id': {'type': 'string'},
2924 # delete intelligent tier configurations via state: delete
2925 'State': {'type': 'string', 'enum': ['delete']},
2926 'IntelligentTieringConfiguration': {'type': 'object'}
2927 },
2928 }
2930 permissions = ('s3:PutIntelligentTieringConfiguration',)
2932 def validate(self):
2933 # You can have up to 1,000 S3 Intelligent-Tiering configurations per bucket.
2934 # Hence, always use this action with an intelligent-tiering filter.
2935 found = False
2936 for f in self.manager.iter_filters():
2937 if isinstance(f, IntelligentTiering):
2938 found = True
2939 break
2940 if not found:
2941 raise PolicyValidationError(
2942 '`set-intelligent-tiering` may only be used in '
2943 'conjunction with `intelligent-tiering` filter on %s' % (self.manager.data,))
2944 cfg = dict(self.data)
2945 if 'IntelligentTieringConfiguration' in cfg:
2946 cfg['Bucket'] = 'bucket'
2947 cfg.pop('type')
2948 return shape_validate(
2949 cfg, self.shape, self.manager.resource_type.service)
2951 def process(self, buckets):
2952 with self.executor_factory(max_workers=3) as w:
2953 futures = {}
2955 for b in buckets:
2956 futures[w.submit(self.process_bucket, b)] = b
2958 for future in as_completed(futures):
2959 if future.exception():
2960 bucket = futures[future]
2961 self.log.error(
2962 'error modifying bucket intelligent tiering configuration: %s\n%s',
2963 bucket['Name'], future.exception())
2964 continue
2966 def process_bucket(self, bucket):
2967 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
2969 if 'list_bucket_intelligent_tiering_configurations' in bucket.get(
2970 'c7n:DeniedMethods', []):
2971 log.warning("Access Denied Bucket:%s while reading intelligent tiering configurations"
2972 % bucket['Name'])
2973 return
2975 if self.data.get('Id') and self.data.get('IntelligentTieringConfiguration'):
2976 try:
2977 s3.put_bucket_intelligent_tiering_configuration(
2978 Bucket=bucket['Name'], Id=self.data.get(
2979 'Id'), IntelligentTieringConfiguration=self.data.get(
2980 'IntelligentTieringConfiguration'))
2981 except ClientError as e:
2982 if e.response['Error']['Code'] == 'AccessDenied':
2983 log.warning(
2984 "Access Denied Bucket:%s while applying intelligent tiering configuration"
2985 % bucket['Name'])
2986 if self.data.get('State'):
2987 if self.data.get('Id') == 'matched':
2988 for config in bucket.get(self.annotation_key):
2989 self.delete_intelligent_tiering_configurations(s3, config.get('Id'), bucket)
2990 else:
2991 self.delete_intelligent_tiering_configurations(s3, self.data.get('Id'), bucket)
2993 def delete_intelligent_tiering_configurations(self, s3_client, id, bucket):
2994 try:
2995 s3_client.delete_bucket_intelligent_tiering_configuration(Bucket=bucket['Name'], Id=id)
2996 except ClientError as e:
2997 if e.response['Error']['Code'] == 'AccessDenied':
2998 log.warning(
2999 "Access Denied Bucket:%s while deleting intelligent tiering configuration"
3000 % bucket['Name'])
3001 elif e.response['Error']['Code'] == 'NoSuchConfiguration':
3002 log.warning(
3003 "No such configuration found:%s while deleting intelligent tiering configuration"
3004 % bucket['Name'])
3007@actions.register('delete')
3008class DeleteBucket(ScanBucket):
3009 """Action deletes a S3 bucket
3011 :example:
3013 .. code-block:: yaml
3015 policies:
3016 - name: delete-unencrypted-buckets
3017 resource: s3
3018 filters:
3019 - type: missing-statement
3020 statement_ids:
3021 - RequiredEncryptedPutObject
3022 actions:
3023 - type: delete
3024 remove-contents: true
3025 """
3027 schema = type_schema('delete', **{'remove-contents': {'type': 'boolean'}})
3029 permissions = ('s3:*',)
3031 bucket_ops = {
3032 'standard': {
3033 'iterator': 'list_objects',
3034 'contents_key': ['Contents'],
3035 'key_processor': 'process_key'
3036 },
3037 'versioned': {
3038 'iterator': 'list_object_versions',
3039 'contents_key': ['Versions', 'DeleteMarkers'],
3040 'key_processor': 'process_version'
3041 }
3042 }
3044 def process_delete_enablement(self, b):
3045 """Prep a bucket for deletion.
3047 Clear out any pending multi-part uploads.
3049 Disable versioning on the bucket, so deletes don't
3050 generate fresh deletion markers.
3051 """
3052 client = bucket_client(
3053 local_session(self.manager.session_factory), b)
3055 # Stop replication so we can suspend versioning
3056 if b.get('Replication') is not None:
3057 client.delete_bucket_replication(Bucket=b['Name'])
3059 # Suspend versioning, so we don't get new delete markers
3060 # as we walk and delete versions
3061 if (self.get_bucket_style(b) == 'versioned' and b['Versioning']['Status'] == 'Enabled' and
3062 self.data.get('remove-contents', True)):
3063 client.put_bucket_versioning(
3064 Bucket=b['Name'],
3065 VersioningConfiguration={'Status': 'Suspended'})
3067 # Clear our multi-part uploads
3068 uploads = client.get_paginator('list_multipart_uploads')
3069 for p in uploads.paginate(Bucket=b['Name']):
3070 for u in p.get('Uploads', ()):
3071 client.abort_multipart_upload(
3072 Bucket=b['Name'],
3073 Key=u['Key'],
3074 UploadId=u['UploadId'])
3076 def process(self, buckets):
3077 # might be worth sanity checking all our permissions
3078 # on the bucket up front before disabling versioning/replication.
3079 if self.data.get('remove-contents', True):
3080 self._process_with_futures(self.process_delete_enablement, buckets)
3081 self.empty_buckets(buckets)
3083 results = self._process_with_futures(self.delete_bucket, buckets)
3084 self.write_denied_buckets_file()
3085 return results
3087 def delete_bucket(self, b):
3088 s3 = bucket_client(self.manager.session_factory(), b)
3089 try:
3090 self._run_api(s3.delete_bucket, Bucket=b['Name'])
3091 except ClientError as e:
3092 if e.response['Error']['Code'] == 'BucketNotEmpty':
3093 self.log.error(
3094 "Error while deleting bucket %s, bucket not empty" % (
3095 b['Name']))
3096 else:
3097 raise e
3099 def empty_buckets(self, buckets):
3100 t = time.time()
3101 results = super(DeleteBucket, self).process(buckets)
3102 run_time = time.time() - t
3103 object_count = 0
3105 for r in results:
3106 object_count += r['Count']
3107 self.manager.ctx.metrics.put_metric(
3108 "Total Keys", object_count, "Count", Scope=r['Bucket'],
3109 buffer=True)
3110 self.manager.ctx.metrics.put_metric(
3111 "Total Keys", object_count, "Count", Scope="Account", buffer=True)
3112 self.manager.ctx.metrics.flush()
3114 log.info(
3115 "EmptyBucket buckets:%d Complete keys:%d rate:%0.2f/s time:%0.2fs",
3116 len(buckets), object_count,
3117 float(object_count) / run_time if run_time else 0, run_time)
3118 return results
3120 def process_chunk(self, batch, bucket):
3121 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3122 objects = []
3123 for key in batch:
3124 obj = {'Key': key['Key']}
3125 if 'VersionId' in key:
3126 obj['VersionId'] = key['VersionId']
3127 objects.append(obj)
3128 results = s3.delete_objects(
3129 Bucket=bucket['Name'], Delete={'Objects': objects}).get('Deleted', ())
3130 if self.get_bucket_style(bucket) != 'versioned':
3131 return results
3134@actions.register('configure-lifecycle')
3135class Lifecycle(BucketActionBase):
3136 """Action applies a lifecycle policy to versioned S3 buckets
3138 The schema to supply to the rule follows the schema here:
3139 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_lifecycle_configuration
3141 To delete a lifecycle rule, supply Status=absent
3143 :example:
3145 .. code-block:: yaml
3147 policies:
3148 - name: s3-apply-lifecycle
3149 resource: s3
3150 actions:
3151 - type: configure-lifecycle
3152 rules:
3153 - ID: my-lifecycle-id
3154 Status: Enabled
3155 Prefix: foo/
3156 Transitions:
3157 - Days: 60
3158 StorageClass: GLACIER
3160 """
3162 schema = type_schema(
3163 'configure-lifecycle',
3164 **{
3165 'rules': {
3166 'type': 'array',
3167 'items': {
3168 'type': 'object',
3169 'required': ['ID', 'Status'],
3170 'additionalProperties': False,
3171 'properties': {
3172 'ID': {'type': 'string'},
3173 # c7n intercepts `absent`
3174 'Status': {'enum': ['Enabled', 'Disabled', 'absent']},
3175 'Prefix': {'type': 'string'},
3176 'Expiration': {
3177 'type': 'object',
3178 'additionalProperties': False,
3179 'properties': {
3180 'Date': {'type': 'string'}, # Date
3181 'Days': {'type': 'integer'},
3182 'ExpiredObjectDeleteMarker': {'type': 'boolean'},
3183 },
3184 },
3185 'Filter': {
3186 'type': 'object',
3187 'minProperties': 1,
3188 'maxProperties': 1,
3189 'additionalProperties': False,
3190 'properties': {
3191 'Prefix': {'type': 'string'},
3192 'ObjectSizeGreaterThan': {'type': 'integer'},
3193 'ObjectSizeLessThan': {'type': 'integer'},
3194 'Tag': {
3195 'type': 'object',
3196 'required': ['Key', 'Value'],
3197 'additionalProperties': False,
3198 'properties': {
3199 'Key': {'type': 'string'},
3200 'Value': {'type': 'string'},
3201 },
3202 },
3203 'And': {
3204 'type': 'object',
3205 'additionalProperties': False,
3206 'properties': {
3207 'Prefix': {'type': 'string'},
3208 'ObjectSizeGreaterThan': {'type': 'integer'},
3209 'ObjectSizeLessThan': {'type': 'integer'},
3210 'Tags': {
3211 'type': 'array',
3212 'items': {
3213 'type': 'object',
3214 'required': ['Key', 'Value'],
3215 'additionalProperties': False,
3216 'properties': {
3217 'Key': {'type': 'string'},
3218 'Value': {'type': 'string'},
3219 },
3220 },
3221 },
3222 },
3223 },
3224 },
3225 },
3226 'Transitions': {
3227 'type': 'array',
3228 'items': {
3229 'type': 'object',
3230 'additionalProperties': False,
3231 'properties': {
3232 'Date': {'type': 'string'}, # Date
3233 'Days': {'type': 'integer'},
3234 'StorageClass': {'type': 'string'},
3235 },
3236 },
3237 },
3238 'NoncurrentVersionTransitions': {
3239 'type': 'array',
3240 'items': {
3241 'type': 'object',
3242 'additionalProperties': False,
3243 'properties': {
3244 'NoncurrentDays': {'type': 'integer'},
3245 'NewerNoncurrentVersions': {'type': 'integer'},
3246 'StorageClass': {'type': 'string'},
3247 },
3248 },
3249 },
3250 'NoncurrentVersionExpiration': {
3251 'type': 'object',
3252 'additionalProperties': False,
3253 'properties': {
3254 'NoncurrentDays': {'type': 'integer'},
3255 'NewerNoncurrentVersions': {'type': 'integer'}
3256 },
3257 },
3258 'AbortIncompleteMultipartUpload': {
3259 'type': 'object',
3260 'additionalProperties': False,
3261 'properties': {
3262 'DaysAfterInitiation': {'type': 'integer'},
3263 },
3264 },
3265 },
3266 },
3267 },
3268 }
3269 )
3271 permissions = ('s3:GetLifecycleConfiguration', 's3:PutLifecycleConfiguration')
3273 def process(self, buckets):
3274 with self.executor_factory(max_workers=3) as w:
3275 futures = {}
3276 results = []
3278 for b in buckets:
3279 futures[w.submit(self.process_bucket, b)] = b
3281 for future in as_completed(futures):
3282 if future.exception():
3283 bucket = futures[future]
3284 self.log.error('error modifying bucket lifecycle: %s\n%s',
3285 bucket['Name'], future.exception())
3286 results += filter(None, [future.result()])
3288 return results
3290 def process_bucket(self, bucket):
3291 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3293 if 'get_bucket_lifecycle_configuration' in bucket.get('c7n:DeniedMethods', []):
3294 log.warning("Access Denied Bucket:%s while reading lifecycle" % bucket['Name'])
3295 return
3297 # Adjust the existing lifecycle by adding/deleting/overwriting rules as necessary
3298 config = (bucket.get('Lifecycle') or {}).get('Rules', [])
3299 for rule in self.data['rules']:
3300 for index, existing_rule in enumerate(config):
3301 if not existing_rule:
3302 continue
3303 if rule['ID'] == existing_rule['ID']:
3304 if rule['Status'] == 'absent':
3305 config[index] = None
3306 else:
3307 config[index] = rule
3308 break
3309 else:
3310 if rule['Status'] != 'absent':
3311 config.append(rule)
3313 # The extra `list` conversion is required for python3
3314 config = list(filter(None, config))
3316 try:
3317 if not config:
3318 s3.delete_bucket_lifecycle(Bucket=bucket['Name'])
3319 else:
3320 s3.put_bucket_lifecycle_configuration(
3321 Bucket=bucket['Name'], LifecycleConfiguration={'Rules': config})
3322 except ClientError as e:
3323 if e.response['Error']['Code'] == 'AccessDenied':
3324 log.warning("Access Denied Bucket:%s while applying lifecycle" % bucket['Name'])
3325 else:
3326 raise e
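# Merge sketch (illustrative): rules are reconciled by ID against the current
# lifecycle -- a matching ID with Status: absent drops the rule, any other
# matching ID is overwritten in place, unmatched non-absent rules are
# appended, and an empty result deletes the whole lifecycle configuration.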
3329class KMSKeyResolverMixin:
3330 """Builds a dictionary of region specific ARNs"""
3332 def __init__(self, data, manager=None):
3333 self.arns = dict()
3334 self.data = data
3335 self.manager = manager
3337 def resolve_keys(self, buckets):
3338 key = self.data.get('key')
3339 if not key:
3340 return None
3342 regions = {get_region(b) for b in buckets}
3343 for r in regions:
3344 client = local_session(self.manager.session_factory).client('kms', region_name=r)
3345 try:
3346 key_meta = client.describe_key(
3347 KeyId=key
3348 ).get('KeyMetadata', {})
3349 key_id = key_meta.get('KeyId')
3351 # We need a complete set of alias identifiers (names and ARNs)
3352 # to fully evaluate bucket encryption filters.
3353 key_aliases = client.list_aliases(
3354 KeyId=key_id
3355 ).get('Aliases', [])
3357 self.arns[r] = {
3358 'KeyId': key_id,
3359 'Arn': key_meta.get('Arn'),
3360 'KeyManager': key_meta.get('KeyManager'),
3361 'Description': key_meta.get('Description'),
3362 'Aliases': [
3363 alias[attr]
3364 for alias in key_aliases
3365 for attr in ('AliasArn', 'AliasName')
3366 ],
3367 }
3369 except ClientError as e:
3370 self.log.error('Error resolving kms ARNs for set-bucket-encryption: %s key: %s' % (
3371 e, self.data.get('key')))
3373 def get_key(self, bucket):
3374 if 'key' not in self.data:
3375 return None
3376 region = get_region(bucket)
3377 key = self.arns.get(region)
3378 if not key:
3379 self.log.warning('Unable to resolve key %s for bucket %s in region %s',
3380 self.data['key'], bucket.get('Name'), region)
3381 return key
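# Resolution sketch (illustrative, values hypothetical): after resolve_keys
# the mixin holds one record per bucket region, e.g.
#   self.arns['us-east-1'] == {
#       'KeyId': '1234abcd-12ab-34cd-56ef-1234567890ab',
#       'Arn': 'arn:aws:kms:us-east-1:123456789012:key/1234abcd-12ab-34cd-56ef-1234567890ab',
#       'KeyManager': 'CUSTOMER',
#       'Description': 'my key',
#       'Aliases': ['arn:aws:kms:us-east-1:123456789012:alias/my-key',
#                   'alias/my-key']}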
3384@filters.register('bucket-encryption')
3385class BucketEncryption(KMSKeyResolverMixin, Filter):
3386 """Filters for S3 buckets that have bucket-encryption
3388 :example:
3390 .. code-block:: yaml
3392 policies:
3393 - name: s3-bucket-encryption-AES256
3394 resource: s3
3395 region: us-east-1
3396 filters:
3397 - type: bucket-encryption
3398 state: True
3399 crypto: AES256
3400 - name: s3-bucket-encryption-KMS
3401 resource: s3
3402 region: us-east-1
3403 filters:
3404 - type: bucket-encryption
3405 state: True
3406 crypto: aws:kms
3407 key: alias/some/alias/key
3408 - name: s3-bucket-encryption-off
3409 resource: s3
3410 region: us-east-1
3411 filters:
3412 - type: bucket-encryption
3413 state: False
3414 - name: s3-bucket-test-bucket-key-enabled
3415 resource: s3
3416 region: us-east-1
3417 filters:
3418 - type: bucket-encryption
3419 bucket_key_enabled: True
3420 """
3421 schema = type_schema('bucket-encryption',
3422 state={'type': 'boolean'},
3423 crypto={'type': 'string', 'enum': ['AES256', 'aws:kms']},
3424 key={'type': 'string'},
3425 bucket_key_enabled={'type': 'boolean'})
3427 permissions = ('s3:GetEncryptionConfiguration', 'kms:DescribeKey', 'kms:ListAliases')
3428 annotation_key = 'c7n:bucket-encryption'
3430 def validate(self):
3431 if self.data.get('bucket_key_enabled') is not None and self.data.get('key') is not None:
3432 raise PolicyValidationError(
3433 f'key and bucket_key_enabled attributes cannot both be set: {self.data}'
3434 )
3436 def process(self, buckets, event=None):
3437 self.resolve_keys(buckets)
3438 results = []
3439 with self.executor_factory(max_workers=2) as w:
3440 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3441 for future in as_completed(futures):
3442 b = futures[future]
3443 if future.exception():
3444 self.log.error("Message: %s Bucket: %s", future.exception(),
3445 b['Name'])
3446 continue
3447 if future.result():
3448 results.append(b)
3449 return results
3451 def process_bucket(self, b):
3453 client = bucket_client(local_session(self.manager.session_factory), b)
3454 rules = []
3455 if self.annotation_key not in b:
3456 try:
3457 be = client.get_bucket_encryption(Bucket=b['Name'])
3458 be.pop('ResponseMetadata', None)
3459 except ClientError as e:
3460 if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError':
3461 raise
3462 be = {}
3463 b[self.annotation_key] = be
3464 else:
3465 be = b[self.annotation_key]
3467 rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', [])
3468 # default `state` to True as previous impl assumed state == True
3469 # to preserve backwards compatibility
3470 if self.data.get('bucket_key_enabled'):
3471 for rule in rules:
3472 return self.filter_bucket_key_enabled(rule)
3473 elif self.data.get('bucket_key_enabled') is False:
3474 for rule in rules:
3475 return not self.filter_bucket_key_enabled(rule)
3477 if self.data.get('state', True):
3478 for sse in rules:
3479 return self.filter_bucket(b, sse)
3480 return False
3481 else:
3482 for sse in rules:
3483 return not self.filter_bucket(b, sse)
3484 return True
3486 def filter_bucket(self, b, sse):
3487 allowed = ['AES256', 'aws:kms']
3488 key = self.get_key(b)
3489 crypto = self.data.get('crypto')
3490 rule = sse.get('ApplyServerSideEncryptionByDefault')
3492 if not rule:
3493 return False
3494 algo = rule.get('SSEAlgorithm')
3496 if not crypto and algo in allowed:
3497 return True
3499 if crypto == 'AES256' and algo == 'AES256':
3500 return True
3501 elif crypto == 'aws:kms' and algo == 'aws:kms':
3502 if not key:
3503 # There are two broad reasons to have an empty value for
3504 # the regional key here:
3505 #
3506 # * The policy did not specify a key, in which case this
3507 # filter should match _all_ buckets with a KMS default
3508 # encryption rule.
3509 #
3510 # * The policy specified a key that could not be
3511 # resolved, in which case this filter shouldn't match
3512 # any buckets.
3513 return 'key' not in self.data
3515 # The default encryption rule can specify a key ID,
3516 # key ARN, alias name or alias ARN. Match against any of
3517 # those attributes. A rule specifying KMS with no master key
3518 # implies the AWS-managed key.
3519 key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']}
3520 return rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids
3522 def filter_bucket_key_enabled(self, rule) -> bool:
3523 if not rule:
3524 return False
3525 return rule.get('BucketKeyEnabled')
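# Match sketch (illustrative): for crypto: aws:kms the rule matches when its
# KMSMasterKeyID equals the resolved key's ARN, bare key id, or any alias
# name/ARN; an SSE-KMS rule that omits KMSMasterKeyID is treated as the AWS
# managed 'alias/aws/s3' key.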
3528@actions.register('set-bucket-encryption')
3529class SetBucketEncryption(KMSKeyResolverMixin, BucketActionBase):
3530 """Action enables default encryption on S3 buckets
3532 `enabled`: boolean Optional: Defaults to True
3534 `crypto`: aws:kms | AES256 Optional: Defaults to AES256
3536 `key`: arn, alias, or kms id of the key
3538 `bucket-key`: boolean Optional:
3539 Defaults to True.
3540 Reduces the amount of API traffic from Amazon S3 to KMS and can reduce KMS
3541 request costs by up to 99 percent. Requires kms:Decrypt permissions for copy
3542 and upload on the AWS KMS Key Policy.
3544 Bucket Key Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html
3546 :example:
3548 .. code-block:: yaml
3550 policies:
3551 - name: s3-enable-default-encryption-kms
3552 resource: s3
3553 actions:
3554 - type: set-bucket-encryption
3555 # enabled: true <------ optional (true by default)
3556 crypto: aws:kms
3557 key: 1234abcd-12ab-34cd-56ef-1234567890ab
3558 bucket-key: true
3560 - name: s3-enable-default-encryption-kms-alias
3561 resource: s3
3562 actions:
3563 - type: set-bucket-encryption
3564 # enabled: true <------ optional (true by default)
3565 crypto: aws:kms
3566 key: alias/some/alias/key
3567 bucket-key: true
3569 - name: s3-enable-default-encryption-aes256
3570 resource: s3
3571 actions:
3572 - type: set-bucket-encryption
3573 # bucket-key: true <--- optional (true by default for AWS SSE)
3574 # crypto: AES256 <----- optional (AES256 by default)
3575 # enabled: true <------ optional (true by default)
3577 - name: s3-disable-default-encryption
3578 resource: s3
3579 actions:
3580 - type: set-bucket-encryption
3581 enabled: false
3582 """
3584 schema = {
3585 'type': 'object',
3586 'additionalProperties': False,
3587 'properties': {
3588 'type': {'enum': ['set-bucket-encryption']},
3589 'enabled': {'type': 'boolean'},
3590 'crypto': {'enum': ['aws:kms', 'AES256']},
3591 'key': {'type': 'string'},
3592 'bucket-key': {'type': 'boolean'}
3593 },
3594 'dependencies': {
3595 'key': {
3596 'properties': {
3597 'crypto': {'pattern': 'aws:kms'}
3598 },
3599 'required': ['crypto']
3600 }
3601 }
3602 }
3604 permissions = ('s3:PutEncryptionConfiguration', 's3:GetEncryptionConfiguration',
3605 'kms:ListAliases', 'kms:DescribeKey')
3607 def process(self, buckets):
3608 if self.data.get('enabled', True):
3609 self.resolve_keys(buckets)
3611 with self.executor_factory(max_workers=3) as w:
3612 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3613 for future in as_completed(futures):
3614 if future.exception():
3615 self.log.error('Message: %s Bucket: %s', future.exception(),
3616 futures[future]['Name'])
3618 def process_bucket(self, bucket):
3619 default_key_desc = 'Default master key that protects my S3 objects when no other key is defined' # noqa
3620 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3621 if not self.data.get('enabled', True):
3622 s3.delete_bucket_encryption(Bucket=bucket['Name'])
3623 return
3624 algo = self.data.get('crypto', 'AES256')
3626 # bucket key defaults to True for alias/aws/s3 and AES256 (Amazon SSE)
3627 # and ignores False values for that crypto
3628 bucket_key = self.data.get('bucket-key', True)
3629 config = {
3630 'Rules': [
3631 {
3632 'ApplyServerSideEncryptionByDefault': {
3633 'SSEAlgorithm': algo,
3634 },
3635 'BucketKeyEnabled': bucket_key
3636 }
3637 ]
3638 }
3640 if algo == 'aws:kms':
3641 key = self.get_key(bucket)
3642 if not key:
3643 raise Exception('Valid KMS Key required but does not exist')
3645 config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = key['Arn']
3646 s3.put_bucket_encryption(
3647 Bucket=bucket['Name'],
3648 ServerSideEncryptionConfiguration=config
3649 )
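# Config sketch (illustrative, key ARN hypothetical): for crypto: aws:kms the
# action above submits a configuration shaped like
#   {'Rules': [{'ApplyServerSideEncryptionByDefault': {
#                   'SSEAlgorithm': 'aws:kms',
#                   'KMSMasterKeyID': 'arn:aws:kms:us-east-1:123456789012:key/1234abcd-12ab-34cd-56ef-1234567890ab'},
#               'BucketKeyEnabled': True}]}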
3652OWNERSHIP_CONTROLS = ['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter']
3653VALUE_FILTER_MAGIC_VALUES = ['absent', 'present', 'not-null', 'empty']
3656@filters.register('ownership')
3657class BucketOwnershipControls(BucketFilterBase, ValueFilter):
3658 """Filter for object ownership controls
3660 Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html
3662 :example:
3664 Find buckets with ACLs disabled
3666 .. code-block:: yaml
3668 policies:
3669 - name: s3-bucket-acls-disabled
3670 resource: aws.s3
3671 region: us-east-1
3672 filters:
3673 - type: ownership
3674 value: BucketOwnerEnforced
3676 :example:
3678 Find buckets with object ownership preferred or enforced
3680 .. code-block:: yaml
3682 policies:
3683 - name: s3-bucket-ownership-preferred
3684 resource: aws.s3
3685 region: us-east-1
3686 filters:
3687 - type: ownership
3688 op: in
3689 value:
3690 - BucketOwnerEnforced
3691 - BucketOwnerPreferred
3693 :example:
3695 Find buckets with no object ownership controls
3697 .. code-block:: yaml
3699 policies:
3700 - name: s3-bucket-no-ownership-controls
3701 resource: aws.s3
3702 region: us-east-1
3703 filters:
3704 - type: ownership
3705 value: empty
3706 """
3707 schema = type_schema('ownership', rinherit=ValueFilter.schema, value={'oneOf': [
3708 {'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES},
3709 {'type': 'array', 'items': {
3710 'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}}]})
3711 permissions = ('s3:GetBucketOwnershipControls',)
3712 annotation_key = 'c7n:ownership'
3714 def __init__(self, data, manager=None):
3715 super(BucketOwnershipControls, self).__init__(data, manager)
3717 # Ownership controls appear as an array of rules. There can only be one
3718 # ObjectOwnership rule defined for a bucket, so we can automatically
3719 # match against that if it exists.
3720 self.data['key'] = f'("{self.annotation_key}".Rules[].ObjectOwnership)[0]'
3722 def process(self, buckets, event=None):
3723 with self.executor_factory(max_workers=2) as w:
3724 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3725 for future in as_completed(futures):
3726 b = futures[future]
3727 if future.exception():
3728 self.log.error("Message: %s Bucket: %s", future.exception(),
3729 b['Name'])
3730 continue
3731 return super(BucketOwnershipControls, self).process(buckets, event)
3733 def process_bucket(self, b):
3734 if self.annotation_key in b:
3735 return
3736 client = bucket_client(local_session(self.manager.session_factory), b)
3737 try:
3738 controls = client.get_bucket_ownership_controls(Bucket=b['Name'])
3739 controls.pop('ResponseMetadata', None)
3740 except ClientError as e:
3741 if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError':
3742 raise
3743 controls = {}
3744 b[self.annotation_key] = controls.get('OwnershipControls')
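# Key sketch (illustrative): the synthesized JMESPath key evaluates the
# annotation, e.g. {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]}
# yields 'BucketOwnerEnforced', while a bucket with no ownership controls
# annotates None and therefore matches the 'empty' magic value.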
3747@filters.register('bucket-replication')
3748class BucketReplication(ListItemFilter):
3749 """Filter for S3 buckets to look at bucket replication configurations
3751 The schema to supply to the attrs follows the schema here:
3752 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_replication.html
3754 :example:
3756 .. code-block:: yaml
3758 policies:
3759 - name: s3-bucket-replication
3760 resource: s3
3761 filters:
3762 - type: bucket-replication
3763 attrs:
3764 - Status: Enabled
3765 - Filter:
3766 And:
3767 Prefix: test
3768 Tags:
3769 - Key: Owner
3770 Value: c7n
3771 - ExistingObjectReplication: Enabled
3773 """
3774 schema = type_schema(
3775 'bucket-replication',
3776 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
3777 count={'type': 'number'},
3778 count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
3779 )
3781 permissions = ("s3:GetReplicationConfiguration",)
3782 annotation_key = 'Replication'
3783 annotate_items = True
3785 def __init__(self, data, manager=None):
3786 super().__init__(data, manager)
3787 self.data['key'] = self.annotation_key
3789 def get_item_values(self, b):
3790 client = bucket_client(local_session(self.manager.session_factory), b)
3791 # replication configuration is fetched via S3_AUGMENT_TABLE:
3792 bucket_replication = b[self.annotation_key]
3794 rules = []
3795 if bucket_replication is not None:
3796 rules = bucket_replication.get('ReplicationConfiguration', {}).get('Rules', [])
3797 for replication in rules:
3798 self.augment_bucket_replication(b, replication, client)
3800 return rules
3802 def augment_bucket_replication(self, b, replication, client):
3803 destination_bucket = replication.get('Destination').get('Bucket').split(':')[5]
3804 destination_region = inspect_bucket_region(destination_bucket, client.meta.endpoint_url)
3805 source_region = get_region(b)
3806 replication['DestinationRegion'] = destination_region
3807 replication['CrossRegion'] = destination_region != source_region
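# Destination sketch (illustrative): replication destinations are bucket
# ARNs, so the split above recovers the bucket name for the cross-region
# check:
#   'arn:aws:s3:::dest-bucket'.split(':')[5]  ->  'dest-bucket'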