Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/plugins_manager.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

364 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Manages all plugins.""" 

19 

20from __future__ import annotations 

21 

22import importlib.machinery 

23import importlib.util 

24import inspect 

25import logging 

26import os 

27import sys 

28import types 

29from collections.abc import Iterable 

30from pathlib import Path 

31from typing import TYPE_CHECKING, Any 

32 

33from airflow import settings 

34from airflow._shared.module_loading import import_string, qualname 

35from airflow.configuration import conf 

36from airflow.task.priority_strategy import ( 

37 PriorityWeightStrategy, 

38 airflow_priority_weight_strategies, 

39) 

40from airflow.utils.entry_points import entry_points_with_dist 

41from airflow.utils.file import find_path_from_directory 

42 

43if TYPE_CHECKING: 

44 from airflow.lineage.hook import HookLineageReader 

45 

46 try: 

47 import importlib_metadata as metadata 

48 except ImportError: 

49 from importlib import metadata # type: ignore[no-redef] 

50 from collections.abc import Generator 

51 from types import ModuleType 

52 

53 from airflow.listeners.listener import ListenerManager 

54 from airflow.timetables.base import Timetable 

55 

log = logging.getLogger(__name__)

# Import failures collected while loading plugins, keyed by the plugin's
# origin (file path or entry-point module) with the error text as value.
import_errors: dict[str, str] = {}

# ``None`` until ``ensure_plugins_loaded`` has run; a list afterwards.
plugins: list[AirflowPlugin] | None = None
# Names of the plugins that ``register_plugin`` has already handled.
loaded_plugins: set[str] = set()

# Plugin components to integrate as modules
macros_modules: list[Any] | None = None

# Plugin components to integrate directly
admin_views: list[Any] | None = None
flask_blueprints: list[Any] | None = None
fastapi_apps: list[Any] | None = None
fastapi_root_middlewares: list[Any] | None = None
external_views: list[Any] | None = None
react_apps: list[Any] | None = None
menu_links: list[Any] | None = None
flask_appbuilder_views: list[Any] | None = None
flask_appbuilder_menu_links: list[Any] | None = None
global_operator_extra_links: list[Any] | None = None
operator_extra_links: list[Any] | None = None

# Mapping of class names to class of OperatorLinks registered by plugins.
#
# Used by the DAG serialization code to only allow specific classes to be
# created during deserialization.
registered_operator_link_classes: dict[str, type] | None = None

timetable_classes: dict[str, type[Timetable]] | None = None
hook_lineage_reader_classes: list[type[HookLineageReader]] | None = None
priority_weight_strategy_classes: dict[str, type[PriorityWeightStrategy]] | None = None

# Plugin attributes reported by ``get_plugin_info`` when the caller does not
# ask for a specific subset.
PLUGINS_ATTRIBUTES_TO_DUMP = {
    "macros",
    "admin_views",
    "flask_blueprints",
    "fastapi_apps",
    "fastapi_root_middlewares",
    "external_views",
    "react_apps",
    "menu_links",
    "appbuilder_views",
    "appbuilder_menu_items",
    "global_operator_extra_links",
    "operator_extra_links",
    "source",
    "timetables",
    "listeners",
    "priority_weight_strategies",
}

106 

107 

class AirflowPluginSource:
    """
    Class used to define an AirflowPluginSource.

    Subclasses provide a plain-text rendering via ``__str__`` and an HTML
    rendering via ``__html__``; the base implementations only raise.
    """

    def __str__(self):
        raise NotImplementedError

    def __html__(self):
        raise NotImplementedError

116 

117 

class PluginsDirectorySource(AirflowPluginSource):
    """Class used to define Plugins loaded from Plugins Directory."""

    def __init__(self, path):
        # Keep the location relative to the configured plugins folder so the
        # renderings below can show it behind a ``$PLUGINS_FOLDER`` placeholder.
        self.path = os.path.relpath(path, settings.PLUGINS_FOLDER)

    def __str__(self):
        return f"$PLUGINS_FOLDER/{self.path}"

    def __html__(self):
        return f"<em>$PLUGINS_FOLDER/</em>{self.path}"

129 

130 

class EntryPointSource(AirflowPluginSource):
    """Class used to define Plugins loaded from entrypoint."""

    def __init__(self, entrypoint: metadata.EntryPoint, dist: metadata.Distribution):
        # Only plain strings are kept: the distribution name, its version and
        # the textual form of the entry point.
        self.dist = dist.metadata["Name"]  # type: ignore[index]
        self.version = dist.version
        self.entrypoint = str(entrypoint)

    def __str__(self):
        return f"{self.dist}=={self.version}: {self.entrypoint}"

    def __html__(self):
        return f"<em>{self.dist}=={self.version}:</em> {self.entrypoint}"

144 

145 

class AirflowPluginException(Exception):
    """Exception when loading plugin."""

148 

149 

class AirflowPlugin:
    """
    Class used to define AirflowPlugin.

    Plugins subclass this and override the class attributes below. The list
    defaults are class-level objects, so every subclass that does not override
    an attribute shares the very same list.
    """

    name: str | None = None
    source: AirflowPluginSource | None = None
    macros: list[Any] = []
    admin_views: list[Any] = []
    flask_blueprints: list[Any] = []
    fastapi_apps: list[Any] = []
    fastapi_root_middlewares: list[Any] = []
    external_views: list[Any] = []
    react_apps: list[Any] = []
    menu_links: list[Any] = []
    appbuilder_views: list[Any] = []
    appbuilder_menu_items: list[Any] = []

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links: list[Any] = []

    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links: list[Any] = []

    # A list of timetable classes that can be used for DAG scheduling.
    timetables: list[type[Timetable]] = []

    # A list of listeners that can be used for tracking task and DAG states.
    listeners: list[ModuleType | object] = []

    # A list of hook lineage reader classes that can be used for reading lineage information from a hook.
    hook_lineage_readers: list[type[HookLineageReader]] = []

    # A list of priority weight strategy classes that can be used for calculating tasks weight priority.
    priority_weight_strategies: list[type[PriorityWeightStrategy]] = []

    @classmethod
    def validate(cls):
        """
        Validate if plugin has a name.

        :raises AirflowPluginException: if ``name`` is unset or empty.
        """
        if not cls.name:
            raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Execute when the plugin is loaded; This method is only called once during runtime.

        :param args: If future arguments are passed in on call.
        :param kwargs: If future arguments are passed in on call.
        """

206 

207 

def is_valid_plugin(plugin_obj):
    """
    Check whether a potential object is a subclass of the AirflowPlugin class.

    For a proper subclass, ``validate()`` is called first (and may raise), then
    the result is whether the object is absent from the module-level
    ``plugins`` list. That list must already be initialised when a subclass is
    passed in, since the membership test is done directly on it.

    :param plugin_obj: potential subclass of AirflowPlugin
    :return: Whether or not the obj is a valid subclass of
        AirflowPlugin
    """
    if (
        inspect.isclass(plugin_obj)
        and issubclass(plugin_obj, AirflowPlugin)
        and (plugin_obj is not AirflowPlugin)
    ):
        plugin_obj.validate()
        return plugin_obj not in plugins
    return False

224 

225 

def register_plugin(plugin_instance):
    """
    Start plugin load and register it after success initialization.

    If plugin is already registered, do nothing.

    :param plugin_instance: subclass of AirflowPlugin
    """
    if plugin_instance.name in loaded_plugins:
        return

    # The name is recorded before ``on_load`` runs, so the hook is invoked at
    # most once per plugin name even if it raises.
    loaded_plugins.add(plugin_instance.name)
    plugin_instance.on_load()
    plugins.append(plugin_instance)

240 

241 

def load_entrypoint_plugins():
    """
    Load and register plugins AirflowPlugin subclasses from the entrypoints.

    The entry_point group should be 'airflow.plugins'. A failure while loading
    one entry point is logged and recorded in ``import_errors`` under the entry
    point's module; the remaining entry points are still processed.
    """
    log.debug("Loading plugins from entrypoints")

    for entry_point, dist in entry_points_with_dist("airflow.plugins"):
        log.debug("Importing entry_point plugin %s", entry_point.name)
        try:
            plugin_class = entry_point.load()
            if not is_valid_plugin(plugin_class):
                continue

            plugin_instance = plugin_class()
            plugin_instance.source = EntryPointSource(entry_point, dist)
            register_plugin(plugin_instance)
        except Exception as e:
            log.exception("Failed to import plugin %s", entry_point.name)
            import_errors[entry_point.module] = str(e)

263 

264 

def load_plugins_from_plugin_directory():
    """
    Load and register Airflow Plugins from plugins directory.

    Every ``.py`` file found under ``settings.PLUGINS_FOLDER`` (honouring
    ``.airflowignore``) is executed as a module, and each valid plugin class it
    defines is instantiated and registered. When ``[core] LOAD_EXAMPLES`` is
    enabled, the bundled example plugins package is scanned the same way.
    A failure in one file is logged and recorded in ``import_errors`` under the
    file path; the remaining files are still processed.
    """
    log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
    files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore")
    plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)]

    if conf.getboolean("core", "LOAD_EXAMPLES"):
        # Imported under an alias so the module-level ``plugins`` name is not shadowed.
        from airflow.example_dags import plugins as example_plugins

        example_plugins_folder = next(iter(example_plugins.__path__))
        # Report the folder that is actually being added, not the regular plugins folder.
        log.debug("Note: Loading plugins from examples as well: %s", example_plugins_folder)
        example_files = find_path_from_directory(example_plugins_folder, ".airflowignore")
        plugin_search_locations.append((example_plugins.__name__, example_files))

    for module_prefix, plugin_files in plugin_search_locations:
        for file_path in plugin_files:
            path = Path(file_path)
            if not path.is_file() or path.suffix != ".py":
                continue
            mod_name = f"{module_prefix}.{path.stem}" if module_prefix else path.stem

            try:
                loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
                spec = importlib.util.spec_from_loader(mod_name, loader)
                mod = importlib.util.module_from_spec(spec)
                # Registered before execution, so the module is importable by
                # name while its own body runs.
                sys.modules[spec.name] = mod
                loader.exec_module(mod)

                for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
                    plugin_instance = mod_attr_value()
                    plugin_instance.source = PluginsDirectorySource(file_path)
                    register_plugin(plugin_instance)
            except Exception as e:
                log.exception("Failed to import plugin %s", file_path)
                import_errors[file_path] = str(e)

300 

301 

def load_providers_plugins():
    """
    Load and register the plugins declared by providers.

    Each plugin class path exposed by ``ProvidersManager`` is imported and, if
    it passes ``is_valid_plugin``, handed to ``register_plugin``. Import
    failures are logged and do not stop the remaining plugins from loading.
    """
    from airflow.providers_manager import ProvidersManager

    log.debug("Loading plugins from providers")
    providers_manager = ProvidersManager()
    providers_manager.initialize_providers_plugins()
    for plugin in providers_manager.plugins:
        log.debug("Importing plugin %s from class %s", plugin.name, plugin.plugin_class)

        try:
            # The imported object is registered as-is; it is not instantiated here.
            plugin_class = import_string(plugin.plugin_class)
            if is_valid_plugin(plugin_class):
                register_plugin(plugin_class)
            else:
                log.warning("Plugin %s is not a valid plugin", plugin.name)
        except ImportError:
            log.exception("Failed to load plugin %s from class name %s", plugin.name, plugin.plugin_class)

319 

320 

def make_module(name: str, objects: list[Any]):
    """
    Create new module.

    :param name: dotted module name; it is lower-cased before use.
    :param objects: objects to expose on the module, each under its ``__name__``.
    :return: the new module, or ``None`` when ``objects`` is empty.
    """
    if not objects:
        return None
    log.debug("Creating module %s", name)
    name = name.lower()
    module = types.ModuleType(name)
    module._name = name.split(".")[-1]  # type: ignore
    module._objects = objects  # type: ignore
    module.__dict__.update((o.__name__, o) for o in objects)
    return module

332 

333 

def ensure_plugins_loaded():
    """
    Load plugins from plugins directory and entrypoints.

    Plugins are only loaded if they have not been previously loaded. Provider
    plugins are loaded as well unless ``settings.LAZY_LOAD_PROVIDERS`` is set.

    :raises ValueError: if ``settings.PLUGINS_FOLDER`` is not set.
    """
    from airflow.observability.stats import Stats

    global plugins

    if plugins is not None:
        log.debug("Plugins are already loaded. Skipping.")
        return

    if not settings.PLUGINS_FOLDER:
        raise ValueError("Plugins folder is not set")

    log.debug("Loading plugins")

    with Stats.timer() as timer:
        # Switch from ``None`` to a list first: the loaders below append to it.
        plugins = []

        load_plugins_from_plugin_directory()
        load_entrypoint_plugins()

        if not settings.LAZY_LOAD_PROVIDERS:
            load_providers_plugins()

    if plugins:
        log.debug("Loading %d plugin(s) took %.2f seconds", len(plugins), timer.duration)

364 

365 

def _select_routed_ui_items(plugin_name, items, accepted, seen_url_route, not_dict_msg, conflict_msg):
    """
    Append the usable UI items of one plugin to ``accepted`` and return the rejected ones.

    An item is rejected when it is not a dictionary or when its ``url_route``
    is already owned by an earlier item. Items without a ``url_route`` are
    neither collected nor rejected.

    :param plugin_name: name of the plugin, used in warnings and as route owner.
    :param items: the plugin's own list of items (not modified here).
    :param accepted: global list that receives the accepted items.
    :param seen_url_route: mapping of claimed URL route to owning plugin name; updated in place.
    :param not_dict_msg: log format string for a non-dictionary item.
    :param conflict_msg: log format string for a conflicting URL route.
    :return: the items the caller should remove from the plugin's list.
    """
    rejected = []
    for item in items:
        if not isinstance(item, dict):
            log.warning(not_dict_msg, plugin_name)
            rejected.append(item)
            continue
        url_route = item.get("url_route")
        if url_route is None:
            continue
        if url_route in seen_url_route:
            log.warning(conflict_msg, plugin_name, url_route, seen_url_route[url_route])
            rejected.append(item)
            continue
        accepted.append(item)
        seen_url_route[url_route] = plugin_name
    return rejected


def initialize_ui_plugins():
    """
    Collect extension points for the UI.

    Fills the module-level ``external_views`` and ``react_apps`` lists. Both
    kinds of item share one URL-route namespace: the first item to claim a
    route keeps it and later claimants are dropped with a warning.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global external_views
    global react_apps

    if external_views is not None and react_apps is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize UI plugin")

    seen_url_route = {}
    external_views = []
    react_apps = []

    for plugin in plugins:
        external_views_to_remove = _select_routed_ui_items(
            plugin.name,
            plugin.external_views,
            external_views,
            seen_url_route,
            "Plugin '%s' has an external view that is not a dictionary. The view will not be loaded.",
            "Plugin '%s' has an external view with an URL route '%s' "
            "that conflicts with another plugin '%s'. The view will not be loaded.",
        )
        react_apps_to_remove = _select_routed_ui_items(
            plugin.name,
            plugin.react_apps,
            react_apps,
            seen_url_route,
            "Plugin '%s' has a React App that is not a dictionary. The React App will not be loaded.",
            "Plugin '%s' has a React App with an URL route '%s' "
            "that conflicts with another plugin '%s'. The React App will not be loaded.",
        )

        # Mutate in place the plugin's external views and react apps list to remove the invalid items
        # because some function still access these plugin's attribute and not the
        # global variables `external_views` `react_apps`. (get_plugin_info, for example)
        for item in external_views_to_remove:
            plugin.external_views.remove(item)
        for item in react_apps_to_remove:
            plugin.react_apps.remove(item)

446 

447 

def initialize_flask_plugins():
    """
    Collect flask extension points for WEB UI (legacy).

    Fills the module-level ``flask_blueprints``, ``flask_appbuilder_views`` and
    ``flask_appbuilder_menu_links`` lists; does nothing if all three are
    already populated.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global flask_blueprints
    global flask_appbuilder_views
    global flask_appbuilder_menu_links

    if (
        flask_blueprints is not None
        and flask_appbuilder_views is not None
        and flask_appbuilder_menu_links is not None
    ):
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize legacy Web UI plugin")

    flask_blueprints = []
    flask_appbuilder_views = []
    flask_appbuilder_menu_links = []

    for plugin in plugins:
        flask_appbuilder_views.extend(plugin.appbuilder_views)
        flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
        flask_blueprints.extend([{"name": plugin.name, "blueprint": bp} for bp in plugin.flask_blueprints])

        # A plugin that only fills ``admin_views`` / ``menu_links`` and not
        # their appbuilder counterparts gets a compatibility warning.
        if (plugin.admin_views and not plugin.appbuilder_views) or (
            plugin.menu_links and not plugin.appbuilder_menu_items
        ):
            log.warning(
                "Plugin '%s' may not be compatible with the current Airflow version. "
                "Please contact the author of the plugin.",
                plugin.name,
            )

485 

486 

def initialize_fastapi_plugins():
    """
    Collect extension points for the API.

    Fills the module-level ``fastapi_apps`` and ``fastapi_root_middlewares``
    lists; does nothing if both are already populated.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global fastapi_apps
    global fastapi_root_middlewares

    if fastapi_apps is not None and fastapi_root_middlewares is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize FastAPI plugins")

    fastapi_apps = []
    fastapi_root_middlewares = []

    for plugin in plugins:
        fastapi_apps.extend(plugin.fastapi_apps)
        fastapi_root_middlewares.extend(plugin.fastapi_root_middlewares)

508 

509 

def initialize_extra_operators_links_plugins():
    """
    Create modules for loaded extension from extra operators links plugins.

    Fills the module-level ``global_operator_extra_links``,
    ``operator_extra_links`` and ``registered_operator_link_classes``; does
    nothing if all three are already populated.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global global_operator_extra_links
    global operator_extra_links
    global registered_operator_link_classes

    if (
        global_operator_extra_links is not None
        and operator_extra_links is not None
        and registered_operator_link_classes is not None
    ):
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra operators links plugins")

    global_operator_extra_links = []
    operator_extra_links = []
    registered_operator_link_classes = {}

    for plugin in plugins:
        global_operator_extra_links.extend(plugin.global_operator_extra_links)
        operator_extra_links.extend(plugin.operator_extra_links)

        # Only the per-operator links are registered by class here; the
        # global links are not added to this mapping.
        registered_operator_link_classes.update(
            {qualname(link.__class__): link.__class__ for link in plugin.operator_extra_links}
        )

541 

542 

def initialize_timetables_plugins():
    """
    Collect timetable classes registered by plugins.

    Fills the module-level ``timetable_classes`` mapping, keyed by each class's
    qualified name; does nothing if it is already populated.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global timetable_classes

    if timetable_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra timetables plugins")

    timetable_classes = {
        qualname(timetable_class): timetable_class
        for plugin in plugins
        for timetable_class in plugin.timetables
    }

562 

563 

def initialize_hook_lineage_readers_plugins():
    """
    Collect hook lineage reader classes registered by plugins.

    Fills the module-level ``hook_lineage_reader_classes`` list; does nothing
    if it is already populated.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global hook_lineage_reader_classes

    if hook_lineage_reader_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize hook lineage readers plugins")

    hook_lineage_reader_classes = []
    for plugin in plugins:
        hook_lineage_reader_classes.extend(plugin.hook_lineage_readers)

581 

582 

def integrate_macros_plugins() -> None:
    """
    Integrates macro plugins.

    For every plugin that defines macros, a module named
    ``airflow.sdk.execution_time.macros.<plugin name>`` is created, recorded in
    ``macros_modules``, registered in ``sys.modules`` and attached to the
    macros package under the plugin's name. Does nothing if ``macros_modules``
    is already populated.

    :raises AirflowPluginException: if plugins could not be loaded or a plugin has no name.
    """
    global macros_modules

    from airflow.sdk.execution_time import macros

    if macros_modules is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Integrate Macros plugins")

    macros_modules = []

    for plugin in plugins:
        if plugin.name is None:
            raise AirflowPluginException("Invalid plugin name")

        # ``make_module`` returns None for a plugin without macros.
        macros_module = make_module(f"airflow.sdk.execution_time.macros.{plugin.name}", plugin.macros)

        if macros_module:
            macros_modules.append(macros_module)
            sys.modules[macros_module.__name__] = macros_module
            # Register the newly created module on airflow.macros such that it
            # can be accessed when rendering templates.
            setattr(macros, plugin.name, macros_module)

613 

614 

def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
    """
    Add listeners from plugins.

    :param listener_manager: manager that receives every listener of every loaded plugin.
    :raises AirflowPluginException: if a loaded plugin has no name.
    """
    ensure_plugins_loaded()

    if plugins:
        for plugin in plugins:
            if plugin.name is None:
                raise AirflowPluginException("Invalid plugin name")

            for listener in plugin.listeners:
                listener_manager.add_listener(listener)

626 

627 

def _describe_plugin_attribute(plugin, attr: str) -> Any:
    """Return the dump-friendly representation of one plugin attribute."""
    value = getattr(plugin, attr)
    if attr in ("global_operator_extra_links", "operator_extra_links"):
        return [f"<{qualname(d.__class__)} object>" for d in value]
    if attr in ("macros", "timetables", "priority_weight_strategies"):
        return [qualname(d) for d in value]
    if attr == "listeners":
        # listeners may be modules or class instances
        return [d.__name__ if inspect.ismodule(d) else qualname(d) for d in value]
    if attr == "appbuilder_views":
        return [{**d, "view": qualname(d["view"].__class__) if "view" in d else None} for d in value]
    if attr == "flask_blueprints":
        return [f"<{qualname(d.__class__)}: name={d.name!r} import_name={d.import_name!r}>" for d in value]
    if attr == "fastapi_apps":
        return [{**d, "app": qualname(d["app"].__class__) if "app" in d else None} for d in value]
    if attr == "fastapi_root_middlewares":
        # remove args and kwargs from plugin info to hide potentially sensitive info.
        return [
            {
                k: (v if k != "middleware" else qualname(middleware_dict["middleware"]))
                for k, v in middleware_dict.items()
                if k not in ("args", "kwargs")
            }
            for middleware_dict in value
        ]
    return value


def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
    """
    Dump plugins attributes.

    :param attrs_to_dump: A list of plugin attributes to dump; when empty or
        ``None``, ``PLUGINS_ATTRIBUTES_TO_DUMP`` is used.
    :return: one dictionary per loaded plugin holding its ``name`` plus the
        requested attributes.
    """
    ensure_plugins_loaded()
    integrate_macros_plugins()
    initialize_flask_plugins()
    initialize_fastapi_plugins()
    initialize_ui_plugins()
    initialize_extra_operators_links_plugins()
    if not attrs_to_dump:
        attrs_to_dump = PLUGINS_ATTRIBUTES_TO_DUMP
    # Materialise once: the attributes are walked again for every plugin, and a
    # one-shot iterator would be exhausted after the first plugin.
    attrs = tuple(attrs_to_dump)
    plugins_info = []
    if plugins:
        for plugin in plugins:
            info: dict[str, Any] = {"name": plugin.name}
            for attr in attrs:
                info[attr] = _describe_plugin_attribute(plugin, attr)
            plugins_info.append(info)
    return plugins_info

685 

686 

def initialize_priority_weight_strategy_plugins():
    """
    Collect priority weight strategy classes registered by plugins.

    Fills the module-level ``priority_weight_strategy_classes`` mapping with
    the built-in strategies plus those from plugins, keyed by qualified class
    name; does nothing if it is already populated.

    :raises AirflowPluginException: if plugins could not be loaded.
    """
    global priority_weight_strategy_classes

    if priority_weight_strategy_classes is not None:
        return

    ensure_plugins_loaded()

    if plugins is None:
        raise AirflowPluginException("Can't load plugins.")

    log.debug("Initialize extra priority weight strategy plugins")

    plugins_priority_weight_strategy_classes = {
        qualname(priority_weight_strategy_class): priority_weight_strategy_class
        for plugin in plugins
        for priority_weight_strategy_class in plugin.priority_weight_strategies
    }
    # Plugin entries are merged last, so on a key clash they replace the built-in entry.
    priority_weight_strategy_classes = {
        **airflow_priority_weight_strategies,
        **plugins_priority_weight_strategy_classes,
    }