Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/timetables/base.py: 78%

49 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19from typing import TYPE_CHECKING, Any, NamedTuple, Sequence 

20 

21from pendulum import DateTime 

22 

23from airflow.typing_compat import Protocol, runtime_checkable 

24 

25if TYPE_CHECKING: 

26 from airflow.utils.types import DagRunType 

27 

28 

class DataInterval(NamedTuple):
    """Data interval a DagRun operates over.

    The ``start`` and ``end`` values **MUST** both be "aware", i.e. carry
    timezone information.
    """

    start: DateTime
    end: DateTime

    @classmethod
    def exact(cls, at: DateTime) -> DataInterval:
        """Build a zero-length "interval" anchored at one exact time."""
        return cls(at, at)

43 

44 

class TimeRestriction(NamedTuple):
    """Limits on when a DAG can be scheduled for a run.

    A run must not occur before ``earliest`` nor after ``latest``. When
    ``catchup`` is *False*, the run additionally must not precede the
    current time — "missed" schedules are skipped instead of backfilled.

    These values are generally derived from the DAG or task's
    ``start_date``, ``end_date``, and ``catchup`` arguments.

    When not *None*, ``earliest`` and ``latest`` are inclusive bounds: a
    DAG run may land exactly on either of them. For ``TimeRestriction``
    instances created by Airflow, both are guaranteed to be aware (i.e.
    carry timezone information).
    """

    earliest: DateTime | None
    latest: DateTime | None
    catchup: bool

64 

65 

class DagRunInfo(NamedTuple):
    """Everything a timetable supplies to schedule one DagRun.

    Timetables return instances of this when asked to schedule a DagRun
    creation.
    """

    run_after: DateTime
    """Earliest time at which this DagRun is created and its tasks scheduled.

    This **MUST** be "aware", i.e. carry timezone information.
    """

    data_interval: DataInterval
    """The data interval this DagRun operates over."""

    @classmethod
    def exact(cls, at: DateTime) -> DagRunInfo:
        """Build info for a run happening at one exact moment."""
        return cls(run_after=at, data_interval=DataInterval.exact(at))

    @classmethod
    def interval(cls, start: DateTime, end: DateTime) -> DagRunInfo:
        """Build info for a run on a continuous schedule.

        Under such a schedule each data interval begins the moment the
        previous one ends, and the run itself fires as soon as its
        interval closes. Every schedule prior to AIP-39 except ``@once``
        and ``None`` behaves this way.
        """
        return cls(run_after=end, data_interval=DataInterval(start, end))

    @property
    def logical_date(self) -> DateTime:
        """Logical date representing this DagRun.

        Successor to ``execution_date`` of Airflow 2.1 and earlier: the
        same concept under a different name.
        """
        return self.data_interval.start

105 

106 

@runtime_checkable
class Timetable(Protocol):
    """Protocol every Timetable class is expected to implement."""

    description: str = ""
    """Human-readable description of the timetable.

    A cron expression such as ``'30 21 * * 5'`` could, for example, be
    rendered as ``'At 21:30, only on Friday'``. Shown in the webserver UI.
    """

    periodic: bool = True
    """Whether the timetable fires on a recurring basis.

    Generally (and by default) *True*; special setups such as
    ``schedule=None`` and ``"@once"`` turn it off.
    """

    can_run: bool = True
    """Whether runs can actually be scheduled from this timetable.

    Generally (and by default) *True*; ``NullTimetable`` sets it to
    *False*.
    """

    run_ordering: Sequence[str] = ("data_interval_end", "execution_date")
    """Field names used to order this timetable's runs in the UI.

    Each entry must name a field on the DAG run object.
    """

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Reconstruct a timetable from serialized ``data``.

        Invoked while deserializing a serialized DAG; ``data`` is exactly
        what ``serialize`` produced during DAG serialization. By default
        the timetable is constructed with no arguments.
        """
        return cls()

    def serialize(self) -> dict[str, Any]:
        """Produce a JSON-encodable representation of the timetable.

        Invoked during DAG serialization so timetable state can be stored
        in the database; the result is later handed back to
        ``deserialize`` when the DAG is deserialized. The default is an
        empty dict.
        """
        return {}

    def validate(self) -> None:
        """Check that the timetable is correctly specified.

        Override to perform run-time validation when a DAG is put into a
        dagbag; the default implementation is a no-op.

        :raises: AirflowTimetableInvalid on validation failure.
        """

    @property
    def summary(self) -> str:
        """Short summary of the timetable, displayed in the web UI.

        A cron-expression timetable, for example, might surface the
        expression here. By default the timetable's type name is used.
        """
        return type(self).__name__

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """Infer a data interval for a manually triggered DAG run.

        Used for e.g. manually-triggered runs, where ``run_after`` is the
        moment the user triggered the run. The default implementation
        raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Provide information to schedule the next DagRun.

        The default implementation raises ``NotImplementedError``.

        :param last_automated_data_interval: Data interval of the
            associated DAG's last scheduled or backfilled run (manual
            runs are not considered).
        :param restriction: Restriction to apply when scheduling the DAG
            run; see :class:`TimeRestriction` for details.

        :return: Information on when the next DagRun can be scheduled.
            *None* means a DagRun will not happen now — not that runs are
            over for this DAG; asked again at another time, the timetable
            may well return a DagRunInfo object.
        """
        raise NotImplementedError()

    def generate_run_id(
        self,
        *,
        run_type: DagRunType,
        logical_date: DateTime,
        data_interval: DataInterval | None,
        **extra,
    ) -> str:
        """Produce the run ID for a DagRun of ``run_type`` at ``logical_date``.

        Delegates to ``run_type.generate_run_id``; ``data_interval`` and
        ``extra`` are accepted for subclasses that need them.
        """
        return run_type.generate_run_id(logical_date)