Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/policystatement.py: 20%

44 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import json 

4 

5from .core import Filter 

6from c7n.utils import type_schema, format_string_values 

7 

8 

class HasStatementFilter(Filter):
    """Find resources with matching access policy statements.

    A resource matches when every entry of ``statement_ids`` is the ``Sid``
    of some statement in its policy, or when every entry of ``statements``
    is matched by some statement in its policy.  A required statement
    matches a policy statement when each key it specifies is present in the
    policy statement with an equal value; keys it does not mention are
    ignored.  ``Action`` / ``NotAction`` values are compared with the
    service prefix lower-cased.

    :Example:

    .. code-block:: yaml

        policies:
          - name: sns-check-statement-id
            resource: sns
            filters:
              - type: has-statement
                statement_ids:
                  - BlockNonSSL
        policies:
          - name: sns-check-block-non-ssl
            resource: sns
            filters:
              - type: has-statement
                statements:
                  - Effect: Deny
                    Action: 'SNS:Publish'
                    Principal: '*'
                    Condition:
                        Bool:
                            "aws:SecureTransport": "false"
    """
    schema = type_schema(
        'has-statement',
        statement_ids={'type': 'array', 'items': {'type': 'string'}},
        statements={
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'Sid': {'type': 'string'},
                    'Effect': {'type': 'string', 'enum': ['Allow', 'Deny']},
                    'Principal': {'anyOf': [
                        {'type': 'string'},
                        {'type': 'object'}, {'type': 'array'}]},
                    'NotPrincipal': {
                        'anyOf': [{'type': 'object'}, {'type': 'array'}]},
                    'Action': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'NotAction': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'Resource': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'NotResource': {
                        'anyOf': [{'type': 'string'}, {'type': 'array'}]},
                    'Condition': {'type': 'object'}
                },
                'required': ['Effect']
            }
        })

    def process(self, resources, event=None):
        """Return the truthy results of ``process_resource`` for *resources*.

        ``event`` is accepted for interface compatibility and is not used.
        """
        return [
            matched for matched in map(self.process_resource, resources)
            if matched
        ]

    @staticmethod
    def _normalize_action(action):
        """Lower-case the service prefix (text before the first ``:``).

        Entries without a colon, such as ``'*'``, are returned unchanged.
        Everything after the first colon is preserved verbatim.
        """
        service, separator, name = action.partition(':')
        if not separator:
            return action
        return '{}:{}'.format(service.lower(), name)

    def action_resource_case_insensitive(self, actions):
        """Normalize the service prefix of one action or a list of actions.

        :param actions: a single action string, or an iterable of action
            strings.
        :returns: a string when given a string, otherwise a list with one
            normalized entry per input entry, in the same order.
        """
        if isinstance(actions, str):
            return self._normalize_action(actions)
        # Colon-less entries (e.g. '*') must not raise here; the string
        # branch above already tolerates them.
        return [self._normalize_action(action) for action in actions]

    def _statement_matches(self, required_statement, statement):
        """Tell whether *statement* satisfies every key of *required_statement*.

        An empty *required_statement* never matches.
        """
        if not required_statement:
            return False
        for key, value in required_statement.items():
            if key not in statement:
                return False
            if key in ('Action', 'NotAction'):
                if (self.action_resource_case_insensitive(value)
                        != self.action_resource_case_insensitive(statement[key])):
                    return False
            elif value != statement[key]:
                return False
        return True

    def process_resource(self, resource):
        """Return *resource* if its policy satisfies the filter, else ``None``.

        The policy is read as a JSON string from the resource key named by
        ``self.policy_attribute`` (``'Policy'`` when the attribute is not
        set).  Resources without that key yield ``None``.
        """
        policy_attribute = getattr(self, 'policy_attribute', 'Policy')
        p = resource.get(policy_attribute)
        if p is None:
            return None
        p = json.loads(p)

        statements = p.get('Statement', [])
        if isinstance(statements, dict):
            # The IAM policy grammar allows a single statement object in
            # place of an array; treat it as a one-element list.
            statements = [statements]

        # Each statement whose Sid is still pending removes one pending id.
        required = list(self.data.get('statement_ids', []))
        for s in statements:
            if s.get('Sid') in required:
                required.remove(s['Sid'])

        required_statements = format_string_values(
            list(self.data.get('statements', [])),
            **self.get_std_format_args(resource))

        # Build a new list rather than removing from the list being
        # iterated: removing in place makes the iterator skip the element
        # that follows each match, leaving it wrongly "unmatched".
        unmatched_statements = [
            required_statement for required_statement in required_statements
            if not any(
                self._statement_matches(required_statement, statement)
                for statement in statements)
        ]

        if (self.data.get('statement_ids', []) and not required) or \
                (self.data.get('statements', []) and not unmatched_statements):
            return resource
        return None

117 return None