Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/providers_manager.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

707 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Manages all providers.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import functools 

24import inspect 

25import json 

26import logging 

27import traceback 

28import warnings 

29from collections.abc import Callable, MutableMapping 

30from dataclasses import dataclass 

31from functools import wraps 

32from importlib.resources import files as resource_files 

33from time import perf_counter 

34from typing import TYPE_CHECKING, Any, NamedTuple, ParamSpec, TypeVar, cast 

35 

36from packaging.utils import canonicalize_name 

37 

38from airflow._shared.module_loading import entry_points_with_dist, import_string 

39from airflow.exceptions import AirflowOptionalProviderFeatureException 

40from airflow.utils.log.logging_mixin import LoggingMixin 

41 

42if TYPE_CHECKING: 

43 from airflow.cli.cli_config import CLICommand 

44 

log = logging.getLogger(__name__)


# Generic type variables. ``PS`` captures the parameters of the methods decorated by
# ``provider_info_cache``; ``RT`` is a generic return-type variable (not used in the
# visible part of this module).
PS = ParamSpec("PS")
RT = TypeVar("RT")

# Minimum supported versions of provider distributions. Providers older than the listed version
# are only reported with a warning by ``ProvidersManager._verify_all_providers_all_compatible``;
# they are not removed from the discovered providers.
MIN_PROVIDER_VERSIONS = {
    "apache-airflow-providers-celery": "2.1.0",
}

54 

55 

56def _ensure_prefix_for_placeholders(field_behaviors: dict[str, Any], conn_type: str): 

57 """ 

58 Verify the correct placeholder prefix. 

59 

60 If the given field_behaviors dict contains a placeholder's node, and there 

61 are placeholders for extra fields (i.e. anything other than the built-in conn 

62 attrs), and if those extra fields are unprefixed, then add the prefix. 

63 

64 The reason we need to do this is, all custom conn fields live in the same dictionary, 

65 so we need to namespace them with a prefix internally. But for user convenience, 

66 and consistency between the `get_ui_field_behaviour` method and the extra dict itself, 

67 we allow users to supply the unprefixed name. 

68 """ 

69 conn_attrs = {"host", "schema", "login", "password", "port", "extra"} 

70 

71 def ensure_prefix(field): 

72 if field not in conn_attrs and not field.startswith("extra__"): 

73 return f"extra__{conn_type}__{field}" 

74 return field 

75 

76 if "placeholders" in field_behaviors: 

77 placeholders = field_behaviors["placeholders"] 

78 field_behaviors["placeholders"] = {ensure_prefix(k): v for k, v in placeholders.items()} 

79 

80 return field_behaviors 

81 

82 

83if TYPE_CHECKING: 

84 from urllib.parse import SplitResult 

85 

86 from airflow.sdk import BaseHook 

87 from airflow.sdk.bases.decorator import TaskDecorator 

88 from airflow.sdk.definitions.asset import Asset 

89 

90 

class LazyDictWithCache(MutableMapping):
    """
    Lazy-loaded cached dictionary.

    A dictionary in which a callable value acts as a lazy resolver. On the first access to its key
    the callable is invoked with no arguments; its result replaces it and is returned from then on.
    The resolved result may itself be callable - it is never called again.

    Assigning a new value to a key resets that key's "resolved" state. A freshly assigned callable
    is therefore resolved on its next access, even if the key had been resolved before.
    """

    __slots__ = ["_resolved", "_raw_dict"]

    def __init__(self, *args, **kw):
        # Keys whose stored value is already the result of calling their resolver.
        self._resolved = set()
        self._raw_dict = dict(*args, **kw)

    def __setitem__(self, key, value):
        # A new assignment replaces any previously resolved value. Forget the resolved state so a
        # new callable is resolved on next access instead of being returned as-is.
        self._resolved.discard(key)
        self._raw_dict[key] = value

    def __getitem__(self, key):
        value = self._raw_dict[key]
        if key not in self._resolved and callable(value):
            # Exchange the callable with the result of calling it -- but only once! The resolver
            # is allowed to return a callable itself.
            value = value()
            self._resolved.add(key)
            self._raw_dict[key] = value
        return value

    def __delitem__(self, key):
        self._resolved.discard(key)
        del self._raw_dict[key]

    def __iter__(self):
        return iter(self._raw_dict)

    def __len__(self):
        return len(self._raw_dict)

    def __contains__(self, key):
        # Membership must not trigger resolution, so check the raw dict directly.
        return key in self._raw_dict

    def clear(self):
        self._resolved.clear()
        self._raw_dict.clear()

135 

136 

def _read_schema_from_resources_or_local_file(filename: str) -> dict:
    """
    Load a JSON schema file shipped with the ``airflow`` package.

    The file is read through ``importlib.resources`` first. If that raises ``TypeError`` or
    ``FileNotFoundError``, it is read from the directory containing this module instead.

    :param filename: name of the schema file, relative to the ``airflow`` package
    :return: the parsed JSON document
    """
    try:
        packaged_resource = resource_files("airflow").joinpath(filename)
        with packaged_resource.open("rb") as schema_file:
            return json.load(schema_file)
    except (TypeError, FileNotFoundError):
        import pathlib

        local_path = pathlib.Path(__file__).parent / filename
        with local_path.open("rb") as schema_file:
            return json.load(schema_file)

147 

148 

def _create_provider_info_schema_validator():
    """
    Build a JSON schema validator for ``provider_info.schema.json``.

    The validator class is chosen by ``jsonschema.validators.validator_for`` from the schema itself.
    """
    import jsonschema

    provider_info_schema = _read_schema_from_resources_or_local_file("provider_info.schema.json")
    validator_class = jsonschema.validators.validator_for(provider_info_schema)
    return validator_class(provider_info_schema)

157 

158 

def _create_customized_form_field_behaviours_schema_validator():
    """
    Build a JSON schema validator for ``customized_form_field_behaviours.schema.json``.

    The validator class is chosen by ``jsonschema.validators.validator_for`` from the schema itself.
    """
    import jsonschema

    field_behaviours_schema = _read_schema_from_resources_or_local_file(
        "customized_form_field_behaviours.schema.json"
    )
    validator_class = jsonschema.validators.validator_for(field_behaviours_schema)
    return validator_class(field_behaviours_schema)

167 

168 

169def _check_builtin_provider_prefix(provider_package: str, class_name: str) -> bool: 

170 if provider_package.startswith("apache-airflow"): 

171 provider_path = provider_package[len("apache-") :].replace("-", ".") 

172 if not class_name.startswith(provider_path): 

173 log.warning( 

174 "Coherence check failed when importing '%s' from '%s' package. It should start with '%s'", 

175 class_name, 

176 provider_package, 

177 provider_path, 

178 ) 

179 return False 

180 return True 

181 

182 

@dataclass
class ProviderInfo:
    """
    Provider information.

    :param version: version string of the installed provider distribution
    :param data: dictionary with information about the provider - the provider-info dict returned
        by the provider's ``apache_airflow_provider`` entry point
    """

    version: str
    data: dict

194 

195 

class HookClassProvider(NamedTuple):
    """Hook class and Provider it comes from."""

    # Fully qualified name of the hook class registered for a connection type.
    hook_class_name: str
    # Name of the provider package that registered the hook class.
    package_name: str

201 

202 

class DialectInfo(NamedTuple):
    """Dialect class and Provider it comes from."""

    # Dialect type, as given by the "dialect-type" key of the provider's "dialects" entry.
    name: str
    # Class name from the "dialect-class-name" key of the same entry.
    dialect_class_name: str
    # Name of the provider package declaring the dialect.
    provider_name: str

209 

210 

class TriggerInfo(NamedTuple):
    """Trigger class and provider it comes from."""

    # Name of the trigger class (presumably fully qualified - confirm against the discovery code).
    trigger_class_name: str
    # Name of the provider package the trigger comes from.
    package_name: str
    # Name of the integration the trigger belongs to.
    integration_name: str

217 

218 

class NotificationInfo(NamedTuple):
    """Notification class and provider it comes from."""

    # Name of the notification class (presumably fully qualified - confirm against the discovery code).
    notification_class_name: str
    # Name of the provider package the notification comes from.
    package_name: str

224 

225 

class PluginInfo(NamedTuple):
    """Plugin class, name and provider it comes from."""

    # Name of the plugin.
    name: str
    # Plugin class, stored as a string (presumably its import path - confirm against the discovery code).
    plugin_class: str
    # Name of the provider package the plugin comes from.
    provider_name: str

232 

233 

class HookInfo(NamedTuple):
    """
    Hook information.

    Note: the placeholder core hooks registered by ``ProvidersManager._init_airflow_core_hooks``
    ("generic", "email") use None for the string fields below, despite their annotations.
    """

    # Name of the hook class.
    hook_class_name: str
    # Name of the hook attribute holding the connection id.
    connection_id_attribute_name: str
    # Name of the provider package the hook comes from.
    package_name: str
    # Human-readable name of the hook (e.g. "Generic" for the core "generic" connection type).
    hook_name: str
    # Connection type handled by the hook.
    connection_type: str
    # Whether connections of this type can be tested.
    connection_testable: bool
    # Dialects supported by the hook.
    # NOTE(review): this ``[]`` default is a single list object shared by every HookInfo created
    # without ``dialects``; it must never be mutated in place.
    dialects: list[str] = []

244 

245 

class ConnectionFormWidgetInfo(NamedTuple):
    """Connection Form Widget information."""

    # Name of the hook class defining the widget.
    hook_class_name: str
    # Name of the provider package the hook comes from.
    package_name: str
    # The form field object itself (its type is not constrained here).
    field: Any
    # Name of the field in the connection form.
    field_name: str
    # Presumably whether the field holds sensitive data that should be masked - confirm with users.
    is_sensitive: bool

254 

255 

def log_optional_feature_disabled(class_name, e, provider_package):
    """
    Report that an optional provider feature could not be imported.

    The exception (with traceback) is logged at DEBUG level; a one-line notice without the
    traceback is logged at INFO level.

    :param class_name: name of the class that failed to import
    :param e: the exception raised while importing
    :param provider_package: name of the provider package
    """
    details = (class_name, provider_package)
    log.debug(
        "Optional feature disabled on exception when importing '%s' from '%s' package",
        *details,
        exc_info=e,
    )
    log.info("Optional provider feature disabled when importing '%s' from '%s' package", *details)

269 

270 

def log_import_warning(class_name, e, provider_package):
    """
    Log a WARNING, including the exception traceback, for a provider class that failed to import.

    :param class_name: name of the class that failed to import
    :param e: the exception raised while importing
    :param provider_package: name of the provider package
    """
    message = "Exception when importing '%s' from '%s' package"
    log.warning(message, class_name, provider_package, exc_info=e)

279 

280 

# This is a temporary measure until all community providers raise AirflowOptionalProviderFeatureException
# for their optional features. We are going to add tests in our CI to catch all such cases and fix them;
# until then, every "known unhandled optional feature error" of a community provider should be listed
# here as a (provider package name, substring of the ImportError message) pair. Both parts must match
# for an ImportError to be treated as a disabled optional feature rather than a real import problem.
KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [("apache-airflow-providers-google", "No module named 'paramiko'")]

286 

287 

def _correctness_check(provider_package: str, class_name: str, provider_info: ProviderInfo) -> Any:
    """
    Perform coherence check on provider classes.

    For apache-airflow providers it checks that the class name starts with the appropriate package.
    For all providers it tries to import the class, checking that no exception is raised while
    importing. Problems are logged; nothing is raised to the caller.

    :param provider_package: name of the provider package
    :param class_name: fully qualified name of the class to import
    :param provider_info: information about the provider (not used by the check itself)
    :return: the imported class if the class is OK, None otherwise.
    """
    if not _check_builtin_provider_prefix(provider_package, class_name):
        return None
    try:
        imported_class = import_string(class_name)
    except AirflowOptionalProviderFeatureException as e:
        # When the provider class raises AirflowOptionalProviderFeatureException this is an expected
        # case when only some classes in the provider are available. We just log at debug level and
        # print an info message so that the user is aware of it.
        log_optional_feature_disabled(class_name, e, provider_package)
        return None
    except ImportError as e:
        # ``e.msg`` may be None (e.g. a bare ``ImportError()``), which would make the ``in`` checks
        # below raise TypeError. ``str(e)`` equals ``e.msg`` whenever that is a string.
        error_message = str(e)
        if "No module named 'airflow.providers." in error_message:
            # Another provider is missing. This can only happen for an optional feature, so log at
            # debug level and print information about it.
            log_optional_feature_disabled(class_name, e, provider_package)
            return None
        # Until all providers use AirflowOptionalProviderFeatureException, treat the explicitly
        # listed known errors as optional features as well.
        if any(
            known_package == provider_package and known_message in error_message
            for known_package, known_message in KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS
        ):
            log_optional_feature_disabled(class_name, e, provider_package)
            return None
        # But when we have no idea - we print a warning to the logs.
        log_import_warning(class_name, e, provider_package)
        return None
    except Exception as e:
        log_import_warning(class_name, e, provider_package)
        return None
    return imported_class

332 

333 

# We want better control over the initialization of parameters and to be able to debug and test it,
# so we use our own caching decorator.
def provider_info_cache(cache_name: str) -> Callable[[Callable[PS, None]], Callable[PS, None]]:
    """
    Decorate and cache provider info.

    Return a decorator for ``ProvidersManager`` initialization methods. The decorated method runs
    only when ``cache_name`` is not yet a key of the manager's ``_initialized_cache``. After a run
    that returns normally the key is recorded, so later calls are no-ops. If the method raises,
    the key is not recorded. The wrapper always returns None and logs the initialization time at
    DEBUG level.

    :param cache_name: Name of the cache
    """

    def _decorator(func: Callable[PS, None]) -> Callable[PS, None]:
        @wraps(func)
        def _run_once(*args: PS.args, **kwargs: PS.kwargs) -> None:
            # The decorated callables are methods, so the manager is the first positional argument.
            manager = args[0]
            if TYPE_CHECKING:
                assert isinstance(manager, ProvidersManager)

            if cache_name not in manager._initialized_cache:
                start_time = perf_counter()
                log.debug("Initializing Providers Manager[%s]", cache_name)
                func(*args, **kwargs)
                manager._initialized_cache[cache_name] = True
                elapsed_seconds = perf_counter() - start_time
                log.debug(
                    "Initialization of Providers Manager[%s] took %.2f seconds",
                    cache_name,
                    elapsed_seconds,
                )

        return _run_once

    return _decorator

366 

367 

368class ProvidersManager(LoggingMixin): 

369 """ 

370 Manages all provider distributions. 

371 

372 This is a Singleton class. The first time it is 

373 instantiated, it discovers all available providers in installed packages. 

374 """ 

375 

376 resource_version = "0" 

377 _initialized: bool = False 

378 _initialization_stack_trace = None 

379 _instance: ProvidersManager | None = None 

380 

def __new__(cls):
    """Return the single shared instance, creating it on the first call."""
    instance = cls._instance
    if instance is None:
        instance = cls._instance = super().__new__(cls)
    return instance

385 

@staticmethod
def initialized() -> bool:
    """Return whether ``ProvidersManager.__init__`` has already set up the singleton."""
    already_initialized = ProvidersManager._initialized
    return already_initialized

389 

@staticmethod
def initialization_stack_trace() -> str | None:
    """Return the stack captured when the singleton was first initialized, or None if it was not."""
    captured_stack = ProvidersManager._initialization_stack_trace
    return captured_stack

393 

def __init__(self):
    """
    Initialize the manager.

    ``ProvidersManager`` is a singleton, so only the first call does any work. Later calls return
    immediately because the class-level ``_initialized`` flag is already set. The containers created
    here are filled later by the lazy ``initialize_providers_*`` methods.

    If any step after the flag is set raises, the flag and the captured stack trace are reset before
    the exception propagates. Otherwise every later ``ProvidersManager()`` would silently return the
    half-initialized singleton (with missing attributes) instead of retrying initialization.
    """
    # skip initialization if already initialized
    if self.initialized():
        return

    super().__init__()
    ProvidersManager._initialized = True
    # Recorded so it can be inspected later via ``initialization_stack_trace()``.
    ProvidersManager._initialization_stack_trace = "".join(traceback.format_stack(inspect.currentframe()))
    try:
        self._initialized_cache: dict[str, bool] = {}
        # Keeps dict of providers keyed by module name
        self._provider_dict: dict[str, ProviderInfo] = {}
        self._fs_set: set[str] = set()
        self._asset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
        self._asset_factories: dict[str, Callable[..., Asset]] = {}
        self._asset_to_openlineage_converters: dict[str, Callable] = {}
        self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache()
        # keeps mapping between connection_types and hook class, package they come from
        self._hook_provider_dict: dict[str, HookClassProvider] = {}
        self._dialect_provider_dict: dict[str, DialectInfo] = {}
        # Keeps dict of hooks keyed by connection type. They are lazy evaluated at access time
        self._hooks_lazy_dict: LazyDictWithCache[str, HookInfo | Callable] = LazyDictWithCache()
        # Keeps methods that should be used to add custom widgets tuple of keyed by name of the extra field
        self._connection_form_widgets: dict[str, ConnectionFormWidgetInfo] = {}
        # Customizations for javascript fields are kept here
        self._field_behaviours: dict[str, dict] = {}
        self._cli_command_functions_set: set[Callable[[], list[CLICommand]]] = set()
        self._cli_command_provider_name_set: set[str] = set()
        self._extra_link_class_name_set: set[str] = set()
        self._logging_class_name_set: set[str] = set()
        self._auth_manager_class_name_set: set[str] = set()
        self._auth_manager_without_check_set: set[tuple[str, str]] = set()
        self._secrets_backend_class_name_set: set[str] = set()
        self._executor_class_name_set: set[str] = set()
        self._executor_without_check_set: set[tuple[str, str]] = set()
        self._queue_class_name_set: set[str] = set()
        self._provider_configs: dict[str, dict[str, Any]] = {}
        self._trigger_info_set: set[TriggerInfo] = set()
        self._notification_info_set: set[NotificationInfo] = set()
        self._provider_schema_validator = _create_provider_info_schema_validator()
        self._customized_form_fields_schema_validator = (
            _create_customized_form_field_behaviours_schema_validator()
        )
        # Set of plugins contained in providers
        self._plugins_set: set[PluginInfo] = set()
        self._init_airflow_core_hooks()
    except BaseException:
        # Roll back the singleton markers so the next instantiation retries initialization.
        ProvidersManager._initialized = False
        ProvidersManager._initialization_stack_trace = None
        raise

440 

def _init_airflow_core_hooks(self):
    """
    Initialize the hooks dict with default hooks from Airflow core.

    "generic" and "email" get placeholder ``HookInfo`` entries (no hook class, not testable).
    "fs" and "package_index" are registered lazily: on first access the lazy dict calls
    ``self._import_hook`` for the corresponding hook of the standard provider.
    """
    placeholder_hooks = (("generic", "Generic"), ("email", "Email"))
    for conn_type, display_name in placeholder_hooks:
        self._hooks_lazy_dict[conn_type] = HookInfo(
            hook_class_name=None,
            connection_id_attribute_name=None,
            package_name=None,
            hook_name=display_name,
            connection_type=None,
            connection_testable=False,
        )

    standard_provider_hooks = {
        "fs": "airflow.providers.standard.hooks.filesystem.FSHook",
        "package_index": "airflow.providers.standard.hooks.package_index.PackageIndexHook",
    }
    for conn_type, hook_path in standard_provider_hooks.items():
        self._hooks_lazy_dict[conn_type] = functools.partial(
            self._import_hook,
            connection_type=None,
            package_name="apache-airflow-providers-standard",
            hook_class_name=hook_path,
            provider_info=None,
        )

467 

@provider_info_cache("list")
def initialize_providers_list(self):
    """
    Lazy initialization of providers list.

    Discovers all installed provider distributions, then warns about providers older than the
    supported minimum. Afterwards ``_provider_dict`` is ordered by package name.
    """
    self._discover_all_providers_from_packages()
    self._verify_all_providers_all_compatible()
    self._provider_dict = {name: self._provider_dict[name] for name in sorted(self._provider_dict)}

478 

def _verify_all_providers_all_compatible(self):
    """Log a warning for every discovered provider older than its entry in ``MIN_PROVIDER_VERSIONS``."""
    from packaging import version as packaging_version

    for provider_id, info in self._provider_dict.items():
        min_version = MIN_PROVIDER_VERSIONS.get(provider_id)
        if not min_version:
            continue
        required = packaging_version.parse(min_version)
        installed = packaging_version.parse(info.version)
        if installed >= required:
            continue
        log.warning(
            "The package %s is not compatible with this version of Airflow. "
            "The package has version %s but the minimum supported version "
            "of the package is %s",
            provider_id,
            info.version,
            min_version,
        )

494 

@provider_info_cache("hooks")
def initialize_providers_hooks(self):
    """Lazy initialization of providers hooks: core hooks first, then the hooks of every provider."""
    self._init_airflow_core_hooks()
    self.initialize_providers_list()
    self._discover_hooks()
    # Keep the connection-type -> hook class mapping ordered by connection type.
    self._hook_provider_dict = {
        conn_type: self._hook_provider_dict[conn_type] for conn_type in sorted(self._hook_provider_dict)
    }

502 

@provider_info_cache("filesystems")
def initialize_providers_filesystems(self):
    """Lazily discover the filesystems declared by providers (cached under "filesystems")."""
    # Filesystems are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_filesystems()

508 

@provider_info_cache("asset_uris")
def initialize_providers_asset_uri_resources(self):
    """Lazily discover provider asset URI handlers, factories and converters (cached under "asset_uris")."""
    # Asset URI resources are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_asset_uri_resources()

514 

@provider_info_cache("hook_lineage_writers")
@provider_info_cache("taskflow_decorators")
def initialize_providers_taskflow_decorator(self):
    """Lazy initialization of providers taskflow decorators."""
    # NOTE(review): because of the outer decorator, running this method also records the
    # "hook_lineage_writers" cache key, although nothing here discovers hook lineage writers.
    # Confirm whether that outer decorator is still needed.
    self.initialize_providers_list()
    self._discover_taskflow_decorators()

521 

@provider_info_cache("extra_links")
def initialize_providers_extra_links(self):
    """Lazily discover the extra links declared by providers (cached under "extra_links")."""
    # Extra links are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_extra_links()

527 

@provider_info_cache("logging")
def initialize_providers_logging(self):
    """Lazily discover the logging information declared by providers (cached under "logging")."""
    # Logging classes are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_logging()

533 

@provider_info_cache("secrets_backends")
def initialize_providers_secrets_backends(self):
    """Lazily discover the secrets backends declared by providers (cached under "secrets_backends")."""
    # Secrets backends are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_secrets_backends()

539 

@provider_info_cache("executors")
def initialize_providers_executors(self):
    """Lazily discover provider executors with ``check=True`` (cached under "executors")."""
    # Executors are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_executors(check=True)

545 

@provider_info_cache("executors_without_check")
def initialize_providers_executors_without_check(self):
    """Lazily discover provider executors with ``check=False`` (cached under "executors_without_check")."""
    # Executors are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_executors(check=False)

551 

@provider_info_cache("queues")
def initialize_providers_queues(self):
    """Lazily discover the queue information declared by providers (cached under "queues")."""
    # Queues are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_queues()

557 

@provider_info_cache("notifications")
def initialize_providers_notifications(self):
    """Lazily discover the notifications declared by providers (cached under "notifications")."""
    # Notifications are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_notifications()

563 

@provider_info_cache("auth_managers")
def initialize_providers_auth_managers(self):
    """Lazily discover provider auth managers with ``check=True`` (cached under "auth_managers")."""
    # Auth managers are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_auth_managers(check=True)

569 

@provider_info_cache("auth_managers_without_check")
def initialize_providers_auth_managers_without_check(self):
    """Lazily discover provider auth managers with ``check=False`` (cached under "auth_managers_without_check")."""
    # Auth managers are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_auth_managers(check=False)

575 

@provider_info_cache("config")
def initialize_providers_configuration(self):
    """
    Lazy initialization of providers configuration information.

    Cached under "config"; the actual work is delegated to ``_initialize_providers_configuration``.
    """
    self._initialize_providers_configuration()

580 

def _initialize_providers_configuration(self):
    """
    Initialize providers configuration information.

    Unlike ``initialize_providers_configuration``, this method is not wrapped by
    ``provider_info_cache``, so calling it does not mark the "config" cache as initialized. Use it
    when the provider configuration must be loaded now, but a later call to
    ``initialize_providers_configuration`` must still load it again. An example is writing a
    configuration that includes providers from a context where no providers are loaded yet, after
    which the original configuration is restored.
    """
    self.initialize_providers_list()
    self._discover_config()
    # Push the newly discovered provider configuration into Airflow's global ``conf`` object.
    from airflow.configuration import conf

    conf.load_providers_configuration()

598 

@provider_info_cache("plugins")
def initialize_providers_plugins(self):
    """Lazily discover the plugins contained in providers (cached under "plugins")."""
    # Plugins are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_plugins()

603 

@provider_info_cache("cli_command")
def initialize_providers_cli_command(self):
    """Lazily discover the CLI commands contributed by providers (cached under "cli_command")."""
    # CLI commands are read from the provider list, so make sure it is loaded first.
    self.initialize_providers_list()
    self._discover_cli_command()

609 

def _discover_all_providers_from_packages(self) -> None:
    """
    Discover all providers by scanning packages installed.

    The list of providers should be returned via the 'apache_airflow_provider'
    entrypoint as a dictionary conforming to the 'airflow/provider_info.schema.json'
    schema. Note that the schema is different at runtime than provider.yaml.schema.json.
    The development version of provider schema is more strict and changes together with
    the code. The runtime version is more relaxed (allows for additional properties)
    and verifies only the subset of fields that are needed at runtime.

    The provider's documentation URL is taken from the distribution's "Project-URL" metadata
    entry labelled "Documentation" (case-insensitive). It is stored under the "documentation-url"
    key of the provider info, or None when there is no such entry.

    :raises ValueError: if the distribution name does not match the provider's "package-name".
        Errors raised by the provider-info schema validator propagate as well.
    """
    for entry_point, dist in entry_points_with_dist("apache_airflow_provider"):
        if not dist.metadata:
            continue
        package_name = canonicalize_name(dist.metadata["name"])
        if package_name in self._provider_dict:
            # Only the first entry point found for a distribution is used.
            continue
        log.debug("Loading %s from package %s", entry_point, package_name)
        version = dist.version
        provider_info = entry_point.load()()
        self._provider_schema_validator.validate(provider_info)
        provider_info_package_name = provider_info["package-name"]
        if package_name != provider_info_package_name:
            raise ValueError(
                f"The package '{package_name}' from packaging information "
                f"{provider_info_package_name} do not match. Please make sure they are aligned"
            )

        # issue-59576: Retrieve the project.urls.documentation from dist.metadata.
        # Each "Project-URL" entry has the form "<label>, <url>". Split on the first comma only,
        # because URLs may contain commas, and strip the whitespace that follows the separator.
        documentation_url: str | None = None
        for entry in dist.metadata.get_all("Project-URL") or []:
            label, separator, url = entry.partition(",")
            if separator and label.strip().lower() == "documentation":
                documentation_url = url.strip()
                break

        provider_info["documentation-url"] = documentation_url

        if package_name not in self._provider_dict:
            self._provider_dict[package_name] = ProviderInfo(version, provider_info)
        else:
            log.warning(
                "The provider for package '%s' could not be registered from because providers for that "
                "package name have already been registered",
                package_name,
            )

660 

def _discover_hooks_from_connection_types(
    self,
    hook_class_names_registered: set[str],
    already_registered_warning_connection_types: set[str],
    package_name: str,
    provider: ProviderInfo,
) -> bool:
    """
    Discover hooks from the "connection-types" property.

    This is the newer, better method that replaces discovery from "hook-class-names". It allows
    individual Hook classes to be imported lazily when they are accessed. Each "connection-types"
    entry holds both the connection type and the hook class name, so all connection types can be
    discovered without importing the classes.

    :param hook_class_names_registered: set of hook class names found for this provider; every hook
        class listed in "connection-types" is added to it (updated in place)
    :param already_registered_warning_connection_types: set of connection types already registered
        by a different provider package; such types are added to it (updated in place) so that a
        warning can be logged once discovery is finished
    :param package_name: name of the provider package
    :param provider: information about the provider (version and provider-info data)
    :return: True if the provider declares a non-empty "connection-types" list, False otherwise
    """
    connection_types = provider.data.get("connection-types")
    if not connection_types:
        return False
    for connection_type_dict in connection_types:
        connection_type = connection_type_dict["connection-type"]
        hook_class_name = connection_type_dict["hook-class-name"]
        hook_class_names_registered.add(hook_class_name)
        already_registered = self._hook_provider_dict.get(connection_type)
        if already_registered:
            if already_registered.package_name != package_name:
                already_registered_warning_connection_types.add(connection_type)
            elif already_registered.hook_class_name != hook_class_name:
                # Only a conflicting class within the same package deserves a warning; the same
                # class declared twice is harmless (as in the "hook-class-names" discovery).
                log.warning(
                    "The connection type '%s' is already registered in the"
                    " package '%s' with different class names: '%s' and '%s'. ",
                    connection_type,
                    package_name,
                    already_registered.hook_class_name,
                    hook_class_name,
                )
        else:
            self._hook_provider_dict[connection_type] = HookClassProvider(
                hook_class_name=hook_class_name, package_name=package_name
            )
            # Defer importing hook to access time by setting import hook method as dict value
            self._hooks_lazy_dict[connection_type] = functools.partial(
                self._import_hook,
                connection_type=connection_type,
                provider_info=provider,
            )
    return True

714 

def _discover_hooks_from_hook_class_names(
    self,
    hook_class_names_registered: set[str],
    already_registered_warning_connection_types: set[str],
    package_name: str,
    provider: ProviderInfo,
    provider_uses_connection_types: bool,
):
    """
    Discover hooks from the deprecated "hook-class-names" property.

    This property is deprecated but still supported. Its entries are bare hook class names without
    a connection type, so each class has to be imported right away to learn which connection type it
    serves. Hooks listed this way therefore cannot be imported selectively. Classes already
    registered through "connection-types" are skipped.

    When the provider lists hook class names but has no "connection-types", a DeprecationWarning is
    emitted. Finally, a warning is logged for every connection type collected in
    ``already_registered_warning_connection_types``.

    :param hook_class_names_registered: hook class names already registered (lazily) for this
        provider through "connection-types"
    :param already_registered_warning_connection_types: connection types registered by another
        provider package; added to in place and warned about at the end
    :param package_name: name of the provider package
    :param provider: information about the provider (version and provider-info data)
    :param provider_uses_connection_types: whether the provider declares the newer "connection-types"
        property
    """
    hook_class_names = provider.data.get("hook-class-names")
    if hook_class_names:
        for hook_class_name in hook_class_names:
            if hook_class_name in hook_class_names_registered:
                # Already marked for lazy import by "connection-types" discovery - skip silently.
                continue
            hook_info = self._import_hook(
                connection_type=None,
                provider_info=provider,
                hook_class_name=hook_class_name,
                package_name=package_name,
            )
            if not hook_info:
                # The class could not be imported; the problem was logged at import time.
                continue
            conn_type = hook_info.connection_type
            previous = self._hook_provider_dict.get(conn_type)
            if not previous:
                self._hook_provider_dict[conn_type] = HookClassProvider(
                    hook_class_name=hook_class_name, package_name=package_name
                )
                self._hooks_lazy_dict[conn_type] = hook_info
            elif previous.package_name != package_name:
                already_registered_warning_connection_types.add(conn_type)
            elif previous.hook_class_name != hook_class_name:
                log.warning(
                    "The hook connection type '%s' is registered twice in the"
                    " package '%s' with different class names: '%s' and '%s'. "
                    " Please fix it!",
                    conn_type,
                    package_name,
                    previous.hook_class_name,
                    hook_class_name,
                )

        if not provider_uses_connection_types:
            warnings.warn(
                f"The provider {package_name} uses `hook-class-names` "
                "property in provider-info and has no `connection-types` one. "
                "The 'hook-class-names' property has been deprecated in favour "
                "of 'connection-types' in Airflow 2.2. Use **both** in case you want to "
                "have backwards compatibility with Airflow < 2.2",
                DeprecationWarning,
                stacklevel=1,
            )

    for duplicated_type in already_registered_warning_connection_types:
        log.warning(
            "The connection_type '%s' has been already registered by provider '%s.'",
            duplicated_type,
            self._hook_provider_dict[duplicated_type].package_name,
        )

791 

def _discover_hooks(self) -> None:
    """
    Retrieve all connections defined in the providers via Hooks.

    For every provider this registers its dialects, then its hooks from "connection-types"
    (lazily) and from the deprecated "hook-class-names". The connection-type -> hook class mapping
    is sorted by connection type afterwards.
    """
    for package_name, provider in self._provider_dict.items():
        self._discover_provider_dialects(package_name, provider)
        # Connection types already registered by another provider (warned about after discovery).
        duplicated_connection_types: set[str] = set()
        # Hook classes of this provider already registered through "connection-types".
        hook_class_names_registered: set[str] = set()
        uses_connection_types = self._discover_hooks_from_connection_types(
            hook_class_names_registered=hook_class_names_registered,
            already_registered_warning_connection_types=duplicated_connection_types,
            package_name=package_name,
            provider=provider,
        )
        self._discover_hooks_from_hook_class_names(
            hook_class_names_registered=hook_class_names_registered,
            already_registered_warning_connection_types=duplicated_connection_types,
            package_name=package_name,
            provider=provider,
            provider_uses_connection_types=uses_connection_types,
        )
    self._hook_provider_dict = {
        conn_type: self._hook_provider_dict[conn_type] for conn_type in sorted(self._hook_provider_dict)
    }

809 

def _discover_provider_dialects(self, provider_name: str, provider: ProviderInfo):
    """
    Register the dialects declared in a provider's ``dialects`` section.

    Each entry is stored under its ``dialect-type`` as a ``DialectInfo`` holding the dialect
    class name and the owning provider. All entries are built before the registry is updated.
    When an entry repeats a ``dialect-type``, the last one wins.

    :param provider_name: name of the provider package declaring the dialects
    :param provider: provider information whose ``data`` may contain ``dialects``
    """
    declared = provider.data.get("dialects", [])
    if not declared:
        return
    new_entries = {
        entry["dialect-type"]: DialectInfo(
            name=entry["dialect-type"],
            dialect_class_name=entry["dialect-class-name"],
            provider_name=provider_name,
        )
        for entry in declared
    }
    self._dialect_provider_dict.update(new_entries)

823 

@provider_info_cache("import_all_hooks")
def _import_info_from_all_hooks(self):
    """
    Force-import all hooks and initialize the connection form fields/behaviours.

    After all hooks are imported, field behaviours are re-ordered by connection type.
    Connection-form widgets are deliberately left unsorted, for two reasons:

    * The UI Connections view expects them in the order the hook defines them, and that order
      cannot be recovered after sorting.
    * The alphabetical ordering needed by ``airflow providers widgets`` is done in the CLI.
    """
    # Materializing every value of the hooks dict makes sure that all hooks are imported.
    list(self._hooks_lazy_dict.values())
    behaviours_by_type = sorted(self._field_behaviours.items())
    self._field_behaviours = dict(behaviours_by_type)

837 

def _discover_filesystems(self) -> None:
    """
    Retrieve all filesystems defined in the providers.

    A module listed in a provider's ``filesystems`` section is registered only when
    ``<module>.get_fs`` passes ``_correctness_check`` for that provider.

    No ordering is applied here: ``_fs_set`` is a set, so sorting it would be lost anyway.
    Consumers that need a stable order (e.g. ``filesystem_module_names``) sort on read.
    """
    for provider_package, provider in self._provider_dict.items():
        for fs_module_name in provider.data.get("filesystems", []):
            if _correctness_check(provider_package, f"{fs_module_name}.get_fs", provider):
                self._fs_set.add(fs_module_name)

845 

def _discover_asset_uri_resources(self) -> None:
    """
    Discover and register asset URI handlers, factories and OpenLineage converters.

    Only ``asset-uris`` entries that explicitly contain both ``schemes`` and ``handler`` are
    considered; ``handler`` itself may be null. For such an entry, three resources are
    resolved and, if truthy, mapped to every listed scheme:

    * the handler, falling back to ``normalize_noop`` when null;
    * the optional ``factory``;
    * the optional ``to_openlineage_converter``.

    Dotted paths are resolved with ``_correctness_check``.
    """
    from airflow.sdk.definitions.asset import normalize_noop

    def _register(
        provider_package_name: str,
        provider_info: ProviderInfo,
        schemes: list[str],
        resource_path: str | None,
        registry: dict,
        fallback: Any = None,
    ) -> None:
        """Map every scheme to the resolved resource (or the fallback when no path is given)."""
        if resource_path is None:
            resolved = fallback
        else:
            resolved = _correctness_check(provider_package_name, resource_path, provider_info)
        if not resolved:
            return
        for scheme in schemes:
            registry[scheme] = resolved

    for provider_name, provider in self._provider_dict.items():
        for uri_info in provider.data.get("asset-uris", []):
            # Both schemas and handler must be explicitly set, handler can be set to null.
            if "schemes" not in uri_info or "handler" not in uri_info:
                continue
            schemes = uri_info["schemes"]
            for key, registry, fallback in (
                ("handler", self._asset_uri_handlers, normalize_noop),
                ("factory", self._asset_factories, None),
                ("to_openlineage_converter", self._asset_to_openlineage_converters, None),
            ):
                _register(provider_name, provider, schemes, uri_info.get(key), registry, fallback)

892 

def _discover_taskflow_decorators(self) -> None:
    """Register every ``task-decorators`` entry of every provider, in provider order."""
    for package_name, provider_info in self._provider_dict.items():
        decorators = provider_info.data.get("task-decorators", [])
        for entry in decorators:
            decorator_name = entry["name"]
            class_name = entry["class-name"]
            self._add_taskflow_decorator(decorator_name, class_name, package_name)

899 

def _add_taskflow_decorator(self, name, decorator_class_name: str, provider_package: str) -> None:
    """
    Register a taskflow decorator under ``name``.

    Nothing is registered in two cases:

    * ``_check_builtin_provider_prefix`` rejects the class name;
    * ``name`` is already registered — a warning then names the existing registration.

    The stored value is ``functools.partial(import_string, decorator_class_name)``, so the
    class is imported only when that partial is invoked.
    """
    if not _check_builtin_provider_prefix(provider_package, decorator_class_name):
        return
    if name not in self._taskflow_decorators:
        self._taskflow_decorators[name] = functools.partial(import_string, decorator_class_name)
        return
    try:
        registered = self._taskflow_decorators[name]
        existing_path = f"{registered.__module__}.{registered.__name__}"
    except Exception:
        # Resolving the existing entry failed: read the class path from the stored partial.
        existing_path = self._taskflow_decorators._raw_dict[name].args[0]  # type: ignore[attr-defined]
    log.warning(
        "The taskflow decorator '%s' has been already registered (by %s).",
        name,
        existing_path,
    )

920 

921 @staticmethod 

922 def _get_attr(obj: Any, attr_name: str): 

923 """Retrieve attributes of an object, or warn if not found.""" 

924 if not hasattr(obj, attr_name): 

925 log.warning("The object '%s' is missing %s attribute and cannot be registered", obj, attr_name) 

926 return None 

927 return getattr(obj, attr_name) 

928 

def _import_hook(
    self,
    connection_type: str | None,
    provider_info: ProviderInfo,
    hook_class_name: str | None = None,
    package_name: str | None = None,
) -> HookInfo | None:
    """
    Import hook and retrieve hook information.

    Either connection_type (for lazy loading) or hook_class_name must be set - but not both.
    package_name is only needed when hook_class_name is passed. For lazy loading, package_name
    and hook_class_name are both retrieved from ``_hook_provider_dict``.

    As a side effect, UI metadata defined directly on the hook class (not inherited) is
    registered: connection-form widgets via ``_add_widgets`` and UI field behaviours via
    ``_add_customized_fields``.

    :param connection_type: type of the connection
    :param provider_info: provider information passed to ``_correctness_check``
    :param hook_class_name: name of the hook class
    :param package_name: provider package - only needed in case connection_type is missing
    :return: the HookInfo, or None when any of the following happens:

        * the hook class cannot be imported;
        * a widget uses an unsupported field class;
        * a non-import error is raised while collecting UI metadata;
        * the hook misses one of ``conn_type``, ``conn_name_attr`` or ``hook_name``.
    :raises ValueError: if both or neither of connection_type and hook_class_name are set, or
        package_name is missing while hook_class_name is used
    :raises KeyError: if connection_type is not present in ``_hook_provider_dict``
    """
    if connection_type is None and hook_class_name is None:
        raise ValueError("Either connection_type or hook_class_name must be set")
    if connection_type is not None and hook_class_name is not None:
        raise ValueError(
            f"Both connection_type ({connection_type}) and "
            f"hook_class_name ({hook_class_name}) are set. Only one should be set!"
        )
    if connection_type is not None:
        class_provider = self._hook_provider_dict[connection_type]
        package_name = class_provider.package_name
        hook_class_name = class_provider.hook_class_name
    else:
        if not hook_class_name:
            raise ValueError("Either connection_type or hook_class_name must be set")
        if not package_name:
            raise ValueError(
                f"Provider package name is not set when hook_class_name ({hook_class_name}) is used"
            )
    hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info)
    if hook_class is None:
        return None
    try:
        from wtforms import BooleanField, IntegerField, PasswordField, StringField

        allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField]
        # Check only the class's own __dict__, not inherited attributes. That way form fields
        # are added once for the whole hierarchy, by the hook that actually provides them.
        if "get_connection_form_widgets" in hook_class.__dict__:
            widgets = hook_class.get_connection_form_widgets()
            if widgets:
                for widget in widgets.values():
                    if widget.field_class not in allowed_field_classes:
                        log.warning(
                            "The hook_class '%s' uses field of unsupported class '%s'. "
                            "Only '%s' field classes are supported",
                            hook_class_name,
                            widget.field_class,
                            allowed_field_classes,
                        )
                        return None
                self._add_widgets(package_name, hook_class, widgets)
        if "get_ui_field_behaviour" in hook_class.__dict__:
            field_behaviours = hook_class.get_ui_field_behaviour()
            if field_behaviours:
                self._add_customized_fields(package_name, hook_class, field_behaviours)
    except ImportError as e:
        if e.name in ("flask_appbuilder", "wtforms"):
            log.info(
                "The hook_class '%s' is not fully initialized (UI widgets will be missing), because "
                "the '%s' package is not installed, however it is not required for "
                "Airflow components to work",
                hook_class_name,
                e.name,
            )
        else:
            # Any other missing module is unexpected. Stay best-effort (the hook is still
            # registered, only without UI metadata), but make the failure visible.
            log.warning(
                "The hook_class '%s' is not fully initialized (UI widgets will be missing) "
                "because of an import error: %s",
                hook_class_name,
                e,
            )
    except Exception as e:
        log.warning(
            "Exception when importing '%s' from '%s' package: %s",
            hook_class_name,
            package_name,
            e,
        )
        return None
    hook_connection_type = self._get_attr(hook_class, "conn_type")
    if connection_type:
        if hook_connection_type != connection_type:
            log.warning(
                "Inconsistency! The hook class '%s' declares connection type '%s'"
                " but it is added by provider '%s' as connection_type '%s' in provider info. "
                "This should be fixed!",
                hook_class,
                hook_connection_type,
                package_name,
                connection_type,
            )
    # The hook's own declaration wins over what provider info claims.
    connection_type = hook_connection_type
    connection_id_attribute_name: str = self._get_attr(hook_class, "conn_name_attr")
    hook_name: str = self._get_attr(hook_class, "hook_name")

    if not connection_type or not connection_id_attribute_name or not hook_name:
        log.warning(
            "The hook misses one of the key attributes: "
            "conn_type: %s, conn_id_attribute_name: %s, hook_name: %s",
            connection_type,
            connection_id_attribute_name,
            hook_name,
        )
        return None

    return HookInfo(
        hook_class_name=hook_class_name,
        connection_id_attribute_name=connection_id_attribute_name,
        package_name=package_name,
        hook_name=hook_name,
        connection_type=connection_type,
        connection_testable=hasattr(hook_class, "test_connection"),
    )

1045 

def _add_widgets(self, package_name: str, hook_class: type, widgets: dict[str, Any]):
    """
    Register a hook's connection-form widgets.

    Each widget is keyed by ``extra__<conn_type>__<identifier>``; identifiers that already
    start with ``extra__`` are used verbatim. When a key is already registered, the existing
    entry is kept and a warning is logged. For each new key, a ``ConnectionFormWidgetInfo``
    is stored recording:

    * the hook class name and package;
    * the field and its original identifier;
    * whether the field's widget has ``input_type == "password"``.
    """
    conn_type = hook_class.conn_type  # type: ignore
    for identifier, form_field in widgets.items():
        key = identifier if identifier.startswith("extra__") else f"extra__{conn_type}__{identifier}"
        if key not in self._connection_form_widgets:
            widget = form_field.field_class.widget
            is_password = hasattr(widget, "input_type") and widget.input_type == "password"
            self._connection_form_widgets[key] = ConnectionFormWidgetInfo(
                hook_class.__name__,
                package_name,
                form_field,
                identifier,
                is_password,
            )
            continue
        # In case of inherited hooks this might be happening several times.
        log.warning(
            "The field %s from class %s has already been added by another provider. Ignoring it.",
            identifier,
            hook_class.__name__,
        )

1069 

def _add_customized_fields(self, package_name: str, hook_class: type, customized_fields: dict):
    """
    Validate and register a hook's UI field behaviours under its ``conn_type``.

    The steps are:

    1. Validate the fields with ``_customized_form_fields_schema_validator``.
    2. If the hook's ``conn_type`` is truthy, pass the fields through
       ``_ensure_prefix_for_placeholders``.
    3. Store them under ``conn_type``. The first registration for a connection type wins;
       later ones are ignored with a warning.

    Any exception raised along the way is logged as a warning instead of being propagated.
    """
    try:
        conn_type = hook_class.conn_type  # type: ignore[attr-defined]
        self._customized_form_fields_schema_validator.validate(customized_fields)
        if conn_type:
            customized_fields = _ensure_prefix_for_placeholders(customized_fields, conn_type)
        if conn_type not in self._field_behaviours:
            self._field_behaviours[conn_type] = customized_fields
            return
        log.warning(
            "The connection_type %s from package %s and class %s has already been added "
            "by another provider. Ignoring it.",
            conn_type,
            package_name,
            hook_class.__name__,
        )
    except Exception as e:
        log.warning(
            "Error when loading customized fields from package '%s' hook class '%s': %s",
            package_name,
            hook_class.__name__,
            e,
        )

1096 

def _discover_auth_managers(self, *, check: bool) -> None:
    """
    Collect the auth managers declared in the providers' ``auth-managers`` sections.

    :param check: when False, record ``(class_name, provider_package)`` pairs without calling
        ``_correctness_check``; when True, keep only class names that pass it.
    """
    for provider_package, provider in self._provider_dict.items():
        class_names = provider.data.get("auth-managers")
        if not class_names:
            continue
        for class_name in class_names:
            if check:
                if _correctness_check(provider_package, class_name, provider):
                    self._auth_manager_class_name_set.add(class_name)
            else:
                self._auth_manager_without_check_set.add((class_name, provider_package))

1106 

def _discover_cli_command(self) -> None:
    """
    Collect the CLI command functions declared in the providers' ``cli`` sections.

    The imported function object (not its dotted name) is stored, which avoids importing it
    again later in the CLI parser. The name of every provider contributing a command is
    recorded as well.
    """
    for provider_package, provider in self._provider_dict.items():
        function_paths = provider.data.get("cli")
        if not function_paths:
            continue
        for function_path in function_paths:
            # _correctness_check returns the function itself when it is found and correct.
            imported = _correctness_check(provider_package, function_path, provider)
            if not imported:
                continue
            self._cli_command_functions_set.add(cast("Callable[[], list[CLICommand]]", imported))
            self._cli_command_provider_name_set.add(provider_package)

1118 

def _discover_notifications(self) -> None:
    """Record every ``notifications`` class name that passes ``_correctness_check``."""
    for provider_package, provider in self._provider_dict.items():
        for class_name in provider.data.get("notifications") or ():
            if _correctness_check(provider_package, class_name, provider):
                self._notification_info_set.add(class_name)

1126 

def _discover_extra_links(self) -> None:
    """Record every ``extra-links`` class name that passes ``_correctness_check``."""
    for provider_package, provider in self._provider_dict.items():
        for class_name in provider.data.get("extra-links") or ():
            if _correctness_check(provider_package, class_name, provider):
                self._extra_link_class_name_set.add(class_name)

1134 

def _discover_logging(self) -> None:
    """Record every ``logging`` class name that passes ``_correctness_check``."""
    for provider_package, provider in self._provider_dict.items():
        for class_name in provider.data.get("logging") or ():
            if _correctness_check(provider_package, class_name, provider):
                self._logging_class_name_set.add(class_name)

1142 

def _discover_secrets_backends(self) -> None:
    """Record every ``secrets-backends`` class name that passes ``_correctness_check``."""
    for provider_package, provider in self._provider_dict.items():
        for class_name in provider.data.get("secrets-backends") or ():
            if _correctness_check(provider_package, class_name, provider):
                self._secrets_backend_class_name_set.add(class_name)

1150 

def _discover_executors(self, *, check: bool) -> None:
    """
    Collect the executors declared in the providers' ``executors`` sections.

    :param check: when False, record ``(class_path, provider_package)`` pairs without calling
        ``_correctness_check``; when True, keep only class paths that pass it.
    """
    for provider_package, provider in self._provider_dict.items():
        class_paths = provider.data.get("executors")
        if not class_paths:
            continue
        for class_path in class_paths:
            if check:
                if _correctness_check(provider_package, class_path, provider):
                    self._executor_class_name_set.add(class_path)
            else:
                self._executor_without_check_set.add((class_path, provider_package))

1160 

def _discover_queues(self) -> None:
    """Record every ``queues`` class name that passes ``_correctness_check``."""
    for provider_package, provider in self._provider_dict.items():
        for class_name in provider.data.get("queues") or ():
            if _correctness_check(provider_package, class_name, provider):
                self._queue_class_name_set.add(class_name)

1168 

def _discover_config(self) -> None:
    """Store each provider's non-empty ``config`` section, keyed by provider package name."""
    for provider_package, provider in self._provider_dict.items():
        if config := provider.data.get("config"):
            self._provider_configs[provider_package] = config  # type: ignore[assignment]

1174 

def _discover_plugins(self) -> None:
    """
    Collect the plugins declared in the providers' ``plugins`` sections.

    A plugin is recorded as a ``PluginInfo`` only when its ``plugin-class`` passes
    ``_correctness_check``. Otherwise a warning is logged and the plugin is skipped.
    """
    for provider_package, provider in self._provider_dict.items():
        for plugin_dict in provider.data.get("plugins", ()):
            plugin_class = plugin_dict["plugin-class"]
            if _correctness_check(provider_package, plugin_class, provider):
                info = PluginInfo(
                    name=plugin_dict["name"],
                    plugin_class=plugin_class,
                    provider_name=provider_package,
                )
                self._plugins_set.add(info)
            else:
                log.warning("Plugin not loaded due to above correctness check problem.")

1189 

@provider_info_cache("triggers")
def initialize_providers_triggers(self):
    """
    Initialize providers triggers.

    Each entry of a provider's ``triggers`` section contributes one ``TriggerInfo`` per module
    listed under ``python-modules``. Entries where that key is missing or null contribute
    nothing, rather than failing.
    """
    self.initialize_providers_list()
    for provider_package, provider in self._provider_dict.items():
        for trigger in provider.data.get("triggers", []):
            integration_name = trigger.get("integration-name", "")
            # ``python-modules`` may be absent; iterating ``None`` would raise TypeError.
            for trigger_class_name in trigger.get("python-modules") or []:
                self._trigger_info_set.add(
                    TriggerInfo(
                        package_name=provider_package,
                        trigger_class_name=trigger_class_name,
                        integration_name=integration_name,
                    )
                )

1204 

@property
def auth_managers(self) -> list[str]:
    """Return the sorted class names of provider auth managers that passed the correctness check."""
    self.initialize_providers_auth_managers()
    return sorted(self._auth_manager_class_name_set)

1210 

@property
def auth_manager_without_check(self) -> set[tuple[str, str]]:
    """Return ``(auth manager class name, provider package name)`` pairs collected without a correctness check."""
    self.initialize_providers_auth_managers_without_check()
    pairs = self._auth_manager_without_check_set
    return pairs

1216 

@property
def cli_command_functions(self) -> set[Callable[[], list[CLICommand]]]:
    """Return the set of CLI command functions (callables, not names) contributed by providers."""
    self.initialize_providers_cli_command()
    return self._cli_command_functions_set

1222 

@property
def cli_command_providers(self) -> set[str]:
    """Return the names of the provider packages that contribute CLI commands."""
    self.initialize_providers_cli_command()
    provider_names = self._cli_command_provider_name_set
    return provider_names

1228 

@property
def notification(self) -> list[NotificationInfo]:
    """Return the sorted entries of the provider notifications registry."""
    self.initialize_providers_notifications()
    entries = sorted(self._notification_info_set)
    return entries

1234 

@property
def trigger(self) -> list[TriggerInfo]:
    """
    Return information about available providers trigger class.

    The list is ordered by provider package name, then by trigger class name and integration
    name. This makes the result deterministic regardless of set iteration order.
    """
    self.initialize_providers_triggers()
    return sorted(
        self._trigger_info_set,
        key=lambda x: (x.package_name, x.trigger_class_name, x.integration_name),
    )

1240 

@property
def providers(self) -> dict[str, ProviderInfo]:
    """Return the registered providers keyed by package name, initializing the list first."""
    self.initialize_providers_list()
    registry = self._provider_dict
    return registry

1246 

@property
def hooks(self) -> MutableMapping[str, HookInfo | None]:
    """
    Return the mapping of connection type to hook information.

    Values can be None when a discovered hook could not be imported. The mapping is meant
    only for retrieving hook information.
    """
    self.initialize_providers_hooks()
    lazy_hooks = self._hooks_lazy_dict
    return lazy_hooks

1257 

@property
def dialects(self) -> MutableMapping[str, DialectInfo]:
    """
    Return the registered dialects keyed by dialect type.

    Dialects are discovered together with hooks, so hook initialization runs first.
    """
    self.initialize_providers_hooks()
    dialect_registry = self._dialect_provider_dict
    return dialect_registry

1264 

@property
def plugins(self) -> list[PluginInfo]:
    """Return the plugins available in providers, ordered by plugin class path."""
    self.initialize_providers_plugins()
    return sorted(self._plugins_set, key=lambda plugin: plugin.plugin_class)

1270 

@property
def taskflow_decorators(self) -> dict[str, TaskDecorator]:
    """Return the registered taskflow decorators keyed by name, initializing them first."""
    self.initialize_providers_taskflow_decorator()
    decorators = self._taskflow_decorators
    return decorators  # type: ignore[return-value]

1275 

@property
def extra_links_class_names(self) -> list[str]:
    """Return the extra link class names as a sorted list."""
    self.initialize_providers_extra_links()
    names = sorted(self._extra_link_class_name_set)
    return names

1281 

@property
def connection_form_widgets(self) -> dict[str, ConnectionFormWidgetInfo]:
    """
    Return the connection-form widgets.

    Keys keep the order in which the hooks define the widgets. All hooks are imported first
    so that every widget has been registered.
    """
    self.initialize_providers_hooks()
    self._import_info_from_all_hooks()
    widgets = self._connection_form_widgets
    return widgets

1292 

@property
def field_behaviours(self) -> dict[str, dict]:
    """Return the UI field behaviours keyed by connection type, importing all hooks first."""
    self.initialize_providers_hooks()
    self._import_info_from_all_hooks()
    behaviours = self._field_behaviours
    return behaviours

1299 

@property
def logging_class_names(self) -> list[str]:
    """Return the log task handler class names as a sorted list."""
    self.initialize_providers_logging()
    names = sorted(self._logging_class_name_set)
    return names

1305 

@property
def secrets_backend_class_names(self) -> list[str]:
    """Return the secrets backend class names as a sorted list."""
    self.initialize_providers_secrets_backends()
    names = sorted(self._secrets_backend_class_name_set)
    return names

1311 

@property
def executor_class_names(self) -> list[str]:
    """Return the executor class names that passed the correctness check, sorted."""
    self.initialize_providers_executors()
    names = sorted(self._executor_class_name_set)
    return names

1316 

@property
def executor_without_check(self) -> set[tuple[str, str]]:
    """Return ``(executor class name, provider package name)`` pairs collected without a correctness check."""
    self.initialize_providers_executors_without_check()
    pairs = self._executor_without_check_set
    return pairs

1322 

@property
def queue_class_names(self) -> list[str]:
    """Return the queue class names that passed the correctness check, sorted."""
    self.initialize_providers_queues()
    names = sorted(self._queue_class_name_set)
    return names

1327 

@property
def filesystem_module_names(self) -> list[str]:
    """Return the registered filesystem module names, sorted."""
    self.initialize_providers_filesystems()
    names = sorted(self._fs_set)
    return names

1332 

@property
def asset_factories(self) -> dict[str, Callable[..., Asset]]:
    """Return the asset factories keyed by URI scheme, initializing asset URI resources first."""
    self.initialize_providers_asset_uri_resources()
    factories = self._asset_factories
    return factories

1337 

@property
def asset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
    """Return the asset URI handlers keyed by URI scheme, initializing asset URI resources first."""
    self.initialize_providers_asset_uri_resources()
    handlers = self._asset_uri_handlers
    return handlers

1342 

@property
def asset_to_openlineage_converters(self) -> dict[str, Callable]:
    """Return the asset-to-OpenLineage converters keyed by URI scheme, initializing asset URI resources first."""
    self.initialize_providers_asset_uri_resources()
    converters = self._asset_to_openlineage_converters
    return converters

1349 

@property
def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
    """Return ``(provider package, config)`` pairs sorted by package name, initializing configuration first."""
    self.initialize_providers_configuration()
    configs = self._provider_configs
    return [(package, configs[package]) for package in sorted(configs)]

1354 

@property
def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
    """Return the ``(provider package, config)`` pairs collected so far, sorted by package name, without triggering initialization."""
    configs = self._provider_configs
    return [(package, configs[package]) for package in sorted(configs)]

1358 

def _cleanup(self):
    """
    Reset the manager to an uninitialized state.

    Every cache, registry and collected set is emptied in place, in a fixed order, so
    existing references to them observe the reset. Afterwards the ``_initialized`` flag is
    cleared and the stored initialization stack trace is dropped.
    """
    registries = (
        "_initialized_cache",
        "_provider_dict",
        "_fs_set",
        "_taskflow_decorators",
        "_hook_provider_dict",
        "_dialect_provider_dict",
        "_hooks_lazy_dict",
        "_connection_form_widgets",
        "_field_behaviours",
        "_extra_link_class_name_set",
        "_logging_class_name_set",
        "_auth_manager_class_name_set",
        "_auth_manager_without_check_set",
        "_secrets_backend_class_name_set",
        "_executor_class_name_set",
        "_executor_without_check_set",
        "_queue_class_name_set",
        "_provider_configs",
        "_trigger_info_set",
        "_notification_info_set",
        "_plugins_set",
        "_cli_command_functions_set",
        "_cli_command_provider_name_set",
    )
    for attr_name in registries:
        getattr(self, attr_name).clear()

    self._initialized = False
    self._initialization_stack_trace = None