1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Manages all plugins."""
19
20from __future__ import annotations
21
22import importlib.machinery
23import importlib.util
24import inspect
25import logging
26import os
27import sys
28import types
29from collections.abc import Iterable
30from pathlib import Path
31from typing import TYPE_CHECKING, Any
32
33from airflow import settings
34from airflow._shared.module_loading import import_string, qualname
35from airflow.configuration import conf
36from airflow.task.priority_strategy import (
37 PriorityWeightStrategy,
38 airflow_priority_weight_strategies,
39)
40from airflow.utils.entry_points import entry_points_with_dist
41from airflow.utils.file import find_path_from_directory
42
43if TYPE_CHECKING:
44 from airflow.lineage.hook import HookLineageReader
45
46 try:
47 import importlib_metadata as metadata
48 except ImportError:
49 from importlib import metadata # type: ignore[no-redef]
50 from collections.abc import Generator
51 from types import ModuleType
52
53 from airflow.listeners.listener import ListenerManager
54 from airflow.timetables.base import Timetable
55
log = logging.getLogger(__name__)

# Error messages recorded while importing plugins, keyed by the plugin file path
# or by the entry point module that failed.
import_errors: dict[str, str] = {}

# ``None`` until ``ensure_plugins_loaded`` has run; afterwards the registered plugins.
plugins: list[AirflowPlugin] | None = None
# Names of the plugins that have already been handed to ``register_plugin``.
loaded_plugins: set[str] = set()

# Plugin components to integrate as modules
macros_modules: list[Any] | None = None

# Plugin components to integrate directly.
# Each of these stays ``None`` until the matching ``initialize_*`` function has run.
admin_views: list[Any] | None = None
flask_blueprints: list[Any] | None = None
fastapi_apps: list[Any] | None = None
fastapi_root_middlewares: list[Any] | None = None
external_views: list[Any] | None = None
react_apps: list[Any] | None = None
menu_links: list[Any] | None = None
flask_appbuilder_views: list[Any] | None = None
flask_appbuilder_menu_links: list[Any] | None = None
global_operator_extra_links: list[Any] | None = None
operator_extra_links: list[Any] | None = None
registered_operator_link_classes: dict[str, type] | None = None
"""
Mapping of class names to class of OperatorLinks registered by plugins.

Used by the DAG serialization code to only allow specific classes to be created
during deserialization
"""
timetable_classes: dict[str, type[Timetable]] | None = None
hook_lineage_reader_classes: list[type[HookLineageReader]] | None = None
priority_weight_strategy_classes: dict[str, type[PriorityWeightStrategy]] | None = None

# Attributes dumped by ``get_plugin_info`` when the caller does not name any.
PLUGINS_ATTRIBUTES_TO_DUMP = {
    "macros",
    "admin_views",
    "flask_blueprints",
    "fastapi_apps",
    "fastapi_root_middlewares",
    "external_views",
    "react_apps",
    "menu_links",
    "appbuilder_views",
    "appbuilder_menu_items",
    "global_operator_extra_links",
    "operator_extra_links",
    "source",
    "timetables",
    "listeners",
    "priority_weight_strategies",
}
106
107
class AirflowPluginSource:
    """Base class describing where a plugin was loaded from."""

    def __str__(self):
        """Plain-text rendering of the source; not implemented on the base class."""
        raise NotImplementedError()

    def __html__(self):
        """HTML rendering of the source; not implemented on the base class."""
        raise NotImplementedError()
116
117
class PluginsDirectorySource(AirflowPluginSource):
    """Source descriptor for a plugin loaded from the plugins directory."""

    def __init__(self, path):
        # Only the location relative to the configured plugins folder is stored.
        relative_path = os.path.relpath(path, settings.PLUGINS_FOLDER)
        self.path = relative_path

    def __str__(self):
        """Return the source as ``$PLUGINS_FOLDER/<relative path>``."""
        return "$PLUGINS_FOLDER/{}".format(self.path)

    def __html__(self):
        """Return the same text as ``__str__`` with the folder prefix emphasised."""
        return "<em>$PLUGINS_FOLDER/</em>{}".format(self.path)
129
130
class EntryPointSource(AirflowPluginSource):
    """Source descriptor for a plugin loaded from an entrypoint."""

    def __init__(self, entrypoint: metadata.EntryPoint, dist: metadata.Distribution):
        # Distribution name and version are captured at construction time,
        # together with the string form of the entrypoint.
        self.dist = dist.metadata["Name"]  # type: ignore[index]
        self.version = dist.version
        self.entrypoint = str(entrypoint)

    def __str__(self):
        """Return the source as ``<dist>==<version>: <entrypoint>``."""
        return "{}=={}: {}".format(self.dist, self.version, self.entrypoint)

    def __html__(self):
        """Return the same text as ``__str__`` with the distribution part emphasised."""
        return "<em>{}=={}:</em> {}".format(self.dist, self.version, self.entrypoint)
144
145
class AirflowPluginException(Exception):
    """Raised when a plugin cannot be validated or plugins cannot be loaded."""
148
149
class AirflowPlugin:
    """Base class that every Airflow plugin subclasses."""

    # Identifier of the plugin; ``validate`` rejects plugins without one.
    name: str | None = None
    # Where the plugin was loaded from; assigned by the loaders after instantiation.
    source: AirflowPluginSource | None = None
    macros: list[Any] = []
    admin_views: list[Any] = []
    flask_blueprints: list[Any] = []
    fastapi_apps: list[Any] = []
    fastapi_root_middlewares: list[Any] = []
    external_views: list[Any] = []
    react_apps: list[Any] = []
    menu_links: list[Any] = []
    appbuilder_views: list[Any] = []
    appbuilder_menu_items: list[Any] = []

    # Global operator extra links that can redirect users to external
    # systems. They are shown as buttons on the task page.
    #
    # Note: a global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links: list[Any] = []

    # Operator extra links that override or add operator links on
    # existing Airflow Operators. They are shown as buttons on the
    # task page.
    operator_extra_links: list[Any] = []

    # Timetable classes that can be used for DAG scheduling.
    timetables: list[type[Timetable]] = []

    # Listeners that can be used for tracking task and DAG states.
    listeners: list[ModuleType | object] = []

    # Hook lineage reader classes that can be used for reading lineage information from a hook.
    hook_lineage_readers: list[type[HookLineageReader]] = []

    # Priority weight strategy classes that can be used for calculating tasks weight priority.
    priority_weight_strategies: list[type[PriorityWeightStrategy]] = []

    @classmethod
    def validate(cls):
        """
        Check that the plugin declares a name.

        :raises AirflowPluginException: if ``name`` is missing or empty.
        """
        if cls.name:
            return
        raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Run when the plugin is loaded; called only once during runtime.

        The base implementation does nothing.

        :param args: If future arguments are passed in on call.
        :param kwargs: If future arguments are passed in on call.
        """
206
207
def is_valid_plugin(plugin_obj):
    """
    Tell whether an object is a loadable subclass of the AirflowPlugin class.

    A candidate that is a proper subclass is validated first (which may raise)
    and is then reported as valid only if it is not already in ``plugins``.

    :param plugin_obj: potential subclass of AirflowPlugin
    :return: Whether or not the obj is a valid subclass of
        AirflowPlugin
    """
    if not inspect.isclass(plugin_obj):
        return False
    if not issubclass(plugin_obj, AirflowPlugin) or plugin_obj is AirflowPlugin:
        return False

    plugin_obj.validate()
    already_known = plugin_obj in plugins
    return not already_known
224
225
def register_plugin(plugin_instance):
    """
    Start plugin load and register it after success initialization.

    If plugin is already registered, do nothing. If ``on_load`` raises, the
    plugin is not registered, its name is released again and the exception
    propagates to the caller.

    :param plugin_instance: subclass of AirflowPlugin
    """
    plugin_name = plugin_instance.name
    if plugin_name in loaded_plugins:
        return

    # Reserve the name before calling on_load so that a nested registration
    # attempt for the same name is ignored.
    loaded_plugins.add(plugin_name)
    try:
        plugin_instance.on_load()
    except Exception:
        # A plugin whose initialization failed must not stay marked as loaded,
        # otherwise it is neither in ``plugins`` nor loadable again.
        loaded_plugins.discard(plugin_name)
        raise
    plugins.append(plugin_instance)
240
241
def load_entrypoint_plugins():
    """
    Load and register AirflowPlugin subclasses exposed through entrypoints.

    The entry_point group should be 'airflow.plugins'. Any failure while loading
    one entry point is logged and recorded in ``import_errors`` under the
    entry point's module; the remaining entry points are still processed.
    """
    log.debug("Loading plugins from entrypoints")

    for entry_point, dist in entry_points_with_dist("airflow.plugins"):
        log.debug("Importing entry_point plugin %s", entry_point.name)
        try:
            candidate = entry_point.load()
            if is_valid_plugin(candidate):
                loaded = candidate()
                loaded.source = EntryPointSource(entry_point, dist)
                register_plugin(loaded)
        except Exception as err:
            log.exception("Failed to import plugin %s", entry_point.name)
            import_errors[entry_point.module] = str(err)
263
264
def load_plugins_from_plugin_directory():
    """
    Load and register Airflow Plugins from plugins directory.

    Every ``.py`` file found under ``settings.PLUGINS_FOLDER`` is executed as a
    module and each valid plugin class it defines is instantiated and registered.
    When ``core.LOAD_EXAMPLES`` is enabled, the ``airflow.example_dags.plugins``
    package is scanned as well. A failure in one file is logged and recorded in
    ``import_errors`` under the file path; the remaining files are still processed.
    """
    log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
    files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore")
    plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)]

    if conf.getboolean("core", "LOAD_EXAMPLES"):
        # Aliased so the import does not shadow the module-level ``plugins`` list.
        from airflow.example_dags import plugins as example_plugins

        example_plugins_folder = next(iter(example_plugins.__path__))
        log.debug("Note: Loading plugins from examples as well: %s", example_plugins_folder)
        example_files = find_path_from_directory(example_plugins_folder, ".airflowignore")
        plugin_search_locations.append((example_plugins.__name__, example_files))

    for module_prefix, plugin_files in plugin_search_locations:
        for file_path in plugin_files:
            path = Path(file_path)
            if not path.is_file() or path.suffix != ".py":
                continue
            # Example plugins are namespaced under their package; folder plugins are top-level.
            mod_name = f"{module_prefix}.{path.stem}" if module_prefix else path.stem

            try:
                loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
                spec = importlib.util.spec_from_loader(mod_name, loader)
                mod = importlib.util.module_from_spec(spec)
                # Register the module before executing it.
                sys.modules[spec.name] = mod
                loader.exec_module(mod)

                for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
                    plugin_instance = mod_attr_value()
                    plugin_instance.source = PluginsDirectorySource(file_path)
                    register_plugin(plugin_instance)
            except Exception as e:
                log.exception("Failed to import plugin %s", file_path)
                import_errors[file_path] = str(e)
300
301
def load_providers_plugins():
    """
    Load and register the plugins declared by providers.

    The object imported from each ``plugin_class`` path is registered as-is.
    Import failures are logged and the remaining plugins are still processed.
    """
    from airflow.providers_manager import ProvidersManager

    log.debug("Loading plugins from providers")
    providers_manager = ProvidersManager()
    providers_manager.initialize_providers_plugins()
    for plugin in providers_manager.plugins:
        log.debug("Importing plugin %s from class %s", plugin.name, plugin.plugin_class)

        try:
            plugin_cls = import_string(plugin.plugin_class)
            if not is_valid_plugin(plugin_cls):
                log.warning("Plugin %s is not a valid plugin", plugin.name)
                continue
            register_plugin(plugin_cls)
        except ImportError:
            log.exception("Failed to load plugin %s from class name %s", plugin.name, plugin.plugin_class)
319
320
def make_module(name: str, objects: list[Any]):
    """
    Build a module object exposing the given objects under their ``__name__``.

    :param name: dotted module name; it is lower-cased before use
    :param objects: objects to publish as attributes of the new module
    :return: the new module, or ``None`` when there is nothing to publish
    """
    if not objects:
        return None
    log.debug("Creating module %s", name)
    lowered = name.lower()
    module = types.ModuleType(lowered)
    # Last dotted component of the lower-cased name.
    module._name = lowered.rsplit(".", 1)[-1]  # type: ignore
    module._objects = objects  # type: ignore
    namespace = vars(module)
    for obj in objects:
        namespace[obj.__name__] = obj
    return module
332
333
def ensure_plugins_loaded():
    """
    Load plugins from the plugins directory, entrypoints and (optionally) providers.

    Does nothing when plugins were already loaded. Provider plugins are loaded
    only when ``settings.LAZY_LOAD_PROVIDERS`` is falsy.

    :raises ValueError: if ``settings.PLUGINS_FOLDER`` is not set.
    """
    global plugins

    from airflow.observability.stats import Stats

    if plugins is not None:
        log.debug("Plugins are already loaded. Skipping.")
        return

    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    with Stats.timer() as timer:
        # Set the list before running the loaders: they append to it, and a
        # nested call to this function then returns early.
        plugins = []

        load_plugins_from_plugin_directory()
        load_entrypoint_plugins()

        if not settings.LAZY_LOAD_PROVIDERS:
            load_providers_plugins()

    if not plugins:
        return
    log.debug("Loading %d plugin(s) took %.2f seconds", len(plugins), timer.duration)
364
365
def initialize_ui_plugins():
    """
    Collect extension points for the UI.

    Fills the module-level ``external_views`` and ``react_apps`` lists. Entries
    that are not dictionaries, or whose ``url_route`` was already claimed, are
    logged and removed from the owning plugin's own list. Entries without a
    ``url_route`` are neither collected nor removed.
    """
    global external_views
    global react_apps

    if external_views is not None and react_apps is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize UI plugin")

    # url_route -> name of the plugin that claimed it first (shared by views and apps).
    seen_url_route = {}
    external_views = []
    react_apps = []

    def _collect(plugin, candidates, accepted, not_a_dict_message, conflict_message):
        """Append valid candidates to ``accepted`` and return the rejected ones."""
        rejected = []
        for candidate in candidates:
            if not isinstance(candidate, dict):
                log.warning(not_a_dict_message, plugin.name)
                rejected.append(candidate)
                continue
            route = candidate.get("url_route")
            if route is None:
                continue
            if route in seen_url_route:
                log.warning(conflict_message, plugin.name, route, seen_url_route[route])
                rejected.append(candidate)
                continue
            accepted.append(candidate)
            seen_url_route[route] = plugin.name
        return rejected

    for plugin in plugins:
        external_views_to_remove = _collect(
            plugin,
            plugin.external_views,
            external_views,
            "Plugin '%s' has an external view that is not a dictionary. The view will not be loaded.",
            "Plugin '%s' has an external view with an URL route '%s' "
            "that conflicts with another plugin '%s'. The view will not be loaded.",
        )
        react_apps_to_remove = _collect(
            plugin,
            plugin.react_apps,
            react_apps,
            "Plugin '%s' has a React App that is not a dictionary. The React App will not be loaded.",
            "Plugin '%s' has a React App with an URL route '%s' "
            "that conflicts with another plugin '%s'. The React App will not be loaded.",
        )

        # Mutate the plugin's own lists in place: some functions (get_plugin_info,
        # for example) still read these plugin attributes instead of the global
        # `external_views` / `react_apps` variables.
        for item in external_views_to_remove:
            plugin.external_views.remove(item)
        for item in react_apps_to_remove:
            plugin.react_apps.remove(item)
446
447
def initialize_flask_plugins():
    """
    Collect flask extension points for WEB UI (legacy).

    Fills ``flask_blueprints``, ``flask_appbuilder_views`` and
    ``flask_appbuilder_menu_links``, and warns about plugins that only define
    ``admin_views`` / ``menu_links`` without their appbuilder counterparts.
    """
    global flask_blueprints
    global flask_appbuilder_views
    global flask_appbuilder_menu_links

    already_initialized = (
        flask_blueprints is not None
        and flask_appbuilder_views is not None
        and flask_appbuilder_menu_links is not None
    )
    if already_initialized:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize legacy Web UI plugin")

    flask_blueprints = []
    flask_appbuilder_views = []
    flask_appbuilder_menu_links = []

    for plugin in plugins:
        flask_appbuilder_views.extend(plugin.appbuilder_views)
        flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
        # Each blueprint is stored together with the name of the plugin that provides it.
        named_blueprints = [{"name": plugin.name, "blueprint": bp} for bp in plugin.flask_blueprints]
        flask_blueprints.extend(named_blueprints)

        if (plugin.admin_views and not plugin.appbuilder_views) or (
            plugin.menu_links and not plugin.appbuilder_menu_items
        ):
            log.warning(
                "Plugin '%s' may not be compatible with the current Airflow version. "
                "Please contact the author of the plugin.",
                plugin.name,
            )
485
486
def initialize_fastapi_plugins():
    """
    Collect extension points for the API.

    Fills ``fastapi_apps`` and ``fastapi_root_middlewares`` from every loaded
    plugin; does nothing when both were already collected.
    """
    global fastapi_apps
    global fastapi_root_middlewares

    if not (fastapi_apps is None or fastapi_root_middlewares is None):
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize FastAPI plugins")

    fastapi_apps = []
    fastapi_root_middlewares = []

    for plugin in plugins:
        fastapi_apps += plugin.fastapi_apps
        fastapi_root_middlewares += plugin.fastapi_root_middlewares
508
509
def initialize_extra_operators_links_plugins():
    """
    Collect the operator extra links registered by plugins.

    Fills ``global_operator_extra_links``, ``operator_extra_links`` and
    ``registered_operator_link_classes``, which maps the qualified name of each
    operator link's class to that class.
    """
    global global_operator_extra_links
    global operator_extra_links
    global registered_operator_link_classes

    already_initialized = (
        global_operator_extra_links is not None
        and operator_extra_links is not None
        and registered_operator_link_classes is not None
    )
    if already_initialized:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra operators links plugins")

    global_operator_extra_links = []
    operator_extra_links = []
    registered_operator_link_classes = {}

    for plugin in plugins:
        global_operator_extra_links.extend(plugin.global_operator_extra_links)
        operator_extra_links.extend(list(plugin.operator_extra_links))

        link_classes = {}
        for link in plugin.operator_extra_links:
            link_cls = link.__class__
            link_classes[qualname(link_cls)] = link_cls
        registered_operator_link_classes.update(link_classes)
541
542
def initialize_timetables_plugins():
    """
    Collect timetable classes registered by plugins.

    Fills ``timetable_classes`` with a mapping from each class's qualified name
    to the class itself.
    """
    global timetable_classes

    if timetable_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra timetables plugins")

    # Build locally and publish at the end, so the global is only set once complete.
    collected = {}
    for plugin in plugins:
        for timetable_class in plugin.timetables:
            collected[qualname(timetable_class)] = timetable_class
    timetable_classes = collected
562
563
def initialize_hook_lineage_readers_plugins():
    """
    Collect hook lineage reader classes registered by plugins.

    Fills ``hook_lineage_reader_classes`` with the readers of every loaded
    plugin, in plugin order.
    """
    global hook_lineage_reader_classes

    if hook_lineage_reader_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize hook lineage readers plugins")

    hook_lineage_reader_classes = []
    for plugin in plugins:
        hook_lineage_reader_classes += plugin.hook_lineage_readers
581
582
def integrate_macros_plugins() -> None:
    """
    Integrate macro plugins.

    For every plugin that defines macros, a module named
    ``airflow.sdk.execution_time.macros.<plugin name>`` is created, stored in
    ``sys.modules`` and set as an attribute on the imported ``macros`` module.

    :raises AirflowPluginException: if plugins cannot be loaded or a plugin has no name.
    """
    global macros_modules

    from airflow.sdk.execution_time import macros

    if macros_modules is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Integrate Macros plugins")

    macros_modules = []

    for plugin in plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        macros_module = make_module(f"airflow.sdk.execution_time.macros.{plugin.name}", plugin.macros)

        # make_module returns None for plugins without macros.
        if not macros_module:
            continue

        macros_modules.append(macros_module)
        sys.modules[macros_module.__name__] = macros_module
        # Expose the new module as an attribute of the imported ``macros`` module.
        setattr(macros, plugin.name, macros_module)
613
614
def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """
    Add the listeners of every loaded plugin to the listener manager.

    :param listener_manager: manager that receives each listener via ``add_listener``
    :raises AirflowPluginException: if a plugin has no name.
    """
    ensure_plugins_loaded()

    if not plugins:
        return

    for plugin in plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        for listener in plugin.listeners:
            listener_manager.add_listener(listener)
626
627
def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
    """
    Dump plugins attributes.

    Plugins are loaded and the macros, flask, FastAPI, UI and operator extra
    links integrations are initialized before the attributes are read.

    :param attrs_to_dump: Plugin attributes to dump. When ``None`` or empty,
        every attribute in ``PLUGINS_ATTRIBUTES_TO_DUMP`` is dumped.
    :return: One dictionary per loaded plugin, holding its ``name`` plus one
        entry per requested attribute.
    """
    ensure_plugins_loaded()
    integrate_macros_plugins()
    initialize_flask_plugins()
    initialize_fastapi_plugins()
    initialize_ui_plugins()
    initialize_extra_operators_links_plugins()
    if not attrs_to_dump:
        attrs_to_dump = PLUGINS_ATTRIBUTES_TO_DUMP
    else:
        # Materialize once: the attributes are iterated again for every plugin,
        # so a one-shot iterator would be exhausted after the first plugin.
        attrs_to_dump = tuple(attrs_to_dump)
    plugins_info = []
    if plugins:
        for plugin in plugins:
            info: dict[str, Any] = {"name": plugin.name}
            for attr in attrs_to_dump:
                if attr in ("global_operator_extra_links", "operator_extra_links"):
                    # Link instances are reported by the qualified name of their class.
                    info[attr] = [f"<{qualname(d.__class__)} object>" for d in getattr(plugin, attr)]
                elif attr in ("macros", "timetables", "priority_weight_strategies"):
                    info[attr] = [qualname(d) for d in getattr(plugin, attr)]
                elif attr == "listeners":
                    # listeners may be modules or class instances
                    info[attr] = [
                        d.__name__ if inspect.ismodule(d) else qualname(d) for d in getattr(plugin, attr)
                    ]
                elif attr == "appbuilder_views":
                    # Replace the view object by the qualified name of its class.
                    info[attr] = [
                        {**d, "view": qualname(d["view"].__class__) if "view" in d else None}
                        for d in getattr(plugin, attr)
                    ]
                elif attr == "flask_blueprints":
                    info[attr] = [
                        f"<{qualname(d.__class__)}: name={d.name!r} import_name={d.import_name!r}>"
                        for d in getattr(plugin, attr)
                    ]
                elif attr == "fastapi_apps":
                    # Replace the app object by the qualified name of its class.
                    info[attr] = [
                        {**d, "app": qualname(d["app"].__class__) if "app" in d else None}
                        for d in getattr(plugin, attr)
                    ]
                elif attr == "fastapi_root_middlewares":
                    # remove args and kwargs from plugin info to hide potentially sensitive info.
                    info[attr] = [
                        {
                            k: (v if k != "middleware" else qualname(middleware_dict["middleware"]))
                            for k, v in middleware_dict.items()
                            if k not in ("args", "kwargs")
                        }
                        for middleware_dict in getattr(plugin, attr)
                    ]
                else:
                    # Every other attribute is dumped unchanged.
                    info[attr] = getattr(plugin, attr)
            plugins_info.append(info)
    return plugins_info
685
686
def initialize_priority_weight_strategy_plugins():
    """
    Collect priority weight strategy classes registered by plugins.

    Fills ``priority_weight_strategy_classes`` with the entries of
    ``airflow_priority_weight_strategies`` plus the plugin-provided classes keyed
    by qualified name; a plugin class wins when both define the same key.
    """
    global priority_weight_strategy_classes

    if priority_weight_strategy_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra priority weight strategy plugins")

    from_plugins = {}
    for plugin in plugins:
        for strategy_class in plugin.priority_weight_strategies:
            from_plugins[qualname(strategy_class)] = strategy_class

    # Copy the built-in mapping first so plugin entries override it on key clashes.
    merged = dict(airflow_priority_weight_strategies)
    merged.update(from_plugins)
    priority_weight_strategy_classes = merged