Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/ti_deps/deps/base_ti_dep.py: 57%
47 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from typing import TYPE_CHECKING, Any, Iterator, NamedTuple
22from airflow.ti_deps.dep_context import DepContext
23from airflow.utils.session import provide_session
25if TYPE_CHECKING:
26 from sqlalchemy.orm import Session
28 from airflow.models.taskinstance import TaskInstance
class BaseTIDep:
    """
    Abstract base class for task instance dependencies.

    All dependencies must be satisfied in order for task instances to run.
    For example, a task that can only run if a certain number of its upstream tasks succeed.
    This is an abstract class and must be subclassed to be used: subclasses provide
    the actual check by overriding ``_get_dep_statuses``.
    """

    # If this dependency can be ignored by a context in which it is added to. Needed
    # because some dependencies should never be ignorable in their contexts.
    IGNORABLE = False

    # Whether this dependency is not a global task instance dependency but specific
    # to some tasks (e.g. depends_on_past is not specified by all tasks).
    IS_TASK_DEP = False

    def __eq__(self, other: Any) -> bool:
        """Return whether ``other`` is a dependency of exactly the same class.

        The exact type is compared (rather than using ``isinstance``) so that
        equality is symmetric and agrees with ``__hash__``, which hashes
        ``type(self)``: two objects that compare equal always hash equally.
        """
        return type(self) is type(other)

    def __hash__(self) -> int:
        """Hash on the concrete class, so every instance of one class hashes alike."""
        return hash(type(self))

    def __repr__(self) -> str:
        """Return a short debugging representation containing the dependency name."""
        return f"<TIDep({self.name})>"

    @property
    def name(self) -> str:
        """The human-readable name for the dependency.

        Use the class name as the default if ``NAME`` is not provided.
        """
        return getattr(self, "NAME", self.__class__.__name__)

    def _get_dep_statuses(
        self,
        ti: TaskInstance,
        session: Session,
        dep_context: DepContext,
    ) -> Iterator[TIDepStatus]:
        """
        Abstract method that returns an iterable of TIDepStatus objects.

        Each object describes whether the given task instance has this dependency met.

        For example a subclass could return an iterable of TIDepStatus objects, each one
        representing if each of the passed in task's upstream tasks succeeded or not.

        :param ti: the task instance to get the dependency status for
        :param session: database session
        :param dep_context: the context for which this dependency should be evaluated for
        :raises NotImplementedError: always, in this base class; subclasses must override
        """
        raise NotImplementedError

    @provide_session
    def get_dep_statuses(
        self,
        ti: TaskInstance,
        session: Session,
        dep_context: DepContext | None = None,
    ) -> Iterator[TIDepStatus]:
        """
        Wrapper around the private _get_dep_statuses method.

        Contains some global checks for all dependencies: when the context asks for
        this kind of dependency to be ignored, a single passing status is yielded
        and the subclass check is not run at all.

        :param ti: the task instance to get the dependency status for
        :param session: database session
        :param dep_context: the context for which this dependency should be evaluated for;
            a default ``DepContext()`` is used when ``None``
        """
        cxt = DepContext() if dep_context is None else dep_context

        # Only dependencies flagged IGNORABLE honour the "ignore all deps" switch.
        if self.IGNORABLE and cxt.ignore_all_deps:
            yield self._passing_status(reason="Context specified all dependencies should be ignored.")
            return

        # Task-specific dependencies can additionally be switched off as a group.
        if self.IS_TASK_DEP and cxt.ignore_task_deps:
            yield self._passing_status(reason="Context specified all task dependencies should be ignored.")
            return

        yield from self._get_dep_statuses(ti, session, cxt)

    @provide_session
    def is_met(self, ti: TaskInstance, session: Session, dep_context: DepContext | None = None) -> bool:
        """
        Returns whether a dependency is met for a given task instance.

        A dependency is considered met if all the dependency statuses it reports are passing.

        :param ti: the task instance to see if this dependency is met for
        :param session: database session
        :param dep_context: The context this dependency is being checked under that stores
            state that can be used by this dependency.
        """
        return all(status.passed for status in self.get_dep_statuses(ti, session, dep_context))

    @provide_session
    def get_failure_reasons(
        self,
        ti: TaskInstance,
        session: Session,
        dep_context: DepContext | None = None,
    ) -> Iterator[str]:
        """
        Returns an iterable of strings that explain why this dependency wasn't met.

        Only the reasons of statuses that did not pass are yielded.

        :param ti: the task instance to see if this dependency is met for
        :param session: database session
        :param dep_context: The context this dependency is being checked under that stores
            state that can be used by this dependency.
        """
        for dep_status in self.get_dep_statuses(ti, session, dep_context):
            if not dep_status.passed:
                yield dep_status.reason

    def _failing_status(self, reason: str = "") -> TIDepStatus:
        """Build a failing ``TIDepStatus`` for this dependency with the given reason."""
        return TIDepStatus(self.name, False, reason)

    def _passing_status(self, reason: str = "") -> TIDepStatus:
        """Build a passing ``TIDepStatus`` for this dependency with the given reason."""
        return TIDepStatus(self.name, True, reason)
class TIDepStatus(NamedTuple):
    """Result of evaluating one dependency for a task instance: its name, whether it passed, and why."""

    # Name of the dependency this status belongs to.
    dep_name: str
    # True when the dependency check passed, False otherwise.
    passed: bool
    # Free-text explanation accompanying the result.
    reason: str