Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/edgemodifier.py: 27%

56 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19from typing import TYPE_CHECKING, Sequence 

20 

21from airflow.models.taskmixin import DependencyMixin 

22 

23if TYPE_CHECKING: 

24 from airflow.models.baseoperator import BaseOperator 

25 

26 

class EdgeModifier(DependencyMixin):
    """
    Class that represents edge information to be added between two
    tasks/operators. Has shorthand factory functions, like Label("hooray").

    Current implementation supports
        t1 >> Label("Success route") >> t2
        t2 << Label("Success route") << t1

    Note that due to the potential for use in either direction, this waits
    to make the actual connection between both sides until both are declared,
    and will do so progressively if multiple ups/downs are added.

    This and EdgeInfo are related - an EdgeModifier is the Python object you
    use to add information to (potentially multiple) edges, and EdgeInfo
    is the representation of the information for one specific edge.
    """

    def __init__(self, label: str | None = None):
        self.label = label
        # Operators declared on each side of this modifier so far; edges are
        # recorded between every (upstream, downstream) pair as they accumulate.
        self._upstream: list[BaseOperator] = []
        self._downstream: list[BaseOperator] = []

    @property
    def roots(self) -> list[BaseOperator]:
        # A node placed upstream of this modifier must connect "through" it to
        # whatever was declared downstream, so our roots are the downstream side.
        return self._downstream

    @property
    def leaves(self) -> list[BaseOperator]:
        # Mirror of ``roots``: a node placed downstream connects to the upstream side.
        return self._upstream

    @staticmethod
    def _resolve_operators(
        task_or_task_list: DependencyMixin | Sequence[DependencyMixin], use_leaves: bool
    ) -> list[BaseOperator]:
        """
        Flatten ``task_or_task_list`` into the concrete operators an edge should touch.

        :param task_or_task_list: a single node or a sequence of nodes.
        :param use_leaves: if True take each node's ``leaves`` (where outgoing edges
            start, used for nodes upstream of this modifier); otherwise take its
            ``roots`` (where incoming edges end, used for nodes downstream of it).
        :return: the resolved operators, in declaration order.
        :raises TypeError: if any resolved node is not a ``BaseOperator``.
        """
        from airflow.models.baseoperator import BaseOperator

        # Ensure we have a list, even if it's just one item
        if isinstance(task_or_task_list, DependencyMixin):
            task_or_task_list = [task_or_task_list]
        operators: list[BaseOperator] = []
        for task in task_or_task_list:
            for node in task.leaves if use_leaves else task.roots:
                if not isinstance(node, BaseOperator):
                    raise TypeError(f"Cannot use edge labels with {type(node).__name__}, only operators")
                operators.append(node)
        return operators

    def set_upstream(
        self, task_or_task_list: DependencyMixin | Sequence[DependencyMixin], chain: bool = True
    ):
        """
        Sets the given task/list onto the upstream attribute, and then checks if
        we have both sides so we can resolve the relationship.

        Providing this also provides << via DependencyMixin.

        :param task_or_task_list: node(s) to place upstream of this modifier. Their
            ``leaves`` are used, since that is where an outgoing edge starts.
        :param chain: if True, also call ``set_downstream`` from each new upstream
            operator to every already-declared downstream operator. ``update_relative``
            passes False, as it is used when another node is the "main" side of the
            relationship.
        :raises TypeError: if a resolved node is not a ``BaseOperator``.
        """
        operators = self._resolve_operators(task_or_task_list, use_leaves=True)
        # For each already-declared downstream, pair off with each new upstream
        # item and store the edge info.
        for operator in operators:
            for downstream in self._downstream:
                self.add_edge_info(operator.dag, operator.task_id, downstream.task_id)
                if chain:
                    operator.set_downstream(downstream)
        # Add the new tasks to our list of ones we've seen
        self._upstream.extend(operators)

    def set_downstream(
        self, task_or_task_list: DependencyMixin | Sequence[DependencyMixin], chain: bool = True
    ):
        """
        Sets the given task/list onto the downstream attribute, and then checks if
        we have both sides so we can resolve the relationship.

        Providing this also provides >> via DependencyMixin.

        :param task_or_task_list: node(s) to place downstream of this modifier. Their
            ``roots`` are used, since that is where an incoming edge ends.
        :param chain: if True, also call ``set_downstream`` from every already-declared
            upstream operator to each new downstream operator. ``update_relative``
            passes False, as it is used when another node is the "main" side of the
            relationship.
        :raises TypeError: if a resolved node is not a ``BaseOperator``.
        """
        operators = self._resolve_operators(task_or_task_list, use_leaves=False)
        # Pair each new downstream operator with every already-declared upstream one.
        for operator in operators:
            for upstream in self._upstream:
                self.add_edge_info(upstream.dag, upstream.task_id, operator.task_id)
                if chain:
                    upstream.set_downstream(operator)
        # Add the new tasks to our list of ones we've seen
        self._downstream.extend(operators)

    def update_relative(self, other: DependencyMixin, upstream: bool = True) -> None:
        """
        Called if we're not the "main" side of a relationship; we still run the
        same logic, though, but without chaining the operators ourselves.

        :param other: the node on the other side of the relationship.
        :param upstream: if True ``other`` is upstream of this modifier, else downstream.
        """
        if upstream:
            self.set_upstream(other, chain=False)
        else:
            self.set_downstream(other, chain=False)

    def add_edge_info(self, dag, upstream_id: str, downstream_id: str):
        """
        Adds or updates task info on the DAG for this specific pair of tasks.

        Called either from our relationship trigger methods above, or directly
        by set_upstream/set_downstream in operators.

        :param dag: object exposing ``set_edge_info``; within this class it is the
            upstream operator's ``dag`` attribute.
        :param upstream_id: task_id at the start of the edge.
        :param downstream_id: task_id at the end of the edge.
        """
        dag.set_edge_info(upstream_id, downstream_id, {"label": self.label})

137 

138 

139# Factory functions 

def Label(label: str):
    """
    Build an EdgeModifier whose only payload is a human-readable edge label.

    Typical use: ``t1 >> Label("Success route") >> t2``.
    """
    modifier = EdgeModifier(label)
    return modifier