# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import typing

from airflow.datasets import BaseDataset, DatasetAll
from airflow.exceptions import AirflowTimetableInvalid
from airflow.timetables.simple import DatasetTriggeredTimetable as DatasetTriggeredSchedule
from airflow.utils.types import DagRunType

if typing.TYPE_CHECKING:
    from collections.abc import Collection

    import pendulum

    from airflow.datasets import Dataset
    from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class DatasetOrTimeSchedule(DatasetTriggeredSchedule):
    """Combine time-based scheduling with event-based scheduling.

    :param timetable: The time-based schedule to combine with.
    :param datasets: The dataset condition; a plain collection is wrapped in
        DatasetAll, so every listed dataset must receive an update before the
        event side can trigger a run.
    """

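    # Minimal usage sketch (hedged: assumes an Airflow release that ships this
    # class; "s3://bucket/example.csv" is a hypothetical dataset URI):
    #
    #     from airflow.datasets import Dataset
    #     from airflow.timetables.trigger import CronTriggerTimetable
    #
    #     schedule = DatasetOrTimeSchedule(
    #         timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
    #         datasets=[Dataset("s3://bucket/example.csv")],
    #     )
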
    def __init__(
        self,
        *,
        timetable: Timetable,
        datasets: Collection[Dataset] | BaseDataset,
    ) -> None:
        self.timetable = timetable
        # Normalize a plain collection to DatasetAll: all listed datasets must
        # be updated before the dataset condition is considered satisfied.
        if isinstance(datasets, BaseDataset):
            self.datasets = datasets
        else:
            self.datasets = DatasetAll(*datasets)

        self.description = f"Triggered by datasets or {timetable.description}"
        # Mirror the wrapped timetable's scheduling attributes so the scheduler
        # treats the time-based side exactly like the original timetable.
        self.periodic = timetable.periodic
        self._can_be_scheduled = timetable._can_be_scheduled
        self.active_runs_limit = timetable.active_runs_limit

    @classmethod
    def deserialize(cls, data: dict[str, typing.Any]) -> Timetable:
        from airflow.serialization.serialized_objects import decode_timetable

        return cls(
            timetable=decode_timetable(data["timetable"]),
            # The datasets are not needed after deserialization; they are
            # already stored in the dataset_triggers attribute on the DAG,
            # which is what the scheduler looks at.
            datasets=[],
        )

    def serialize(self) -> dict[str, typing.Any]:
        from airflow.serialization.serialized_objects import encode_timetable

        return {"timetable": encode_timetable(self.timetable)}
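
    # Serialization round-trip sketch (hedged: the encoded payload format is
    # an internal detail of encode_timetable/decode_timetable):
    #
    #     data = schedule.serialize()
    #     restored = DatasetOrTimeSchedule.deserialize(data)
    #     # restored.datasets is an empty DatasetAll; the scheduler reads the
    #     # dataset condition from the DAG's dataset_triggers instead.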

    def validate(self) -> None:
        if isinstance(self.timetable, DatasetTriggeredSchedule):
            raise AirflowTimetableInvalid("cannot nest dataset timetables")
        if not isinstance(self.datasets, BaseDataset):
            raise AirflowTimetableInvalid("all elements in 'datasets' must be datasets")

    @property
    def summary(self) -> str:
        return f"Dataset or {self.timetable.summary}"

    def infer_manual_data_interval(self, *, run_after: pendulum.DateTime) -> DataInterval:
        return self.timetable.infer_manual_data_interval(run_after=run_after)

    def next_dagrun_info(
        self, *, last_automated_data_interval: DataInterval | None, restriction: TimeRestriction
    ) -> DagRunInfo | None:
        return self.timetable.next_dagrun_info(
            last_automated_data_interval=last_automated_data_interval,
            restriction=restriction,
        )
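
    # Note: time-based run planning is fully delegated to the wrapped
    # timetable. Dataset events are evaluated by the scheduler against the
    # DAG's dataset triggers, so they never appear in next_dagrun_info.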

    def generate_run_id(self, *, run_type: DagRunType, **kwargs: typing.Any) -> str:
        # Dataset-triggered runs keep the parent class's run-id scheme; all
        # other run types defer to the wrapped timetable.
        if run_type != DagRunType.DATASET_TRIGGERED:
            return self.timetable.generate_run_id(run_type=run_type, **kwargs)
        return super().generate_run_id(run_type=run_type, **kwargs)
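
    # Run-id sketch (hedged: the exact format comes from DagRunType/Timetable
    # internals and may vary by Airflow version):
    #
    #     schedule.generate_run_id(run_type=DagRunType.DATASET_TRIGGERED, ...)
    #     # e.g. "dataset_triggered__2024-01-01T00:00:00+00:00"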