# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import operator
from typing import TYPE_CHECKING, Any, Collection

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

if TYPE_CHECKING:
    from pendulum import DateTime
    from sqlalchemy import Session

    from airflow.models.dataset import DatasetEvent
    from airflow.utils.types import DagRunType


class _TrivialTimetable(Timetable):
    """Shared implementation for "trivial" timetables that have nothing complex."""

    periodic = False
    can_run = False
    run_ordering = ("execution_date",)

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        return cls()

    def __eq__(self, other: Any) -> bool:
        """Return True as long as *other* is of the same type.

        This is only for testing purposes and should not be relied on otherwise.
        """
        if not isinstance(other, type(self)):
            return NotImplemented
        return True

    def serialize(self) -> dict[str, Any]:
        return {}

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        return DataInterval.exact(run_after)
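

# Illustrative sketch, not part of the original module; the helper name is
# ours. Trivial timetables serialize to an empty dict, deserialize back to a
# fresh instance, and pin a manual run's data interval to a single instant
# via ``DataInterval.exact``.
def _example_trivial_roundtrip() -> None:
    import pendulum

    timetable = NullTimetable.deserialize(NullTimetable().serialize())
    assert timetable == NullTimetable()  # __eq__ above only compares types.

    run_after = pendulum.datetime(2022, 1, 1, tz="UTC")
    interval = timetable.infer_manual_data_interval(run_after=run_after)
    assert interval.start == interval.end == run_after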


class NullTimetable(_TrivialTimetable):
    """Timetable that never schedules anything.

    This corresponds to ``schedule=None``.
    """

    description: str = "Never, external triggers only"

    @property
    def summary(self) -> str:
        return "None"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        return None
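

# Illustrative sketch, not part of the original module; the helper name is
# ours. With ``schedule=None`` the scheduler never creates runs on its own,
# so ``next_dagrun_info`` returns None regardless of the restriction.
def _example_null_never_schedules() -> None:
    import pendulum

    restriction = TimeRestriction(
        earliest=pendulum.datetime(2022, 1, 1, tz="UTC"),
        latest=None,
        catchup=True,
    )
    info = NullTimetable().next_dagrun_info(
        last_automated_data_interval=None,
        restriction=restriction,
    )
    assert info is None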


class OnceTimetable(_TrivialTimetable):
    """Timetable that schedules the execution once as soon as possible.

    This corresponds to ``schedule="@once"``.
    """

    description: str = "Once, as soon as possible"

    @property
    def summary(self) -> str:
        return "@once"

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:
            return None  # Already run; no more scheduling.
        if restriction.earliest is None:  # No start date, won't run.
            return None
        # "@once" always schedules at the start_date determined by the DAG
        # and tasks, regardless of catchup. This has been the case since
        # 1.10, and we're inheriting it. See AIRFLOW-1928.
        run_after = restriction.earliest
        if restriction.latest is not None and run_after > restriction.latest:
            return None
        return DagRunInfo.exact(run_after)
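

# Illustrative sketch, not part of the original module; the helper name is
# ours. "@once" yields exactly one run, pinned to the earliest allowed date
# (the start_date), and then never schedules again.
def _example_once_runs_exactly_once() -> None:
    import pendulum

    start = pendulum.datetime(2022, 1, 1, tz="UTC")
    restriction = TimeRestriction(earliest=start, latest=None, catchup=True)
    timetable = OnceTimetable()

    first = timetable.next_dagrun_info(
        last_automated_data_interval=None,
        restriction=restriction,
    )
    assert first is not None and first.run_after == start

    # Once an automated run exists, nothing further is scheduled.
    second = timetable.next_dagrun_info(
        last_automated_data_interval=first.data_interval,
        restriction=restriction,
    )
    assert second is None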


class DatasetTriggeredTimetable(NullTimetable):
    """Timetable that never schedules anything.

    This should not be used directly anywhere; it is only set if a DAG is
    triggered by datasets.

    :meta private:
    """

    description: str = "Triggered by datasets"

    @property
    def summary(self) -> str:
        return "Dataset"

    def generate_run_id(
        self,
        *,
        run_type: DagRunType,
        logical_date: DateTime,
        data_interval: DataInterval | None,
        session: Session | None = None,
        events: Collection[DatasetEvent] | None = None,
        **extra,
    ) -> str:
        from airflow.models.dagrun import DagRun

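        # The run id format is delegated entirely to DagRun.generate_run_id,
        # which combines the run type and the logical date, e.g.
        # "dataset_triggered__2022-01-01T00:00:00+00:00".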
        return DagRun.generate_run_id(run_type, logical_date)

    def data_interval_for_events(
        self,
        logical_date: DateTime,
        events: Collection[DatasetEvent],
    ) -> DataInterval:
        if not events:
            return DataInterval(logical_date, logical_date)

        start = min(
            events, key=operator.attrgetter("source_dag_run.data_interval_start")
        ).source_dag_run.data_interval_start
        end = max(
            events, key=operator.attrgetter("source_dag_run.data_interval_end")
        ).source_dag_run.data_interval_end
        return DataInterval(start, end)
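

# Illustrative sketch, not part of the original module; the helper name is
# ours. The covering interval runs from the earliest source run's
# data_interval_start to the latest source run's data_interval_end.
# ``SimpleNamespace`` objects stand in for ``DatasetEvent`` rows here, since
# only the attribute shape matters to ``operator.attrgetter``.
def _example_interval_covers_all_events() -> None:
    from types import SimpleNamespace

    import pendulum

    def _event(start_day: int, end_day: int) -> SimpleNamespace:
        return SimpleNamespace(
            source_dag_run=SimpleNamespace(
                data_interval_start=pendulum.datetime(2022, 1, start_day, tz="UTC"),
                data_interval_end=pendulum.datetime(2022, 1, end_day, tz="UTC"),
            )
        )

    events = [_event(3, 4), _event(1, 2), _event(5, 6)]
    interval = DatasetTriggeredTimetable().data_interval_for_events(
        pendulum.datetime(2022, 1, 7, tz="UTC"), events
    )
    assert interval == DataInterval(
        pendulum.datetime(2022, 1, 1, tz="UTC"),
        pendulum.datetime(2022, 1, 6, tz="UTC"),
    )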