Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/timetables/base.py: 76%

58 statements  

coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, NamedTuple, Sequence
from warnings import warn

from pendulum import DateTime

from airflow.typing_compat import Protocol, runtime_checkable

if TYPE_CHECKING:
    from airflow.utils.types import DagRunType

class DataInterval(NamedTuple):
    """A data interval for a DagRun to operate over.

    Both ``start`` and ``end`` **MUST** be "aware", i.e. contain timezone
    information.
    """

    start: DateTime
    end: DateTime

    @classmethod
    def exact(cls, at: DateTime) -> DataInterval:
        """Represent an "interval" containing only an exact time."""
        return cls(start=at, end=at)

class TimeRestriction(NamedTuple):
    """Restriction on when a DAG can be scheduled for a run.

    Specifically, the run must not be earlier than ``earliest``, nor later than
    ``latest``. If ``catchup`` is *False*, the run must also not be earlier than
    the current time, i.e. "missed" schedules are not backfilled.

    These values are generally set from the DAG or task's ``start_date``,
    ``end_date``, and ``catchup`` arguments.

    Both ``earliest`` and ``latest``, if not *None*, are inclusive; a DAG run
    can happen exactly at either point of time. They are guaranteed to be aware
    (i.e. contain timezone information) for ``TimeRestriction`` instances
    created by Airflow.
    """

    earliest: DateTime | None
    latest: DateTime | None
    catchup: bool

class DagRunInfo(NamedTuple):
    """Information to schedule a DagRun.

    Instances of this are returned by timetables when they are asked to
    schedule the creation of a DagRun.
    """

    run_after: DateTime
    """The earliest time this DagRun can be created and its tasks scheduled.

    This **MUST** be "aware", i.e. contain timezone information.
    """

    data_interval: DataInterval
    """The data interval this DagRun operates over."""

    @classmethod
    def exact(cls, at: DateTime) -> DagRunInfo:
        """Represent a run on an exact time."""
        return cls(run_after=at, data_interval=DataInterval.exact(at))

    @classmethod
    def interval(cls, start: DateTime, end: DateTime) -> DagRunInfo:
        """Represent a run on a continuous schedule.

        In such a schedule, each data interval starts right after the previous
        one ends, and each run is scheduled right after the interval ends. This
        applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
        """
        return cls(run_after=end, data_interval=DataInterval(start, end))

    @property
    def logical_date(self: DagRunInfo) -> DateTime:
        """Infer the logical date to represent a DagRun.

        This replaces ``execution_date`` in Airflow 2.1 and prior. The idea is
        essentially the same, just a different name.
        """
        return self.data_interval.start

@runtime_checkable
class Timetable(Protocol):
    """Protocol that all Timetable classes are expected to implement."""

    description: str = ""
    """Human-readable description of the timetable.

    For example, this can produce something like ``'At 21:30, only on Friday'``
    from the cron expression ``'30 21 * * 5'``. This is used in the webserver UI.
    """

    periodic: bool = True
    """Whether this timetable runs periodically.

    This defaults to and should generally be *True*, but some special setups
    like ``schedule=None`` and ``"@once"`` set it to *False*.
    """

    _can_be_scheduled: bool = True
    """Whether this timetable can actually schedule runs in an automated manner.

    This defaults to and should generally be *True* (including non-periodic
    execution types like ``@once`` and data-triggered timetables), but
    ``NullTimetable`` sets this to *False*.
    """

    @property
    def can_be_scheduled(self):
        if hasattr(self, "can_run"):
            warn(
                'can_run class variable is deprecated. Use "can_be_scheduled" instead.',
                DeprecationWarning,
                stacklevel=2,
            )
            return self.can_run
        return self._can_be_scheduled

    run_ordering: Sequence[str] = ("data_interval_end", "execution_date")
    """How runs triggered from this timetable should be ordered in the UI.

    This should be a list of field names on the DAG run object.
    """

    active_runs_limit: int | None = None
    """Override the ``max_active_runs`` parameter of any DAG using this timetable.

    This is read during DAG initialization; if it is not *None*, it overrides the
    DAG's ``max_active_runs``. In most cases this should be *None*, but in some
    cases (for example, ``ContinuousTimetable``) there are good reasons to limit
    DagRun parallelism.
    """

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Deserialize a timetable from data.

        This is called when a serialized DAG is deserialized. ``data`` will be
        whatever was returned by ``serialize`` during DAG serialization. The
        default implementation constructs the timetable without any arguments.
        """
        return cls()

    def serialize(self) -> dict[str, Any]:
        """Serialize the timetable for JSON encoding.

        This is called during DAG serialization to store timetable information
        in the database. This should return a JSON-serializable dict that will
        be fed into ``deserialize`` when the DAG is deserialized. The default
        implementation returns an empty dict.
        """
        return {}

    def validate(self) -> None:
        """Validate the timetable is correctly specified.

        Override this method to provide run-time validation; it is called when
        a DAG is put into a dagbag. The default implementation does nothing.

        :raises: AirflowTimetableInvalid on validation failure.
        """
        return

    @property
    def summary(self) -> str:
        """A short summary for the timetable.

        This is used to display the timetable in the web UI. A cron expression
        timetable, for example, can use this to display the expression. The
        default implementation returns the timetable's type name.
        """
        return type(self).__name__

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """When a DAG run is manually triggered, infer a data interval for it.

        ``run_after`` is the time at which the user triggered the run. The
        default implementation raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Provide information to schedule the next DagRun.

        The default implementation raises ``NotImplementedError``.

        :param last_automated_data_interval: The data interval of the associated
            DAG's last scheduled or backfilled run (manual runs not considered).
        :param restriction: Restriction to apply when scheduling the DAG run.
            See documentation of :class:`TimeRestriction` for details.

        :return: Information on when the next DagRun can be scheduled. *None*
            means a DagRun will not happen. This does not mean no more runs
            will ever be scheduled for this DAG; the timetable may return a
            DagRunInfo object when asked again at a later time.
        """
        raise NotImplementedError()

    def generate_run_id(
        self,
        *,
        run_type: DagRunType,
        logical_date: DateTime,
        data_interval: DataInterval | None,
        **extra,
    ) -> str:
        """Generate a run ID for a DagRun of the given type at the given logical date."""
        return run_type.generate_run_id(logical_date)
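
For context, a bare-bones implementation of the ``Timetable`` protocol above might look like the following sketch. It is illustrative only and not part of this module; the class name ``DailyUTCTimetable`` is hypothetical. It schedules one run per UTC calendar day, creating each run right after its data interval ends, and because it takes no constructor arguments the default ``serialize``/``deserialize`` implementations above suffice.

# Illustrative sketch only -- not part of airflow/timetables/base.py.
from pendulum import UTC, DateTime, Time


class DailyUTCTimetable(Timetable):  # hypothetical example class
    """Schedule one run per UTC day; each interval spans midnight to midnight."""

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # A manual run covers the UTC day in which it was triggered.
        start = DateTime.combine(run_after.date(), Time.min).replace(tzinfo=UTC)
        return DataInterval(start=start, end=start.add(days=1))

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:
            # Continuous schedule: the next interval starts where the last ended.
            next_start = last_automated_data_interval.end
        else:
            # First automated run: anchor on ``earliest`` (usually the start_date).
            if restriction.earliest is None:
                return None  # No start_date, so nothing can be scheduled.
            next_start = DateTime.combine(
                restriction.earliest.date(), Time.min
            ).replace(tzinfo=UTC)
            if not restriction.catchup:
                # Without catchup, do not backfill days before the current one.
                today = DateTime.combine(
                    DateTime.utcnow().date(), Time.min
                ).replace(tzinfo=UTC)
                next_start = max(next_start, today)
        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Past end_date; no more runs.
        return DagRunInfo.interval(start=next_start, end=next_start.add(days=1))

The built-in Airflow timetables (cron, delta, and event based) follow the same pattern, with additional handling for time zones and schedule alignment.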