Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/setup_teardown.py: 19%

146 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

17 

18from __future__ import annotations 

19 

20from typing import TYPE_CHECKING 

21 

22if TYPE_CHECKING: 

23 from airflow.models.operator import Operator 

24 

25 

class SetupTeardownContext:
    """Context manager for setup/teardown tasks.

    All state is held in class attributes and every public method is a
    classmethod, so the class itself is the registry that callers share.

    The "current" setup task(s) and teardown task(s) are tracked separately.
    Pushing a new task while one is already current moves the current one onto
    a stack of previous tasks; popping restores the most recently displaced one.
    """

    # Setup task (or list of setup tasks) that is currently being managed.
    _context_managed_setup_task: Operator | list[Operator] | None = None
    # Setup tasks displaced by later pushes, most recently displaced last.
    _previous_context_managed_setup_task: list[Operator | list[Operator]] = []
    # Teardown task (or list of teardown tasks) that is currently being managed.
    _context_managed_teardown_task: Operator | list[Operator] | None = None
    # Teardown tasks displaced by later pushes, most recently displaced last.
    _previous_context_managed_teardown_task: list[Operator | list[Operator]] = []
    # Set to True by push_setup_teardown_task, False by set_work_task_roots_and_leaves.
    active: bool = False
    # Maps a managed setup/teardown task (lists are stored as tuples so they can
    # be dict keys) to the operators registered through update_context_map.
    context_map: dict[Operator | tuple[Operator, ...], list[Operator]] = {}

    @staticmethod
    def _as_key(task):
        """Return ``task`` in the form used as a ``context_map`` key (list -> tuple)."""
        return tuple(task) if isinstance(task, list) else task

    @staticmethod
    def _check_upstream_is_setup_or_teardown(upstream_tasks) -> None:
        """Raise ``ValueError`` if any of ``upstream_tasks`` is neither setup nor teardown."""
        for task in upstream_tasks:
            if not task.is_setup and not task.is_teardown:
                raise ValueError(
                    "All upstream tasks in the context manager must be a setup or teardown task"
                )

    @classmethod
    def push_context_managed_setup_task(cls, task: Operator | list[Operator]):
        """Make ``task`` the current setup task, stacking any current one first."""
        if cls._context_managed_setup_task:
            cls._previous_context_managed_setup_task.append(cls._context_managed_setup_task)
        cls._context_managed_setup_task = task

    @classmethod
    def push_context_managed_teardown_task(cls, task: Operator | list[Operator]):
        """Make ``task`` the current teardown task, stacking any current one first."""
        if cls._context_managed_teardown_task:
            cls._previous_context_managed_teardown_task.append(cls._context_managed_teardown_task)
        cls._context_managed_teardown_task = task

    @classmethod
    def pop_context_managed_setup_task(cls) -> Operator | list[Operator] | None:
        """Remove and return the current setup task.

        If an earlier setup task was stacked, it becomes current again and the
        popped task is set as its downstream. Otherwise the current setup task
        is cleared.
        """
        old_setup_task = cls._context_managed_setup_task
        if not cls._previous_context_managed_setup_task:
            cls._context_managed_setup_task = None
            return old_setup_task

        restored = cls._previous_context_managed_setup_task.pop()
        cls._context_managed_setup_task = restored
        if restored and old_setup_task:
            # A restored list is wired member by member.
            for task in restored if isinstance(restored, list) else [restored]:
                task.set_downstream(old_setup_task)
        return old_setup_task

    @classmethod
    def update_context_map(cls, operator):
        """Register ``operator`` under the current setup and teardown tasks, if any."""
        current_tasks = (
            cls.get_context_managed_setup_task(),
            cls.get_context_managed_teardown_task(),
        )
        for managed in current_tasks:
            if managed:
                cls.context_map.setdefault(cls._as_key(managed), []).append(operator)

    @classmethod
    def pop_context_managed_teardown_task(cls) -> Operator | list[Operator] | None:
        """Remove and return the current teardown task.

        If an earlier teardown task was stacked, it becomes current again and
        the popped task is set as its upstream. Otherwise the current teardown
        task is cleared.
        """
        old_teardown_task = cls._context_managed_teardown_task
        if not cls._previous_context_managed_teardown_task:
            cls._context_managed_teardown_task = None
            return old_teardown_task

        restored = cls._previous_context_managed_teardown_task.pop()
        cls._context_managed_teardown_task = restored
        if restored and old_teardown_task:
            # A restored list is wired member by member.
            for task in restored if isinstance(restored, list) else [restored]:
                task.set_upstream(old_teardown_task)
        return old_teardown_task

    @classmethod
    def get_context_managed_setup_task(cls) -> Operator | list[Operator] | None:
        """Return the current setup task(s), or ``None``."""
        return cls._context_managed_setup_task

    @classmethod
    def get_context_managed_teardown_task(cls) -> Operator | list[Operator] | None:
        """Return the current teardown task(s), or ``None``."""
        return cls._context_managed_teardown_task

    @classmethod
    def _push_teardown_scope(cls, operator: Operator | list[Operator], tasks: list[Operator]) -> None:
        """Push teardown ``operator`` plus any setup tasks upstream of its first task."""
        first_task = tasks[0]
        if not all(task.is_teardown == first_task.is_teardown for task in tasks):
            raise ValueError("All tasks in the list must be either setup or teardown tasks")
        upstream_tasks = first_task.upstream_list
        cls._check_upstream_is_setup_or_teardown(upstream_tasks)
        cls.push_context_managed_teardown_task(operator)
        upstream_setup = [task for task in upstream_tasks if task.is_setup]
        if upstream_setup:
            cls.push_context_managed_setup_task(upstream_setup)

    @classmethod
    def _push_setup_scope(cls, operator: Operator | list[Operator], tasks: list[Operator]) -> None:
        """Push setup ``operator`` plus any teardown tasks downstream of its first task."""
        first_task = tasks[0]
        if not all(task.is_setup == first_task.is_setup for task in tasks):
            raise ValueError("All tasks in the list must be either setup or teardown tasks")
        cls._check_upstream_is_setup_or_teardown(first_task.upstream_list)
        cls.push_context_managed_setup_task(operator)
        downstream_teardown = [task for task in first_task.downstream_list if task.is_teardown]
        if downstream_teardown:
            cls.push_context_managed_teardown_task(downstream_teardown)

    @classmethod
    def push_setup_teardown_task(cls, operator: Operator | list[Operator]):
        """Enter a context for ``operator`` and mark the context as active.

        ``operator`` may be a single task or a list of tasks. For a list, the
        first task decides whether the list is treated as setup or teardown,
        and only the first task's upstream/downstream relatives are inspected.
        If the first task is neither setup nor teardown, nothing is pushed but
        ``active`` is still set.

        :param operator: the setup or teardown task(s) opening the context
        :raises ValueError: if ``operator`` is an empty list, if a list mixes
            task kinds, or if an upstream task of the first task is neither a
            setup nor a teardown task
        """
        tasks = operator if isinstance(operator, list) else [operator]
        if not tasks:
            # Indexing an empty list would otherwise surface as a bare IndexError.
            raise ValueError("Cannot enter a setup/teardown context with an empty list of tasks")

        first_task: Operator = tasks[0]
        # Teardown is tested first, so a task flagged as both counts as teardown.
        if first_task.is_teardown:
            cls._push_teardown_scope(operator, tasks)
        elif first_task.is_setup:
            cls._push_setup_scope(operator, tasks)
        cls.active = True

    @classmethod
    def set_work_task_roots_and_leaves(cls):
        """Wire registered operators to the current setup/teardown tasks and leave the context.

        Operators registered under the current setup task that have no upstream
        are placed downstream of it; if none qualifies, the first registered
        operator is used. Operators registered under the current teardown task
        that have no downstream are placed upstream of it; if none qualifies,
        the last registered operator is used. Afterwards both current tasks are
        popped, ``active`` is cleared and their ``context_map`` entries removed.
        """
        if setup_task := cls.get_context_managed_setup_task():
            setup_key = cls._as_key(setup_task)
            tasks_in_context = cls.context_map.get(setup_key, [])
            if tasks_in_context:
                roots = [task for task in tasks_in_context if not task.upstream_list]
                if not roots:
                    setup_key >> tasks_in_context[0]
                elif isinstance(setup_key, tuple):
                    for task in setup_key:
                        task >> roots
                else:
                    setup_key >> roots

        if teardown_task := cls.get_context_managed_teardown_task():
            teardown_key = cls._as_key(teardown_task)
            tasks_in_context = cls.context_map.get(teardown_key, [])
            if tasks_in_context:
                leaves = [task for task in tasks_in_context if not task.downstream_list]
                if not leaves:
                    teardown_key << tasks_in_context[-1]
                elif isinstance(teardown_key, tuple):
                    for task in teardown_key:
                        task << leaves
                else:
                    teardown_key << leaves

        popped_setup = cls.pop_context_managed_setup_task()
        popped_teardown = cls.pop_context_managed_teardown_task()
        cls.active = False
        cls.context_map.pop(cls._as_key(popped_setup), None)
        cls.context_map.pop(cls._as_key(popped_teardown), None)