Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/serialization/pydantic/dag.py: 69%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

80 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import pathlib 

20from datetime import datetime, timedelta 

21from typing import Any, List, Optional 

22 

23from dateutil import relativedelta 

24from typing_extensions import Annotated 

25 

26from airflow import DAG, settings 

27from airflow.configuration import conf as airflow_conf 

28from airflow.utils.pydantic import ( 

29 BaseModel as BaseModelPydantic, 

30 ConfigDict, 

31 PlainSerializer, 

32 PlainValidator, 

33 ValidationInfo, 

34) 

35from airflow.utils.sqlalchemy import Interval 

36 

37 

def serialize_interval(value: Interval) -> Interval:
    """Serialize *value* using the Interval type's bind-param processing."""
    return Interval().process_bind_param(value, None)

41 

42 

def validate_interval(value: Interval | Any, _info: ValidationInfo) -> Any:
    """Validate a value into an interval for the ``PydanticInterval`` annotated type.

    Values that are already interval-like (``Interval``, ``timedelta``, or
    ``relativedelta``) pass through unchanged; anything else is run through
    ``Interval.process_result_value``. A ``ValueError`` from that conversion is
    tolerated for strings (cron expressions are valid schedule intervals) and
    re-raised for everything else.
    """
    # One isinstance call with a tuple instead of a chain of three checks.
    if isinstance(value, (Interval, timedelta, relativedelta.relativedelta)):
        return value
    interval = Interval()
    try:
        return interval.process_result_value(value, None)
    except ValueError:
        # Interval may be provided in string format (cron),
        # so it must be returned as valid value.
        if isinstance(value, str):
            return value
        # Bare raise re-raises the active exception with its original traceback.
        raise

59 

60 

# Annotated type for Pydantic models: validates raw values into an interval
# (passing Interval/timedelta/relativedelta and cron strings through) and
# serializes via the Interval type's bind-param processing.
PydanticInterval = Annotated[
    Interval,
    PlainValidator(validate_interval),
    PlainSerializer(serialize_interval, return_type=Interval),
]

66 

67 

def serialize_operator(x: DAG) -> dict:
    """Serialize a DAG into its dictionary representation."""
    # Imported locally to avoid a circular import at module load time.
    from airflow.serialization.serialized_objects import SerializedDAG

    serialized_form = SerializedDAG.serialize_dag(x)
    return serialized_form

72 

73 

def validate_operator(x: DAG | dict[str, Any], _info: ValidationInfo) -> Any:
    """Return *x* unchanged if it is already a DAG; otherwise deserialize the dict."""
    if isinstance(x, DAG):
        return x
    # Imported locally to avoid a circular import at module load time.
    from airflow.serialization.serialized_objects import SerializedDAG

    return SerializedDAG.deserialize_dag(x)

80 

81 

# Annotated type for Pydantic models: accepts an existing DAG instance or its
# serialized dict form, and serializes back to a dict via SerializedDAG.
PydanticDag = Annotated[
    DAG,
    PlainValidator(validate_operator),
    PlainSerializer(serialize_operator, return_type=dict),
]

87 

88 

class DagOwnerAttributesPydantic(BaseModelPydantic):
    """Serializable representation of the DagOwnerAttributes ORM SqlAlchemyModel used by internal API."""

    # Owner name and the link associated with that owner.
    owner: str
    link: str

    # from_attributes lets this model be built directly from the ORM object.
    model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)

96 

97 

class DagTagPydantic(BaseModelPydantic):
    """Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API."""

    # Tag name and the DAG it is attached to.
    name: str
    dag_id: str

    # from_attributes lets this model be built directly from the ORM object.
    model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)

105 

106 

class DagModelPydantic(BaseModelPydantic):
    """Serializable representation of the DagModel ORM SqlAlchemyModel used by internal API."""

    dag_id: str
    root_dag_id: Optional[str]
    # NOTE: evaluated once at import time from airflow.cfg; later config changes
    # do not affect already-imported defaults.
    is_paused_at_creation: bool = airflow_conf.getboolean("core", "dags_are_paused_at_creation")
    # Default captured from the class-level value of is_paused_at_creation above.
    is_paused: bool = is_paused_at_creation
    is_subdag: Optional[bool] = False
    is_active: Optional[bool] = False
    last_parsed_time: Optional[datetime]
    last_pickled: Optional[datetime]
    last_expired: Optional[datetime]
    scheduler_lock: Optional[bool]
    pickle_id: Optional[int]
    fileloc: str
    processor_subdir: Optional[str]
    owners: Optional[str]
    description: Optional[str]
    default_view: Optional[str]
    schedule_interval: Optional[PydanticInterval]
    timetable_description: Optional[str]
    tags: List[DagTagPydantic]  # noqa: UP006
    dag_owner_links: List[DagOwnerAttributesPydantic]  # noqa: UP006
    parent_dag: Optional[PydanticDag]

    max_active_tasks: int
    max_active_runs: Optional[int]
    max_consecutive_failed_dag_runs: Optional[int]

    has_task_concurrency_limits: bool
    has_import_errors: Optional[bool] = False

    # Private attribute (leading underscore): DAGs folder as seen by the DAG
    # processor, used by relative_fileloc in preference to settings.DAGS_FOLDER.
    _processor_dags_folder: Optional[str] = None

    # from_attributes lets this model be built directly from the ORM object.
    model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)

    @property
    def relative_fileloc(self) -> pathlib.Path:
        """File location of the importable dag 'file' relative to the configured DAGs folder."""
        path = pathlib.Path(self.fileloc)
        try:
            rel_path = path.relative_to(self._processor_dags_folder or settings.DAGS_FOLDER)
            # relative_to can yield "." when fileloc equals the DAGs folder;
            # fall back to the absolute path in that degenerate case.
            if rel_path == pathlib.Path("."):
                return path
            else:
                return rel_path
        except ValueError:
            # Not relative to DAGS_FOLDER.
            return path