Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/timetables/_cron.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

65 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import datetime 

20from typing import TYPE_CHECKING, Any 

21 

22from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException 

23from croniter import CroniterBadCronError, CroniterBadDateError, croniter 

24 

25from airflow.exceptions import AirflowTimetableInvalid 

26from airflow.utils.dates import cron_presets 

27from airflow.utils.timezone import convert_to_utc, make_aware, make_naive, parse_timezone 

28 

29if TYPE_CHECKING: 

30 from pendulum import DateTime 

31 from pendulum.tz.timezone import FixedTimezone, Timezone 

32 

33 

34def _covers_every_hour(cron: croniter) -> bool: 

35 """Check whether the given cron runs at least once an hour. 

36 

37 This indicates whether we need to implement a workaround for (what I call) 

38 the "fold hour problem". Folding happens when a region switches time 

39 backwards, usually as a part of ending a DST period, causing a block of time 

40 to occur twice in the wall clock. This is indicated by the ``fold`` flag on 

41 datetime. 

42 

43 As an example, Switzerland in 2023 ended DST on 3am (wall clock time, UTC+2) 

44 by dialing back the clock to 2am (UTC+1). So for (say) ``30 * * * *``, if 

45 the last run was 2:30am (UTC+2), the next needs to be 2:30am (UTC+1, folded) 

46 instead of 3:30am. 

47 

48 While this technically happens for all cron schedules (in such a timezone), 

49 we only care about schedules that create at least one run every hour, and 

50 can provide a somewhat reasonable rationale to skip the fold hour for things 

51 such as ``*/2`` (every two hours). Therefore, we try to *minially* peak into 

52 croniter internals to work around the issue. 

53 

54 The check is simple since croniter internally normalizes things to ``*``. 

55 More edge cases can be added later as needed. 

56 

57 See also: https://github.com/kiorky/croniter/issues/56. 

58 """ 

59 return cron.expanded[1] == ["*"] 

60 

61 

class CronMixin:
    """Mixin to provide interface to work with croniter.

    Holds a cron expression and the timezone it is evaluated in, and provides
    helpers to find the next/previous scheduled time with a workaround for
    DST "fold" hours on schedules that run every hour.
    """

    def __init__(self, cron: str, timezone: str | Timezone | FixedTimezone) -> None:
        """Store the expression and timezone and build a human-readable description.

        :param cron: A cron expression, or a key of ``cron_presets`` which is
            replaced by the expression it maps to.
        :param timezone: Timezone the expression is evaluated in. A string is
            parsed with ``parse_timezone`` first.
        """
        self._expression = cron_presets.get(cron, cron)

        if isinstance(timezone, str):
            timezone = parse_timezone(timezone)
        self._timezone = timezone

        try:
            descriptor = ExpressionDescriptor(
                expression=self._expression, casing_type=CasingTypeEnum.Sentence, use_24hour_time_format=True
            )
            # checking for more than 5 parameters in Cron and avoiding evaluation for now,
            # as Croniter has inconsistent evaluation with other libraries
            if len(croniter(self._expression).expanded) > 5:
                raise FormatException()
            interval_description: str = descriptor.get_description()
        except (CroniterBadCronError, FormatException, MissingFieldException):
            # The description is informational only; an expression that cannot be
            # described (or has more than five fields) gets an empty description.
            interval_description = ""
        self.description: str = interval_description

    def __eq__(self, other: Any) -> bool:
        """Both expression and timezone should match.

        This is only for testing purposes and should not be relied on otherwise.
        """
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._expression == other._expression and self._timezone == other._timezone

    @property
    def summary(self) -> str:
        """The cron expression in use (after ``cron_presets`` translation)."""
        return self._expression

    def validate(self) -> None:
        """Check that croniter accepts the expression.

        :raises AirflowTimetableInvalid: If croniter rejects the expression. The
            original croniter error is chained as ``__cause__``.
        """
        try:
            croniter(self._expression)
        except (CroniterBadCronError, CroniterBadDateError) as e:
            raise AirflowTimetableInvalid(str(e)) from e

    def _get_next(self, current: DateTime) -> DateTime:
        """Get the first schedule after specified time, with DST fixed.

        :param current: The reference time.
        :return: The next scheduled time, converted to UTC.
        """
        naive = make_naive(current, self._timezone)
        cron = croniter(self._expression, start_time=naive)
        scheduled = cron.get_next(datetime.datetime)
        if not _covers_every_hour(cron):
            return convert_to_utc(make_aware(scheduled, self._timezone))
        # For schedules running every hour, apply croniter's wall-clock offset to
        # the aware ``current`` instead of re-localizing the naive result, so a
        # folded (repeated) wall-clock hour is not skipped.
        delta = scheduled - naive
        return convert_to_utc(current.in_timezone(self._timezone) + delta)

    def _get_prev(self, current: DateTime) -> DateTime:
        """Get the first schedule before specified time, with DST fixed.

        :param current: The reference time.
        :return: The previous scheduled time, converted to UTC.
        """
        naive = make_naive(current, self._timezone)
        cron = croniter(self._expression, start_time=naive)
        scheduled = cron.get_prev(datetime.datetime)
        if not _covers_every_hour(cron):
            return convert_to_utc(make_aware(scheduled, self._timezone))
        # Same fold-hour workaround as in ``_get_next``, stepping backwards.
        delta = naive - scheduled
        return convert_to_utc(current.in_timezone(self._timezone) - delta)

    def _align_to_next(self, current: DateTime) -> DateTime:
        """Get the next scheduled time.

        This is ``current + interval``, unless ``current`` falls right on the
        interval boundary, when ``current`` is returned.
        """
        next_time = self._get_next(current)
        # If stepping back from the next schedule lands on ``current``, then
        # ``current`` is itself a schedule boundary.
        if self._get_prev(next_time) != current:
            return next_time
        return current

    def _align_to_prev(self, current: DateTime) -> DateTime:
        """Get the prev scheduled time.

        This is ``current - interval``, unless ``current`` falls right on the
        interval boundary, when ``current`` is returned.
        """
        prev_time = self._get_prev(current)
        # If stepping forward from the previous schedule lands on ``current``,
        # then ``current`` is itself a schedule boundary.
        if self._get_next(prev_time) != current:
            return prev_time
        return current