Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/setup_teardown.py: 19%
146 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
18from __future__ import annotations
20from typing import TYPE_CHECKING
22if TYPE_CHECKING:
23 from airflow.models.operator import Operator
class SetupTeardownContext:
    """Context manager for setup/teardown tasks.

    All state is held in class attributes and manipulated through
    classmethods, so it is shared by every user of the class. The class keeps
    the setup/teardown task(s) of the scope that is currently open, a stack of
    the enclosing scopes' tasks for each kind, and a map from a scope's
    setup/teardown task(s) to the operators registered while it was open.
    """

    # Setup task(s) of the scope that is currently open, if any.
    _context_managed_setup_task: Operator | list[Operator] | None = None
    # Setup task(s) of enclosing scopes, innermost last.
    _previous_context_managed_setup_task: list[Operator | list[Operator]] = []
    # Teardown task(s) of the scope that is currently open, if any.
    _context_managed_teardown_task: Operator | list[Operator] | None = None
    # Teardown task(s) of enclosing scopes, innermost last.
    _previous_context_managed_teardown_task: list[Operator | list[Operator]] = []
    # Set by push_setup_teardown_task, cleared by set_work_task_roots_and_leaves.
    active: bool = False
    # Keys are a single task or a tuple built from a task list (lists are not
    # hashable); values are the operators registered through update_context_map.
    context_map: dict[Operator | tuple[Operator, ...], list[Operator]] = {}

    @staticmethod
    def _as_context_key(task: Operator | list[Operator] | None) -> Operator | tuple[Operator, ...] | None:
        """Return *task* in the form used as a ``context_map`` key (lists become tuples)."""
        return tuple(task) if isinstance(task, list) else task

    @classmethod
    def push_context_managed_setup_task(cls, task: Operator | list[Operator]):
        """Make *task* the current setup task(s), stacking the previous one if it is truthy."""
        if cls._context_managed_setup_task:
            cls._previous_context_managed_setup_task.append(cls._context_managed_setup_task)
        cls._context_managed_setup_task = task

    @classmethod
    def push_context_managed_teardown_task(cls, task: Operator | list[Operator]):
        """Make *task* the current teardown task(s), stacking the previous one if it is truthy."""
        if cls._context_managed_teardown_task:
            cls._previous_context_managed_teardown_task.append(cls._context_managed_teardown_task)
        cls._context_managed_teardown_task = task

    @classmethod
    def pop_context_managed_setup_task(cls) -> Operator | list[Operator] | None:
        """Close the current setup scope and return its task(s).

        If an enclosing scope is stacked, it becomes current again and each of
        its setup tasks gets the closed scope's setup task(s) set as
        downstream. Otherwise the current setup task is reset to ``None``.
        """
        old_setup_task = cls._context_managed_setup_task
        if not cls._previous_context_managed_setup_task:
            cls._context_managed_setup_task = None
            return old_setup_task
        setup_task = cls._previous_context_managed_setup_task.pop()
        cls._context_managed_setup_task = setup_task
        if setup_task and old_setup_task:
            outer_tasks = setup_task if isinstance(setup_task, list) else [setup_task]
            for task in outer_tasks:
                task.set_downstream(old_setup_task)
        return old_setup_task

    @classmethod
    def update_context_map(cls, operator):
        """Register *operator* under the current setup and teardown task(s), if any are set."""
        ctx = SetupTeardownContext.context_map
        if setup_task := SetupTeardownContext.get_context_managed_setup_task():
            ctx.setdefault(SetupTeardownContext._as_context_key(setup_task), []).append(operator)
        if teardown_task := SetupTeardownContext.get_context_managed_teardown_task():
            ctx.setdefault(SetupTeardownContext._as_context_key(teardown_task), []).append(operator)

    @classmethod
    def pop_context_managed_teardown_task(cls) -> Operator | list[Operator] | None:
        """Close the current teardown scope and return its task(s).

        If an enclosing scope is stacked, it becomes current again and each of
        its teardown tasks gets the closed scope's teardown task(s) set as
        upstream. Otherwise the current teardown task is reset to ``None``.
        """
        old_teardown_task = cls._context_managed_teardown_task
        if not cls._previous_context_managed_teardown_task:
            cls._context_managed_teardown_task = None
            return old_teardown_task
        teardown_task = cls._previous_context_managed_teardown_task.pop()
        cls._context_managed_teardown_task = teardown_task
        if teardown_task and old_teardown_task:
            outer_tasks = teardown_task if isinstance(teardown_task, list) else [teardown_task]
            for task in outer_tasks:
                task.set_upstream(old_teardown_task)
        return old_teardown_task

    @classmethod
    def get_context_managed_setup_task(cls) -> Operator | list[Operator] | None:
        """Return the setup task(s) of the currently open scope, or ``None``."""
        return cls._context_managed_setup_task

    @classmethod
    def get_context_managed_teardown_task(cls) -> Operator | list[Operator] | None:
        """Return the teardown task(s) of the currently open scope, or ``None``."""
        return cls._context_managed_teardown_task

    @staticmethod
    def _check_upstream_tasks(upstream_tasks) -> None:
        """Raise ``ValueError`` if any task in *upstream_tasks* is neither setup nor teardown."""
        for task in upstream_tasks:
            if not task.is_setup and not task.is_teardown:
                raise ValueError("All upstream tasks in the context manager must be a setup or teardown task")

    @staticmethod
    def _push_teardown_scope(operator: Operator | list[Operator], anchor: Operator) -> None:
        """Push *operator* as teardown, then the setup tasks upstream of *anchor* (if any) as setup."""
        upstream_tasks = anchor.upstream_list
        SetupTeardownContext._check_upstream_tasks(upstream_tasks)
        SetupTeardownContext.push_context_managed_teardown_task(operator)
        upstream_setup: list[Operator] = [task for task in upstream_tasks if task.is_setup]
        if upstream_setup:
            SetupTeardownContext.push_context_managed_setup_task(upstream_setup)

    @staticmethod
    def _push_setup_scope(operator: Operator | list[Operator], anchor: Operator) -> None:
        """Push *operator* as setup, then the teardown tasks downstream of *anchor* (if any) as teardown."""
        SetupTeardownContext._check_upstream_tasks(anchor.upstream_list)
        SetupTeardownContext.push_context_managed_setup_task(operator)
        downstream_teardown: list[Operator] = [task for task in anchor.downstream_list if task.is_teardown]
        if downstream_teardown:
            SetupTeardownContext.push_context_managed_teardown_task(downstream_teardown)

    @classmethod
    def push_setup_teardown_task(cls, operator: Operator | list[Operator]):
        """Open a scope for *operator* and mark the context as active.

        A teardown operator is pushed as the current teardown task and the
        setup tasks among its upstream tasks, if any, are pushed as the current
        setup task. A setup operator is pushed as the current setup task and
        the teardown tasks among its downstream tasks, if any, are pushed as
        the current teardown task. For a list, the first element decides the
        kind and supplies the upstream/downstream tasks that are inspected.
        An operator that is neither setup nor teardown pushes nothing, but
        ``active`` is still set.

        :param operator: a single task, or a list of tasks of the same kind.
        :raises ValueError: if *operator* is an empty list, if a list mixes
            kinds, or if an inspected upstream task is neither a setup nor a
            teardown task.
        """
        if isinstance(operator, list):
            if not operator:
                # Indexing below would otherwise fail with a bare IndexError.
                raise ValueError("Cannot open a setup/teardown context with an empty list of tasks")
            first_task: Operator = operator[0]
            if first_task.is_teardown:
                if not all(task.is_teardown == first_task.is_teardown for task in operator):
                    raise ValueError("All tasks in the list must be either setup or teardown tasks")
                cls._push_teardown_scope(operator, first_task)
            elif first_task.is_setup:
                if not all(task.is_setup == first_task.is_setup for task in operator):
                    raise ValueError("All tasks in the list must be either setup or teardown tasks")
                cls._push_setup_scope(operator, first_task)
        elif operator.is_teardown:
            cls._push_teardown_scope(operator, operator)
        elif operator.is_setup:
            cls._push_setup_scope(operator, operator)
        SetupTeardownContext.active = True

    @classmethod
    def set_work_task_roots_and_leaves(cls):
        """Wire the scope's registered tasks to its setup/teardown tasks, then close the scope.

        Registered tasks without upstream tasks are set downstream of the
        current setup task(s); registered tasks without downstream tasks are
        set upstream of the current teardown task(s). Afterwards both current
        tasks are popped, ``active`` is cleared and the popped tasks' entries
        are removed from ``context_map``.
        """
        if setup_task := cls.get_context_managed_setup_task():
            setup_key = cls._as_context_key(setup_task)
            tasks_in_context = cls.context_map.get(setup_key, [])
            if tasks_in_context:
                roots = [task for task in tasks_in_context if not task.upstream_list]
                if not roots:
                    # Every registered task already has an upstream task; fall
                    # back to the first one registered.
                    # NOTE(review): when setup_key is a tuple this relies on the
                    # task's reflected `>>` support - confirm.
                    setup_key >> tasks_in_context[0]
                elif isinstance(setup_key, tuple):
                    for task in setup_key:
                        task >> roots
                else:
                    setup_key >> roots
        if teardown_task := cls.get_context_managed_teardown_task():
            teardown_key = cls._as_context_key(teardown_task)
            tasks_in_context = cls.context_map.get(teardown_key, [])
            if tasks_in_context:
                leaves = [task for task in tasks_in_context if not task.downstream_list]
                if not leaves:
                    # Every registered task already has a downstream task; fall
                    # back to the last one registered.
                    # NOTE(review): when teardown_key is a tuple this relies on
                    # the task's reflected `<<` support - confirm.
                    teardown_key << tasks_in_context[-1]
                elif isinstance(teardown_key, tuple):
                    for task in teardown_key:
                        task << leaves
                else:
                    teardown_key << leaves
        closed_setup_key = cls._as_context_key(SetupTeardownContext.pop_context_managed_setup_task())
        closed_teardown_key = cls._as_context_key(SetupTeardownContext.pop_context_managed_teardown_task())
        SetupTeardownContext.active = False
        SetupTeardownContext.context_map.pop(closed_setup_key, None)
        SetupTeardownContext.context_map.pop(closed_teardown_key, None)