1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18from __future__ import annotations
19
20from collections.abc import Iterator
21from typing import TYPE_CHECKING, Any
22
23from airflow.sdk.definitions.asset import AssetUniqueKey, BaseAsset
24
25if TYPE_CHECKING:
26 from sqlalchemy.orm import Session
27
28 from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetRef
29 from airflow.serialization.dag_dependency import DagDependency
30
31
class NullAsset(BaseAsset):
    """
    Placeholder asset standing in for "no assets".

    It exists purely so that timetables are easier to type, and is not
    expected to be used anywhere else.

    :meta private:
    """

    def __bool__(self) -> bool:
        """Report the instance as falsy."""
        return False

    def __or__(self, other: BaseAsset) -> BaseAsset:
        """Decline ``self | other`` by handing back ``NotImplemented``."""
        return NotImplemented

    def __and__(self, other: BaseAsset) -> BaseAsset:
        """Decline ``self & other`` by handing back ``NotImplemented``."""
        return NotImplemented

    def as_expression(self) -> Any:
        """Return ``None``; this object produces no expression value."""
        return None

    def evaluate(self, statuses: dict[AssetUniqueKey, bool], *, session: Session | None = None) -> bool:
        """Return ``False`` regardless of what ``statuses`` or ``session`` hold."""
        return False

    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
        """Return an iterator that yields no assets."""
        empty = ()
        return iter(empty)

    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
        """Return an iterator that yields no asset aliases."""
        empty = ()
        return iter(empty)

    def iter_asset_refs(self) -> Iterator[AssetRef]:
        """Return an iterator that yields no asset references."""
        empty = ()
        return iter(empty)

    def iter_dag_dependencies(self, source, target) -> Iterator[DagDependency]:
        """Return an iterator that yields no dependencies; both arguments are ignored."""
        empty = ()
        return iter(empty)
68
69
class BaseTimetable:
    """Common base class that every user-facing timetable inherits from."""

    can_be_scheduled: bool = True
    """
    Flag telling whether this timetable can schedule runs automatically.

    The default is *True*, and that is what a timetable should generally keep
    (non-periodic execution types such as *@once* and data-triggered
    timetables included); ``NullTimetable`` sets it to *False*.
    """

    active_runs_limit: int | None = None
    """
    Upper bound on how many runs of a DAG may be active at the same time.

    The value is read during DAG initialization and becomes the DAG's default
    ``max_active_runs`` when the DAG does not set one explicitly. It should
    generally be *None* (no limit), but some timetables may limit parallelism,
    such as ``ContinuousTimetable``.
    """

    # Class-level default shared by all timetables that do not override it:
    # a ``NullAsset`` instance, which is falsy and stands for "no assets".
    asset_condition: BaseAsset = NullAsset()

    def validate(self) -> None:
        """
        Check that the timetable has been specified correctly.

        Subclasses override this hook to perform run-time validation, which is
        raised when a DAG is put into a dagbag. This default implementation
        does nothing.

        :raises: :class:`~airflow.sdk.exceptions.AirflowTimetableInvalid` on validation failure.
        """