Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/ti_deps/deps/valid_state_dep.py: 64%
25 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from airflow.exceptions import AirflowException
21from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
22from airflow.utils.session import provide_session
class ValidStateDep(BaseTIDep):
    """
    Ensures that the task instance's state is in a given set of valid states.

    :param valid_states: A non-empty collection of valid states that a task instance
        can have to meet this dependency.
    :raises AirflowException: if ``valid_states`` is empty.
    """

    NAME = "Task Instance State"
    IGNORABLE = True

    def __init__(self, valid_states):
        super().__init__()

        # An empty collection could never be satisfied, so reject it up front.
        if not valid_states:
            raise AirflowException("ValidStatesDep received an empty set of valid states.")
        self._valid_states = valid_states

    def __eq__(self, other):
        """
        Compare two deps by type and by their collections of valid states.

        Objects that are not ``ValidStateDep`` instances carry no ``_valid_states``
        attribute, so the comparison is delegated back to Python
        (``NotImplemented``) instead of raising ``AttributeError``.
        """
        if not isinstance(other, ValidStateDep):
            return NotImplemented
        return isinstance(self, type(other)) and self._valid_states == other._valid_states

    def __hash__(self):
        """
        Hash on the dep type and the valid states, independent of their order.

        A ``frozenset`` is used rather than a ``tuple`` because two equal sets may
        iterate in different orders; an order-sensitive hash would then give
        different values for instances that compare equal.
        """
        return hash((type(self), frozenset(self._valid_states)))

    @provide_session
    def _get_dep_statuses(self, ti, session, dep_context):
        """
        Yield exactly one status describing whether ``ti.state`` is acceptable.

        :param ti: the task instance whose ``state`` attribute is checked
        :param session: not used directly by this method; presumably supplied by
            ``provide_session`` -- confirm against that decorator
        :param dep_context: context object; when its ``ignore_ti_state`` attribute is
            truthy the state check is skipped and the dep passes
        """
        if dep_context.ignore_ti_state:
            yield self._passing_status(reason="Context specified that state should be ignored.")
            return

        if ti.state in self._valid_states:
            yield self._passing_status(reason=f"Task state {ti.state} was valid.")
            return

        yield self._failing_status(reason=f"Task is in the '{ti.state}' state.")