Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/plugins_manager.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

303 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Manages all plugins.""" 

19 

20from __future__ import annotations 

21 

22import importlib 

23import importlib.machinery 

24import importlib.util 

25import inspect 

26import logging 

27import os 

28import sys 

29import types 

30from pathlib import Path 

31from typing import TYPE_CHECKING, Any, Iterable 

32 

33from airflow import settings 

34from airflow.task.priority_strategy import ( 

35 PriorityWeightStrategy, 

36 airflow_priority_weight_strategies, 

37) 

38from airflow.utils.entry_points import entry_points_with_dist 

39from airflow.utils.file import find_path_from_directory 

40from airflow.utils.module_loading import import_string, qualname 

41 

42if TYPE_CHECKING: 

43 try: 

44 import importlib_metadata as metadata 

45 except ImportError: 

46 from importlib import metadata # type: ignore[no-redef] 

47 from types import ModuleType 

48 

49 from airflow.hooks.base import BaseHook 

50 from airflow.listeners.listener import ListenerManager 

51 from airflow.timetables.base import Timetable 

52 

log = logging.getLogger(__name__)

# Plugin import failures: keyed by plugin file path (plugins folder) or by
# entry-point module (entry points), mapped to the error message.
import_errors: dict[str, str] = {}

# Registered plugins; None until ensure_plugins_loaded() has run.
plugins: list[AirflowPlugin] | None = None
# Names of plugins already registered, used to skip duplicates.
loaded_plugins: set[str] = set()

# Every registry below stays None until the initialize_*/integrate_* function
# that owns it has populated it (those functions use None as "not done yet").

# Plugin components to integrate as modules
registered_hooks: list[BaseHook] | None = None
macros_modules: list[Any] | None = None
executors_modules: list[Any] | None = None

# Plugin components to integrate directly
admin_views: list[Any] | None = None
flask_blueprints: list[Any] | None = None
menu_links: list[Any] | None = None
flask_appbuilder_views: list[Any] | None = None
flask_appbuilder_menu_links: list[Any] | None = None
global_operator_extra_links: list[Any] | None = None
operator_extra_links: list[Any] | None = None
#: Mapping of class names to class of OperatorLinks registered by plugins.
#:
#: Used by the DAG serialization code to only allow specific classes to be created
#: during deserialization.
registered_operator_link_classes: dict[str, type] | None = None
#: Mapping of qualified class names to task-instance dependency classes from plugins.
registered_ti_dep_classes: dict[str, type] | None = None
#: Mapping of qualified class names to timetable classes registered by plugins.
timetable_classes: dict[str, type[Timetable]] | None = None
#: Built-in priority weight strategies merged with those registered by plugins.
priority_weight_strategy_classes: dict[str, type[PriorityWeightStrategy]] | None = None

# Default AirflowPlugin attributes reported by get_plugin_info().
PLUGINS_ATTRIBUTES_TO_DUMP = {
    "hooks",
    "executors",
    "macros",
    "admin_views",
    "flask_blueprints",
    "menu_links",
    "appbuilder_views",
    "appbuilder_menu_items",
    "global_operator_extra_links",
    "operator_extra_links",
    "source",
    "ti_deps",
    "timetables",
    "listeners",
    "priority_weight_strategies",
}

100 

101 

class AirflowPluginSource:
    """Base class describing where a plugin was loaded from; subclasses supply the renderings."""

    def __str__(self):
        """Plain-text rendering of the source; subclasses must override this."""
        raise NotImplementedError()

    def __html__(self):
        """HTML rendering of the source; subclasses must override this."""
        raise NotImplementedError()

110 

111 

class PluginsDirectorySource(AirflowPluginSource):
    """Source of a plugin that was loaded from a file inside the plugins folder."""

    def __init__(self, path):
        # Keep the location relative to settings.PLUGINS_FOLDER so it can be
        # rendered with a "$PLUGINS_FOLDER" placeholder.
        relative = os.path.relpath(path, settings.PLUGINS_FOLDER)
        self.path = relative

    def __str__(self):
        """Return ``$PLUGINS_FOLDER/<relative path>``."""
        return f"$PLUGINS_FOLDER/{self.path}"

    def __html__(self):
        """Return the same text with the placeholder prefix emphasised."""
        return f"<em>$PLUGINS_FOLDER/</em>{self.path}"

123 

124 

class EntryPointSource(AirflowPluginSource):
    """Source of a plugin that was loaded through a package entry point."""

    def __init__(self, entrypoint: metadata.EntryPoint, dist: metadata.Distribution):
        # Capture plain values up front: distribution name/version and the
        # string form of the entry point.
        self.dist = dist.metadata["Name"]
        self.version = dist.version
        self.entrypoint = str(entrypoint)

    def __str__(self):
        """Return ``<dist>==<version>: <entrypoint>``."""
        return f"{self.dist}=={self.version}: {self.entrypoint}"

    def __html__(self):
        """Return the same text with the distribution part emphasised."""
        return f"<em>{self.dist}=={self.version}:</em> {self.entrypoint}"

138 

139 

class AirflowPluginException(Exception):
    """Raised when plugins cannot be loaded or a plugin fails validation."""

142 

143 

class AirflowPlugin:
    """
    Base class that plugin authors subclass to contribute extensions to Airflow.

    A subclass must set ``name`` (see :meth:`validate`) and may override any of the
    list attributes below. The defaults are class-level lists shared by every
    subclass that does not override them, so assign a new list rather than
    mutating the inherited one.
    """

    name: str | None = None
    # Where the plugin came from; set by the directory and entry-point loaders.
    source: AirflowPluginSource | None = None
    hooks: list[Any] = []
    executors: list[Any] = []
    macros: list[Any] = []
    admin_views: list[Any] = []
    flask_blueprints: list[Any] = []
    menu_links: list[Any] = []
    appbuilder_views: list[Any] = []
    appbuilder_menu_items: list[Any] = []

    # Global operator extra links: buttons on the task page that redirect users
    # to external systems. Individual operators can override them.
    global_operator_extra_links: list[Any] = []

    # Operator extra links that add to, or override, the links of existing
    # Airflow operators; also rendered as buttons on the task page.
    operator_extra_links: list[Any] = []

    # Task instance dependency objects; their classes are registered by name.
    ti_deps: list[Any] = []

    # Timetable classes that can be used for DAG scheduling.
    timetables: list[type[Timetable]] = []

    # Listener modules or objects handed to the listener manager.
    listeners: list[ModuleType | object] = []

    # Priority weight strategy classes used to calculate task priority weights.
    priority_weight_strategies: list[type[PriorityWeightStrategy]] = []

    @classmethod
    def validate(cls):
        """Raise :class:`AirflowPluginException` unless the plugin declares a non-empty name."""
        if cls.name:
            return
        raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Hook run when the plugin is registered; the base implementation does nothing.

        Registration is de-duplicated by name, so this runs at most once per
        plugin name during runtime.

        :param args: Reserved for arguments that may be passed in future versions.
        :param kwargs: Reserved for keyword arguments that may be passed in future versions.
        """

196 

197 

def is_valid_plugin(plugin_obj):
    """
    Check whether a potential object is a subclass of the AirflowPlugin class.

    The object must be a proper subclass of :class:`AirflowPlugin` (not the base
    class itself) and must pass :meth:`AirflowPlugin.validate`. It must also not
    already be registered. ``plugins`` stores instances for plugins loaded from
    the plugins folder or from entry points, but stores the class itself for
    provider plugins, so both forms are checked.

    :param plugin_obj: potential subclass of AirflowPlugin
    :return: Whether or not the obj is a valid subclass of
        AirflowPlugin that has not been registered yet
    :raises AirflowPluginException: if the subclass does not define a name
    """
    if not (
        inspect.isclass(plugin_obj)
        and issubclass(plugin_obj, AirflowPlugin)
        and plugin_obj is not AirflowPlugin
    ):
        return False

    plugin_obj.validate()

    # Plugins not initialised yet (None) or none registered: nothing can collide.
    if not plugins:
        return True
    # Match the class itself or an instance of exactly this class. An exact type
    # check (not isinstance) keeps a registered subclass instance from hiding
    # its base class.
    return not any(p is plugin_obj or type(p) is plugin_obj for p in plugins)

216 

217 

def register_plugin(plugin_instance):
    """
    Register a plugin after running its ``on_load`` hook.

    Plugins are de-duplicated by ``name``: a plugin whose name was already seen
    is ignored. The name is recorded before ``on_load`` runs, so a plugin whose
    ``on_load`` raises is still considered seen.

    :param plugin_instance: subclass of AirflowPlugin
    """
    plugin_name = plugin_instance.name
    if plugin_name not in loaded_plugins:
        loaded_plugins.add(plugin_name)
        plugin_instance.on_load()
        plugins.append(plugin_instance)

234 

235 

def load_entrypoint_plugins():
    """
    Discover plugins advertised through package entry points and register them.

    Only the ``airflow.plugins`` entry-point group is scanned. Each valid
    AirflowPlugin subclass is instantiated, tagged with an EntryPointSource and
    registered. Any failure is logged and stored in ``import_errors`` under the
    entry point's module.
    """
    log.debug("Loading plugins from entrypoints")

    for entry_point, dist in entry_points_with_dist("airflow.plugins"):
        log.debug("Importing entry_point plugin %s", entry_point.name)
        try:
            candidate = entry_point.load()
            if is_valid_plugin(candidate):
                instance = candidate()
                instance.source = EntryPointSource(entry_point, dist)
                register_plugin(instance)
        except Exception as err:
            log.exception("Failed to import plugin %s", entry_point.name)
            import_errors[entry_point.module] = str(err)

259 

260 

def load_plugins_from_plugin_directory():
    """
    Load and register Airflow Plugins from plugins directory.

    Every ``.py`` file under ``settings.PLUGINS_FOLDER`` that is not excluded by
    ``.airflowignore`` is executed as a module named after the file stem and
    stored in ``sys.modules``. Each valid AirflowPlugin subclass found in the
    module namespace is instantiated, tagged with a PluginsDirectorySource and
    registered. Failures are logged and stored in ``import_errors`` keyed by
    file path.
    """
    log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)

    for file_path in find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore"):
        path = Path(file_path)
        if not path.is_file() or path.suffix != ".py":
            continue
        mod_name = path.stem

        try:
            loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
            spec = importlib.util.spec_from_loader(mod_name, loader)
            mod = importlib.util.module_from_spec(spec)
            log.debug("Importing plugin module %s", file_path)

            # Register before executing (as the import system does) so the module
            # can import itself. Remember what was there so a failed execution
            # does not leave a half-initialised module shadowing it.
            had_previous = spec.name in sys.modules
            previous = sys.modules.get(spec.name)
            sys.modules[spec.name] = mod
            try:
                loader.exec_module(mod)
            except BaseException:
                if had_previous:
                    sys.modules[spec.name] = previous
                else:
                    sys.modules.pop(spec.name, None)
                raise

            for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
                plugin_instance = mod_attr_value()
                plugin_instance.source = PluginsDirectorySource(file_path)
                register_plugin(plugin_instance)
        except Exception as e:
            log.exception("Failed to import plugin %s", file_path)
            import_errors[file_path] = str(e)

287 

288 

def load_providers_plugins():
    """
    Load and register plugins declared by installed providers.

    Plugin classes are discovered via ``ProvidersManager`` and imported by their
    dotted class path. Unlike plugins from the plugins folder or entry points,
    the class itself (not an instance) is registered. A class that fails to
    import, or fails validation (for example because it has no ``name``), is
    logged and skipped. This way one broken provider plugin does not abort
    loading of the rest, which matches the other loaders.
    """
    from airflow.providers_manager import ProvidersManager

    log.debug("Loading plugins from providers")
    providers_manager = ProvidersManager()
    providers_manager.initialize_providers_plugins()
    for plugin in providers_manager.plugins:
        log.debug("Importing plugin %s from class %s", plugin.name, plugin.plugin_class)

        try:
            plugin_class = import_string(plugin.plugin_class)
            # is_valid_plugin() -> validate() raises AirflowPluginException when
            # the plugin has no name; previously that escaped this loop.
            if is_valid_plugin(plugin_class):
                register_plugin(plugin_class)
            else:
                log.warning("Plugin %s is not a valid plugin", plugin.name)
        except (ImportError, AirflowPluginException):
            log.exception("Failed to load plugin %s from class name %s", plugin.name, plugin.plugin_class)

306 

307 

def make_module(name: str, objects: list[Any]):
    """
    Build an in-memory module that exposes ``objects`` by their ``__name__``.

    The module name is lower-cased. The last dotted component is stored as
    ``_name`` and the original list as ``_objects``.

    :param name: dotted module name to create.
    :param objects: objects to expose as module attributes.
    :return: the new module, or None when ``objects`` is empty.
    """
    if objects:
        log.debug("Creating module %s", name)
        lowered = name.lower()
        module = types.ModuleType(lowered)
        module._name = lowered.rsplit(".", 1)[-1]  # type: ignore
        module._objects = objects  # type: ignore
        namespace = module.__dict__
        for obj in objects:
            namespace[obj.__name__] = obj
        return module
    return None

319 

320 

def ensure_plugins_loaded():
    """
    Load plugins once per process and collect the hooks they declare.

    Plugins come from the plugins folder, then from ``airflow.plugins`` entry
    points, then from providers (unless ``settings.LAZY_LOAD_PROVIDERS`` is set).
    Later calls return immediately once ``plugins`` is populated.

    :raises ValueError: if ``settings.PLUGINS_FOLDER`` is not configured.
    """
    from airflow.stats import Stats

    global plugins, registered_hooks

    if plugins is not None:
        log.debug("Plugins are already loaded. Skipping.")
        return

    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    with Stats.timer() as timer:
        plugins = []
        registered_hooks = []

        load_plugins_from_plugin_directory()
        load_entrypoint_plugins()
        if not settings.LAZY_LOAD_PROVIDERS:
            load_providers_plugins()

        # Hooks are only tracked for now, with the aim of integrating them into
        # the UI's Connection screens.
        registered_hooks.extend(hook for plugin in plugins for hook in plugin.hooks)

    # timer.duration is read after the timer context has exited.
    if plugins:
        log.debug("Loading %d plugin(s) took %.2f seconds", len(plugins), timer.duration)

357 

358 

def initialize_web_ui_plugins():
    """
    Gather the web UI extension points contributed by plugins.

    Fills ``flask_appbuilder_views``, ``flask_appbuilder_menu_links`` and
    ``flask_blueprints``; each blueprint is wrapped as
    ``{"name": plugin name, "blueprint": bp}``. A warning is logged for plugins
    that define ``admin_views``/``menu_links`` without the appbuilder
    counterparts. Does nothing once all three registries are populated.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global flask_blueprints, flask_appbuilder_views, flask_appbuilder_menu_links

    registries = (flask_blueprints, flask_appbuilder_views, flask_appbuilder_menu_links)
    if all(registry is not None for registry in registries):
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize Web UI plugin")

    flask_blueprints = []
    flask_appbuilder_views = []
    flask_appbuilder_menu_links = []

    for plugin in plugins:
        flask_appbuilder_views.extend(plugin.appbuilder_views)
        flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
        flask_blueprints.extend({"name": plugin.name, "blueprint": bp} for bp in plugin.flask_blueprints)

        admin_only = plugin.admin_views and not plugin.appbuilder_views
        menu_only = plugin.menu_links and not plugin.appbuilder_menu_items
        if admin_only or menu_only:
            log.warning(
                "Plugin '%s' may not be compatible with the current Airflow version. "
                "Please contact the author of the plugin.",
                plugin.name,
            )

397 

398 

def initialize_ti_deps_plugins():
    """
    Collect the task-instance dependency classes contributed by plugins.

    Populates ``registered_ti_dep_classes`` with ``qualname -> class`` for the
    class of every object in each plugin's ``ti_deps``. Runs only once.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global registered_ti_dep_classes

    if registered_ti_dep_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize custom taskinstance deps plugins")

    registered_ti_dep_classes = {}
    for plugin in plugins:
        dep_classes = (ti_dep.__class__ for ti_dep in plugin.ti_deps)
        registered_ti_dep_classes.update({qualname(dep_cls): dep_cls for dep_cls in dep_classes})

418 

419 

def initialize_extra_operators_links_plugins():
    """
    Collect operator extra links contributed by plugins.

    Fills ``global_operator_extra_links`` and ``operator_extra_links``. It also
    records the class of every operator extra link in
    ``registered_operator_link_classes`` keyed by qualified name. Does nothing
    once all three registries are populated.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global global_operator_extra_links, operator_extra_links, registered_operator_link_classes

    registries = (global_operator_extra_links, operator_extra_links, registered_operator_link_classes)
    if all(registry is not None for registry in registries):
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra operators links plugins")

    global_operator_extra_links = []
    operator_extra_links = []
    registered_operator_link_classes = {}

    for plugin in plugins:
        global_operator_extra_links.extend(plugin.global_operator_extra_links)
        operator_extra_links.extend(plugin.operator_extra_links)
        registered_operator_link_classes.update(
            {qualname(link.__class__): link.__class__ for link in plugin.operator_extra_links}
        )

451 

452 

def initialize_timetables_plugins():
    """
    Collect the timetable classes registered by plugins.

    Populates ``timetable_classes`` with ``qualname -> class``; when names clash,
    the later plugin wins. Runs only once.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global timetable_classes

    if timetable_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra timetables plugins")

    collected = {}
    for plugin in plugins:
        for timetable_class in plugin.timetables:
            collected[qualname(timetable_class)] = timetable_class
    timetable_classes = collected

472 

473 

def integrate_executor_plugins() -> None:
    """
    Expose each plugin's ``executors`` as a module under ``airflow.executors``.

    For every plugin that contributes executors, a module named
    ``airflow.executors.<name>`` (lower-cased by make_module) is created and put
    in ``sys.modules``. Runs only once.

    :raises AirflowPluginException: if plugins could not be loaded or a plugin has no name.
    """
    global executors_modules

    if executors_modules is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Integrate executor plugins")

    executors_modules = []
    for plugin in plugins:
        plugin_name = plugin.name
        if plugin_name is None:
            raise AirflowPluginException("Invalid plugin name")

        module = make_module("airflow.executors." + plugin_name, plugin.executors)
        if module is not None:
            executors_modules.append(module)
            sys.modules[module.__name__] = module

499 

500 

def integrate_macros_plugins() -> None:
    """
    Expose each plugin's ``macros`` as a module under ``airflow.macros``.

    For every plugin that contributes macros, a module named
    ``airflow.macros.<name>`` (lower-cased by make_module) is created and put in
    ``sys.modules``. It is also set as attribute ``<plugin.name>`` on
    ``airflow.macros``, so templates can reach it. Runs only once.

    :raises AirflowPluginException: if plugins could not be loaded or a plugin has no name.
    """
    global macros_modules

    from airflow import macros

    if macros_modules is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Integrate DAG plugins")

    macros_modules = []

    for plugin in plugins:
        plugin_name = plugin.name
        if plugin_name is None:
            raise AirflowPluginException("Invalid plugin name")

        module = make_module(f"airflow.macros.{plugin_name}", plugin.macros)
        if module is None:
            continue

        macros_modules.append(module)
        sys.modules[module.__name__] = module
        # Attach the module to airflow.macros so it is available while
        # templates are rendered.
        setattr(macros, plugin_name, module)

532 

533 

def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """
    Hand every listener declared by loaded plugins to ``listener_manager``.

    There is no "already done" guard here. Whether calling this twice registers
    listeners twice depends on ``ListenerManager.add_listener`` (not visible
    here) — NOTE(review): confirm.

    :raises AirflowPluginException: if a plugin has no name.
    """
    ensure_plugins_loaded()

    for plugin in plugins or ():
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        for listener in plugin.listeners:
            listener_manager.add_listener(listener)

547 

548 

def _dump_plugin_attr(plugin: AirflowPlugin, attr: str) -> Any:
    """Render a single attribute of ``plugin`` in the form reported by get_plugin_info."""
    value = getattr(plugin, attr)
    if attr in ("global_operator_extra_links", "operator_extra_links"):
        # Link entries are objects; report the class they are instances of.
        return [f"<{qualname(d.__class__)} object>" for d in value]
    if attr in ("macros", "timetables", "hooks", "executors", "priority_weight_strategies"):
        return [qualname(d) for d in value]
    if attr == "listeners":
        # listeners may be modules or class instances
        return [d.__name__ if inspect.ismodule(d) else qualname(d) for d in value]
    if attr == "appbuilder_views":
        return [{**d, "view": qualname(d["view"].__class__) if "view" in d else None} for d in value]
    if attr == "flask_blueprints":
        return [f"<{qualname(d.__class__)}: name={d.name!r} import_name={d.import_name!r}>" for d in value]
    return value


def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
    """
    Dump plugins attributes.

    Loads and integrates plugins first. Then, for each plugin, it returns a dict
    with ``name`` plus one rendered entry per requested attribute.

    :param attrs_to_dump: A list of plugin attributes to dump; an empty or None
        value means ``PLUGINS_ATTRIBUTES_TO_DUMP``.
    :return: one dict per loaded plugin.
    """
    ensure_plugins_loaded()
    integrate_executor_plugins()
    integrate_macros_plugins()
    initialize_web_ui_plugins()
    initialize_extra_operators_links_plugins()
    if not attrs_to_dump:
        attrs_to_dump = PLUGINS_ATTRIBUTES_TO_DUMP
    return [
        {"name": plugin.name, **{attr: _dump_plugin_attr(plugin, attr) for attr in attrs_to_dump}}
        for plugin in plugins or ()
    ]

590 

591 

def initialize_priority_weight_strategy_plugins():
    """
    Build the registry of priority weight strategy classes.

    Starts from ``airflow_priority_weight_strategies`` and adds every class from
    each plugin's ``priority_weight_strategies``, keyed by qualified name. Plugin
    entries override built-ins that have the same key. Runs only once.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global priority_weight_strategy_classes

    if priority_weight_strategy_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra priority weight strategy plugins")

    merged = dict(airflow_priority_weight_strategies)
    for plugin in plugins:
        for strategy_class in plugin.priority_weight_strategies:
            merged[qualname(strategy_class)] = strategy_class
    priority_weight_strategy_classes = merged