1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Taskfail tracks the failed run durations of each task instance."""
19
20from __future__ import annotations
21
22from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, text
23from sqlalchemy.orm import relationship
24
25from airflow.models.base import StringID, TaskInstanceDependencies
26from airflow.utils.sqlalchemy import UtcDateTime
27
28
class TaskFail(TaskInstanceDependencies):
    """
    Row in ``task_fail`` recording one failed run of a task instance.

    Each row stores the identifying key of the task instance (``dag_id``,
    ``task_id``, ``run_id``, ``map_index``) together with the start date,
    end date and the duration of the failed run in whole seconds.
    """

    __tablename__ = "task_fail"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True)

    # Key of the task instance this failure belongs to; see the composite
    # foreign key declared in ``__table_args__``.
    task_id = Column(StringID(), nullable=False)
    dag_id = Column(StringID(), nullable=False)
    run_id = Column(StringID(), nullable=False)
    # The database fills in -1 when no map index is supplied.
    map_index = Column(Integer, nullable=False, server_default=text("-1"))

    # Timing of the failed run; ``duration`` is derived in ``__init__``.
    start_date = Column(UtcDateTime)
    end_date = Column(UtcDateTime)
    duration = Column(Integer)

    __table_args__ = (
        Index("idx_task_fail_task_instance", dag_id, task_id, run_id, map_index),
        # Deleting the referenced task_instance row cascades to its task_fail rows.
        ForeignKeyConstraint(
            [dag_id, task_id, run_id, map_index],
            [
                "task_instance.dag_id",
                "task_instance.task_id",
                "task_instance.run_id",
                "task_instance.map_index",
            ],
            name="task_fail_ti_fkey",
            ondelete="CASCADE",
        ),
    )

    # No database-level foreign key to DagRun is declared: the constraint to
    # TaskInstance already exists (and TaskInstance has its own to DagRun).
    # Declaring the relationship anyway makes it easier to look up the
    # execution date for these rows. It is view-only, i.e. used for loading.
    dag_run = relationship(
        "DagRun",
        primaryjoin="""and_(
            TaskFail.dag_id == foreign(DagRun.dag_id),
            TaskFail.run_id == foreign(DagRun.run_id),
        )""",
        viewonly=True,
    )

    def __init__(self, ti):
        """
        Copy the identifying key and run dates from *ti*.

        :param ti: object exposing ``dag_id``, ``task_id``, ``run_id``,
            ``map_index``, ``start_date`` and ``end_date`` attributes.
        """
        self.dag_id = ti.dag_id
        self.task_id = ti.task_id
        self.run_id = ti.run_id
        self.map_index = ti.map_index

        began = ti.start_date
        self.start_date = began
        finished = ti.end_date
        self.end_date = finished

        # The duration is only defined when both endpoints are known; it is
        # truncated to whole seconds.
        self.duration = int((finished - began).total_seconds()) if finished and began else None

    def __repr__(self):
        """Return ``<TaskFail: dag.task run>``, adding ``map_index`` unless it is -1."""
        pieces = [f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.run_id}"]
        if self.map_index != -1:
            pieces.append(f" map_index={self.map_index}")
        pieces.append(">")
        return "".join(pieces)