Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/timetables/interval.py: 37%

104 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import datetime 

20from typing import Any, Union 

21 

22from dateutil.relativedelta import relativedelta 

23from pendulum import DateTime 

24 

25from airflow.exceptions import AirflowTimetableInvalid 

26from airflow.timetables._cron import CronMixin 

27from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable 

28from airflow.utils.timezone import convert_to_utc 

29 

30Delta = Union[datetime.timedelta, relativedelta] 

31 

32 

class _DataIntervalTimetable(Timetable):
    """Shared base for timetables built around periodic data intervals.

    Subclasses supply the underlying schedule (for example a cron expression
    or a timedelta instance) through the private hooks below; this class turns
    that schedule into consecutive data intervals and schedules a DagRun at
    the end of each one.
    """

    def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
        """Return the lower bound for scheduling when ``catchup=False``.

        Subclasses document the exact skipping behaviour of their schedule.
        """
        raise NotImplementedError

    def _align_to_next(self, current: DateTime) -> DateTime:
        """Move ``current`` forward onto the schedule if it is off-schedule.

        A fixed schedule (e.g. every midnight) returns the next time matching
        the declared time when ``current`` does not already match. A schedule
        that is not fixed (e.g. every hour) returns ``current`` as given.
        """
        raise NotImplementedError

    def _align_to_prev(self, current: DateTime) -> DateTime:
        """Move ``current`` backward onto the schedule if it is off-schedule.

        A fixed schedule (e.g. every midnight) returns the previous time
        matching the declared time when ``current`` does not already match. A
        schedule that is not fixed (e.g. every hour) returns ``current`` as
        given.

        ``_get_prev(_align_to_next())`` is not a substitute: after a DAG's
        schedule changes, that combination would leave the first scheduling
        after the change the same as before.
        """
        raise NotImplementedError

    def _get_next(self, current: DateTime) -> DateTime:
        """Return the first scheduled time after ``current``."""
        raise NotImplementedError

    def _get_prev(self, current: DateTime) -> DateTime:
        """Return the last scheduled time before ``current``."""
        raise NotImplementedError

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Work out the data interval of the next automated run.

        :param last_automated_data_interval: Data interval of the previous
            automated run, or ``None`` when there has been none yet.
        :param restriction: Scheduling bounds; its ``earliest``, ``latest``
            and ``catchup`` attributes are consulted.
        :return: The next run's info, or ``None`` when a first run has no
            lower bound to start from or the computed start lies after
            ``restriction.latest``.
        """
        lower_bound = restriction.earliest
        if not restriction.catchup:
            lower_bound = self._skip_to_latest(lower_bound)
        elif lower_bound is not None:
            lower_bound = self._align_to_next(lower_bound)

        if last_automated_data_interval is not None:
            # A previous run exists. Its end is re-aligned because the DAG
            # may have been given a new schedule interval since that run.
            previous_end = self._align_to_prev(last_automated_data_interval.end)
            # A lower bound is present when catchup is off or the DAG has a
            # new start date in the future; the later of the two wins.
            # Without one, the interval starts where the previous one ended.
            start = previous_end if lower_bound is None else max(previous_end, lower_bound)
        elif lower_bound is None:
            # First run, but nothing to anchor the first interval to.
            return None
        else:
            # First run: start at the first available time matching the
            # schedule and retrospectively create a data interval for it.
            start = lower_bound

        upper_bound = restriction.latest
        if upper_bound is not None and start > upper_bound:
            return None
        return DagRunInfo.interval(start=start, end=self._get_next(start))

110 

111 

class CronDataIntervalTimetable(CronMixin, _DataIntervalTimetable):
    """Data-interval timetable driven by a cron expression.

    Used for ``schedule=<cron>``, where ``<cron>`` is a five/six-segment
    expression or one of ``cron_presets``.

    croniter only handles naive timestamps and therefore cannot account for
    DST when finding the next/previous time; this implementation builds on
    croniter to add timezone awareness.

    ``@once`` must not be passed here; use ``OnceTimetable`` for that.
    """

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Rebuild a timetable from the mapping produced by ``serialize``."""
        from airflow.serialization.serialized_objects import decode_timezone

        expression = data["expression"]
        timezone = decode_timezone(data["timezone"])
        return cls(expression, timezone)

    def serialize(self) -> dict[str, Any]:
        """Return the cron expression and the encoded timezone as a mapping."""
        from airflow.serialization.serialized_objects import encode_timezone

        expression = self._expression
        encoded_timezone = encode_timezone(self._timezone)
        return {"expression": expression, "timezone": encoded_timezone}

    def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
        """Return the lower bound for scheduling when ``catchup=False``.

        The start date is moved up to one period before, so that the current
        time is AFTER the period end and the job can be created.

        This differs slightly from the delta timetable at terminal values:
        when the next schedule should start *right now*, the data interval
        that starts now is wanted, not the one that ends now.
        """
        now = DateTime.utcnow()
        previous_tick = self._get_prev(now)
        following_tick = self._get_next(previous_tick)
        if following_tick == now:
            # The current time sits exactly on an interval boundary.
            candidate = previous_tick
        elif following_tick > now:
            # The current time falls between two boundaries.
            candidate = self._get_prev(previous_tick)
        else:
            raise AssertionError("next schedule shouldn't be earlier")
        return candidate if earliest is None else max(candidate, self._align_to_next(earliest))

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """Return the last complete period before ``run_after``.

        For example, with a DAG run scheduled at each midnight, a run manually
        triggered at 1am on the 25th gets the interval from 0am on the 24th to
        0am on the 25th.
        """
        interval_end = self._align_to_prev(run_after)
        interval_start = self._get_prev(interval_end)
        return DataInterval(start=interval_start, end=interval_end)

165 

166 

class DeltaDataIntervalTimetable(_DataIntervalTimetable):
    """Data-interval timetable driven by a time delta.

    Used for ``schedule=<delta>``, where ``<delta>`` is a
    ``datetime.timedelta`` or a ``dateutil.relativedelta.relativedelta``
    instance.
    """

    def __init__(self, delta: Delta) -> None:
        self._delta = delta

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Rebuild a timetable from the mapping produced by ``serialize``.

        A ``dict`` under ``"delta"`` is decoded as a relativedelta; any other
        value is taken as a number of seconds for a ``datetime.timedelta``.
        """
        from airflow.serialization.serialized_objects import decode_relativedelta

        raw = data["delta"]
        if not isinstance(raw, dict):
            return cls(datetime.timedelta(seconds=raw))
        return cls(decode_relativedelta(raw))

    def __eq__(self, other: Any) -> bool:
        """Compare by offset.

        Intended for tests only; do not rely on it elsewhere.
        """
        if isinstance(other, DeltaDataIntervalTimetable):
            return self._delta == other._delta
        return NotImplemented

    @property
    def summary(self) -> str:
        """String form of the configured delta."""
        return str(self._delta)

    def serialize(self) -> dict[str, Any]:
        """Return the delta as a mapping with a single ``"delta"`` key.

        A ``datetime.timedelta`` is stored as its total seconds; anything else
        is passed through ``encode_relativedelta``.
        """
        from airflow.serialization.serialized_objects import encode_relativedelta

        step = self._delta
        if isinstance(step, datetime.timedelta):
            return {"delta": step.total_seconds()}
        return {"delta": encode_relativedelta(step)}

    def validate(self) -> None:
        """Reject a delta that does not move time forward.

        :raises AirflowTimetableInvalid: if adding the delta to the current
            time does not give a strictly later time.
        """
        reference = datetime.datetime.now()
        shifted = reference + self._delta
        if shifted <= reference:
            raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}")

    def _get_next(self, current: DateTime) -> DateTime:
        """Return ``current`` plus one delta, converted to UTC."""
        advanced = current + self._delta
        return convert_to_utc(advanced)

    def _get_prev(self, current: DateTime) -> DateTime:
        """Return ``current`` minus one delta, converted to UTC."""
        rewound = current - self._delta
        return convert_to_utc(rewound)

    def _align_to_next(self, current: DateTime) -> DateTime:
        """Return ``current`` unchanged; no alignment is applied here."""
        return current

    def _align_to_prev(self, current: DateTime) -> DateTime:
        """Return ``current`` unchanged; no alignment is applied here."""
        return current

    def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
        """Return the lower bound for scheduling when ``catchup=False``.

        The start date is moved up to one period before, so that the current
        time is AFTER the period end and the job can be created.

        This differs slightly from the cron timetable at terminal values.
        """
        one_period_ago = self._get_prev(DateTime.utcnow())
        return one_period_ago if earliest is None else max(one_period_ago, earliest)

    # NOTE(review): ``run_after`` is accepted positionally here, whereas
    # CronDataIntervalTimetable declares it keyword-only. The signature is
    # kept as-is so existing callers are unaffected.
    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        """Return the interval of one delta ending at ``run_after``."""
        interval_start = self._get_prev(run_after)
        return DataInterval(start=interval_start, end=run_after)