1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Base configuration parser with pure parsing logic."""
19
20from __future__ import annotations
21
22import contextlib
23import datetime
24import functools
25import itertools
26import json
27import logging
28import os
29import shlex
30import subprocess
31import sys
32import warnings
33from collections.abc import Callable, Generator, Iterable
34from configparser import ConfigParser, NoOptionError, NoSectionError
35from contextlib import contextmanager
36from copy import deepcopy
37from enum import Enum
38from json.decoder import JSONDecodeError
39from re import Pattern
40from typing import IO, TYPE_CHECKING, Any, TypeVar, overload
41
42from .exceptions import AirflowConfigException
43
44log = logging.getLogger(__name__)
45
46
47def _build_kwarg_env_prefix(section: str, kwargs_key: str) -> str:
48 """
49 Build env prefix for per-key backend kwargs.
50
51 ("secrets", "backend_kwargs") -> "AIRFLOW__SECRETS__BACKEND_KWARG__"
52 ("workers", "secrets_backend_kwargs") -> "AIRFLOW__WORKERS__SECRETS_BACKEND_KWARG__"
53 """
54 singular_key = kwargs_key.replace("_kwargs", "_kwarg")
55 return f"{ENV_VAR_PREFIX}{section.upper()}__{singular_key.upper()}__"
56
57
58def _collect_kwarg_env_vars(prefix: str) -> dict[str, str]:
59 """
60 Scan os.environ for per-key secrets backend kwargs.
61
62 AIRFLOW__SECRETS__BACKEND_KWARG__ROLE_ID -> {"role_id": value}
63 Values are raw strings (not JSON-parsed).
64 Empty keys (trailing __ with no suffix) are ignored.
65 """
66 overrides: dict[str, str] = {}
67 for env_var, value in os.environ.items():
68 if env_var.startswith(prefix):
69 kwarg_key = env_var[len(prefix) :].lower()
70 if kwarg_key:
71 overrides[kwarg_key] = value
72 return overrides
73
74
75ConfigType = str | int | float | bool
76ConfigOptionsDictType = dict[str, ConfigType]
77ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
78ConfigSourcesType = dict[str, ConfigSectionSourcesType]
79ENV_VAR_PREFIX = "AIRFLOW__"
80
81
82if TYPE_CHECKING:
83 from airflow.providers_manager import ProvidersManager
84 from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
85
86
87class ValueNotFound:
88 """Object of this is raised when a configuration value cannot be found."""
89
90 pass
91
92
93VALUE_NOT_FOUND_SENTINEL = ValueNotFound()
94
95
96@overload
97def expand_env_var(env_var: None) -> None: ...
98@overload
99def expand_env_var(env_var: str) -> str: ...
100
101
102def expand_env_var(env_var: str | None) -> str | None:
103 """
104 Expand (potentially nested) env vars.
105
106 Repeat and apply `expandvars` and `expanduser` until
107 interpolation stops having any effect.
108 """
109 if not env_var or not isinstance(env_var, str):
110 return env_var
111 while True:
112 interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
113 if interpolated == env_var:
114 return interpolated
115 env_var = interpolated
116
117
118def run_command(command: str) -> str:
119 """Run command and returns stdout."""
120 process = subprocess.Popen(
121 shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
122 )
123 output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate())
124
125 if process.returncode != 0:
126 raise AirflowConfigException(
127 f"Cannot execute {command}. Error code is: {process.returncode}. "
128 f"Output: {output}, Stderr: {stderr}"
129 )
130
131 return output
132
133
134def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool:
135 """
136 Check if the config is a template.
137
138 :param configuration_description: description of configuration
139 :param section: section
140 :param key: key
141 :return: True if the config is a template
142 """
143 return configuration_description.get(section, {}).get(key, {}).get("is_template", False)
144
145
146def configure_parser_from_configuration_description(
147 parser: ConfigParser,
148 configuration_description: dict[str, dict[str, Any]],
149 all_vars: dict[str, Any],
150) -> None:
151 """
152 Configure a ConfigParser based on configuration description.
153
154 :param parser: ConfigParser to configure
155 :param configuration_description: configuration description from config.yml
156 """
157 for section, section_desc in configuration_description.items():
158 parser.add_section(section)
159 options = section_desc["options"]
160 for key in options:
161 default_value = options[key]["default"]
162 is_template = options[key].get("is_template", False)
163 if (default_value is not None) and not (
164 options[key].get("version_deprecated") or options[key].get("deprecation_reason")
165 ):
166 if is_template or not isinstance(default_value, str):
167 parser.set(section, key, str(default_value))
168 else:
169 try:
170 parser.set(section, key, default_value.format(**all_vars))
171 except (KeyError, ValueError):
172 parser.set(section, key, default_value)
173
174
175def create_provider_cfg_config_fallback_defaults(
176 provider_config_fallback_defaults_cfg_path: str,
177) -> ConfigParser:
178 """
179 Create fallback defaults for configuration.
180
181 This parser contains provider defaults for Airflow configuration, containing fallback default values
182 that might be needed when provider classes are being imported - before provider's configuration
183 is loaded.
184
185 Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead
186 to retrieving provider configuration before the defaults for the provider are loaded.
187
188 Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or
189 environment variables) those will be used as usual.
190
191 NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication,
192 at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed.
193
194 You've been warned!
195
196 :param provider_config_fallback_defaults_cfg_path: path to the provider config fallback defaults .cfg file
197 """
198 config_parser = ConfigParser()
199 config_parser.read(provider_config_fallback_defaults_cfg_path)
200 return config_parser
201
202
203class AirflowConfigParser(ConfigParser):
204 """
205 Base configuration parser with pure parsing logic.
206
207 This class provides the core parsing methods that work with:
208 - configuration_description: dict describing config options (required in __init__)
209 - _default_values: ConfigParser with default values (required in __init__)
210 - deprecated_options: class attribute mapping new -> old options
211 - deprecated_sections: class attribute mapping new -> old sections
212 """
213
214 # A mapping of section -> setting -> { old, replace } for deprecated default values.
215 # Subclasses can override this to define deprecated values that should be upgraded.
216 deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {}
217
218 # A mapping of (new section, new option) -> (old section, old option, since_version).
219 # When reading new option, the old option will be checked to see if it exists. If it does a
220 # DeprecationWarning will be issued and the old option will be used instead
221 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
222 ("dag_processor", "dag_file_processor_timeout"): ("core", "dag_file_processor_timeout", "3.0"),
223 ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
224 ("api", "base_url"): ("webserver", "base_url", "3.0"),
225 ("api", "host"): ("webserver", "web_server_host", "3.0"),
226 ("api", "port"): ("webserver", "web_server_port", "3.0"),
227 ("api", "workers"): ("webserver", "workers", "3.0"),
228 ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"),
229 ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"),
230 ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"),
231 ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"),
232 ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"),
233 ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"),
234 ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"),
235 ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"),
236 ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"),
237 ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"),
238 ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"),
239 ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"),
240 ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
241 ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"),
242 ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"),
243 ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"),
244 ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"),
245 ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"),
246 ("core", "num_dag_runs_to_retain_rendered_fields"): (
247 "core",
248 "max_num_rendered_ti_fields_per_task",
249 "3.2.0",
250 ),
251 ("api", "page_size"): ("webserver", "page_size", "3.1.0"),
252 ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"),
253 ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"),
254 ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"),
255 ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
256 ("api", "log_config"): ("api", "access_logfile", "3.1.0"),
257 ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"),
258 ("api", "fallback_page_limit"): ("api", "page_size", "3.2.0"),
259 ("workers", "missing_dag_retries"): ("workers", "missing_dag_retires", "3.1.8"),
260 }
261
262 # A mapping of new section -> (old section, since_version).
263 deprecated_sections: dict[str, tuple[str, str]] = {}
264
265 @property
266 def _lookup_sequence(self) -> list[Callable]:
267 """
268 Define the sequence of lookup methods for get(). The definition here does not have provider lookup.
269
270 Subclasses can override this to customise lookup order.
271 """
272 lookup_methods = [
273 self._get_environment_variables,
274 self._get_option_from_config_file,
275 self._get_option_from_commands,
276 self._get_option_from_secrets,
277 self._get_option_from_defaults,
278 ]
279 if self._use_providers_configuration:
280 # Provider fallback lookups are last so they have the lowest priority in the lookup sequence.
281 lookup_methods += [
282 self._get_option_from_provider_metadata_config_fallbacks,
283 self._get_option_from_provider_cfg_config_fallbacks,
284 ]
285 return lookup_methods
286
287 @functools.cached_property
288 def configuration_description(self) -> dict[str, dict[str, Any]]:
289 """
290 Return configuration description from multiple sources.
291
292 Respects the ``_use_providers_configuration`` flag to decide whether to include
293 provider configuration.
294
295 The merged description is built as follows:
296
297 1. Start from the base configuration description provided in ``__init__``, usually
298 loaded from ``config.yml`` in core. Values defined here are never overridden.
299 2. Merge provider metadata from ``_provider_metadata_configuration_description``,
300 loaded from provider packages' ``get_provider_info`` method. Only adds missing
301 sections/options; does not overwrite existing entries from the base configuration.
302 3. Merge default values from ``_provider_cfg_config_fallback_default_values``,
303 loaded from ``provider_config_fallback_defaults.cfg``. Only sets ``"default"``
304 (and heuristically ``"sensitive"``) for options that do not already define them.
305
306 Base configuration takes precedence, then provider metadata fills in missing
307 descriptions/options, and finally cfg-based fallbacks provide defaults only where
308 none are defined.
309
310 We use ``cached_property`` to cache the merged result; clear this cache (via
311 ``invalidate_cache``) when toggling ``_use_providers_configuration``.
312 """
313 if not self._use_providers_configuration:
314 return self._configuration_description
315
316 merged_description: dict[str, dict[str, Any]] = deepcopy(self._configuration_description)
317
318 # Merge full provider config descriptions (with metadata like sensitive, description, etc.)
319 # from provider packages' get_provider_info method, reusing the cached raw dict.
320 for section, section_content in self._provider_metadata_configuration_description.items():
321 if section not in merged_description:
322 merged_description[section] = deepcopy(section_content)
323 else:
324 existing_options = merged_description[section].setdefault("options", {})
325 for option, option_content in section_content.get("options", {}).items():
326 if option not in existing_options:
327 existing_options[option] = deepcopy(option_content)
328
329 # Merge default values from cfg-based fallbacks (key=value only, no metadata).
330 # Uses setdefault so provider metadata values above take priority.
331 cfg = self._provider_cfg_config_fallback_default_values
332 for section in cfg.sections():
333 section_options = merged_description.setdefault(section, {"options": {}}).setdefault(
334 "options", {}
335 )
336 for option in cfg.options(section):
337 opt_dict = section_options.setdefault(option, {})
338 opt_dict.setdefault("default", cfg.get(section, option))
339 # For cfg-only options with no provider metadata, infer sensitivity from name.
340 if "sensitive" not in opt_dict and option.endswith(("password", "secret")):
341 opt_dict["sensitive"] = True
342
343 return merged_description
344
345 @property
346 def _config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
347 """Override the base method to add provider fallbacks when providers are loaded."""
348 sources: list[tuple[str, ConfigParser]] = []
349 if self._use_providers_configuration:
350 # Provider fallback defaults are listed first so they have the lowest priority
351 # in as_dict()'s "last source wins" semantics.
352 sources += [
353 ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values),
354 (
355 "provider-metadata-fallback-defaults",
356 self._provider_metadata_config_fallback_default_values,
357 ),
358 ]
359 sources += [
360 ("default", self._default_values),
361 ("airflow.cfg", self),
362 ]
363 return sources
364
365 def _get_option_from_provider_cfg_config_fallbacks(
366 self,
367 deprecated_key: str | None,
368 deprecated_section: str | None,
369 key: str,
370 section: str,
371 issue_warning: bool = True,
372 extra_stacklevel: int = 0,
373 **kwargs,
374 ) -> str | ValueNotFound:
375 """Get config option from provider fallback defaults."""
376 value = self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs)
377 if value is not VALUE_NOT_FOUND_SENTINEL:
378 return value
379 return VALUE_NOT_FOUND_SENTINEL
380
381 def _get_option_from_provider_metadata_config_fallbacks(
382 self,
383 deprecated_key: str | None,
384 deprecated_section: str | None,
385 key: str,
386 section: str,
387 issue_warning: bool = True,
388 extra_stacklevel: int = 0,
389 **kwargs,
390 ) -> str | ValueNotFound:
391 """Get config option from provider metadata fallback defaults."""
392 value = self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs)
393 if value is not VALUE_NOT_FOUND_SENTINEL:
394 return value
395 return VALUE_NOT_FOUND_SENTINEL
396
397 def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
398 """Get provider config fallback default values."""
399 raw = kwargs.get("raw", False)
400 vars_ = kwargs.get("vars")
401 return self._provider_cfg_config_fallback_default_values.get(
402 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, raw=raw, vars=vars_
403 )
404
405 @functools.cached_property
406 def _provider_metadata_configuration_description(self) -> dict[str, dict[str, Any]]:
407 """Raw provider configuration descriptions with full metadata (sensitive, description, etc.)."""
408 result: dict[str, dict[str, Any]] = {}
409 for _, config in self._provider_manager_type().provider_configs:
410 result.update(config)
411 return result
412
413 @functools.cached_property
414 def _provider_metadata_config_fallback_default_values(self) -> ConfigParser:
415 """Return Provider metadata config fallback default values."""
416 return self._create_default_config_parser_callable(self._provider_metadata_configuration_description)
417
418 def get_from_provider_metadata_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
419 """Get provider metadata config fallback default values."""
420 raw = kwargs.get("raw", False)
421 vars_ = kwargs.get("vars")
422 return self._provider_metadata_config_fallback_default_values.get(
423 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, raw=raw, vars=vars_
424 )
425
426 @property
427 def _validators(self) -> list[Callable[[], None]]:
428 """
429 Return list of validators defined on a config parser class. Base class will return an empty list.
430
431 Subclasses can override this to customize the validators that are run during validation on the
432 config parser instance.
433 """
434 return []
435
436 def validate(self) -> None:
437 """Run all registered validators."""
438 for validator in self._validators:
439 validator()
440 self.is_validated = True
441
442 def _validate_deprecated_values(self) -> None:
443 """Validate and upgrade deprecated default values."""
444 for section, replacement in self.deprecated_values.items():
445 for name, info in replacement.items():
446 old, new = info
447 current_value = self.get(section, name, fallback="")
448 if self._using_old_value(old, current_value):
449 self.upgraded_values[(section, name)] = current_value
450 new_value = old.sub(new, current_value)
451 self._update_env_var(section=section, name=name, new_value=new_value)
452 self._create_future_warning(
453 name=name,
454 section=section,
455 current_value=current_value,
456 new_value=new_value,
457 )
458
459 def _using_old_value(self, old: Pattern, current_value: str) -> bool:
460 """Check if current_value matches the old pattern."""
461 return old.search(current_value) is not None
462
463 def _update_env_var(self, section: str, name: str, new_value: str) -> None:
464 """Update environment variable with new value."""
465 env_var = self._env_var_name(section, name)
466 # Set it as an env var so that any subprocesses keep the same override!
467 os.environ[env_var] = new_value
468
469 @staticmethod
470 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None:
471 """Create a FutureWarning for deprecated default values."""
472 warnings.warn(
473 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
474 f"This value has been changed to {new_value!r} in the running config, but please update your config.",
475 FutureWarning,
476 stacklevel=3,
477 )
478
479 def __init__(
480 self,
481 configuration_description: dict[str, dict[str, Any]],
482 _default_values: ConfigParser,
483 provider_manager_type: type[ProvidersManager] | type[ProvidersManagerTaskRuntime],
484 create_default_config_parser_callable: Callable[[dict[str, dict[str, Any]]], ConfigParser],
485 provider_config_fallback_defaults_cfg_path: str,
486 *args,
487 **kwargs,
488 ):
489 """
490 Initialize the parser.
491
492 :param configuration_description: Description of configuration options
493 :param _default_values: ConfigParser with default values
494 :param provider_manager_type: Either ProvidersManager or ProvidersManagerTaskRuntime, depending on the context of the caller.
495 :param create_default_config_parser_callable: The `create_default_config_parser` function from core or SDK, depending on the context of the caller.
496 :param provider_config_fallback_defaults_cfg_path: Path to the `provider_config_fallback_defaults.cfg` file.
497 """
498 super().__init__(*args, **kwargs)
499 self._configuration_description = configuration_description
500 self._default_values = _default_values
501 self._provider_manager_type = provider_manager_type
502 self._create_default_config_parser_callable = create_default_config_parser_callable
503 self._provider_cfg_config_fallback_default_values = create_provider_cfg_config_fallback_defaults(
504 provider_config_fallback_defaults_cfg_path
505 )
506 self._suppress_future_warnings = False
507 self.upgraded_values: dict[tuple[str, str], str] = {}
508 # The _use_providers_configuration flag will always be True unless we call `write(include_providers=False)` or `with self.make_sure_configuration_loaded(with_providers=False)`.
509 # Even when we call those methods, the flag will be set back to True after the method is done, so it only affects the current call to `as_dict()` and does not have any effect on subsequent calls.
510 self._use_providers_configuration = True
511
512 def invalidate_cache(self) -> None:
513 """
514 Clear all ``functools.cached_property`` entries on this instance.
515
516 Call this after mutating class-level attributes (e.g. ``deprecated_options``)
517 so that derived cached properties are recomputed on next access.
518 """
519 for attr_name in (
520 name
521 for name in dir(type(self))
522 if isinstance(getattr(type(self), name, None), functools.cached_property)
523 ):
524 self.__dict__.pop(attr_name, None)
525
526 def _invalidate_provider_flag_caches(self) -> None:
527 """Invalidate caches related to provider configuration flags."""
528 self.__dict__.pop("configuration_description", None)
529 self.__dict__.pop("sensitive_config_values", None)
530
531 @functools.cached_property
532 def inversed_deprecated_options(self):
533 """Build inverse mapping from old options to new options."""
534 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}
535
536 @functools.cached_property
537 def inversed_deprecated_sections(self):
538 """Build inverse mapping from old sections to new sections."""
539 return {
540 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
541 }
542
543 @functools.cached_property
544 def sensitive_config_values(self) -> set[tuple[str, str]]:
545 """Get set of sensitive config values that should be masked."""
546 flattened = {
547 (s, k): item
548 for s, s_c in self.configuration_description.items()
549 for k, item in s_c.get("options", {}).items()
550 }
551 sensitive = {
552 (section.lower(), key.lower())
553 for (section, key), v in flattened.items()
554 if v.get("sensitive") is True
555 }
556 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options}
557 depr_section = {
558 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections
559 }
560 sensitive.update(depr_section, depr_option)
561 return sensitive
562
563 def _update_defaults_from_string(self, config_string: str) -> None:
564 """
565 Update the defaults in _default_values based on values in config_string ("ini" format).
566
567 Override shared parser's method to add validation for template variables.
568 Note that those values are not validated and cannot contain variables because we are using
569 regular config parser to load them. This method is used to test the config parser in unit tests.
570
571 :param config_string: ini-formatted config string
572 """
573 parser = ConfigParser()
574 parser.read_string(config_string)
575 for section in parser.sections():
576 if section not in self._default_values.sections():
577 self._default_values.add_section(section)
578 errors = False
579 for key, value in parser.items(section):
580 if not self.is_template(section, key) and "{" in value:
581 errors = True
582 log.error(
583 "The %s.%s value %s read from string contains variable. This is not supported",
584 section,
585 key,
586 value,
587 )
588 self._default_values.set(section, key, value)
589 if errors:
590 raise AirflowConfigException(
591 f"The string config passed as default contains variables. "
592 f"This is not supported. String config: {config_string}"
593 )
594
595 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
596 """
597 Retrieve default value from default config parser, including provider fallbacks.
598
599 This will retrieve the default value from the core default config parser first. If not found
600 and providers configuration is loaded, it also checks provider fallback defaults.
601 Optionally a raw, stored value can be retrieved by setting skip_interpolation to True.
602 This is useful for example when we want to write the default value to a file, and we don't
603 want the interpolation to happen as it is going to be done later when the config is read.
604
605 :param section: section of the config
606 :param key: key to use
607 :param fallback: fallback value to use
608 :param raw: if raw, then interpolation will be reversed
609 :param kwargs: other args
610 :return:
611 """
612 value = self._default_values.get(section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs)
613 # Provider metadata has higher priority than cfg fallback — check it first.
614 if value is VALUE_NOT_FOUND_SENTINEL and self._use_providers_configuration:
615 value = self._provider_metadata_config_fallback_default_values.get(
616 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs
617 )
618 if value is VALUE_NOT_FOUND_SENTINEL and self._use_providers_configuration:
619 value = self._provider_cfg_config_fallback_default_values.get(
620 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs
621 )
622 if value is VALUE_NOT_FOUND_SENTINEL:
623 value = fallback
624 if raw and value is not None:
625 return value.replace("%", "%%")
626 return value
627
628 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None:
629 """
630 Get Secret Backend if defined in airflow.cfg.
631
632 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.
633 """
634 section = "workers" if worker_mode else "secrets"
635 key = "secrets_backend" if worker_mode else "backend"
636 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs"
637
638 secrets_backend_cls = self.getimport(section=section, key=key)
639
640 if not secrets_backend_cls:
641 if worker_mode:
642 # if we find no secrets backend for worker, return that of secrets backend
643 secrets_backend_cls = self.getimport(section="secrets", key="backend")
644 if not secrets_backend_cls:
645 return None
646 # When falling back to secrets backend, use its kwargs
647 kwargs_key = "backend_kwargs"
648 section = "secrets"
649 else:
650 return None
651
652 try:
653 backend_kwargs = self.getjson(section=section, key=kwargs_key)
654 if not backend_kwargs:
655 backend_kwargs = {}
656 elif not isinstance(backend_kwargs, dict):
657 raise ValueError("not a dict")
658 except AirflowConfigException:
659 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key)
660 backend_kwargs = {}
661 except ValueError:
662 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key)
663 backend_kwargs = {}
664
665 # Collect per-key overrides; they take precedence over the JSON blob.
666 env_prefix = _build_kwarg_env_prefix(section, kwargs_key)
667 backend_kwargs.update(_collect_kwarg_env_vars(env_prefix))
668
669 return secrets_backend_cls(**backend_kwargs)
670
671 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
672 """
673 Get Config option values from Secret Backend.
674
675 Called by the shared parser's _get_secret_option() method as part of the lookup chain.
676 Uses _get_custom_secret_backend() to get the backend instance.
677
678 :param config_key: the config key to retrieve
679 :return: config value or None
680 """
681 try:
682 secrets_client = self._get_custom_secret_backend()
683 if not secrets_client:
684 return None
685 return secrets_client.get_config(config_key)
686 except Exception as e:
687 raise AirflowConfigException(
688 "Cannot retrieve config from alternative secrets backend. "
689 "Make sure it is configured properly and that the Backend "
690 "is accessible.\n"
691 f"{e}"
692 )
693
694 def _get_cmd_option_from_config_sources(
695 self, config_sources: ConfigSourcesType, section: str, key: str
696 ) -> str | None:
697 fallback_key = key + "_cmd"
698 if (section, key) in self.sensitive_config_values:
699 section_dict = config_sources.get(section)
700 if section_dict is not None:
701 command_value = section_dict.get(fallback_key)
702 if command_value is not None:
703 if isinstance(command_value, str):
704 command = command_value
705 else:
706 command = command_value[0]
707 return run_command(command)
708 return None
709
710 def _get_secret_option_from_config_sources(
711 self, config_sources: ConfigSourcesType, section: str, key: str
712 ) -> str | None:
713 fallback_key = key + "_secret"
714 if (section, key) in self.sensitive_config_values:
715 section_dict = config_sources.get(section)
716 if section_dict is not None:
717 secrets_path_value = section_dict.get(fallback_key)
718 if secrets_path_value is not None:
719 if isinstance(secrets_path_value, str):
720 secrets_path = secrets_path_value
721 else:
722 secrets_path = secrets_path_value[0]
723 return self._get_config_value_from_secret_backend(secrets_path)
724 return None
725
726 def _include_secrets(
727 self,
728 config_sources: ConfigSourcesType,
729 display_sensitive: bool,
730 display_source: bool,
731 raw: bool,
732 ):
733 for section, key in self.sensitive_config_values:
734 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
735 if value:
736 if not display_sensitive:
737 value = "< hidden >"
738 if display_source:
739 opt: str | tuple[str, str] = (value, "secret")
740 elif raw:
741 opt = value.replace("%", "%%")
742 else:
743 opt = value
744 config_sources.setdefault(section, {}).update({key: opt})
745 del config_sources[section][key + "_secret"]
746
747 def _include_commands(
748 self,
749 config_sources: ConfigSourcesType,
750 display_sensitive: bool,
751 display_source: bool,
752 raw: bool,
753 ):
754 for section, key in self.sensitive_config_values:
755 opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
756 if not opt:
757 continue
758 opt_to_set: str | tuple[str, str] | None = opt
759 if not display_sensitive:
760 opt_to_set = "< hidden >"
761 if display_source:
762 opt_to_set = (str(opt_to_set), "cmd")
763 elif raw:
764 opt_to_set = str(opt_to_set).replace("%", "%%")
765 if opt_to_set is not None:
766 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set}
767 config_sources.setdefault(section, {}).update(dict_to_update)
768 del config_sources[section][key + "_cmd"]
769
770 def _include_envs(
771 self,
772 config_sources: ConfigSourcesType,
773 display_sensitive: bool,
774 display_source: bool,
775 raw: bool,
776 ):
777 for env_var in [
778 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
779 ]:
780 try:
781 _, section, key = env_var.split("__", 2)
782 opt = self._get_env_var_option(section, key)
783 except ValueError:
784 continue
785 if opt is None:
786 log.warning("Ignoring unknown env var '%s'", env_var)
787 continue
788 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
789 # Don't hide cmd/secret values here
790 if not env_var.lower().endswith(("cmd", "secret")):
791 if (section, key) in self.sensitive_config_values:
792 opt = "< hidden >"
793 elif raw:
794 opt = opt.replace("%", "%%")
795 if display_source:
796 opt = (opt, "env var")
797
798 section = section.lower()
799 key = key.lower()
800 config_sources.setdefault(section, {}).update({key: opt})
801
802 def _filter_by_source(
803 self,
804 config_sources: ConfigSourcesType,
805 display_source: bool,
806 getter_func,
807 ):
808 """
809 Delete default configs from current configuration.
810
811 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values.
812
813 This is necessary because bare configs take precedence over the command
814 or secret key equivalents so if the current running config is
815 materialized with Airflow defaults they in turn override user set
816 command or secret key configs.
817
818 :param config_sources: The current configuration to operate on
819 :param display_source: If False, configuration options contain raw
820 values. If True, options are a tuple of (option_value, source).
821 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
822 :param getter_func: A callback function that gets the user configured
823 override value for a particular sensitive_config_values config.
824 :return: None, the given config_sources is filtered if necessary,
825 otherwise untouched.
826 """
827 for section, key in self.sensitive_config_values:
828 # Don't bother if we don't have section / key
829 if section not in config_sources or key not in config_sources[section]:
830 continue
831 # Check that there is something to override defaults
832 try:
833 getter_opt = getter_func(section, key)
834 except ValueError:
835 continue
836 if not getter_opt:
837 continue
838 # Check to see that there is a default value
839 if self.get_default_value(section, key) is None:
840 continue
841 # Check to see if bare setting is the same as defaults
842 if display_source:
843 # when display_source = true, we know that the config_sources contains tuple
844 opt, source = config_sources[section][key] # type: ignore
845 else:
846 opt = config_sources[section][key]
847 if opt == self.get_default_value(section, key):
848 del config_sources[section][key]
849
850 @staticmethod
851 def _deprecated_value_is_set_in_config(
852 deprecated_section: str,
853 deprecated_key: str,
854 configs: Iterable[tuple[str, ConfigParser]],
855 ) -> bool:
856 for config_type, config in configs:
857 if config_type != "default":
858 with contextlib.suppress(NoSectionError):
859 deprecated_section_array = config.items(section=deprecated_section, raw=True)
860 if any(key == deprecated_key for key, _ in deprecated_section_array):
861 return True
862 return False
863
864 @staticmethod
865 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
866 return (
867 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}")
868 is not None
869 )
870
871 @staticmethod
872 def _deprecated_command_is_set_in_config(
873 deprecated_section: str,
874 deprecated_key: str,
875 configs: Iterable[tuple[str, ConfigParser]],
876 ) -> bool:
877 return AirflowConfigParser._deprecated_value_is_set_in_config(
878 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs
879 )
880
881 @staticmethod
882 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
883 return (
884 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD")
885 is not None
886 )
887
888 @staticmethod
889 def _deprecated_secret_is_set_in_config(
890 deprecated_section: str,
891 deprecated_key: str,
892 configs: Iterable[tuple[str, ConfigParser]],
893 ) -> bool:
894 return AirflowConfigParser._deprecated_value_is_set_in_config(
895 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs
896 )
897
898 @staticmethod
899 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
900 return (
901 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET")
902 is not None
903 )
904
905 @staticmethod
906 def _replace_config_with_display_sources(
907 config_sources: ConfigSourcesType,
908 configs: Iterable[tuple[str, ConfigParser]],
909 configuration_description: dict[str, dict[str, Any]],
910 display_source: bool,
911 raw: bool,
912 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
913 include_env: bool,
914 include_cmds: bool,
915 include_secret: bool,
916 ):
917 for source_name, config in configs:
918 sections = config.sections()
919 for section in sections:
920 AirflowConfigParser._replace_section_config_with_display_sources(
921 config,
922 config_sources,
923 configuration_description,
924 display_source,
925 raw,
926 section,
927 source_name,
928 deprecated_options,
929 configs,
930 include_env=include_env,
931 include_cmds=include_cmds,
932 include_secret=include_secret,
933 )
934
935 @staticmethod
936 def _replace_section_config_with_display_sources(
937 config: ConfigParser,
938 config_sources: ConfigSourcesType,
939 configuration_description: dict[str, dict[str, Any]],
940 display_source: bool,
941 raw: bool,
942 section: str,
943 source_name: str,
944 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
945 configs: Iterable[tuple[str, ConfigParser]],
946 include_env: bool,
947 include_cmds: bool,
948 include_secret: bool,
949 ):
950 sect = config_sources.setdefault(section, {})
951 if isinstance(config, AirflowConfigParser):
952 with config.suppress_future_warnings():
953 items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
954 else:
955 items = config.items(section=section, raw=raw)
956 for k, val in items:
957 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
958 if deprecated_section and deprecated_key:
959 if source_name == "default":
960 # If deprecated entry has some non-default value set for any of the sources requested,
961 # We should NOT set default for the new entry (because it will override anything
962 # coming from the deprecated ones)
963 if AirflowConfigParser._deprecated_value_is_set_in_config(
964 deprecated_section, deprecated_key, configs
965 ):
966 continue
967 if include_env and AirflowConfigParser._deprecated_variable_is_set(
968 deprecated_section, deprecated_key
969 ):
970 continue
971 if include_cmds and (
972 AirflowConfigParser._deprecated_variable_command_is_set(
973 deprecated_section, deprecated_key
974 )
975 or AirflowConfigParser._deprecated_command_is_set_in_config(
976 deprecated_section, deprecated_key, configs
977 )
978 ):
979 continue
980 if include_secret and (
981 AirflowConfigParser._deprecated_variable_secret_is_set(
982 deprecated_section, deprecated_key
983 )
984 or AirflowConfigParser._deprecated_secret_is_set_in_config(
985 deprecated_section, deprecated_key, configs
986 )
987 ):
988 continue
989 if display_source:
990 updated_source_name = source_name
991 if source_name == "default":
992 # defaults can come from other sources (default-<PROVIDER>) that should be used here
993 source_description_section = configuration_description.get(section, {})
994 source_description_key = source_description_section.get("options", {}).get(k, {})
995 if source_description_key is not None:
996 updated_source_name = source_description_key.get("source", source_name)
997 sect[k] = (val, updated_source_name)
998 else:
999 sect[k] = val
1000
1001 def _warn_deprecate(
1002 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
1003 ):
1004 """Warn about deprecated config option usage."""
1005 if section == deprecated_section:
1006 warnings.warn(
1007 f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
1008 f"the old setting has been used, but please update your config.",
1009 DeprecationWarning,
1010 stacklevel=4 + extra_stacklevel,
1011 )
1012 else:
1013 warnings.warn(
1014 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
1015 f"in [{section}] - the old setting has been used, but please update your config.",
1016 DeprecationWarning,
1017 stacklevel=4 + extra_stacklevel,
1018 )
1019
1020 @contextmanager
1021 def suppress_future_warnings(self):
1022 """
1023 Context manager to temporarily suppress future warnings.
1024
1025 This is a stub used by the shared parser's lookup methods when checking deprecated options.
1026 Subclasses can override this to customize warning suppression behavior.
1027
1028 :return: context manager that suppresses future warnings
1029 """
1030 suppress_future_warnings = self._suppress_future_warnings
1031 self._suppress_future_warnings = True
1032 yield self
1033 self._suppress_future_warnings = suppress_future_warnings
1034
1035 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str:
1036 """Generate environment variable name for a config option."""
1037 team_component: str = f"{team_name.upper()}___" if team_name else ""
1038 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}"
1039
1040 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None):
1041 """Get config option from environment variable."""
1042 env_var: str = self._env_var_name(section, key, team_name=team_name)
1043 if env_var in os.environ:
1044 return expand_env_var(os.environ[env_var])
1045 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
1046 env_var_cmd = env_var + "_CMD"
1047 if env_var_cmd in os.environ:
1048 # if this is a valid command key...
1049 if (section, key) in self.sensitive_config_values:
1050 return run_command(os.environ[env_var_cmd])
1051 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
1052 env_var_secret_path = env_var + "_SECRET"
1053 if env_var_secret_path in os.environ:
1054 # if this is a valid secret path...
1055 if (section, key) in self.sensitive_config_values:
1056 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path])
1057 return None
1058
1059 def _get_cmd_option(self, section: str, key: str):
1060 """Get config option from command execution."""
1061 fallback_key = key + "_cmd"
1062 if (section, key) in self.sensitive_config_values:
1063 if super().has_option(section, fallback_key):
1064 command = super().get(section, fallback_key)
1065 try:
1066 cmd_output = run_command(command)
1067 except AirflowConfigException as e:
1068 raise e
1069 except Exception as e:
1070 raise AirflowConfigException(
1071 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd."
1072 f" Please check the {fallback_key} value."
1073 ) from e
1074 return cmd_output
1075 return None
1076
1077 def _get_secret_option(self, section: str, key: str) -> str | None:
1078 """Get Config option values from Secret Backend."""
1079 fallback_key = key + "_secret"
1080 if (section, key) in self.sensitive_config_values:
1081 if super().has_option(section, fallback_key):
1082 secrets_path = super().get(section, fallback_key)
1083 return self._get_config_value_from_secret_backend(secrets_path)
1084 return None
1085
1086 def _get_environment_variables(
1087 self,
1088 deprecated_key: str | None,
1089 deprecated_section: str | None,
1090 key: str,
1091 section: str,
1092 issue_warning: bool = True,
1093 extra_stacklevel: int = 0,
1094 **kwargs,
1095 ) -> str | ValueNotFound:
1096 """Get config option from environment variables."""
1097 team_name = kwargs.get("team_name", None)
1098 option = self._get_env_var_option(section, key, team_name=team_name)
1099 if option is not None:
1100 return option
1101 if deprecated_section and deprecated_key:
1102 with self.suppress_future_warnings():
1103 option = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name)
1104 if option is not None:
1105 if issue_warning:
1106 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
1107 return option
1108 return VALUE_NOT_FOUND_SENTINEL
1109
1110 def _get_option_from_config_file(
1111 self,
1112 deprecated_key: str | None,
1113 deprecated_section: str | None,
1114 key: str,
1115 section: str,
1116 issue_warning: bool = True,
1117 extra_stacklevel: int = 0,
1118 **kwargs,
1119 ) -> str | ValueNotFound:
1120 """Get config option from config file."""
1121 if team_name := kwargs.get("team_name", None):
1122 section = f"{team_name}={section}"
1123 # since this is the last lookup that supports team_name, pop it
1124 kwargs.pop("team_name")
1125 if super().has_option(section, key):
1126 return expand_env_var(super().get(section, key, **kwargs))
1127 if deprecated_section and deprecated_key:
1128 if super().has_option(deprecated_section, deprecated_key):
1129 if issue_warning:
1130 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
1131 with self.suppress_future_warnings():
1132 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
1133 return VALUE_NOT_FOUND_SENTINEL
1134
1135 def _get_option_from_commands(
1136 self,
1137 deprecated_key: str | None,
1138 deprecated_section: str | None,
1139 key: str,
1140 section: str,
1141 issue_warning: bool = True,
1142 extra_stacklevel: int = 0,
1143 **kwargs,
1144 ) -> str | ValueNotFound:
1145 """Get config option from command execution."""
1146 if kwargs.get("team_name", None):
1147 # Commands based team config fetching is not currently supported
1148 return VALUE_NOT_FOUND_SENTINEL
1149 option = self._get_cmd_option(section, key)
1150 if option:
1151 return option
1152 if deprecated_section and deprecated_key:
1153 with self.suppress_future_warnings():
1154 option = self._get_cmd_option(deprecated_section, deprecated_key)
1155 if option:
1156 if issue_warning:
1157 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
1158 return option
1159 return VALUE_NOT_FOUND_SENTINEL
1160
1161 def _get_option_from_secrets(
1162 self,
1163 deprecated_key: str | None,
1164 deprecated_section: str | None,
1165 key: str,
1166 section: str,
1167 issue_warning: bool = True,
1168 extra_stacklevel: int = 0,
1169 **kwargs,
1170 ) -> str | ValueNotFound:
1171 """Get config option from secrets backend."""
1172 if kwargs.get("team_name", None):
1173 # Secrets based team config fetching is not currently supported
1174 return VALUE_NOT_FOUND_SENTINEL
1175 option = self._get_secret_option(section, key)
1176 if option:
1177 return option
1178 if deprecated_section and deprecated_key:
1179 with self.suppress_future_warnings():
1180 option = self._get_secret_option(deprecated_section, deprecated_key)
1181 if option:
1182 if issue_warning:
1183 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
1184 return option
1185 return VALUE_NOT_FOUND_SENTINEL
1186
1187 def _get_option_from_defaults(
1188 self,
1189 deprecated_key: str | None,
1190 deprecated_section: str | None,
1191 key: str,
1192 section: str,
1193 issue_warning: bool = True,
1194 extra_stacklevel: int = 0,
1195 team_name: str | None = None,
1196 **kwargs,
1197 ) -> str | ValueNotFound:
1198 """Get config option from default values."""
1199 if self.get_default_value(section, key) is not None or "fallback" in kwargs:
1200 return expand_env_var(self.get_default_value(section, key, **kwargs))
1201 return VALUE_NOT_FOUND_SENTINEL
1202
1203 def _resolve_deprecated_lookup(
1204 self,
1205 section: str,
1206 key: str,
1207 lookup_from_deprecated: bool,
1208 extra_stacklevel: int = 0,
1209 ) -> tuple[str, str, str | None, str | None, bool]:
1210 """
1211 Resolve deprecated section/key mappings and determine deprecated values.
1212
1213 :param section: Section name (will be lowercased)
1214 :param key: Key name (will be lowercased)
1215 :param lookup_from_deprecated: Whether to lookup from deprecated options
1216 :param extra_stacklevel: Extra stack level for warnings
1217 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted)
1218 """
1219 section = section.lower()
1220 key = key.lower()
1221 warning_emitted = False
1222 deprecated_section: str | None = None
1223 deprecated_key: str | None = None
1224
1225 if not lookup_from_deprecated:
1226 return section, key, deprecated_section, deprecated_key, warning_emitted
1227
1228 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {})
1229 if option_description.get("deprecated"):
1230 deprecation_reason = option_description.get("deprecation_reason", "")
1231 warnings.warn(
1232 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
1233 DeprecationWarning,
1234 stacklevel=2 + extra_stacklevel,
1235 )
1236 # For the cases in which we rename whole sections
1237 if section in self.inversed_deprecated_sections:
1238 deprecated_section, deprecated_key = (section, key)
1239 section = self.inversed_deprecated_sections[section]
1240 if not self._suppress_future_warnings:
1241 warnings.warn(
1242 f"The config section [{deprecated_section}] has been renamed to "
1243 f"[{section}]. Please update your `conf.get*` call to use the new name",
1244 FutureWarning,
1245 stacklevel=2 + extra_stacklevel,
1246 )
1247 # Don't warn about individual rename if the whole section is renamed
1248 warning_emitted = True
1249 elif (section, key) in self.inversed_deprecated_options:
1250 # Handle using deprecated section/key instead of the new section/key
1251 new_section, new_key = self.inversed_deprecated_options[(section, key)]
1252 if not self._suppress_future_warnings and not warning_emitted:
1253 warnings.warn(
1254 f"section/key [{section}/{key}] has been deprecated, you should use"
1255 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
1256 "new name",
1257 FutureWarning,
1258 stacklevel=2 + extra_stacklevel,
1259 )
1260 warning_emitted = True
1261 deprecated_section, deprecated_key = section, key
1262 section, key = (new_section, new_key)
1263 elif section in self.deprecated_sections:
1264 # When accessing the new section name, make sure we check under the old config name
1265 deprecated_key = key
1266 deprecated_section = self.deprecated_sections[section][0]
1267 else:
1268 deprecated_section, deprecated_key, _ = self.deprecated_options.get(
1269 (section, key), (None, None, None)
1270 )
1271
1272 return section, key, deprecated_section, deprecated_key, warning_emitted
1273
1274 def load_providers_configuration(self) -> None:
1275 """
1276 Load configuration for providers.
1277
1278 .. deprecated:: 3.2.0
1279 Provider configuration is now loaded lazily via the ``configuration_description``
1280 cached property. This method is kept for backwards compatibility and will be
1281 removed in a future version.
1282 """
1283 warnings.warn(
1284 "load_providers_configuration() is deprecated. "
1285 "Provider configuration is now loaded lazily via the "
1286 "`configuration_description` cached property.",
1287 DeprecationWarning,
1288 stacklevel=2,
1289 )
1290 self._use_providers_configuration = True
1291 self._invalidate_provider_flag_caches()
1292
1293 def restore_core_default_configuration(self) -> None:
1294 """
1295 Restore the parser state before provider-contributed sections were loaded.
1296
1297 .. deprecated:: 3.2.0
1298 Use ``make_sure_configuration_loaded(with_providers=False)`` context manager
1299 instead. This method is kept for backwards compatibility and will be removed
1300 in a future version.
1301 """
1302 warnings.warn(
1303 "restore_core_default_configuration() is deprecated. "
1304 "Use `make_sure_configuration_loaded(with_providers=False)` instead.",
1305 DeprecationWarning,
1306 stacklevel=2,
1307 )
1308 self._use_providers_configuration = False
1309 self._invalidate_provider_flag_caches()
1310
1311 @overload # type: ignore[override]
1312 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...
1313
1314 @overload # type: ignore[override]
1315 def get(self, section: str, key: str, **kwargs) -> str | None: ...
1316
1317 def get( # type: ignore[misc, override]
1318 self,
1319 section: str,
1320 key: str,
1321 suppress_warnings: bool = False,
1322 lookup_from_deprecated: bool = True,
1323 _extra_stacklevel: int = 0,
1324 team_name: str | None = None,
1325 **kwargs,
1326 ) -> str | None:
1327 """
1328 Get config value by iterating through lookup sequence.
1329
1330 Priority order is defined by _lookup_sequence property.
1331 """
1332 section, key, deprecated_section, deprecated_key, warning_emitted = self._resolve_deprecated_lookup(
1333 section=section,
1334 key=key,
1335 lookup_from_deprecated=lookup_from_deprecated,
1336 extra_stacklevel=_extra_stacklevel,
1337 )
1338
1339 if team_name is not None:
1340 kwargs["team_name"] = team_name
1341
1342 for lookup_method in self._lookup_sequence:
1343 value = lookup_method(
1344 deprecated_key=deprecated_key,
1345 deprecated_section=deprecated_section,
1346 key=key,
1347 section=section,
1348 issue_warning=not warning_emitted,
1349 extra_stacklevel=_extra_stacklevel,
1350 **kwargs,
1351 )
1352 if value is not VALUE_NOT_FOUND_SENTINEL:
1353 return value
1354
1355 # Check if fallback was explicitly provided (even if None)
1356 if "fallback" in kwargs:
1357 return kwargs["fallback"]
1358
1359 if not suppress_warnings:
1360 log.warning("section/key [%s/%s] not found in config", section, key)
1361
1362 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
1363
1364 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override]
1365 """Get config value as boolean."""
1366 val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
1367 if "#" in val:
1368 val = val.split("#")[0].strip()
1369 if val in ("t", "true", "1"):
1370 return True
1371 if val in ("f", "false", "0"):
1372 return False
1373 raise AirflowConfigException(
1374 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
1375 f'Current value: "{val}".'
1376 )
1377
1378 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override]
1379 """Get config value as integer."""
1380 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1381 if val is None:
1382 raise AirflowConfigException(
1383 f"Failed to convert value None to int. "
1384 f'Please check "{key}" key in "{section}" section is set.'
1385 )
1386 try:
1387 return int(val)
1388 except ValueError:
1389 try:
1390 if (float_val := float(val)) != (int_val := int(float_val)):
1391 raise ValueError
1392 return int_val
1393 except (ValueError, OverflowError):
1394 raise AirflowConfigException(
1395 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1396 f'Current value: "{val}".'
1397 )
1398
1399 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override]
1400 """Get config value as float."""
1401 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1402 if val is None:
1403 raise AirflowConfigException(
1404 f"Failed to convert value None to float. "
1405 f'Please check "{key}" key in "{section}" section is set.'
1406 )
1407 try:
1408 return float(val)
1409 except ValueError:
1410 raise AirflowConfigException(
1411 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
1412 f'Current value: "{val}".'
1413 )
1414
1415 def getlist(self, section: str, key: str, delimiter=",", **kwargs):
1416 """Get config value as list."""
1417 val = self.get(section, key, **kwargs)
1418
1419 if isinstance(val, list) or val is None:
1420 # `get` will always return a (possibly-empty) string, so the only way we can
1421 # have these types is with `fallback=` was specified. So just return it.
1422 return val
1423
1424 if val == "":
1425 return []
1426
1427 try:
1428 return [item.strip() for item in val.split(delimiter)]
1429 except Exception:
1430 raise AirflowConfigException(
1431 f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
1432 f'Current value: "{val}".'
1433 )
1434
1435 E = TypeVar("E", bound=Enum)
1436
1437 def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
1438 """Get config value as enum."""
1439 val = self.get(section, key, **kwargs)
1440 enum_names = [enum_item.name for enum_item in enum_class]
1441
1442 if val is None:
1443 raise AirflowConfigException(
1444 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1445 f'Current value: "{val}" and it must be one of {", ".join(enum_names)}'
1446 )
1447
1448 try:
1449 return enum_class[val]
1450 except KeyError:
1451 if "fallback" in kwargs and kwargs["fallback"] in enum_names:
1452 return enum_class[kwargs["fallback"]]
1453 raise AirflowConfigException(
1454 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1455 f"the value must be one of {', '.join(enum_names)}"
1456 )
1457
1458 def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
1459 """Get config value as list of enums."""
1460 kwargs.setdefault("fallback", [])
1461 string_list = self.getlist(section, key, delimiter, **kwargs)
1462
1463 enum_names = [enum_item.name for enum_item in enum_class]
1464 enum_list = []
1465
1466 for val in string_list:
1467 try:
1468 enum_list.append(enum_class[val])
1469 except KeyError:
1470 log.warning(
1471 "Failed to convert value %r. Please check %s key in %s section. "
1472 "it must be one of %s, if not the value is ignored",
1473 val,
1474 key,
1475 section,
1476 ", ".join(enum_names),
1477 )
1478
1479 return enum_list
1480
1481 def getimport(self, section: str, key: str, **kwargs) -> Any:
1482 """
1483 Read options, import the full qualified name, and return the object.
1484
1485 In case of failure, it throws an exception with the key and section names
1486
1487 :return: The object or None, if the option is empty
1488 """
1489 # Fixed: use self.get() instead of conf.get()
1490 full_qualified_path = self.get(section=section, key=key, **kwargs)
1491 if not full_qualified_path:
1492 return None
1493
1494 try:
1495 # Import here to avoid circular dependency
1496 from ..module_loading import import_string
1497
1498 return import_string(full_qualified_path)
1499 except ImportError as e:
1500 log.warning(e)
1501 raise AirflowConfigException(
1502 f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
1503 f'Current value: "{full_qualified_path}".'
1504 )
1505
1506 def getjson(
1507 self, section: str, key: str, fallback=None, **kwargs
1508 ) -> dict | list | str | int | float | None:
1509 """
1510 Return a config value parsed from a JSON string.
1511
1512 ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.
1513 """
1514 try:
1515 data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
1516 except (NoSectionError, NoOptionError):
1517 data = None
1518
1519 if data is None or data == "":
1520 return fallback
1521
1522 try:
1523 return json.loads(data)
1524 except JSONDecodeError as e:
1525 raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
1526
1527 def gettimedelta(
1528 self, section: str, key: str, fallback: Any = None, **kwargs
1529 ) -> datetime.timedelta | None:
1530 """
1531 Get the config value for the given section and key, and convert it into datetime.timedelta object.
1532
1533 If the key is missing, then it is considered as `None`.
1534
1535 :param section: the section from the config
1536 :param key: the key defined in the given section
1537 :param fallback: fallback value when no config value is given, defaults to None
1538 :raises AirflowConfigException: raised because ValueError or OverflowError
1539 :return: datetime.timedelta(seconds=<config_value>) or None
1540 """
1541 val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
1542
1543 if val:
1544 # the given value must be convertible to integer
1545 try:
1546 int_val = int(val)
1547 except ValueError:
1548 raise AirflowConfigException(
1549 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1550 f'Current value: "{val}".'
1551 )
1552
1553 try:
1554 return datetime.timedelta(seconds=int_val)
1555 except OverflowError as err:
1556 raise AirflowConfigException(
1557 f"Failed to convert value to timedelta in `seconds`. "
1558 f"{err}. "
1559 f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
1560 )
1561
1562 return fallback
1563
1564 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
1565 """Get mandatory config value, raising ValueError if not found."""
1566 value = self.get(section, key, _extra_stacklevel=1, **kwargs)
1567 if value is None:
1568 raise ValueError(f"The value {section}/{key} should be set!")
1569 return value
1570
1571 def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
1572 """Get mandatory config value as list, raising ValueError if not found."""
1573 value = self.getlist(section, key, **kwargs)
1574 if value is None:
1575 raise ValueError(f"The value {section}/{key} should be set!")
1576 return value
1577
1578 def read(
1579 self,
1580 filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
1581 encoding: str | None = None,
1582 ) -> list[str]:
1583 return super().read(filenames=filenames, encoding=encoding)
1584
1585 def read_dict( # type: ignore[override]
1586 self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
1587 ) -> None:
1588 """
1589 We define a different signature here to add better type hints and checking.
1590
1591 :param dictionary: dictionary to read from
1592 :param source: source to be used to store the configuration
1593 :return:
1594 """
1595 super().read_dict(dictionary=dictionary, source=source)
1596
1597 def _has_section_in_any_defaults(self, section: str) -> bool:
1598 """Check if section exists in core defaults or provider fallback defaults."""
1599 if self._default_values.has_section(section):
1600 return True
1601 if self._use_providers_configuration:
1602 if self._provider_cfg_config_fallback_default_values.has_section(section):
1603 return True
1604 if self._provider_metadata_config_fallback_default_values.has_section(section):
1605 return True
1606 return False
1607
1608 def get_sections_including_defaults(self) -> list[str]:
1609 """
1610 Retrieve all sections from the configuration parser, including sections defined by built-in defaults.
1611
1612 :return: list of section names
1613 """
1614 sections_from_config = self.sections()
1615 sections_from_description = list(self.configuration_description.keys())
1616 return list(dict.fromkeys(itertools.chain(sections_from_description, sections_from_config)))
1617
1618 def get_options_including_defaults(self, section: str) -> list[str]:
1619 """
1620 Retrieve all possible options from the configuration parser for the section given.
1621
1622 Includes options defined by built-in defaults.
1623
1624 :param section: section name
1625 :return: list of option names for the section given
1626 """
1627 my_own_options = self.options(section) if self.has_section(section) else []
1628 all_options_from_defaults = list(
1629 self.configuration_description.get(section, {}).get("options", {}).keys()
1630 )
1631 return list(dict.fromkeys(itertools.chain(all_options_from_defaults, my_own_options)))
1632
1633 def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs) -> bool:
1634 """
1635 Check if option is defined.
1636
1637 Uses self.get() to avoid reimplementing the priority order of config variables
1638 (env, config, cmd, defaults).
1639
1640 :param section: section to get option from
1641 :param option: option to get
1642 :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
1643 :param kwargs: additional keyword arguments to pass to get(), such as team_name
1644 :return:
1645 """
1646 try:
1647 value = self.get(
1648 section,
1649 option,
1650 fallback=VALUE_NOT_FOUND_SENTINEL,
1651 _extra_stacklevel=1,
1652 suppress_warnings=True,
1653 lookup_from_deprecated=lookup_from_deprecated,
1654 **kwargs,
1655 )
1656 if value is VALUE_NOT_FOUND_SENTINEL:
1657 return False
1658 return True
1659 except (NoOptionError, NoSectionError, AirflowConfigException):
1660 return False
1661
1662 def set(self, section: str, option: str, value: str | None = None) -> None:
1663 """
1664 Set an option to the given value.
1665
1666 This override just makes sure the section and option are lower case, to match what we do in `get`.
1667 """
1668 section = section.lower()
1669 option = option.lower()
1670 defaults = self.configuration_description or {}
1671 if not self.has_section(section) and section in defaults:
1672 # Trying to set a key in a section that exists in default, but not in the user config;
1673 # automatically create it
1674 self.add_section(section)
1675 super().set(section, option, value)
1676
1677 def remove_option(self, section: str, option: str, remove_default: bool = True):
1678 """
1679 Remove an option if it exists in config from a file or default config.
1680
1681 If both of config have the same option, this removes the option
1682 in both configs unless remove_default=False.
1683 """
1684 section = section.lower()
1685 option = option.lower()
1686 if super().has_option(section, option):
1687 super().remove_option(section, option)
1688
1689 if remove_default and self._default_values.has_option(section, option):
1690 self._default_values.remove_option(section, option)
1691
1692 def optionxform(self, optionstr: str) -> str:
1693 """
1694 Transform option names on every read, get, or set operation.
1695
1696 This changes from the default behaviour of ConfigParser from lower-casing
1697 to instead be case-preserving.
1698
1699 :param optionstr:
1700 :return:
1701 """
1702 return optionstr
1703
1704 def as_dict(
1705 self,
1706 display_source: bool = False,
1707 display_sensitive: bool = False,
1708 raw: bool = False,
1709 include_env: bool = True,
1710 include_cmds: bool = True,
1711 include_secret: bool = True,
1712 ) -> ConfigSourcesType:
1713 """
1714 Return the current configuration as an OrderedDict of OrderedDicts.
1715
1716 When materializing current configuration Airflow defaults are
1717 materialized along with user set configs. If any of the `include_*`
1718 options are False then the result of calling command or secret key
1719 configs do not override Airflow defaults and instead are passed through.
1720 In order to then avoid Airflow defaults from overwriting user set
1721 command or secret key configs we filter out bare sensitive_config_values
1722 that are set to Airflow defaults when command or secret key configs
1723 produce different values.
1724
1725 :param display_source: If False, the option value is returned. If True,
1726 a tuple of (option_value, source) is returned. Source is either
1727 'airflow.cfg', 'default', 'env var', or 'cmd'.
1728 :param display_sensitive: If True, the values of options set by env
1729 vars and bash commands will be displayed. If False, those options
1730 are shown as '< hidden >'
1731 :param raw: Should the values be output as interpolated values, or the
1732 "raw" form that can be fed back in to ConfigParser
1733 :param include_env: Should the value of configuration from AIRFLOW__
1734 environment variables be included or not
1735 :param include_cmds: Should the result of calling any ``*_cmd`` config be
1736 set (True, default), or should the _cmd options be left as the
1737 command to run (False)
1738 :param include_secret: Should the result of calling any ``*_secret`` config be
1739 set (True, default), or should the _secret options be left as the
1740 path to get the secret from (False)
1741 :return: Dictionary, where the key is the name of the section and the content is
1742 the dictionary with the name of the parameter and its value.
1743 """
1744 if not display_sensitive:
1745 # We want to hide the sensitive values at the appropriate methods
1746 # since envs from cmds, secrets can be read at _include_envs method
1747 if not all([include_env, include_cmds, include_secret]):
1748 raise ValueError(
1749 "If display_sensitive is false, then include_env, "
1750 "include_cmds, include_secret must all be set as True"
1751 )
1752
1753 config_sources: ConfigSourcesType = {}
1754
1755 # We check sequentially all those sources and the last one we saw it in will "win"
1756 configs = self._config_sources_for_as_dict
1757
1758 self._replace_config_with_display_sources(
1759 config_sources,
1760 configs,
1761 self.configuration_description,
1762 display_source,
1763 raw,
1764 self.deprecated_options,
1765 include_cmds=include_cmds,
1766 include_env=include_env,
1767 include_secret=include_secret,
1768 )
1769
1770 # add env vars and overwrite because they have priority
1771 if include_env:
1772 self._include_envs(config_sources, display_sensitive, display_source, raw)
1773 else:
1774 self._filter_by_source(config_sources, display_source, self._get_env_var_option)
1775
1776 # add bash commands
1777 if include_cmds:
1778 self._include_commands(config_sources, display_sensitive, display_source, raw)
1779 else:
1780 self._filter_by_source(config_sources, display_source, self._get_cmd_option)
1781
1782 # add config from secret backends
1783 if include_secret:
1784 self._include_secrets(config_sources, display_sensitive, display_source, raw)
1785 else:
1786 self._filter_by_source(config_sources, display_source, self._get_secret_option)
1787
1788 if not display_sensitive:
1789 # This ensures the ones from config file is hidden too
1790 # if they are not provided through env, cmd and secret
1791 hidden = "< hidden >"
1792 for section, key in self.sensitive_config_values:
1793 if config_sources.get(section):
1794 if config_sources[section].get(key, None):
1795 if display_source:
1796 source = config_sources[section][key][1]
1797 config_sources[section][key] = (hidden, source)
1798 else:
1799 config_sources[section][key] = hidden
1800
1801 return config_sources
1802
1803 def _write_option_header(
1804 self,
1805 file: IO[str],
1806 option: str,
1807 extra_spacing: bool,
1808 include_descriptions: bool,
1809 include_env_vars: bool,
1810 include_examples: bool,
1811 include_sources: bool,
1812 section_config_description: dict[str, dict[str, Any]],
1813 section_to_write: str,
1814 sources_dict: ConfigSourcesType,
1815 ) -> tuple[bool, bool]:
1816 """
1817 Write header for configuration option.
1818
1819 Returns tuple of (should_continue, needs_separation) where needs_separation should be
1820 set if the option needs additional separation to visually separate it from the next option.
1821 """
1822 option_config_description = (
1823 section_config_description.get("options", {}).get(option, {})
1824 if section_config_description
1825 else {}
1826 )
1827 description = option_config_description.get("description")
1828 needs_separation = False
1829 if description and include_descriptions:
1830 for line in description.splitlines():
1831 file.write(f"# {line}\n")
1832 needs_separation = True
1833 example = option_config_description.get("example")
1834 if example is not None and include_examples:
1835 if extra_spacing:
1836 file.write("#\n")
1837 example_lines = example.splitlines()
1838 example = "\n# ".join(example_lines)
1839 file.write(f"# Example: {option} = {example}\n")
1840 needs_separation = True
1841 if include_sources and sources_dict:
1842 sources_section = sources_dict.get(section_to_write)
1843 value_with_source = sources_section.get(option) if sources_section else None
1844 if value_with_source is None:
1845 file.write("#\n# Source: not defined\n")
1846 else:
1847 file.write(f"#\n# Source: {value_with_source[1]}\n")
1848 needs_separation = True
1849 if include_env_vars:
1850 file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
1851 if extra_spacing:
1852 file.write("#\n")
1853 needs_separation = True
1854 return True, needs_separation
1855
1856 def is_template(self, section: str, key) -> bool:
1857 """
1858 Return whether the value is templated.
1859
1860 :param section: section of the config
1861 :param key: key in the section
1862 :return: True if the value is templated
1863 """
1864 return _is_template(self.configuration_description, section, key)
1865
1866 def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None:
1867 """
1868 Return the section as a dict.
1869
1870 Values are converted to int, float, bool as required.
1871
1872 :param section: section from the config
1873 :param team_name: optional team name for team-specific configuration lookup
1874 """
1875 # Handle team-specific section lookup for config file
1876 config_section = f"{team_name}={section}" if team_name else section
1877
1878 if not self.has_section(config_section) and not self._default_values.has_section(config_section):
1879 return None
1880 if self._default_values.has_section(config_section):
1881 _section: ConfigOptionsDictType = dict(self._default_values.items(config_section))
1882 else:
1883 _section = {}
1884
1885 if self.has_section(config_section):
1886 _section.update(self.items(config_section))
1887
1888 # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name
1889 section_prefix = self._env_var_name(section, "", team_name=team_name)
1890 for env_var in sorted(os.environ.keys()):
1891 if env_var.startswith(section_prefix):
1892 key = env_var.replace(section_prefix, "")
1893 if key.endswith("_CMD"):
1894 key = key[:-4]
1895 key = key.lower()
1896 _section[key] = self._get_env_var_option(section, key, team_name=team_name)
1897
1898 for key, val in _section.items():
1899 if val is None:
1900 raise AirflowConfigException(
1901 f"Failed to convert value automatically. "
1902 f'Please check "{key}" key in "{section}" section is set.'
1903 )
1904 try:
1905 _section[key] = int(val)
1906 except ValueError:
1907 try:
1908 _section[key] = float(val)
1909 except ValueError:
1910 if isinstance(val, str) and val.lower() in ("t", "true"):
1911 _section[key] = True
1912 elif isinstance(val, str) and val.lower() in ("f", "false"):
1913 _section[key] = False
1914 return _section
1915
1916 @staticmethod
1917 def _write_section_header(
1918 file: IO[str],
1919 include_descriptions: bool,
1920 section_config_description: dict[str, str],
1921 section_to_write: str,
1922 ) -> None:
1923 """Write header for configuration section."""
1924 file.write(f"[{section_to_write}]\n")
1925 section_description = section_config_description.get("description")
1926 if section_description and include_descriptions:
1927 for line in section_description.splitlines():
1928 file.write(f"# {line}\n")
1929 file.write("\n")
1930
1931 def _write_value(
1932 self,
1933 file: IO[str],
1934 option: str,
1935 comment_out_everything: bool,
1936 needs_separation: bool,
1937 only_defaults: bool,
1938 section_to_write: str,
1939 hide_sensitive: bool,
1940 is_sensitive: bool,
1941 show_values: bool = False,
1942 ):
1943 default_value = self.get_default_value(section_to_write, option, raw=True)
1944 if only_defaults:
1945 value = default_value
1946 else:
1947 value = self.get(section_to_write, option, fallback=default_value, raw=True)
1948 if not show_values:
1949 file.write(f"# {option} = \n")
1950 else:
1951 if hide_sensitive and is_sensitive:
1952 value = "< hidden >"
1953 else:
1954 pass
1955 if value is None:
1956 file.write(f"# {option} = \n")
1957 else:
1958 if comment_out_everything:
1959 value_lines = value.splitlines()
1960 value = "\n# ".join(value_lines)
1961 file.write(f"# {option} = {value}\n")
1962 else:
1963 if "\n" in value:
1964 try:
1965 value = json.dumps(json.loads(value), indent=4)
1966 value = value.replace(
1967 "\n", "\n "
1968 ) # indent multi-line JSON to satisfy configparser format
1969 except JSONDecodeError:
1970 pass
1971 file.write(f"{option} = {value}\n")
1972 if needs_separation:
1973 file.write("\n")
1974
1975 def write( # type: ignore[override]
1976 self,
1977 file: IO[str],
1978 section: str | None = None,
1979 include_examples: bool = True,
1980 include_descriptions: bool = True,
1981 include_sources: bool = True,
1982 include_env_vars: bool = True,
1983 include_providers: bool = True,
1984 comment_out_everything: bool = False,
1985 hide_sensitive: bool = False,
1986 extra_spacing: bool = True,
1987 only_defaults: bool = False,
1988 show_values: bool = False,
1989 **kwargs: Any,
1990 ) -> None:
1991 """
1992 Write configuration with comments and examples to a file.
1993
1994 :param file: file to write to
1995 :param section: section of the config to write, defaults to all sections
1996 :param include_examples: Include examples in the output
1997 :param include_descriptions: Include descriptions in the output
1998 :param include_sources: Include the source of each config option
1999 :param include_env_vars: Include environment variables corresponding to each config option
2000 :param include_providers: Include providers configuration
2001 :param comment_out_everything: Comment out all values
2002 :param hide_sensitive_values: Include sensitive values in the output
2003 :param extra_spacing: Add extra spacing before examples and after variables
2004 :param only_defaults: Only include default values when writing the config, not the actual values
2005 """
2006 with self.make_sure_configuration_loaded(with_providers=include_providers):
2007 sources_dict = {}
2008 if include_sources:
2009 sources_dict = self.as_dict(display_source=True)
2010 for section_to_write in self.get_sections_including_defaults():
2011 section_config_description = self.configuration_description.get(section_to_write, {})
2012 if section_to_write != section and section is not None:
2013 continue
2014 if self._has_section_in_any_defaults(section_to_write) or self.has_section(section_to_write):
2015 self._write_section_header(
2016 file, include_descriptions, section_config_description, section_to_write
2017 )
2018 for option in self.get_options_including_defaults(section_to_write):
2019 should_continue, needs_separation = self._write_option_header(
2020 file=file,
2021 option=option,
2022 extra_spacing=extra_spacing,
2023 include_descriptions=include_descriptions,
2024 include_env_vars=include_env_vars,
2025 include_examples=include_examples,
2026 include_sources=include_sources,
2027 section_config_description=section_config_description,
2028 section_to_write=section_to_write,
2029 sources_dict=sources_dict,
2030 )
2031 is_sensitive = (
2032 section_to_write.lower(),
2033 option.lower(),
2034 ) in self.sensitive_config_values
2035 self._write_value(
2036 file=file,
2037 option=option,
2038 comment_out_everything=comment_out_everything,
2039 needs_separation=needs_separation,
2040 only_defaults=only_defaults,
2041 section_to_write=section_to_write,
2042 hide_sensitive=hide_sensitive,
2043 is_sensitive=is_sensitive,
2044 show_values=show_values,
2045 )
2046 if include_descriptions and not needs_separation:
2047 # extra separation between sections in case last option did not need it
2048 file.write("\n")
2049
2050 @contextmanager
2051 def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
2052 """
2053 Make sure configuration is loaded with or without providers.
2054
2055 The context manager will only toggle the `self._use_providers_configuration` flag if `with_providers` is False, and will reset `self._use_providers_configuration` to True after the context block.
2056 Nop for `with_providers=True` as the configuration already loads providers configuration by default.
2057
2058 :param with_providers: whether providers should be loaded
2059 """
2060 if not with_providers:
2061 self._use_providers_configuration = False
2062 # Only invalidate cached properties that depend on _use_providers_configuration.
2063 # Do NOT use invalidate_cache() here — it would also evict expensive provider-discovery
2064 # caches (_provider_metadata_configuration_description, _provider_metadata_config_fallback_default_values)
2065 # that don't depend on this flag.
2066 self._invalidate_provider_flag_caches()
2067 try:
2068 yield
2069 finally:
2070 if not with_providers:
2071 self._use_providers_configuration = True
2072 self._invalidate_provider_flag_caches()