Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/providers_manager.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

701 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Manages all providers.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import functools 

24import inspect 

25import json 

26import logging 

27import traceback 

28import warnings 

29from collections.abc import Callable, MutableMapping 

30from dataclasses import dataclass 

31from functools import wraps 

32from importlib.resources import files as resource_files 

33from time import perf_counter 

34from typing import TYPE_CHECKING, Any, NamedTuple, ParamSpec, TypeVar, cast 

35 

36from packaging.utils import canonicalize_name 

37 

38from airflow._shared.module_loading import entry_points_with_dist, import_string 

39from airflow.exceptions import AirflowOptionalProviderFeatureException 

40from airflow.utils.log.logging_mixin import LoggingMixin 

41from airflow.utils.singleton import Singleton 

42 

43if TYPE_CHECKING: 

44 from airflow.cli.cli_config import CLICommand 

45 

# Module-level logger shared by the helper functions and decorators defined in this file.
log = logging.getLogger(__name__)


# ``PS`` captures the parameters of the functions wrapped by ``provider_info_cache``.
PS = ParamSpec("PS")
RT = TypeVar("RT")

# Minimum supported version for selected provider distributions. When an installed provider
# is older than the version listed here, a warning is logged while the providers list is verified.
MIN_PROVIDER_VERSIONS = {
    "apache-airflow-providers-celery": "2.1.0",
}

55 

56 

57def _ensure_prefix_for_placeholders(field_behaviors: dict[str, Any], conn_type: str): 

58 """ 

59 Verify the correct placeholder prefix. 

60 

61 If the given field_behaviors dict contains a placeholder's node, and there 

62 are placeholders for extra fields (i.e. anything other than the built-in conn 

63 attrs), and if those extra fields are unprefixed, then add the prefix. 

64 

65 The reason we need to do this is, all custom conn fields live in the same dictionary, 

66 so we need to namespace them with a prefix internally. But for user convenience, 

67 and consistency between the `get_ui_field_behaviour` method and the extra dict itself, 

68 we allow users to supply the unprefixed name. 

69 """ 

70 conn_attrs = {"host", "schema", "login", "password", "port", "extra"} 

71 

72 def ensure_prefix(field): 

73 if field not in conn_attrs and not field.startswith("extra__"): 

74 return f"extra__{conn_type}__{field}" 

75 return field 

76 

77 if "placeholders" in field_behaviors: 

78 placeholders = field_behaviors["placeholders"] 

79 field_behaviors["placeholders"] = {ensure_prefix(k): v for k, v in placeholders.items()} 

80 

81 return field_behaviors 

82 

83 

84if TYPE_CHECKING: 

85 from urllib.parse import SplitResult 

86 

87 from airflow.sdk import BaseHook 

88 from airflow.sdk.bases.decorator import TaskDecorator 

89 from airflow.sdk.definitions.asset import Asset 

90 

91 

class LazyDictWithCache(MutableMapping):
    """
    Lazy-loaded cached dictionary.

    A mapping whose values may be callables. The first time such a key is read, the
    callable is invoked with no arguments and its result replaces the callable in the
    underlying dictionary; later reads return the cached result. Keys that have been
    resolved are tracked separately, so a resolver may itself return a callable
    without that callable being invoked on the next read.

    Assigning a new value to a key resets its "resolved" state, so a freshly assigned
    callable is evaluated lazily again instead of being returned unevaluated.
    """

    __slots__ = ["_resolved", "_raw_dict"]

    def __init__(self, *args, **kw):
        # Keys whose stored value is already the final (resolved) one
        self._resolved = set()
        self._raw_dict = dict(*args, **kw)

    def __setitem__(self, key, value):
        # A new assignment supersedes any previously resolved value, so forget the marker -
        # otherwise a newly assigned callable for this key would never be evaluated.
        self._resolved.discard(key)
        self._raw_dict[key] = value

    def __getitem__(self, key):
        value = self._raw_dict[key]
        if key not in self._resolved and callable(value):
            # exchange callable with result of calling it -- but only once! allow resolver to return a
            # callable itself
            value = value()
            self._resolved.add(key)
            # Write to the raw dict directly so the resolved marker set above is kept
            self._raw_dict[key] = value
        return value

    def __delitem__(self, key):
        with contextlib.suppress(KeyError):
            self._resolved.remove(key)
        self._raw_dict.__delitem__(key)

    def __iter__(self):
        return iter(self._raw_dict)

    def __len__(self):
        return len(self._raw_dict)

    def __contains__(self, key):
        # Membership must not trigger resolution of the value
        return key in self._raw_dict

    def clear(self):
        self._resolved.clear()
        self._raw_dict.clear()

136 

137 

def _read_schema_from_resources_or_local_file(filename: str) -> dict:
    """
    Load a JSON schema shipped with the ``airflow`` package.

    The file is first looked up through ``importlib.resources``; when that raises
    ``TypeError`` or ``FileNotFoundError`` the file located next to this module is
    read instead.

    :param filename: name of the schema file, relative to the package root
    :return: the parsed JSON document
    """
    try:
        packaged = resource_files("airflow").joinpath(filename)
        with packaged.open("rb") as stream:
            return json.load(stream)
    except (TypeError, FileNotFoundError):
        import pathlib

        local_copy = pathlib.Path(__file__).parent / filename
        with local_copy.open("rb") as stream:
            return json.load(stream)

148 

149 

def _create_provider_info_schema_validator():
    """
    Build a JSON schema validator for ``provider_info.schema.json``.

    The validator class is chosen by ``jsonschema`` from the schema itself.

    :return: validator instance bound to the provider-info schema
    """
    import jsonschema

    provider_info_schema = _read_schema_from_resources_or_local_file("provider_info.schema.json")
    validator_class = jsonschema.validators.validator_for(provider_info_schema)
    return validator_class(provider_info_schema)

158 

159 

def _create_customized_form_field_behaviours_schema_validator():
    """
    Build a JSON schema validator for ``customized_form_field_behaviours.schema.json``.

    The validator class is chosen by ``jsonschema`` from the schema itself.

    :return: validator instance bound to the customized-form-field-behaviours schema
    """
    import jsonschema

    behaviours_schema = _read_schema_from_resources_or_local_file(
        "customized_form_field_behaviours.schema.json"
    )
    validator_class = jsonschema.validators.validator_for(behaviours_schema)
    return validator_class(behaviours_schema)

168 

169 

170def _check_builtin_provider_prefix(provider_package: str, class_name: str) -> bool: 

171 if provider_package.startswith("apache-airflow"): 

172 provider_path = provider_package[len("apache-") :].replace("-", ".") 

173 if not class_name.startswith(provider_path): 

174 log.warning( 

175 "Coherence check failed when importing '%s' from '%s' package. It should start with '%s'", 

176 class_name, 

177 provider_package, 

178 provider_path, 

179 ) 

180 return False 

181 return True 

182 

183 

@dataclass
class ProviderInfo:
    """
    Version and metadata of a single provider.

    :param version: version string of the provider distribution
    :param data: provider-info dictionary describing the provider
    """

    # Version of the installed distribution
    version: str
    # Provider-info dictionary returned by the provider's entry point
    data: dict

195 

196 

class HookClassProvider(NamedTuple):
    """Pair of a hook class name and the provider package that contributes it."""

    # Fully qualified name of the hook class
    hook_class_name: str
    # Name of the provider package the hook comes from
    package_name: str

202 

203 

class DialectInfo(NamedTuple):
    """Dialect type, its implementing class name and the provider that contributes it."""

    # Dialect type identifier (the provider-info ``dialect-type`` value)
    name: str
    # Fully qualified name of the dialect class
    dialect_class_name: str
    # Name of the provider the dialect comes from
    provider_name: str

210 

211 

class TriggerInfo(NamedTuple):
    """Trigger class name together with the package and integration it belongs to."""

    # Fully qualified name of the trigger class
    trigger_class_name: str
    # Name of the provider package the trigger comes from
    package_name: str
    # Name of the integration the trigger is listed under
    integration_name: str

218 

219 

class NotificationInfo(NamedTuple):
    """Pair of a notification class name and the provider package that contributes it."""

    # Fully qualified name of the notification class
    notification_class_name: str
    # Name of the provider package the notification comes from
    package_name: str

225 

226 

class PluginInfo(NamedTuple):
    """Plugin name, its class and the provider that contributes it."""

    # Name of the plugin
    name: str
    # Plugin class reference stored as a string
    plugin_class: str
    # Name of the provider the plugin comes from
    provider_name: str

233 

234 

class HookInfo(NamedTuple):
    """Everything the manager records about a single hook."""

    # Fully qualified name of the hook class
    hook_class_name: str
    # Name of the hook attribute that holds the connection id
    connection_id_attribute_name: str
    # Name of the provider package the hook comes from
    package_name: str
    # Human readable name of the hook
    hook_name: str
    # Connection type handled by the hook
    connection_type: str
    # Whether the connection can be tested
    connection_testable: bool
    # NOTE(review): this default list is a single object shared by every instance created
    # without an explicit ``dialects`` value - never mutate it in place.
    dialects: list[str] = []

245 

246 

class ConnectionFormWidgetInfo(NamedTuple):
    """Description of one custom widget shown in the connection form."""

    # Fully qualified name of the hook class that defines the widget
    hook_class_name: str
    # Name of the provider package the hook comes from
    package_name: str
    # The widget/field object itself
    field: Any
    # Name of the field
    field_name: str
    # Whether the field value is sensitive
    is_sensitive: bool

255 

256 

def log_optional_feature_disabled(class_name, e, provider_package):
    """
    Log that an optional provider feature is disabled.

    Emits two records: a DEBUG record carrying the exception details and an INFO
    record with a short message without the traceback.

    :param class_name: name of the class whose import failed
    :param e: exception raised during the import
    :param provider_package: name of the provider package the class belongs to
    """
    log.debug(
        "Optional feature disabled on exception when importing '%s' from '%s' package",
        class_name,
        provider_package,
        exc_info=e,
    )
    log.info(
        "Optional provider feature disabled when importing '%s' from '%s' package",
        class_name,
        provider_package,
    )

270 

271 

def log_import_warning(class_name, e, provider_package):
    """
    Log a WARNING (including exception details) about a failed import of a provider class.

    :param class_name: name of the class whose import failed
    :param e: exception raised during the import
    :param provider_package: name of the provider package the class belongs to
    """
    log.warning(
        "Exception when importing '%s' from '%s' package",
        class_name,
        provider_package,
        exc_info=e,
    )

280 

281 

# This is a temporary measure until all community providers raise AirflowOptionalProviderFeatureException
# wherever they have optional features. Tests are going to be added to CI to catch and fix all such
# cases, but until then every "known unhandled optional feature error" from community providers
# should be added here as a (provider package name, error message fragment) pair.
KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [("apache-airflow-providers-google", "No module named 'paramiko'")]

287 

288 

def _correctness_check(provider_package: str, class_name: str, provider_info: ProviderInfo) -> Any:
    """
    Perform coherence check on provider classes.

    For apache-airflow providers it checks that the class name starts with the appropriate
    package. For all providers it tries to import the class, checking that no exception is
    raised while importing. Failures are never propagated: problems caused by optional
    features are logged at debug/info level, anything else is logged as a warning.

    :param provider_package: name of the provider package
    :param class_name: name of the class to import
    :param provider_info: information about the provider (not used by the check itself)

    :return: the imported class if the class is OK, None otherwise.
    """
    if not _check_builtin_provider_prefix(provider_package, class_name):
        return None
    try:
        imported_class = import_string(class_name)
    except AirflowOptionalProviderFeatureException as e:
        # When the provider class raises AirflowOptionalProviderFeatureException
        # this is an expected case when only some classes in provider are
        # available. We just log debug level here and print info message in logs so that
        # the user is aware of it
        log_optional_feature_disabled(class_name, e, provider_package)
        return None
    except ImportError as e:
        # ``ImportError.msg`` is None when the error was raised without a message; fall back to an
        # empty string so the substring checks below cannot raise TypeError inside this handler.
        error_message = e.msg or ""
        if "No module named 'airflow.providers." in error_message:
            # handle cases where another provider is missing. This can only happen if
            # there is an optional feature, so we log debug and print information about it
            log_optional_feature_disabled(class_name, e, provider_package)
            return None
        for known_package, known_fragment in KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS:
            # Until we convert all providers to use AirflowOptionalProviderFeatureException
            # we assume any problem with importing another "provider" is because this is an
            # optional feature, so we log debug and print information about it
            if known_package == provider_package and known_fragment in error_message:
                log_optional_feature_disabled(class_name, e, provider_package)
                return None
        # But when we have no idea - we print warning to logs
        log_import_warning(class_name, e, provider_package)
        return None
    except Exception as e:
        log_import_warning(class_name, e, provider_package)
        return None
    return imported_class

333 

334 

335# We want to have better control over initialization of parameters and be able to debug and test it 

336# So we add our own decorator 

def provider_info_cache(cache_name: str) -> Callable[[Callable[PS, None]], Callable[PS, None]]:
    """
    Build a decorator that runs a manager initialization method at most once.

    The decorated callable must receive the providers manager as its first positional
    argument. If ``cache_name`` is already present in the manager's ``_initialized_cache``
    the call does nothing; otherwise the callable is run, the cache entry is recorded
    after it returns, and the elapsed time is logged at debug level.

    :param cache_name: Name of the cache
    :return: decorator to apply to the initialization method
    """

    def provider_info_cache_decorator(func: Callable[PS, None]) -> Callable[PS, None]:
        @wraps(func)
        def wrapped_function(*args: PS.args, **kwargs: PS.kwargs) -> None:
            manager = args[0]
            if TYPE_CHECKING:
                assert isinstance(manager, ProvidersManager)

            if cache_name not in manager._initialized_cache:
                started_at = perf_counter()
                log.debug("Initializing Providers Manager[%s]", cache_name)
                func(*args, **kwargs)
                # Only mark as initialized once the wrapped function finished without raising
                manager._initialized_cache[cache_name] = True
                elapsed = perf_counter() - started_at
                log.debug(
                    "Initialization of Providers Manager[%s] took %.2f seconds",
                    cache_name,
                    elapsed,
                )

        return wrapped_function

    return provider_info_cache_decorator

367 

368 

369class ProvidersManager(LoggingMixin, metaclass=Singleton): 

370 """ 

371 Manages all provider distributions. 

372 

373 This is a Singleton class. The first time it is 

374 instantiated, it discovers all available providers in installed packages. 

375 """ 

376 

377 resource_version = "0" 

378 _initialized: bool = False 

379 _initialization_stack_trace = None 

380 

    @staticmethod
    def initialized() -> bool:
        """Return the class-level flag that ``__init__`` sets to True when a manager is created."""
        return ProvidersManager._initialized

384 

    @staticmethod
    def initialization_stack_trace() -> str | None:
        """Return the stack trace captured in ``__init__``, or None if no manager was created yet."""
        return ProvidersManager._initialization_stack_trace

388 

    def __init__(self):
        """
        Initialize the manager.

        Records at class level that a manager was created (together with the stack trace of
        the creation point), sets up empty registries for the resources contributed by
        providers, builds the JSON schema validators and registers the core hooks.
        """
        super().__init__()
        ProvidersManager._initialized = True
        ProvidersManager._initialization_stack_trace = "".join(traceback.format_stack(inspect.currentframe()))
        # Names of the caches already populated by methods decorated with ``provider_info_cache``
        self._initialized_cache: dict[str, bool] = {}
        # Keeps dict of providers keyed by canonicalized package name
        self._provider_dict: dict[str, ProviderInfo] = {}
        self._fs_set: set[str] = set()
        self._asset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
        self._asset_factories: dict[str, Callable[..., Asset]] = {}
        self._asset_to_openlineage_converters: dict[str, Callable] = {}
        self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache()
        # keeps mapping between connection_types and hook class, package they come from
        self._hook_provider_dict: dict[str, HookClassProvider] = {}
        self._dialect_provider_dict: dict[str, DialectInfo] = {}
        # Keeps dict of hooks keyed by connection type. They are lazy evaluated at access time
        self._hooks_lazy_dict: LazyDictWithCache[str, HookInfo | Callable] = LazyDictWithCache()
        # Keeps the custom connection form widgets, keyed by name of the extra field
        self._connection_form_widgets: dict[str, ConnectionFormWidgetInfo] = {}
        # Customizations for javascript fields are kept here
        self._field_behaviours: dict[str, dict] = {}
        self._cli_command_functions_set: set[Callable[[], list[CLICommand]]] = set()
        self._cli_command_provider_name_set: set[str] = set()
        self._extra_link_class_name_set: set[str] = set()
        self._logging_class_name_set: set[str] = set()
        self._auth_manager_class_name_set: set[str] = set()
        self._auth_manager_without_check_set: set[tuple[str, str]] = set()
        self._secrets_backend_class_name_set: set[str] = set()
        self._executor_class_name_set: set[str] = set()
        self._executor_without_check_set: set[tuple[str, str]] = set()
        self._queue_class_name_set: set[str] = set()
        self._provider_configs: dict[str, dict[str, Any]] = {}
        self._trigger_info_set: set[TriggerInfo] = set()
        self._notification_info_set: set[NotificationInfo] = set()
        # Validators are created eagerly; both read their schema file from the airflow package
        self._provider_schema_validator = _create_provider_info_schema_validator()
        self._customized_form_fields_schema_validator = (
            _create_customized_form_field_behaviours_schema_validator()
        )
        # Set of plugins contained in providers
        self._plugins_set: set[PluginInfo] = set()
        # Must run last: it writes into ``_hooks_lazy_dict`` created above
        self._init_airflow_core_hooks()

431 

432 def _init_airflow_core_hooks(self): 

433 """Initialize the hooks dict with default hooks from Airflow core.""" 

434 core_dummy_hooks = { 

435 "generic": "Generic", 

436 "email": "Email", 

437 } 

438 for key, display in core_dummy_hooks.items(): 

439 self._hooks_lazy_dict[key] = HookInfo( 

440 hook_class_name=None, 

441 connection_id_attribute_name=None, 

442 package_name=None, 

443 hook_name=display, 

444 connection_type=None, 

445 connection_testable=False, 

446 ) 

447 for conn_type, class_name in ( 

448 ("fs", "airflow.providers.standard.hooks.filesystem.FSHook"), 

449 ("package_index", "airflow.providers.standard.hooks.package_index.PackageIndexHook"), 

450 ): 

451 self._hooks_lazy_dict[conn_type] = functools.partial( 

452 self._import_hook, 

453 connection_type=None, 

454 package_name="apache-airflow-providers-standard", 

455 hook_class_name=class_name, 

456 provider_info=None, 

457 ) 

458 

    @provider_info_cache("list")
    def initialize_providers_list(self):
        """
        Lazy initialization of providers list.

        Discovers providers from installed packages, logs a warning for providers older than
        the minimum supported version and sorts the providers dictionary by package name.
        Runs only once per manager thanks to the ``list`` cache.
        """
        # Local source folders are loaded first. They should take precedence over the package ones for
        # Development purpose. In production provider.yaml files are not present in the 'airflow" directory
        # So there is no risk we are going to override package provider accidentally. This can only happen
        # in case of local development
        # NOTE(review): only discovery from installed packages is performed here - the comment above
        # looks stale; confirm whether local source folder discovery still exists elsewhere.
        self._discover_all_providers_from_packages()
        self._verify_all_providers_all_compatible()
        self._provider_dict = dict(sorted(self._provider_dict.items()))

469 

470 def _verify_all_providers_all_compatible(self): 

471 from packaging import version as packaging_version 

472 

473 for provider_id, info in self._provider_dict.items(): 

474 min_version = MIN_PROVIDER_VERSIONS.get(provider_id) 

475 if min_version: 

476 if packaging_version.parse(min_version) > packaging_version.parse(info.version): 

477 log.warning( 

478 "The package %s is not compatible with this version of Airflow. " 

479 "The package has version %s but the minimum supported version " 

480 "of the package is %s", 

481 provider_id, 

482 info.version, 

483 min_version, 

484 ) 

485 

    @provider_info_cache("hooks")
    def initialize_providers_hooks(self):
        """
        Lazy initialization of providers hooks.

        Re-registers the core hooks, makes sure the providers list is initialized, discovers
        the hooks of all providers and sorts the connection-type-to-hook mapping by
        connection type. Runs only once per manager thanks to the ``hooks`` cache.
        """
        self._init_airflow_core_hooks()
        self.initialize_providers_list()
        self._discover_hooks()
        self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items()))

493 

    @provider_info_cache("filesystems")
    def initialize_providers_filesystems(self):
        """
        Lazy initialization of providers filesystems.

        Makes sure the providers list is initialized and then discovers the filesystems.
        Runs only once per manager thanks to the ``filesystems`` cache.
        """
        self.initialize_providers_list()
        self._discover_filesystems()

499 

    @provider_info_cache("asset_uris")
    def initialize_providers_asset_uri_resources(self):
        """
        Lazy initialization of provider asset URI handlers, factories, converters etc.

        Makes sure the providers list is initialized and then discovers the asset URI
        resources. Runs only once per manager thanks to the ``asset_uris`` cache.
        """
        self.initialize_providers_list()
        self._discover_asset_uri_resources()

505 

    # NOTE(review): the outer ``hook_lineage_writers`` cache decorator looks like a copy-paste
    # leftover - this method only discovers taskflow decorators, yet calling it also marks the
    # ``hook_lineage_writers`` cache as initialized. Confirm nothing relies on that before removing.
    @provider_info_cache("hook_lineage_writers")
    @provider_info_cache("taskflow_decorators")
    def initialize_providers_taskflow_decorator(self):
        """
        Lazy initialization of providers taskflow decorators.

        Makes sure the providers list is initialized and then discovers the taskflow decorators.
        """
        self.initialize_providers_list()
        self._discover_taskflow_decorators()

512 

    @provider_info_cache("extra_links")
    def initialize_providers_extra_links(self):
        """
        Lazy initialization of providers extra links.

        Makes sure the providers list is initialized and then discovers the extra links.
        Runs only once per manager thanks to the ``extra_links`` cache.
        """
        self.initialize_providers_list()
        self._discover_extra_links()

518 

    @provider_info_cache("logging")
    def initialize_providers_logging(self):
        """
        Lazy initialization of providers logging information.

        Makes sure the providers list is initialized and then discovers the logging
        information. Runs only once per manager thanks to the ``logging`` cache.
        """
        self.initialize_providers_list()
        self._discover_logging()

524 

    @provider_info_cache("secrets_backends")
    def initialize_providers_secrets_backends(self):
        """
        Lazy initialization of providers secrets_backends information.

        Makes sure the providers list is initialized and then discovers the secrets
        backends. Runs only once per manager thanks to the ``secrets_backends`` cache.
        """
        self.initialize_providers_list()
        self._discover_secrets_backends()

530 

    @provider_info_cache("executors")
    def initialize_providers_executors(self):
        """
        Lazy initialization of providers executors information.

        Makes sure the providers list is initialized and then discovers the executors
        with ``check=True``. Runs only once per manager thanks to the ``executors`` cache.
        """
        self.initialize_providers_list()
        self._discover_executors(check=True)

536 

    @provider_info_cache("executors_without_check")
    def initialize_providers_executors_without_check(self):
        """
        Lazy initialization of providers executors information, with ``check=False``.

        Makes sure the providers list is initialized and then discovers the executors
        with ``check=False``. Cached separately from ``initialize_providers_executors``
        under the ``executors_without_check`` name.
        """
        self.initialize_providers_list()
        self._discover_executors(check=False)

542 

    @provider_info_cache("queues")
    def initialize_providers_queues(self):
        """
        Lazy initialization of providers queue information.

        Makes sure the providers list is initialized and then discovers the queues.
        Runs only once per manager thanks to the ``queues`` cache.
        """
        self.initialize_providers_list()
        self._discover_queues()

548 

    @provider_info_cache("notifications")
    def initialize_providers_notifications(self):
        """
        Lazy initialization of providers notifications information.

        Makes sure the providers list is initialized and then discovers the notifications.
        Runs only once per manager thanks to the ``notifications`` cache.
        """
        self.initialize_providers_list()
        self._discover_notifications()

554 

    @provider_info_cache("auth_managers")
    def initialize_providers_auth_managers(self):
        """
        Lazy initialization of providers auth manager information.

        Makes sure the providers list is initialized and then discovers the auth managers
        with ``check=True``. Runs only once per manager thanks to the ``auth_managers`` cache.
        """
        self.initialize_providers_list()
        self._discover_auth_managers(check=True)

560 

    @provider_info_cache("auth_managers_without_check")
    def initialize_providers_auth_managers_without_check(self):
        """
        Lazy initialization of providers auth manager information, with ``check=False``.

        Makes sure the providers list is initialized and then discovers the auth managers
        with ``check=False``. Cached separately from ``initialize_providers_auth_managers``
        under the ``auth_managers_without_check`` name.
        """
        self.initialize_providers_list()
        self._discover_auth_managers(check=False)

566 

    @provider_info_cache("config")
    def initialize_providers_configuration(self):
        """
        Lazy initialization of providers configuration information.

        Cached wrapper around ``_initialize_providers_configuration``: runs only once per
        manager thanks to the ``config`` cache.
        """
        self._initialize_providers_configuration()

571 

    def _initialize_providers_configuration(self):
        """
        Initialize providers configuration information.

        Use this instead of ``initialize_providers_configuration`` when the initialization must
        not be recorded in the cache. For example, when the configuration including providers
        only needs to be written out in a context where no providers are loaded yet: the original
        configuration is restored afterwards, and a later call to
        ``initialize_providers_configuration`` must still run to load the provider
        configuration again.
        """
        self.initialize_providers_list()
        self._discover_config()
        # Now update conf with the new provider configuration from providers
        # (imported locally rather than at module import time)
        from airflow.configuration import conf

        conf.load_providers_configuration()

589 

    @provider_info_cache("plugins")
    def initialize_providers_plugins(self):
        """
        Lazy initialization of providers plugins.

        Makes sure the providers list is initialized and then discovers the plugins.
        Runs only once per manager thanks to the ``plugins`` cache.
        """
        self.initialize_providers_list()
        self._discover_plugins()

594 

    @provider_info_cache("cli_command")
    def initialize_providers_cli_command(self):
        """
        Lazy initialization of providers CLI commands.

        Makes sure the providers list is initialized and then discovers the CLI commands.
        Runs only once per manager thanks to the ``cli_command`` cache.
        """
        self.initialize_providers_list()
        self._discover_cli_command()

600 

601 def _discover_all_providers_from_packages(self) -> None: 

602 """ 

603 Discover all providers by scanning packages installed. 

604 

605 The list of providers should be returned via the 'apache_airflow_provider' 

606 entrypoint as a dictionary conforming to the 'airflow/provider_info.schema.json' 

607 schema. Note that the schema is different at runtime than provider.yaml.schema.json. 

608 The development version of provider schema is more strict and changes together with 

609 the code. The runtime version is more relaxed (allows for additional properties) 

610 and verifies only the subset of fields that are needed at runtime. 

611 """ 

612 for entry_point, dist in entry_points_with_dist("apache_airflow_provider"): 

613 if not dist.metadata: 

614 continue 

615 package_name = canonicalize_name(dist.metadata["name"]) 

616 if package_name in self._provider_dict: 

617 continue 

618 log.debug("Loading %s from package %s", entry_point, package_name) 

619 version = dist.version 

620 provider_info = entry_point.load()() 

621 self._provider_schema_validator.validate(provider_info) 

622 provider_info_package_name = provider_info["package-name"] 

623 if package_name != provider_info_package_name: 

624 raise ValueError( 

625 f"The package '{package_name}' from packaging information " 

626 f"{provider_info_package_name} do not match. Please make sure they are aligned" 

627 ) 

628 

629 # issue-59576: Retrieve the project.urls.documentation from dist.metadata 

630 project_urls = dist.metadata.get_all("Project-URL") 

631 documentation_url: str | None = None 

632 

633 if project_urls: 

634 for entry in project_urls: 

635 if "," in entry: 

636 name, url = entry.split(",") 

637 if name.strip().lower() == "documentation": 

638 documentation_url = url 

639 break 

640 

641 provider_info["documentation-url"] = documentation_url 

642 

643 if package_name not in self._provider_dict: 

644 self._provider_dict[package_name] = ProviderInfo(version, provider_info) 

645 else: 

646 log.warning( 

647 "The provider for package '%s' could not be registered from because providers for that " 

648 "package name have already been registered", 

649 package_name, 

650 ) 

651 

652 def _discover_hooks_from_connection_types( 

653 self, 

654 hook_class_names_registered: set[str], 

655 already_registered_warning_connection_types: set[str], 

656 package_name: str, 

657 provider: ProviderInfo, 

658 ): 

659 """ 

660 Discover hooks from the "connection-types" property. 

661 

662 This is new, better method that replaces discovery from hook-class-names as it 

663 allows to lazy import individual Hook classes when they are accessed. 

664 The "connection-types" keeps information about both - connection type and class 

665 name so we can discover all connection-types without importing the classes. 

666 :param hook_class_names_registered: set of registered hook class names for this provider 

667 :param already_registered_warning_connection_types: set of connections for which warning should be 

668 printed in logs as they were already registered before 

669 :param package_name: 

670 :param provider: 

671 :return: 

672 """ 

673 provider_uses_connection_types = False 

674 connection_types = provider.data.get("connection-types") 

675 if connection_types: 

676 for connection_type_dict in connection_types: 

677 connection_type = connection_type_dict["connection-type"] 

678 hook_class_name = connection_type_dict["hook-class-name"] 

679 hook_class_names_registered.add(hook_class_name) 

680 already_registered = self._hook_provider_dict.get(connection_type) 

681 if already_registered: 

682 if already_registered.package_name != package_name: 

683 already_registered_warning_connection_types.add(connection_type) 

684 else: 

685 log.warning( 

686 "The connection type '%s' is already registered in the" 

687 " package '%s' with different class names: '%s' and '%s'. ", 

688 connection_type, 

689 package_name, 

690 already_registered.hook_class_name, 

691 hook_class_name, 

692 ) 

693 else: 

694 self._hook_provider_dict[connection_type] = HookClassProvider( 

695 hook_class_name=hook_class_name, package_name=package_name 

696 ) 

697 # Defer importing hook to access time by setting import hook method as dict value 

698 self._hooks_lazy_dict[connection_type] = functools.partial( 

699 self._import_hook, 

700 connection_type=connection_type, 

701 provider_info=provider, 

702 ) 

703 provider_uses_connection_types = True 

704 return provider_uses_connection_types 

705 

    def _discover_hooks_from_hook_class_names(
        self,
        hook_class_names_registered: set[str],
        already_registered_warning_connection_types: set[str],
        package_name: str,
        provider: ProviderInfo,
        provider_uses_connection_types: bool,
    ):
        """
        Discover hooks from the "hook-class-names" property.

        This property is deprecated but we should support it in Airflow 2.
        The hook-class-names array contained just Hook names without connection type,
        therefore we need to import all those classes immediately to know which connection types
        are supported. This makes it impossible to selectively only import those hooks that are used.

        :param hook_class_names_registered: set of hook class names already registered through
            "connection-types" discovery; those classes are skipped here
        :param already_registered_warning_connection_types: set of connection types that we should warn
            about when finished discovery
        :param package_name: name of the provider package
        :param provider: class that keeps information about version and details of the provider
        :param provider_uses_connection_types: determines whether the provider uses "connection-types" new
            form of passing connection types
        :return: None
        """
        hook_class_names = provider.data.get("hook-class-names")
        if hook_class_names:
            for hook_class_name in hook_class_names:
                if hook_class_name in hook_class_names_registered:
                    # Silently ignore the hook class - it's already marked for lazy-import by
                    # connection-types discovery
                    continue
                hook_info = self._import_hook(
                    connection_type=None,
                    provider_info=provider,
                    hook_class_name=hook_class_name,
                    package_name=package_name,
                )
                if not hook_info:
                    # Problem while importing the class - we ignore it. Log is written at import time
                    continue
                already_registered = self._hook_provider_dict.get(hook_info.connection_type)
                if already_registered:
                    if already_registered.package_name != package_name:
                        # Registered by a different package - reported at the end of this method
                        already_registered_warning_connection_types.add(hook_info.connection_type)
                    else:
                        if already_registered.hook_class_name != hook_class_name:
                            log.warning(
                                "The hook connection type '%s' is registered twice in the"
                                " package '%s' with different class names: '%s' and '%s'. "
                                " Please fix it!",
                                hook_info.connection_type,
                                package_name,
                                already_registered.hook_class_name,
                                hook_class_name,
                            )
                else:
                    self._hook_provider_dict[hook_info.connection_type] = HookClassProvider(
                        hook_class_name=hook_class_name, package_name=package_name
                    )
                    # The hook is already imported, so the resolved info is stored directly
                    self._hooks_lazy_dict[hook_info.connection_type] = hook_info

            if not provider_uses_connection_types:
                warnings.warn(
                    f"The provider {package_name} uses `hook-class-names` "
                    "property in provider-info and has no `connection-types` one. "
                    "The 'hook-class-names' property has been deprecated in favour "
                    "of 'connection-types' in Airflow 2.2. Use **both** in case you want to "
                    "have backwards compatibility with Airflow < 2.2",
                    DeprecationWarning,
                    stacklevel=1,
                )
        # Report every connection type that another package had registered before this provider
        for already_registered_connection_type in already_registered_warning_connection_types:
            log.warning(
                "The connection_type '%s' has been already registered by provider '%s.'",
                already_registered_connection_type,
                self._hook_provider_dict[already_registered_connection_type].package_name,
            )

782 

783 def _discover_hooks(self) -> None: 

784 """Retrieve all connections defined in the providers via Hooks.""" 

785 for package_name, provider in self._provider_dict.items(): 

786 duplicated_connection_types: set[str] = set() 

787 hook_class_names_registered: set[str] = set() 

788 self._discover_provider_dialects(package_name, provider) 

789 provider_uses_connection_types = self._discover_hooks_from_connection_types( 

790 hook_class_names_registered, duplicated_connection_types, package_name, provider 

791 ) 

792 self._discover_hooks_from_hook_class_names( 

793 hook_class_names_registered, 

794 duplicated_connection_types, 

795 package_name, 

796 provider, 

797 provider_uses_connection_types, 

798 ) 

799 self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items())) 

800 

801 def _discover_provider_dialects(self, provider_name: str, provider: ProviderInfo): 

802 dialects = provider.data.get("dialects", []) 

803 if dialects: 

804 self._dialect_provider_dict.update( 

805 { 

806 item["dialect-type"]: DialectInfo( 

807 name=item["dialect-type"], 

808 dialect_class_name=item["dialect-class-name"], 

809 provider_name=provider_name, 

810 ) 

811 for item in dialects 

812 } 

813 ) 

814 

@provider_info_cache("import_all_hooks")
def _import_info_from_all_hooks(self):
    """Force-import all hooks and initialize the connections/fields."""
    # Materialising every value of the lazy dict makes sure that all hooks get imported.
    list(self._hooks_lazy_dict.values())

    behaviours_in_key_order = sorted(self._field_behaviours.items())
    self._field_behaviours = dict(behaviours_in_key_order)

    # Widgets for connection forms are currently used in two places:
    # 1. In the UI Connections, which expects the same order as defined in the Hook.
    # 2. In the CLI command `airflow providers widgets`, which expects alphabetical order.
    # It is not possible to recover the original ordering after sorting, which is the
    # main reason why the sorting below was moved to the CLI part:
    # self._connection_form_widgets = dict(sorted(self._connection_form_widgets.items()))

828 

829 def _discover_filesystems(self) -> None: 

830 """Retrieve all filesystems defined in the providers.""" 

831 for provider_package, provider in self._provider_dict.items(): 

832 for fs_module_name in provider.data.get("filesystems", []): 

833 if _correctness_check(provider_package, f"{fs_module_name}.get_fs", provider): 

834 self._fs_set.add(fs_module_name) 

835 self._fs_set = set(sorted(self._fs_set)) 

836 

def _discover_asset_uri_resources(self) -> None:
    """Discover and register asset URI handlers, factories, and converters for all providers."""
    from airflow.sdk.definitions.asset import normalize_noop

    def _safe_register_resource(
        provider_package_name: str,
        schemes_list: list[str],
        resource_path: str | None,
        resource_registry: dict,
        default_resource: Any = None,
    ):
        """
        Register one resource (handler, factory, or converter) for the given schemes.

        The resource is resolved from ``resource_path`` through the correctness check, or
        taken from ``default_resource`` when no path is given. Only a truthy resource is
        stored in ``resource_registry``, once per scheme.
        """
        if resource_path is None:
            resolved = default_resource
        else:
            # `provider` is the loop variable of the enclosing provider loop below.
            resolved = _correctness_check(provider_package_name, resource_path, provider)
        if not resolved:
            return
        for scheme in schemes_list:
            resource_registry[scheme] = resolved

    for provider_name, provider in self._provider_dict.items():
        for uri_info in provider.data.get("asset-uris", []):
            # Both schemes and handler must be explicitly set; handler can be set to null.
            if not ("schemes" in uri_info and "handler" in uri_info):
                continue
            schemes = uri_info["schemes"]
            registrations = (
                (uri_info["handler"], self._asset_uri_handlers, normalize_noop),
                (uri_info.get("factory"), self._asset_factories, None),
                (
                    uri_info.get("to_openlineage_converter"),
                    self._asset_to_openlineage_converters,
                    None,
                ),
            )
            for path, registry, fallback in registrations:
                _safe_register_resource(
                    provider_package_name=provider_name,
                    schemes_list=schemes,
                    resource_path=path,
                    resource_registry=registry,
                    default_resource=fallback,
                )

883 

884 def _discover_taskflow_decorators(self) -> None: 

885 for name, info in self._provider_dict.items(): 

886 for taskflow_decorator in info.data.get("task-decorators", []): 

887 self._add_taskflow_decorator( 

888 taskflow_decorator["name"], taskflow_decorator["class-name"], name 

889 ) 

890 

def _add_taskflow_decorator(self, name, decorator_class_name: str, provider_package: str) -> None:
    """
    Register a lazily imported taskflow decorator under ``name``.

    Nothing is registered when the built-in provider prefix check fails, or when the name
    is already taken (a warning naming the existing registration is logged instead).
    """
    if not _check_builtin_provider_prefix(provider_package, decorator_class_name):
        return

    if name not in self._taskflow_decorators:
        self._taskflow_decorators[name] = functools.partial(import_string, decorator_class_name)
        return

    try:
        registered = self._taskflow_decorators[name]
        registered_by = f"{registered.__module__}.{registered.__name__}"
    except Exception:
        # If problem importing, then get the value from the functools.partial
        registered_by = self._taskflow_decorators._raw_dict[name].args[0]  # type: ignore[attr-defined]

    log.warning(
        "The taskflow decorator '%s' has been already registered (by %s).",
        name,
        registered_by,
    )

911 

912 @staticmethod 

913 def _get_attr(obj: Any, attr_name: str): 

914 """Retrieve attributes of an object, or warn if not found.""" 

915 if not hasattr(obj, attr_name): 

916 log.warning("The object '%s' is missing %s attribute and cannot be registered", obj, attr_name) 

917 return None 

918 return getattr(obj, attr_name) 

919 

920 def _import_hook( 

921 self, 

922 connection_type: str | None, 

923 provider_info: ProviderInfo, 

924 hook_class_name: str | None = None, 

925 package_name: str | None = None, 

926 ) -> HookInfo | None: 

927 """ 

928 Import hook and retrieve hook information. 

929 

930 Either connection_type (for lazy loading) or hook_class_name must be set - but not both). 

931 Only needs package_name if hook_class_name is passed (for lazy loading, package_name 

932 is retrieved from _connection_type_class_provider_dict together with hook_class_name). 

933 

934 :param connection_type: type of the connection 

935 :param hook_class_name: name of the hook class 

936 :param package_name: provider package - only needed in case connection_type is missing 

937 : return 

938 """ 

939 if connection_type is None and hook_class_name is None: 

940 raise ValueError("Either connection_type or hook_class_name must be set") 

941 if connection_type is not None and hook_class_name is not None: 

942 raise ValueError( 

943 f"Both connection_type ({connection_type} and " 

944 f"hook_class_name {hook_class_name} are set. Only one should be set!" 

945 ) 

946 if connection_type is not None: 

947 class_provider = self._hook_provider_dict[connection_type] 

948 package_name = class_provider.package_name 

949 hook_class_name = class_provider.hook_class_name 

950 else: 

951 if not hook_class_name: 

952 raise ValueError("Either connection_type or hook_class_name must be set") 

953 if not package_name: 

954 raise ValueError( 

955 f"Provider package name is not set when hook_class_name ({hook_class_name}) is used" 

956 ) 

957 hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info) 

958 if hook_class is None: 

959 return None 

960 try: 

961 from wtforms import BooleanField, IntegerField, PasswordField, StringField 

962 

963 allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField] 

964 module, class_name = hook_class_name.rsplit(".", maxsplit=1) 

965 # Do not use attr here. We want to check only direct class fields not those 

966 # inherited from parent hook. This way we add form fields only once for the whole 

967 # hierarchy and we add it only from the parent hook that provides those! 

968 if "get_connection_form_widgets" in hook_class.__dict__: 

969 widgets = hook_class.get_connection_form_widgets() 

970 if widgets: 

971 for widget in widgets.values(): 

972 if widget.field_class not in allowed_field_classes: 

973 log.warning( 

974 "The hook_class '%s' uses field of unsupported class '%s'. " 

975 "Only '%s' field classes are supported", 

976 hook_class_name, 

977 widget.field_class, 

978 allowed_field_classes, 

979 ) 

980 return None 

981 self._add_widgets(package_name, hook_class, widgets) 

982 if "get_ui_field_behaviour" in hook_class.__dict__: 

983 field_behaviours = hook_class.get_ui_field_behaviour() 

984 if field_behaviours: 

985 self._add_customized_fields(package_name, hook_class, field_behaviours) 

986 except ImportError as e: 

987 if e.name in ["flask_appbuilder", "wtforms"]: 

988 log.info( 

989 "The hook_class '%s' is not fully initialized (UI widgets will be missing), because " 

990 "the 'flask_appbuilder' package is not installed, however it is not required for " 

991 "Airflow components to work", 

992 hook_class_name, 

993 ) 

994 except Exception as e: 

995 log.warning( 

996 "Exception when importing '%s' from '%s' package: %s", 

997 hook_class_name, 

998 package_name, 

999 e, 

1000 ) 

1001 return None 

1002 hook_connection_type = self._get_attr(hook_class, "conn_type") 

1003 if connection_type: 

1004 if hook_connection_type != connection_type: 

1005 log.warning( 

1006 "Inconsistency! The hook class '%s' declares connection type '%s'" 

1007 " but it is added by provider '%s' as connection_type '%s' in provider info. " 

1008 "This should be fixed!", 

1009 hook_class, 

1010 hook_connection_type, 

1011 package_name, 

1012 connection_type, 

1013 ) 

1014 connection_type = hook_connection_type 

1015 connection_id_attribute_name: str = self._get_attr(hook_class, "conn_name_attr") 

1016 hook_name: str = self._get_attr(hook_class, "hook_name") 

1017 

1018 if not connection_type or not connection_id_attribute_name or not hook_name: 

1019 log.warning( 

1020 "The hook misses one of the key attributes: " 

1021 "conn_type: %s, conn_id_attribute_name: %s, hook_name: %s", 

1022 connection_type, 

1023 connection_id_attribute_name, 

1024 hook_name, 

1025 ) 

1026 return None 

1027 

1028 return HookInfo( 

1029 hook_class_name=hook_class_name, 

1030 connection_id_attribute_name=connection_id_attribute_name, 

1031 package_name=package_name, 

1032 hook_name=hook_name, 

1033 connection_type=connection_type, 

1034 connection_testable=hasattr(hook_class, "test_connection"), 

1035 ) 

1036 

1037 def _add_widgets(self, package_name: str, hook_class: type, widgets: dict[str, Any]): 

1038 conn_type = hook_class.conn_type # type: ignore 

1039 for field_identifier, field in widgets.items(): 

1040 if field_identifier.startswith("extra__"): 

1041 prefixed_field_name = field_identifier 

1042 else: 

1043 prefixed_field_name = f"extra__{conn_type}__{field_identifier}" 

1044 if prefixed_field_name in self._connection_form_widgets: 

1045 log.warning( 

1046 "The field %s from class %s has already been added by another provider. Ignoring it.", 

1047 field_identifier, 

1048 hook_class.__name__, 

1049 ) 

1050 # In case of inherited hooks this might be happening several times 

1051 else: 

1052 self._connection_form_widgets[prefixed_field_name] = ConnectionFormWidgetInfo( 

1053 hook_class.__name__, 

1054 package_name, 

1055 field, 

1056 field_identifier, 

1057 hasattr(field.field_class.widget, "input_type") 

1058 and field.field_class.widget.input_type == "password", 

1059 ) 

1060 

1061 def _add_customized_fields(self, package_name: str, hook_class: type, customized_fields: dict): 

1062 try: 

1063 connection_type = getattr(hook_class, "conn_type") 

1064 

1065 self._customized_form_fields_schema_validator.validate(customized_fields) 

1066 

1067 if connection_type: 

1068 customized_fields = _ensure_prefix_for_placeholders(customized_fields, connection_type) 

1069 

1070 if connection_type in self._field_behaviours: 

1071 log.warning( 

1072 "The connection_type %s from package %s and class %s has already been added " 

1073 "by another provider. Ignoring it.", 

1074 connection_type, 

1075 package_name, 

1076 hook_class.__name__, 

1077 ) 

1078 return 

1079 self._field_behaviours[connection_type] = customized_fields 

1080 except Exception as e: 

1081 log.warning( 

1082 "Error when loading customized fields from package '%s' hook class '%s': %s", 

1083 package_name, 

1084 hook_class.__name__, 

1085 e, 

1086 ) 

1087 

1088 def _discover_auth_managers(self, *, check: bool) -> None: 

1089 """Retrieve all auth managers defined in the providers.""" 

1090 for provider_package, provider in self._provider_dict.items(): 

1091 if provider.data.get("auth-managers"): 

1092 for auth_manager_class_name in provider.data["auth-managers"]: 

1093 if not check: 

1094 self._auth_manager_without_check_set.add((auth_manager_class_name, provider_package)) 

1095 elif _correctness_check(provider_package, auth_manager_class_name, provider): 

1096 self._auth_manager_class_name_set.add(auth_manager_class_name) 

1097 

1098 def _discover_cli_command(self) -> None: 

1099 """Retrieve all CLI command functions defined in the providers.""" 

1100 for provider_package, provider in self._provider_dict.items(): 

1101 if provider.data.get("cli"): 

1102 for cli_command_function_name in provider.data["cli"]: 

1103 # _correctness_check will return the function if found and correct 

1104 # we store the function itself instead of its name to avoid importing it again later in cli_parser to speed up cli loading 

1105 if cli_func := _correctness_check(provider_package, cli_command_function_name, provider): 

1106 cli_func = cast("Callable[[], list[CLICommand]]", cli_func) 

1107 self._cli_command_functions_set.add(cli_func) 

1108 self._cli_command_provider_name_set.add(provider_package) 

1109 

1110 def _discover_notifications(self) -> None: 

1111 """Retrieve all notifications defined in the providers.""" 

1112 for provider_package, provider in self._provider_dict.items(): 

1113 if provider.data.get("notifications"): 

1114 for notification_class_name in provider.data["notifications"]: 

1115 if _correctness_check(provider_package, notification_class_name, provider): 

1116 self._notification_info_set.add(notification_class_name) 

1117 

1118 def _discover_extra_links(self) -> None: 

1119 """Retrieve all extra links defined in the providers.""" 

1120 for provider_package, provider in self._provider_dict.items(): 

1121 if provider.data.get("extra-links"): 

1122 for extra_link_class_name in provider.data["extra-links"]: 

1123 if _correctness_check(provider_package, extra_link_class_name, provider): 

1124 self._extra_link_class_name_set.add(extra_link_class_name) 

1125 

1126 def _discover_logging(self) -> None: 

1127 """Retrieve all logging defined in the providers.""" 

1128 for provider_package, provider in self._provider_dict.items(): 

1129 if provider.data.get("logging"): 

1130 for logging_class_name in provider.data["logging"]: 

1131 if _correctness_check(provider_package, logging_class_name, provider): 

1132 self._logging_class_name_set.add(logging_class_name) 

1133 

1134 def _discover_secrets_backends(self) -> None: 

1135 """Retrieve all secrets backends defined in the providers.""" 

1136 for provider_package, provider in self._provider_dict.items(): 

1137 if provider.data.get("secrets-backends"): 

1138 for secrets_backends_class_name in provider.data["secrets-backends"]: 

1139 if _correctness_check(provider_package, secrets_backends_class_name, provider): 

1140 self._secrets_backend_class_name_set.add(secrets_backends_class_name) 

1141 

1142 def _discover_executors(self, *, check: bool) -> None: 

1143 """Retrieve all executors defined in the providers.""" 

1144 for provider_package, provider in self._provider_dict.items(): 

1145 if provider.data.get("executors"): 

1146 for executors_class_path in provider.data["executors"]: 

1147 if not check: 

1148 self._executor_without_check_set.add((executors_class_path, provider_package)) 

1149 elif _correctness_check(provider_package, executors_class_path, provider): 

1150 self._executor_class_name_set.add(executors_class_path) 

1151 

1152 def _discover_queues(self) -> None: 

1153 """Retrieve all queues defined in the providers.""" 

1154 for provider_package, provider in self._provider_dict.items(): 

1155 if provider.data.get("queues"): 

1156 for queue_class_name in provider.data["queues"]: 

1157 if _correctness_check(provider_package, queue_class_name, provider): 

1158 self._queue_class_name_set.add(queue_class_name) 

1159 

1160 def _discover_config(self) -> None: 

1161 """Retrieve all configs defined in the providers.""" 

1162 for provider_package, provider in self._provider_dict.items(): 

1163 if provider.data.get("config"): 

1164 self._provider_configs[provider_package] = provider.data.get("config") # type: ignore[assignment] 

1165 

1166 def _discover_plugins(self) -> None: 

1167 """Retrieve all plugins defined in the providers.""" 

1168 for provider_package, provider in self._provider_dict.items(): 

1169 for plugin_dict in provider.data.get("plugins", ()): 

1170 if not _correctness_check(provider_package, plugin_dict["plugin-class"], provider): 

1171 log.warning("Plugin not loaded due to above correctness check problem.") 

1172 continue 

1173 self._plugins_set.add( 

1174 PluginInfo( 

1175 name=plugin_dict["name"], 

1176 plugin_class=plugin_dict["plugin-class"], 

1177 provider_name=provider_package, 

1178 ) 

1179 ) 

1180 

@provider_info_cache("triggers")
def initialize_providers_triggers(self):
    """
    Initialize providers triggers.

    Makes sure the providers list is initialized, then records one ``TriggerInfo`` for
    every entry listed under ``python-modules`` of each ``triggers`` item of a provider.
    """
    self.initialize_providers_list()
    for provider_package, provider in self._provider_dict.items():
        for trigger in provider.data.get("triggers", []):
            integration_name = trigger.get("integration-name", "")
            # A trigger entry without (or with a null) `python-modules` contributes nothing
            # instead of failing with a TypeError when iterating None.
            for trigger_class_name in trigger.get("python-modules") or ():
                self._trigger_info_set.add(
                    TriggerInfo(
                        package_name=provider_package,
                        trigger_class_name=trigger_class_name,
                        integration_name=integration_name,
                    )
                )

1195 

@property
def auth_managers(self) -> list[str]:
    """Return the sorted auth manager class names collected with correctness check."""
    self.initialize_providers_auth_managers()
    class_names = list(self._auth_manager_class_name_set)
    class_names.sort()
    return class_names

1201 

@property
def auth_manager_without_check(self) -> set[tuple[str, str]]:
    """Return the set of (auth manager class name, provider package name) gathered without correctness check."""
    self.initialize_providers_auth_managers_without_check()
    unchecked_auth_managers = self._auth_manager_without_check_set
    return unchecked_auth_managers

1207 

@property
def cli_command_functions(self) -> set[Callable[[], list[CLICommand]]]:
    """Return the set of CLI command functions collected from providers."""
    self.initialize_providers_cli_command()
    collected_functions = self._cli_command_functions_set
    return collected_functions

1213 

@property
def cli_command_providers(self) -> set[str]:
    """Return the names of the provider packages that contribute CLI commands."""
    self.initialize_providers_cli_command()
    provider_names = self._cli_command_provider_name_set
    return provider_names

1219 

@property
def notification(self) -> list[NotificationInfo]:
    """Return the sorted notification entries collected from providers."""
    self.initialize_providers_notifications()
    notifications = list(self._notification_info_set)
    notifications.sort()
    return notifications

1225 

@property
def trigger(self) -> list[TriggerInfo]:
    """Return the collected trigger entries ordered by their package name."""
    self.initialize_providers_triggers()
    triggers = list(self._trigger_info_set)
    triggers.sort(key=lambda trigger_info: trigger_info.package_name)
    return triggers

1231 

@property
def providers(self) -> dict[str, ProviderInfo]:
    """Return the provider registry after making sure the providers list is initialized."""
    self.initialize_providers_list()
    known_providers = self._provider_dict
    return known_providers

1237 

@property
def hooks(self) -> MutableMapping[str, HookInfo | None]:
    """
    Return the connection_type-to-hook mapping.

    A value can be None when the hook that was discovered cannot be imported.
    """
    self.initialize_providers_hooks()
    # The mapping handed out here is only meant for looking up hook information.
    lazy_hooks = self._hooks_lazy_dict
    return lazy_hooks

1248 

@property
def dialects(self) -> MutableMapping[str, DialectInfo]:
    """Return the dialect registry (populated as part of hooks initialization)."""
    self.initialize_providers_hooks()
    # The mapping handed out here is only meant for looking up dialect information.
    registered_dialects = self._dialect_provider_dict
    return registered_dialects

1255 

@property
def plugins(self) -> list[PluginInfo]:
    """Return the plugins available in providers, ordered by plugin class."""
    self.initialize_providers_plugins()
    provider_plugins = list(self._plugins_set)
    provider_plugins.sort(key=lambda plugin_info: plugin_info.plugin_class)
    return provider_plugins

1261 

@property
def taskflow_decorators(self) -> dict[str, TaskDecorator]:
    """Return the registry of taskflow decorators contributed by providers."""
    self.initialize_providers_taskflow_decorator()
    registered_decorators = self._taskflow_decorators
    return registered_decorators  # type: ignore[return-value]

1266 

@property
def extra_links_class_names(self) -> list[str]:
    """Return the sorted list of extra link class names."""
    self.initialize_providers_extra_links()
    class_names = list(self._extra_link_class_name_set)
    class_names.sort()
    return class_names

1272 

@property
def connection_form_widgets(self) -> dict[str, ConnectionFormWidgetInfo]:
    """
    Return the widgets for connection forms.

    The dictionary is returned unsorted: its keys stay in the order in which they were added.
    """
    self.initialize_providers_hooks()
    self._import_info_from_all_hooks()
    form_widgets = self._connection_form_widgets
    return form_widgets

1283 

@property
def field_behaviours(self) -> dict[str, dict]:
    """Return the field behaviours registered per connection type."""
    self.initialize_providers_hooks()
    self._import_info_from_all_hooks()
    behaviours = self._field_behaviours
    return behaviours

1290 

@property
def logging_class_names(self) -> list[str]:
    """Return the sorted list of log task handler class names."""
    self.initialize_providers_logging()
    class_names = list(self._logging_class_name_set)
    class_names.sort()
    return class_names

1296 

@property
def secrets_backend_class_names(self) -> list[str]:
    """Return the sorted list of secrets backend class names."""
    self.initialize_providers_secrets_backends()
    class_names = list(self._secrets_backend_class_name_set)
    class_names.sort()
    return class_names

1302 

@property
def executor_class_names(self) -> list[str]:
    """Return the sorted list of executor class names collected with correctness check."""
    self.initialize_providers_executors()
    class_names = list(self._executor_class_name_set)
    class_names.sort()
    return class_names

1307 

@property
def executor_without_check(self) -> set[tuple[str, str]]:
    """Return the set of (executor class name, provider package name) gathered without correctness check."""
    self.initialize_providers_executors_without_check()
    unchecked_executors = self._executor_without_check_set
    return unchecked_executors

1313 

@property
def queue_class_names(self) -> list[str]:
    """Return the sorted list of queue class names."""
    self.initialize_providers_queues()
    class_names = list(self._queue_class_name_set)
    class_names.sort()
    return class_names

1318 

@property
def filesystem_module_names(self) -> list[str]:
    """Return the sorted list of filesystem module names."""
    self.initialize_providers_filesystems()
    module_names = list(self._fs_set)
    module_names.sort()
    return module_names

1323 

@property
def asset_factories(self) -> dict[str, Callable[..., Asset]]:
    """Return the registry of asset factories keyed by URI scheme."""
    self.initialize_providers_asset_uri_resources()
    factories = self._asset_factories
    return factories

1328 

@property
def asset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
    """Return the registry of asset URI handlers keyed by URI scheme."""
    self.initialize_providers_asset_uri_resources()
    handlers = self._asset_uri_handlers
    return handlers

1333 

@property
def asset_to_openlineage_converters(self) -> dict[str, Callable]:
    """Return the registry of asset-to-OpenLineage converters keyed by URI scheme."""
    self.initialize_providers_asset_uri_resources()
    converters = self._asset_to_openlineage_converters
    return converters

1340 

@property
def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
    """Return (provider package, config) pairs ordered by package, initializing the configuration first."""
    self.initialize_providers_configuration()
    config_pairs = list(self._provider_configs.items())
    config_pairs.sort(key=lambda pair: pair[0])
    return config_pairs

1345 

@property
def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
    """Return (provider package, config) pairs ordered by package, without triggering any initialization."""
    config_pairs = list(self._provider_configs.items())
    config_pairs.sort(key=lambda pair: pair[0])
    return config_pairs

1349 

1350 def _cleanup(self): 

1351 self._initialized_cache.clear() 

1352 self._provider_dict.clear() 

1353 self._fs_set.clear() 

1354 self._taskflow_decorators.clear() 

1355 self._hook_provider_dict.clear() 

1356 self._dialect_provider_dict.clear() 

1357 self._hooks_lazy_dict.clear() 

1358 self._connection_form_widgets.clear() 

1359 self._field_behaviours.clear() 

1360 self._extra_link_class_name_set.clear() 

1361 self._logging_class_name_set.clear() 

1362 self._auth_manager_class_name_set.clear() 

1363 self._auth_manager_without_check_set.clear() 

1364 self._secrets_backend_class_name_set.clear() 

1365 self._executor_class_name_set.clear() 

1366 self._executor_without_check_set.clear() 

1367 self._queue_class_name_set.clear() 

1368 self._provider_configs.clear() 

1369 self._trigger_info_set.clear() 

1370 self._notification_info_set.clear() 

1371 self._plugins_set.clear() 

1372 self._cli_command_functions_set.clear() 

1373 self._cli_command_provider_name_set.clear() 

1374 

1375 self._initialized = False 

1376 self._initialization_stack_trace = None