Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/iamanalyzer.py: 34%
56 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
5from .core import ValueFilter
7from c7n.exceptions import PolicyExecutionError
8from c7n.manager import resources
9from c7n.utils import type_schema, local_session, chunks
class AccessAnalyzer(ValueFilter):
    """Analyze resource access policies using AWS IAM Access Analyzer.

    Access analyzer uses logic based reasoning to analyze embedded resource
    iam access policies to determine access outside of a zone of trust.

    With no value-filter keys configured, any resource that has at least one
    active finding matches. Otherwise a resource matches when at least one
    of its findings satisfies the configured value filter.

    .. code-block:: yaml

      policies:
        - name: s3-check
          resource: aws.s3
          filters:
            - type: iam-analyzer
              key: isPublic
              value: true

    """

    schema = type_schema('iam-analyzer',
        analyzer={'type': 'string'}, rinherit=ValueFilter.schema)
    schema_alias = True
    permissions = ('access-analyzer:ListFindings', 'access-analyzer:ListAnalyzers')
    # CloudFormation type names of the resources this filter is registered on.
    supported_types = (
        'AWS::IAM::Role',
        'AWS::KMS::Key',
        'AWS::Lambda::Function',
        'AWS::Lambda::LayerVersion',
        'AWS::S3::Bucket',
        'AWS::SQS::Queue',
    )

    # Resource key under which the list of analyzer findings is stored.
    analysis_annotation = 'c7n:AccessAnalysis'

    def process(self, resources, event=None):
        """Return the resources whose analyzer findings match this filter.

        Findings are fetched only for resources that do not already carry
        the ``c7n:AccessAnalysis`` annotation.
        """
        client = local_session(self.manager.session_factory).client('accessanalyzer')
        analyzer_arn = self.get_analyzer(client)
        # NOTE(review): presumably keeps the inherited value filter from
        # annotating the finding dicts while matching -- confirm in ValueFilter.
        self.annotate = False
        self.get_findings(
            client, analyzer_arn,
            [r for r in resources if self.analysis_annotation not in r])

        # More than one key in the filter data means something beyond the
        # bare filter type was configured, so findings are value-matched.
        match_findings = len(self.data) > 1
        results = []
        for r in resources:
            findings = r.get(self.analysis_annotation, [])
            if not findings:
                continue
            if not match_findings or any(self(f) for f in findings):
                results.append(r)
        return results

    def get_findings(self, client, analyzer_arn, resources):
        """Annotate ``resources`` in place with their active findings.

        Resources are queried in batches of 20 ARNs. Every page of each
        ``list_findings`` response is consumed, and findings that refer to
        a resource outside the current batch are skipped.
        """
        for resource_set in chunks(
                zip(self.manager.get_arns(resources), resources),
                20):
            resource_set = dict(resource_set)
            params = {
                'analyzerArn': analyzer_arn,
                'filter': {
                    'status': {'eq': ['ACTIVE']},
                    'resource': {'contains': list(resource_set)},
                    'resourceType': {'eq': [self.manager.resource_type.cfn_type]}
                },
            }
            while True:
                response = self.manager.retry(client.list_findings, **params)
                for finding in response.get('findings', ()):
                    # The request filters with 'contains' rather than 'eq',
                    # so a returned finding is not guaranteed to be keyed by
                    # one of this batch's ARNs; skip it instead of raising
                    # KeyError.
                    r = resource_set.get(finding['resource'])
                    if r is None:
                        continue
                    r.setdefault(self.analysis_annotation, []).append(finding)
                # Follow the pagination token so findings past the first
                # page are not dropped.
                next_token = response.get('nextToken')
                if not next_token:
                    break
                params['nextToken'] = next_token

    def get_analyzer(self, client):
        """Return the ARN of the analyzer to query.

        An ``analyzer`` value in the filter data is returned as is.
        Otherwise the account's analyzers are listed and the last one whose
        status is ``ACTIVE`` is used.

        Raises:
            PolicyExecutionError: if no analyzer is configured and the
                account has no active analyzer.
        """
        configured = self.data.get('analyzer')
        if configured:
            return configured
        analyzers = client.list_analyzers(type='ACCOUNT').get('analyzers', ())
        active = [a for a in analyzers if a['status'] == 'ACTIVE']
        if not active:
            raise PolicyExecutionError(
                "policy:%s no access analyzer found in account or org analyzer specified" % (
                    self.manager.ctx.policy.name
                ))
        return active[-1]['arn']

    @classmethod
    def register_resources(klass, registry, resource_class):
        """Register this filter on ``resource_class`` if its type is supported."""
        if resource_class.resource_type.cfn_type not in klass.supported_types:
            return
        resource_class.filter_registry.register('iam-analyzer', klass)
# Hook the filter into the resource registry. NOTE(review): presumably
# `subscribe` invokes the callback with (registry, resource_class) for each
# resource class -- confirm against c7n.manager.resources.
resources.subscribe(AccessAnalyzer.register_resources)