1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""S3 Resource Manager
4
5Filters:
6
The generic value filter (jmespath expression) and the Or filter are
available with all resources, including buckets. We include several
additional pieces of bucket data (Tags, Replication, Acl, Policy) as keys
within a bucket representation.
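
For example, a value filter can reference these keys directly; an
illustrative policy (the key and value shown are just one possibility):

.. code-block:: yaml

    policies:
      - name: s3-versioning-enabled
        resource: s3
        filters:
          - type: value
            key: Versioning.Status
            value: Enabled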
11
12Actions:
13
14 encrypt-keys
15
16 Scan all keys in a bucket and optionally encrypt them in place.
17
18 global-grants
19
20 Check bucket acls for global grants
21
22 encryption-policy
23
   Attach an encryption-required policy to a bucket. Note this will break
   applications that are not using encryption, including aws log
   delivery.
27
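For illustration, a policy pairing the global-grants filter with a
remediation action (delete-global-grants) might look like the following:

.. code-block:: yaml

    policies:
      - name: remove-global-grants
        resource: s3
        filters:
          - type: global-grants
        actions:
          - delete-global-grants
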
28"""
29import copy
30import functools
31import json
32import logging
33import math
34import os
35import time
36import threading
37import ssl
38
39from botocore.client import Config
40from botocore.exceptions import ClientError
41
42from collections import defaultdict
43from concurrent.futures import as_completed
44
45try:
46 from urllib3.exceptions import SSLError
47except ImportError:
48 from botocore.vendored.requests.packages.urllib3.exceptions import SSLError
49
50
51from c7n.actions import (
52 ActionRegistry, BaseAction, PutMetric, RemovePolicyBase)
53from c7n.exceptions import PolicyValidationError, PolicyExecutionError
54from c7n.filters import (
55 FilterRegistry, Filter, CrossAccountAccessFilter, MetricsFilter,
56 ValueFilter, ListItemFilter)
57from .aws import shape_validate
58from c7n.filters.policystatement import HasStatementFilter
59from c7n.manager import resources
60from c7n.output import NullBlobOutput
61from c7n import query
62from c7n.resources.securityhub import PostFinding
63from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction
64from c7n.utils import (
65 chunks, local_session, set_annotation, type_schema, filter_empty,
66 dumps, format_string_values, get_account_alias_from_sts)
67from c7n.resources.aws import inspect_bucket_region
68
69
70log = logging.getLogger('custodian.s3')
71
72filters = FilterRegistry('s3.filters')
73actions = ActionRegistry('s3.actions')
74filters.register('marked-for-op', TagActionFilter)
75actions.register('put-metric', PutMetric)
76
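# 2 GiB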
77MAX_COPY_SIZE = 1024 * 1024 * 1024 * 2
78
79
80class DescribeS3(query.DescribeSource):
81
82 def augment(self, buckets):
83 assembler = BucketAssembly(self.manager)
84 assembler.initialize()
85
86 with self.manager.executor_factory(
87 max_workers=min((10, len(buckets) + 1))) as w:
88 results = w.map(assembler.assemble, buckets)
89 results = list(filter(None, results))
90 return results
91
92
93class ConfigS3(query.ConfigSource):
94
    # normalize config's janky idiosyncratic bespoke formatting to the
    # standard describe api responses.
97
98 def get_query_params(self, query):
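        # ensure awsRegion is part of the projection, since load_resource
        # below uses it to reconstruct the bucket's Location.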
99 q = super(ConfigS3, self).get_query_params(query)
100 if 'expr' in q:
101 q['expr'] = q['expr'].replace('select ', 'select awsRegion, ')
102 return q
103
104 def load_resource(self, item):
105 resource = super(ConfigS3, self).load_resource(item)
106 cfg = item['supplementaryConfiguration']
107 # aka standard
108 if 'awsRegion' in item and item['awsRegion'] != 'us-east-1':
109 resource['Location'] = {'LocationConstraint': item['awsRegion']}
110 else:
111 resource['Location'] = {}
112
113 # owner is under acl per describe
114 resource.pop('Owner', None)
115
116 for k, null_value in S3_CONFIG_SUPPLEMENT_NULL_MAP.items():
117 if k not in cfg:
118 continue
119 if cfg.get(k) == null_value:
120 continue
121 method = getattr(self, "handle_%s" % k, None)
122 if method is None:
                raise ValueError("unhandled supplementary config %s" % k)
125 v = cfg[k]
126 if isinstance(cfg[k], str):
127 v = json.loads(cfg[k])
128 method(resource, v)
129
130 for el in S3_AUGMENT_TABLE:
131 if el[1] not in resource:
132 resource[el[1]] = el[2]
133 return resource
134
135 PERMISSION_MAP = {
136 'FullControl': 'FULL_CONTROL',
137 'Write': 'WRITE',
138 'WriteAcp': 'WRITE_ACP',
139 'Read': 'READ',
140 'ReadAcp': 'READ_ACP'}
141
142 GRANTEE_MAP = {
143 'AllUsers': "http://acs.amazonaws.com/groups/global/AllUsers",
144 'AuthenticatedUsers': "http://acs.amazonaws.com/groups/global/AuthenticatedUsers",
145 'LogDelivery': 'http://acs.amazonaws.com/groups/s3/LogDelivery'}
146
147 def handle_AccessControlList(self, resource, item_value):
148 # double serialized in config for some reason
149 if isinstance(item_value, str):
150 item_value = json.loads(item_value)
151
152 resource['Acl'] = {}
153 resource['Acl']['Owner'] = {'ID': item_value['owner']['id']}
154 if item_value['owner']['displayName']:
155 resource['Acl']['Owner']['DisplayName'] = item_value[
156 'owner']['displayName']
157 resource['Acl']['Grants'] = grants = []
158
159 for g in (item_value.get('grantList') or ()):
160 if 'id' not in g['grantee']:
161 assert g['grantee'] in self.GRANTEE_MAP, "unknown grantee %s" % g
162 rg = {'Type': 'Group', 'URI': self.GRANTEE_MAP[g['grantee']]}
163 else:
164 rg = {'ID': g['grantee']['id'], 'Type': 'CanonicalUser'}
165
166 if 'displayName' in g:
167 rg['DisplayName'] = g['displayName']
168
169 grants.append({
170 'Permission': self.PERMISSION_MAP[g['permission']],
171 'Grantee': rg,
172 })
173
174 def handle_BucketAccelerateConfiguration(self, resource, item_value):
175 # not currently auto-augmented by custodian
176 return
177
178 def handle_BucketLoggingConfiguration(self, resource, item_value):
179 if ('destinationBucketName' not in item_value or
180 item_value['destinationBucketName'] is None):
181 resource[u'Logging'] = {}
182 return
183 resource[u'Logging'] = {
184 'TargetBucket': item_value['destinationBucketName'],
185 'TargetPrefix': item_value['logFilePrefix']}
186
187 def handle_BucketLifecycleConfiguration(self, resource, item_value):
188 rules = []
189 for r in item_value.get('rules'):
190 rr = {}
191 rules.append(rr)
192 expiry = {}
193 for ek, ck in (
194 ('Date', 'expirationDate'),
195 ('ExpiredObjectDeleteMarker', 'expiredObjectDeleteMarker'),
196 ('Days', 'expirationInDays')):
197 if ck in r and r[ck] and r[ck] != -1:
198 expiry[ek] = r[ck]
199 if expiry:
200 rr['Expiration'] = expiry
201
202 transitions = []
203 for t in (r.get('transitions') or ()):
204 tr = {}
205 for k in ('date', 'days', 'storageClass'):
206 if t.get(k):
207 tr["%s%s" % (k[0].upper(), k[1:])] = t[k]
208 transitions.append(tr)
209 if transitions:
210 rr['Transitions'] = transitions
211
212 if r.get('abortIncompleteMultipartUpload'):
213 rr['AbortIncompleteMultipartUpload'] = {
214 'DaysAfterInitiation': r[
215 'abortIncompleteMultipartUpload']['daysAfterInitiation']}
216 if r.get('noncurrentVersionExpirationInDays'):
217 rr['NoncurrentVersionExpiration'] = {
218 'NoncurrentDays': r['noncurrentVersionExpirationInDays']}
219
220 nonc_transitions = []
221 for t in (r.get('noncurrentVersionTransitions') or ()):
222 nonc_transitions.append({
223 'NoncurrentDays': t['days'],
224 'StorageClass': t['storageClass']})
225 if nonc_transitions:
226 rr['NoncurrentVersionTransitions'] = nonc_transitions
227
228 rr['Status'] = r['status']
229 rr['ID'] = r['id']
230 if r.get('prefix'):
231 rr['Prefix'] = r['prefix']
232 if 'filter' not in r or not r['filter']:
233 continue
234
235 if r['filter']['predicate']:
236 rr['Filter'] = self.convertLifePredicate(r['filter']['predicate'])
237
238 resource['Lifecycle'] = {'Rules': rules}
239
240 def convertLifePredicate(self, p):
241 if p['type'] == 'LifecyclePrefixPredicate':
242 return {'Prefix': p['prefix']}
243 if p['type'] == 'LifecycleTagPredicate':
244 return {'Tags': [{'Key': p['tag']['key'], 'Value': p['tag']['value']}]}
245 if p['type'] == 'LifecycleAndOperator':
246 n = {}
247 for o in p['operands']:
248 ot = self.convertLifePredicate(o)
249 if 'Tags' in n and 'Tags' in ot:
250 n['Tags'].extend(ot['Tags'])
251 else:
252 n.update(ot)
253 return {'And': n}
254
255 raise ValueError("unknown predicate: %s" % p)
256
257 NotifyTypeMap = {
258 'QueueConfiguration': 'QueueConfigurations',
259 'LambdaConfiguration': 'LambdaFunctionConfigurations',
260 'CloudFunctionConfiguration': 'LambdaFunctionConfigurations',
261 'TopicConfiguration': 'TopicConfigurations'}
262
263 def handle_BucketNotificationConfiguration(self, resource, item_value):
264 d = {}
265 for nid, n in item_value['configurations'].items():
266 ninfo = {}
267 d.setdefault(self.NotifyTypeMap[n['type']], []).append(ninfo)
268 if n['type'] == 'QueueConfiguration':
269 ninfo['QueueArn'] = n['queueARN']
270 elif n['type'] == 'TopicConfiguration':
271 ninfo['TopicArn'] = n['topicARN']
272 elif n['type'] == 'LambdaConfiguration':
273 ninfo['LambdaFunctionArn'] = n['functionARN']
274 ninfo['Id'] = nid
275 ninfo['Events'] = n['events']
276 rules = []
277 if n['filter']:
278 for r in n['filter'].get('s3KeyFilter', {}).get('filterRules', []):
279 rules.append({'Name': r['name'], 'Value': r['value']})
280 if rules:
281 ninfo['Filter'] = {'Key': {'FilterRules': rules}}
282 resource['Notification'] = d
283
284 def handle_BucketReplicationConfiguration(self, resource, item_value):
285 d = {'Role': item_value['roleARN'], 'Rules': []}
286 for rid, r in item_value['rules'].items():
287 rule = {
288 'ID': rid,
289 'Status': r.get('status', ''),
290 'Prefix': r.get('prefix', ''),
291 'Destination': {
292 'Bucket': r['destinationConfig']['bucketARN']}
293 }
294 if 'Account' in r['destinationConfig']:
295 rule['Destination']['Account'] = r['destinationConfig']['Account']
296 if r['destinationConfig'].get('storageClass'):
297 rule['Destination']['StorageClass'] = r['destinationConfig']['storageClass']
298 d['Rules'].append(rule)
299 resource['Replication'] = {'ReplicationConfiguration': d}
300
301 def handle_BucketPolicy(self, resource, item_value):
302 resource['Policy'] = item_value.get('policyText')
303
304 def handle_BucketTaggingConfiguration(self, resource, item_value):
305 resource['Tags'] = [
306 {"Key": k, "Value": v} for k, v in item_value['tagSets'][0]['tags'].items()]
307
308 def handle_BucketVersioningConfiguration(self, resource, item_value):
309 # Config defaults versioning to 'Off' for a null value
310 if item_value['status'] not in ('Enabled', 'Suspended'):
311 resource['Versioning'] = {}
312 return
313 resource['Versioning'] = {'Status': item_value['status']}
314 # `isMfaDeleteEnabled` is an optional boolean property - the key may be absent,
315 # present with a null value, or present with a boolean value.
316 # Mirror the describe source by populating Versioning.MFADelete only in the
317 # boolean case.
318 mfa_delete = item_value.get('isMfaDeleteEnabled')
319 if mfa_delete is None:
320 return
321 resource['Versioning']['MFADelete'] = (
322 'Enabled' if mfa_delete else 'Disabled'
323 )
324
325 def handle_BucketWebsiteConfiguration(self, resource, item_value):
326 website = {}
327 if item_value['indexDocumentSuffix']:
328 website['IndexDocument'] = {
329 'Suffix': item_value['indexDocumentSuffix']}
330 if item_value['errorDocument']:
331 website['ErrorDocument'] = {
332 'Key': item_value['errorDocument']}
333 if item_value['redirectAllRequestsTo']:
334 website['RedirectAllRequestsTo'] = {
335 'HostName': item_value['redirectAllRequestsTo']['hostName'],
336 'Protocol': item_value['redirectAllRequestsTo']['protocol']}
337 for r in item_value['routingRules']:
338 redirect = {}
339 rule = {'Redirect': redirect}
340 website.setdefault('RoutingRules', []).append(rule)
341 if 'condition' in r:
342 cond = {}
343 for ck, rk in (
344 ('keyPrefixEquals', 'KeyPrefixEquals'),
345 ('httpErrorCodeReturnedEquals',
346 'HttpErrorCodeReturnedEquals')):
347 if r['condition'][ck]:
348 cond[rk] = r['condition'][ck]
349 rule['Condition'] = cond
350 for ck, rk in (
351 ('protocol', 'Protocol'),
352 ('hostName', 'HostName'),
353 ('replaceKeyPrefixWith', 'ReplaceKeyPrefixWith'),
354 ('replaceKeyWith', 'ReplaceKeyWith'),
355 ('httpRedirectCode', 'HttpRedirectCode')):
356 if r['redirect'][ck]:
357 redirect[rk] = r['redirect'][ck]
358 resource['Website'] = website
359
360
361@resources.register('s3')
362class S3(query.QueryResourceManager):
363 """Amazon's Simple Storage Service Buckets.
364
365
    By default, and for historical compatibility, cloud custodian will
    fetch a number of subdocuments (Acl, Policy, Tagging, Versioning,
    Website, Notification, Lifecycle, and Replication) for each bucket
    to allow policy authors to target common bucket
    configurations.
371
    This behavior can be customized to avoid extraneous api calls when a
    particular subdocument is not needed for a policy, by setting the
    `augment-keys` parameter in a query block of the policy.

    For example, if we only care about bucket website and replication
    configuration, we can minimize the api calls needed to fetch a
    bucket by setting up augment-keys as follows.
379
380 :example:
381
382 .. code-block:: yaml
383
384 policies:
385 - name: check-website-replication
386 resource: s3
387 query:
388 - augment-keys: ['Website', 'Replication']
389 filters:
390 - Website.ErrorDocument: not-null
391 - Replication.ReplicationConfiguration.Rules: not-null
392
    It also supports an automatic detection mode, where the subdocuments
    used by the policy's filters are detected automatically, via the
    augment-keys value of 'detect'.
395
396 :example:
397
398 .. code-block:: yaml
399
400 policies:
401 - name: check-website-replication
402 resource: s3
403 query:
404 - augment-keys: 'detect'
405 filters:
406 - Website.ErrorDocument: not-null
407 - Replication.ReplicationConfiguration.Rules: not-null
408
    The default value for augment-keys is `all`, to preserve historical
    compatibility. `augment-keys` also supports the value 'none' to
    disable all subdocument fetching except Location and Tags.
412
413 Note certain actions may implicitly depend on the corresponding
414 subdocument being present.
415
416 """
417
418 class resource_type(query.TypeInfo):
419 service = 's3'
420 arn_type = ''
421 enum_spec = ('list_buckets', 'Buckets[]', None)
422 # not used but we want some consistency on the metadata
423 detail_spec = ('get_bucket_location', 'Bucket', 'Name', 'LocationConstraint')
424 permissions_augment = (
425 "s3:GetBucketAcl",
426 "s3:GetBucketLocation",
427 "s3:GetBucketPolicy",
428 "s3:GetBucketTagging",
429 "s3:GetBucketVersioning",
430 "s3:GetBucketLogging",
431 "s3:GetBucketNotification",
432 "s3:GetBucketWebsite",
433 "s3:GetLifecycleConfiguration",
434 "s3:GetReplicationConfiguration"
435 )
436 name = id = 'Name'
437 date = 'CreationDate'
438 dimension = 'BucketName'
439 cfn_type = config_type = 'AWS::S3::Bucket'
440
441 filter_registry = filters
442 action_registry = actions
443 source_mapping = {
444 'describe': DescribeS3,
445 'config': ConfigS3
446 }
447
448 def validate(self):
449 super().validate()
450 BucketAssembly(self).validate()
451
452 def get_arns(self, resources):
453 return ["arn:aws:s3:::{}".format(r["Name"]) for r in resources]
454
455 @classmethod
456 def get_permissions(cls):
457 perms = ["s3:ListAllMyBuckets"]
458 perms.extend([n[-1] for n in S3_AUGMENT_TABLE])
459 return perms
460
461
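# Serialized values AWS Config reports for a supplementary configuration
# that is effectively unset; ConfigS3.load_resource skips a key when its
# raw value matches the sentinel listed here.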
462S3_CONFIG_SUPPLEMENT_NULL_MAP = {
463 'BucketLoggingConfiguration': u'{"destinationBucketName":null,"logFilePrefix":null}',
464 'BucketPolicy': u'{"policyText":null}',
465 'BucketVersioningConfiguration': u'{"status":"Off","isMfaDeleteEnabled":null}',
466 'BucketAccelerateConfiguration': u'{"status":null}',
467 'BucketNotificationConfiguration': u'{"configurations":{}}',
468 'BucketLifecycleConfiguration': None,
469 'AccessControlList': None,
470 'BucketTaggingConfiguration': None,
471 'BucketWebsiteConfiguration': None,
472 'BucketReplicationConfiguration': None
473}
474
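# Each entry is (client method, bucket key, default value, response selector,
# iam permission). BucketAssembly.assemble consumes the first four fields and
# S3.get_permissions uses the last.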
475S3_AUGMENT_TABLE = (
476 ('get_bucket_location', 'Location', {}, None, 's3:GetBucketLocation'),
477 ('get_bucket_tagging', 'Tags', [], 'TagSet', 's3:GetBucketTagging'),
478 ('get_bucket_policy', 'Policy', None, 'Policy', 's3:GetBucketPolicy'),
479 ('get_bucket_acl', 'Acl', None, None, 's3:GetBucketAcl'),
480 ('get_bucket_replication',
481 'Replication', None, None, 's3:GetReplicationConfiguration'),
482 ('get_bucket_versioning', 'Versioning', None, None, 's3:GetBucketVersioning'),
483 ('get_bucket_website', 'Website', None, None, 's3:GetBucketWebsite'),
484 ('get_bucket_logging', 'Logging', None, 'LoggingEnabled', 's3:GetBucketLogging'),
485 ('get_bucket_notification_configuration',
486 'Notification', None, None, 's3:GetBucketNotification'),
487 ('get_bucket_lifecycle_configuration',
488 'Lifecycle', None, None, 's3:GetLifecycleConfiguration'),
489 # ('get_bucket_cors', 'Cors'),
490)
491
492
493class BucketAssembly:
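    """Assemble a bucket's subdocuments (Acl, Policy, Tags, ...).

    Runs the per-bucket describe calls from S3_AUGMENT_TABLE, restricted to
    the keys a policy needs (per `augment-keys`), and switches to a client
    bound to the bucket's region once its Location is known.
    """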
494
495 def __init__(self, manager):
496 self.manager = manager
497 self.default_region = None
498 self.region_clients = {}
499 self.session = None
500 self.session_lock = None
501 self.augment_fields = []
502
503 def initialize(self):
504 # construct a default boto3 client, using the current session region.
505 self.session = local_session(self.manager.session_factory)
506 self.session_lock = threading.RLock()
507 self.default_region = self.manager.config.region
508 self.region_clients[self.default_region] = self.session.client('s3')
509 self.augment_fields = set(self.detect_augment_fields())
510 # location is required for client construction
511 self.augment_fields.add('Location')
512 # custodian always returns tags
513 self.augment_fields.add('Tags')
514
515 def validate(self):
516 config = self.get_augment_config()
517 if isinstance(config, str) and config not in ('all', 'detect', 'none'):
518 raise PolicyValidationError(
519 "augment-keys supports 'all', 'detect', 'none' or list of keys found: %s" % config)
520 elif isinstance(config, list):
521 delta = set(config).difference([row[1] for row in S3_AUGMENT_TABLE])
522 if delta:
523 raise PolicyValidationError("augment-keys - found invalid keys: %s" % (list(delta)))
524 if not isinstance(config, (list, str)):
525 raise PolicyValidationError(
526 "augment-keys supports 'all', 'detect', 'none' or list of keys found: %s" % config)
527
528 def get_augment_config(self):
529 augment_config = None
530 for option in self.manager.data.get('query', []):
531 if option and option.get('augment-keys') is not None:
532 augment_config = option['augment-keys']
533 if augment_config is None:
534 augment_config = 'all'
535 return augment_config
536
537 def detect_augment_fields(self):
538 # try to detect augment fields required for the policy execution
539 # we want to avoid extraneous api calls unless they are being used by the policy.
540
541 detected_keys = []
542 augment_keys = [row[1] for row in S3_AUGMENT_TABLE]
543 augment_config = self.get_augment_config()
544
545 if augment_config == 'all':
546 return augment_keys
547 elif augment_config == 'none':
548 return []
549 elif isinstance(augment_config, list):
550 return augment_config
551
552 for f in self.manager.iter_filters():
553 fkey = None
554 if not isinstance(f, ValueFilter):
555 continue
556
557 f = f.data
558 # type: value
559 if f.get('type', '') == 'value':
560 fkey = f.get('key')
561 # k: v dict
562 elif len(f) == 1:
563 fkey = list(f.keys())[0]
564 if fkey is None: # pragma: no cover
565 continue
566
567 # remove any jmespath expressions
568 fkey = fkey.split('.', 1)[0]
569
570 # tags have explicit handling in value filters.
571 if fkey.startswith('tag:'):
572 fkey = 'Tags'
573
574 # denied methods checks get all keys
575 if fkey.startswith('c7n:DeniedMethods'):
576 return augment_keys
577
578 if fkey in augment_keys:
579 detected_keys.append(fkey)
580
581 return detected_keys
582
583 def get_client(self, region):
584 if region in self.region_clients:
585 return self.region_clients[region]
586 with self.session_lock:
587 self.region_clients[region] = self.session.client('s3', region_name=region)
588 return self.region_clients[region]
589
590 def assemble(self, bucket):
591
592 client = self.get_client(self.default_region)
593 augments = list(S3_AUGMENT_TABLE)
594
595 for info in augments:
596 # we use the offset, as tests manipulate the augments table
597 method_name, key, default, select = info[:4]
598 if key not in self.augment_fields:
599 continue
600
601 method = getattr(client, method_name)
602
603 try:
604 response = method(Bucket=bucket['Name'])
                # pop the metadata here; on errors the exception handlers
                # below fall back to defaults instead
606 response.pop('ResponseMetadata', None)
607 value = response
608 if select and select in value:
609 value = value[select]
610 except (ssl.SSLError, SSLError) as e:
611 # Proxy issue most likely
612 log.warning(
613 "Bucket ssl error %s: %s %s",
614 bucket['Name'], bucket.get('Location', 'unknown'), e)
615 continue
616 except ClientError as e:
617 code = e.response['Error']['Code']
618 if code.startswith("NoSuch") or "NotFound" in code:
619 value = default
620 elif code == 'PermanentRedirect': # pragma: no cover
                    # (09/2025) - it's not clear how we get here given a client region switch post
622 # location detection.
623 #
624 # change client region
625 client = self.get_client(get_region(bucket))
626 # requeue now that we have correct region
627 augments.append((method_name, key, default, select))
628 continue
629 else:
630 # for auth errors record as attribute and move on
631 if e.response['Error']['Code'] == 'AccessDenied':
632 bucket.setdefault('c7n:DeniedMethods', []).append(method_name)
633 continue
634 # else log and raise
635 log.warning(
636 "Bucket:%s unable to invoke method:%s error:%s ",
637 bucket['Name'], method_name, e.response['Error']['Message'])
638 raise
639
640 # for historical reasons we normalize EU to eu-west-1 on the bucket
641 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html
642 if key == 'Location' and value and value.get('LocationConstraint', '') == 'EU':
643 value['LocationConstraint'] = 'eu-west-1'
644
645 bucket[key] = value
646
647 # For all subsequent attributes after location, use a client that is targeted to
648 # the bucket's regional s3 endpoint.
649 if key == 'Location' and get_region(bucket) != client.meta.region_name:
650 client = self.get_client(get_region(bucket))
651 return bucket
652
653
654def bucket_client(session, b, kms=False):
655 region = get_region(b)
656
657 if kms:
658 # Need v4 signature for aws:kms crypto, else let the sdk decide
659 # based on region support.
660 config = Config(
661 signature_version='s3v4',
662 read_timeout=200, connect_timeout=120)
663 else:
664 config = Config(read_timeout=200, connect_timeout=120)
665 return session.client('s3', region_name=region, config=config)
666
667
668def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()):
669 for bucket in buckets:
670 client = bucket_client(local_session(session_factory), bucket)
671 # Bucket tags are set atomically for the set/document, we want
672 # to refetch against current to guard against any staleness in
673 # our cached representation across multiple policies or concurrent
674 # modifications.
675
676 if 'get_bucket_tagging' in bucket.get('c7n:DeniedMethods', []):
677 # avoid the additional API call if we already know that it's going
678 # to result in AccessDenied. The chances that the resource's perms
679 # would have changed between fetching the resource and acting on it
680 # here are pretty low-- so the check here should suffice.
            log.warning(
                "Unable to get new set of bucket tags needed to modify tags, "
                "skipping tag action for bucket: %s" % bucket["Name"])
684 continue
685
686 try:
687 bucket['Tags'] = client.get_bucket_tagging(
688 Bucket=bucket['Name']).get('TagSet', [])
689 except ClientError as e:
690 if e.response['Error']['Code'] != 'NoSuchTagSet':
691 raise
692 bucket['Tags'] = []
693
694 new_tags = {t['Key']: t['Value'] for t in add_tags}
695 for t in bucket.get('Tags', ()):
696 if (t['Key'] not in new_tags and t['Key'] not in remove_tags):
697 new_tags[t['Key']] = t['Value']
698 tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()]
699
700 try:
701 client.put_bucket_tagging(
702 Bucket=bucket['Name'], Tagging={'TagSet': tag_set})
703 except ClientError as e:
704 log.exception(
705 'Exception tagging bucket %s: %s', bucket['Name'], e)
706 continue
707
708
709def get_region(b):
710 """Tries to get the bucket region from Location.LocationConstraint
711
712 Special cases:
713 LocationConstraint EU defaults to eu-west-1
714 LocationConstraint null defaults to us-east-1
715
716 Args:
717 b (object): A bucket object
718
719 Returns:
720 string: an aws region string
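
    Illustrative cases (hypothetical bucket dicts):

        >>> get_region({'Location': {'LocationConstraint': 'EU'}})
        'eu-west-1'
        >>> get_region({'Location': {}})
        'us-east-1'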
721 """
722 remap = {None: 'us-east-1', 'EU': 'eu-west-1'}
723 region = b.get('Location', {}).get('LocationConstraint')
724 return remap.get(region, region)
725
726
727@filters.register('metrics')
728class S3Metrics(MetricsFilter):
    """S3 CW Metrics need special handling for attribute/dimension
    mismatch, and an additional required dimension.
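
    :example:

    A sketch of a metrics filter against this resource (the metric name,
    lookback window and threshold are illustrative):

    .. code-block:: yaml

        policies:
          - name: s3-bucket-no-objects
            resource: s3
            filters:
              - type: metrics
                name: NumberOfObjects
                days: 7
                value: 0
                op: eq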
731 """
732
733 def get_dimensions(self, resource):
734 dims = [{'Name': 'BucketName', 'Value': resource['Name']}]
735 if (self.data['name'] == 'NumberOfObjects' and
736 'dimensions' not in self.data):
737 dims.append(
738 {'Name': 'StorageType', 'Value': 'AllStorageTypes'})
739 return dims
740
741
742@filters.register('cross-account')
743class S3CrossAccountFilter(CrossAccountAccessFilter):
744 """Filters cross-account access to S3 buckets
745
746 :example:
747
748 .. code-block:: yaml
749
750 policies:
751 - name: s3-acl
752 resource: s3
753 region: us-east-1
754 filters:
755 - type: cross-account
756 """
757 permissions = ('s3:GetBucketPolicy',)
758
759 def get_accounts(self):
760 """add in elb access by default
761
762 ELB Accounts by region
763 https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html
764
765 Redshift Accounts by region
766 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#db-auditing-manage-log-files
767 https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html#rs-db-auditing-cloud-trail-rs-acct-ids
768
769 Cloudtrail Accounts by region
770 https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-supported-regions.html
771 """
772 accounts = super(S3CrossAccountFilter, self).get_accounts()
773 return accounts.union(
774 [
775 # ELB accounts
776 '127311923021', # us-east-1
777 '033677994240', # us-east-2
778 '027434742980', # us-west-1
779 '797873946194', # us-west-2
780 '098369216593', # af-south-1
781 '985666609251', # ca-central-1
782 '054676820928', # eu-central-1
783 '897822967062', # eu-north-1
784 '635631232127', # eu-south-1
785 '156460612806', # eu-west-1
786 '652711504416', # eu-west-2
787 '009996457667', # eu-west-3
788 '754344448648', # ap-east-1
789 '582318560864', # ap-northeast-1
790 '600734575887', # ap-northeast-2
791 '383597477331', # ap-northeast-3
792 '114774131450', # ap-southeast-1
793 '783225319266', # ap-southeast-2
794 '718504428378', # ap-south-1
795 '076674570225', # me-south-1
796 '507241528517', # sa-east-1
797 '048591011584', # us-gov-west-1 or gov-cloud-1
798 '190560391635', # us-gov-east-1
799 '638102146993', # cn-north-1
800 '037604701340', # cn-northwest-1
801
802 # Redshift audit logging
803 '193672423079', # us-east-1
804 '391106570357', # us-east-2
805 '262260360010', # us-west-1
806 '902366379725', # us-west-2
807 '365689465814', # af-south-1
808 '313564881002', # ap-east-1
809 '865932855811', # ap-south-1
810 '090321488786', # ap-northeast-3
811 '760740231472', # ap-northeast-2
812 '361669875840', # ap-southeast-1
813 '762762565011', # ap-southeast-2
814 '404641285394', # ap-northeast-1
815 '907379612154', # ca-central-1
816 '053454850223', # eu-central-1
817 '210876761215', # eu-west-1
818 '307160386991', # eu-west-2
819 '945612479654', # eu-south-1
820 '915173422425', # eu-west-3
821 '729911121831', # eu-north-1
822 '013126148197', # me-south-1
823 '075028567923', # sa-east-1
824
825 # Cloudtrail accounts (psa. folks should be using
826 # cloudtrail service in bucket policies)
827 '086441151436', # us-east-1
828 '475085895292', # us-west-2
829 '388731089494', # us-west-1
830 '113285607260', # us-west-2
831 '819402241893', # ca-central-1
832 '977081816279', # ap-south-1
833 '492519147666', # ap-northeast-2
834 '903692715234', # ap-southeast-1
835 '284668455005', # ap-southeast-2
836 '216624486486', # ap-northeast-1
837 '035351147821', # eu-central-1
838 '859597730677', # eu-west-1
839 '282025262664', # eu-west-2
840 '814480443879', # sa-east-1
841 ])
842
843
844@filters.register('global-grants')
845class GlobalGrantsFilter(Filter):
846 """Filters for all S3 buckets that have global-grants
847
848 *Note* by default this filter allows for read access
849 if the bucket has been configured as a website. This
850 can be disabled per the example below.
851
852 :example:
853
854 .. code-block:: yaml
855
856 policies:
857 - name: remove-global-grants
858 resource: s3
859 filters:
860 - type: global-grants
861 allow_website: false
862 actions:
863 - delete-global-grants
864
865 """
866
867 schema = type_schema(
868 'global-grants',
869 allow_website={'type': 'boolean'},
870 operator={'type': 'string', 'enum': ['or', 'and']},
871 permissions={
872 'type': 'array', 'items': {
873 'type': 'string', 'enum': [
874 'READ', 'WRITE', 'WRITE_ACP', 'READ_ACP', 'FULL_CONTROL']}})
875
876 GLOBAL_ALL = "http://acs.amazonaws.com/groups/global/AllUsers"
877 AUTH_ALL = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
878
879 def process(self, buckets, event=None):
880 with self.executor_factory(max_workers=5) as w:
881 results = w.map(self.process_bucket, buckets)
882 results = list(filter(None, list(results)))
883 return results
884
885 def process_bucket(self, b):
886 acl = b.get('Acl', {'Grants': []})
887 if not acl or not acl['Grants']:
888 return
889
890 results = []
891 allow_website = self.data.get('allow_website', True)
892 perms = self.data.get('permissions', [])
893
894 for grant in acl['Grants']:
895 if 'URI' not in grant.get("Grantee", {}):
896 continue
897 if grant['Grantee']['URI'] not in [self.AUTH_ALL, self.GLOBAL_ALL]:
898 continue
899 if allow_website and grant['Permission'] == 'READ' and b['Website']:
900 continue
901 if not perms or (perms and grant['Permission'] in perms):
902 results.append(grant['Permission'])
903
904 if results:
905 set_annotation(b, 'GlobalPermissions', results)
906 return b
907
908
909class BucketActionBase(BaseAction):
910
911 def get_permissions(self):
912 return self.permissions
913
914 def get_std_format_args(self, bucket):
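        # variables made available for `format_string_values` interpolation
        # in policy-provided strings, e.g. "{account_id}-{region}-s3-logs"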
915 return {
916 'account_id': self.manager.config.account_id,
917 'region': self.manager.config.region,
918 'bucket_name': bucket['Name'],
919 'bucket_region': get_region(bucket)
920 }
921
922 def process(self, buckets):
923 return self._process_with_futures(buckets)
924
925 def _process_with_futures(self, buckets, *args, max_workers=3, **kwargs):
926 errors = 0
927 results = []
928 with self.executor_factory(max_workers=max_workers) as w:
929 futures = {}
930 for b in buckets:
931 futures[w.submit(self.process_bucket, b, *args, **kwargs)] = b
932 for f in as_completed(futures):
933 if f.exception():
934 b = futures[f]
935 self.log.error(
936 'error modifying bucket: policy:%s action:%s bucket:%s error:%s',
937 self.manager.data.get('name'), self.name, b['Name'], f.exception()
938 )
939 errors += 1
940 continue
941 results += filter(None, [f.result()])
942 if errors:
943 self.log.error('encountered %d errors while processing %s', errors, self.name)
            raise PolicyExecutionError('%d resources failed' % errors)
945 return results
946
947
948class BucketFilterBase(Filter):
949 def get_std_format_args(self, bucket):
950 return {
951 'account_id': self.manager.config.account_id,
952 'region': self.manager.config.region,
953 'bucket_name': bucket['Name'],
954 'bucket_region': get_region(bucket)
955 }
956
957
958@S3.action_registry.register("post-finding")
959class BucketFinding(PostFinding):
960
961 resource_type = 'AwsS3Bucket'
962
963 def format_resource(self, r):
964 owner = r.get("Acl", {}).get("Owner", {})
965 resource = {
966 "Type": self.resource_type,
967 "Id": "arn:aws:s3:::{}".format(r["Name"]),
968 "Region": get_region(r),
969 "Tags": {t["Key"]: t["Value"] for t in r.get("Tags", [])},
970 "Details": {self.resource_type: {
971 "OwnerId": owner.get('ID', 'Unknown')}}
972 }
973
974 if "DisplayName" in owner:
975 resource["Details"]["AwsS3Bucket"]["OwnerName"] = owner['DisplayName']
976
977 return filter_empty(resource)
978
979
980@S3.filter_registry.register('has-statement')
981class S3HasStatementFilter(HasStatementFilter):
982 def get_std_format_args(self, bucket):
983 return {
984 'account_id': self.manager.config.account_id,
985 'region': self.manager.config.region,
986 'bucket_name': bucket['Name'],
987 'bucket_region': get_region(bucket)
988 }
989
990
991@S3.filter_registry.register('lock-configuration')
992class S3LockConfigurationFilter(ValueFilter):
993 """
994 Filter S3 buckets based on their object lock configurations
995
996 :example:
997
998 Get all buckets where lock configuration mode is COMPLIANCE
999
1000 .. code-block:: yaml
1001
1002 policies:
1003 - name: lock-configuration-compliance
1004 resource: aws.s3
1005 filters:
1006 - type: lock-configuration
1007 key: Rule.DefaultRetention.Mode
1008 value: COMPLIANCE
1009
1010 """
1011 schema = type_schema('lock-configuration', rinherit=ValueFilter.schema)
1012 permissions = ('s3:GetBucketObjectLockConfiguration',)
1013 annotate = True
1014 annotation_key = 'c7n:ObjectLockConfiguration'
1015
1016 def _process_resource(self, client, resource):
1017 try:
1018 config = client.get_object_lock_configuration(
1019 Bucket=resource['Name']
1020 )['ObjectLockConfiguration']
1021 except ClientError as e:
1022 if e.response['Error']['Code'] == 'ObjectLockConfigurationNotFoundError':
1023 config = None
1024 else:
1025 raise
1026 resource[self.annotation_key] = config
1027
1028 def process(self, resources, event=None):
1029 client = local_session(self.manager.session_factory).client('s3')
1030 with self.executor_factory(max_workers=3) as w:
1031 futures = []
1032 for res in resources:
1033 if self.annotation_key in res:
1034 continue
1035 futures.append(w.submit(self._process_resource, client, res))
1036 for f in as_completed(futures):
1037 exc = f.exception()
1038 if exc:
1039 self.log.error(
1040 "Exception getting bucket lock configuration \n %s" % (
1041 exc))
1042 return super().process(resources, event)
1043
1044 def __call__(self, r):
1045 return super().__call__(r.setdefault(self.annotation_key, None))
1046
1047
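# Template statement the no-encryption-statement filter compares against each
# bucket policy statement; Sid and Resource are copied from the candidate
# statement before the comparison.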
1048ENCRYPTION_STATEMENT_GLOB = {
1049 'Effect': 'Deny',
1050 'Principal': '*',
1051 'Action': 's3:PutObject',
1052 "Condition": {
1053 "StringNotEquals": {
1054 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}
1055
1056
1057@filters.register('no-encryption-statement')
1058class EncryptionEnabledFilter(Filter):
1059 """Find buckets with missing encryption policy statements.
1060
1061 :example:
1062
1063 .. code-block:: yaml
1064
1065 policies:
1066 - name: s3-bucket-not-encrypted
1067 resource: s3
1068 filters:
1069 - type: no-encryption-statement
1070 """
1071 schema = type_schema(
1072 'no-encryption-statement')
1073
1074 def get_permissions(self):
1075 perms = self.manager.get_resource_manager('s3').get_permissions()
1076 return perms
1077
1078 def process(self, buckets, event=None):
1079 return list(filter(None, map(self.process_bucket, buckets)))
1080
1081 def process_bucket(self, b):
1082 p = b.get('Policy')
1083 if p is None:
1084 return b
1085 p = json.loads(p)
1086 encryption_statement = dict(ENCRYPTION_STATEMENT_GLOB)
1087
1088 statements = p.get('Statement', [])
1089 check = False
1090 for s in list(statements):
1091 if 'Sid' in s:
1092 encryption_statement["Sid"] = s["Sid"]
1093 if 'Resource' in s:
1094 encryption_statement["Resource"] = s["Resource"]
1095 if s == encryption_statement:
1096 check = True
1097 break
1098 if check:
1099 return None
1100 else:
1101 return b
1102
1103
1104@filters.register('missing-statement')
1105@filters.register('missing-policy-statement')
1106class MissingPolicyStatementFilter(Filter):
1107 """Find buckets missing a set of named policy statements.
1108
1109 :example:
1110
1111 .. code-block:: yaml
1112
1113 policies:
1114 - name: s3-bucket-missing-statement
1115 resource: s3
1116 filters:
1117 - type: missing-statement
1118 statement_ids:
1119 - RequiredEncryptedPutObject
1120 """
1121
1122 schema = type_schema(
1123 'missing-policy-statement',
1124 aliases=('missing-statement',),
1125 statement_ids={'type': 'array', 'items': {'type': 'string'}})
1126
1127 def __call__(self, b):
1128 p = b.get('Policy')
1129 if p is None:
1130 return b
1131
1132 p = json.loads(p)
1133
1134 required = list(self.data.get('statement_ids', []))
1135 statements = p.get('Statement', [])
1136 for s in list(statements):
1137 if s.get('Sid') in required:
1138 required.remove(s['Sid'])
1139 if not required:
1140 return False
1141 return True
1142
1143
1144@filters.register('bucket-notification')
1145class BucketNotificationFilter(ValueFilter):
1146 """Filter based on bucket notification configuration.
1147
1148 :example:
1149
1150 .. code-block:: yaml
1151
1152 policies:
1153 - name: delete-incorrect-notification
1154 resource: s3
1155 filters:
1156 - type: bucket-notification
1157 kind: lambda
1158 key: Id
1159 value: "IncorrectLambda"
1160 op: eq
1161 actions:
1162 - type: delete-bucket-notification
1163 statement_ids: matched
1164 """
1165
1166 schema = type_schema(
1167 'bucket-notification',
1168 required=['kind'],
1169 kind={'type': 'string', 'enum': ['lambda', 'sns', 'sqs']},
1170 rinherit=ValueFilter.schema)
1171 schema_alias = False
1172 annotation_key = 'c7n:MatchedNotificationConfigurationIds'
1173
1174 permissions = ('s3:GetBucketNotification',)
1175
1176 FIELDS = {
1177 'lambda': 'LambdaFunctionConfigurations',
1178 'sns': 'TopicConfigurations',
1179 'sqs': 'QueueConfigurations'
1180 }
1181
1182 def process(self, buckets, event=None):
1183 return super(BucketNotificationFilter, self).process(buckets, event)
1184
1185 def __call__(self, bucket):
1186
1187 field = self.FIELDS[self.data['kind']]
1188 found = False
1189 for config in bucket.get('Notification', {}).get(field, []):
1190 if self.match(config):
1191 set_annotation(
1192 bucket,
1193 BucketNotificationFilter.annotation_key,
1194 config['Id'])
1195 found = True
1196 return found
1197
1198
1199@filters.register('bucket-logging')
1200class BucketLoggingFilter(BucketFilterBase):
1201 """Filter based on bucket logging configuration.
1202
1203 :example:
1204
1205 .. code-block:: yaml
1206
1207 policies:
1208 - name: add-bucket-logging-if-missing
1209 resource: s3
1210 filters:
1211 - type: bucket-logging
1212 op: disabled
1213 actions:
1214 - type: toggle-logging
1215 target_bucket: "{account_id}-{region}-s3-logs"
1216 target_prefix: "{source_bucket_name}/"
1217
1218 policies:
1219 - name: update-incorrect-or-missing-logging
1220 resource: s3
1221 filters:
1222 - type: bucket-logging
1223 op: not-equal
1224 target_bucket: "{account_id}-{region}-s3-logs"
1225 target_prefix: "{account}/{source_bucket_name}/"
1226 actions:
1227 - type: toggle-logging
1228 target_bucket: "{account_id}-{region}-s3-logs"
1229 target_prefix: "{account}/{source_bucket_name}/"
1230 """
1231
1232 schema = type_schema(
1233 'bucket-logging',
1234 op={'enum': ['enabled', 'disabled', 'equal', 'not-equal', 'eq', 'ne']},
1235 required=['op'],
1236 target_bucket={'type': 'string'},
1237 target_prefix={'type': 'string'})
1238 schema_alias = False
1239 account_name = None
1240
1241 permissions = ("s3:GetBucketLogging", "iam:ListAccountAliases")
1242
1243 def process(self, buckets, event=None):
1244 return list(filter(None, map(self.process_bucket, buckets)))
1245
1246 def process_bucket(self, b):
1247 if self.match_bucket(b):
1248 return b
1249
1250 def match_bucket(self, b):
1251 op = self.data.get('op')
1252
1253 logging = b.get('Logging', {})
1254 if op == 'disabled':
1255 return logging == {}
1256 elif op == 'enabled':
1257 return logging != {}
1258
1259 if self.account_name is None:
1260 session = local_session(self.manager.session_factory)
1261 self.account_name = get_account_alias_from_sts(session)
1262
1263 variables = self.get_std_format_args(b)
1264 variables.update({
1265 'account': self.account_name,
1266 'source_bucket_name': b['Name'],
1267 'source_bucket_region': get_region(b),
1268 'target_bucket_name': self.data.get('target_bucket'),
1269 'target_prefix': self.data.get('target_prefix'),
1270 })
1271 data = format_string_values(self.data, **variables)
1272 target_bucket = data.get('target_bucket')
1273 target_prefix = data.get('target_prefix', b['Name'] + '/')
1274
1275 target_config = {
1276 "TargetBucket": target_bucket,
1277 "TargetPrefix": target_prefix
1278 } if target_bucket else {}
1279
1280 if op in ('not-equal', 'ne'):
1281 return logging != target_config
1282 else:
1283 return logging == target_config
1284
1285
1286@actions.register('delete-bucket-notification')
1287class DeleteBucketNotification(BucketActionBase):
    """Action to delete S3 bucket notification configurations.

    Notification ids may be listed explicitly, or ``statement_ids: matched``
    may reference the ids annotated by the ``bucket-notification`` filter, as
    in the illustrative policy below (the lambda name is hypothetical).
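
    :example:

    .. code-block:: yaml

        policies:
          - name: delete-incorrect-notification
            resource: s3
            filters:
              - type: bucket-notification
                kind: lambda
                key: Id
                value: "IncorrectLambda"
                op: eq
            actions:
              - type: delete-bucket-notification
                statement_ids: matched
    """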
1289
1290 schema = type_schema(
1291 'delete-bucket-notification',
1292 required=['statement_ids'],
1293 statement_ids={'oneOf': [
1294 {'enum': ['matched']},
1295 {'type': 'array', 'items': {'type': 'string'}}]})
1296
1297 permissions = ('s3:PutBucketNotification',)
1298
1299 def process_bucket(self, bucket):
1300 n = bucket['Notification']
1301 if not n:
1302 return
1303
1304 statement_ids = self.data.get('statement_ids')
1305 if statement_ids == 'matched':
1306 statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ())
1307 if not statement_ids:
1308 return
1309
1310 cfg = defaultdict(list)
1311
1312 for t in BucketNotificationFilter.FIELDS.values():
1313 for c in n.get(t, []):
1314 if c['Id'] not in statement_ids:
1315 cfg[t].append(c)
1316
1317 client = bucket_client(local_session(self.manager.session_factory), bucket)
1318 client.put_bucket_notification_configuration(
1319 Bucket=bucket['Name'],
1320 NotificationConfiguration=cfg)
1321
1322
1323@actions.register('no-op')
1324class NoOp(BucketActionBase):
1325
1326 schema = type_schema('no-op')
1327 permissions = ('s3:ListAllMyBuckets',)
1328
1329 def process(self, buckets):
1330 return None
1331
1332
1333@actions.register('set-statements')
1334class SetPolicyStatement(BucketActionBase):
1335 """Action to add or update policy statements to S3 buckets
1336
1337 :example:
1338
1339 .. code-block:: yaml
1340
1341 policies:
1342 - name: force-s3-https
1343 resource: s3
1344 actions:
1345 - type: set-statements
1346 statements:
1347 - Sid: "DenyHttp"
1348 Effect: "Deny"
1349 Action: "s3:GetObject"
1350 Principal:
1351 AWS: "*"
1352 Resource: "arn:aws:s3:::{bucket_name}/*"
1353 Condition:
1354 Bool:
1355 "aws:SecureTransport": false
1356 """
1357
1358 permissions = ('s3:PutBucketPolicy',)
1359
1360 schema = type_schema(
1361 'set-statements',
1362 **{
1363 'statements': {
1364 'type': 'array',
1365 'items': {
1366 'type': 'object',
1367 'properties': {
1368 'Sid': {'type': 'string'},
1369 'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']},
1370 'Principal': {'anyOf': [{'type': 'string'},
1371 {'type': 'object'}, {'type': 'array'}]},
1372 'NotPrincipal': {'anyOf': [{'type': 'object'}, {'type': 'array'}]},
1373 'Action': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1374 'NotAction': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1375 'Resource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1376 'NotResource': {'anyOf': [{'type': 'string'}, {'type': 'array'}]},
1377 'Condition': {'type': 'object'}
1378 },
1379 'required': ['Sid', 'Effect'],
1380 'oneOf': [
1381 {'required': ['Principal', 'Action', 'Resource']},
1382 {'required': ['NotPrincipal', 'Action', 'Resource']},
1383 {'required': ['Principal', 'NotAction', 'Resource']},
1384 {'required': ['NotPrincipal', 'NotAction', 'Resource']},
1385 {'required': ['Principal', 'Action', 'NotResource']},
1386 {'required': ['NotPrincipal', 'Action', 'NotResource']},
1387 {'required': ['Principal', 'NotAction', 'NotResource']},
1388 {'required': ['NotPrincipal', 'NotAction', 'NotResource']}
1389 ]
1390 }
1391 }
1392 }
1393 )
1394
1395 def process_bucket(self, bucket):
1396 policy = bucket.get('Policy') or '{}'
1397
1398 target_statements = format_string_values(
1399 copy.deepcopy({s['Sid']: s for s in self.data.get('statements', [])}),
1400 **self.get_std_format_args(bucket))
1401
1402 policy = json.loads(policy)
1403 bucket_statements = policy.setdefault('Statement', [])
1404
1405 for s in bucket_statements:
1406 if s.get('Sid') not in target_statements:
1407 continue
1408 if s == target_statements[s['Sid']]:
1409 target_statements.pop(s['Sid'])
1410
1411 if not target_statements:
1412 return
1413
1414 bucket_statements.extend(target_statements.values())
1415 policy = json.dumps(policy)
1416
1417 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1418 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=policy)
1419 return {'Name': bucket['Name'], 'Policy': policy}
1420
1421
1422@actions.register('remove-statements')
1423class RemovePolicyStatement(RemovePolicyBase):
1424 """Action to remove policy statements from S3 buckets
1425
1426 :example:
1427
1428 .. code-block:: yaml
1429
1430 policies:
1431 - name: s3-remove-encrypt-put
1432 resource: s3
1433 filters:
1434 - type: has-statement
1435 statement_ids:
1436 - RequireEncryptedPutObject
1437 actions:
1438 - type: remove-statements
1439 statement_ids:
1440 - RequiredEncryptedPutObject
1441 """
1442
1443 permissions = ("s3:PutBucketPolicy", "s3:DeleteBucketPolicy")
1444
1445 def process(self, buckets):
1446 with self.executor_factory(max_workers=3) as w:
1447 futures = {}
1448 results = []
1449 for b in buckets:
1450 futures[w.submit(self.process_bucket, b)] = b
1451 for f in as_completed(futures):
1452 if f.exception():
1453 b = futures[f]
                    self.log.error('error modifying bucket:%s\n%s',
                                   b['Name'], f.exception())
                    continue
                results += filter(None, [f.result()])
1457 return results
1458
1459 def process_bucket(self, bucket):
1460 p = bucket.get('Policy')
1461 if p is None:
1462 return
1463
1464 p = json.loads(p)
1465
1466 statements, found = self.process_policy(
1467 p, bucket, CrossAccountAccessFilter.annotation_key)
1468
1469 if not found:
1470 return
1471
1472 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1473
1474 if not statements:
1475 s3.delete_bucket_policy(Bucket=bucket['Name'])
1476 else:
1477 s3.put_bucket_policy(Bucket=bucket['Name'], Policy=json.dumps(p))
1478 return {'Name': bucket['Name'], 'State': 'PolicyRemoved', 'Statements': found}
1479
1480
1481@actions.register('set-replication')
1482class SetBucketReplicationConfig(BucketActionBase):
1483 """Action to add or remove replication configuration statement from S3 buckets
1484
1485 :example:
1486
1487 .. code-block:: yaml
1488
1489 policies:
1490 - name: s3-unapproved-account-replication
1491 resource: s3
1492 filters:
1493 - type: value
1494 key: Replication.ReplicationConfiguration.Rules[].Destination.Account
1495 value: present
1496 - type: value
1497 key: Replication.ReplicationConfiguration.Rules[].Destination.Account
1498 value_from:
1499 url: 's3:///path/to/file.json'
1500 format: json
1501 expr: "approved_accounts.*"
1502 op: ni
1503 actions:
1504 - type: set-replication
1505 state: enable
1506 """
1507 schema = type_schema(
1508 'set-replication',
1509 state={'type': 'string', 'enum': ['enable', 'disable', 'remove']})
1510 permissions = ("s3:GetReplicationConfiguration", "s3:PutReplicationConfiguration")
1511
1512 def process(self, buckets):
1513 with self.executor_factory(max_workers=3) as w:
1514 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
1515 errors = []
1516 for future in as_completed(futures):
1517 bucket = futures[future]
1518 try:
1519 future.result()
1520 except ClientError as e:
                    errors.append("Message: %s Bucket: %s" % (e, bucket['Name']))
1522 if errors:
1523 raise Exception('\n'.join(map(str, errors)))
1524
1525 def process_bucket(self, bucket):
1526 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1527 state = self.data.get('state')
1528 if state is not None:
1529 if state == 'remove':
1530 s3.delete_bucket_replication(Bucket=bucket['Name'])
1531 return {'Name': bucket['Name'], 'State': 'ReplicationConfigRemoved'}
1532 if state in ('enable', 'disable'):
1533 config = s3.get_bucket_replication(Bucket=bucket['Name'])
1534 for rule in config['ReplicationConfiguration']['Rules']:
1535 rule['Status'] = 'Enabled' if state == 'enable' else 'Disabled'
1536 s3.put_bucket_replication(
1537 Bucket=bucket['Name'],
1538 ReplicationConfiguration=config['ReplicationConfiguration']
1539 )
1540 return {'Name': bucket['Name'], 'State': 'ReplicationConfigUpdated'}
1541
1542
1543@filters.register('check-public-block')
1544class FilterPublicBlock(Filter):
1545 """Filter for s3 bucket public blocks
1546
    If no filter parameters are provided, it checks whether any are unset or False.

    If parameters are provided, only the provided ones are checked.
1550
1551 :example:
1552
1553 .. code-block:: yaml
1554
1555 policies:
1556 - name: CheckForPublicAclBlock-Off
1557 resource: s3
1558 region: us-east-1
1559 filters:
1560 - type: check-public-block
1561 BlockPublicAcls: true
1562 BlockPublicPolicy: true
1563 """
1564
1565 schema = type_schema(
1566 'check-public-block',
1567 BlockPublicAcls={'type': 'boolean'},
1568 IgnorePublicAcls={'type': 'boolean'},
1569 BlockPublicPolicy={'type': 'boolean'},
1570 RestrictPublicBuckets={'type': 'boolean'})
1571 permissions = ("s3:GetBucketPublicAccessBlock",)
1572 keys = (
1573 'BlockPublicPolicy', 'BlockPublicAcls', 'IgnorePublicAcls', 'RestrictPublicBuckets')
1574 annotation_key = 'c7n:PublicAccessBlock'
1575
1576 def process(self, buckets, event=None):
1577 results = []
1578 with self.executor_factory(max_workers=2) as w:
1579 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
1580 for f in as_completed(futures):
1581 if f.result():
1582 results.append(futures[f])
1583 return results
1584
1585 def process_bucket(self, bucket):
1586 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1587 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
1588 if self.annotation_key not in bucket:
1589 try:
1590 config = s3.get_public_access_block(
1591 Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
1592 except ClientError as e:
1593 error_code = e.response['Error']['Code']
1594 if error_code == 'NoSuchPublicAccessBlockConfiguration':
1595 pass
1596 elif error_code == 'AccessDenied':
                    # Follow the same logic as BucketAssembly.assemble - log and
                    # continue on access denied errors rather than halting a policy altogether
1599 method = 'GetPublicAccessBlock'
1600 log.warning(
1601 "Bucket:%s unable to invoke method:%s error:%s ",
1602 bucket['Name'], method, e.response['Error']['Message']
1603 )
1604 bucket.setdefault('c7n:DeniedMethods', []).append(method)
1605 else:
1606 raise
1607 bucket[self.annotation_key] = config
1608 return self.matches_filter(config)
1609
1610 def matches_filter(self, config):
1611 key_set = [key for key in self.keys if key in self.data]
1612 if key_set:
1613 return all([self.data.get(key) is config[key] for key in key_set])
1614 else:
1615 return not all(config.values())
1616
1617
1618@actions.register('set-public-block')
1619class SetPublicBlock(BucketActionBase):
1620 """Action to update Public Access blocks on S3 buckets
1621
    If no action parameters are provided, all settings will be set to the `state`
    value, which defaults to True.

    If action parameters are provided, those will be set and other extant values preserved.
1625
1626 :example:
1627
1628 .. code-block:: yaml
1629
1630 policies:
1631 - name: s3-public-block-enable-all
1632 resource: s3
1633 filters:
1634 - type: check-public-block
1635 actions:
1636 - type: set-public-block
1637
1638 policies:
1639 - name: s3-public-block-disable-all
1640 resource: s3
1641 filters:
1642 - type: check-public-block
1643 actions:
1644 - type: set-public-block
1645 state: false
1646
1647 policies:
1648 - name: s3-public-block-enable-some
1649 resource: s3
1650 filters:
1651 - or:
1652 - type: check-public-block
1653 BlockPublicAcls: false
1654 - type: check-public-block
1655 BlockPublicPolicy: false
1656 actions:
1657 - type: set-public-block
1658 BlockPublicAcls: true
1659 BlockPublicPolicy: true
1660
1661 """
1662
1663 schema = type_schema(
1664 'set-public-block',
1665 state={'type': 'boolean', 'default': True},
1666 BlockPublicAcls={'type': 'boolean'},
1667 IgnorePublicAcls={'type': 'boolean'},
1668 BlockPublicPolicy={'type': 'boolean'},
1669 RestrictPublicBuckets={'type': 'boolean'})
1670 permissions = ("s3:GetBucketPublicAccessBlock", "s3:PutBucketPublicAccessBlock")
1671 keys = FilterPublicBlock.keys
1672 annotation_key = FilterPublicBlock.annotation_key
1673
1674 def process(self, buckets):
1675 with self.executor_factory(max_workers=3) as w:
1676 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
1677 for future in as_completed(futures):
1678 future.result()
1679
1680 def process_bucket(self, bucket):
1681 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
1682 config = dict(bucket.get(self.annotation_key, {key: False for key in self.keys}))
1683 if self.annotation_key not in bucket:
1684 try:
1685 config = s3.get_public_access_block(
1686 Bucket=bucket['Name'])['PublicAccessBlockConfiguration']
1687 except ClientError as e:
1688 if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration':
1689 raise
1690
1691 key_set = [key for key in self.keys if key in self.data]
1692 if key_set:
1693 for key in key_set:
1694 config[key] = self.data.get(key)
1695 else:
1696 for key in self.keys:
1697 config[key] = self.data.get('state', True)
1698 s3.put_public_access_block(
1699 Bucket=bucket['Name'], PublicAccessBlockConfiguration=config)
1700
1701
1702@actions.register('toggle-versioning')
1703class ToggleVersioning(BucketActionBase):
1704 """Action to enable/suspend versioning on a S3 bucket
1705
    Note versioning can never be disabled, only suspended.
1707
1708 :example:
1709
1710 .. code-block:: yaml
1711
1712 policies:
1713 - name: s3-enable-versioning
1714 resource: s3
1715 filters:
1716 - or:
1717 - type: value
1718 key: Versioning.Status
1719 value: Suspended
1720 - type: value
1721 key: Versioning.Status
1722 value: absent
1723 actions:
1724 - type: toggle-versioning
1725 enabled: true
1726 """
1727
1728 schema = type_schema(
1729 'toggle-versioning',
1730 enabled={'type': 'boolean'})
1731 permissions = ("s3:PutBucketVersioning",)
1732
1733 def process_versioning(self, resource, state):
1734 client = bucket_client(
1735 local_session(self.manager.session_factory), resource)
1736 try:
1737 client.put_bucket_versioning(
1738 Bucket=resource['Name'],
1739 VersioningConfiguration={
1740 'Status': state})
1741 except ClientError as e:
1742 if e.response['Error']['Code'] != 'AccessDenied':
1743 log.error(
1744 "Unable to put bucket versioning on bucket %s: %s" % (resource['Name'], e))
1745 raise
1746 log.warning(
1747 "Access Denied Bucket:%s while put bucket versioning" % resource['Name'])
1748
1749 # mfa delete enablement looks like it needs the serial and a current token.
1750 def process(self, resources):
1751 enabled = self.data.get('enabled', True)
1752 for r in resources:
1753 if 'Versioning' not in r or not r['Versioning']:
1754 r['Versioning'] = {'Status': 'Suspended'}
1755 if enabled and (
1756 r['Versioning']['Status'] == 'Suspended'):
1757 self.process_versioning(r, 'Enabled')
1758 if not enabled and r['Versioning']['Status'] == 'Enabled':
1759 self.process_versioning(r, 'Suspended')
1760
1761
1762@actions.register('toggle-logging')
1763class ToggleLogging(BucketActionBase):
1764 """Action to enable/disable logging on a S3 bucket.
1765
    Target bucket ACL must allow for WRITE and READ_ACP permissions.
    Not specifying a target_prefix will default to the current bucket name.
1768 https://docs.aws.amazon.com/AmazonS3/latest/dev/enable-logging-programming.html
1769
1770 :example:
1771
1772 .. code-block:: yaml
1773
1774 policies:
1775 - name: s3-enable-logging
1776 resource: s3
1777 filters:
1778 - "tag:Testing": present
1779 actions:
1780 - type: toggle-logging
1781 target_bucket: log-bucket
1782 target_prefix: logs123/
1783
1784 policies:
1785 - name: s3-force-standard-logging
1786 resource: s3
1787 filters:
1788 - type: bucket-logging
1789 op: not-equal
1790 target_bucket: "{account_id}-{region}-s3-logs"
1791 target_prefix: "{account}/{source_bucket_name}/"
1792 actions:
1793 - type: toggle-logging
1794 target_bucket: "{account_id}-{region}-s3-logs"
1795 target_prefix: "{account}/{source_bucket_name}/"
1796 """
1797 schema = type_schema(
1798 'toggle-logging',
1799 enabled={'type': 'boolean'},
1800 target_bucket={'type': 'string'},
1801 target_prefix={'type': 'string'})
1802
1803 permissions = ("s3:PutBucketLogging", "iam:ListAccountAliases")
1804
1805 def validate(self):
1806 if self.data.get('enabled', True):
1807 if not self.data.get('target_bucket'):
1808 raise PolicyValidationError(
1809 "target_bucket must be specified on %s" % (
1810 self.manager.data,))
1811 return self
1812
1813 def process(self, resources):
1814 session = local_session(self.manager.session_factory)
1815 kwargs = {
1816 "enabled": self.data.get('enabled', True),
1817 "session": session,
1818 "account_name": get_account_alias_from_sts(session),
1819 }
1820
1821 return self._process_with_futures(resources, **kwargs)
1822
1823 def process_bucket(self, r, enabled=None, session=None, account_name=None):
1824 client = bucket_client(session, r)
1825 is_logging = bool(r.get('Logging'))
1826
1827 if enabled:
1828 variables = self.get_std_format_args(r)
1829 variables.update({
1830 'account': account_name,
1831 'source_bucket_name': r['Name'],
1832 'source_bucket_region': get_region(r),
1833 'target_bucket_name': self.data.get('target_bucket'),
1834 'target_prefix': self.data.get('target_prefix'),
1835 })
1836 data = format_string_values(self.data, **variables)
1837 config = {
1838 'TargetBucket': data.get('target_bucket'),
1839 'TargetPrefix': data.get('target_prefix', r['Name'] + '/')
1840 }
1841 if not is_logging or r.get('Logging') != config:
1842 client.put_bucket_logging(
1843 Bucket=r['Name'],
1844 BucketLoggingStatus={'LoggingEnabled': config}
1845 )
1846 r['Logging'] = config
1847
1848 elif not enabled and is_logging:
1849 client.put_bucket_logging(
1850 Bucket=r['Name'], BucketLoggingStatus={})
1851 r['Logging'] = {}
1852
1853
1854@actions.register('attach-encrypt')
1855class AttachLambdaEncrypt(BucketActionBase):
1856 """Action attaches lambda encryption policy to S3 bucket
1857 supports attachment via lambda bucket notification or sns notification
1858 to invoke lambda. a special topic value of `default` will utilize an
1859 extant notification or create one matching the bucket name.
1860
1861 :example:
1862
1863
1864 .. code-block:: yaml
1865
1866
1867 policies:
1868 - name: attach-lambda-encrypt
1869 resource: s3
1870 filters:
1871 - type: missing-policy-statement
1872 actions:
1873 - type: attach-encrypt
1874 role: arn:aws:iam::123456789012:role/my-role
1875
1876 """
1877 schema = type_schema(
1878 'attach-encrypt',
1879 role={'type': 'string'},
1880 tags={'type': 'object'},
1881 topic={'type': 'string'})
1882
1883 permissions = (
1884 "s3:PutBucketNotification", "s3:GetBucketNotification",
1885 # lambda manager uses quite a few perms to provision lambdas
        # and event sources; hard to disambiguate, so punt for now.
1887 "lambda:*",
1888 )
1889
1890 def __init__(self, data=None, manager=None):
1891 self.data = data or {}
1892 self.manager = manager
1893
1894 def validate(self):
1895 if (not getattr(self.manager.config, 'dryrun', True) and
1896 not self.data.get('role', self.manager.config.assume_role)):
1897 raise PolicyValidationError(
1898 "attach-encrypt: role must be specified either "
1899 "via assume or in config on %s" % (self.manager.data,))
1900
1901 return self
1902
1903 def process(self, buckets):
1904 from c7n.mu import LambdaManager
1905 from c7n.ufuncs.s3crypt import get_function
1906
1907 account_id = self.manager.config.account_id
1908 topic_arn = self.data.get('topic')
1909
1910 func = get_function(
1911 None, self.data.get('role', self.manager.config.assume_role),
1912 account_id=account_id, tags=self.data.get('tags'))
1913
1914 regions = {get_region(b) for b in buckets}
1915
1916 # session managers by region
1917 region_sessions = {}
1918 for r in regions:
1919 region_sessions[r] = functools.partial(
1920 self.manager.session_factory, region=r)
1921
        # Publish the function to all of our buckets' regions
1923 region_funcs = {}
1924
1925 for r in regions:
1926 lambda_mgr = LambdaManager(region_sessions[r])
1927 lambda_mgr.publish(func)
1928 region_funcs[r] = func
1929
1930 with self.executor_factory(max_workers=3) as w:
1931 results = []
1932 futures = []
1933 for b in buckets:
1934 region = get_region(b)
1935 futures.append(
1936 w.submit(
1937 self.process_bucket,
1938 region_funcs[region],
1939 b,
1940 topic_arn,
1941 account_id,
1942 region_sessions[region]
1943 ))
1944 for f in as_completed(futures):
1945 if f.exception():
1946 log.exception(
1947 "Error attaching lambda-encrypt %s" % (f.exception()))
1948 results.append(f.result())
1949 return list(filter(None, results))
1950
1951 def process_bucket(self, func, bucket, topic, account_id, session_factory):
1952 from c7n.mu import BucketSNSNotification, BucketLambdaNotification
1953 if topic:
1954 topic = None if topic == 'default' else topic
1955 source = BucketSNSNotification(session_factory, bucket, topic)
1956 else:
1957 source = BucketLambdaNotification(
1958 {'account_s3': account_id}, session_factory, bucket)
1959 return source.add(func, None)
1960
1961
1962@actions.register('encryption-policy')
1963class EncryptionRequiredPolicy(BucketActionBase):
1964 """Action to apply an encryption policy to S3 buckets
1965
1966
1967 :example:
1968
1969 .. code-block:: yaml
1970
1971 policies:
1972 - name: s3-enforce-encryption
1973 resource: s3
1974 mode:
1975 type: cloudtrail
1976 events:
1977 - CreateBucket
1978 actions:
1979 - encryption-policy
1980 """
1981
1982 permissions = ("s3:GetBucketPolicy", "s3:PutBucketPolicy")
1983 schema = type_schema('encryption-policy')
1984
1985 def __init__(self, data=None, manager=None):
1986 self.data = data or {}
1987 self.manager = manager
1988
1989 def process(self, buckets):
1990 with self.executor_factory(max_workers=3) as w:
1991 results = w.map(self.process_bucket, buckets)
1992 results = list(filter(None, list(results)))
1993 return results
1994
1995 def process_bucket(self, b):
1996 p = b['Policy']
1997 if p is None:
1998 log.info("No policy found, creating new")
1999 p = {'Version': "2012-10-17", "Statement": []}
2000 else:
2001 p = json.loads(p)
2002
2003 encryption_sid = "RequiredEncryptedPutObject"
2004 encryption_statement = {
2005 'Sid': encryption_sid,
2006 'Effect': 'Deny',
2007 'Principal': '*',
2008 'Action': 's3:PutObject',
2009 "Resource": "arn:aws:s3:::%s/*" % b['Name'],
2010 "Condition": {
2011 # AWS Managed Keys or KMS keys, note policy language
2012 # does not support custom kms (todo add issue)
2013 "StringNotEquals": {
2014 "s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]}}}
2015
2016 statements = p.get('Statement', [])
2017 for s in list(statements):
2018 if s.get('Sid', '') == encryption_sid:
2019 log.debug("Bucket:%s Found extant encrypt policy", b['Name'])
2020 if s != encryption_statement:
2021 log.info(
2022 "Bucket:%s updating extant encrypt policy", b['Name'])
2023 statements.remove(s)
2024 else:
2025 return
2026
2027 session = self.manager.session_factory()
2028 s3 = bucket_client(session, b)
2029 statements.append(encryption_statement)
2030 p['Statement'] = statements
2031 log.info('Bucket:%s attached encryption policy' % b['Name'])
2032
2033 try:
2034 s3.put_bucket_policy(
2035 Bucket=b['Name'],
2036 Policy=json.dumps(p))
2037 except ClientError as e:
2038 if e.response['Error']['Code'] == 'NoSuchBucket':
2039 return
2040 self.log.exception(
2041 "Error on bucket:%s putting policy\n%s error:%s",
2042 b['Name'],
2043 json.dumps(statements, indent=2), e)
2044 raise
2045 return {'Name': b['Name'], 'State': 'PolicyAttached'}
2046
2047
2048class BucketScanLog:
2049 """Offload remediated key ids to a disk file in batches
2050
    A bucket keyspace is effectively infinite, so we need to store partial
    results out of memory. This class provides a json log on disk
    with partial write support.
2054
2055 json output format:
2056 - [list_of_serialized_keys],
2057 - [] # Empty list of keys at end when we close the buffer
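
    For example, a completed log for two remediated keys (names are
    placeholders) looks like::

        [
        ["key-1", "key-2"],
        []
        ]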
2058
2059 """
2060
2061 def __init__(self, log_dir, name):
2062 self.log_dir = log_dir
2063 self.name = name
2064 self.fh = None
2065 self.count = 0
2066
2067 @property
2068 def path(self):
2069 return os.path.join(self.log_dir, "%s.json" % self.name)
2070
2071 def __enter__(self):
2072 # Don't require output directories
2073 if self.log_dir is None:
2074 return
2075
2076 self.fh = open(self.path, 'w')
2077 self.fh.write("[\n")
2078 return self
2079
2080 def __exit__(self, exc_type=None, exc_value=None, exc_frame=None):
2081 if self.fh is None:
2082 return
2083 # we need an empty marker list at end to avoid trailing commas
2084 self.fh.write("[]")
2085 # and close the surrounding list
2086 self.fh.write("\n]")
2087 self.fh.close()
2088 if not self.count:
2089 os.remove(self.fh.name)
2090 self.fh = None
2091 return False
2092
2093 def add(self, keys):
2094 self.count += len(keys)
2095 if self.fh is None:
2096 return
2097 self.fh.write(dumps(keys))
2098 self.fh.write(",\n")
2099
2100
2101class ScanBucket(BucketActionBase):
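    """Base action that walks a bucket's keys (or object versions, for
    versioned buckets) page by page and submits batches of keys to a
    subclass-defined `process_chunk`.
    """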
2102
2103 permissions = ("s3:ListBucket",)
2104
2105 bucket_ops = {
2106 'standard': {
2107 'iterator': 'list_objects',
2108 'contents_key': ['Contents'],
2109 'key_processor': 'process_key'
2110 },
2111 'versioned': {
2112 'iterator': 'list_object_versions',
2113 'contents_key': ['Versions'],
2114 'key_processor': 'process_version'
2115 }
2116 }
2117
2118 def __init__(self, data, manager=None):
2119 super(ScanBucket, self).__init__(data, manager)
2120 self.denied_buckets = set()
2121
2122 def get_bucket_style(self, b):
2123 return (
2124 b.get('Versioning', {'Status': ''}).get('Status') in (
2125 'Enabled', 'Suspended') and 'versioned' or 'standard')
2126
2127 def get_bucket_op(self, b, op_name):
2128 bucket_style = self.get_bucket_style(b)
2129 op = self.bucket_ops[bucket_style][op_name]
2130 if op_name == 'key_processor':
2131 return getattr(self, op)
2132 return op
2133
2134 def get_keys(self, b, key_set):
2135 content_keys = self.get_bucket_op(b, 'contents_key')
2136 keys = []
2137 for ck in content_keys:
2138 keys.extend(key_set.get(ck, []))
2139 return keys
2140
2141 def process(self, buckets):
2142 results = self._process_with_futures(self.process_bucket, buckets)
2143 self.write_denied_buckets_file()
2144 return results
2145
2146 def _process_with_futures(self, helper, buckets, max_workers=3):
2147 results = []
2148 with self.executor_factory(max_workers) as w:
2149 futures = {}
2150 for b in buckets:
2151 futures[w.submit(helper, b)] = b
2152 for f in as_completed(futures):
2153 if f.exception():
2154 b = futures[f]
2155 self.log.error(
2156 "Error on bucket:%s region:%s policy:%s error: %s",
2157 b['Name'], b.get('Location', 'unknown'),
2158 self.manager.data.get('name'), f.exception())
2159 self.denied_buckets.add(b['Name'])
2160 continue
2161 result = f.result()
2162 if result:
2163 results.append(result)
2164 return results
2165
2166 def write_denied_buckets_file(self):
2167 if (self.denied_buckets and
2168 self.manager.ctx.log_dir and
2169 not isinstance(self.manager.ctx.output, NullBlobOutput)):
2170 with open(
2171 os.path.join(
2172 self.manager.ctx.log_dir, 'denied.json'), 'w') as fh:
2173 json.dump(list(self.denied_buckets), fh, indent=2)
2174 self.denied_buckets = set()
2175
2176 def process_bucket(self, b):
2177 log.info(
2178 "Scanning bucket:%s visitor:%s style:%s" % (
2179 b['Name'], self.__class__.__name__, self.get_bucket_style(b)))
2180
2181 s = self.manager.session_factory()
2182 s3 = bucket_client(s, b)
2183
        # The bulk of the _process_bucket function executes inline in the
        # calling thread/worker context; neither the paginator nor the
        # bucket scan log should be used across a worker boundary.
2187 p = s3.get_paginator(
2188 self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name'])
2189
2190 with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log:
2191 with self.executor_factory(max_workers=10) as w:
2192 try:
2193 return self._process_bucket(b, p, key_log, w)
2194 except ClientError as e:
2195 if e.response['Error']['Code'] == 'NoSuchBucket':
2196 log.warning(
2197 "Bucket:%s removed while scanning" % b['Name'])
2198 return
2199 if e.response['Error']['Code'] == 'AccessDenied':
2200 log.warning(
2201 "Access Denied Bucket:%s while scanning" % b['Name'])
2202 self.denied_buckets.add(b['Name'])
2203 return
2204 log.exception(
2205 "Error processing bucket:%s paginator:%s" % (
2206 b['Name'], p))
2207
2208 __call__ = process_bucket
2209
2210 def _process_bucket(self, b, p, key_log, w):
2211 count = 0
2212
2213 for key_set in p:
2214 keys = self.get_keys(b, key_set)
2215 count += len(keys)
2216 futures = []
2217
2218 for batch in chunks(keys, size=100):
2219 if not batch:
2220 continue
2221 futures.append(w.submit(self.process_chunk, batch, b))
2222
2223 for f in as_completed(futures):
2224 if f.exception():
2225 log.exception("Exception Processing bucket:%s key batch %s" % (
2226 b['Name'], f.exception()))
2227 continue
2228 r = f.result()
2229 if r:
2230 key_log.add(r)
2231
2232 # Log completion at info level, progress at debug level
2233 if key_set['IsTruncated']:
2234 log.debug('Scan progress bucket:%s keys:%d remediated:%d ...',
2235 b['Name'], count, key_log.count)
2236 else:
2237 log.info('Scan Complete bucket:%s keys:%d remediated:%d',
2238 b['Name'], count, key_log.count)
2239
2240 b['KeyScanCount'] = count
2241 b['KeyRemediated'] = key_log.count
2242 return {
2243 'Bucket': b['Name'], 'Remediated': key_log.count, 'Count': count}
2244
2245 def process_chunk(self, batch, bucket):
2246 raise NotImplementedError()
2247
2248 def process_key(self, s3, key, bucket_name, info=None):
2249 raise NotImplementedError()
2250
    def process_version(self, s3, key, bucket_name):
2252 raise NotImplementedError()
2253
2254
2255@actions.register('encrypt-keys')
2256class EncryptExtantKeys(ScanBucket):
2257 """Action to encrypt unencrypted S3 objects
2258
2259 :example:
2260
2261 .. code-block:: yaml
2262
2263 policies:
2264 - name: s3-encrypt-objects
2265 resource: s3
2266 actions:
2267 - type: encrypt-keys
2268 crypto: aws:kms
2269 key-id: 9c3983be-c6cf-11e6-9d9d-cec0c932ce01
2270 """
2271
2272 permissions = (
2273 "s3:GetObject",
2274 "s3:PutObject",
2275 "s3:DeleteObjectVersion",
2276 "s3:RestoreObject",
2277 ) + ScanBucket.permissions
2278
2279 schema = {
2280 'type': 'object',
2281 'additionalProperties': False,
2282 'properties': {
2283 'type': {'enum': ['encrypt-keys']},
2284 'report-only': {'type': 'boolean'},
2285 'glacier': {'type': 'boolean'},
2286 'large': {'type': 'boolean'},
2287 'crypto': {'enum': ['AES256', 'aws:kms']},
2288 'key-id': {'type': 'string'}
2289 },
2290 'dependencies': {
2291 'key-id': {
2292 'properties': {
2293 'crypto': {'pattern': 'aws:kms'}
2294 },
2295 'required': ['crypto']
2296 }
2297 }
2298 }
2299
2300 metrics = [
2301 ('Total Keys', {'Scope': 'Account'}),
2302 ('Unencrypted', {'Scope': 'Account'})]
2303
2304 def __init__(self, data, manager=None):
2305 super(EncryptExtantKeys, self).__init__(data, manager)
2306 self.kms_id = self.data.get('key-id')
2307
2308 def get_permissions(self):
2309 perms = ("s3:GetObject", "s3:GetObjectVersion")
2310 if self.data.get('report-only'):
2311 perms += ('s3:DeleteObject', 's3:DeleteObjectVersion',
2312 's3:PutObject',
2313 's3:AbortMultipartUpload',
2314 's3:ListBucket',
2315 's3:ListBucketVersions')
2316 return perms
2317
2318 def process(self, buckets):
2319
2320 t = time.time()
2321 results = super(EncryptExtantKeys, self).process(buckets)
2322 run_time = time.time() - t
2323 remediated_count = object_count = 0
2324
2325 for r in results:
2326 object_count += r['Count']
2327 remediated_count += r['Remediated']
2328 self.manager.ctx.metrics.put_metric(
2329 "Unencrypted", r['Remediated'], "Count", Scope=r['Bucket'],
2330 buffer=True)
2331
2332 self.manager.ctx.metrics.put_metric(
2333 "Unencrypted", remediated_count, "Count", Scope="Account",
2334 buffer=True
2335 )
2336 self.manager.ctx.metrics.put_metric(
2337 "Total Keys", object_count, "Count", Scope="Account",
2338 buffer=True
2339 )
2340 self.manager.ctx.metrics.flush()
2341
2342 log.info(
2343 ("EncryptExtant Complete keys:%d "
2344 "remediated:%d rate:%0.2f/s time:%0.2fs"),
2345 object_count,
2346 remediated_count,
2347 float(object_count) / run_time if run_time else 0,
2348 run_time)
2349 return results
2350
2351 def process_chunk(self, batch, bucket):
2352 crypto_method = self.data.get('crypto', 'AES256')
2353 s3 = bucket_client(
2354 local_session(self.manager.session_factory), bucket,
2355 kms=(crypto_method == 'aws:kms'))
2356 b = bucket['Name']
2357 results = []
2358 key_processor = self.get_bucket_op(bucket, 'key_processor')
2359 for key in batch:
2360 r = key_processor(s3, key, b)
2361 if r:
2362 results.append(r)
2363 return results
2364
2365 def process_key(self, s3, key, bucket_name, info=None):
2366 k = key['Key']
2367 if info is None:
2368 info = s3.head_object(Bucket=bucket_name, Key=k)
2369
2370 # If the data is already encrypted with AES256 and this request is also
2371 # for AES256 then we don't need to do anything
2372 if info.get('ServerSideEncryption') == 'AES256' and not self.kms_id:
2373 return False
2374
2375 if info.get('ServerSideEncryption') == 'aws:kms':
            # If we're not looking for a specific key, any key will do.
2377 if not self.kms_id:
2378 return False
            # If we're configured to use a specific key and the object's key
            # matches, there's nothing to do. Note this is not a strict
            # equality match.
2381 if self.kms_id in info.get('SSEKMSKeyId', ''):
2382 return False
2383
2384 if self.data.get('report-only'):
2385 return k
2386
2387 storage_class = info.get('StorageClass', 'STANDARD')
2388
2389 if storage_class == 'GLACIER':
2390 if not self.data.get('glacier'):
2391 return False
2392 if 'Restore' not in info:
                # This takes multiple hours; we let the next c7n
                # run take care of follow-ups.
2395 s3.restore_object(
2396 Bucket=bucket_name,
2397 Key=k,
2398 RestoreRequest={'Days': 30})
2399 return False
2400 elif not restore_complete(info['Restore']):
2401 return False
2402
2403 storage_class = 'STANDARD'
2404
2405 crypto_method = self.data.get('crypto', 'AES256')
2406 key_id = self.data.get('key-id')
2407 # Note on copy we lose individual object acl grants
2408 params = {'Bucket': bucket_name,
2409 'Key': k,
2410 'CopySource': "/%s/%s" % (bucket_name, k),
2411 'MetadataDirective': 'COPY',
2412 'StorageClass': storage_class,
2413 'ServerSideEncryption': crypto_method}
2414
2415 if key_id and crypto_method == 'aws:kms':
2416 params['SSEKMSKeyId'] = key_id
2417
2418 if info['ContentLength'] > MAX_COPY_SIZE and self.data.get(
2419 'large', True):
2420 return self.process_large_file(s3, bucket_name, key, info, params)
2421
2422 s3.copy_object(**params)
2423 return k
2424
2425 def process_version(self, s3, key, bucket_name):
2426 info = s3.head_object(
2427 Bucket=bucket_name,
2428 Key=key['Key'],
2429 VersionId=key['VersionId'])
2430
2431 if 'ServerSideEncryption' in info:
2432 return False
2433
2434 if self.data.get('report-only'):
2435 return key['Key'], key['VersionId']
2436
2437 if key['IsLatest']:
2438 r = self.process_key(s3, key, bucket_name, info)
2439 # Glacier request processing, wait till we have the restored object
2440 if not r:
2441 return r
2442 s3.delete_object(
2443 Bucket=bucket_name,
2444 Key=key['Key'],
2445 VersionId=key['VersionId'])
2446 return key['Key'], key['VersionId']
2447
2448 def process_large_file(self, s3, bucket_name, key, info, params):
2449 """For objects over 5gb, use multipart upload to copy"""
2450 part_size = MAX_COPY_SIZE - (1024 ** 2)
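        # Size each part just under MAX_COPY_SIZE so every upload_part_copy
        # range stays within the single-request copy limit.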
2451 num_parts = int(math.ceil(info['ContentLength'] / part_size))
2452 source = params.pop('CopySource')
2453
2454 params.pop('MetadataDirective')
2455 if 'Metadata' in info:
2456 params['Metadata'] = info['Metadata']
2457
2458 upload_id = s3.create_multipart_upload(**params)['UploadId']
2459
2460 params = {'Bucket': bucket_name,
2461 'Key': key['Key'],
2462 'UploadId': upload_id,
2463 'CopySource': source,
2464 'CopySourceIfMatch': info['ETag']}
2465
2466 def upload_part(part_num):
2467 part_params = dict(params)
2468 part_params['CopySourceRange'] = "bytes=%d-%d" % (
2469 part_size * (part_num - 1),
2470 min(part_size * part_num - 1, info['ContentLength'] - 1))
2471 part_params['PartNumber'] = part_num
2472 response = s3.upload_part_copy(**part_params)
2473 return {'ETag': response['CopyPartResult']['ETag'],
2474 'PartNumber': part_num}
2475
2476 try:
2477 with self.executor_factory(max_workers=2) as w:
2478 parts = list(w.map(upload_part, range(1, num_parts + 1)))
2479 except Exception:
2480 log.warning(
2481 "Error during large key copy bucket: %s key: %s, "
2482 "aborting upload", bucket_name, key, exc_info=True)
2483 s3.abort_multipart_upload(
2484 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id)
2485 raise
2486 s3.complete_multipart_upload(
2487 Bucket=bucket_name, Key=key['Key'], UploadId=upload_id,
2488 MultipartUpload={'Parts': parts})
2489 return key['Key']
2490
2491
2492def restore_complete(restore):
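    # head_object reports restore status via a header of the form
    # 'ongoing-request="true"' while a restore is in progress, or
    # 'ongoing-request="false", expiry-date="..."' once it has completed.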
2493 if ',' in restore:
2494 ongoing, _ = restore.split(',', 1)
2495 else:
2496 ongoing = restore
2497 return 'false' in ongoing
2498
2499
2500@filters.register('is-log-target')
2501class LogTarget(Filter):
2502 """Filter and return buckets are log destinations.
2503
2504 Not suitable for use in lambda on large accounts, This is a api
2505 heavy process to detect scan all possible log sources.
2506
2507 Sources:
2508 - elb (Access Log)
2509 - s3 (Access Log)
2510 - cfn (Template writes)
2511 - cloudtrail
2512
2513 :example:
2514
2515 .. code-block:: yaml
2516
2517 policies:
2518 - name: s3-log-bucket
2519 resource: s3
2520 filters:
2521 - type: is-log-target
2522 """
2523
2524 schema = type_schema(
2525 'is-log-target',
2526 services={'type': 'array', 'items': {'enum': [
2527 's3', 'elb', 'cloudtrail']}},
2528 self={'type': 'boolean'},
2529 value={'type': 'boolean'})
2530
2531 def get_permissions(self):
2532 perms = self.manager.get_resource_manager('elb').get_permissions()
2533 perms += ('elasticloadbalancing:DescribeLoadBalancerAttributes',)
2534 return perms
2535
2536 def process(self, buckets, event=None):
2537 log_buckets = set()
2538 count = 0
2539
2540 services = self.data.get('services', ['elb', 's3', 'cloudtrail'])
2541 self_log = self.data.get('self', False)
2542
2543 if 'elb' in services and not self_log:
2544 for bucket, _ in self.get_elb_bucket_locations():
2545 log_buckets.add(bucket)
2546 count += 1
2547 self.log.debug("Found %d elb log targets" % count)
2548
2549 if 's3' in services:
2550 count = 0
2551 for bucket, _ in self.get_s3_bucket_locations(buckets, self_log):
2552 count += 1
2553 log_buckets.add(bucket)
2554 self.log.debug('Found %d s3 log targets' % count)
2555
2556 if 'cloudtrail' in services and not self_log:
2557 for bucket, _ in self.get_cloud_trail_locations(buckets):
2558 log_buckets.add(bucket)
2559
2560 self.log.info("Found %d log targets for %d buckets" % (
2561 len(log_buckets), len(buckets)))
2562 if self.data.get('value', True):
2563 return [b for b in buckets if b['Name'] in log_buckets]
2564 else:
2565 return [b for b in buckets if b['Name'] not in log_buckets]
2566
2567 @staticmethod
2568 def get_s3_bucket_locations(buckets, self_log=False):
2569 """return (bucket_name, prefix) for all s3 logging targets"""
2570 for b in buckets:
2571 if b.get('Logging'):
2572 if self_log:
2573 if b['Name'] != b['Logging']['TargetBucket']:
2574 continue
2575 yield (b['Logging']['TargetBucket'],
2576 b['Logging']['TargetPrefix'])
2577 if not self_log and b['Name'].startswith('cf-templates-'):
2578 yield (b['Name'], '')
2579
2580 def get_cloud_trail_locations(self, buckets):
2581 session = local_session(self.manager.session_factory)
2582 client = session.client('cloudtrail')
2583 names = {b['Name'] for b in buckets}
2584 for t in client.describe_trails().get('trailList', ()):
2585 if t.get('S3BucketName') in names:
2586 yield (t['S3BucketName'], t.get('S3KeyPrefix', ''))
2587
2588 def get_elb_bucket_locations(self):
2589 elbs = self.manager.get_resource_manager('elb').resources()
2590 get_elb_attrs = functools.partial(
2591 _query_elb_attrs, self.manager.session_factory)
2592
2593 with self.executor_factory(max_workers=2) as w:
2594 futures = []
2595 for elb_set in chunks(elbs, 100):
2596 futures.append(w.submit(get_elb_attrs, elb_set))
2597 for f in as_completed(futures):
2598 if f.exception():
2599 log.error("Error while scanning elb log targets: %s" % (
2600 f.exception()))
2601 continue
2602 for tgt in f.result():
2603 yield tgt
2604
2605
2606def _query_elb_attrs(session_factory, elb_set):
2607 session = local_session(session_factory)
2608 client = session.client('elb')
2609 log_targets = []
2610 for e in elb_set:
2611 try:
2612 attrs = client.describe_load_balancer_attributes(
2613 LoadBalancerName=e['LoadBalancerName'])[
2614 'LoadBalancerAttributes']
2615 if 'AccessLog' in attrs and attrs['AccessLog']['Enabled']:
2616 log_targets.append((
2617 attrs['AccessLog']['S3BucketName'],
2618 attrs['AccessLog']['S3BucketPrefix']))
2619 except Exception as err:
2620 log.warning(
2621 "Could not retrieve load balancer %s: %s" % (
2622 e['LoadBalancerName'], err))
2623 return log_targets
2624
2625
2626@actions.register('remove-website-hosting')
2627class RemoveWebsiteHosting(BucketActionBase):
2628 """Action that removes website hosting configuration."""
2629
2630 schema = type_schema('remove-website-hosting')
2631
2632 permissions = ('s3:DeleteBucketWebsite',)
2633
2634 def process(self, buckets):
2635 session = local_session(self.manager.session_factory)
2636 for bucket in buckets:
2637 client = bucket_client(session, bucket)
2638 client.delete_bucket_website(Bucket=bucket['Name'])
2639
2640
2641@actions.register('delete-global-grants')
2642class DeleteGlobalGrants(BucketActionBase):
2643 """Deletes global grants associated to a S3 bucket
2644
2645 :example:
2646
2647 .. code-block:: yaml
2648
2649 policies:
2650 - name: s3-delete-global-grants
2651 resource: s3
2652 filters:
2653 - type: global-grants
2654 actions:
2655 - delete-global-grants
2656 """
2657
2658 schema = type_schema(
2659 'delete-global-grants',
2660 grantees={'type': 'array', 'items': {'type': 'string'}})
2661
2662 permissions = ('s3:PutBucketAcl',)
2663
2664 def process(self, buckets):
2665 with self.executor_factory(max_workers=5) as w:
2666 return list(filter(None, list(w.map(self.process_bucket, buckets))))
2667
2668 def process_bucket(self, b):
2669 grantees = self.data.get(
2670 'grantees', [
2671 GlobalGrantsFilter.AUTH_ALL, GlobalGrantsFilter.GLOBAL_ALL])
2672
2673 log.info(b)
2674
2675 acl = b.get('Acl', {'Grants': []})
2676 if not acl or not acl['Grants']:
2677 return
2678 new_grants = []
2679 for grant in acl['Grants']:
2680 grantee = grant.get('Grantee', {})
2681 if not grantee:
2682 continue
2683 # Yuck, 'get_bucket_acl' doesn't return the grantee type.
2684 if 'URI' in grantee:
2685 grantee['Type'] = 'Group'
2686 else:
2687 grantee['Type'] = 'CanonicalUser'
2688 if ('URI' in grantee and
2689 grantee['URI'] in grantees and not
2690 (grant['Permission'] == 'READ' and b['Website'])):
2691 # Remove this grantee.
2692 pass
2693 else:
2694 new_grants.append(grant)
2695
2696 log.info({'Owner': acl['Owner'], 'Grants': new_grants})
2697
2698 c = bucket_client(self.manager.session_factory(), b)
2699 try:
2700 c.put_bucket_acl(
2701 Bucket=b['Name'],
2702 AccessControlPolicy={
2703 'Owner': acl['Owner'], 'Grants': new_grants})
2704 except ClientError as e:
2705 if e.response['Error']['Code'] == 'NoSuchBucket':
2706 return
2707 return b
2708
2709
2710@actions.register('tag')
2711class BucketTag(Tag):
2712 """Action to create tags on a S3 bucket
2713
2714 :example:
2715
2716 .. code-block:: yaml
2717
2718 policies:
2719 - name: s3-tag-region
2720 resource: s3
2721 region: us-east-1
2722 filters:
2723 - "tag:RegionName": absent
2724 actions:
2725 - type: tag
2726 key: RegionName
2727 value: us-east-1
2728 """
2729
2730 def process_resource_set(self, client, resource_set, tags):
2731 modify_bucket_tags(self.manager.session_factory, resource_set, tags)
2732
2733
2734@actions.register('mark-for-op')
2735class MarkBucketForOp(TagDelayedAction):
2736 """Action schedules custodian to perform an action at a certain date
2737
2738 :example:
2739
2740 .. code-block:: yaml
2741
2742 policies:
2743 - name: s3-encrypt
2744 resource: s3
2745 filters:
2746 - type: missing-statement
2747 statement_ids:
2748 - RequiredEncryptedPutObject
2749 actions:
2750 - type: mark-for-op
2751 op: attach-encrypt
2752 days: 7
2753 """
2754
2755 schema = type_schema(
2756 'mark-for-op', rinherit=TagDelayedAction.schema)
2757
2758
2759@actions.register('unmark')
2760@actions.register('remove-tag')
2761class RemoveBucketTag(RemoveTag):
2762 """Removes tag/tags from a S3 object
2763
2764 :example:
2765
2766 .. code-block:: yaml
2767
2768 policies:
2769 - name: s3-remove-owner-tag
2770 resource: s3
2771 filters:
2772 - "tag:BucketOwner": present
2773 actions:
2774 - type: remove-tag
2775 tags: ['BucketOwner']
2776 """
2777
2778 def process_resource_set(self, client, resource_set, tags):
2779 modify_bucket_tags(
2780 self.manager.session_factory, resource_set, remove_tags=tags)
2781
2782
2783@filters.register('data-events')
2784class DataEvents(Filter):
2785 """Find buckets for which CloudTrail is logging data events.
2786
2787 Note that this filter only examines trails that are defined in the
2788 current account.
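
    :example:

    An illustrative policy (the policy name is a placeholder) reporting
    buckets that have no data event trail coverage:

    .. code-block:: yaml

        policies:
          - name: s3-missing-data-events
            resource: s3
            filters:
              - type: data-events
                state: absent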
2789 """
2790
2791 schema = type_schema('data-events', state={'enum': ['present', 'absent']})
2792 permissions = (
2793 'cloudtrail:DescribeTrails',
2794 'cloudtrail:GetEventSelectors')
2795
2796 def get_event_buckets(self, session, trails):
2797 """Return a mapping of bucket name to cloudtrail.
2798
2799 For wildcard trails the bucket name is ''.
2800 """
2801 regions = {t.get('HomeRegion') for t in trails}
2802 clients = {}
2803 for region in regions:
2804 clients[region] = session.client('cloudtrail', region_name=region)
2805
2806 event_buckets = {}
2807 for t in trails:
2808 for events in clients[t.get('HomeRegion')].get_event_selectors(
2809 TrailName=t['Name']).get('EventSelectors', ()):
2810 if 'DataResources' not in events:
2811 continue
2812 for data_events in events['DataResources']:
2813 if data_events['Type'] != 'AWS::S3::Object':
2814 continue
2815 for b in data_events['Values']:
2816 event_buckets[b.rsplit(':')[-1].strip('/')] = t['Name']
2817 return event_buckets
2818
2819 def process(self, resources, event=None):
2820 trails = self.manager.get_resource_manager('cloudtrail').resources()
2821 local_trails = self.filter_resources(
2822 trails,
2823 "split(':', TrailARN)[4]", (self.manager.account_id,)
2824 )
2825 session = local_session(self.manager.session_factory)
2826 event_buckets = self.get_event_buckets(session, local_trails)
2827 ops = {
2828 'present': lambda x: (
2829 x['Name'] in event_buckets or '' in event_buckets),
2830 'absent': (
2831 lambda x: x['Name'] not in event_buckets and ''
2832 not in event_buckets)}
2833
2834 op = ops[self.data.get('state', 'present')]
2835 results = []
2836 for b in resources:
2837 if op(b):
2838 results.append(b)
2839 return results
2840
2841
2842@filters.register('inventory')
2843class Inventory(ValueFilter):
2844 """Filter inventories for a bucket"""
2845 schema = type_schema('inventory', rinherit=ValueFilter.schema)
2846 schema_alias = False
2847 permissions = ('s3:GetInventoryConfiguration',)
2848
2849 def process(self, buckets, event=None):
2850 results = []
2851 with self.executor_factory(max_workers=2) as w:
2852 futures = {}
2853 for b in buckets:
2854 futures[w.submit(self.process_bucket, b)] = b
2855
2856 for f in as_completed(futures):
2857 b = futures[f]
2858 if f.exception():
2859 b.setdefault('c7n:DeniedMethods', []).append('GetInventoryConfiguration')
2860 self.log.error(
2861 "Error processing bucket: %s error: %s",
2862 b['Name'], f.exception())
2863 continue
2864 if f.result():
2865 results.append(b)
2866 return results
2867
2868 def process_bucket(self, b):
2869 if 'c7n:inventories' not in b:
2870 client = bucket_client(local_session(self.manager.session_factory), b)
2871 inventories = client.list_bucket_inventory_configurations(
2872 Bucket=b['Name']).get('InventoryConfigurationList', [])
2873 b['c7n:inventories'] = inventories
2874
2875 for i in b['c7n:inventories']:
2876 if self.match(i):
2877 return True
2878
2879
2880@actions.register('set-inventory')
2881class SetInventory(BucketActionBase):
2882 """Configure bucket inventories for an s3 bucket.
2883 """
2884 schema = type_schema(
2885 'set-inventory',
2886 required=['name', 'destination'],
2887 state={'enum': ['enabled', 'disabled', 'absent']},
2888 name={'type': 'string', 'description': 'Name of inventory'},
2889 destination={'type': 'string', 'description': 'Name of destination bucket'},
2890 prefix={'type': 'string', 'description': 'Destination prefix'},
2891 encryption={'enum': ['SSES3', 'SSEKMS']},
2892 key_id={'type': 'string', 'description': 'Optional Customer KMS KeyId for SSE-KMS'},
2893 versions={'enum': ['All', 'Current']},
2894 schedule={'enum': ['Daily', 'Weekly']},
2895 format={'enum': ['CSV', 'ORC', 'Parquet']},
2896 fields={'type': 'array', 'items': {'enum': [
2897 'Size', 'LastModifiedDate', 'StorageClass', 'ETag',
2898 'IsMultipartUploaded', 'ReplicationStatus', 'EncryptionStatus',
2899 'ObjectLockRetainUntilDate', 'ObjectLockMode', 'ObjectLockLegalHoldStatus',
2900 'IntelligentTieringAccessTier', 'BucketKeyStatus', 'ChecksumAlgorithm',
2901 'ObjectAccessControlList', 'ObjectOwner']}})
2902
2903 permissions = ('s3:PutInventoryConfiguration', 's3:GetInventoryConfiguration')
2904
2905 def process(self, buckets):
2906 with self.executor_factory(max_workers=2) as w:
2907 futures = {w.submit(self.process_bucket, bucket): bucket for bucket in buckets}
2908 for future in as_completed(futures):
2909 bucket = futures[future]
2910 try:
2911 future.result()
2912 except Exception as e:
2913 self.log.error('Message: %s Bucket: %s', e, bucket['Name'])
2914
2915 def process_bucket(self, b):
2916 inventory_name = self.data.get('name')
2917 destination = self.data.get('destination')
2918 prefix = self.data.get('prefix', '')
2919 schedule = self.data.get('schedule', 'Daily')
2920 fields = self.data.get('fields', ['LastModifiedDate', 'Size'])
2921 versions = self.data.get('versions', 'Current')
2922 state = self.data.get('state', 'enabled')
2923 encryption = self.data.get('encryption')
2924 inventory_format = self.data.get('format', 'CSV')
2925
2926 if not prefix:
2927 prefix = "Inventories/%s" % (self.manager.config.account_id)
2928
2929 client = bucket_client(local_session(self.manager.session_factory), b)
2930 if state == 'absent':
2931 try:
2932 client.delete_bucket_inventory_configuration(
2933 Bucket=b['Name'], Id=inventory_name)
2934 except ClientError as e:
2935 if e.response['Error']['Code'] != 'NoSuchConfiguration':
2936 raise
2937 return
2938
2939 bucket = {
2940 'Bucket': "arn:aws:s3:::%s" % destination,
2941 'Format': inventory_format
2942 }
2943
2944 inventory = {
2945 'Destination': {
2946 'S3BucketDestination': bucket
2947 },
2948 'IsEnabled': state == 'enabled' and True or False,
2949 'Id': inventory_name,
2950 'OptionalFields': fields,
2951 'IncludedObjectVersions': versions,
2952 'Schedule': {
2953 'Frequency': schedule
2954 }
2955 }
2956
2957 if prefix:
2958 bucket['Prefix'] = prefix
2959
2960 if encryption:
2961 bucket['Encryption'] = {encryption: {}}
2962 if encryption == 'SSEKMS' and self.data.get('key_id'):
2963 bucket['Encryption'] = {encryption: {
2964 'KeyId': self.data['key_id']
2965 }}
2966
2967 found = self.get_inventory_delta(client, inventory, b)
2968 if found:
2969 return
2970 if found is False:
2971 self.log.debug("updating bucket:%s inventory configuration id:%s",
2972 b['Name'], inventory_name)
2973 client.put_bucket_inventory_configuration(
2974 Bucket=b['Name'], Id=inventory_name, InventoryConfiguration=inventory)
2975
2976 def get_inventory_delta(self, client, inventory, b):
2977 inventories = client.list_bucket_inventory_configurations(Bucket=b['Name'])
2978 found = None
2979 for i in inventories.get('InventoryConfigurationList', []):
2980 if i['Id'] != inventory['Id']:
2981 continue
2982 found = True
2983 for k, v in inventory.items():
2984 if k not in i:
2985 found = False
2986 continue
2987 if isinstance(v, list):
2988 v.sort()
2989 i[k].sort()
2990 if i[k] != v:
2991 found = False
2992 return found
2993
2994
2995@filters.register('intelligent-tiering')
2996class IntelligentTiering(ListItemFilter):
2997 """Filter for S3 buckets to look at intelligent tiering configurations
2998
2999 The schema to supply to the attrs follows the schema here:
3000 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_bucket_intelligent_tiering_configurations.html
3001
3002 :example:
3003
3004 .. code-block:: yaml
3005
3006 policies:
3007 - name: s3-intelligent-tiering-configuration
3008 resource: s3
3009 filters:
3010 - type: intelligent-tiering
3011 attrs:
3012 - Status: Enabled
3013 - Filter:
3014 And:
3015 Prefix: test
3016 Tags:
3017 - Key: Owner
3018 Value: c7n
3019 - Tierings:
3020 - Days: 100
3021 - AccessTier: ARCHIVE_ACCESS
3022
3023 """
3024 schema = type_schema(
3025 'intelligent-tiering',
3026 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
3027 count={'type': 'number'},
3028 count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
3029 )
3030 permissions = ('s3:GetIntelligentTieringConfiguration',)
3031 annotation_key = "c7n:IntelligentTiering"
3032 annotate_items = True
3033
3034 def __init__(self, data, manager=None):
3035 super().__init__(data, manager)
3036 self.data['key'] = self.annotation_key
3037
3038 def process(self, buckets, event=None):
3039 with self.executor_factory(max_workers=2) as w:
3040 futures = {w.submit(self.get_item_values, b): b for b in buckets}
3041 for future in as_completed(futures):
3042 b = futures[future]
3043 if future.exception():
3044 self.log.error("Message: %s Bucket: %s", future.exception(), b['Name'])
3045 continue
3046 return super().process(buckets, event)
3047
3048 def get_item_values(self, b):
3049 if self.annotation_key not in b:
3050 client = bucket_client(local_session(self.manager.session_factory), b)
3051 try:
3052 int_tier_config = client.list_bucket_intelligent_tiering_configurations(
3053 Bucket=b['Name'])
3054 b[self.annotation_key] = int_tier_config.get(
3055 'IntelligentTieringConfigurationList', [])
3056 except ClientError as e:
3057 if e.response['Error']['Code'] == 'AccessDenied':
3058 method = 'list_bucket_intelligent_tiering_configurations'
3059 log.warning(
3060 "Bucket:%s unable to invoke method:%s error:%s ",
3061 b['Name'], method, e.response['Error']['Message'])
3062 b.setdefault('c7n:DeniedMethods', []).append(method)
3063 return b.get(self.annotation_key)
3064
3065
3066@actions.register('set-intelligent-tiering')
3067class ConfigureIntelligentTiering(BucketActionBase):
3068 """Action applies an intelligent tiering configuration to a S3 bucket
3069
3070 The schema to supply to the configuration follows the schema here:
3071 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_bucket_intelligent_tiering_configuration.html
3072
    To delete a configuration, supply State=delete along with either a specific Id or Id: matched
3074
3075 :example:
3076
3077 .. code-block:: yaml
3078
3079 policies:
3080 - name: s3-apply-intelligent-tiering-config
3081 resource: aws.s3
3082 filters:
3083 - not:
3084 - type: intelligent-tiering
3085 attrs:
3086 - Status: Enabled
3087 - Filter:
3088 And:
3089 Prefix: helloworld
3090 Tags:
3091 - Key: Hello
3092 Value: World
3093 - Tierings:
3094 - Days: 123
3095 AccessTier: ARCHIVE_ACCESS
3096 actions:
3097 - type: set-intelligent-tiering
3098 Id: c7n-default
3099 IntelligentTieringConfiguration:
3100 Id: c7n-default
3101 Status: Enabled
3102 Tierings:
3103 - Days: 149
3104 AccessTier: ARCHIVE_ACCESS
3105
3106 - name: s3-delete-intelligent-tiering-configuration
3107 resource: aws.s3
3108 filters:
3109 - type: intelligent-tiering
3110 attrs:
3111 - Status: Enabled
3112 - Id: test-config
3113 actions:
3114 - type: set-intelligent-tiering
3115 Id: test-config
3116 State: delete
3117
3118 - name: s3-delete-intelligent-tiering-matched-configs
3119 resource: aws.s3
3120 filters:
3121 - type: intelligent-tiering
3122 attrs:
3123 - Status: Enabled
3124 - Id: test-config
3125 actions:
3126 - type: set-intelligent-tiering
3127 Id: matched
3128 State: delete
3129
3130 """
3131
3132 annotation_key = 'c7n:ListItemMatches'
3133 shape = 'PutBucketIntelligentTieringConfigurationRequest'
3134 schema = {
3135 'type': 'object',
3136 'additionalProperties': False,
3137 'oneOf': [
3138 {'required': ['type', 'Id', 'IntelligentTieringConfiguration']},
3139 {'required': ['type', 'Id', 'State']}],
3140 'properties': {
3141 'type': {'enum': ['set-intelligent-tiering']},
3142 'Id': {'type': 'string'},
3143 # delete intelligent tier configurations via state: delete
3144 'State': {'type': 'string', 'enum': ['delete']},
3145 'IntelligentTieringConfiguration': {'type': 'object'}
3146 },
3147 }
3148
3149 permissions = ('s3:PutIntelligentTieringConfiguration',)
3150
3151 def validate(self):
3152 # You can have up to 1,000 S3 Intelligent-Tiering configurations per bucket.
3153 # Hence, always use it with a filter
3154 found = False
3155 for f in self.manager.iter_filters():
3156 if isinstance(f, IntelligentTiering):
3157 found = True
3158 break
3159 if not found:
3160 raise PolicyValidationError(
3161 '`set-intelligent-tiering` may only be used in '
3162 'conjunction with `intelligent-tiering` filter on %s' % (self.manager.data,))
3163 cfg = dict(self.data)
3164 if 'IntelligentTieringConfiguration' in cfg:
3165 cfg['Bucket'] = 'bucket'
3166 cfg.pop('type')
3167 return shape_validate(
3168 cfg, self.shape, self.manager.resource_type.service)
3169
3170 def process(self, buckets):
3171 with self.executor_factory(max_workers=3) as w:
3172 futures = {}
3173
3174 for b in buckets:
3175 futures[w.submit(self.process_bucket, b)] = b
3176
3177 for future in as_completed(futures):
3178 if future.exception():
3179 bucket = futures[future]
3180 self.log.error(
3181 'error modifying bucket intelligent tiering configuration: %s\n%s',
3182 bucket['Name'], future.exception())
3183 continue
3184
3185 def process_bucket(self, bucket):
3186 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3187
3188 if 'list_bucket_intelligent_tiering_configurations' in bucket.get(
3189 'c7n:DeniedMethods', []):
3190 log.warning("Access Denied Bucket:%s while reading intelligent tiering configurations"
3191 % bucket['Name'])
3192 return
3193
3194 if self.data.get('Id') and self.data.get('IntelligentTieringConfiguration'):
3195 try:
                s3.put_bucket_intelligent_tiering_configuration(
                    Bucket=bucket['Name'],
                    Id=self.data.get('Id'),
                    IntelligentTieringConfiguration=self.data.get(
                        'IntelligentTieringConfiguration'))
3200 except ClientError as e:
3201 if e.response['Error']['Code'] == 'AccessDenied':
3202 log.warning(
3203 "Access Denied Bucket:%s while applying intelligent tiering configuration"
3204 % bucket['Name'])
3205 if self.data.get('State'):
3206 if self.data.get('Id') == 'matched':
3207 for config in bucket.get(self.annotation_key):
3208 self.delete_intelligent_tiering_configurations(s3, config.get('Id'), bucket)
3209 else:
3210 self.delete_intelligent_tiering_configurations(s3, self.data.get('Id'), bucket)
3211
3212 def delete_intelligent_tiering_configurations(self, s3_client, id, bucket):
3213 try:
3214 s3_client.delete_bucket_intelligent_tiering_configuration(Bucket=bucket['Name'], Id=id)
3215 except ClientError as e:
3216 if e.response['Error']['Code'] == 'AccessDenied':
3217 log.warning(
3218 "Access Denied Bucket:%s while deleting intelligent tiering configuration"
3219 % bucket['Name'])
3220 elif e.response['Error']['Code'] == 'NoSuchConfiguration':
3221 log.warning(
3222 "No such configuration found:%s while deleting intelligent tiering configuration"
3223 % bucket['Name'])
3224
3225
3226@actions.register('delete')
3227class DeleteBucket(ScanBucket):
3228 """Action deletes a S3 bucket
3229
3230 :example:
3231
3232 .. code-block:: yaml
3233
3234 policies:
3235 - name: delete-unencrypted-buckets
3236 resource: s3
3237 filters:
3238 - type: missing-statement
3239 statement_ids:
3240 - RequiredEncryptedPutObject
3241 actions:
3242 - type: delete
3243 remove-contents: true
3244 """
3245
3246 schema = type_schema('delete', **{'remove-contents': {'type': 'boolean'}})
3247
3248 permissions = ('s3:*',)
3249
3250 bucket_ops = {
3251 'standard': {
3252 'iterator': 'list_objects',
3253 'contents_key': ['Contents'],
3254 'key_processor': 'process_key'
3255 },
3256 'versioned': {
3257 'iterator': 'list_object_versions',
3258 'contents_key': ['Versions', 'DeleteMarkers'],
3259 'key_processor': 'process_version'
3260 }
3261 }
3262
3263 def process_delete_enablement(self, b):
3264 """Prep a bucket for deletion.
3265
3266 Clear out any pending multi-part uploads.
3267
3268 Disable versioning on the bucket, so deletes don't
3269 generate fresh deletion markers.
3270 """
3271 client = bucket_client(
3272 local_session(self.manager.session_factory), b)
3273
3274 # Stop replication so we can suspend versioning
3275 if b.get('Replication') is not None:
3276 client.delete_bucket_replication(Bucket=b['Name'])
3277
3278 # Suspend versioning, so we don't get new delete markers
3279 # as we walk and delete versions
3280 if (self.get_bucket_style(b) == 'versioned' and b['Versioning']['Status'] == 'Enabled' and
3281 self.data.get('remove-contents', True)):
3282 client.put_bucket_versioning(
3283 Bucket=b['Name'],
3284 VersioningConfiguration={'Status': 'Suspended'})
3285
3286 # Clear our multi-part uploads
3287 uploads = client.get_paginator('list_multipart_uploads')
3288 for p in uploads.paginate(Bucket=b['Name']):
3289 for u in p.get('Uploads', ()):
3290 client.abort_multipart_upload(
3291 Bucket=b['Name'],
3292 Key=u['Key'],
3293 UploadId=u['UploadId'])
3294
3295 def process(self, buckets):
3296 # might be worth sanity checking all our permissions
3297 # on the bucket up front before disabling versioning/replication.
3298 if self.data.get('remove-contents', True):
3299 self._process_with_futures(self.process_delete_enablement, buckets)
3300 self.empty_buckets(buckets)
3301
3302 results = self._process_with_futures(self.delete_bucket, buckets)
3303 self.write_denied_buckets_file()
3304 return results
3305
3306 def delete_bucket(self, b):
3307 s3 = bucket_client(self.manager.session_factory(), b)
3308 try:
3309 self._run_api(s3.delete_bucket, Bucket=b['Name'])
3310 except ClientError as e:
3311 if e.response['Error']['Code'] == 'BucketNotEmpty':
3312 self.log.error(
3313 "Error while deleting bucket %s, bucket not empty" % (
3314 b['Name']))
3315 else:
3316 raise e
3317
3318 def empty_buckets(self, buckets):
3319 t = time.time()
3320 results = super(DeleteBucket, self).process(buckets)
3321 run_time = time.time() - t
3322 object_count = 0
3323
3324 for r in results:
3325 object_count += r['Count']
3326 self.manager.ctx.metrics.put_metric(
3327 "Total Keys", object_count, "Count", Scope=r['Bucket'],
3328 buffer=True)
3329 self.manager.ctx.metrics.put_metric(
3330 "Total Keys", object_count, "Count", Scope="Account", buffer=True)
3331 self.manager.ctx.metrics.flush()
3332
3333 log.info(
3334 "EmptyBucket buckets:%d Complete keys:%d rate:%0.2f/s time:%0.2fs",
3335 len(buckets), object_count,
3336 float(object_count) / run_time if run_time else 0, run_time)
3337 return results
3338
3339 def process_chunk(self, batch, bucket):
3340 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3341 objects = []
3342 for key in batch:
3343 obj = {'Key': key['Key']}
3344 if 'VersionId' in key:
3345 obj['VersionId'] = key['VersionId']
3346 objects.append(obj)
3347 results = s3.delete_objects(
3348 Bucket=bucket['Name'], Delete={'Objects': objects}).get('Deleted', ())
3349 if self.get_bucket_style(bucket) != 'versioned':
3350 return results
3351
3352
3353@actions.register('configure-lifecycle')
3354class Lifecycle(BucketActionBase):
3355 """Action applies a lifecycle policy to versioned S3 buckets
3356
3357 The schema to supply to the rule follows the schema here:
3358 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_lifecycle_configuration
3359
3360 To delete a lifecycle rule, supply Status=absent
3361
3362 :example:
3363
3364 .. code-block:: yaml
3365
3366 policies:
3367 - name: s3-apply-lifecycle
3368 resource: s3
3369 actions:
3370 - type: configure-lifecycle
3371 rules:
3372 - ID: my-lifecycle-id
3373 Status: Enabled
3374 Prefix: foo/
3375 Transitions:
3376 - Days: 60
3377 StorageClass: GLACIER
3378
3379 """
3380
3381 schema = type_schema(
3382 'configure-lifecycle',
3383 **{
3384 'rules': {
3385 'type': 'array',
3386 'items': {
3387 'type': 'object',
3388 'required': ['ID', 'Status'],
3389 'additionalProperties': False,
3390 'properties': {
3391 'ID': {'type': 'string'},
3392 # c7n intercepts `absent`
3393 'Status': {'enum': ['Enabled', 'Disabled', 'absent']},
3394 'Prefix': {'type': 'string'},
3395 'Expiration': {
3396 'type': 'object',
3397 'additionalProperties': False,
3398 'properties': {
3399 'Date': {'type': 'string'}, # Date
3400 'Days': {'type': 'integer'},
3401 'ExpiredObjectDeleteMarker': {'type': 'boolean'},
3402 },
3403 },
3404 'Filter': {
3405 'type': 'object',
3406 'minProperties': 1,
3407 'maxProperties': 1,
3408 'additionalProperties': False,
3409 'properties': {
3410 'Prefix': {'type': 'string'},
3411 'ObjectSizeGreaterThan': {'type': 'integer'},
3412 'ObjectSizeLessThan': {'type': 'integer'},
3413 'Tag': {
3414 'type': 'object',
3415 'required': ['Key', 'Value'],
3416 'additionalProperties': False,
3417 'properties': {
3418 'Key': {'type': 'string'},
3419 'Value': {'type': 'string'},
3420 },
3421 },
3422 'And': {
3423 'type': 'object',
3424 'additionalProperties': False,
3425 'properties': {
3426 'Prefix': {'type': 'string'},
3427 'ObjectSizeGreaterThan': {'type': 'integer'},
3428 'ObjectSizeLessThan': {'type': 'integer'},
3429 'Tags': {
3430 'type': 'array',
3431 'items': {
3432 'type': 'object',
3433 'required': ['Key', 'Value'],
3434 'additionalProperties': False,
3435 'properties': {
3436 'Key': {'type': 'string'},
3437 'Value': {'type': 'string'},
3438 },
3439 },
3440 },
3441 },
3442 },
3443 },
3444 },
3445 'Transitions': {
3446 'type': 'array',
3447 'items': {
3448 'type': 'object',
3449 'additionalProperties': False,
3450 'properties': {
3451 'Date': {'type': 'string'}, # Date
3452 'Days': {'type': 'integer'},
3453 'StorageClass': {'type': 'string'},
3454 },
3455 },
3456 },
3457 'NoncurrentVersionTransitions': {
3458 'type': 'array',
3459 'items': {
3460 'type': 'object',
3461 'additionalProperties': False,
3462 'properties': {
3463 'NoncurrentDays': {'type': 'integer'},
3464 'NewerNoncurrentVersions': {'type': 'integer'},
3465 'StorageClass': {'type': 'string'},
3466 },
3467 },
3468 },
3469 'NoncurrentVersionExpiration': {
3470 'type': 'object',
3471 'additionalProperties': False,
3472 'properties': {
3473 'NoncurrentDays': {'type': 'integer'},
3474 'NewerNoncurrentVersions': {'type': 'integer'}
3475 },
3476 },
3477 'AbortIncompleteMultipartUpload': {
3478 'type': 'object',
3479 'additionalProperties': False,
3480 'properties': {
3481 'DaysAfterInitiation': {'type': 'integer'},
3482 },
3483 },
3484 },
3485 },
3486 },
3487 }
3488 )
3489
3490 permissions = ('s3:GetLifecycleConfiguration', 's3:PutLifecycleConfiguration')
3491
3492 def process(self, buckets):
3493 with self.executor_factory(max_workers=3) as w:
3494 futures = {}
3495 results = []
3496
3497 for b in buckets:
3498 futures[w.submit(self.process_bucket, b)] = b
3499
            for future in as_completed(futures):
                if future.exception():
                    bucket = futures[future]
                    self.log.error('error modifying bucket lifecycle: %s\n%s',
                                   bucket['Name'], future.exception())
                    continue
                results += filter(None, [future.result()])
3506
3507 return results
3508
3509 def process_bucket(self, bucket):
3510 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3511
3512 if 'get_bucket_lifecycle_configuration' in bucket.get('c7n:DeniedMethods', []):
3513 log.warning("Access Denied Bucket:%s while reading lifecycle" % bucket['Name'])
3514 return
3515
3516 # Adjust the existing lifecycle by adding/deleting/overwriting rules as necessary
3517 config = (bucket.get('Lifecycle') or {}).get('Rules', [])
3518 for rule in self.data['rules']:
3519 for index, existing_rule in enumerate(config):
3520 if not existing_rule:
3521 continue
3522 if rule['ID'] == existing_rule['ID']:
3523 if rule['Status'] == 'absent':
3524 config[index] = None
3525 else:
3526 config[index] = rule
3527 break
3528 else:
3529 if rule['Status'] != 'absent':
3530 config.append(rule)
3531
3532 # The extra `list` conversion is required for python3
3533 config = list(filter(None, config))
3534
3535 try:
3536 if not config:
3537 s3.delete_bucket_lifecycle(Bucket=bucket['Name'])
3538 else:
3539 s3.put_bucket_lifecycle_configuration(
3540 Bucket=bucket['Name'], LifecycleConfiguration={'Rules': config})
3541 except ClientError as e:
3542 if e.response['Error']['Code'] == 'AccessDenied':
3543 log.warning("Access Denied Bucket:%s while applying lifecycle" % bucket['Name'])
3544 else:
3545 raise e
3546
3547
3548class KMSKeyResolverMixin:
3549 """Builds a dictionary of region specific ARNs"""
3550
3551 def __init__(self, data, manager=None):
3552 self.arns = dict()
3553 self.data = data
3554 self.manager = manager
3555
3556 def resolve_keys(self, buckets):
3557 key = self.data.get('key')
3558 if not key:
3559 return None
3560
3561 regions = {get_region(b) for b in buckets}
3562 for r in regions:
3563 client = local_session(self.manager.session_factory).client('kms', region_name=r)
3564 try:
3565 key_meta = client.describe_key(
3566 KeyId=key
3567 ).get('KeyMetadata', {})
3568 key_id = key_meta.get('KeyId')
3569
3570 # We need a complete set of alias identifiers (names and ARNs)
3571 # to fully evaluate bucket encryption filters.
3572 key_aliases = client.list_aliases(
3573 KeyId=key_id
3574 ).get('Aliases', [])
3575
3576 self.arns[r] = {
3577 'KeyId': key_id,
3578 'Arn': key_meta.get('Arn'),
3579 'KeyManager': key_meta.get('KeyManager'),
3580 'Description': key_meta.get('Description'),
3581 'Aliases': [
3582 alias[attr]
3583 for alias in key_aliases
3584 for attr in ('AliasArn', 'AliasName')
3585 ],
3586 }
3587
3588 except ClientError as e:
3589 self.log.error('Error resolving kms ARNs for set-bucket-encryption: %s key: %s' % (
3590 e, self.data.get('key')))
3591
3592 def get_key(self, bucket):
3593 if 'key' not in self.data:
3594 return None
3595 region = get_region(bucket)
3596 key = self.arns.get(region)
3597 if not key:
3598 self.log.warning('Unable to resolve key %s for bucket %s in region %s',
3599 self.data['key'], bucket.get('Name'), region)
3600 return key
3601
3602
3603@filters.register('bucket-encryption')
3604class BucketEncryption(KMSKeyResolverMixin, Filter):
3605 """Filters for S3 buckets that have bucket-encryption
3606
    :example:
3608
3609 .. code-block:: yaml
3610
3611 policies:
3612 - name: s3-bucket-encryption-AES256
3613 resource: s3
3614 region: us-east-1
3615 filters:
3616 - type: bucket-encryption
3617 state: True
3618 crypto: AES256
3619 - name: s3-bucket-encryption-KMS
3620 resource: s3
3621 region: us-east-1
3622 filters:
3623 - type: bucket-encryption
3624 state: True
3625 crypto: aws:kms
3626 key: alias/some/alias/key
3627 - name: s3-bucket-encryption-off
3628 resource: s3
3629 region: us-east-1
3630 filters:
3631 - type: bucket-encryption
3632 state: False
3633 - name: s3-bucket-test-bucket-key-enabled
3634 resource: s3
3635 region: us-east-1
3636 filters:
3637 - type: bucket-encryption
3638 bucket_key_enabled: True
3639 """
3640 schema = type_schema('bucket-encryption',
3641 state={'type': 'boolean'},
3642 crypto={'type': 'string', 'enum': ['AES256', 'aws:kms']},
3643 key={'type': 'string'},
3644 bucket_key_enabled={'type': 'boolean'})
3645
3646 permissions = ('s3:GetEncryptionConfiguration', 'kms:DescribeKey', 'kms:ListAliases')
3647 annotation_key = 'c7n:bucket-encryption'
3648
3649 def validate(self):
3650 if self.data.get('bucket_key_enabled') is not None and self.data.get('key') is not None:
3651 raise PolicyValidationError(
3652 f'key and bucket_key_enabled attributes cannot both be set: {self.data}'
3653 )
3654
3655 def process(self, buckets, event=None):
3656 self.resolve_keys(buckets)
3657 results = []
3658 with self.executor_factory(max_workers=2) as w:
3659 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3660 for future in as_completed(futures):
3661 b = futures[future]
3662 if future.exception():
3663 self.log.error("Message: %s Bucket: %s", future.exception(),
3664 b['Name'])
3665 continue
3666 if future.result():
3667 results.append(b)
3668 return results
3669
3670 def process_bucket(self, b):
3671
3672 client = bucket_client(local_session(self.manager.session_factory), b)
3673 rules = []
3674 if self.annotation_key not in b:
3675 try:
3676 be = client.get_bucket_encryption(Bucket=b['Name'])
3677 be.pop('ResponseMetadata', None)
3678 except ClientError as e:
3679 if e.response['Error']['Code'] != 'ServerSideEncryptionConfigurationNotFoundError':
3680 raise
3681 be = {}
3682 b[self.annotation_key] = be
3683 else:
3684 be = b[self.annotation_key]
3685
3686 rules = be.get('ServerSideEncryptionConfiguration', {}).get('Rules', [])
        if self.data.get('bucket_key_enabled'):
            for rule in rules:
                return self.filter_bucket_key_enabled(rule)
        elif self.data.get('bucket_key_enabled') is False:
            for rule in rules:
                return not self.filter_bucket_key_enabled(rule)

        # default `state` to True, as the previous implementation assumed
        # state == True; this preserves backwards compatibility
        if self.data.get('state', True):
3697 for sse in rules:
3698 return self.filter_bucket(b, sse)
3699 return False
3700 else:
3701 for sse in rules:
3702 return not self.filter_bucket(b, sse)
3703 return True
3704
3705 def filter_bucket(self, b, sse):
3706 allowed = ['AES256', 'aws:kms']
3707 key = self.get_key(b)
3708 crypto = self.data.get('crypto')
3709 rule = sse.get('ApplyServerSideEncryptionByDefault')
3710
3711 if not rule:
3712 return False
3713 algo = rule.get('SSEAlgorithm')
3714
3715 if not crypto and algo in allowed:
3716 return True
3717
3718 if crypto == 'AES256' and algo == 'AES256':
3719 return True
3720 elif crypto == 'aws:kms' and algo == 'aws:kms':
3721 if not key:
3722 # There are two broad reasons to have an empty value for
3723 # the regional key here:
3724 #
3725 # * The policy did not specify a key, in which case this
3726 # filter should match _all_ buckets with a KMS default
3727 # encryption rule.
3728 #
3729 # * The policy specified a key that could not be
3730 # resolved, in which case this filter shouldn't match
3731 # any buckets.
3732 return 'key' not in self.data
3733
3734 # The default encryption rule can specify a key ID,
3735 # key ARN, alias name or alias ARN. Match against any of
3736 # those attributes. A rule specifying KMS with no master key
3737 # implies the AWS-managed key.
3738 key_ids = {key.get('Arn'), key.get('KeyId'), *key['Aliases']}
3739 return rule.get('KMSMasterKeyID', 'alias/aws/s3') in key_ids
3740
    def filter_bucket_key_enabled(self, rule) -> bool:
        if not rule:
            return False
        # coerce to bool so the annotated return type holds even when the key is absent
        return bool(rule.get('BucketKeyEnabled'))
3745
3746
3747@actions.register('set-bucket-encryption')
3748class SetBucketEncryption(KMSKeyResolverMixin, BucketActionBase):
3749 """Action enables default encryption on S3 buckets
3750
    `enabled`: boolean Optional: Defaults to True

    `crypto`: aws:kms | AES256 Optional: Defaults to AES256

    `key`: KMS key ARN, alias, or key id (only valid with `crypto: aws:kms`)

    `bucket-key`: boolean Optional:
    Defaults to True.
    Reduces the amount of API traffic from Amazon S3 to KMS and can reduce KMS
    request costs by up to 99 percent. Requires kms:Decrypt permissions for copy
    and upload on the AWS KMS Key Policy.
3762
3763 Bucket Key Docs: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html
3764
3765 :example:
3766
3767 .. code-block:: yaml
3768
3769 policies:
3770 - name: s3-enable-default-encryption-kms
3771 resource: s3
3772 actions:
3773 - type: set-bucket-encryption
3774 # enabled: true <------ optional (true by default)
3775 crypto: aws:kms
3776 key: 1234abcd-12ab-34cd-56ef-1234567890ab
3777 bucket-key: true
3778
3779 - name: s3-enable-default-encryption-kms-alias
3780 resource: s3
3781 actions:
3782 - type: set-bucket-encryption
3783 # enabled: true <------ optional (true by default)
3784 crypto: aws:kms
3785 key: alias/some/alias/key
3786 bucket-key: true
3787
3788 - name: s3-enable-default-encryption-aes256
3789 resource: s3
3790 actions:
3791 - type: set-bucket-encryption
3792 # bucket-key: true <--- optional (true by default for AWS SSE)
3793 # crypto: AES256 <----- optional (AES256 by default)
3794 # enabled: true <------ optional (true by default)
3795
3796 - name: s3-disable-default-encryption
3797 resource: s3
3798 actions:
3799 - type: set-bucket-encryption
3800 enabled: false
3801 """
3802
3803 schema = {
3804 'type': 'object',
3805 'additionalProperties': False,
3806 'properties': {
3807 'type': {'enum': ['set-bucket-encryption']},
3808 'enabled': {'type': 'boolean'},
3809 'crypto': {'enum': ['aws:kms', 'AES256']},
3810 'key': {'type': 'string'},
3811 'bucket-key': {'type': 'boolean'}
3812 },
3813 'dependencies': {
3814 'key': {
3815 'properties': {
3816 'crypto': {'pattern': 'aws:kms'}
3817 },
3818 'required': ['crypto']
3819 }
3820 }
3821 }
3822
3823 permissions = ('s3:PutEncryptionConfiguration', 's3:GetEncryptionConfiguration',
3824 'kms:ListAliases', 'kms:DescribeKey')
3825
3826 def process(self, buckets):
3827 if self.data.get('enabled', True):
3828 self.resolve_keys(buckets)
3829
3830 with self.executor_factory(max_workers=3) as w:
3831 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3832 for future in as_completed(futures):
3833 if future.exception():
3834 self.log.error('Message: %s Bucket: %s', future.exception(),
3835 futures[future]['Name'])
3836
3837 def process_bucket(self, bucket):
3838 default_key_desc = 'Default master key that protects my S3 objects when no other key is defined' # noqa
3839 s3 = bucket_client(local_session(self.manager.session_factory), bucket)
3840 if not self.data.get('enabled', True):
3841 s3.delete_bucket_encryption(Bucket=bucket['Name'])
3842 return
3843 algo = self.data.get('crypto', 'AES256')
3844
3845 # bucket key defaults to True for alias/aws/s3 and AES256 (Amazon SSE)
3846 # and ignores False values for that crypto
3847 bucket_key = self.data.get('bucket-key', True)
3848 config = {
3849 'Rules': [
3850 {
3851 'ApplyServerSideEncryptionByDefault': {
3852 'SSEAlgorithm': algo,
3853 },
3854 'BucketKeyEnabled': bucket_key
3855 }
3856 ]
3857 }
3858
3859 if algo == 'aws:kms':
3860 key = self.get_key(bucket)
3861 if not key:
                raise Exception('A valid KMS key is required but could not be resolved')
3863
3864 config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = key['Arn']
3865 s3.put_bucket_encryption(
3866 Bucket=bucket['Name'],
3867 ServerSideEncryptionConfiguration=config
3868 )
3869
3870
3871OWNERSHIP_CONTROLS = ['BucketOwnerEnforced', 'BucketOwnerPreferred', 'ObjectWriter']
3872VALUE_FILTER_MAGIC_VALUES = ['absent', 'present', 'not-null', 'empty']
3873
3874
3875@filters.register('ownership')
3876class BucketOwnershipControls(BucketFilterBase, ValueFilter):
3877 """Filter for object ownership controls
3878
3879 Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html
3880
    :example:
3882
3883 Find buckets with ACLs disabled
3884
3885 .. code-block:: yaml
3886
3887 policies:
3888 - name: s3-bucket-acls-disabled
3889 resource: aws.s3
3890 region: us-east-1
3891 filters:
3892 - type: ownership
3893 value: BucketOwnerEnforced
3894
    :example:
3896
3897 Find buckets with object ownership preferred or enforced
3898
3899 .. code-block:: yaml
3900
3901 policies:
3902 - name: s3-bucket-ownership-preferred
3903 resource: aws.s3
3904 region: us-east-1
3905 filters:
3906 - type: ownership
3907 op: in
3908 value:
3909 - BucketOwnerEnforced
3910 - BucketOwnerPreferred
3911
    :example:
3913
3914 Find buckets with no object ownership controls
3915
3916 .. code-block:: yaml
3917
3918 policies:
3919 - name: s3-bucket-no-ownership-controls
3920 resource: aws.s3
3921 region: us-east-1
3922 filters:
3923 - type: ownership
3924 value: empty
3925 """
3926 schema = type_schema('ownership', rinherit=ValueFilter.schema, value={'oneOf': [
3927 {'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES},
3928 {'type': 'array', 'items': {
3929 'type': 'string', 'enum': OWNERSHIP_CONTROLS + VALUE_FILTER_MAGIC_VALUES}}]})
3930 permissions = ('s3:GetBucketOwnershipControls',)
3931 annotation_key = 'c7n:ownership'
3932
3933 def __init__(self, data, manager=None):
3934 super(BucketOwnershipControls, self).__init__(data, manager)
3935
3936 # Ownership controls appear as an array of rules. There can only be one
3937 # ObjectOwnership rule defined for a bucket, so we can automatically
3938 # match against that if it exists.
3939 self.data['key'] = f'("{self.annotation_key}".Rules[].ObjectOwnership)[0]'
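        # e.g. with an annotation of {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]}
        # the expression above yields 'BucketOwnerEnforced' for the value filter to match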
3940
3941 def process(self, buckets, event=None):
3942 with self.executor_factory(max_workers=2) as w:
3943 futures = {w.submit(self.process_bucket, b): b for b in buckets}
3944 for future in as_completed(futures):
3945 b = futures[future]
3946 if future.exception():
3947 self.log.error("Message: %s Bucket: %s", future.exception(),
3948 b['Name'])
3949 continue
3950 return super(BucketOwnershipControls, self).process(buckets, event)
3951
3952 def process_bucket(self, b):
3953 if self.annotation_key in b:
3954 return
3955 client = bucket_client(local_session(self.manager.session_factory), b)
3956 try:
3957 controls = client.get_bucket_ownership_controls(Bucket=b['Name'])
3958 controls.pop('ResponseMetadata', None)
3959 except ClientError as e:
3960 if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError':
3961 raise
3962 controls = {}
3963 b[self.annotation_key] = controls.get('OwnershipControls')
3964
3965
3966@filters.register('bucket-replication')
3967class BucketReplication(ListItemFilter):
3968 """Filter for S3 buckets to look at bucket replication configurations
3969
3970 The schema to supply to the attrs follows the schema here:
3971 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_replication.html
3972
3973 :example:
3974
3975 .. code-block:: yaml
3976
3977 policies:
3978 - name: s3-bucket-replication
3979 resource: s3
3980 filters:
3981 - type: bucket-replication
3982 attrs:
3983 - Status: Enabled
3984 - Filter:
3985 And:
3986 Prefix: test
3987 Tags:
3988 - Key: Owner
3989 Value: c7n
3990 - ExistingObjectReplication: Enabled
3991
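    Each replication rule evaluated by this filter is additionally annotated
    with `DestinationBucketAvailable`, `DestinationRegion` and `CrossRegion`,
    so those keys can also be matched in `attrs`. For illustration, a sketch
    of a policy matching enabled cross-region replication rules:

    .. code-block:: yaml

        policies:
          - name: s3-bucket-replication-cross-region
            resource: s3
            filters:
              - type: bucket-replication
                attrs:
                  - Status: Enabled
                  - CrossRegion: true
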
3992 """
3993 schema = type_schema(
3994 'bucket-replication',
3995 attrs={'$ref': '#/definitions/filters_common/list_item_attrs'},
3996 count={'type': 'number'},
3997 count_op={'$ref': '#/definitions/filters_common/comparison_operators'}
3998 )
3999
4000 permissions = ("s3:GetReplicationConfiguration",)
4001 annotation_key = 'Replication'
4002 annotate_items = True
4003
4004 def __init__(self, data, manager=None):
4005 super().__init__(data, manager)
4006 self.data['key'] = self.annotation_key
4007
4008 def get_item_values(self, b):
4009 client = bucket_client(local_session(self.manager.session_factory), b)
        # the bucket's replication configuration is fetched during augment via S3_AUGMENT_TABLE
4011 bucket_replication = b.get(self.annotation_key)
4012
4013 rules = []
4014 if bucket_replication is not None:
4015 rules = bucket_replication.get('ReplicationConfiguration', {}).get('Rules', [])
4016 for replication in rules:
4017 self.augment_bucket_replication(b, replication, client)
4018
4019 return rules
4020
4021 def augment_bucket_replication(self, b, replication, client):
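        # the rule's destination is a bucket ARN, e.g. arn:aws:s3:::dest-bucket,
        # so the bucket name is the sixth ':'-delimited field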
4022 destination_bucket = replication.get('Destination').get('Bucket').split(':')[5]
4023 try:
4024 destination_region = inspect_bucket_region(destination_bucket, client.meta.endpoint_url)
4025 except ValueError:
4026 replication['DestinationBucketAvailable'] = False
4027 return
4028 source_region = get_region(b)
4029 replication['DestinationBucketAvailable'] = True
4030 replication['DestinationRegion'] = destination_region
4031 replication['CrossRegion'] = destination_region != source_region
4032
4033
4034@resources.register('s3-directory')
4035class S3Directory(query.QueryResourceManager):
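    """S3 Express One Zone directory buckets.

    :example:

    A minimal policy sketch that enumerates directory buckets (note that most
    of the S3 bucket filters and actions in this module target general purpose
    buckets and may not apply to directory buckets):

    .. code-block:: yaml

        policies:
          - name: list-s3-directory-buckets
            resource: aws.s3-directory
    """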
4036
4037 class resource_type(query.TypeInfo):
4038 service = 's3'
4039 permission_prefix = "s3express"
4040 arn_service = "s3express"
4041 arn_type = 'bucket'
4042 enum_spec = ('list_directory_buckets', 'Buckets[]', None)
4043 name = id = 'Name'
4044 date = 'CreationDate'
4045 dimension = 'BucketName'
4046 cfn_type = 'AWS::S3Express::DirectoryBucket'
4047 permissions_enum = ("s3express:ListAllMyDirectoryBuckets",)