Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/plugins_manager.py: 30%
272 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Manages all plugins."""
from __future__ import annotations

import importlib
import importlib.machinery
import importlib.util
import inspect
import logging
import os
import sys
import types
from typing import TYPE_CHECKING, Any, Iterable

try:
    import importlib_metadata
except ImportError:
    from importlib import metadata as importlib_metadata  # type: ignore[no-redef]

from types import ModuleType

from airflow import settings
from airflow.utils.entry_points import entry_points_with_dist
from airflow.utils.file import find_path_from_directory
from airflow.utils.module_loading import qualname

if TYPE_CHECKING:
    from airflow.hooks.base import BaseHook
    from airflow.listeners.listener import ListenerManager
    from airflow.timetables.base import Timetable
log = logging.getLogger(__name__)

# Maps a plugin source (file path or entry point module) to the text of the
# exception raised while importing it.
import_errors: dict[str, str] = {}

# ``None`` means "not loaded yet"; ``ensure_plugins_loaded`` replaces it with
# a list of registered plugin instances.
plugins: list[AirflowPlugin] | None = None

# Plugin components to integrate as modules
registered_hooks: list[BaseHook] | None = None
macros_modules: list[Any] | None = None
executors_modules: list[Any] | None = None

# Plugin components to integrate directly
admin_views: list[Any] | None = None
flask_blueprints: list[Any] | None = None
menu_links: list[Any] | None = None
flask_appbuilder_views: list[Any] | None = None
flask_appbuilder_menu_links: list[Any] | None = None
global_operator_extra_links: list[Any] | None = None
operator_extra_links: list[Any] | None = None

# Mapping of class names to class of OperatorLinks registered by plugins.
#
# Used by the DAG serialization code to only allow specific classes to be
# created during deserialization.
registered_operator_link_classes: dict[str, type] | None = None

# Mapping of class names to class of task-instance dependencies registered by
# plugins (filled by ``initialize_ti_deps_plugins``).
registered_ti_dep_classes: dict[str, type] | None = None

# Mapping of class names to timetable classes registered by plugins (filled by
# ``initialize_timetables_plugins``).
timetable_classes: dict[str, type[Timetable]] | None = None

# Plugin attributes reported by ``get_plugin_info`` when the caller does not
# ask for a specific subset.
PLUGINS_ATTRIBUTES_TO_DUMP = {
    "hooks",
    "executors",
    "macros",
    "flask_blueprints",
    "appbuilder_views",
    "appbuilder_menu_items",
    "global_operator_extra_links",
    "operator_extra_links",
    "ti_deps",
    "timetables",
    "source",
    "listeners",
}
class AirflowPluginSource:
    """Class used to define an AirflowPluginSource.

    Base type for objects describing where a plugin was loaded from.
    Subclasses must provide both a plain-text (``__str__``) and an HTML
    (``__html__``) rendering; the base implementations only raise.
    """

    def __str__(self):
        """Return the plain-text description of the source.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def __html__(self):
        """Return the HTML description of the source.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError
class PluginsDirectorySource(AirflowPluginSource):
    """Class used to define Plugins loaded from Plugins Directory."""

    def __init__(self, path):
        """
        Remember the plugin file location relative to the plugins folder.

        :param path: path of the plugin file; it is stored relative to
            ``settings.PLUGINS_FOLDER``
        """
        self.path = os.path.relpath(path, settings.PLUGINS_FOLDER)

    def __str__(self):
        """Return the location as ``$PLUGINS_FOLDER/<relative path>``."""
        return f"$PLUGINS_FOLDER/{self.path}"

    def __html__(self):
        """Return the location as HTML with the folder prefix emphasised."""
        # NOTE(review): ``self.path`` is interpolated without HTML escaping;
        # confirm that plugin file names are always trusted input.
        return f"<em>$PLUGINS_FOLDER/</em>{self.path}"
class EntryPointSource(AirflowPluginSource):
    """Class used to define Plugins loaded from entrypoint."""

    def __init__(self, entrypoint: importlib_metadata.EntryPoint, dist: importlib_metadata.Distribution):
        """
        Capture the distribution name/version and the entry point description.

        :param entrypoint: entry point the plugin was loaded from; only its
            string form is kept
        :param dist: distribution providing the entry point; its ``Name``
            metadata field and ``version`` are kept
        """
        self.dist = dist.metadata["Name"]
        self.version = dist.version
        self.entrypoint = str(entrypoint)

    def __str__(self):
        """Return ``<dist>==<version>: <entrypoint>``."""
        return f"{self.dist}=={self.version}: {self.entrypoint}"

    def __html__(self):
        """Return the same description as HTML with the distribution emphasised."""
        # NOTE(review): values are interpolated without HTML escaping; confirm
        # that distribution metadata is always trusted input.
        return f"<em>{self.dist}=={self.version}:</em> {self.entrypoint}"
class AirflowPluginException(Exception):
    """Exception when loading plugin."""
class AirflowPlugin:
    """Class used to define AirflowPlugin.

    Plugin authors subclass this type, set ``name`` and override whichever
    component lists they want to contribute.
    """

    # NOTE: the list defaults below are class attributes, so they are shared
    # by every subclass that does not override them. Assign a new list in the
    # subclass instead of mutating these in place.
    name: str | None = None
    source: AirflowPluginSource | None = None
    hooks: list[Any] = []
    executors: list[Any] = []
    macros: list[Any] = []
    admin_views: list[Any] = []
    flask_blueprints: list[Any] = []
    menu_links: list[Any] = []
    appbuilder_views: list[Any] = []
    appbuilder_menu_items: list[Any] = []

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links: list[Any] = []

    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links: list[Any] = []

    ti_deps: list[Any] = []

    # A list of timetable classes that can be used for DAG scheduling.
    timetables: list[type[Timetable]] = []

    listeners: list[ModuleType] = []

    @classmethod
    def validate(cls):
        """Validates that plugin has a name.

        :raises AirflowPluginException: if ``name`` is unset or empty
        """
        if not cls.name:
            raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Executed when the plugin is loaded.
        This method is only called once during runtime.

        :param args: If future arguments are passed in on call.
        :param kwargs: If future arguments are passed in on call.
        """
def is_valid_plugin(plugin_obj):
    """
    Check whether a potential object is a subclass of the AirflowPlugin class.

    The candidate must be a class, derive from ``AirflowPlugin`` without being
    ``AirflowPlugin`` itself, pass its own ``validate()`` and not be registered
    already.

    :param plugin_obj: potential subclass of AirflowPlugin
    :return: Whether or not the obj is a valid subclass of
        AirflowPlugin
    :raises AirflowPluginException: propagated from ``plugin_obj.validate()``
    """
    if not (
        inspect.isclass(plugin_obj)
        and issubclass(plugin_obj, AirflowPlugin)
        and plugin_obj is not AirflowPlugin
    ):
        return False

    plugin_obj.validate()

    # Nothing is registered while ``plugins`` is still ``None``.
    registered = plugins or ()
    if plugin_obj in registered:
        return False
    # ``register_plugin`` stores plugin *instances*, so the class itself is
    # never an element of the list; compare against the instances' types to
    # detect a class that has already been registered.
    return all(type(entry) is not plugin_obj for entry in registered)
def register_plugin(plugin_instance):
    """
    Start plugin load and register it after success initialization.

    ``on_load()`` runs first; the plugin is appended to the module-level
    ``plugins`` list only if that call returns without raising.

    :param plugin_instance: subclass of AirflowPlugin
    """
    plugin_instance.on_load()
    plugins.append(plugin_instance)
def load_entrypoint_plugins():
    """
    Load and register plugins AirflowPlugin subclasses from the entrypoints.
    The entry_point group should be 'airflow.plugins'.

    A failure while loading one entry point is logged and recorded in
    ``import_errors`` under the entry point's module; the remaining entry
    points are still processed.
    """
    log.debug("Loading plugins from entrypoints")

    for entry_point, dist in entry_points_with_dist("airflow.plugins"):
        log.debug("Importing entry_point plugin %s", entry_point.name)
        try:
            plugin_class = entry_point.load()
            if not is_valid_plugin(plugin_class):
                continue

            plugin_instance = plugin_class()
            plugin_instance.source = EntryPointSource(entry_point, dist)
            register_plugin(plugin_instance)
        except Exception as e:
            # Deliberately broad: one broken plugin must not prevent the
            # others from loading.
            log.exception("Failed to import plugin %s", entry_point.name)
            import_errors[entry_point.module] = str(e)
def load_plugins_from_plugin_directory():
    """Load and register Airflow Plugins from plugins directory.

    Every ``.py`` file yielded by ``find_path_from_directory`` for
    ``settings.PLUGINS_FOLDER`` is imported as a module named after the file,
    and each valid plugin class found in it is instantiated and registered.
    A failure in one file is logged and recorded in ``import_errors`` under
    the file path; the remaining files are still processed.
    """
    log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)

    for file_path in find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore"):
        if not os.path.isfile(file_path):
            continue
        mod_name, file_ext = os.path.splitext(os.path.basename(file_path))
        if file_ext != ".py":
            continue

        try:
            log.debug("Importing plugin module %s", file_path)
            loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
            spec = importlib.util.spec_from_loader(mod_name, loader)
            mod = importlib.util.module_from_spec(spec)
            # Register before executing so the module can be found in
            # ``sys.modules`` while its own code runs.
            sys.modules[spec.name] = mod
            loader.exec_module(mod)

            # Snapshot the attributes: instantiating and loading a plugin runs
            # plugin code, which must not be able to invalidate the iteration
            # by adding names to the module.
            for mod_attr_value in list(mod.__dict__.values()):
                if not is_valid_plugin(mod_attr_value):
                    continue
                plugin_instance = mod_attr_value()
                plugin_instance.source = PluginsDirectorySource(file_path)
                register_plugin(plugin_instance)
        except Exception as e:
            # Deliberately broad: one broken plugin file must not prevent the
            # others from loading.
            log.exception("Failed to import plugin %s", file_path)
            import_errors[file_path] = str(e)
def make_module(name: str, objects: list[Any]):
    """Creates new module.

    :param name: dotted module name; it is lower-cased before use
    :param objects: objects to expose as module attributes, each under its
        own ``__name__``
    :return: the new module, or ``None`` when ``objects`` is empty
    """
    if not objects:
        return None
    log.debug("Creating module %s", name)
    name = name.lower()
    module = types.ModuleType(name)
    module._name = name.split(".")[-1]  # type: ignore
    module._objects = objects  # type: ignore
    module.__dict__.update((o.__name__, o) for o in objects)
    return module
def ensure_plugins_loaded():
    """
    Load plugins from plugins directory and entrypoints.

    Plugins are only loaded if they have not been previously loaded.

    :raises ValueError: if ``settings.PLUGINS_FOLDER`` is not set
    """
    from airflow.stats import Stats

    global plugins, registered_hooks

    if plugins is not None:
        log.debug("Plugins are already loaded. Skipping.")
        return

    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    with Stats.timer() as timer:
        plugins = []
        registered_hooks = []

        load_plugins_from_plugin_directory()
        load_entrypoint_plugins()

        # We don't do anything with these for now, but we want to keep track of
        # them so we can integrate them in to the UI's Connection screens
        for plugin in plugins:
            registered_hooks.extend(plugin.hooks)

    # Report only after the timer context has exited.
    # NOTE(review): presumably ``timer.duration`` is finalized when the
    # context exits; confirm against airflow.stats.
    num_loaded = len(plugins)
    if num_loaded > 0:
        log.debug("Loading %d plugin(s) took %.2f seconds", num_loaded, timer.duration)
def initialize_web_ui_plugins():
    """Collect extension points for WEB UI.

    Fills ``flask_blueprints``, ``flask_appbuilder_views`` and
    ``flask_appbuilder_menu_links`` from the loaded plugins. Does nothing when
    all three have already been populated.

    :raises AirflowPluginException: if plugins are still unloaded after
        ``ensure_plugins_loaded()``
    """
    global flask_blueprints
    global flask_appbuilder_views
    global flask_appbuilder_menu_links

    if (
        flask_blueprints is not None
        and flask_appbuilder_views is not None
        and flask_appbuilder_menu_links is not None
    ):
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize Web UI plugin")

    flask_blueprints = []
    flask_appbuilder_views = []
    flask_appbuilder_menu_links = []

    for plugin in plugins:
        flask_appbuilder_views.extend(plugin.appbuilder_views)
        flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
        flask_blueprints.extend({"name": plugin.name, "blueprint": bp} for bp in plugin.flask_blueprints)

        # ``admin_views`` / ``menu_links`` are never collected here, so a
        # plugin that only provides those contributes nothing to the UI.
        if (plugin.admin_views and not plugin.appbuilder_views) or (
            plugin.menu_links and not plugin.appbuilder_menu_items
        ):
            log.warning(
                "Plugin '%s' may not be compatible with the current Airflow version. "
                "Please contact the author of the plugin.",
                plugin.name,
            )
def initialize_ti_deps_plugins():
    """Create modules for loaded extension from custom task instance dependency rule plugins.

    Fills ``registered_ti_dep_classes`` with the class of every ``ti_deps``
    entry of every plugin, keyed by ``qualname`` of that class. Does nothing
    when the mapping has already been populated.

    :raises AirflowPluginException: if plugins are still unloaded after
        ``ensure_plugins_loaded()``
    """
    global registered_ti_dep_classes
    if registered_ti_dep_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize custom taskinstance deps plugins")

    registered_ti_dep_classes = {}

    for plugin in plugins:
        registered_ti_dep_classes.update(
            {qualname(ti_dep.__class__): ti_dep.__class__ for ti_dep in plugin.ti_deps}
        )
def initialize_extra_operators_links_plugins():
    """Create modules for loaded extension from extra operators links plugins.

    Fills ``global_operator_extra_links``, ``operator_extra_links`` and
    ``registered_operator_link_classes`` from the loaded plugins. Does nothing
    when all three have already been populated.

    :raises AirflowPluginException: if plugins are still unloaded after
        ``ensure_plugins_loaded()``
    """
    global global_operator_extra_links
    global operator_extra_links
    global registered_operator_link_classes

    if (
        global_operator_extra_links is not None
        and operator_extra_links is not None
        and registered_operator_link_classes is not None
    ):
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra operators links plugins")

    global_operator_extra_links = []
    operator_extra_links = []
    registered_operator_link_classes = {}

    for plugin in plugins:
        global_operator_extra_links.extend(plugin.global_operator_extra_links)
        operator_extra_links.extend(plugin.operator_extra_links)

        # Only links from ``operator_extra_links`` are registered by class.
        registered_operator_link_classes.update(
            {qualname(link.__class__): link.__class__ for link in plugin.operator_extra_links}
        )
def initialize_timetables_plugins():
    """Collect timetable classes registered by plugins.

    Fills ``timetable_classes`` with every entry of every plugin's
    ``timetables`` list, keyed by ``qualname`` of the class. Does nothing when
    the mapping has already been populated.

    :raises AirflowPluginException: if plugins are still unloaded after
        ``ensure_plugins_loaded()``
    """
    global timetable_classes

    if timetable_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra timetables plugins")

    timetable_classes = {
        qualname(timetable_class): timetable_class
        for plugin in plugins
        for timetable_class in plugin.timetables
    }
def integrate_executor_plugins() -> None:
    """Integrate executor plugins to the context.

    For every plugin that provides executors, builds a module named
    ``airflow.executors.<plugin name>`` with ``make_module``, records it in
    ``executors_modules`` and installs it in ``sys.modules``. Does nothing
    when ``executors_modules`` has already been populated.

    :raises AirflowPluginException: if plugins are still unloaded after
        ``ensure_plugins_loaded()``, or a plugin has no name
    """
    global executors_modules

    if executors_modules is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Integrate executor plugins")

    executors_modules = []
    for plugin in plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        executors_module = make_module(f"airflow.executors.{plugin.name}", plugin.executors)
        if executors_module:
            executors_modules.append(executors_module)
            sys.modules[executors_module.__name__] = executors_module
def integrate_macros_plugins() -> None:
    """Integrates macro plugins.

    For every plugin that provides macros, builds a module named
    ``airflow.macros.<plugin name>`` with ``make_module``, records it in
    ``macros_modules``, installs it in ``sys.modules`` and sets it as an
    attribute of ``airflow.macros``. Does nothing when ``macros_modules`` has
    already been populated.

    :raises AirflowPluginException: if plugins are still unloaded after
        ``ensure_plugins_loaded()``, or a plugin has no name
    """
    global macros_modules

    from airflow import macros

    if macros_modules is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Integrate DAG plugins")

    macros_modules = []

    for plugin in plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        macros_module = make_module(f"airflow.macros.{plugin.name}", plugin.macros)

        if macros_module:
            macros_modules.append(macros_module)
            sys.modules[macros_module.__name__] = macros_module
            # Register the newly created module on airflow.macros such that it
            # can be accessed when rendering templates.
            setattr(macros, plugin.name, macros_module)
def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """Add listeners from plugins.

    :param listener_manager: manager that receives every entry of every
        plugin's ``listeners`` list through ``add_listener``
    :raises AirflowPluginException: if a plugin has no name
    """
    ensure_plugins_loaded()

    if plugins:
        for plugin in plugins:
            if plugin.name is None:
                raise AirflowPluginException("Invalid plugin name")

            for listener in plugin.listeners:
                listener_manager.add_listener(listener)
def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
    """
    Dump plugins attributes.

    Plugins are loaded and the executor, macro, web UI and operator-link
    integrations are run before the attributes are read.

    :param attrs_to_dump: A list of plugin attributes to dump; when empty or
        ``None``, ``PLUGINS_ATTRIBUTES_TO_DUMP`` is used
    :return: one dict per loaded plugin holding its ``name`` plus one entry
        per requested attribute
    """
    ensure_plugins_loaded()
    integrate_executor_plugins()
    integrate_macros_plugins()
    initialize_web_ui_plugins()
    initialize_extra_operators_links_plugins()
    if not attrs_to_dump:
        attrs_to_dump = PLUGINS_ATTRIBUTES_TO_DUMP
    plugins_info = []
    if plugins:
        for plugin in plugins:
            info: dict[str, Any] = {"name": plugin.name}
            for attr in attrs_to_dump:
                if attr in ("global_operator_extra_links", "operator_extra_links"):
                    # Entries are instances: report their class.
                    info[attr] = [f"<{qualname(d.__class__)} object>" for d in getattr(plugin, attr)]
                elif attr in ("macros", "timetables", "hooks", "executors"):
                    info[attr] = [qualname(d) for d in getattr(plugin, attr)]
                elif attr == "listeners":
                    # listeners are always modules
                    info[attr] = [d.__name__ for d in getattr(plugin, attr)]
                elif attr == "appbuilder_views":
                    # Copy each view dict, replacing the view object by the
                    # qualified name of its class (``None`` when absent).
                    info[attr] = [
                        {**d, "view": qualname(d["view"].__class__) if "view" in d else None}
                        for d in getattr(plugin, attr)
                    ]
                elif attr == "flask_blueprints":
                    info[attr] = [
                        f"<{qualname(d.__class__)}: name={d.name!r} import_name={d.import_name!r}>"
                        for d in getattr(plugin, attr)
                    ]
                else:
                    # Anything else is reported as-is.
                    info[attr] = getattr(plugin, attr)
            plugins_info.append(info)
    return plugins_info