
1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import hashlib 

4 

5from .core import BaseAction 

6from c7n import utils 

7 

8 

class RemovePolicyBase(BaseAction):
    """Base for ``remove-statements`` actions on resources with an embedded IAM policy.

    ``statement_ids`` selects what to drop: ``'*'`` (every statement),
    ``'matched'`` (the statements stored on the resource under ``matched_key``,
    presumably by a preceding filter) or an explicit list of ``Sid`` values.
    """

    schema = utils.type_schema(
        'remove-statements',
        required=['statement_ids'],
        statement_ids={
            'oneOf': [
                {'enum': ['matched', "*"]},
                {'type': 'array', 'items': {'type': 'string'}},
            ],
        },
    )

    def process_policy(self, policy, resource, matched_key):
        """Remove the selected statements from ``policy`` in place.

        :param policy: parsed IAM policy document (a dict with a ``Statement`` list).
        :param resource: the resource being acted on; ``resource[matched_key]``
            holds the statements consulted for the ``'matched'`` selector.
        :param matched_key: resource key under which those statements live.
        :returns: the module-level ``remove_statements`` result —
            ``(statements, found)`` or ``(None, [])`` when nothing was removed.
        """
        # A missing ``Statement`` or ``matched_key`` entry degrades to
        # "nothing to match" rather than raising.
        current = policy.get('Statement', [])
        flagged = resource.get(matched_key, ())
        return remove_statements(
            match_ids=self.data['statement_ids'],
            statements=current,
            matched=flagged,
        )

24 

25 

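def remove_statements(match_ids, statements, matched=()):
    """Strip selected statements from an IAM policy ``Statement`` list, in place.

    :param match_ids: selector for the statements to drop. ``'*'`` drops every
        statement; ``'matched'`` drops those equal to an entry of ``matched``;
        any other value is a collection of ``Sid`` values to drop by exact
        match. A bare string other than the two keywords is treated as a
        single ``Sid`` — previously it was tested with substring semantics
        (``'Foo' in 'FooBar'``), which could remove more statements than
        intended.
    :param statements: the policy's ``Statement`` list. It is mutated in place
        (slice assignment, so callers holding the same list object see the
        change) and returned when something was removed.
    :param matched: statements to compare against for ``'matched'``; ignored
        for the other selectors.
    :returns: ``(statements, found)`` where ``found`` lists the removed
        statements in their original order, or ``(None, [])`` when nothing
        matched and ``statements`` was left untouched.
    """
    if isinstance(match_ids, str) and match_ids not in ('*', 'matched'):
        sids = [match_ids]
    else:
        sids = match_ids

    def is_target(s):
        # Whether a statement is removed depends only on its value, so one
        # pass over the list gives the same result as repeated ``list.remove``.
        if match_ids == '*':
            return True
        if match_ids == 'matched':
            return s in matched
        return 'Sid' in s and s['Sid'] in sids

    found = [s for s in statements if is_target(s)]
    if not found:
        return None, found
    statements[:] = [s for s in statements if not is_target(s)]
    return statements, found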

43 

44 

def statement_id(s):
    """Return a stable identity for a statement that carries no ``Sid``.

    The statement is serialised with ``utils.dumps`` and hashed with SHA-224;
    the hex digest serves as a dict key in ``ModifyPolicyBase.add_statements``.
    NOTE(review): two statements differing only in key order hash the same
    only if ``utils.dumps`` emits keys in a canonical order — confirm before
    relying on this for de-duplication.
    """
    serialized = utils.dumps(s, indent=0)
    digest = hashlib.sha224(serialized.encode('utf8'))
    return digest.hexdigest()

48 

49 

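class ModifyPolicyBase(BaseAction):
    """Action to modify resource IAM policy statements.

    Applies to all resources with embedded IAM Policies. Statements are
    identified by ``Sid``; a statement without one is identified by a checksum
    of its content (``statement_id``). An added statement whose ``Sid`` already
    exists in the policy *replaces* the existing statement.

    String values in the action config pass through
    ``utils.format_string_values`` with ``account_id`` and ``region`` from the
    manager's config (presumably so ``{account_id}``-style placeholders in
    statements are expanded — verify against that helper).

    Security notes: ``remove-statements: '*'`` discards every existing
    statement, explicit ``Deny`` statements included, so the added statements
    become the whole policy. A ``"Principal": "*"`` statement (as in the
    example below) allows the listed actions for any principal, including
    unauthenticated callers where the service permits it; scope principals and
    actions as tightly as the use case allows.

    :example:

    .. code-block:: yaml

        policies:
          - name: sns-yank-cross-account
            resource: sns
            filters:
              - type: cross-account
            actions:
              - type: modify-policy
                add-statements: [{
                    "Sid": "ReplaceWithMe",
                    "Effect": "Allow",
                    "Principal": "*",
                    "Action": ["SNS:GetTopicAttributes"],
                    "Resource": topic_arn,
                    }]
                remove-statements: '*'

    (``topic_arn`` above is a placeholder for the topic's ARN.)
    """

    schema_alias = True
    schema = utils.type_schema(
        'modify-policy',
        **{
            'add-statements': {
                'type': 'array',
                'items': {'$ref': '#/definitions/iam-statement'},
            },
            'remove-statements': {
                'type': ['array', 'string'],
                'oneOf': [
                    {'enum': ['matched', '*']},
                    {'type': 'array', 'items': {'type': 'string'}}
                ],
            }
        }
    )

    def __init__(self, data=None, manager=None):
        """Store the (string-formatted) action config and the policy manager.

        :param data: raw action config from the policy file.
        :param manager: the resource manager; when given, its
            ``config.account_id``/``config.region`` are passed as format values.
        """
        # NOTE(review): BaseAction.__init__ is not invoked here; if it sets
        # attributes beyond ``data``/``manager`` they will be absent on this
        # action — confirm against c7n.actions.core.
        format_args = {}
        if manager is not None:
            format_args = {
                'account_id': manager.config.account_id,
                'region': manager.config.region,
            }
        self.data = utils.format_string_values(data, **format_args)
        self.manager = manager

    def add_statements(self, policy_statements):
        """Merge the configured ``add-statements`` into ``policy_statements``.

        :param policy_statements: the policy's current ``Statement`` list.
        :returns: ``(statements, changed)`` — the merged list (existing order
            kept, replaced entries stay in place, new ones are appended) and
            whether any statements were configured for addition. ``changed``
            is True even when every added statement was already present
            verbatim. Sid-less statements with identical content collapse to
            a single entry because they share a checksum key.
        """
        def key(s):
            # Only serialise/hash statements that lack a Sid; the original
            # evaluated the checksum for every statement regardless.
            return s['Sid'] if 'Sid' in s else statement_id(s)

        current = {key(s): s for s in policy_statements}
        additional = {key(s): s for s in self.data.get('add-statements', [])}
        # Same-key entries from the config win, replacing the existing statement.
        current.update(additional)
        return list(current.values()), bool(additional)

    def remove_statements(self, policy_statements, resource, matched_key):
        """Drop the statements selected by ``remove-statements``.

        :param policy_statements: the policy's current ``Statement`` list;
            mutated in place by the module-level ``remove_statements``.
        :param resource: the resource being acted on; ``resource[matched_key]``
            holds the statements consulted for the ``'matched'`` selector.
        :param matched_key: resource key under which those statements live.
        :returns: ``(policy_statements, [])`` unchanged when nothing is
            configured; otherwise the module-level ``remove_statements``
            result, i.e. ``(None, [])`` when nothing matched.
        """
        selector = self.data.get('remove-statements', [])
        if not selector:
            return policy_statements, []
        # The unqualified name resolves to the module-level helper, not this
        # method: method bodies do not see class attributes.
        return remove_statements(
            selector, policy_statements, resource.get(matched_key, ()))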