#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Manages all providers."""

from __future__ import annotations

import contextlib
import functools
import inspect
import json
import logging
import traceback
import warnings
from collections.abc import Callable, MutableMapping
from dataclasses import dataclass
from functools import wraps
from importlib.resources import files as resource_files
from time import perf_counter
from typing import TYPE_CHECKING, Any, NamedTuple, ParamSpec, TypeVar

from packaging.utils import canonicalize_name

from airflow._shared.module_loading import import_string
from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.utils.entry_points import entry_points_with_dist
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.singleton import Singleton

log = logging.getLogger(__name__)


PS = ParamSpec("PS")
RT = TypeVar("RT")

MIN_PROVIDER_VERSIONS = {
    "apache-airflow-providers-celery": "2.1.0",
}


def _ensure_prefix_for_placeholders(field_behaviors: dict[str, Any], conn_type: str):
    """
    Verify the correct placeholder prefix.

    If the given field_behaviors dict contains a placeholder's node, and there
    are placeholders for extra fields (i.e. anything other than the built-in conn
    attrs), and if those extra fields are unprefixed, then add the prefix.

    The reason we need to do this is, all custom conn fields live in the same dictionary,
    so we need to namespace them with a prefix internally. But for user convenience,
    and consistency between the `get_ui_field_behaviour` method and the extra dict itself,
    we allow users to supply the unprefixed name.
    """
    conn_attrs = {"host", "schema", "login", "password", "port", "extra"}

    def ensure_prefix(field):
        if field not in conn_attrs and not field.startswith("extra__"):
            return f"extra__{conn_type}__{field}"
        return field

    if "placeholders" in field_behaviors:
        placeholders = field_behaviors["placeholders"]
        field_behaviors["placeholders"] = {ensure_prefix(k): v for k, v in placeholders.items()}

    return field_behaviors
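
# Illustrative sketch (not part of the original module): for a hypothetical
# connection type "jdbc", a placeholders mapping such as
#   {"login": "user", "drv_path": "/opt/driver.jar"}
# would be rewritten so that only the non-built-in field is namespaced:
#   {"login": "user", "extra__jdbc__drv_path": "/opt/driver.jar"}
# Built-in connection attributes and fields already prefixed with "extra__"
# are left untouched by _ensure_prefix_for_placeholders.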


if TYPE_CHECKING:
    from urllib.parse import SplitResult

    from airflow.sdk import BaseHook
    from airflow.sdk.bases.decorator import TaskDecorator
    from airflow.sdk.definitions.asset import Asset


class LazyDictWithCache(MutableMapping):
    """
    Lazy-loaded cached dictionary.

    A dictionary which, if the value stored under a key is a callable, calls it on
    first access of that key, then caches and returns the result.
    """

    __slots__ = ["_resolved", "_raw_dict"]

    def __init__(self, *args, **kw):
        self._resolved = set()
        self._raw_dict = dict(*args, **kw)

    def __setitem__(self, key, value):
        self._raw_dict.__setitem__(key, value)

    def __getitem__(self, key):
        value = self._raw_dict.__getitem__(key)
        if key not in self._resolved and callable(value):
            # Exchange the callable with the result of calling it -- but only once!
            # This allows the resolver to return a callable itself.
            value = value()
            self._resolved.add(key)
            self._raw_dict.__setitem__(key, value)
        return value

    def __delitem__(self, key):
        with contextlib.suppress(KeyError):
            self._resolved.remove(key)
        self._raw_dict.__delitem__(key)

    def __iter__(self):
        return iter(self._raw_dict)

    def __len__(self):
        return len(self._raw_dict)

    def __contains__(self, key):
        return key in self._raw_dict

    def clear(self):
        self._resolved.clear()
        self._raw_dict.clear()
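
# Illustrative usage sketch (not part of the original module): values set to a
# callable are resolved lazily and only once.
#
#   cache = LazyDictWithCache()
#   cache["postgres"] = lambda: "expensively computed hook info"
#   cache["postgres"]  # first access calls the lambda and caches the result
#   cache["postgres"]  # subsequent accesses return the cached value directly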


def _read_schema_from_resources_or_local_file(filename: str) -> dict:
    try:
        with resource_files("airflow").joinpath(filename).open("rb") as f:
            schema = json.load(f)
    except (TypeError, FileNotFoundError):
        import pathlib

        with (pathlib.Path(__file__).parent / filename).open("rb") as f:
            schema = json.load(f)
    return schema


def _create_provider_info_schema_validator():
    """Create JSON schema validator from the provider_info.schema.json."""
    import jsonschema

    schema = _read_schema_from_resources_or_local_file("provider_info.schema.json")
    cls = jsonschema.validators.validator_for(schema)
    validator = cls(schema)
    return validator


def _create_customized_form_field_behaviours_schema_validator():
    """Create JSON schema validator from the customized_form_field_behaviours.schema.json."""
    import jsonschema

    schema = _read_schema_from_resources_or_local_file("customized_form_field_behaviours.schema.json")
    cls = jsonschema.validators.validator_for(schema)
    validator = cls(schema)
    return validator


def _check_builtin_provider_prefix(provider_package: str, class_name: str) -> bool:
    if provider_package.startswith("apache-airflow"):
        provider_path = provider_package[len("apache-") :].replace("-", ".")
        if not class_name.startswith(provider_path):
            log.warning(
                "Coherence check failed when importing '%s' from '%s' package. It should start with '%s'",
                class_name,
                provider_package,
                provider_path,
            )
            return False
    return True
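
# Illustrative sketch (not part of the original module): for the package
# "apache-airflow-providers-google", the derived prefix is "airflow.providers.google",
# so a class name such as "airflow.providers.google.cloud.hooks.gcs.GCSHook"
# passes the coherence check, while a class defined outside that namespace is rejected.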


@dataclass
class ProviderInfo:
    """
    Provider information.

    :param version: version string
    :param data: dictionary with information about the provider
    """

    version: str
    data: dict


class HookClassProvider(NamedTuple):
    """Hook class and Provider it comes from."""

    hook_class_name: str
    package_name: str


class DialectInfo(NamedTuple):
    """Dialect class and Provider it comes from."""

    name: str
    dialect_class_name: str
    provider_name: str


class TriggerInfo(NamedTuple):
    """Trigger class and provider it comes from."""

    trigger_class_name: str
    package_name: str
    integration_name: str


class NotificationInfo(NamedTuple):
    """Notification class and provider it comes from."""

    notification_class_name: str
    package_name: str


class PluginInfo(NamedTuple):
    """Plugin class, name and provider it comes from."""

    name: str
    plugin_class: str
    provider_name: str


class HookInfo(NamedTuple):
    """Hook information."""

    hook_class_name: str
    connection_id_attribute_name: str
    package_name: str
    hook_name: str
    connection_type: str
    connection_testable: bool
    dialects: list[str] = []


class ConnectionFormWidgetInfo(NamedTuple):
    """Connection Form Widget information."""

    hook_class_name: str
    package_name: str
    field: Any
    field_name: str
    is_sensitive: bool


def log_optional_feature_disabled(class_name, e, provider_package):
    """Log optional feature disabled."""
    log.debug(
        "Optional feature disabled on exception when importing '%s' from '%s' package",
        class_name,
        provider_package,
        exc_info=e,
    )
    log.info(
        "Optional provider feature disabled when importing '%s' from '%s' package",
        class_name,
        provider_package,
    )


def log_import_warning(class_name, e, provider_package):
    """Log import warning."""
    log.warning(
        "Exception when importing '%s' from '%s' package",
        class_name,
        provider_package,
        exc_info=e,
    )


# This is a temporary measure until all community providers add AirflowOptionalProviderFeatureException
# where they have optional features. We are going to add tests in our CI to catch all such cases and will
# fix them, but until then all "known unhandled optional feature errors" from community providers
# should be added here
KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [("apache-airflow-providers-google", "No module named 'paramiko'")]


def _correctness_check(provider_package: str, class_name: str, provider_info: ProviderInfo) -> Any:
    """
    Perform coherence check on provider classes.

    For apache-airflow providers it checks that the class name starts with the appropriate package
    prefix. For all providers it tries to import the provider class, checking that there are no
    exceptions during importing. It logs an appropriate warning in case it detects any problems.

    :param provider_package: name of the provider package
    :param class_name: name of the class to import

    :return: the class if the class is OK, None otherwise.
    """
    if not _check_builtin_provider_prefix(provider_package, class_name):
        return None
    try:
        imported_class = import_string(class_name)
    except AirflowOptionalProviderFeatureException as e:
        # When the provider class raises AirflowOptionalProviderFeatureException
        # this is an expected case when only some classes in provider are
        # available. We just log debug level here and print info message in logs so that
        # the user is aware of it
        log_optional_feature_disabled(class_name, e, provider_package)
        return None
    except ImportError as e:
        if "No module named 'airflow.providers." in e.msg:
            # handle cases where another provider is missing. This can only happen if
            # there is an optional feature, so we log debug and print information about it
            log_optional_feature_disabled(class_name, e, provider_package)
            return None
        for known_error in KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS:
            # Until we convert all providers to use AirflowOptionalProviderFeatureException
            # we assume any problem with importing another "provider" is because this is an
            # optional feature, so we log debug and print information about it
            if known_error[0] == provider_package and known_error[1] in e.msg:
                log_optional_feature_disabled(class_name, e, provider_package)
                return None
        # But when we have no idea - we print warning to logs
        log_import_warning(class_name, e, provider_package)
        return None
    except Exception as e:
        log_import_warning(class_name, e, provider_package)
        return None
    return imported_class
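
# Illustrative sketch (not part of the original module): the return value is the
# imported class on success, or None when the coherence check or the import fails.
#
#   cls = _correctness_check(
#       "apache-airflow-providers-standard",
#       "airflow.providers.standard.hooks.filesystem.FSHook",
#       provider_info,  # a ProviderInfo instance previously discovered for that package
#   )
#   if cls is not None:
#       ...  # safe to use the class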


# We want to have better control over initialization of parameters and be able to debug and test it
# So we add our own decorator
def provider_info_cache(cache_name: str) -> Callable[[Callable[PS, None]], Callable[PS, None]]:
    """
    Decorate and cache provider info.

    Decorator factory that creates a decorator which caches initialization of a provider's parameters.

    :param cache_name: Name of the cache
    """

    def provider_info_cache_decorator(func: Callable[PS, None]) -> Callable[PS, None]:
        @wraps(func)
        def wrapped_function(*args: PS.args, **kwargs: PS.kwargs) -> None:
            providers_manager_instance = args[0]
            if TYPE_CHECKING:
                assert isinstance(providers_manager_instance, ProvidersManager)

            if cache_name in providers_manager_instance._initialized_cache:
                return
            start_time = perf_counter()
            log.debug("Initializing Providers Manager[%s]", cache_name)
            func(*args, **kwargs)
            providers_manager_instance._initialized_cache[cache_name] = True
            log.debug(
                "Initialization of Providers Manager[%s] took %.2f seconds",
                cache_name,
                perf_counter() - start_time,
            )

        return wrapped_function

    return provider_info_cache_decorator
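
# Illustrative sketch (not part of the original module): methods of ProvidersManager
# use this decorator so that each discovery step runs at most once per instance, e.g.
#
#   @provider_info_cache("widgets")   # "widgets" is a hypothetical cache name
#   def initialize_providers_widgets(self):
#       self.initialize_providers_list()
#       ...
#
# The second and subsequent calls return early because the cache name is already
# recorded in self._initialized_cache.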


class ProvidersManager(LoggingMixin, metaclass=Singleton):
    """
    Manages all provider distributions.

    This is a Singleton class. The first time it is
    instantiated, it discovers all available providers in installed packages.
    """

    resource_version = "0"
    _initialized: bool = False
    _initialization_stack_trace = None

    @staticmethod
    def initialized() -> bool:
        return ProvidersManager._initialized

    @staticmethod
    def initialization_stack_trace() -> str | None:
        return ProvidersManager._initialization_stack_trace

    def __init__(self):
        """Initialize the manager."""
        super().__init__()
        ProvidersManager._initialized = True
        ProvidersManager._initialization_stack_trace = "".join(traceback.format_stack(inspect.currentframe()))
        self._initialized_cache: dict[str, bool] = {}
        # Keeps dict of providers keyed by module name
        self._provider_dict: dict[str, ProviderInfo] = {}
        self._fs_set: set[str] = set()
        self._asset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
        self._asset_factories: dict[str, Callable[..., Asset]] = {}
        self._asset_to_openlineage_converters: dict[str, Callable] = {}
        self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache()
        # Keeps mapping between connection_types and hook class, and the package they come from
        self._hook_provider_dict: dict[str, HookClassProvider] = {}
        self._dialect_provider_dict: dict[str, DialectInfo] = {}
        # Keeps dict of hooks keyed by connection type. They are lazily evaluated at access time
        self._hooks_lazy_dict: LazyDictWithCache[str, HookInfo | Callable] = LazyDictWithCache()
        # Keeps the widgets that should be used for custom connection form fields,
        # keyed by the name of the extra field
        self._connection_form_widgets: dict[str, ConnectionFormWidgetInfo] = {}
        # Customizations for javascript fields are kept here
        self._field_behaviours: dict[str, dict] = {}
        self._extra_link_class_name_set: set[str] = set()
        self._logging_class_name_set: set[str] = set()
        self._auth_manager_class_name_set: set[str] = set()
        self._secrets_backend_class_name_set: set[str] = set()
        self._executor_class_name_set: set[str] = set()
        self._queue_class_name_set: set[str] = set()
        self._provider_configs: dict[str, dict[str, Any]] = {}
        self._trigger_info_set: set[TriggerInfo] = set()
        self._notification_info_set: set[NotificationInfo] = set()
        self._provider_schema_validator = _create_provider_info_schema_validator()
        self._customized_form_fields_schema_validator = (
            _create_customized_form_field_behaviours_schema_validator()
        )
        # Set of plugins contained in providers
        self._plugins_set: set[PluginInfo] = set()
        self._init_airflow_core_hooks()

    def _init_airflow_core_hooks(self):
        """Initialize the hooks dict with default hooks from Airflow core."""
        core_dummy_hooks = {
            "generic": "Generic",
            "email": "Email",
        }
        for key, display in core_dummy_hooks.items():
            self._hooks_lazy_dict[key] = HookInfo(
                hook_class_name=None,
                connection_id_attribute_name=None,
                package_name=None,
                hook_name=display,
                connection_type=None,
                connection_testable=False,
            )
        for conn_type, class_name in (
            ("fs", "airflow.providers.standard.hooks.filesystem.FSHook"),
            ("package_index", "airflow.providers.standard.hooks.package_index.PackageIndexHook"),
        ):
            self._hooks_lazy_dict[conn_type] = functools.partial(
                self._import_hook,
                connection_type=None,
                package_name="apache-airflow-providers-standard",
                hook_class_name=class_name,
                provider_info=None,
            )

    @provider_info_cache("list")
    def initialize_providers_list(self):
        """Lazy initialization of providers list."""
        # Local source folders are loaded first. They should take precedence over the package ones for
        # development purposes. In production, provider.yaml files are not present in the 'airflow'
        # directory, so there is no risk that we accidentally override a package provider. This can
        # only happen in case of local development.
        self._discover_all_providers_from_packages()
        self._verify_all_providers_all_compatible()
        self._provider_dict = dict(sorted(self._provider_dict.items()))

    def _verify_all_providers_all_compatible(self):
        from packaging import version as packaging_version

        for provider_id, info in self._provider_dict.items():
            min_version = MIN_PROVIDER_VERSIONS.get(provider_id)
            if min_version:
                if packaging_version.parse(min_version) > packaging_version.parse(info.version):
                    log.warning(
                        "The package %s is not compatible with this version of Airflow. "
                        "The package has version %s but the minimum supported version "
                        "of the package is %s",
                        provider_id,
                        info.version,
                        min_version,
                    )

    @provider_info_cache("hooks")
    def initialize_providers_hooks(self):
        """Lazy initialization of providers hooks."""
        self._init_airflow_core_hooks()
        self.initialize_providers_list()
        self._discover_hooks()
        self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items()))

    @provider_info_cache("filesystems")
    def initialize_providers_filesystems(self):
        """Lazy initialization of providers filesystems."""
        self.initialize_providers_list()
        self._discover_filesystems()

    @provider_info_cache("asset_uris")
    def initialize_providers_asset_uri_resources(self):
        """Lazy initialization of provider asset URI handlers, factories, converters etc."""
        self.initialize_providers_list()
        self._discover_asset_uri_resources()

    @provider_info_cache("hook_lineage_writers")
    @provider_info_cache("taskflow_decorators")
    def initialize_providers_taskflow_decorator(self):
        """Lazy initialization of providers taskflow decorators."""
        self.initialize_providers_list()
        self._discover_taskflow_decorators()

    @provider_info_cache("extra_links")
    def initialize_providers_extra_links(self):
        """Lazy initialization of providers extra links."""
        self.initialize_providers_list()
        self._discover_extra_links()

    @provider_info_cache("logging")
    def initialize_providers_logging(self):
        """Lazy initialization of providers logging information."""
        self.initialize_providers_list()
        self._discover_logging()

    @provider_info_cache("secrets_backends")
    def initialize_providers_secrets_backends(self):
        """Lazy initialization of providers secrets_backends information."""
        self.initialize_providers_list()
        self._discover_secrets_backends()

    @provider_info_cache("executors")
    def initialize_providers_executors(self):
        """Lazy initialization of providers executors information."""
        self.initialize_providers_list()
        self._discover_executors()

    @provider_info_cache("queues")
    def initialize_providers_queues(self):
        """Lazy initialization of providers queue information."""
        self.initialize_providers_list()
        self._discover_queues()

    @provider_info_cache("notifications")
    def initialize_providers_notifications(self):
        """Lazy initialization of providers notifications information."""
        self.initialize_providers_list()
        self._discover_notifications()

    @provider_info_cache("auth_managers")
    def initialize_providers_auth_managers(self):
        """Lazy initialization of providers auth managers information."""
        self.initialize_providers_list()
        self._discover_auth_managers()

    @provider_info_cache("config")
    def initialize_providers_configuration(self):
        """Lazy initialization of providers configuration information."""
        self._initialize_providers_configuration()

    def _initialize_providers_configuration(self):
        """
        Initialize providers configuration information.

        Should be used if we do not want to trigger caching for the ``initialize_providers_configuration``
        method. In some cases we might want to make sure that the configuration is initialized, but we do
        not want to cache the initialization method - for example when we just want to write configuration
        with providers, but it is used in a context where no providers are loaded yet. In that case we will
        eventually restore the original configuration, and we want the subsequent
        ``initialize_providers_configuration`` method to run in order to load the configuration for
        providers again.
        """
        self.initialize_providers_list()
        self._discover_config()
        # Now update conf with the new provider configuration from providers
        from airflow.configuration import conf

        conf.load_providers_configuration()

    @provider_info_cache("plugins")
    def initialize_providers_plugins(self):
        self.initialize_providers_list()
        self._discover_plugins()

    def _discover_all_providers_from_packages(self) -> None:
        """
        Discover all providers by scanning packages installed.

        The list of providers should be returned via the 'apache_airflow_provider'
        entrypoint as a dictionary conforming to the 'airflow/provider_info.schema.json'
        schema. Note that the schema is different at runtime than provider.yaml.schema.json.
        The development version of provider schema is more strict and changes together with
        the code. The runtime version is more relaxed (allows for additional properties)
        and verifies only the subset of fields that are needed at runtime.
        """
        for entry_point, dist in entry_points_with_dist("apache_airflow_provider"):
            if not dist.metadata:
                continue
            package_name = canonicalize_name(dist.metadata["name"])
            if package_name in self._provider_dict:
                continue
            log.debug("Loading %s from package %s", entry_point, package_name)
            version = dist.version
            provider_info = entry_point.load()()
            self._provider_schema_validator.validate(provider_info)
            provider_info_package_name = provider_info["package-name"]
            if package_name != provider_info_package_name:
                raise ValueError(
                    f"The package name '{package_name}' from packaging information and the package name "
                    f"'{provider_info_package_name}' from provider info do not match. "
                    "Please make sure they are aligned"
                )
            if package_name not in self._provider_dict:
                self._provider_dict[package_name] = ProviderInfo(version, provider_info)
            else:
                log.warning(
                    "The provider for package '%s' could not be registered because providers for that "
                    "package name have already been registered",
                    package_name,
                )
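
    # Illustrative sketch (not part of the original module): the callable exposed via the
    # 'apache_airflow_provider' entry point is expected to return a dict roughly shaped like
    #
    #   {
    #       "package-name": "apache-airflow-providers-example",   # hypothetical package
    #       "name": "Example",
    #       "versions": ["1.0.0"],
    #       "connection-types": [...],
    #       ...
    #   }
    #
    # which is then validated against provider_info.schema.json before being registered.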

    def _discover_hooks_from_connection_types(
        self,
        hook_class_names_registered: set[str],
        already_registered_warning_connection_types: set[str],
        package_name: str,
        provider: ProviderInfo,
    ):
        """
        Discover hooks from the "connection-types" property.

        This is the new, better method that replaces discovery from hook-class-names, as it
        allows lazy importing of individual Hook classes when they are accessed.
        The "connection-types" property keeps information about both the connection type and the class
        name, so we can discover all connection-types without importing the classes.

        :param hook_class_names_registered: set of registered hook class names for this provider
        :param already_registered_warning_connection_types: set of connections for which warning should be
            printed in logs as they were already registered before
        :param package_name:
        :param provider:
        :return:
        """
        provider_uses_connection_types = False
        connection_types = provider.data.get("connection-types")
        if connection_types:
            for connection_type_dict in connection_types:
                connection_type = connection_type_dict["connection-type"]
                hook_class_name = connection_type_dict["hook-class-name"]
                hook_class_names_registered.add(hook_class_name)
                already_registered = self._hook_provider_dict.get(connection_type)
                if already_registered:
                    if already_registered.package_name != package_name:
                        already_registered_warning_connection_types.add(connection_type)
                    else:
                        log.warning(
                            "The connection type '%s' is already registered in the"
                            " package '%s' with different class names: '%s' and '%s'. ",
                            connection_type,
                            package_name,
                            already_registered.hook_class_name,
                            hook_class_name,
                        )
                else:
                    self._hook_provider_dict[connection_type] = HookClassProvider(
                        hook_class_name=hook_class_name, package_name=package_name
                    )
                    # Defer importing hook to access time by setting import hook method as dict value
                    self._hooks_lazy_dict[connection_type] = functools.partial(
                        self._import_hook,
                        connection_type=connection_type,
                        provider_info=provider,
                    )
            provider_uses_connection_types = True
        return provider_uses_connection_types
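
    # Illustrative sketch (not part of the original module): a "connection-types" entry in the
    # provider info has the following shape, which lets the manager register the connection type
    # without importing the hook class up front:
    #
    #   "connection-types": [
    #       {
    #           "connection-type": "example",                                    # hypothetical
    #           "hook-class-name": "airflow.providers.example.hooks.example.ExampleHook",
    #       },
    #   ]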

    def _discover_hooks_from_hook_class_names(
        self,
        hook_class_names_registered: set[str],
        already_registered_warning_connection_types: set[str],
        package_name: str,
        provider: ProviderInfo,
        provider_uses_connection_types: bool,
    ):
        """
        Discover hooks from the "hook-class-names" property.

        This property is deprecated but we should support it in Airflow 2.
        The hook-class-names array contained just Hook names without connection type,
        therefore we need to import all those classes immediately to know which connection types
        are supported. This makes it impossible to selectively only import those hooks that are used.

        :param already_registered_warning_connection_types: list of connection hooks that we should warn
            about when finished discovery
        :param package_name: name of the provider package
        :param provider: class that keeps information about version and details of the provider
        :param provider_uses_connection_types: determines whether the provider uses the new
            "connection-types" form of passing connection types
        :return:
        """
        hook_class_names = provider.data.get("hook-class-names")
        if hook_class_names:
            for hook_class_name in hook_class_names:
                if hook_class_name in hook_class_names_registered:
                    # Silently ignore the hook class - it's already marked for lazy-import by
                    # connection-types discovery
                    continue
                hook_info = self._import_hook(
                    connection_type=None,
                    provider_info=provider,
                    hook_class_name=hook_class_name,
                    package_name=package_name,
                )
                if not hook_info:
                    # Problem when importing the class - we ignore it. Log is written at import time
                    continue
                already_registered = self._hook_provider_dict.get(hook_info.connection_type)
                if already_registered:
                    if already_registered.package_name != package_name:
                        already_registered_warning_connection_types.add(hook_info.connection_type)
                    else:
                        if already_registered.hook_class_name != hook_class_name:
                            log.warning(
                                "The hook connection type '%s' is registered twice in the"
                                " package '%s' with different class names: '%s' and '%s'. "
                                " Please fix it!",
                                hook_info.connection_type,
                                package_name,
                                already_registered.hook_class_name,
                                hook_class_name,
                            )
                else:
                    self._hook_provider_dict[hook_info.connection_type] = HookClassProvider(
                        hook_class_name=hook_class_name, package_name=package_name
                    )
                    self._hooks_lazy_dict[hook_info.connection_type] = hook_info

            if not provider_uses_connection_types:
                warnings.warn(
                    f"The provider {package_name} uses `hook-class-names` "
                    "property in provider-info and has no `connection-types` one. "
                    "The 'hook-class-names' property has been deprecated in favour "
                    "of 'connection-types' in Airflow 2.2. Use **both** in case you want to "
                    "have backwards compatibility with Airflow < 2.2",
                    DeprecationWarning,
                    stacklevel=1,
                )
        for already_registered_connection_type in already_registered_warning_connection_types:
            log.warning(
                "The connection_type '%s' has already been registered by provider '%s'.",
                already_registered_connection_type,
                self._hook_provider_dict[already_registered_connection_type].package_name,
            )

    def _discover_hooks(self) -> None:
        """Retrieve all connections defined in the providers via Hooks."""
        for package_name, provider in self._provider_dict.items():
            duplicated_connection_types: set[str] = set()
            hook_class_names_registered: set[str] = set()
            self._discover_provider_dialects(package_name, provider)
            provider_uses_connection_types = self._discover_hooks_from_connection_types(
                hook_class_names_registered, duplicated_connection_types, package_name, provider
            )
            self._discover_hooks_from_hook_class_names(
                hook_class_names_registered,
                duplicated_connection_types,
                package_name,
                provider,
                provider_uses_connection_types,
            )
        self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items()))

    def _discover_provider_dialects(self, provider_name: str, provider: ProviderInfo):
        dialects = provider.data.get("dialects", [])
        if dialects:
            self._dialect_provider_dict.update(
                {
                    item["dialect-type"]: DialectInfo(
                        name=item["dialect-type"],
                        dialect_class_name=item["dialect-class-name"],
                        provider_name=provider_name,
                    )
                    for item in dialects
                }
            )

    @provider_info_cache("import_all_hooks")
    def _import_info_from_all_hooks(self):
        """Force-import all hooks and initialize the connections/fields."""
        # Retrieve all hooks to make sure that all of them are imported
        _ = list(self._hooks_lazy_dict.values())
        self._field_behaviours = dict(sorted(self._field_behaviours.items()))

        # Widgets for connection forms are currently used in two places:
        # 1. In the UI Connections form, where they are expected in the same order as defined in the Hook.
        # 2. In the CLI command `airflow providers widgets`, where they are expected in alphabetical order.
        # It is not possible to recover the original ordering after sorting, which is the main reason
        # why the sorting was moved to the CLI part:
        # self._connection_form_widgets = dict(sorted(self._connection_form_widgets.items()))

    def _discover_filesystems(self) -> None:
        """Retrieve all filesystems defined in the providers."""
        for provider_package, provider in self._provider_dict.items():
            for fs_module_name in provider.data.get("filesystems", []):
                if _correctness_check(provider_package, f"{fs_module_name}.get_fs", provider):
                    self._fs_set.add(fs_module_name)
        self._fs_set = set(sorted(self._fs_set))

    def _discover_asset_uri_resources(self) -> None:
        """Discovers and registers asset URI handlers, factories, and converters for all providers."""
        from airflow.sdk.definitions.asset import normalize_noop

        def _safe_register_resource(
            provider_package_name: str,
            schemes_list: list[str],
            resource_path: str | None,
            resource_registry: dict,
            default_resource: Any = None,
        ):
            """
            Register a specific resource (handler, factory, or converter) for the given schemes.

            If the resolved resource (either from the path or the default) is valid, it updates
            the resource registry with the appropriate resource for each scheme.
            """
            resource = (
                _correctness_check(provider_package_name, resource_path, provider)
                if resource_path is not None
                else default_resource
            )
            if resource:
                resource_registry.update((scheme, resource) for scheme in schemes_list)

        for provider_name, provider in self._provider_dict.items():
            for uri_info in provider.data.get("asset-uris", []):
                if "schemes" not in uri_info or "handler" not in uri_info:
                    continue  # Both schemes and handler must be explicitly set; handler can be set to null
                common_args = {"schemes_list": uri_info["schemes"], "provider_package_name": provider_name}
                _safe_register_resource(
                    resource_path=uri_info["handler"],
                    resource_registry=self._asset_uri_handlers,
                    default_resource=normalize_noop,
                    **common_args,
                )
                _safe_register_resource(
                    resource_path=uri_info.get("factory"),
                    resource_registry=self._asset_factories,
                    **common_args,
                )
                _safe_register_resource(
                    resource_path=uri_info.get("to_openlineage_converter"),
                    resource_registry=self._asset_to_openlineage_converters,
                    **common_args,
                )
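
    # Illustrative sketch (not part of the original module): an "asset-uris" entry in the provider
    # info maps URI schemes to optional handler/factory/converter callables, e.g.
    #
    #   "asset-uris": [
    #       {
    #           "schemes": ["example"],                                          # hypothetical scheme
    #           "handler": "airflow.providers.example.assets.example.sanitize_uri",
    #           "factory": "airflow.providers.example.assets.example.create_asset",
    #       },
    #   ]
    #
    # A null "handler" falls back to the no-op normalizer registered above.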

    def _discover_taskflow_decorators(self) -> None:
        for name, info in self._provider_dict.items():
            for taskflow_decorator in info.data.get("task-decorators", []):
                self._add_taskflow_decorator(
                    taskflow_decorator["name"], taskflow_decorator["class-name"], name
                )

    def _add_taskflow_decorator(self, name, decorator_class_name: str, provider_package: str) -> None:
        if not _check_builtin_provider_prefix(provider_package, decorator_class_name):
            return

        if name in self._taskflow_decorators:
            try:
                existing = self._taskflow_decorators[name]
                other_name = f"{existing.__module__}.{existing.__name__}"
            except Exception:
                # If there is a problem importing it, then get the value from the functools.partial
                other_name = self._taskflow_decorators._raw_dict[name].args[0]  # type: ignore[attr-defined]

            log.warning(
                "The taskflow decorator '%s' has already been registered (by %s).",
                name,
                other_name,
            )
            return

        self._taskflow_decorators[name] = functools.partial(import_string, decorator_class_name)

    @staticmethod
    def _get_attr(obj: Any, attr_name: str):
        """Retrieve attributes of an object, or warn if not found."""
        if not hasattr(obj, attr_name):
            log.warning("The object '%s' is missing %s attribute and cannot be registered", obj, attr_name)
            return None
        return getattr(obj, attr_name)

    def _import_hook(
        self,
        connection_type: str | None,
        provider_info: ProviderInfo,
        hook_class_name: str | None = None,
        package_name: str | None = None,
    ) -> HookInfo | None:
        """
        Import hook and retrieve hook information.

        Either connection_type (for lazy loading) or hook_class_name must be set - but not both.
        package_name is only needed if hook_class_name is passed (for lazy loading, package_name
        is retrieved from _connection_type_class_provider_dict together with hook_class_name).

        :param connection_type: type of the connection
        :param hook_class_name: name of the hook class
        :param package_name: provider package - only needed in case connection_type is missing
        :return:
        """
        if connection_type is None and hook_class_name is None:
            raise ValueError("Either connection_type or hook_class_name must be set")
        if connection_type is not None and hook_class_name is not None:
            raise ValueError(
                f"Both connection_type ({connection_type}) and "
                f"hook_class_name ({hook_class_name}) are set. Only one should be set!"
            )
        if connection_type is not None:
            class_provider = self._hook_provider_dict[connection_type]
            package_name = class_provider.package_name
            hook_class_name = class_provider.hook_class_name
        else:
            if not hook_class_name:
                raise ValueError("Either connection_type or hook_class_name must be set")
            if not package_name:
                raise ValueError(
                    f"Provider package name is not set when hook_class_name ({hook_class_name}) is used"
                )
        hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info)
        if hook_class is None:
            return None
        try:
            from wtforms import BooleanField, IntegerField, PasswordField, StringField

            allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField]
            module, class_name = hook_class_name.rsplit(".", maxsplit=1)
            # Do not use attr here. We want to check only direct class fields not those
            # inherited from parent hook. This way we add form fields only once for the whole
            # hierarchy and we add it only from the parent hook that provides those!
            if "get_connection_form_widgets" in hook_class.__dict__:
                widgets = hook_class.get_connection_form_widgets()
                if widgets:
                    for widget in widgets.values():
                        if widget.field_class not in allowed_field_classes:
                            log.warning(
                                "The hook_class '%s' uses field of unsupported class '%s'. "
                                "Only '%s' field classes are supported",
                                hook_class_name,
                                widget.field_class,
                                allowed_field_classes,
                            )
                            return None
                    self._add_widgets(package_name, hook_class, widgets)
            if "get_ui_field_behaviour" in hook_class.__dict__:
                field_behaviours = hook_class.get_ui_field_behaviour()
                if field_behaviours:
                    self._add_customized_fields(package_name, hook_class, field_behaviours)
        except ImportError as e:
            if e.name in ["flask_appbuilder", "wtforms"]:
                log.info(
                    "The hook_class '%s' is not fully initialized (UI widgets will be missing), because "
                    "the 'flask_appbuilder' package is not installed; however, it is not required for "
                    "Airflow components to work",
                    hook_class_name,
                )
        except Exception as e:
            log.warning(
                "Exception when importing '%s' from '%s' package: %s",
                hook_class_name,
                package_name,
                e,
            )
            return None
        hook_connection_type = self._get_attr(hook_class, "conn_type")
        if connection_type:
            if hook_connection_type != connection_type:
                log.warning(
                    "Inconsistency! The hook class '%s' declares connection type '%s'"
                    " but it is added by provider '%s' as connection_type '%s' in provider info. "
                    "This should be fixed!",
                    hook_class,
                    hook_connection_type,
                    package_name,
                    connection_type,
                )
        connection_type = hook_connection_type
        connection_id_attribute_name: str = self._get_attr(hook_class, "conn_name_attr")
        hook_name: str = self._get_attr(hook_class, "hook_name")

        if not connection_type or not connection_id_attribute_name or not hook_name:
            log.warning(
                "The hook is missing one of the key attributes: "
                "conn_type: %s, conn_id_attribute_name: %s, hook_name: %s",
                connection_type,
                connection_id_attribute_name,
                hook_name,
            )
            return None

        return HookInfo(
            hook_class_name=hook_class_name,
            connection_id_attribute_name=connection_id_attribute_name,
            package_name=package_name,
            hook_name=hook_name,
            connection_type=connection_type,
            connection_testable=hasattr(hook_class, "test_connection"),
        )

    def _add_widgets(self, package_name: str, hook_class: type, widgets: dict[str, Any]):
        conn_type = hook_class.conn_type  # type: ignore
        for field_identifier, field in widgets.items():
            if field_identifier.startswith("extra__"):
                prefixed_field_name = field_identifier
            else:
                prefixed_field_name = f"extra__{conn_type}__{field_identifier}"
            if prefixed_field_name in self._connection_form_widgets:
                log.warning(
                    "The field %s from class %s has already been added by another provider. Ignoring it.",
                    field_identifier,
                    hook_class.__name__,
                )
                # In case of inherited hooks this might be happening several times
            else:
                self._connection_form_widgets[prefixed_field_name] = ConnectionFormWidgetInfo(
                    hook_class.__name__,
                    package_name,
                    field,
                    field_identifier,
                    hasattr(field.field_class.widget, "input_type")
                    and field.field_class.widget.input_type == "password",
                )

    def _add_customized_fields(self, package_name: str, hook_class: type, customized_fields: dict):
        try:
            connection_type = getattr(hook_class, "conn_type")

            self._customized_form_fields_schema_validator.validate(customized_fields)

            if connection_type:
                customized_fields = _ensure_prefix_for_placeholders(customized_fields, connection_type)

            if connection_type in self._field_behaviours:
                log.warning(
                    "The connection_type %s from package %s and class %s has already been added "
                    "by another provider. Ignoring it.",
                    connection_type,
                    package_name,
                    hook_class.__name__,
                )
                return
            self._field_behaviours[connection_type] = customized_fields
        except Exception as e:
            log.warning(
                "Error when loading customized fields from package '%s' hook class '%s': %s",
                package_name,
                hook_class.__name__,
                e,
            )
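
    # Illustrative sketch (not part of the original module): a hook can feed the two methods above
    # by declaring, for example (names below are hypothetical):
    #
    #   class ExampleHook(BaseHook):
    #       conn_type = "example"
    #
    #       @classmethod
    #       def get_connection_form_widgets(cls):
    #           from wtforms import StringField
    #           return {"api_key": StringField("API key")}
    #
    #       @classmethod
    #       def get_ui_field_behaviour(cls):
    #           return {"hidden_fields": ["port"], "relabeling": {}, "placeholders": {"api_key": "..."}}
    #
    # The widget key would be prefixed to "extra__example__api_key" before being stored.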

    def _discover_auth_managers(self) -> None:
        """Retrieve all auth managers defined in the providers."""
        for provider_package, provider in self._provider_dict.items():
            if provider.data.get("auth-managers"):
                for auth_manager_class_name in provider.data["auth-managers"]:
                    if _correctness_check(provider_package, auth_manager_class_name, provider):
                        self._auth_manager_class_name_set.add(auth_manager_class_name)

    def _discover_notifications(self) -> None:
        """Retrieve all notifications defined in the providers."""
        for provider_package, provider in self._provider_dict.items():
            if provider.data.get("notifications"):
                for notification_class_name in provider.data["notifications"]:
                    if _correctness_check(provider_package, notification_class_name, provider):
                        self._notification_info_set.add(notification_class_name)

    def _discover_extra_links(self) -> None:
        """Retrieve all extra links defined in the providers."""
        for provider_package, provider in self._provider_dict.items():
            if provider.data.get("extra-links"):
                for extra_link_class_name in provider.data["extra-links"]:
                    if _correctness_check(provider_package, extra_link_class_name, provider):
                        self._extra_link_class_name_set.add(extra_link_class_name)

    def _discover_logging(self) -> None:
        """Retrieve all logging defined in the providers."""
        for provider_package, provider in self._provider_dict.items():
            if provider.data.get("logging"):
                for logging_class_name in provider.data["logging"]:
                    if _correctness_check(provider_package, logging_class_name, provider):
                        self._logging_class_name_set.add(logging_class_name)

    def _discover_secrets_backends(self) -> None:
        """Retrieve all secrets backends defined in the providers."""
        for provider_package, provider in self._provider_dict.items():
            if provider.data.get("secrets-backends"):
                for secrets_backends_class_name in provider.data["secrets-backends"]:
                    if _correctness_check(provider_package, secrets_backends_class_name, provider):
                        self._secrets_backend_class_name_set.add(secrets_backends_class_name)

    def _discover_executors(self) -> None:
        """Retrieve all executors defined in the providers."""
        for provider_package, provider in self._provider_dict.items():
            if provider.data.get("executors"):
                for executors_class_name in provider.data["executors"]:
                    if _correctness_check(provider_package, executors_class_name, provider):
                        self._executor_class_name_set.add(executors_class_name)

    def _discover_queues(self) -> None:
        """Retrieve all queues defined in the providers."""
        for provider_package, provider in self._provider_dict.items():
            if provider.data.get("queues"):
                for queue_class_name in provider.data["queues"]:
                    if _correctness_check(provider_package, queue_class_name, provider):
                        self._queue_class_name_set.add(queue_class_name)

    def _discover_config(self) -> None:
        """Retrieve all configs defined in the providers."""
        for provider_package, provider in self._provider_dict.items():
            if provider.data.get("config"):
                self._provider_configs[provider_package] = provider.data.get("config")  # type: ignore[assignment]

    def _discover_plugins(self) -> None:
        """Retrieve all plugins defined in the providers."""
        for provider_package, provider in self._provider_dict.items():
            for plugin_dict in provider.data.get("plugins", ()):
                if not _correctness_check(provider_package, plugin_dict["plugin-class"], provider):
                    log.warning("Plugin not loaded due to above correctness check problem.")
                    continue
                self._plugins_set.add(
                    PluginInfo(
                        name=plugin_dict["name"],
                        plugin_class=plugin_dict["plugin-class"],
                        provider_name=provider_package,
                    )
                )

    @provider_info_cache("triggers")
    def initialize_providers_triggers(self):
        """Initialize providers triggers."""
        self.initialize_providers_list()
        for provider_package, provider in self._provider_dict.items():
            for trigger in provider.data.get("triggers", []):
                for trigger_class_name in trigger.get("python-modules"):
                    self._trigger_info_set.add(
                        TriggerInfo(
                            package_name=provider_package,
                            trigger_class_name=trigger_class_name,
                            integration_name=trigger.get("integration-name", ""),
                        )
                    )
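
    # Illustrative sketch (not part of the original module): a "triggers" entry in the provider
    # info groups trigger modules per integration, e.g.
    #
    #   "triggers": [
    #       {
    #           "integration-name": "Example",                                   # hypothetical
    #           "python-modules": ["airflow.providers.example.triggers.example"],
    #       },
    #   ]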

    @property
    def auth_managers(self) -> list[str]:
        """Returns information about the available providers auth manager classes."""
        self.initialize_providers_auth_managers()
        return sorted(self._auth_manager_class_name_set)

    @property
    def notification(self) -> list[NotificationInfo]:
        """Returns information about the available providers notification classes."""
        self.initialize_providers_notifications()
        return sorted(self._notification_info_set)

    @property
    def trigger(self) -> list[TriggerInfo]:
        """Returns information about the available providers trigger classes."""
        self.initialize_providers_triggers()
        return sorted(self._trigger_info_set, key=lambda x: x.package_name)

    @property
    def providers(self) -> dict[str, ProviderInfo]:
        """Returns information about available providers."""
        self.initialize_providers_list()
        return self._provider_dict

    @property
    def hooks(self) -> MutableMapping[str, HookInfo | None]:
        """
        Return dictionary of connection_type-to-hook mapping.

        Note that the dict can contain None values if a hook discovered cannot be imported!
        """
        self.initialize_providers_hooks()
        # When we return hooks here it will only be used to retrieve hook information
        return self._hooks_lazy_dict

    @property
    def dialects(self) -> MutableMapping[str, DialectInfo]:
        """Return dictionary of connection_type-to-dialect mapping."""
        self.initialize_providers_hooks()
        # When we return dialects here it will only be used to retrieve dialect information
        return self._dialect_provider_dict

    @property
    def plugins(self) -> list[PluginInfo]:
        """Returns information about plugins available in providers."""
        self.initialize_providers_plugins()
        return sorted(self._plugins_set, key=lambda x: x.plugin_class)

    @property
    def taskflow_decorators(self) -> dict[str, TaskDecorator]:
        self.initialize_providers_taskflow_decorator()
        return self._taskflow_decorators  # type: ignore[return-value]

    @property
    def extra_links_class_names(self) -> list[str]:
        """Returns the sorted list of extra link class names."""
        self.initialize_providers_extra_links()
        return sorted(self._extra_link_class_name_set)

    @property
    def connection_form_widgets(self) -> dict[str, ConnectionFormWidgetInfo]:
        """
        Returns widgets for connection forms.

        Dictionary keys are in the same order as they are defined in the Hook.
        """
        self.initialize_providers_hooks()
        self._import_info_from_all_hooks()
        return self._connection_form_widgets

    @property
    def field_behaviours(self) -> dict[str, dict]:
        """Returns dictionary with field behaviours for connection types."""
        self.initialize_providers_hooks()
        self._import_info_from_all_hooks()
        return self._field_behaviours

    @property
    def logging_class_names(self) -> list[str]:
        """Returns the sorted list of log task handler class names."""
        self.initialize_providers_logging()
        return sorted(self._logging_class_name_set)

    @property
    def secrets_backend_class_names(self) -> list[str]:
        """Returns the sorted list of secrets backend class names."""
        self.initialize_providers_secrets_backends()
        return sorted(self._secrets_backend_class_name_set)

    @property
    def executor_class_names(self) -> list[str]:
        self.initialize_providers_executors()
        return sorted(self._executor_class_name_set)

    @property
    def queue_class_names(self) -> list[str]:
        self.initialize_providers_queues()
        return sorted(self._queue_class_name_set)

    @property
    def filesystem_module_names(self) -> list[str]:
        self.initialize_providers_filesystems()
        return sorted(self._fs_set)

    @property
    def asset_factories(self) -> dict[str, Callable[..., Asset]]:
        self.initialize_providers_asset_uri_resources()
        return self._asset_factories

    @property
    def asset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
        self.initialize_providers_asset_uri_resources()
        return self._asset_uri_handlers

    @property
    def asset_to_openlineage_converters(
        self,
    ) -> dict[str, Callable]:
        self.initialize_providers_asset_uri_resources()
        return self._asset_to_openlineage_converters

    @property
    def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
        self.initialize_providers_configuration()
        return sorted(self._provider_configs.items(), key=lambda x: x[0])

    @property
    def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
        return sorted(self._provider_configs.items(), key=lambda x: x[0])

1269 return sorted(self._provider_configs.items(), key=lambda x: x[0]) 

1270 

1271 def _cleanup(self): 

1272 self._initialized_cache.clear() 

1273 self._provider_dict.clear() 

1274 self._fs_set.clear() 

1275 self._taskflow_decorators.clear() 

1276 self._hook_provider_dict.clear() 

1277 self._dialect_provider_dict.clear() 

1278 self._hooks_lazy_dict.clear() 

1279 self._connection_form_widgets.clear() 

1280 self._field_behaviours.clear() 

1281 self._extra_link_class_name_set.clear() 

1282 self._logging_class_name_set.clear() 

1283 self._auth_manager_class_name_set.clear() 

1284 self._secrets_backend_class_name_set.clear() 

1285 self._executor_class_name_set.clear() 

1286 self._queue_class_name_set.clear() 

1287 self._provider_configs.clear() 

1288 self._trigger_info_set.clear() 

1289 self._notification_info_set.clear() 

1290 self._plugins_set.clear() 

1291 

1292 self._initialized = False 

1293 self._initialization_stack_trace = None
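
# Illustrative usage sketch (not part of the original module): ProvidersManager is a singleton,
# so every instantiation returns the same, lazily populated instance.
#
#   manager = ProvidersManager()
#   manager.initialize_providers_hooks()          # runs discovery at most once (provider_info_cache)
#   hook_info = manager.hooks.get("postgres")     # triggers the lazy import for that connection type
#   if hook_info is not None:
#       print(hook_info.hook_class_name, hook_info.package_name)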