Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/_shared/plugins_manager/plugins_manager.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

160 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Manages all plugins.""" 

19 

20from __future__ import annotations 

21 

22import importlib 

23import importlib.machinery 

24import importlib.util 

25import inspect 

26import logging 

27import os 

28import sys 

29import types 

30from pathlib import Path 

31from typing import TYPE_CHECKING, Any 

32 

33if TYPE_CHECKING: 

34 if sys.version_info >= (3, 12): 

35 from importlib import metadata 

36 else: 

37 import importlib_metadata as metadata 

38 from collections.abc import Generator 

39 from types import ModuleType 

40 

41 from ..listeners.listener import ListenerManager 

42 

# Logger for this module, named after ``__name__``.
log = logging.getLogger(__name__)

44 

45 

class AirflowPluginSource:
    """
    Abstract description of where a plugin was loaded from.

    Concrete subclasses render the origin as plain text (``__str__``) and as an
    HTML fragment (``__html__``); this base class supports neither.
    """

    def __str__(self):
        """Plain-text rendering of the source; must be provided by subclasses."""
        raise NotImplementedError()

    def __html__(self):
        """HTML rendering of the source; must be provided by subclasses."""
        raise NotImplementedError()

54 

55 

class PluginsDirectorySource(AirflowPluginSource):
    """Origin of a plugin that was loaded from a file in the plugins directory."""

    def __init__(self, path, plugins_folder: str):
        # Only the location relative to the plugins folder is kept; the folder
        # itself is rendered as the ``$PLUGINS_FOLDER`` placeholder.
        relative_location = os.path.relpath(path, plugins_folder)
        self.path = relative_location

    def __str__(self):
        """Render as ``$PLUGINS_FOLDER/<relative path>``."""
        return "$PLUGINS_FOLDER/{}".format(self.path)

    def __html__(self):
        """Render as HTML with the placeholder prefix emphasised."""
        return "<em>$PLUGINS_FOLDER/</em>{}".format(self.path)

67 

68 

class EntryPointSource(AirflowPluginSource):
    """Origin of a plugin that was discovered through a package entry point."""

    def __init__(self, entrypoint: metadata.EntryPoint, dist: metadata.Distribution):
        # Capture the distribution's name and version plus the entry point's
        # string form, so rendering needs no further metadata lookups.
        dist_metadata = dist.metadata
        self.dist = dist_metadata["Name"]  # type: ignore[index]
        self.version = dist.version
        self.entrypoint = str(entrypoint)

    def __str__(self):
        """Render as ``<dist>==<version>: <entrypoint>``."""
        return "{}=={}: {}".format(self.dist, self.version, self.entrypoint)

    def __html__(self):
        """Render as HTML with the ``<dist>==<version>:`` part emphasised."""
        return "<em>{}=={}:</em> {}".format(self.dist, self.version, self.entrypoint)

82 

83 

class AirflowPluginException(Exception):
    """Raised when a plugin cannot be loaded, e.g. because it fails validation."""

86 

87 

class AirflowPlugin:
    """
    Base class for Airflow plugins.

    Subclasses must set ``name`` (enforced by :meth:`validate`) and override
    whichever of the component lists below they contribute.

    NOTE: the list defaults are class-level objects shared by every subclass
    that does not override them. Plugins should assign a new list, e.g.
    ``macros = [my_macro]``, rather than mutate the inherited one in place.
    """

    name: str | None = None
    source: AirflowPluginSource | None = None
    macros: list[Any] = []
    admin_views: list[Any] = []
    flask_blueprints: list[Any] = []
    fastapi_apps: list[Any] = []
    fastapi_root_middlewares: list[Any] = []
    external_views: list[Any] = []
    react_apps: list[Any] = []
    menu_links: list[Any] = []
    appbuilder_views: list[Any] = []
    appbuilder_menu_items: list[Any] = []

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links: list[Any] = []

    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    #
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links: list[Any] = []

    # A list of timetable classes that can be used for Dag scheduling.
    timetables: list[Any] = []

    # A list of partition mapper classes contributed by this plugin.
    partition_mappers: list[Any] = []

    # A list of listeners that can be used for tracking task and Dag states.
    listeners: list[ModuleType | object] = []

    # A list of hook lineage reader classes that can be used for reading lineage information from a hook.
    hook_lineage_readers: list[Any] = []

    # A list of priority weight strategy classes that can be used for calculating tasks weight priority.
    priority_weight_strategies: list[Any] = []

    @classmethod
    def validate(cls):
        """
        Validate that the plugin has a name.

        :raises AirflowPluginException: if ``name`` is unset or empty.
        """
        if not cls.name:
            raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Execute when the plugin is loaded; this method is only called once during runtime.

        The base implementation does nothing.

        :param args: If future arguments are passed in on call.
        :param kwargs: If future arguments are passed in on call.
        """

148 

149 

def is_valid_plugin(plugin_obj) -> bool:
    """
    Tell whether ``plugin_obj`` is a concrete AirflowPlugin subclass.

    A candidate qualifies when all of the following hold:

    * it is a class;
    * some class in its MRO is named ``AirflowPlugin`` and is defined in a module
      whose dotted path contains ``plugins_manager``;
    * it is not itself named ``AirflowPlugin``.

    Qualifying classes are then checked with their ``validate()`` classmethod.
    Any exception raised there propagates to the caller instead of producing ``False``.

    :param plugin_obj: any object, typically a module attribute or an entry point target
    :return: ``True`` for a valid plugin subclass, ``False`` otherwise
    """
    if not inspect.isclass(plugin_obj):
        return False

    # Identity-based issubclass() is deliberately avoided. The shared plugins
    # library is reachable through two symlinked import paths: airflow._shared
    # in core and airflow.sdk._shared in the task SDK. Python therefore sees
    # two distinct AirflowPlugin classes, and provider plugins built on the SDK
    # class would fail an issubclass() check done by core. Matching the base by
    # class name and module path accepts plugins built on either variant.
    derives_from_plugin_base = False
    for klass in plugin_obj.__mro__:
        if klass.__name__ == "AirflowPlugin" and "plugins_manager" in klass.__module__:
            derives_from_plugin_base = True
            break

    # The base class itself is not a plugin.
    if not derives_from_plugin_base or plugin_obj.__name__ == "AirflowPlugin":
        return False

    plugin_obj.validate()
    return True

177 

178 

def _load_entrypoint_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Discover plugins advertised through the ``airflow.plugins`` entry-point group.

    Each entry point is loaded. When the loaded object passes
    :func:`is_valid_plugin`, it is instantiated and tagged with an
    :class:`EntryPointSource`. Objects that fail the check are skipped silently.

    :return: ``(plugins, import_errors)``, where ``import_errors`` maps the
        entry point's module to the stringified exception that occurred while
        loading, validating or instantiating it.
    """
    from ..module_loading import entry_points_with_dist

    log.debug("Loading plugins from entrypoints")

    loaded: list[AirflowPlugin] = []
    errors: dict[str, str] = {}
    for ep, distribution in entry_points_with_dist("airflow.plugins"):
        log.debug("Importing entry_point plugin %s", ep.name)
        try:
            candidate = ep.load()
            # validate() runs inside is_valid_plugin and may raise, so it stays in the try.
            if is_valid_plugin(candidate):
                instance: AirflowPlugin = candidate()
                instance.source = EntryPointSource(ep, distribution)
                loaded.append(instance)
        except Exception as err:
            log.exception("Failed to import plugin %s", ep.name)
            errors[ep.module] = str(err)
    return loaded, errors

205 

206 

def _load_plugins_from_plugin_directory(
    plugins_folder: str,
    load_examples: bool = False,
    example_plugins_module: str | None = None,
    ignore_file_syntax: str = "glob",
) -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load and register Airflow Plugins from plugins directory.

    Every ``.py`` file that ``find_path_from_directory`` yields for
    ``plugins_folder`` is executed as a module, honouring ``.airflowignore``.
    Each class in that module accepted by :func:`is_valid_plugin` is
    instantiated and tagged with a :class:`PluginsDirectorySource`.

    :param plugins_folder: directory to scan; must be non-empty.
    :param load_examples: also scan the package directory of ``example_plugins_module``.
    :param example_plugins_module: importable package holding example plugins;
        required when ``load_examples`` is true.
    :param ignore_file_syntax: syntax of the ``.airflowignore`` file in
        ``plugins_folder``, forwarded to ``find_path_from_directory``.
    :return: ``(plugins, import_errors)``, where ``import_errors`` maps each
        failing file path to the stringified exception.
    :raises ValueError: if ``plugins_folder`` is empty, or if ``load_examples``
        is set without ``example_plugins_module``.
    """
    from ..module_loading import find_path_from_directory

    if not plugins_folder:
        raise ValueError("Plugins folder is not set")
    log.debug("Loading plugins from directory: %s", plugins_folder)
    files = find_path_from_directory(plugins_folder, ".airflowignore", ignore_file_syntax)
    plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)]

    if load_examples:
        if not example_plugins_module:
            raise ValueError("example_plugins_module is required when load_examples is True")
        log.debug("Note: Loading plugins from examples as well: %s", example_plugins_module)
        example_plugins = importlib.import_module(example_plugins_module)
        example_plugins_folder = next(iter(example_plugins.__path__))
        # NOTE(review): ignore_file_syntax is not forwarded here, so the examples
        # folder uses find_path_from_directory's default syntax — confirm intended.
        example_files = find_path_from_directory(example_plugins_folder, ".airflowignore")
        # Example modules are registered under the example package's dotted name.
        plugin_search_locations.append((example_plugins.__name__, example_files))

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    for module_prefix, plugin_files in plugin_search_locations:
        for file_path in plugin_files:
            path = Path(file_path)
            if not path.is_file() or path.suffix != ".py":
                continue
            mod_name = f"{module_prefix}.{path.stem}" if module_prefix else path.stem

            try:
                loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
                spec = importlib.util.spec_from_loader(mod_name, loader)
                if not spec:
                    log.error("Could not load spec for module %s at %s", mod_name, file_path)
                    continue
                mod = importlib.util.module_from_spec(spec)
                # The module is registered before its code runs, as in the importlib
                # "import a source file directly" recipe.
                sys.modules[spec.name] = mod
                try:
                    loader.exec_module(mod)
                except BaseException:
                    # Follow the import-system loader contract: if executing the
                    # module fails, do not leave the half-initialised module in
                    # sys.modules. The error is re-raised and recorded below.
                    if sys.modules.get(spec.name) is mod:
                        del sys.modules[spec.name]
                    raise

                for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
                    plugin_instance: AirflowPlugin = mod_attr_value()
                    # NOTE(review): example plugin paths are also made relative to
                    # plugins_folder, which may yield "../"-style paths.
                    plugin_instance.source = PluginsDirectorySource(file_path, plugins_folder)
                    plugins.append(plugin_instance)
            except Exception as e:
                log.exception("Failed to import plugin %s", file_path)
                import_errors[file_path] = str(e)
    return plugins, import_errors

258 

259 

260def make_module(name: str, objects: list[Any]) -> ModuleType | None: 

261 """Create new module.""" 

262 if not objects: 

263 return None 

264 log.debug("Creating module %s", name) 

265 name = name.lower() 

266 module = types.ModuleType(name) 

267 module._name = name.split(".")[-1] # type: ignore 

268 module._objects = objects # type: ignore 

269 module.__dict__.update((o.__name__, o) for o in objects) 

270 return module 

271 

272 

def integrate_macros_plugins(
    target_macros_module: ModuleType, macros_module_name_prefix: str, plugins: list[AirflowPlugin]
) -> None:
    """
    Attach each plugin's macros to ``target_macros_module`` as a submodule.

    For every plugin that defines macros, a module named
    ``<macros_module_name_prefix>.<plugin name>`` (lower-cased by
    :func:`make_module`) is created. It is registered in ``sys.modules`` and set
    as an attribute of ``target_macros_module`` under the plugin's name, so
    templates can use ``{{ macros.plugin_name.macro_func() }}``. Plugins without
    macros are skipped.

    :raises AirflowPluginException: if a plugin's ``name`` is ``None``.
    """
    log.debug("Integrate Macros plugins")

    for plugin in plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        submodule = make_module(f"{macros_module_name_prefix}.{plugin.name}", plugin.macros)
        if not submodule:
            # make_module returns None when the plugin has no macros.
            continue

        sys.modules[submodule.__name__] = submodule
        # Expose the submodule on the macros module used when rendering templates.
        setattr(target_macros_module, plugin.name, submodule)

296 

297 

def integrate_listener_plugins(listener_manager: ListenerManager, plugins: list[AirflowPlugin]) -> None:
    """
    Register every plugin's listeners with ``listener_manager``.

    Listeners are passed to ``listener_manager.add_listener`` one by one, in
    plugin order and then in list order.

    :raises AirflowPluginException: if a plugin's ``name`` is ``None``; plugins
        before it have already been registered at that point.
    """
    for candidate in plugins:
        if candidate.name is None:
            raise AirflowPluginException("Invalid plugin name")
        plugin_listeners = candidate.listeners
        for listener_obj in plugin_listeners:
            listener_manager.add_listener(listener_obj)