1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
18
19import warnings
20from typing import TYPE_CHECKING
21
22from itsdangerous import URLSafeSerializer
23from pydantic import BaseModel, ValidationError
24from sqlalchemy import delete, select
25
26from airflow._shared.module_loading import import_string
27from airflow.configuration import conf
28from airflow.dag_processing.bundles.base import BaseDagBundle # noqa: TC001
29from airflow.exceptions import AirflowConfigException
30from airflow.models.dagbundle import DagBundleModel
31from airflow.models.team import Team
32from airflow.utils.log.logging_mixin import LoggingMixin
33from airflow.utils.session import NEW_SESSION, provide_session
34
35if TYPE_CHECKING:
36 from collections.abc import Iterable
37
38 from sqlalchemy.orm import Session
39
# Reserved name of the built-in bundle serving Airflow's example DAGs. It is appended when
# ``[core] load_examples`` is enabled, and user-configured bundles are rejected if they use it.
_example_dag_bundle_name = "example_dags"
41
42
class _ExternalBundleConfig(BaseModel):
    """
    Schema defining the user-specified configuration for a DAG bundle.

    Each item of ``[dag_processor] dag_bundle_config_list`` is validated against this model
    in ``_parse_bundle_config``.
    """

    # Unique bundle name; ``_example_dag_bundle_name`` is reserved and rejected for user bundles.
    name: str
    # Dotted import path of the bundle class, resolved with ``import_string``.
    classpath: str
    # Keyword arguments later passed to the bundle class constructor.
    kwargs: dict
    # Optional owning team; only accepted when ``[core] multi_team`` is enabled.
    team_name: str | None = None
50
51
class _InternalBundleConfig(BaseModel):
    """
    Schema used internally (in this file) to define the configuration for a DAG bundle.

    Configuration defined by users when read must match ``_ExternalBundleConfig``.
    This configuration is then parsed and converted to ``_InternalBundleConfig`` to be used across this file.
    """

    # Bundle class already imported from the external config's ``classpath``.
    bundle_class: type[BaseDagBundle]
    # Keyword arguments passed to ``bundle_class`` whenever a bundle instance is created.
    kwargs: dict
    # Name of the team that should own the bundle in the database, if any.
    team_name: str | None = None
63
64
def _bundle_item_exc(msg):
    """
    Build the exception used to report an invalid ``dag_bundle_config_list`` entry.

    :param msg: Detail text appended after the common section/key prefix.
    :return: An ``AirflowConfigException``; it is returned rather than raised so callers ``raise`` it.
    """
    prefix = "Invalid config for section `dag_processor` key `dag_bundle_config_list`. "
    full_message = prefix + msg
    return AirflowConfigException(full_message)
69
70
def _parse_bundle_config(config_list) -> list[_ExternalBundleConfig]:
    """
    Validate the raw items of ``[dag_processor] dag_bundle_config_list``.

    :param config_list: Decoded config list; every item must be a dict matching ``_ExternalBundleConfig``.
    :return: The validated bundle configurations, in the order they were given.
    :raises AirflowConfigException: If an item is not a dict, fails schema validation, uses the
        reserved example-DAGs bundle name, or repeats a name already seen.
    """
    bundles: dict[str, _ExternalBundleConfig] = {}
    for item in config_list:
        if not isinstance(item, dict):
            raise _bundle_item_exc(f"Expected dict but got {item.__class__}")

        try:
            cfg = _ExternalBundleConfig(**item)
        except ValidationError as e:
            # Chain explicitly so the pydantic error is kept as the cause.
            raise _bundle_item_exc(f"Item {item} failed validation: {e}") from e

        if cfg.name == _example_dag_bundle_name:
            raise AirflowConfigException(
                f"Bundle name '{_example_dag_bundle_name}' is a reserved name. Please choose another name for your bundle."
                " Example DAGs can be enabled with the '[core] load_examples' config."
            )

        if cfg.name in bundles:
            # Detect the repeat where it happens so the offending name can be reported,
            # instead of only comparing counts once the loop has finished.
            raise _bundle_item_exc(f"One or more bundle names appeared multiple times: '{cfg.name}'")
        bundles[cfg.name] = cfg
    return list(bundles.values())
92
93
def _add_example_dag_bundle(bundle_config_list: list[_ExternalBundleConfig]):
    """
    Append the built-in example DAGs bundle to ``bundle_config_list`` in place.

    The appended entry is a ``LocalDagBundle`` whose ``path`` is the first directory of the
    ``airflow.example_dags`` package path.

    :param bundle_config_list: Parsed bundle configurations; mutated by this function.
    """
    from airflow import example_dags

    first_package_dir = next(iter(example_dags.__path__))
    example_bundle = _ExternalBundleConfig(
        name=_example_dag_bundle_name,
        classpath="airflow.dag_processing.bundles.local.LocalDagBundle",
        kwargs={"path": first_package_dir},
    )
    bundle_config_list.append(example_bundle)
107
108
109def _is_safe_bundle_url(url: str) -> bool:
110 """
111 Check if a bundle URL is safe to use.
112
113 This function validates that the URL:
114 - Uses HTTP or HTTPS schemes (no JavaScript, data, or other schemes)
115 - Is properly formatted
116 - Doesn't contain malicious content
117 """
118 import logging
119 from urllib.parse import urlparse
120
121 logger = logging.getLogger(__name__)
122
123 if not url:
124 return False
125
126 try:
127 parsed = urlparse(url)
128 if parsed.scheme not in {"http", "https"}:
129 logger.error(
130 "Bundle URL uses unsafe scheme '%s'. Only 'http' and 'https' are allowed", parsed.scheme
131 )
132 return False
133
134 if not parsed.netloc:
135 logger.error("Bundle URL '%s' has no network location", url)
136 return False
137
138 if any(ord(c) < 32 for c in url):
139 logger.error("Bundle URL '%s' contains control characters (ASCII < 32)", url)
140 return False
141
142 return True
143 except Exception as e:
144 logger.error("Failed to parse bundle URL '%s': %s", url, str(e))
145 return False
146
147
def _sign_bundle_url(url: str, bundle_name: str) -> str:
    """
    Produce a signed, URL-safe token wrapping a bundle URL.

    The token is created with ``URLSafeSerializer``, keyed by ``[core] fernet_key``. It encodes
    both the URL and the bundle name.

    :param url: The URL (or URL template) to sign.
    :param bundle_name: The name of the bundle, embedded alongside the URL.
    :return: The signed token.
    """
    secret_key = conf.get_mandatory_value("core", "fernet_key")
    # Key order kept stable (url, bundle_name) so re-signing an unchanged URL should yield the same token.
    return URLSafeSerializer(secret_key).dumps({"url": url, "bundle_name": bundle_name})
162
163
class DagBundlesManager(LoggingMixin):
    """
    Manager for DAG bundles.

    On construction it reads ``[dag_processor] dag_bundle_config_list`` and imports each bundle's
    class, keeping it together with its kwargs so bundle instances can be created on demand.
    It can also mirror the configured bundles into the ``DagBundleModel`` table.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Bundle name -> validated config whose bundle class is already imported.
        self._bundle_config: dict[str, _InternalBundleConfig] = {}
        self.parse_config()

    def parse_config(self) -> None:
        """
        Load all DAG bundle configurations into ``self._bundle_config``.

        Does nothing if configurations are already loaded, so bundle classes are imported at most
        once per manager. When the config list is non-empty and ``[core] load_examples`` is
        enabled, the built-in example DAGs bundle is added as well.

        :raises AirflowConfigException: If the config value is not a list, an item is invalid, or a
            bundle sets ``team_name`` while ``[core] multi_team`` is disabled.

        :meta private:
        """
        if self._bundle_config:
            return

        config_list = conf.getjson("dag_processor", "dag_bundle_config_list")
        if not config_list:
            return
        if not isinstance(config_list, list):
            raise AirflowConfigException(
                "Section `dag_processor` key `dag_bundle_config_list` "
                f"must be list but got {config_list.__class__}"
            )
        bundle_config_list = _parse_bundle_config(config_list)
        if conf.getboolean("core", "LOAD_EXAMPLES"):
            _add_example_dag_bundle(bundle_config_list)

        for bundle_config in bundle_config_list:
            if bundle_config.team_name and not conf.getboolean("core", "multi_team"):
                # Note the trailing space after "disabled." so the two sentences are not glued together.
                raise AirflowConfigException(
                    "Section `dag_processor` key `dag_bundle_config_list` "
                    "cannot have a team name when multi-team mode is disabled. "
                    "To enable multi-team, you need to update section `core` key `multi_team` in your config."
                )

            class_ = import_string(bundle_config.classpath)
            self._bundle_config[bundle_config.name] = _InternalBundleConfig(
                bundle_class=class_,
                kwargs=bundle_config.kwargs,
                team_name=bundle_config.team_name,
            )
        self.log.info("DAG bundles loaded: %s", ", ".join(self._bundle_config.keys()))

    @provide_session
    def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
        """
        Synchronize the configured bundles with the ``DagBundleModel`` table.

        Each configured bundle is created or re-activated. Its signed view URL template, template
        parameters and team ownership are also updated. Stored bundles not synced in this pass are
        deactivated and stripped of teams, and their import errors are deleted.

        :param session: The database session.
        :raises AirflowConfigException: If a configured bundle references a team that does not exist.
        """
        self.log.debug("Syncing DAG bundles to the database")

        def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
            # Use the parameter, not the enclosing loop variable ``name`` (late-binding closure).
            bundle_instance = self.get_bundle(bundle_name)
            new_template_ = bundle_instance.view_url_template()
            new_params_ = self._extract_template_params(bundle_instance)
            if new_template_:
                if not _is_safe_bundle_url(new_template_):
                    self.log.warning(
                        "Bundle %s has unsafe URL template '%s', skipping URL update",
                        bundle_name,
                        new_template_,
                    )
                    new_template_ = None
                else:
                    # Sign the URL for integrity verification
                    new_template_ = _sign_bundle_url(new_template_, bundle_name)
                    self.log.debug("Signed URL template for bundle %s", bundle_name)
            return new_template_, new_params_

        stored = {b.name: b for b in session.query(DagBundleModel).all()}
        # Name -> owning team name, or None when a stored bundle has zero or several teams.
        bundle_to_team = {
            bundle.name: bundle.teams[0].name if len(bundle.teams) == 1 else None
            for bundle in stored.values()
        }

        for name, config in self._bundle_config.items():
            team: Team | None = None
            if config.team_name:
                team = session.scalars(select(Team).where(Team.name == config.team_name)).one_or_none()
                if not team:
                    raise _bundle_item_exc(f"Team '{config.team_name}' does not exist")

            try:
                new_template, new_params = _extract_and_sign_template(name)
            except Exception as e:
                # NOTE(review): the bundle stays in ``stored`` and is therefore deactivated (and its
                # import errors deleted) below, although it is still configured — confirm intended.
                self.log.exception("Error creating bundle '%s': %s", name, e)
                continue

            if bundle := stored.pop(name, None):
                bundle.active = True
                if new_template != bundle.signed_url_template:
                    bundle.signed_url_template = new_template
                    self.log.debug("Updated URL template for bundle %s", name)
                if new_params != bundle.template_params:
                    bundle.template_params = new_params
                    self.log.debug("Updated template parameters for bundle %s", name)
            else:
                bundle = DagBundleModel(name=name)
                bundle.signed_url_template = new_template
                bundle.template_params = new_params

                session.add(bundle)
                self.log.info("Added new DAG bundle %s to the database", name)

            if team and bundle_to_team.get(name) != config.team_name:
                # Change of team. It can be associating a team to a dag bundle that did not have one or
                # swapping a team for another
                bundle.teams = [team]
                if bundle_to_team.get(name):
                    self.log.warning(
                        "Changing ownership of team '%s' from Dag bundle '%s' to '%s'",
                        bundle_to_team[name],
                        name,
                        team.name,
                    )
            elif not team and name in bundle_to_team and bundle.teams:
                # Only strip ownership when the stored bundle actually has teams; mere presence in
                # ``bundle_to_team`` used to warn (with team 'None') for every team-less bundle.
                self.log.warning(
                    "Removing ownership of team '%s' from Dag bundle '%s'",
                    ", ".join(t.name for t in bundle.teams),
                    name,
                )
                bundle.teams = []

        # Import here to avoid circular import
        from airflow.models.errors import ParseImportError

        for name, bundle in stored.items():
            bundle.active = False
            bundle.teams = []
            self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name)
            session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name))
            self.log.info("Deleted import errors for bundle %s which is no longer configured", name)

    @staticmethod
    def _extract_template_params(bundle_instance: BaseDagBundle) -> dict:
        """
        Extract template parameters from a bundle instance's view_url_template method.

        Every ``{placeholder}`` in the template is looked up as an attribute of the bundle; only
        truthy attribute values are included.

        :param bundle_instance: The bundle instance to extract parameters from
        :return: Dictionary of template parameters
        """
        import re

        params: dict[str, str] = {}
        template = bundle_instance.view_url_template()

        if not template:
            return params

        # Extract template placeholders using regex
        # This matches {placeholder} patterns in the template
        PLACEHOLDER_PATTERN = re.compile(r"\{([^}]+)\}")
        placeholders = PLACEHOLDER_PATTERN.findall(template)

        # Extract values for each placeholder found in the template
        for placeholder in placeholders:
            field_value = getattr(bundle_instance, placeholder, None)
            if field_value:
                params[placeholder] = field_value

        return params

    def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
        """
        Get a DAG bundle by name.

        :param name: The name of the DAG bundle.
        :param version: The version of the DAG bundle you need (optional). If not provided, ``tracking_ref`` will be used instead.

        :return: The DAG bundle.
        :raises ValueError: If no bundle with that name is configured.
        """
        cfg_bundle = self._bundle_config.get(name)
        if not cfg_bundle:
            raise ValueError(f"Requested bundle '{name}' is not configured.")
        return cfg_bundle.bundle_class(name=name, version=version, **cfg_bundle.kwargs)

    def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]:
        """
        Get all DAG bundles.

        Bundles whose construction raises are logged and skipped.

        :return: Generator of DAG bundles (created lazily, with ``version=None``).
        """
        for name, cfg in self._bundle_config.items():
            try:
                yield cfg.bundle_class(name=name, version=None, **cfg.kwargs)
            except Exception as e:
                # Skip this bundle and continue with others
                self.log.exception("Error creating bundle '%s': %s", name, e)

    def get_all_bundle_names(self) -> Iterable[str]:
        """
        Get all bundle names.

        :return: sorted list of bundle names.
        """
        return sorted(self._bundle_config.keys())

    def view_url(self, name: str, version: str | None = None) -> str | None:
        """
        Return the bundle's view URL for ``version``.

        Deprecated: use ``DagBundleModel.render_url()`` instead.

        :param name: The name of the DAG bundle.
        :param version: The bundle version, or ``None``.
        :return: The value returned by the bundle's ``view_url``.
        """
        warnings.warn(
            "The 'view_url' method is deprecated and will be removed when providers "
            "have Airflow 3.1 as the minimum supported version. "
            "Use DagBundleModel.render_url() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        bundle = self.get_bundle(name, version)
        return bundle.view_url(version=version)