1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
18
19import logging
20from datetime import datetime, timedelta
21from typing import TYPE_CHECKING
22
23from airflow.models.deadline import DeadlineReferenceType, ReferenceModels
24from airflow.sdk.definitions.callback import AsyncCallback, Callback
25
26if TYPE_CHECKING:
27 from collections.abc import Callable
28 from typing import TypeAlias
29
30logger = logging.getLogger(__name__)
31
32DeadlineReferenceTypes: TypeAlias = tuple[type[ReferenceModels.BaseDeadlineReference], ...]
33
34
35class DeadlineAlert:
36 """Store Deadline values needed to calculate the need-by timestamp and the callback information."""
37
38 def __init__(
39 self,
40 reference: DeadlineReferenceType,
41 interval: timedelta,
42 callback: Callback,
43 ):
44 self.reference = reference
45 self.interval = interval
46
47 if not isinstance(callback, AsyncCallback):
48 raise ValueError(f"Callbacks of type {type(callback).__name__} are not currently supported")
49 self.callback = callback
50
51 def __eq__(self, other: object) -> bool:
52 if not isinstance(other, DeadlineAlert):
53 return NotImplemented
54 return (
55 isinstance(self.reference, type(other.reference))
56 and self.interval == other.interval
57 and self.callback == other.callback
58 )
59
60 def __hash__(self) -> int:
61 return hash(
62 (
63 type(self.reference).__name__,
64 self.interval,
65 self.callback,
66 )
67 )
68
69
70class DeadlineReference:
71 """
72 The public interface class for all DeadlineReference options.
73
74 This class provides a unified interface for working with Deadlines, supporting both
75 calculated deadlines (which fetch values from the database) and fixed deadlines
76 (which return a predefined datetime).
77
78 ------
79 Usage:
80 ------
81
82 1. Example deadline references:
83
84 .. code-block:: python
85
86 fixed = DeadlineReference.FIXED_DATETIME(datetime(2025, 5, 4))
87 logical = DeadlineReference.DAGRUN_LOGICAL_DATE
88 queued = DeadlineReference.DAGRUN_QUEUED_AT
89
90 2. Using in a DAG:
91
92 .. code-block:: python
93
94 DAG(
95 dag_id="dag_with_deadline",
96 deadline=DeadlineAlert(
97 reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
98 interval=timedelta(hours=1),
99 callback=hello_callback,
100 ),
101 )
102
103 3. Evaluating deadlines will ignore unexpected parameters:
104
105 .. code-block:: python
106
107 # For deadlines requiring parameters:
108 deadline = DeadlineReference.DAGRUN_LOGICAL_DATE
109 deadline.evaluate_with(dag_id=dag.dag_id)
110
111 # For deadlines with no required parameters:
112 deadline = DeadlineReference.FIXED_DATETIME(datetime(2025, 5, 4))
113 deadline.evaluate_with()
114 """
115
116 class TYPES:
117 """Collection of DeadlineReference types for type checking."""
118
119 # Deadlines that should be created when the DagRun is created.
120 DAGRUN_CREATED: DeadlineReferenceTypes = (
121 ReferenceModels.DagRunLogicalDateDeadline,
122 ReferenceModels.FixedDatetimeDeadline,
123 ReferenceModels.AverageRuntimeDeadline,
124 )
125
126 # Deadlines that should be created when the DagRun is queued.
127 DAGRUN_QUEUED: DeadlineReferenceTypes = (ReferenceModels.DagRunQueuedAtDeadline,)
128
129 # All DagRun-related deadline types.
130 DAGRUN: DeadlineReferenceTypes = DAGRUN_CREATED + DAGRUN_QUEUED
131
132 from airflow.models.deadline import ReferenceModels
133
134 DAGRUN_LOGICAL_DATE: DeadlineReferenceType = ReferenceModels.DagRunLogicalDateDeadline()
135 DAGRUN_QUEUED_AT: DeadlineReferenceType = ReferenceModels.DagRunQueuedAtDeadline()
136
137 @classmethod
138 def AVERAGE_RUNTIME(cls, max_runs: int = 0, min_runs: int | None = None) -> DeadlineReferenceType:
139 if max_runs == 0:
140 max_runs = cls.ReferenceModels.AverageRuntimeDeadline.DEFAULT_LIMIT
141 if min_runs is None:
142 min_runs = max_runs
143 return cls.ReferenceModels.AverageRuntimeDeadline(max_runs, min_runs)
144
145 @classmethod
146 def FIXED_DATETIME(cls, datetime: datetime) -> DeadlineReferenceType:
147 return cls.ReferenceModels.FixedDatetimeDeadline(datetime)
148
149 # TODO: Remove this once other deadline types exist.
150 # This is a temporary reference type used only in tests to verify that
151 # dag.has_dagrun_deadline() returns false if the dag has a non-dagrun deadline type.
152 # It should be replaced with a real non-dagrun deadline type when one is available.
153 _TEMPORARY_TEST_REFERENCE = type(
154 "TemporaryTestDeadlineForTypeChecking",
155 (DeadlineReferenceType,),
156 {"_evaluate_with": lambda self, **kwargs: datetime.now()},
157 )()
158
159 @classmethod
160 def register_custom_reference(
161 cls,
162 reference_class: type[ReferenceModels.BaseDeadlineReference],
163 deadline_reference_type: DeadlineReferenceTypes | None = None,
164 ) -> type[ReferenceModels.BaseDeadlineReference]:
165 """
166 Register a custom deadline reference class.
167
168 :param reference_class: The custom reference class inheriting from BaseDeadlineReference
169 :param deadline_reference_type: A DeadlineReference.TYPES for when the deadline should be evaluated ("DAGRUN_CREATED",
170 "DAGRUN_QUEUED", etc.); defaults to DeadlineReference.TYPES.DAGRUN_CREATED
171 """
172 from airflow.models.deadline import ReferenceModels
173
174 # Default to DAGRUN_CREATED if no deadline_reference_type specified
175 if deadline_reference_type is None:
176 deadline_reference_type = cls.TYPES.DAGRUN_CREATED
177
178 # Validate the reference class inherits from BaseDeadlineReference
179 if not issubclass(reference_class, ReferenceModels.BaseDeadlineReference):
180 raise ValueError(f"{reference_class.__name__} must inherit from BaseDeadlineReference")
181
182 # Register the new reference with ReferenceModels and DeadlineReference for discoverability
183 setattr(ReferenceModels, reference_class.__name__, reference_class)
184 setattr(cls, reference_class.__name__, reference_class())
185 logger.info("Registered DeadlineReference %s", reference_class.__name__)
186
187 # Add to appropriate deadline_reference_type classification
188 if deadline_reference_type is cls.TYPES.DAGRUN_CREATED:
189 cls.TYPES.DAGRUN_CREATED = cls.TYPES.DAGRUN_CREATED + (reference_class,)
190 elif deadline_reference_type is cls.TYPES.DAGRUN_QUEUED:
191 cls.TYPES.DAGRUN_QUEUED = cls.TYPES.DAGRUN_QUEUED + (reference_class,)
192 else:
193 raise ValueError(
194 f"Invalid deadline reference type {deadline_reference_type}; "
195 "must be a valid DeadlineReference.TYPES option."
196 )
197
198 # Refresh the combined DAGRUN tuple
199 cls.TYPES.DAGRUN = cls.TYPES.DAGRUN_CREATED + cls.TYPES.DAGRUN_QUEUED
200
201 return reference_class
202
203
204def deadline_reference(
205 deadline_reference_type: DeadlineReferenceTypes | None = None,
206) -> Callable[[type[ReferenceModels.BaseDeadlineReference]], type[ReferenceModels.BaseDeadlineReference]]:
207 """
208 Decorate a class to register a custom deadline reference.
209
210 Usage:
211 @deadline_reference()
212 class MyCustomReference(ReferenceModels.BaseDeadlineReference):
213 # By default, evaluate_with will be called when a new dagrun is created.
214 def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
215 # Put your business logic here
216 return some_datetime
217
218 @deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED)
219 class MyQueuedRef(ReferenceModels.BaseDeadlineReference):
220 # Optionally, you can specify when you want it calculated by providing a DeadlineReference.TYPES
221 def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
222 # Put your business logic here
223 return some_datetime
224 """
225
226 def decorator(
227 reference_class: type[ReferenceModels.BaseDeadlineReference],
228 ) -> type[ReferenceModels.BaseDeadlineReference]:
229 DeadlineReference.register_custom_reference(reference_class, deadline_reference_type)
230 return reference_class
231
232 return decorator