Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/appflow.py: 70%
44 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.actions import BaseAction
4from c7n.manager import resources
5from c7n.query import QueryResourceManager, TypeInfo
6from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction
7from c7n.utils import local_session, type_schema
@resources.register('app-flow')
class AppFlow(QueryResourceManager):
    """Resource manager for flows of the ``appflow`` service.

    Registered under the ``app-flow`` resource name.
    """

    class resource_type(TypeInfo):
        # Service / API metadata consumed by the query machinery.
        service = 'appflow'
        arn_type = 'flow'
        enum_spec = ('list_flows', 'flows', {'maxResults': 100})
        # The flow name serves as both the identifier and the display name.
        id = 'flowName'
        name = 'flowName'
        arn = 'flowArn'
        detail_spec = ('describe_flow', 'flowName', 'flowName', None)

    def augment(self, resources):
        """Augment resources and mirror their ``tags`` mapping as ``Tags``.

        After the parent class augmentation, every resource carrying a
        ``tags`` mapping additionally receives a ``Tags`` entry holding the
        same data as a list of ``{'Key': ..., 'Value': ...}`` dicts.
        Resources without a ``tags`` key are left as they are.

        :param resources: resources to augment.
        :return: the augmented resources as returned by the parent class.
        """
        flows = super().augment(resources)
        for flow in flows:
            if 'tags' not in flow:
                continue
            # presumably the Key/Value list is the shape the generic tag
            # filters and actions expect -- confirm against c7n.tags.
            flow['Tags'] = [
                {'Key': key, 'Value': value}
                for key, value in flow['tags'].items()
            ]
        return flows
@AppFlow.action_registry.register('tag')
class TagAppFlowResource(Tag):
    """Action to create tag(s) on an AppFlow resource

    :example:

    .. code-block:: yaml

        policies:
          - name: tag-app-flow
            resource: app-flow
            actions:
              - type: tag
                key: tag-key
                value: tag-value
    """

    permissions = ('appflow:TagResource',)

    def process_resource_set(self, client, resources, new_tags):
        """Apply ``new_tags`` to every flow in ``resources``.

        :param client: client object exposing ``tag_resource``.
        :param resources: flow resources; each must carry ``flowArn``.
        :param new_tags: iterable of dicts with ``Key`` / ``Value`` entries.
        """
        # The API call takes a plain {key: value} mapping, so flatten the
        # Key/Value records first.
        tag_map = {}
        for tag in new_tags:
            tag_map[tag.get('Key')] = tag.get('Value')

        for flow in resources:
            client.tag_resource(resourceArn=flow['flowArn'], tags=tag_map)
@AppFlow.action_registry.register('remove-tag')
class RemoveTagAppFlowResource(RemoveTag):
    """Action to remove tag(s) on an AppFlow resource

    :example:

    .. code-block:: yaml

        policies:
          - name: untag-app-flow
            resource: app-flow
            actions:
              - type: remove-tag
                tags: ['tag-key']
    """

    permissions = ('appflow:UntagResource',)

    def process_resource_set(self, client, resources, tag_keys):
        """Remove the given tag keys from every flow in ``resources``.

        :param client: client object exposing ``untag_resource``.
        :param resources: flow resources; each must carry ``flowArn``.
        :param tag_keys: tag keys to remove, passed through as ``tagKeys``.
        """
        for flow in resources:
            flow_arn = flow['flowArn']
            client.untag_resource(resourceArn=flow_arn, tagKeys=tag_keys)
# Register the shared delayed-tagging action under the 'mark-for-op' name ...
AppFlow.action_registry.register('mark-for-op', TagDelayedAction)
# ... and the shared tag-action filter under the 'marked-for-op' name.
AppFlow.filter_registry.register('marked-for-op', TagActionFilter)
@AppFlow.action_registry.register('delete')
class DeleteAppFlowResource(BaseAction):
    """Action to delete an AppFlow

    The 'force' parameter is needed when deleting an AppFlow that is currently
    in use.

    :example:

    .. code-block:: yaml

        policies:
          - name: app-flow-delete
            resource: app-flow
            filters:
              - type: marked-for-op
                op: delete
            actions:
              - type: delete
                force: true
    """

    permissions = ('appflow:DeleteFlow',)
    schema = type_schema('delete', force={'type': 'boolean'})

    def process(self, resources):
        """Delete every flow in ``resources`` by name.

        The policy's ``force`` setting (``False`` when absent) is passed
        along as ``forceDelete``. Each call goes through the manager's
        retry helper with ``ResourceNotFoundException`` listed in
        ``ignore_err_codes``.

        :param resources: flow resources; each must carry ``flowName``.
        """
        session = local_session(self.manager.session_factory)
        appflow = session.client('appflow')
        force = self.data.get('force', False)

        for flow in resources:
            self.manager.retry(
                appflow.delete_flow,
                flowName=flow['flowName'],
                forceDelete=force,
                ignore_err_codes=('ResourceNotFoundException',),
            )