1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Manages all plugins."""
19
20from __future__ import annotations
21
22import importlib.machinery
23import importlib.util
24import inspect
25import logging
26import os
27import sys
28import types
29from collections.abc import Iterable
30from functools import cache
31from pathlib import Path
32from typing import TYPE_CHECKING, Any
33
34from airflow import settings
35from airflow._shared.module_loading import import_string, qualname
36from airflow.configuration import conf
37from airflow.task.priority_strategy import (
38 PriorityWeightStrategy,
39 airflow_priority_weight_strategies,
40)
41from airflow.utils.entry_points import entry_points_with_dist
42from airflow.utils.file import find_path_from_directory
43
44if TYPE_CHECKING:
45 from airflow.lineage.hook import HookLineageReader
46
47 if sys.version_info >= (3, 12):
48 from importlib import metadata
49 else:
50 import importlib_metadata as metadata
51 from collections.abc import Generator
52 from types import ModuleType
53
54 from airflow.listeners.listener import ListenerManager
55 from airflow.timetables.base import Timetable
56
# Module-level logger used for all plugin discovery and integration messages.
log: logging.Logger = logging.getLogger(name=__name__)
58
59
class AirflowPluginSource:
    """
    Base description of where an AirflowPlugin was loaded from.

    Subclasses must provide both a plain-text rendering (``__str__``) and an HTML
    rendering (``__html__``); this base class implements neither.
    """

    def __html__(self):
        raise NotImplementedError

    def __str__(self):
        raise NotImplementedError
68
69
class PluginsDirectorySource(AirflowPluginSource):
    """
    Describes a plugin that was loaded from a file in the plugins folder.

    The file location is stored relative to ``settings.PLUGINS_FOLDER`` and rendered
    with a literal ``$PLUGINS_FOLDER/`` placeholder in front of it.
    """

    def __init__(self, path):
        plugins_root = settings.PLUGINS_FOLDER
        self.path = os.path.relpath(path, plugins_root)

    def __str__(self):
        return f"$PLUGINS_FOLDER/{self.path}"

    def __html__(self):
        # HTML variant: the folder placeholder is emphasised.
        return f"<em>$PLUGINS_FOLDER/</em>{self.path}"
81
82
class EntryPointSource(AirflowPluginSource):
    """
    Describes a plugin that was loaded from a package entry point.

    Captures the distribution name and version together with the entry point's string
    form, and renders them as ``<dist>==<version>: <entrypoint>``.
    """

    def __init__(self, entrypoint: metadata.EntryPoint, dist: metadata.Distribution):
        distribution_name = dist.metadata["Name"]  # type: ignore[index]
        self.dist = distribution_name
        self.version = dist.version
        self.entrypoint = str(entrypoint)

    def __str__(self):
        return f"{self.dist}=={self.version}: {self.entrypoint}"

    def __html__(self):
        # Same text as __str__, with the "dist==version:" prefix emphasised.
        return f"<em>{self.dist}=={self.version}:</em> {self.entrypoint}"
96
97
class AirflowPluginException(Exception):
    """Raised for an invalid plugin definition, e.g. a plugin that declares no name."""
100
101
class AirflowPlugin:
    """
    Base class for Airflow plugins.

    Subclass it, set ``name`` and populate whichever extension-point lists the plugin
    contributes to. The lists below are class-level defaults shared by subclasses that
    do not override them.
    """

    # Plugin name; must be non-empty (enforced by ``validate``).
    name: str | None = None
    # Where the plugin was loaded from; the directory and entry-point loaders set this.
    source: AirflowPluginSource | None = None

    # Objects exposed to templates under ``macros.<plugin name>``.
    macros: list[Any] = []
    # Legacy views; only inspected to warn about possibly incompatible plugins.
    admin_views: list[Any] = []
    # Flask blueprints for the legacy web UI.
    flask_blueprints: list[Any] = []
    # FastAPI sub-applications, as dicts whose "app" key holds the application.
    fastapi_apps: list[Any] = []
    # Middlewares for the root FastAPI app, as dicts with a "middleware" key
    # and optional "args"/"kwargs".
    fastapi_root_middlewares: list[Any] = []
    # External views for the UI, as dicts; a "url_route" must not clash with other plugins.
    external_views: list[Any] = []
    # React apps for the UI, as dicts; "url_route" shares its namespace with external views.
    react_apps: list[Any] = []
    # Legacy menu links; only inspected to warn about possibly incompatible plugins.
    menu_links: list[Any] = []
    # Flask-AppBuilder views, as dicts whose "view" key holds the view instance.
    appbuilder_views: list[Any] = []
    # Flask-AppBuilder menu items.
    appbuilder_menu_items: list[Any] = []

    # Operator extra links applied globally. They appear as buttons on the task page and
    # can redirect users to external systems; an operator can still override them.
    global_operator_extra_links: list[Any] = []

    # Operator extra links that add to, or override, the links of existing Airflow
    # operators; also rendered as buttons on the task page.
    operator_extra_links: list[Any] = []

    # Timetable classes usable for DAG scheduling.
    timetables: list[type[Timetable]] = []

    # Listeners (modules or objects) used to track task and DAG state changes.
    listeners: list[ModuleType | object] = []

    # Hook lineage reader classes used to read lineage information collected from hooks.
    hook_lineage_readers: list[type[HookLineageReader]] = []

    # Priority weight strategy classes used to compute task priority weights.
    priority_weight_strategies: list[type[PriorityWeightStrategy]] = []

    @classmethod
    def validate(cls):
        """
        Check that the plugin declares a name.

        :raises AirflowPluginException: if ``name`` is empty or ``None``.
        """
        if cls.name:
            return
        raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Hook invoked once, when the plugin is registered; the base implementation does nothing.

        Override it in a subclass to run one-time setup.

        :param args: Reserved for positional arguments that future versions may pass.
        :param kwargs: Reserved for keyword arguments that future versions may pass.
        """
        return None
158
159
def is_valid_plugin(plugin_obj) -> bool:
    """
    Tell whether ``plugin_obj`` is a concrete AirflowPlugin subclass.

    The AirflowPlugin base class itself is rejected. For accepted classes, ``validate()``
    is called; the default implementation raises AirflowPluginException when the plugin
    has no name.

    :param plugin_obj: any object encountered while scanning for plugins
    :return: True for an AirflowPlugin subclass that passed validation, False otherwise
    """
    if not inspect.isclass(plugin_obj):
        return False
    if plugin_obj is AirflowPlugin or not issubclass(plugin_obj, AirflowPlugin):
        return False
    plugin_obj.validate()
    return True
176
177
def _load_entrypoint_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Discover plugins exposed through the ``airflow.plugins`` entry-point group.

    Each entry point is loaded. Valid AirflowPlugin subclasses are instantiated and tagged
    with an EntryPointSource; other objects are ignored. Failures are logged and recorded
    rather than raised.

    :return: ``(plugins, import_errors)``, where ``import_errors`` maps the failing entry
        point's module to the error message.
    """
    log.debug("Loading plugins from entrypoints")

    loaded: list[AirflowPlugin] = []
    errors: dict[str, str] = {}
    for ep, distribution in entry_points_with_dist("airflow.plugins"):
        log.debug("Importing entry_point plugin %s", ep.name)
        try:
            candidate = ep.load()
            if is_valid_plugin(candidate):
                instance: AirflowPlugin = candidate()
                instance.source = EntryPointSource(ep, distribution)
                loaded.append(instance)
        except Exception as err:
            log.exception("Failed to import plugin %s", ep.name)
            errors[ep.module] = str(err)
    return loaded, errors
202
203
def _load_plugins_from_plugin_directory() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load and register Airflow plugins from ``.py`` files in the plugins directory.

    Files are found under ``settings.PLUGINS_FOLDER`` via ``find_path_from_directory`` with
    ``.airflowignore`` rules. When ``[core] load_examples`` is enabled, the bundled example
    plugins are scanned too and imported under the example plugins package name as prefix.

    Every module attribute that is a valid AirflowPlugin subclass is instantiated and tagged
    with a PluginsDirectorySource.

    :return: ``(plugins, import_errors)``; ``import_errors`` maps file path to error message.
    :raises ValueError: if ``settings.PLUGINS_FOLDER`` is not set.
    """
    if settings.PLUGINS_FOLDER is None:
        raise ValueError("Plugins folder is not set")
    log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
    files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore")
    plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)]

    if conf.getboolean("core", "LOAD_EXAMPLES"):
        from airflow.example_dags import plugins as example_plugins

        example_plugins_folder = next(iter(example_plugins.__path__))
        # Report the examples folder that is actually scanned, not PLUGINS_FOLDER.
        log.debug("Note: Loading plugins from examples as well: %s", example_plugins_folder)
        example_files = find_path_from_directory(example_plugins_folder, ".airflowignore")
        plugin_search_locations.append((example_plugins.__name__, example_files))

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    for module_prefix, plugin_files in plugin_search_locations:
        for file_path in plugin_files:
            path = Path(file_path)
            if not path.is_file() or path.suffix != ".py":
                continue
            # Example plugins are namespaced under their package; user plugins use the bare stem.
            # NOTE(review): a user plugin whose stem matches an already-imported module name
            # replaces that module's entry in sys.modules below.
            mod_name = f"{module_prefix}.{path.stem}" if module_prefix else path.stem

            try:
                loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
                spec = importlib.util.spec_from_loader(mod_name, loader)
                if not spec:
                    log.error("Could not load spec for module %s at %s", mod_name, file_path)
                    continue
                mod = importlib.util.module_from_spec(spec)
                # Publish the module before executing it, as the regular import system does.
                sys.modules[spec.name] = mod
                loader.exec_module(mod)

                for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
                    plugin_instance: AirflowPlugin = mod_attr_value()
                    plugin_instance.source = PluginsDirectorySource(file_path)
                    plugins.append(plugin_instance)
            except Exception as e:
                log.exception("Failed to import plugin %s", file_path)
                import_errors[file_path] = str(e)
    return plugins, import_errors
247
248
def _load_providers_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load the plugins declared by installed providers through ProvidersManager.

    Unlike the directory and entry-point loaders, the plugin *class* returned by
    ``import_string`` is registered as-is (it is not instantiated and gets no ``source``).
    Failures to import or validate a provider plugin are logged and recorded, consistent
    with the other loaders, instead of aborting plugin loading altogether.

    :return: ``(plugins, import_errors)``; ``import_errors`` maps the provider's plugin
        class path to the error message.
    """
    from airflow.providers_manager import ProvidersManager

    log.debug("Loading plugins from providers")
    providers_manager = ProvidersManager()
    providers_manager.initialize_providers_plugins()

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    for plugin in providers_manager.plugins:
        log.debug("Importing plugin %s from class %s", plugin.name, plugin.plugin_class)

        try:
            plugin_class = import_string(plugin.plugin_class)
            # is_valid_plugin may raise (e.g. AirflowPluginException for a nameless plugin).
            valid = is_valid_plugin(plugin_class)
        except Exception as e:
            log.exception("Failed to load plugin %s from class name %s", plugin.name, plugin.plugin_class)
            import_errors[plugin.plugin_class] = str(e)
            continue

        if valid:
            plugins.append(plugin_class)
        else:
            log.warning("Plugin %s is not a valid plugin", plugin.name)
    return plugins, import_errors
270
271
def make_module(name: str, objects: list[Any]) -> ModuleType | None:
    """
    Build a synthetic module exposing ``objects`` as attributes.

    Each object is bound under its ``__name__``. The module name is lower-cased, and the
    module also carries ``_name`` (the last dotted component of that lower-cased name) and
    ``_objects`` (the list that was passed in).

    :param name: dotted module name; lower-cased before use
    :param objects: objects to expose
    :return: the new module, or ``None`` when ``objects`` is empty
    """
    if objects:
        log.debug("Creating module %s", name)
        module_name = name.lower()
        new_module = types.ModuleType(module_name)
        new_module._name = module_name.rsplit(".", 1)[-1]  # type: ignore
        new_module._objects = objects  # type: ignore
        new_module.__dict__.update({obj.__name__: obj for obj in objects})
        return new_module
    return None
283
284
def ensure_plugins_loaded() -> None:
    """
    Make sure plugin discovery has run.

    Discovery is done by the ``@cache``-decorated ``_get_plugins``, so once it has succeeded,
    further calls are cheap no-ops.
    """
    # Only the side effect of populating the cache matters here; the result is discarded.
    _get_plugins()
292
293
def _register_plugins(
    plugin_instances: list[AirflowPlugin],
    errors: dict[str, str],
    *,
    plugins: list[AirflowPlugin],
    import_errors: dict[str, str],
    loaded_plugins: set[str | None],
) -> None:
    """
    Register one loader's batch of plugins into the shared accumulators.

    For each plugin whose name has not been seen yet, the name is recorded in
    ``loaded_plugins`` and ``on_load`` is called. If it succeeds, the plugin is appended to
    ``plugins``. If it fails, the failure is logged and stored in ``import_errors``, keyed
    by the plugin's source (or its name when it has no source). The name stays recorded
    either way. A plugin whose name was already recorded is skipped on its own; the rest of
    the batch is still processed. Finally the loader's own ``errors`` are merged into
    ``import_errors``.
    """
    for plugin_instance in plugin_instances:
        if plugin_instance.name in loaded_plugins:
            # First registration of a name wins; skip only this duplicate.
            continue

        loaded_plugins.add(plugin_instance.name)
        try:
            plugin_instance.on_load()
            plugins.append(plugin_instance)
        except Exception as e:
            log.exception("Failed to load plugin %s", plugin_instance.name)
            name = str(plugin_instance.source) if plugin_instance.source else plugin_instance.name or ""
            import_errors[name] = str(e)
    import_errors.update(errors)


@cache
def _get_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Discover, load and cache all plugins.

    Sources are processed in this order: the plugins directory, ``airflow.plugins`` entry
    points and, unless ``settings.LAZY_LOAD_PROVIDERS`` is set, provider plugins. When
    several plugins share a name, the first one registered wins. The result is memoised
    with ``@cache``, so discovery and ``on_load`` hooks run only on the first successful call.

    :return: ``(plugins, import_errors)``: the loaded plugins, and a mapping from an error
        key (file path, module, plugin source or name) to the error message.
    :raises ValueError: if ``settings.PLUGINS_FOLDER`` is not set.
    """
    from airflow.observability.stats import Stats

    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    loaded_plugins: set[str | None] = set()

    def register(batch: tuple[list[AirflowPlugin], dict[str, str]]) -> None:
        _register_plugins(
            *batch,
            plugins=plugins,
            import_errors=import_errors,
            loaded_plugins=loaded_plugins,
        )

    with Stats.timer() as timer:
        register(_load_plugins_from_plugin_directory())
        register(_load_entrypoint_plugins())

        if not settings.LAZY_LOAD_PROVIDERS:
            register(_load_providers_plugins())

    log.debug("Loading %d plugin(s) took %.2f seconds", len(plugins), timer.duration)
    return plugins, import_errors
336
337
@cache
def _get_ui_plugins() -> tuple[list[Any], list[Any]]:
    """
    Gather the external views and React apps contributed by all plugins.

    Some entries are logged and removed in place from the owning plugin's list: those that
    are not dicts, and those whose ``url_route`` was already claimed by an earlier entry
    (views and React apps share one route namespace). Entries without a ``url_route`` stay
    on the plugin but are not included in the returned lists.

    :return: ``(external_views, react_apps)`` accepted across all plugins.
    """
    log.debug("Initialize UI plugin")

    # url_route -> name of the plugin that claimed it first.
    route_owners: dict[str, str | None] = {}

    def _accept_entries(plugin, entries, accepted, not_dict_message, conflict_message) -> list[Any]:
        """Append valid ``entries`` to ``accepted`` and return the invalid ones to drop."""
        rejected: list[Any] = []
        for entry in entries:
            if not isinstance(entry, dict):
                log.warning(not_dict_message, plugin.name)
                rejected.append(entry)
                continue
            url_route = entry.get("url_route")
            if url_route is None:
                continue
            if url_route in route_owners:
                log.warning(conflict_message, plugin.name, url_route, route_owners[url_route])
                rejected.append(entry)
                continue
            accepted.append(entry)
            route_owners[url_route] = plugin.name
        return rejected

    external_views: list[Any] = []
    react_apps: list[Any] = []
    for plugin in _get_plugins()[0]:
        rejected_views = _accept_entries(
            plugin,
            plugin.external_views,
            external_views,
            "Plugin '%s' has an external view that is not a dictionary. The view will not be loaded.",
            "Plugin '%s' has an external view with an URL route '%s' "
            "that conflicts with another plugin '%s'. The view will not be loaded.",
        )
        rejected_apps = _accept_entries(
            plugin,
            plugin.react_apps,
            react_apps,
            "Plugin '%s' has a React App that is not a dictionary. The React App will not be loaded.",
            "Plugin '%s' has a React App with an URL route '%s' "
            "that conflicts with another plugin '%s'. The React App will not be loaded.",
        )
        # Prune invalid entries from the plugin itself; get_plugin_info calls this function
        # before dumping plugin attributes, so it only reports accepted entries.
        for entry in rejected_views:
            plugin.external_views.remove(entry)
        for entry in rejected_apps:
            plugin.react_apps.remove(entry)
    return external_views, react_apps
403
404
@cache
def get_flask_plugins() -> tuple[list[Any], list[Any], list[Any]]:
    """
    Gather the legacy Flask / Flask-AppBuilder extension points contributed by plugins.

    Also logs a warning for plugins that define ``admin_views`` or ``menu_links`` without
    the corresponding ``appbuilder_views`` / ``appbuilder_menu_items``.

    :return: ``(flask_blueprints, appbuilder_views, appbuilder_menu_items)``; each blueprint
        is wrapped as ``{"name": <plugin name>, "blueprint": <blueprint>}``.
    """
    log.debug("Initialize legacy Web UI plugin")

    blueprints: list[Any] = []
    views: list[Any] = []
    menu_items: list[Any] = []
    for plugin in _get_plugins()[0]:
        views += plugin.appbuilder_views
        menu_items += plugin.appbuilder_menu_items
        blueprints += ({"name": plugin.name, "blueprint": bp} for bp in plugin.flask_blueprints)

        orphan_admin_views = bool(plugin.admin_views) and not plugin.appbuilder_views
        orphan_menu_links = bool(plugin.menu_links) and not plugin.appbuilder_menu_items
        if orphan_admin_views or orphan_menu_links:
            log.warning(
                "Plugin '%s' may not be compatible with the current Airflow version. "
                "Please contact the author of the plugin.",
                plugin.name,
            )
    return blueprints, views, menu_items
427
428
@cache
def get_fastapi_plugins() -> tuple[list[Any], list[Any]]:
    """
    Gather the FastAPI extension points contributed by all plugins.

    :return: ``(fastapi_apps, fastapi_root_middlewares)``, concatenated in plugin load order.
    """
    log.debug("Initialize FastAPI plugins")

    loaded_plugins = _get_plugins()[0]
    apps: list[Any] = [app for plugin in loaded_plugins for app in plugin.fastapi_apps]
    root_middlewares: list[Any] = [
        middleware for plugin in loaded_plugins for middleware in plugin.fastapi_root_middlewares
    ]
    return apps, root_middlewares
440
441
@cache
def _get_extra_operators_links_plugins() -> tuple[list[Any], list[Any]]:
    """
    Gather operator extra links contributed by all plugins.

    :return: ``(global_operator_extra_links, operator_extra_links)``, concatenated in plugin
        load order.
    """
    log.debug("Initialize extra operators links plugins")

    loaded_plugins = _get_plugins()[0]
    global_links: list[Any] = [
        link for plugin in loaded_plugins for link in plugin.global_operator_extra_links
    ]
    operator_links: list[Any] = [link for plugin in loaded_plugins for link in plugin.operator_extra_links]
    return global_links, operator_links
453
454
def get_global_operator_extra_links() -> list[Any]:
    """Return the global operator extra links contributed by all plugins (cached list)."""
    global_links, _operator_links = _get_extra_operators_links_plugins()
    return global_links
458
459
def get_operator_extra_links() -> list[Any]:
    """Return the per-operator extra links contributed by all plugins (cached list)."""
    _global_links, operator_links = _get_extra_operators_links_plugins()
    return operator_links
463
464
@cache
def get_timetables_plugins() -> dict[str, type[Timetable]]:
    """
    Map the qualified name of each plugin-provided timetable class to the class.

    If several plugins provide the same qualified name, the last one loaded wins.
    """
    log.debug("Initialize extra timetables plugins")

    timetables: dict[str, type[Timetable]] = {}
    for plugin in _get_plugins()[0]:
        for timetable_class in plugin.timetables:
            timetables[qualname(timetable_class)] = timetable_class
    return timetables
475
476
@cache
def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
    """Return all hook lineage reader classes contributed by plugins, in plugin load order."""
    log.debug("Initialize hook lineage readers plugins")
    return [reader for plugin in _get_plugins()[0] for reader in plugin.hook_lineage_readers]
486
487
@cache
def integrate_macros_plugins() -> None:
    """
    Expose each plugin's ``macros`` as a submodule of ``airflow.sdk.execution_time.macros``.

    For every plugin with macros, a module is built by ``make_module``, registered in
    ``sys.modules`` and attached to the macros module under the plugin's name. The function
    is memoised with ``@cache``, so calls after a successful run are no-ops.

    :raises AirflowPluginException: if a loaded plugin has no name.
    """
    from airflow.sdk.execution_time import macros

    log.debug("Integrate Macros plugins")

    for plugin in _get_plugins()[0]:
        plugin_name = plugin.name
        if plugin_name is None:
            raise AirflowPluginException("Invalid plugin name")

        macros_module = make_module(f"airflow.sdk.execution_time.macros.{plugin_name}", plugin.macros)
        if macros_module is None:
            continue

        sys.modules[macros_module.__name__] = macros_module
        # Attach to the macros module so it is reachable when rendering templates.
        setattr(macros, plugin_name, macros_module)
506
507
def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """
    Register every listener declared by the loaded plugins with ``listener_manager``.

    :param listener_manager: receives each listener through ``add_listener``
    :raises AirflowPluginException: if a loaded plugin has no name.
    """
    for plugin in _get_plugins()[0]:
        plugin_name = plugin.name
        if plugin_name is None:
            raise AirflowPluginException("Invalid plugin name")

        for plugin_listener in plugin.listeners:
            listener_manager.add_listener(plugin_listener)
516
517
# Attributes dumped by ``get_plugin_info`` when the caller does not request specific ones.
# A tuple (rather than a set) keeps the key order of the returned dicts identical across
# processes; set iteration order for strings depends on per-process hash randomization.
_DEFAULT_PLUGIN_INFO_ATTRS: tuple[str, ...] = (
    "macros",
    "admin_views",
    "flask_blueprints",
    "fastapi_apps",
    "fastapi_root_middlewares",
    "external_views",
    "react_apps",
    "menu_links",
    "appbuilder_views",
    "appbuilder_menu_items",
    "global_operator_extra_links",
    "operator_extra_links",
    "source",
    "timetables",
    "listeners",
    "priority_weight_strategies",
)


def _dump_plugin_attribute(plugin: AirflowPlugin, attr: str) -> Any:
    """
    Return a display-friendly representation of ``getattr(plugin, attr)``.

    Known object-valued attributes are rendered as qualified names. ``fastapi_root_middlewares``
    entries lose their ``args``/``kwargs`` to avoid exposing potentially sensitive values.
    Any other attribute is returned unchanged.
    """
    if attr in ("global_operator_extra_links", "operator_extra_links"):
        return [f"<{qualname(d.__class__)} object>" for d in getattr(plugin, attr)]
    if attr in ("macros", "timetables", "priority_weight_strategies"):
        return [qualname(d) for d in getattr(plugin, attr)]
    if attr == "listeners":
        # listeners may be modules or class instances
        return [d.__name__ if inspect.ismodule(d) else qualname(d) for d in plugin.listeners]
    if attr == "appbuilder_views":
        return [
            {**d, "view": qualname(d["view"].__class__) if "view" in d else None}
            for d in plugin.appbuilder_views
        ]
    if attr == "flask_blueprints":
        return [
            f"<{qualname(d.__class__)}: name={d.name!r} import_name={d.import_name!r}>"
            for d in plugin.flask_blueprints
        ]
    if attr == "fastapi_apps":
        return [
            {**d, "app": qualname(d["app"].__class__) if "app" in d else None}
            for d in plugin.fastapi_apps
        ]
    if attr == "fastapi_root_middlewares":
        # remove args and kwargs from plugin info to hide potentially sensitive info.
        return [
            {
                k: (v if k != "middleware" else qualname(middleware_dict["middleware"]))
                for k, v in middleware_dict.items()
                if k not in ("args", "kwargs")
            }
            for middleware_dict in plugin.fastapi_root_middlewares
        ]
    return getattr(plugin, attr)


def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
    """
    Dump plugins attributes.

    :param attrs_to_dump: plugin attributes to include for each plugin. When ``None`` or
        empty, a default list covering the main extension points plus ``source`` is used,
        in a fixed order.
    :return: one dict per loaded plugin. Each dict starts with the plugin's ``name``,
        followed by the requested attributes in a display-friendly form.
    """
    # _get_ui_plugins prunes invalid external views / React apps from the plugin objects in
    # place, so it must run before dumping. NOTE(review): the other calls presumably warm
    # their caches and emit their compatibility warnings; confirm before removing any.
    get_flask_plugins()
    get_fastapi_plugins()
    get_global_operator_extra_links()
    get_operator_extra_links()
    _get_ui_plugins()
    if not attrs_to_dump:
        attrs_to_dump = _DEFAULT_PLUGIN_INFO_ATTRS

    plugins_info = []
    for plugin in _get_plugins()[0]:
        info: dict[str, Any] = {"name": plugin.name}
        for attr in attrs_to_dump:
            info[attr] = _dump_plugin_attribute(plugin, attr)
        plugins_info.append(info)
    return plugins_info
588
589
@cache
def get_priority_weight_strategy_plugins() -> dict[str, type[PriorityWeightStrategy]]:
    """
    Return every available priority weight strategy, keyed by qualified class name.

    Starts from Airflow's built-in ``airflow_priority_weight_strategies`` and overlays the
    classes contributed by plugins. A plugin class with the same qualified name replaces
    the built-in entry.
    """
    log.debug("Initialize extra priority weight strategy plugins")

    strategies: dict[str, type[PriorityWeightStrategy]] = dict(airflow_priority_weight_strategies)
    for plugin in _get_plugins()[0]:
        for strategy_class in plugin.priority_weight_strategies:
            strategies[qualname(strategy_class)] = strategy_class
    return strategies
604
605
def get_import_errors() -> dict[str, str]:
    """
    Return the errors collected while loading plugins.

    Keys identify where each failure came from (for example a plugin file path, an entry
    point module, or a plugin's source/name); values are the error messages.
    """
    _loaded_plugins, import_errors = _get_plugins()
    return import_errors