Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/providers_manager.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

822 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Manages all providers.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import functools 

24import inspect 

25import json 

26import logging 

27import traceback 

28import warnings 

29from collections.abc import Callable, Iterator, MutableMapping 

30from dataclasses import dataclass 

31from functools import wraps 

32from importlib.resources import files as resource_files 

33from time import perf_counter 

34from typing import TYPE_CHECKING, Any, NamedTuple, ParamSpec, TypeVar, cast 

35 

36from airflow import DeprecatedImportWarning 

37from airflow._shared.module_loading import import_string 

38from airflow._shared.providers_discovery import discover_all_providers_from_packages 

39from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning 

40from airflow.utils.log.logging_mixin import LoggingMixin 

41 

42if TYPE_CHECKING: 

43 from airflow.cli.cli_config import CLICommand 

44 

log = logging.getLogger(__name__)


# Type variables for the decorator typing below: PS is used by provider_info_cache.
# NOTE(review): RT is not referenced in the visible part of this module - confirm it is still needed.
PS = ParamSpec("PS")
RT = TypeVar("RT")

# Minimum supported version per provider package name. An installed provider older than this only
# produces a warning (see ProvidersManager._verify_all_providers_all_compatible); it is still loaded.
MIN_PROVIDER_VERSIONS = {
    "apache-airflow-providers-celery": "2.1.0",
}

54 

55 

56def _ensure_prefix_for_placeholders(field_behaviors: dict[str, Any], conn_type: str): 

57 """ 

58 Verify the correct placeholder prefix. 

59 

60 If the given field_behaviors dict contains a placeholder's node, and there 

61 are placeholders for extra fields (i.e. anything other than the built-in conn 

62 attrs), and if those extra fields are unprefixed, then add the prefix. 

63 

64 The reason we need to do this is, all custom conn fields live in the same dictionary, 

65 so we need to namespace them with a prefix internally. But for user convenience, 

66 and consistency between the `get_ui_field_behaviour` method and the extra dict itself, 

67 we allow users to supply the unprefixed name. 

68 """ 

69 conn_attrs = {"host", "schema", "login", "password", "port", "extra"} 

70 

71 def ensure_prefix(field): 

72 if field not in conn_attrs and not field.startswith("extra__"): 

73 return f"extra__{conn_type}__{field}" 

74 return field 

75 

76 if "placeholders" in field_behaviors: 

77 placeholders = field_behaviors["placeholders"] 

78 field_behaviors["placeholders"] = {ensure_prefix(k): v for k, v in placeholders.items()} 

79 

80 return field_behaviors 

81 

82 

83if TYPE_CHECKING: 

84 from urllib.parse import SplitResult 

85 

86 from airflow.sdk import BaseHook 

87 from airflow.sdk.bases.decorator import TaskDecorator 

88 from airflow.sdk.definitions.asset import Asset 

89 

90 

class LazyDictWithCache(MutableMapping):
    """
    Lazy-loaded cached dictionary.

    A value that is callable at the time it is stored is treated as a resolver. On the first read of
    its key, the resolver is called with no arguments, and its result replaces it in the dictionary
    and is returned. Later reads return the cached result as-is, even when that result is itself
    callable. Storing a new value under a key puts that key back into the unresolved state.
    """

    __slots__ = ["_resolved", "_raw_dict"]

    def __init__(self, *args, **kw):
        # Keys whose stored value is already a resolved result rather than a pending resolver.
        self._resolved = set()
        self._raw_dict = dict(*args, **kw)

    def __setitem__(self, key, value):
        # A freshly stored value is unresolved. Without resetting the flag, re-assigning a resolver to
        # a key that had already been resolved would make reads return the resolver object itself
        # instead of calling it.
        self._resolved.discard(key)
        self._raw_dict.__setitem__(key, value)

    def __getitem__(self, key):
        value = self._raw_dict.__getitem__(key)
        if key not in self._resolved and callable(value):
            # exchange callable with result of calling it -- but only once! allow resolver to return a
            # callable itself
            value = value()
            self._resolved.add(key)
            # Write straight to the raw dict so the cached result is not reset to "unresolved".
            self._raw_dict.__setitem__(key, value)
        return value

    def __delitem__(self, key):
        with contextlib.suppress(KeyError):
            self._resolved.remove(key)
        self._raw_dict.__delitem__(key)

    def __iter__(self):
        return iter(self._raw_dict)

    def __len__(self):
        return len(self._raw_dict)

    def __contains__(self, key):
        # Membership test only - never triggers resolution.
        return key in self._raw_dict

    def clear(self):
        self._resolved.clear()
        self._raw_dict.clear()

135 

136 

def _read_schema_from_resources_or_local_file(filename: str) -> dict:
    """
    Load a JSON schema shipped with the ``airflow`` package.

    The file is first looked up through ``importlib.resources``. If that lookup fails with
    ``TypeError`` or ``FileNotFoundError``, the file is read from the directory containing this module.

    :param filename: schema file name relative to the ``airflow`` package
    :return: the parsed JSON document
    """
    try:
        packaged_schema = resource_files("airflow").joinpath(filename)
        with packaged_schema.open("rb") as schema_file:
            return json.load(schema_file)
    except (TypeError, FileNotFoundError):
        import pathlib

        local_schema = pathlib.Path(__file__).parent / filename
        with local_schema.open("rb") as schema_file:
            return json.load(schema_file)

147 

148 

def _create_provider_info_schema_validator():
    """
    Build a JSON schema validator for ``provider_info.schema.json``.

    The validator class is the one ``jsonschema`` selects for the loaded schema.
    """
    import jsonschema

    schema = _read_schema_from_resources_or_local_file("provider_info.schema.json")
    validator_class = jsonschema.validators.validator_for(schema)
    return validator_class(schema)

157 

158 

def _create_customized_form_field_behaviours_schema_validator():
    """
    Build a JSON schema validator for ``customized_form_field_behaviours.schema.json``.

    The validator class is the one ``jsonschema`` selects for the loaded schema.
    """
    import jsonschema

    schema = _read_schema_from_resources_or_local_file("customized_form_field_behaviours.schema.json")
    validator_class = jsonschema.validators.validator_for(schema)
    return validator_class(schema)

167 

168 

169def _check_builtin_provider_prefix(provider_package: str, class_name: str) -> bool: 

170 if provider_package.startswith("apache-airflow"): 

171 provider_path = provider_package[len("apache-") :].replace("-", ".") 

172 if not class_name.startswith(provider_path): 

173 log.warning( 

174 "Coherence check failed when importing '%s' from '%s' package. It should start with '%s'", 

175 class_name, 

176 provider_package, 

177 provider_path, 

178 ) 

179 return False 

180 return True 

181 

182 

@dataclass
class ProviderInfo:
    """
    Provider information.

    :param version: version string
    :param data: dictionary with information about the provider
    """

    # Installed provider version string (parsed with packaging.version in the compatibility check).
    version: str
    # Provider-info dictionary; discovery reads keys such as "connection-types", "hook-class-names",
    # "dialects" and "filesystems" from it.
    data: dict

194 

195 

class HookClassProvider(NamedTuple):
    """Hook class and Provider it comes from."""

    # Dotted import path of the hook class.
    hook_class_name: str
    # Name of the provider package that registered the hook.
    package_name: str

201 

202 

class DialectInfo(NamedTuple):
    """Dialect class and Provider it comes from."""

    # Dialect type, taken from the provider's "dialect-type" entry.
    name: str
    # Dotted path of the dialect class, from the "dialect-class-name" entry.
    dialect_class_name: str
    # Name of the provider package that declared the dialect.
    provider_name: str

209 

210 

class TriggerInfo(NamedTuple):
    """Trigger class and provider it comes from."""

    # Presumably the dotted path of the trigger class - confirm against the discovery code.
    trigger_class_name: str
    # Name of the provider package the trigger comes from.
    package_name: str
    # Name of the integration the trigger belongs to (as declared by the provider).
    integration_name: str

217 

218 

class NotificationInfo(NamedTuple):
    """Notification class and provider it comes from."""

    # Presumably the dotted path of the notifier class - confirm against the discovery code.
    notification_class_name: str
    # Name of the provider package the notification comes from.
    package_name: str

224 

225 

class RemoteLoggingInfo(NamedTuple):
    """Remote logging IO handler registered by a provider."""

    # Dotted path of the remote logging IO handler.
    classpath: str
    # Scheme the handler is registered for; the manager keeps a dict of these keyed by scheme.
    scheme: str
    # Name of the provider package that registered the handler.
    package_name: str

232 

233 

class PluginInfo(NamedTuple):
    """Plugin class, name and provider it comes from."""

    # Plugin name as declared by the provider.
    name: str
    # Presumably the dotted path of the plugin class - confirm against the discovery code.
    plugin_class: str
    # Name of the provider package that ships the plugin.
    provider_name: str

240 

241 

class HookInfo(NamedTuple):
    """Hook information."""

    # The fields below may be None: the core "generic"/"email" placeholder hooks set them to None.
    hook_class_name: str | None
    connection_id_attribute_name: str | None
    package_name: str | None
    # Human-readable display name of the hook.
    hook_name: str
    connection_type: str | None
    connection_testable: bool
    # NOTE(review): this default list object is shared by every HookInfo built without `dialects`;
    # never mutate it in place (build a new list instead).
    dialects: list[str] = []

252 

253 

class ConnectionTypeHookUIMetadata(NamedTuple):
    """Hook metadata for one connection type (connection UI); ``field_behaviour`` is standard fields."""

    # Connection type this metadata describes.
    connection_type: str
    # Display name of the hook.
    hook_name: str
    # Dotted path of the hook class, or None when there is none.
    hook_class_name: str | None
    # Standard-field behaviour dictionary for the connection form, or None.
    field_behaviour: dict | None

261 

262 

class ConnectionFormWidgetInfo(NamedTuple):
    """Connection Form Widget information."""

    # Dotted path of the hook class that contributed the widget.
    hook_class_name: str
    # Name of the provider package the hook comes from.
    package_name: str
    # The form field object itself (type not constrained here).
    field: Any
    # Name of the extra field the widget edits.
    field_name: str
    # Presumably whether the field holds secret data to be masked - confirm against the UI code.
    is_sensitive: bool

271 

272 

def log_optional_feature_disabled(class_name, e, provider_package):
    """
    Report that an optional provider feature could not be enabled.

    The exception, with its traceback, is logged at DEBUG level. A short INFO line without the
    traceback follows, so users see the message without the noise.

    :param class_name: fully qualified name of the class that failed to import
    :param e: the exception raised during import
    :param provider_package: name of the provider package
    """
    context = (class_name, provider_package)
    log.debug(
        "Optional feature disabled on exception when importing '%s' from '%s' package",
        *context,
        exc_info=e,
    )
    log.info("Optional provider feature disabled when importing '%s' from '%s' package", *context)

286 

287 

def log_import_warning(class_name, e, provider_package):
    """
    Log, at WARNING level with the exception attached, that a provider class failed to import.

    :param class_name: fully qualified name of the class that failed to import
    :param e: the exception raised during import
    :param provider_package: name of the provider package
    """
    message = "Exception when importing '%s' from '%s' package"
    log.warning(message, class_name, provider_package, exc_info=e)

296 

297 

# Temporary measure until all community providers raise AirflowOptionalProviderFeatureException for
# their optional features. CI tests are planned to catch and fix such cases; until then, every
# "known unhandled optional feature error" from community providers should be listed here as a
# (provider package name, substring of the ImportError message) pair. _correctness_check treats an
# ImportError matching one of these pairs as a disabled optional feature rather than a failure.
KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [("apache-airflow-providers-google", "No module named 'paramiko'")]

303 

304 

def _correctness_check(provider_package: str, class_name: str, provider_info: ProviderInfo) -> Any:
    """
    Perform coherence check on provider classes.

    For ``apache-airflow`` providers, it first checks that the class path starts with the provider's
    package path. For all providers, it then tries to import the class, checking that no exception
    is raised during import. Problems are logged, never raised.

    :param provider_package: name of the provider package
    :param class_name: fully qualified name of the class to import
    :param provider_info: information about the provider (not used by this check)
    :return: the imported class if the class is OK, None otherwise.
    """
    if not _check_builtin_provider_prefix(provider_package, class_name):
        return None
    try:
        imported_class = import_string(class_name)
    except AirflowOptionalProviderFeatureException as e:
        # When the provider class raises AirflowOptionalProviderFeatureException
        # this is an expected case when only some classes in provider are
        # available. We just log debug level here and print info message in logs so that
        # the user is aware of it
        log_optional_feature_disabled(class_name, e, provider_package)
        return None
    except ImportError as e:
        # ``ImportError.msg`` is None when the error was raised without a message; matching with
        # ``in`` against None would raise TypeError inside this handler. The string form of the
        # exception equals ``msg`` whenever a single message was given.
        error_message = str(e)
        if "No module named 'airflow.providers." in error_message:
            # handle cases where another provider is missing. This can only happen if
            # there is an optional feature, so we log debug and print information about it
            log_optional_feature_disabled(class_name, e, provider_package)
            return None
        for known_package, known_message in KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS:
            # Until we convert all providers to use AirflowOptionalProviderFeatureException
            # we assume any problem with importing another "provider" is because this is an
            # optional feature, so we log debug and print information about it
            if known_package == provider_package and known_message in error_message:
                log_optional_feature_disabled(class_name, e, provider_package)
                return None
        # But when we have no idea - we print warning to logs
        log_import_warning(class_name, e, provider_package)
        return None
    except Exception as e:
        log_import_warning(class_name, e, provider_package)
        return None
    return imported_class

349 

350 

# We want to have better control over initialization of parameters and be able to debug and test it
# So we add our own decorator
def provider_info_cache(cache_name: str) -> Callable[[Callable[PS, None]], Callable[PS, None]]:
    """
    Decorate and cache provider info.

    Decorator factory. The returned decorator makes a ``ProvidersManager`` initializer run at most
    once per manager: the first call runs the method and then records ``cache_name`` in the
    manager's ``_initialized_cache``; later calls return immediately. If the method raises, nothing
    is recorded, so the next call retries. The elapsed time of a successful run is logged at DEBUG.

    :param cache_name: Name of the cache
    """

    def provider_info_cache_decorator(func: Callable[PS, None]) -> Callable[PS, None]:
        @wraps(func)
        def wrapped_function(*args: PS.args, **kwargs: PS.kwargs) -> None:
            # The decorated callables are methods, so the manager is the first positional argument.
            manager = args[0]
            if TYPE_CHECKING:
                assert isinstance(manager, ProvidersManager)

            if cache_name not in manager._initialized_cache:
                started_at = perf_counter()
                log.debug("Initializing Providers Manager[%s]", cache_name)
                func(*args, **kwargs)
                manager._initialized_cache[cache_name] = True
                elapsed = perf_counter() - started_at
                log.debug(
                    "Initialization of Providers Manager[%s] took %.2f seconds",
                    cache_name,
                    elapsed,
                )

        return wrapped_function

    return provider_info_cache_decorator

383 

384 

385class ProvidersManager(LoggingMixin): 

386 """ 

387 Manages all provider distributions. 

388 

389 This is a Singleton class. The first time it is 

390 instantiated, it discovers all available providers in installed packages. 

391 """ 

392 

# NOTE(review): resource_version is not referenced in the visible part of this class.
resource_version = "0"
# Set to True at the start of the first __init__ run; later __init__ calls return early.
_initialized: bool = False
# Formatted call stack captured during the first initialization; see initialization_stack_trace().
_initialization_stack_trace: str | None = None
# The single shared instance handed out by __new__.
_instance: ProvidersManager | None = None

397 

def __new__(cls):
    """Return the shared manager instance, creating it on the first call."""
    instance = cls._instance
    if instance is None:
        instance = super().__new__(cls)
        cls._instance = instance
    return instance

402 

@staticmethod
def initialized() -> bool:
    """Return True once the manager's __init__ has started initializing the singleton."""
    return ProvidersManager._initialized

406 

@staticmethod
def initialization_stack_trace() -> str | None:
    """Return the formatted stack captured when the manager was initialized, or None if not yet."""
    return ProvidersManager._initialization_stack_trace

410 

def __init__(self):
    """
    Initialize the manager.

    Only the first call does any work; later calls on the singleton return immediately.
    """
    # skip initialization if already initialized
    if self.initialized():
        return

    super().__init__()
    # Set before the remaining set-up, so any later ProvidersManager() call returns early above.
    # NOTE(review): this also holds when the set-up below raises part-way through.
    ProvidersManager._initialized = True
    # Record where initialization happened (returned by initialization_stack_trace()).
    ProvidersManager._initialization_stack_trace = "".join(traceback.format_stack(inspect.currentframe()))
    # Names of the provider_info_cache caches that have already run.
    self._initialized_cache: dict[str, bool] = {}
    # Keeps dict of providers keyed by provider package name
    self._provider_dict: dict[str, ProviderInfo] = {}
    # Names of filesystem modules discovered from providers.
    self._fs_set: set[str] = set()
    self._asset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
    self._asset_factories: dict[str, Callable[..., Asset]] = {}
    self._asset_to_openlineage_converters: dict[str, Callable] = {}
    # Taskflow decorators, resolved lazily on first access.
    self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache()
    # keeps mapping between connection_types and hook class, package they come from
    self._hook_provider_dict: dict[str, HookClassProvider] = {}
    # Dialect information keyed by dialect type.
    self._dialect_provider_dict: dict[str, DialectInfo] = {}
    # Keeps dict of hooks keyed by connection type. They are lazy evaluated at access time
    self._hooks_lazy_dict: LazyDictWithCache[str, HookInfo | Callable] = LazyDictWithCache()
    # Keeps hook display names read from provider.yaml (hook-name field)
    self._hook_name_dict: dict[str, str] = {}
    # Keeps custom connection-form widgets, keyed by the name of the extra field they edit
    self._connection_form_widgets: dict[str, ConnectionFormWidgetInfo] = {}
    # Customizations for javascript fields are kept here
    self._field_behaviours: dict[str, dict] = {}
    self._cli_command_functions_set: set[Callable[[], list[CLICommand]]] = set()
    self._cli_command_provider_name_set: set[str] = set()
    self._extra_link_class_name_set: set[str] = set()
    self._logging_class_name_set: set[str] = set()
    self._remote_logging_info_list: list[RemoteLoggingInfo] = []
    self._remote_logging_by_scheme: dict[str, RemoteLoggingInfo] = {}
    self._auth_manager_class_name_set: set[str] = set()
    self._auth_manager_without_check_set: set[tuple[str, str]] = set()
    self._secrets_backend_class_name_set: set[str] = set()
    self._executor_class_name_set: set[str] = set()
    self._executor_without_check_set: set[tuple[str, str]] = set()
    self._queue_class_name_set: set[str] = set()
    self._db_manager_class_name_set: set[str] = set()
    self._provider_configs: dict[str, dict[str, Any]] = {}
    self._trigger_info_set: set[TriggerInfo] = set()
    self._notification_info_set: set[NotificationInfo] = set()
    # JSON schema validators, built once per manager.
    self._provider_schema_validator = _create_provider_info_schema_validator()
    self._customized_form_fields_schema_validator = (
        _create_customized_form_field_behaviours_schema_validator()
    )
    # Set of plugins contained in providers
    self._plugins_set: set[PluginInfo] = set()
    # Pre-populate the hooks dict with core and standard-provider connection types.
    self._init_airflow_core_hooks()

    # Created on first access to a deprecated runtime-only property (handled in __getattribute__).
    self._runtime_manager = None

464 

def __getattribute__(self, name: str):
    """
    Redirect deprecated runtime-only properties to the task-sdk runtime manager.

    Accessing ``hooks``, ``taskflow_decorators``, ``filesystem_module_names``, ``asset_factories``,
    ``asset_uri_handlers`` or ``asset_to_openlineage_converters`` emits a ``DeprecatedImportWarning``.
    It then returns the attribute of the same name from a lazily created
    ``ProvidersManagerTaskRuntime``. Every other attribute is looked up normally.
    """
    # Hacky but does the trick for now.
    # Only object.__getattribute__/__setattr__ may be used in here: plain ``self.x`` would recurse.
    # This method runs on *every* attribute access, so the set is written directly in the ``in`` test.
    # CPython folds such a literal into a frozenset constant instead of rebuilding it on each call.
    if name in {
        "hooks",
        "taskflow_decorators",
        "filesystem_module_names",
        "asset_factories",
        "asset_uri_handlers",
        "asset_to_openlineage_converters",
    }:
        warnings.warn(
            f"ProvidersManager.{name} is deprecated. Use ProvidersManagerTaskRuntime.{name} from task-sdk instead.",
            DeprecatedImportWarning,
            stacklevel=2,
        )
        try:
            runtime_manager = object.__getattribute__(self, "_runtime_manager")
        except AttributeError:
            # __init__ sets _runtime_manager last; treat "not set yet" like "not created yet".
            runtime_manager = None
        if runtime_manager is None:
            from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime

            runtime_manager = ProvidersManagerTaskRuntime()
            object.__setattr__(self, "_runtime_manager", runtime_manager)
        return getattr(runtime_manager, name)

    return object.__getattribute__(self, name)

489 

def _init_airflow_core_hooks(self):
    """
    Seed the hooks dict with connection types that exist without provider discovery.

    ``generic`` and ``email`` get ready-made :class:`HookInfo` entries with no hook class behind
    them. ``fs`` and ``package_index`` are registered as lazy resolvers that import their hook from
    the standard provider on first access.
    """
    placeholder_hooks = (("generic", "Generic"), ("email", "Email"))
    for connection_type, display_name in placeholder_hooks:
        self._hooks_lazy_dict[connection_type] = HookInfo(
            hook_class_name=None,
            connection_id_attribute_name=None,
            package_name=None,
            hook_name=display_name,
            connection_type=None,
            connection_testable=False,
        )

    standard_provider_hooks = {
        "fs": "airflow.providers.standard.hooks.filesystem.FSHook",
        "package_index": "airflow.providers.standard.hooks.package_index.PackageIndexHook",
    }
    for connection_type, hook_path in standard_provider_hooks.items():
        # The lazy dict calls this partial on first access, i.e. _import_hook with these arguments.
        self._hooks_lazy_dict[connection_type] = functools.partial(
            self._import_hook,
            connection_type=None,
            package_name="apache-airflow-providers-standard",
            hook_class_name=hook_path,
            provider_info=None,
        )

516 

@provider_info_cache("list")
def initialize_providers_list(self):
    """Lazy initialization of providers list."""
    # Local source folders are loaded first. They should take precedence over the package ones for
    # development purposes. In production, provider.yaml files are not present in the 'airflow'
    # directory, so there is no risk of accidentally overriding a package provider. This can only
    # happen during local development.
    # NOTE(review): only package-based discovery is called here - confirm whether
    # discover_all_providers_from_packages also handles local sources, or whether this comment is stale.
    discover_all_providers_from_packages(self._provider_dict, self._provider_schema_validator)
    # Warn about providers older than their entry in MIN_PROVIDER_VERSIONS.
    self._verify_all_providers_all_compatible()
    # Re-build the dict ordered by provider package name.
    self._provider_dict = dict(sorted(self._provider_dict.items()))

527 

def _verify_all_providers_all_compatible(self):
    """
    Warn about installed providers older than their minimum supported version.

    Only providers listed in ``MIN_PROVIDER_VERSIONS`` are checked. An incompatible provider is
    reported with a warning but is not removed.
    """
    from packaging import version as packaging_version

    for provider_id, info in self._provider_dict.items():
        min_version = MIN_PROVIDER_VERSIONS.get(provider_id)
        if not min_version:
            continue
        if packaging_version.parse(min_version) <= packaging_version.parse(info.version):
            continue
        log.warning(
            "The package %s is not compatible with this version of Airflow. "
            "The package has version %s but the minimum supported version "
            "of the package is %s",
            provider_id,
            info.version,
            min_version,
        )

543 

@provider_info_cache("hooks")
def initialize_providers_hooks(self):
    """Lazy initialization of providers hooks."""
    # Re-register the core/standard connection types before provider discovery runs.
    self._init_airflow_core_hooks()
    self.initialize_providers_list()
    self._discover_hooks()
    self._load_ui_metadata()
    # _discover_hooks already sorts this dict; sorting again keeps it ordered in case
    # _load_ui_metadata added entries (NOTE(review): confirm whether it does).
    self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items()))

552 

@provider_info_cache("filesystems")
def initialize_providers_filesystems(self):
    """Lazy initialization of providers filesystems (requires the providers list first)."""
    self.initialize_providers_list()
    self._discover_filesystems()

558 

@provider_info_cache("asset_uris")
def initialize_providers_asset_uri_resources(self):
    """Lazy initialization of provider asset URI handlers, factories, converters etc."""
    self.initialize_providers_list()
    self._discover_asset_uri_resources()

564 

@provider_info_cache("hook_lineage_writers")
@provider_info_cache("taskflow_decorators")
def initialize_providers_taskflow_decorator(self):
    """Lazy initialization of providers taskflow decorators."""
    # NOTE(review): the outer "hook_lineage_writers" cache decorator looks like a leftover. All it
    # does here is mark that cache name as initialized whenever taskflow decorators are initialized.
    # Confirm nothing relies on that key before removing it.
    self.initialize_providers_list()
    self._discover_taskflow_decorators()

571 

@provider_info_cache("extra_links")
def initialize_providers_extra_links(self):
    """Lazy initialization of providers extra links (requires the providers list first)."""
    self.initialize_providers_list()
    self._discover_extra_links()

577 

@provider_info_cache("logging")
def initialize_providers_logging(self):
    """Lazy initialization of providers logging information (requires the providers list first)."""
    self.initialize_providers_list()
    self._discover_logging()

583 

@provider_info_cache("remote_logging")
def initialize_providers_remote_logging(self):
    """Lazy initialization of providers remote logging IO handlers (requires the providers list first)."""
    self.initialize_providers_list()
    self._discover_remote_logging()

589 

@provider_info_cache("secrets_backends")
def initialize_providers_secrets_backends(self):
    """Lazy initialization of providers secrets backends information (requires the providers list first)."""
    self.initialize_providers_list()
    self._discover_secrets_backends()

595 

@provider_info_cache("executors")
def initialize_providers_executors(self):
    """Lazy initialization of providers executors information (discovery with ``check=True``)."""
    self.initialize_providers_list()
    self._discover_executors(check=True)

601 

@provider_info_cache("executors_without_check")
def initialize_providers_executors_without_check(self):
    """Lazy initialization of providers executors information (discovery with ``check=False``)."""
    self.initialize_providers_list()
    self._discover_executors(check=False)

607 

@provider_info_cache("queues")
def initialize_providers_queues(self):
    """Lazy initialization of providers queue information (requires the providers list first)."""
    self.initialize_providers_list()
    self._discover_queues()

613 

@provider_info_cache("db_managers")
def initialize_providers_db_managers(self):
    """Lazy initialization of providers db_managers information (requires the providers list first)."""
    self.initialize_providers_list()
    self._discover_db_managers()

619 

@provider_info_cache("notifications")
def initialize_providers_notifications(self):
    """Lazy initialization of providers notifications information (requires the providers list first)."""
    self.initialize_providers_list()
    self._discover_notifications()

625 

@provider_info_cache("auth_managers")
def initialize_providers_auth_managers(self):
    """Lazy initialization of providers auth manager information (discovery with ``check=True``)."""
    self.initialize_providers_list()
    self._discover_auth_managers(check=True)

631 

@provider_info_cache("auth_managers_without_check")
def initialize_providers_auth_managers_without_check(self):
    """Lazy initialization of providers auth manager information (discovery with ``check=False``)."""
    self.initialize_providers_list()
    self._discover_auth_managers(check=False)

637 

@provider_info_cache("config")
def initialize_providers_configuration(self):
    """Lazy initialization of provider configuration metadata and merge it into ``conf``."""
    # Provider configuration discovery needs the providers list to be populated first.
    self.initialize_providers_list()
    self._discover_config()

643 

@provider_info_cache("plugins")
def initialize_providers_plugins(self):
    """Lazy initialization of plugins contained in providers (requires the providers list first)."""
    self.initialize_providers_list()
    self._discover_plugins()

648 

@provider_info_cache("cli_command")
def initialize_providers_cli_command(self):
    """Lazy initialization of providers CLI commands (requires the providers list first)."""
    self.initialize_providers_list()
    self._discover_cli_command()

654 

def _discover_hooks_from_connection_types(
    self,
    hook_class_names_registered: set[str],
    already_registered_warning_connection_types: set[str],
    package_name: str,
    provider: ProviderInfo,
):
    """
    Discover hooks from the "connection-types" property.

    This is the newer, better method that replaces discovery from "hook-class-names", because it
    lets individual Hook classes be imported lazily when they are accessed. Each "connection-types"
    entry carries both the connection type and the hook class name, so all connection types can be
    discovered without importing any class.

    :param hook_class_names_registered: extended in place with every hook class name this provider
        declares under "connection-types"
    :param already_registered_warning_connection_types: extended in place with connection types that
        were already registered by a different package, so a warning can be printed for them later
    :param package_name: name of the provider package
    :param provider: information about the provider, including its provider-info ``data``
    :return: True if the provider declares a non-empty "connection-types" list, False otherwise
    """
    connection_types = provider.data.get("connection-types")
    if not connection_types:
        return False
    for connection_type_dict in connection_types:
        connection_type = connection_type_dict["connection-type"]
        hook_class_name = connection_type_dict["hook-class-name"]
        hook_class_names_registered.add(hook_class_name)
        already_registered = self._hook_provider_dict.get(connection_type)
        if already_registered:
            if already_registered.package_name != package_name:
                already_registered_warning_connection_types.add(connection_type)
            elif already_registered.hook_class_name != hook_class_name:
                # Only warn about a real conflict, consistent with the "hook-class-names" discovery.
                # The message claims the class names differ.
                log.warning(
                    "The connection type '%s' is already registered in the"
                    " package '%s' with different class names: '%s' and '%s'. ",
                    connection_type,
                    package_name,
                    already_registered.hook_class_name,
                    hook_class_name,
                )
        else:
            self._hook_provider_dict[connection_type] = HookClassProvider(
                hook_class_name=hook_class_name, package_name=package_name
            )
            # Defer importing hook to access time by setting import hook method as dict value
            self._hooks_lazy_dict[connection_type] = functools.partial(
                self._import_hook,
                connection_type=connection_type,
                provider_info=provider,
            )
    return True

708 

def _discover_hooks_from_hook_class_names(
    self,
    hook_class_names_registered: set[str],
    already_registered_warning_connection_types: set[str],
    package_name: str,
    provider: ProviderInfo,
    provider_uses_connection_types: bool,
):
    """
    Discover hooks from the "hook-class-names" property.

    This property is deprecated, but we should support it in Airflow 2.
    The "hook-class-names" array contains only Hook class names without connection types, so every
    listed class has to be imported immediately to learn which connection type it supports. This
    makes it impossible to import selectively only those hooks that are actually used.

    :param hook_class_names_registered: hook class names already registered lazily through
        "connection-types" for this provider; those classes are skipped here
    :param already_registered_warning_connection_types: connection types registered by a different
        package; extended here, and a warning is logged for every entry at the end
    :param package_name: name of the provider package
    :param provider: class that keeps information about version and details of the provider
    :param provider_uses_connection_types: whether the provider also declares the newer
        "connection-types" property; if it does not, a DeprecationWarning is emitted
    """
    hook_class_names = provider.data.get("hook-class-names")
    if hook_class_names:
        for hook_class_name in hook_class_names:
            if hook_class_name in hook_class_names_registered:
                # Silently ignore the hook class - it's already marked for lazy-import by
                # connection-types discovery
                continue
            hook_info = self._import_hook(
                connection_type=None,
                provider_info=provider,
                hook_class_name=hook_class_name,
                package_name=package_name,
            )
            if not hook_info:
                # Problem while importing the class - we ignore it. The log is written at import time
                continue
            already_registered = self._hook_provider_dict.get(hook_info.connection_type)
            if already_registered:
                if already_registered.package_name != package_name:
                    already_registered_warning_connection_types.add(hook_info.connection_type)
                elif already_registered.hook_class_name != hook_class_name:
                    log.warning(
                        "The hook connection type '%s' is registered twice in the"
                        " package '%s' with different class names: '%s' and '%s'. "
                        " Please fix it!",
                        hook_info.connection_type,
                        package_name,
                        already_registered.hook_class_name,
                        hook_class_name,
                    )
            else:
                self._hook_provider_dict[hook_info.connection_type] = HookClassProvider(
                    hook_class_name=hook_class_name, package_name=package_name
                )
                self._hooks_lazy_dict[hook_info.connection_type] = hook_info

        if not provider_uses_connection_types:
            warnings.warn(
                f"The provider {package_name} uses `hook-class-names` "
                "property in provider-info and has no `connection-types` one. "
                "The 'hook-class-names' property has been deprecated in favour "
                "of 'connection-types' in Airflow 2.2. Use **both** in case you want to "
                "have backwards compatibility with Airflow < 2.2",
                DeprecationWarning,
                stacklevel=1,
            )
    for already_registered_connection_type in already_registered_warning_connection_types:
        # The period belongs after the quoted package name, not inside the quotes.
        log.warning(
            "The connection_type '%s' has been already registered by provider '%s'.",
            already_registered_connection_type,
            self._hook_provider_dict[already_registered_connection_type].package_name,
        )

786 def _discover_hooks(self) -> None: 

787 """Retrieve all connections defined in the providers via Hooks.""" 

788 for package_name, provider in self._provider_dict.items(): 

789 duplicated_connection_types: set[str] = set() 

790 hook_class_names_registered: set[str] = set() 

791 self._discover_provider_dialects(package_name, provider) 

792 provider_uses_connection_types = self._discover_hooks_from_connection_types( 

793 hook_class_names_registered, duplicated_connection_types, package_name, provider 

794 ) 

795 self._discover_hooks_from_hook_class_names( 

796 hook_class_names_registered, 

797 duplicated_connection_types, 

798 package_name, 

799 provider, 

800 provider_uses_connection_types, 

801 ) 

802 self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items())) 

803 

def _discover_provider_dialects(self, provider_name: str, provider: ProviderInfo):
    """
    Register the dialects a provider declares under its "dialects" entry.

    Every entry must carry "dialect-type" and "dialect-class-name". All entries are built first and
    then merged into ``self._dialect_provider_dict`` in one update, keyed by dialect type.

    :param provider_name: name of the provider package declaring the dialects
    :param provider: information about the provider
    """
    declared = provider.data.get("dialects", [])
    if not declared:
        return
    discovered: dict[str, DialectInfo] = {}
    for entry in declared:
        dialect_type = entry["dialect-type"]
        discovered[dialect_type] = DialectInfo(
            name=entry["dialect-type"],
            dialect_class_name=entry["dialect-class-name"],
            provider_name=provider_name,
        )
    self._dialect_provider_dict.update(discovered)

817 

@provider_info_cache("import_all_hooks")
def _import_info_from_all_hooks(self):
    """
    Force-import every hook so the connection form metadata gets initialized.

    Fetching each value of the lazy hooks mapping triggers the hook import. The
    field behaviours are then rebuilt ordered by connection type.
    """
    # Touch every lazy value; the resolved values themselves are not needed here.
    for _resolved_hook in self._hooks_lazy_dict.values():
        pass
    self._field_behaviours = {
        conn_type: behaviour for conn_type, behaviour in sorted(self._field_behaviours.items())
    }

    # Connection form widgets are intentionally NOT sorted here. They are used in:
    # 1. the UI Connections page, which expects the order defined in the hook;
    # 2. the `airflow providers widgets` CLI command, which expects alphabetical order.
    # The original order cannot be recovered after sorting, so sorting is left to the CLI.

831 

832 def _discover_filesystems(self) -> None: 

833 """Retrieve all filesystems defined in the providers.""" 

834 for provider_package, provider in self._provider_dict.items(): 

835 for fs_module_name in provider.data.get("filesystems", []): 

836 if _correctness_check(provider_package, f"{fs_module_name}.get_fs", provider): 

837 self._fs_set.add(fs_module_name) 

838 self._fs_set = set(sorted(self._fs_set)) 

839 

def _discover_asset_uri_resources(self) -> None:
    """
    Register asset URI handlers, factories and OpenLineage converters from all providers.

    Every ``asset-uris`` entry must explicitly contain both ``schemes`` and ``handler``.
    Entries lacking either key are ignored; ``handler`` itself may be null.

    - A null handler falls back to ``normalize_noop``.
    - A missing or null ``factory`` / ``to_openlineage_converter`` registers nothing.
    """
    from airflow.sdk.definitions.asset import normalize_noop

    def _register(
        provider_package_name: str,
        provider_info: ProviderInfo,
        schemes: list[str],
        path: str | None,
        registry: dict,
        fallback: Any = None,
    ) -> None:
        # Resolve ``path`` through the correctness check (or use ``fallback`` when no
        # path is given) and map every scheme to the result, if the result is truthy.
        if path is None:
            resolved = fallback
        else:
            resolved = _correctness_check(provider_package_name, path, provider_info)
        if not resolved:
            return
        registry.update((scheme, resolved) for scheme in schemes)

    for pkg_name, provider_info in self._provider_dict.items():
        for uri_info in provider_info.data.get("asset-uris", []):
            if "schemes" not in uri_info or "handler" not in uri_info:
                continue
            schemes = uri_info["schemes"]
            _register(
                pkg_name,
                provider_info,
                schemes,
                uri_info["handler"],
                self._asset_uri_handlers,
                normalize_noop,
            )
            _register(pkg_name, provider_info, schemes, uri_info.get("factory"), self._asset_factories)
            _register(
                pkg_name,
                provider_info,
                schemes,
                uri_info.get("to_openlineage_converter"),
                self._asset_to_openlineage_converters,
            )

886 

887 def _discover_taskflow_decorators(self) -> None: 

888 for name, info in self._provider_dict.items(): 

889 for taskflow_decorator in info.data.get("task-decorators", []): 

890 self._add_taskflow_decorator( 

891 taskflow_decorator["name"], taskflow_decorator["class-name"], name 

892 ) 

893 

def _add_taskflow_decorator(self, name, decorator_class_name: str, provider_package: str) -> None:
    """
    Register one taskflow decorator under ``name``.

    Nothing is registered when the class path fails the built-in provider prefix
    check. If ``name`` is already taken, a warning naming the existing registration
    is logged instead. Otherwise the decorator is stored as a ``functools.partial``
    of ``import_string``, so the import is deferred.
    """
    if not _check_builtin_provider_prefix(provider_package, decorator_class_name):
        return

    if name not in self._taskflow_decorators:
        self._taskflow_decorators[name] = functools.partial(import_string, decorator_class_name)
        return

    try:
        registered = self._taskflow_decorators[name]
        other_name = f"{registered.__module__}.{registered.__name__}"
    except Exception:
        # Resolving the registered decorator failed; fall back to the class path
        # held as the first argument of the stored functools.partial.
        other_name = self._taskflow_decorators._raw_dict[name].args[0]  # type: ignore[attr-defined]

    log.warning(
        "The taskflow decorator '%s' has been already registered (by %s).",
        name,
        other_name,
    )

914 

915 @staticmethod 

916 def _get_attr(obj: Any, attr_name: str): 

917 """Retrieve attributes of an object, or warn if not found.""" 

918 if not hasattr(obj, attr_name): 

919 log.warning("The object '%s' is missing %s attribute and cannot be registered", obj, attr_name) 

920 return None 

921 return getattr(obj, attr_name) 

922 

923 def _get_connection_type_config(self, provider_info: ProviderInfo, connection_type: str) -> dict | None: 

924 """Get connection type config from provider.yaml if it exists.""" 

925 connection_types = provider_info.data.get("connection-types", []) 

926 for conn_config in connection_types: 

927 if conn_config.get("connection-type") == connection_type: 

928 return conn_config 

929 return None 

930 

931 def _to_api_format(self, field_name: str, field_def: dict) -> dict: 

932 """Convert conn-fields definition to format expected by the API.""" 

933 schema_def = field_def.get("schema", {}) 

934 

935 # build schema dict with label moved to `title` per jsonschema convention 

936 schema = schema_def.copy() 

937 if "label" in field_def: 

938 schema["title"] = field_def.get("label") 

939 

940 return { 

941 "value": schema_def.get("default"), 

942 "schema": schema, 

943 "description": field_def.get("description"), 

944 "source": None, 

945 } 

946 

def _add_widgets(
    self, package_name: str, hook_class_name: str, connection_type: str, conn_fields: dict
) -> None:
    """
    Register connection form widgets declared under ``conn-fields`` in provider info.

    Each field is keyed as ``extra__<connection_type>__<field_name>``. A key that is
    already registered is skipped with a warning. A field whose schema ``format`` is
    ``password`` is flagged as sensitive.
    """
    for field_name, field_def in conn_fields.items():
        api_field = self._to_api_format(field_name, field_def)
        widget_key = f"extra__{connection_type}__{field_name}"
        if widget_key not in self._connection_form_widgets:
            sensitive = field_def.get("schema", {}).get("format") == "password"
            self._connection_form_widgets[widget_key] = ConnectionFormWidgetInfo(
                hook_class_name=hook_class_name,
                package_name=package_name,
                field=api_field,
                field_name=field_name,
                is_sensitive=sensitive,
            )
        else:
            log.warning(
                "Field %s for connection type %s already added, skipping",
                field_name,
                connection_type,
            )

971 

def _add_customized_fields(self, package_name: str, connection_type: str, behaviour: dict) -> None:
    """
    Register the ``ui-field-behaviour`` of one connection type from provider info.

    The kebab-case provider.yaml keys are mapped to the snake_case keys checked by
    the customized-form-fields schema validator. The first registration for a
    connection type wins; later ones are skipped with a warning. Validation or
    prefixing errors are logged and nothing is registered.
    """
    if connection_type in self._field_behaviours:
        log.warning(
            "Field behaviour for connection type %s already exists, skipping",
            connection_type,
        )
        return

    normalized = {
        "hidden_fields": behaviour.get("hidden-fields", []),
        "relabeling": behaviour.get("relabeling", {}),
        "placeholders": behaviour.get("placeholders", {}),
    }
    try:
        self._customized_form_fields_schema_validator.validate(normalized)
        self._field_behaviours[connection_type] = _ensure_prefix_for_placeholders(
            normalized, connection_type
        )
    except Exception as e:
        log.warning(
            "Failed to add field behaviour for %s in package %s: %s",
            connection_type,
            package_name,
            e,
        )

999 

1000 def _load_ui_metadata(self) -> None: 

1001 """Load connection form UI metadata from provider info without importing hooks.""" 

1002 for package_name, provider in self._provider_dict.items(): 

1003 for conn_config in provider.data.get("connection-types", []): 

1004 connection_type = conn_config.get("connection-type") 

1005 hook_class_name = conn_config.get("hook-class-name") 

1006 if not connection_type or not hook_class_name: 

1007 continue 

1008 

1009 if hook_name := conn_config.get("hook-name"): 

1010 self._hook_name_dict[connection_type] = hook_name 

1011 

1012 if conn_fields := conn_config.get("conn-fields"): 

1013 self._add_widgets(package_name, hook_class_name, connection_type, conn_fields) 

1014 

1015 if behaviour := conn_config.get("ui-field-behaviour"): 

1016 self._add_customized_fields(package_name, connection_type, behaviour) 

1017 

1018 def _import_hook( 

1019 self, 

1020 connection_type: str | None, 

1021 provider_info: ProviderInfo, 

1022 hook_class_name: str | None = None, 

1023 package_name: str | None = None, 

1024 ) -> HookInfo | None: 

1025 """ 

1026 Import hook and retrieve hook information. 

1027 

1028 Either connection_type (for lazy loading) or hook_class_name must be set - but not both). 

1029 Only needs package_name if hook_class_name is passed (for lazy loading, package_name 

1030 is retrieved from _connection_type_class_provider_dict together with hook_class_name). 

1031 

1032 :param connection_type: type of the connection 

1033 :param hook_class_name: name of the hook class 

1034 :param package_name: provider package - only needed in case connection_type is missing 

1035 : return 

1036 """ 

1037 if connection_type is None and hook_class_name is None: 

1038 raise ValueError("Either connection_type or hook_class_name must be set") 

1039 if connection_type is not None and hook_class_name is not None: 

1040 raise ValueError( 

1041 f"Both connection_type ({connection_type} and " 

1042 f"hook_class_name {hook_class_name} are set. Only one should be set!" 

1043 ) 

1044 if connection_type is not None: 

1045 class_provider = self._hook_provider_dict[connection_type] 

1046 package_name = class_provider.package_name 

1047 hook_class_name = class_provider.hook_class_name 

1048 else: 

1049 if not hook_class_name: 

1050 raise ValueError("Either connection_type or hook_class_name must be set") 

1051 if not package_name: 

1052 raise ValueError( 

1053 f"Provider package name is not set when hook_class_name ({hook_class_name}) is used" 

1054 ) 

1055 hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info) 

1056 if hook_class is None: 

1057 return None 

1058 

1059 # Check if provider info already has UI metadata and skip Python hook methods 

1060 # to avoid duplicate initialization and unnecessary wtforms imports 

1061 ui_metadata_loaded = False 

1062 if provider_info and connection_type: 

1063 conn_config = self._get_connection_type_config(provider_info, connection_type) 

1064 ui_metadata_loaded = conn_config is not None and bool( 

1065 conn_config.get("conn-fields") or conn_config.get("ui-field-behaviour") 

1066 ) 

1067 

1068 if not ui_metadata_loaded: 

1069 try: 

1070 from wtforms import BooleanField, IntegerField, PasswordField, StringField 

1071 

1072 allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField] 

1073 # Do not use attr here. We want to check only direct class fields not those 

1074 # inherited from parent hook. This way we add form fields only once for the whole 

1075 # hierarchy and we add it only from the parent hook that provides those! 

1076 if "get_connection_form_widgets" in hook_class.__dict__: 

1077 warning = AirflowProviderDeprecationWarning( 

1078 f"Hook '{hook_class_name}' defines get_connection_form_widgets(). " 

1079 "This method is deprecated. Define connection fields declaratively in " 

1080 "provider.yaml under 'conn-fields' instead. See " 

1081 "https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html" 

1082 ) 

1083 warning.deprecated_provider_since = "3.2.0" 

1084 warnings.warn(warning, stacklevel=2) 

1085 widgets = hook_class.get_connection_form_widgets() 

1086 if widgets: 

1087 for widget in widgets.values(): 

1088 if widget.field_class not in allowed_field_classes: 

1089 log.warning( 

1090 "The hook_class '%s' uses field of unsupported class '%s'. " 

1091 "Only '%s' field classes are supported", 

1092 hook_class_name, 

1093 widget.field_class, 

1094 allowed_field_classes, 

1095 ) 

1096 return None 

1097 self._add_widgets_from_hook(package_name, hook_class, widgets) 

1098 if "get_ui_field_behaviour" in hook_class.__dict__: 

1099 warning = AirflowProviderDeprecationWarning( 

1100 f"Hook '{hook_class_name}' defines get_ui_field_behaviour(). " 

1101 "This method is deprecated. Define field behaviour declaratively in " 

1102 "provider.yaml under 'ui-field-behaviour' instead. See " 

1103 "https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html" 

1104 ) 

1105 warning.deprecated_provider_since = "3.2.0" 

1106 warnings.warn(warning, stacklevel=2) 

1107 field_behaviours = hook_class.get_ui_field_behaviour() 

1108 if field_behaviours: 

1109 self._add_customized_fields_from_hook(package_name, hook_class, field_behaviours) 

1110 except ImportError as e: 

1111 if e.name in ["flask_appbuilder", "wtforms"]: 

1112 log.info( 

1113 "The hook_class '%s' is not fully initialized (UI widgets will be missing), because " 

1114 "the 'flask_appbuilder' package is not installed, however it is not required for " 

1115 "Airflow components to work", 

1116 hook_class_name, 

1117 ) 

1118 except Exception as e: 

1119 log.warning( 

1120 "Exception when importing '%s' from '%s' package: %s", 

1121 hook_class_name, 

1122 package_name, 

1123 e, 

1124 ) 

1125 return None 

1126 

1127 hook_connection_type = self._get_attr(hook_class, "conn_type") 

1128 if connection_type: 

1129 if hook_connection_type != connection_type: 

1130 log.warning( 

1131 "Inconsistency! The hook class '%s' declares connection type '%s'" 

1132 " but it is added by provider '%s' as connection_type '%s' in provider info. " 

1133 "This should be fixed!", 

1134 hook_class, 

1135 hook_connection_type, 

1136 package_name, 

1137 connection_type, 

1138 ) 

1139 connection_type = hook_connection_type 

1140 connection_id_attribute_name: str = self._get_attr(hook_class, "conn_name_attr") 

1141 hook_name: str = self._get_attr(hook_class, "hook_name") 

1142 

1143 if not connection_type or not connection_id_attribute_name or not hook_name: 

1144 log.warning( 

1145 "The hook misses one of the key attributes: " 

1146 "conn_type: %s, conn_id_attribute_name: %s, hook_name: %s", 

1147 connection_type, 

1148 connection_id_attribute_name, 

1149 hook_name, 

1150 ) 

1151 return None 

1152 

1153 return HookInfo( 

1154 hook_class_name=hook_class_name, 

1155 connection_id_attribute_name=connection_id_attribute_name, 

1156 package_name=package_name, 

1157 hook_name=hook_name, 

1158 connection_type=connection_type, 

1159 connection_testable=hasattr(hook_class, "test_connection"), 

1160 ) 

1161 

def _add_widgets_from_hook(self, package_name: str, hook_class: type, widgets: dict[str, Any]):
    """
    Register form widgets returned by a hook's ``get_connection_form_widgets``.

    Identifiers not already starting with ``extra__`` get an ``extra__<conn_type>__``
    prefix. Identifiers that are already registered are ignored with a warning. A
    widget counts as sensitive when its field class widget has ``input_type == "password"``.
    """
    conn_type = hook_class.conn_type  # type: ignore
    for field_identifier, field in widgets.items():
        prefixed_field_name = (
            field_identifier
            if field_identifier.startswith("extra__")
            else f"extra__{conn_type}__{field_identifier}"
        )
        if prefixed_field_name not in self._connection_form_widgets:
            widget_impl = field.field_class.widget
            is_password = hasattr(widget_impl, "input_type") and widget_impl.input_type == "password"
            self._connection_form_widgets[prefixed_field_name] = ConnectionFormWidgetInfo(
                hook_class.__name__,
                package_name,
                field,
                field_identifier,
                is_password,
            )
            continue
        # Inherited hooks may hit this several times for the same field.
        log.warning(
            "The field %s from class %s has already been added by another provider. Ignoring it.",
            field_identifier,
            hook_class.__name__,
        )

1185 

def _add_customized_fields_from_hook(self, package_name: str, hook_class: type, customized_fields: dict):
    """
    Register the field behaviour returned by a hook's ``get_ui_field_behaviour``.

    The behaviour is validated against the customized-form-fields schema. When the
    hook declares a connection type, placeholders are prefixed for it. The first
    registration per connection type wins. Any error is logged rather than raised.
    """
    try:
        conn_type = hook_class.conn_type  # type: ignore[attr-defined]
        self._customized_form_fields_schema_validator.validate(customized_fields)
        behaviour = (
            _ensure_prefix_for_placeholders(customized_fields, conn_type) if conn_type else customized_fields
        )
        if conn_type not in self._field_behaviours:
            self._field_behaviours[conn_type] = behaviour
            return
        log.warning(
            "The connection_type %s from package %s and class %s has already been added "
            "by another provider. Ignoring it.",
            conn_type,
            package_name,
            hook_class.__name__,
        )
    except Exception as e:
        log.warning(
            "Error when loading customized fields from package '%s' hook class '%s': %s",
            package_name,
            hook_class.__name__,
            e,
        )

1212 

1213 def _discover_auth_managers(self, *, check: bool) -> None: 

1214 """Retrieve all auth managers defined in the providers.""" 

1215 for provider_package, provider in self._provider_dict.items(): 

1216 if provider.data.get("auth-managers"): 

1217 for auth_manager_class_name in provider.data["auth-managers"]: 

1218 if not check: 

1219 self._auth_manager_without_check_set.add((auth_manager_class_name, provider_package)) 

1220 elif _correctness_check(provider_package, auth_manager_class_name, provider): 

1221 self._auth_manager_class_name_set.add(auth_manager_class_name) 

1222 

1223 def _discover_cli_command(self) -> None: 

1224 """Retrieve all CLI command functions defined in the providers.""" 

1225 for provider_package, provider in self._provider_dict.items(): 

1226 if provider.data.get("cli"): 

1227 for cli_command_function_name in provider.data["cli"]: 

1228 # _correctness_check will return the function if found and correct 

1229 # we store the function itself instead of its name to avoid importing it again later in cli_parser to speed up cli loading 

1230 if cli_func := _correctness_check(provider_package, cli_command_function_name, provider): 

1231 cli_func = cast("Callable[[], list[CLICommand]]", cli_func) 

1232 self._cli_command_functions_set.add(cli_func) 

1233 self._cli_command_provider_name_set.add(provider_package) 

1234 

1235 def _discover_notifications(self) -> None: 

1236 """Retrieve all notifications defined in the providers.""" 

1237 for provider_package, provider in self._provider_dict.items(): 

1238 if provider.data.get("notifications"): 

1239 for notification_class_name in provider.data["notifications"]: 

1240 if _correctness_check(provider_package, notification_class_name, provider): 

1241 self._notification_info_set.add(notification_class_name) 

1242 

1243 def _discover_extra_links(self) -> None: 

1244 """Retrieve all extra links defined in the providers.""" 

1245 for provider_package, provider in self._provider_dict.items(): 

1246 if provider.data.get("extra-links"): 

1247 for extra_link_class_name in provider.data["extra-links"]: 

1248 if _correctness_check(provider_package, extra_link_class_name, provider): 

1249 self._extra_link_class_name_set.add(extra_link_class_name) 

1250 

1251 def _discover_logging(self) -> None: 

1252 """Retrieve all logging defined in the providers.""" 

1253 for provider_package, provider in self._provider_dict.items(): 

1254 if provider.data.get("logging"): 

1255 for logging_class_name in provider.data["logging"]: 

1256 if _correctness_check(provider_package, logging_class_name, provider): 

1257 self._logging_class_name_set.add(logging_class_name) 

1258 

1259 def _discover_remote_logging(self) -> None: 

1260 """Retrieve all remote logging IO handlers defined in the providers.""" 

1261 for provider_package, provider in self._provider_dict.items(): 

1262 entries = provider.data.get("remote-logging") or [] 

1263 for entry in entries: 

1264 classpath = entry["classpath"] 

1265 if not _correctness_check(provider_package, classpath, provider): 

1266 continue 

1267 info = RemoteLoggingInfo( 

1268 classpath=classpath, 

1269 scheme=entry["scheme"], 

1270 package_name=provider_package, 

1271 ) 

1272 if (existing := self._remote_logging_by_scheme.get(info.scheme)) is not None: 

1273 log.warning( 

1274 "Remote logging scheme '%s' is already registered by %s; ignoring " 

1275 "duplicate registration from %s.", 

1276 info.scheme, 

1277 existing.package_name, 

1278 info.package_name, 

1279 ) 

1280 continue 

1281 self._remote_logging_info_list.append(info) 

1282 self._remote_logging_by_scheme[info.scheme] = info 

1283 

1284 def _discover_secrets_backends(self) -> None: 

1285 """Retrieve all secrets backends defined in the providers.""" 

1286 for provider_package, provider in self._provider_dict.items(): 

1287 if provider.data.get("secrets-backends"): 

1288 for secrets_backends_class_name in provider.data["secrets-backends"]: 

1289 if _correctness_check(provider_package, secrets_backends_class_name, provider): 

1290 self._secrets_backend_class_name_set.add(secrets_backends_class_name) 

1291 

1292 def _discover_executors(self, *, check: bool) -> None: 

1293 """Retrieve all executors defined in the providers.""" 

1294 for provider_package, provider in self._provider_dict.items(): 

1295 if provider.data.get("executors"): 

1296 for executors_class_path in provider.data["executors"]: 

1297 if not check: 

1298 self._executor_without_check_set.add((executors_class_path, provider_package)) 

1299 elif _correctness_check(provider_package, executors_class_path, provider): 

1300 self._executor_class_name_set.add(executors_class_path) 

1301 

1302 def _discover_queues(self) -> None: 

1303 """Retrieve all queues defined in the providers.""" 

1304 for provider_package, provider in self._provider_dict.items(): 

1305 if provider.data.get("queues"): 

1306 for queue_class_name in provider.data["queues"]: 

1307 if _correctness_check(provider_package, queue_class_name, provider): 

1308 self._queue_class_name_set.add(queue_class_name) 

1309 

1310 def _discover_db_managers(self) -> None: 

1311 """Retrieve all DB managers defined in the providers.""" 

1312 for provider_package, provider in self._provider_dict.items(): 

1313 if provider.data.get("db-managers"): 

1314 for db_manager_class_name in provider.data["db-managers"]: 

1315 if _correctness_check(provider_package, db_manager_class_name, provider): 

1316 self._db_manager_class_name_set.add(db_manager_class_name) 

1317 

1318 def _discover_config(self) -> None: 

1319 """Retrieve all configs defined in the providers.""" 

1320 for provider_package, provider in self._provider_dict.items(): 

1321 if provider.data.get("config"): 

1322 self._provider_configs[provider_package] = provider.data.get("config") # type: ignore[assignment] 

1323 

1324 def _discover_plugins(self) -> None: 

1325 """Retrieve all plugins defined in the providers.""" 

1326 for provider_package, provider in self._provider_dict.items(): 

1327 for plugin_dict in provider.data.get("plugins", ()): 

1328 if not _correctness_check(provider_package, plugin_dict["plugin-class"], provider): 

1329 log.warning("Plugin not loaded due to above correctness check problem.") 

1330 continue 

1331 self._plugins_set.add( 

1332 PluginInfo( 

1333 name=plugin_dict["name"], 

1334 plugin_class=plugin_dict["plugin-class"], 

1335 provider_name=provider_package, 

1336 ) 

1337 ) 

1338 

@provider_info_cache("triggers")
def initialize_providers_triggers(self):
    """
    Initialize providers triggers.

    Each module listed under ``python-modules`` of a provider's ``triggers`` entries
    is recorded as a ``TriggerInfo`` tagged with the entry's ``integration-name``
    (empty string when absent). An entry without ``python-modules`` (or with a null
    value) contributes nothing, instead of raising ``TypeError`` from iterating None.
    """
    self.initialize_providers_list()
    for provider_package, provider in self._provider_dict.items():
        for trigger in provider.data.get("triggers", []):
            integration_name = trigger.get("integration-name", "")
            for trigger_class_name in trigger.get("python-modules") or []:
                self._trigger_info_set.add(
                    TriggerInfo(
                        package_name=provider_package,
                        trigger_class_name=trigger_class_name,
                        integration_name=integration_name,
                    )
                )

1353 

@property
def auth_managers(self) -> list[str]:
    """Return the sorted class names of provider auth managers that passed the correctness check."""
    self.initialize_providers_auth_managers()
    return sorted(self._auth_manager_class_name_set)

1359 

@property
def auth_manager_without_check(self) -> set[tuple[str, str]]:
    """Return the (auth manager class name, provider package) pairs collected without a correctness check."""
    self.initialize_providers_auth_managers_without_check()
    unchecked = self._auth_manager_without_check_set
    return unchecked

1365 

@property
def cli_command_functions(self) -> set[Callable[[], list[CLICommand]]]:
    """
    Return the set of CLI command functions contributed by providers.

    The elements are the already-imported callables, not their dotted names.
    """
    self.initialize_providers_cli_command()
    return self._cli_command_functions_set

1371 

@property
def cli_command_providers(self) -> set[str]:
    """Return the names of provider packages that contribute at least one CLI command."""
    self.initialize_providers_cli_command()
    provider_names = self._cli_command_provider_name_set
    return provider_names

1377 

@property
def notification(self) -> list[NotificationInfo]:
    """Return the provider notification entries, sorted."""
    self.initialize_providers_notifications()
    entries = list(self._notification_info_set)
    entries.sort()
    return entries

1383 

@property
def trigger(self) -> list[TriggerInfo]:
    """
    Return information about available providers trigger classes.

    Results are sorted by package name, then trigger class name, then integration
    name. The order is therefore deterministic even though triggers are collected
    in a set.
    """
    self.initialize_providers_triggers()
    return sorted(
        self._trigger_info_set,
        key=lambda info: (info.package_name, info.trigger_class_name, info.integration_name),
    )

1389 

@property
def providers(self) -> dict[str, ProviderInfo]:
    """Return the mapping of provider package name to its ``ProviderInfo``, initializing the list first."""
    self.initialize_providers_list()
    provider_map = self._provider_dict
    return provider_map

1395 

@property
def hooks(self) -> MutableMapping[str, HookInfo | None]:
    """
    Return the connection_type -> hook information mapping.

    Values may be None for hooks that were discovered but could not be imported.
    The mapping is only meant for reading hook information.
    """
    self.initialize_providers_hooks()
    lazy_hooks = self._hooks_lazy_dict
    return lazy_hooks

1406 

def iter_connection_type_hook_ui_metadata(self) -> Iterator[ConnectionTypeHookUIMetadata]:
    """
    Yield connection UI metadata for every known connection type, in sorted order.

    Hook classes are never imported. Hook name and class come from an
    already-resolved ``HookInfo`` when one is cached. Otherwise the hook name comes
    from provider info (``_hook_name_dict``, falling back to the connection type
    itself), and the class name from ``_hook_provider_dict`` (None when absent).
    """
    self.initialize_providers_hooks()
    known_types = frozenset(self._hooks_lazy_dict) | frozenset(self._hook_provider_dict)
    for conn_type in sorted(known_types):
        cached = self._hooks_lazy_dict._raw_dict.get(conn_type)
        provider_entry = self._hook_provider_dict.get(conn_type)
        if isinstance(cached, HookInfo):
            hook_name, hook_class_name = cached.hook_name, cached.hook_class_name
        else:
            hook_name = self._hook_name_dict.get(conn_type, conn_type)
            hook_class_name = provider_entry.hook_class_name if provider_entry else None
        yield ConnectionTypeHookUIMetadata(
            connection_type=conn_type,
            hook_name=hook_name,
            hook_class_name=hook_class_name,
            field_behaviour=self._field_behaviours.get(conn_type),
        )

1433 

1434 @property 

1435 def _connection_form_widgets_from_metadata(self) -> dict[str, ConnectionFormWidgetInfo]: 

1436 """Return connection form widgets from metadata without importing every hook.""" 

1437 self.initialize_providers_hooks() 

1438 return self._connection_form_widgets 

1439 

1440 @property 

1441 def _field_behaviours_from_metadata(self) -> dict[str, dict]: 

1442 """Return field behaviour dicts from metadata without importing every hook.""" 

1443 self.initialize_providers_hooks() 

1444 return self._field_behaviours 

1445 

@property
def dialects(self) -> MutableMapping[str, DialectInfo]:
    """Return the dialect_type -> ``DialectInfo`` mapping populated during hook initialization (read-only use)."""
    self.initialize_providers_hooks()
    dialect_map = self._dialect_provider_dict
    return dialect_map

1452 

@property
def plugins(self) -> list[PluginInfo]:
    """Return plugins contributed by providers, sorted by their plugin class path."""
    self.initialize_providers_plugins()
    return sorted(self._plugins_set, key=lambda plugin: plugin.plugin_class)

1458 

@property
def taskflow_decorators(self) -> dict[str, TaskDecorator]:
    """Return the mapping of taskflow decorator name to decorator registered by providers."""
    self.initialize_providers_taskflow_decorator()
    registered = self._taskflow_decorators
    return registered  # type: ignore[return-value]

1463 

@property
def extra_links_class_names(self) -> list[str]:
    """Return the sorted list of extra link class names contributed by providers."""
    self.initialize_providers_extra_links()
    names = list(self._extra_link_class_name_set)
    names.sort()
    return names

1469 

@property
def connection_form_widgets(self) -> dict[str, ConnectionFormWidgetInfo]:
    """
    Return widgets for connection forms, force-importing all hooks first.

    The keys are not sorted; they keep the order in which the widgets were
    registered, i.e. the order defined in the hook.
    """
    self.initialize_providers_hooks()
    self._import_info_from_all_hooks()
    widgets = self._connection_form_widgets
    return widgets

1480 

@property
def field_behaviours(self) -> dict[str, dict]:
    """Return field behaviours per connection type, force-importing all hooks first."""
    self.initialize_providers_hooks()
    self._import_info_from_all_hooks()
    behaviours = self._field_behaviours
    return behaviours

1487 

@property
def logging_class_names(self) -> list[str]:
    """Return the sorted list of task log handler class names contributed by providers."""
    self.initialize_providers_logging()
    names = list(self._logging_class_name_set)
    names.sort()
    return names

1493 

@property
def remote_logging_handlers(self) -> list[RemoteLoggingInfo]:
    """Return a shallow copy of all remote logging IO handlers contributed by providers."""
    self.initialize_providers_remote_logging()
    return [*self._remote_logging_info_list]

1499 

def remote_logging_handler_by_scheme(self, scheme: str) -> RemoteLoggingInfo | None:
    """Look up the remote logging IO handler registered for URL ``scheme``; None if there is none."""
    self.initialize_providers_remote_logging()
    by_scheme = self._remote_logging_by_scheme
    return by_scheme.get(scheme)

1504 

@property
def secrets_backend_class_names(self) -> list[str]:
    """Return the sorted list of secrets backend class names contributed by providers."""
    self.initialize_providers_secrets_backends()
    names = list(self._secrets_backend_class_name_set)
    names.sort()
    return names

1510 

@property
def executor_class_names(self) -> list[str]:
    """Return the sorted executor class paths that passed the provider correctness check."""
    self.initialize_providers_executors()
    names = list(self._executor_class_name_set)
    names.sort()
    return names

1515 

@property
def executor_without_check(self) -> set[tuple[str, str]]:
    """Return the (executor class path, provider package) pairs collected without a correctness check."""
    self.initialize_providers_executors_without_check()
    unchecked = self._executor_without_check_set
    return unchecked

1521 

@property
def queue_class_names(self) -> list[str]:
    """Return the sorted queue class names contributed by providers."""
    self.initialize_providers_queues()
    names = list(self._queue_class_name_set)
    names.sort()
    return names

1526 

@property
def db_managers(self) -> list[str]:
    """Return the sorted DB manager class names contributed by providers."""
    self.initialize_providers_db_managers()
    names = list(self._db_manager_class_name_set)
    names.sort()
    return names

1531 

@property
def filesystem_module_names(self) -> list[str]:
    """Return the sorted filesystem module names contributed by providers."""
    self.initialize_providers_filesystems()
    names = list(self._fs_set)
    names.sort()
    return names

1536 

@property
def asset_factories(self) -> dict[str, Callable[..., Asset]]:
    """Return the URI scheme -> asset factory mapping contributed by providers."""
    self.initialize_providers_asset_uri_resources()
    factories = self._asset_factories
    return factories

1541 

@property
def asset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
    """
    Asset URI handler callables collected from providers, keyed by string.

    Note: the manager's internal mapping is returned as-is, not a copy.
    """
    self.initialize_providers_asset_uri_resources()
    handlers = self._asset_uri_handlers
    return handlers

1546 

@property
def asset_to_openlineage_converters(self) -> dict[str, Callable]:
    """
    Asset-to-OpenLineage converter callables collected from providers.

    Note: the manager's internal mapping is returned as-is, not a copy.
    """
    self.initialize_providers_asset_uri_resources()
    converters = self._asset_to_openlineage_converters
    return converters

1553 

@property
def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
    """
    Provider configuration entries as ``(key, config)`` pairs ordered by key.

    Ensures provider configuration is initialized before reading it.
    """
    self.initialize_providers_configuration()
    configs = self._provider_configs
    # Dict keys are unique, so sorting the keys yields the same order as sorting the items by key.
    return [(key, configs[key]) for key in sorted(configs)]

1558 

@property
def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
    """
    Return provider configs already loaded, without triggering initialization.

    Unlike ``provider_configs``, this does not call ``initialize_providers_configuration``
    first. It therefore only reflects what has already been collected.

    .. deprecated:: 3.2.0
        Use ``provider_configs`` instead. This property is kept for backwards
        compatibility and will be removed in a future version.
    """
    warnings.warn(
        "already_initialized_provider_configs is deprecated. Use `provider_configs` instead.",
        category=DeprecationWarning,
        stacklevel=2,
    )
    configs = self._provider_configs
    return [(key, configs[key]) for key in sorted(configs)]

1574 

def _cleanup(self):
    """
    Reset the manager's discovered provider state.

    Clears the initialization cache and the registries read by the public
    accessors. It also invalidates the cached configuration and marks the
    manager as not initialized.
    """
    self._initialized_cache.clear()
    self._provider_dict.clear()
    self._fs_set.clear()
    self._taskflow_decorators.clear()
    self._hook_provider_dict.clear()
    self._dialect_provider_dict.clear()
    self._hooks_lazy_dict.clear()
    self._connection_form_widgets.clear()
    self._field_behaviours.clear()
    self._extra_link_class_name_set.clear()
    self._logging_class_name_set.clear()
    self._remote_logging_info_list.clear()
    self._remote_logging_by_scheme.clear()
    self._auth_manager_class_name_set.clear()
    self._auth_manager_without_check_set.clear()
    self._secrets_backend_class_name_set.clear()
    self._executor_class_name_set.clear()
    self._executor_without_check_set.clear()
    self._queue_class_name_set.clear()
    # These registries back the db_managers and asset_* accessors. Re-initialization
    # after a reset only adds/overwrites entries, so without clearing them, entries
    # from a previous discovery would survive the reset.
    self._db_manager_class_name_set.clear()
    self._asset_factories.clear()
    self._asset_uri_handlers.clear()
    self._asset_to_openlineage_converters.clear()
    self._provider_configs.clear()

    # Imported lazily to avoid a configuration/providers_manager import cycle during cleanup.
    from airflow.configuration import conf

    conf.invalidate_cache()

    self._trigger_info_set.clear()
    self._notification_info_set.clear()
    self._plugins_set.clear()
    self._cli_command_functions_set.clear()
    self._cli_command_provider_name_set.clear()

    self._initialized = False
    self._initialization_stack_trace = None