Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/plugins_manager.py: 30%

272 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Manages all plugins.""" 

19from __future__ import annotations 

20 

21import importlib 

22import importlib.machinery 

23import importlib.util 

24import inspect 

25import logging 

26import os 

27import sys 

28import types 

29from typing import TYPE_CHECKING, Any, Iterable 

30 

31try: 

32 import importlib_metadata 

33except ImportError: 

34 from importlib import metadata as importlib_metadata # type: ignore[no-redef] 

35 

36from types import ModuleType 

37 

38from airflow import settings 

39from airflow.utils.entry_points import entry_points_with_dist 

40from airflow.utils.file import find_path_from_directory 

41from airflow.utils.module_loading import qualname 

42 

43if TYPE_CHECKING: 

44 from airflow.hooks.base import BaseHook 

45 from airflow.listeners.listener import ListenerManager 

46 from airflow.timetables.base import Timetable 

47 

48log = logging.getLogger(__name__) 

49 

# Import failures recorded by the plugin loaders: plugin file path (or entry
# point module name) -> stringified exception.
import_errors: dict[str, str] = {}

# Registered plugin instances. Stays ``None`` until ensure_plugins_loaded()
# has populated it.
plugins: list[AirflowPlugin] | None = None

# Plugin components to integrate as modules
registered_hooks: list[BaseHook] | None = None
macros_modules: list[Any] | None = None
executors_modules: list[Any] | None = None

# Plugin components to integrate directly
admin_views: list[Any] | None = None
flask_blueprints: list[Any] | None = None
menu_links: list[Any] | None = None
flask_appbuilder_views: list[Any] | None = None
flask_appbuilder_menu_links: list[Any] | None = None
global_operator_extra_links: list[Any] | None = None
operator_extra_links: list[Any] | None = None

# Mapping of class names to class of OperatorLinks registered by plugins.
#
# Used by the DAG serialization code to only allow specific classes to be
# created during deserialization.
registered_operator_link_classes: dict[str, type] | None = None

# Mapping of qualified class names to the classes of the ``ti_deps`` objects
# declared by plugins.
registered_ti_dep_classes: dict[str, type] | None = None

# Mapping of qualified class names to the timetable classes declared by plugins.
timetable_classes: dict[str, type[Timetable]] | None = None

# Plugin attributes that get_plugin_info() dumps when the caller does not
# request a specific subset.
PLUGINS_ATTRIBUTES_TO_DUMP = {
    "hooks",
    "executors",
    "macros",
    "flask_blueprints",
    "appbuilder_views",
    "appbuilder_menu_items",
    "global_operator_extra_links",
    "operator_extra_links",
    "ti_deps",
    "timetables",
    "source",
    "listeners",
}

90 

91 

class AirflowPluginSource:
    """
    Base class describing where a plugin was loaded from.

    Subclasses are expected to supply both renderings; the implementations
    here only raise ``NotImplementedError``.
    """

    def __str__(self):
        # Plain-text rendering of the source; must be overridden.
        raise NotImplementedError()

    def __html__(self):
        # HTML rendering of the source; must be overridden.
        raise NotImplementedError()

100 

101 

class PluginsDirectorySource(AirflowPluginSource):
    """Source descriptor for a plugin that was found in the plugins folder."""

    def __init__(self, path):
        # Only the location relative to the configured plugins folder is kept.
        plugins_root = settings.PLUGINS_FOLDER
        self.path = os.path.relpath(path, start=plugins_root)

    def __str__(self):
        relative_path = self.path
        return f"$PLUGINS_FOLDER/{relative_path}"

    def __html__(self):
        relative_path = self.path
        return f"<em>$PLUGINS_FOLDER/</em>{relative_path}"

113 

114 

class EntryPointSource(AirflowPluginSource):
    """Source descriptor for a plugin that was loaded from an entry point."""

    def __init__(self, entrypoint: importlib_metadata.EntryPoint, dist: importlib_metadata.Distribution):
        # Snapshot the display values up front: the distribution's "Name"
        # metadata field, its version, and the entry point's string form.
        self.entrypoint = str(entrypoint)
        self.version = dist.version
        self.dist = dist.metadata["Name"]

    def __str__(self):
        requirement = f"{self.dist}=={self.version}"
        return f"{requirement}: {self.entrypoint}"

    def __html__(self):
        requirement = f"{self.dist}=={self.version}"
        return f"<em>{requirement}:</em> {self.entrypoint}"

128 

129 

class AirflowPluginException(Exception):
    """Raised when a plugin cannot be loaded or is invalid."""

132 

133 

class AirflowPlugin:
    """
    Base class for Airflow plugins.

    A plugin subclasses this class, sets ``name`` and overrides whichever
    component lists it contributes to.
    """

    # Identification: ``name`` must be set to something truthy for
    # ``validate()`` to pass. ``source`` defaults to None here.
    name: str | None = None
    source: AirflowPluginSource | None = None

    # Component lists a plugin may override.
    hooks: list[Any] = []
    executors: list[Any] = []
    macros: list[Any] = []
    admin_views: list[Any] = []
    flask_blueprints: list[Any] = []
    menu_links: list[Any] = []
    appbuilder_views: list[Any] = []
    appbuilder_menu_items: list[Any] = []

    # Global operator extra links that can redirect users to external
    # systems; shown as buttons on the task page. A global operator extra
    # link can be overridden at each operator level.
    global_operator_extra_links: list[Any] = []

    # Operator extra links that override or add operator links on existing
    # Airflow Operators; shown as buttons on the task page.
    operator_extra_links: list[Any] = []

    ti_deps: list[Any] = []

    # Timetable classes that can be used for DAG scheduling.
    timetables: list[type[Timetable]] = []

    listeners: list[ModuleType] = []

    @classmethod
    def validate(cls):
        """Raise ``AirflowPluginException`` unless the plugin has a name."""
        if cls.name:
            return
        raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Hook executed when the plugin is loaded; a no-op by default.

        This method is only called once during runtime.

        :param args: If future arguments are passed in on call.
        :param kwargs: If future arguments are passed in on call.
        """

184 

185 

def is_valid_plugin(plugin_obj):
    """
    Check whether an object is an AirflowPlugin subclass that is not registered yet.

    ``plugin_obj.validate()`` is called for every proper subclass, so whatever
    it raises (``AirflowPluginException`` for a missing name in the base
    implementation) propagates to the caller.

    :param plugin_obj: potential subclass of AirflowPlugin
    :return: ``True`` when ``plugin_obj`` is a proper subclass of
        AirflowPlugin that has not been registered yet, ``False`` otherwise
    """
    is_plugin_class = (
        inspect.isclass(plugin_obj)
        and issubclass(plugin_obj, AirflowPlugin)
        and plugin_obj is not AirflowPlugin
    )
    if not is_plugin_class:
        return False

    plugin_obj.validate()

    # The loaders in this module instantiate the class before registering it,
    # so the registry holds instances. A plain ``plugin_obj not in plugins``
    # would compare a class against instances and never find a duplicate, so
    # compare against the type of each registered entry (and the entry itself,
    # in case a class was registered directly). A registry that is still
    # ``None`` is treated as empty instead of raising ``TypeError``.
    already_registered = any(
        registered is plugin_obj or type(registered) is plugin_obj for registered in plugins or ()
    )
    return not already_registered

204 

205 

def register_plugin(plugin_instance):
    """
    Run the plugin's ``on_load`` hook, then add the plugin to the registry.

    The plugin is appended to ``plugins`` only after ``on_load`` has returned,
    so a hook that raises leaves the plugin unregistered.

    :param plugin_instance: subclass of AirflowPlugin
    """
    plugin_instance.on_load()
    plugins.append(plugin_instance)

215 

216 

def load_entrypoint_plugins():
    """
    Register every valid AirflowPlugin subclass exposed through entry points.

    The entry_point group should be 'airflow.plugins'. Any exception raised
    while handling one entry point is logged and recorded in ``import_errors``
    under the entry point's module; the remaining entry points are still
    processed.
    """
    log.debug("Loading plugins from entrypoints")

    for entry_point, dist in entry_points_with_dist("airflow.plugins"):
        log.debug("Importing entry_point plugin %s", entry_point.name)
        try:
            candidate = entry_point.load()
            if is_valid_plugin(candidate):
                loaded_plugin = candidate()
                loaded_plugin.source = EntryPointSource(entry_point, dist)
                register_plugin(loaded_plugin)
        except Exception as err:
            log.exception("Failed to import plugin %s", entry_point.name)
            import_errors[entry_point.module] = str(err)

239 

240 

def load_plugins_from_plugin_directory():
    """
    Import every ``.py`` file under the plugins folder and register its plugins.

    Paths come from ``find_path_from_directory`` with ``.airflowignore`` as
    the ignore file. Any exception raised while handling one file is logged
    and recorded in ``import_errors`` under that file's path; the remaining
    files are still processed.
    """
    log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)

    for file_path in find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore"):
        if not os.path.isfile(file_path):
            continue
        mod_name, file_ext = os.path.splitext(os.path.basename(file_path))
        if file_ext != ".py":
            continue

        try:
            loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
            spec = importlib.util.spec_from_loader(mod_name, loader)
            module = importlib.util.module_from_spec(spec)
            # The module is published in sys.modules before its code runs.
            sys.modules[spec.name] = module
            loader.exec_module(module)
            log.debug("Importing plugin module %s", file_path)

            # Validity is checked lazily, one attribute at a time, so plugins
            # registered earlier in this loop are visible to later checks.
            for candidate in module.__dict__.values():
                if not is_valid_plugin(candidate):
                    continue
                loaded_plugin = candidate()
                loaded_plugin.source = PluginsDirectorySource(file_path)
                register_plugin(loaded_plugin)
        except Exception as err:
            log.exception("Failed to import plugin %s", file_path)
            import_errors[file_path] = str(err)

268 

269 

def make_module(name: str, objects: list[Any]):
    """
    Build a module object that exposes ``objects`` under their ``__name__``.

    :param name: dotted module name; it is lower-cased before use
    :param objects: objects to expose; each must have a ``__name__``
    :return: the new module, or ``None`` when ``objects`` is empty
    """
    if not objects:
        return None

    log.debug("Creating module %s", name)
    lowered_name = name.lower()
    new_module = types.ModuleType(lowered_name)
    # Keep the last dotted component and the raw object list on the module.
    new_module._name = lowered_name.rpartition(".")[2]  # type: ignore
    new_module._objects = objects  # type: ignore
    vars(new_module).update({obj.__name__: obj for obj in objects})
    return new_module

281 

282 

def ensure_plugins_loaded():
    """
    Load plugins from the plugins directory and from entry points, once.

    Returns immediately when ``plugins`` has already been populated.

    :raises ValueError: if ``settings.PLUGINS_FOLDER`` is not set
    """
    from airflow.stats import Stats

    global plugins, registered_hooks

    if plugins is not None:
        log.debug("Plugins are already loaded. Skipping.")
        return

    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    with Stats.timer() as timer:
        plugins = []
        registered_hooks = []

        load_plugins_from_plugin_directory()
        load_entrypoint_plugins()

        # Hooks are only collected here; nothing else is done with them yet.
        # They are tracked so they can be integrated into the UI's Connection
        # screens.
        registered_hooks.extend(hook for loaded in plugins for hook in loaded.hooks)

    if plugins:
        log.debug("Loading %d plugin(s) took %.2f seconds", len(plugins), timer.duration)

317 

318 

def initialize_web_ui_plugins():
    """
    Collect the web UI extension points declared by the loaded plugins.

    Fills ``flask_blueprints``, ``flask_appbuilder_views`` and
    ``flask_appbuilder_menu_links``; does nothing when all three are already set.

    :raises AirflowPluginException: if plugins are still unset after loading
    """
    global plugins
    global flask_blueprints
    global flask_appbuilder_views
    global flask_appbuilder_menu_links

    collected = (flask_blueprints, flask_appbuilder_views, flask_appbuilder_menu_links)
    if all(registry is not None for registry in collected):
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize Web UI plugin")

    flask_blueprints = []
    flask_appbuilder_views = []
    flask_appbuilder_menu_links = []

    for plugin in plugins:
        flask_appbuilder_views.extend(plugin.appbuilder_views)
        flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
        flask_blueprints.extend({"name": plugin.name, "blueprint": bp} for bp in plugin.flask_blueprints)

        # Warn when a plugin sets admin_views or menu_links without the
        # matching appbuilder_* attribute.
        only_admin_views = plugin.admin_views and not plugin.appbuilder_views
        only_menu_links = plugin.menu_links and not plugin.appbuilder_menu_items
        if only_admin_views or only_menu_links:
            log.warning(
                "Plugin '%s' may not be compatible with the current Airflow version. "
                "Please contact the author of the plugin.",
                plugin.name,
            )

357 

358 

def initialize_ti_deps_plugins():
    """
    Register the classes of the task instance dependency objects declared by plugins.

    Fills ``registered_ti_dep_classes`` (qualified class name -> class); does
    nothing when it is already set.

    :raises AirflowPluginException: if plugins are still unset after loading
    """
    global registered_ti_dep_classes
    if registered_ti_dep_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize custom taskinstance deps plugins")

    registered_ti_dep_classes = {}

    for plugin in plugins:
        dep_classes = {}
        for ti_dep in plugin.ti_deps:
            dep_class = ti_dep.__class__
            dep_classes[qualname(dep_class)] = dep_class
        registered_ti_dep_classes.update(dep_classes)

378 

379 

def initialize_extra_operators_links_plugins():
    """
    Collect the operator extra links declared by the loaded plugins.

    Fills ``global_operator_extra_links``, ``operator_extra_links`` and
    ``registered_operator_link_classes``; does nothing when all three are
    already set.

    :raises AirflowPluginException: if plugins are still unset after loading
    """
    global global_operator_extra_links
    global operator_extra_links
    global registered_operator_link_classes

    collected = (global_operator_extra_links, operator_extra_links, registered_operator_link_classes)
    if all(registry is not None for registry in collected):
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra operators links plugins")

    global_operator_extra_links = []
    operator_extra_links = []
    registered_operator_link_classes = {}

    for plugin in plugins:
        global_operator_extra_links.extend(plugin.global_operator_extra_links)
        operator_extra_links.extend(plugin.operator_extra_links)

        # Only the per-operator links contribute to the class registry.
        link_classes = {}
        for link in plugin.operator_extra_links:
            link_class = link.__class__
            link_classes[qualname(link_class)] = link_class
        registered_operator_link_classes.update(link_classes)

411 

412 

def initialize_timetables_plugins():
    """
    Collect the timetable classes declared by the loaded plugins.

    Fills ``timetable_classes`` (qualified class name -> class); does nothing
    when it is already set.

    :raises AirflowPluginException: if plugins are still unset after loading
    """
    global timetable_classes

    if timetable_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra timetables plugins")

    # Build into a local first so the global is assigned only once the whole
    # mapping has been computed.
    collected = {}
    for plugin in plugins:
        for timetable_class in plugin.timetables:
            collected[qualname(timetable_class)] = timetable_class
    timetable_classes = collected

432 

433 

def integrate_executor_plugins() -> None:
    """
    Expose plugin executors as ``airflow.executors.<plugin name>`` modules.

    Each created module is added to ``executors_modules`` and to
    ``sys.modules``; does nothing when ``executors_modules`` is already set.

    :raises AirflowPluginException: if plugins are still unset after loading,
        or a plugin has no name
    """
    global plugins
    global executors_modules

    if executors_modules is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Integrate executor plugins")

    executors_modules = []
    for plugin in plugins:
        plugin_name = plugin.name
        if plugin_name is None:
            raise AirflowPluginException("Invalid plugin name")

        executors_module = make_module(f"airflow.executors.{plugin_name}", plugin.executors)
        if not executors_module:
            # make_module returns None when the plugin declares no executors.
            continue
        executors_modules.append(executors_module)
        sys.modules[executors_module.__name__] = executors_module

459 

460 

def integrate_macros_plugins() -> None:
    """
    Expose plugin macros as ``airflow.macros.<plugin name>`` modules.

    Each created module is added to ``macros_modules`` and ``sys.modules`` and
    set as an attribute on ``airflow.macros``; does nothing when
    ``macros_modules`` is already set.

    :raises AirflowPluginException: if plugins are still unset after loading,
        or a plugin has no name
    """
    global plugins
    global macros_modules

    from airflow import macros

    if macros_modules is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Integrate DAG plugins")

    macros_modules = []

    for plugin in plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        macros_module = make_module(f"airflow.macros.{plugin.name}", plugin.macros)
        if not macros_module:
            # make_module returns None when the plugin declares no macros.
            continue

        macros_modules.append(macros_module)
        sys.modules[macros_module.__name__] = macros_module
        # Register the newly created module on airflow.macros such that it
        # can be accessed when rendering templates.
        setattr(macros, plugin.name, macros_module)

492 

493 

def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """
    Add every listener declared by the loaded plugins to ``listener_manager``.

    :param listener_manager: manager whose ``add_listener`` is called once per listener
    :raises AirflowPluginException: if a plugin has no name
    """
    global plugins

    ensure_plugins_loaded()

    # An unset or empty registry simply means there is nothing to add.
    for plugin in plugins or []:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        for listener in plugin.listeners:
            listener_manager.add_listener(listener)

507 

508 

def _describe_plugin_attribute(plugin, attr: str) -> Any:
    """Return the dump representation of ``plugin.<attr>`` used by get_plugin_info()."""
    value = getattr(plugin, attr)
    if attr in ("global_operator_extra_links", "operator_extra_links"):
        return [f"<{qualname(d.__class__)} object>" for d in value]
    if attr in ("macros", "timetables", "hooks", "executors"):
        return [qualname(d) for d in value]
    if attr == "listeners":
        # listeners are always modules
        return [d.__name__ for d in value]
    if attr == "appbuilder_views":
        # Replace the view object with its class name; other keys pass through.
        return [{**d, "view": qualname(d["view"].__class__) if "view" in d else None} for d in value]
    if attr == "flask_blueprints":
        return [f"<{qualname(d.__class__)}: name={d.name!r} import_name={d.import_name!r}>" for d in value]
    return value


def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
    """
    Dump plugins attributes.

    Plugins are loaded and the executor, macro, web UI and operator-link
    integrations are run first.

    :param attrs_to_dump: plugin attributes to dump; when ``None`` or empty,
        ``PLUGINS_ATTRIBUTES_TO_DUMP`` is used
    :return: one dict per loaded plugin, holding its ``name`` plus one entry
        per requested attribute
    """
    ensure_plugins_loaded()
    integrate_executor_plugins()
    integrate_macros_plugins()
    initialize_web_ui_plugins()
    initialize_extra_operators_links_plugins()

    # Materialize once. The attributes are iterated again for every plugin, so
    # a one-shot iterator would be exhausted after the first plugin, and its
    # truthiness would never select the default set.
    attrs = list(attrs_to_dump) if attrs_to_dump is not None else []
    if not attrs:
        attrs = list(PLUGINS_ATTRIBUTES_TO_DUMP)

    plugins_info = []
    for plugin in plugins or []:
        info: dict[str, Any] = {"name": plugin.name}
        for attr in attrs:
            info[attr] = _describe_plugin_attribute(plugin, attr)
        plugins_info.append(info)
    return plugins_info