# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import typing

from airflow.datasets import BaseDataset, DatasetAll
from airflow.exceptions import AirflowTimetableInvalid
from airflow.timetables.simple import DatasetTriggeredTimetable as DatasetTriggeredSchedule
from airflow.utils.types import DagRunType

if typing.TYPE_CHECKING:
    from collections.abc import Collection

    import pendulum

    from airflow.datasets import Dataset
    from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class DatasetOrTimeSchedule(DatasetTriggeredSchedule):
    """Combine time-based scheduling with event-based scheduling."""

    def __init__(
        self,
        *,
        timetable: Timetable,
        datasets: Collection[Dataset] | BaseDataset,
    ) -> None:
        self.timetable = timetable
        if isinstance(datasets, BaseDataset):
            self.datasets = datasets
        else:
            # A plain collection is treated as "all datasets must be updated".
            self.datasets = DatasetAll(*datasets)

        self.description = f"Triggered by datasets or {timetable.description}"
        # Mirror the wrapped timetable's scheduling attributes.
        self.periodic = timetable.periodic
        self._can_be_scheduled = timetable._can_be_scheduled
        self.active_runs_limit = timetable.active_runs_limit

    @classmethod
    def deserialize(cls, data: dict[str, typing.Any]) -> Timetable:
        from airflow.serialization.serialized_objects import decode_timetable

        return cls(
            timetable=decode_timetable(data["timetable"]),
            # The datasets are not needed after deserialization; they are
            # already stored in the DAG's dataset_triggers attribute, which
            # is what the scheduler looks at.
            datasets=[],
        )

    def serialize(self) -> dict[str, typing.Any]:
        from airflow.serialization.serialized_objects import encode_timetable

        return {"timetable": encode_timetable(self.timetable)}

    def validate(self) -> None:
        if isinstance(self.timetable, DatasetTriggeredSchedule):
            raise AirflowTimetableInvalid("cannot nest dataset timetables")
        if not isinstance(self.datasets, BaseDataset):
            raise AirflowTimetableInvalid("all elements in 'datasets' must be datasets")

    @property
    def summary(self) -> str:
        return f"Dataset or {self.timetable.summary}"

    def infer_manual_data_interval(self, *, run_after: pendulum.DateTime) -> DataInterval:
        return self.timetable.infer_manual_data_interval(run_after=run_after)

    def next_dagrun_info(
        self, *, last_automated_data_interval: DataInterval | None, restriction: TimeRestriction
    ) -> DagRunInfo | None:
        return self.timetable.next_dagrun_info(
            last_automated_data_interval=last_automated_data_interval,
            restriction=restriction,
        )

    def generate_run_id(self, *, run_type: DagRunType, **kwargs: typing.Any) -> str:
        # Time-scheduled runs use the wrapped timetable's run-ID scheme;
        # dataset-triggered runs fall back to the parent implementation.
        if run_type != DagRunType.DATASET_TRIGGERED:
            return self.timetable.generate_run_id(run_type=run_type, **kwargs)
        return super().generate_run_id(run_type=run_type, **kwargs)
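

# --- Illustrative usage (not part of the original module) ---
# A minimal sketch of wiring this timetable into a DAG, assuming Airflow 2.9+,
# where DAG(schedule=...) accepts a DatasetOrTimeSchedule. The dag_id, cron
# expression, and dataset URI below are hypothetical examples.
def _example_dag():
    import pendulum

    from airflow.datasets import Dataset
    from airflow.models.dag import DAG
    from airflow.timetables.trigger import CronTriggerTimetable

    # Runs daily at midnight UTC *or* whenever the dataset is updated,
    # whichever comes first.
    return DAG(
        dag_id="example_dataset_or_time",
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        schedule=DatasetOrTimeSchedule(
            timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
            datasets=[Dataset("s3://example-bucket/example.csv")],
        ),
    )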