Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/plugins_manager.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

335 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Manages all plugins.""" 

19 

20from __future__ import annotations 

21 

22import importlib.machinery 

23import importlib.util 

24import inspect 

25import logging 

26import os 

27import sys 

28import types 

29from collections.abc import Iterable 

30from functools import cache 

31from pathlib import Path 

32from typing import TYPE_CHECKING, Any 

33 

34from airflow import settings 

35from airflow._shared.module_loading import import_string, qualname 

36from airflow.configuration import conf 

37from airflow.task.priority_strategy import ( 

38 PriorityWeightStrategy, 

39 airflow_priority_weight_strategies, 

40) 

41from airflow.utils.entry_points import entry_points_with_dist 

42from airflow.utils.file import find_path_from_directory 

43 

44if TYPE_CHECKING: 

45 from airflow.lineage.hook import HookLineageReader 

46 

47 if sys.version_info >= (3, 12): 

48 from importlib import metadata 

49 else: 

50 import importlib_metadata as metadata 

51 from collections.abc import Generator 

52 from types import ModuleType 

53 

54 from airflow.listeners.listener import ListenerManager 

55 from airflow.timetables.base import Timetable 

56 

57log = logging.getLogger(__name__) 

58 

59 

class AirflowPluginSource:
    """
    Base type describing where a plugin came from.

    Concrete sources must provide both a plain-text (``__str__``) and a
    markup (``__html__``) rendering; the base class implements neither.
    """

    def __str__(self):
        # Plain-text rendering is left to the concrete source.
        raise NotImplementedError()

    def __html__(self):
        # Markup rendering is left to the concrete source.
        raise NotImplementedError()

68 

69 

class PluginsDirectorySource(AirflowPluginSource):
    """Source descriptor for a plugin loaded from a file, shown relative to the plugins folder."""

    def __init__(self, path):
        # Store the location relative to the configured plugins folder.
        relative_location = os.path.relpath(path, settings.PLUGINS_FOLDER)
        self.path = relative_location

    def __str__(self):
        # Plain-text form, prefixed with a symbolic folder name.
        text = f"$PLUGINS_FOLDER/{self.path}"
        return text

    def __html__(self):
        # Markup form: the symbolic folder prefix is emphasised.
        markup = f"<em>$PLUGINS_FOLDER/</em>{self.path}"
        return markup

81 

82 

class EntryPointSource(AirflowPluginSource):
    """Source descriptor for a plugin that was loaded through a package entry point."""

    def __init__(self, entrypoint: metadata.EntryPoint, dist: metadata.Distribution):
        # Keep only plain values: distribution name, its version and the
        # string form of the entry point.
        self.dist = dist.metadata["Name"]  # type: ignore[index]
        self.version = dist.version
        self.entrypoint = str(entrypoint)

    def __str__(self):
        # Plain-text form: "<distribution>==<version>: <entry point>".
        text = f"{self.dist}=={self.version}: {self.entrypoint}"
        return text

    def __html__(self):
        # Markup form: the distribution/version prefix is emphasised.
        markup = f"<em>{self.dist}=={self.version}:</em> {self.entrypoint}"
        return markup

96 

97 

class AirflowPluginException(Exception):
    """Error raised for a problem with an Airflow plugin, e.g. a plugin without a name."""

100 

101 

class AirflowPlugin:
    """
    Base class for Airflow plugins.

    A plugin is declared by subclassing this class, giving it a ``name`` and
    overriding whichever extension-point attributes it contributes to.
    """

    # Identifier of the plugin; ``validate`` rejects plugins without one.
    name: str | None = None
    # Where the plugin was loaded from; assigned by the loaders in this module.
    source: AirflowPluginSource | None = None

    # Extension points. Every one defaults to an empty list.
    macros: list[Any] = []
    admin_views: list[Any] = []
    flask_blueprints: list[Any] = []
    fastapi_apps: list[Any] = []
    fastapi_root_middlewares: list[Any] = []
    external_views: list[Any] = []
    react_apps: list[Any] = []
    menu_links: list[Any] = []
    appbuilder_views: list[Any] = []
    appbuilder_menu_items: list[Any] = []

    # Global operator extra links that can redirect users to external
    # systems. They are shown as buttons on the task page.
    #
    # Note: a global operator extra link can be overridden at the level of
    # an individual operator.
    global_operator_extra_links: list[Any] = []

    # Operator extra links that override or extend the links of existing
    # Airflow operators. They are shown as buttons on the task page.
    operator_extra_links: list[Any] = []

    # Timetable classes that can be used for DAG scheduling.
    timetables: list[type[Timetable]] = []

    # Listeners that can be used for tracking task and DAG states.
    listeners: list[ModuleType | object] = []

    # Hook lineage reader classes used for reading lineage information from a hook.
    hook_lineage_readers: list[type[HookLineageReader]] = []

    # Priority weight strategy classes used for calculating task weight priority.
    priority_weight_strategies: list[type[PriorityWeightStrategy]] = []

    @classmethod
    def validate(cls):
        """
        Check that the plugin declares a name.

        :raises AirflowPluginException: if ``name`` is empty or ``None``.
        """
        if cls.name:
            return
        raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Execute when the plugin is loaded; This method is only called once during runtime.

        The base implementation does nothing; subclasses may override it.

        :param args: If future arguments are passed in on call.
        :param kwargs: If future arguments are passed in on call.
        """

158 

159 

def is_valid_plugin(plugin_obj) -> bool:
    """
    Tell whether an object is a usable plugin class.

    An object qualifies when it is a class that derives from ``AirflowPlugin``
    without being ``AirflowPlugin`` itself. Qualifying classes are also passed
    through their own ``validate()``, whose exceptions propagate to the caller.

    :param plugin_obj: potential subclass of AirflowPlugin
    :return: ``True`` for a proper ``AirflowPlugin`` subclass, ``False`` otherwise
    """
    if not inspect.isclass(plugin_obj):
        return False
    if not issubclass(plugin_obj, AirflowPlugin) or plugin_obj is AirflowPlugin:
        return False
    plugin_obj.validate()
    return True

176 

177 

def _load_entrypoint_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load plugins advertised through the ``airflow.plugins`` entry point group.

    Every entry point is loaded; objects accepted by ``is_valid_plugin`` are
    instantiated and tagged with an ``EntryPointSource``. Any exception raised
    while handling an entry point is logged and recorded instead of propagated.

    :return: the plugin instances, and a mapping of entry point module to the
        error message for each entry point that failed.
    """
    log.debug("Loading plugins from entrypoints")

    loaded: list[AirflowPlugin] = []
    failures: dict[str, str] = {}
    for entry_point, dist in entry_points_with_dist("airflow.plugins"):
        log.debug("Importing entry_point plugin %s", entry_point.name)
        try:
            candidate = entry_point.load()
            if is_valid_plugin(candidate):
                instance: AirflowPlugin = candidate()
                instance.source = EntryPointSource(entry_point, dist)
                loaded.append(instance)
        except Exception as err:
            log.exception("Failed to import plugin %s", entry_point.name)
            failures[entry_point.module] = str(err)
    return loaded, failures

202 

203 

def _load_plugins_from_plugin_directory() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Import every ``.py`` file of the plugins folder and collect the plugins it defines.

    When ``[core] LOAD_EXAMPLES`` is enabled, the bundled example plugins package
    is scanned as well. Files are enumerated with ``find_path_from_directory`` and
    ``.airflowignore``. Any exception raised while importing a file or
    instantiating its plugins is logged and recorded instead of propagated.

    :return: the plugin instances, and a mapping of file path to error message.
    :raises ValueError: if ``settings.PLUGINS_FOLDER`` is ``None``.
    """
    if settings.PLUGINS_FOLDER is None:
        raise ValueError("Plugins folder is not set")
    log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)

    # Each entry pairs a module-name prefix with the files found under one root.
    search_roots: list[tuple[str, Generator[str, None, None]]] = [
        ("", find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore")),
    ]

    if conf.getboolean("core", "LOAD_EXAMPLES"):
        log.debug("Note: Loading plugins from examples as well: %s", settings.PLUGINS_FOLDER)
        from airflow.example_dags import plugins as example_plugins

        examples_dir = next(iter(example_plugins.__path__))
        search_roots.append(
            (example_plugins.__name__, find_path_from_directory(examples_dir, ".airflowignore"))
        )

    found: list[AirflowPlugin] = []
    failures: dict[str, str] = {}
    for prefix, candidate_files in search_roots:
        for file_path in candidate_files:
            candidate = Path(file_path)
            if not (candidate.is_file() and candidate.suffix == ".py"):
                continue

            # Example plugins are namespaced under their package; others use the bare stem.
            if prefix:
                mod_name = f"{prefix}.{candidate.stem}"
            else:
                mod_name = candidate.stem

            try:
                loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
                spec = importlib.util.spec_from_loader(mod_name, loader)
                if not spec:
                    log.error("Could not load spec for module %s at %s", mod_name, file_path)
                    continue
                module = importlib.util.module_from_spec(spec)
                # The module is registered before its code is executed.
                sys.modules[spec.name] = module
                loader.exec_module(module)

                for attr_value in module.__dict__.values():
                    if not is_valid_plugin(attr_value):
                        continue
                    instance: AirflowPlugin = attr_value()
                    instance.source = PluginsDirectorySource(file_path)
                    found.append(instance)
            except Exception as err:
                log.exception("Failed to import plugin %s", file_path)
                failures[file_path] = str(err)
    return found, failures

247 

248 

def _load_providers_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load the plugins declared by installed providers.

    The class path of every plugin known to ``ProvidersManager`` is imported and
    kept when ``is_valid_plugin`` accepts it. The imported class itself, not an
    instance, is what gets collected.

    :return: the collected plugin classes, and a mapping of plugin class path to
        error message for each plugin that could not be imported.
    """
    from airflow.providers_manager import ProvidersManager

    log.debug("Loading plugins from providers")
    providers_manager = ProvidersManager()
    providers_manager.initialize_providers_plugins()

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    for plugin in providers_manager.plugins:
        log.debug("Importing plugin %s from class %s", plugin.name, plugin.plugin_class)

        try:
            plugin_instance = import_string(plugin.plugin_class)
            if is_valid_plugin(plugin_instance):
                plugins.append(plugin_instance)
            else:
                log.warning("Plugin %s is not a valid plugin", plugin.name)
        except ImportError as e:
            log.exception("Failed to load plugin %s from class name %s", plugin.name, plugin.plugin_class)
            # Record the failure so it is returned to the caller like the errors
            # of the other loaders, instead of only appearing in the log.
            import_errors[plugin.plugin_class] = str(e)
    return plugins, import_errors

270 

271 

def make_module(name: str, objects: list[Any]) -> ModuleType | None:
    """
    Build a synthetic module that exposes the given objects as attributes.

    :param name: dotted module name; it is lower-cased before use.
    :param objects: objects to expose, each under its own ``__name__``.
    :return: the new module, or ``None`` when ``objects`` is empty.
    """
    if objects:
        log.debug("Creating module %s", name)
        lowered = name.lower()
        module = types.ModuleType(lowered)
        # Last dotted component of the module name.
        module._name = lowered.rsplit(".", 1)[-1]  # type: ignore
        module._objects = objects  # type: ignore
        namespace = module.__dict__
        for obj in objects:
            namespace[obj.__name__] = obj
        return module
    return None

283 

284 

def ensure_plugins_loaded() -> None:
    """
    Make sure plugins have been loaded.

    Delegates to the cached ``_get_plugins`` and discards its result, so
    repeated calls do not load the plugins again.
    """
    _get_plugins()
    return None

292 

293 

@cache
def _get_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
    """
    Load plugins from the plugins directory, entry points and, optionally, providers.

    The result is cached with ``functools.cache``, so plugins are only loaded if
    they have not been previously loaded. Provider plugins are skipped when
    ``settings.LAZY_LOAD_PROVIDERS`` is set. When several plugins share a name,
    only the first one encountered is kept.

    :return: the registered plugins, and a mapping of plugin source (or file,
        module or name) to the error message of each failure.
    :raises ValueError: if ``settings.PLUGINS_FOLDER`` is not set.
    """
    from airflow.observability.stats import Stats

    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    plugins: list[AirflowPlugin] = []
    import_errors: dict[str, str] = {}
    loaded_plugins: set[str | None] = set()

    def __register_plugins(plugin_instances: list[AirflowPlugin], errors: dict[str, str]) -> None:
        """Run ``on_load`` for each new plugin and merge the loader's errors."""
        for plugin_instance in plugin_instances:
            if plugin_instance.name in loaded_plugins:
                # Skip only the duplicate. Returning here would drop every
                # remaining plugin of this batch and lose the loader's errors.
                continue

            loaded_plugins.add(plugin_instance.name)
            try:
                plugin_instance.on_load()
                plugins.append(plugin_instance)
            except Exception as e:
                log.exception("Failed to load plugin %s", plugin_instance.name)
                name = str(plugin_instance.source) if plugin_instance.source else plugin_instance.name or ""
                import_errors[name] = str(e)
        import_errors.update(errors)

    with Stats.timer() as timer:
        __register_plugins(*_load_plugins_from_plugin_directory())
        __register_plugins(*_load_entrypoint_plugins())

        if not settings.LAZY_LOAD_PROVIDERS:
            __register_plugins(*_load_providers_plugins())

    log.debug("Loading %d plugin(s) took %.2f seconds", len(plugins), timer.duration)
    return plugins, import_errors

336 

337 

@cache
def _get_ui_plugins() -> tuple[list[Any], list[Any]]:
    """
    Collect the external views and React apps contributed by the loaded plugins.

    An entry that is not a dictionary, or whose ``url_route`` is already claimed
    by an earlier entry (external view or React app, of any plugin), is reported
    with a warning, left out of the result and removed from the owning plugin's
    list. An entry without a ``url_route`` cannot conflict and is always kept.

    :return: ``(external_views, react_apps)`` aggregated over all plugins.
    """
    log.debug("Initialize UI plugin")

    # url_route -> name of the plugin that claimed it first.
    seen_url_routes: dict[str, str | None] = {}

    external_views: list[Any] = []
    react_apps: list[Any] = []
    for plugin in _get_plugins()[0]:
        external_views_to_remove = []
        react_apps_to_remove = []
        for external_view in plugin.external_views:
            if not isinstance(external_view, dict):
                log.warning(
                    "Plugin '%s' has an external view that is not a dictionary. The view will not be loaded.",
                    plugin.name,
                )
                external_views_to_remove.append(external_view)
                continue
            url_route = external_view.get("url_route")
            if url_route is not None:
                if url_route in seen_url_routes:
                    log.warning(
                        "Plugin '%s' has an external view with an URL route '%s' "
                        "that conflicts with another plugin '%s'. The view will not be loaded.",
                        plugin.name,
                        url_route,
                        seen_url_routes[url_route],
                    )
                    external_views_to_remove.append(external_view)
                    continue
                seen_url_routes[url_route] = plugin.name
            # Route-less views are valid as well: they stay on the plugin, so they
            # must also be part of the aggregated result.
            external_views.append(external_view)

        for react_app in plugin.react_apps:
            if not isinstance(react_app, dict):
                log.warning(
                    "Plugin '%s' has a React App that is not a dictionary. The React App will not be loaded.",
                    plugin.name,
                )
                react_apps_to_remove.append(react_app)
                continue
            url_route = react_app.get("url_route")
            if url_route is not None:
                if url_route in seen_url_routes:
                    log.warning(
                        "Plugin '%s' has a React App with an URL route '%s' "
                        "that conflicts with another plugin '%s'. The React App will not be loaded.",
                        plugin.name,
                        url_route,
                        seen_url_routes[url_route],
                    )
                    react_apps_to_remove.append(react_app)
                    continue
                seen_url_routes[url_route] = plugin.name
            # Same reasoning as for external views: keep route-less apps.
            react_apps.append(react_app)

        # Prune rejected entries from the plugin itself, after iteration is done,
        # so code reading the plugin's attributes sees the same set of entries.
        for item in external_views_to_remove:
            plugin.external_views.remove(item)
        for item in react_apps_to_remove:
            plugin.react_apps.remove(item)
    return external_views, react_apps

403 

404 

@cache
def get_flask_plugins() -> tuple[list[Any], list[Any], list[Any]]:
    """
    Gather the Flask extension points of all loaded plugins (legacy web UI).

    A warning is logged for a plugin that declares ``admin_views`` without
    ``appbuilder_views``, or ``menu_links`` without ``appbuilder_menu_items``.

    :return: ``(flask_blueprints, appbuilder_views, appbuilder_menu_items)``, where
        each blueprint is wrapped in a dict holding the owning plugin's name.
    """
    log.debug("Initialize legacy Web UI plugin")

    views: list[Any] = []
    menu_links: list[Any] = []
    blueprints: list[Any] = []
    for plugin in _get_plugins()[0]:
        views += plugin.appbuilder_views
        menu_links += plugin.appbuilder_menu_items
        for bp in plugin.flask_blueprints:
            blueprints.append({"name": plugin.name, "blueprint": bp})

        only_legacy_views = plugin.admin_views and not plugin.appbuilder_views
        if only_legacy_views or (plugin.menu_links and not plugin.appbuilder_menu_items):
            log.warning(
                "Plugin '%s' may not be compatible with the current Airflow version. "
                "Please contact the author of the plugin.",
                plugin.name,
            )
    return blueprints, views, menu_links

427 

428 

@cache
def get_fastapi_plugins() -> tuple[list[Any], list[Any]]:
    """
    Gather the FastAPI extension points of all loaded plugins.

    :return: ``(fastapi_apps, fastapi_root_middlewares)`` concatenated in plugin order.
    """
    log.debug("Initialize FastAPI plugins")

    apps: list[Any] = []
    root_middlewares: list[Any] = []
    for plugin in _get_plugins()[0]:
        apps += plugin.fastapi_apps
        root_middlewares += plugin.fastapi_root_middlewares
    return apps, root_middlewares

440 

441 

@cache
def _get_extra_operators_links_plugins() -> tuple[list[Any], list[Any]]:
    """
    Gather the operator extra links declared by all loaded plugins.

    :return: ``(global_operator_extra_links, operator_extra_links)`` concatenated
        in plugin order.
    """
    log.debug("Initialize extra operators links plugins")

    global_links: list[Any] = []
    operator_links: list[Any] = []
    for plugin in _get_plugins()[0]:
        global_links += plugin.global_operator_extra_links
        operator_links += list(plugin.operator_extra_links)
    return global_links, operator_links

453 

454 

def get_global_operator_extra_links() -> list[Any]:
    """Return the global operator extra links contributed by plugins."""
    global_links, _ = _get_extra_operators_links_plugins()
    return global_links

458 

459 

def get_operator_extra_links() -> list[Any]:
    """Return the per-operator extra links contributed by plugins."""
    _, operator_links = _get_extra_operators_links_plugins()
    return operator_links

463 

464 

@cache
def get_timetables_plugins() -> dict[str, type[Timetable]]:
    """
    Gather the timetable classes declared by all loaded plugins.

    :return: mapping of each class's ``qualname`` to the class; when two classes
        share a qualified name, the one seen last wins.
    """
    log.debug("Initialize extra timetables plugins")

    registry: dict[str, type[Timetable]] = {}
    for plugin in _get_plugins()[0]:
        for timetable_class in plugin.timetables:
            registry[qualname(timetable_class)] = timetable_class
    return registry

475 

476 

@cache
def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
    """
    Gather the hook lineage reader classes declared by all loaded plugins.

    :return: the reader classes, concatenated in plugin order.
    """
    log.debug("Initialize hook lineage readers plugins")
    return [reader for plugin in _get_plugins()[0] for reader in plugin.hook_lineage_readers]

486 

487 

@cache
def integrate_macros_plugins() -> None:
    """
    Expose the macros of every loaded plugin through the macros package.

    For each plugin that declares macros, a synthetic module is created, added to
    ``sys.modules`` and set as an attribute named after the plugin on
    ``airflow.sdk.execution_time.macros``.

    :raises AirflowPluginException: if a plugin has no name.
    """
    from airflow.sdk.execution_time import macros

    log.debug("Integrate Macros plugins")

    for plugin in _get_plugins()[0]:
        plugin_name = plugin.name
        if plugin_name is None:
            raise AirflowPluginException("Invalid plugin name")

        module = make_module(f"airflow.sdk.execution_time.macros.{plugin_name}", plugin.macros)
        if not module:
            # Nothing to register for a plugin without macros.
            continue

        sys.modules[module.__name__] = module
        # Attach the new module to the macros package so that it can be
        # accessed when rendering templates.
        setattr(macros, plugin_name, module)

506 

507 

def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """
    Register the listeners of every loaded plugin with the given manager.

    :param listener_manager: manager whose ``add_listener`` is called once per listener.
    :raises AirflowPluginException: if a plugin has no name; listeners of the
        plugins processed before it have already been added at that point.
    """
    loaded_plugins, _ = _get_plugins()
    for plugin in loaded_plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        for plugin_listener in plugin.listeners:
            listener_manager.add_listener(plugin_listener)

516 

517 

def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
    """
    Dump plugins attributes.

    Most attributes are rendered in a printable form (qualified names or short
    descriptions) rather than returned as live objects.

    :param attrs_to_dump: plugin attributes to dump; when empty or ``None`` a
        default set of attributes is used. Any iterable is accepted, including
        a one-shot iterator.
    :return: one dict per loaded plugin, holding its ``name`` and the requested attributes.
    """
    # Run the collectors first. ``_get_ui_plugins`` in particular prunes invalid
    # entries from each plugin's ``external_views`` / ``react_apps``, which are read below.
    get_flask_plugins()
    get_fastapi_plugins()
    get_global_operator_extra_links()
    get_operator_extra_links()
    _get_ui_plugins()
    if not attrs_to_dump:
        attrs_to_dump = {
            "macros",
            "admin_views",
            "flask_blueprints",
            "fastapi_apps",
            "fastapi_root_middlewares",
            "external_views",
            "react_apps",
            "menu_links",
            "appbuilder_views",
            "appbuilder_menu_items",
            "global_operator_extra_links",
            "operator_extra_links",
            "source",
            "timetables",
            "listeners",
            "priority_weight_strategies",
        }
    # Materialise once: the attributes are iterated for every plugin, and a
    # one-shot iterator would be exhausted after the first one.
    attrs = list(attrs_to_dump)

    plugins_info = []
    for plugin in _get_plugins()[0]:
        info: dict[str, Any] = {"name": plugin.name}
        for attr in attrs:
            if attr in ("global_operator_extra_links", "operator_extra_links"):
                info[attr] = [f"<{qualname(d.__class__)} object>" for d in getattr(plugin, attr)]
            elif attr in ("macros", "timetables", "priority_weight_strategies"):
                info[attr] = [qualname(d) for d in getattr(plugin, attr)]
            elif attr == "listeners":
                # listeners may be modules or class instances
                info[attr] = [d.__name__ if inspect.ismodule(d) else qualname(d) for d in plugin.listeners]
            elif attr == "appbuilder_views":
                info[attr] = [
                    {**d, "view": qualname(d["view"].__class__) if "view" in d else None}
                    for d in plugin.appbuilder_views
                ]
            elif attr == "flask_blueprints":
                info[attr] = [
                    f"<{qualname(d.__class__)}: name={d.name!r} import_name={d.import_name!r}>"
                    for d in plugin.flask_blueprints
                ]
            elif attr == "fastapi_apps":
                info[attr] = [
                    {**d, "app": qualname(d["app"].__class__) if "app" in d else None}
                    for d in plugin.fastapi_apps
                ]
            elif attr == "fastapi_root_middlewares":
                # remove args and kwargs from plugin info to hide potentially sensitive info.
                info[attr] = [
                    {
                        k: (v if k != "middleware" else qualname(middleware_dict["middleware"]))
                        for k, v in middleware_dict.items()
                        if k not in ("args", "kwargs")
                    }
                    for middleware_dict in plugin.fastapi_root_middlewares
                ]
            else:
                # Everything else is dumped as-is.
                info[attr] = getattr(plugin, attr)
        plugins_info.append(info)
    return plugins_info

588 

589 

@cache
def get_priority_weight_strategy_plugins() -> dict[str, type[PriorityWeightStrategy]]:
    """
    Gather the priority weight strategy classes, built-in and plugin-provided.

    :return: mapping of qualified class name to class. It starts from
        ``airflow_priority_weight_strategies``; a plugin class with the same
        qualified name replaces the built-in entry.
    """
    log.debug("Initialize extra priority weight strategy plugins")

    from_plugins: dict[str, type[PriorityWeightStrategy]] = {}
    for plugin in _get_plugins()[0]:
        for strategy_class in plugin.priority_weight_strategies:
            from_plugins[qualname(strategy_class)] = strategy_class

    combined = dict(airflow_priority_weight_strategies)
    combined.update(from_plugins)
    return combined

604 

605 

def get_import_errors() -> dict[str, str]:
    """Return the errors recorded while plugins were being loaded."""
    _, import_errors = _get_plugins()
    return import_errors