Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/models/taskreschedule.py: 58%
60 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""TaskReschedule tracks rescheduled task instances."""
19from __future__ import annotations
21import datetime
22from typing import TYPE_CHECKING
24from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, String, asc, desc, event, text
25from sqlalchemy.ext.associationproxy import association_proxy
26from sqlalchemy.orm import relationship
28from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
29from airflow.utils.session import provide_session
30from airflow.utils.sqlalchemy import UtcDateTime
32if TYPE_CHECKING:
33 from airflow.models.baseoperator import BaseOperator
class TaskReschedule(Base):
    """ORM model recording each reschedule of a task instance.

    Rows are stored in the ``task_reschedule`` table and are tied to both the
    owning ``task_instance`` row and the owning ``dag_run`` row through
    foreign keys declared with ``ON DELETE CASCADE``.
    """

    __tablename__ = "task_reschedule"

    # Surrogate primary key; the query helpers below order their results by it.
    id = Column(Integer, primary_key=True)
    # Identity of the task instance this reschedule belongs to.
    task_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
    run_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
    # The database fills in -1 when no map index is supplied.
    map_index = Column(Integer, nullable=False, server_default=text("-1"))
    try_number = Column(Integer, nullable=False)
    start_date = Column(UtcDateTime, nullable=False)
    end_date = Column(UtcDateTime, nullable=False)
    # Seconds between start_date and end_date, computed in __init__.
    duration = Column(Integer, nullable=False)
    reschedule_date = Column(UtcDateTime, nullable=False)

    __table_args__ = (
        Index("idx_task_reschedule_dag_task_run", dag_id, task_id, run_id, map_index, unique=False),
        ForeignKeyConstraint(
            [dag_id, task_id, run_id, map_index],
            [
                "task_instance.dag_id",
                "task_instance.task_id",
                "task_instance.run_id",
                "task_instance.map_index",
            ],
            name="task_reschedule_ti_fkey",
            ondelete="CASCADE",
        ),
        Index("idx_task_reschedule_dag_run", dag_id, run_id),
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_reschedule_dr_fkey",
            ondelete="CASCADE",
        ),
    )
    dag_run = relationship("DagRun")
    # Exposes the related DagRun's execution_date directly on this model.
    execution_date = association_proxy("dag_run", "execution_date")

    def __init__(
        self,
        task: BaseOperator,
        run_id: str,
        try_number: int,
        start_date: datetime.datetime,
        end_date: datetime.datetime,
        reschedule_date: datetime.datetime,
        map_index: int = -1,
    ):
        """
        Build a reschedule record for one attempt of ``task``.

        :param task: operator whose ``dag_id`` and ``task_id`` identify the task
        :param run_id: id of the DAG run the task instance belongs to
        :param try_number: attempt number this reschedule is recorded for
        :param start_date: start of the attempt that ended in a reschedule
        :param end_date: end of the attempt that ended in a reschedule
        :param reschedule_date: stored as-is in the ``reschedule_date`` column
        :param map_index: map index of the task instance, ``-1`` by default
        """
        # Identity of the task instance.
        self.dag_id = task.dag_id
        self.task_id = task.task_id
        self.run_id = run_id
        self.map_index = map_index
        self.try_number = try_number

        # Timing of the attempt.
        self.start_date = start_date
        self.end_date = end_date
        self.reschedule_date = reschedule_date
        elapsed = self.end_date - self.start_date
        self.duration = elapsed.total_seconds()

    @staticmethod
    @provide_session
    def query_for_task_instance(task_instance, descending=False, session=None, try_number=None):
        """
        Build a query selecting the reschedules of the given task instance.

        :param task_instance: the task instance to find task reschedules for
        :param descending: order by ``id`` descending when True, ascending otherwise
        :param session: the database session object
        :param try_number: restrict to reschedules of this try number. When None,
            the ``try_number`` of ``task_instance`` is used instead.
        """
        wanted_try = task_instance.try_number if try_number is None else try_number
        direction = desc if descending else asc

        matching = session.query(TaskReschedule).filter(
            TaskReschedule.dag_id == task_instance.dag_id,
            TaskReschedule.task_id == task_instance.task_id,
            TaskReschedule.run_id == task_instance.run_id,
            TaskReschedule.map_index == task_instance.map_index,
            TaskReschedule.try_number == wanted_try,
        )
        return matching.order_by(direction(TaskReschedule.id))

    @staticmethod
    @provide_session
    def find_for_task_instance(task_instance, session=None, try_number=None):
        """
        Fetch every reschedule of the task instance and try number, ordered by
        ascending ``id``.

        :param task_instance: the task instance to find task reschedules for
        :param session: the database session object
        :param try_number: restrict to reschedules of this try number. When None,
            the ``try_number`` of ``task_instance`` is used instead.
        """
        reschedule_query = TaskReschedule.query_for_task_instance(
            task_instance, session=session, try_number=try_number
        )
        return reschedule_query.all()
@event.listens_for(TaskReschedule.__table__, "before_create")
def add_ondelete_for_mssql(table, conn, **kw):
    """Switch the dag_run foreign key to ``ON DELETE NO ACTION`` on MSSQL.

    Registered as a ``before_create`` listener on the ``task_reschedule``
    table. On any dialect other than ``mssql`` it does nothing.

    :param table: the table about to be created
    :param conn: connection used for the DDL; only its dialect name is read
    :param kw: extra event keyword arguments, unused
    """
    # NOTE(review): presumably needed because SQL Server rejects a table with
    # several cascading paths to the same parent -- confirm.
    if conn.dialect.name == "mssql":
        dr_fkey = next(
            (candidate for candidate in table.constraints if candidate.name == "task_reschedule_dr_fkey"),
            None,
        )
        # Only the first constraint carrying that name is changed.
        if dr_fkey is not None:
            dr_fkey.ondelete = "NO ACTION"