1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Manages all plugins."""
19
20from __future__ import annotations
21
22import inspect
23import logging
24import os
25import sys
26import types
27from pathlib import Path
28from typing import TYPE_CHECKING, Any
29
30if TYPE_CHECKING:
31 if sys.version_info >= (3, 12):
32 from importlib import metadata
33 else:
34 import importlib_metadata as metadata
35 from collections.abc import Generator
36 from types import ModuleType
37
38 from airflow.listeners.listener import ListenerManager
39
# Module-level logger shared by the plugin discovery/loading helpers in this file.
log = logging.getLogger(__name__)
41
42
class AirflowPluginSource:
    """
    Abstract description of where a plugin was loaded from.

    Concrete subclasses must supply a plain-text rendering (``__str__``) and an
    HTML-flavoured rendering (``__html__``); this base implements neither.
    """

    def __str__(self):
        raise NotImplementedError()

    def __html__(self):
        raise NotImplementedError()
51
52
class PluginsDirectorySource(AirflowPluginSource):
    """Source descriptor for a plugin discovered as a file inside the plugins folder."""

    def __init__(self, path, plugins_folder: str):
        # Keep only the location relative to the plugins folder, so the rendered
        # source does not depend on where that folder lives on disk.
        relative_location = os.path.relpath(path, plugins_folder)
        self.path = relative_location

    def __str__(self):
        location = self.path
        return f"$PLUGINS_FOLDER/{location}"

    def __html__(self):
        location = self.path
        return f"<em>$PLUGINS_FOLDER/</em>{location}"
64
65
class EntryPointSource(AirflowPluginSource):
    """Source descriptor for a plugin registered through a package entry point."""

    def __init__(self, entrypoint: metadata.EntryPoint, dist: metadata.Distribution):
        # Distribution name as recorded in the package's metadata.
        self.dist = dist.metadata["Name"]  # type: ignore[index]
        self.version = dist.version
        # Only the textual form of the entry point is retained.
        self.entrypoint = str(entrypoint)

    def _distribution_label(self):
        """Return the ``<name>==<version>`` label shared by both renderings."""
        return f"{self.dist}=={self.version}"

    def __str__(self):
        return f"{self._distribution_label()}: {self.entrypoint}"

    def __html__(self):
        return f"<em>{self._distribution_label()}:</em> {self.entrypoint}"
79
80
class AirflowPluginException(Exception):
    """Raised when a plugin cannot be loaded or fails validation (for example, it has no name)."""
83
84
class AirflowPlugin:
    """
    Base class for Airflow plugins.

    A plugin subclasses this class and overrides the class attributes below with the
    components it contributes. Only ``name`` is mandatory (checked by ``validate``).

    NOTE: the list defaults are class attributes shared by every subclass that does not
    override them; a subclass should assign its own list rather than mutate these in place.
    """

    # Identifier of the plugin; ``validate`` rejects a missing or empty value.
    name: str | None = None
    # Where the plugin came from; assigned by the loaders in this module after instantiation.
    source: AirflowPluginSource | None = None
    # Objects (each with a ``__name__``) published as a per-plugin macros submodule.
    macros: list[Any] = []
    # Web/UI extension points (not consumed in this file).
    admin_views: list[Any] = []
    flask_blueprints: list[Any] = []
    fastapi_apps: list[Any] = []
    fastapi_root_middlewares: list[Any] = []
    external_views: list[Any] = []
    react_apps: list[Any] = []
    menu_links: list[Any] = []
    appbuilder_views: list[Any] = []
    appbuilder_menu_items: list[Any] = []

    # Global operator extra links: buttons on the task page that send users to external
    # systems. An individual operator may override a global link with its own.
    global_operator_extra_links: list[Any] = []

    # Operator extra links that add to, or override, the links of existing Airflow
    # operators; they also appear as buttons on the task page.
    operator_extra_links: list[Any] = []

    # Timetable classes available for DAG scheduling.
    timetables: list[Any] = []

    # Listeners (modules or objects) for tracking task and DAG state changes; each one is
    # handed to ListenerManager.add_listener by integrate_listener_plugins.
    listeners: list[ModuleType | object] = []

    # Hook lineage reader classes used to read lineage information from hooks.
    hook_lineage_readers: list[Any] = []

    # Priority weight strategy classes used to compute task priority weights.
    priority_weight_strategies: list[Any] = []

    @classmethod
    def validate(cls):
        """Raise AirflowPluginException unless the plugin defines a non-empty ``name``."""
        if cls.name:
            return
        raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Hook executed when the plugin is loaded; it is called only once during runtime.

        The default implementation does nothing.

        :param args: Reserved for positional arguments passed by future versions.
        :param kwargs: Reserved for keyword arguments passed by future versions.
        """
        return None
141
142
def is_valid_plugin(plugin_obj) -> bool:
    """
    Tell whether ``plugin_obj`` is a concrete AirflowPlugin subclass, validating it if so.

    :param plugin_obj: any object, typically a value from a plugin module's namespace or an
        object loaded from an entry point
    :return: True if ``plugin_obj`` is a class deriving from an ``AirflowPlugin`` base (and is
        not named ``AirflowPlugin`` itself); False otherwise
    :raises: whatever ``plugin_obj.validate()`` raises for an accepted class, e.g.
        AirflowPluginException for a plugin without a name
    """
    if not inspect.isclass(plugin_obj):
        return False

    # Temporary workaround: match the base class by name and module instead of issubclass().
    # The shared library is reachable through two symlinked package paths (airflow._shared in
    # core, airflow.sdk._shared in the task SDK). Python loads them as distinct modules, so each
    # context has its own AirflowPlugin class object. Providers usually subclass the SDK flavour,
    # so an issubclass() check against core's class would reject them. Matching by name accepts
    # plugins built on either flavour.
    derives_from_base = False
    for ancestor in plugin_obj.__mro__:
        if ancestor.__name__ == "AirflowPlugin" and "plugins_manager" in ancestor.__module__:
            derives_from_base = True
            break

    # The base class itself (e.g. when imported into a plugin module) is not a plugin.
    if not derives_from_base or plugin_obj.__name__ == "AirflowPlugin":
        return False

    plugin_obj.validate()
    return True
170
171
def _load_entrypoint_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Discover AirflowPlugin subclasses advertised in the ``airflow.plugins`` entry-point group.

    Each entry point is loaded. Objects that pass ``is_valid_plugin`` are instantiated and tagged
    with an EntryPointSource. Any exception raised while loading, validating or instantiating an
    entry point is logged and recorded instead of propagating.

    :return: tuple of (plugin instances, mapping of entry-point module -> error message)
    """
    from ..module_loading import entry_points_with_dist

    log.debug("Loading plugins from entrypoints")

    loaded: list[AirflowPlugin] = []
    failures: dict[str, str] = {}
    for ep, distribution in entry_points_with_dist("airflow.plugins"):
        log.debug("Importing entry_point plugin %s", ep.name)
        try:
            candidate = ep.load()
            if is_valid_plugin(candidate):
                instance: AirflowPlugin = candidate()
                instance.source = EntryPointSource(ep, distribution)
                loaded.append(instance)
        except Exception as err:
            log.exception("Failed to import plugin %s", ep.name)
            failures[ep.module] = str(err)
    return loaded, failures
198
199
def _exec_plugin_module(mod_name: str, file_path: str) -> ModuleType | None:
    """
    Execute the Python source file ``file_path`` as a module named ``mod_name``.

    The module is registered in ``sys.modules`` before its code runs. If execution raises, the
    ``sys.modules`` entry that existed before the call is restored (or the new entry removed), so
    no half-initialised module is left behind. The exception is then re-raised.

    :param mod_name: name to register the module under in ``sys.modules``
    :param file_path: path of the ``.py`` file to execute
    :return: the executed module, or ``None`` if no import spec could be created
    """
    # Import the submodules explicitly: a bare ``import importlib`` does not guarantee that
    # ``importlib.machinery`` / ``importlib.util`` are available as attributes.
    import importlib.machinery
    import importlib.util

    loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    if not spec:
        log.error("Could not load spec for module %s at %s", mod_name, file_path)
        return None
    module = importlib.util.module_from_spec(spec)
    previous = sys.modules.get(spec.name)
    sys.modules[spec.name] = module
    try:
        loader.exec_module(module)
    except BaseException:
        # Undo the registration so a broken module does not shadow or replace a good one.
        if previous is None:
            sys.modules.pop(spec.name, None)
        else:
            sys.modules[spec.name] = previous
        raise
    return module


def _load_plugins_from_plugin_directory(
    plugins_folder: str,
    load_examples: bool = False,
    example_plugins_module: str | None = None,
    ignore_file_syntax: str = "glob",
) -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load and register Airflow plugins from the plugins directory.

    Every ``.py`` file yielded by ``find_path_from_directory`` for ``plugins_folder`` (which
    honours ``.airflowignore``) is executed as a module. Every object in the module's namespace
    that passes ``is_valid_plugin`` is instantiated and tagged with a PluginsDirectorySource.

    :param plugins_folder: directory to scan for plugin files
    :param load_examples: also scan the package named by ``example_plugins_module``
    :param example_plugins_module: importable package holding example plugins; required when
        ``load_examples`` is True
    :param ignore_file_syntax: syntax of the ``.airflowignore`` patterns in ``plugins_folder``
    :return: tuple of (plugin instances, mapping of file path -> import error message)
    :raises ValueError: if ``plugins_folder`` is empty, or ``load_examples`` is set without an
        ``example_plugins_module``
    """
    from ..module_loading import find_path_from_directory

    if not plugins_folder:
        raise ValueError("Plugins folder is not set")
    log.debug("Loading plugins from directory: %s", plugins_folder)
    files = find_path_from_directory(plugins_folder, ".airflowignore", ignore_file_syntax)
    plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)]

    if load_examples:
        if not example_plugins_module:
            raise ValueError("example_plugins_module must be set when load_examples is True")
        log.debug("Note: Loading plugins from examples as well: %s", example_plugins_module)
        import importlib

        example_plugins = importlib.import_module(example_plugins_module)
        example_plugins_folder = next(iter(example_plugins.__path__))
        example_files = find_path_from_directory(example_plugins_folder, ".airflowignore")
        plugin_search_locations.append((example_plugins.__name__, example_files))

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    for module_prefix, plugin_files in plugin_search_locations:
        for file_path in plugin_files:
            path = Path(file_path)
            if not path.is_file() or path.suffix != ".py":
                continue
            mod_name = f"{module_prefix}.{path.stem}" if module_prefix else path.stem

            try:
                mod = _exec_plugin_module(mod_name, file_path)
                if mod is None:
                    continue
                for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
                    plugin_instance: AirflowPlugin = mod_attr_value()
                    plugin_instance.source = PluginsDirectorySource(file_path, plugins_folder)
                    plugins.append(plugin_instance)
            except Exception as e:
                # One broken plugin file must not prevent the others from loading.
                log.exception("Failed to import plugin %s", file_path)
                import_errors[file_path] = str(e)
    return plugins, import_errors
251
252
def make_module(name: str, objects: list[Any]) -> ModuleType | None:
    """
    Build an in-memory module exposing ``objects`` under their ``__name__``.

    :param name: dotted module name; it is lower-cased before the module is created
    :param objects: objects to publish; each must have a ``__name__`` attribute
    :return: the new module, or ``None`` when ``objects`` is empty
    """
    if not objects:
        return None
    log.debug("Creating module %s", name)
    normalized = name.lower()
    module = types.ModuleType(normalized)
    # Last dotted component, kept alongside the published objects for later introspection.
    module._name = normalized.rsplit(".", 1)[-1]  # type: ignore
    module._objects = objects  # type: ignore
    namespace = module.__dict__
    for obj in objects:
        namespace[obj.__name__] = obj
    return module
264
265
def integrate_macros_plugins(
    target_macros_module: ModuleType, macros_module_name_prefix: str, plugins: list[AirflowPlugin]
) -> None:
    """
    Attach each plugin's macros to ``target_macros_module`` as a per-plugin submodule.

    A plugin with macros gets a module named ``<prefix>.<plugin name>``. The module is registered
    in ``sys.modules`` and set as an attribute of the target, so templates can call
    ``{{ macros.plugin_name.macro_func() }}``. Plugins without macros are skipped.

    :raises AirflowPluginException: if a plugin's ``name`` is None
    """
    log.debug("Integrate Macros plugins")

    for plugin in plugins:
        plugin_name = plugin.name
        if plugin_name is None:
            raise AirflowPluginException("Invalid plugin name")

        submodule = make_module(f"{macros_module_name_prefix}.{plugin_name}", plugin.macros)
        if not submodule:
            continue

        sys.modules[submodule.__name__] = submodule
        # Expose the submodule on the target so template rendering can reach it.
        setattr(target_macros_module, plugin_name, submodule)
289
290
def integrate_listener_plugins(listener_manager: ListenerManager, plugins: list[AirflowPlugin]) -> None:
    """
    Hand every listener declared by ``plugins`` to ``listener_manager.add_listener``.

    Plugins are processed in order. Listeners of earlier plugins are already registered if a
    later plugin fails the name check.

    :raises AirflowPluginException: if a plugin's ``name`` is None
    """
    for plugin in plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        for registered in plugin.listeners:
            listener_manager.add_listener(registered)