Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/dates.py: 19%
105 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import warnings
21from datetime import datetime, timedelta
22from typing import Collection
24from croniter import croniter
25from dateutil.relativedelta import relativedelta # for doctest
27from airflow.exceptions import RemovedInAirflow3Warning
28from airflow.typing_compat import Literal
29from airflow.utils import timezone
# Shorthand schedule names mapped to the five-field cron expression each one
# stands for. Callers look a string up here and fall back to the string itself
# when it is not a known preset.
cron_presets: dict[str, str] = {
    # minute 0 of every hour
    "@hourly": "0 * * * *",
    # midnight every day
    "@daily": "0 0 * * *",
    # midnight on day-of-week 0
    "@weekly": "0 0 * * 0",
    # midnight on the first day of every month
    "@monthly": "0 0 1 * *",
    # midnight on the first day of every third month
    "@quarterly": "0 0 1 */3 *",
    # midnight on the first day of month 1
    "@yearly": "0 0 1 1 *",
}
def date_range(
    start_date: datetime,
    end_date: datetime | None = None,
    num: int | None = None,
    delta: str | timedelta | relativedelta | None = None,
) -> list[datetime]:
    """Get a list of dates in the specified range, separated by delta.

    .. code-block:: pycon

        >>> from airflow.utils.dates import date_range
        >>> from datetime import datetime, timedelta
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1))
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta="0 0 * * *")
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 1 * *")
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 2, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 3, 1, 0, 0, tzinfo=Timezone('UTC'))]

    :param start_date: anchor date to start the series from
    :param end_date: right boundary for the date range; when neither
        ``end_date`` nor ``num`` is given, the current UTC time is used
    :param num: alternatively to end_date, you can specify the number of
        entries you want in the range. This number can be negative,
        output will always be sorted regardless
    :param delta: step length. It can be a ``datetime.timedelta``, a
        ``dateutil.relativedelta.relativedelta``, or a cron expression
        (or one of the ``cron_presets`` keys) as string. A falsy delta
        yields an empty list.
    :return: the generated datetimes in ascending order; naive values are
        made timezone-aware before being returned
    :raises ValueError: if ``start_date`` is after ``end_date``, or if both
        ``end_date`` and ``num`` are given
    :raises TypeError: if ``delta`` is not a string, timedelta or relativedelta
    """
    warnings.warn(
        "`airflow.utils.dates.date_range()` is deprecated. Please use `airflow.timetables`.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )

    if not delta:
        return []
    if end_date:
        if start_date > end_date:
            raise ValueError("Wait. start_date needs to be before end_date")
        if num:
            raise ValueError("Wait. Either specify end_date OR num")
    if not end_date and not num:
        end_date = timezone.utcnow()

    delta_iscron = False
    # Remember the caller's tzinfo: cron iteration below runs on a naive copy
    # of start_date and results are re-localized with this zone.
    time_zone = start_date.tzinfo

    abs_delta: timedelta | relativedelta
    if isinstance(delta, str):
        delta_iscron = True
        if timezone.is_localized(start_date):
            start_date = timezone.make_naive(start_date, time_zone)
        # Preset names are translated; anything else is used as the cron expression itself.
        cron = croniter(cron_presets.get(delta, delta), start_date)
    elif isinstance(delta, (timedelta, relativedelta)):
        # Direction is decided by end_date / the sign of num, never by the sign of delta.
        abs_delta = abs(delta)
    else:
        raise TypeError("Wait. delta must be either datetime.timedelta or cron expression as str")

    def _as_aware(moment: datetime) -> datetime:
        # Naive values get the original zone attached; aware values pass through untouched.
        if timezone.is_naive(moment):
            return timezone.make_aware(moment, time_zone)
        return moment

    dates = []
    if end_date:
        # Keep both sides of the comparison below naive when start_date is naive.
        if timezone.is_naive(start_date) and not timezone.is_naive(end_date):
            end_date = timezone.make_naive(end_date, time_zone)
        while start_date <= end_date:  # type: ignore
            dates.append(_as_aware(start_date))

            if delta_iscron:
                start_date = cron.get_next(datetime)
            else:
                start_date += abs_delta
    else:
        num_entries: int = num  # type: ignore
        for _ in range(abs(num_entries)):
            dates.append(_as_aware(start_date))

            # A positive num walks forward from start_date, a negative one walks backward.
            if delta_iscron and num_entries > 0:
                start_date = cron.get_next(datetime)
            elif delta_iscron:
                start_date = cron.get_prev(datetime)
            elif num_entries > 0:
                start_date += abs_delta
            else:
                start_date -= abs_delta

    return sorted(dates)
def round_time(
    dt: datetime,
    delta: str | timedelta | relativedelta,
    start_date: datetime = timezone.make_aware(datetime.min),
):
    """Returns ``start_date + i * delta`` for given ``i`` where the result is closest to ``dt``.

    .. code-block:: pycon

        >>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1))
        datetime.datetime(2015, 1, 1, 0, 0)
        >>> round_time(datetime(2015, 1, 2), relativedelta(months=1))
        datetime.datetime(2015, 1, 1, 0, 0)
        >>> round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 16, 0, 0)
        >>> round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 15, 0, 0)
        >>> round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 14, 0, 0)
        >>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 14, 0, 0)

    :param dt: the datetime to round; its microseconds are ignored
    :param delta: step between grid points, either a cron expression string
        or a timedelta / relativedelta
    :param start_date: origin of the grid; the default is computed once, when
        the function is defined
    """
    if isinstance(delta, str):
        # Cron string: take the cron tick preceding the naive start_date and
        # re-attach the original zone.
        # NOTE(review): this branch never consults ``dt`` -- confirm that is intended.
        zone = start_date.tzinfo
        naive_start = timezone.make_naive(start_date, zone)
        previous_tick = croniter(delta, naive_start).get_prev(datetime)
        chosen = naive_start if previous_tick == naive_start else previous_tick
        return timezone.make_aware(chosen, zone)

    def grid_point(i):
        # The i-th point of the grid anchored at start_date.
        return start_date + i * delta

    # Drop the microseconds of dt before comparing.
    dt -= timedelta(microseconds=dt.microsecond)

    # delta may be a relativedelta, whose length in seconds is not fixed, so
    # the index cannot be found by division. Bracket it first, then bisect.

    # Double `upper` until the corresponding grid point is no longer below dt.
    upper = 1
    while grid_point(upper) < dt:
        upper *= 2

    # Half of the final `upper` is the lower bracket. When start_date is
    # already past dt the doubling never runs, lower is 0, and the loop
    # below returns start_date.
    lower = upper // 2

    while True:
        # Once the point right after `lower` reaches dt, the answer is one of
        # those two neighbours; ties go to the later point.
        after = grid_point(lower + 1)
        if after >= dt:
            before = grid_point(lower)
            return after if after - dt <= dt - before else before

        # Otherwise halve the bracket, moving whichever bound the midpoint replaces.
        candidate = lower + (upper - lower) // 2
        if grid_point(candidate) >= dt:
            upper = candidate
        else:
            lower = candidate
# The unit names that the duration helpers below accept and return.
TimeUnit = Literal["days", "hours", "minutes", "seconds"]


def infer_time_unit(time_seconds_arr: Collection[float]) -> TimeUnit:
    """Pick the time unit best suited to display the given durations (in seconds).

    The choice is driven by the largest duration: up to two minutes maps to
    ``"seconds"``, up to two hours to ``"minutes"``, up to two days to
    ``"hours"``, and anything longer to ``"days"``. An empty collection
    yields ``"hours"``.

    e.g. 5400 seconds => 'minutes', 36000 seconds => 'hours'
    """
    if len(time_seconds_arr) == 0:
        return "hours"

    longest = max(time_seconds_arr)
    minute = 60
    hour = 60 * minute
    day = 24 * hour
    # Inclusive upper bounds, checked from the finest unit to the coarsest.
    ladder: tuple[tuple[int, TimeUnit], ...] = (
        (2 * minute, "seconds"),
        (2 * hour, "minutes"),
        (2 * day, "hours"),
    )
    for ceiling, unit in ladder:
        if longest <= ceiling:
            return unit
    return "days"
def scale_time_units(time_seconds_arr: Collection[float], unit: TimeUnit) -> Collection[float]:
    """Convert an array of time durations in seconds to the specified time unit.

    For ``"minutes"``, ``"hours"`` and ``"days"`` a new list of scaled values
    is returned; for any other unit the input collection is returned as is.
    """
    if unit == "minutes":
        seconds_per_unit = 60
    elif unit == "hours":
        seconds_per_unit = 60 * 60
    elif unit == "days":
        seconds_per_unit = 24 * 60 * 60
    else:
        # Nothing to scale: hand back the caller's own collection.
        return time_seconds_arr
    return [seconds / seconds_per_unit for seconds in time_seconds_arr]
def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
    """Return the datetime *n* days before today (current UTC date).

    The time-of-day fields of the result are taken from the keyword
    arguments, which default to midnight. Emits a deprecation warning on
    every call.
    """
    warnings.warn(
        "Function `days_ago` is deprecated and will be removed in Airflow 3.0. "
        "You can achieve equivalent behavior with `pendulum.today('UTC').add(days=-N, ...)`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )

    time_of_day = {
        "hour": hour,
        "minute": minute,
        "second": second,
        "microsecond": microsecond,
    }
    # Pin the current UTC timestamp to the requested time of day, then step back n days.
    anchor = timezone.utcnow().replace(**time_of_day)
    offset = timedelta(days=n)
    return anchor - offset
def parse_execution_date(execution_date_str):
    """Turn an execution date string into a datetime via ``timezone.parse``."""
    parsed = timezone.parse(execution_date_str)
    return parsed