Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/timetables/base.py: 78%
49 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19from typing import TYPE_CHECKING, Any, NamedTuple, Sequence
21from pendulum import DateTime
23from airflow.typing_compat import Protocol, runtime_checkable
25if TYPE_CHECKING:
26 from airflow.utils.types import DagRunType
class DataInterval(NamedTuple):
    """The span of time, ``start`` to ``end``, that a DagRun operates over.

    Both bounds **MUST** be timezone-"aware" values, i.e. they must carry
    timezone information.
    """

    start: DateTime
    end: DateTime

    @classmethod
    def exact(cls, at: DateTime) -> DataInterval:
        """Build a zero-length "interval" that both starts and ends at ``at``."""
        return cls(at, at)
class TimeRestriction(NamedTuple):
    """Bounds on when a DAG is allowed to be scheduled for a run.

    A run may be scheduled neither before ``earliest`` nor after ``latest``.
    When ``catchup`` is *False*, a run additionally may not be scheduled
    before the current time; in other words, "missed" schedules are not
    backfilled.

    These values usually come from the ``start_date``, ``end_date`` and
    ``catchup`` arguments given to the DAG or task.

    When they are not *None*, ``earliest`` and ``latest`` are inclusive: a DAG
    run is permitted to happen exactly at either instant. Instances created by
    Airflow are guaranteed to hold timezone-aware values for both.
    """

    # Inclusive lower bound on when a run may happen; may be None.
    earliest: DateTime | None
    # Inclusive upper bound on when a run may happen; may be None.
    latest: DateTime | None
    # If False, runs earlier than the current time are not backfilled.
    catchup: bool
class DagRunInfo(NamedTuple):
    """Scheduling information needed to create a DagRun.

    Timetables return instances of this class when asked to schedule the
    creation of a DagRun.
    """

    run_after: DateTime
    """Earliest moment at which this DagRun is created and its tasks scheduled.

    This value **MUST** be timezone-"aware", i.e. carry timezone information.
    """

    data_interval: DataInterval
    """The data interval this DagRun operates over."""

    @classmethod
    def exact(cls, at: DateTime) -> DagRunInfo:
        """Describe a run pinned to the single instant ``at``."""
        point = DataInterval.exact(at)
        return cls(at, point)

    @classmethod
    def interval(cls, start: DateTime, end: DateTime) -> DagRunInfo:
        """Describe one run of a continuous schedule.

        In a continuous schedule, every data interval begins where the
        previous one ended, and each run becomes eligible as soon as its
        interval closes, so ``run_after`` is set to ``end``. Every pre-AIP-39
        schedule except ``@once`` and ``None`` behaves this way.
        """
        span = DataInterval(start, end)
        return cls(run_after=end, data_interval=span)

    @property
    def logical_date(self) -> DateTime:
        """The logical date representing this DagRun.

        This is the start of ``data_interval``. It takes over the role that
        ``execution_date`` played in Airflow 2.1 and earlier; only the name
        differs.
        """
        span = self.data_interval
        return span.start
@runtime_checkable
class Timetable(Protocol):
    """Protocol that all Timetable classes are expected to implement."""

    description: str = ""
    """Human-readable description of the timetable.

    For example, this can produce something like ``'At 21:30, only on Friday'``
    from the cron expression ``'30 21 * * 5'``. This is used in the webserver UI.
    """

    periodic: bool = True
    """Whether this timetable runs periodically.

    This defaults to and should generally be *True*, but some special setups
    like ``schedule=None`` and ``"@once"`` set it to *False*.
    """

    can_run: bool = True
    """Whether this timetable can actually schedule runs.

    This defaults to and should generally be *True*, but ``NullTimetable`` sets
    this to *False*.
    """

    run_ordering: Sequence[str] = ("data_interval_end", "execution_date")
    """How runs triggered from this timetable should be ordered in UI.

    This should be a list of field names on the DAG run object.
    """

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Deserialize a timetable from data.

        This is called when a serialized DAG is deserialized. ``data`` will be
        whatever was returned by ``serialize`` during DAG serialization. The
        default implementation ignores ``data`` and constructs the timetable
        without any arguments.

        :param data: The dict previously produced by ``serialize``.
        :return: A new timetable instance.
        """
        return cls()

    def serialize(self) -> dict[str, Any]:
        """Serialize the timetable for JSON encoding.

        This is called during DAG serialization to store timetable information
        in the database. This should return a JSON-serializable dict that will
        be fed into ``deserialize`` when the DAG is deserialized. The default
        implementation returns an empty dict.

        :return: A JSON-serializable dict describing this timetable.
        """
        return {}

    def validate(self) -> None:
        """Validate the timetable is correctly specified.

        Override this method to provide run-time validation raised when a DAG
        is put into a dagbag. The default implementation does nothing.

        :raises: AirflowTimetableInvalid on validation failure.
        """

    @property
    def summary(self) -> str:
        """A short summary for the timetable.

        This is used to display the timetable in the web UI. A cron expression
        timetable, for example, can use this to display the expression. The
        default implementation returns the timetable's type name.
        """
        return type(self).__name__

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """When a DAG run is manually triggered, infer a data interval for it.

        ``run_after`` is the time at which the user triggered the run. The
        default implementation raises ``NotImplementedError``.

        :param run_after: When the manual run was triggered.
        :return: The data interval to assign to the manual run.
        :raises NotImplementedError: In the default implementation.
        """
        raise NotImplementedError()

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Provide information to schedule the next DagRun.

        The default implementation raises ``NotImplementedError``.

        :param last_automated_data_interval: The data interval of the associated
            DAG's last scheduled or backfilled run (manual runs not considered).
        :param restriction: Restriction to apply when scheduling the DAG run.
            See documentation of :class:`TimeRestriction` for details.

        :return: Information on when the next DagRun can be scheduled. None
            means a DagRun will not happen. This does not mean no more runs
            will ever be scheduled again for this DAG; the timetable can return
            a DagRunInfo object when asked at another time.
        :raises NotImplementedError: In the default implementation.
        """
        raise NotImplementedError()

    def generate_run_id(
        self,
        *,
        run_type: DagRunType,
        logical_date: DateTime,
        data_interval: DataInterval | None,
        **extra,
    ) -> str:
        """Generate the run ID for a DAG run created by this timetable.

        The default implementation delegates to
        ``run_type.generate_run_id(logical_date)``. ``data_interval`` and any
        ``extra`` keyword arguments are accepted so that overriding timetables
        can take them into account, but they are ignored here.

        :param run_type: The type of the DAG run; its ``generate_run_id`` method
            produces the ID.
        :param logical_date: The logical date of the DAG run.
        :param data_interval: The data interval of the DAG run, if any.
            Unused by the default implementation.
        :param extra: Additional keyword arguments. Unused by the default
            implementation.
        :return: The run ID string.
        """
        return run_type.generate_run_id(logical_date)