Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/ti_deps/deps/valid_state_dep.py: 64%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
18from __future__ import annotations
20from airflow.exceptions import AirflowException
21from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
22from airflow.utils.session import provide_session
class ValidStateDep(BaseTIDep):
    """
    Ensures that the task instance's state is in a given set of valid states.

    :param valid_states: A non-empty collection of states that a task instance can have
        to meet this dependency.
    :raises AirflowException: if ``valid_states`` is empty.
    """

    NAME = "Task Instance State"
    IGNORABLE = True

    def __init__(self, valid_states):
        super().__init__()

        # An empty collection could never be satisfied, so reject it up front.
        if not valid_states:
            raise AirflowException("ValidStatesDep received an empty set of valid states.")
        self._valid_states = valid_states

    def __eq__(self, other):
        """Check if two task instance dependencies are equal by comparing their types and valid states."""
        # Anything that is not a ValidStateDep has no ``_valid_states`` attribute;
        # return NotImplemented so Python falls back to its default comparison
        # instead of raising AttributeError on the attribute access below.
        if not isinstance(other, ValidStateDep):
            return NotImplemented
        return isinstance(self, type(other)) and self._valid_states == other._valid_states

    def __hash__(self):
        """Compute the hash value based on the type of the task instance dependency and its valid states."""
        # A frozenset hashes independently of iteration order, so two deps whose
        # valid states compare equal always produce the same hash (a tuple built
        # from a set depends on that set's internal ordering).
        return hash((type(self), frozenset(self._valid_states)))

    @provide_session
    def _get_dep_statuses(self, ti, session, dep_context):
        """
        Yield a single status describing whether ``ti.state`` is acceptable.

        :param ti: the task instance whose ``state`` attribute is checked
        :param session: not used directly in this method; presumably supplied by
            the ``provide_session`` decorator -- confirm against that decorator
        :param dep_context: context object; its ``ignore_ti_state`` flag bypasses the check
        """
        if dep_context.ignore_ti_state:
            yield self._passing_status(reason="Context specified that state should be ignored.")
            return

        if ti.state in self._valid_states:
            yield self._passing_status(reason=f"Task state {ti.state} was valid.")
            return

        yield self._failing_status(reason=f"Task is in the '{ti.state}' state.")