1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18from __future__ import annotations
19
20import typing
21
22import attrs
23
24from airflow.sdk.bases.timetable import BaseTimetable
25from airflow.sdk.definitions.asset import AssetAll, BaseAsset
26
27if typing.TYPE_CHECKING:
28 from collections.abc import Collection
29
30 from airflow.sdk import Asset
31
32
33@attrs.define
34class AssetTriggeredTimetable(BaseTimetable):
35 """
36 Timetable that never schedules anything.
37
38 This should not be directly used anywhere, but only set if a DAG is triggered by assets.
39
40 :meta private:
41 """
42
43 asset_condition: BaseAsset = attrs.field(alias="assets")
44
45
46def _coerce_assets(o: Collection[Asset] | BaseAsset) -> BaseAsset:
47 if isinstance(o, BaseAsset):
48 return o
49 return AssetAll(*o)
50
51
52@attrs.define(kw_only=True)
53class AssetOrTimeSchedule(AssetTriggeredTimetable):
54 """
55 Combine time-based scheduling with event-based scheduling.
56
57 :param assets: An asset of list of assets, in the same format as
58 ``DAG(schedule=...)`` when using event-driven scheduling. This is used
59 to evaluate event-based scheduling.
60 :param timetable: A timetable instance to evaluate time-based scheduling.
61 """
62
63 asset_condition: BaseAsset = attrs.field(alias="assets", converter=_coerce_assets)
64 timetable: BaseTimetable
65
66 def __attrs_post_init__(self) -> None:
67 self.active_runs_limit = self.timetable.active_runs_limit
68 self.can_be_scheduled = self.timetable.can_be_scheduled