Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/plugins_manager.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

76 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""SDK wrapper for plugins manager.""" 

19 

20from __future__ import annotations 

21 

22import logging 

23from functools import cache 

24from typing import TYPE_CHECKING 

25 

26from airflow import settings 

27from airflow.sdk._shared.module_loading import import_string 

28from airflow.sdk._shared.observability.metrics.stats import Stats 

29from airflow.sdk._shared.plugins_manager import ( 

30 AirflowPlugin, 

31 _load_entrypoint_plugins, 

32 _load_plugins_from_plugin_directory, 

33 integrate_listener_plugins as _integrate_listener_plugins, 

34 integrate_macros_plugins as _integrate_macros_plugins, 

35 is_valid_plugin, 

36) 

37from airflow.sdk.configuration import conf 

38from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime 

39 

40if TYPE_CHECKING: 

41 from airflow.sdk._shared.listeners.listener import ListenerManager 

42 from airflow.sdk.lineage import HookLineageReader 

43 

44log = logging.getLogger(__name__) 

45 

46 

def _load_providers_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Import the plugins advertised by the providers manager.

    Each entry exposed by ``ProvidersManagerTaskRuntime().plugins`` is imported
    from its ``plugin_class`` path and kept only when ``is_valid_plugin``
    accepts it. Invalid plugins are logged as warnings and ``ImportError``
    failures are logged with their traceback; neither is added to the returned
    error mapping, so that mapping is always empty here.

    :return: A ``(plugins, import_errors)`` tuple.
    """
    log.debug("Loading plugins from providers")
    manager = ProvidersManagerTaskRuntime()
    manager.initialize_providers_plugins()

    collected: list[AirflowPlugin] = []
    errors: dict[str, str] = {}
    for provider_plugin in manager.plugins:
        log.debug(
            "Importing plugin %s from class %s",
            provider_plugin.name,
            provider_plugin.plugin_class,
        )

        try:
            candidate = import_string(provider_plugin.plugin_class)
            if not is_valid_plugin(candidate):
                log.warning("Plugin %s is not a valid plugin", provider_plugin.name)
                continue
            collected.append(candidate)
        except ImportError:
            # A broken provider plugin must not prevent the others from loading.
            log.exception(
                "Failed to load plugin %s from class name %s",
                provider_plugin.name,
                provider_plugin.plugin_class,
            )
    return collected, errors

67 

68 

@cache
def _get_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load plugins from the plugins directory, entrypoints and (optionally) providers.

    The result is memoized with ``functools.cache``, so the loading work runs
    once per process after the first successful call.

    Sources are processed in order: plugins folder, entrypoints, then providers
    (the latter only when ``settings.LAZY_LOAD_PROVIDERS`` is falsy). When two
    plugins share a name, the first one registered wins and later ones are
    skipped.

    :return: A ``(plugins, import_errors)`` tuple. ``plugins`` holds the plugin
        objects whose ``on_load()`` succeeded; ``import_errors`` maps a plugin
        source (or name) to the error message of its failure.
    :raises ValueError: If ``settings.PLUGINS_FOLDER`` is not set.
    """
    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    loaded_plugins: set[str | None] = set()

    def _register_plugins(plugin_instances: list[AirflowPlugin], errors: dict[str, str]) -> None:
        """Run ``on_load`` for each not-yet-seen plugin and merge ``errors`` into the result."""
        for plugin_instance in plugin_instances:
            if plugin_instance.name in loaded_plugins:
                # Skip only this duplicate. Returning here would silently drop
                # every remaining plugin in the batch and lose ``errors``.
                continue

            loaded_plugins.add(plugin_instance.name)
            try:
                plugin_instance.on_load()
                plugins.append(plugin_instance)
            except Exception as e:
                # A plugin failing in on_load() must not break the other plugins;
                # record the failure under its source (falling back to its name).
                log.exception("Failed to load plugin %s", plugin_instance.name)
                name = str(plugin_instance.source) if plugin_instance.source else plugin_instance.name or ""
                import_errors[name] = str(e)
        import_errors.update(errors)

    with Stats.timer() as timer:
        load_examples = conf.getboolean("core", "LOAD_EXAMPLES")
        _register_plugins(
            *_load_plugins_from_plugin_directory(
                plugins_folder=settings.PLUGINS_FOLDER,
                load_examples=load_examples,
                example_plugins_module="airflow.example_dags.plugins" if load_examples else None,
            )
        )
        _register_plugins(*_load_entrypoint_plugins())

        if not settings.LAZY_LOAD_PROVIDERS:
            _register_plugins(*_load_providers_plugins())

    # NOTE(review): the duration is read after the timer context exits,
    # presumably when it is finalized - confirm against the Stats timer API.
    log.debug("Loading %d plugin(s) took %.2f ms", len(plugins), timer.duration)
    return plugins, import_errors

116 

117 

@cache
def integrate_macros_plugins() -> None:
    """
    Pass the loaded plugins to the shared macros integration helper.

    The target is the ``airflow.sdk.execution_time.macros`` module, which is
    imported inside the function. The function is wrapped in
    ``functools.cache``, so after one successful call further calls are no-ops.
    """
    from airflow.sdk.execution_time import macros as sdk_macros

    loaded_plugins = _get_plugins()[0]
    _integrate_macros_plugins(
        plugins=loaded_plugins,
        macros_module_name_prefix="airflow.sdk.execution_time.macros",
        target_macros_module=sdk_macros,
    )

129 

130 

def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """
    Pass the loaded plugins to the shared listener integration helper.

    :param listener_manager: The listener manager forwarded to the helper
        together with the loaded plugins.
    """
    loaded_plugins = _get_plugins()[0]
    _integrate_listener_plugins(listener_manager, plugins=loaded_plugins)

135 

136 

@cache
def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
    """
    Gather the hook lineage reader classes exposed by all loaded plugins.

    The readers of each plugin are concatenated in plugin load order. The
    result is memoized, so every caller receives the same list object.

    :return: A flat list built from each plugin's ``hook_lineage_readers``.
    """
    log.debug("Initialize hook lineage readers plugins")
    loaded_plugins, _ = _get_plugins()
    return [reader for plugin in loaded_plugins for reader in plugin.hook_lineage_readers]