Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/timetables/base.py: 76%
58 statements
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, NamedTuple, Sequence
from warnings import warn

from pendulum import DateTime

from airflow.typing_compat import Protocol, runtime_checkable

if TYPE_CHECKING:
    from airflow.utils.types import DagRunType


class DataInterval(NamedTuple):
    """A data interval for a DagRun to operate over.

    Both ``start`` and ``end`` **MUST** be "aware", i.e. contain timezone
    information.
    """

    start: DateTime
    end: DateTime

    @classmethod
    def exact(cls, at: DateTime) -> DataInterval:
        """Represent an "interval" containing only an exact time."""
        return cls(start=at, end=at)
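

# Illustrative usage (not part of the original module): a ``DataInterval`` is a
# plain named tuple of two timezone-aware pendulum ``DateTime`` objects, e.g.::
#
#     import pendulum
#
#     interval = DataInterval(
#         start=pendulum.datetime(2023, 6, 1, tz="UTC"),
#         end=pendulum.datetime(2023, 6, 2, tz="UTC"),
#     )
#     point = DataInterval.exact(pendulum.datetime(2023, 6, 1, tz="UTC"))
#     assert point.start == point.end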


class TimeRestriction(NamedTuple):
    """Restriction on when a DAG can be scheduled for a run.

    Specifically, the run must not be earlier than ``earliest``, nor later than
    ``latest``. If ``catchup`` is *False*, the run must also not be earlier than
    the current time, i.e. "missed" schedules are not backfilled.

    These values are generally taken from the DAG or task's ``start_date``,
    ``end_date``, and ``catchup`` arguments.

    Both ``earliest`` and ``latest``, if not *None*, are inclusive; a DAG run
    can happen exactly at either point of time. They are guaranteed to be aware
    (i.e. contain timezone information) for ``TimeRestriction`` instances
    created by Airflow.
    """

    earliest: DateTime | None
    latest: DateTime | None
    catchup: bool
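

# Illustrative usage (not part of the original module): a restriction built the
# way Airflow typically would, from a DAG's ``start_date``, ``end_date``, and
# ``catchup`` settings::
#
#     import pendulum
#
#     restriction = TimeRestriction(
#         earliest=pendulum.datetime(2023, 1, 1, tz="UTC"),  # DAG start_date
#         latest=None,                                       # no end_date
#         catchup=True,
#     )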


class DagRunInfo(NamedTuple):
    """Information to schedule a DagRun.

    Instances of this will be returned by timetables when they are asked to
    schedule a DagRun creation.
    """

    run_after: DateTime
    """The earliest time at which this DagRun can be created and its tasks scheduled.

    This **MUST** be "aware", i.e. contain timezone information.
    """

    data_interval: DataInterval
    """The data interval this DagRun operates over."""

    @classmethod
    def exact(cls, at: DateTime) -> DagRunInfo:
        """Represent a run on an exact time."""
        return cls(run_after=at, data_interval=DataInterval.exact(at))

    @classmethod
    def interval(cls, start: DateTime, end: DateTime) -> DagRunInfo:
        """Represent a run on a continuous schedule.

        In such a schedule, each data interval starts right after the previous
        one ends, and each run is scheduled right after the interval ends. This
        applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
        """
        return cls(run_after=end, data_interval=DataInterval(start, end))

    @property
    def logical_date(self: DagRunInfo) -> DateTime:
        """Infer the logical date to represent a DagRun.

        This replaces ``execution_date`` in Airflow 2.1 and prior. The idea is
        essentially the same, just a different name.
        """
        return self.data_interval.start
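

# Illustrative usage (not part of the original module): for a continuous
# schedule, ``run_after`` is the end of the data interval and ``logical_date``
# is its start::
#
#     import pendulum
#
#     info = DagRunInfo.interval(
#         start=pendulum.datetime(2023, 6, 1, tz="UTC"),
#         end=pendulum.datetime(2023, 6, 2, tz="UTC"),
#     )
#     assert info.run_after == info.data_interval.end
#     assert info.logical_date == info.data_interval.start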


@runtime_checkable
class Timetable(Protocol):
    """Protocol that all Timetable classes are expected to implement."""

    description: str = ""
    """Human-readable description of the timetable.

    For example, this can produce something like ``'At 21:30, only on Friday'``
    from the cron expression ``'30 21 * * 5'``. This is used in the webserver UI.
    """

    periodic: bool = True
    """Whether this timetable runs periodically.

    This defaults to and should generally be *True*, but some special setups
    like ``schedule=None`` and ``"@once"`` set it to *False*.
    """

    _can_be_scheduled: bool = True

    @property
    def can_be_scheduled(self) -> bool:
        """Whether this timetable can actually schedule runs in an automated manner.

        This defaults to and should generally be *True* (including for
        non-periodic execution types like ``@once`` and data-triggered
        timetables), but ``NullTimetable`` sets it to *False*.
        """
        if hasattr(self, "can_run"):
            warn(
                'can_run class variable is deprecated. Use "can_be_scheduled" instead.',
                DeprecationWarning,
                stacklevel=2,
            )
            return self.can_run
        return self._can_be_scheduled

    run_ordering: Sequence[str] = ("data_interval_end", "execution_date")
    """How runs triggered from this timetable should be ordered in the UI.

    This should be a list of field names on the DAG run object.
    """

    active_runs_limit: int | None = None
    """Override the ``max_active_runs`` parameter of any DAG using this timetable.

    This is checked during DAG initialization, and the DAG's ``max_active_runs``
    is set to this value if it is not *None*. In most cases this should remain
    *None*, but in some cases (for example, the ``ContinuousTimetable``) there
    are good reasons to limit DagRun parallelism.
    """

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Deserialize a timetable from data.

        This is called when a serialized DAG is deserialized. ``data`` will be
        whatever was returned by ``serialize`` during DAG serialization. The
        default implementation constructs the timetable without any arguments.
        """
        return cls()

    def serialize(self) -> dict[str, Any]:
        """Serialize the timetable for JSON encoding.

        This is called during DAG serialization to store timetable information
        in the database. This should return a JSON-serializable dict that will
        be fed into ``deserialize`` when the DAG is deserialized. The default
        implementation returns an empty dict.
        """
        return {}

    def validate(self) -> None:
        """Validate the timetable is correctly specified.

        Override this method to provide run-time validation that is performed
        when a DAG is put into a DagBag. The default implementation does
        nothing.

        :raises: AirflowTimetableInvalid on validation failure.
        """
        return

    @property
    def summary(self) -> str:
        """A short summary for the timetable.

        This is used to display the timetable in the web UI. A cron expression
        timetable, for example, can use this to display the expression. The
        default implementation returns the timetable's type name.
        """
        return type(self).__name__

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """When a DAG run is manually triggered, infer a data interval for it.

        ``run_after`` is the time at which the user triggered the run. The
        default implementation raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Provide information to schedule the next DagRun.

        The default implementation raises ``NotImplementedError``.

        :param last_automated_data_interval: The data interval of the associated
            DAG's last scheduled or backfilled run (manual runs not considered).
        :param restriction: Restriction to apply when scheduling the DAG run.
            See documentation of :class:`TimeRestriction` for details.

        :return: Information on when the next DagRun can be scheduled. *None*
            means a DagRun will not happen now. This does not mean no more runs
            will ever be scheduled for this DAG; the timetable can return a
            ``DagRunInfo`` object when asked again at a later time.
        """
        raise NotImplementedError()

    def generate_run_id(
        self,
        *,
        run_type: DagRunType,
        logical_date: DateTime,
        data_interval: DataInterval | None,
        **extra,
    ) -> str:
        """Generate a run ID for a DagRun scheduled by this timetable.

        The default implementation delegates to ``run_type.generate_run_id``,
        which combines the run type and the logical date.
        """
        return run_type.generate_run_id(logical_date)
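

# Illustrative sketch (not part of the original module): a minimal custom
# timetable that schedules one run per UTC day at midnight, honoring the
# ``TimeRestriction``. The class name and logic are hypothetical, shown only
# to demonstrate how the protocol methods above fit together (``catchup``
# handling is omitted for brevity)::
#
#     class DailyTimetable(Timetable):
#         def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
#             # A manual run covers the day that contains the trigger time.
#             start = run_after.start_of("day")
#             return DataInterval(start=start, end=start.add(days=1))
#
#         def next_dagrun_info(
#             self,
#             *,
#             last_automated_data_interval: DataInterval | None,
#             restriction: TimeRestriction,
#         ) -> DagRunInfo | None:
#             if last_automated_data_interval is not None:
#                 next_start = last_automated_data_interval.end
#             elif restriction.earliest is None:
#                 return None  # No start_date; nothing to schedule.
#             else:
#                 next_start = restriction.earliest.start_of("day")
#             if restriction.latest is not None and next_start > restriction.latest:
#                 return None
#             return DagRunInfo.interval(start=next_start, end=next_start.add(days=1))
#
#         def serialize(self) -> dict[str, Any]:
#             return {}  # No parameters to persist.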