1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
18
19import pathlib
20from datetime import datetime, timedelta
21from typing import Any, List, Optional
22
23from dateutil import relativedelta
24from typing_extensions import Annotated
25
26from airflow import DAG, settings
27from airflow.configuration import conf as airflow_conf
28from airflow.utils.pydantic import (
29 BaseModel as BaseModelPydantic,
30 ConfigDict,
31 PlainSerializer,
32 PlainValidator,
33 ValidationInfo,
34)
35from airflow.utils.sqlalchemy import Interval
36
37
38def serialize_interval(value: Interval) -> Interval:
39 interval = Interval()
40 return interval.process_bind_param(value, None)
41
42
43def validate_interval(value: Interval | Any, _info: ValidationInfo) -> Any:
44 if (
45 isinstance(value, Interval)
46 or isinstance(value, timedelta)
47 or isinstance(value, relativedelta.relativedelta)
48 ):
49 return value
50 interval = Interval()
51 try:
52 return interval.process_result_value(value, None)
53 except ValueError as e:
54 # Interval may be provided in string format (cron),
55 # so it must be returned as valid value.
56 if isinstance(value, str):
57 return value
58 raise e
59
60
61PydanticInterval = Annotated[
62 Interval,
63 PlainValidator(validate_interval),
64 PlainSerializer(serialize_interval, return_type=Interval),
65]
66
67
68def serialize_operator(x: DAG) -> dict:
69 from airflow.serialization.serialized_objects import SerializedDAG
70
71 return SerializedDAG.serialize_dag(x)
72
73
74def validate_operator(x: DAG | dict[str, Any], _info: ValidationInfo) -> Any:
75 from airflow.serialization.serialized_objects import SerializedDAG
76
77 if isinstance(x, DAG):
78 return x
79 return SerializedDAG.deserialize_dag(x)
80
81
82PydanticDag = Annotated[
83 DAG,
84 PlainValidator(validate_operator),
85 PlainSerializer(serialize_operator, return_type=dict),
86]
87
88
89class DagOwnerAttributesPydantic(BaseModelPydantic):
90 """Serializable representation of the DagOwnerAttributes ORM SqlAlchemyModel used by internal API."""
91
92 owner: str
93 link: str
94
95 model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)
96
97
98class DagTagPydantic(BaseModelPydantic):
99 """Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API."""
100
101 name: str
102 dag_id: str
103
104 model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)
105
106
107class DagModelPydantic(BaseModelPydantic):
108 """Serializable representation of the DagModel ORM SqlAlchemyModel used by internal API."""
109
110 dag_id: str
111 root_dag_id: Optional[str]
112 is_paused_at_creation: bool = airflow_conf.getboolean("core", "dags_are_paused_at_creation")
113 is_paused: bool = is_paused_at_creation
114 is_subdag: Optional[bool] = False
115 is_active: Optional[bool] = False
116 last_parsed_time: Optional[datetime]
117 last_pickled: Optional[datetime]
118 last_expired: Optional[datetime]
119 scheduler_lock: Optional[bool]
120 pickle_id: Optional[int]
121 fileloc: str
122 processor_subdir: Optional[str]
123 owners: Optional[str]
124 description: Optional[str]
125 default_view: Optional[str]
126 schedule_interval: Optional[PydanticInterval]
127 timetable_description: Optional[str]
128 tags: List[DagTagPydantic] # noqa: UP006
129 dag_owner_links: List[DagOwnerAttributesPydantic] # noqa: UP006
130 parent_dag: Optional[PydanticDag]
131
132 max_active_tasks: int
133 max_active_runs: Optional[int]
134 max_consecutive_failed_dag_runs: Optional[int]
135
136 has_task_concurrency_limits: bool
137 has_import_errors: Optional[bool] = False
138
139 _processor_dags_folder: Optional[str] = None
140
141 model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)
142
143 @property
144 def relative_fileloc(self) -> pathlib.Path:
145 """File location of the importable dag 'file' relative to the configured DAGs folder."""
146 path = pathlib.Path(self.fileloc)
147 try:
148 rel_path = path.relative_to(self._processor_dags_folder or settings.DAGS_FOLDER)
149 if rel_path == pathlib.Path("."):
150 return path
151 else:
152 return rel_path
153 except ValueError:
154 # Not relative to DAGS_FOLDER.
155 return path