Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/timetables/simple.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

86 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19from typing import TYPE_CHECKING, Any, Collection, Sequence 

20 

21from airflow.timetables.base import DagRunInfo, DataInterval, Timetable 

22from airflow.utils import timezone 

23 

if TYPE_CHECKING:
    from pendulum import DateTime

    # ``Session`` is exported by ``sqlalchemy.orm``, not the top-level
    # ``sqlalchemy`` package; importing it from the wrong place breaks
    # static type checking of the annotations below.
    from sqlalchemy.orm import Session

    from airflow.models.dataset import DatasetEvent
    from airflow.timetables.base import TimeRestriction
    from airflow.utils.types import DagRunType

31 

32 

class _TrivialTimetable(Timetable):
    """Shared boilerplate for "trivial" timetables that carry no state."""

    periodic = False
    run_ordering: Sequence[str] = ("execution_date",)

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        # There is no per-instance state, so the payload is irrelevant.
        return cls()

    def __eq__(self, other: Any) -> bool:
        """Compare equal to any other instance of the exact same type.

        This is only for testing purposes and should not be relied on otherwise.
        """
        if isinstance(other, type(self)):
            return True
        return NotImplemented

    def serialize(self) -> dict[str, Any]:
        # Nothing to persist.
        return {}

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # A manual run has no span of time; it points at the trigger moment.
        return DataInterval.exact(run_after)

57 

58 

class NullTimetable(_TrivialTimetable):
    """Timetable for DAGs that are only ever triggered externally.

    This corresponds to ``schedule=None``.
    """

    can_be_scheduled = False
    description: str = "Never, external triggers only"

    @property
    def summary(self) -> str:
        return "None"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        # The scheduler never creates runs for this timetable.
        return None

79 

80 

class OnceTimetable(_TrivialTimetable):
    """Timetable scheduling exactly one run, at the earliest opportunity.

    This corresponds to ``schedule="@once"``.
    """

    description: str = "Once, as soon as possible"

    @property
    def summary(self) -> str:
        return "@once"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:
            # A previous automated run exists; "@once" never runs again.
            return None
        # "@once" always schedules at the start_date determined by the DAG
        # and tasks, regardless of catchup or not. This has been the case
        # since 1.10 and we're inheriting it.
        start = restriction.earliest
        if start is None:
            # Without a start date there is nothing to anchor the run to.
            return None
        if restriction.latest is not None and start > restriction.latest:
            # The single run would fall after end_date; skip it.
            return None
        return DagRunInfo.exact(start)

110 

111 

class ContinuousTimetable(_TrivialTimetable):
    """Timetable that keeps one run going at all times, within start/end bounds.

    This corresponds to ``schedule="@continuous"``.
    """

    description: str = "As frequently as possible, but only one run at a time."

    # Continuous DAGRuns should be constrained to one run at a time.
    active_runs_limit = 1

    @property
    def summary(self) -> str:
        return "@continuous"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        earliest = restriction.earliest
        if earliest is None:
            # No start date means the DAG never runs.
            return None
        now = timezone.coerce_datetime(timezone.utcnow())
        if last_automated_data_interval is None:
            # First run: won't run any earlier than start_date.
            start = earliest
            end = max(earliest, now)
        else:
            # Subsequent runs pick up where the previous interval ended.
            start = last_automated_data_interval.end
            end = now
        if restriction.latest is not None and end > restriction.latest:
            # Past end_date; stop scheduling.
            return None
        return DagRunInfo.interval(start, end)

147 

148 

class DatasetTriggeredTimetable(_TrivialTimetable):
    """Timetable that never schedules anything on its own.

    This should not be directly used anywhere, but only set if a DAG is triggered by datasets.

    :meta private:
    """

    description: str = "Triggered by datasets"

    @property
    def summary(self) -> str:
        return "Dataset"

    def generate_run_id(
        self,
        *,
        run_type: DagRunType,
        logical_date: DateTime,
        data_interval: DataInterval | None,
        session: Session | None = None,
        events: Collection[DatasetEvent] | None = None,
        **extra,
    ) -> str:
        # Imported locally to avoid a circular import at module load time.
        from airflow.models.dagrun import DagRun

        return DagRun.generate_run_id(run_type, logical_date)

    def data_interval_for_events(
        self,
        logical_date: DateTime,
        events: Collection[DatasetEvent],
    ) -> DataInterval:
        """Compute the data interval spanning all triggering dataset events.

        Each event contributes its source DAG run's data interval when one
        exists, or its own timestamp (as a zero-length span) otherwise.
        """
        if not events:
            # No events: degenerate interval pinned to the logical date.
            return DataInterval(logical_date, logical_date)

        bounds = []
        for event in events:
            source_run = event.source_dag_run
            if source_run is None:
                bounds.append((event.timestamp, event.timestamp))
            else:
                bounds.append(
                    (source_run.data_interval_start, source_run.data_interval_end)
                )

        interval_start = min(lower for lower, _ in bounds)
        interval_end = max(upper for _, upper in bounds)
        return DataInterval(interval_start, interval_end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        # Runs are created by dataset events, never by the scheduler's
        # timetable loop.
        return None