Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/ti_deps/dep_context.py: 64%
36 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from typing import TYPE_CHECKING
22import attr
23from sqlalchemy.orm.session import Session
25from airflow.exceptions import TaskNotFound
26from airflow.utils.state import State
28if TYPE_CHECKING:
29 from airflow.models.dagrun import DagRun
30 from airflow.models.taskinstance import TaskInstance
33@attr.define
34class DepContext:
35 """
36 A base class for dependency contexts.
38 Specifies which dependencies should be evaluated in the context for a task
39 instance to satisfy the requirements of the context. Also stores state
40 related to the context that can be used by dependency classes.
42 For example there could be a SomeRunContext that subclasses this class which has
43 dependencies for:
45 - Making sure there are slots available on the infrastructure to run the task instance
46 - A task-instance's task-specific dependencies are met (e.g. the previous task
47 instance completed successfully)
48 - ...
50 :param deps: The context-specific dependencies that need to be evaluated for a
51 task instance to run in this execution context.
52 :param flag_upstream_failed: This is a hack to generate the upstream_failed state
53 creation while checking to see whether the task instance is runnable. It was the
54 shortest path to add the feature. This is bad since this class should be pure (no
55 side effects).
56 :param ignore_all_deps: Whether or not the context should ignore all ignorable
57 dependencies. Overrides the other ignore_* parameters
58 :param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs (e.g. for
59 Backfills)
60 :param wait_for_past_depends_before_skipping: Wait for past depends before marking the ti as skipped
61 :param ignore_in_retry_period: Ignore the retry period for task instances
62 :param ignore_in_reschedule_period: Ignore the reschedule period for task instances
63 :param ignore_unmapped_tasks: Ignore errors about mapped tasks not yet being expanded
64 :param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past and
65 trigger rule
66 :param ignore_ti_state: Ignore the task instance's previous failure/success
67 :param finished_tis: A list of all the finished task instances of this run
68 """
70 deps: set = attr.ib(factory=set)
71 flag_upstream_failed: bool = False
72 ignore_all_deps: bool = False
73 ignore_depends_on_past: bool = False
74 wait_for_past_depends_before_skipping: bool = False
75 ignore_in_retry_period: bool = False
76 ignore_in_reschedule_period: bool = False
77 ignore_task_deps: bool = False
78 ignore_ti_state: bool = False
79 ignore_unmapped_tasks: bool = False
80 finished_tis: list[TaskInstance] | None = None
81 description: str | None = None
83 have_changed_ti_states: bool = False
84 """Have any of the TIs state's been changed as a result of evaluating dependencies"""
86 def ensure_finished_tis(self, dag_run: DagRun, session: Session) -> list[TaskInstance]:
87 """
88 This method makes sure finished_tis is populated if it's currently None.
89 This is for the strange feature of running tasks without dag_run.
91 :param dag_run: The DagRun for which to find finished tasks
92 :return: A list of all the finished tasks of this DAG and execution_date
93 """
94 if self.finished_tis is None:
95 finished_tis = dag_run.get_task_instances(state=State.finished, session=session)
96 for ti in finished_tis:
97 if not hasattr(ti, "task") and dag_run.dag:
98 try:
99 ti.task = dag_run.dag.get_task(ti.task_id)
100 except TaskNotFound:
101 pass
103 self.finished_tis = finished_tis
104 else:
105 finished_tis = self.finished_tis
106 return finished_tis