Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/_shared/plugins_manager/plugins_manager.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

155 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Manages all plugins.""" 

19 

20from __future__ import annotations 

21 

22import inspect 

23import logging 

24import os 

25import sys 

26import types 

27from pathlib import Path 

28from typing import TYPE_CHECKING, Any 

29 

30if TYPE_CHECKING: 

31 if sys.version_info >= (3, 12): 

32 from importlib import metadata 

33 else: 

34 import importlib_metadata as metadata 

35 from collections.abc import Generator 

36 from types import ModuleType 

37 

38 from airflow.listeners.listener import ListenerManager 

39 

40log = logging.getLogger(__name__) 

41 

42 

43class AirflowPluginSource: 

44 """Class used to define an AirflowPluginSource.""" 

45 

46 def __str__(self): 

47 raise NotImplementedError 

48 

49 def __html__(self): 

50 raise NotImplementedError 

51 

52 

53class PluginsDirectorySource(AirflowPluginSource): 

54 """Class used to define Plugins loaded from Plugins Directory.""" 

55 

56 def __init__(self, path, plugins_folder: str): 

57 self.path = os.path.relpath(path, plugins_folder) 

58 

59 def __str__(self): 

60 return f"$PLUGINS_FOLDER/{self.path}" 

61 

62 def __html__(self): 

63 return f"<em>$PLUGINS_FOLDER/</em>{self.path}" 

64 

65 

66class EntryPointSource(AirflowPluginSource): 

67 """Class used to define Plugins loaded from entrypoint.""" 

68 

69 def __init__(self, entrypoint: metadata.EntryPoint, dist: metadata.Distribution): 

70 self.dist = dist.metadata["Name"] # type: ignore[index] 

71 self.version = dist.version 

72 self.entrypoint = str(entrypoint) 

73 

74 def __str__(self): 

75 return f"{self.dist}=={self.version}: {self.entrypoint}" 

76 

77 def __html__(self): 

78 return f"<em>{self.dist}=={self.version}:</em> {self.entrypoint}" 

79 

80 

81class AirflowPluginException(Exception): 

82 """Exception when loading plugin.""" 

83 

84 

85class AirflowPlugin: 

86 """Class used to define AirflowPlugin.""" 

87 

88 name: str | None = None 

89 source: AirflowPluginSource | None = None 

90 macros: list[Any] = [] 

91 admin_views: list[Any] = [] 

92 flask_blueprints: list[Any] = [] 

93 fastapi_apps: list[Any] = [] 

94 fastapi_root_middlewares: list[Any] = [] 

95 external_views: list[Any] = [] 

96 react_apps: list[Any] = [] 

97 menu_links: list[Any] = [] 

98 appbuilder_views: list[Any] = [] 

99 appbuilder_menu_items: list[Any] = [] 

100 

101 # A list of global operator extra links that can redirect users to 

102 # external systems. These extra links will be available on the 

103 # task page in the form of buttons. 

104 # 

105 # Note: the global operator extra link can be overridden at each 

106 # operator level. 

107 global_operator_extra_links: list[Any] = [] 

108 

109 # A list of operator extra links to override or add operator links 

110 # to existing Airflow Operators. 

111 # These extra links will be available on the task page in form of 

112 # buttons. 

113 operator_extra_links: list[Any] = [] 

114 

115 # A list of timetable classes that can be used for DAG scheduling. 

116 timetables: list[Any] = [] 

117 

118 # A list of listeners that can be used for tracking task and DAG states. 

119 listeners: list[ModuleType | object] = [] 

120 

121 # A list of hook lineage reader classes that can be used for reading lineage information from a hook. 

122 hook_lineage_readers: list[Any] = [] 

123 

124 # A list of priority weight strategy classes that can be used for calculating tasks weight priority. 

125 priority_weight_strategies: list[Any] = [] 

126 

127 @classmethod 

128 def validate(cls): 

129 """Validate if plugin has a name.""" 

130 if not cls.name: 

131 raise AirflowPluginException("Your plugin needs a name.") 

132 

133 @classmethod 

134 def on_load(cls, *args, **kwargs): 

135 """ 

136 Execute when the plugin is loaded; This method is only called once during runtime. 

137 

138 :param args: If future arguments are passed in on call. 

139 :param kwargs: If future arguments are passed in on call. 

140 """ 

141 

142 

143def is_valid_plugin(plugin_obj) -> bool: 

144 """ 

145 Check whether a potential object is a subclass of the AirflowPlugin class. 

146 

147 :param plugin_obj: potential subclass of AirflowPlugin 

148 :return: Whether or not the obj is a valid subclass of 

149 AirflowPlugin 

150 """ 

151 if not inspect.isclass(plugin_obj): 

152 return False 

153 

154 # Temporarily here, we use a name base checking instead of issubclass() because the shared library 

155 # is accessed via different symlink paths in core (airflow._shared) and task sdk (airflow.sdk._shared). 

156 # Python treats these as different modules, so the AirflowPlugin class has different identities in each context. 

157 # Providers will typically inherit from SDK AirflowPlugin, so using issubclass() would fail when core tries 

158 # to validate those plugins and provider plugins won't work in airflow core. 

159 # For now, by validating by class name, we allow plugins defined with either 

160 # core's or SDK's AirflowPlugin to be loaded. 

161 is_airflow_plugin = any( 

162 base.__name__ == "AirflowPlugin" and "plugins_manager" in base.__module__ 

163 for base in plugin_obj.__mro__ 

164 ) 

165 

166 if is_airflow_plugin and plugin_obj.__name__ != "AirflowPlugin": 

167 plugin_obj.validate() 

168 return True 

169 return False 

170 

171 

172def _load_entrypoint_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]: 

173 """ 

174 Load and register plugins AirflowPlugin subclasses from the entrypoints. 

175 

176 The entry_point group should be 'airflow.plugins'. 

177 """ 

178 from ..module_loading import entry_points_with_dist 

179 

180 log.debug("Loading plugins from entrypoints") 

181 

182 plugins: list[AirflowPlugin] = [] 

183 import_errors: dict[str, str] = {} 

184 for entry_point, dist in entry_points_with_dist("airflow.plugins"): 

185 log.debug("Importing entry_point plugin %s", entry_point.name) 

186 try: 

187 plugin_class = entry_point.load() 

188 if not is_valid_plugin(plugin_class): 

189 continue 

190 

191 plugin_instance: AirflowPlugin = plugin_class() 

192 plugin_instance.source = EntryPointSource(entry_point, dist) 

193 plugins.append(plugin_instance) 

194 except Exception as e: 

195 log.exception("Failed to import plugin %s", entry_point.name) 

196 import_errors[entry_point.module] = str(e) 

197 return plugins, import_errors 

198 

199 

200def _load_plugins_from_plugin_directory( 

201 plugins_folder: str, 

202 load_examples: bool = False, 

203 example_plugins_module: str | None = None, 

204 ignore_file_syntax: str = "glob", 

205) -> tuple[list[AirflowPlugin], dict[str, str]]: 

206 """Load and register Airflow Plugins from plugins directory.""" 

207 from ..module_loading import find_path_from_directory 

208 

209 if not plugins_folder: 

210 raise ValueError("Plugins folder is not set") 

211 log.debug("Loading plugins from directory: %s", plugins_folder) 

212 files = find_path_from_directory(plugins_folder, ".airflowignore", ignore_file_syntax) 

213 plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)] 

214 

215 if load_examples: 

216 log.debug("Note: Loading plugins from examples as well: %s", plugins_folder) 

217 import importlib 

218 

219 example_plugins = importlib.import_module(example_plugins_module) 

220 example_plugins_folder = next(iter(example_plugins.__path__)) 

221 example_files = find_path_from_directory(example_plugins_folder, ".airflowignore") 

222 plugin_search_locations.append((example_plugins.__name__, example_files)) 

223 

224 plugins: list[AirflowPlugin] = [] 

225 import_errors: dict[str, str] = {} 

226 for module_prefix, plugin_files in plugin_search_locations: 

227 for file_path in plugin_files: 

228 path = Path(file_path) 

229 if not path.is_file() or path.suffix != ".py": 

230 continue 

231 mod_name = f"{module_prefix}.{path.stem}" if module_prefix else path.stem 

232 

233 try: 

234 loader = importlib.machinery.SourceFileLoader(mod_name, file_path) 

235 spec = importlib.util.spec_from_loader(mod_name, loader) 

236 if not spec: 

237 log.error("Could not load spec for module %s at %s", mod_name, file_path) 

238 continue 

239 mod = importlib.util.module_from_spec(spec) 

240 sys.modules[spec.name] = mod 

241 loader.exec_module(mod) 

242 

243 for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)): 

244 plugin_instance: AirflowPlugin = mod_attr_value() 

245 plugin_instance.source = PluginsDirectorySource(file_path, plugins_folder) 

246 plugins.append(plugin_instance) 

247 except Exception as e: 

248 log.exception("Failed to import plugin %s", file_path) 

249 import_errors[file_path] = str(e) 

250 return plugins, import_errors 

251 

252 

253def make_module(name: str, objects: list[Any]) -> ModuleType | None: 

254 """Create new module.""" 

255 if not objects: 

256 return None 

257 log.debug("Creating module %s", name) 

258 name = name.lower() 

259 module = types.ModuleType(name) 

260 module._name = name.split(".")[-1] # type: ignore 

261 module._objects = objects # type: ignore 

262 module.__dict__.update((o.__name__, o) for o in objects) 

263 return module 

264 

265 

266def integrate_macros_plugins( 

267 target_macros_module: ModuleType, macros_module_name_prefix: str, plugins: list[AirflowPlugin] 

268) -> None: 

269 """ 

270 Register macros from plugins onto the target macros module. 

271 

272 For each plugin with macros, creates a submodule and attaches it to 

273 the target module so macros can be accessed in templates as 

274 ``{{ macros.plugin_name.macro_func() }}``. 

275 """ 

276 log.debug("Integrate Macros plugins") 

277 

278 for plugin in plugins: 

279 if plugin.name is None: 

280 raise AirflowPluginException("Invalid plugin name") 

281 

282 macros_module_instance = make_module(f"{macros_module_name_prefix}.{plugin.name}", plugin.macros) 

283 

284 if macros_module_instance: 

285 sys.modules[macros_module_instance.__name__] = macros_module_instance 

286 # Register the newly created module on the provided macros module 

287 # so it can be accessed when rendering templates. 

288 setattr(target_macros_module, plugin.name, macros_module_instance) 

289 

290 

291def integrate_listener_plugins(listener_manager: ListenerManager, plugins: list[AirflowPlugin]) -> None: 

292 """ 

293 Register listeners from plugins with the listener manager. 

294 

295 For each plugin with listeners, registers them with the provided 

296 ListenerManager. 

297 """ 

298 for plugin in plugins: 

299 if plugin.name is None: 

300 raise AirflowPluginException("Invalid plugin name") 

301 

302 for listener in plugin.listeners: 

303 listener_manager.add_listener(listener)