# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import operator
from typing import TYPE_CHECKING, Any, Collection

from pendulum import DateTime

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

if TYPE_CHECKING:
    from sqlalchemy import Session

    from airflow.models.dataset import DatasetEvent
    from airflow.utils.types import DagRunType


class _TrivialTimetable(Timetable):
    """Some code reuse for "trivial" timetables that have nothing complex."""

    periodic = False
    run_ordering = ("execution_date",)

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        return cls()

    def __eq__(self, other: Any) -> bool:
        """Return True as long as *other* is of the same type.

        This is only for testing purposes and should not be relied on otherwise.
        """
        if not isinstance(other, type(self)):
            return NotImplemented
        return True

    def serialize(self) -> dict[str, Any]:
        return {}

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        return DataInterval.exact(run_after)

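# Illustrative sketch (not part of the original module): every trivial
# timetable collapses a manually triggered run's data interval to a single
# instant, i.e. start == end == run_after. Assumes the names defined in this
# module; the date is made up for the example.
#
#   from pendulum import datetime
#
#   tt = NullTimetable()  # any _TrivialTimetable subclass behaves the same
#   interval = tt.infer_manual_data_interval(run_after=datetime(2023, 6, 7))
#   assert interval.start == interval.end == datetime(2023, 6, 7)
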

class NullTimetable(_TrivialTimetable):
    """Timetable that never schedules anything.

    This corresponds to ``schedule=None``.
    """

    can_be_scheduled = False
    description: str = "Never, external triggers only"

    @property
    def summary(self) -> str:
        return "None"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        return None

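# Illustrative sketch (not part of the original module): with
# ``schedule=None`` the scheduler never creates a run on its own, whatever
# the time restriction says. The TimeRestriction values are made up.
#
#   from pendulum import datetime
#
#   restriction = TimeRestriction(earliest=datetime(2023, 1, 1), latest=None, catchup=True)
#   info = NullTimetable().next_dagrun_info(
#       last_automated_data_interval=None, restriction=restriction
#   )
#   assert info is None
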

class OnceTimetable(_TrivialTimetable):
    """Timetable that schedules the execution once as soon as possible.

    This corresponds to ``schedule="@once"``.
    """

    description: str = "Once, as soon as possible"

    @property
    def summary(self) -> str:
        return "@once"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:
            return None  # Already run, no more scheduling.
        if restriction.earliest is None:  # No start date, won't run.
            return None
        # "@once" always schedules at the start_date determined by the DAG and
        # its tasks, regardless of catchup. This has been the case since 1.10
        # and we're inheriting it. See AIRFLOW-1928.
        run_after = restriction.earliest
        if restriction.latest is not None and run_after > restriction.latest:
            return None
        return DagRunInfo.exact(run_after)

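# Illustrative sketch (not part of the original module): "@once" yields
# exactly one run, pinned to the earliest allowed date; once an automated
# interval exists, nothing further is scheduled. Dates are made up.
#
#   from pendulum import datetime
#
#   tt = OnceTimetable()
#   restriction = TimeRestriction(earliest=datetime(2023, 1, 1), latest=None, catchup=True)
#   first = tt.next_dagrun_info(last_automated_data_interval=None, restriction=restriction)
#   assert first is not None and first.run_after == datetime(2023, 1, 1)
#   again = tt.next_dagrun_info(
#       last_automated_data_interval=first.data_interval, restriction=restriction
#   )
#   assert again is None
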

class ContinuousTimetable(_TrivialTimetable):
    """Timetable that schedules continually, while still respecting start_date and end_date.

    This corresponds to ``schedule="@continuous"``.
    """

    description: str = "As frequently as possible, but only one run at a time."

    active_runs_limit = 1  # Continuous DAGRuns should be constrained to one run at a time.

    @property
    def summary(self) -> str:
        return "@continuous"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if restriction.earliest is None:  # No start date, won't run.
            return None
        if last_automated_data_interval is not None:  # Has already run once.
            start = last_automated_data_interval.end
            end = DateTime.utcnow()
        else:  # First run.
            start = restriction.earliest
            end = max(restriction.earliest, DateTime.utcnow())  # Won't run any earlier than start_date.

        if restriction.latest is not None and end > restriction.latest:
            return None

        return DagRunInfo.interval(start, end)

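# Illustrative sketch (not part of the original module): "@continuous" chains
# runs back to back; each new interval starts where the previous one ended
# and stretches to the current moment. Dates are made up for the example.
#
#   from pendulum import datetime
#
#   tt = ContinuousTimetable()
#   restriction = TimeRestriction(earliest=datetime(2023, 1, 1), latest=None, catchup=True)
#   prev = DataInterval(start=datetime(2023, 1, 1), end=datetime(2023, 6, 1))
#   info = tt.next_dagrun_info(last_automated_data_interval=prev, restriction=restriction)
#   assert info is not None and info.data_interval.start == prev.end
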

class DatasetTriggeredTimetable(_TrivialTimetable):
    """Timetable that never schedules anything.

    This should not be used directly anywhere; it is only set on a DAG that
    is triggered by datasets.

    :meta private:
    """

    description: str = "Triggered by datasets"

    @property
    def summary(self) -> str:
        return "Dataset"

    def generate_run_id(
        self,
        *,
        run_type: DagRunType,
        logical_date: DateTime,
        data_interval: DataInterval | None,
        session: Session | None = None,
        events: Collection[DatasetEvent] | None = None,
        **extra,
    ) -> str:
        from airflow.models.dagrun import DagRun

        return DagRun.generate_run_id(run_type, logical_date)

    def data_interval_for_events(
        self,
        logical_date: DateTime,
        events: Collection[DatasetEvent],
    ) -> DataInterval:
        if not events:
            return DataInterval(logical_date, logical_date)

        start = min(
            events, key=operator.attrgetter("source_dag_run.data_interval_start")
        ).source_dag_run.data_interval_start
        end = max(
            events, key=operator.attrgetter("source_dag_run.data_interval_end")
        ).source_dag_run.data_interval_end
        return DataInterval(start, end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        return None
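

# Illustrative sketch (not part of the original module): for a dataset-
# triggered DAG the data interval spans from the earliest source run's
# interval start to the latest source run's interval end. The SimpleNamespace
# objects below are hypothetical stand-ins for DatasetEvent rows carrying a
# ``source_dag_run`` relationship.
#
#   from types import SimpleNamespace
#   from pendulum import datetime
#
#   events = [
#       SimpleNamespace(source_dag_run=SimpleNamespace(
#           data_interval_start=datetime(2023, 1, 1), data_interval_end=datetime(2023, 1, 2))),
#       SimpleNamespace(source_dag_run=SimpleNamespace(
#           data_interval_start=datetime(2023, 1, 3), data_interval_end=datetime(2023, 1, 4))),
#   ]
#   interval = DatasetTriggeredTimetable().data_interval_for_events(datetime(2023, 1, 5), events)
#   assert interval == DataInterval(datetime(2023, 1, 1), datetime(2023, 1, 4))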