Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/models/taskfail.py: 61%
33 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Taskfail tracks the failed run durations of each task instance."""
19from __future__ import annotations
21from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, text
22from sqlalchemy.orm import relationship
24from airflow.models.base import Base, StringID
25from airflow.utils.sqlalchemy import UtcDateTime
28class TaskFail(Base):
29 """TaskFail tracks the failed run durations of each task instance."""
31 __tablename__ = "task_fail"
33 id = Column(Integer, primary_key=True)
34 task_id = Column(StringID(), nullable=False)
35 dag_id = Column(StringID(), nullable=False)
36 run_id = Column(StringID(), nullable=False)
37 map_index = Column(Integer, nullable=False, server_default=text("-1"))
38 start_date = Column(UtcDateTime)
39 end_date = Column(UtcDateTime)
40 duration = Column(Integer)
42 __table_args__ = (
43 Index("idx_task_fail_task_instance", dag_id, task_id, run_id, map_index),
44 ForeignKeyConstraint(
45 [dag_id, task_id, run_id, map_index],
46 [
47 "task_instance.dag_id",
48 "task_instance.task_id",
49 "task_instance.run_id",
50 "task_instance.map_index",
51 ],
52 name="task_fail_ti_fkey",
53 ondelete="CASCADE",
54 ),
55 )
57 # We don't need a DB level FK here, as we already have that to TI (which has one to DR) but by defining
58 # the relationship we can more easily find the execution date for these rows
59 dag_run = relationship(
60 "DagRun",
61 primaryjoin="""and_(
62 TaskFail.dag_id == foreign(DagRun.dag_id),
63 TaskFail.run_id == foreign(DagRun.run_id),
64 )""",
65 viewonly=True,
66 )
68 def __init__(self, ti):
69 self.dag_id = ti.dag_id
70 self.task_id = ti.task_id
71 self.run_id = ti.run_id
72 self.map_index = ti.map_index
73 self.start_date = ti.start_date
74 self.end_date = ti.end_date
75 if self.end_date and self.start_date:
76 self.duration = int((self.end_date - self.start_date).total_seconds())
77 else:
78 self.duration = None
80 def __repr__(self):
81 prefix = f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.run_id}"
82 if self.map_index != -1:
83 prefix += f" map_index={self.map_index}"
84 return prefix + ">"