1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18from __future__ import annotations
19
20from typing import TYPE_CHECKING
21
22import attrs
23
24from airflow.sdk.bases.timetable import BaseTimetable
25from airflow.sdk.definitions.asset import AssetAll, BaseAsset
26
27if TYPE_CHECKING:
28 from collections.abc import Collection
29
30 from airflow.sdk import Asset
31 from airflow.sdk.definitions.partition_mapper.base import PartitionMapper
32
33
@attrs.define
class AssetTriggeredTimetable(BaseTimetable):
    """
    Timetable for asset-triggered DAGs; it never schedules anything.

    Not meant to be used directly anywhere. It should only be set when a DAG
    is triggered by assets.

    :meta private:
    """

    # Asset condition this timetable carries; the constructor argument is
    # named ``assets`` (attrs alias), the attribute is ``asset_condition``.
    asset_condition: BaseAsset = attrs.field(alias="assets")
45
46
@attrs.define
class PartitionedAssetTimetable(AssetTriggeredTimetable):
    """
    Asset-driven timetable that listens for partitioned assets.

    Extends :class:`AssetTriggeredTimetable` with a ``partition_mapper``
    attribute that is stored alongside the asset condition.
    """

    # Re-declared with the same definition as on the parent class; the
    # constructor argument is still called ``assets``.
    asset_condition: BaseAsset = attrs.field(alias="assets")
    # Required field with no default.
    partition_mapper: PartitionMapper
53
54
def _coerce_assets(o: Collection[Asset] | BaseAsset) -> BaseAsset:
    """
    Normalize *o* into a single ``BaseAsset``.

    A value that already is a ``BaseAsset`` is returned unchanged; anything
    else is unpacked into ``AssetAll``.
    """
    return o if isinstance(o, BaseAsset) else AssetAll(*o)
59
60
@attrs.define(kw_only=True)
class AssetOrTimeSchedule(AssetTriggeredTimetable):
    """
    Combine time-based scheduling with event-based scheduling.

    All constructor arguments are keyword-only.

    :param assets: An asset or list of assets, in the same format as
        ``DAG(schedule=...)`` when using event-driven scheduling. This is used
        to evaluate event-based scheduling. A value that is not already a
        ``BaseAsset`` is passed through ``_coerce_assets``, which wraps it in
        ``AssetAll``.
    :param timetable: A timetable instance to evaluate time-based scheduling.
    """

    asset_condition: BaseAsset = attrs.field(alias="assets", converter=_coerce_assets)
    timetable: BaseTimetable

    def __attrs_post_init__(self) -> None:
        """Copy the run-limit and schedulability attributes from the wrapped timetable."""
        wrapped = self.timetable
        self.active_runs_limit = wrapped.active_runs_limit
        self.can_be_scheduled = wrapped.can_be_scheduled