Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/ti_deps/dep_context.py: 61%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

36 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from typing import TYPE_CHECKING 

21 

22import attr 

23 

24from airflow.exceptions import TaskNotFound 

25from airflow.utils.state import State 

26 

27if TYPE_CHECKING: 

28 from sqlalchemy.orm.session import Session 

29 

30 from airflow.models.dagrun import DagRun 

31 from airflow.models.taskinstance import TaskInstance 

32 

33 

@attr.define
class DepContext:
    """
    A base class for dependency contexts.

    Specifies which dependencies should be evaluated in the context for a task
    instance to satisfy the requirements of the context. Also stores state
    related to the context that can be used by dependency classes.

    For example there could be a SomeRunContext that subclasses this class which has
    dependencies for:

    - Making sure there are slots available on the infrastructure to run the task instance
    - A task-instance's task-specific dependencies are met (e.g. the previous task
      instance completed successfully)
    - ...

    :param deps: The context-specific dependencies that need to be evaluated for a
        task instance to run in this execution context.
    :param flag_upstream_failed: This is a hack to generate the upstream_failed state
        creation while checking to see whether the task instance is runnable. It was the
        shortest path to add the feature. This is bad since this class should be pure (no
        side effects).
    :param ignore_all_deps: Whether or not the context should ignore all ignorable
        dependencies. Overrides the other ignore_* parameters
    :param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs (e.g. for
        Backfills)
    :param wait_for_past_depends_before_skipping: Wait for past depends before marking the ti as skipped
    :param ignore_in_retry_period: Ignore the retry period for task instances
    :param ignore_in_reschedule_period: Ignore the reschedule period for task instances
    :param ignore_unmapped_tasks: Ignore errors about mapped tasks not yet being expanded
    :param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past and
        trigger rule
    :param ignore_ti_state: Ignore the task instance's previous failure/success
    :param finished_tis: A list of all the finished task instances of this run. When left
        as None it is filled in lazily by :meth:`ensure_finished_tis`.
    :param description: Optional text label for this context; it is not read by any code
        in this class.
    """

    # NOTE: attrs derives the ``__init__`` parameter order from the order of these
    # fields, so they must not be reordered.
    deps: set = attr.ib(factory=set)
    flag_upstream_failed: bool = False
    ignore_all_deps: bool = False
    ignore_depends_on_past: bool = False
    wait_for_past_depends_before_skipping: bool = False
    ignore_in_retry_period: bool = False
    ignore_in_reschedule_period: bool = False
    ignore_task_deps: bool = False
    ignore_ti_state: bool = False
    ignore_unmapped_tasks: bool = False
    finished_tis: list[TaskInstance] | None = None
    description: str | None = None

    have_changed_ti_states: bool = False
    """Have any of the TIs' states been changed as a result of evaluating dependencies"""

    def ensure_finished_tis(self, dag_run: DagRun, session: Session) -> list[TaskInstance]:
        """
        Ensure finished_tis is populated if it's currently None, which allows running tasks without dag_run.

        The result is cached on ``self.finished_tis``, so ``dag_run`` is only queried
        on the first call; later calls return the cached list unchanged.

        :param dag_run: The DagRun for which to find finished tasks
        :param session: The session handed to ``dag_run.get_task_instances``
        :return: A list of all the finished task instances of ``dag_run``
        """
        if self.finished_tis is not None:
            return self.finished_tis

        finished_tis = dag_run.get_task_instances(state=State.finished, session=session)
        for ti in finished_tis:
            # Attach a task object only to TIs that do not carry one yet, and only
            # when the DagRun has a DAG to look the task up in.
            if getattr(ti, "task", None) is None and dag_run.dag:
                try:
                    ti.task = dag_run.dag.get_task(ti.task_id)
                except TaskNotFound:
                    # Deliberately best-effort: a TI whose task_id cannot be found
                    # in the DAG is kept in the list without a task attached.
                    pass

        self.finished_tis = finished_tis
        return finished_tis

106 return finished_tis