Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/timetables/interval.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

117 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import datetime 

20from typing import TYPE_CHECKING, Any, Union 

21 

22from dateutil.relativedelta import relativedelta 

23from pendulum import DateTime 

24 

25from airflow.exceptions import AirflowTimetableInvalid 

26from airflow.timetables._cron import CronMixin 

27from airflow.timetables.base import DagRunInfo, DataInterval, Timetable 

28from airflow.utils.timezone import coerce_datetime, convert_to_utc, utcnow 

29 

30if TYPE_CHECKING: 

31 from airflow.timetables.base import TimeRestriction 

32 

# Accepted schedule interval types for delta-based timetables: either a plain
# ``datetime.timedelta`` or a calendar-aware ``dateutil`` ``relativedelta``.
Delta = Union[datetime.timedelta, relativedelta]

34 

35 

class _DataIntervalTimetable(Timetable):
    """Common base for timetables that schedule data intervals.

    Subclasses turn an underlying schedule representation (for example a cron
    expression or a timedelta instance) into a sequence of periodic data
    intervals, and a DagRun is scheduled at the end of each interval.

    Subclasses implement the five private hooks below; this class implements
    :meth:`next_dagrun_info` in terms of them.
    """

    def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
        """Return the earliest time a run may be scheduled when catchup is off.

        Only called when ``catchup=False``. The exact skipping rule is defined
        by each subclass; see their docstrings.
        """
        raise NotImplementedError

    def _align_to_next(self, current: DateTime) -> DateTime:
        """Move *current* forward onto the schedule.

        For fixed schedules (e.g. every midnight), return the next scheduled
        time if *current* does not already fall on one. For schedules that are
        not fixed (e.g. every hour), return *current* unchanged.
        """
        raise NotImplementedError

    def _align_to_prev(self, current: DateTime) -> DateTime:
        """Move *current* backward onto the schedule.

        For fixed schedules (e.g. every midnight), return the previous
        scheduled time if *current* does not already fall on one. For schedules
        that are not fixed (e.g. every hour), return *current* unchanged.

        ``_get_prev(_align_to_next(current))`` is not an adequate substitute:
        after a DAG's schedule changes, that alternative would leave the first
        run after the change unchanged.
        """
        raise NotImplementedError

    def _get_next(self, current: DateTime) -> DateTime:
        """Return the first scheduled time after *current*."""
        raise NotImplementedError

    def _get_prev(self, current: DateTime) -> DateTime:
        """Return the last scheduled time before *current*."""
        raise NotImplementedError

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Compute the data interval of the next automated run, or ``None``.

        The lower bound comes from ``restriction.earliest``:
        - With catchup disabled, it is passed through ``_skip_to_latest``.
        - Otherwise, it is aligned forward onto the schedule.

        The interval start depends on whether a run already exists:
        - First run: the interval starts at that lower bound. If there is no
          bound, there is no run.
        - Later runs: the interval starts at the re-aligned end of the previous
          interval, raised to the lower bound if the bound is later.

        No run is returned when the start falls after ``restriction.latest``.
        """
        lower_bound = restriction.earliest
        if not restriction.catchup:
            lower_bound = self._skip_to_latest(lower_bound)
        elif lower_bound is not None:
            lower_bound = self._align_to_next(lower_bound)

        if last_automated_data_interval is not None:
            # Re-align the previous end in case the DAG's schedule has changed.
            start = self._align_to_prev(last_automated_data_interval.end)
            if lower_bound is not None:
                # Catchup is off, or the start date moved into the future:
                # take whichever is later.
                start = max(start, lower_bound)
        elif lower_bound is None:
            # First run but nothing to anchor it to.
            return None
        else:
            # First run: start at the first time allowed by the schedule and
            # build the data interval retrospectively from there.
            start = lower_bound

        latest = restriction.latest
        if latest is not None and start > latest:
            return None
        return DagRunInfo.interval(start=start, end=self._get_next(start))

113 

114 

class CronDataIntervalTimetable(CronMixin, _DataIntervalTimetable):
    """Timetable that schedules data intervals from a cron expression.

    Used for ``schedule=<cron>``, where ``<cron>`` is either a
    five/six-segment cron string or one of the ``cron_presets``.

    The implementation builds on croniter and adds timezone awareness, because
    croniter only works with naive timestamps and cannot take DST into account
    when computing the next/previous time.

    Do not pass ``@once`` here; use ``OnceTimetable`` instead.
    """

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Rebuild a timetable from the dict produced by :meth:`serialize`."""
        from airflow.serialization.serialized_objects import decode_timezone

        expression = data["expression"]
        tz = decode_timezone(data["timezone"])
        return cls(expression, tz)

    def serialize(self) -> dict[str, Any]:
        """Return the cron ``expression`` and the encoded ``timezone``."""
        from airflow.serialization.serialized_objects import encode_timezone

        payload: dict[str, Any] = {"expression": self._expression}
        payload["timezone"] = encode_timezone(self._timezone)
        return payload

    def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
        """Bound the earliest time a run can be scheduled when catchup is off.

        The start is moved up to one period before the current time, so the
        current time is after the end of that period and the run can be
        created:
        - If the current time is exactly on a schedule boundary, the start is
          the schedule immediately before it.
        - Otherwise, the start is one further schedule back.

        When ``earliest`` is given, the result is never earlier than
        ``earliest`` aligned forward onto the schedule.

        This is slightly different from the delta version at terminal values.
        """
        now = coerce_datetime(utcnow())
        previous = self._get_prev(now)
        following = self._get_next(previous)
        if following < now:
            # The schedule after the last one before "now" can never precede "now".
            raise AssertionError("next schedule shouldn't be earlier")
        if following == now:
            # Current time is on an interval boundary.
            new_start = previous
        else:
            # Current time is between boundaries: step back one more period.
            new_start = self._get_prev(previous)
        if earliest is not None:
            new_start = max(new_start, self._align_to_next(earliest))
        return new_start

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """Return the last complete period at or before ``run_after``.

        For example, with a DAG scheduled every midnight, a run manually
        triggered at 1am on the 25th gets the interval from 0am on the 24th to
        0am on the 25th.
        """
        end = self._align_to_prev(run_after)
        start = self._get_prev(end)
        return DataInterval(start=start, end=end)

168 

169 

class DeltaDataIntervalTimetable(_DataIntervalTimetable):
    """Timetable that schedules data intervals with a time delta.

    This corresponds to ``schedule=<delta>``, where ``<delta>`` is either a
    ``datetime.timedelta`` or a ``dateutil.relativedelta.relativedelta``
    instance.
    """

    def __init__(self, delta: Delta) -> None:
        self._delta = delta

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Rebuild a timetable from the dict produced by :meth:`serialize`.

        A dict-valued ``delta`` is decoded as a ``relativedelta``. Any other
        value is taken as a number of seconds for a ``timedelta``.
        """
        from airflow.serialization.serialized_objects import decode_relativedelta

        delta = data["delta"]
        if isinstance(delta, dict):
            return cls(decode_relativedelta(delta))
        return cls(datetime.timedelta(seconds=delta))

    def __eq__(self, other: Any) -> bool:
        """
        Return if the offsets match.

        This is only for testing purposes and should not be relied on otherwise.
        """
        if not isinstance(other, DeltaDataIntervalTimetable):
            return NotImplemented
        return self._delta == other._delta

    @property
    def summary(self) -> str:
        """Human-readable summary: the string form of the delta."""
        return str(self._delta)

    def serialize(self) -> dict[str, Any]:
        """Return ``{"delta": ...}`` in the format :meth:`deserialize` accepts.

        A ``timedelta`` is stored as its total seconds; a ``relativedelta`` is
        stored in its encoded (dict) form.
        """
        from airflow.serialization.serialized_objects import encode_relativedelta

        delta: Any
        if isinstance(self._delta, datetime.timedelta):
            delta = self._delta.total_seconds()
        else:
            delta = encode_relativedelta(self._delta)
        return {"delta": delta}

    def validate(self) -> None:
        """Raise ``AirflowTimetableInvalid`` unless the delta moves time forward."""
        now = datetime.datetime.now()
        if (now + self._delta) <= now:
            raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}")

    def _get_next(self, current: DateTime) -> DateTime:
        """Return *current* shifted forward by the delta, converted to UTC."""
        return convert_to_utc(current + self._delta)

    def _get_prev(self, current: DateTime) -> DateTime:
        """Return *current* shifted back by the delta, converted to UTC."""
        return convert_to_utc(current - self._delta)

    def _align_to_next(self, current: DateTime) -> DateTime:
        # Delta schedules are not anchored to fixed times, so every time is aligned.
        return current

    def _align_to_prev(self, current: DateTime) -> DateTime:
        # Delta schedules are not anchored to fixed times, so every time is aligned.
        return current

    @staticmethod
    def _relativedelta_in_seconds(delta: relativedelta) -> float:
        """Approximate the length of a ``relativedelta`` in seconds.

        Years count as 365 days and months as 30 days. Only the relative
        fields ``years`` through ``microseconds`` are used; absolute fields
        (``day``, ``weekday``, ...) and ``leapdays`` are ignored.
        Microseconds are included so a purely sub-second delta does not
        collapse to zero. The result may be zero or negative if the fields
        cancel out.
        """
        seconds = (
            delta.years * 365 * 24 * 60 * 60
            + delta.months * 30 * 24 * 60 * 60
            + delta.days * 24 * 60 * 60
            + delta.hours * 60 * 60
            + delta.minutes * 60
            + delta.seconds
        )
        if delta.microseconds:
            return seconds + delta.microseconds / 1_000_000
        return seconds

    def _round(self, dt: DateTime) -> DateTime:
        """Round *dt* down to a multiple of the delta, counted from the Unix epoch.

        If the delta's length in seconds is not positive, there is no usable
        step. This can happen for a ``relativedelta`` whose approximation
        cancels out, such as ``months=1, days=-30``. In that case *dt* is
        returned unchanged instead of failing with ``ZeroDivisionError``.
        """
        if isinstance(self._delta, datetime.timedelta):
            delta_in_seconds = self._delta.total_seconds()
        else:
            delta_in_seconds = self._relativedelta_in_seconds(self._delta)
        if delta_in_seconds <= 0:
            # Modulo by zero raises; modulo by a negative number would round *up*.
            return dt
        dt_in_seconds = dt.timestamp()
        rounded_dt = dt_in_seconds - (dt_in_seconds % delta_in_seconds)
        return DateTime.fromtimestamp(rounded_dt, tz=dt.tzinfo)

    def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
        """Bound the earliest time a run can be scheduled when catchup is off.

        The current time is rounded down to a multiple of the delta, and the
        start is placed one delta before that. The current time is then after
        the end of the resulting interval, so the run can be created. The
        result is never earlier than ``earliest`` when that is given.

        This is slightly different from the cron version at terminal values.
        """
        round_current_time = self._round(coerce_datetime(utcnow()))
        new_start = self._get_prev(round_current_time)
        if earliest is None:
            return new_start
        return max(new_start, earliest)

    # NOTE(review): unlike CronDataIntervalTimetable, ``run_after`` is not
    # keyword-only here; left positional so existing positional callers keep working.
    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        """Return the interval of one delta that ends at ``run_after``."""
        return DataInterval(start=self._get_prev(run_after), end=run_after)