1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Manages all plugins."""
19
20from __future__ import annotations
21
22import importlib
23import importlib.machinery
24import importlib.util
25import inspect
26import logging
27import os
28import sys
29import types
30from pathlib import Path
31from typing import TYPE_CHECKING, Any
32
33if TYPE_CHECKING:
34 if sys.version_info >= (3, 12):
35 from importlib import metadata
36 else:
37 import importlib_metadata as metadata
38 from collections.abc import Generator
39 from types import ModuleType
40
41 from ..listeners.listener import ListenerManager
42
# Logger for this module; the helpers here use it for debug tracing and for
# reporting plugin import failures.
log = logging.getLogger(name=__name__)
44
45
class AirflowPluginSource:
    """
    Base class describing where a plugin was loaded from.

    Concrete subclasses must implement both a plain-text rendering (``__str__``)
    and an HTML rendering (``__html__``); the base versions only raise.
    """

    def __str__(self):
        """Return a plain-text description of the plugin source."""
        raise NotImplementedError()

    def __html__(self):
        """Return an HTML description of the plugin source."""
        raise NotImplementedError()
54
55
class PluginsDirectorySource(AirflowPluginSource):
    """
    Source descriptor for a plugin loaded from a file in the plugins directory.

    :param path: Path of the plugin file.
    :param plugins_folder: The plugins folder; ``path`` is stored relative to it.
    """

    def __init__(self, path, plugins_folder: str):
        # Path of the plugin file relative to ``plugins_folder``.
        self.path = os.path.relpath(path, plugins_folder)

    def __str__(self):
        return f"$PLUGINS_FOLDER/{self.path}"

    def __html__(self):
        # ``__html__`` output is treated as already-safe markup (markupsafe protocol),
        # so the file path must be escaped; only the <em> wrapper is trusted HTML.
        from html import escape

        return f"<em>$PLUGINS_FOLDER/</em>{escape(self.path)}"
67
68
class EntryPointSource(AirflowPluginSource):
    """
    Source descriptor for a plugin loaded from a distribution's entry point.

    :param entrypoint: The entry point the plugin class was loaded from.
    :param dist: The distribution that declares the entry point.
    """

    def __init__(self, entrypoint: metadata.EntryPoint, dist: metadata.Distribution):
        # Distribution name as declared in its metadata.
        self.dist = dist.metadata["Name"]  # type: ignore[index]
        self.version = dist.version
        # String form of the entry point, kept for display.
        self.entrypoint = str(entrypoint)

    def __str__(self):
        return f"{self.dist}=={self.version}: {self.entrypoint}"

    def __html__(self):
        # ``__html__`` output is treated as already-safe markup (markupsafe protocol),
        # so every interpolated value must be escaped; only the <em> wrapper is trusted.
        from html import escape

        label = escape(f"{self.dist}=={self.version}:")
        return f"<em>{label}</em> {escape(self.entrypoint)}"
82
83
class AirflowPluginException(Exception):
    """Raised when a plugin is rejected, e.g. because it does not define a name."""

    pass
86
87
class AirflowPlugin:
    """
    Base class for Airflow plugins.

    Subclass it, set a non-empty ``name`` and override whichever of the component
    lists below the plugin contributes. The list attributes are class-level defaults
    shared by every subclass that does not override them, so subclasses should assign
    new lists rather than mutating the inherited ones.
    """

    # Name of the plugin; must be non-empty (checked by ``validate``).
    name: str | None = None
    # Where the plugin was loaded from; assigned by the plugin loaders after instantiation.
    source: AirflowPluginSource | None = None
    macros: list[Any] = []
    admin_views: list[Any] = []
    flask_blueprints: list[Any] = []
    fastapi_apps: list[Any] = []
    fastapi_root_middlewares: list[Any] = []
    external_views: list[Any] = []
    react_apps: list[Any] = []
    menu_links: list[Any] = []
    appbuilder_views: list[Any] = []
    appbuilder_menu_items: list[Any] = []

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links: list[Any] = []

    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    #
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links: list[Any] = []

    # A list of timetable classes that can be used for Dag scheduling.
    timetables: list[Any] = []

    # A list of partition mapper classes provided by this plugin.
    partition_mappers: list[Any] = []

    # A list of listeners that can be used for tracking task and Dag states.
    listeners: list[ModuleType | object] = []

    # A list of hook lineage reader classes that can be used for reading lineage information from a hook.
    hook_lineage_readers: list[Any] = []

    # A list of priority weight strategy classes that can be used for calculating tasks weight priority.
    priority_weight_strategies: list[Any] = []

    @classmethod
    def validate(cls):
        """
        Validate that the plugin has a name.

        :raises AirflowPluginException: If ``name`` is unset or empty.
        """
        if not cls.name:
            raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Execute when the plugin is loaded; This method is only called once during runtime.

        :param args: If future arguments are passed in on call.
        :param kwargs: If future arguments are passed in on call.
        """
148
149
def is_valid_plugin(plugin_obj) -> bool:
    """
    Tell whether ``plugin_obj`` is a concrete subclass of ``AirflowPlugin``.

    A qualifying class is also validated (``validate()``), which may raise.

    :param plugin_obj: Any object found while scanning for plugins.
    :return: True for a class deriving from an ``AirflowPlugin`` defined in a
        ``plugins_manager`` module (excluding ``AirflowPlugin`` itself), else False.
    """
    if not inspect.isclass(plugin_obj):
        return False

    # Matching by class name instead of issubclass() is deliberate (temporary): core reaches the
    # shared library as airflow._shared while the task SDK reaches it as airflow.sdk._shared, so
    # Python sees two distinct AirflowPlugin classes. Provider plugins usually derive from the SDK
    # one, and issubclass() against core's class would reject them. Matching the name lets plugins
    # built on either base class load.
    for ancestor in plugin_obj.__mro__:
        if ancestor.__name__ == "AirflowPlugin" and "plugins_manager" in ancestor.__module__:
            break
    else:
        return False

    # The base class itself is not a plugin.
    if plugin_obj.__name__ == "AirflowPlugin":
        return False

    plugin_obj.validate()
    return True
177
178
def _load_entrypoint_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Discover plugins advertised through the ``airflow.plugins`` entry-point group.

    Each entry point is loaded. If the loaded object passes ``is_valid_plugin`` it is
    instantiated and tagged with an ``EntryPointSource``. Failures are logged and collected
    rather than raised.

    :return: A tuple of (plugin instances, mapping of entry-point module to error message).
    """
    from ..module_loading import entry_points_with_dist

    log.debug("Loading plugins from entrypoints")

    loaded: list[AirflowPlugin] = []
    errors: dict[str, str] = {}
    for ep, distribution in entry_points_with_dist("airflow.plugins"):
        log.debug("Importing entry_point plugin %s", ep.name)
        try:
            candidate = ep.load()
            if is_valid_plugin(candidate):
                instance: AirflowPlugin = candidate()
                instance.source = EntryPointSource(ep, distribution)
                loaded.append(instance)
        except Exception as exc:
            # Best effort: one broken plugin must not prevent the others from loading.
            log.exception("Failed to import plugin %s", ep.name)
            errors[ep.module] = str(exc)
    return loaded, errors
205
206
def _import_plugin_module(mod_name: str, file_path: str) -> ModuleType | None:
    """
    Import ``file_path`` as a module called ``mod_name`` and register it in ``sys.modules``.

    :return: The executed module, or None if no module spec could be built (logged as an error).
    :raises Exception: Whatever the module's top-level code raises. In that case the
        partially-executed module is removed from ``sys.modules`` before re-raising.
    """
    loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    if not spec:
        log.error("Could not load spec for module %s at %s", mod_name, file_path)
        return None
    mod = importlib.util.module_from_spec(spec)
    # Register before executing, as the regular import system does, so code running at
    # import time can look the module up by name.
    sys.modules[spec.name] = mod
    try:
        loader.exec_module(mod)
    except BaseException:
        # Don't leave a half-initialised module in sys.modules. Later imports of the same
        # name would otherwise silently receive the broken module instead of retrying.
        if sys.modules.get(spec.name) is mod:
            del sys.modules[spec.name]
        raise
    return mod


def _load_plugins_from_plugin_directory(
    plugins_folder: str,
    load_examples: bool = False,
    example_plugins_module: str | None = None,
    ignore_file_syntax: str = "glob",
) -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load and register Airflow Plugins from plugins directory.

    Every ``.py`` file returned by ``find_path_from_directory`` for ``plugins_folder``
    (honouring ``.airflowignore``) is imported as a module named after its file stem.
    Every object in its namespace that passes ``is_valid_plugin`` is instantiated.

    :param plugins_folder: Directory to scan for plugin files.
    :param load_examples: Also scan the package named by ``example_plugins_module``.
    :param example_plugins_module: Dotted name of the example plugins package. Required
        when ``load_examples`` is True; its files are imported under that package prefix.
    :param ignore_file_syntax: Syntax of the ``.airflowignore`` patterns in ``plugins_folder``.
    :return: A tuple of (plugin instances, mapping of file path to import error message).
    :raises ValueError: If ``plugins_folder`` is empty, or ``load_examples`` is True
        without ``example_plugins_module``.
    """
    from ..module_loading import find_path_from_directory

    if not plugins_folder:
        raise ValueError("Plugins folder is not set")
    log.debug("Loading plugins from directory: %s", plugins_folder)
    files = find_path_from_directory(plugins_folder, ".airflowignore", ignore_file_syntax)
    # Each entry is (module name prefix, iterable of candidate file paths).
    plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)]

    if load_examples:
        if not example_plugins_module:
            raise ValueError("example_plugins_module is required when load_examples is True")
        # Report the example source actually being added, not the user plugins folder.
        log.debug("Note: Loading plugins from examples as well: %s", example_plugins_module)
        example_plugins = importlib.import_module(example_plugins_module)
        example_plugins_folder = next(iter(example_plugins.__path__))
        example_files = find_path_from_directory(example_plugins_folder, ".airflowignore")
        plugin_search_locations.append((example_plugins.__name__, example_files))

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    for module_prefix, plugin_files in plugin_search_locations:
        for file_path in plugin_files:
            path = Path(file_path)
            if not path.is_file() or path.suffix != ".py":
                continue
            # User plugins become top-level modules; examples are namespaced under their package.
            mod_name = f"{module_prefix}.{path.stem}" if module_prefix else path.stem

            try:
                mod = _import_plugin_module(mod_name, file_path)
                if mod is None:
                    continue

                for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
                    plugin_instance: AirflowPlugin = mod_attr_value()
                    plugin_instance.source = PluginsDirectorySource(file_path, plugins_folder)
                    plugins.append(plugin_instance)
            except Exception as e:
                # Best effort: record the failure and keep loading the remaining files.
                log.exception("Failed to import plugin %s", file_path)
                import_errors[file_path] = str(e)
    return plugins, import_errors
258
259
def make_module(name: str, objects: list[Any]) -> ModuleType | None:
    """
    Build an in-memory module exposing ``objects`` as attributes.

    The module name is lower-cased. Each object is bound under its ``__name__``.
    The last dotted component of the name is kept in ``_name`` and the original
    list in ``_objects``.

    :return: The new module, or None when ``objects`` is empty.
    """
    if objects:
        log.debug("Creating module %s", name)
        lowered = name.lower()
        new_module = types.ModuleType(lowered)
        new_module._name = lowered.rsplit(".", 1)[-1]  # type: ignore
        new_module._objects = objects  # type: ignore
        for obj in objects:
            setattr(new_module, obj.__name__, obj)
        return new_module
    return None
271
272
def integrate_macros_plugins(
    target_macros_module: ModuleType, macros_module_name_prefix: str, plugins: list[AirflowPlugin]
) -> None:
    """
    Attach each plugin's macros to ``target_macros_module`` as a submodule.

    For a plugin named ``p`` with macros, a module ``<prefix>.p`` is built with
    ``make_module`` and registered in ``sys.modules``. It is also set as attribute
    ``p`` on the target module, so templates can call
    ``{{ macros.plugin_name.macro_func() }}``.

    :raises AirflowPluginException: If a plugin's ``name`` is None.
    """
    log.debug("Integrate Macros plugins")

    for plugin in plugins:
        plugin_name = plugin.name
        if plugin_name is None:
            raise AirflowPluginException("Invalid plugin name")

        submodule = make_module(f"{macros_module_name_prefix}.{plugin_name}", plugin.macros)
        # Plugins without macros get no submodule.
        if not submodule:
            continue

        sys.modules[submodule.__name__] = submodule
        setattr(target_macros_module, plugin_name, submodule)
296
297
def integrate_listener_plugins(listener_manager: ListenerManager, plugins: list[AirflowPlugin]) -> None:
    """
    Hand every listener declared by ``plugins`` to ``listener_manager``.

    Plugins are processed in order. Each plugin's name is checked before its listeners
    are added, so listeners of earlier plugins are already registered when a later
    plugin fails the check.

    :raises AirflowPluginException: If a plugin's ``name`` is None.
    """
    for plugin in plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        # Preserve the order in which the plugin declares its listeners.
        for plugin_listener in plugin.listeners:
            listener_manager.add_listener(plugin_listener)