1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
18
19import datetime
20from typing import TYPE_CHECKING, Any
21
22from airflow.timetables._cron import CronMixin
23from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
24from airflow.utils import timezone
25
26if TYPE_CHECKING:
27 from dateutil.relativedelta import relativedelta
28 from pendulum import DateTime
29 from pendulum.tz.timezone import FixedTimezone, Timezone
30
31 from airflow.timetables.base import TimeRestriction
32
33
class CronTriggerTimetable(CronMixin, Timetable):
    """Timetable that fires a DAG run at each point matched by a cron expression.

    Unlike ``CronDataIntervalTimetable``, the cron expression here does not
    describe the *data interval* of a run; it only decides when a run is
    triggered, and the data interval is specified independently. For the same
    reason a run is kicked off right at the start of the period (similar to
    POSIX cron) rather than after one full data interval has elapsed.

    Don't pass ``@once`` in here; use ``OnceTimetable`` instead.

    :param cron: Cron expression, forwarded to the parent initializer.
    :param timezone: Timezone, forwarded to the parent initializer.
    :param interval: Span subtracted from a trigger time to obtain the start
        of the run's data interval. Defaults to a zero-length ``timedelta``.
    """

    def __init__(
        self,
        cron: str,
        *,
        timezone: str | Timezone | FixedTimezone,
        interval: datetime.timedelta | relativedelta = datetime.timedelta(),
    ) -> None:
        super().__init__(cron, timezone)
        # Kept as given; may be either a timedelta or a relativedelta.
        self._interval = interval

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Build a timetable from the mapping produced by :meth:`serialize`.

        :param data: Mapping with the keys ``"expression"``, ``"timezone"``
            and ``"interval"``.
        :return: A new instance of this class.
        """
        from airflow.serialization.serialized_objects import decode_relativedelta, decode_timezone

        stored = data["interval"]
        # A dict is handed to decode_relativedelta; anything else is treated
        # as a number of seconds.
        interval: datetime.timedelta | relativedelta = (
            decode_relativedelta(stored) if isinstance(stored, dict) else datetime.timedelta(seconds=stored)
        )
        return cls(
            data["expression"],
            timezone=decode_timezone(data["timezone"]),
            interval=interval,
        )

    def serialize(self) -> dict[str, Any]:
        """Return a mapping describing this timetable.

        The interval is stored as total seconds when it is a ``timedelta`` and
        as the output of ``encode_relativedelta`` otherwise.

        :return: Mapping with the keys ``"expression"``, ``"timezone"`` and
            ``"interval"``.
        """
        from airflow.serialization.serialized_objects import encode_relativedelta, encode_timezone

        encoded_interval: float | dict[str, Any] = (
            self._interval.total_seconds()
            if isinstance(self._interval, datetime.timedelta)
            else encode_relativedelta(self._interval)
        )
        encoded_tz = encode_timezone(self._timezone)
        return {
            "expression": self._expression,
            "timezone": encoded_tz,
            "interval": encoded_interval,
        }

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """Return the data interval for a manually triggered run.

        :param run_after: Moment after which the manual run may execute.
        :return: Interval ending at ``run_after`` and starting ``interval``
            before it.
        """
        # Subtracting a timedelta from a pendulum DateTime should yield a
        # pendulum DateTime, but mypy infers datetime.datetime instead.
        return DataInterval(
            run_after - self._interval,  # type: ignore[arg-type]
            run_after,
        )

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Compute the next automated run, or ``None`` when there is none.

        :param last_automated_data_interval: Data interval of the previous
            automated run, or ``None`` if there has not been one.
        :param restriction: Supplies ``catchup``, ``earliest`` and ``latest``.
        :return: Run info whose data interval ends at the chosen trigger time
            and starts ``interval`` before it; ``None`` if catching up without
            a previous run or an earliest bound, or if the trigger time falls
            after ``restriction.latest``.
        """
        previous = last_automated_data_interval

        if not restriction.catchup:
            # Without catchup, take the latest of: the cron point obtained by
            # aligning "now" backwards, the point following the previous run,
            # and the earliest bound aligned forwards.
            now = timezone.coerce_datetime(timezone.utcnow())
            candidates = [self._align_to_prev(now)]
            if previous is not None:
                candidates.append(self._get_next(previous.end))
            if restriction.earliest is not None:
                candidates.append(self._align_to_next(restriction.earliest))
            trigger = max(candidates)
        elif previous is not None:
            # Catching up: continue from where the previous run ended.
            trigger = self._get_next(previous.end)
        elif restriction.earliest is not None:
            # Catching up with no previous run: start from the earliest bound.
            trigger = self._align_to_next(restriction.earliest)
        else:
            # Don't know where to catch up from, give up.
            return None

        if restriction.latest is not None and restriction.latest < trigger:
            return None

        # Subtracting a timedelta from a pendulum DateTime should yield a
        # pendulum DateTime, but mypy infers datetime.datetime instead.
        return DagRunInfo.interval(
            trigger - self._interval,  # type: ignore[arg-type]
            trigger,
        )