Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/ti_deps/deps/base_ti_dep.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from typing import TYPE_CHECKING, Any, Iterator, NamedTuple
22from airflow.ti_deps.dep_context import DepContext
23from airflow.utils.session import provide_session
25if TYPE_CHECKING:
26 from sqlalchemy.orm import Session
28 from airflow.models.taskinstance import TaskInstance
31class BaseTIDep:
32 """
33 Abstract base class for task instances dependencies.
35 All dependencies must be satisfied in order for task instances to run.
36 For example, a task that can only run if a certain number of its upstream tasks succeed.
37 This is an abstract class and must be subclassed to be used.
38 """
40 # If this dependency can be ignored by a context in which it is added to. Needed
41 # because some dependencies should never be ignorable in their contexts.
42 IGNORABLE = False
44 # Whether this dependency is not a global task instance dependency but specific
45 # to some tasks (e.g. depends_on_past is not specified by all tasks).
46 IS_TASK_DEP = False
48 def __eq__(self, other: Any) -> bool:
49 """Check if two task instance dependencies are equal by comparing their types."""
50 return isinstance(self, type(other))
52 def __hash__(self) -> int:
53 """Compute the hash value based on the task instance dependency type."""
54 return hash(type(self))
56 def __repr__(self) -> str:
57 """Return a string representation of the task instance dependency."""
58 return f"<TIDep({self.name})>"
60 @property
61 def name(self) -> str:
62 """The human-readable name for the dependency.
64 Use the class name as the default if ``NAME`` is not provided.
65 """
66 return getattr(self, "NAME", self.__class__.__name__)
68 def _get_dep_statuses(
69 self,
70 ti: TaskInstance,
71 session: Session,
72 dep_context: DepContext,
73 ) -> Iterator[TIDepStatus]:
74 """
75 Abstract method that returns an iterable of TIDepStatus objects.
77 Each object describes whether the given task instance has this dependency met.
79 For example a subclass could return an iterable of TIDepStatus objects, each one
80 representing if each of the passed in task's upstream tasks succeeded or not.
82 :param ti: the task instance to get the dependency status for
83 :param session: database session
84 :param dep_context: the context for which this dependency should be evaluated for
85 """
86 raise NotImplementedError
88 @provide_session
89 def get_dep_statuses(
90 self,
91 ti: TaskInstance,
92 session: Session,
93 dep_context: DepContext | None = None,
94 ) -> Iterator[TIDepStatus]:
95 """
96 Wrap around the private _get_dep_statuses method.
98 Contains some global checks for all dependencies.
100 :param ti: the task instance to get the dependency status for
101 :param session: database session
102 :param dep_context: the context for which this dependency should be evaluated for
103 """
104 cxt = DepContext() if dep_context is None else dep_context
106 if self.IGNORABLE and cxt.ignore_all_deps:
107 yield self._passing_status(reason="Context specified all dependencies should be ignored.")
108 return
110 if self.IS_TASK_DEP and cxt.ignore_task_deps:
111 yield self._passing_status(reason="Context specified all task dependencies should be ignored.")
112 return
114 yield from self._get_dep_statuses(ti, session, cxt)
116 @provide_session
117 def is_met(self, ti: TaskInstance, session: Session, dep_context: DepContext | None = None) -> bool:
118 """
119 Return whether a dependency is met for a given task instance.
121 A dependency is considered met if all the dependency statuses it reports are passing.
123 :param ti: the task instance to see if this dependency is met for
124 :param session: database session
125 :param dep_context: The context this dependency is being checked under that stores
126 state that can be used by this dependency.
127 """
128 return all(status.passed for status in self.get_dep_statuses(ti, session, dep_context))
130 @provide_session
131 def get_failure_reasons(
132 self,
133 ti: TaskInstance,
134 session: Session,
135 dep_context: DepContext | None = None,
136 ) -> Iterator[str]:
137 """
138 Return an iterable of strings that explain why this dependency wasn't met.
140 :param ti: the task instance to see if this dependency is met for
141 :param session: database session
142 :param dep_context: The context this dependency is being checked under that stores
143 state that can be used by this dependency.
144 """
145 for dep_status in self.get_dep_statuses(ti, session, dep_context):
146 if not dep_status.passed:
147 yield dep_status.reason
149 def _failing_status(self, reason: str = "") -> TIDepStatus:
150 return TIDepStatus(self.name, False, reason)
152 def _passing_status(self, reason: str = "") -> TIDepStatus:
153 return TIDepStatus(self.name, True, reason)
156class TIDepStatus(NamedTuple):
157 """Dependency status for a task instance indicating whether the task instance passed the dependency."""
159 dep_name: str
160 passed: bool
161 reason: str