Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/timetables/trigger.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

47 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import datetime 

20from typing import TYPE_CHECKING, Any 

21 

22from airflow.timetables._cron import CronMixin 

23from airflow.timetables.base import DagRunInfo, DataInterval, Timetable 

24from airflow.utils import timezone 

25 

26if TYPE_CHECKING: 

27 from dateutil.relativedelta import relativedelta 

28 from pendulum import DateTime 

29 from pendulum.tz.timezone import FixedTimezone, Timezone 

30 

31 from airflow.timetables.base import TimeRestriction 

32 

33 

class CronTriggerTimetable(CronMixin, Timetable):
    """Timetable whose cron expression decides *when* DAG runs are triggered.

    Unlike ``CronDataIntervalTimetable``, the cron expression here does not
    describe the *data interval* of a run; the data interval is configured
    separately through ``interval``. As a consequence a run is kicked off
    right at the start of the period (similar to POSIX cron) instead of
    waiting for a whole data interval to elapse.

    Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
    """

    def __init__(
        self,
        cron: str,
        *,
        timezone: str | Timezone | FixedTimezone,
        interval: datetime.timedelta | relativedelta = datetime.timedelta(),
    ) -> None:
        """Set up the timetable.

        :param cron: Cron expression, forwarded to the parent initializer.
        :param timezone: Timezone, forwarded to the parent initializer.
        :param interval: Length of the data interval that ends at each
            trigger time. Defaults to a zero-length ``timedelta``.
        """
        super().__init__(cron, timezone)
        self._interval = interval

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Rebuild a timetable from the mapping produced by ``serialize``.

        The ``"interval"`` entry is decoded as a ``relativedelta`` when it is
        a dict; otherwise it is interpreted as a number of seconds.
        """
        from airflow.serialization.serialized_objects import decode_relativedelta, decode_timezone

        raw_interval = data["interval"]
        interval: datetime.timedelta | relativedelta = (
            decode_relativedelta(raw_interval)
            if isinstance(raw_interval, dict)
            else datetime.timedelta(seconds=raw_interval)
        )
        return cls(data["expression"], timezone=decode_timezone(data["timezone"]), interval=interval)

    def serialize(self) -> dict[str, Any]:
        """Return a mapping with the expression, timezone and interval.

        A ``timedelta`` interval is stored as its total number of seconds;
        any other interval goes through ``encode_relativedelta``.
        """
        from airflow.serialization.serialized_objects import encode_relativedelta, encode_timezone

        encoded_interval: float | dict[str, Any] = (
            self._interval.total_seconds()
            if isinstance(self._interval, datetime.timedelta)
            else encode_relativedelta(self._interval)
        )
        encoded_timezone = encode_timezone(self._timezone)
        return {
            "expression": self._expression,
            "timezone": encoded_timezone,
            "interval": encoded_interval,
        }

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """Return the data interval of length ``interval`` ending at *run_after*."""
        # Subtracting a timedelta from a pendulum DateTime yields a pendulum
        # DateTime at runtime, but mypy infers datetime.datetime instead.
        interval_start = run_after - self._interval
        return DataInterval(interval_start, run_after)  # type: ignore[arg-type]

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Compute the next automated run, or ``None`` if there is none.

        With catchup, the next trigger time follows the end of the previous
        automated interval, or is aligned forward from ``restriction.earliest``
        for the first run; without either of them nothing is scheduled.

        Without catchup, the trigger time is the latest of: the current time
        aligned backwards, the trigger following the previous automated
        interval (if any), and ``restriction.earliest`` aligned forward (if
        set).

        ``None`` is also returned when the trigger time is later than
        ``restriction.latest``.
        """
        if not restriction.catchup:
            now = timezone.coerce_datetime(timezone.utcnow())
            candidates = [self._align_to_prev(now)]
            if last_automated_data_interval is not None:
                candidates.append(self._get_next(last_automated_data_interval.end))
            if restriction.earliest is not None:
                candidates.append(self._align_to_next(restriction.earliest))
            trigger_time = max(candidates)
        elif last_automated_data_interval is not None:
            trigger_time = self._get_next(last_automated_data_interval.end)
        elif restriction.earliest is not None:
            trigger_time = self._align_to_next(restriction.earliest)
        else:
            # No previous run and no start date: nowhere to catch up from.
            return None

        if restriction.latest is not None and restriction.latest < trigger_time:
            return None

        # Subtracting a timedelta from a pendulum DateTime yields a pendulum
        # DateTime at runtime, but mypy infers datetime.datetime instead.
        interval_start = trigger_time - self._interval
        return DagRunInfo.interval(interval_start, trigger_time)  # type: ignore[arg-type]