Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/providers_manager.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

759 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Manages all providers.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import functools 

24import inspect 

25import json 

26import logging 

27import traceback 

28import warnings 

29from collections.abc import Callable, MutableMapping 

30from dataclasses import dataclass 

31from functools import wraps 

32from importlib.resources import files as resource_files 

33from time import perf_counter 

34from typing import TYPE_CHECKING, Any, NamedTuple, ParamSpec, TypeVar, cast 

35 

36from airflow import DeprecatedImportWarning 

37from airflow._shared.module_loading import import_string 

38from airflow._shared.providers_discovery import discover_all_providers_from_packages 

39from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning 

40from airflow.utils.log.logging_mixin import LoggingMixin 

41 

42if TYPE_CHECKING: 

43 from airflow.cli.cli_config import CLICommand 

44 

45log = logging.getLogger(__name__) 

46 

47 

48PS = ParamSpec("PS") 

49RT = TypeVar("RT") 

50 

51MIN_PROVIDER_VERSIONS = { 

52 "apache-airflow-providers-celery": "2.1.0", 

53} 

54 

55 

def _ensure_prefix_for_placeholders(field_behaviors: dict[str, Any], conn_type: str):
    """
    Verify the correct placeholder prefix.

    Custom connection fields all share one dictionary, so internally they are namespaced as
    ``extra__<conn_type>__<field>``. For convenience, and to stay consistent with the extra dict
    itself, ``get_ui_field_behaviour`` may name placeholders without that prefix. This helper
    rewrites the keys of ``field_behaviors["placeholders"]`` (when present): built-in connection
    attributes and keys that already start with ``extra__`` are kept, and every other key gets
    the prefix.

    :param field_behaviors: field behaviour dict; modified in place
    :param conn_type: connection type used to build the prefix
    :return: the same ``field_behaviors`` object
    """
    if "placeholders" not in field_behaviors:
        return field_behaviors

    builtin_attrs = frozenset(("host", "schema", "login", "password", "port", "extra"))

    def _prefixed(name):
        if name in builtin_attrs or name.startswith("extra__"):
            return name
        return f"extra__{conn_type}__{name}"

    renamed = {}
    for name, text in field_behaviors["placeholders"].items():
        renamed[_prefixed(name)] = text
    field_behaviors["placeholders"] = renamed
    return field_behaviors

81 

82 

83if TYPE_CHECKING: 

84 from urllib.parse import SplitResult 

85 

86 from airflow.sdk import BaseHook 

87 from airflow.sdk.bases.decorator import TaskDecorator 

88 from airflow.sdk.definitions.asset import Asset 

89 

90 

class LazyDictWithCache(MutableMapping):
    """
    Lazy-loaded cached dictionary.

    A value may be stored as a callable ("resolver"). The first time its key is read, the
    resolver is called, its result replaces the resolver in the dictionary and is returned; later
    reads return the cached result. A resolver may itself return a callable - that callable is
    cached and returned as-is.

    Assigning to a key forgets any earlier resolution of that key, so a newly assigned callable
    is resolved again on its next read instead of being handed back unresolved.
    """

    __slots__ = ["_resolved", "_raw_dict"]

    def __init__(self, *args, **kw):
        # keys whose stored value is already a resolved (final) value
        self._resolved = set()
        self._raw_dict = dict(*args, **kw)

    def __setitem__(self, key, value):
        # A new assignment invalidates the previous resolution. Without this, re-assigning a
        # resolver to a key that was already read would leave the key marked as resolved and
        # __getitem__ would return the resolver object itself.
        self._resolved.discard(key)
        self._raw_dict[key] = value

    def __getitem__(self, key):
        value = self._raw_dict[key]
        if key not in self._resolved and callable(value):
            # exchange callable with result of calling it -- but only once! allow resolver to
            # return a callable itself. Store directly in _raw_dict (not via __setitem__) so the
            # key stays marked as resolved.
            value = value()
            self._resolved.add(key)
            self._raw_dict[key] = value
        return value

    def __delitem__(self, key):
        self._resolved.discard(key)
        del self._raw_dict[key]

    def __iter__(self):
        return iter(self._raw_dict)

    def __len__(self):
        return len(self._raw_dict)

    def __contains__(self, key):
        return key in self._raw_dict

    def clear(self):
        self._resolved.clear()
        self._raw_dict.clear()

135 

136 

def _read_schema_from_resources_or_local_file(filename: str) -> dict:
    """
    Load a JSON schema file shipped with Airflow.

    The file is looked up through ``importlib.resources`` in the ``airflow`` package first. If
    that raises ``TypeError`` or ``FileNotFoundError``, it is read from the directory containing
    this module instead.

    :param filename: name of the schema file, relative to the package root
    :return: the parsed JSON document
    """
    try:
        resource = resource_files("airflow").joinpath(filename)
        with resource.open("rb") as handle:
            return json.load(handle)
    except (TypeError, FileNotFoundError):
        import pathlib

        fallback_path = pathlib.Path(__file__).parent / filename
        with fallback_path.open("rb") as handle:
            return json.load(handle)

147 

148 

def _create_schema_validator(schema_filename: str):
    """
    Create a JSON schema validator for a schema file bundled with Airflow.

    The validator class is picked by ``jsonschema.validators.validator_for`` from the schema's
    own ``$schema`` declaration.

    :param schema_filename: name of the schema file to load
    :return: a jsonschema validator instance for that schema
    """
    import jsonschema

    schema = _read_schema_from_resources_or_local_file(schema_filename)
    validator_cls = jsonschema.validators.validator_for(schema)
    return validator_cls(schema)


def _create_provider_info_schema_validator():
    """Create JSON schema validator from the provider_info.schema.json."""
    return _create_schema_validator("provider_info.schema.json")


def _create_customized_form_field_behaviours_schema_validator():
    """Create JSON schema validator from the customized_form_field_behaviours.schema.json."""
    return _create_schema_validator("customized_form_field_behaviours.schema.json")

167 

168 

def _check_builtin_provider_prefix(provider_package: str, class_name: str) -> bool:
    """
    Check that a class of an ``apache-airflow*`` distribution lives in the matching module.

    The distribution name without the ``apache-`` prefix, with dashes turned into dots, is the
    module prefix the class path must start with (e.g. ``apache-airflow-providers-google`` ->
    ``airflow.providers.google``). Distributions whose name does not start with
    ``apache-airflow`` are not checked.

    :return: True if the check passes or does not apply, False (after logging a warning)
        otherwise.
    """
    if not provider_package.startswith("apache-airflow"):
        return True
    expected_prefix = provider_package[len("apache-") :].replace("-", ".")
    if class_name.startswith(expected_prefix):
        return True
    log.warning(
        "Coherence check failed when importing '%s' from '%s' package. It should start with '%s'",
        class_name,
        provider_package,
        expected_prefix,
    )
    return False

181 

182 

@dataclass
class ProviderInfo:
    """
    Information about one installed provider.

    :param version: version string of the provider distribution
    :param data: the provider's metadata dictionary (e.g. its "connection-types",
        "hook-class-names" or "filesystems" entries)
    """

    version: str
    data: dict

194 

195 

class HookClassProvider(NamedTuple):
    """Hook class path paired with the provider package that registered it."""

    # import path of the hook class, e.g. "airflow.providers.standard.hooks.filesystem.FSHook"
    hook_class_name: str
    # provider distribution the hook comes from
    package_name: str

201 

202 

class DialectInfo(NamedTuple):
    """Dialect registered by a provider: its type name, implementing class and provider."""

    # dialect type, from the provider info's "dialect-type" entry
    name: str
    # import path of the dialect class, from "dialect-class-name"
    dialect_class_name: str
    # provider package that declared the dialect
    provider_name: str

209 

210 

class TriggerInfo(NamedTuple):
    """Trigger class path together with the provider package and integration it belongs to."""

    trigger_class_name: str
    package_name: str
    integration_name: str

217 

218 

class NotificationInfo(NamedTuple):
    """Notification class path together with the provider package that ships it."""

    notification_class_name: str
    package_name: str

224 

225 

class PluginInfo(NamedTuple):
    """Plugin shipped by a provider: its name, plugin class path and provider."""

    name: str
    plugin_class: str
    provider_name: str

232 

233 

class HookInfo(NamedTuple):
    """Information about a hook serving one connection type."""

    hook_class_name: str
    connection_id_attribute_name: str
    package_name: str
    # display name of the hook
    hook_name: str
    connection_type: str
    connection_testable: bool
    # NOTE(review): NamedTuple defaults are shared, so every instance created without an
    # explicit ``dialects`` argument references this same list object - treat it as read-only.
    dialects: list[str] = []

244 

245 

class ConnectionFormWidgetInfo(NamedTuple):
    """Custom connection-form widget contributed by a hook."""

    hook_class_name: str
    package_name: str
    # the widget/field object itself
    field: Any
    field_name: str
    is_sensitive: bool

254 

255 

def log_optional_feature_disabled(class_name, e, provider_package):
    """
    Log that an optional provider feature could not be enabled.

    The exception is attached (``exc_info``) only to the debug-level record. The info-level
    record just names the class and package, so users see which feature was skipped without a
    traceback.
    """
    message_args = (class_name, provider_package)
    log.debug(
        "Optional feature disabled on exception when importing '%s' from '%s' package",
        *message_args,
        exc_info=e,
    )
    log.info("Optional provider feature disabled when importing '%s' from '%s' package", *message_args)

269 

270 

def log_import_warning(class_name, e, provider_package):
    """Log, at warning level with the exception attached, that importing a provider class failed."""
    log.warning("Exception when importing '%s' from '%s' package", class_name, provider_package, exc_info=e)

279 

280 

# Temporary measure until all community providers raise AirflowOptionalProviderFeatureException
# for their optional features. CI checks are planned to catch and fix such cases; until then,
# every known unhandled optional-feature import error from a community provider is listed here
# as a (provider distribution, fragment of the ImportError message) pair.
KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [
    ("apache-airflow-providers-google", "No module named 'paramiko'"),
]

286 

287 

def _correctness_check(provider_package: str, class_name: str, provider_info: ProviderInfo) -> Any:
    """
    Perform coherence check on provider classes.

    For apache-airflow providers it checks that the class path starts with the package's module
    prefix. For all providers it imports the class, checking that there are no exceptions during
    importing. Problems are logged rather than raised: missing optional features at info/debug
    level, anything else as a warning.

    :param provider_package: name of the provider package
    :param class_name: fully-qualified name of the class (or other object) to import
    :param provider_info: information about the provider; not used by the check itself
    :return: the imported object if the checks pass, None otherwise.
    """
    if not _check_builtin_provider_prefix(provider_package, class_name):
        return None
    try:
        imported_class = import_string(class_name)
    except AirflowOptionalProviderFeatureException as e:
        # When the provider class raises AirflowOptionalProviderFeatureException
        # this is an expected case when only some classes in provider are
        # available. We just log debug level here and print info message in logs so that
        # the user is aware of it
        log_optional_feature_disabled(class_name, e, provider_package)
        return None
    except ImportError as e:
        # ``e.msg`` is None for an ImportError raised without a message (e.g. a bare
        # ``raise ImportError``). A substring test against None raises TypeError from inside
        # this handler and aborts discovery. ``str(e)`` yields the message when there is one
        # and an empty string otherwise.
        error_message = str(e)
        if "No module named 'airflow.providers." in error_message:
            # handle cases where another provider is missing. This can only happen if
            # there is an optional feature, so we log debug and print information about it
            log_optional_feature_disabled(class_name, e, provider_package)
            return None
        for known_package, known_message in KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS:
            # Until we convert all providers to use AirflowOptionalProviderFeatureException
            # we assume any problem with importing another "provider" is because this is an
            # optional feature, so we log debug and print information about it
            if known_package == provider_package and known_message in error_message:
                log_optional_feature_disabled(class_name, e, provider_package)
                return None
        # But when we have no idea - we print warning to logs
        log_import_warning(class_name, e, provider_package)
        return None
    except Exception as e:
        log_import_warning(class_name, e, provider_package)
        return None
    return imported_class

332 

333 

# A dedicated decorator (instead of a generic caching helper) gives better control over when
# each category of provider information is initialized, and makes that easy to debug and test.
def provider_info_cache(cache_name: str) -> Callable[[Callable[PS, None]], Callable[PS, None]]:
    """
    Decorate and cache provider info.

    Returns a decorator for ``ProvidersManager`` initializer methods. The method's first
    positional argument is the manager instance. After the method completes,
    ``_initialized_cache[cache_name]`` is set to True on that instance, and while that key is
    present further calls return immediately. If the method raises, nothing is recorded, so the
    next call runs it again. Start and duration are logged at debug level.

    :param cache_name: Name of the cache (the key recorded in ``_initialized_cache``)
    """

    def decorate(func: Callable[PS, None]) -> Callable[PS, None]:
        @wraps(func)
        def wrapped_function(*args: PS.args, **kwargs: PS.kwargs) -> None:
            manager = args[0]
            if TYPE_CHECKING:
                assert isinstance(manager, ProvidersManager)

            if cache_name in manager._initialized_cache:
                return
            started_at = perf_counter()
            log.debug("Initializing Providers Manager[%s]", cache_name)
            func(*args, **kwargs)
            manager._initialized_cache[cache_name] = True
            log.debug(
                "Initialization of Providers Manager[%s] took %.2f seconds",
                cache_name,
                perf_counter() - started_at,
            )

        return wrapped_function

    return decorate

366 

367 

368class ProvidersManager(LoggingMixin): 

369 """ 

370 Manages all provider distributions. 

371 

372 This is a Singleton class. The first time it is 

373 instantiated, it discovers all available providers in installed packages. 

374 """ 

375 

376 resource_version = "0" 

377 _initialized: bool = False 

378 _initialization_stack_trace = None 

379 _instance: ProvidersManager | None = None 

380 

def __new__(cls):
    # Singleton: construct the instance once, then hand back the same object on every call.
    if cls._instance is not None:
        return cls._instance
    cls._instance = super().__new__(cls)
    return cls._instance

385 

@staticmethod
def initialized() -> bool:
    """Return the class-level flag that ``__init__`` sets when it starts initializing the manager."""
    return ProvidersManager._initialized

389 

@staticmethod
def initialization_stack_trace() -> str | None:
    """Return the formatted call stack captured when the manager was initialized, or None."""
    return ProvidersManager._initialization_stack_trace

393 

def __init__(self):
    """
    Initialize the manager.

    The singleton's registries are built only once; if ``ProvidersManager`` is already
    initialized this returns immediately. The class-level ``_initialized`` flag and the creation
    stack trace are recorded before the registries are built. If building them raises (for
    example while creating the JSON schema validators), both are reset before the exception
    propagates. Otherwise every later ``ProvidersManager()`` would skip initialization and
    silently return a half-built instance that lacks some of the attributes below.
    """
    # skip initialization if already initialized
    if self.initialized():
        return

    super().__init__()
    ProvidersManager._initialized = True
    ProvidersManager._initialization_stack_trace = "".join(traceback.format_stack(inspect.currentframe()))
    try:
        self._initialized_cache: dict[str, bool] = {}
        # Keeps dict of providers keyed by module name
        self._provider_dict: dict[str, ProviderInfo] = {}
        self._fs_set: set[str] = set()
        self._asset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
        self._asset_factories: dict[str, Callable[..., Asset]] = {}
        self._asset_to_openlineage_converters: dict[str, Callable] = {}
        self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache()
        # keeps mapping between connection_types and hook class, package they come from
        self._hook_provider_dict: dict[str, HookClassProvider] = {}
        self._dialect_provider_dict: dict[str, DialectInfo] = {}
        # Keeps dict of hooks keyed by connection type. They are lazy evaluated at access time
        self._hooks_lazy_dict: LazyDictWithCache[str, HookInfo | Callable] = LazyDictWithCache()
        # Keeps methods that should be used to add custom widgets tuple of keyed by name of the extra field
        self._connection_form_widgets: dict[str, ConnectionFormWidgetInfo] = {}
        # Customizations for javascript fields are kept here
        self._field_behaviours: dict[str, dict] = {}
        self._cli_command_functions_set: set[Callable[[], list[CLICommand]]] = set()
        self._cli_command_provider_name_set: set[str] = set()
        self._extra_link_class_name_set: set[str] = set()
        self._logging_class_name_set: set[str] = set()
        self._auth_manager_class_name_set: set[str] = set()
        self._auth_manager_without_check_set: set[tuple[str, str]] = set()
        self._secrets_backend_class_name_set: set[str] = set()
        self._executor_class_name_set: set[str] = set()
        self._executor_without_check_set: set[tuple[str, str]] = set()
        self._queue_class_name_set: set[str] = set()
        self._db_manager_class_name_set: set[str] = set()
        self._provider_configs: dict[str, dict[str, Any]] = {}
        self._trigger_info_set: set[TriggerInfo] = set()
        self._notification_info_set: set[NotificationInfo] = set()
        self._provider_schema_validator = _create_provider_info_schema_validator()
        self._customized_form_fields_schema_validator = (
            _create_customized_form_field_behaviours_schema_validator()
        )
        # Set of plugins contained in providers
        self._plugins_set: set[PluginInfo] = set()
        self._init_airflow_core_hooks()

        self._runtime_manager = None
    except BaseException:
        # Undo the "initialized" marker so that the next construction runs __init__ again.
        ProvidersManager._initialized = False
        ProvidersManager._initialization_stack_trace = None
        raise

443 

def __getattribute__(self, name: str):
    """
    Redirect deprecated runtime-only properties to the task-sdk ``ProvidersManagerTaskRuntime``.

    Accessing one of the listed properties emits a ``DeprecatedImportWarning``. The attribute is
    then read from a ``ProvidersManagerTaskRuntime``, created lazily on first use and stored in
    ``_runtime_manager``. Every other attribute is looked up normally.
    """
    # Hacky but does the trick for now.
    # The set literal sits directly in the membership test so CPython folds it into a constant
    # frozenset. Binding it to a local first rebuilt the set on every call, and this method runs
    # for *every* attribute access on the manager.
    if name not in {
        "hooks",
        "taskflow_decorators",
        "filesystem_module_names",
        "asset_factories",
        "asset_uri_handlers",
        "asset_to_openlineage_converters",
    }:
        return object.__getattribute__(self, name)

    warnings.warn(
        f"ProvidersManager.{name} is deprecated. Use ProvidersManagerTaskRuntime.{name} from task-sdk instead.",
        DeprecatedImportWarning,
        stacklevel=2,
    )
    if object.__getattribute__(self, "_runtime_manager") is None:
        from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime

        object.__setattr__(self, "_runtime_manager", ProvidersManagerTaskRuntime())
    return getattr(object.__getattribute__(self, "_runtime_manager"), name)

468 

def _init_airflow_core_hooks(self):
    """
    Initialize the hooks dict with default hooks from Airflow core.

    ``generic`` and ``email`` get placeholder ``HookInfo`` entries with no hook class behind
    them. ``fs`` and ``package_index`` get ``functools.partial`` resolvers around
    ``self._import_hook`` for hooks of the standard provider. ``_hooks_lazy_dict`` only calls a
    resolver when its key is first read.
    """
    for conn_type, display_name in (("generic", "Generic"), ("email", "Email")):
        self._hooks_lazy_dict[conn_type] = HookInfo(
            hook_class_name=None,
            connection_id_attribute_name=None,
            package_name=None,
            hook_name=display_name,
            connection_type=None,
            connection_testable=False,
        )

    standard_provider_hooks = {
        "fs": "airflow.providers.standard.hooks.filesystem.FSHook",
        "package_index": "airflow.providers.standard.hooks.package_index.PackageIndexHook",
    }
    for conn_type, hook_path in standard_provider_hooks.items():
        self._hooks_lazy_dict[conn_type] = functools.partial(
            self._import_hook,
            connection_type=None,
            package_name="apache-airflow-providers-standard",
            hook_class_name=hook_path,
            provider_info=None,
        )

495 

@provider_info_cache("list")
def initialize_providers_list(self):
    """
    Lazy initialization of providers list.

    Providers are discovered from installed packages into ``self._provider_dict``; the
    provider-info schema validator is passed along to the discovery helper. Providers older
    than ``MIN_PROVIDER_VERSIONS`` are then reported, and the dict is re-created sorted by key.
    """
    discover_all_providers_from_packages(self._provider_dict, self._provider_schema_validator)
    self._verify_all_providers_all_compatible()
    self._provider_dict = dict(sorted(self._provider_dict.items()))

506 

def _verify_all_providers_all_compatible(self):
    """
    Warn about installed providers older than the minimum version this Airflow supports.

    Only providers listed in ``MIN_PROVIDER_VERSIONS`` are checked. Versions are compared with
    ``packaging.version``, and an incompatible provider is only reported in the log.
    """
    from packaging import version as packaging_version

    for provider_id, info in self._provider_dict.items():
        min_version = MIN_PROVIDER_VERSIONS.get(provider_id)
        if not min_version:
            continue
        if not packaging_version.parse(min_version) > packaging_version.parse(info.version):
            continue
        log.warning(
            "The package %s is not compatible with this version of Airflow. "
            "The package has version %s but the minimum supported version "
            "of the package is %s",
            provider_id,
            info.version,
            min_version,
        )

522 

@provider_info_cache("hooks")
def initialize_providers_hooks(self):
    """
    Lazy initialization of providers hooks.

    Re-seeds the core hooks, makes sure the provider list is loaded, discovers provider hooks
    (and dialects), calls ``_load_ui_metadata`` and finally re-creates the connection-type
    registry sorted by key.
    """
    self._init_airflow_core_hooks()
    self.initialize_providers_list()
    self._discover_hooks()
    self._load_ui_metadata()
    self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items()))

531 

@provider_info_cache("filesystems")
def initialize_providers_filesystems(self):
    """Lazily discover the filesystem modules declared by providers (loading providers first)."""
    self.initialize_providers_list()
    self._discover_filesystems()

537 

@provider_info_cache("asset_uris")
def initialize_providers_asset_uri_resources(self):
    """Lazily discover provider asset URI handlers, factories and converters (loading providers first)."""
    self.initialize_providers_list()
    self._discover_asset_uri_resources()

543 

# NOTE(review): the outer "hook_lineage_writers" key is also marked as initialized whenever the
# taskflow decorators are initialized. It looks like a copy-paste leftover; confirm nothing
# reads that cache key before removing the extra decorator.
@provider_info_cache("hook_lineage_writers")
@provider_info_cache("taskflow_decorators")
def initialize_providers_taskflow_decorator(self):
    """Lazy initialization of providers taskflow decorators."""
    self.initialize_providers_list()
    self._discover_taskflow_decorators()

550 

@provider_info_cache("extra_links")
def initialize_providers_extra_links(self):
    """Lazily discover the operator extra links declared by providers (loading providers first)."""
    self.initialize_providers_list()
    self._discover_extra_links()

556 

@provider_info_cache("logging")
def initialize_providers_logging(self):
    """Lazily discover the logging classes declared by providers (loading providers first)."""
    self.initialize_providers_list()
    self._discover_logging()

562 

@provider_info_cache("secrets_backends")
def initialize_providers_secrets_backends(self):
    """Lazily discover the secrets backends declared by providers (loading providers first)."""
    self.initialize_providers_list()
    self._discover_secrets_backends()

568 

@provider_info_cache("executors")
def initialize_providers_executors(self):
    """Lazily discover provider executors, calling ``_discover_executors`` with ``check=True``."""
    self.initialize_providers_list()
    self._discover_executors(check=True)

574 

@provider_info_cache("executors_without_check")
def initialize_providers_executors_without_check(self):
    """
    Lazily discover provider executors, calling ``_discover_executors`` with ``check=False``.

    Cached separately from the checked variant.
    """
    self.initialize_providers_list()
    self._discover_executors(check=False)

580 

@provider_info_cache("queues")
def initialize_providers_queues(self):
    """Lazily discover the queue classes declared by providers (loading providers first)."""
    self.initialize_providers_list()
    self._discover_queues()

586 

@provider_info_cache("db_managers")
def initialize_providers_db_managers(self):
    """Lazily discover the DB managers declared by providers (loading providers first)."""
    self.initialize_providers_list()
    self._discover_db_managers()

592 

@provider_info_cache("notifications")
def initialize_providers_notifications(self):
    """Lazily discover the notification classes declared by providers (loading providers first)."""
    self.initialize_providers_list()
    self._discover_notifications()

598 

@provider_info_cache("auth_managers")
def initialize_providers_auth_managers(self):
    """Lazily discover provider auth managers, calling ``_discover_auth_managers`` with ``check=True``."""
    self.initialize_providers_list()
    self._discover_auth_managers(check=True)

604 

@provider_info_cache("auth_managers_without_check")
def initialize_providers_auth_managers_without_check(self):
    """
    Lazily discover provider auth managers, calling ``_discover_auth_managers`` with ``check=False``.

    Cached separately from the checked variant.
    """
    self.initialize_providers_list()
    self._discover_auth_managers(check=False)

610 

@provider_info_cache("config")
def initialize_providers_configuration(self):
    """Lazily load provider configuration metadata and merge it into ``conf`` (loading providers first)."""
    self.initialize_providers_list()
    self._discover_config()

616 

@provider_info_cache("plugins")
def initialize_providers_plugins(self):
    """Lazily discover the plugins shipped by providers (loading providers first)."""
    self.initialize_providers_list()
    self._discover_plugins()

621 

@provider_info_cache("cli_command")
def initialize_providers_cli_command(self):
    """Lazily discover the CLI commands contributed by providers (loading providers first)."""
    self.initialize_providers_list()
    self._discover_cli_command()

627 

def _discover_hooks_from_connection_types(
    self,
    hook_class_names_registered: set[str],
    already_registered_warning_connection_types: set[str],
    package_name: str,
    provider: ProviderInfo,
):
    """
    Discover hooks from the "connection-types" property.

    This newer method replaces discovery from "hook-class-names" because it allows individual
    Hook classes to be imported lazily when they are accessed. Each "connection-types" entry
    holds both the connection type and the hook class name, so all connection types can be
    discovered without importing the classes.

    :param hook_class_names_registered: set of registered hook class names for this provider;
        every hook class listed under "connection-types" is added to it
    :param already_registered_warning_connection_types: set of connection types for which a
        warning should be printed in logs because a different provider package registered them
        before; updated in place
    :param package_name: name of the provider package being processed
    :param provider: provider information whose ``data`` holds the "connection-types" entries
    :return: True if the provider declares at least one connection type, False otherwise
    """
    provider_uses_connection_types = False
    connection_types = provider.data.get("connection-types")
    if connection_types:
        for connection_type_dict in connection_types:
            connection_type = connection_type_dict["connection-type"]
            hook_class_name = connection_type_dict["hook-class-name"]
            hook_class_names_registered.add(hook_class_name)
            already_registered = self._hook_provider_dict.get(connection_type)
            if already_registered:
                if already_registered.package_name != package_name:
                    already_registered_warning_connection_types.add(connection_type)
                elif already_registered.hook_class_name != hook_class_name:
                    # The same package declaring a connection type again is only a conflict
                    # when it points at a different hook class. This mirrors the check in the
                    # legacy "hook-class-names" discovery.
                    log.warning(
                        "The connection type '%s' is already registered in the"
                        " package '%s' with different class names: '%s' and '%s'. ",
                        connection_type,
                        package_name,
                        already_registered.hook_class_name,
                        hook_class_name,
                    )
            else:
                self._hook_provider_dict[connection_type] = HookClassProvider(
                    hook_class_name=hook_class_name, package_name=package_name
                )
                # Defer importing hook to access time by setting import hook method as dict value
                self._hooks_lazy_dict[connection_type] = functools.partial(
                    self._import_hook,
                    connection_type=connection_type,
                    provider_info=provider,
                )
            provider_uses_connection_types = True
    return provider_uses_connection_types

681 

def _discover_hooks_from_hook_class_names(
    self,
    hook_class_names_registered: set[str],
    already_registered_warning_connection_types: set[str],
    package_name: str,
    provider: ProviderInfo,
    provider_uses_connection_types: bool,
):
    """
    Discover hooks from the legacy "hook-class-names" provider-info property.

    The property is deprecated but still supported for Airflow 2. It lists hook classes without
    their connection types, so every listed class must be imported right away to learn which
    connection type it serves; these hooks cannot be imported selectively. Classes already
    marked for lazy import by the "connection-types" discovery are skipped.

    :param hook_class_names_registered: hook class names already registered for this provider via
        "connection-types"; those classes are not imported again
    :param already_registered_warning_connection_types: connection types registered earlier by a
        different provider package. New conflicts are added, and a warning is logged for every
        entry at the end of discovery.
    :param package_name: name of the provider package
    :param provider: information about the provider (version and provider-info data)
    :param provider_uses_connection_types: whether the provider also declares the newer
        "connection-types" property; when it does not, a DeprecationWarning is emitted
    """
    hook_class_names = provider.data.get("hook-class-names")
    if hook_class_names:
        for hook_class_name in hook_class_names:
            if hook_class_name in hook_class_names_registered:
                # already marked for lazy import by the connection-types discovery
                continue
            hook_info = self._import_hook(
                connection_type=None,
                provider_info=provider,
                hook_class_name=hook_class_name,
                package_name=package_name,
            )
            if not hook_info:
                # importing failed; the problem was logged at import time
                continue
            conn_type = hook_info.connection_type
            registered = self._hook_provider_dict.get(conn_type)
            if not registered:
                self._hook_provider_dict[conn_type] = HookClassProvider(
                    hook_class_name=hook_class_name, package_name=package_name
                )
                self._hooks_lazy_dict[conn_type] = hook_info
            elif registered.package_name != package_name:
                already_registered_warning_connection_types.add(conn_type)
            elif registered.hook_class_name != hook_class_name:
                log.warning(
                    "The hook connection type '%s' is registered twice in the"
                    " package '%s' with different class names: '%s' and '%s'. "
                    " Please fix it!",
                    conn_type,
                    package_name,
                    registered.hook_class_name,
                    hook_class_name,
                )

        if not provider_uses_connection_types:
            warnings.warn(
                f"The provider {package_name} uses `hook-class-names` "
                "property in provider-info and has no `connection-types` one. "
                "The 'hook-class-names' property has been deprecated in favour "
                "of 'connection-types' in Airflow 2.2. Use **both** in case you want to "
                "have backwards compatibility with Airflow < 2.2",
                DeprecationWarning,
                stacklevel=1,
            )
    for conflicting_type in already_registered_warning_connection_types:
        log.warning(
            "The connection_type '%s' has been already registered by provider '%s.'",
            conflicting_type,
            self._hook_provider_dict[conflicting_type].package_name,
        )

758 

def _discover_hooks(self) -> None:
    """
    Retrieve all connections defined in the providers via Hooks.

    For each provider, dialects are registered first. Hooks then come from "connection-types"
    (lazily imported) and from the legacy "hook-class-names" (eagerly imported). Finally the
    connection-type registry is re-created sorted by key.
    """
    for package_name, provider in self._provider_dict.items():
        # bookkeeping shared by the two hook discovery passes for this provider
        conflicting_connection_types: set[str] = set()
        registered_hook_classes: set[str] = set()
        self._discover_provider_dialects(package_name, provider)
        uses_connection_types = self._discover_hooks_from_connection_types(
            hook_class_names_registered=registered_hook_classes,
            already_registered_warning_connection_types=conflicting_connection_types,
            package_name=package_name,
            provider=provider,
        )
        self._discover_hooks_from_hook_class_names(
            hook_class_names_registered=registered_hook_classes,
            already_registered_warning_connection_types=conflicting_connection_types,
            package_name=package_name,
            provider=provider,
            provider_uses_connection_types=uses_connection_types,
        )
    self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items()))

776 

def _discover_provider_dialects(self, provider_name: str, provider: ProviderInfo):
    """
    Register the dialects listed under the provider info's "dialects" entry.

    Entries are keyed by their "dialect-type". All entries are collected before the registry is
    updated, so a malformed entry leaves the registry untouched.
    """
    dialects = provider.data.get("dialects", [])
    if not dialects:
        return
    discovered = {}
    for item in dialects:
        dialect_type = item["dialect-type"]
        discovered[dialect_type] = DialectInfo(
            name=dialect_type,
            dialect_class_name=item["dialect-class-name"],
            provider_name=provider_name,
        )
    self._dialect_provider_dict.update(discovered)

790 

@provider_info_cache("import_all_hooks")
def _import_info_from_all_hooks(self):
    """Force-import all hooks and initialize the connections/fields."""
    # Reading every value makes the lazy dict call each pending resolver, i.e. import the hook.
    for _ in self._hooks_lazy_dict.values():
        pass
    self._field_behaviours = dict(sorted(self._field_behaviours.items()))

    # Connection-form widgets are deliberately NOT sorted here. They are used in two places:
    # the UI connection form, which expects the order in which the hook defined them, and the
    # `airflow providers widgets` CLI command, which expects alphabetical order. Sorting here
    # would lose the original order for good, so the CLI does its own sorting.

804 

def _discover_filesystems(self) -> None:
    """
    Retrieve all filesystems defined in the providers.

    Each module listed under a provider's ``filesystems`` entry is registered in ``_fs_set``
    when its ``get_fs`` attribute passes ``_correctness_check``. ``_fs_set`` is an unordered
    set, so no sorting is attempted here; ``filesystem_module_names`` returns the names sorted.
    """
    for provider_package, provider in self._provider_dict.items():
        for fs_module_name in provider.data.get("filesystems", []):
            if _correctness_check(provider_package, f"{fs_module_name}.get_fs", provider):
                self._fs_set.add(fs_module_name)

812 

def _discover_asset_uri_resources(self) -> None:
    """
    Discover and register asset URI handlers, factories, and converters for all providers.

    For every ``asset-uris`` entry that explicitly declares both ``schemes`` and ``handler``,
    the handler, factory and OpenLineage converter are resolved and registered for each scheme.
    A ``null`` handler falls back to ``normalize_noop``; a missing factory or converter is
    simply not registered.
    """
    from airflow.sdk.definitions.asset import normalize_noop

    def _safe_register_resource(
        provider_package_name: str,
        provider_info: ProviderInfo,
        schemes_list: list[str],
        resource_path: str | None,
        resource_registry: dict,
        default_resource: Any = None,
    ) -> None:
        """
        Register a specific resource (handler, factory, or converter) for the given schemes.

        The resource is imported from ``resource_path`` via ``_correctness_check`` (using the
        explicitly passed ``provider_info``), or ``default_resource`` is used when the path is
        None. If the resolved resource is truthy, every scheme is mapped to it in the registry.
        """
        resource = (
            _correctness_check(provider_package_name, resource_path, provider_info)
            if resource_path is not None
            else default_resource
        )
        if resource:
            resource_registry.update((scheme, resource) for scheme in schemes_list)

    for provider_name, provider in self._provider_dict.items():
        for uri_info in provider.data.get("asset-uris", []):
            if "schemes" not in uri_info or "handler" not in uri_info:
                continue  # Both schemes and handler must be explicitly set; handler may be null
            common_args = {
                "schemes_list": uri_info["schemes"],
                "provider_package_name": provider_name,
                # Pass the provider explicitly instead of relying on the helper closing over
                # the loop variable.
                "provider_info": provider,
            }
            _safe_register_resource(
                resource_path=uri_info["handler"],
                resource_registry=self._asset_uri_handlers,
                default_resource=normalize_noop,
                **common_args,
            )
            _safe_register_resource(
                resource_path=uri_info.get("factory"),
                resource_registry=self._asset_factories,
                **common_args,
            )
            _safe_register_resource(
                resource_path=uri_info.get("to_openlineage_converter"),
                resource_registry=self._asset_to_openlineage_converters,
                **common_args,
            )

859 

def _discover_taskflow_decorators(self) -> None:
    """Register every ``task-decorators`` entry declared in the providers' info."""
    for package, info in self._provider_dict.items():
        for spec in info.data.get("task-decorators", []):
            decorator_name = spec["name"]
            class_path = spec["class-name"]
            self._add_taskflow_decorator(decorator_name, class_path, package)

866 

def _add_taskflow_decorator(self, name, decorator_class_name: str, provider_package: str) -> None:
    """
    Register a lazily imported taskflow decorator under ``name``.

    Nothing is registered when ``_check_builtin_provider_prefix`` rejects the class name for
    this provider package. If ``name`` is already taken, a warning naming the existing
    registration is logged and the new one is ignored.

    :param name: decorator name as exposed to users
    :param decorator_class_name: dotted path of the decorator, imported on first access
    :param provider_package: provider package that declares the decorator
    """
    if not _check_builtin_provider_prefix(provider_package, decorator_class_name):
        return

    if name not in self._taskflow_decorators:
        self._taskflow_decorators[name] = functools.partial(import_string, decorator_class_name)
        return

    try:
        registered = self._taskflow_decorators[name]
        registered_by = f"{registered.__module__}.{registered.__name__}"
    except Exception:
        # Importing the existing decorator failed: read the dotted path back from the
        # functools.partial stored in the underlying raw dict instead.
        registered_by = self._taskflow_decorators._raw_dict[name].args[0]  # type: ignore[attr-defined]

    log.warning(
        "The taskflow decorator '%s' has been already registered (by %s).",
        name,
        registered_by,
    )

887 

888 @staticmethod 

889 def _get_attr(obj: Any, attr_name: str): 

890 """Retrieve attributes of an object, or warn if not found.""" 

891 if not hasattr(obj, attr_name): 

892 log.warning("The object '%s' is missing %s attribute and cannot be registered", obj, attr_name) 

893 return None 

894 return getattr(obj, attr_name) 

895 

def _get_connection_type_config(self, provider_info: ProviderInfo, connection_type: str) -> dict | None:
    """
    Return the ``connection-types`` entry of the provider info for ``connection_type``.

    :param provider_info: provider information whose ``data`` may hold ``connection-types``
    :param connection_type: connection type to look up
    :return: the first matching entry, or None when the provider does not declare it
    """
    declared = provider_info.data.get("connection-types", [])
    return next(
        (entry for entry in declared if entry.get("connection-type") == connection_type),
        None,
    )

903 

def _to_api_format(self, field_name: str, field_def: dict) -> dict:
    """
    Convert a ``conn-fields`` definition into the format expected by the API.

    :param field_name: name of the field (not used in the conversion itself)
    :param field_def: the field definition with optional ``schema``, ``label`` and ``description``
    :return: dict with ``value`` (the schema default), ``schema`` (a copy of the schema with
        ``label`` moved to ``title``), ``description`` and ``source`` (always None)
    """
    schema_def = field_def.get("schema", {})
    api_schema = schema_def.copy()
    # JSON Schema convention: the human-readable label goes into ``title``.
    if "label" in field_def:
        api_schema["title"] = field_def.get("label")
    default_value = schema_def.get("default")
    description = field_def.get("description")
    return {"value": default_value, "schema": api_schema, "description": description, "source": None}

919 

def _add_widgets(
    self, package_name: str, hook_class_name: str, connection_type: str, conn_fields: dict
) -> None:
    """
    Register the ``conn-fields`` of a provider connection type as connection form widgets.

    Each field is stored under ``extra__<connection_type>__<field_name>``; a field whose key
    is already registered is skipped with a warning. A field is sensitive when its schema
    ``format`` is ``password``.

    :param package_name: provider package declaring the connection type
    :param hook_class_name: dotted path of the hook class for the connection type
    :param connection_type: the connection type the fields belong to
    :param conn_fields: mapping of field name to field definition from provider info
    """
    for name, definition in conn_fields.items():
        api_field = self._to_api_format(name, definition)

        widget_key = f"extra__{connection_type}__{name}"
        if widget_key not in self._connection_form_widgets:
            sensitive = definition.get("schema", {}).get("format") == "password"
            self._connection_form_widgets[widget_key] = ConnectionFormWidgetInfo(
                hook_class_name=hook_class_name,
                package_name=package_name,
                field=api_field,
                field_name=name,
                is_sensitive=sensitive,
            )
        else:
            log.warning(
                "Field %s for connection type %s already added, skipping",
                name,
                connection_type,
            )

944 

def _add_customized_fields(self, package_name: str, connection_type: str, behaviour: dict) -> None:
    """
    Register the ``ui-field-behaviour`` of a provider connection type.

    The kebab-case keys from provider info are mapped to the python-style keys used by
    ``_field_behaviours``. The result is validated with
    ``_customized_form_fields_schema_validator`` and passed through
    ``_ensure_prefix_for_placeholders`` before it is stored. An already registered connection
    type is skipped with a warning, and any failure is logged instead of raised.

    :param package_name: provider package declaring the connection type
    :param connection_type: connection type the behaviour applies to
    :param behaviour: the ``ui-field-behaviour`` mapping from provider info
    """
    if connection_type not in self._field_behaviours:
        # Map kebab-case keys from provider info onto the python-style keys used internally.
        fields_spec = {}
        fields_spec["hidden_fields"] = behaviour.get("hidden-fields", [])
        fields_spec["relabeling"] = behaviour.get("relabeling", {})
        fields_spec["placeholders"] = behaviour.get("placeholders", {})

        try:
            self._customized_form_fields_schema_validator.validate(fields_spec)
            self._field_behaviours[connection_type] = _ensure_prefix_for_placeholders(
                fields_spec, connection_type
            )
        except Exception as err:
            log.warning(
                "Failed to add field behaviour for %s in package %s: %s",
                connection_type,
                package_name,
                err,
            )
        return

    log.warning(
        "Field behaviour for connection type %s already exists, skipping",
        connection_type,
    )

972 

def _load_ui_metadata(self) -> None:
    """
    Load connection form UI metadata from provider info without importing any hook.

    Connection types missing either ``connection-type`` or ``hook-class-name`` are ignored.
    Non-empty ``conn-fields`` are registered as widgets and a non-empty ``ui-field-behaviour``
    as field behaviour.
    """
    for package_name, provider in self._provider_dict.items():
        for entry in provider.data.get("connection-types", []):
            conn_type = entry.get("connection-type")
            hook_path = entry.get("hook-class-name")
            if not (conn_type and hook_path):
                continue

            fields = entry.get("conn-fields")
            if fields:
                self._add_widgets(package_name, hook_path, conn_type, fields)

            ui_behaviour = entry.get("ui-field-behaviour")
            if ui_behaviour:
                self._add_customized_fields(package_name, conn_type, ui_behaviour)

987 

def _import_hook(
    self,
    connection_type: str | None,
    provider_info: ProviderInfo,
    hook_class_name: str | None = None,
    package_name: str | None = None,
) -> HookInfo | None:
    """
    Import hook and retrieve hook information.

    Either connection_type (for lazy loading) or hook_class_name must be set - but not both.
    Only needs package_name if hook_class_name is passed (for lazy loading, package_name
    is retrieved from _hook_provider_dict together with hook_class_name).

    :param connection_type: type of the connection
    :param provider_info: provider information of the provider that declares the hook
    :param hook_class_name: name of the hook class
    :param package_name: provider package - only needed in case connection_type is missing
    :return: HookInfo for the hook, or None if the hook cannot be imported, uses an unsupported
        widget field class, or misses one of its key attributes
    :raises ValueError: if neither or both of connection_type and hook_class_name are set, or
        if hook_class_name is given without package_name
    """
    if connection_type is None and hook_class_name is None:
        raise ValueError("Either connection_type or hook_class_name must be set")
    if connection_type is not None and hook_class_name is not None:
        raise ValueError(
            f"Both connection_type ({connection_type}) and "
            f"hook_class_name ({hook_class_name}) are set. Only one should be set!"
        )
    if connection_type is not None:
        class_provider = self._hook_provider_dict[connection_type]
        package_name = class_provider.package_name
        hook_class_name = class_provider.hook_class_name
    else:
        if not hook_class_name:
            raise ValueError("Either connection_type or hook_class_name must be set")
        if not package_name:
            raise ValueError(
                f"Provider package name is not set when hook_class_name ({hook_class_name}) is used"
            )
    hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info)
    if hook_class is None:
        return None

    # If provider info already declares UI metadata for this connection type, skip the
    # deprecated Python hook methods. This avoids duplicate initialization and unnecessary
    # wtforms imports; the declarative metadata is presumably loaded via _load_ui_metadata.
    ui_metadata_loaded = False
    if provider_info and connection_type:
        conn_config = self._get_connection_type_config(provider_info, connection_type)
        ui_metadata_loaded = conn_config is not None and bool(
            conn_config.get("conn-fields") or conn_config.get("ui-field-behaviour")
        )

    if not ui_metadata_loaded:
        try:
            from wtforms import BooleanField, IntegerField, PasswordField, StringField

            allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField]
            # Do not use attr here. We want to check only direct class fields not those
            # inherited from parent hook. This way we add form fields only once for the whole
            # hierarchy and we add it only from the parent hook that provides those!
            if "get_connection_form_widgets" in hook_class.__dict__:
                warning = AirflowProviderDeprecationWarning(
                    f"Hook '{hook_class_name}' defines get_connection_form_widgets(). "
                    "This method is deprecated. Define connection fields declaratively in "
                    "provider.yaml under 'conn-fields' instead. See "
                    "https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html"
                )
                warning.deprecated_provider_since = "3.2.0"
                warnings.warn(warning, stacklevel=2)
                widgets = hook_class.get_connection_form_widgets()
                if widgets:
                    for widget in widgets.values():
                        if widget.field_class not in allowed_field_classes:
                            log.warning(
                                "The hook_class '%s' uses field of unsupported class '%s'. "
                                "Only '%s' field classes are supported",
                                hook_class_name,
                                widget.field_class,
                                allowed_field_classes,
                            )
                            return None
                    self._add_widgets_from_hook(package_name, hook_class, widgets)
            if "get_ui_field_behaviour" in hook_class.__dict__:
                warning = AirflowProviderDeprecationWarning(
                    f"Hook '{hook_class_name}' defines get_ui_field_behaviour(). "
                    "This method is deprecated. Define field behaviour declaratively in "
                    "provider.yaml under 'ui-field-behaviour' instead. See "
                    "https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html"
                )
                warning.deprecated_provider_since = "3.2.0"
                warnings.warn(warning, stacklevel=2)
                field_behaviours = hook_class.get_ui_field_behaviour()
                if field_behaviours:
                    self._add_customized_fields_from_hook(package_name, hook_class, field_behaviours)
        except ImportError as e:
            if e.name in ["flask_appbuilder", "wtforms"]:
                # Name the package that is actually missing rather than always flask_appbuilder.
                log.info(
                    "The hook_class '%s' is not fully initialized (UI widgets will be missing), because "
                    "the '%s' package is not installed, however it is not required for "
                    "Airflow components to work",
                    hook_class_name,
                    e.name,
                )
            else:
                # Previously any other ImportError was swallowed without a trace; keep the
                # best-effort behaviour (the hook is still registered) but make it visible.
                log.warning(
                    "The hook_class '%s' is not fully initialized (UI widgets will be missing) "
                    "because of an import error: %s",
                    hook_class_name,
                    e,
                )
        except Exception as e:
            log.warning(
                "Exception when importing '%s' from '%s' package: %s",
                hook_class_name,
                package_name,
                e,
            )
            return None

    hook_connection_type = self._get_attr(hook_class, "conn_type")
    if connection_type:
        if hook_connection_type != connection_type:
            log.warning(
                "Inconsistency! The hook class '%s' declares connection type '%s'"
                " but it is added by provider '%s' as connection_type '%s' in provider info. "
                "This should be fixed!",
                hook_class,
                hook_connection_type,
                package_name,
                connection_type,
            )
    # The hook class is the source of truth for the connection type from here on.
    connection_type = hook_connection_type
    connection_id_attribute_name: str = self._get_attr(hook_class, "conn_name_attr")
    hook_name: str = self._get_attr(hook_class, "hook_name")

    if not connection_type or not connection_id_attribute_name or not hook_name:
        log.warning(
            "The hook misses one of the key attributes: "
            "conn_type: %s, conn_id_attribute_name: %s, hook_name: %s",
            connection_type,
            connection_id_attribute_name,
            hook_name,
        )
        return None

    return HookInfo(
        hook_class_name=hook_class_name,
        connection_id_attribute_name=connection_id_attribute_name,
        package_name=package_name,
        hook_name=hook_name,
        connection_type=connection_type,
        connection_testable=hasattr(hook_class, "test_connection"),
    )

1131 

def _add_widgets_from_hook(self, package_name: str, hook_class: type, widgets: dict[str, Any]):
    """
    Register connection form widgets returned by a hook's ``get_connection_form_widgets``.

    Widget identifiers that do not already start with ``extra__`` are prefixed with
    ``extra__<conn_type>__``. Identifiers that are already registered are skipped with a
    warning, which can happen repeatedly for inherited hooks. A widget is flagged as sensitive
    when its field class widget has ``input_type == "password"``.

    :param package_name: provider package that ships the hook
    :param hook_class: the hook class providing the widgets (must define ``conn_type``)
    :param widgets: mapping of widget identifier to form field
    """
    conn_type = hook_class.conn_type  # type: ignore
    for identifier, form_field in widgets.items():
        widget_key = (
            identifier if identifier.startswith("extra__") else f"extra__{conn_type}__{identifier}"
        )
        if widget_key in self._connection_form_widgets:
            # In case of inherited hooks this might be happening several times
            log.warning(
                "The field %s from class %s has already been added by another provider. Ignoring it.",
                identifier,
                hook_class.__name__,
            )
            continue
        field_widget = form_field.field_class.widget
        sensitive = hasattr(field_widget, "input_type") and field_widget.input_type == "password"
        self._connection_form_widgets[widget_key] = ConnectionFormWidgetInfo(
            hook_class.__name__,
            package_name,
            form_field,
            identifier,
            sensitive,
        )

1155 

def _add_customized_fields_from_hook(self, package_name: str, hook_class: type, customized_fields: dict):
    """
    Register the field behaviour returned by a hook's ``get_ui_field_behaviour``.

    The behaviour is validated with ``_customized_form_fields_schema_validator`` and, when the
    hook declares a truthy ``conn_type``, passed through ``_ensure_prefix_for_placeholders``.
    A connection type that is already registered is skipped with a warning. Any exception
    (including a missing ``conn_type``) is logged rather than raised.

    :param package_name: provider package that ships the hook
    :param hook_class: the hook class providing the behaviour
    :param customized_fields: the behaviour mapping returned by the hook
    """
    try:
        conn_type = hook_class.conn_type
        self._customized_form_fields_schema_validator.validate(customized_fields)
        behaviour = (
            _ensure_prefix_for_placeholders(customized_fields, conn_type) if conn_type else customized_fields
        )
        if conn_type not in self._field_behaviours:
            self._field_behaviours[conn_type] = behaviour
            return
        log.warning(
            "The connection_type %s from package %s and class %s has already been added "
            "by another provider. Ignoring it.",
            conn_type,
            package_name,
            hook_class.__name__,
        )
    except Exception as err:
        log.warning(
            "Error when loading customized fields from package '%s' hook class '%s': %s",
            package_name,
            hook_class.__name__,
            err,
        )

1182 

def _discover_auth_managers(self, *, check: bool) -> None:
    """
    Collect the auth managers declared under ``auth-managers`` in each provider's info.

    :param check: when False, record ``(class name, provider package)`` pairs without
        importing anything; when True, record only class names passing ``_correctness_check``
    """
    for package, info in self._provider_dict.items():
        for class_name in info.data.get("auth-managers") or ():
            if not check:
                self._auth_manager_without_check_set.add((class_name, package))
                continue
            if _correctness_check(package, class_name, info):
                self._auth_manager_class_name_set.add(class_name)

1192 

def _discover_cli_command(self) -> None:
    """
    Collect the CLI command functions declared under ``cli`` in each provider's info.

    The function object returned by ``_correctness_check`` is stored, rather than its dotted
    name, so the CLI parser does not need to import it again later; the name of each
    contributing provider package is recorded as well.
    """
    for package, info in self._provider_dict.items():
        for function_path in info.data.get("cli") or ():
            resolved = _correctness_check(package, function_path, info)
            if not resolved:
                continue
            self._cli_command_functions_set.add(cast("Callable[[], list[CLICommand]]", resolved))
            self._cli_command_provider_name_set.add(package)

1204 

def _discover_notifications(self) -> None:
    """Collect notification class names under ``notifications`` that pass ``_correctness_check``."""
    for package, info in self._provider_dict.items():
        for class_name in info.data.get("notifications") or ():
            if not _correctness_check(package, class_name, info):
                continue
            self._notification_info_set.add(class_name)

1212 

def _discover_extra_links(self) -> None:
    """Collect extra link class names under ``extra-links`` that pass ``_correctness_check``."""
    for package, info in self._provider_dict.items():
        for class_name in info.data.get("extra-links") or ():
            if not _correctness_check(package, class_name, info):
                continue
            self._extra_link_class_name_set.add(class_name)

1220 

def _discover_logging(self) -> None:
    """Collect logging class names under ``logging`` that pass ``_correctness_check``."""
    for package, info in self._provider_dict.items():
        for class_name in info.data.get("logging") or ():
            if not _correctness_check(package, class_name, info):
                continue
            self._logging_class_name_set.add(class_name)

1228 

def _discover_secrets_backends(self) -> None:
    """Collect secrets backend class names under ``secrets-backends`` that pass ``_correctness_check``."""
    for package, info in self._provider_dict.items():
        for class_name in info.data.get("secrets-backends") or ():
            if not _correctness_check(package, class_name, info):
                continue
            self._secrets_backend_class_name_set.add(class_name)

1236 

def _discover_executors(self, *, check: bool) -> None:
    """
    Collect the executors declared under ``executors`` in each provider's info.

    :param check: when False, record ``(class path, provider package)`` pairs without
        importing anything; when True, record only class paths passing ``_correctness_check``
    """
    for package, info in self._provider_dict.items():
        for class_path in info.data.get("executors") or ():
            if not check:
                self._executor_without_check_set.add((class_path, package))
                continue
            if _correctness_check(package, class_path, info):
                self._executor_class_name_set.add(class_path)

1246 

def _discover_queues(self) -> None:
    """Collect queue class names under ``queues`` that pass ``_correctness_check``."""
    for package, info in self._provider_dict.items():
        for class_name in info.data.get("queues") or ():
            if not _correctness_check(package, class_name, info):
                continue
            self._queue_class_name_set.add(class_name)

1254 

def _discover_db_managers(self) -> None:
    """Collect DB manager class names under ``db-managers`` that pass ``_correctness_check``."""
    for package, info in self._provider_dict.items():
        for class_name in info.data.get("db-managers") or ():
            if not _correctness_check(package, class_name, info):
                continue
            self._db_manager_class_name_set.add(class_name)

1262 

def _discover_config(self) -> None:
    """Store each provider's non-empty ``config`` section in ``_provider_configs`` by package name."""
    for package, info in self._provider_dict.items():
        section = info.data.get("config")
        if section:
            self._provider_configs[package] = section  # type: ignore[assignment]

1268 

def _discover_plugins(self) -> None:
    """
    Collect the plugins declared under ``plugins`` in each provider's info.

    A plugin whose ``plugin-class`` fails ``_correctness_check`` is skipped with a warning;
    the others are recorded as ``PluginInfo`` entries.
    """
    for package, info in self._provider_dict.items():
        for plugin_spec in info.data.get("plugins", ()):
            plugin_class = plugin_spec["plugin-class"]
            if _correctness_check(package, plugin_class, info):
                self._plugins_set.add(
                    PluginInfo(name=plugin_spec["name"], plugin_class=plugin_class, provider_name=package)
                )
            else:
                log.warning("Plugin not loaded due to above correctness check problem.")

1283 

@provider_info_cache("triggers")
def initialize_providers_triggers(self):
    """
    Initialize providers triggers.

    Records a ``TriggerInfo`` for every module listed under ``python-modules`` of each
    ``triggers`` entry in the providers' info. An entry without ``python-modules`` contributes
    nothing instead of raising ``TypeError`` from iterating over ``None``.
    """
    self.initialize_providers_list()
    for provider_package, provider in self._provider_dict.items():
        for trigger in provider.data.get("triggers", []):
            integration_name = trigger.get("integration-name", "")
            for trigger_class_name in trigger.get("python-modules") or []:
                self._trigger_info_set.add(
                    TriggerInfo(
                        package_name=provider_package,
                        trigger_class_name=trigger_class_name,
                        integration_name=integration_name,
                    )
                )

1298 

@property
def auth_managers(self) -> list[str]:
    """Returns sorted class names of the provider auth managers that passed the correctness check."""
    self.initialize_providers_auth_managers()
    return sorted(self._auth_manager_class_name_set)

1304 

@property
def auth_manager_without_check(self) -> set[tuple[str, str]]:
    """Returns set of (auth manager class names, provider package name) without correctness check."""
    self.initialize_providers_auth_managers_without_check()
    pairs = self._auth_manager_without_check_set
    return pairs

1310 

@property
def cli_command_functions(self) -> set[Callable[[], list[CLICommand]]]:
    """Returns set of CLI command functions (callables, not names) contributed by providers."""
    self.initialize_providers_cli_command()
    return self._cli_command_functions_set

1316 

@property
def cli_command_providers(self) -> set[str]:
    """Returns set of provider package names that provide CLI commands."""
    self.initialize_providers_cli_command()
    provider_names = self._cli_command_provider_name_set
    return provider_names

1322 

@property
def notification(self) -> list[NotificationInfo]:
    """Returns information about available providers notifications class."""
    self.initialize_providers_notifications()
    ordered = sorted(self._notification_info_set)
    return ordered

1328 

@property
def trigger(self) -> list[TriggerInfo]:
    """Returns information about available providers trigger class."""
    self.initialize_providers_triggers()

    def by_package(info):
        return info.package_name

    return sorted(self._trigger_info_set, key=by_package)

1334 

@property
def providers(self) -> dict[str, ProviderInfo]:
    """Returns information about available providers."""
    self.initialize_providers_list()
    provider_map = self._provider_dict
    return provider_map

1340 

@property
def hooks(self) -> MutableMapping[str, HookInfo | None]:
    """
    Return dictionary of connection_type-to-hook mapping.

    Note that the dict can contain None values if a hook discovered cannot be imported!
    """
    self.initialize_providers_hooks()
    # The returned mapping is only used to retrieve hook information.
    lazy_hooks = self._hooks_lazy_dict
    return lazy_hooks

1351 

@property
def dialects(self) -> MutableMapping[str, DialectInfo]:
    """Return dictionary of connection_type-to-dialect mapping."""
    # Dialects are discovered together with hooks, so initializing hooks populates them.
    self.initialize_providers_hooks()
    dialect_map = self._dialect_provider_dict
    return dialect_map

1358 

@property
def plugins(self) -> list[PluginInfo]:
    """Returns information about plugins available in providers."""
    self.initialize_providers_plugins()

    def by_plugin_class(info):
        return info.plugin_class

    return sorted(self._plugins_set, key=by_plugin_class)

1364 

@property
def taskflow_decorators(self) -> dict[str, TaskDecorator]:
    """Returns mapping of taskflow decorator name to decorator, imported lazily on access."""
    self.initialize_providers_taskflow_decorator()
    decorators = self._taskflow_decorators
    return decorators  # type: ignore[return-value]

1369 

@property
def extra_links_class_names(self) -> list[str]:
    """Returns sorted list of extra link class names."""
    self.initialize_providers_extra_links()
    names = sorted(self._extra_link_class_name_set)
    return names

1375 

@property
def connection_form_widgets(self) -> dict[str, ConnectionFormWidgetInfo]:
    """
    Returns widgets for connection forms.

    Keys keep the order in which the Hook defines them (they are intentionally not sorted).
    """
    self.initialize_providers_hooks()
    # Importing every hook is what populates the widgets contributed via hook classes.
    self._import_info_from_all_hooks()
    widgets = self._connection_form_widgets
    return widgets

1386 

@property
def field_behaviours(self) -> dict[str, dict]:
    """Returns dictionary with field behaviours for connection types."""
    self.initialize_providers_hooks()
    self._import_info_from_all_hooks()
    behaviours = self._field_behaviours
    return behaviours

1393 

@property
def logging_class_names(self) -> list[str]:
    """Returns sorted list of log task handler class names."""
    self.initialize_providers_logging()
    names = sorted(self._logging_class_name_set)
    return names

1399 

@property
def secrets_backend_class_names(self) -> list[str]:
    """Returns sorted list of secrets backend class names."""
    self.initialize_providers_secrets_backends()
    names = sorted(self._secrets_backend_class_name_set)
    return names

1405 

@property
def executor_class_names(self) -> list[str]:
    """Returns sorted list of executor class names that passed the correctness check."""
    self.initialize_providers_executors()
    names = sorted(self._executor_class_name_set)
    return names

1410 

@property
def executor_without_check(self) -> set[tuple[str, str]]:
    """Returns set of (executor class names, provider package name) without correctness check."""
    self.initialize_providers_executors_without_check()
    pairs = self._executor_without_check_set
    return pairs

1416 

@property
def queue_class_names(self) -> list[str]:
    """Returns sorted list of queue class names that passed the correctness check."""
    self.initialize_providers_queues()
    names = sorted(self._queue_class_name_set)
    return names

1421 

@property
def db_managers(self) -> list[str]:
    """Returns sorted list of DB manager class names that passed the correctness check."""
    self.initialize_providers_db_managers()
    names = sorted(self._db_manager_class_name_set)
    return names

1426 

@property
def filesystem_module_names(self) -> list[str]:
    """Returns sorted list of filesystem module names contributed by providers."""
    self.initialize_providers_filesystems()
    names = sorted(self._fs_set)
    return names

1431 

@property
def asset_factories(self) -> dict[str, Callable[..., Asset]]:
    """Returns mapping of URI scheme to asset factory registered by providers."""
    self.initialize_providers_asset_uri_resources()
    factories = self._asset_factories
    return factories

1436 

@property
def asset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
    """Returns mapping of URI scheme to asset URI handler registered by providers."""
    self.initialize_providers_asset_uri_resources()
    handlers = self._asset_uri_handlers
    return handlers

1441 

@property
def asset_to_openlineage_converters(
    self,
) -> dict[str, Callable]:
    """Returns mapping of URI scheme to asset-to-OpenLineage converter registered by providers."""
    self.initialize_providers_asset_uri_resources()
    converters = self._asset_to_openlineage_converters
    return converters

1448 

@property
def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
    """Returns (provider package, config) pairs sorted by provider package name."""
    self.initialize_providers_configuration()
    pairs = list(self._provider_configs.items())
    pairs.sort(key=lambda pair: pair[0])
    return pairs

1453 

@property
def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
    """
    Return provider configs that have already been initialized.

    Unlike ``provider_configs`` this does not trigger initialization; it emits a
    ``DeprecationWarning`` and returns the current pairs sorted by package name.

    .. deprecated:: 3.2.0
        Use ``provider_configs`` instead. This property is kept for backwards
        compatibility and will be removed in a future version.
    """
    warnings.warn(
        "already_initialized_provider_configs is deprecated. Use `provider_configs` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    pairs = list(self._provider_configs.items())
    pairs.sort(key=lambda pair: pair[0])
    return pairs

1469 

def _cleanup(self):
    """
    Reset the manager to its uninitialized state.

    Clears the initialization cache and every registry filled by discovery, so the next access
    re-runs discovery from scratch. The configuration cache is invalidated as well; presumably
    it holds provider-contributed configuration that was just cleared.
    """
    self._initialized_cache.clear()
    self._provider_dict.clear()
    self._fs_set.clear()
    self._taskflow_decorators.clear()
    self._hook_provider_dict.clear()
    self._dialect_provider_dict.clear()
    self._hooks_lazy_dict.clear()
    self._connection_form_widgets.clear()
    self._field_behaviours.clear()
    self._extra_link_class_name_set.clear()
    self._logging_class_name_set.clear()
    self._auth_manager_class_name_set.clear()
    self._auth_manager_without_check_set.clear()
    self._secrets_backend_class_name_set.clear()
    self._executor_class_name_set.clear()
    self._executor_without_check_set.clear()
    self._queue_class_name_set.clear()
    # These registries were previously left untouched, so stale entries from an earlier
    # discovery survived a cleanup and were merged with the next discovery's results.
    self._db_manager_class_name_set.clear()
    self._asset_factories.clear()
    self._asset_uri_handlers.clear()
    self._asset_to_openlineage_converters.clear()
    self._provider_configs.clear()

    # Imported lazily to avoid a configuration/providers_manager import cycle during cleanup.
    from airflow.configuration import conf

    conf.invalidate_cache()

    self._trigger_info_set.clear()
    self._notification_info_set.clear()
    self._plugins_set.clear()
    self._cli_command_functions_set.clear()
    self._cli_command_provider_name_set.clear()

    self._initialized = False
    self._initialization_stack_trace = None