Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/definitions/deadline.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

66 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import logging 

20from datetime import datetime, timedelta 

21from typing import TYPE_CHECKING 

22 

23from airflow.models.deadline import DeadlineReferenceType, ReferenceModels 

24from airflow.sdk.definitions.callback import AsyncCallback, Callback 

25 

26if TYPE_CHECKING: 

27 from collections.abc import Callable 

28 from typing import TypeAlias 

29 

30logger = logging.getLogger(__name__) 

31 

32DeadlineReferenceTypes: TypeAlias = tuple[type[ReferenceModels.BaseDeadlineReference], ...] 

33 

34 

35class DeadlineAlert: 

36 """Store Deadline values needed to calculate the need-by timestamp and the callback information.""" 

37 

38 def __init__( 

39 self, 

40 reference: DeadlineReferenceType, 

41 interval: timedelta, 

42 callback: Callback, 

43 ): 

44 self.reference = reference 

45 self.interval = interval 

46 

47 if not isinstance(callback, AsyncCallback): 

48 raise ValueError(f"Callbacks of type {type(callback).__name__} are not currently supported") 

49 self.callback = callback 

50 

51 def __eq__(self, other: object) -> bool: 

52 if not isinstance(other, DeadlineAlert): 

53 return NotImplemented 

54 return ( 

55 isinstance(self.reference, type(other.reference)) 

56 and self.interval == other.interval 

57 and self.callback == other.callback 

58 ) 

59 

60 def __hash__(self) -> int: 

61 return hash( 

62 ( 

63 type(self.reference).__name__, 

64 self.interval, 

65 self.callback, 

66 ) 

67 ) 

68 

69 

70class DeadlineReference: 

71 """ 

72 The public interface class for all DeadlineReference options. 

73 

74 This class provides a unified interface for working with Deadlines, supporting both 

75 calculated deadlines (which fetch values from the database) and fixed deadlines 

76 (which return a predefined datetime). 

77 

78 ------ 

79 Usage: 

80 ------ 

81 

82 1. Example deadline references: 

83 

84 .. code-block:: python 

85 

86 fixed = DeadlineReference.FIXED_DATETIME(datetime(2025, 5, 4)) 

87 logical = DeadlineReference.DAGRUN_LOGICAL_DATE 

88 queued = DeadlineReference.DAGRUN_QUEUED_AT 

89 

90 2. Using in a DAG: 

91 

92 .. code-block:: python 

93 

94 DAG( 

95 dag_id="dag_with_deadline", 

96 deadline=DeadlineAlert( 

97 reference=DeadlineReference.DAGRUN_LOGICAL_DATE, 

98 interval=timedelta(hours=1), 

99 callback=hello_callback, 

100 ), 

101 ) 

102 

103 3. Evaluating deadlines will ignore unexpected parameters: 

104 

105 .. code-block:: python 

106 

107 # For deadlines requiring parameters: 

108 deadline = DeadlineReference.DAGRUN_LOGICAL_DATE 

109 deadline.evaluate_with(dag_id=dag.dag_id) 

110 

111 # For deadlines with no required parameters: 

112 deadline = DeadlineReference.FIXED_DATETIME(datetime(2025, 5, 4)) 

113 deadline.evaluate_with() 

114 """ 

115 

116 class TYPES: 

117 """Collection of DeadlineReference types for type checking.""" 

118 

119 # Deadlines that should be created when the DagRun is created. 

120 DAGRUN_CREATED: DeadlineReferenceTypes = ( 

121 ReferenceModels.DagRunLogicalDateDeadline, 

122 ReferenceModels.FixedDatetimeDeadline, 

123 ReferenceModels.AverageRuntimeDeadline, 

124 ) 

125 

126 # Deadlines that should be created when the DagRun is queued. 

127 DAGRUN_QUEUED: DeadlineReferenceTypes = (ReferenceModels.DagRunQueuedAtDeadline,) 

128 

129 # All DagRun-related deadline types. 

130 DAGRUN: DeadlineReferenceTypes = DAGRUN_CREATED + DAGRUN_QUEUED 

131 

132 from airflow.models.deadline import ReferenceModels 

133 

134 DAGRUN_LOGICAL_DATE: DeadlineReferenceType = ReferenceModels.DagRunLogicalDateDeadline() 

135 DAGRUN_QUEUED_AT: DeadlineReferenceType = ReferenceModels.DagRunQueuedAtDeadline() 

136 

137 @classmethod 

138 def AVERAGE_RUNTIME(cls, max_runs: int = 0, min_runs: int | None = None) -> DeadlineReferenceType: 

139 if max_runs == 0: 

140 max_runs = cls.ReferenceModels.AverageRuntimeDeadline.DEFAULT_LIMIT 

141 if min_runs is None: 

142 min_runs = max_runs 

143 return cls.ReferenceModels.AverageRuntimeDeadline(max_runs, min_runs) 

144 

145 @classmethod 

146 def FIXED_DATETIME(cls, datetime: datetime) -> DeadlineReferenceType: 

147 return cls.ReferenceModels.FixedDatetimeDeadline(datetime) 

148 

149 # TODO: Remove this once other deadline types exist. 

150 # This is a temporary reference type used only in tests to verify that 

151 # dag.has_dagrun_deadline() returns false if the dag has a non-dagrun deadline type. 

152 # It should be replaced with a real non-dagrun deadline type when one is available. 

153 _TEMPORARY_TEST_REFERENCE = type( 

154 "TemporaryTestDeadlineForTypeChecking", 

155 (DeadlineReferenceType,), 

156 {"_evaluate_with": lambda self, **kwargs: datetime.now()}, 

157 )() 

158 

159 @classmethod 

160 def register_custom_reference( 

161 cls, 

162 reference_class: type[ReferenceModels.BaseDeadlineReference], 

163 deadline_reference_type: DeadlineReferenceTypes | None = None, 

164 ) -> type[ReferenceModels.BaseDeadlineReference]: 

165 """ 

166 Register a custom deadline reference class. 

167 

168 :param reference_class: The custom reference class inheriting from BaseDeadlineReference 

169 :param deadline_reference_type: A DeadlineReference.TYPES for when the deadline should be evaluated ("DAGRUN_CREATED", 

170 "DAGRUN_QUEUED", etc.); defaults to DeadlineReference.TYPES.DAGRUN_CREATED 

171 """ 

172 from airflow.models.deadline import ReferenceModels 

173 

174 # Default to DAGRUN_CREATED if no deadline_reference_type specified 

175 if deadline_reference_type is None: 

176 deadline_reference_type = cls.TYPES.DAGRUN_CREATED 

177 

178 # Validate the reference class inherits from BaseDeadlineReference 

179 if not issubclass(reference_class, ReferenceModels.BaseDeadlineReference): 

180 raise ValueError(f"{reference_class.__name__} must inherit from BaseDeadlineReference") 

181 

182 # Register the new reference with ReferenceModels and DeadlineReference for discoverability 

183 setattr(ReferenceModels, reference_class.__name__, reference_class) 

184 setattr(cls, reference_class.__name__, reference_class()) 

185 logger.info("Registered DeadlineReference %s", reference_class.__name__) 

186 

187 # Add to appropriate deadline_reference_type classification 

188 if deadline_reference_type is cls.TYPES.DAGRUN_CREATED: 

189 cls.TYPES.DAGRUN_CREATED = cls.TYPES.DAGRUN_CREATED + (reference_class,) 

190 elif deadline_reference_type is cls.TYPES.DAGRUN_QUEUED: 

191 cls.TYPES.DAGRUN_QUEUED = cls.TYPES.DAGRUN_QUEUED + (reference_class,) 

192 else: 

193 raise ValueError( 

194 f"Invalid deadline reference type {deadline_reference_type}; " 

195 "must be a valid DeadlineReference.TYPES option." 

196 ) 

197 

198 # Refresh the combined DAGRUN tuple 

199 cls.TYPES.DAGRUN = cls.TYPES.DAGRUN_CREATED + cls.TYPES.DAGRUN_QUEUED 

200 

201 return reference_class 

202 

203 

204def deadline_reference( 

205 deadline_reference_type: DeadlineReferenceTypes | None = None, 

206) -> Callable[[type[ReferenceModels.BaseDeadlineReference]], type[ReferenceModels.BaseDeadlineReference]]: 

207 """ 

208 Decorate a class to register a custom deadline reference. 

209 

210 Usage: 

211 @deadline_reference() 

212 class MyCustomReference(ReferenceModels.BaseDeadlineReference): 

213 # By default, evaluate_with will be called when a new dagrun is created. 

214 def _evaluate_with(self, *, session: Session, **kwargs) -> datetime: 

215 # Put your business logic here 

216 return some_datetime 

217 

218 @deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED) 

219 class MyQueuedRef(ReferenceModels.BaseDeadlineReference): 

220 # Optionally, you can specify when you want it calculated by providing a DeadlineReference.TYPES 

221 def _evaluate_with(self, *, session: Session, **kwargs) -> datetime: 

222 # Put your business logic here 

223 return some_datetime 

224 """ 

225 

226 def decorator( 

227 reference_class: type[ReferenceModels.BaseDeadlineReference], 

228 ) -> type[ReferenceModels.BaseDeadlineReference]: 

229 DeadlineReference.register_custom_reference(reference_class, deadline_reference_type) 

230 return reference_class 

231 

232 return decorator