Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/plugins_manager.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

68 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""SDK wrapper for plugins manager.""" 

19 

20from __future__ import annotations 

21 

22import logging 

23from functools import cache 

24from typing import TYPE_CHECKING 

25 

26from airflow import settings 

27from airflow.providers_manager import ProvidersManager 

28from airflow.sdk._shared.module_loading import import_string 

29from airflow.sdk._shared.plugins_manager import ( 

30 AirflowPlugin, 

31 _load_entrypoint_plugins, 

32 _load_plugins_from_plugin_directory, 

33 integrate_listener_plugins as _integrate_listener_plugins, 

34 integrate_macros_plugins as _integrate_macros_plugins, 

35 is_valid_plugin, 

36) 

37from airflow.sdk.configuration import conf 

38from airflow.sdk.observability.stats import Stats 

39 

40if TYPE_CHECKING: 

41 from airflow.listeners.listener import ListenerManager 

42 

43log = logging.getLogger(__name__) 

44 

45 

def _load_providers_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Import the plugins exposed by ``ProvidersManager``.

    Each entry of ``ProvidersManager().plugins`` is imported from its
    ``plugin_class`` path. Objects that pass ``is_valid_plugin`` are collected;
    anything else is reported with a warning. An ``ImportError`` raised while
    handling an entry is logged and that entry is skipped.

    :return: A tuple of the collected plugins and an import-error mapping. The
        mapping is returned empty: failures here are only logged.
    """
    log.debug("Loading plugins from providers")
    manager = ProvidersManager()
    manager.initialize_providers_plugins()

    collected: list[AirflowPlugin] = []
    errors: dict[str, str] = {}
    for entry in manager.plugins:
        log.debug("Importing plugin %s from class %s", entry.name, entry.plugin_class)

        try:
            candidate = import_string(entry.plugin_class)
            if not is_valid_plugin(candidate):
                log.warning("Plugin %s is not a valid plugin", entry.name)
                continue
            collected.append(candidate)
        except ImportError:
            # Best effort: one broken provider plugin must not stop the others.
            log.exception("Failed to load plugin %s from class name %s", entry.name, entry.plugin_class)
    return collected, errors

66 

67 

@cache
def _get_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load plugins from plugins directory and entrypoints.

    Plugins are only loaded if they have not been previously loaded: a plugin
    whose ``name`` was already registered by an earlier source is skipped.
    Provider plugins are loaded as well unless ``settings.LAZY_LOAD_PROVIDERS``
    is set. The result is memoized with ``functools.cache``, so a successful
    call is not repeated.

    :return: A tuple of the plugins whose ``on_load()`` succeeded and a mapping
        of import errors (those reported by the loaders plus ``on_load()``
        failures).
    :raises ValueError: If ``settings.PLUGINS_FOLDER`` is not set.
    """
    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    loaded_plugins: set[str | None] = set()

    def __register_plugins(plugin_instances: list[AirflowPlugin], errors: dict[str, str]) -> None:
        """Run ``on_load()`` for each not-yet-seen plugin and merge the loader's errors."""
        for plugin_instance in plugin_instances:
            if plugin_instance.name in loaded_plugins:
                # Skip only this duplicate. Returning here would drop every
                # remaining plugin of the batch and the batch's errors.
                continue

            loaded_plugins.add(plugin_instance.name)
            try:
                plugin_instance.on_load()
                plugins.append(plugin_instance)
            except Exception as e:
                log.exception("Failed to load plugin %s", plugin_instance.name)
                # Key the error by the plugin source when available, else by its name.
                name = str(plugin_instance.source) if plugin_instance.source else plugin_instance.name or ""
                import_errors[name] = str(e)
        import_errors.update(errors)

    with Stats.timer() as timer:
        load_examples = conf.getboolean("core", "LOAD_EXAMPLES")
        __register_plugins(
            *_load_plugins_from_plugin_directory(
                plugins_folder=settings.PLUGINS_FOLDER,
                load_examples=load_examples,
                example_plugins_module="airflow.example_dags.plugins" if load_examples else None,
            )
        )
        __register_plugins(*_load_entrypoint_plugins())

        if not settings.LAZY_LOAD_PROVIDERS:
            __register_plugins(*_load_providers_plugins())

    # The duration is read only after the timer context has exited.
    log.debug("Loading %d plugin(s) took %.2f seconds", len(plugins), timer.duration)
    return plugins, import_errors

115 

116 

@cache
def integrate_macros_plugins() -> None:
    """
    Integrate macro plugins into the SDK macros module.

    Passes the loaded plugins and the ``airflow.sdk.execution_time.macros``
    module to the shared integration helper. The function is wrapped in
    ``functools.cache``, so a completed call is not repeated.
    """
    from airflow.sdk.execution_time import macros as sdk_macros

    loaded_plugins = _get_plugins()[0]
    _integrate_macros_plugins(
        plugins=loaded_plugins,
        macros_module_name_prefix="airflow.sdk.execution_time.macros",
        target_macros_module=sdk_macros,
    )

128 

129 

def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """
    Add listeners from plugins.

    :param listener_manager: Manager handed to the shared integration helper
        together with the loaded plugins.
    """
    loaded_plugins = _get_plugins()[0]
    _integrate_listener_plugins(listener_manager, plugins=loaded_plugins)

133 _integrate_listener_plugins(listener_manager, plugins=plugins)