1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Manages all providers."""
19
20from __future__ import annotations
21
22import contextlib
23import functools
24import inspect
25import json
26import logging
27import traceback
28import warnings
29from collections.abc import Callable, Iterator, MutableMapping
30from dataclasses import dataclass
31from functools import wraps
32from importlib.resources import files as resource_files
33from time import perf_counter
34from typing import TYPE_CHECKING, Any, NamedTuple, ParamSpec, TypeVar, cast
35
36from airflow import DeprecatedImportWarning
37from airflow._shared.module_loading import import_string
38from airflow._shared.providers_discovery import discover_all_providers_from_packages
39from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning
40from airflow.utils.log.logging_mixin import LoggingMixin
41
42if TYPE_CHECKING:
43 from airflow.cli.cli_config import CLICommand
44
45log = logging.getLogger(__name__)
46
47
48PS = ParamSpec("PS")
49RT = TypeVar("RT")
50
51MIN_PROVIDER_VERSIONS = {
52 "apache-airflow-providers-celery": "2.1.0",
53}
54
55
56def _ensure_prefix_for_placeholders(field_behaviors: dict[str, Any], conn_type: str):
57 """
58 Verify the correct placeholder prefix.
59
60 If the given field_behaviors dict contains a placeholder's node, and there
61 are placeholders for extra fields (i.e. anything other than the built-in conn
62 attrs), and if those extra fields are unprefixed, then add the prefix.
63
64 The reason we need to do this is, all custom conn fields live in the same dictionary,
65 so we need to namespace them with a prefix internally. But for user convenience,
66 and consistency between the `get_ui_field_behaviour` method and the extra dict itself,
67 we allow users to supply the unprefixed name.
68 """
69 conn_attrs = {"host", "schema", "login", "password", "port", "extra"}
70
71 def ensure_prefix(field):
72 if field not in conn_attrs and not field.startswith("extra__"):
73 return f"extra__{conn_type}__{field}"
74 return field
75
76 if "placeholders" in field_behaviors:
77 placeholders = field_behaviors["placeholders"]
78 field_behaviors["placeholders"] = {ensure_prefix(k): v for k, v in placeholders.items()}
79
80 return field_behaviors
81
82
83if TYPE_CHECKING:
84 from urllib.parse import SplitResult
85
86 from airflow.sdk import BaseHook
87 from airflow.sdk.bases.decorator import TaskDecorator
88 from airflow.sdk.definitions.asset import Asset
89
90
91class LazyDictWithCache(MutableMapping):
92 """
93 Lazy-loaded cached dictionary.
94
95 Dictionary, which in case you set callable, executes the passed callable with `key` attribute
96 at first use - and returns and caches the result.
97 """
98
99 __slots__ = ["_resolved", "_raw_dict"]
100
101 def __init__(self, *args, **kw):
102 self._resolved = set()
103 self._raw_dict = dict(*args, **kw)
104
105 def __setitem__(self, key, value):
106 self._raw_dict.__setitem__(key, value)
107
108 def __getitem__(self, key):
109 value = self._raw_dict.__getitem__(key)
110 if key not in self._resolved and callable(value):
111 # exchange callable with result of calling it -- but only once! allow resolver to return a
112 # callable itself
113 value = value()
114 self._resolved.add(key)
115 self._raw_dict.__setitem__(key, value)
116 return value
117
118 def __delitem__(self, key):
119 with contextlib.suppress(KeyError):
120 self._resolved.remove(key)
121 self._raw_dict.__delitem__(key)
122
123 def __iter__(self):
124 return iter(self._raw_dict)
125
126 def __len__(self):
127 return len(self._raw_dict)
128
129 def __contains__(self, key):
130 return key in self._raw_dict
131
132 def clear(self):
133 self._resolved.clear()
134 self._raw_dict.clear()
135
136
137def _read_schema_from_resources_or_local_file(filename: str) -> dict:
138 try:
139 with resource_files("airflow").joinpath(filename).open("rb") as f:
140 schema = json.load(f)
141 except (TypeError, FileNotFoundError):
142 import pathlib
143
144 with (pathlib.Path(__file__).parent / filename).open("rb") as f:
145 schema = json.load(f)
146 return schema
147
148
149def _create_provider_info_schema_validator():
150 """Create JSON schema validator from the provider_info.schema.json."""
151 import jsonschema
152
153 schema = _read_schema_from_resources_or_local_file("provider_info.schema.json")
154 cls = jsonschema.validators.validator_for(schema)
155 validator = cls(schema)
156 return validator
157
158
159def _create_customized_form_field_behaviours_schema_validator():
160 """Create JSON schema validator from the customized_form_field_behaviours.schema.json."""
161 import jsonschema
162
163 schema = _read_schema_from_resources_or_local_file("customized_form_field_behaviours.schema.json")
164 cls = jsonschema.validators.validator_for(schema)
165 validator = cls(schema)
166 return validator
167
168
169def _check_builtin_provider_prefix(provider_package: str, class_name: str) -> bool:
170 if provider_package.startswith("apache-airflow"):
171 provider_path = provider_package[len("apache-") :].replace("-", ".")
172 if not class_name.startswith(provider_path):
173 log.warning(
174 "Coherence check failed when importing '%s' from '%s' package. It should start with '%s'",
175 class_name,
176 provider_package,
177 provider_path,
178 )
179 return False
180 return True
181
182
183@dataclass
184class ProviderInfo:
185 """
186 Provider information.
187
188 :param version: version string
189 :param data: dictionary with information about the provider
190 """
191
192 version: str
193 data: dict
194
195
196class HookClassProvider(NamedTuple):
197 """Hook class and Provider it comes from."""
198
199 hook_class_name: str
200 package_name: str
201
202
203class DialectInfo(NamedTuple):
204 """Dialect class and Provider it comes from."""
205
206 name: str
207 dialect_class_name: str
208 provider_name: str
209
210
211class TriggerInfo(NamedTuple):
212 """Trigger class and provider it comes from."""
213
214 trigger_class_name: str
215 package_name: str
216 integration_name: str
217
218
219class NotificationInfo(NamedTuple):
220 """Notification class and provider it comes from."""
221
222 notification_class_name: str
223 package_name: str
224
225
226class RemoteLoggingInfo(NamedTuple):
227 """Remote logging IO handler registered by a provider."""
228
229 classpath: str
230 scheme: str
231 package_name: str
232
233
234class PluginInfo(NamedTuple):
235 """Plugin class, name and provider it comes from."""
236
237 name: str
238 plugin_class: str
239 provider_name: str
240
241
242class HookInfo(NamedTuple):
243 """Hook information."""
244
245 hook_class_name: str
246 connection_id_attribute_name: str
247 package_name: str
248 hook_name: str
249 connection_type: str
250 connection_testable: bool
251 dialects: list[str] = []
252
253
254class ConnectionTypeHookUIMetadata(NamedTuple):
255 """Hook metadata for one connection type (connection UI); ``field_behaviour`` is standard fields."""
256
257 connection_type: str
258 hook_name: str
259 hook_class_name: str | None
260 field_behaviour: dict | None
261
262
263class ConnectionFormWidgetInfo(NamedTuple):
264 """Connection Form Widget information."""
265
266 hook_class_name: str
267 package_name: str
268 field: Any
269 field_name: str
270 is_sensitive: bool
271
272
273def log_optional_feature_disabled(class_name, e, provider_package):
274 """Log optional feature disabled."""
275 log.debug(
276 "Optional feature disabled on exception when importing '%s' from '%s' package",
277 class_name,
278 provider_package,
279 exc_info=e,
280 )
281 log.info(
282 "Optional provider feature disabled when importing '%s' from '%s' package",
283 class_name,
284 provider_package,
285 )
286
287
288def log_import_warning(class_name, e, provider_package):
289 """Log import warning."""
290 log.warning(
291 "Exception when importing '%s' from '%s' package",
292 class_name,
293 provider_package,
294 exc_info=e,
295 )
296
297
298# This is a temporary measure until all community providers will add AirflowOptionalProviderFeatureException
299# where they have optional features. We are going to add tests in our CI to catch all such cases and will
300# fix them, but until now all "known unhandled optional feature errors" from community providers
301# should be added here
302KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [("apache-airflow-providers-google", "No module named 'paramiko'")]
303
304
305def _correctness_check(provider_package: str, class_name: str, provider_info: ProviderInfo) -> Any:
306 """
307 Perform coherence check on provider classes.
308
309 For apache-airflow providers - it checks if it starts with appropriate package. For all providers
310 it tries to import the provider - checking that there are no exceptions during importing.
311 It logs appropriate warning in case it detects any problems.
312
313 :param provider_package: name of the provider package
314 :param class_name: name of the class to import
315
316 :return the class if the class is OK, None otherwise.
317 """
318 if not _check_builtin_provider_prefix(provider_package, class_name):
319 return None
320 try:
321 imported_class = import_string(class_name)
322 except AirflowOptionalProviderFeatureException as e:
323 # When the provider class raises AirflowOptionalProviderFeatureException
324 # this is an expected case when only some classes in provider are
325 # available. We just log debug level here and print info message in logs so that
326 # the user is aware of it
327 log_optional_feature_disabled(class_name, e, provider_package)
328 return None
329 except ImportError as e:
330 if "No module named 'airflow.providers." in e.msg:
331 # handle cases where another provider is missing. This can only happen if
332 # there is an optional feature, so we log debug and print information about it
333 log_optional_feature_disabled(class_name, e, provider_package)
334 return None
335 for known_error in KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS:
336 # Until we convert all providers to use AirflowOptionalProviderFeatureException
337 # we assume any problem with importing another "provider" is because this is an
338 # optional feature, so we log debug and print information about it
339 if known_error[0] == provider_package and known_error[1] in e.msg:
340 log_optional_feature_disabled(class_name, e, provider_package)
341 return None
342 # But when we have no idea - we print warning to logs
343 log_import_warning(class_name, e, provider_package)
344 return None
345 except Exception as e:
346 log_import_warning(class_name, e, provider_package)
347 return None
348 return imported_class
349
350
351# We want to have better control over initialization of parameters and be able to debug and test it
352# So we add our own decorator
353def provider_info_cache(cache_name: str) -> Callable[[Callable[PS, None]], Callable[PS, None]]:
354 """
355 Decorate and cache provider info.
356
357 Decorator factory that create decorator that caches initialization of provider's parameters
358 :param cache_name: Name of the cache
359 """
360
361 def provider_info_cache_decorator(func: Callable[PS, None]) -> Callable[PS, None]:
362 @wraps(func)
363 def wrapped_function(*args: PS.args, **kwargs: PS.kwargs) -> None:
364 providers_manager_instance = args[0]
365 if TYPE_CHECKING:
366 assert isinstance(providers_manager_instance, ProvidersManager)
367
368 if cache_name in providers_manager_instance._initialized_cache:
369 return
370 start_time = perf_counter()
371 log.debug("Initializing Providers Manager[%s]", cache_name)
372 func(*args, **kwargs)
373 providers_manager_instance._initialized_cache[cache_name] = True
374 log.debug(
375 "Initialization of Providers Manager[%s] took %.2f seconds",
376 cache_name,
377 perf_counter() - start_time,
378 )
379
380 return wrapped_function
381
382 return provider_info_cache_decorator
383
384
385class ProvidersManager(LoggingMixin):
386 """
387 Manages all provider distributions.
388
389 This is a Singleton class. The first time it is
390 instantiated, it discovers all available providers in installed packages.
391 """
392
393 resource_version = "0"
394 _initialized: bool = False
395 _initialization_stack_trace = None
396 _instance: ProvidersManager | None = None
397
398 def __new__(cls):
399 if cls._instance is None:
400 cls._instance = super().__new__(cls)
401 return cls._instance
402
403 @staticmethod
404 def initialized() -> bool:
405 return ProvidersManager._initialized
406
407 @staticmethod
408 def initialization_stack_trace() -> str | None:
409 return ProvidersManager._initialization_stack_trace
410
411 def __init__(self):
412 """Initialize the manager."""
413 # skip initialization if already initialized
414 if self.initialized():
415 return
416
417 super().__init__()
418 ProvidersManager._initialized = True
419 ProvidersManager._initialization_stack_trace = "".join(traceback.format_stack(inspect.currentframe()))
420 self._initialized_cache: dict[str, bool] = {}
421 # Keeps dict of providers keyed by module name
422 self._provider_dict: dict[str, ProviderInfo] = {}
423 self._fs_set: set[str] = set()
424 self._asset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
425 self._asset_factories: dict[str, Callable[..., Asset]] = {}
426 self._asset_to_openlineage_converters: dict[str, Callable] = {}
427 self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache()
428 # keeps mapping between connection_types and hook class, package they come from
429 self._hook_provider_dict: dict[str, HookClassProvider] = {}
430 self._dialect_provider_dict: dict[str, DialectInfo] = {}
431 # Keeps dict of hooks keyed by connection type. They are lazy evaluated at access time
432 self._hooks_lazy_dict: LazyDictWithCache[str, HookInfo | Callable] = LazyDictWithCache()
433 # Keeps hook display names read from provider.yaml (hook-name field)
434 self._hook_name_dict: dict[str, str] = {}
435 # Keeps methods that should be used to add custom widgets tuple of keyed by name of the extra field
436 self._connection_form_widgets: dict[str, ConnectionFormWidgetInfo] = {}
437 # Customizations for javascript fields are kept here
438 self._field_behaviours: dict[str, dict] = {}
439 self._cli_command_functions_set: set[Callable[[], list[CLICommand]]] = set()
440 self._cli_command_provider_name_set: set[str] = set()
441 self._extra_link_class_name_set: set[str] = set()
442 self._logging_class_name_set: set[str] = set()
443 self._remote_logging_info_list: list[RemoteLoggingInfo] = []
444 self._remote_logging_by_scheme: dict[str, RemoteLoggingInfo] = {}
445 self._auth_manager_class_name_set: set[str] = set()
446 self._auth_manager_without_check_set: set[tuple[str, str]] = set()
447 self._secrets_backend_class_name_set: set[str] = set()
448 self._executor_class_name_set: set[str] = set()
449 self._executor_without_check_set: set[tuple[str, str]] = set()
450 self._queue_class_name_set: set[str] = set()
451 self._db_manager_class_name_set: set[str] = set()
452 self._provider_configs: dict[str, dict[str, Any]] = {}
453 self._trigger_info_set: set[TriggerInfo] = set()
454 self._notification_info_set: set[NotificationInfo] = set()
455 self._provider_schema_validator = _create_provider_info_schema_validator()
456 self._customized_form_fields_schema_validator = (
457 _create_customized_form_field_behaviours_schema_validator()
458 )
459 # Set of plugins contained in providers
460 self._plugins_set: set[PluginInfo] = set()
461 self._init_airflow_core_hooks()
462
463 self._runtime_manager = None
464
465 def __getattribute__(self, name: str):
466 # Hacky but does the trick for now
467 runtime_properties = {
468 "hooks",
469 "taskflow_decorators",
470 "filesystem_module_names",
471 "asset_factories",
472 "asset_uri_handlers",
473 "asset_to_openlineage_converters",
474 }
475
476 if name in runtime_properties:
477 warnings.warn(
478 f"ProvidersManager.{name} is deprecated. Use ProvidersManagerTaskRuntime.{name} from task-sdk instead.",
479 DeprecatedImportWarning,
480 stacklevel=2,
481 )
482 if object.__getattribute__(self, "_runtime_manager") is None:
483 from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
484
485 object.__setattr__(self, "_runtime_manager", ProvidersManagerTaskRuntime())
486 return getattr(object.__getattribute__(self, "_runtime_manager"), name)
487
488 return object.__getattribute__(self, name)
489
490 def _init_airflow_core_hooks(self):
491 """Initialize the hooks dict with default hooks from Airflow core."""
492 core_dummy_hooks = {
493 "generic": "Generic",
494 "email": "Email",
495 }
496 for key, display in core_dummy_hooks.items():
497 self._hooks_lazy_dict[key] = HookInfo(
498 hook_class_name=None,
499 connection_id_attribute_name=None,
500 package_name=None,
501 hook_name=display,
502 connection_type=None,
503 connection_testable=False,
504 )
505 for conn_type, class_name in (
506 ("fs", "airflow.providers.standard.hooks.filesystem.FSHook"),
507 ("package_index", "airflow.providers.standard.hooks.package_index.PackageIndexHook"),
508 ):
509 self._hooks_lazy_dict[conn_type] = functools.partial(
510 self._import_hook,
511 connection_type=None,
512 package_name="apache-airflow-providers-standard",
513 hook_class_name=class_name,
514 provider_info=None,
515 )
516
517 @provider_info_cache("list")
518 def initialize_providers_list(self):
519 """Lazy initialization of providers list."""
520 # Local source folders are loaded first. They should take precedence over the package ones for
521 # Development purpose. In production provider.yaml files are not present in the 'airflow" directory
522 # So there is no risk we are going to override package provider accidentally. This can only happen
523 # in case of local development
524 discover_all_providers_from_packages(self._provider_dict, self._provider_schema_validator)
525 self._verify_all_providers_all_compatible()
526 self._provider_dict = dict(sorted(self._provider_dict.items()))
527
528 def _verify_all_providers_all_compatible(self):
529 from packaging import version as packaging_version
530
531 for provider_id, info in self._provider_dict.items():
532 min_version = MIN_PROVIDER_VERSIONS.get(provider_id)
533 if min_version:
534 if packaging_version.parse(min_version) > packaging_version.parse(info.version):
535 log.warning(
536 "The package %s is not compatible with this version of Airflow. "
537 "The package has version %s but the minimum supported version "
538 "of the package is %s",
539 provider_id,
540 info.version,
541 min_version,
542 )
543
544 @provider_info_cache("hooks")
545 def initialize_providers_hooks(self):
546 """Lazy initialization of providers hooks."""
547 self._init_airflow_core_hooks()
548 self.initialize_providers_list()
549 self._discover_hooks()
550 self._load_ui_metadata()
551 self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items()))
552
553 @provider_info_cache("filesystems")
554 def initialize_providers_filesystems(self):
555 """Lazy initialization of providers filesystems."""
556 self.initialize_providers_list()
557 self._discover_filesystems()
558
559 @provider_info_cache("asset_uris")
560 def initialize_providers_asset_uri_resources(self):
561 """Lazy initialization of provider asset URI handlers, factories, converters etc."""
562 self.initialize_providers_list()
563 self._discover_asset_uri_resources()
564
565 @provider_info_cache("hook_lineage_writers")
566 @provider_info_cache("taskflow_decorators")
567 def initialize_providers_taskflow_decorator(self):
568 """Lazy initialization of providers hooks."""
569 self.initialize_providers_list()
570 self._discover_taskflow_decorators()
571
572 @provider_info_cache("extra_links")
573 def initialize_providers_extra_links(self):
574 """Lazy initialization of providers extra links."""
575 self.initialize_providers_list()
576 self._discover_extra_links()
577
578 @provider_info_cache("logging")
579 def initialize_providers_logging(self):
580 """Lazy initialization of providers logging information."""
581 self.initialize_providers_list()
582 self._discover_logging()
583
584 @provider_info_cache("remote_logging")
585 def initialize_providers_remote_logging(self):
586 """Lazy initialization of providers remote logging IO handlers."""
587 self.initialize_providers_list()
588 self._discover_remote_logging()
589
590 @provider_info_cache("secrets_backends")
591 def initialize_providers_secrets_backends(self):
592 """Lazy initialization of providers secrets_backends information."""
593 self.initialize_providers_list()
594 self._discover_secrets_backends()
595
596 @provider_info_cache("executors")
597 def initialize_providers_executors(self):
598 """Lazy initialization of providers executors information."""
599 self.initialize_providers_list()
600 self._discover_executors(check=True)
601
602 @provider_info_cache("executors_without_check")
603 def initialize_providers_executors_without_check(self):
604 """Lazy initialization of providers executors information."""
605 self.initialize_providers_list()
606 self._discover_executors(check=False)
607
608 @provider_info_cache("queues")
609 def initialize_providers_queues(self):
610 """Lazy initialization of providers queue information."""
611 self.initialize_providers_list()
612 self._discover_queues()
613
614 @provider_info_cache("db_managers")
615 def initialize_providers_db_managers(self):
616 """Lazy initialization of providers db_managers information."""
617 self.initialize_providers_list()
618 self._discover_db_managers()
619
620 @provider_info_cache("notifications")
621 def initialize_providers_notifications(self):
622 """Lazy initialization of providers notifications information."""
623 self.initialize_providers_list()
624 self._discover_notifications()
625
626 @provider_info_cache("auth_managers")
627 def initialize_providers_auth_managers(self):
628 """Lazy initialization of providers auth manager information."""
629 self.initialize_providers_list()
630 self._discover_auth_managers(check=True)
631
632 @provider_info_cache("auth_managers_without_check")
633 def initialize_providers_auth_managers_without_check(self):
634 """Lazy initialization of providers auth manager information."""
635 self.initialize_providers_list()
636 self._discover_auth_managers(check=False)
637
638 @provider_info_cache("config")
639 def initialize_providers_configuration(self):
640 """Lazy initialization of provider configuration metadata and merge it into ``conf``."""
641 self.initialize_providers_list()
642 self._discover_config()
643
644 @provider_info_cache("plugins")
645 def initialize_providers_plugins(self):
646 self.initialize_providers_list()
647 self._discover_plugins()
648
649 @provider_info_cache("cli_command")
650 def initialize_providers_cli_command(self):
651 """Lazy initialization of providers CLI commands."""
652 self.initialize_providers_list()
653 self._discover_cli_command()
654
655 def _discover_hooks_from_connection_types(
656 self,
657 hook_class_names_registered: set[str],
658 already_registered_warning_connection_types: set[str],
659 package_name: str,
660 provider: ProviderInfo,
661 ):
662 """
663 Discover hooks from the "connection-types" property.
664
665 This is new, better method that replaces discovery from hook-class-names as it
666 allows to lazy import individual Hook classes when they are accessed.
667 The "connection-types" keeps information about both - connection type and class
668 name so we can discover all connection-types without importing the classes.
669 :param hook_class_names_registered: set of registered hook class names for this provider
670 :param already_registered_warning_connection_types: set of connections for which warning should be
671 printed in logs as they were already registered before
672 :param package_name:
673 :param provider:
674 :return:
675 """
676 provider_uses_connection_types = False
677 connection_types = provider.data.get("connection-types")
678 if connection_types:
679 for connection_type_dict in connection_types:
680 connection_type = connection_type_dict["connection-type"]
681 hook_class_name = connection_type_dict["hook-class-name"]
682 hook_class_names_registered.add(hook_class_name)
683 already_registered = self._hook_provider_dict.get(connection_type)
684 if already_registered:
685 if already_registered.package_name != package_name:
686 already_registered_warning_connection_types.add(connection_type)
687 else:
688 log.warning(
689 "The connection type '%s' is already registered in the"
690 " package '%s' with different class names: '%s' and '%s'. ",
691 connection_type,
692 package_name,
693 already_registered.hook_class_name,
694 hook_class_name,
695 )
696 else:
697 self._hook_provider_dict[connection_type] = HookClassProvider(
698 hook_class_name=hook_class_name, package_name=package_name
699 )
700 # Defer importing hook to access time by setting import hook method as dict value
701 self._hooks_lazy_dict[connection_type] = functools.partial(
702 self._import_hook,
703 connection_type=connection_type,
704 provider_info=provider,
705 )
706 provider_uses_connection_types = True
707 return provider_uses_connection_types
708
709 def _discover_hooks_from_hook_class_names(
710 self,
711 hook_class_names_registered: set[str],
712 already_registered_warning_connection_types: set[str],
713 package_name: str,
714 provider: ProviderInfo,
715 provider_uses_connection_types: bool,
716 ):
717 """
718 Discover hooks from "hook-class-names' property.
719
720 This property is deprecated but we should support it in Airflow 2.
721 The hook-class-names array contained just Hook names without connection type,
722 therefore we need to import all those classes immediately to know which connection types
723 are supported. This makes it impossible to selectively only import those hooks that are used.
724 :param already_registered_warning_connection_types: list of connection hooks that we should warn
725 about when finished discovery
726 :param package_name: name of the provider package
727 :param provider: class that keeps information about version and details of the provider
728 :param provider_uses_connection_types: determines whether the provider uses "connection-types" new
729 form of passing connection types
730 :return:
731 """
732 hook_class_names = provider.data.get("hook-class-names")
733 if hook_class_names:
734 for hook_class_name in hook_class_names:
735 if hook_class_name in hook_class_names_registered:
736 # Silently ignore the hook class - it's already marked for lazy-import by
737 # connection-types discovery
738 continue
739 hook_info = self._import_hook(
740 connection_type=None,
741 provider_info=provider,
742 hook_class_name=hook_class_name,
743 package_name=package_name,
744 )
745 if not hook_info:
746 # Problem why importing class - we ignore it. Log is written at import time
747 continue
748 already_registered = self._hook_provider_dict.get(hook_info.connection_type)
749 if already_registered:
750 if already_registered.package_name != package_name:
751 already_registered_warning_connection_types.add(hook_info.connection_type)
752 else:
753 if already_registered.hook_class_name != hook_class_name:
754 log.warning(
755 "The hook connection type '%s' is registered twice in the"
756 " package '%s' with different class names: '%s' and '%s'. "
757 " Please fix it!",
758 hook_info.connection_type,
759 package_name,
760 already_registered.hook_class_name,
761 hook_class_name,
762 )
763 else:
764 self._hook_provider_dict[hook_info.connection_type] = HookClassProvider(
765 hook_class_name=hook_class_name, package_name=package_name
766 )
767 self._hooks_lazy_dict[hook_info.connection_type] = hook_info
768
769 if not provider_uses_connection_types:
770 warnings.warn(
771 f"The provider {package_name} uses `hook-class-names` "
772 "property in provider-info and has no `connection-types` one. "
773 "The 'hook-class-names' property has been deprecated in favour "
774 "of 'connection-types' in Airflow 2.2. Use **both** in case you want to "
775 "have backwards compatibility with Airflow < 2.2",
776 DeprecationWarning,
777 stacklevel=1,
778 )
779 for already_registered_connection_type in already_registered_warning_connection_types:
780 log.warning(
781 "The connection_type '%s' has been already registered by provider '%s.'",
782 already_registered_connection_type,
783 self._hook_provider_dict[already_registered_connection_type].package_name,
784 )
785
786 def _discover_hooks(self) -> None:
787 """Retrieve all connections defined in the providers via Hooks."""
788 for package_name, provider in self._provider_dict.items():
789 duplicated_connection_types: set[str] = set()
790 hook_class_names_registered: set[str] = set()
791 self._discover_provider_dialects(package_name, provider)
792 provider_uses_connection_types = self._discover_hooks_from_connection_types(
793 hook_class_names_registered, duplicated_connection_types, package_name, provider
794 )
795 self._discover_hooks_from_hook_class_names(
796 hook_class_names_registered,
797 duplicated_connection_types,
798 package_name,
799 provider,
800 provider_uses_connection_types,
801 )
802 self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items()))
803
804 def _discover_provider_dialects(self, provider_name: str, provider: ProviderInfo):
805 dialects = provider.data.get("dialects", [])
806 if dialects:
807 self._dialect_provider_dict.update(
808 {
809 item["dialect-type"]: DialectInfo(
810 name=item["dialect-type"],
811 dialect_class_name=item["dialect-class-name"],
812 provider_name=provider_name,
813 )
814 for item in dialects
815 }
816 )
817
818 @provider_info_cache("import_all_hooks")
819 def _import_info_from_all_hooks(self):
820 """Force-import all hooks and initialize the connections/fields."""
821 # Retrieve all hooks to make sure that all of them are imported
822 _ = list(self._hooks_lazy_dict.values())
823 self._field_behaviours = dict(sorted(self._field_behaviours.items()))
824
825 # Widgets for connection forms are currently used in two places:
826 # 1. In the UI Connections, expected same order that it defined in Hook.
827 # 2. cli command - `airflow providers widgets` and expected that it in alphabetical order.
828 # It is not possible to recover original ordering after sorting,
829 # that the main reason why original sorting moved to cli part:
830 # self._connection_form_widgets = dict(sorted(self._connection_form_widgets.items()))
831
832 def _discover_filesystems(self) -> None:
833 """Retrieve all filesystems defined in the providers."""
834 for provider_package, provider in self._provider_dict.items():
835 for fs_module_name in provider.data.get("filesystems", []):
836 if _correctness_check(provider_package, f"{fs_module_name}.get_fs", provider):
837 self._fs_set.add(fs_module_name)
838 self._fs_set = set(sorted(self._fs_set))
839
840 def _discover_asset_uri_resources(self) -> None:
841 """Discovers and registers asset URI handlers, factories, and converters for all providers."""
842 from airflow.sdk.definitions.asset import normalize_noop
843
844 def _safe_register_resource(
845 provider_package_name: str,
846 schemes_list: list[str],
847 resource_path: str | None,
848 resource_registry: dict,
849 default_resource: Any = None,
850 ):
851 """
852 Register a specific resource (handler, factory, or converter) for the given schemes.
853
854 If the resolved resource (either from the path or the default) is valid, it updates
855 the resource registry with the appropriate resource for each scheme.
856 """
857 resource = (
858 _correctness_check(provider_package_name, resource_path, provider)
859 if resource_path is not None
860 else default_resource
861 )
862 if resource:
863 resource_registry.update((scheme, resource) for scheme in schemes_list)
864
865 for provider_name, provider in self._provider_dict.items():
866 for uri_info in provider.data.get("asset-uris", []):
867 if "schemes" not in uri_info or "handler" not in uri_info:
868 continue # Both schemas and handler must be explicitly set, handler can be set to null
869 common_args = {"schemes_list": uri_info["schemes"], "provider_package_name": provider_name}
870 _safe_register_resource(
871 resource_path=uri_info["handler"],
872 resource_registry=self._asset_uri_handlers,
873 default_resource=normalize_noop,
874 **common_args,
875 )
876 _safe_register_resource(
877 resource_path=uri_info.get("factory"),
878 resource_registry=self._asset_factories,
879 **common_args,
880 )
881 _safe_register_resource(
882 resource_path=uri_info.get("to_openlineage_converter"),
883 resource_registry=self._asset_to_openlineage_converters,
884 **common_args,
885 )
886
887 def _discover_taskflow_decorators(self) -> None:
888 for name, info in self._provider_dict.items():
889 for taskflow_decorator in info.data.get("task-decorators", []):
890 self._add_taskflow_decorator(
891 taskflow_decorator["name"], taskflow_decorator["class-name"], name
892 )
893
894 def _add_taskflow_decorator(self, name, decorator_class_name: str, provider_package: str) -> None:
895 if not _check_builtin_provider_prefix(provider_package, decorator_class_name):
896 return
897
898 if name in self._taskflow_decorators:
899 try:
900 existing = self._taskflow_decorators[name]
901 other_name = f"{existing.__module__}.{existing.__name__}"
902 except Exception:
903 # If problem importing, then get the value from the functools.partial
904 other_name = self._taskflow_decorators._raw_dict[name].args[0] # type: ignore[attr-defined]
905
906 log.warning(
907 "The taskflow decorator '%s' has been already registered (by %s).",
908 name,
909 other_name,
910 )
911 return
912
913 self._taskflow_decorators[name] = functools.partial(import_string, decorator_class_name)
914
915 @staticmethod
916 def _get_attr(obj: Any, attr_name: str):
917 """Retrieve attributes of an object, or warn if not found."""
918 if not hasattr(obj, attr_name):
919 log.warning("The object '%s' is missing %s attribute and cannot be registered", obj, attr_name)
920 return None
921 return getattr(obj, attr_name)
922
923 def _get_connection_type_config(self, provider_info: ProviderInfo, connection_type: str) -> dict | None:
924 """Get connection type config from provider.yaml if it exists."""
925 connection_types = provider_info.data.get("connection-types", [])
926 for conn_config in connection_types:
927 if conn_config.get("connection-type") == connection_type:
928 return conn_config
929 return None
930
931 def _to_api_format(self, field_name: str, field_def: dict) -> dict:
932 """Convert conn-fields definition to format expected by the API."""
933 schema_def = field_def.get("schema", {})
934
935 # build schema dict with label moved to `title` per jsonschema convention
936 schema = schema_def.copy()
937 if "label" in field_def:
938 schema["title"] = field_def.get("label")
939
940 return {
941 "value": schema_def.get("default"),
942 "schema": schema,
943 "description": field_def.get("description"),
944 "source": None,
945 }
946
947 def _add_widgets(
948 self, package_name: str, hook_class_name: str, connection_type: str, conn_fields: dict
949 ) -> None:
950 """Parse conn-fields from provider info and add to connection_form_widgets."""
951 for field_name, field_def in conn_fields.items():
952 field_data = self._to_api_format(field_name, field_def)
953
954 prefixed_name = f"extra__{connection_type}__{field_name}"
955 if prefixed_name in self._connection_form_widgets:
956 log.warning(
957 "Field %s for connection type %s already added, skipping",
958 field_name,
959 connection_type,
960 )
961 continue
962
963 schema_def = field_def.get("schema", {})
964 self._connection_form_widgets[prefixed_name] = ConnectionFormWidgetInfo(
965 hook_class_name=hook_class_name,
966 package_name=package_name,
967 field=field_data,
968 field_name=field_name,
969 is_sensitive=schema_def.get("format") == "password",
970 )
971
972 def _add_customized_fields(self, package_name: str, connection_type: str, behaviour: dict) -> None:
973 """Process ui-field-behaviour from provider info and add to field_behaviours."""
974 if connection_type in self._field_behaviours:
975 log.warning(
976 "Field behaviour for connection type %s already exists, skipping",
977 connection_type,
978 )
979 return
980
981 # convert kebab-case keys to python style
982 customized_fields = {
983 "hidden_fields": behaviour.get("hidden-fields", []),
984 "relabeling": behaviour.get("relabeling", {}),
985 "placeholders": behaviour.get("placeholders", {}),
986 }
987
988 try:
989 self._customized_form_fields_schema_validator.validate(customized_fields)
990 customized_fields = _ensure_prefix_for_placeholders(customized_fields, connection_type)
991 self._field_behaviours[connection_type] = customized_fields
992 except Exception as e:
993 log.warning(
994 "Failed to add field behaviour for %s in package %s: %s",
995 connection_type,
996 package_name,
997 e,
998 )
999
1000 def _load_ui_metadata(self) -> None:
1001 """Load connection form UI metadata from provider info without importing hooks."""
1002 for package_name, provider in self._provider_dict.items():
1003 for conn_config in provider.data.get("connection-types", []):
1004 connection_type = conn_config.get("connection-type")
1005 hook_class_name = conn_config.get("hook-class-name")
1006 if not connection_type or not hook_class_name:
1007 continue
1008
1009 if hook_name := conn_config.get("hook-name"):
1010 self._hook_name_dict[connection_type] = hook_name
1011
1012 if conn_fields := conn_config.get("conn-fields"):
1013 self._add_widgets(package_name, hook_class_name, connection_type, conn_fields)
1014
1015 if behaviour := conn_config.get("ui-field-behaviour"):
1016 self._add_customized_fields(package_name, connection_type, behaviour)
1017
1018 def _import_hook(
1019 self,
1020 connection_type: str | None,
1021 provider_info: ProviderInfo,
1022 hook_class_name: str | None = None,
1023 package_name: str | None = None,
1024 ) -> HookInfo | None:
1025 """
1026 Import hook and retrieve hook information.
1027
1028 Either connection_type (for lazy loading) or hook_class_name must be set - but not both).
1029 Only needs package_name if hook_class_name is passed (for lazy loading, package_name
1030 is retrieved from _connection_type_class_provider_dict together with hook_class_name).
1031
1032 :param connection_type: type of the connection
1033 :param hook_class_name: name of the hook class
1034 :param package_name: provider package - only needed in case connection_type is missing
1035 : return
1036 """
1037 if connection_type is None and hook_class_name is None:
1038 raise ValueError("Either connection_type or hook_class_name must be set")
1039 if connection_type is not None and hook_class_name is not None:
1040 raise ValueError(
1041 f"Both connection_type ({connection_type} and "
1042 f"hook_class_name {hook_class_name} are set. Only one should be set!"
1043 )
1044 if connection_type is not None:
1045 class_provider = self._hook_provider_dict[connection_type]
1046 package_name = class_provider.package_name
1047 hook_class_name = class_provider.hook_class_name
1048 else:
1049 if not hook_class_name:
1050 raise ValueError("Either connection_type or hook_class_name must be set")
1051 if not package_name:
1052 raise ValueError(
1053 f"Provider package name is not set when hook_class_name ({hook_class_name}) is used"
1054 )
1055 hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info)
1056 if hook_class is None:
1057 return None
1058
1059 # Check if provider info already has UI metadata and skip Python hook methods
1060 # to avoid duplicate initialization and unnecessary wtforms imports
1061 ui_metadata_loaded = False
1062 if provider_info and connection_type:
1063 conn_config = self._get_connection_type_config(provider_info, connection_type)
1064 ui_metadata_loaded = conn_config is not None and bool(
1065 conn_config.get("conn-fields") or conn_config.get("ui-field-behaviour")
1066 )
1067
1068 if not ui_metadata_loaded:
1069 try:
1070 from wtforms import BooleanField, IntegerField, PasswordField, StringField
1071
1072 allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField]
1073 # Do not use attr here. We want to check only direct class fields not those
1074 # inherited from parent hook. This way we add form fields only once for the whole
1075 # hierarchy and we add it only from the parent hook that provides those!
1076 if "get_connection_form_widgets" in hook_class.__dict__:
1077 warning = AirflowProviderDeprecationWarning(
1078 f"Hook '{hook_class_name}' defines get_connection_form_widgets(). "
1079 "This method is deprecated. Define connection fields declaratively in "
1080 "provider.yaml under 'conn-fields' instead. See "
1081 "https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html"
1082 )
1083 warning.deprecated_provider_since = "3.2.0"
1084 warnings.warn(warning, stacklevel=2)
1085 widgets = hook_class.get_connection_form_widgets()
1086 if widgets:
1087 for widget in widgets.values():
1088 if widget.field_class not in allowed_field_classes:
1089 log.warning(
1090 "The hook_class '%s' uses field of unsupported class '%s'. "
1091 "Only '%s' field classes are supported",
1092 hook_class_name,
1093 widget.field_class,
1094 allowed_field_classes,
1095 )
1096 return None
1097 self._add_widgets_from_hook(package_name, hook_class, widgets)
1098 if "get_ui_field_behaviour" in hook_class.__dict__:
1099 warning = AirflowProviderDeprecationWarning(
1100 f"Hook '{hook_class_name}' defines get_ui_field_behaviour(). "
1101 "This method is deprecated. Define field behaviour declaratively in "
1102 "provider.yaml under 'ui-field-behaviour' instead. See "
1103 "https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html"
1104 )
1105 warning.deprecated_provider_since = "3.2.0"
1106 warnings.warn(warning, stacklevel=2)
1107 field_behaviours = hook_class.get_ui_field_behaviour()
1108 if field_behaviours:
1109 self._add_customized_fields_from_hook(package_name, hook_class, field_behaviours)
1110 except ImportError as e:
1111 if e.name in ["flask_appbuilder", "wtforms"]:
1112 log.info(
1113 "The hook_class '%s' is not fully initialized (UI widgets will be missing), because "
1114 "the 'flask_appbuilder' package is not installed, however it is not required for "
1115 "Airflow components to work",
1116 hook_class_name,
1117 )
1118 except Exception as e:
1119 log.warning(
1120 "Exception when importing '%s' from '%s' package: %s",
1121 hook_class_name,
1122 package_name,
1123 e,
1124 )
1125 return None
1126
1127 hook_connection_type = self._get_attr(hook_class, "conn_type")
1128 if connection_type:
1129 if hook_connection_type != connection_type:
1130 log.warning(
1131 "Inconsistency! The hook class '%s' declares connection type '%s'"
1132 " but it is added by provider '%s' as connection_type '%s' in provider info. "
1133 "This should be fixed!",
1134 hook_class,
1135 hook_connection_type,
1136 package_name,
1137 connection_type,
1138 )
1139 connection_type = hook_connection_type
1140 connection_id_attribute_name: str = self._get_attr(hook_class, "conn_name_attr")
1141 hook_name: str = self._get_attr(hook_class, "hook_name")
1142
1143 if not connection_type or not connection_id_attribute_name or not hook_name:
1144 log.warning(
1145 "The hook misses one of the key attributes: "
1146 "conn_type: %s, conn_id_attribute_name: %s, hook_name: %s",
1147 connection_type,
1148 connection_id_attribute_name,
1149 hook_name,
1150 )
1151 return None
1152
1153 return HookInfo(
1154 hook_class_name=hook_class_name,
1155 connection_id_attribute_name=connection_id_attribute_name,
1156 package_name=package_name,
1157 hook_name=hook_name,
1158 connection_type=connection_type,
1159 connection_testable=hasattr(hook_class, "test_connection"),
1160 )
1161
1162 def _add_widgets_from_hook(self, package_name: str, hook_class: type, widgets: dict[str, Any]):
1163 conn_type = hook_class.conn_type # type: ignore
1164 for field_identifier, field in widgets.items():
1165 if field_identifier.startswith("extra__"):
1166 prefixed_field_name = field_identifier
1167 else:
1168 prefixed_field_name = f"extra__{conn_type}__{field_identifier}"
1169 if prefixed_field_name in self._connection_form_widgets:
1170 log.warning(
1171 "The field %s from class %s has already been added by another provider. Ignoring it.",
1172 field_identifier,
1173 hook_class.__name__,
1174 )
1175 # In case of inherited hooks this might be happening several times
1176 else:
1177 self._connection_form_widgets[prefixed_field_name] = ConnectionFormWidgetInfo(
1178 hook_class.__name__,
1179 package_name,
1180 field,
1181 field_identifier,
1182 hasattr(field.field_class.widget, "input_type")
1183 and field.field_class.widget.input_type == "password",
1184 )
1185
1186 def _add_customized_fields_from_hook(self, package_name: str, hook_class: type, customized_fields: dict):
1187 try:
1188 connection_type = getattr(hook_class, "conn_type")
1189
1190 self._customized_form_fields_schema_validator.validate(customized_fields)
1191
1192 if connection_type:
1193 customized_fields = _ensure_prefix_for_placeholders(customized_fields, connection_type)
1194
1195 if connection_type in self._field_behaviours:
1196 log.warning(
1197 "The connection_type %s from package %s and class %s has already been added "
1198 "by another provider. Ignoring it.",
1199 connection_type,
1200 package_name,
1201 hook_class.__name__,
1202 )
1203 return
1204 self._field_behaviours[connection_type] = customized_fields
1205 except Exception as e:
1206 log.warning(
1207 "Error when loading customized fields from package '%s' hook class '%s': %s",
1208 package_name,
1209 hook_class.__name__,
1210 e,
1211 )
1212
1213 def _discover_auth_managers(self, *, check: bool) -> None:
1214 """Retrieve all auth managers defined in the providers."""
1215 for provider_package, provider in self._provider_dict.items():
1216 if provider.data.get("auth-managers"):
1217 for auth_manager_class_name in provider.data["auth-managers"]:
1218 if not check:
1219 self._auth_manager_without_check_set.add((auth_manager_class_name, provider_package))
1220 elif _correctness_check(provider_package, auth_manager_class_name, provider):
1221 self._auth_manager_class_name_set.add(auth_manager_class_name)
1222
1223 def _discover_cli_command(self) -> None:
1224 """Retrieve all CLI command functions defined in the providers."""
1225 for provider_package, provider in self._provider_dict.items():
1226 if provider.data.get("cli"):
1227 for cli_command_function_name in provider.data["cli"]:
1228 # _correctness_check will return the function if found and correct
1229 # we store the function itself instead of its name to avoid importing it again later in cli_parser to speed up cli loading
1230 if cli_func := _correctness_check(provider_package, cli_command_function_name, provider):
1231 cli_func = cast("Callable[[], list[CLICommand]]", cli_func)
1232 self._cli_command_functions_set.add(cli_func)
1233 self._cli_command_provider_name_set.add(provider_package)
1234
1235 def _discover_notifications(self) -> None:
1236 """Retrieve all notifications defined in the providers."""
1237 for provider_package, provider in self._provider_dict.items():
1238 if provider.data.get("notifications"):
1239 for notification_class_name in provider.data["notifications"]:
1240 if _correctness_check(provider_package, notification_class_name, provider):
1241 self._notification_info_set.add(notification_class_name)
1242
1243 def _discover_extra_links(self) -> None:
1244 """Retrieve all extra links defined in the providers."""
1245 for provider_package, provider in self._provider_dict.items():
1246 if provider.data.get("extra-links"):
1247 for extra_link_class_name in provider.data["extra-links"]:
1248 if _correctness_check(provider_package, extra_link_class_name, provider):
1249 self._extra_link_class_name_set.add(extra_link_class_name)
1250
1251 def _discover_logging(self) -> None:
1252 """Retrieve all logging defined in the providers."""
1253 for provider_package, provider in self._provider_dict.items():
1254 if provider.data.get("logging"):
1255 for logging_class_name in provider.data["logging"]:
1256 if _correctness_check(provider_package, logging_class_name, provider):
1257 self._logging_class_name_set.add(logging_class_name)
1258
1259 def _discover_remote_logging(self) -> None:
1260 """Retrieve all remote logging IO handlers defined in the providers."""
1261 for provider_package, provider in self._provider_dict.items():
1262 entries = provider.data.get("remote-logging") or []
1263 for entry in entries:
1264 classpath = entry["classpath"]
1265 if not _correctness_check(provider_package, classpath, provider):
1266 continue
1267 info = RemoteLoggingInfo(
1268 classpath=classpath,
1269 scheme=entry["scheme"],
1270 package_name=provider_package,
1271 )
1272 if (existing := self._remote_logging_by_scheme.get(info.scheme)) is not None:
1273 log.warning(
1274 "Remote logging scheme '%s' is already registered by %s; ignoring "
1275 "duplicate registration from %s.",
1276 info.scheme,
1277 existing.package_name,
1278 info.package_name,
1279 )
1280 continue
1281 self._remote_logging_info_list.append(info)
1282 self._remote_logging_by_scheme[info.scheme] = info
1283
1284 def _discover_secrets_backends(self) -> None:
1285 """Retrieve all secrets backends defined in the providers."""
1286 for provider_package, provider in self._provider_dict.items():
1287 if provider.data.get("secrets-backends"):
1288 for secrets_backends_class_name in provider.data["secrets-backends"]:
1289 if _correctness_check(provider_package, secrets_backends_class_name, provider):
1290 self._secrets_backend_class_name_set.add(secrets_backends_class_name)
1291
1292 def _discover_executors(self, *, check: bool) -> None:
1293 """Retrieve all executors defined in the providers."""
1294 for provider_package, provider in self._provider_dict.items():
1295 if provider.data.get("executors"):
1296 for executors_class_path in provider.data["executors"]:
1297 if not check:
1298 self._executor_without_check_set.add((executors_class_path, provider_package))
1299 elif _correctness_check(provider_package, executors_class_path, provider):
1300 self._executor_class_name_set.add(executors_class_path)
1301
1302 def _discover_queues(self) -> None:
1303 """Retrieve all queues defined in the providers."""
1304 for provider_package, provider in self._provider_dict.items():
1305 if provider.data.get("queues"):
1306 for queue_class_name in provider.data["queues"]:
1307 if _correctness_check(provider_package, queue_class_name, provider):
1308 self._queue_class_name_set.add(queue_class_name)
1309
1310 def _discover_db_managers(self) -> None:
1311 """Retrieve all DB managers defined in the providers."""
1312 for provider_package, provider in self._provider_dict.items():
1313 if provider.data.get("db-managers"):
1314 for db_manager_class_name in provider.data["db-managers"]:
1315 if _correctness_check(provider_package, db_manager_class_name, provider):
1316 self._db_manager_class_name_set.add(db_manager_class_name)
1317
1318 def _discover_config(self) -> None:
1319 """Retrieve all configs defined in the providers."""
1320 for provider_package, provider in self._provider_dict.items():
1321 if provider.data.get("config"):
1322 self._provider_configs[provider_package] = provider.data.get("config") # type: ignore[assignment]
1323
1324 def _discover_plugins(self) -> None:
1325 """Retrieve all plugins defined in the providers."""
1326 for provider_package, provider in self._provider_dict.items():
1327 for plugin_dict in provider.data.get("plugins", ()):
1328 if not _correctness_check(provider_package, plugin_dict["plugin-class"], provider):
1329 log.warning("Plugin not loaded due to above correctness check problem.")
1330 continue
1331 self._plugins_set.add(
1332 PluginInfo(
1333 name=plugin_dict["name"],
1334 plugin_class=plugin_dict["plugin-class"],
1335 provider_name=provider_package,
1336 )
1337 )
1338
1339 @provider_info_cache("triggers")
1340 def initialize_providers_triggers(self):
1341 """Initialize providers triggers."""
1342 self.initialize_providers_list()
1343 for provider_package, provider in self._provider_dict.items():
1344 for trigger in provider.data.get("triggers", []):
1345 for trigger_class_name in trigger.get("python-modules"):
1346 self._trigger_info_set.add(
1347 TriggerInfo(
1348 package_name=provider_package,
1349 trigger_class_name=trigger_class_name,
1350 integration_name=trigger.get("integration-name", ""),
1351 )
1352 )
1353
1354 @property
1355 def auth_managers(self) -> list[str]:
1356 """Returns information about available providers notifications class."""
1357 self.initialize_providers_auth_managers()
1358 return sorted(self._auth_manager_class_name_set)
1359
1360 @property
1361 def auth_manager_without_check(self) -> set[tuple[str, str]]:
1362 """Returns set of (auth manager class names, provider package name) without correctness check."""
1363 self.initialize_providers_auth_managers_without_check()
1364 return self._auth_manager_without_check_set
1365
1366 @property
1367 def cli_command_functions(self) -> set[Callable[[], list[CLICommand]]]:
1368 """Returns list of CLI command function names from providers."""
1369 self.initialize_providers_cli_command()
1370 return self._cli_command_functions_set
1371
1372 @property
1373 def cli_command_providers(self) -> set[str]:
1374 """Returns set of provider package names that provide CLI commands."""
1375 self.initialize_providers_cli_command()
1376 return self._cli_command_provider_name_set
1377
1378 @property
1379 def notification(self) -> list[NotificationInfo]:
1380 """Returns information about available providers notifications class."""
1381 self.initialize_providers_notifications()
1382 return sorted(self._notification_info_set)
1383
1384 @property
1385 def trigger(self) -> list[TriggerInfo]:
1386 """Returns information about available providers trigger class."""
1387 self.initialize_providers_triggers()
1388 return sorted(self._trigger_info_set, key=lambda x: x.package_name)
1389
1390 @property
1391 def providers(self) -> dict[str, ProviderInfo]:
1392 """Returns information about available providers."""
1393 self.initialize_providers_list()
1394 return self._provider_dict
1395
1396 @property
1397 def hooks(self) -> MutableMapping[str, HookInfo | None]:
1398 """
1399 Return dictionary of connection_type-to-hook mapping.
1400
1401 Note that the dict can contain None values if a hook discovered cannot be imported!
1402 """
1403 self.initialize_providers_hooks()
1404 # When we return hooks here it will only be used to retrieve hook information
1405 return self._hooks_lazy_dict
1406
1407 def iter_connection_type_hook_ui_metadata(self) -> Iterator[ConnectionTypeHookUIMetadata]:
1408 """
1409 Yield hook metadata per connection type for the connection UI.
1410
1411 Does not import hook classes.
1412 """
1413 self.initialize_providers_hooks()
1414 all_types = frozenset(self._hooks_lazy_dict) | frozenset(self._hook_provider_dict)
1415 for conn_type in sorted(all_types):
1416 raw_entry = self._hooks_lazy_dict._raw_dict.get(conn_type)
1417 provider_entry = self._hook_provider_dict.get(conn_type)
1418 if isinstance(raw_entry, HookInfo):
1419 hook_name = raw_entry.hook_name
1420 hook_class_name = raw_entry.hook_class_name
1421 elif provider_entry:
1422 hook_name = self._hook_name_dict.get(conn_type, conn_type)
1423 hook_class_name = provider_entry.hook_class_name
1424 else:
1425 hook_name = self._hook_name_dict.get(conn_type, conn_type)
1426 hook_class_name = None
1427 yield ConnectionTypeHookUIMetadata(
1428 connection_type=conn_type,
1429 hook_name=hook_name,
1430 hook_class_name=hook_class_name,
1431 field_behaviour=self._field_behaviours.get(conn_type),
1432 )
1433
1434 @property
1435 def _connection_form_widgets_from_metadata(self) -> dict[str, ConnectionFormWidgetInfo]:
1436 """Return connection form widgets from metadata without importing every hook."""
1437 self.initialize_providers_hooks()
1438 return self._connection_form_widgets
1439
1440 @property
1441 def _field_behaviours_from_metadata(self) -> dict[str, dict]:
1442 """Return field behaviour dicts from metadata without importing every hook."""
1443 self.initialize_providers_hooks()
1444 return self._field_behaviours
1445
1446 @property
1447 def dialects(self) -> MutableMapping[str, DialectInfo]:
1448 """Return dictionary of connection_type-to-dialect mapping."""
1449 self.initialize_providers_hooks()
1450 # When we return dialects here it will only be used to retrieve dialect information
1451 return self._dialect_provider_dict
1452
1453 @property
1454 def plugins(self) -> list[PluginInfo]:
1455 """Returns information about plugins available in providers."""
1456 self.initialize_providers_plugins()
1457 return sorted(self._plugins_set, key=lambda x: x.plugin_class)
1458
1459 @property
1460 def taskflow_decorators(self) -> dict[str, TaskDecorator]:
1461 self.initialize_providers_taskflow_decorator()
1462 return self._taskflow_decorators # type: ignore[return-value]
1463
1464 @property
1465 def extra_links_class_names(self) -> list[str]:
1466 """Returns set of extra link class names."""
1467 self.initialize_providers_extra_links()
1468 return sorted(self._extra_link_class_name_set)
1469
1470 @property
1471 def connection_form_widgets(self) -> dict[str, ConnectionFormWidgetInfo]:
1472 """
1473 Returns widgets for connection forms.
1474
1475 Dictionary keys in the same order that it defined in Hook.
1476 """
1477 self.initialize_providers_hooks()
1478 self._import_info_from_all_hooks()
1479 return self._connection_form_widgets
1480
1481 @property
1482 def field_behaviours(self) -> dict[str, dict]:
1483 """Returns dictionary with field behaviours for connection types."""
1484 self.initialize_providers_hooks()
1485 self._import_info_from_all_hooks()
1486 return self._field_behaviours
1487
1488 @property
1489 def logging_class_names(self) -> list[str]:
1490 """Returns set of log task handlers class names."""
1491 self.initialize_providers_logging()
1492 return sorted(self._logging_class_name_set)
1493
1494 @property
1495 def remote_logging_handlers(self) -> list[RemoteLoggingInfo]:
1496 """Return all remote logging IO handlers contributed by providers."""
1497 self.initialize_providers_remote_logging()
1498 return list(self._remote_logging_info_list)
1499
1500 def remote_logging_handler_by_scheme(self, scheme: str) -> RemoteLoggingInfo | None:
1501 """Return the remote logging IO handler registered for the given URL scheme, if any."""
1502 self.initialize_providers_remote_logging()
1503 return self._remote_logging_by_scheme.get(scheme)
1504
1505 @property
1506 def secrets_backend_class_names(self) -> list[str]:
1507 """Returns set of secret backend class names."""
1508 self.initialize_providers_secrets_backends()
1509 return sorted(self._secrets_backend_class_name_set)
1510
1511 @property
1512 def executor_class_names(self) -> list[str]:
1513 self.initialize_providers_executors()
1514 return sorted(self._executor_class_name_set)
1515
1516 @property
1517 def executor_without_check(self) -> set[tuple[str, str]]:
1518 """Returns set of (executor class names, provider package name) without correctness check."""
1519 self.initialize_providers_executors_without_check()
1520 return self._executor_without_check_set
1521
1522 @property
1523 def queue_class_names(self) -> list[str]:
1524 self.initialize_providers_queues()
1525 return sorted(self._queue_class_name_set)
1526
1527 @property
1528 def db_managers(self) -> list[str]:
1529 self.initialize_providers_db_managers()
1530 return sorted(self._db_manager_class_name_set)
1531
1532 @property
1533 def filesystem_module_names(self) -> list[str]:
1534 self.initialize_providers_filesystems()
1535 return sorted(self._fs_set)
1536
1537 @property
1538 def asset_factories(self) -> dict[str, Callable[..., Asset]]:
1539 self.initialize_providers_asset_uri_resources()
1540 return self._asset_factories
1541
1542 @property
1543 def asset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
1544 self.initialize_providers_asset_uri_resources()
1545 return self._asset_uri_handlers
1546
1547 @property
1548 def asset_to_openlineage_converters(
1549 self,
1550 ) -> dict[str, Callable]:
1551 self.initialize_providers_asset_uri_resources()
1552 return self._asset_to_openlineage_converters
1553
1554 @property
1555 def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
1556 self.initialize_providers_configuration()
1557 return sorted(self._provider_configs.items(), key=lambda x: x[0])
1558
1559 @property
1560 def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
1561 """
1562 Return provider configs that have already been initialized.
1563
1564 .. deprecated:: 3.2.0
1565 Use ``provider_configs`` instead. This property is kept for backwards
1566 compatibility and will be removed in a future version.
1567 """
1568 warnings.warn(
1569 "already_initialized_provider_configs is deprecated. Use `provider_configs` instead.",
1570 DeprecationWarning,
1571 stacklevel=2,
1572 )
1573 return sorted(self._provider_configs.items(), key=lambda x: x[0])
1574
1575 def _cleanup(self):
1576 self._initialized_cache.clear()
1577 self._provider_dict.clear()
1578 self._fs_set.clear()
1579 self._taskflow_decorators.clear()
1580 self._hook_provider_dict.clear()
1581 self._dialect_provider_dict.clear()
1582 self._hooks_lazy_dict.clear()
1583 self._connection_form_widgets.clear()
1584 self._field_behaviours.clear()
1585 self._extra_link_class_name_set.clear()
1586 self._logging_class_name_set.clear()
1587 self._remote_logging_info_list.clear()
1588 self._remote_logging_by_scheme.clear()
1589 self._auth_manager_class_name_set.clear()
1590 self._auth_manager_without_check_set.clear()
1591 self._secrets_backend_class_name_set.clear()
1592 self._executor_class_name_set.clear()
1593 self._executor_without_check_set.clear()
1594 self._queue_class_name_set.clear()
1595 self._provider_configs.clear()
1596
1597 # Imported lazily to avoid a configuration/providers_manager import cycle during cleanup.
1598 from airflow.configuration import conf
1599
1600 conf.invalidate_cache()
1601
1602 self._trigger_info_set.clear()
1603 self._notification_info_set.clear()
1604 self._plugins_set.clear()
1605 self._cli_command_functions_set.clear()
1606 self._cli_command_provider_name_set.clear()
1607
1608 self._initialized = False
1609 self._initialization_stack_trace = None