Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/timetables/_cron.py: 45%

71 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import datetime 

20from functools import cached_property 

21from typing import Any 

22 

23from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException 

24from croniter import CroniterBadCronError, CroniterBadDateError, croniter 

25from pendulum import DateTime 

26from pendulum.tz.timezone import Timezone 

27 

28from airflow.exceptions import AirflowTimetableInvalid 

29from airflow.utils.dates import cron_presets 

30from airflow.utils.timezone import convert_to_utc, make_aware, make_naive 

31 

32 

def _is_schedule_fixed(expression: str) -> bool:
    """Tell whether a cron expression fires at a fixed time of day.

    Detection is done by "peeking" the next two cron trigger times; if both
    share the same hour and minute (e.g. 3 AM every day), the schedule is
    fixed, and we *don't* need to perform the DST fix.

    This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).

    :param expression: The cron expression to inspect.
    :return: True if the schedule has a fixed time, False if not.
    """
    schedule = croniter(expression)
    first = schedule.get_next(datetime.datetime)
    second = schedule.get_next(datetime.datetime)
    return (first.minute, first.hour) == (second.minute, second.hour)

48 

49 

class CronMixin:
    """Mixin to provide interface to work with croniter.

    Stores a cron expression together with a timezone, and offers helpers to
    validate the expression and to step to the previous/next trigger time.
    """

    def __init__(self, cron: str, timezone: str | Timezone) -> None:
        """Store the expression and timezone, and build a description.

        :param cron: A cron expression, or a key of ``cron_presets``, in
            which case the preset's expression is used instead.
        :param timezone: Timezone used when computing trigger times. A string
            is converted to a ``Timezone``.
        """
        self._expression = cron_presets.get(cron, cron)

        if isinstance(timezone, str):
            timezone = Timezone(timezone)
        self._timezone = timezone

        try:
            # The descriptor is built inside the ``try`` so that a parsing
            # error raised by its constructor is handled exactly like one
            # raised by get_description(). Instantiating a schedule should
            # not raise; validity is checked separately by validate().
            descriptor = ExpressionDescriptor(
                expression=self._expression, casing_type=CasingTypeEnum.Sentence, use_24hour_time_format=True
            )
            # checking for more than 5 parameters in Cron and avoiding evaluation for now,
            # as Croniter has inconsistent evaluation with other libraries
            if len(croniter(self._expression).expanded) > 5:
                raise FormatException()
            interval_description = descriptor.get_description()
        except (CroniterBadCronError, FormatException, MissingFieldException):
            # Fall back to an empty description when none can be produced.
            interval_description = ""
        self.description = interval_description

    def __eq__(self, other: Any) -> bool:
        """Both expression and timezone should match.

        This is only for testing purposes and should not be relied on otherwise.
        """
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._expression == other._expression and self._timezone == other._timezone

    @property
    def summary(self) -> str:
        """Return the cron expression in use (after preset substitution)."""
        return self._expression

    def validate(self) -> None:
        """Check that croniter accepts the stored expression.

        :raises AirflowTimetableInvalid: If croniter rejects the expression.
            The original croniter error is attached as the cause.
        """
        try:
            croniter(self._expression)
        except (CroniterBadCronError, CroniterBadDateError) as e:
            raise AirflowTimetableInvalid(str(e)) from e

    @cached_property
    def _should_fix_dst(self) -> bool:
        """Whether the DST fix is needed, i.e. the schedule is not fixed."""
        # This is lazy so instantiating a schedule does not immediately raise
        # an exception. Validity is checked with validate() during DAG-bagging.
        return not _is_schedule_fixed(self._expression)

    def _get_next(self, current: DateTime) -> DateTime:
        """Get the first schedule after specified time, with DST fixed."""
        naive = make_naive(current, self._timezone)
        cron = croniter(self._expression, start_time=naive)
        scheduled = cron.get_next(datetime.datetime)
        if not self._should_fix_dst:
            return convert_to_utc(make_aware(scheduled, self._timezone))
        # Apply the naive difference to the timezone-aware time instead of
        # making the naive result aware directly.
        delta = scheduled - naive
        return convert_to_utc(current.in_timezone(self._timezone) + delta)

    def _get_prev(self, current: DateTime) -> DateTime:
        """Get the first schedule before specified time, with DST fixed."""
        naive = make_naive(current, self._timezone)
        cron = croniter(self._expression, start_time=naive)
        scheduled = cron.get_prev(datetime.datetime)
        if not self._should_fix_dst:
            return convert_to_utc(make_aware(scheduled, self._timezone))
        # Mirror of _get_next(): subtract the naive difference from the
        # timezone-aware time.
        delta = naive - scheduled
        return convert_to_utc(current.in_timezone(self._timezone) - delta)

    def _align_to_next(self, current: DateTime) -> DateTime:
        """Get the next scheduled time.

        This is ``current + interval``, unless ``current`` falls right on the
        interval boundary, when ``current`` is returned.
        """
        next_time = self._get_next(current)
        # If stepping back from the next trigger lands on ``current``, then
        # ``current`` is itself a trigger time.
        if self._get_prev(next_time) != current:
            return next_time
        return current

    def _align_to_prev(self, current: DateTime) -> DateTime:
        """Get the prev scheduled time.

        This is ``current - interval``, unless ``current`` falls right on the
        interval boundary, when ``current`` is returned.
        """
        prev_time = self._get_prev(current)
        # If stepping forward from the previous trigger lands on ``current``,
        # then ``current`` is itself a trigger time.
        if self._get_next(prev_time) != current:
            return prev_time
        return current