Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/models/taskreschedule.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

63 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""TaskReschedule tracks rescheduled task instances.""" 

19 

20from __future__ import annotations 

21 

22import warnings 

23from typing import TYPE_CHECKING 

24 

25from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, String, asc, desc, select, text 

26from sqlalchemy.ext.associationproxy import association_proxy 

27from sqlalchemy.orm import relationship 

28 

29from airflow.exceptions import RemovedInAirflow3Warning 

30from airflow.models.base import COLLATION_ARGS, ID_LEN, TaskInstanceDependencies 

31from airflow.utils.session import NEW_SESSION, provide_session 

32from airflow.utils.sqlalchemy import UtcDateTime 

33 

34if TYPE_CHECKING: 

35 import datetime 

36 

37 from sqlalchemy.orm import Query, Session 

38 from sqlalchemy.sql import Select 

39 

40 from airflow.models.taskinstance import TaskInstance 

41 from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic 

42 

43 

class TaskReschedule(TaskInstanceDependencies):
    """
    ORM model for the ``task_reschedule`` table.

    Each row records one reschedule of a task instance, identified by
    (``dag_id``, ``task_id``, ``run_id``, ``map_index``) and a ``try_number``.
    Rows are removed together with their task instance or DAG run through the
    ``ON DELETE CASCADE`` foreign keys declared in ``__table_args__``.
    """

    __tablename__ = "task_reschedule"

    # Surrogate primary key; the query helpers below order results by it.
    id = Column(Integer, primary_key=True)
    # Columns identifying the task instance that was rescheduled.
    task_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
    run_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
    map_index = Column(Integer, nullable=False, server_default=text("-1"))
    try_number = Column(Integer, nullable=False)
    # Values recorded for this reschedule; ``duration`` is derived in ``__init__``.
    start_date = Column(UtcDateTime, nullable=False)
    end_date = Column(UtcDateTime, nullable=False)
    duration = Column(Integer, nullable=False)
    reschedule_date = Column(UtcDateTime, nullable=False)

    __table_args__ = (
        Index("idx_task_reschedule_dag_task_run", dag_id, task_id, run_id, map_index, unique=False),
        ForeignKeyConstraint(
            [dag_id, task_id, run_id, map_index],
            [
                "task_instance.dag_id",
                "task_instance.task_id",
                "task_instance.run_id",
                "task_instance.map_index",
            ],
            name="task_reschedule_ti_fkey",
            ondelete="CASCADE",
        ),
        Index("idx_task_reschedule_dag_run", dag_id, run_id),
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_reschedule_dr_fkey",
            ondelete="CASCADE",
        ),
    )
    dag_run = relationship("DagRun")
    # ``execution_date`` is not stored on this table; it is proxied from the related DagRun.
    execution_date = association_proxy("dag_run", "execution_date")

    def __init__(
        self,
        task_id: str,
        dag_id: str,
        run_id: str,
        try_number: int,
        start_date: datetime.datetime,
        end_date: datetime.datetime,
        reschedule_date: datetime.datetime,
        map_index: int = -1,
    ) -> None:
        """
        Create a reschedule record and derive its duration.

        :param task_id: id of the rescheduled task
        :param dag_id: id of the DAG the task belongs to
        :param run_id: id of the DAG run the task instance belongs to
        :param try_number: try number this reschedule is recorded against
        :param start_date: start timestamp stored on the record
        :param end_date: end timestamp stored on the record
        :param reschedule_date: reschedule timestamp stored on the record
        :param map_index: map index of the task instance; defaults to ``-1``
        """
        # Identity of the task instance.
        self.task_id = task_id
        self.dag_id = dag_id
        self.run_id = run_id
        self.map_index = map_index
        self.try_number = try_number

        # Timestamps recorded for this reschedule.
        self.start_date = start_date
        self.end_date = end_date
        self.reschedule_date = reschedule_date

        # NOTE(review): ``total_seconds()`` returns a float while the column is
        # declared ``Integer`` -- confirm the database coercion is the intended one.
        elapsed = end_date - start_date
        self.duration = elapsed.total_seconds()

    @classmethod
    def stmt_for_task_instance(
        cls,
        ti: TaskInstance | TaskInstancePydantic,
        *,
        try_number: int | None = None,
        descending: bool = False,
    ) -> Select:
        """
        Build the SELECT statement for the task reschedules of a given task instance.

        :param ti: the task instance to find task reschedules for
        :param descending: if True, order records by descending ``id``; otherwise ascending
        :param try_number: try number to look up. The default, None, uses the
            ``try_number`` of the given task instance.
        :meta private:
        """
        wanted_try = ti.try_number if try_number is None else try_number
        direction = desc if descending else asc

        matches_ti = (
            cls.dag_id == ti.dag_id,
            cls.task_id == ti.task_id,
            cls.run_id == ti.run_id,
            cls.map_index == ti.map_index,
            cls.try_number == wanted_try,
        )
        return select(cls).where(*matches_ti).order_by(direction(cls.id))

    @staticmethod
    @provide_session
    def query_for_task_instance(
        task_instance: TaskInstance,
        descending: bool = False,
        session: Session = NEW_SESSION,
        try_number: int | None = None,
    ) -> Query:
        """
        Return a query for the task reschedules of a given task instance (deprecated).

        Emits a ``RemovedInAirflow3Warning`` on every call.

        :param session: the database session object
        :param task_instance: the task instance to find task reschedules for
        :param descending: if True, order records by descending ``id``; otherwise ascending
        :param try_number: try number to look up. The default, None, uses the
            ``try_number`` of the given task instance.
        """
        warnings.warn(
            "Using this method is no longer advised, and it is expected to be removed in the future.",
            category=RemovedInAirflow3Warning,
            stacklevel=2,
        )

        wanted_try = task_instance.try_number if try_number is None else try_number

        model = TaskReschedule
        matching = session.query(model).filter(
            model.dag_id == task_instance.dag_id,
            model.task_id == task_instance.task_id,
            model.run_id == task_instance.run_id,
            model.map_index == task_instance.map_index,
            model.try_number == wanted_try,
        )
        ordering = desc(model.id) if descending else asc(model.id)
        return matching.order_by(ordering)

    @staticmethod
    @provide_session
    def find_for_task_instance(
        task_instance: TaskInstance,
        session: Session = NEW_SESSION,
        try_number: int | None = None,
    ) -> list[TaskReschedule]:
        """
        Return all task reschedules for the task instance and try number, in ascending order.

        Emits a ``RemovedInAirflow3Warning`` on every call.

        :param session: the database session object
        :param task_instance: the task instance to find task reschedules for
        :param try_number: try number to look up. The default, None, uses the
            ``try_number`` of the given task instance.
        """
        warnings.warn(
            "Using this method is no longer advised, and it is expected to be removed in the future.",
            category=RemovedInAirflow3Warning,
            stacklevel=2,
        )
        ascending_stmt = TaskReschedule.stmt_for_task_instance(
            ti=task_instance,
            try_number=try_number,
            descending=False,
        )
        return session.scalars(ascending_stmt).all()