Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/appflow.py: 70%

44 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from c7n.actions import BaseAction 

4from c7n.manager import resources 

5from c7n.query import QueryResourceManager, TypeInfo 

6from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction 

7from c7n.utils import local_session, type_schema 

8 

9 

@resources.register('app-flow')
class AppFlow(QueryResourceManager):
    """Resource manager for AppFlow flows, registered as ``app-flow``."""

    class resource_type(TypeInfo):
        # Static metadata describing how flows are listed, described and
        # identified; interpreted by the TypeInfo / QueryResourceManager
        # machinery defined outside this file.
        service = 'appflow'
        arn_type = 'flow'
        enum_spec = ('list_flows', 'flows', {'maxResults': 100})
        id = name = 'flowName'
        arn = 'flowArn'
        detail_spec = ('describe_flow', 'flowName', 'flowName', None)

    def augment(self, resources):
        """Augment flows via the parent class and add a ``Tags`` list.

        For every resource that carries a ``tags`` mapping, a ``Tags`` entry
        is added holding the same data as a list of
        ``{'Key': ..., 'Value': ...}`` dicts. Resources without ``tags`` are
        left as returned by the parent ``augment``.

        :param resources: resources to augment
        :return: the resources returned by the parent ``augment``
        """
        augmented = super().augment(resources)
        for flow in augmented:
            if 'tags' not in flow:
                continue
            # NOTE(review): presumably the list-of-dicts shape is what the
            # shared tag filters/actions expect -- confirm against c7n.tags.
            flow['Tags'] = [
                {'Key': key, 'Value': value}
                for key, value in flow['tags'].items()
            ]
        return augmented

27 

28 

@AppFlow.action_registry.register('tag')
class TagAppFlowResource(Tag):
    """Apply one or more tags to an AppFlow resource.

    :example:

    .. code-block:: yaml

        policies:
          - name: tag-app-flow
            resource: app-flow
            actions:
              - type: tag
                key: tag-key
                value: tag-value
    """

    permissions = ('appflow:TagResource',)

    def process_resource_set(self, client, resources, new_tags):
        """Tag every flow in ``resources`` with ``new_tags``.

        :param client: client object exposing ``tag_resource``
        :param resources: flow dicts, each providing a ``flowArn`` key
        :param new_tags: iterable of dicts with ``Key`` and ``Value`` entries
        """
        # tag_resource is called with a plain key -> value mapping, so the
        # Key/Value pair dicts are flattened once before the loop.
        tag_map = {}
        for pair in new_tags:
            tag_map[pair.get('Key')] = pair.get('Value')

        for flow in resources:
            client.tag_resource(resourceArn=flow['flowArn'], tags=tag_map)

52 

53 

@AppFlow.action_registry.register('remove-tag')
class RemoveTagAppFlowResource(RemoveTag):
    """Remove one or more tags from an AppFlow resource.

    :example:

    .. code-block:: yaml

        policies:
          - name: untag-app-flow
            resource: app-flow
            actions:
              - type: remove-tag
                tags: ['tag-key']
    """

    permissions = ('appflow:UntagResource',)

    def process_resource_set(self, client, resources, tag_keys):
        """Strip ``tag_keys`` from every flow in ``resources``.

        :param client: client object exposing ``untag_resource``
        :param resources: flow dicts, each providing a ``flowArn`` key
        :param tag_keys: tag keys to remove, passed through unchanged
        """
        for flow in resources:
            flow_arn = flow['flowArn']
            client.untag_resource(resourceArn=flow_arn, tagKeys=tag_keys)

75 

76 

# Register the shared delayed-operation tagging action and its companion
# filter under the names the 'delete' example policy refers to.
AppFlow.action_registry.register(
    'mark-for-op', TagDelayedAction)
AppFlow.filter_registry.register(
    'marked-for-op', TagActionFilter)

79 

80 

@AppFlow.action_registry.register('delete')
class DeleteAppFlowResource(BaseAction):
    """Delete an AppFlow.

    Set 'force' to true when the AppFlow being deleted is currently in use.

    :example:

    .. code-block:: yaml

        policies:
          - name: app-flow-delete
            resource: app-flow
            filters:
              - type: marked-for-op
                op: delete
            actions:
              - type: delete
                force: true
    """

    permissions = ('appflow:DeleteFlow',)
    schema = type_schema('delete', force={'type': 'boolean'})

    def process(self, resources):
        """Delete each flow in ``resources`` by its ``flowName``.

        The policy's ``force`` option (default ``False``) is passed to
        ``delete_flow`` as ``forceDelete``. Each call goes through the
        manager's ``retry`` helper with ``ResourceNotFoundException`` listed
        as an error code to ignore.

        :param resources: flow dicts, each providing a ``flowName`` key
        """
        session = local_session(self.manager.session_factory)
        client = session.client('appflow')
        force_delete = self.data.get('force', False)
        ignorable = ('ResourceNotFoundException',)

        for flow in resources:
            self.manager.retry(
                client.delete_flow,
                flowName=flow['flowName'],
                forceDelete=force_delete,
                ignore_err_codes=ignorable,
            )