Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/models/taskreschedule.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""TaskReschedule tracks rescheduled task instances."""
20from __future__ import annotations
22import warnings
23from typing import TYPE_CHECKING
25from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, String, asc, desc, select, text
26from sqlalchemy.ext.associationproxy import association_proxy
27from sqlalchemy.orm import relationship
29from airflow.exceptions import RemovedInAirflow3Warning
30from airflow.models.base import COLLATION_ARGS, ID_LEN, TaskInstanceDependencies
31from airflow.utils.session import NEW_SESSION, provide_session
32from airflow.utils.sqlalchemy import UtcDateTime
34if TYPE_CHECKING:
35 import datetime
37 from sqlalchemy.orm import Query, Session
38 from sqlalchemy.sql import Select
40 from airflow.models.taskinstance import TaskInstance
41 from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
44class TaskReschedule(TaskInstanceDependencies):
45 """TaskReschedule tracks rescheduled task instances."""
47 __tablename__ = "task_reschedule"
49 id = Column(Integer, primary_key=True)
50 task_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
51 dag_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
52 run_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
53 map_index = Column(Integer, nullable=False, server_default=text("-1"))
54 try_number = Column(Integer, nullable=False)
55 start_date = Column(UtcDateTime, nullable=False)
56 end_date = Column(UtcDateTime, nullable=False)
57 duration = Column(Integer, nullable=False)
58 reschedule_date = Column(UtcDateTime, nullable=False)
60 __table_args__ = (
61 Index("idx_task_reschedule_dag_task_run", dag_id, task_id, run_id, map_index, unique=False),
62 ForeignKeyConstraint(
63 [dag_id, task_id, run_id, map_index],
64 [
65 "task_instance.dag_id",
66 "task_instance.task_id",
67 "task_instance.run_id",
68 "task_instance.map_index",
69 ],
70 name="task_reschedule_ti_fkey",
71 ondelete="CASCADE",
72 ),
73 Index("idx_task_reschedule_dag_run", dag_id, run_id),
74 ForeignKeyConstraint(
75 [dag_id, run_id],
76 ["dag_run.dag_id", "dag_run.run_id"],
77 name="task_reschedule_dr_fkey",
78 ondelete="CASCADE",
79 ),
80 )
81 dag_run = relationship("DagRun")
82 execution_date = association_proxy("dag_run", "execution_date")
84 def __init__(
85 self,
86 task_id: str,
87 dag_id: str,
88 run_id: str,
89 try_number: int,
90 start_date: datetime.datetime,
91 end_date: datetime.datetime,
92 reschedule_date: datetime.datetime,
93 map_index: int = -1,
94 ) -> None:
95 self.dag_id = dag_id
96 self.task_id = task_id
97 self.run_id = run_id
98 self.map_index = map_index
99 self.try_number = try_number
100 self.start_date = start_date
101 self.end_date = end_date
102 self.reschedule_date = reschedule_date
103 self.duration = (self.end_date - self.start_date).total_seconds()
105 @classmethod
106 def stmt_for_task_instance(
107 cls,
108 ti: TaskInstance | TaskInstancePydantic,
109 *,
110 try_number: int | None = None,
111 descending: bool = False,
112 ) -> Select:
113 """
114 Statement for task reschedules for a given the task instance.
116 :param ti: the task instance to find task reschedules for
117 :param descending: If True then records are returned in descending order
118 :param try_number: Look for TaskReschedule of the given try_number. Default is None which
119 looks for the same try_number of the given task_instance.
120 :meta private:
121 """
122 if try_number is None:
123 try_number = ti.try_number
125 return (
126 select(cls)
127 .where(
128 cls.dag_id == ti.dag_id,
129 cls.task_id == ti.task_id,
130 cls.run_id == ti.run_id,
131 cls.map_index == ti.map_index,
132 cls.try_number == try_number,
133 )
134 .order_by(desc(cls.id) if descending else asc(cls.id))
135 )
137 @staticmethod
138 @provide_session
139 def query_for_task_instance(
140 task_instance: TaskInstance,
141 descending: bool = False,
142 session: Session = NEW_SESSION,
143 try_number: int | None = None,
144 ) -> Query:
145 """
146 Return query for task reschedules for a given the task instance (deprecated).
148 :param session: the database session object
149 :param task_instance: the task instance to find task reschedules for
150 :param descending: If True then records are returned in descending order
151 :param try_number: Look for TaskReschedule of the given try_number. Default is None which
152 looks for the same try_number of the given task_instance.
153 """
154 warnings.warn(
155 "Using this method is no longer advised, and it is expected to be removed in the future.",
156 category=RemovedInAirflow3Warning,
157 stacklevel=2,
158 )
160 if try_number is None:
161 try_number = task_instance.try_number
163 TR = TaskReschedule
164 qry = session.query(TR).filter(
165 TR.dag_id == task_instance.dag_id,
166 TR.task_id == task_instance.task_id,
167 TR.run_id == task_instance.run_id,
168 TR.map_index == task_instance.map_index,
169 TR.try_number == try_number,
170 )
171 if descending:
172 return qry.order_by(desc(TR.id))
173 else:
174 return qry.order_by(asc(TR.id))
176 @staticmethod
177 @provide_session
178 def find_for_task_instance(
179 task_instance: TaskInstance,
180 session: Session = NEW_SESSION,
181 try_number: int | None = None,
182 ) -> list[TaskReschedule]:
183 """
184 Return all task reschedules for the task instance and try number, in ascending order.
186 :param session: the database session object
187 :param task_instance: the task instance to find task reschedules for
188 :param try_number: Look for TaskReschedule of the given try_number. Default is None which
189 looks for the same try_number of the given task_instance.
190 """
191 warnings.warn(
192 "Using this method is no longer advised, and it is expected to be removed in the future.",
193 category=RemovedInAirflow3Warning,
194 stacklevel=2,
195 )
196 return session.scalars(
197 TaskReschedule.stmt_for_task_instance(ti=task_instance, try_number=try_number, descending=False)
198 ).all()