Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/providers_manager.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

790 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Manages all providers.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import functools 

24import inspect 

25import json 

26import logging 

27import traceback 

28import warnings 

29from collections.abc import Callable, Iterator, MutableMapping 

30from dataclasses import dataclass 

31from functools import wraps 

32from importlib.resources import files as resource_files 

33from time import perf_counter 

34from typing import TYPE_CHECKING, Any, NamedTuple, ParamSpec, TypeVar, cast 

35 

36from airflow import DeprecatedImportWarning 

37from airflow._shared.module_loading import import_string 

38from airflow._shared.providers_discovery import discover_all_providers_from_packages 

39from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning 

40from airflow.utils.log.logging_mixin import LoggingMixin 

41 

42if TYPE_CHECKING: 

43 from airflow.cli.cli_config import CLICommand 

44 

45log = logging.getLogger(__name__) 

46 

47 

48PS = ParamSpec("PS") 

49RT = TypeVar("RT") 

50 

51MIN_PROVIDER_VERSIONS = { 

52 "apache-airflow-providers-celery": "2.1.0", 

53} 

54 

55 

56def _ensure_prefix_for_placeholders(field_behaviors: dict[str, Any], conn_type: str): 

57 """ 

58 Verify the correct placeholder prefix. 

59 

60 If the given field_behaviors dict contains a placeholder's node, and there 

61 are placeholders for extra fields (i.e. anything other than the built-in conn 

62 attrs), and if those extra fields are unprefixed, then add the prefix. 

63 

64 The reason we need to do this is, all custom conn fields live in the same dictionary, 

65 so we need to namespace them with a prefix internally. But for user convenience, 

66 and consistency between the `get_ui_field_behaviour` method and the extra dict itself, 

67 we allow users to supply the unprefixed name. 

68 """ 

69 conn_attrs = {"host", "schema", "login", "password", "port", "extra"} 

70 

71 def ensure_prefix(field): 

72 if field not in conn_attrs and not field.startswith("extra__"): 

73 return f"extra__{conn_type}__{field}" 

74 return field 

75 

76 if "placeholders" in field_behaviors: 

77 placeholders = field_behaviors["placeholders"] 

78 field_behaviors["placeholders"] = {ensure_prefix(k): v for k, v in placeholders.items()} 

79 

80 return field_behaviors 

81 

82 

83if TYPE_CHECKING: 

84 from urllib.parse import SplitResult 

85 

86 from airflow.sdk import BaseHook 

87 from airflow.sdk.bases.decorator import TaskDecorator 

88 from airflow.sdk.definitions.asset import Asset 

89 

90 

class LazyDictWithCache(MutableMapping):
    """
    Lazy-loaded cached dictionary.

    A value that is callable is called (without arguments) the first time its key is
    looked up; the result replaces the callable in the underlying dict and is returned
    on every later lookup. Non-callable values are returned unchanged.
    """

    __slots__ = ["_resolved", "_raw_dict"]

    def __init__(self, *args, **kw):
        # Keys whose callable has already been exchanged for its result.
        self._resolved = set()
        self._raw_dict = dict(*args, **kw)

    def __setitem__(self, key, value):
        # A new assignment supersedes any earlier resolution. Without forgetting the
        # key here, a callable stored after the first lookup would be returned as-is
        # instead of being resolved.
        self._resolved.discard(key)
        self._raw_dict[key] = value

    def __getitem__(self, key):
        value = self._raw_dict[key]
        if key not in self._resolved and callable(value):
            # Exchange the callable for its result -- but only once, so that a resolver
            # may legitimately return a callable itself.
            value = value()
            self._resolved.add(key)
            self._raw_dict[key] = value
        return value

    def __delitem__(self, key):
        with contextlib.suppress(KeyError):
            self._resolved.remove(key)
        del self._raw_dict[key]

    def __iter__(self):
        return iter(self._raw_dict)

    def __len__(self):
        return len(self._raw_dict)

    def __contains__(self, key):
        # Answered from the raw dict so that a membership test never triggers resolution.
        return key in self._raw_dict

    def clear(self):
        self._resolved.clear()
        self._raw_dict.clear()

135 

136 

def _read_schema_from_resources_or_local_file(filename: str) -> dict:
    """
    Load a JSON schema shipped with the ``airflow`` package.

    The file is first looked up through package resources; when that raises
    ``TypeError`` or ``FileNotFoundError`` it is read from the directory of this
    module instead.

    :param filename: schema file name
    :return: the parsed JSON document
    """
    try:
        packaged = resource_files("airflow").joinpath(filename)
        with packaged.open("rb") as stream:
            return json.load(stream)
    except (TypeError, FileNotFoundError):
        import pathlib

        local = pathlib.Path(__file__).parent / filename
        with local.open("rb") as stream:
            return json.load(stream)

147 

148 

def _create_provider_info_schema_validator():
    """
    Create JSON schema validator from the provider_info.schema.json.

    :return: an instance of the validator class ``jsonschema`` selects for that schema
    """
    # Imported lazily so that jsonschema is only loaded when a validator is actually built.
    import jsonschema

    provider_info_schema = _read_schema_from_resources_or_local_file("provider_info.schema.json")
    validator_class = jsonschema.validators.validator_for(provider_info_schema)
    return validator_class(provider_info_schema)

157 

158 

def _create_customized_form_field_behaviours_schema_validator():
    """
    Create JSON schema validator from the customized_form_field_behaviours.schema.json.

    :return: an instance of the validator class ``jsonschema`` selects for that schema
    """
    # Imported lazily so that jsonschema is only loaded when a validator is actually built.
    import jsonschema

    behaviours_schema = _read_schema_from_resources_or_local_file(
        "customized_form_field_behaviours.schema.json"
    )
    validator_class = jsonschema.validators.validator_for(behaviours_schema)
    return validator_class(behaviours_schema)

167 

168 

169def _check_builtin_provider_prefix(provider_package: str, class_name: str) -> bool: 

170 if provider_package.startswith("apache-airflow"): 

171 provider_path = provider_package[len("apache-") :].replace("-", ".") 

172 if not class_name.startswith(provider_path): 

173 log.warning( 

174 "Coherence check failed when importing '%s' from '%s' package. It should start with '%s'", 

175 class_name, 

176 provider_package, 

177 provider_path, 

178 ) 

179 return False 

180 return True 

181 

182 

@dataclass
class ProviderInfo:
    """
    Version and metadata of a single provider.

    :param version: version string
    :param data: dictionary with information about the provider
    """

    # Compared against ``MIN_PROVIDER_VERSIONS`` by the manager.
    version: str
    # Read with ``.get(...)`` for keys such as "connection-types" or "dialects".
    data: dict

194 

195 

class HookClassProvider(NamedTuple):
    """Pairs a hook class name with the provider package that registered it."""

    # Fully qualified name of the hook class.
    hook_class_name: str
    # Name of the provider package.
    package_name: str

201 

202 

class DialectInfo(NamedTuple):
    """Dialect type, its class name and the provider that declares it."""

    # Taken from the "dialect-type" entry of the provider data.
    name: str
    # Taken from the "dialect-class-name" entry of the provider data.
    dialect_class_name: str
    # Name of the provider the dialect was discovered in.
    provider_name: str

209 

210 

class TriggerInfo(NamedTuple):
    """Describes a trigger class together with its provider package and integration."""

    trigger_class_name: str
    package_name: str
    integration_name: str

217 

218 

class NotificationInfo(NamedTuple):
    """Pairs a notification class name with the provider package it comes from."""

    notification_class_name: str
    package_name: str

224 

225 

class PluginInfo(NamedTuple):
    """Name of a plugin, its class and the provider that ships it."""

    name: str
    plugin_class: str
    provider_name: str

232 

233 

234class HookInfo(NamedTuple): 

235 """Hook information.""" 

236 

237 hook_class_name: str 

238 connection_id_attribute_name: str 

239 package_name: str 

240 hook_name: str 

241 connection_type: str 

242 connection_testable: bool 

243 dialects: list[str] = [] 

244 

245 

246class ConnectionTypeHookUIMetadata(NamedTuple): 

247 """Hook metadata for one connection type (connection UI); ``field_behaviour`` is standard fields.""" 

248 

249 connection_type: str 

250 hook_name: str 

251 hook_class_name: str | None 

252 field_behaviour: dict | None 

253 

254 

class ConnectionFormWidgetInfo(NamedTuple):
    """Describes one widget of a connection form and the hook/package that defines it."""

    hook_class_name: str
    package_name: str
    # The widget object itself; its concrete type is not constrained here.
    field: Any
    field_name: str
    is_sensitive: bool

263 

264 

def log_optional_feature_disabled(class_name, e, provider_package):
    """
    Log that an optional provider feature is disabled.

    Writes a debug record that carries the exception, followed by a shorter
    info record without it.

    :param class_name: name of the class that could not be imported
    :param e: exception raised by the import
    :param provider_package: name of the provider package
    """
    origin = (class_name, provider_package)
    log.debug(
        "Optional feature disabled on exception when importing '%s' from '%s' package",
        *origin,
        exc_info=e,
    )
    log.info(
        "Optional provider feature disabled when importing '%s' from '%s' package",
        *origin,
    )

278 

279 

def log_import_warning(class_name, e, provider_package):
    """
    Log an unexpected import failure at warning level, including the exception.

    :param class_name: name of the class that could not be imported
    :param e: exception raised by the import
    :param provider_package: name of the provider package
    """
    message = "Exception when importing '%s' from '%s' package"
    log.warning(message, class_name, provider_package, exc_info=e)

288 

289 

# Temporary measure until all community providers raise AirflowOptionalProviderFeatureException
# wherever they have optional features. Tests in CI are meant to catch and fix such cases; until
# then, every "known unhandled optional feature error" from community providers is listed here
# as a (provider package, fragment of the ImportError message) pair.
KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [
    ("apache-airflow-providers-google", "No module named 'paramiko'"),
]

295 

296 

def _correctness_check(provider_package: str, class_name: str, provider_info: ProviderInfo) -> Any:
    """
    Perform coherence check on provider classes.

    For apache-airflow providers it checks that the class name starts with the
    appropriate package. For all providers it tries to import the class, checking that
    no exception is raised while importing. Problems are logged, never raised.

    :param provider_package: name of the provider package
    :param class_name: name of the class to import
    :param provider_info: information about the provider (not used by the check itself)

    :return: the class if the class is OK, None otherwise.
    """
    if not _check_builtin_provider_prefix(provider_package, class_name):
        return None
    try:
        return import_string(class_name)
    except AirflowOptionalProviderFeatureException as e:
        # Expected when only some classes of a provider are available: log at debug
        # level and print an info message so that the user is aware of it.
        log_optional_feature_disabled(class_name, e, provider_package)
    except ImportError as e:
        # ``ImportError.msg`` is None when the exception was raised with no argument or
        # with several, so fall back to ``str(e)`` rather than failing on the ``in`` test.
        message = e.msg if isinstance(e.msg, str) else str(e)
        # Either another provider is missing (which can only happen for an optional
        # feature), or the error is explicitly listed for this package because the
        # provider does not raise AirflowOptionalProviderFeatureException yet.
        optional_feature = "No module named 'airflow.providers." in message or any(
            known_package == provider_package and known_text in message
            for known_package, known_text in KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS
        )
        if optional_feature:
            log_optional_feature_disabled(class_name, e, provider_package)
        else:
            # No idea what went wrong - print a warning to the logs.
            log_import_warning(class_name, e, provider_package)
    except Exception as e:
        log_import_warning(class_name, e, provider_package)
    return None

341 

342 

# A hand-written decorator (rather than a generic cache) gives better control over how the
# parameters are initialized and makes the initialization easier to debug and test.
def provider_info_cache(cache_name: str) -> Callable[[Callable[PS, None]], Callable[PS, None]]:
    """
    Build a decorator that runs an initialization method at most once per cache name.

    The decorated callable must receive the providers manager as its first positional
    argument. The manager's ``_initialized_cache`` dict records which cache names are
    done; a name is recorded only after the wrapped callable returned.

    :param cache_name: Name of the cache
    """

    def provider_info_cache_decorator(func: Callable[PS, None]) -> Callable[PS, None]:
        @wraps(func)
        def wrapped_function(*args: PS.args, **kwargs: PS.kwargs) -> None:
            manager = args[0]
            if TYPE_CHECKING:
                assert isinstance(manager, ProvidersManager)

            if cache_name not in manager._initialized_cache:
                started_at = perf_counter()
                log.debug("Initializing Providers Manager[%s]", cache_name)
                func(*args, **kwargs)
                manager._initialized_cache[cache_name] = True
                log.debug(
                    "Initialization of Providers Manager[%s] took %.2f seconds",
                    cache_name,
                    perf_counter() - started_at,
                )

        return wrapped_function

    return provider_info_cache_decorator

375 

376 

377class ProvidersManager(LoggingMixin): 

378 """ 

379 Manages all provider distributions. 

380 

381 This is a Singleton class. The first time it is 

382 instantiated, it discovers all available providers in installed packages. 

383 """ 

384 

385 resource_version = "0" 

386 _initialized: bool = False 

387 _initialization_stack_trace = None 

388 _instance: ProvidersManager | None = None 

389 

def __new__(cls):
    """Return the shared instance kept in ``cls._instance``, creating it on first use."""
    instance = cls._instance
    if instance is None:
        instance = cls._instance = super().__new__(cls)
    return instance

394 

@staticmethod
def initialized() -> bool:
    """Return the class-level ``_initialized`` flag, which ``__init__`` sets on its first run."""
    return ProvidersManager._initialized

398 

@staticmethod
def initialization_stack_trace() -> str | None:
    """Return the stack trace recorded by the first ``__init__`` run, or None if there was none yet."""
    return ProvidersManager._initialization_stack_trace

402 

def __init__(self):
    """
    Initialize the manager.

    Only the first call does any work: it records the call stack, creates the empty
    registries that the ``initialize_providers_*`` methods fill lazily, builds both
    schema validators and registers the core hooks.
    """
    # __new__ always hands out the same object, so __init__ runs on every
    # ProvidersManager() call - bail out once the state has been set up.
    if self.initialized():
        return

    super().__init__()
    ProvidersManager._initialized = True
    ProvidersManager._initialization_stack_trace = "".join(traceback.format_stack(inspect.currentframe()))

    # Names of the caches already populated (see ``provider_info_cache``).
    self._initialized_cache: dict[str, bool] = {}
    # Providers keyed by module name.
    self._provider_dict: dict[str, ProviderInfo] = {}

    # Filesystems and asset related registries.
    self._fs_set: set[str] = set()
    self._asset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
    self._asset_factories: dict[str, Callable[..., Asset]] = {}
    self._asset_to_openlineage_converters: dict[str, Callable] = {}
    self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache()

    # Mapping between connection types and the hook class / package they come from.
    self._hook_provider_dict: dict[str, HookClassProvider] = {}
    self._dialect_provider_dict: dict[str, DialectInfo] = {}
    # Hooks keyed by connection type; evaluated lazily at access time.
    self._hooks_lazy_dict: LazyDictWithCache[str, HookInfo | Callable] = LazyDictWithCache()
    # Hook display names read from provider.yaml (hook-name field).
    self._hook_name_dict: dict[str, str] = {}
    # Custom connection form widgets, keyed by the name of the extra field.
    self._connection_form_widgets: dict[str, ConnectionFormWidgetInfo] = {}
    # Customizations for javascript fields.
    self._field_behaviours: dict[str, dict] = {}

    # Registries filled by the individual discovery methods.
    self._cli_command_functions_set: set[Callable[[], list[CLICommand]]] = set()
    self._cli_command_provider_name_set: set[str] = set()
    self._extra_link_class_name_set: set[str] = set()
    self._logging_class_name_set: set[str] = set()
    self._auth_manager_class_name_set: set[str] = set()
    self._auth_manager_without_check_set: set[tuple[str, str]] = set()
    self._secrets_backend_class_name_set: set[str] = set()
    self._executor_class_name_set: set[str] = set()
    self._executor_without_check_set: set[tuple[str, str]] = set()
    self._queue_class_name_set: set[str] = set()
    self._db_manager_class_name_set: set[str] = set()
    self._provider_configs: dict[str, dict[str, Any]] = {}
    self._trigger_info_set: set[TriggerInfo] = set()
    self._notification_info_set: set[NotificationInfo] = set()

    # JSON schema validators for provider info and customized form field behaviours.
    self._provider_schema_validator = _create_provider_info_schema_validator()
    self._customized_form_fields_schema_validator = (
        _create_customized_form_field_behaviours_schema_validator()
    )

    # Plugins contained in providers.
    self._plugins_set: set[PluginInfo] = set()
    self._init_airflow_core_hooks()

    # Created on demand by ``__getattribute__`` when a deprecated runtime property is read.
    self._runtime_manager = None

454 

455 def __getattribute__(self, name: str): 

456 # Hacky but does the trick for now 

457 runtime_properties = { 

458 "hooks", 

459 "taskflow_decorators", 

460 "filesystem_module_names", 

461 "asset_factories", 

462 "asset_uri_handlers", 

463 "asset_to_openlineage_converters", 

464 } 

465 

466 if name in runtime_properties: 

467 warnings.warn( 

468 f"ProvidersManager.{name} is deprecated. Use ProvidersManagerTaskRuntime.{name} from task-sdk instead.", 

469 DeprecatedImportWarning, 

470 stacklevel=2, 

471 ) 

472 if object.__getattribute__(self, "_runtime_manager") is None: 

473 from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime 

474 

475 object.__setattr__(self, "_runtime_manager", ProvidersManagerTaskRuntime()) 

476 return getattr(object.__getattribute__(self, "_runtime_manager"), name) 

477 

478 return object.__getattribute__(self, name) 

479 

def _init_airflow_core_hooks(self):
    """
    Initialize the hooks dict with default hooks from Airflow core.

    "generic" and "email" get a placeholder ``HookInfo`` that only carries a display
    name. "fs" and "package_index" are registered as deferred imports of the
    corresponding hooks from the standard provider.
    """
    for conn_type, display_name in (("generic", "Generic"), ("email", "Email")):
        self._hooks_lazy_dict[conn_type] = HookInfo(
            hook_class_name=None,
            connection_id_attribute_name=None,
            package_name=None,
            hook_name=display_name,
            connection_type=None,
            connection_testable=False,
        )

    standard_hooks = {
        "fs": "airflow.providers.standard.hooks.filesystem.FSHook",
        "package_index": "airflow.providers.standard.hooks.package_index.PackageIndexHook",
    }
    for conn_type, class_name in standard_hooks.items():
        # Stored as a partial so that the hook class is imported only when looked up.
        self._hooks_lazy_dict[conn_type] = functools.partial(
            self._import_hook,
            connection_type=None,
            package_name="apache-airflow-providers-standard",
            hook_class_name=class_name,
            provider_info=None,
        )

506 

@provider_info_cache("list")
def initialize_providers_list(self):
    """
    Lazy initialization of providers list.

    Fills ``_provider_dict`` from the installed packages, warns about providers older
    than the supported minimum and finally re-creates the dict sorted by key.
    """
    # NOTE(review): the original comment here said that local source folders are loaded
    # first and take precedence over packaged providers during development, and that
    # provider.yaml files are absent from the 'airflow' directory in production. Only
    # package discovery is visible in this method - confirm whether that still applies.
    discover_all_providers_from_packages(self._provider_dict, self._provider_schema_validator)
    self._verify_all_providers_all_compatible()
    ordered_providers = sorted(self._provider_dict.items())
    self._provider_dict = dict(ordered_providers)

517 

def _verify_all_providers_all_compatible(self):
    """Warn about every discovered provider whose version is below its ``MIN_PROVIDER_VERSIONS`` entry."""
    from packaging import version as packaging_version

    for provider_id, info in self._provider_dict.items():
        min_version = MIN_PROVIDER_VERSIONS.get(provider_id)
        if not min_version:
            # No minimum is declared for this provider.
            continue
        required = packaging_version.parse(min_version)
        installed = packaging_version.parse(info.version)
        if required > installed:
            log.warning(
                "The package %s is not compatible with this version of Airflow. "
                "The package has version %s but the minimum supported version "
                "of the package is %s",
                provider_id,
                info.version,
                min_version,
            )

533 

@provider_info_cache("hooks")
def initialize_providers_hooks(self):
    """
    Lazy initialization of providers hooks.

    Re-registers the core hooks, makes sure the providers list is loaded, discovers
    the hooks and loads the UI metadata, then sorts the hook provider mapping by
    connection type.
    """
    self._init_airflow_core_hooks()
    self.initialize_providers_list()
    self._discover_hooks()
    self._load_ui_metadata()
    by_connection_type = sorted(self._hook_provider_dict.items())
    self._hook_provider_dict = dict(by_connection_type)

542 

@provider_info_cache("filesystems")
def initialize_providers_filesystems(self):
    """Load the providers list, then discover provider filesystems (runs once per manager)."""
    self.initialize_providers_list()
    self._discover_filesystems()

548 

@provider_info_cache("asset_uris")
def initialize_providers_asset_uri_resources(self):
    """Load the providers list, then discover asset URI handlers, factories, converters etc."""
    self.initialize_providers_list()
    self._discover_asset_uri_resources()

554 

# NOTE(review): both cache names are marked as initialized by this one method; the
# "hook_lineage_writers" decorator looks like a leftover - confirm before removing.
@provider_info_cache("hook_lineage_writers")
@provider_info_cache("taskflow_decorators")
def initialize_providers_taskflow_decorator(self):
    """Load the providers list, then discover the providers' taskflow decorators."""
    self.initialize_providers_list()
    self._discover_taskflow_decorators()

561 

@provider_info_cache("extra_links")
def initialize_providers_extra_links(self):
    """Load the providers list, then discover the providers' extra links."""
    self.initialize_providers_list()
    self._discover_extra_links()

567 

@provider_info_cache("logging")
def initialize_providers_logging(self):
    """Load the providers list, then discover the providers' logging information."""
    self.initialize_providers_list()
    self._discover_logging()

573 

@provider_info_cache("secrets_backends")
def initialize_providers_secrets_backends(self):
    """Load the providers list, then discover the providers' secrets backends."""
    self.initialize_providers_list()
    self._discover_secrets_backends()

579 

@provider_info_cache("executors")
def initialize_providers_executors(self):
    """Load the providers list, then discover the providers' executors with ``check=True``."""
    self.initialize_providers_list()
    self._discover_executors(check=True)

585 

@provider_info_cache("executors_without_check")
def initialize_providers_executors_without_check(self):
    """Load the providers list, then discover the providers' executors with ``check=False``."""
    self.initialize_providers_list()
    self._discover_executors(check=False)

591 

@provider_info_cache("queues")
def initialize_providers_queues(self):
    """Load the providers list, then discover the providers' queue information."""
    self.initialize_providers_list()
    self._discover_queues()

597 

@provider_info_cache("db_managers")
def initialize_providers_db_managers(self):
    """Load the providers list, then discover the providers' db_managers information."""
    self.initialize_providers_list()
    self._discover_db_managers()

603 

@provider_info_cache("notifications")
def initialize_providers_notifications(self):
    """Load the providers list, then discover the providers' notifications."""
    self.initialize_providers_list()
    self._discover_notifications()

609 

@provider_info_cache("auth_managers")
def initialize_providers_auth_managers(self):
    """Load the providers list, then discover the providers' auth managers with ``check=True``."""
    self.initialize_providers_list()
    self._discover_auth_managers(check=True)

615 

@provider_info_cache("auth_managers_without_check")
def initialize_providers_auth_managers_without_check(self):
    """Load the providers list, then discover the providers' auth managers with ``check=False``."""
    self.initialize_providers_list()
    self._discover_auth_managers(check=False)

621 

@provider_info_cache("config")
def initialize_providers_configuration(self):
    """
    Lazy initialization of provider configuration metadata.

    Loads the providers list and then runs the configuration discovery.
    NOTE(review): the previous docstring said the result is merged into ``conf``;
    that happens, if at all, inside ``_discover_config`` - confirm there.
    """
    self.initialize_providers_list()
    self._discover_config()

627 

@provider_info_cache("plugins")
def initialize_providers_plugins(self):
    """Load the providers list, then discover the plugins contained in providers."""
    self.initialize_providers_list()
    self._discover_plugins()

632 

@provider_info_cache("cli_command")
def initialize_providers_cli_command(self):
    """Load the providers list, then discover the providers' CLI commands."""
    self.initialize_providers_list()
    self._discover_cli_command()

638 

def _discover_hooks_from_connection_types(
    self,
    hook_class_names_registered: set[str],
    already_registered_warning_connection_types: set[str],
    package_name: str,
    provider: ProviderInfo,
):
    """
    Discover hooks from the "connection-types" property.

    This is the newer, better method that replaces discovery from hook-class-names, as
    it allows individual Hook classes to be imported lazily when they are accessed.
    "connection-types" carries both the connection type and the class name, so all
    connection types can be discovered without importing the classes.

    :param hook_class_names_registered: set of registered hook class names for this provider;
        every class name found here is added to it
    :param already_registered_warning_connection_types: set of connections for which warning should be
        printed in logs as they were already registered before (by another package)
    :param package_name: name of the provider package
    :param provider: information about the provider; its ``data`` is read
    :return: True when the provider declares at least one connection type, False otherwise
    """
    provider_uses_connection_types = False
    connection_types = provider.data.get("connection-types")
    if not connection_types:
        return provider_uses_connection_types

    for connection_type_dict in connection_types:
        connection_type = connection_type_dict["connection-type"]
        hook_class_name = connection_type_dict["hook-class-name"]
        hook_class_names_registered.add(hook_class_name)
        already_registered = self._hook_provider_dict.get(connection_type)
        if already_registered:
            if already_registered.package_name != package_name:
                already_registered_warning_connection_types.add(connection_type)
            elif already_registered.hook_class_name != hook_class_name:
                # Only warn when the class names really differ, as the message says and
                # as the hook-class-names discovery does.
                log.warning(
                    "The connection type '%s' is already registered in the"
                    " package '%s' with different class names: '%s' and '%s'. ",
                    connection_type,
                    package_name,
                    already_registered.hook_class_name,
                    hook_class_name,
                )
        else:
            self._hook_provider_dict[connection_type] = HookClassProvider(
                hook_class_name=hook_class_name, package_name=package_name
            )
            # Defer importing hook to access time by setting import hook method as dict value
            self._hooks_lazy_dict[connection_type] = functools.partial(
                self._import_hook,
                connection_type=connection_type,
                provider_info=provider,
            )
        provider_uses_connection_types = True
    return provider_uses_connection_types

692 

def _discover_hooks_from_hook_class_names(
    self,
    hook_class_names_registered: set[str],
    already_registered_warning_connection_types: set[str],
    package_name: str,
    provider: ProviderInfo,
    provider_uses_connection_types: bool,
):
    """
    Discover hooks from the deprecated "hook-class-names" property.

    The hook-class-names array holds only Hook class names, without the connection
    type, so every class has to be imported right away to learn which connection type
    it supports. Selective, lazy importing is therefore not possible for these hooks.

    :param hook_class_names_registered: class names already registered through
        "connection-types"; those are skipped here
    :param already_registered_warning_connection_types: list of connection hooks that we should warn
        about when finished discovery
    :param package_name: name of the provider package
    :param provider: class that keeps information about version and details of the provider
    :param provider_uses_connection_types: determines whether the provider uses "connection-types" new
        form of passing connection types
    :return: None
    """
    hook_class_names = provider.data.get("hook-class-names")
    if hook_class_names:
        for hook_class_name in hook_class_names:
            if hook_class_name in hook_class_names_registered:
                # Already marked for lazy import by connection-types discovery.
                continue
            hook_info = self._import_hook(
                connection_type=None,
                provider_info=provider,
                hook_class_name=hook_class_name,
                package_name=package_name,
            )
            if not hook_info:
                # Importing the class failed; it was logged at import time, so skip it.
                continue
            conn_type = hook_info.connection_type
            already_registered = self._hook_provider_dict.get(conn_type)
            if already_registered:
                if already_registered.package_name != package_name:
                    already_registered_warning_connection_types.add(conn_type)
                elif already_registered.hook_class_name != hook_class_name:
                    log.warning(
                        "The hook connection type '%s' is registered twice in the"
                        " package '%s' with different class names: '%s' and '%s'. "
                        " Please fix it!",
                        conn_type,
                        package_name,
                        already_registered.hook_class_name,
                        hook_class_name,
                    )
            else:
                self._hook_provider_dict[conn_type] = HookClassProvider(
                    hook_class_name=hook_class_name, package_name=package_name
                )
                self._hooks_lazy_dict[conn_type] = hook_info

        if not provider_uses_connection_types:
            warnings.warn(
                f"The provider {package_name} uses `hook-class-names` "
                "property in provider-info and has no `connection-types` one. "
                "The 'hook-class-names' property has been deprecated in favour "
                "of 'connection-types' in Airflow 2.2. Use **both** in case you want to "
                "have backwards compatibility with Airflow < 2.2",
                DeprecationWarning,
                stacklevel=1,
            )

    # Report the connection types that another package had registered first.
    for duplicated_type in already_registered_warning_connection_types:
        log.warning(
            "The connection_type '%s' has been already registered by provider '%s.'",
            duplicated_type,
            self._hook_provider_dict[duplicated_type].package_name,
        )

769 

def _discover_hooks(self) -> None:
    """
    Retrieve all connections defined in the providers via Hooks.

    For every provider the dialects are registered first, then hooks declared under
    "connection-types", then hooks declared under "hook-class-names". Finally the
    hook provider mapping is re-created sorted by connection type.
    """
    for package_name, provider in self._provider_dict.items():
        # Both sets are per provider and shared by the two discovery steps below.
        duplicated_connection_types: set[str] = set()
        hook_class_names_registered: set[str] = set()
        self._discover_provider_dialects(package_name, provider)
        uses_connection_types = self._discover_hooks_from_connection_types(
            hook_class_names_registered, duplicated_connection_types, package_name, provider
        )
        self._discover_hooks_from_hook_class_names(
            hook_class_names_registered,
            duplicated_connection_types,
            package_name,
            provider,
            uses_connection_types,
        )
    by_connection_type = sorted(self._hook_provider_dict.items())
    self._hook_provider_dict = dict(by_connection_type)

787 

def _discover_provider_dialects(self, provider_name: str, provider: ProviderInfo):
    """
    Register the dialects a provider declares under its "dialects" key.

    :param provider_name: name of the provider, stored with every dialect
    :param provider: information about the provider; its ``data`` is read
    """
    dialects = provider.data.get("dialects", [])
    if not dialects:
        return

    # Collected first and merged in one step, so nothing is registered if an entry is malformed.
    discovered = {}
    for item in dialects:
        dialect_type = item["dialect-type"]
        discovered[dialect_type] = DialectInfo(
            name=dialect_type,
            dialect_class_name=item["dialect-class-name"],
            provider_name=provider_name,
        )
    self._dialect_provider_dict.update(discovered)

801 

802 @provider_info_cache("import_all_hooks") 

803 def _import_info_from_all_hooks(self): 

804 """Force-import all hooks and initialize the connections/fields.""" 

805 # Retrieve all hooks to make sure that all of them are imported 

806 _ = list(self._hooks_lazy_dict.values()) 

807 self._field_behaviours = dict(sorted(self._field_behaviours.items())) 

808 

# Widgets for connection forms are currently used in two places:
# 1. In the UI Connections view, which expects the same order in which they are defined in the Hook.
# 2. In the CLI command `airflow providers widgets`, which expects alphabetical order.
# The original ordering cannot be recovered after sorting,
# which is the main reason why the sorting was moved to the CLI part:
# self._connection_form_widgets = dict(sorted(self._connection_form_widgets.items()))

815 

816 def _discover_filesystems(self) -> None: 

817 """Retrieve all filesystems defined in the providers.""" 

818 for provider_package, provider in self._provider_dict.items(): 

819 for fs_module_name in provider.data.get("filesystems", []): 

820 if _correctness_check(provider_package, f"{fs_module_name}.get_fs", provider): 

821 self._fs_set.add(fs_module_name) 

822 self._fs_set = set(sorted(self._fs_set)) 

823 

def _discover_asset_uri_resources(self) -> None:
    """
    Discover and register asset URI handlers, factories, and converters for all providers.

    Only ``asset-uris`` entries that explicitly contain both ``schemes`` and ``handler``
    are processed. For each such entry the handler (falling back to ``normalize_noop``
    when the handler is null), the optional ``factory`` and the optional
    ``to_openlineage_converter`` are registered for every listed scheme.
    """
    from airflow.sdk.definitions.asset import normalize_noop

    def _register(
        registry: dict,
        path: str | None,
        schemes: list[str],
        package: str,
        provider_info: Any,
        fallback: Any = None,
    ):
        """Resolve ``path`` (or use ``fallback`` when it is None) and map each scheme to the result."""
        if path is None:
            resolved = fallback
        else:
            resolved = _correctness_check(package, path, provider_info)
        if resolved:
            registry.update((scheme, resolved) for scheme in schemes)

    for provider_name, provider in self._provider_dict.items():
        for uri_info in provider.data.get("asset-uris", []):
            # Both schemes and handler must be explicitly set; handler can be set to null.
            if not ("schemes" in uri_info and "handler" in uri_info):
                continue
            schemes = uri_info["schemes"]
            _register(
                self._asset_uri_handlers,
                uri_info["handler"],
                schemes,
                provider_name,
                provider,
                fallback=normalize_noop,
            )
            _register(self._asset_factories, uri_info.get("factory"), schemes, provider_name, provider)
            _register(
                self._asset_to_openlineage_converters,
                uri_info.get("to_openlineage_converter"),
                schemes,
                provider_name,
                provider,
            )

870 

871 def _discover_taskflow_decorators(self) -> None: 

872 for name, info in self._provider_dict.items(): 

873 for taskflow_decorator in info.data.get("task-decorators", []): 

874 self._add_taskflow_decorator( 

875 taskflow_decorator["name"], taskflow_decorator["class-name"], name 

876 ) 

877 

def _add_taskflow_decorator(self, name, decorator_class_name: str, provider_package: str) -> None:
    """
    Register a lazily imported taskflow decorator under ``name``.

    The decorator is stored as a ``functools.partial`` around ``import_string``. Nothing is
    registered when the builtin-provider prefix check fails, and an already registered
    name is kept (a warning naming the existing registration is logged).

    :param name: name the decorator is registered under
    :param decorator_class_name: import path of the decorator
    :param provider_package: provider package declaring the decorator
    """
    if not _check_builtin_provider_prefix(provider_package, decorator_class_name):
        return

    if name not in self._taskflow_decorators:
        self._taskflow_decorators[name] = functools.partial(import_string, decorator_class_name)
        return

    try:
        registered = self._taskflow_decorators[name]
        registered_by = f"{registered.__module__}.{registered.__name__}"
    except Exception:
        # If problem importing, then get the value from the functools.partial
        registered_by = self._taskflow_decorators._raw_dict[name].args[0]  # type: ignore[attr-defined]

    log.warning(
        "The taskflow decorator '%s' has been already registered (by %s).",
        name,
        registered_by,
    )

898 

899 @staticmethod 

900 def _get_attr(obj: Any, attr_name: str): 

901 """Retrieve attributes of an object, or warn if not found.""" 

902 if not hasattr(obj, attr_name): 

903 log.warning("The object '%s' is missing %s attribute and cannot be registered", obj, attr_name) 

904 return None 

905 return getattr(obj, attr_name) 

906 

907 def _get_connection_type_config(self, provider_info: ProviderInfo, connection_type: str) -> dict | None: 

908 """Get connection type config from provider.yaml if it exists.""" 

909 connection_types = provider_info.data.get("connection-types", []) 

910 for conn_config in connection_types: 

911 if conn_config.get("connection-type") == connection_type: 

912 return conn_config 

913 return None 

914 

915 def _to_api_format(self, field_name: str, field_def: dict) -> dict: 

916 """Convert conn-fields definition to format expected by the API.""" 

917 schema_def = field_def.get("schema", {}) 

918 

919 # build schema dict with label moved to `title` per jsonschema convention 

920 schema = schema_def.copy() 

921 if "label" in field_def: 

922 schema["title"] = field_def.get("label") 

923 

924 return { 

925 "value": schema_def.get("default"), 

926 "schema": schema, 

927 "description": field_def.get("description"), 

928 "source": None, 

929 } 

930 

931 def _add_widgets( 

932 self, package_name: str, hook_class_name: str, connection_type: str, conn_fields: dict 

933 ) -> None: 

934 """Parse conn-fields from provider info and add to connection_form_widgets.""" 

935 for field_name, field_def in conn_fields.items(): 

936 field_data = self._to_api_format(field_name, field_def) 

937 

938 prefixed_name = f"extra__{connection_type}__{field_name}" 

939 if prefixed_name in self._connection_form_widgets: 

940 log.warning( 

941 "Field %s for connection type %s already added, skipping", 

942 field_name, 

943 connection_type, 

944 ) 

945 continue 

946 

947 schema_def = field_def.get("schema", {}) 

948 self._connection_form_widgets[prefixed_name] = ConnectionFormWidgetInfo( 

949 hook_class_name=hook_class_name, 

950 package_name=package_name, 

951 field=field_data, 

952 field_name=field_name, 

953 is_sensitive=schema_def.get("format") == "password", 

954 ) 

955 

def _add_customized_fields(self, package_name: str, connection_type: str, behaviour: dict) -> None:
    """
    Process ui-field-behaviour from provider info and add it to ``_field_behaviours``.

    An existing entry for the connection type is kept (a warning is logged). Validation or
    prefixing failures are logged as warnings and leave ``_field_behaviours`` unchanged.

    :param package_name: provider package, used only in the failure log message
    :param connection_type: connection type the behaviour is registered for
    :param behaviour: mapping with optional ``hidden-fields``, ``relabeling`` and ``placeholders``
    """
    if connection_type in self._field_behaviours:
        log.warning(
            "Field behaviour for connection type %s already exists, skipping",
            connection_type,
        )
        return

    # Convert kebab-case keys to python style.
    customized_fields = dict(
        hidden_fields=behaviour.get("hidden-fields", []),
        relabeling=behaviour.get("relabeling", {}),
        placeholders=behaviour.get("placeholders", {}),
    )

    try:
        self._customized_form_fields_schema_validator.validate(customized_fields)
        prefixed_fields = _ensure_prefix_for_placeholders(customized_fields, connection_type)
        self._field_behaviours[connection_type] = prefixed_fields
    except Exception as err:
        log.warning(
            "Failed to add field behaviour for %s in package %s: %s",
            connection_type,
            package_name,
            err,
        )

983 

984 def _load_ui_metadata(self) -> None: 

985 """Load connection form UI metadata from provider info without importing hooks.""" 

986 for package_name, provider in self._provider_dict.items(): 

987 for conn_config in provider.data.get("connection-types", []): 

988 connection_type = conn_config.get("connection-type") 

989 hook_class_name = conn_config.get("hook-class-name") 

990 if not connection_type or not hook_class_name: 

991 continue 

992 

993 if hook_name := conn_config.get("hook-name"): 

994 self._hook_name_dict[connection_type] = hook_name 

995 

996 if conn_fields := conn_config.get("conn-fields"): 

997 self._add_widgets(package_name, hook_class_name, connection_type, conn_fields) 

998 

999 if behaviour := conn_config.get("ui-field-behaviour"): 

1000 self._add_customized_fields(package_name, connection_type, behaviour) 

1001 

1002 def _import_hook( 

1003 self, 

1004 connection_type: str | None, 

1005 provider_info: ProviderInfo, 

1006 hook_class_name: str | None = None, 

1007 package_name: str | None = None, 

1008 ) -> HookInfo | None: 

1009 """ 

1010 Import hook and retrieve hook information. 

1011 

1012 Either connection_type (for lazy loading) or hook_class_name must be set - but not both). 

1013 Only needs package_name if hook_class_name is passed (for lazy loading, package_name 

1014 is retrieved from _connection_type_class_provider_dict together with hook_class_name). 

1015 

1016 :param connection_type: type of the connection 

1017 :param hook_class_name: name of the hook class 

1018 :param package_name: provider package - only needed in case connection_type is missing 

1019 : return 

1020 """ 

1021 if connection_type is None and hook_class_name is None: 

1022 raise ValueError("Either connection_type or hook_class_name must be set") 

1023 if connection_type is not None and hook_class_name is not None: 

1024 raise ValueError( 

1025 f"Both connection_type ({connection_type} and " 

1026 f"hook_class_name {hook_class_name} are set. Only one should be set!" 

1027 ) 

1028 if connection_type is not None: 

1029 class_provider = self._hook_provider_dict[connection_type] 

1030 package_name = class_provider.package_name 

1031 hook_class_name = class_provider.hook_class_name 

1032 else: 

1033 if not hook_class_name: 

1034 raise ValueError("Either connection_type or hook_class_name must be set") 

1035 if not package_name: 

1036 raise ValueError( 

1037 f"Provider package name is not set when hook_class_name ({hook_class_name}) is used" 

1038 ) 

1039 hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info) 

1040 if hook_class is None: 

1041 return None 

1042 

1043 # Check if provider info already has UI metadata and skip Python hook methods 

1044 # to avoid duplicate initialization and unnecessary wtforms imports 

1045 ui_metadata_loaded = False 

1046 if provider_info and connection_type: 

1047 conn_config = self._get_connection_type_config(provider_info, connection_type) 

1048 ui_metadata_loaded = conn_config is not None and bool( 

1049 conn_config.get("conn-fields") or conn_config.get("ui-field-behaviour") 

1050 ) 

1051 

1052 if not ui_metadata_loaded: 

1053 try: 

1054 from wtforms import BooleanField, IntegerField, PasswordField, StringField 

1055 

1056 allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField] 

1057 # Do not use attr here. We want to check only direct class fields not those 

1058 # inherited from parent hook. This way we add form fields only once for the whole 

1059 # hierarchy and we add it only from the parent hook that provides those! 

1060 if "get_connection_form_widgets" in hook_class.__dict__: 

1061 warning = AirflowProviderDeprecationWarning( 

1062 f"Hook '{hook_class_name}' defines get_connection_form_widgets(). " 

1063 "This method is deprecated. Define connection fields declaratively in " 

1064 "provider.yaml under 'conn-fields' instead. See " 

1065 "https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html" 

1066 ) 

1067 warning.deprecated_provider_since = "3.2.0" 

1068 warnings.warn(warning, stacklevel=2) 

1069 widgets = hook_class.get_connection_form_widgets() 

1070 if widgets: 

1071 for widget in widgets.values(): 

1072 if widget.field_class not in allowed_field_classes: 

1073 log.warning( 

1074 "The hook_class '%s' uses field of unsupported class '%s'. " 

1075 "Only '%s' field classes are supported", 

1076 hook_class_name, 

1077 widget.field_class, 

1078 allowed_field_classes, 

1079 ) 

1080 return None 

1081 self._add_widgets_from_hook(package_name, hook_class, widgets) 

1082 if "get_ui_field_behaviour" in hook_class.__dict__: 

1083 warning = AirflowProviderDeprecationWarning( 

1084 f"Hook '{hook_class_name}' defines get_ui_field_behaviour(). " 

1085 "This method is deprecated. Define field behaviour declaratively in " 

1086 "provider.yaml under 'ui-field-behaviour' instead. See " 

1087 "https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html" 

1088 ) 

1089 warning.deprecated_provider_since = "3.2.0" 

1090 warnings.warn(warning, stacklevel=2) 

1091 field_behaviours = hook_class.get_ui_field_behaviour() 

1092 if field_behaviours: 

1093 self._add_customized_fields_from_hook(package_name, hook_class, field_behaviours) 

1094 except ImportError as e: 

1095 if e.name in ["flask_appbuilder", "wtforms"]: 

1096 log.info( 

1097 "The hook_class '%s' is not fully initialized (UI widgets will be missing), because " 

1098 "the 'flask_appbuilder' package is not installed, however it is not required for " 

1099 "Airflow components to work", 

1100 hook_class_name, 

1101 ) 

1102 except Exception as e: 

1103 log.warning( 

1104 "Exception when importing '%s' from '%s' package: %s", 

1105 hook_class_name, 

1106 package_name, 

1107 e, 

1108 ) 

1109 return None 

1110 

1111 hook_connection_type = self._get_attr(hook_class, "conn_type") 

1112 if connection_type: 

1113 if hook_connection_type != connection_type: 

1114 log.warning( 

1115 "Inconsistency! The hook class '%s' declares connection type '%s'" 

1116 " but it is added by provider '%s' as connection_type '%s' in provider info. " 

1117 "This should be fixed!", 

1118 hook_class, 

1119 hook_connection_type, 

1120 package_name, 

1121 connection_type, 

1122 ) 

1123 connection_type = hook_connection_type 

1124 connection_id_attribute_name: str = self._get_attr(hook_class, "conn_name_attr") 

1125 hook_name: str = self._get_attr(hook_class, "hook_name") 

1126 

1127 if not connection_type or not connection_id_attribute_name or not hook_name: 

1128 log.warning( 

1129 "The hook misses one of the key attributes: " 

1130 "conn_type: %s, conn_id_attribute_name: %s, hook_name: %s", 

1131 connection_type, 

1132 connection_id_attribute_name, 

1133 hook_name, 

1134 ) 

1135 return None 

1136 

1137 return HookInfo( 

1138 hook_class_name=hook_class_name, 

1139 connection_id_attribute_name=connection_id_attribute_name, 

1140 package_name=package_name, 

1141 hook_name=hook_name, 

1142 connection_type=connection_type, 

1143 connection_testable=hasattr(hook_class, "test_connection"), 

1144 ) 

1145 

1146 def _add_widgets_from_hook(self, package_name: str, hook_class: type, widgets: dict[str, Any]): 

1147 conn_type = hook_class.conn_type # type: ignore 

1148 for field_identifier, field in widgets.items(): 

1149 if field_identifier.startswith("extra__"): 

1150 prefixed_field_name = field_identifier 

1151 else: 

1152 prefixed_field_name = f"extra__{conn_type}__{field_identifier}" 

1153 if prefixed_field_name in self._connection_form_widgets: 

1154 log.warning( 

1155 "The field %s from class %s has already been added by another provider. Ignoring it.", 

1156 field_identifier, 

1157 hook_class.__name__, 

1158 ) 

1159 # In case of inherited hooks this might be happening several times 

1160 else: 

1161 self._connection_form_widgets[prefixed_field_name] = ConnectionFormWidgetInfo( 

1162 hook_class.__name__, 

1163 package_name, 

1164 field, 

1165 field_identifier, 

1166 hasattr(field.field_class.widget, "input_type") 

1167 and field.field_class.widget.input_type == "password", 

1168 ) 

1169 

def _add_customized_fields_from_hook(self, package_name: str, hook_class: type, customized_fields: dict):
    """
    Validate field behaviours returned by a hook class and add them to ``_field_behaviours``.

    An entry already registered for the hook's ``conn_type`` is kept and a warning is
    logged. Any exception raised on the way is logged as a warning instead of propagating.

    :param package_name: provider package, used in log messages
    :param hook_class: hook class providing ``conn_type`` and ``__name__``
    :param customized_fields: field behaviour dict to validate and store
    """
    try:
        connection_type = hook_class.conn_type

        self._customized_form_fields_schema_validator.validate(customized_fields)

        if connection_type:
            customized_fields = _ensure_prefix_for_placeholders(customized_fields, connection_type)

        if connection_type not in self._field_behaviours:
            self._field_behaviours[connection_type] = customized_fields
        else:
            log.warning(
                "The connection_type %s from package %s and class %s has already been added "
                "by another provider. Ignoring it.",
                connection_type,
                package_name,
                hook_class.__name__,
            )
    except Exception as err:
        log.warning(
            "Error when loading customized fields from package '%s' hook class '%s': %s",
            package_name,
            hook_class.__name__,
            err,
        )

1196 

1197 def _discover_auth_managers(self, *, check: bool) -> None: 

1198 """Retrieve all auth managers defined in the providers.""" 

1199 for provider_package, provider in self._provider_dict.items(): 

1200 if provider.data.get("auth-managers"): 

1201 for auth_manager_class_name in provider.data["auth-managers"]: 

1202 if not check: 

1203 self._auth_manager_without_check_set.add((auth_manager_class_name, provider_package)) 

1204 elif _correctness_check(provider_package, auth_manager_class_name, provider): 

1205 self._auth_manager_class_name_set.add(auth_manager_class_name) 

1206 

1207 def _discover_cli_command(self) -> None: 

1208 """Retrieve all CLI command functions defined in the providers.""" 

1209 for provider_package, provider in self._provider_dict.items(): 

1210 if provider.data.get("cli"): 

1211 for cli_command_function_name in provider.data["cli"]: 

1212 # _correctness_check will return the function if found and correct 

1213 # we store the function itself instead of its name to avoid importing it again later in cli_parser to speed up cli loading 

1214 if cli_func := _correctness_check(provider_package, cli_command_function_name, provider): 

1215 cli_func = cast("Callable[[], list[CLICommand]]", cli_func) 

1216 self._cli_command_functions_set.add(cli_func) 

1217 self._cli_command_provider_name_set.add(provider_package) 

1218 

1219 def _discover_notifications(self) -> None: 

1220 """Retrieve all notifications defined in the providers.""" 

1221 for provider_package, provider in self._provider_dict.items(): 

1222 if provider.data.get("notifications"): 

1223 for notification_class_name in provider.data["notifications"]: 

1224 if _correctness_check(provider_package, notification_class_name, provider): 

1225 self._notification_info_set.add(notification_class_name) 

1226 

1227 def _discover_extra_links(self) -> None: 

1228 """Retrieve all extra links defined in the providers.""" 

1229 for provider_package, provider in self._provider_dict.items(): 

1230 if provider.data.get("extra-links"): 

1231 for extra_link_class_name in provider.data["extra-links"]: 

1232 if _correctness_check(provider_package, extra_link_class_name, provider): 

1233 self._extra_link_class_name_set.add(extra_link_class_name) 

1234 

1235 def _discover_logging(self) -> None: 

1236 """Retrieve all logging defined in the providers.""" 

1237 for provider_package, provider in self._provider_dict.items(): 

1238 if provider.data.get("logging"): 

1239 for logging_class_name in provider.data["logging"]: 

1240 if _correctness_check(provider_package, logging_class_name, provider): 

1241 self._logging_class_name_set.add(logging_class_name) 

1242 

1243 def _discover_secrets_backends(self) -> None: 

1244 """Retrieve all secrets backends defined in the providers.""" 

1245 for provider_package, provider in self._provider_dict.items(): 

1246 if provider.data.get("secrets-backends"): 

1247 for secrets_backends_class_name in provider.data["secrets-backends"]: 

1248 if _correctness_check(provider_package, secrets_backends_class_name, provider): 

1249 self._secrets_backend_class_name_set.add(secrets_backends_class_name) 

1250 

1251 def _discover_executors(self, *, check: bool) -> None: 

1252 """Retrieve all executors defined in the providers.""" 

1253 for provider_package, provider in self._provider_dict.items(): 

1254 if provider.data.get("executors"): 

1255 for executors_class_path in provider.data["executors"]: 

1256 if not check: 

1257 self._executor_without_check_set.add((executors_class_path, provider_package)) 

1258 elif _correctness_check(provider_package, executors_class_path, provider): 

1259 self._executor_class_name_set.add(executors_class_path) 

1260 

1261 def _discover_queues(self) -> None: 

1262 """Retrieve all queues defined in the providers.""" 

1263 for provider_package, provider in self._provider_dict.items(): 

1264 if provider.data.get("queues"): 

1265 for queue_class_name in provider.data["queues"]: 

1266 if _correctness_check(provider_package, queue_class_name, provider): 

1267 self._queue_class_name_set.add(queue_class_name) 

1268 

1269 def _discover_db_managers(self) -> None: 

1270 """Retrieve all DB managers defined in the providers.""" 

1271 for provider_package, provider in self._provider_dict.items(): 

1272 if provider.data.get("db-managers"): 

1273 for db_manager_class_name in provider.data["db-managers"]: 

1274 if _correctness_check(provider_package, db_manager_class_name, provider): 

1275 self._db_manager_class_name_set.add(db_manager_class_name) 

1276 

1277 def _discover_config(self) -> None: 

1278 """Retrieve all configs defined in the providers.""" 

1279 for provider_package, provider in self._provider_dict.items(): 

1280 if provider.data.get("config"): 

1281 self._provider_configs[provider_package] = provider.data.get("config") # type: ignore[assignment] 

1282 

1283 def _discover_plugins(self) -> None: 

1284 """Retrieve all plugins defined in the providers.""" 

1285 for provider_package, provider in self._provider_dict.items(): 

1286 for plugin_dict in provider.data.get("plugins", ()): 

1287 if not _correctness_check(provider_package, plugin_dict["plugin-class"], provider): 

1288 log.warning("Plugin not loaded due to above correctness check problem.") 

1289 continue 

1290 self._plugins_set.add( 

1291 PluginInfo( 

1292 name=plugin_dict["name"], 

1293 plugin_class=plugin_dict["plugin-class"], 

1294 provider_name=provider_package, 

1295 ) 

1296 ) 

1297 

@provider_info_cache("triggers")
def initialize_providers_triggers(self):
    """
    Initialize providers triggers.

    Makes sure the provider list is initialized, then adds one ``TriggerInfo`` to
    ``_trigger_info_set`` for every module listed under ``python-modules`` of every
    ``triggers`` entry. ``integration-name`` defaults to an empty string.
    """
    self.initialize_providers_list()
    for provider_package, provider in self._provider_dict.items():
        for trigger in provider.data.get("triggers", []):
            # A triggers entry without "python-modules" contributes nothing instead of
            # raising TypeError from iterating over None.
            for trigger_class_name in trigger.get("python-modules") or ():
                self._trigger_info_set.add(
                    TriggerInfo(
                        package_name=provider_package,
                        trigger_class_name=trigger_class_name,
                        integration_name=trigger.get("integration-name", ""),
                    )
                )

1312 

@property
def auth_managers(self) -> list[str]:
    """Return the sorted class names of the auth managers available in providers."""
    self.initialize_providers_auth_managers()
    class_names = list(self._auth_manager_class_name_set)
    class_names.sort()
    return class_names

1318 

@property
def auth_manager_without_check(self) -> set[tuple[str, str]]:
    """Return the set of (auth manager class name, provider package name) pairs collected without correctness check."""
    self.initialize_providers_auth_managers_without_check()
    unchecked_pairs = self._auth_manager_without_check_set
    return unchecked_pairs

1324 

@property
def cli_command_functions(self) -> set[Callable[[], list[CLICommand]]]:
    """Return the set of CLI command functions collected from providers."""
    self.initialize_providers_cli_command()
    command_functions = self._cli_command_functions_set
    return command_functions

1330 

@property
def cli_command_providers(self) -> set[str]:
    """Return the set of provider package names that provide CLI commands."""
    self.initialize_providers_cli_command()
    provider_names = self._cli_command_provider_name_set
    return provider_names

1336 

@property
def notification(self) -> list[NotificationInfo]:
    """Return the sorted notification entries available in providers."""
    self.initialize_providers_notifications()
    notifications = list(self._notification_info_set)
    notifications.sort()
    return notifications

1342 

@property
def trigger(self) -> list[TriggerInfo]:
    """Return the trigger entries available in providers, sorted by package name."""
    self.initialize_providers_triggers()
    triggers = list(self._trigger_info_set)
    triggers.sort(key=lambda info: info.package_name)
    return triggers

1348 

@property
def providers(self) -> dict[str, ProviderInfo]:
    """Return the provider dictionary after making sure the provider list is initialized."""
    self.initialize_providers_list()
    provider_map = self._provider_dict
    return provider_map

1354 

@property
def hooks(self) -> MutableMapping[str, HookInfo | None]:
    """
    Return the connection_type-to-hook mapping.

    The mapping may hold None values for hooks that were discovered but cannot be imported.
    """
    self.initialize_providers_hooks()
    # The lazy dict is handed out only so that hook information can be looked up.
    lazy_hooks = self._hooks_lazy_dict
    return lazy_hooks

1365 

def iter_connection_type_hook_ui_metadata(self) -> Iterator[ConnectionTypeHookUIMetadata]:
    """
    Yield hook metadata per connection type for the connection UI.

    Connection types come from both the lazy hooks dict and the hook provider dict and are
    yielded in sorted order. Entries are read from the lazy dict's raw storage, so hook
    classes are not imported.
    """
    self.initialize_providers_hooks()
    connection_types = frozenset(self._hooks_lazy_dict) | frozenset(self._hook_provider_dict)
    for conn_type in sorted(connection_types):
        raw_entry = self._hooks_lazy_dict._raw_dict.get(conn_type)
        provider_entry = self._hook_provider_dict.get(conn_type)
        if isinstance(raw_entry, HookInfo):
            # An already resolved hook carries its own name and class name.
            hook_name = raw_entry.hook_name
            hook_class_name = raw_entry.hook_class_name
        else:
            # Otherwise use the name from the provider metadata, defaulting to the
            # connection type itself.
            hook_name = self._hook_name_dict.get(conn_type, conn_type)
            hook_class_name = provider_entry.hook_class_name if provider_entry else None
        yield ConnectionTypeHookUIMetadata(
            connection_type=conn_type,
            hook_name=hook_name,
            hook_class_name=hook_class_name,
            field_behaviour=self._field_behaviours.get(conn_type),
        )

1392 

1393 @property 

1394 def _connection_form_widgets_from_metadata(self) -> dict[str, ConnectionFormWidgetInfo]: 

1395 """Return connection form widgets from metadata without importing every hook.""" 

1396 self.initialize_providers_hooks() 

1397 return self._connection_form_widgets 

1398 

1399 @property 

1400 def _field_behaviours_from_metadata(self) -> dict[str, dict]: 

1401 """Return field behaviour dicts from metadata without importing every hook.""" 

1402 self.initialize_providers_hooks() 

1403 return self._field_behaviours 

1404 

@property
def dialects(self) -> MutableMapping[str, DialectInfo]:
    """Return the dialect mapping after making sure hooks are initialized."""
    self.initialize_providers_hooks()
    # The mapping is handed out only so that dialect information can be looked up.
    dialect_map = self._dialect_provider_dict
    return dialect_map

1411 

@property
def plugins(self) -> list[PluginInfo]:
    """Return the plugins available in providers, sorted by plugin class."""
    self.initialize_providers_plugins()
    plugin_infos = list(self._plugins_set)
    plugin_infos.sort(key=lambda info: info.plugin_class)
    return plugin_infos

1417 

@property
def taskflow_decorators(self) -> dict[str, TaskDecorator]:
    """Return the taskflow decorator mapping after making sure it is initialized."""
    self.initialize_providers_taskflow_decorator()
    decorators = self._taskflow_decorators
    return decorators  # type: ignore[return-value]

1422 

@property
def extra_links_class_names(self) -> list[str]:
    """Return the sorted list of extra link class names."""
    self.initialize_providers_extra_links()
    class_names = list(self._extra_link_class_name_set)
    class_names.sort()
    return class_names

1428 

@property
def connection_form_widgets(self) -> dict[str, ConnectionFormWidgetInfo]:
    """
    Return widgets for connection forms.

    Hooks are initialized and all of them force-imported before the mapping is returned.
    Dictionary keys are in the same order in which they are defined in the Hook.
    """
    self.initialize_providers_hooks()
    self._import_info_from_all_hooks()
    widgets = self._connection_form_widgets
    return widgets

1439 

@property
def field_behaviours(self) -> dict[str, dict]:
    """
    Return the dictionary with field behaviours for connection types.

    Hooks are initialized and all of them force-imported before the mapping is returned.
    """
    self.initialize_providers_hooks()
    self._import_info_from_all_hooks()
    behaviours = self._field_behaviours
    return behaviours

1446 

@property
def logging_class_names(self) -> list[str]:
    """Return the sorted list of log task handler class names."""
    self.initialize_providers_logging()
    class_names = list(self._logging_class_name_set)
    class_names.sort()
    return class_names

1452 

@property
def secrets_backend_class_names(self) -> list[str]:
    """Return the sorted list of secrets backend class names."""
    self.initialize_providers_secrets_backends()
    class_names = list(self._secrets_backend_class_name_set)
    class_names.sort()
    return class_names

1458 

@property
def executor_class_names(self) -> list[str]:
    """Return the sorted list of executor class names that passed the correctness check."""
    self.initialize_providers_executors()
    class_names = list(self._executor_class_name_set)
    class_names.sort()
    return class_names

1463 

@property
def executor_without_check(self) -> set[tuple[str, str]]:
    """Return the set of (executor class name, provider package name) pairs collected without correctness check."""
    self.initialize_providers_executors_without_check()
    unchecked_pairs = self._executor_without_check_set
    return unchecked_pairs

1469 

@property
def queue_class_names(self) -> list[str]:
    """Return the sorted list of queue class names."""
    self.initialize_providers_queues()
    class_names = list(self._queue_class_name_set)
    class_names.sort()
    return class_names

1474 

@property
def db_managers(self) -> list[str]:
    """Return the sorted list of DB manager class names."""
    self.initialize_providers_db_managers()
    class_names = list(self._db_manager_class_name_set)
    class_names.sort()
    return class_names

1479 

@property
def filesystem_module_names(self) -> list[str]:
    """Return the sorted list of filesystem module names."""
    self.initialize_providers_filesystems()
    module_names = list(self._fs_set)
    module_names.sort()
    return module_names

1484 

@property
def asset_factories(self) -> dict[str, Callable[..., Asset]]:
    """Return the asset factory registry after making sure asset URI resources are initialized."""
    self.initialize_providers_asset_uri_resources()
    factories = self._asset_factories
    return factories

1489 

@property
def asset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
    """Return the asset URI handler registry after making sure asset URI resources are initialized."""
    self.initialize_providers_asset_uri_resources()
    handlers = self._asset_uri_handlers
    return handlers

1494 

@property
def asset_to_openlineage_converters(self) -> dict[str, Callable]:
    """Return the asset-to-OpenLineage converter registry after making sure asset URI resources are initialized."""
    self.initialize_providers_asset_uri_resources()
    converters = self._asset_to_openlineage_converters
    return converters

1501 

@property
def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
    """Return (provider package, config) pairs sorted by provider package, after initializing provider configuration."""
    self.initialize_providers_configuration()
    config_pairs = list(self._provider_configs.items())
    config_pairs.sort(key=lambda pair: pair[0])
    return config_pairs

1506 

@property
def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
    """
    Return provider configs that have already been initialized.

    Unlike ``provider_configs`` this does not trigger any initialization; it only returns
    what is currently stored, sorted by provider package.

    .. deprecated:: 3.2.0
       Use ``provider_configs`` instead. This property is kept for backwards
       compatibility and will be removed in a future version.
    """
    warnings.warn(
        "already_initialized_provider_configs is deprecated. Use `provider_configs` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    config_pairs = list(self._provider_configs.items())
    config_pairs.sort(key=lambda pair: pair[0])
    return config_pairs

1522 

def _cleanup(self):
    """
    Reset the cached provider state held on this instance.

    Every collection is cleared in place, the configuration cache is invalidated, and the
    initialization flag and recorded stack trace are reset.
    """
    # Attributes are resolved and cleared one by one, in this order.
    for attr_name in (
        "_initialized_cache",
        "_provider_dict",
        "_fs_set",
        "_taskflow_decorators",
        "_hook_provider_dict",
        "_dialect_provider_dict",
        "_hooks_lazy_dict",
        "_connection_form_widgets",
        "_field_behaviours",
        "_extra_link_class_name_set",
        "_logging_class_name_set",
        "_auth_manager_class_name_set",
        "_auth_manager_without_check_set",
        "_secrets_backend_class_name_set",
        "_executor_class_name_set",
        "_executor_without_check_set",
        "_queue_class_name_set",
        "_provider_configs",
    ):
        getattr(self, attr_name).clear()

    # Imported lazily to avoid a configuration/providers_manager import cycle during cleanup.
    from airflow.configuration import conf

    conf.invalidate_cache()

    for attr_name in (
        "_trigger_info_set",
        "_notification_info_set",
        "_plugins_set",
        "_cli_command_functions_set",
        "_cli_command_provider_name_set",
    ):
        getattr(self, attr_name).clear()

    self._initialized = False
    self._initialization_stack_trace = None