1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""SDK wrapper for plugins manager."""
19
20from __future__ import annotations
21
22import logging
23from functools import cache
24from typing import TYPE_CHECKING
25
26from airflow import settings
27from airflow.providers_manager import ProvidersManager
28from airflow.sdk._shared.module_loading import import_string
29from airflow.sdk._shared.plugins_manager import (
30 AirflowPlugin,
31 _load_entrypoint_plugins,
32 _load_plugins_from_plugin_directory,
33 integrate_listener_plugins as _integrate_listener_plugins,
34 integrate_macros_plugins as _integrate_macros_plugins,
35 is_valid_plugin,
36)
37from airflow.sdk.configuration import conf
38from airflow.sdk.observability.stats import Stats
39
40if TYPE_CHECKING:
41 from airflow.listeners.listener import ListenerManager
42
43log = logging.getLogger(__name__)
44
45
def _load_providers_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load plugins declared by providers.

    Asks ``ProvidersManager`` to initialize its plugin registry, then imports every registered
    plugin class by its dotted path and keeps the ones accepted by ``is_valid_plugin``.

    :return: A ``(plugins, import_errors)`` tuple. ``plugins`` contains the imported objects that
        passed validation. ``import_errors`` maps the dotted class path of each plugin whose
        import raised ``ImportError`` to the error message.
    """
    log.debug("Loading plugins from providers")
    providers_manager = ProvidersManager()
    providers_manager.initialize_providers_plugins()

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    for plugin in providers_manager.plugins:
        log.debug("Importing plugin %s from class %s", plugin.name, plugin.plugin_class)

        try:
            plugin_instance = import_string(plugin.plugin_class)
            if is_valid_plugin(plugin_instance):
                plugins.append(plugin_instance)
            else:
                log.warning("Plugin %s is not a valid plugin", plugin.name)
        except ImportError as e:
            log.exception("Failed to load plugin %s from class name %s", plugin.name, plugin.plugin_class)
            # Report the failure to the caller as well as the log; previously the returned
            # mapping was always empty, so provider import failures were invisible to callers.
            import_errors[plugin.plugin_class] = str(e)
    return plugins, import_errors
66
67
@cache
def _get_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load plugins from the plugins directory, entrypoints and (optionally) providers.

    A successful result is memoized with ``functools.cache``, so later calls return the same
    ``(plugins, import_errors)`` tuple without re-running the loaders.

    Sources are processed in this order: the plugins folder, entrypoints and -- unless
    ``settings.LAZY_LOAD_PROVIDERS`` is set -- providers. When several plugins share a name,
    only the first one encountered is registered; later ones are skipped.

    :return: A ``(plugins, import_errors)`` tuple. ``plugins`` holds the plugins whose
        ``on_load()`` completed. ``import_errors`` merges the errors reported by each loader
        with the ``on_load()`` failures recorded here.
    :raises ValueError: If ``settings.PLUGINS_FOLDER`` is not set.
    """
    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    loaded_plugins: set[str | None] = set()

    def __register_plugins(plugin_instances: list[AirflowPlugin], errors: dict[str, str]) -> None:
        """Run ``on_load`` for each not-yet-seen plugin and merge the loader's errors."""
        for plugin_instance in plugin_instances:
            if plugin_instance.name in loaded_plugins:
                # Skip only this duplicate. A ``return`` here would drop every remaining
                # plugin of the batch and discard the loader's import errors below.
                continue

            # Mark the name as seen before on_load(), so a plugin whose on_load() fails is
            # not retried under the same name by a later source.
            loaded_plugins.add(plugin_instance.name)
            try:
                plugin_instance.on_load()
                plugins.append(plugin_instance)
            except Exception as e:
                log.exception("Failed to load plugin %s", plugin_instance.name)
                # Key the error by the plugin's source when it has one, else by its name.
                name = str(plugin_instance.source) if plugin_instance.source else plugin_instance.name or ""
                import_errors[name] = str(e)
        import_errors.update(errors)

    with Stats.timer() as timer:
        load_examples = conf.getboolean("core", "LOAD_EXAMPLES")
        __register_plugins(
            *_load_plugins_from_plugin_directory(
                plugins_folder=settings.PLUGINS_FOLDER,
                load_examples=load_examples,
                example_plugins_module="airflow.example_dags.plugins" if load_examples else None,
            )
        )
        __register_plugins(*_load_entrypoint_plugins())

        if not settings.LAZY_LOAD_PROVIDERS:
            __register_plugins(*_load_providers_plugins())

    log.debug("Loading %d plugin(s) took %.2f seconds", len(plugins), timer.duration)
    return plugins, import_errors
115
116
@cache
def integrate_macros_plugins() -> None:
    """
    Integrate macros from the loaded plugins into ``airflow.sdk.execution_time.macros``.

    The call is memoized with ``functools.cache``, so after the first successful call
    later invocations are no-ops.
    """
    from airflow.sdk.execution_time import macros as sdk_macros

    # Only the plugin list is needed here; the import-error mapping is ignored.
    loaded_plugins = _get_plugins()[0]
    _integrate_macros_plugins(
        plugins=loaded_plugins,
        macros_module_name_prefix="airflow.sdk.execution_time.macros",
        target_macros_module=sdk_macros,
    )
128
129
def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """
    Add listeners from the loaded plugins to ``listener_manager``.

    :param listener_manager: Manager that the shared integration helper receives
        together with the loaded plugins.
    """
    # Only the plugin list is needed here; the import-error mapping is ignored.
    loaded_plugins, _errors = _get_plugins()
    _integrate_listener_plugins(listener_manager, plugins=loaded_plugins)