1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""SDK wrapper for plugins manager."""
19
20from __future__ import annotations
21
22import logging
23from functools import cache
24from typing import TYPE_CHECKING
25
26from airflow import settings
27from airflow.sdk._shared.module_loading import import_string
28from airflow.sdk._shared.observability.metrics.stats import Stats
29from airflow.sdk._shared.plugins_manager import (
30 AirflowPlugin,
31 _load_entrypoint_plugins,
32 _load_plugins_from_plugin_directory,
33 integrate_listener_plugins as _integrate_listener_plugins,
34 integrate_macros_plugins as _integrate_macros_plugins,
35 is_valid_plugin,
36)
37from airflow.sdk.configuration import conf
38from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
39
40if TYPE_CHECKING:
41 from airflow.sdk._shared.listeners.listener import ListenerManager
42 from airflow.sdk.lineage import HookLineageReader
43
44log = logging.getLogger(__name__)
45
46
def _load_providers_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load plugins declared by providers.

    Every plugin listed by ``ProvidersManagerTaskRuntime`` is imported from its
    ``plugin_class`` path and kept only when ``is_valid_plugin`` accepts it.

    :return: A tuple of the imported plugin objects that passed validation and a mapping of
        plugin class path to error message for each plugin that raised ``ImportError``.
    """
    log.debug("Loading plugins from providers")
    providers_manager = ProvidersManagerTaskRuntime()
    providers_manager.initialize_providers_plugins()

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    for plugin in providers_manager.plugins:
        log.debug("Importing plugin %s from class %s", plugin.name, plugin.plugin_class)

        try:
            plugin_instance = import_string(plugin.plugin_class)
            if is_valid_plugin(plugin_instance):
                plugins.append(plugin_instance)
            else:
                log.warning("Plugin %s is not a valid plugin", plugin.name)
        except ImportError as e:
            log.exception("Failed to load plugin %s from class name %s", plugin.name, plugin.plugin_class)
            # Also report the failure through the returned mapping. Without this the mapping
            # is always empty and provider plugin import failures are visible only in logs.
            import_errors[plugin.plugin_class] = str(e)
    return plugins, import_errors
67
68
@cache
def _get_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load plugins from the plugins directory, entrypoints and providers.

    Provider plugins are only loaded when ``settings.LAZY_LOAD_PROVIDERS`` is falsy.
    The function is wrapped in ``functools.cache``, so after the first successful call the
    same ``(plugins, import_errors)`` tuple is returned without loading again.

    :return: A tuple of the plugins whose ``on_load`` succeeded and a mapping of plugin
        source (or name) to error message for everything that failed to load.
    :raises ValueError: If ``settings.PLUGINS_FOLDER`` is not set.
    """
    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    # Names already seen, so a plugin exposed by several sources is registered only once.
    loaded_plugins: set[str | None] = set()

    def _register_plugins(plugin_instances: list[AirflowPlugin], errors: dict[str, str]) -> None:
        """Run ``on_load`` for each new plugin and merge the loader's import errors."""
        for plugin_instance in plugin_instances:
            if plugin_instance.name in loaded_plugins:
                # Skip just this duplicate. Returning here would drop every remaining
                # plugin in the batch and lose the loader's import errors below.
                continue

            loaded_plugins.add(plugin_instance.name)
            try:
                plugin_instance.on_load()
                plugins.append(plugin_instance)
            except Exception as e:
                # A failing plugin must not prevent the others from loading.
                log.exception("Failed to load plugin %s", plugin_instance.name)
                name = str(plugin_instance.source) if plugin_instance.source else plugin_instance.name or ""
                import_errors[name] = str(e)
        import_errors.update(errors)

    with Stats.timer() as timer:
        load_examples = conf.getboolean("core", "LOAD_EXAMPLES")
        _register_plugins(
            *_load_plugins_from_plugin_directory(
                plugins_folder=settings.PLUGINS_FOLDER,
                load_examples=load_examples,
                example_plugins_module="airflow.example_dags.plugins" if load_examples else None,
            )
        )
        _register_plugins(*_load_entrypoint_plugins())

        if not settings.LAZY_LOAD_PROVIDERS:
            _register_plugins(*_load_providers_plugins())

    log.debug("Loading %d plugin(s) took %.2f ms", len(plugins), timer.duration)
    return plugins, import_errors
116
117
@cache
def integrate_macros_plugins() -> None:
    """
    Hand the loaded plugins to the shared macros integration.

    The target is the SDK execution-time macros module. The function is cached, so the
    integration is performed at most once after a successful call.
    """
    from airflow.sdk.execution_time import macros as sdk_macros

    loaded_plugins = _get_plugins()[0]
    _integrate_macros_plugins(
        plugins=loaded_plugins,
        macros_module_name_prefix="airflow.sdk.execution_time.macros",
        target_macros_module=sdk_macros,
    )
129
130
def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """
    Pass the loaded plugins to the shared listener integration.

    :param listener_manager: Listener manager that is forwarded to the shared integration.
    """
    loaded_plugins = _get_plugins()[0]
    _integrate_listener_plugins(listener_manager, plugins=loaded_plugins)
135
136
@cache
def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
    """
    Gather the hook lineage reader classes exposed by all loaded plugins.

    Readers are returned in plugin order. The function is cached, so later calls return the
    same list object.
    """
    log.debug("Initialize hook lineage readers plugins")
    loaded_plugins, _ = _get_plugins()
    return [reader for plugin in loaded_plugins for reader in plugin.hook_lineage_readers]