Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/timetables/simple.py: 49%
78 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import operator
20from typing import TYPE_CHECKING, Any, Collection
22from pendulum import DateTime
24from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
26if TYPE_CHECKING:
27 from sqlalchemy import Session
29 from airflow.models.dataset import DatasetEvent
30 from airflow.utils.types import DagRunType
class _TrivialTimetable(Timetable):
    """Shared boilerplate for "trivial" timetables that hold no state."""

    periodic = False
    run_ordering = ("execution_date",)

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        # These timetables are stateless, so the serialized payload is ignored.
        return cls()

    def __eq__(self, other: Any) -> bool:
        """Compare equal whenever *other* is an instance of the same type.

        This is only for testing purposes and should not be relied on otherwise.
        """
        if isinstance(other, type(self)):
            return True
        return NotImplemented

    def serialize(self) -> dict[str, Any]:
        # Nothing to persist.
        return {}

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # A manually triggered run covers exactly the trigger instant.
        return DataInterval.exact(run_after)
class NullTimetable(_TrivialTimetable):
    """Timetable that never schedules anything.

    This corresponds to ``schedule=None``.
    """

    can_be_scheduled = False
    description: str = "Never, external triggers only"

    @property
    def summary(self) -> str:
        return "None"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        # The scheduler never creates runs for this timetable; only
        # external triggers produce DAG runs.
        return None
class OnceTimetable(_TrivialTimetable):
    """Timetable that schedules the execution once as soon as possible.

    This corresponds to ``schedule="@once"``.
    """

    description: str = "Once, as soon as possible"

    @property
    def summary(self) -> str:
        return "@once"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:
            # The single run has already happened; never schedule again.
            return None
        earliest = restriction.earliest
        if earliest is None:
            # No start date, won't run.
            return None
        # "@once" always schedules at the start_date determined by the DAG
        # and tasks, regardless of catchup or not. This has been the case
        # since 1.10 and we're inheriting it. See AIRFLOW-1928.
        if restriction.latest is not None and earliest > restriction.latest:
            # The start date falls after the end date; nothing to run.
            return None
        return DagRunInfo.exact(earliest)
class ContinuousTimetable(_TrivialTimetable):
    """Timetable that schedules continually, while still respecting start_date and end_date.

    This corresponds to ``schedule="@continuous"``.
    """

    description: str = "As frequently as possible, but only one run at a time."

    active_runs_limit = 1  # Continuous DAGRuns should be constrained to one run at a time

    @property
    def summary(self) -> str:
        return "@continuous"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if restriction.earliest is None:
            # No start date, won't run.
            return None
        now = DateTime.utcnow()
        if last_automated_data_interval is None:
            # First run: never begin earlier than the start_date.
            start = restriction.earliest
            end = max(restriction.earliest, now)
        else:
            # Subsequent runs pick up where the previous interval ended.
            start = last_automated_data_interval.end
            end = now
        if restriction.latest is not None and end > restriction.latest:
            # Past the end date; stop scheduling.
            return None
        return DagRunInfo.interval(start, end)
class DatasetTriggeredTimetable(_TrivialTimetable):
    """Timetable that never schedules anything.

    This should not be directly used anywhere, but only set if a DAG is triggered by datasets.

    :meta private:
    """

    description: str = "Triggered by datasets"

    @property
    def summary(self) -> str:
        return "Dataset"

    def generate_run_id(
        self,
        *,
        run_type: DagRunType,
        logical_date: DateTime,
        data_interval: DataInterval | None,
        session: Session | None = None,
        events: Collection[DatasetEvent] | None = None,
        **extra,
    ) -> str:
        # Imported locally (presumably to avoid an import cycle at module
        # load time — the models package imports timetables).
        from airflow.models.dagrun import DagRun

        return DagRun.generate_run_id(run_type, logical_date)

    def data_interval_for_events(
        self,
        logical_date: DateTime,
        events: Collection[DatasetEvent],
    ) -> DataInterval:
        # With no triggering events, collapse to a zero-length interval at
        # the logical date.
        if not events:
            return DataInterval(logical_date, logical_date)
        # The combined interval spans from the earliest source run's
        # data_interval_start to the latest source run's data_interval_end.
        interval_start = min(e.source_dag_run.data_interval_start for e in events)
        interval_end = max(e.source_dag_run.data_interval_end for e in events)
        return DataInterval(interval_start, interval_end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        # Dataset-triggered DAGs are never scheduled by time.
        return None