Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/airflow.py: 65%

72 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from c7n.manager import resources 

4from c7n.query import QueryResourceManager, TypeInfo 

5from c7n.filters.kms import KmsRelatedFilter 

6from c7n.tags import RemoveTag, Tag, TagDelayedAction, TagActionFilter 

7from c7n.actions import Action 

8from c7n.utils import local_session, type_schema 

9 

10 

@resources.register('airflow')
class ApacheAirflow(QueryResourceManager):
    """Resource manager for Managed Workflows for Apache Airflow (MWAA) environments."""

    class resource_type(TypeInfo):
        # The API client is named 'mwaa', while IAM actions use the 'airflow' prefix.
        service = 'mwaa'
        permission_prefix = 'airflow'

        # An environment's name serves as both its identifier and display name.
        id = 'Name'
        name = 'Name'

        # list_environments enumerates environments; get_environment is the
        # per-environment detail call.
        enum_spec = ('list_environments', 'Environments', None)
        detail_spec = ('get_environment', 'Name', None, 'Environment')

        arn = 'Arn'
        arn_type = 'environment'
        cfn_type = 'AWS::MWAA::Environment'

    permissions = (
        'airflow:GetEnvironment',
        'airflow:ListEnvironments',
    )

    def augment(self, resources):
        """Reshape each environment's tags into a list of Key/Value dicts.

        The parent class's ``augment`` runs first. Each resource's ``Tags``
        mapping (treated as empty when the key is missing) is then replaced
        in place by a list of ``{'Key': ..., 'Value': ...}`` entries.

        :param resources: environment records to augment
        :return: the records returned by the parent ``augment``, with
            ``Tags`` rewritten
        """
        environments = super().augment(resources)
        for env in environments:
            tag_map = env.get('Tags', {})
            env['Tags'] = [
                {'Key': key, 'Value': value} for key, value in tag_map.items()
            ]
        return environments

33 

34 

@ApacheAirflow.filter_registry.register('kms-key')
class ApacheAirflowKmsFilter(KmsRelatedFilter):
    """Filter Managed Workflow for Apache Airflow environments by their associated KMS key.

    The key's alias name can optionally be matched by using 'c7n:AliasName'.

    :example:

    .. code-block:: yaml

        policies:
          - name: airflow-kms-key-filter
            resource: airflow
            filters:
              - type: kms-key
                key: c7n:AliasName
                value: alias/aws/mwaa
    """

    # Expression selecting the KMS key reference on an environment record.
    RelatedIdsExpression = 'KmsKey'

55 

56 

@ApacheAirflow.action_registry.register('tag')
class TagApacheAirflow(Tag):
    """Action to create tag(s) on a Managed Workflow for Apache Airflow environment

    :example:

    .. code-block:: yaml

        policies:
          - name: tag-airflow
            resource: airflow
            filters:
              - "tag:target-tag": absent
            actions:
              - type: tag
                key: target-tag
                value: target-tag-value
    """

    permissions = ('airflow:TagResource',)

    def process_resource_set(self, client, airflow, new_tags):
        """Apply ``new_tags`` to every environment in ``airflow``.

        :param client: MWAA API client used to issue ``tag_resource`` calls
        :param airflow: iterable of environment records, each carrying an ``Arn``
        :param new_tags: list of ``{'Key': ..., 'Value': ...}`` dicts to apply

        Environments that no longer exist are skipped; any other client
        error propagates to the caller.
        """
        # tag_resource takes a plain key -> value mapping; it is the same for
        # every environment, so build it once.
        tag_map = {t['Key']: t['Value'] for t in new_tags}
        for r in airflow:
            try:
                client.tag_resource(ResourceArn=r['Arn'], Tags=tag_map)
            # The modelled error is ResourceNotFoundException; referencing a
            # name the client does not define raises AttributeError while the
            # except clause is evaluated and hides the real error.
            except client.exceptions.ResourceNotFoundException:
                continue

86 

87 

@ApacheAirflow.action_registry.register('remove-tag')
class UntagApacheAirflow(RemoveTag):
    """Action to remove tag(s) on a Managed Workflow for Apache Airflow environment

    :example:

    .. code-block:: yaml

        policies:
          - name: airflow-remove-tag
            resource: airflow
            filters:
              - "tag:OutdatedTag": present
            actions:
              - type: remove-tag
                tags: ["OutdatedTag"]
    """

    permissions = ('airflow:UntagResource',)

    def process_resource_set(self, client, airflow, tags):
        """Remove the tag keys in ``tags`` from every environment in ``airflow``.

        :param client: MWAA API client used to issue ``untag_resource`` calls
        :param airflow: iterable of environment records, each carrying an ``Arn``
        :param tags: tag keys to remove

        Environments that no longer exist are skipped; any other client
        error propagates to the caller.
        """
        for r in airflow:
            try:
                client.untag_resource(ResourceArn=r['Arn'], tagKeys=tags)
            # The modelled error is ResourceNotFoundException; referencing a
            # name the client does not define raises AttributeError while the
            # except clause is evaluated and hides the real error.
            except client.exceptions.ResourceNotFoundException:
                continue

114 

115 

# Register the shared delayed-operation tagging pair on airflow resources:
# 'mark-for-op' is the action and 'marked-for-op' is the matching filter.
ApacheAirflow.filter_registry.register('marked-for-op', TagActionFilter)
ApacheAirflow.action_registry.register('mark-for-op', TagDelayedAction)

118 

@ApacheAirflow.action_registry.register('update-environment')
class UpdateApacheAirflowEnvironment(Action):
    """
    Action to update an Airflow environment's WebserverAccessMode,
    setting it to either PRIVATE_ONLY or PUBLIC_ONLY.

    :example:

    .. code-block:: yaml

        policies:
          - name: set-webserver-access-mode
            resource: airflow
            actions:
              - type: update-environment
                access_mode: PRIVATE_ONLY
    """

    permissions = ('airflow:UpdateEnvironment',)

    schema = type_schema(
        'update-environment',
        access_mode={'type': 'string', 'enum': ['PRIVATE_ONLY', 'PUBLIC_ONLY']},
        required=['access_mode']
    )

    # Status values passed to filter_resources before any update is attempted.
    valid_origin_states = ('AVAILABLE', 'UPDATE_FAILED')

    def process(self, resources):
        """Set the configured access mode on each eligible environment.

        Resources are first narrowed with ``filter_resources`` on ``Status``
        using ``valid_origin_states``; each one that remains is handed to
        ``process_environment``.

        :param resources: environment records selected by the policy
        """
        eligible = self.filter_resources(resources, 'Status', self.valid_origin_states)
        mwaa = local_session(self.manager.session_factory).client('mwaa')
        desired_mode = self.data.get('access_mode')
        for env in eligible:
            self.process_environment(env, mwaa, desired_mode)

    def process_environment(self, r, client, access_mode):
        """Update one environment unless it already has ``access_mode``.

        :param r: environment record; ``Name`` is required and
            ``WebserverAccessMode`` is read if present
        :param client: MWAA API client
        :param access_mode: desired WebserverAccessMode value
        """
        # Only call the API when the mode would actually change.
        if r.get('WebserverAccessMode') != access_mode:
            client.update_environment(
                Name=r['Name'],
                WebserverAccessMode=access_mode
            )

161 

@ApacheAirflow.action_registry.register('delete-environment')
class DeleteApacheAirflowEnvironment(Action):
    """
    Action that deletes a Managed Workflow for Apache Airflow environment

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-airflow-environment
            resource: airflow
            actions:
              - type: delete-environment
    """

    permissions = ('airflow:DeleteEnvironment',)

    schema = type_schema('delete-environment')

    # Status values passed to filter_resources before any delete is attempted.
    valid_origin_states = ('AVAILABLE', 'CREATE_FAILED', 'DELETE_FAILED',)

    def process(self, resources):
        """Delete each eligible environment by name.

        Resources are first narrowed with ``filter_resources`` on ``Status``
        using ``valid_origin_states``. Each delete goes through the manager's
        retry helper with ``ResourceNotFoundException`` listed in
        ``ignore_err_codes`` (presumably so already-deleted environments are
        skipped; confirm against the retry helper).

        :param resources: environment records selected by the policy
        """
        deletable = self.filter_resources(resources, 'Status', self.valid_origin_states)
        mwaa = local_session(self.manager.session_factory).client('mwaa')
        ignorable = ("ResourceNotFoundException",)
        for env in deletable:
            self.manager.retry(
                mwaa.delete_environment,
                Name=env["Name"],
                ignore_err_codes=ignorable
            )