Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/sfn.py: 59%
70 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
4from c7n.actions import Action
5from c7n.manager import resources
6from c7n.query import QueryResourceManager, TypeInfo, DescribeSource, ConfigSource
7from c7n.tags import Tag, RemoveTag, universal_augment
8from c7n.utils import type_schema, local_session, dumps, chunks
class DescribeStepFunction(DescribeSource):
    """Describe-based source for Step Functions state machines."""

    def augment(self, resources):
        """Augment resources via the parent source, then ``universal_augment``.

        The parent ``DescribeSource.augment`` runs first; its result is handed
        to ``universal_augment`` (imported from ``c7n.tags``) together with the
        owning manager, and that call's return value is returned.
        """
        described = super().augment(resources)
        return universal_augment(self.manager, described)
@resources.register('step-machine')
class StepFunction(QueryResourceManager):
    """AWS Step Functions State Machine.

    Registered under the ``step-machine`` resource name.
    """

    class resource_type(TypeInfo):
        # Client service name and IAM permission prefix.
        service = 'stepfunctions'
        permission_prefix = 'states'

        # Enumeration call and the key holding its results.
        enum_spec = ('list_state_machines', 'stateMachines', None)

        # The state machine ARN doubles as the resource identifier.
        arn = 'stateMachineArn'
        id = arn
        arn_service = 'states'
        arn_type = 'stateMachine'

        # CloudFormation and AWS Config share the same type name.
        cfn_type = 'AWS::StepFunctions::StateMachine'
        config_type = cfn_type

        name = 'name'
        date = 'creationDate'

        # Per-resource detail call, keyed by the state machine ARN.
        detail_spec = ('describe_state_machine', 'stateMachineArn', 'stateMachineArn', None)

    # Available resource sources, selected by name.
    source_mapping = {'describe': DescribeStepFunction, 'config': ConfigSource}
class InvokeStepFunction(Action):
    """Invoke step function on resources.

    By default this will invoke a step function for each resource
    providing both the `policy` and `resource` as input.

    That behavior can be configured setting policy and bulk
    boolean flags on the action.

    If bulk action parameter is set to true, then the step
    function will be invoked in bulk, with a set of resource arns
    under the `resources` key.

    The size of the batch can be configured via the batch-size
    parameter. Note step function state (input, execution, etc.) must
    fit within 32k, we default to batch size 250.

    Every processed resource is annotated under the `c7n:execution-arn`
    key with the arn of the execution that was started for it.

    :example:

    .. code-block:: yaml

       policies:
         - name: invoke-step-function
           resource: s3
           filters:
             - is-log-target
             - "tag:IngestSetup": absent
           actions:
             - type: invoke-sfn
               # This will cause the workflow to be invoked
               # with many resources arns in a single execution.
               # Note this is *not* the default.
               bulk: true
               batch-size: 10
               state-machine: LogIngestSetup
    """

    schema = type_schema(
        'invoke-sfn',
        required=['state-machine'],
        **{'state-machine': {'type': 'string'},
           'batch-size': {'type': 'integer'},
           'bulk': {'type': 'boolean'},
           'policy': {'type': 'boolean'}})
    schema_alias = True
    permissions = ('states:StartExecution',)

    def process(self, resources):
        """Start state machine executions for ``resources``.

        ``state-machine`` may be a full arn or a bare name; a bare name is
        expanded into an arn using the manager's configured region and
        account id. Unless ``policy`` is set to false, the policy data is
        included in the execution input under the ``policy`` key.

        With ``bulk: true`` the work is delegated to :meth:`invoke_batch`.
        Otherwise one execution is started per resource, with the resource
        passed under the ``resource`` key of the input.
        """
        client = local_session(
            self.manager.session_factory).client('stepfunctions')
        arn = self.data['state-machine']
        if not arn.startswith('arn'):
            arn = 'arn:aws:states:{}:{}:stateMachine:{}'.format(
                self.manager.config.region, self.manager.config.account_id, arn)

        params = {'stateMachineArn': arn}
        pinput = {}

        if self.data.get('policy', True):
            pinput['policy'] = dict(self.manager.data)

        resource_set = list(zip(self.manager.get_arns(resources), resources))
        if self.data.get('bulk', False) is True:
            return self.invoke_batch(client, params, pinput, resource_set)

        # The resource arn is not needed here; avoid shadowing the state
        # machine ``arn`` computed above.
        for _, r in resource_set:
            pinput['resource'] = r
            params['input'] = dumps(pinput)
            r['c7n:execution-arn'] = self.manager.retry(
                client.start_execution, **params).get('executionArn')

    def invoke_batch(self, client, params, pinput, resource_set):
        """Start one execution per batch of resources.

        ``resource_set`` is a list of ``(resource_arn, resource)`` pairs. It
        is split into batches of ``batch-size`` (default 250); each execution
        receives the batch's resource arns under the ``resources`` input key,
        and each resource in that batch is annotated with the resulting
        execution arn.
        """
        for batch_rset in chunks(resource_set, self.data.get('batch-size', 250)):
            # Materialize so the batch can safely be traversed twice.
            batch_rset = list(batch_rset)
            pinput['resources'] = [rarn for rarn, _ in batch_rset]
            params['input'] = dumps(pinput)
            exec_arn = self.manager.retry(
                client.start_execution, **params).get('executionArn')
            # Annotate only the resources sent in this execution. Iterating
            # the full resource_set here would overwrite the annotation from
            # earlier batches with the arn of the last execution.
            for _, r in batch_rset:
                r['c7n:execution-arn'] = exec_arn

    @classmethod
    def register_resources(cls, registry, resource_class):
        """Register ``invoke-sfn`` on ``resource_class`` unless already present."""
        if 'invoke-sfn' not in resource_class.action_registry:
            resource_class.action_registry.register('invoke-sfn', cls)


# Hook the action into the resource registry: ``register_resources`` adds
# 'invoke-sfn' to whichever resource class it is called with (presumably once
# per registered resource type -- confirm against ``resources.subscribe``).
resources.subscribe(InvokeStepFunction.register_resources)
@StepFunction.action_registry.register('tag')
class TagStepFunction(Tag):
    """Action to create tag(s) on a step function

    :example:

    .. code-block:: yaml

       policies:
         - name: tag-step-function
           resource: step-machine
           actions:
             - type: tag
               key: target-tag
               value: target-tag-value
    """

    permissions = ('states:TagResource',)

    def process_resource_set(self, client, resources, tags):
        """Tag each state machine in ``resources`` with ``tags``.

        Every tag dict is copied with its keys lower-cased before being
        passed to ``client.tag_resource``; the same list is sent for each
        resource, addressed by its ``stateMachineArn``.
        """
        lowered = [
            {key.lower(): value for key, value in entry.items()}
            for entry in tags
        ]
        for machine in resources:
            client.tag_resource(resourceArn=machine['stateMachineArn'], tags=lowered)
@StepFunction.action_registry.register('remove-tag')
class UnTagStepFunction(RemoveTag):
    """Action to remove tag(s) from a step function

    :example:

    .. code-block:: yaml

       policies:
         - name: step-function-remove-tag
           resource: step-machine
           actions:
             - type: remove-tag
               tags: ["test"]
    """

    permissions = ('states:UntagResource',)

    def process_resource_set(self, client, resources, tag_keys):
        """Remove ``tag_keys`` from each state machine in ``resources``.

        One ``client.untag_resource`` call is made per resource, addressed
        by its ``stateMachineArn``.
        """
        for r in resources:
            client.untag_resource(resourceArn=r['stateMachineArn'], tagKeys=tag_keys)