Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/timetables/interval.py: 37%
104 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import datetime
20from typing import Any, Union
22from dateutil.relativedelta import relativedelta
23from pendulum import DateTime
25from airflow.exceptions import AirflowTimetableInvalid
26from airflow.timetables._cron import CronMixin
27from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
28from airflow.utils.timezone import convert_to_utc
30Delta = Union[datetime.timedelta, relativedelta]
class _DataIntervalTimetable(Timetable):
    """Common base for timetables built around periodic data intervals.

    Subclasses describe a recurring schedule (a cron expression or a time
    delta, for example) through the hook methods below. This base turns that
    schedule into consecutive data intervals and plans each DagRun for the
    end of its interval.
    """

    def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
        """Return the lower bound for scheduling when ``catchup=False``.

        The exact skipping rule depends on the schedule type; see the
        overriding method in each subclass.
        """
        raise NotImplementedError()

    def _align_to_next(self, current: DateTime) -> DateTime:
        """Move ``current`` forward onto the schedule if it is off-schedule.

        A fixed schedule (e.g. every midnight) yields the next time matching
        the declared time when ``current`` does not already match. A schedule
        that is not fixed (e.g. every hour) returns ``current`` unchanged.
        """
        raise NotImplementedError()

    def _align_to_prev(self, current: DateTime) -> DateTime:
        """Move ``current`` backward onto the schedule if it is off-schedule.

        A fixed schedule (e.g. every midnight) yields the previous time
        matching the declared time when ``current`` does not already match. A
        schedule that is not fixed (e.g. every hour) returns ``current``
        unchanged.

        ``_get_prev(_align_to_next())`` is not a substitute: after a DAG's
        schedule changes, that combination would leave the first scheduling
        following the change the same as before.
        """
        raise NotImplementedError()

    def _get_next(self, current: DateTime) -> DateTime:
        """Return the first scheduled time after ``current``."""
        raise NotImplementedError()

    def _get_prev(self, current: DateTime) -> DateTime:
        """Return the last scheduled time before ``current``."""
        raise NotImplementedError()

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Work out the data interval of the next automated run.

        Returns ``None`` when there is neither a previous run nor a lower
        bound to start from, or when the computed start lies after
        ``restriction.latest``. Otherwise returns the ``DagRunInfo`` built
        from the interval that begins at the computed start.
        """
        # Establish the lower bound: skip ahead when not catching up,
        # otherwise snap a configured bound onto the schedule.
        lower_bound = restriction.earliest
        if not restriction.catchup:
            lower_bound = self._skip_to_latest(lower_bound)
        elif lower_bound is not None:
            lower_bound = self._align_to_next(lower_bound)

        if last_automated_data_interval is not None:
            # A run already exists. Its end is re-aligned in case the DAG's
            # schedule changed since that run was created.
            previous_end = self._align_to_prev(last_automated_data_interval.end)
            if lower_bound is None:
                # Continue directly from where the previous interval ended.
                start = previous_end
            else:
                # Catchup is off, or the start date moved into the future:
                # the later of the two candidates wins.
                start = max(previous_end, lower_bound)
        elif lower_bound is None:
            # First run, but nothing to anchor it to.
            return None
        else:
            # First run: begin at the first time matching the schedule and
            # create its data interval retrospectively.
            start = lower_bound

        upper_bound = restriction.latest
        if upper_bound is not None and start > upper_bound:
            return None
        return DagRunInfo.interval(start=start, end=self._get_next(start))
class CronDataIntervalTimetable(CronMixin, _DataIntervalTimetable):
    """Data-interval timetable driven by a cron expression.

    Used for ``schedule=<cron>``, where ``<cron>`` is a five/six-segment
    expression or one of ``cron_presets``.

    croniter only handles naive timestamps and therefore cannot account for
    DST when stepping to the next/previous time; this implementation builds
    timezone awareness on top of it.

    ``@once`` must not be passed here; use ``OnceTimetable`` for that.
    """

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Rebuild a timetable from the mapping produced by ``serialize``."""
        from airflow.serialization.serialized_objects import decode_timezone

        expression = data["expression"]
        timezone = decode_timezone(data["timezone"])
        return cls(expression, timezone)

    def serialize(self) -> dict[str, Any]:
        """Return the cron expression and the encoded timezone as a mapping."""
        from airflow.serialization.serialized_objects import encode_timezone

        payload: dict[str, Any] = {"expression": self._expression}
        payload["timezone"] = encode_timezone(self._timezone)
        return payload

    def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
        """Return the lower bound for scheduling when ``catchup=False``.

        The start is pushed up to one period back, so that the current time
        falls AFTER that period's end and a run can be created for it.

        At the boundary this differs slightly from the delta-based
        timetable: when the next schedule would start *right now*, the
        interval that starts now is chosen, not the one that ends now.
        """
        now = DateTime.utcnow()
        previous_boundary = self._get_prev(now)
        following_boundary = self._get_next(previous_boundary)

        if following_boundary == now:
            # "now" sits exactly on an interval boundary.
            candidate = previous_boundary
        else:
            if not following_boundary > now:
                raise AssertionError("next schedule shouldn't be earlier")
            # "now" lies strictly between two boundaries.
            candidate = self._get_prev(previous_boundary)

        if earliest is None:
            return candidate
        return max(candidate, self._align_to_next(earliest))

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """Return the last complete period that ended at or before ``run_after``.

        Example: with a midnight schedule, a manual run triggered at 1am on
        the 25th covers 0am on the 24th up to 0am on the 25th.
        """
        interval_end = self._align_to_prev(run_after)
        interval_start = self._get_prev(interval_end)
        return DataInterval(start=interval_start, end=interval_end)
class DeltaDataIntervalTimetable(_DataIntervalTimetable):
    """Timetable that schedules data intervals with a time delta.

    This corresponds to ``schedule=<delta>``, where ``<delta>`` is
    either a ``datetime.timedelta`` or ``dateutil.relativedelta.relativedelta``
    instance.
    """

    def __init__(self, delta: Delta) -> None:
        """Store the offset separating consecutive scheduled times."""
        self._delta = delta

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Rebuild a timetable from the mapping produced by ``serialize``.

        A ``dict`` under ``"delta"`` is decoded as a relativedelta; any other
        value is taken as a number of seconds for a ``datetime.timedelta``.
        """
        from airflow.serialization.serialized_objects import decode_relativedelta

        delta = data["delta"]
        if isinstance(delta, dict):
            return cls(decode_relativedelta(delta))
        return cls(datetime.timedelta(seconds=delta))

    def __eq__(self, other: Any) -> bool:
        """The offset should match.

        This is only for testing purposes and should not be relied on otherwise.
        """
        if not isinstance(other, DeltaDataIntervalTimetable):
            return NotImplemented
        return self._delta == other._delta

    def __hash__(self) -> int:
        """Hash on the offset so that equal timetables hash equally.

        Defining ``__eq__`` alone makes Python set ``__hash__`` to ``None``,
        which leaves instances unusable in sets and as dict keys. Hashing the
        same value that ``__eq__`` compares keeps the two consistent.
        """
        return hash(self._delta)

    @property
    def summary(self) -> str:
        """String form of the configured delta."""
        return str(self._delta)

    def serialize(self) -> dict[str, Any]:
        """Return the delta as a mapping accepted by ``deserialize``.

        A ``datetime.timedelta`` is stored as its total seconds; anything
        else is passed through ``encode_relativedelta``.
        """
        from airflow.serialization.serialized_objects import encode_relativedelta

        delta: Any
        if isinstance(self._delta, datetime.timedelta):
            delta = self._delta.total_seconds()
        else:
            delta = encode_relativedelta(self._delta)
        return {"delta": delta}

    def validate(self) -> None:
        """Reject a delta that does not move time forward.

        Raises:
            AirflowTimetableInvalid: if adding the delta to the current time
                gives a result that is not later than the current time.
        """
        # The delta is applied to a concrete timestamp instead of being
        # compared with zero, so the same check covers both delta types.
        now = datetime.datetime.now()
        if (now + self._delta) <= now:
            raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}")

    def _get_next(self, current: DateTime) -> DateTime:
        """Return ``current`` plus the delta, passed through ``convert_to_utc``."""
        return convert_to_utc(current + self._delta)

    def _get_prev(self, current: DateTime) -> DateTime:
        """Return ``current`` minus the delta, passed through ``convert_to_utc``."""
        return convert_to_utc(current - self._delta)

    def _align_to_next(self, current: DateTime) -> DateTime:
        """Return ``current`` unchanged; this timetable performs no alignment."""
        return current

    def _align_to_prev(self, current: DateTime) -> DateTime:
        """Return ``current`` unchanged; this timetable performs no alignment."""
        return current

    def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
        """Bound the earliest time a run can be scheduled.

        The logic is that we move start_date up until one period before, so the
        current time is AFTER the period end, and the job can be created...

        This is slightly different from the cron version at terminal values.
        """
        new_start = self._get_prev(DateTime.utcnow())
        if earliest is None:
            return new_start
        return max(new_start, earliest)

    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        """Return the interval of one delta that ends at ``run_after``.

        ``run_after`` is accepted positionally as well as by keyword here.
        """
        return DataInterval(start=self._get_prev(run_after), end=run_after)