Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/models/taskreschedule.py: 57%
61 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""TaskReschedule tracks rescheduled task instances."""
19from __future__ import annotations
21import datetime
22from typing import TYPE_CHECKING
24from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, String, asc, desc, event, text
25from sqlalchemy.ext.associationproxy import association_proxy
26from sqlalchemy.orm import Query, Session, relationship
28from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
29from airflow.utils.session import NEW_SESSION, provide_session
30from airflow.utils.sqlalchemy import UtcDateTime
32if TYPE_CHECKING:
33 from airflow.models.operator import Operator
34 from airflow.models.taskinstance import TaskInstance
class TaskReschedule(Base):
    """ORM model for the ``task_reschedule`` table: one row per reschedule of a task instance."""

    __tablename__ = "task_reschedule"

    # Surrogate key; the query helpers below order their results by it.
    id = Column(Integer, primary_key=True)
    # (dag_id, task_id, run_id, map_index) reference the owning task_instance row.
    task_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
    run_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
    # Database-side default of -1 mirrors the ``map_index`` default in ``__init__``.
    map_index = Column(Integer, nullable=False, server_default=text("-1"))
    try_number = Column(Integer, nullable=False)
    start_date = Column(UtcDateTime, nullable=False)
    end_date = Column(UtcDateTime, nullable=False)
    # Filled in by ``__init__`` from ``end_date - start_date``.
    duration = Column(Integer, nullable=False)
    reschedule_date = Column(UtcDateTime, nullable=False)

    __table_args__ = (
        Index("idx_task_reschedule_dag_task_run", dag_id, task_id, run_id, map_index, unique=False),
        # Rows are removed together with their task instance.
        ForeignKeyConstraint(
            [dag_id, task_id, run_id, map_index],
            [
                "task_instance.dag_id",
                "task_instance.task_id",
                "task_instance.run_id",
                "task_instance.map_index",
            ],
            name="task_reschedule_ti_fkey",
            ondelete="CASCADE",
        ),
        Index("idx_task_reschedule_dag_run", dag_id, run_id),
        # Rows are also removed together with their DAG run.
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_reschedule_dr_fkey",
            ondelete="CASCADE",
        ),
    )
    dag_run = relationship("DagRun")
    # Exposes the related DagRun's ``execution_date`` directly on this model.
    execution_date = association_proxy("dag_run", "execution_date")

    def __init__(
        self,
        task: Operator,
        run_id: str,
        try_number: int,
        start_date: datetime.datetime,
        end_date: datetime.datetime,
        reschedule_date: datetime.datetime,
        map_index: int = -1,
    ) -> None:
        """
        Build a reschedule record for one try of a task.

        :param task: operator whose ``dag_id`` and ``task_id`` identify the task
        :param run_id: id of the DAG run the task instance belongs to
        :param try_number: try number this reschedule is recorded against
        :param start_date: start of the attempt that ended in a reschedule
        :param end_date: end of the attempt that ended in a reschedule
        :param reschedule_date: value stored in the ``reschedule_date`` column
        :param map_index: map index of the task instance, -1 by default
        """
        # Identity of the task instance this row belongs to.
        self.dag_id = task.dag_id
        self.task_id = task.task_id
        self.run_id = run_id
        self.map_index = map_index
        self.try_number = try_number

        # Timing of the attempt.
        self.start_date = start_date
        self.end_date = end_date
        self.reschedule_date = reschedule_date

        # NOTE(review): total_seconds() yields a float while the column is declared
        # Integer -- confirm how each database backend coerces the value.
        elapsed = end_date - start_date
        self.duration = elapsed.total_seconds()

    @staticmethod
    @provide_session
    def query_for_task_instance(
        task_instance: TaskInstance,
        descending: bool = False,
        session: Session = NEW_SESSION,
        try_number: int | None = None,
    ) -> Query:
        """
        Build the query selecting the task reschedules of a given task instance.

        :param session: the database session object
        :param task_instance: the task instance to find task reschedules for
        :param descending: If True, order by id descending; otherwise ascending
        :param try_number: Look for TaskReschedule of the given try_number. Default is None which
            looks for the same try_number of the given task_instance.
        """
        wanted_try = task_instance.try_number if try_number is None else try_number
        direction = desc if descending else asc

        return (
            session.query(TaskReschedule)
            .filter(
                TaskReschedule.dag_id == task_instance.dag_id,
                TaskReschedule.task_id == task_instance.task_id,
                TaskReschedule.run_id == task_instance.run_id,
                TaskReschedule.map_index == task_instance.map_index,
                TaskReschedule.try_number == wanted_try,
            )
            .order_by(direction(TaskReschedule.id))
        )

    @staticmethod
    @provide_session
    def find_for_task_instance(
        task_instance: TaskInstance,
        session: Session = NEW_SESSION,
        try_number: int | None = None,
    ) -> list[TaskReschedule]:
        """
        Fetch every task reschedule for the task instance and try number, in ascending id order.

        :param session: the database session object
        :param task_instance: the task instance to find task reschedules for
        :param try_number: Look for TaskReschedule of the given try_number. Default is None which
            looks for the same try_number of the given task_instance.
        """
        reschedules_query = TaskReschedule.query_for_task_instance(
            task_instance, session=session, try_number=try_number
        )
        return reschedules_query.all()
@event.listens_for(TaskReschedule.__table__, "before_create")
def add_ondelete_for_mssql(table, conn, **kw):
    """
    Switch the DAG-run foreign key to ``NO ACTION`` before the table is created on MSSQL.

    Runs as a ``before_create`` listener on the ``task_reschedule`` table. For any
    dialect other than ``mssql`` it does nothing.

    :param table: the table about to be created
    :param conn: connection used for the DDL; its dialect name selects the behaviour
    :param kw: extra keyword arguments passed by the event system, unused here
    """
    if conn.dialect.name == "mssql":
        # NOTE(review): presumably needed because SQL Server rejects two cascading
        # delete paths into the same table -- confirm.
        dag_run_fk = next(
            (candidate for candidate in table.constraints if candidate.name == "task_reschedule_dr_fkey"),
            None,
        )
        if dag_run_fk is not None:
            dag_run_fk.ondelete = "NO ACTION"