1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Base configuration parser with pure parsing logic."""
19
20from __future__ import annotations
21
22import contextlib
23import datetime
24import functools
25import itertools
26import json
27import logging
28import os
29import shlex
30import subprocess
31import sys
32import warnings
33from collections.abc import Callable, Generator, Iterable
34from configparser import ConfigParser, NoOptionError, NoSectionError
35from contextlib import contextmanager
36from enum import Enum
37from json.decoder import JSONDecodeError
38from re import Pattern
39from typing import IO, Any, TypeVar, overload
40
41from .exceptions import AirflowConfigException
42
# Module-level logger used by the parser for warnings and errors.
log = logging.getLogger(__name__)


# Scalar types a single configuration option value may take.
ConfigType = str | int | float | bool
# Option name -> value for one section.
ConfigOptionsDictType = dict[str, ConfigType]
# Option name -> either the bare value or a ``(value, source)`` tuple.
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
# Section name -> that section's option/value(-with-source) mapping.
ConfigSourcesType = dict[str, ConfigSectionSourcesType]
# Prefix of the environment variables the parser inspects (AIRFLOW__{SECTION}__{KEY}).
ENV_VAR_PREFIX = "AIRFLOW__"
51
52
class ValueNotFound:
    """Marker type: an instance is returned by a lookup step that found no value."""


# Shared marker instance handed back by the lookup methods when nothing was found.
VALUE_NOT_FOUND_SENTINEL = ValueNotFound()
60
61
@overload
def expand_env_var(env_var: None) -> None: ...
@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Resolve environment-variable and ``~`` references in a string, nested ones included.

    ``os.path.expandvars`` followed by ``os.path.expanduser`` is applied repeatedly
    until a pass leaves the text unchanged.

    :param env_var: text to expand; falsy or non-string input is returned untouched
    :return: the fully expanded text
    """
    if not isinstance(env_var, str) or not env_var:
        return env_var
    current = env_var
    expanded = os.path.expanduser(os.path.expandvars(str(current)))
    # Keep expanding for as long as a pass still changes something.
    while expanded != current:
        current = expanded
        expanded = os.path.expanduser(os.path.expandvars(str(current)))
    return expanded
82
83
def run_command(command: str) -> str:
    """
    Execute a command line and return what it wrote to stdout.

    The command string is tokenised with ``shlex.split`` and executed without a shell.

    :param command: command line to execute
    :return: decoded standard output of the process
    :raises AirflowConfigException: if the process exits with a non-zero return code
    """
    process = subprocess.Popen(
        shlex.split(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
    )
    raw_stdout, raw_stderr = process.communicate()
    # Undecodable bytes are dropped rather than raising.
    output = raw_stdout.decode(sys.getdefaultencoding(), "ignore")
    stderr = raw_stderr.decode(sys.getdefaultencoding(), "ignore")

    if process.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {process.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )
98
99
100def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool:
101 """
102 Check if the config is a template.
103
104 :param configuration_description: description of configuration
105 :param section: section
106 :param key: key
107 :return: True if the config is a template
108 """
109 return configuration_description.get(section, {}).get(key, {}).get("is_template", False)
110
111
class AirflowConfigParser(ConfigParser):
    """
    Base configuration parser with pure parsing logic.

    This class provides the core parsing methods that work with:
    - configuration_description: dict describing config options (required in __init__)
    - _default_values: ConfigParser with default values (required in __init__)
    - deprecated_values: class attribute mapping section -> setting -> (old pattern, replacement)
    - deprecated_options: class attribute mapping new -> old options
    - deprecated_sections: class attribute mapping new -> old sections
    """

    # A mapping of section -> setting -> (old value pattern, replacement) for deprecated default values.
    # Subclasses can override this to define deprecated values that should be upgraded.
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {}

    # A mapping of (new section, new option) -> (old section, old option, since_version).
    # When reading the new option, the old option is checked to see if it exists. If it does, a
    # DeprecationWarning is issued and the old option is used instead.
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
        ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
        ("api", "base_url"): ("webserver", "base_url", "3.0"),
        ("api", "host"): ("webserver", "web_server_host", "3.0"),
        ("api", "port"): ("webserver", "web_server_port", "3.0"),
        ("api", "workers"): ("webserver", "workers", "3.0"),
        ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"),
        ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"),
        ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"),
        ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"),
        ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"),
        ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"),
        ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"),
        ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"),
        ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"),
        ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"),
        ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"),
        ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"),
        ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
        ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"),
        ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"),
        ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"),
        ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"),
        ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"),
        ("api", "page_size"): ("webserver", "page_size", "3.1.0"),
        ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"),
        ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"),
        ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"),
        ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
        ("api", "log_config"): ("api", "access_logfile", "3.1.0"),
        ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"),
    }

    # A mapping of new section -> (old section, since_version).
    # Empty in this base class.
    deprecated_sections: dict[str, tuple[str, str]] = {}
165
166 @property
167 def _lookup_sequence(self) -> list[Callable]:
168 """
169 Define the sequence of lookup methods for get(). The definition here does not have provider lookup.
170
171 Subclasses can override this to customise lookup order.
172 """
173 return [
174 self._get_environment_variables,
175 self._get_option_from_config_file,
176 self._get_option_from_commands,
177 self._get_option_from_secrets,
178 self._get_option_from_defaults,
179 ]
180
181 @property
182 def _validators(self) -> list[Callable[[], None]]:
183 """
184 Return list of validators defined on a config parser class. Base class will return an empty list.
185
186 Subclasses can override this to customize the validators that are run during validation on the
187 config parser instance.
188 """
189 return []
190
191 def validate(self) -> None:
192 """Run all registered validators."""
193 for validator in self._validators:
194 validator()
195 self.is_validated = True
196
197 def _validate_deprecated_values(self) -> None:
198 """Validate and upgrade deprecated default values."""
199 for section, replacement in self.deprecated_values.items():
200 for name, info in replacement.items():
201 old, new = info
202 current_value = self.get(section, name, fallback="")
203 if self._using_old_value(old, current_value):
204 self.upgraded_values[(section, name)] = current_value
205 new_value = old.sub(new, current_value)
206 self._update_env_var(section=section, name=name, new_value=new_value)
207 self._create_future_warning(
208 name=name,
209 section=section,
210 current_value=current_value,
211 new_value=new_value,
212 )
213
214 def _using_old_value(self, old: Pattern, current_value: str) -> bool:
215 """Check if current_value matches the old pattern."""
216 return old.search(current_value) is not None
217
218 def _update_env_var(self, section: str, name: str, new_value: str) -> None:
219 """Update environment variable with new value."""
220 env_var = self._env_var_name(section, name)
221 # Set it as an env var so that any subprocesses keep the same override!
222 os.environ[env_var] = new_value
223
224 @staticmethod
225 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None:
226 """Create a FutureWarning for deprecated default values."""
227 warnings.warn(
228 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
229 f"This value has been changed to {new_value!r} in the running config, but please update your config.",
230 FutureWarning,
231 stacklevel=3,
232 )
233
234 def __init__(
235 self,
236 configuration_description: dict[str, dict[str, Any]],
237 _default_values: ConfigParser,
238 *args,
239 **kwargs,
240 ):
241 """
242 Initialize the parser.
243
244 :param configuration_description: Description of configuration options
245 :param _default_values: ConfigParser with default values
246 """
247 super().__init__(*args, **kwargs)
248 self.configuration_description = configuration_description
249 self._default_values = _default_values
250 self._suppress_future_warnings = False
251 self.upgraded_values = {}
252
253 @functools.cached_property
254 def inversed_deprecated_options(self):
255 """Build inverse mapping from old options to new options."""
256 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}
257
258 @functools.cached_property
259 def inversed_deprecated_sections(self):
260 """Build inverse mapping from old sections to new sections."""
261 return {
262 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
263 }
264
265 @functools.cached_property
266 def sensitive_config_values(self) -> set[tuple[str, str]]:
267 """Get set of sensitive config values that should be masked."""
268 flattened = {
269 (s, k): item
270 for s, s_c in self.configuration_description.items()
271 for k, item in s_c.get("options", {}).items()
272 }
273 sensitive = {
274 (section.lower(), key.lower())
275 for (section, key), v in flattened.items()
276 if v.get("sensitive") is True
277 }
278 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options}
279 depr_section = {
280 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections
281 }
282 sensitive.update(depr_section, depr_option)
283 return sensitive
284
    # Typing-only signatures for get().
    # NOTE(review): the runtime implementation of get() is not visible in this part of the
    # file - presumably it is defined further down the class; confirm.
    # With a string ``fallback`` the result is typed as ``str``.
    @overload  # type: ignore[override]
    def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...

    # Otherwise the result is typed as ``str | None``.
    @overload
    def get(self, section: str, key: str, **kwargs) -> str | None: ...
290
291 def _update_defaults_from_string(self, config_string: str) -> None:
292 """
293 Update the defaults in _default_values based on values in config_string ("ini" format).
294
295 Override shared parser's method to add validation for template variables.
296 Note that those values are not validated and cannot contain variables because we are using
297 regular config parser to load them. This method is used to test the config parser in unit tests.
298
299 :param config_string: ini-formatted config string
300 """
301 parser = ConfigParser()
302 parser.read_string(config_string)
303 for section in parser.sections():
304 if section not in self._default_values.sections():
305 self._default_values.add_section(section)
306 errors = False
307 for key, value in parser.items(section):
308 if not self.is_template(section, key) and "{" in value:
309 errors = True
310 log.error(
311 "The %s.%s value %s read from string contains variable. This is not supported",
312 section,
313 key,
314 value,
315 )
316 self._default_values.set(section, key, value)
317 if errors:
318 raise AirflowConfigException(
319 f"The string config passed as default contains variables. "
320 f"This is not supported. String config: {config_string}"
321 )
322
323 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
324 """
325 Retrieve default value from default config parser.
326
327 This will retrieve the default value from the default config parser. Optionally a raw, stored
328 value can be retrieved by setting skip_interpolation to True. This is useful for example when
329 we want to write the default value to a file, and we don't want the interpolation to happen
330 as it is going to be done later when the config is read.
331
332 :param section: section of the config
333 :param key: key to use
334 :param fallback: fallback value to use
335 :param raw: if raw, then interpolation will be reversed
336 :param kwargs: other args
337 :return:
338 """
339 value = self._default_values.get(section, key, fallback=fallback, **kwargs)
340 if raw and value is not None:
341 return value.replace("%", "%%")
342 return value
343
344 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None:
345 """
346 Get Secret Backend if defined in airflow.cfg.
347
348 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.
349 """
350 section = "workers" if worker_mode else "secrets"
351 key = "secrets_backend" if worker_mode else "backend"
352 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs"
353
354 secrets_backend_cls = self.getimport(section=section, key=key)
355
356 if not secrets_backend_cls:
357 if worker_mode:
358 # if we find no secrets backend for worker, return that of secrets backend
359 secrets_backend_cls = self.getimport(section="secrets", key="backend")
360 if not secrets_backend_cls:
361 return None
362 # When falling back to secrets backend, use its kwargs
363 kwargs_key = "backend_kwargs"
364 section = "secrets"
365 else:
366 return None
367
368 try:
369 backend_kwargs = self.getjson(section=section, key=kwargs_key)
370 if not backend_kwargs:
371 backend_kwargs = {}
372 elif not isinstance(backend_kwargs, dict):
373 raise ValueError("not a dict")
374 except AirflowConfigException:
375 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key)
376 backend_kwargs = {}
377 except ValueError:
378 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key)
379 backend_kwargs = {}
380
381 return secrets_backend_cls(**backend_kwargs)
382
383 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
384 """
385 Get Config option values from Secret Backend.
386
387 Called by the shared parser's _get_secret_option() method as part of the lookup chain.
388 Uses _get_custom_secret_backend() to get the backend instance.
389
390 :param config_key: the config key to retrieve
391 :return: config value or None
392 """
393 try:
394 secrets_client = self._get_custom_secret_backend()
395 if not secrets_client:
396 return None
397 return secrets_client.get_config(config_key)
398 except Exception as e:
399 raise AirflowConfigException(
400 "Cannot retrieve config from alternative secrets backend. "
401 "Make sure it is configured properly and that the Backend "
402 "is accessible.\n"
403 f"{e}"
404 )
405
406 def _get_cmd_option_from_config_sources(
407 self, config_sources: ConfigSourcesType, section: str, key: str
408 ) -> str | None:
409 fallback_key = key + "_cmd"
410 if (section, key) in self.sensitive_config_values:
411 section_dict = config_sources.get(section)
412 if section_dict is not None:
413 command_value = section_dict.get(fallback_key)
414 if command_value is not None:
415 if isinstance(command_value, str):
416 command = command_value
417 else:
418 command = command_value[0]
419 return run_command(command)
420 return None
421
422 def _get_secret_option_from_config_sources(
423 self, config_sources: ConfigSourcesType, section: str, key: str
424 ) -> str | None:
425 fallback_key = key + "_secret"
426 if (section, key) in self.sensitive_config_values:
427 section_dict = config_sources.get(section)
428 if section_dict is not None:
429 secrets_path_value = section_dict.get(fallback_key)
430 if secrets_path_value is not None:
431 if isinstance(secrets_path_value, str):
432 secrets_path = secrets_path_value
433 else:
434 secrets_path = secrets_path_value[0]
435 return self._get_config_value_from_secret_backend(secrets_path)
436 return None
437
438 def _include_secrets(
439 self,
440 config_sources: ConfigSourcesType,
441 display_sensitive: bool,
442 display_source: bool,
443 raw: bool,
444 ):
445 for section, key in self.sensitive_config_values:
446 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
447 if value:
448 if not display_sensitive:
449 value = "< hidden >"
450 if display_source:
451 opt: str | tuple[str, str] = (value, "secret")
452 elif raw:
453 opt = value.replace("%", "%%")
454 else:
455 opt = value
456 config_sources.setdefault(section, {}).update({key: opt})
457 del config_sources[section][key + "_secret"]
458
459 def _include_commands(
460 self,
461 config_sources: ConfigSourcesType,
462 display_sensitive: bool,
463 display_source: bool,
464 raw: bool,
465 ):
466 for section, key in self.sensitive_config_values:
467 opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
468 if not opt:
469 continue
470 opt_to_set: str | tuple[str, str] | None = opt
471 if not display_sensitive:
472 opt_to_set = "< hidden >"
473 if display_source:
474 opt_to_set = (str(opt_to_set), "cmd")
475 elif raw:
476 opt_to_set = str(opt_to_set).replace("%", "%%")
477 if opt_to_set is not None:
478 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set}
479 config_sources.setdefault(section, {}).update(dict_to_update)
480 del config_sources[section][key + "_cmd"]
481
482 def _include_envs(
483 self,
484 config_sources: ConfigSourcesType,
485 display_sensitive: bool,
486 display_source: bool,
487 raw: bool,
488 ):
489 for env_var in [
490 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
491 ]:
492 try:
493 _, section, key = env_var.split("__", 2)
494 opt = self._get_env_var_option(section, key)
495 except ValueError:
496 continue
497 if opt is None:
498 log.warning("Ignoring unknown env var '%s'", env_var)
499 continue
500 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
501 # Don't hide cmd/secret values here
502 if not env_var.lower().endswith(("cmd", "secret")):
503 if (section, key) in self.sensitive_config_values:
504 opt = "< hidden >"
505 elif raw:
506 opt = opt.replace("%", "%%")
507 if display_source:
508 opt = (opt, "env var")
509
510 section = section.lower()
511 key = key.lower()
512 config_sources.setdefault(section, {}).update({key: opt})
513
514 def _filter_by_source(
515 self,
516 config_sources: ConfigSourcesType,
517 display_source: bool,
518 getter_func,
519 ):
520 """
521 Delete default configs from current configuration.
522
523 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values.
524
525 This is necessary because bare configs take precedence over the command
526 or secret key equivalents so if the current running config is
527 materialized with Airflow defaults they in turn override user set
528 command or secret key configs.
529
530 :param config_sources: The current configuration to operate on
531 :param display_source: If False, configuration options contain raw
532 values. If True, options are a tuple of (option_value, source).
533 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
534 :param getter_func: A callback function that gets the user configured
535 override value for a particular sensitive_config_values config.
536 :return: None, the given config_sources is filtered if necessary,
537 otherwise untouched.
538 """
539 for section, key in self.sensitive_config_values:
540 # Don't bother if we don't have section / key
541 if section not in config_sources or key not in config_sources[section]:
542 continue
543 # Check that there is something to override defaults
544 try:
545 getter_opt = getter_func(section, key)
546 except ValueError:
547 continue
548 if not getter_opt:
549 continue
550 # Check to see that there is a default value
551 if self.get_default_value(section, key) is None:
552 continue
553 # Check to see if bare setting is the same as defaults
554 if display_source:
555 # when display_source = true, we know that the config_sources contains tuple
556 opt, source = config_sources[section][key] # type: ignore
557 else:
558 opt = config_sources[section][key]
559 if opt == self.get_default_value(section, key):
560 del config_sources[section][key]
561
562 @staticmethod
563 def _deprecated_value_is_set_in_config(
564 deprecated_section: str,
565 deprecated_key: str,
566 configs: Iterable[tuple[str, ConfigParser]],
567 ) -> bool:
568 for config_type, config in configs:
569 if config_type != "default":
570 with contextlib.suppress(NoSectionError):
571 deprecated_section_array = config.items(section=deprecated_section, raw=True)
572 if any(key == deprecated_key for key, _ in deprecated_section_array):
573 return True
574 return False
575
576 @staticmethod
577 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
578 return (
579 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}")
580 is not None
581 )
582
583 @staticmethod
584 def _deprecated_command_is_set_in_config(
585 deprecated_section: str,
586 deprecated_key: str,
587 configs: Iterable[tuple[str, ConfigParser]],
588 ) -> bool:
589 return AirflowConfigParser._deprecated_value_is_set_in_config(
590 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs
591 )
592
593 @staticmethod
594 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
595 return (
596 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD")
597 is not None
598 )
599
600 @staticmethod
601 def _deprecated_secret_is_set_in_config(
602 deprecated_section: str,
603 deprecated_key: str,
604 configs: Iterable[tuple[str, ConfigParser]],
605 ) -> bool:
606 return AirflowConfigParser._deprecated_value_is_set_in_config(
607 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs
608 )
609
610 @staticmethod
611 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
612 return (
613 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET")
614 is not None
615 )
616
617 @staticmethod
618 def _replace_config_with_display_sources(
619 config_sources: ConfigSourcesType,
620 configs: Iterable[tuple[str, ConfigParser]],
621 configuration_description: dict[str, dict[str, Any]],
622 display_source: bool,
623 raw: bool,
624 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
625 include_env: bool,
626 include_cmds: bool,
627 include_secret: bool,
628 ):
629 for source_name, config in configs:
630 sections = config.sections()
631 for section in sections:
632 AirflowConfigParser._replace_section_config_with_display_sources(
633 config,
634 config_sources,
635 configuration_description,
636 display_source,
637 raw,
638 section,
639 source_name,
640 deprecated_options,
641 configs,
642 include_env=include_env,
643 include_cmds=include_cmds,
644 include_secret=include_secret,
645 )
646
647 @staticmethod
648 def _replace_section_config_with_display_sources(
649 config: ConfigParser,
650 config_sources: ConfigSourcesType,
651 configuration_description: dict[str, dict[str, Any]],
652 display_source: bool,
653 raw: bool,
654 section: str,
655 source_name: str,
656 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
657 configs: Iterable[tuple[str, ConfigParser]],
658 include_env: bool,
659 include_cmds: bool,
660 include_secret: bool,
661 ):
662 sect = config_sources.setdefault(section, {})
663 if isinstance(config, AirflowConfigParser):
664 with config.suppress_future_warnings():
665 items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
666 else:
667 items = config.items(section=section, raw=raw)
668 for k, val in items:
669 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
670 if deprecated_section and deprecated_key:
671 if source_name == "default":
672 # If deprecated entry has some non-default value set for any of the sources requested,
673 # We should NOT set default for the new entry (because it will override anything
674 # coming from the deprecated ones)
675 if AirflowConfigParser._deprecated_value_is_set_in_config(
676 deprecated_section, deprecated_key, configs
677 ):
678 continue
679 if include_env and AirflowConfigParser._deprecated_variable_is_set(
680 deprecated_section, deprecated_key
681 ):
682 continue
683 if include_cmds and (
684 AirflowConfigParser._deprecated_variable_command_is_set(
685 deprecated_section, deprecated_key
686 )
687 or AirflowConfigParser._deprecated_command_is_set_in_config(
688 deprecated_section, deprecated_key, configs
689 )
690 ):
691 continue
692 if include_secret and (
693 AirflowConfigParser._deprecated_variable_secret_is_set(
694 deprecated_section, deprecated_key
695 )
696 or AirflowConfigParser._deprecated_secret_is_set_in_config(
697 deprecated_section, deprecated_key, configs
698 )
699 ):
700 continue
701 if display_source:
702 updated_source_name = source_name
703 if source_name == "default":
704 # defaults can come from other sources (default-<PROVIDER>) that should be used here
705 source_description_section = configuration_description.get(section, {})
706 source_description_key = source_description_section.get("options", {}).get(k, {})
707 if source_description_key is not None:
708 updated_source_name = source_description_key.get("source", source_name)
709 sect[k] = (val, updated_source_name)
710 else:
711 sect[k] = val
712
713 def _warn_deprecate(
714 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
715 ):
716 """Warn about deprecated config option usage."""
717 if section == deprecated_section:
718 warnings.warn(
719 f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
720 f"the old setting has been used, but please update your config.",
721 DeprecationWarning,
722 stacklevel=4 + extra_stacklevel,
723 )
724 else:
725 warnings.warn(
726 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
727 f"in [{section}] - the old setting has been used, but please update your config.",
728 DeprecationWarning,
729 stacklevel=4 + extra_stacklevel,
730 )
731
732 @contextmanager
733 def suppress_future_warnings(self):
734 """
735 Context manager to temporarily suppress future warnings.
736
737 This is a stub used by the shared parser's lookup methods when checking deprecated options.
738 Subclasses can override this to customize warning suppression behavior.
739
740 :return: context manager that suppresses future warnings
741 """
742 suppress_future_warnings = self._suppress_future_warnings
743 self._suppress_future_warnings = True
744 yield self
745 self._suppress_future_warnings = suppress_future_warnings
746
747 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str:
748 """Generate environment variable name for a config option."""
749 team_component: str = f"{team_name.upper()}___" if team_name else ""
750 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}"
751
752 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None):
753 """Get config option from environment variable."""
754 env_var: str = self._env_var_name(section, key, team_name=team_name)
755 if env_var in os.environ:
756 return expand_env_var(os.environ[env_var])
757 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
758 env_var_cmd = env_var + "_CMD"
759 if env_var_cmd in os.environ:
760 # if this is a valid command key...
761 if (section, key) in self.sensitive_config_values:
762 return run_command(os.environ[env_var_cmd])
763 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
764 env_var_secret_path = env_var + "_SECRET"
765 if env_var_secret_path in os.environ:
766 # if this is a valid secret path...
767 if (section, key) in self.sensitive_config_values:
768 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path])
769 return None
770
771 def _get_cmd_option(self, section: str, key: str):
772 """Get config option from command execution."""
773 fallback_key = key + "_cmd"
774 if (section, key) in self.sensitive_config_values:
775 if super().has_option(section, fallback_key):
776 command = super().get(section, fallback_key)
777 try:
778 cmd_output = run_command(command)
779 except AirflowConfigException as e:
780 raise e
781 except Exception as e:
782 raise AirflowConfigException(
783 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd."
784 f" Please check the {fallback_key} value."
785 ) from e
786 return cmd_output
787 return None
788
789 def _get_secret_option(self, section: str, key: str) -> str | None:
790 """Get Config option values from Secret Backend."""
791 fallback_key = key + "_secret"
792 if (section, key) in self.sensitive_config_values:
793 if super().has_option(section, fallback_key):
794 secrets_path = super().get(section, fallback_key)
795 return self._get_config_value_from_secret_backend(secrets_path)
796 return None
797
798 def _get_environment_variables(
799 self,
800 deprecated_key: str | None,
801 deprecated_section: str | None,
802 key: str,
803 section: str,
804 issue_warning: bool = True,
805 extra_stacklevel: int = 0,
806 **kwargs,
807 ) -> str | ValueNotFound:
808 """Get config option from environment variables."""
809 team_name = kwargs.get("team_name", None)
810 option = self._get_env_var_option(section, key, team_name=team_name)
811 if option is not None:
812 return option
813 if deprecated_section and deprecated_key:
814 with self.suppress_future_warnings():
815 option = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name)
816 if option is not None:
817 if issue_warning:
818 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
819 return option
820 return VALUE_NOT_FOUND_SENTINEL
821
822 def _get_option_from_config_file(
823 self,
824 deprecated_key: str | None,
825 deprecated_section: str | None,
826 key: str,
827 section: str,
828 issue_warning: bool = True,
829 extra_stacklevel: int = 0,
830 **kwargs,
831 ) -> str | ValueNotFound:
832 """Get config option from config file."""
833 if team_name := kwargs.get("team_name", None):
834 section = f"{team_name}={section}"
835 # since this is the last lookup that supports team_name, pop it
836 kwargs.pop("team_name")
837 if super().has_option(section, key):
838 return expand_env_var(super().get(section, key, **kwargs))
839 if deprecated_section and deprecated_key:
840 if super().has_option(deprecated_section, deprecated_key):
841 if issue_warning:
842 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
843 with self.suppress_future_warnings():
844 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
845 return VALUE_NOT_FOUND_SENTINEL
846
847 def _get_option_from_commands(
848 self,
849 deprecated_key: str | None,
850 deprecated_section: str | None,
851 key: str,
852 section: str,
853 issue_warning: bool = True,
854 extra_stacklevel: int = 0,
855 **kwargs,
856 ) -> str | ValueNotFound:
857 """Get config option from command execution."""
858 option = self._get_cmd_option(section, key)
859 if option:
860 return option
861 if deprecated_section and deprecated_key:
862 with self.suppress_future_warnings():
863 option = self._get_cmd_option(deprecated_section, deprecated_key)
864 if option:
865 if issue_warning:
866 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
867 return option
868 return VALUE_NOT_FOUND_SENTINEL
869
870 def _get_option_from_secrets(
871 self,
872 deprecated_key: str | None,
873 deprecated_section: str | None,
874 key: str,
875 section: str,
876 issue_warning: bool = True,
877 extra_stacklevel: int = 0,
878 **kwargs,
879 ) -> str | ValueNotFound:
880 """Get config option from secrets backend."""
881 option = self._get_secret_option(section, key)
882 if option:
883 return option
884 if deprecated_section and deprecated_key:
885 with self.suppress_future_warnings():
886 option = self._get_secret_option(deprecated_section, deprecated_key)
887 if option:
888 if issue_warning:
889 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
890 return option
891 return VALUE_NOT_FOUND_SENTINEL
892
893 def _get_option_from_defaults(
894 self,
895 deprecated_key: str | None,
896 deprecated_section: str | None,
897 key: str,
898 section: str,
899 issue_warning: bool = True,
900 extra_stacklevel: int = 0,
901 team_name: str | None = None,
902 **kwargs,
903 ) -> str | ValueNotFound:
904 """Get config option from default values."""
905 if self.get_default_value(section, key) is not None or "fallback" in kwargs:
906 return expand_env_var(self.get_default_value(section, key, **kwargs))
907 return VALUE_NOT_FOUND_SENTINEL
908
909 def _resolve_deprecated_lookup(
910 self,
911 section: str,
912 key: str,
913 lookup_from_deprecated: bool,
914 extra_stacklevel: int = 0,
915 ) -> tuple[str, str, str | None, str | None, bool]:
916 """
917 Resolve deprecated section/key mappings and determine deprecated values.
918
919 :param section: Section name (will be lowercased)
920 :param key: Key name (will be lowercased)
921 :param lookup_from_deprecated: Whether to lookup from deprecated options
922 :param extra_stacklevel: Extra stack level for warnings
923 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted)
924 """
925 section = section.lower()
926 key = key.lower()
927 warning_emitted = False
928 deprecated_section: str | None = None
929 deprecated_key: str | None = None
930
931 if not lookup_from_deprecated:
932 return section, key, deprecated_section, deprecated_key, warning_emitted
933
934 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {})
935 if option_description.get("deprecated"):
936 deprecation_reason = option_description.get("deprecation_reason", "")
937 warnings.warn(
938 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
939 DeprecationWarning,
940 stacklevel=2 + extra_stacklevel,
941 )
942 # For the cases in which we rename whole sections
943 if section in self.inversed_deprecated_sections:
944 deprecated_section, deprecated_key = (section, key)
945 section = self.inversed_deprecated_sections[section]
946 if not self._suppress_future_warnings:
947 warnings.warn(
948 f"The config section [{deprecated_section}] has been renamed to "
949 f"[{section}]. Please update your `conf.get*` call to use the new name",
950 FutureWarning,
951 stacklevel=2 + extra_stacklevel,
952 )
953 # Don't warn about individual rename if the whole section is renamed
954 warning_emitted = True
955 elif (section, key) in self.inversed_deprecated_options:
956 # Handle using deprecated section/key instead of the new section/key
957 new_section, new_key = self.inversed_deprecated_options[(section, key)]
958 if not self._suppress_future_warnings and not warning_emitted:
959 warnings.warn(
960 f"section/key [{section}/{key}] has been deprecated, you should use"
961 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
962 "new name",
963 FutureWarning,
964 stacklevel=2 + extra_stacklevel,
965 )
966 warning_emitted = True
967 deprecated_section, deprecated_key = section, key
968 section, key = (new_section, new_key)
969 elif section in self.deprecated_sections:
970 # When accessing the new section name, make sure we check under the old config name
971 deprecated_key = key
972 deprecated_section = self.deprecated_sections[section][0]
973 else:
974 deprecated_section, deprecated_key, _ = self.deprecated_options.get(
975 (section, key), (None, None, None)
976 )
977
978 return section, key, deprecated_section, deprecated_key, warning_emitted
979
980 @overload # type: ignore[override]
981 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...
982
983 @overload # type: ignore[override]
984 def get(self, section: str, key: str, **kwargs) -> str | None: ...
985
986 def get( # type: ignore[misc, override]
987 self,
988 section: str,
989 key: str,
990 suppress_warnings: bool = False,
991 lookup_from_deprecated: bool = True,
992 _extra_stacklevel: int = 0,
993 team_name: str | None = None,
994 **kwargs,
995 ) -> str | None:
996 """
997 Get config value by iterating through lookup sequence.
998
999 Priority order is defined by _lookup_sequence property.
1000 """
1001 section, key, deprecated_section, deprecated_key, warning_emitted = self._resolve_deprecated_lookup(
1002 section=section,
1003 key=key,
1004 lookup_from_deprecated=lookup_from_deprecated,
1005 extra_stacklevel=_extra_stacklevel,
1006 )
1007
1008 if team_name is not None:
1009 kwargs["team_name"] = team_name
1010
1011 for lookup_method in self._lookup_sequence:
1012 value = lookup_method(
1013 deprecated_key=deprecated_key,
1014 deprecated_section=deprecated_section,
1015 key=key,
1016 section=section,
1017 issue_warning=not warning_emitted,
1018 extra_stacklevel=_extra_stacklevel,
1019 **kwargs,
1020 )
1021 if value is not VALUE_NOT_FOUND_SENTINEL:
1022 return value
1023
1024 # Check if fallback was explicitly provided (even if None)
1025 if "fallback" in kwargs:
1026 return kwargs["fallback"]
1027
1028 if not suppress_warnings:
1029 log.warning("section/key [%s/%s] not found in config", section, key)
1030
1031 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
1032
1033 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override]
1034 """Get config value as boolean."""
1035 val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
1036 if "#" in val:
1037 val = val.split("#")[0].strip()
1038 if val in ("t", "true", "1"):
1039 return True
1040 if val in ("f", "false", "0"):
1041 return False
1042 raise AirflowConfigException(
1043 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
1044 f'Current value: "{val}".'
1045 )
1046
1047 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override]
1048 """Get config value as integer."""
1049 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1050 if val is None:
1051 raise AirflowConfigException(
1052 f"Failed to convert value None to int. "
1053 f'Please check "{key}" key in "{section}" section is set.'
1054 )
1055 try:
1056 return int(val)
1057 except ValueError:
1058 raise AirflowConfigException(
1059 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1060 f'Current value: "{val}".'
1061 )
1062
1063 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override]
1064 """Get config value as float."""
1065 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1066 if val is None:
1067 raise AirflowConfigException(
1068 f"Failed to convert value None to float. "
1069 f'Please check "{key}" key in "{section}" section is set.'
1070 )
1071 try:
1072 return float(val)
1073 except ValueError:
1074 raise AirflowConfigException(
1075 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
1076 f'Current value: "{val}".'
1077 )
1078
1079 def getlist(self, section: str, key: str, delimiter=",", **kwargs):
1080 """Get config value as list."""
1081 val = self.get(section, key, **kwargs)
1082 if val is None:
1083 if "fallback" in kwargs:
1084 return kwargs["fallback"]
1085 raise AirflowConfigException(
1086 f"Failed to convert value None to list. "
1087 f'Please check "{key}" key in "{section}" section is set.'
1088 )
1089 try:
1090 return [item.strip() for item in val.split(delimiter)]
1091 except Exception:
1092 raise AirflowConfigException(
1093 f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
1094 f'Current value: "{val}".'
1095 )
1096
    # Type variable bound to ``Enum``; used in the signatures of ``getenum`` and ``getenumlist``
    # so the return type follows the ``enum_class`` argument.
    E = TypeVar("E", bound=Enum)
1098
1099 def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
1100 """Get config value as enum."""
1101 val = self.get(section, key, **kwargs)
1102 enum_names = [enum_item.name for enum_item in enum_class]
1103
1104 if val is None:
1105 raise AirflowConfigException(
1106 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1107 f'Current value: "{val}" and it must be one of {", ".join(enum_names)}'
1108 )
1109
1110 try:
1111 return enum_class[val]
1112 except KeyError:
1113 if "fallback" in kwargs and kwargs["fallback"] in enum_names:
1114 return enum_class[kwargs["fallback"]]
1115 raise AirflowConfigException(
1116 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1117 f"the value must be one of {', '.join(enum_names)}"
1118 )
1119
1120 def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
1121 """Get config value as list of enums."""
1122 string_list = self.getlist(section, key, delimiter, **kwargs)
1123 enum_names = [enum_item.name for enum_item in enum_class]
1124 enum_list = []
1125
1126 for val in string_list:
1127 try:
1128 enum_list.append(enum_class[val])
1129 except KeyError:
1130 log.warning(
1131 "Failed to convert value. Please check %s key in %s section. "
1132 "it must be one of %s, if not the value is ignored",
1133 key,
1134 section,
1135 ", ".join(enum_names),
1136 )
1137
1138 return enum_list
1139
1140 def getimport(self, section: str, key: str, **kwargs) -> Any:
1141 """
1142 Read options, import the full qualified name, and return the object.
1143
1144 In case of failure, it throws an exception with the key and section names
1145
1146 :return: The object or None, if the option is empty
1147 """
1148 # Fixed: use self.get() instead of conf.get()
1149 full_qualified_path = self.get(section=section, key=key, **kwargs)
1150 if not full_qualified_path:
1151 return None
1152
1153 try:
1154 # Import here to avoid circular dependency
1155 from ..module_loading import import_string
1156
1157 return import_string(full_qualified_path)
1158 except ImportError as e:
1159 log.warning(e)
1160 raise AirflowConfigException(
1161 f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
1162 f'Current value: "{full_qualified_path}".'
1163 )
1164
1165 def getjson(
1166 self, section: str, key: str, fallback=None, **kwargs
1167 ) -> dict | list | str | int | float | None:
1168 """
1169 Return a config value parsed from a JSON string.
1170
1171 ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.
1172 """
1173 try:
1174 data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
1175 except (NoSectionError, NoOptionError):
1176 data = None
1177
1178 if data is None or data == "":
1179 return fallback
1180
1181 try:
1182 return json.loads(data)
1183 except JSONDecodeError as e:
1184 raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
1185
1186 def gettimedelta(
1187 self, section: str, key: str, fallback: Any = None, **kwargs
1188 ) -> datetime.timedelta | None:
1189 """
1190 Get the config value for the given section and key, and convert it into datetime.timedelta object.
1191
1192 If the key is missing, then it is considered as `None`.
1193
1194 :param section: the section from the config
1195 :param key: the key defined in the given section
1196 :param fallback: fallback value when no config value is given, defaults to None
1197 :raises AirflowConfigException: raised because ValueError or OverflowError
1198 :return: datetime.timedelta(seconds=<config_value>) or None
1199 """
1200 val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
1201
1202 if val:
1203 # the given value must be convertible to integer
1204 try:
1205 int_val = int(val)
1206 except ValueError:
1207 raise AirflowConfigException(
1208 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1209 f'Current value: "{val}".'
1210 )
1211
1212 try:
1213 return datetime.timedelta(seconds=int_val)
1214 except OverflowError as err:
1215 raise AirflowConfigException(
1216 f"Failed to convert value to timedelta in `seconds`. "
1217 f"{err}. "
1218 f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
1219 )
1220
1221 return fallback
1222
1223 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
1224 """Get mandatory config value, raising ValueError if not found."""
1225 value = self.get(section, key, _extra_stacklevel=1, **kwargs)
1226 if value is None:
1227 raise ValueError(f"The value {section}/{key} should be set!")
1228 return value
1229
1230 def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
1231 """Get mandatory config value as list, raising ValueError if not found."""
1232 value = self.getlist(section, key, **kwargs)
1233 if value is None:
1234 raise ValueError(f"The value {section}/{key} should be set!")
1235 return value
1236
    def read(
        self,
        filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
        encoding: str | None = None,
    ) -> list[str]:
        """
        Read configuration files by delegating to the parent class.

        The override adds type hints only; both arguments are forwarded unchanged.

        :param filenames: a single path or an iterable of paths
        :param encoding: encoding forwarded to the parent ``read``
        :return: the parent's return value
        """
        return super().read(filenames=filenames, encoding=encoding)
1243
    def read_dict(  # type: ignore[override]
        self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
    ) -> None:
        """
        Load configuration from a dictionary by delegating to the parent class.

        We define a different signature here to add better type hints and checking.

        :param dictionary: dictionary to read from
        :param source: source to be used to store the configuration
        :return: None
        """
        super().read_dict(dictionary=dictionary, source=source)
1255
1256 def get_sections_including_defaults(self) -> list[str]:
1257 """
1258 Retrieve all sections from the configuration parser, including sections defined by built-in defaults.
1259
1260 :return: list of section names
1261 """
1262 sections_from_config = self.sections()
1263 sections_from_description = list(self.configuration_description.keys())
1264 return list(dict.fromkeys(itertools.chain(sections_from_description, sections_from_config)))
1265
1266 def get_options_including_defaults(self, section: str) -> list[str]:
1267 """
1268 Retrieve all possible options from the configuration parser for the section given.
1269
1270 Includes options defined by built-in defaults.
1271
1272 :param section: section name
1273 :return: list of option names for the section given
1274 """
1275 my_own_options = self.options(section) if self.has_section(section) else []
1276 all_options_from_defaults = list(
1277 self.configuration_description.get(section, {}).get("options", {}).keys()
1278 )
1279 return list(dict.fromkeys(itertools.chain(all_options_from_defaults, my_own_options)))
1280
1281 def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs) -> bool:
1282 """
1283 Check if option is defined.
1284
1285 Uses self.get() to avoid reimplementing the priority order of config variables
1286 (env, config, cmd, defaults).
1287
1288 :param section: section to get option from
1289 :param option: option to get
1290 :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
1291 :param kwargs: additional keyword arguments to pass to get(), such as team_name
1292 :return:
1293 """
1294 try:
1295 value = self.get(
1296 section,
1297 option,
1298 fallback=None,
1299 _extra_stacklevel=1,
1300 suppress_warnings=True,
1301 lookup_from_deprecated=lookup_from_deprecated,
1302 **kwargs,
1303 )
1304 if value is None:
1305 return False
1306 return True
1307 except (NoOptionError, NoSectionError, AirflowConfigException):
1308 return False
1309
1310 def set(self, section: str, option: str, value: str | None = None) -> None:
1311 """
1312 Set an option to the given value.
1313
1314 This override just makes sure the section and option are lower case, to match what we do in `get`.
1315 """
1316 section = section.lower()
1317 option = option.lower()
1318 defaults = self.configuration_description or {}
1319 if not self.has_section(section) and section in defaults:
1320 # Trying to set a key in a section that exists in default, but not in the user config;
1321 # automatically create it
1322 self.add_section(section)
1323 super().set(section, option, value)
1324
1325 def remove_option(self, section: str, option: str, remove_default: bool = True):
1326 """
1327 Remove an option if it exists in config from a file or default config.
1328
1329 If both of config have the same option, this removes the option
1330 in both configs unless remove_default=False.
1331 """
1332 section = section.lower()
1333 option = option.lower()
1334 if super().has_option(section, option):
1335 super().remove_option(section, option)
1336
1337 if self.get_default_value(section, option) is not None and remove_default:
1338 self._default_values.remove_option(section, option)
1339
    def optionxform(self, optionstr: str) -> str:
        """
        Transform option names on every read, get, or set operation.

        This changes from the default behaviour of ConfigParser from lower-casing
        to instead be case-preserving.

        :param optionstr: option name as given
        :return: the same string, unchanged
        """
        return optionstr
1351
1352 def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
1353 """
1354 Get list of config sources to use in as_dict().
1355
1356 Subclasses can override to add additional sources (e.g., provider configs).
1357 """
1358 return [
1359 ("default", self._default_values),
1360 ("airflow.cfg", self),
1361 ]
1362
    def as_dict(
        self,
        display_source: bool = False,
        display_sensitive: bool = False,
        raw: bool = False,
        include_env: bool = True,
        include_cmds: bool = True,
        include_secret: bool = True,
    ) -> ConfigSourcesType:
        """
        Return the current configuration as a dictionary of per-section dictionaries.

        When materializing current configuration Airflow defaults are
        materialized along with user set configs. If any of the `include_*`
        options are False then the result of calling command or secret key
        configs do not override Airflow defaults and instead are passed through.
        In order to then avoid Airflow defaults from overwriting user set
        command or secret key configs we filter out bare sensitive_config_values
        that are set to Airflow defaults when command or secret key configs
        produce different values.

        :param display_source: If False, the option value is returned. If True,
            a tuple of (option_value, source) is returned. Source is either
            'airflow.cfg', 'default', 'env var', or 'cmd'.
        :param display_sensitive: If True, the values of options set by env
            vars and bash commands will be displayed. If False, those options
            are shown as '< hidden >'
        :param raw: Should the values be output as interpolated values, or the
            "raw" form that can be fed back in to ConfigParser
        :param include_env: Should the value of configuration from AIRFLOW__
            environment variables be included or not
        :param include_cmds: Should the result of calling any ``*_cmd`` config be
            set (True, default), or should the _cmd options be left as the
            command to run (False)
        :param include_secret: Should the result of calling any ``*_secret`` config be
            set (True, default), or should the _secret options be left as the
            path to get the secret from (False)
        :raises ValueError: if ``display_sensitive`` is False while any of ``include_env``,
            ``include_cmds`` or ``include_secret`` is False
        :return: Dictionary, where the key is the name of the section and the content is
            the dictionary with the name of the parameter and its value.
        """
        if not display_sensitive:
            # We want to hide the sensitive values at the appropriate methods
            # since envs from cmds, secrets can be read at _include_envs method
            if not all([include_env, include_cmds, include_secret]):
                raise ValueError(
                    "If display_sensitive is false, then include_env, "
                    "include_cmds, include_secret must all be set as True"
                )

        config_sources: ConfigSourcesType = {}

        # We check sequentially all those sources and the last one we saw it in will "win"
        configs = self._get_config_sources_for_as_dict()

        self._replace_config_with_display_sources(
            config_sources,
            configs,
            self.configuration_description,
            display_source,
            raw,
            self.deprecated_options,
            include_cmds=include_cmds,
            include_env=include_env,
            include_secret=include_secret,
        )

        # Each of the three stages below either merges that source into config_sources or,
        # when excluded, filters config_sources using that source's lookup function.

        # add env vars and overwrite because they have priority
        if include_env:
            self._include_envs(config_sources, display_sensitive, display_source, raw)
        else:
            self._filter_by_source(config_sources, display_source, self._get_env_var_option)

        # add bash commands
        if include_cmds:
            self._include_commands(config_sources, display_sensitive, display_source, raw)
        else:
            self._filter_by_source(config_sources, display_source, self._get_cmd_option)

        # add config from secret backends
        if include_secret:
            self._include_secrets(config_sources, display_sensitive, display_source, raw)
        else:
            self._filter_by_source(config_sources, display_source, self._get_secret_option)

        if not display_sensitive:
            # This ensures the ones from config file is hidden too
            # if they are not provided through env, cmd and secret
            hidden = "< hidden >"
            for section, key in self.sensitive_config_values:
                if config_sources.get(section):
                    # Only entries with a truthy value are masked; empty ones are left as they are.
                    if config_sources[section].get(key, None):
                        if display_source:
                            # Entries are (value, source) tuples here; keep the source, mask the value.
                            source = config_sources[section][key][1]
                            config_sources[section][key] = (hidden, source)
                        else:
                            config_sources[section][key] = hidden

        return config_sources
1461
1462 def _write_option_header(
1463 self,
1464 file: IO[str],
1465 option: str,
1466 extra_spacing: bool,
1467 include_descriptions: bool,
1468 include_env_vars: bool,
1469 include_examples: bool,
1470 include_sources: bool,
1471 section_config_description: dict[str, dict[str, Any]],
1472 section_to_write: str,
1473 sources_dict: ConfigSourcesType,
1474 ) -> tuple[bool, bool]:
1475 """
1476 Write header for configuration option.
1477
1478 Returns tuple of (should_continue, needs_separation) where needs_separation should be
1479 set if the option needs additional separation to visually separate it from the next option.
1480 """
1481 option_config_description = (
1482 section_config_description.get("options", {}).get(option, {})
1483 if section_config_description
1484 else {}
1485 )
1486 description = option_config_description.get("description")
1487 needs_separation = False
1488 if description and include_descriptions:
1489 for line in description.splitlines():
1490 file.write(f"# {line}\n")
1491 needs_separation = True
1492 example = option_config_description.get("example")
1493 if example is not None and include_examples:
1494 if extra_spacing:
1495 file.write("#\n")
1496 example_lines = example.splitlines()
1497 example = "\n# ".join(example_lines)
1498 file.write(f"# Example: {option} = {example}\n")
1499 needs_separation = True
1500 if include_sources and sources_dict:
1501 sources_section = sources_dict.get(section_to_write)
1502 value_with_source = sources_section.get(option) if sources_section else None
1503 if value_with_source is None:
1504 file.write("#\n# Source: not defined\n")
1505 else:
1506 file.write(f"#\n# Source: {value_with_source[1]}\n")
1507 needs_separation = True
1508 if include_env_vars:
1509 file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
1510 if extra_spacing:
1511 file.write("#\n")
1512 needs_separation = True
1513 return True, needs_separation
1514
    def is_template(self, section: str, key) -> bool:
        """
        Return whether the value is templated.

        Delegates to the module-level ``_is_template`` helper, passing this parser's
        ``configuration_description``.

        :param section: section of the config
        :param key: key in the section
        :return: True if the value is templated
        """
        return _is_template(self.configuration_description, section, key)
1524
1525 def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None:
1526 """
1527 Return the section as a dict.
1528
1529 Values are converted to int, float, bool as required.
1530
1531 :param section: section from the config
1532 :param team_name: optional team name for team-specific configuration lookup
1533 """
1534 # Handle team-specific section lookup for config file
1535 config_section = f"{team_name}={section}" if team_name else section
1536
1537 if not self.has_section(config_section) and not self._default_values.has_section(config_section):
1538 return None
1539 if self._default_values.has_section(config_section):
1540 _section: ConfigOptionsDictType = dict(self._default_values.items(config_section))
1541 else:
1542 _section = {}
1543
1544 if self.has_section(config_section):
1545 _section.update(self.items(config_section))
1546
1547 # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name
1548 section_prefix = self._env_var_name(section, "", team_name=team_name)
1549 for env_var in sorted(os.environ.keys()):
1550 if env_var.startswith(section_prefix):
1551 key = env_var.replace(section_prefix, "")
1552 if key.endswith("_CMD"):
1553 key = key[:-4]
1554 key = key.lower()
1555 _section[key] = self._get_env_var_option(section, key, team_name=team_name)
1556
1557 for key, val in _section.items():
1558 if val is None:
1559 raise AirflowConfigException(
1560 f"Failed to convert value automatically. "
1561 f'Please check "{key}" key in "{section}" section is set.'
1562 )
1563 try:
1564 _section[key] = int(val)
1565 except ValueError:
1566 try:
1567 _section[key] = float(val)
1568 except ValueError:
1569 if isinstance(val, str) and val.lower() in ("t", "true"):
1570 _section[key] = True
1571 elif isinstance(val, str) and val.lower() in ("f", "false"):
1572 _section[key] = False
1573 return _section
1574
1575 @staticmethod
1576 def _write_section_header(
1577 file: IO[str],
1578 include_descriptions: bool,
1579 section_config_description: dict[str, str],
1580 section_to_write: str,
1581 ) -> None:
1582 """Write header for configuration section."""
1583 file.write(f"[{section_to_write}]\n")
1584 section_description = section_config_description.get("description")
1585 if section_description and include_descriptions:
1586 for line in section_description.splitlines():
1587 file.write(f"# {line}\n")
1588 file.write("\n")
1589
1590 def _write_value(
1591 self,
1592 file: IO[str],
1593 option: str,
1594 comment_out_everything: bool,
1595 needs_separation: bool,
1596 only_defaults: bool,
1597 section_to_write: str,
1598 ):
1599 default_value = self.get_default_value(section_to_write, option, raw=True)
1600 if only_defaults:
1601 value = default_value
1602 else:
1603 value = self.get(section_to_write, option, fallback=default_value, raw=True)
1604 if value is None:
1605 file.write(f"# {option} = \n")
1606 else:
1607 if comment_out_everything:
1608 value_lines = value.splitlines()
1609 value = "\n# ".join(value_lines)
1610 file.write(f"# {option} = {value}\n")
1611 else:
1612 if "\n" in value:
1613 try:
1614 value = json.dumps(json.loads(value), indent=4)
1615 value = value.replace(
1616 "\n", "\n "
1617 ) # indent multi-line JSON to satisfy configparser format
1618 except JSONDecodeError:
1619 pass
1620 file.write(f"{option} = {value}\n")
1621 if needs_separation:
1622 file.write("\n")
1623
1624 def write( # type: ignore[override]
1625 self,
1626 file: IO[str],
1627 section: str | None = None,
1628 include_examples: bool = True,
1629 include_descriptions: bool = True,
1630 include_sources: bool = True,
1631 include_env_vars: bool = True,
1632 include_providers: bool = True,
1633 comment_out_everything: bool = False,
1634 hide_sensitive_values: bool = False,
1635 extra_spacing: bool = True,
1636 only_defaults: bool = False,
1637 **kwargs: Any,
1638 ) -> None:
1639 """
1640 Write configuration with comments and examples to a file.
1641
1642 :param file: file to write to
1643 :param section: section of the config to write, defaults to all sections
1644 :param include_examples: Include examples in the output
1645 :param include_descriptions: Include descriptions in the output
1646 :param include_sources: Include the source of each config option
1647 :param include_env_vars: Include environment variables corresponding to each config option
1648 :param include_providers: Include providers configuration
1649 :param comment_out_everything: Comment out all values
1650 :param hide_sensitive_values: Include sensitive values in the output
1651 :param extra_spacing: Add extra spacing before examples and after variables
1652 :param only_defaults: Only include default values when writing the config, not the actual values
1653 """
1654 sources_dict = {}
1655 if include_sources:
1656 sources_dict = self.as_dict(display_source=True)
1657 with self.make_sure_configuration_loaded(with_providers=include_providers):
1658 for section_to_write in self.get_sections_including_defaults():
1659 section_config_description = self.configuration_description.get(section_to_write, {})
1660 if section_to_write != section and section is not None:
1661 continue
1662 if self._default_values.has_section(section_to_write) or self.has_section(section_to_write):
1663 self._write_section_header(
1664 file, include_descriptions, section_config_description, section_to_write
1665 )
1666 for option in self.get_options_including_defaults(section_to_write):
1667 should_continue, needs_separation = self._write_option_header(
1668 file=file,
1669 option=option,
1670 extra_spacing=extra_spacing,
1671 include_descriptions=include_descriptions,
1672 include_env_vars=include_env_vars,
1673 include_examples=include_examples,
1674 include_sources=include_sources,
1675 section_config_description=section_config_description,
1676 section_to_write=section_to_write,
1677 sources_dict=sources_dict,
1678 )
1679 self._write_value(
1680 file=file,
1681 option=option,
1682 comment_out_everything=comment_out_everything,
1683 needs_separation=needs_separation,
1684 only_defaults=only_defaults,
1685 section_to_write=section_to_write,
1686 )
1687 if include_descriptions and not needs_separation:
1688 # extra separation between sections in case last option did not need it
1689 file.write("\n")
1690
1691 @contextmanager
1692 def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
1693 """
1694 Make sure configuration is loaded with or without providers.
1695
1696 This happens regardless if the provider configuration has been loaded before or not.
1697 Restores configuration to the state before entering the context.
1698
1699 :param with_providers: whether providers should be loaded
1700 """
1701 needs_reload = False
1702 if with_providers:
1703 self._ensure_providers_config_loaded()
1704 else:
1705 needs_reload = self._ensure_providers_config_unloaded()
1706 yield
1707 if needs_reload:
1708 self._reload_provider_configs()
1709
    def _ensure_providers_config_loaded(self) -> None:
        """
        Ensure providers configurations are loaded.

        Hook called by ``make_sure_configuration_loaded`` when providers are requested.

        :raises NotImplementedError: always; subclasses must override this method
        """
        raise NotImplementedError("Subclasses must implement _ensure_providers_config_loaded method")
1713
    def _ensure_providers_config_unloaded(self) -> bool:
        """
        Ensure providers configurations are unloaded temporarily to load core configs.

        Hook called by ``make_sure_configuration_loaded`` when providers are not requested; a
        True result makes it call ``_reload_provider_configs`` on exit.

        :return: True if providers get unloaded
        :raises NotImplementedError: always; subclasses must override this method
        """
        raise NotImplementedError("Subclasses must implement _ensure_providers_config_unloaded method")
1717
    def _reload_provider_configs(self) -> None:
        """
        Reload providers configuration.

        Hook called by ``make_sure_configuration_loaded`` on exit when providers were unloaded
        on entry.

        :raises NotImplementedError: always; subclasses must override this method
        """
        raise NotImplementedError("Subclasses must implement _reload_provider_configs method")