1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Base configuration parser with pure parsing logic."""
19
20from __future__ import annotations
21
22import contextlib
23import datetime
24import functools
25import itertools
26import json
27import logging
28import os
29import shlex
30import subprocess
31import sys
32import warnings
33from collections.abc import Callable, Generator, Iterable
34from configparser import ConfigParser, NoOptionError, NoSectionError
35from contextlib import contextmanager
36from copy import deepcopy
37from enum import Enum
38from json.decoder import JSONDecodeError
39from re import Pattern
40from typing import IO, TYPE_CHECKING, Any, TypeVar, overload
41
42from .exceptions import AirflowConfigException
43
44log = logging.getLogger(__name__)
45
46
47def _build_kwarg_env_prefix(section: str, kwargs_key: str) -> str:
48 """
49 Build env prefix for per-key backend kwargs.
50
51 ("secrets", "backend_kwargs") -> "AIRFLOW__SECRETS__BACKEND_KWARG__"
52 ("workers", "secrets_backend_kwargs") -> "AIRFLOW__WORKERS__SECRETS_BACKEND_KWARG__"
53 """
54 singular_key = kwargs_key.replace("_kwargs", "_kwarg")
55 return f"{ENV_VAR_PREFIX}{section.upper()}__{singular_key.upper()}__"
56
57
58def _collect_kwarg_env_vars(prefix: str) -> dict[str, str]:
59 """
60 Scan os.environ for per-key secrets backend kwargs.
61
62 AIRFLOW__SECRETS__BACKEND_KWARG__ROLE_ID -> {"role_id": value}
63 Values are raw strings (not JSON-parsed).
64 Empty keys (trailing __ with no suffix) are ignored.
65 """
66 overrides: dict[str, str] = {}
67 for env_var, value in os.environ.items():
68 if env_var.startswith(prefix):
69 kwarg_key = env_var[len(prefix) :].lower()
70 if kwarg_key:
71 overrides[kwarg_key] = value
72 return overrides
73
74
75ConfigType = str | int | float | bool
76ConfigOptionsDictType = dict[str, ConfigType]
77ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
78ConfigSourcesType = dict[str, ConfigSectionSourcesType]
79ENV_VAR_PREFIX = "AIRFLOW__"
80
81
82if TYPE_CHECKING:
83 from airflow.providers_manager import ProvidersManager
84 from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
85
86
87class ValueNotFound:
88 """Object of this is raised when a configuration value cannot be found."""
89
90 pass
91
92
93VALUE_NOT_FOUND_SENTINEL = ValueNotFound()
94
95
96@overload
97def expand_env_var(env_var: None) -> None: ...
98@overload
99def expand_env_var(env_var: str) -> str: ...
100
101
102def expand_env_var(env_var: str | None) -> str | None:
103 """
104 Expand (potentially nested) env vars.
105
106 Repeat and apply `expandvars` and `expanduser` until
107 interpolation stops having any effect.
108 """
109 if not env_var or not isinstance(env_var, str):
110 return env_var
111 while True:
112 interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
113 if interpolated == env_var:
114 return interpolated
115 env_var = interpolated
116
117
118def run_command(command: str) -> str:
119 """Run command and returns stdout."""
120 process = subprocess.Popen(
121 shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
122 )
123 output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate())
124
125 if process.returncode != 0:
126 raise AirflowConfigException(
127 f"Cannot execute {command}. Error code is: {process.returncode}. "
128 f"Output: {output}, Stderr: {stderr}"
129 )
130
131 return output
132
133
134def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool:
135 """
136 Check if the config is a template.
137
138 :param configuration_description: description of configuration
139 :param section: section
140 :param key: key
141 :return: True if the config is a template
142 """
143 return configuration_description.get(section, {}).get(key, {}).get("is_template", False)
144
145
146def configure_parser_from_configuration_description(
147 parser: ConfigParser,
148 configuration_description: dict[str, dict[str, Any]],
149 all_vars: dict[str, Any],
150) -> None:
151 """
152 Configure a ConfigParser based on configuration description.
153
154 :param parser: ConfigParser to configure
155 :param configuration_description: configuration description from config.yml
156 """
157 for section, section_desc in configuration_description.items():
158 parser.add_section(section)
159 options = section_desc["options"]
160 for key in options:
161 default_value = options[key]["default"]
162 is_template = options[key].get("is_template", False)
163 if (default_value is not None) and not (
164 options[key].get("version_deprecated") or options[key].get("deprecation_reason")
165 ):
166 if is_template or not isinstance(default_value, str):
167 parser.set(section, key, str(default_value))
168 else:
169 try:
170 parser.set(section, key, default_value.format(**all_vars))
171 except (KeyError, ValueError):
172 parser.set(section, key, default_value)
173
174
175def create_provider_cfg_config_fallback_defaults(
176 provider_config_fallback_defaults_cfg_path: str,
177) -> ConfigParser:
178 """
179 Create fallback defaults for configuration.
180
181 This parser contains provider defaults for Airflow configuration, containing fallback default values
182 that might be needed when provider classes are being imported - before provider's configuration
183 is loaded.
184
185 Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead
186 to retrieving provider configuration before the defaults for the provider are loaded.
187
188 Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or
189 environment variables) those will be used as usual.
190
191 NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication,
192 at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed.
193
194 You've been warned!
195
196 :param provider_config_fallback_defaults_cfg_path: path to the provider config fallback defaults .cfg file
197 """
198 config_parser = ConfigParser()
199 config_parser.read(provider_config_fallback_defaults_cfg_path)
200 return config_parser
201
202
203class AirflowConfigParser(ConfigParser):
204 """
205 Base configuration parser with pure parsing logic.
206
207 This class provides the core parsing methods that work with:
208 - configuration_description: dict describing config options (required in __init__)
209 - _default_values: ConfigParser with default values (required in __init__)
210 - deprecated_options: class attribute mapping new -> old options
211 - deprecated_sections: class attribute mapping new -> old sections
212 """
213
214 # A mapping of section -> setting -> { old, replace } for deprecated default values.
215 # Subclasses can override this to define deprecated values that should be upgraded.
216 deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {}
217
218 # A mapping of (new section, new option) -> (old section, old option, since_version).
219 # When reading new option, the old option will be checked to see if it exists. If it does a
220 # DeprecationWarning will be issued and the old option will be used instead
221 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
222 ("dag_processor", "dag_file_processor_timeout"): ("core", "dag_file_processor_timeout", "3.0"),
223 ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
224 ("api", "base_url"): ("webserver", "base_url", "3.0"),
225 ("api", "host"): ("webserver", "web_server_host", "3.0"),
226 ("api", "port"): ("webserver", "web_server_port", "3.0"),
227 ("api", "workers"): ("webserver", "workers", "3.0"),
228 ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"),
229 ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"),
230 ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"),
231 ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"),
232 ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"),
233 ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"),
234 ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"),
235 ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"),
236 ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"),
237 ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"),
238 ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"),
239 ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"),
240 ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
241 ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"),
242 ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"),
243 ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"),
244 ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"),
245 ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"),
246 ("core", "num_dag_runs_to_retain_rendered_fields"): (
247 "core",
248 "max_num_rendered_ti_fields_per_task",
249 "3.2.0",
250 ),
251 ("api", "page_size"): ("webserver", "page_size", "3.1.0"),
252 ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"),
253 ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"),
254 ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"),
255 ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
256 ("api", "log_config"): ("api", "access_logfile", "3.1.0"),
257 ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"),
258 ("api", "fallback_page_limit"): ("api", "page_size", "3.2.0"),
259 ("workers", "missing_dag_retries"): ("workers", "missing_dag_retires", "3.1.8"),
260 ("core", "execution_api_server_url"): ("workers", "execution_api_server_url", "3.0"),
261 ("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "3.0"),
262 }
263
264 # A mapping of new section -> (old section, since_version).
265 deprecated_sections: dict[str, tuple[str, str]] = {}
266
267 @property
268 def _lookup_sequence(self) -> list[Callable]:
269 """
270 Define the sequence of lookup methods for get(). The definition here does not have provider lookup.
271
272 Subclasses can override this to customise lookup order.
273 """
274 lookup_methods = [
275 self._get_environment_variables,
276 self._get_option_from_config_file,
277 self._get_option_from_commands,
278 self._get_option_from_secrets,
279 self._get_option_from_defaults,
280 ]
281 if self._use_providers_configuration:
282 # Provider fallback lookups are last so they have the lowest priority in the lookup sequence.
283 lookup_methods += [
284 self._get_option_from_provider_metadata_config_fallbacks,
285 self._get_option_from_provider_cfg_config_fallbacks,
286 ]
287 return lookup_methods
288
289 @functools.cached_property
290 def configuration_description(self) -> dict[str, dict[str, Any]]:
291 """
292 Return configuration description from multiple sources.
293
294 Respects the ``_use_providers_configuration`` flag to decide whether to include
295 provider configuration.
296
297 The merged description is built as follows:
298
299 1. Start from the base configuration description provided in ``__init__``, usually
300 loaded from ``config.yml`` in core. Values defined here are never overridden.
301 2. Merge provider metadata from ``_provider_metadata_configuration_description``,
302 loaded from provider packages' ``get_provider_info`` method. Only adds missing
303 sections/options; does not overwrite existing entries from the base configuration.
304 3. Merge default values from ``_provider_cfg_config_fallback_default_values``,
305 loaded from ``provider_config_fallback_defaults.cfg``. Only sets ``"default"``
306 (and heuristically ``"sensitive"``) for options that do not already define them.
307
308 Base configuration takes precedence, then provider metadata fills in missing
309 descriptions/options, and finally cfg-based fallbacks provide defaults only where
310 none are defined.
311
312 We use ``cached_property`` to cache the merged result; clear this cache (via
313 ``invalidate_cache``) when toggling ``_use_providers_configuration``.
314 """
315 if not self._use_providers_configuration:
316 return self._configuration_description
317
318 merged_description: dict[str, dict[str, Any]] = deepcopy(self._configuration_description)
319
320 # Merge full provider config descriptions (with metadata like sensitive, description, etc.)
321 # from provider packages' get_provider_info method, reusing the cached raw dict.
322 for section, section_content in self._provider_metadata_configuration_description.items():
323 if section not in merged_description:
324 merged_description[section] = deepcopy(section_content)
325 else:
326 existing_options = merged_description[section].setdefault("options", {})
327 for option, option_content in section_content.get("options", {}).items():
328 if option not in existing_options:
329 existing_options[option] = deepcopy(option_content)
330
331 # Merge default values from cfg-based fallbacks (key=value only, no metadata).
332 # Uses setdefault so provider metadata values above take priority.
333 cfg = self._provider_cfg_config_fallback_default_values
334 for section in cfg.sections():
335 section_options = merged_description.setdefault(section, {"options": {}}).setdefault(
336 "options", {}
337 )
338 for option in cfg.options(section):
339 opt_dict = section_options.setdefault(option, {})
340 opt_dict.setdefault("default", cfg.get(section, option))
341 # For cfg-only options with no provider metadata, infer sensitivity from name.
342 if "sensitive" not in opt_dict and option.endswith(("password", "secret")):
343 opt_dict["sensitive"] = True
344
345 return merged_description
346
347 @property
348 def _config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
349 """Override the base method to add provider fallbacks when providers are loaded."""
350 sources: list[tuple[str, ConfigParser]] = []
351 if self._use_providers_configuration:
352 # Provider fallback defaults are listed first so they have the lowest priority
353 # in as_dict()'s "last source wins" semantics.
354 sources += [
355 ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values),
356 (
357 "provider-metadata-fallback-defaults",
358 self._provider_metadata_config_fallback_default_values,
359 ),
360 ]
361 sources += [
362 ("default", self._default_values),
363 ("airflow.cfg", self),
364 ]
365 return sources
366
367 def _get_option_from_provider_cfg_config_fallbacks(
368 self,
369 deprecated_key: str | None,
370 deprecated_section: str | None,
371 key: str,
372 section: str,
373 issue_warning: bool = True,
374 extra_stacklevel: int = 0,
375 **kwargs,
376 ) -> str | ValueNotFound:
377 """Get config option from provider fallback defaults."""
378 value = self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs)
379 if value is not VALUE_NOT_FOUND_SENTINEL:
380 return value
381 return VALUE_NOT_FOUND_SENTINEL
382
383 def _get_option_from_provider_metadata_config_fallbacks(
384 self,
385 deprecated_key: str | None,
386 deprecated_section: str | None,
387 key: str,
388 section: str,
389 issue_warning: bool = True,
390 extra_stacklevel: int = 0,
391 **kwargs,
392 ) -> str | ValueNotFound:
393 """Get config option from provider metadata fallback defaults."""
394 value = self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs)
395 if value is not VALUE_NOT_FOUND_SENTINEL:
396 return value
397 return VALUE_NOT_FOUND_SENTINEL
398
399 def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
400 """Get provider config fallback default values."""
401 raw = kwargs.get("raw", False)
402 vars_ = kwargs.get("vars")
403 return self._provider_cfg_config_fallback_default_values.get(
404 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, raw=raw, vars=vars_
405 )
406
407 @functools.cached_property
408 def _provider_metadata_configuration_description(self) -> dict[str, dict[str, Any]]:
409 """Raw provider configuration descriptions with full metadata (sensitive, description, etc.)."""
410 result: dict[str, dict[str, Any]] = {}
411 for _, config in self._provider_manager_type().provider_configs:
412 result.update(config)
413 return result
414
415 @functools.cached_property
416 def _provider_metadata_config_fallback_default_values(self) -> ConfigParser:
417 """Return Provider metadata config fallback default values."""
418 return self._create_default_config_parser_callable(self._provider_metadata_configuration_description)
419
420 def get_from_provider_metadata_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
421 """Get provider metadata config fallback default values."""
422 raw = kwargs.get("raw", False)
423 vars_ = kwargs.get("vars")
424 return self._provider_metadata_config_fallback_default_values.get(
425 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, raw=raw, vars=vars_
426 )
427
428 @property
429 def _validators(self) -> list[Callable[[], None]]:
430 """
431 Return list of validators defined on a config parser class. Base class will return an empty list.
432
433 Subclasses can override this to customize the validators that are run during validation on the
434 config parser instance.
435 """
436 return []
437
438 def validate(self) -> None:
439 """Run all registered validators."""
440 for validator in self._validators:
441 validator()
442 self.is_validated = True
443
444 def _validate_deprecated_values(self) -> None:
445 """Validate and upgrade deprecated default values."""
446 for section, replacement in self.deprecated_values.items():
447 for name, info in replacement.items():
448 old, new = info
449 current_value = self.get(section, name, fallback="")
450 if self._using_old_value(old, current_value):
451 self.upgraded_values[(section, name)] = current_value
452 new_value = old.sub(new, current_value)
453 self._update_env_var(section=section, name=name, new_value=new_value)
454 self._create_future_warning(
455 name=name,
456 section=section,
457 current_value=current_value,
458 new_value=new_value,
459 )
460
461 def _using_old_value(self, old: Pattern, current_value: str) -> bool:
462 """Check if current_value matches the old pattern."""
463 return old.search(current_value) is not None
464
465 def _update_env_var(self, section: str, name: str, new_value: str) -> None:
466 """Update environment variable with new value."""
467 env_var = self._env_var_name(section, name)
468 # Set it as an env var so that any subprocesses keep the same override!
469 os.environ[env_var] = new_value
470
471 @staticmethod
472 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None:
473 """Create a FutureWarning for deprecated default values."""
474 warnings.warn(
475 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
476 f"This value has been changed to {new_value!r} in the running config, but please update your config.",
477 FutureWarning,
478 stacklevel=3,
479 )
480
481 def __init__(
482 self,
483 configuration_description: dict[str, dict[str, Any]],
484 _default_values: ConfigParser,
485 provider_manager_type: type[ProvidersManager] | type[ProvidersManagerTaskRuntime],
486 create_default_config_parser_callable: Callable[[dict[str, dict[str, Any]]], ConfigParser],
487 provider_config_fallback_defaults_cfg_path: str,
488 *args,
489 **kwargs,
490 ):
491 """
492 Initialize the parser.
493
494 :param configuration_description: Description of configuration options
495 :param _default_values: ConfigParser with default values
496 :param provider_manager_type: Either ProvidersManager or ProvidersManagerTaskRuntime, depending on the context of the caller.
497 :param create_default_config_parser_callable: The `create_default_config_parser` function from core or SDK, depending on the context of the caller.
498 :param provider_config_fallback_defaults_cfg_path: Path to the `provider_config_fallback_defaults.cfg` file.
499 """
500 super().__init__(*args, **kwargs)
501 self._configuration_description = configuration_description
502 self._default_values = _default_values
503 self._provider_manager_type = provider_manager_type
504 self._create_default_config_parser_callable = create_default_config_parser_callable
505 self._provider_cfg_config_fallback_default_values = create_provider_cfg_config_fallback_defaults(
506 provider_config_fallback_defaults_cfg_path
507 )
508 self._suppress_future_warnings = False
509 self.upgraded_values: dict[tuple[str, str], str] = {}
510 # The _use_providers_configuration flag will always be True unless we call `write(include_providers=False)` or `with self.make_sure_configuration_loaded(with_providers=False)`.
511 # Even when we call those methods, the flag will be set back to True after the method is done, so it only affects the current call to `as_dict()` and does not have any effect on subsequent calls.
512 self._use_providers_configuration = True
513
514 def invalidate_cache(self) -> None:
515 """
516 Clear all ``functools.cached_property`` entries on this instance.
517
518 Call this after mutating class-level attributes (e.g. ``deprecated_options``)
519 so that derived cached properties are recomputed on next access.
520 """
521 for attr_name in (
522 name
523 for name in dir(type(self))
524 if isinstance(getattr(type(self), name, None), functools.cached_property)
525 ):
526 self.__dict__.pop(attr_name, None)
527
528 def _invalidate_provider_flag_caches(self) -> None:
529 """Invalidate caches related to provider configuration flags."""
530 self.__dict__.pop("configuration_description", None)
531 self.__dict__.pop("sensitive_config_values", None)
532
533 @functools.cached_property
534 def inversed_deprecated_options(self):
535 """Build inverse mapping from old options to new options."""
536 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}
537
538 @functools.cached_property
539 def inversed_deprecated_sections(self):
540 """Build inverse mapping from old sections to new sections."""
541 return {
542 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
543 }
544
545 @functools.cached_property
546 def sensitive_config_values(self) -> set[tuple[str, str]]:
547 """Get set of sensitive config values that should be masked."""
548 flattened = {
549 (s, k): item
550 for s, s_c in self.configuration_description.items()
551 for k, item in s_c.get("options", {}).items()
552 }
553 sensitive = {
554 (section.lower(), key.lower())
555 for (section, key), v in flattened.items()
556 if v.get("sensitive") is True
557 }
558 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options}
559 depr_section = {
560 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections
561 }
562 sensitive.update(depr_section, depr_option)
563 return sensitive
564
565 def _update_defaults_from_string(self, config_string: str) -> None:
566 """
567 Update the defaults in _default_values based on values in config_string ("ini" format).
568
569 Override shared parser's method to add validation for template variables.
570 Note that those values are not validated and cannot contain variables because we are using
571 regular config parser to load them. This method is used to test the config parser in unit tests.
572
573 :param config_string: ini-formatted config string
574 """
575 parser = ConfigParser()
576 parser.read_string(config_string)
577 for section in parser.sections():
578 if section not in self._default_values.sections():
579 self._default_values.add_section(section)
580 errors = False
581 for key, value in parser.items(section):
582 if not self.is_template(section, key) and "{" in value:
583 errors = True
584 log.error(
585 "The %s.%s value %s read from string contains variable. This is not supported",
586 section,
587 key,
588 value,
589 )
590 self._default_values.set(section, key, value)
591 if errors:
592 raise AirflowConfigException(
593 f"The string config passed as default contains variables. "
594 f"This is not supported. String config: {config_string}"
595 )
596
597 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
598 """
599 Retrieve default value from default config parser, including provider fallbacks.
600
601 This will retrieve the default value from the core default config parser first. If not found
602 and providers configuration is loaded, it also checks provider fallback defaults.
603 Optionally a raw, stored value can be retrieved by setting skip_interpolation to True.
604 This is useful for example when we want to write the default value to a file, and we don't
605 want the interpolation to happen as it is going to be done later when the config is read.
606
607 :param section: section of the config
608 :param key: key to use
609 :param fallback: fallback value to use
610 :param raw: if raw, then interpolation will be reversed
611 :param kwargs: other args
612 :return:
613 """
614 value = self._default_values.get(section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs)
615 # Provider metadata has higher priority than cfg fallback — check it first.
616 if value is VALUE_NOT_FOUND_SENTINEL and self._use_providers_configuration:
617 value = self._provider_metadata_config_fallback_default_values.get(
618 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs
619 )
620 if value is VALUE_NOT_FOUND_SENTINEL and self._use_providers_configuration:
621 value = self._provider_cfg_config_fallback_default_values.get(
622 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs
623 )
624 if value is VALUE_NOT_FOUND_SENTINEL:
625 value = fallback
626 if raw and isinstance(value, str):
627 return value.replace("%", "%%")
628 return value
629
630 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None:
631 """
632 Get Secret Backend if defined in airflow.cfg.
633
634 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.
635 """
636 section = "workers" if worker_mode else "secrets"
637 key = "secrets_backend" if worker_mode else "backend"
638 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs"
639
640 secrets_backend_cls = self.getimport(section=section, key=key)
641
642 if not secrets_backend_cls:
643 if worker_mode:
644 # if we find no secrets backend for worker, return that of secrets backend
645 secrets_backend_cls = self.getimport(section="secrets", key="backend")
646 if not secrets_backend_cls:
647 return None
648 # When falling back to secrets backend, use its kwargs
649 kwargs_key = "backend_kwargs"
650 section = "secrets"
651 else:
652 return None
653
654 try:
655 backend_kwargs = self.getjson(section=section, key=kwargs_key)
656 if not backend_kwargs:
657 backend_kwargs = {}
658 elif not isinstance(backend_kwargs, dict):
659 raise ValueError("not a dict")
660 except AirflowConfigException:
661 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key)
662 backend_kwargs = {}
663 except ValueError:
664 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key)
665 backend_kwargs = {}
666
667 # Collect per-key overrides; they take precedence over the JSON blob.
668 env_prefix = _build_kwarg_env_prefix(section, kwargs_key)
669 backend_kwargs.update(_collect_kwarg_env_vars(env_prefix))
670
671 return secrets_backend_cls(**backend_kwargs)
672
673 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
674 """
675 Get Config option values from Secret Backend.
676
677 Called by the shared parser's _get_secret_option() method as part of the lookup chain.
678 Uses _get_custom_secret_backend() to get the backend instance.
679
680 :param config_key: the config key to retrieve
681 :return: config value or None
682 """
683 try:
684 secrets_client = self._get_custom_secret_backend()
685 if not secrets_client:
686 return None
687 return secrets_client.get_config(config_key)
688 except Exception as e:
689 raise AirflowConfigException(
690 "Cannot retrieve config from alternative secrets backend. "
691 "Make sure it is configured properly and that the Backend "
692 "is accessible.\n"
693 f"{e}"
694 )
695
696 def _get_cmd_option_from_config_sources(
697 self, config_sources: ConfigSourcesType, section: str, key: str
698 ) -> str | None:
699 fallback_key = key + "_cmd"
700 if (section, key) in self.sensitive_config_values:
701 section_dict = config_sources.get(section)
702 if section_dict is not None:
703 command_value = section_dict.get(fallback_key)
704 if command_value is not None:
705 if isinstance(command_value, str):
706 command = command_value
707 else:
708 command = command_value[0]
709 return run_command(command)
710 return None
711
712 def _get_secret_option_from_config_sources(
713 self, config_sources: ConfigSourcesType, section: str, key: str
714 ) -> str | None:
715 fallback_key = key + "_secret"
716 if (section, key) in self.sensitive_config_values:
717 section_dict = config_sources.get(section)
718 if section_dict is not None:
719 secrets_path_value = section_dict.get(fallback_key)
720 if secrets_path_value is not None:
721 if isinstance(secrets_path_value, str):
722 secrets_path = secrets_path_value
723 else:
724 secrets_path = secrets_path_value[0]
725 return self._get_config_value_from_secret_backend(secrets_path)
726 return None
727
728 def _include_secrets(
729 self,
730 config_sources: ConfigSourcesType,
731 display_sensitive: bool,
732 display_source: bool,
733 raw: bool,
734 ):
735 for section, key in self.sensitive_config_values:
736 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
737 if value:
738 if not display_sensitive:
739 value = "< hidden >"
740 if display_source:
741 opt: str | tuple[str, str] = (value, "secret")
742 elif raw:
743 opt = value.replace("%", "%%")
744 else:
745 opt = value
746 config_sources.setdefault(section, {}).update({key: opt})
747 del config_sources[section][key + "_secret"]
748
749 def _include_commands(
750 self,
751 config_sources: ConfigSourcesType,
752 display_sensitive: bool,
753 display_source: bool,
754 raw: bool,
755 ):
756 for section, key in self.sensitive_config_values:
757 opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
758 if not opt:
759 continue
760 opt_to_set: str | tuple[str, str] | None = opt
761 if not display_sensitive:
762 opt_to_set = "< hidden >"
763 if display_source:
764 opt_to_set = (str(opt_to_set), "cmd")
765 elif raw:
766 opt_to_set = str(opt_to_set).replace("%", "%%")
767 if opt_to_set is not None:
768 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set}
769 config_sources.setdefault(section, {}).update(dict_to_update)
770 del config_sources[section][key + "_cmd"]
771
772 def _include_envs(
773 self,
774 config_sources: ConfigSourcesType,
775 display_sensitive: bool,
776 display_source: bool,
777 raw: bool,
778 ):
779 for env_var in [
780 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
781 ]:
782 try:
783 _, section, key = env_var.split("__", 2)
784 opt = self._get_env_var_option(section, key)
785 except ValueError:
786 continue
787 if opt is None:
788 log.warning("Ignoring unknown env var '%s'", env_var)
789 continue
790 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
791 # Don't hide cmd/secret values here
792 if not env_var.lower().endswith(("cmd", "secret")):
793 if (section, key) in self.sensitive_config_values:
794 opt = "< hidden >"
795 elif raw:
796 opt = opt.replace("%", "%%")
797 if display_source:
798 opt = (opt, "env var")
799
800 section = section.lower()
801 key = key.lower()
802 config_sources.setdefault(section, {}).update({key: opt})
803
804 def _filter_by_source(
805 self,
806 config_sources: ConfigSourcesType,
807 display_source: bool,
808 getter_func,
809 ):
810 """
811 Delete default configs from current configuration.
812
813 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values.
814
815 This is necessary because bare configs take precedence over the command
816 or secret key equivalents so if the current running config is
817 materialized with Airflow defaults they in turn override user set
818 command or secret key configs.
819
820 :param config_sources: The current configuration to operate on
821 :param display_source: If False, configuration options contain raw
822 values. If True, options are a tuple of (option_value, source).
823 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
824 :param getter_func: A callback function that gets the user configured
825 override value for a particular sensitive_config_values config.
826 :return: None, the given config_sources is filtered if necessary,
827 otherwise untouched.
828 """
829 for section, key in self.sensitive_config_values:
830 # Don't bother if we don't have section / key
831 if section not in config_sources or key not in config_sources[section]:
832 continue
833 # Check that there is something to override defaults
834 try:
835 getter_opt = getter_func(section, key)
836 except ValueError:
837 continue
838 if not getter_opt:
839 continue
840 # Check to see that there is a default value
841 if self.get_default_value(section, key) is None:
842 continue
843 # Check to see if bare setting is the same as defaults
844 if display_source:
845 # when display_source = true, we know that the config_sources contains tuple
846 opt, source = config_sources[section][key] # type: ignore
847 else:
848 opt = config_sources[section][key] # type: ignore[assignment]
849 if opt == self.get_default_value(section, key):
850 del config_sources[section][key]
851
852 @staticmethod
853 def _deprecated_value_is_set_in_config(
854 deprecated_section: str,
855 deprecated_key: str,
856 configs: Iterable[tuple[str, ConfigParser]],
857 ) -> bool:
858 for config_type, config in configs:
859 if config_type != "default":
860 with contextlib.suppress(NoSectionError):
861 deprecated_section_array = config.items(section=deprecated_section, raw=True)
862 if any(key == deprecated_key for key, _ in deprecated_section_array):
863 return True
864 return False
865
866 @staticmethod
867 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
868 return (
869 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}")
870 is not None
871 )
872
873 @staticmethod
874 def _deprecated_command_is_set_in_config(
875 deprecated_section: str,
876 deprecated_key: str,
877 configs: Iterable[tuple[str, ConfigParser]],
878 ) -> bool:
879 return AirflowConfigParser._deprecated_value_is_set_in_config(
880 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs
881 )
882
883 @staticmethod
884 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
885 return (
886 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD")
887 is not None
888 )
889
890 @staticmethod
891 def _deprecated_secret_is_set_in_config(
892 deprecated_section: str,
893 deprecated_key: str,
894 configs: Iterable[tuple[str, ConfigParser]],
895 ) -> bool:
896 return AirflowConfigParser._deprecated_value_is_set_in_config(
897 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs
898 )
899
900 @staticmethod
901 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
902 return (
903 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET")
904 is not None
905 )
906
907 @staticmethod
908 def _replace_config_with_display_sources(
909 config_sources: ConfigSourcesType,
910 configs: Iterable[tuple[str, ConfigParser]],
911 configuration_description: dict[str, dict[str, Any]],
912 display_source: bool,
913 raw: bool,
914 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
915 include_env: bool,
916 include_cmds: bool,
917 include_secret: bool,
918 ):
919 for source_name, config in configs:
920 sections = config.sections()
921 for section in sections:
922 AirflowConfigParser._replace_section_config_with_display_sources(
923 config,
924 config_sources,
925 configuration_description,
926 display_source,
927 raw,
928 section,
929 source_name,
930 deprecated_options,
931 configs,
932 include_env=include_env,
933 include_cmds=include_cmds,
934 include_secret=include_secret,
935 )
936
937 @staticmethod
938 def _replace_section_config_with_display_sources(
939 config: ConfigParser,
940 config_sources: ConfigSourcesType,
941 configuration_description: dict[str, dict[str, Any]],
942 display_source: bool,
943 raw: bool,
944 section: str,
945 source_name: str,
946 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
947 configs: Iterable[tuple[str, ConfigParser]],
948 include_env: bool,
949 include_cmds: bool,
950 include_secret: bool,
951 ):
952 sect = config_sources.setdefault(section, {})
953 if isinstance(config, AirflowConfigParser):
954 with config.suppress_future_warnings():
955 items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
956 else:
957 items = config.items(section=section, raw=raw)
958 for k, val in items:
959 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
960 if deprecated_section and deprecated_key:
961 if source_name == "default":
962 # If deprecated entry has some non-default value set for any of the sources requested,
963 # We should NOT set default for the new entry (because it will override anything
964 # coming from the deprecated ones)
965 if AirflowConfigParser._deprecated_value_is_set_in_config(
966 deprecated_section, deprecated_key, configs
967 ):
968 continue
969 if include_env and AirflowConfigParser._deprecated_variable_is_set(
970 deprecated_section, deprecated_key
971 ):
972 continue
973 if include_cmds and (
974 AirflowConfigParser._deprecated_variable_command_is_set(
975 deprecated_section, deprecated_key
976 )
977 or AirflowConfigParser._deprecated_command_is_set_in_config(
978 deprecated_section, deprecated_key, configs
979 )
980 ):
981 continue
982 if include_secret and (
983 AirflowConfigParser._deprecated_variable_secret_is_set(
984 deprecated_section, deprecated_key
985 )
986 or AirflowConfigParser._deprecated_secret_is_set_in_config(
987 deprecated_section, deprecated_key, configs
988 )
989 ):
990 continue
991 if display_source:
992 updated_source_name = source_name
993 if source_name == "default":
994 # defaults can come from other sources (default-<PROVIDER>) that should be used here
995 source_description_section = configuration_description.get(section, {})
996 source_description_key = source_description_section.get("options", {}).get(k, {})
997 if source_description_key is not None:
998 updated_source_name = source_description_key.get("source", source_name)
999 sect[k] = (val, updated_source_name)
1000 else:
1001 sect[k] = val
1002
1003 def _warn_deprecate(
1004 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
1005 ):
1006 """Warn about deprecated config option usage."""
1007 if section == deprecated_section:
1008 warnings.warn(
1009 f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
1010 f"the old setting has been used, but please update your config.",
1011 DeprecationWarning,
1012 stacklevel=4 + extra_stacklevel,
1013 )
1014 else:
1015 warnings.warn(
1016 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
1017 f"in [{section}] - the old setting has been used, but please update your config.",
1018 DeprecationWarning,
1019 stacklevel=4 + extra_stacklevel,
1020 )
1021
1022 @contextmanager
1023 def suppress_future_warnings(self):
1024 """
1025 Context manager to temporarily suppress future warnings.
1026
1027 This is a stub used by the shared parser's lookup methods when checking deprecated options.
1028 Subclasses can override this to customize warning suppression behavior.
1029
1030 :return: context manager that suppresses future warnings
1031 """
1032 suppress_future_warnings = self._suppress_future_warnings
1033 self._suppress_future_warnings = True
1034 yield self
1035 self._suppress_future_warnings = suppress_future_warnings
1036
1037 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str:
1038 """Generate environment variable name for a config option."""
1039 team_component: str = f"{team_name.upper()}___" if team_name else ""
1040 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}"
1041
1042 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None):
1043 """Get config option from environment variable."""
1044 env_var: str = self._env_var_name(section, key, team_name=team_name)
1045 if env_var in os.environ:
1046 return expand_env_var(os.environ[env_var])
1047 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
1048 env_var_cmd = env_var + "_CMD"
1049 if env_var_cmd in os.environ:
1050 # if this is a valid command key...
1051 if (section, key) in self.sensitive_config_values:
1052 return run_command(os.environ[env_var_cmd])
1053 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
1054 env_var_secret_path = env_var + "_SECRET"
1055 if env_var_secret_path in os.environ:
1056 # if this is a valid secret path...
1057 if (section, key) in self.sensitive_config_values:
1058 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path])
1059 return None
1060
1061 def _get_cmd_option(self, section: str, key: str):
1062 """Get config option from command execution."""
1063 fallback_key = key + "_cmd"
1064 if (section, key) in self.sensitive_config_values:
1065 if super().has_option(section, fallback_key):
1066 command = super().get(section, fallback_key)
1067 try:
1068 cmd_output = run_command(command)
1069 except AirflowConfigException as e:
1070 raise e
1071 except Exception as e:
1072 raise AirflowConfigException(
1073 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd."
1074 f" Please check the {fallback_key} value."
1075 ) from e
1076 return cmd_output
1077 return None
1078
1079 def _get_secret_option(self, section: str, key: str) -> str | None:
1080 """Get Config option values from Secret Backend."""
1081 fallback_key = key + "_secret"
1082 if (section, key) in self.sensitive_config_values:
1083 if super().has_option(section, fallback_key):
1084 secrets_path = super().get(section, fallback_key)
1085 return self._get_config_value_from_secret_backend(secrets_path)
1086 return None
1087
1088 def _get_environment_variables(
1089 self,
1090 deprecated_key: str | None,
1091 deprecated_section: str | None,
1092 key: str,
1093 section: str,
1094 issue_warning: bool = True,
1095 extra_stacklevel: int = 0,
1096 **kwargs,
1097 ) -> str | ValueNotFound:
1098 """Get config option from environment variables."""
1099 team_name = kwargs.get("team_name", None)
1100 option = self._get_env_var_option(section, key, team_name=team_name)
1101 if option is not None:
1102 return option
1103 if deprecated_section and deprecated_key:
1104 with self.suppress_future_warnings():
1105 option = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name)
1106 if option is not None:
1107 if issue_warning:
1108 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
1109 return option
1110 return VALUE_NOT_FOUND_SENTINEL
1111
1112 def _get_option_from_config_file(
1113 self,
1114 deprecated_key: str | None,
1115 deprecated_section: str | None,
1116 key: str,
1117 section: str,
1118 issue_warning: bool = True,
1119 extra_stacklevel: int = 0,
1120 **kwargs,
1121 ) -> str | ValueNotFound:
1122 """Get config option from config file."""
1123 if team_name := kwargs.get("team_name", None):
1124 section = f"{team_name}={section}"
1125 # since this is the last lookup that supports team_name, pop it
1126 kwargs.pop("team_name")
1127 if super().has_option(section, key):
1128 return expand_env_var(super().get(section, key, **kwargs))
1129 if deprecated_section and deprecated_key:
1130 if super().has_option(deprecated_section, deprecated_key):
1131 if issue_warning:
1132 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
1133 with self.suppress_future_warnings():
1134 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
1135 return VALUE_NOT_FOUND_SENTINEL
1136
1137 def _get_option_from_commands(
1138 self,
1139 deprecated_key: str | None,
1140 deprecated_section: str | None,
1141 key: str,
1142 section: str,
1143 issue_warning: bool = True,
1144 extra_stacklevel: int = 0,
1145 **kwargs,
1146 ) -> str | ValueNotFound:
1147 """Get config option from command execution."""
1148 if kwargs.get("team_name", None):
1149 # Commands based team config fetching is not currently supported
1150 return VALUE_NOT_FOUND_SENTINEL
1151 option = self._get_cmd_option(section, key)
1152 if option:
1153 return option
1154 if deprecated_section and deprecated_key:
1155 with self.suppress_future_warnings():
1156 option = self._get_cmd_option(deprecated_section, deprecated_key)
1157 if option:
1158 if issue_warning:
1159 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
1160 return option
1161 return VALUE_NOT_FOUND_SENTINEL
1162
1163 def _get_option_from_secrets(
1164 self,
1165 deprecated_key: str | None,
1166 deprecated_section: str | None,
1167 key: str,
1168 section: str,
1169 issue_warning: bool = True,
1170 extra_stacklevel: int = 0,
1171 **kwargs,
1172 ) -> str | ValueNotFound:
1173 """Get config option from secrets backend."""
1174 if kwargs.get("team_name", None):
1175 # Secrets based team config fetching is not currently supported
1176 return VALUE_NOT_FOUND_SENTINEL
1177 option = self._get_secret_option(section, key)
1178 if option:
1179 return option
1180 if deprecated_section and deprecated_key:
1181 with self.suppress_future_warnings():
1182 option = self._get_secret_option(deprecated_section, deprecated_key)
1183 if option:
1184 if issue_warning:
1185 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
1186 return option
1187 return VALUE_NOT_FOUND_SENTINEL
1188
1189 def _get_option_from_defaults(
1190 self,
1191 deprecated_key: str | None,
1192 deprecated_section: str | None,
1193 key: str,
1194 section: str,
1195 issue_warning: bool = True,
1196 extra_stacklevel: int = 0,
1197 team_name: str | None = None,
1198 **kwargs,
1199 ) -> str | ValueNotFound:
1200 """Get config option from default values."""
1201 if self.get_default_value(section, key) is not None or "fallback" in kwargs:
1202 return expand_env_var(self.get_default_value(section, key, **kwargs))
1203 return VALUE_NOT_FOUND_SENTINEL
1204
1205 def _resolve_deprecated_lookup(
1206 self,
1207 section: str,
1208 key: str,
1209 lookup_from_deprecated: bool,
1210 extra_stacklevel: int = 0,
1211 ) -> tuple[str, str, str | None, str | None, bool]:
1212 """
1213 Resolve deprecated section/key mappings and determine deprecated values.
1214
1215 :param section: Section name (will be lowercased)
1216 :param key: Key name (will be lowercased)
1217 :param lookup_from_deprecated: Whether to lookup from deprecated options
1218 :param extra_stacklevel: Extra stack level for warnings
1219 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted)
1220 """
1221 section = section.lower()
1222 key = key.lower()
1223 warning_emitted = False
1224 deprecated_section: str | None = None
1225 deprecated_key: str | None = None
1226
1227 if not lookup_from_deprecated:
1228 return section, key, deprecated_section, deprecated_key, warning_emitted
1229
1230 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {})
1231 if option_description.get("deprecated"):
1232 deprecation_reason = option_description.get("deprecation_reason", "")
1233 warnings.warn(
1234 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
1235 DeprecationWarning,
1236 stacklevel=2 + extra_stacklevel,
1237 )
1238 # For the cases in which we rename whole sections
1239 if section in self.inversed_deprecated_sections:
1240 deprecated_section, deprecated_key = (section, key)
1241 section = self.inversed_deprecated_sections[section]
1242 if not self._suppress_future_warnings:
1243 warnings.warn(
1244 f"The config section [{deprecated_section}] has been renamed to "
1245 f"[{section}]. Please update your `conf.get*` call to use the new name",
1246 FutureWarning,
1247 stacklevel=2 + extra_stacklevel,
1248 )
1249 # Don't warn about individual rename if the whole section is renamed
1250 warning_emitted = True
1251 elif (section, key) in self.inversed_deprecated_options:
1252 # Handle using deprecated section/key instead of the new section/key
1253 new_section, new_key = self.inversed_deprecated_options[(section, key)]
1254 if not self._suppress_future_warnings and not warning_emitted:
1255 warnings.warn(
1256 f"section/key [{section}/{key}] has been deprecated, you should use"
1257 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
1258 "new name",
1259 FutureWarning,
1260 stacklevel=2 + extra_stacklevel,
1261 )
1262 warning_emitted = True
1263 deprecated_section, deprecated_key = section, key
1264 section, key = (new_section, new_key)
1265 elif section in self.deprecated_sections:
1266 # When accessing the new section name, make sure we check under the old config name
1267 deprecated_key = key
1268 deprecated_section = self.deprecated_sections[section][0]
1269 else:
1270 deprecated_section, deprecated_key, _ = self.deprecated_options.get(
1271 (section, key), (None, None, None)
1272 )
1273
1274 return section, key, deprecated_section, deprecated_key, warning_emitted
1275
1276 def load_providers_configuration(self) -> None:
1277 """
1278 Load configuration for providers.
1279
1280 .. deprecated:: 3.2.0
1281 Provider configuration is now loaded lazily via the ``configuration_description``
1282 cached property. This method is kept for backwards compatibility and will be
1283 removed in a future version.
1284 """
1285 warnings.warn(
1286 "load_providers_configuration() is deprecated. "
1287 "Provider configuration is now loaded lazily via the "
1288 "`configuration_description` cached property.",
1289 DeprecationWarning,
1290 stacklevel=2,
1291 )
1292 self._use_providers_configuration = True
1293 self._invalidate_provider_flag_caches()
1294
1295 def restore_core_default_configuration(self) -> None:
1296 """
1297 Restore the parser state before provider-contributed sections were loaded.
1298
1299 .. deprecated:: 3.2.0
1300 Use ``make_sure_configuration_loaded(with_providers=False)`` context manager
1301 instead. This method is kept for backwards compatibility and will be removed
1302 in a future version.
1303 """
1304 warnings.warn(
1305 "restore_core_default_configuration() is deprecated. "
1306 "Use `make_sure_configuration_loaded(with_providers=False)` instead.",
1307 DeprecationWarning,
1308 stacklevel=2,
1309 )
1310 self._use_providers_configuration = False
1311 self._invalidate_provider_flag_caches()
1312
1313 @overload # type: ignore[override]
1314 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...
1315
1316 @overload # type: ignore[override]
1317 def get(self, section: str, key: str, **kwargs) -> str | None: ...
1318
1319 def get( # type: ignore[misc, override]
1320 self,
1321 section: str,
1322 key: str,
1323 suppress_warnings: bool = False,
1324 lookup_from_deprecated: bool = True,
1325 _extra_stacklevel: int = 0,
1326 team_name: str | None = None,
1327 **kwargs,
1328 ) -> str | None:
1329 """
1330 Get config value by iterating through lookup sequence.
1331
1332 Priority order is defined by _lookup_sequence property.
1333 """
1334 section, key, deprecated_section, deprecated_key, warning_emitted = self._resolve_deprecated_lookup(
1335 section=section,
1336 key=key,
1337 lookup_from_deprecated=lookup_from_deprecated,
1338 extra_stacklevel=_extra_stacklevel,
1339 )
1340
1341 if team_name is not None:
1342 kwargs["team_name"] = team_name
1343
1344 for lookup_method in self._lookup_sequence:
1345 value = lookup_method(
1346 deprecated_key=deprecated_key,
1347 deprecated_section=deprecated_section,
1348 key=key,
1349 section=section,
1350 issue_warning=not warning_emitted,
1351 extra_stacklevel=_extra_stacklevel,
1352 **kwargs,
1353 )
1354 if value is not VALUE_NOT_FOUND_SENTINEL:
1355 return value
1356
1357 # Check if fallback was explicitly provided (even if None)
1358 if "fallback" in kwargs:
1359 return kwargs["fallback"]
1360
1361 if not suppress_warnings:
1362 log.warning("section/key [%s/%s] not found in config", section, key)
1363
1364 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
1365
1366 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override]
1367 """Get config value as boolean."""
1368 val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
1369 if "#" in val:
1370 val = val.split("#")[0].strip()
1371 if val in ("t", "true", "1"):
1372 return True
1373 if val in ("f", "false", "0"):
1374 return False
1375 raise AirflowConfigException(
1376 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
1377 f'Current value: "{val}".'
1378 )
1379
1380 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override]
1381 """Get config value as integer."""
1382 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1383 if val is None:
1384 raise AirflowConfigException(
1385 f"Failed to convert value None to int. "
1386 f'Please check "{key}" key in "{section}" section is set.'
1387 )
1388 try:
1389 return int(val)
1390 except ValueError:
1391 try:
1392 if (float_val := float(val)) != (int_val := int(float_val)):
1393 raise ValueError
1394 return int_val
1395 except (ValueError, OverflowError):
1396 raise AirflowConfigException(
1397 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1398 f'Current value: "{val}".'
1399 )
1400
1401 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override]
1402 """Get config value as float."""
1403 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1404 if val is None:
1405 raise AirflowConfigException(
1406 f"Failed to convert value None to float. "
1407 f'Please check "{key}" key in "{section}" section is set.'
1408 )
1409 try:
1410 return float(val)
1411 except ValueError:
1412 raise AirflowConfigException(
1413 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
1414 f'Current value: "{val}".'
1415 )
1416
1417 def getlist(self, section: str, key: str, delimiter=",", **kwargs):
1418 """Get config value as list."""
1419 val = self.get(section, key, **kwargs)
1420
1421 if isinstance(val, list) or val is None:
1422 # `get` will always return a (possibly-empty) string, so the only way we can
1423 # have these types is with `fallback=` was specified. So just return it.
1424 return val
1425
1426 if val == "":
1427 return []
1428
1429 try:
1430 return [item.strip() for item in val.split(delimiter)]
1431 except Exception:
1432 raise AirflowConfigException(
1433 f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
1434 f'Current value: "{val}".'
1435 )
1436
1437 E = TypeVar("E", bound=Enum)
1438
1439 def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
1440 """Get config value as enum."""
1441 val = self.get(section, key, **kwargs)
1442 enum_names = [enum_item.name for enum_item in enum_class]
1443
1444 if val is None:
1445 raise AirflowConfigException(
1446 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1447 f'Current value: "{val}" and it must be one of {", ".join(enum_names)}'
1448 )
1449
1450 try:
1451 return enum_class[val]
1452 except KeyError:
1453 if "fallback" in kwargs and kwargs["fallback"] in enum_names:
1454 return enum_class[kwargs["fallback"]]
1455 raise AirflowConfigException(
1456 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1457 f"the value must be one of {', '.join(enum_names)}"
1458 )
1459
1460 def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
1461 """Get config value as list of enums."""
1462 kwargs.setdefault("fallback", [])
1463 string_list = self.getlist(section, key, delimiter, **kwargs)
1464
1465 enum_names = [enum_item.name for enum_item in enum_class]
1466 enum_list = []
1467
1468 for val in string_list:
1469 try:
1470 enum_list.append(enum_class[val])
1471 except KeyError:
1472 log.warning(
1473 "Failed to convert value %r. Please check %s key in %s section. "
1474 "it must be one of %s, if not the value is ignored",
1475 val,
1476 key,
1477 section,
1478 ", ".join(enum_names),
1479 )
1480
1481 return enum_list
1482
1483 def getimport(self, section: str, key: str, **kwargs) -> Any:
1484 """
1485 Read options, import the full qualified name, and return the object.
1486
1487 In case of failure, it throws an exception with the key and section names
1488
1489 :return: The object or None, if the option is empty
1490 """
1491 # Fixed: use self.get() instead of conf.get()
1492 full_qualified_path = self.get(section=section, key=key, **kwargs)
1493 if not full_qualified_path:
1494 return None
1495
1496 try:
1497 # Import here to avoid circular dependency
1498 from ..module_loading import import_string
1499
1500 return import_string(full_qualified_path)
1501 except ImportError as e:
1502 log.warning(e)
1503 raise AirflowConfigException(
1504 f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
1505 f'Current value: "{full_qualified_path}".'
1506 )
1507
1508 def getjson(
1509 self, section: str, key: str, fallback=None, **kwargs
1510 ) -> dict | list | str | int | float | None:
1511 """
1512 Return a config value parsed from a JSON string.
1513
1514 ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.
1515 """
1516 try:
1517 data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
1518 except (NoSectionError, NoOptionError):
1519 data = None
1520
1521 if data is None or data == "":
1522 return fallback
1523
1524 try:
1525 return json.loads(data)
1526 except JSONDecodeError as e:
1527 raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
1528
1529 def gettimedelta(
1530 self, section: str, key: str, fallback: Any = None, **kwargs
1531 ) -> datetime.timedelta | None:
1532 """
1533 Get the config value for the given section and key, and convert it into datetime.timedelta object.
1534
1535 If the key is missing, then it is considered as `None`.
1536
1537 :param section: the section from the config
1538 :param key: the key defined in the given section
1539 :param fallback: fallback value when no config value is given, defaults to None
1540 :raises AirflowConfigException: raised because ValueError or OverflowError
1541 :return: datetime.timedelta(seconds=<config_value>) or None
1542 """
1543 val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
1544
1545 if val:
1546 # the given value must be convertible to integer
1547 try:
1548 int_val = int(val)
1549 except ValueError:
1550 raise AirflowConfigException(
1551 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1552 f'Current value: "{val}".'
1553 )
1554
1555 try:
1556 return datetime.timedelta(seconds=int_val)
1557 except OverflowError as err:
1558 raise AirflowConfigException(
1559 f"Failed to convert value to timedelta in `seconds`. "
1560 f"{err}. "
1561 f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
1562 )
1563
1564 return fallback
1565
1566 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
1567 """Get mandatory config value, raising ValueError if not found."""
1568 value = self.get(section, key, _extra_stacklevel=1, **kwargs)
1569 if value is None:
1570 raise ValueError(f"The value {section}/{key} should be set!")
1571 return value
1572
1573 def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
1574 """Get mandatory config value as list, raising ValueError if not found."""
1575 value = self.getlist(section, key, **kwargs)
1576 if value is None:
1577 raise ValueError(f"The value {section}/{key} should be set!")
1578 return value
1579
1580 def read( # type: ignore[override]
1581 self,
1582 filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
1583 encoding: str | None = None,
1584 ) -> list[str]:
1585 return super().read(filenames=filenames, encoding=encoding) # type: ignore[arg-type,return-value]
1586
1587 def read_dict( # type: ignore[override]
1588 self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
1589 ) -> None:
1590 """
1591 We define a different signature here to add better type hints and checking.
1592
1593 :param dictionary: dictionary to read from
1594 :param source: source to be used to store the configuration
1595 :return:
1596 """
1597 super().read_dict(dictionary=dictionary, source=source)
1598
1599 def _has_section_in_any_defaults(self, section: str) -> bool:
1600 """Check if section exists in core defaults or provider fallback defaults."""
1601 if self._default_values.has_section(section):
1602 return True
1603 if self._use_providers_configuration:
1604 if self._provider_cfg_config_fallback_default_values.has_section(section):
1605 return True
1606 if self._provider_metadata_config_fallback_default_values.has_section(section):
1607 return True
1608 return False
1609
1610 def get_sections_including_defaults(self) -> list[str]:
1611 """
1612 Retrieve all sections from the configuration parser, including sections defined by built-in defaults.
1613
1614 :return: list of section names
1615 """
1616 sections_from_config = self.sections()
1617 sections_from_description = list(self.configuration_description.keys())
1618 return list(dict.fromkeys(itertools.chain(sections_from_description, sections_from_config)))
1619
1620 def get_options_including_defaults(self, section: str) -> list[str]:
1621 """
1622 Retrieve all possible options from the configuration parser for the section given.
1623
1624 Includes options defined by built-in defaults.
1625
1626 :param section: section name
1627 :return: list of option names for the section given
1628 """
1629 my_own_options = self.options(section) if self.has_section(section) else []
1630 all_options_from_defaults = list(
1631 self.configuration_description.get(section, {}).get("options", {}).keys()
1632 )
1633 return list(dict.fromkeys(itertools.chain(all_options_from_defaults, my_own_options)))
1634
1635 def has_option( # type: ignore[override]
1636 self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs
1637 ) -> bool:
1638 """
1639 Check if option is defined.
1640
1641 Uses self.get() to avoid reimplementing the priority order of config variables
1642 (env, config, cmd, defaults).
1643
1644 :param section: section to get option from
1645 :param option: option to get
1646 :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
1647 :param kwargs: additional keyword arguments to pass to get(), such as team_name
1648 :return:
1649 """
1650 try:
1651 value = self.get(
1652 section,
1653 option,
1654 fallback=VALUE_NOT_FOUND_SENTINEL,
1655 _extra_stacklevel=1,
1656 suppress_warnings=True,
1657 lookup_from_deprecated=lookup_from_deprecated,
1658 **kwargs,
1659 )
1660 if value is VALUE_NOT_FOUND_SENTINEL:
1661 return False
1662 return True
1663 except (NoOptionError, NoSectionError, AirflowConfigException):
1664 return False
1665
1666 def set(self, section: str, option: str, value: str | None = None) -> None: # type: ignore[override]
1667 """
1668 Set an option to the given value.
1669
1670 This override just makes sure the section and option are lower case, to match what we do in `get`.
1671 """
1672 section = section.lower()
1673 option = option.lower()
1674 defaults = self.configuration_description or {}
1675 if not self.has_section(section) and section in defaults:
1676 # Trying to set a key in a section that exists in default, but not in the user config;
1677 # automatically create it
1678 self.add_section(section)
1679 super().set(section, option, value)
1680
1681 def remove_option(self, section: str, option: str, remove_default: bool = True): # type: ignore[override]
1682 """
1683 Remove an option if it exists in config from a file or default config.
1684
1685 If both of config have the same option, this removes the option
1686 in both configs unless remove_default=False.
1687 """
1688 section = section.lower()
1689 option = option.lower()
1690 if super().has_option(section, option):
1691 super().remove_option(section, option)
1692
1693 if remove_default and self._default_values.has_option(section, option):
1694 self._default_values.remove_option(section, option)
1695
1696 def optionxform(self, optionstr: str) -> str:
1697 """
1698 Transform option names on every read, get, or set operation.
1699
1700 This changes from the default behaviour of ConfigParser from lower-casing
1701 to instead be case-preserving.
1702
1703 :param optionstr:
1704 :return:
1705 """
1706 return optionstr
1707
1708 def as_dict(
1709 self,
1710 display_source: bool = False,
1711 display_sensitive: bool = False,
1712 raw: bool = False,
1713 include_env: bool = True,
1714 include_cmds: bool = True,
1715 include_secret: bool = True,
1716 ) -> ConfigSourcesType:
1717 """
1718 Return the current configuration as an OrderedDict of OrderedDicts.
1719
1720 When materializing current configuration Airflow defaults are
1721 materialized along with user set configs. If any of the `include_*`
1722 options are False then the result of calling command or secret key
1723 configs do not override Airflow defaults and instead are passed through.
1724 In order to then avoid Airflow defaults from overwriting user set
1725 command or secret key configs we filter out bare sensitive_config_values
1726 that are set to Airflow defaults when command or secret key configs
1727 produce different values.
1728
1729 :param display_source: If False, the option value is returned. If True,
1730 a tuple of (option_value, source) is returned. Source is either
1731 'airflow.cfg', 'default', 'env var', or 'cmd'.
1732 :param display_sensitive: If True, the values of options set by env
1733 vars and bash commands will be displayed. If False, those options
1734 are shown as '< hidden >'
1735 :param raw: Should the values be output as interpolated values, or the
1736 "raw" form that can be fed back in to ConfigParser
1737 :param include_env: Should the value of configuration from AIRFLOW__
1738 environment variables be included or not
1739 :param include_cmds: Should the result of calling any ``*_cmd`` config be
1740 set (True, default), or should the _cmd options be left as the
1741 command to run (False)
1742 :param include_secret: Should the result of calling any ``*_secret`` config be
1743 set (True, default), or should the _secret options be left as the
1744 path to get the secret from (False)
1745 :return: Dictionary, where the key is the name of the section and the content is
1746 the dictionary with the name of the parameter and its value.
1747 """
1748 if not display_sensitive:
1749 # We want to hide the sensitive values at the appropriate methods
1750 # since envs from cmds, secrets can be read at _include_envs method
1751 if not all([include_env, include_cmds, include_secret]):
1752 raise ValueError(
1753 "If display_sensitive is false, then include_env, "
1754 "include_cmds, include_secret must all be set as True"
1755 )
1756
1757 config_sources: ConfigSourcesType = {}
1758
1759 # We check sequentially all those sources and the last one we saw it in will "win"
1760 configs = self._config_sources_for_as_dict
1761
1762 self._replace_config_with_display_sources(
1763 config_sources,
1764 configs,
1765 self.configuration_description,
1766 display_source,
1767 raw,
1768 self.deprecated_options,
1769 include_cmds=include_cmds,
1770 include_env=include_env,
1771 include_secret=include_secret,
1772 )
1773
1774 # add env vars and overwrite because they have priority
1775 if include_env:
1776 self._include_envs(config_sources, display_sensitive, display_source, raw)
1777 else:
1778 self._filter_by_source(config_sources, display_source, self._get_env_var_option)
1779
1780 # add bash commands
1781 if include_cmds:
1782 self._include_commands(config_sources, display_sensitive, display_source, raw)
1783 else:
1784 self._filter_by_source(config_sources, display_source, self._get_cmd_option)
1785
1786 # add config from secret backends
1787 if include_secret:
1788 self._include_secrets(config_sources, display_sensitive, display_source, raw)
1789 else:
1790 self._filter_by_source(config_sources, display_source, self._get_secret_option)
1791
1792 if not display_sensitive:
1793 # This ensures the ones from config file is hidden too
1794 # if they are not provided through env, cmd and secret
1795 hidden = "< hidden >"
1796 for section, key in self.sensitive_config_values:
1797 if config_sources.get(section):
1798 if config_sources[section].get(key, None):
1799 if display_source:
1800 source = config_sources[section][key][1]
1801 config_sources[section][key] = (hidden, source)
1802 else:
1803 config_sources[section][key] = hidden
1804
1805 return config_sources
1806
1807 def _write_option_header(
1808 self,
1809 file: IO[str],
1810 option: str,
1811 extra_spacing: bool,
1812 include_descriptions: bool,
1813 include_env_vars: bool,
1814 include_examples: bool,
1815 include_sources: bool,
1816 section_config_description: dict[str, dict[str, Any]],
1817 section_to_write: str,
1818 sources_dict: ConfigSourcesType,
1819 ) -> tuple[bool, bool]:
1820 """
1821 Write header for configuration option.
1822
1823 Returns tuple of (should_continue, needs_separation) where needs_separation should be
1824 set if the option needs additional separation to visually separate it from the next option.
1825 """
1826 option_config_description = (
1827 section_config_description.get("options", {}).get(option, {})
1828 if section_config_description
1829 else {}
1830 )
1831 description = option_config_description.get("description")
1832 needs_separation = False
1833 if description and include_descriptions:
1834 for line in description.splitlines():
1835 file.write(f"# {line}\n")
1836 needs_separation = True
1837 example = option_config_description.get("example")
1838 if example is not None and include_examples:
1839 if extra_spacing:
1840 file.write("#\n")
1841 example_lines = example.splitlines()
1842 example = "\n# ".join(example_lines)
1843 file.write(f"# Example: {option} = {example}\n")
1844 needs_separation = True
1845 if include_sources and sources_dict:
1846 sources_section = sources_dict.get(section_to_write)
1847 value_with_source = sources_section.get(option) if sources_section else None
1848 if value_with_source is None:
1849 file.write("#\n# Source: not defined\n")
1850 else:
1851 file.write(f"#\n# Source: {value_with_source[1]}\n")
1852 needs_separation = True
1853 if include_env_vars:
1854 file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
1855 if extra_spacing:
1856 file.write("#\n")
1857 needs_separation = True
1858 return True, needs_separation
1859
1860 def is_template(self, section: str, key) -> bool:
1861 """
1862 Return whether the value is templated.
1863
1864 :param section: section of the config
1865 :param key: key in the section
1866 :return: True if the value is templated
1867 """
1868 return _is_template(self.configuration_description, section, key)
1869
1870 def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None:
1871 """
1872 Return the section as a dict.
1873
1874 Values are converted to int, float, bool as required.
1875
1876 :param section: section from the config
1877 :param team_name: optional team name for team-specific configuration lookup
1878 """
1879 # Handle team-specific section lookup for config file
1880 config_section = f"{team_name}={section}" if team_name else section
1881
1882 if not self.has_section(config_section) and not self._default_values.has_section(config_section):
1883 return None
1884 if self._default_values.has_section(config_section):
1885 _section: ConfigOptionsDictType = dict(self._default_values.items(config_section))
1886 else:
1887 _section = {}
1888
1889 if self.has_section(config_section):
1890 _section.update(self.items(config_section))
1891
1892 # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name
1893 section_prefix = self._env_var_name(section, "", team_name=team_name)
1894 for env_var in sorted(os.environ.keys()):
1895 if env_var.startswith(section_prefix):
1896 key = env_var.replace(section_prefix, "")
1897 if key.endswith("_CMD"):
1898 key = key[:-4]
1899 key = key.lower()
1900 _section[key] = self._get_env_var_option(section, key, team_name=team_name)
1901
1902 for key, val in _section.items():
1903 if val is None:
1904 raise AirflowConfigException(
1905 f"Failed to convert value automatically. "
1906 f'Please check "{key}" key in "{section}" section is set.'
1907 )
1908 try:
1909 _section[key] = int(val)
1910 except ValueError:
1911 try:
1912 _section[key] = float(val)
1913 except ValueError:
1914 if isinstance(val, str) and val.lower() in ("t", "true"):
1915 _section[key] = True
1916 elif isinstance(val, str) and val.lower() in ("f", "false"):
1917 _section[key] = False
1918 return _section
1919
1920 @staticmethod
1921 def _write_section_header(
1922 file: IO[str],
1923 include_descriptions: bool,
1924 section_config_description: dict[str, str],
1925 section_to_write: str,
1926 ) -> None:
1927 """Write header for configuration section."""
1928 file.write(f"[{section_to_write}]\n")
1929 section_description = section_config_description.get("description")
1930 if section_description and include_descriptions:
1931 for line in section_description.splitlines():
1932 file.write(f"# {line}\n")
1933 file.write("\n")
1934
1935 def _write_value(
1936 self,
1937 file: IO[str],
1938 option: str,
1939 comment_out_everything: bool,
1940 needs_separation: bool,
1941 only_defaults: bool,
1942 section_to_write: str,
1943 hide_sensitive: bool,
1944 is_sensitive: bool,
1945 show_values: bool = False,
1946 ):
1947 default_value = self.get_default_value(section_to_write, option, raw=True)
1948 if only_defaults:
1949 value = default_value
1950 else:
1951 value = self.get(section_to_write, option, fallback=default_value, raw=True)
1952 if not show_values:
1953 file.write(f"# {option} = \n")
1954 else:
1955 if hide_sensitive and is_sensitive:
1956 value = "< hidden >"
1957 else:
1958 pass
1959 if value is None:
1960 file.write(f"# {option} = \n")
1961 else:
1962 if comment_out_everything:
1963 value_lines = value.splitlines()
1964 value = "\n# ".join(value_lines)
1965 file.write(f"# {option} = {value}\n")
1966 else:
1967 if "\n" in value:
1968 try:
1969 value = json.dumps(json.loads(value), indent=4)
1970 value = value.replace(
1971 "\n", "\n "
1972 ) # indent multi-line JSON to satisfy configparser format
1973 except JSONDecodeError:
1974 pass
1975 file.write(f"{option} = {value}\n")
1976 if needs_separation:
1977 file.write("\n")
1978
1979 def write( # type: ignore[override]
1980 self,
1981 file: IO[str],
1982 section: str | None = None,
1983 include_examples: bool = True,
1984 include_descriptions: bool = True,
1985 include_sources: bool = True,
1986 include_env_vars: bool = True,
1987 include_providers: bool = True,
1988 comment_out_everything: bool = False,
1989 hide_sensitive: bool = False,
1990 extra_spacing: bool = True,
1991 only_defaults: bool = False,
1992 show_values: bool = False,
1993 **kwargs: Any,
1994 ) -> None:
1995 """
1996 Write configuration with comments and examples to a file.
1997
1998 :param file: file to write to
1999 :param section: section of the config to write, defaults to all sections
2000 :param include_examples: Include examples in the output
2001 :param include_descriptions: Include descriptions in the output
2002 :param include_sources: Include the source of each config option
2003 :param include_env_vars: Include environment variables corresponding to each config option
2004 :param include_providers: Include providers configuration
2005 :param comment_out_everything: Comment out all values
2006 :param hide_sensitive_values: Include sensitive values in the output
2007 :param extra_spacing: Add extra spacing before examples and after variables
2008 :param only_defaults: Only include default values when writing the config, not the actual values
2009 """
2010 with self.make_sure_configuration_loaded(with_providers=include_providers):
2011 sources_dict = {}
2012 if include_sources:
2013 sources_dict = self.as_dict(display_source=True)
2014 for section_to_write in self.get_sections_including_defaults():
2015 section_config_description = self.configuration_description.get(section_to_write, {})
2016 if section_to_write != section and section is not None:
2017 continue
2018 if self._has_section_in_any_defaults(section_to_write) or self.has_section(section_to_write):
2019 self._write_section_header(
2020 file, include_descriptions, section_config_description, section_to_write
2021 )
2022 for option in self.get_options_including_defaults(section_to_write):
2023 should_continue, needs_separation = self._write_option_header(
2024 file=file,
2025 option=option,
2026 extra_spacing=extra_spacing,
2027 include_descriptions=include_descriptions,
2028 include_env_vars=include_env_vars,
2029 include_examples=include_examples,
2030 include_sources=include_sources,
2031 section_config_description=section_config_description,
2032 section_to_write=section_to_write,
2033 sources_dict=sources_dict,
2034 )
2035 is_sensitive = (
2036 section_to_write.lower(),
2037 option.lower(),
2038 ) in self.sensitive_config_values
2039 self._write_value(
2040 file=file,
2041 option=option,
2042 comment_out_everything=comment_out_everything,
2043 needs_separation=needs_separation,
2044 only_defaults=only_defaults,
2045 section_to_write=section_to_write,
2046 hide_sensitive=hide_sensitive,
2047 is_sensitive=is_sensitive,
2048 show_values=show_values,
2049 )
2050 if include_descriptions and not needs_separation:
2051 # extra separation between sections in case last option did not need it
2052 file.write("\n")
2053
2054 @contextmanager
2055 def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
2056 """
2057 Make sure configuration is loaded with or without providers.
2058
2059 The context manager will only toggle the `self._use_providers_configuration` flag if `with_providers` is False, and will reset `self._use_providers_configuration` to True after the context block.
2060 Nop for `with_providers=True` as the configuration already loads providers configuration by default.
2061
2062 :param with_providers: whether providers should be loaded
2063 """
2064 if not with_providers:
2065 self._use_providers_configuration = False
2066 # Only invalidate cached properties that depend on _use_providers_configuration.
2067 # Do NOT use invalidate_cache() here — it would also evict expensive provider-discovery
2068 # caches (_provider_metadata_configuration_description, _provider_metadata_config_fallback_default_values)
2069 # that don't depend on this flag.
2070 self._invalidate_provider_flag_caches()
2071 try:
2072 yield
2073 finally:
2074 if not with_providers:
2075 self._use_providers_configuration = True
2076 self._invalidate_provider_flag_caches()