Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/airflow.py: 65%
72 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.manager import resources
4from c7n.query import QueryResourceManager, TypeInfo
5from c7n.filters.kms import KmsRelatedFilter
6from c7n.tags import RemoveTag, Tag, TagDelayedAction, TagActionFilter
7from c7n.actions import Action
8from c7n.utils import local_session, type_schema
11@resources.register('airflow')
12class ApacheAirflow(QueryResourceManager):
13 class resource_type(TypeInfo):
14 service = 'mwaa'
15 id = name = 'Name'
16 enum_spec = ('list_environments', 'Environments', None)
17 detail_spec = ('get_environment', 'Name', None, 'Environment')
18 arn = 'Arn'
19 arn_type = 'environment'
20 cfn_type = 'AWS::MWAA::Environment'
21 permission_prefix = 'airflow'
23 permissions = (
24 'airflow:GetEnvironment',
25 'airflow:ListEnvironments',
26 )
28 def augment(self, resources):
29 resources = super(ApacheAirflow, self).augment(resources)
30 for r in resources:
31 r['Tags'] = [{'Key': k, 'Value': v} for k, v in r.get('Tags', {}).items()]
32 return resources
35@ApacheAirflow.filter_registry.register('kms-key')
36class ApacheAirflowKmsFilter(KmsRelatedFilter):
37 """
39 Filter a Managed Workflow for Apache Airflow environment by its associcated kms key
40 and optionally the aliasname of the kms key by using 'c7n:AliasName'
42 :example:
44 .. code-block:: yaml
46 policies:
47 - name: airflow-kms-key-filter
48 resource: airflow
49 filters:
50 - type: kms-key
51 key: c7n:AliasName
52 value: alias/aws/mwaa
53 """
54 RelatedIdsExpression = 'KmsKey'
57@ApacheAirflow.action_registry.register('tag')
58class TagApacheAirflow(Tag):
59 """Action to create tag(s) on a Managed Workflow for Apache Airflow environment
61 :example:
63 .. code-block:: yaml
65 policies:
66 - name: tag-airflow
67 resource: airflow
68 filters:
69 - "tag:target-tag": absent
70 actions:
71 - type: tag
72 key: target-tag
73 value: target-tag-value
74 """
76 permissions = ('airflow:TagResource',)
78 def process_resource_set(self, client, airflow, new_tags):
79 for r in airflow:
80 try:
81 client.tag_resource(
82 ResourceArn=r['Arn'],
83 Tags={t['Key']: t['Value'] for t in new_tags})
84 except client.exceptions.ResourceNotFound:
85 continue
88@ApacheAirflow.action_registry.register('remove-tag')
89class UntagApacheAirflow(RemoveTag):
90 """Action to remove tag(s) on a Managed Workflow for Apache Airflow environment
92 :example:
94 .. code-block:: yaml
96 policies:
97 - name: airflow-remove-tag
98 resource: airflow
99 filters:
100 - "tag:OutdatedTag": present
101 actions:
102 - type: remove-tag
103 tags: ["OutdatedTag"]
104 """
106 permissions = ('airflow:UntagResource',)
108 def process_resource_set(self, client, airflow, tags):
109 for r in airflow:
110 try:
111 client.untag_resource(ResourceArn=r['Arn'], tagKeys=tags)
112 except client.exceptions.ResourceNotFound:
113 continue
116ApacheAirflow.filter_registry.register('marked-for-op', TagActionFilter)
117ApacheAirflow.action_registry.register('mark-for-op', TagDelayedAction)
119@ApacheAirflow.action_registry.register('update-environment')
120class UpdateApacheAirflowEnvironment(Action):
121 """
122 Action to update an Airflow environment to
123 set the WebserverAccessMode to PRIVATE_ONLY or PUBLIC_ONLY.
125 :example:
127 .. code-block:: yaml
129 policies:
130 - name: set-webserver-access-mode
131 resource: airflow
132 actions:
133 - type: update-environment
134 access_mode: PRIVATE_ONLY
135 """
137 permissions = ('airflow:UpdateEnvironment',)
139 schema = type_schema(
140 'update-environment',
141 access_mode={'type': 'string', 'enum': ['PRIVATE_ONLY', 'PUBLIC_ONLY']},
142 required=['access_mode']
143 )
144 valid_origin_states = ('AVAILABLE', 'UPDATE_FAILED')
146 def process(self, resources):
147 resources = self.filter_resources(resources, 'Status', self.valid_origin_states)
148 client = local_session(self.manager.session_factory).client('mwaa')
149 access_mode = self.data.get('access_mode')
150 for r in resources:
151 self.process_environment(r, client, access_mode)
153 def process_environment(self, r, client, access_mode):
154 current_access_mode = r.get('WebserverAccessMode')
155 if current_access_mode == access_mode:
156 return
157 client.update_environment(
158 Name=r['Name'],
159 WebserverAccessMode=access_mode
160 )
162@ApacheAirflow.action_registry.register('delete-environment')
163class DeleteApacheAirflowEnvironment(Action):
164 """
165 Action to delete a Managed Workflow for Apache Airflow environment
167 :example:
169 .. code-block:: yaml
171 policies:
172 - name: delete-airflow-environment
173 resource: airflow
174 actions:
175 - type: delete-environment
176 """
178 permissions = ('airflow:DeleteEnvironment',)
180 schema = type_schema('delete-environment')
181 valid_origin_states = ('AVAILABLE', 'CREATE_FAILED', 'DELETE_FAILED',)
183 def process(self, resources):
184 resources = self.filter_resources(resources, 'Status', self.valid_origin_states)
185 client = local_session(self.manager.session_factory).client('mwaa')
186 for r in resources:
187 self.manager.retry(
188 client.delete_environment,
189 Name=r["Name"],
190 ignore_err_codes=("ResourceNotFoundException",)
191 )