Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/dates.py: 19%

105 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import warnings 

21from datetime import datetime, timedelta 

22from typing import Collection 

23 

24from croniter import croniter 

25from dateutil.relativedelta import relativedelta # for doctest 

26 

27from airflow.exceptions import RemovedInAirflow3Warning 

28from airflow.typing_compat import Literal 

29from airflow.utils import timezone 

30 

# Shorthand schedule names mapped to the five-field cron expression
# (minute, hour, day-of-month, month, day-of-week) each one stands for.
cron_presets: dict[str, str] = {
    # minute 0 of every hour
    "@hourly": "0 * * * *",
    # 00:00 every day
    "@daily": "0 0 * * *",
    # 00:00 on day-of-week 0 (Sunday)
    "@weekly": "0 0 * * 0",
    # 00:00 on the 1st of every month
    "@monthly": "0 0 1 * *",
    # 00:00 on the 1st of every third month
    "@quarterly": "0 0 1 */3 *",
    # 00:00 on the 1st of January
    "@yearly": "0 0 1 1 *",
}

39 

40 

def date_range(
    start_date: datetime,
    end_date: datetime | None = None,
    num: int | None = None,
    delta: str | timedelta | relativedelta | None = None,
) -> list[datetime]:
    """Build a sorted list of datetimes starting at ``start_date`` and spaced by ``delta``.

    Deprecated: every call emits a ``RemovedInAirflow3Warning``.

    .. code-block:: pycon

        >>> from airflow.utils.dates import date_range
        >>> from datetime import datetime, timedelta
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1))
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta="0 0 * * *")
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 1 * *")
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 2, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 3, 1, 0, 0, tzinfo=Timezone('UTC'))]

    :param start_date: anchor date the series starts from; it is always the
        first generated entry
    :param end_date: inclusive right boundary of the range. When neither
        ``end_date`` nor ``num`` is given, ``timezone.utcnow()`` is used
    :param num: alternative to ``end_date``: the number of entries to
        generate. A negative value walks backwards from ``start_date``; the
        result is sorted in ascending order either way
    :param delta: step length: a ``datetime.timedelta`` or ``relativedelta``
        (its absolute value is used), or a string that is either a key of
        ``cron_presets`` or a cron expression. A falsy ``delta`` yields ``[]``
    :return: the generated datetimes in ascending order; naive values are made
        aware with the ``tzinfo`` that ``start_date`` was passed in with
    :raises Exception: if ``start_date`` is after ``end_date``, if both
        ``end_date`` and ``num`` are given, or if ``delta`` has an unsupported type
    """
    warnings.warn(
        "`airflow.utils.dates.date_range()` is deprecated. Please use `airflow.timetables`.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )

    if not delta:
        return []
    if end_date:
        if start_date > end_date:
            raise Exception("Wait. start_date needs to be before end_date")
        if num:
            raise Exception("Wait. Either specify end_date OR num")
    elif not num:
        # Open-ended request: run the series up to the current moment.
        end_date = timezone.utcnow()

    # Remember the caller's timezone before start_date is possibly made naive below.
    original_tz = start_date.tzinfo

    cron = None
    step: timedelta | relativedelta
    if isinstance(delta, str):
        # The cron iterator is driven with naive datetimes; results are
        # re-localized when they are collected.
        if timezone.is_localized(start_date):
            start_date = timezone.make_naive(start_date, original_tz)
        cron = croniter(cron_presets.get(delta, delta), start_date)
    elif isinstance(delta, (timedelta, relativedelta)):
        step = abs(delta)
    else:
        raise Exception("Wait. delta must be either datetime.timedelta or cron expression as str")

    def _localized(moment):
        """Return ``moment`` unchanged if aware, otherwise aware in the caller's timezone."""
        if timezone.is_naive(moment):
            return timezone.make_aware(moment, original_tz)
        return moment

    collected = []
    cursor = start_date
    if end_date:
        # Keep both ends of the comparison naive when the cursor is naive.
        if timezone.is_naive(cursor) and not timezone.is_naive(end_date):
            end_date = timezone.make_naive(end_date, original_tz)
        while cursor <= end_date:  # type: ignore
            collected.append(_localized(cursor))
            cursor = cron.get_next(datetime) if cron is not None else cursor + step
    else:
        count: int = num  # type: ignore
        forward = count > 0
        for _ in range(abs(count)):
            collected.append(_localized(cursor))
            if cron is not None:
                cursor = cron.get_next(datetime) if forward else cron.get_prev(datetime)
            elif forward:
                cursor = cursor + step
            else:
                cursor = cursor - step

    return sorted(collected)

136 

137 

def round_time(
    dt: datetime,
    delta: str | timedelta | relativedelta,
    start_date: datetime = timezone.make_aware(datetime.min),
):
    """Return the ``start_date + i * delta`` (for an integer ``i >= 0``) that lies closest to ``dt``.

    Microseconds of ``dt`` are ignored. When ``dt`` is exactly halfway between
    two candidates the later one is returned, and when ``dt`` is not after
    ``start_date`` the result is ``start_date`` itself.

    When ``delta`` is a string it is treated as a cron expression. In that
    case ``dt`` is not consulted at all: the result is the cron occurrence
    preceding ``start_date``, made aware in ``start_date``'s timezone.

    ``dt`` and ``start_date`` are compared and subtracted directly, so they
    must be mutually comparable.
    NOTE(review): the default ``start_date`` is presumably timezone-aware, in
    which case a naive ``dt`` needs an explicit naive ``start_date`` -- confirm.

    NOTE(review): ``delta`` is expected to be a positive step. With a
    zero-length ``delta`` and ``start_date < dt`` the search for an upper
    limit below never terminates.

    .. code-block:: pycon

        >>> round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 16, 0, 0)
        >>> round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 15, 0, 0)
        >>> round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 14, 0, 0)
        >>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 14, 0, 0)

    :param dt: the datetime to round
    :param delta: grid spacing as ``timedelta``/``relativedelta``, or a cron expression string
    :param start_date: origin of the grid
    """
    if isinstance(delta, str):
        # Cron-based: step the cron iterator back once from the naive start.
        original_tz = start_date.tzinfo
        naive_start = timezone.make_naive(start_date, original_tz)
        previous_fire = croniter(delta, naive_start).get_prev(datetime)
        rounded = naive_start if previous_fire == naive_start else previous_fire
        return timezone.make_aware(rounded, original_tz)

    # Drop the microseconds of dt.
    dt = dt - timedelta(microseconds=dt.microsecond)

    def _grid_point(index):
        """Return the ``index``-th point of the grid anchored at ``start_date``."""
        return start_date + index * delta

    # A relativedelta has no fixed length in seconds, so the index cannot be
    # obtained by division. Instead, bracket dt and bisect.
    #
    # Step 1: double ``high`` until its grid point is no longer before dt.
    high = 1
    while _grid_point(high) < dt:
        high *= 2

    # ``high`` is the first doubling that reached dt, so half of it is a valid
    # lower bracket. If start_date is already >= dt this leaves low == 0 and
    # the function ends up returning start_date.
    low = high // 2

    # Step 2: bisect until the grid point right after ``low`` reaches dt;
    # the answer is then either ``low`` or ``low + 1``.
    while _grid_point(low + 1) < dt:
        middle = low + (high - low) // 2
        if _grid_point(middle) >= dt:
            high = middle
        else:
            low = middle

    # Step 3: pick the nearer neighbour, preferring the later one on a tie.
    above, below = _grid_point(low + 1), _grid_point(low)
    return above if above - dt <= dt - below else below

220 

221 

# The closed set of unit names used as the return type of ``infer_time_unit``
# and as the ``unit`` argument of ``scale_time_units``.
TimeUnit = Literal[
    "days",
    "hours",
    "minutes",
    "seconds",
]

223 

224 

def infer_time_unit(time_seconds_arr: Collection[float]) -> TimeUnit:
    """Pick the display unit best suited to a collection of durations given in seconds.

    The choice is driven by the largest duration: a unit is used for values
    up to (and including) two of the next larger unit, e.g. up to 120 seconds
    is ``"seconds"`` and up to 2 hours is ``"minutes"``. So 5400 seconds gives
    ``"minutes"`` and 36000 seconds gives ``"hours"``.

    :param time_seconds_arr: durations in seconds
    :return: ``"seconds"``, ``"minutes"``, ``"hours"`` or ``"days"``;
        ``"hours"`` when the collection is empty
    """
    if len(time_seconds_arr) == 0:
        return "hours"

    longest = max(time_seconds_arr)
    if longest <= 60 * 2:
        return "seconds"
    if longest <= 60 * 60 * 2:
        return "minutes"
    if longest <= 24 * 60 * 60 * 2:
        return "hours"
    return "days"

241 

242 

def scale_time_units(time_seconds_arr: Collection[float], unit: TimeUnit) -> Collection[float]:
    """Express durations given in seconds in the requested time unit.

    :param time_seconds_arr: durations in seconds
    :param unit: target unit
    :return: a new list of scaled values for ``"minutes"``, ``"hours"`` and
        ``"days"``; for any other unit the input collection itself is
        returned unchanged
    """
    if unit == "minutes":
        seconds_per_unit = 60
    elif unit == "hours":
        seconds_per_unit = 60 * 60
    elif unit == "days":
        seconds_per_unit = 24 * 60 * 60
    else:
        # Already in seconds (or an unrecognised unit): hand the input back as is.
        return time_seconds_arr
    return [seconds / seconds_per_unit for seconds in time_seconds_arr]

252 

253 

def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
    """Return the datetime *n* days before today at the given time of day.

    Deprecated: every call emits a ``RemovedInAirflow3Warning``.

    "Today" is taken from ``timezone.utcnow()`` with its time-of-day fields
    replaced by the arguments, so the defaults give midnight.

    :param n: number of days to go back
    :param hour: hour of the returned datetime
    :param minute: minute of the returned datetime
    :param second: second of the returned datetime
    :param microsecond: microsecond of the returned datetime
    """
    deprecation_message = (
        "Function `days_ago` is deprecated and will be removed in Airflow 3.0. "
        "You can achieve equivalent behavior with `pendulum.today('UTC').add(days=-N, ...)`"
    )
    warnings.warn(deprecation_message, RemovedInAirflow3Warning, stacklevel=2)

    anchor = timezone.utcnow().replace(
        hour=hour,
        minute=minute,
        second=second,
        microsecond=microsecond,
    )
    return anchor - timedelta(days=n)

268 

269 

def parse_execution_date(execution_date_str):
    """Convert an execution date string into a datetime by delegating to ``timezone.parse``.

    :param execution_date_str: textual execution date
    :return: whatever ``timezone.parse`` returns for that string
    """
    parsed = timezone.parse(execution_date_str)
    return parsed

272 return timezone.parse(execution_date_str)