Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/plugins_manager.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Manages all plugins."""
20from __future__ import annotations
22import importlib
23import importlib.machinery
24import importlib.util
25import inspect
26import logging
27import os
28import sys
29import types
30from pathlib import Path
31from typing import TYPE_CHECKING, Any, Iterable
33from airflow import settings
34from airflow.task.priority_strategy import (
35 PriorityWeightStrategy,
36 airflow_priority_weight_strategies,
37)
38from airflow.utils.entry_points import entry_points_with_dist
39from airflow.utils.file import find_path_from_directory
40from airflow.utils.module_loading import import_string, qualname
42if TYPE_CHECKING:
43 try:
44 import importlib_metadata as metadata
45 except ImportError:
46 from importlib import metadata # type: ignore[no-redef]
47 from types import ModuleType
49 from airflow.hooks.base import BaseHook
50 from airflow.listeners.listener import ListenerManager
51 from airflow.timetables.base import Timetable
log = logging.getLogger(__name__)

# Errors raised while importing plugins, keyed by the plugin file path (directory plugins),
# entry-point module (entry-point plugins) or plugin class path (provider plugins).
import_errors: dict[str, str] = {}

# ``None`` until ensure_plugins_loaded() runs; then the list of registered plugins.
plugins: list[AirflowPlugin] | None = None
# Names of plugins already registered; register_plugin() uses it to skip duplicates.
loaded_plugins: set[str] = set()

# Plugin components to integrate as modules.
# Each stays ``None`` until the matching integrate/initialize function has run.
registered_hooks: list[BaseHook] | None = None
macros_modules: list[Any] | None = None
executors_modules: list[Any] | None = None

# Plugin components to integrate directly.
# Each stays ``None`` until the matching initialize function has run.
admin_views: list[Any] | None = None
flask_blueprints: list[Any] | None = None
menu_links: list[Any] | None = None
flask_appbuilder_views: list[Any] | None = None
flask_appbuilder_menu_links: list[Any] | None = None
global_operator_extra_links: list[Any] | None = None
operator_extra_links: list[Any] | None = None
# Mapping of class names to class of OperatorLinks registered by plugins.
#
# Used by the DAG serialization code to only allow specific classes to be created
# during deserialization.
registered_operator_link_classes: dict[str, type] | None = None
# Mapping of qualified class names to custom task-instance dependency classes from plugins.
registered_ti_dep_classes: dict[str, type] | None = None
# Mapping of qualified class names to timetable classes contributed by plugins.
timetable_classes: dict[str, type[Timetable]] | None = None
# Built-in priority weight strategies merged with those contributed by plugins.
priority_weight_strategy_classes: dict[str, type[PriorityWeightStrategy]] | None = None

# Attributes reported for every plugin by get_plugin_info() when no explicit list is given.
PLUGINS_ATTRIBUTES_TO_DUMP = {
    "hooks",
    "executors",
    "macros",
    "admin_views",
    "flask_blueprints",
    "menu_links",
    "appbuilder_views",
    "appbuilder_menu_items",
    "global_operator_extra_links",
    "operator_extra_links",
    "source",
    "ti_deps",
    "timetables",
    "listeners",
    "priority_weight_strategies",
}
class AirflowPluginSource:
    """Abstract description of where an AirflowPlugin was loaded from."""

    def __str__(self):
        """Plain-text rendering of the source; concrete subclasses must implement it."""
        raise NotImplementedError()

    def __html__(self):
        """HTML rendering of the source; concrete subclasses must implement it."""
        raise NotImplementedError()
class PluginsDirectorySource(AirflowPluginSource):
    """Source of a plugin that was discovered inside ``$PLUGINS_FOLDER``."""

    def __init__(self, path):
        # Keep the location relative to the plugins folder for compact display.
        relative = os.path.relpath(path, settings.PLUGINS_FOLDER)
        self.path = relative

    def __str__(self):
        return "$PLUGINS_FOLDER/{}".format(self.path)

    def __html__(self):
        return "<em>$PLUGINS_FOLDER/</em>{}".format(self.path)
class EntryPointSource(AirflowPluginSource):
    """Source of a plugin that was advertised through a package entry point."""

    def __init__(self, entrypoint: metadata.EntryPoint, dist: metadata.Distribution):
        # Capture plain strings so the source can be rendered without the metadata objects.
        dist_name = dist.metadata["Name"]
        dist_version = dist.version
        self.dist = dist_name
        self.version = dist_version
        self.entrypoint = str(entrypoint)

    def __str__(self):
        return "{}=={}: {}".format(self.dist, self.version, self.entrypoint)

    def __html__(self):
        return "<em>{}=={}:</em> {}".format(self.dist, self.version, self.entrypoint)
class AirflowPluginException(Exception):
    """Raised when a plugin fails validation or plugins cannot be loaded."""
class AirflowPlugin:
    """
    Base class for Airflow plugins.

    Subclass it, set ``name`` and override any of the list attributes below to contribute
    components. The list defaults are class attributes shared by every subclass that does
    not override them, so subclasses should assign their own lists rather than mutate these.
    """

    # Plugin name; validate() rejects plugins whose name is empty, and
    # register_plugin() de-duplicates plugins by this value.
    name: str | None = None
    # Where the plugin was loaded from; the plugin loaders assign it.
    source: AirflowPluginSource | None = None
    hooks: list[Any] = []
    executors: list[Any] = []
    macros: list[Any] = []
    admin_views: list[Any] = []
    flask_blueprints: list[Any] = []
    menu_links: list[Any] = []
    appbuilder_views: list[Any] = []
    appbuilder_menu_items: list[Any] = []

    # Global operator extra links: buttons on the task page that can redirect
    # users to external systems. Each operator may override them individually.
    global_operator_extra_links: list[Any] = []

    # Operator extra links that add to or override the links of existing
    # Airflow operators; they also appear as buttons on the task page.
    operator_extra_links: list[Any] = []

    ti_deps: list[Any] = []

    # Timetable classes made available for DAG scheduling.
    timetables: list[type[Timetable]] = []

    listeners: list[ModuleType | object] = []

    # Priority weight strategy classes made available for computing task priority weights.
    priority_weight_strategies: list[type[PriorityWeightStrategy]] = []

    @classmethod
    def validate(cls):
        """Raise AirflowPluginException unless the plugin defines a non-empty ``name``."""
        if cls.name:
            return
        raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Hook executed when the plugin is loaded; it is called only once during runtime.

        The base implementation does nothing.

        :param args: If future arguments are passed in on call.
        :param kwargs: If future arguments are passed in on call.
        """
def is_valid_plugin(plugin_obj):
    """
    Check whether a potential object is a subclass of the AirflowPlugin class.

    Matching subclasses are also validated via their ``validate()`` classmethod; the base
    implementation raises AirflowPluginException when the plugin has no name.

    :param plugin_obj: potential subclass of AirflowPlugin
    :return: True if ``plugin_obj`` is a proper subclass of AirflowPlugin that is not
        already present in the registered ``plugins`` list, False otherwise.
    """
    if not (
        inspect.isclass(plugin_obj)
        and issubclass(plugin_obj, AirflowPlugin)
        and plugin_obj is not AirflowPlugin
    ):
        return False

    plugin_obj.validate()
    # ``plugins`` stays None until ensure_plugins_loaded() starts loading. Nothing can be
    # registered yet in that case, so treat it as an empty registry instead of letting
    # ``in None`` raise TypeError.
    return plugins is None or plugin_obj not in plugins
def register_plugin(plugin_instance):
    """
    Run a plugin's ``on_load`` hook and append it to the registered ``plugins``.

    Plugins are de-duplicated by ``name``: if a plugin with the same name was already
    registered, this call does nothing.

    :param plugin_instance: subclass of AirflowPlugin
    """
    plugin_name = plugin_instance.name
    if plugin_name not in loaded_plugins:
        # The name is recorded before on_load() runs, so even if the hook raises,
        # a later plugin with the same name is skipped and on_load() is not re-run.
        loaded_plugins.add(plugin_name)
        plugin_instance.on_load()
        plugins.append(plugin_instance)
def load_entrypoint_plugins():
    """
    Load and register AirflowPlugin subclasses exposed through the ``airflow.plugins`` entry points.

    A failure for one entry point is logged and stored in ``import_errors`` under the entry
    point's module, and loading continues with the next entry point.
    """
    log.debug("Loading plugins from entrypoints")

    for entry_point, dist in entry_points_with_dist("airflow.plugins"):
        log.debug("Importing entry_point plugin %s", entry_point.name)
        try:
            candidate = entry_point.load()
            if is_valid_plugin(candidate):
                instance = candidate()
                instance.source = EntryPointSource(entry_point, dist)
                register_plugin(instance)
        except Exception as err:
            log.exception("Failed to import plugin %s", entry_point.name)
            import_errors[entry_point.module] = str(err)
def _import_module_from_file(mod_name: str, file_path: str) -> types.ModuleType:
    """
    Import ``file_path`` as a module called ``mod_name`` and register it in ``sys.modules``.

    If executing the module raises, the ``sys.modules`` entry added here is removed again
    before the exception propagates, as the standard import system does, so no
    half-initialised module is left behind.
    """
    loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    mod = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = mod
    try:
        loader.exec_module(mod)
    except BaseException:
        sys.modules.pop(spec.name, None)
        raise
    return mod


def load_plugins_from_plugin_directory():
    """
    Load and register Airflow Plugins from plugins directory.

    Every ``.py`` file returned by ``find_path_from_directory`` for ``settings.PLUGINS_FOLDER``
    (with ``.airflowignore`` passed as the ignore-file name) is imported as a module named
    after the file stem. Each valid AirflowPlugin subclass found in it is instantiated, tagged
    with a PluginsDirectorySource and registered. A failure for a file is logged and stored in
    ``import_errors`` under the file path, and loading continues with the next file.
    """
    log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)

    for file_path in find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore"):
        path = Path(file_path)
        if not path.is_file() or path.suffix != ".py":
            continue
        mod_name = path.stem

        try:
            # Log before executing so the message also precedes any import failure.
            log.debug("Importing plugin module %s", file_path)
            mod = _import_module_from_file(mod_name, file_path)

            # Iterate a snapshot: a plugin's __init__/on_load may add module globals,
            # which would otherwise break iteration over the live module namespace.
            for mod_attr_value in list(mod.__dict__.values()):
                if not is_valid_plugin(mod_attr_value):
                    continue
                plugin_instance = mod_attr_value()
                plugin_instance.source = PluginsDirectorySource(file_path)
                register_plugin(plugin_instance)
        except Exception as e:
            log.exception("Failed to import plugin %s", file_path)
            import_errors[file_path] = str(e)
def load_providers_plugins():
    """
    Load and register plugins declared by installed providers.

    Each provider plugin class is imported from its dotted path and, if valid, registered
    as-is (the class itself, not an instance). A failure for one provider plugin is logged
    and stored in ``import_errors`` under the plugin class path, and loading continues.
    """
    from airflow.providers_manager import ProvidersManager

    log.debug("Loading plugins from providers")
    providers_manager = ProvidersManager()
    providers_manager.initialize_providers_plugins()
    for plugin in providers_manager.plugins:
        log.debug("Importing plugin %s from class %s", plugin.name, plugin.plugin_class)

        try:
            plugin_class = import_string(plugin.plugin_class)
            if is_valid_plugin(plugin_class):
                register_plugin(plugin_class)
            else:
                log.warning("Plugin %s is not a valid plugin", plugin.name)
        except Exception as e:
            # Catch broadly, like the directory and entry-point loaders: a nameless plugin
            # (validate() raises AirflowPluginException) or a failing on_load() must not
            # abort ensure_plugins_loaded() and leave the plugin list half-populated.
            log.exception("Failed to load plugin %s from class name %s", plugin.name, plugin.plugin_class)
            import_errors[plugin.plugin_class] = str(e)
def make_module(name: str, objects: list[Any]):
    """
    Build a synthetic module exposing ``objects`` by their ``__name__``.

    :param name: dotted module name; it is lower-cased for the created module.
    :param objects: objects to expose as module attributes.
    :return: the new module, or None when ``objects`` is empty.
    """
    if objects:
        log.debug("Creating module %s", name)
        module_name = name.lower()
        new_module = types.ModuleType(module_name)
        new_module._name = module_name.rsplit(".", 1)[-1]  # type: ignore
        new_module._objects = objects  # type: ignore
        vars(new_module).update({obj.__name__: obj for obj in objects})
        return new_module
    return None
def ensure_plugins_loaded():
    """
    Load plugins from the plugins directory, entry points and (unless lazily loaded) providers.

    Does nothing when plugins have already been loaded.

    :raises ValueError: if ``settings.PLUGINS_FOLDER`` is not set.
    """
    from airflow.stats import Stats

    global plugins, registered_hooks

    if plugins is not None:
        log.debug("Plugins are already loaded. Skipping.")
        return

    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    with Stats.timer() as timer:
        # Both globals must be lists before any loader runs, because the loaders
        # (through is_valid_plugin/register_plugin) read and append to ``plugins``.
        plugins, registered_hooks = [], []

        load_plugins_from_plugin_directory()
        load_entrypoint_plugins()
        if not settings.LAZY_LOAD_PROVIDERS:
            load_providers_plugins()

        # Hooks are only tracked for now, so they can later be integrated into
        # the UI's Connection screens.
        registered_hooks.extend(hook for plugin in plugins for hook in plugin.hooks)

    if plugins:
        log.debug("Loading %d plugin(s) took %.2f seconds", len(plugins), timer.duration)
def initialize_web_ui_plugins():
    """
    Collect Flask blueprints and Flask-AppBuilder views / menu items contributed by plugins.

    Returns immediately if the three web UI collections are already populated.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global flask_blueprints
    global flask_appbuilder_views
    global flask_appbuilder_menu_links

    already_initialized = all(
        collected is not None
        for collected in (flask_blueprints, flask_appbuilder_views, flask_appbuilder_menu_links)
    )
    if already_initialized:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize Web UI plugin")

    flask_blueprints = []
    flask_appbuilder_views = []
    flask_appbuilder_menu_links = []

    for plugin in plugins:
        flask_appbuilder_views.extend(plugin.appbuilder_views)
        flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
        flask_blueprints.extend({"name": plugin.name, "blueprint": bp} for bp in plugin.flask_blueprints)

        # admin_views / menu_links without their appbuilder counterparts are flagged as a
        # possible incompatibility with this Airflow version.
        only_admin_views = plugin.admin_views and not plugin.appbuilder_views
        only_menu_links = plugin.menu_links and not plugin.appbuilder_menu_items
        if only_admin_views or only_menu_links:
            log.warning(
                "Plugin '%s' may not be compatible with the current Airflow version. "
                "Please contact the author of the plugin.",
                plugin.name,
            )
def initialize_ti_deps_plugins():
    """
    Build the registry of custom task-instance dependency classes contributed by plugins.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global registered_ti_dep_classes
    if registered_ti_dep_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize custom taskinstance deps plugins")

    # Plugins list ti_dep instances; the registry maps each instance's class qualname to the class.
    registered_ti_dep_classes = {}
    for plugin in plugins:
        registered_ti_dep_classes.update(
            (qualname(dep_class), dep_class) for dep_class in (dep.__class__ for dep in plugin.ti_deps)
        )
def initialize_extra_operators_links_plugins():
    """
    Collect global and per-operator extra links contributed by plugins.

    Also records the class of every per-operator link in ``registered_operator_link_classes``,
    keyed by qualname.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global global_operator_extra_links
    global operator_extra_links
    global registered_operator_link_classes

    collections_ready = (
        global_operator_extra_links,
        operator_extra_links,
        registered_operator_link_classes,
    )
    if all(item is not None for item in collections_ready):
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra operators links plugins")

    global_operator_extra_links = []
    operator_extra_links = []
    registered_operator_link_classes = {}

    for plugin in plugins:
        global_operator_extra_links.extend(plugin.global_operator_extra_links)
        operator_extra_links.extend(plugin.operator_extra_links)

        link_classes = {qualname(link.__class__): link.__class__ for link in plugin.operator_extra_links}
        registered_operator_link_classes.update(link_classes)
def initialize_timetables_plugins():
    """
    Collect timetable classes registered by plugins, keyed by qualname.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global timetable_classes

    if timetable_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra timetables plugins")

    collected = {}
    for plugin in plugins:
        for tt_class in plugin.timetables:
            collected[qualname(tt_class)] = tt_class
    timetable_classes = collected
def integrate_executor_plugins() -> None:
    """
    Expose each plugin's executors as a synthetic module under ``airflow.executors``.

    The module is built by make_module() (which lower-cases its name) and registered in
    ``sys.modules``; plugins without executors get no module.

    :raises AirflowPluginException: if plugins could not be loaded or a plugin has no name.
    """
    global executors_modules

    if executors_modules is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Integrate executor plugins")

    executors_modules = []
    for plugin in plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        new_module = make_module("airflow.executors." + plugin.name, plugin.executors)
        if not new_module:
            continue
        executors_modules.append(new_module)
        sys.modules[new_module.__name__] = new_module
def integrate_macros_plugins() -> None:
    """
    Integrate macro plugins.

    For every plugin defining macros, a module is built via make_module(), registered in
    ``sys.modules`` and attached to :mod:`airflow.macros` under the plugin's name so it is
    reachable when rendering templates. Returns immediately if already integrated.

    :raises AirflowPluginException: if plugins could not be loaded or a plugin has no name.
    """
    global macros_modules

    if macros_modules is not None:
        return

    # Imported only when integration actually has to run.
    from airflow import macros

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Integrate macros plugins")

    macros_modules = []

    for plugin in plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        macros_module = make_module(f"airflow.macros.{plugin.name}", plugin.macros)

        if macros_module:
            macros_modules.append(macros_module)
            sys.modules[macros_module.__name__] = macros_module
            # Register the newly created module on airflow.macros such that it
            # can be accessed when rendering templates.
            setattr(macros, plugin.name, macros_module)
def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """
    Add listeners from plugins to ``listener_manager``.

    :param listener_manager: manager whose ``add_listener`` is called for every plugin listener.
    :raises AirflowPluginException: if a loaded plugin has no name.
    """
    ensure_plugins_loaded()

    for plugin in plugins or ():
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")
        for listener in plugin.listeners:
            listener_manager.add_listener(listener)
def _dump_plugin_attribute(plugin: AirflowPlugin, attr: str) -> Any:
    """Return a display-friendly representation of ``getattr(plugin, attr)``."""
    value = getattr(plugin, attr)
    if attr in ("global_operator_extra_links", "operator_extra_links"):
        return [f"<{qualname(d.__class__)} object>" for d in value]
    if attr in ("macros", "timetables", "hooks", "executors", "priority_weight_strategies"):
        return [qualname(d) for d in value]
    if attr == "listeners":
        # listeners may be modules or class instances
        return [d.__name__ if inspect.ismodule(d) else qualname(d) for d in value]
    if attr == "appbuilder_views":
        return [{**d, "view": qualname(d["view"].__class__) if "view" in d else None} for d in value]
    if attr == "flask_blueprints":
        return [f"<{qualname(d.__class__)}: name={d.name!r} import_name={d.import_name!r}>" for d in value]
    return value


def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
    """
    Dump plugins attributes.

    Loads plugins and initializes the executor, macro, web UI and operator-link
    integrations first, then describes every loaded plugin.

    :param attrs_to_dump: A list of plugin attributes to dump. When None or empty,
        ``PLUGINS_ATTRIBUTES_TO_DUMP`` is used. Any iterable is accepted; it is
        materialized once so one-shot iterators apply to every plugin.
    :return: one dict per loaded plugin with its ``name`` plus the requested attributes.
    """
    ensure_plugins_loaded()
    integrate_executor_plugins()
    integrate_macros_plugins()
    initialize_web_ui_plugins()
    initialize_extra_operators_links_plugins()

    # Materialize once: a generator would otherwise be exhausted by the first plugin,
    # leaving every later plugin with only its name.
    attrs = PLUGINS_ATTRIBUTES_TO_DUMP if not attrs_to_dump else list(attrs_to_dump)

    plugins_info = []
    for plugin in plugins or ():
        info: dict[str, Any] = {"name": plugin.name}
        for attr in attrs:
            info[attr] = _dump_plugin_attribute(plugin, attr)
        plugins_info.append(info)
    return plugins_info
def initialize_priority_weight_strategy_plugins():
    """
    Build the priority weight strategy registry from Airflow's built-ins plus plugin classes.

    Plugin classes are keyed by qualname; when a key collides with a built-in, the plugin's
    class wins.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global priority_weight_strategy_classes

    if priority_weight_strategy_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra priority weight strategy plugins")

    from_plugins = {}
    for plugin in plugins:
        for strategy_class in plugin.priority_weight_strategies:
            from_plugins[qualname(strategy_class)] = strategy_class

    merged = dict(airflow_priority_weight_strategies)
    merged.update(from_plugins)
    priority_weight_strategy_classes = merged