Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/iamanalyzer.py: 34%

56 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4 

5from .core import ValueFilter 

6 

7from c7n.exceptions import PolicyExecutionError 

8from c7n.manager import resources 

9from c7n.utils import type_schema, local_session, chunks 

10 

11 

class AccessAnalyzer(ValueFilter):
    """Analyze resource access policies using AWS IAM Access Analyzer.

    Access analyzer uses logic based reasoning to analyze embedded resource
    iam access policies to determine access outside of a zone of trust.

    .. code-block:: yaml

      policies:
        - name: s3-check
          resource: aws.s3
          filters:
            - type: iam-analyzer
              key: isPublic
              value: true

    """

    schema = type_schema('iam-analyzer',
        analyzer={'type': 'string'}, rinherit=ValueFilter.schema)
    schema_alias = True
    permissions = ('access-analyzer:ListFindings', 'access-analyzer:ListAnalyzers')
    # CloudFormation type names of the resources this filter is registered on
    # (see register_resources).
    supported_types = (
        'AWS::IAM::Role',
        'AWS::KMS::Key',
        'AWS::Lambda::Function',
        'AWS::Lambda::LayerVersion',
        'AWS::S3::Bucket',
        'AWS::SQS::Queue',
    )

    # Key under which the list of findings is stored on each resource dict.
    analysis_annotation = 'c7n:AccessAnalysis'

    def process(self, resources, event=None):
        """Return the resources that have matching Access Analyzer findings.

        Resources that do not already carry the ``c7n:AccessAnalysis``
        annotation are looked up via the analyzer first. A resource is
        kept when it has at least one finding and either the filter
        carries no configuration beyond a single key, or at least one
        finding satisfies the value filter.

        :param resources: list of resource dicts to evaluate.
        :param event: unused by this filter.
        :returns: list of matching resource dicts, in input order.
        :raises PolicyExecutionError: if no analyzer can be resolved.
        """
        client = local_session(self.manager.session_factory).client('accessanalyzer')
        analyzer_arn = self.get_analyzer(client)
        # NOTE(review): presumably turns off ValueFilter's own match
        # annotation, since findings rather than resources are matched
        # below -- confirm against ValueFilter.
        self.annotate = False
        self.get_findings(
            client, analyzer_arn,
            [r for r in resources if self.analysis_annotation not in r])

        # With at most one entry in the filter data there is nothing to
        # evaluate per finding, so any finding at all is a match.
        match_any_finding = len(self.data) <= 1

        results = []
        for r in resources:
            findings = r.get(self.analysis_annotation, [])
            if not findings:
                continue
            if match_any_finding or any(self(f) for f in findings):
                results.append(r)
        return results

    def get_findings(self, client, analyzer_arn, resources):
        """Annotate resources in place with their active analyzer findings.

        Resources are queried in batches of 20 ARNs. Every page of each
        query is consumed, and each finding is appended to the
        ``c7n:AccessAnalysis`` list of the resource whose ARN it names.

        :param client: access analyzer API client.
        :param analyzer_arn: ARN of the analyzer to query.
        :param resources: resource dicts that still need findings.
        """
        arn_pairs = zip(self.manager.get_arns(resources), resources)
        for resource_set in chunks(arn_pairs, 20):
            resource_set = dict(resource_set)
            params = {
                'analyzerArn': analyzer_arn,
                'filter': {
                    'status': {'eq': ['ACTIVE']},
                    'resource': {'contains': list(resource_set)},
                    'resourceType': {'eq': [self.manager.resource_type.cfn_type]},
                },
            }
            # Follow nextToken so findings beyond the first page are not
            # silently dropped.
            while True:
                response = self.manager.retry(client.list_findings, **params)
                for finding in response.get('findings', ()):
                    # The 'contains' criterion may return findings for ARNs
                    # that merely contain a requested ARN; skip anything
                    # that is not exactly one of ours instead of raising
                    # KeyError.
                    r = resource_set.get(finding.get('resource'))
                    if r is None:
                        continue
                    r.setdefault(self.analysis_annotation, []).append(finding)
                next_token = response.get('nextToken')
                if not next_token:
                    break
                params['nextToken'] = next_token

    def get_analyzer(self, client):
        """Resolve the ARN of the analyzer to query.

        An ``analyzer`` value in the filter data is returned as is.
        Otherwise the ACCOUNT type analyzers are listed and the last one
        whose status is ACTIVE is used.

        :param client: access analyzer API client.
        :returns: analyzer ARN string.
        :raises PolicyExecutionError: if no analyzer is configured and no
            active account analyzer is found.
        """
        if self.data.get('analyzer'):
            return self.data['analyzer']
        analyzers = client.list_analyzers(type='ACCOUNT').get('analyzers', ())
        found = None
        for a in analyzers:
            if a['status'] != 'ACTIVE':
                continue
            # No break: when several analyzers are active the last one wins.
            found = a
        if not found:
            raise PolicyExecutionError(
                "policy:%s no access analyzer found in account or org analyzer specified" % (
                    self.manager.ctx.policy.name
                ))
        return found['arn']

    @classmethod
    def register_resources(klass, registry, resource_class):
        """Register this filter as ``iam-analyzer`` on supported resources.

        Resource classes whose ``resource_type.cfn_type`` is not listed in
        ``supported_types`` are left untouched.
        """
        if resource_class.resource_type.cfn_type not in klass.supported_types:
            return
        resource_class.filter_registry.register('iam-analyzer', klass)


# Hook the filter into the resource registry via register_resources.
resources.subscribe(AccessAnalyzer.register_resources)