Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/serialization/dag_dependency.py: 63%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

19 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19from dataclasses import dataclass 

20from typing import Literal 

21 

22 

23@dataclass(frozen=True, order=True) 

24class DagDependency: 

25 """ 

26 Dataclass for representing dependencies between dags. 

27 

28 These are calculated during serialization and attached to serialized dags. 

29 

30 The source and target keys store the information of what component depends on what. 

31 For an asset related dependency, a root node will have the source value equal to its dependency_type and 

32 an end node will have the target value equal to its dependency_type. It's easier to explain by examples. 

33 

34 For the example below, 

35 

36 .. code-block:: python 

37 

38 # we assume the asset is active 

39 DAG(dag_id="dag_1", schedule=[Asset.ref(uri="uri")]) 

40 

41 we get dag dependency like 

42 

43 .. code-block:: python 

44 

45 DagDependency( 

46 source="asset", 

47 target="dag_1", 

48 label="name", # asset name, we always use asset name as label 

49 dependency_type="asset", 

50 dependency_id=1, # asset id 

51 ) 

52 

53 This will look like `Asset name` -> `Dag dag_1` on the dependency graph. This is a root asset node as it 

54 has the source value as asset, and it points to its target "dag_1" 

55 

56 For more complex dependency like asset alias, 

57 

58 .. code-block:: python 

59 

60 # we assume the asset is active 

61 DAG( 

62 dag_id="dag_2", 

63 schedule=[ 

64 AssetAlias(name="alias_1"), # resolved into Asset(uri="uri", name="name") 

65 AssetAlias(name="alias_2"), # resolved to nothing 

66 ], 

67 ) 

68 

69 we'll need to store more data, 

70 

71 .. code-block:: python 

72 

73 [ 

74 DagDependency( 

75 source="asset", 

76 target="asset-alias:alias_1", 

77 label="name", 

78 dependency_type="asset", 

79 dependency_id="1", 

80 ), 

81 DagDependency( 

82 source="asset:1", 

83 target="dag_2", 

84 label="alias_1", 

85 dependency_type="asset-alias", 

86 dependency_id="alias_1", 

87 ), 

88 DagDependency( 

89 source="asset-alias", 

90 target="dag_2", 

91 label="alias_2", 

92 dependency_type="asset-alias", 

93 dependency_id="alias_2", 

94 ), 

95 ] 

96 

97 

98 We want it to look like `Asset name` -> `AssetAlias alias_1` -> `Dag dag_1` on the dependency graph. The 

99 first node here is a root node point to an asset alias. Thus, its target is set to the asset we're point 

100 to. The second node represents the asset alias points to this asset and then this asset points to the dag. 

101 The third node represents a dependency between an asset alias and dag directly as it's not resolved. 

102 

103 For asset ref cases, it works similar to asset if it's a valid asset ref. If not, it works the same as 

104 an unresolved asset alias. 

105 """ 

106 

107 source: str 

108 target: str 

109 label: str 

110 dependency_type: Literal["asset", "asset-alias", "asset-name-ref", "asset-uri-ref", "trigger", "sensor"] 

111 dependency_id: str | None = None 

112 

113 @property 

114 def node_id(self): 

115 """Node ID for graph rendering.""" 

116 val = f"{self.dependency_type}" 

117 if self.dependency_type not in ("asset", "asset-alias", "asset-name-ref", "asset-uri-ref"): 

118 val = f"{val}:{self.source}:{self.target}" 

119 if self.dependency_id: 

120 val = f"{val}:{self.dependency_id}" 

121 return val