Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/timetables/_cron.py: 43%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import datetime
20from typing import TYPE_CHECKING, Any
22from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
23from croniter import CroniterBadCronError, CroniterBadDateError, croniter
25from airflow.exceptions import AirflowTimetableInvalid
26from airflow.utils.dates import cron_presets
27from airflow.utils.timezone import convert_to_utc, make_aware, make_naive, parse_timezone
29if TYPE_CHECKING:
30 from pendulum import DateTime
31 from pendulum.tz.timezone import FixedTimezone, Timezone
34def _covers_every_hour(cron: croniter) -> bool:
35 """Check whether the given cron runs at least once an hour.
37 This indicates whether we need to implement a workaround for (what I call)
38 the "fold hour problem". Folding happens when a region switches time
39 backwards, usually as a part of ending a DST period, causing a block of time
40 to occur twice in the wall clock. This is indicated by the ``fold`` flag on
41 datetime.
43 As an example, Switzerland in 2023 ended DST on 3am (wall clock time, UTC+2)
44 by dialing back the clock to 2am (UTC+1). So for (say) ``30 * * * *``, if
45 the last run was 2:30am (UTC+2), the next needs to be 2:30am (UTC+1, folded)
46 instead of 3:30am.
48 While this technically happens for all cron schedules (in such a timezone),
49 we only care about schedules that create at least one run every hour, and
50 can provide a somewhat reasonable rationale to skip the fold hour for things
51 such as ``*/2`` (every two hours). Therefore, we try to *minially* peak into
52 croniter internals to work around the issue.
54 The check is simple since croniter internally normalizes things to ``*``.
55 More edge cases can be added later as needed.
57 See also: https://github.com/kiorky/croniter/issues/56.
58 """
59 return cron.expanded[1] == ["*"]
class CronMixin:
    """Mixin to provide interface to work with croniter.

    Instances keep the cron expression (``_expression``), the timezone the
    expression is evaluated in (``_timezone``) and a human-readable
    ``description`` of the schedule (empty when none could be produced).
    """

    def __init__(self, cron: str, timezone: str | Timezone | FixedTimezone) -> None:
        """Store the expression and timezone, and build the description.

        :param cron: A cron expression, or a key of ``cron_presets``.
        :param timezone: A timezone object, or a string that is parsed with
            ``parse_timezone``.
        """
        # Map a known preset to its expression; anything else is kept as given.
        self._expression = cron_presets.get(cron, cron)

        if isinstance(timezone, str):
            timezone = parse_timezone(timezone)
        self._timezone = timezone

        try:
            descriptor = ExpressionDescriptor(
                expression=self._expression, casing_type=CasingTypeEnum.Sentence, use_24hour_time_format=True
            )
            # checking for more than 5 parameters in Cron and avoiding evaluation for now,
            # as Croniter has inconsistent evaluation with other libraries
            if len(croniter(self._expression).expanded) > 5:
                raise FormatException()
            interval_description: str = descriptor.get_description()
        except (CroniterBadCronError, FormatException, MissingFieldException):
            # Best effort only: an expression that cannot be described is not
            # rejected here (see ``validate``); it just gets no description.
            interval_description = ""
        self.description: str = interval_description

    def __eq__(self, other: Any) -> bool:
        """Both expression and timezone should match.

        This is only for testing purposes and should not be relied on otherwise.
        """
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._expression == other._expression and self._timezone == other._timezone

    @property
    def summary(self) -> str:
        """Return the cron expression this instance was resolved to."""
        return self._expression

    def validate(self) -> None:
        """Check that croniter accepts the expression.

        :raises AirflowTimetableInvalid: If croniter rejects the expression.
            The croniter error is attached as the explicit cause.
        """
        try:
            croniter(self._expression)
        except (CroniterBadCronError, CroniterBadDateError) as e:
            # Chain explicitly so the traceback shows the croniter error as
            # the direct cause rather than as incidental context.
            raise AirflowTimetableInvalid(str(e)) from e

    def _get_next(self, current: DateTime) -> DateTime:
        """Get the first schedule after specified time, with DST fixed."""
        naive = make_naive(current, self._timezone)
        cron = croniter(self._expression, start_time=naive)
        scheduled = cron.get_next(datetime.datetime)
        if not _covers_every_hour(cron):
            # Not an hourly schedule: attach the timezone to croniter's naive
            # result and convert it to UTC.
            return convert_to_utc(make_aware(scheduled, self._timezone))
        # Hourly schedule: apply the naive distance to the aware datetime
        # instead, so the repeated (folded) hour is not skipped; see
        # ``_covers_every_hour``.
        delta = scheduled - naive
        return convert_to_utc(current.in_timezone(self._timezone) + delta)

    def _get_prev(self, current: DateTime) -> DateTime:
        """Get the first schedule before specified time, with DST fixed."""
        naive = make_naive(current, self._timezone)
        cron = croniter(self._expression, start_time=naive)
        scheduled = cron.get_prev(datetime.datetime)
        if not _covers_every_hour(cron):
            # Not an hourly schedule: attach the timezone to croniter's naive
            # result and convert it to UTC.
            return convert_to_utc(make_aware(scheduled, self._timezone))
        # Hourly schedule: mirror of ``_get_next``, stepping backwards by the
        # naive distance from the aware datetime.
        delta = naive - scheduled
        return convert_to_utc(current.in_timezone(self._timezone) - delta)

    def _align_to_next(self, current: DateTime) -> DateTime:
        """Get the next scheduled time.

        This is ``current + interval``, unless ``current`` falls right on the
        interval boundary, when ``current`` is returned.
        """
        next_time = self._get_next(current)
        # ``current`` is itself a scheduled time exactly when stepping back
        # from the next schedule lands on it again.
        if self._get_prev(next_time) != current:
            return next_time
        return current

    def _align_to_prev(self, current: DateTime) -> DateTime:
        """Get the prev scheduled time.

        This is ``current - interval``, unless ``current`` falls right on the
        interval boundary, when ``current`` is returned.
        """
        prev_time = self._get_prev(current)
        # ``current`` is itself a scheduled time exactly when stepping forward
        # from the previous schedule lands on it again.
        if self._get_next(prev_time) != current:
            return prev_time
        return current