Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/timetables/simple.py: 43%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19from typing import TYPE_CHECKING, Any, Collection, Sequence
21from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
22from airflow.utils import timezone
24if TYPE_CHECKING:
25 from pendulum import DateTime
26 from sqlalchemy import Session
28 from airflow.models.dataset import DatasetEvent
29 from airflow.timetables.base import TimeRestriction
30 from airflow.utils.types import DagRunType
class _TrivialTimetable(Timetable):
    """Shared base for "trivial" timetables that have no scheduling complexity."""

    periodic = False
    run_ordering: Sequence[str] = ("execution_date",)

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        # Trivial timetables carry no state, so the serialized payload is ignored.
        return cls()

    def __eq__(self, other: Any) -> bool:
        """As long as *other* is of the same type.

        This is only for testing purposes and should not be relied on otherwise.
        """
        if isinstance(other, type(self)):
            return True
        return NotImplemented

    def serialize(self) -> dict[str, Any]:
        # Nothing to persist.
        return {}

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # A manually-triggered run covers exactly the trigger moment.
        return DataInterval.exact(run_after)
class NullTimetable(_TrivialTimetable):
    """Timetable that never schedules anything.

    This corresponds to ``schedule=None``.
    """

    can_be_scheduled = False
    description: str = "Never, external triggers only"

    @property
    def summary(self) -> str:
        return "None"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        # Never schedule automatically; runs occur only via external triggers.
        return None
class OnceTimetable(_TrivialTimetable):
    """Timetable that schedules the execution once as soon as possible.

    This corresponds to ``schedule="@once"``.
    """

    description: str = "Once, as soon as possible"

    @property
    def summary(self) -> str:
        return "@once"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:
            # The single run already happened; never schedule another.
            return None
        earliest = restriction.earliest
        if earliest is None:
            # Without a start date there is nothing to anchor the run to.
            return None
        # "@once" always schedule to the start_date determined by the DAG and
        # tasks, regardless of catchup or not. This has been the case since 1.10
        # and we're inheriting it.
        latest = restriction.latest
        if latest is not None and earliest > latest:
            # Start date falls after the end date, so the run can never happen.
            return None
        return DagRunInfo.exact(earliest)
class ContinuousTimetable(_TrivialTimetable):
    """Timetable that schedules continually, while still respecting start_date and end_date.

    This corresponds to ``schedule="@continuous"``.
    """

    description: str = "As frequently as possible, but only one run at a time."

    active_runs_limit = 1  # Continuous DAGRuns should be constrained to one run at a time

    @property
    def summary(self) -> str:
        return "@continuous"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        earliest = restriction.earliest
        if earliest is None:
            # No start date, so the DAG can never run.
            return None
        now = timezone.coerce_datetime(timezone.utcnow())
        if last_automated_data_interval is None:
            # First run: the interval must not begin before the start_date.
            start = earliest
            end = max(earliest, now)
        else:
            # Subsequent run: continue from where the previous interval ended.
            start = last_automated_data_interval.end
            end = now

        if restriction.latest is not None and end > restriction.latest:
            # Past the end date; stop scheduling.
            return None

        return DagRunInfo.interval(start, end)
class DatasetTriggeredTimetable(_TrivialTimetable):
    """Timetable that never schedules anything.

    This should not be directly used anywhere, but only set if a DAG is triggered by datasets.

    :meta private:
    """

    description: str = "Triggered by datasets"

    @property
    def summary(self) -> str:
        return "Dataset"

    def generate_run_id(
        self,
        *,
        run_type: DagRunType,
        logical_date: DateTime,
        data_interval: DataInterval | None,
        session: Session | None = None,
        events: Collection[DatasetEvent] | None = None,
        **extra,
    ) -> str:
        # Imported locally to avoid a circular import at module load time.
        from airflow.models.dagrun import DagRun

        return DagRun.generate_run_id(run_type, logical_date)

    def data_interval_for_events(
        self,
        logical_date: DateTime,
        events: Collection[DatasetEvent],
    ) -> DataInterval:
        # With no triggering events, collapse to a zero-length interval at the
        # logical date.
        if not events:
            return DataInterval(logical_date, logical_date)

        starts = []
        ends = []
        for event in events:
            source_run = event.source_dag_run
            if source_run is None:
                # No source run to borrow an interval from; treat the event's
                # timestamp as a point interval.
                starts.append(event.timestamp)
                ends.append(event.timestamp)
            else:
                starts.append(source_run.data_interval_start)
                ends.append(source_run.data_interval_end)

        # The combined interval spans all contributing event intervals.
        return DataInterval(min(starts), max(ends))

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        # Dataset-triggered DAGs are never scheduled by the timetable itself.
        return None