Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/sfn.py: 59%

70 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4from c7n.actions import Action 

5from c7n.manager import resources 

6from c7n.query import QueryResourceManager, TypeInfo, DescribeSource, ConfigSource 

7from c7n.tags import Tag, RemoveTag, universal_augment 

8from c7n.utils import type_schema, local_session, dumps, chunks 

9 

10 

11class DescribeStepFunction(DescribeSource): 

12 

13 def augment(self, resources): 

14 resources = super().augment(resources) 

15 return universal_augment(self.manager, resources) 

16 

17 

18@resources.register('step-machine') 

19class StepFunction(QueryResourceManager): 

20 """AWS Step Functions State Machine""" 

21 

22 class resource_type(TypeInfo): 

23 service = 'stepfunctions' 

24 permission_prefix = 'states' 

25 enum_spec = ('list_state_machines', 'stateMachines', None) 

26 arn = id = 'stateMachineArn' 

27 arn_service = 'states' 

28 arn_type = 'stateMachine' 

29 cfn_type = config_type = 'AWS::StepFunctions::StateMachine' 

30 name = 'name' 

31 date = 'creationDate' 

32 detail_spec = ( 

33 "describe_state_machine", "stateMachineArn", 

34 'stateMachineArn', None) 

35 

36 source_mapping = { 

37 'describe': DescribeStepFunction, 

38 'config': ConfigSource 

39 } 

40 

41 

42class InvokeStepFunction(Action): 

43 """Invoke step function on resources. 

44 

45 By default this will invoke a step function for each resource 

46 providing both the `policy` and `resource` as input. 

47 

48 That behavior can be configured setting policy and bulk 

49 boolean flags on the action. 

50 

51 If bulk action parameter is set to true, then the step 

52 function will be invoked in bulk, with a set of resource arns 

53 under the `resources` key. 

54 

55 The size of the batch can be configured via the batch-size 

56 parameter. Note step function state (input, execution, etc)must 

57 fit within 32k, we default to batch size 250. 

58 

59 :example: 

60 

61 .. code-block:: yaml 

62 

63 policies: 

64 - name: invoke-step-function 

65 resource: s3 

66 filters: 

67 - is-log-target 

68 - "tag:IngestSetup": absent 

69 actions: 

70 - type: invoke-sfn 

71 # This will cause the workflow to be invoked 

72 # with many resources arns in a single execution. 

73 # Note this is *not* the default. 

74 bulk: true 

75 batch-size: 10 

76 state-machine: LogIngestSetup 

77 """ 

78 

79 schema = type_schema( 

80 'invoke-sfn', 

81 required=['state-machine'], 

82 **{'state-machine': {'type': 'string'}, 

83 'batch-size': {'type': 'integer'}, 

84 'bulk': {'type': 'boolean'}, 

85 'policy': {'type': 'boolean'}}) 

86 schema_alias = True 

87 permissions = ('states:StartExecution',) 

88 

89 def process(self, resources): 

90 client = local_session( 

91 self.manager.session_factory).client('stepfunctions') 

92 arn = self.data['state-machine'] 

93 if not arn.startswith('arn'): 

94 arn = 'arn:aws:states:{}:{}:stateMachine:{}'.format( 

95 self.manager.config.region, self.manager.config.account_id, arn) 

96 

97 params = {'stateMachineArn': arn} 

98 pinput = {} 

99 

100 if self.data.get('policy', True): 

101 pinput['policy'] = dict(self.manager.data) 

102 

103 resource_set = list(zip(self.manager.get_arns(resources), resources)) 

104 if self.data.get('bulk', False) is True: 

105 return self.invoke_batch(client, params, pinput, resource_set) 

106 

107 for arn, r in resource_set: 

108 pinput['resource'] = r 

109 params['input'] = dumps(pinput) 

110 r['c7n:execution-arn'] = self.manager.retry( 

111 client.start_execution, **params).get('executionArn') 

112 

113 def invoke_batch(self, client, params, pinput, resource_set): 

114 for batch_rset in chunks(resource_set, self.data.get('batch-size', 250)): 

115 pinput['resources'] = [rarn for rarn, _ in batch_rset] 

116 params['input'] = dumps(pinput) 

117 exec_arn = self.manager.retry( 

118 client.start_execution, **params).get('executionArn') 

119 for _, r in resource_set: 

120 r['c7n:execution-arn'] = exec_arn 

121 

122 @classmethod 

123 def register_resources(cls, registry, resource_class): 

124 if 'invoke-sfn' not in resource_class.action_registry: 

125 resource_class.action_registry.register('invoke-sfn', cls) 

126 

127 

128resources.subscribe(InvokeStepFunction.register_resources) 

129 

130 

131@StepFunction.action_registry.register('tag') 

132class TagStepFunction(Tag): 

133 """Action to create tag(s) on a step function 

134 

135 :example: 

136 

137 .. code-block:: yaml 

138 

139 policies: 

140 - name: tag-step-function 

141 resource: step-machine 

142 actions: 

143 - type: tag 

144 key: target-tag 

145 value: target-tag-value 

146 """ 

147 

148 permissions = ('states:TagResource',) 

149 

150 def process_resource_set(self, client, resources, tags): 

151 

152 tags_lower = [] 

153 

154 for tag in tags: 

155 tags_lower.append({k.lower(): v for k, v in tag.items()}) 

156 

157 for r in resources: 

158 client.tag_resource(resourceArn=r['stateMachineArn'], tags=tags_lower) 

159 

160 

161@StepFunction.action_registry.register('remove-tag') 

162class UnTagStepFunction(RemoveTag): 

163 """Action to create tag(s) on a step function 

164 

165 :example: 

166 

167 .. code-block:: yaml 

168 

169 policies: 

170 - name: step-function-remove-tag 

171 resource: step-machine 

172 actions: 

173 - type: remove-tag 

174 tags: ["test"] 

175 """ 

176 

177 permissions = ('states:UntagResource',) 

178 

179 def process_resource_set(self, client, resources, tag_keys): 

180 

181 for r in resources: 

182 client.untag_resource(resourceArn=r['stateMachineArn'], tagKeys=tag_keys)