1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Base configuration parser with pure parsing logic."""
19
20from __future__ import annotations
21
22import contextlib
23import datetime
24import functools
25import itertools
26import json
27import logging
28import os
29import shlex
30import subprocess
31import sys
32import warnings
33from collections.abc import Callable, Generator, Iterable
34from configparser import ConfigParser, NoOptionError, NoSectionError
35from contextlib import contextmanager
36from enum import Enum
37from json.decoder import JSONDecodeError
38from re import Pattern
39from typing import IO, Any, TypeVar, overload
40
41from .exceptions import AirflowConfigException
42
43log = logging.getLogger(__name__)
44
45
46ConfigType = str | int | float | bool
47ConfigOptionsDictType = dict[str, ConfigType]
48ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
49ConfigSourcesType = dict[str, ConfigSectionSourcesType]
50ENV_VAR_PREFIX = "AIRFLOW__"
51
52
53class ValueNotFound:
54 """Object of this is raised when a configuration value cannot be found."""
55
56 pass
57
58
59VALUE_NOT_FOUND_SENTINEL = ValueNotFound()
60
61
62@overload
63def expand_env_var(env_var: None) -> None: ...
64@overload
65def expand_env_var(env_var: str) -> str: ...
66
67
68def expand_env_var(env_var: str | None) -> str | None:
69 """
70 Expand (potentially nested) env vars.
71
72 Repeat and apply `expandvars` and `expanduser` until
73 interpolation stops having any effect.
74 """
75 if not env_var or not isinstance(env_var, str):
76 return env_var
77 while True:
78 interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
79 if interpolated == env_var:
80 return interpolated
81 env_var = interpolated
82
83
84def run_command(command: str) -> str:
85 """Run command and returns stdout."""
86 process = subprocess.Popen(
87 shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
88 )
89 output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate())
90
91 if process.returncode != 0:
92 raise AirflowConfigException(
93 f"Cannot execute {command}. Error code is: {process.returncode}. "
94 f"Output: {output}, Stderr: {stderr}"
95 )
96
97 return output
98
99
100def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool:
101 """
102 Check if the config is a template.
103
104 :param configuration_description: description of configuration
105 :param section: section
106 :param key: key
107 :return: True if the config is a template
108 """
109 return configuration_description.get(section, {}).get(key, {}).get("is_template", False)
110
111
112def configure_parser_from_configuration_description(
113 parser: ConfigParser,
114 configuration_description: dict[str, dict[str, Any]],
115 all_vars: dict[str, Any],
116) -> None:
117 """
118 Configure a ConfigParser based on configuration description.
119
120 :param parser: ConfigParser to configure
121 :param configuration_description: configuration description from config.yml
122 """
123 for section, section_desc in configuration_description.items():
124 parser.add_section(section)
125 options = section_desc["options"]
126 for key in options:
127 default_value = options[key]["default"]
128 is_template = options[key].get("is_template", False)
129 if (default_value is not None) and not (
130 options[key].get("version_deprecated") or options[key].get("deprecation_reason")
131 ):
132 if is_template or not isinstance(default_value, str):
133 parser.set(section, key, str(default_value))
134 else:
135 try:
136 parser.set(section, key, default_value.format(**all_vars))
137 except (KeyError, ValueError):
138 parser.set(section, key, default_value)
139
140
141class AirflowConfigParser(ConfigParser):
142 """
143 Base configuration parser with pure parsing logic.
144
145 This class provides the core parsing methods that work with:
146 - configuration_description: dict describing config options (required in __init__)
147 - _default_values: ConfigParser with default values (required in __init__)
148 - deprecated_options: class attribute mapping new -> old options
149 - deprecated_sections: class attribute mapping new -> old sections
150 """
151
152 # A mapping of section -> setting -> { old, replace } for deprecated default values.
153 # Subclasses can override this to define deprecated values that should be upgraded.
154 deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {}
155
156 # A mapping of (new section, new option) -> (old section, old option, since_version).
157 # When reading new option, the old option will be checked to see if it exists. If it does a
158 # DeprecationWarning will be issued and the old option will be used instead
159 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
160 ("dag_processor", "dag_file_processor_timeout"): ("core", "dag_file_processor_timeout", "3.0"),
161 ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
162 ("api", "base_url"): ("webserver", "base_url", "3.0"),
163 ("api", "host"): ("webserver", "web_server_host", "3.0"),
164 ("api", "port"): ("webserver", "web_server_port", "3.0"),
165 ("api", "workers"): ("webserver", "workers", "3.0"),
166 ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"),
167 ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"),
168 ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"),
169 ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"),
170 ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"),
171 ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"),
172 ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"),
173 ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"),
174 ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"),
175 ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"),
176 ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"),
177 ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"),
178 ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
179 ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"),
180 ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"),
181 ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"),
182 ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"),
183 ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"),
184 ("core", "num_dag_runs_to_retain_rendered_fields"): (
185 "core",
186 "max_num_rendered_ti_fields_per_task",
187 "3.2.0",
188 ),
189 ("api", "page_size"): ("webserver", "page_size", "3.1.0"),
190 ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"),
191 ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"),
192 ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"),
193 ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
194 ("api", "log_config"): ("api", "access_logfile", "3.1.0"),
195 ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"),
196 ("api", "fallback_page_limit"): ("api", "page_size", "3.2.0"),
197 }
198
199 # A mapping of new section -> (old section, since_version).
200 deprecated_sections: dict[str, tuple[str, str]] = {}
201
202 @property
203 def _lookup_sequence(self) -> list[Callable]:
204 """
205 Define the sequence of lookup methods for get(). The definition here does not have provider lookup.
206
207 Subclasses can override this to customise lookup order.
208 """
209 return [
210 self._get_environment_variables,
211 self._get_option_from_config_file,
212 self._get_option_from_commands,
213 self._get_option_from_secrets,
214 self._get_option_from_defaults,
215 ]
216
217 @property
218 def _validators(self) -> list[Callable[[], None]]:
219 """
220 Return list of validators defined on a config parser class. Base class will return an empty list.
221
222 Subclasses can override this to customize the validators that are run during validation on the
223 config parser instance.
224 """
225 return []
226
227 def validate(self) -> None:
228 """Run all registered validators."""
229 for validator in self._validators:
230 validator()
231 self.is_validated = True
232
233 def _validate_deprecated_values(self) -> None:
234 """Validate and upgrade deprecated default values."""
235 for section, replacement in self.deprecated_values.items():
236 for name, info in replacement.items():
237 old, new = info
238 current_value = self.get(section, name, fallback="")
239 if self._using_old_value(old, current_value):
240 self.upgraded_values[(section, name)] = current_value
241 new_value = old.sub(new, current_value)
242 self._update_env_var(section=section, name=name, new_value=new_value)
243 self._create_future_warning(
244 name=name,
245 section=section,
246 current_value=current_value,
247 new_value=new_value,
248 )
249
250 def _using_old_value(self, old: Pattern, current_value: str) -> bool:
251 """Check if current_value matches the old pattern."""
252 return old.search(current_value) is not None
253
254 def _update_env_var(self, section: str, name: str, new_value: str) -> None:
255 """Update environment variable with new value."""
256 env_var = self._env_var_name(section, name)
257 # Set it as an env var so that any subprocesses keep the same override!
258 os.environ[env_var] = new_value
259
260 @staticmethod
261 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None:
262 """Create a FutureWarning for deprecated default values."""
263 warnings.warn(
264 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
265 f"This value has been changed to {new_value!r} in the running config, but please update your config.",
266 FutureWarning,
267 stacklevel=3,
268 )
269
270 def __init__(
271 self,
272 configuration_description: dict[str, dict[str, Any]],
273 _default_values: ConfigParser,
274 *args,
275 **kwargs,
276 ):
277 """
278 Initialize the parser.
279
280 :param configuration_description: Description of configuration options
281 :param _default_values: ConfigParser with default values
282 """
283 super().__init__(*args, **kwargs)
284 self.configuration_description = configuration_description
285 self._default_values = _default_values
286 self._suppress_future_warnings = False
287 self.upgraded_values: dict[tuple[str, str], str] = {}
288
289 @functools.cached_property
290 def inversed_deprecated_options(self):
291 """Build inverse mapping from old options to new options."""
292 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}
293
294 @functools.cached_property
295 def inversed_deprecated_sections(self):
296 """Build inverse mapping from old sections to new sections."""
297 return {
298 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
299 }
300
301 @functools.cached_property
302 def sensitive_config_values(self) -> set[tuple[str, str]]:
303 """Get set of sensitive config values that should be masked."""
304 flattened = {
305 (s, k): item
306 for s, s_c in self.configuration_description.items()
307 for k, item in s_c.get("options", {}).items()
308 }
309 sensitive = {
310 (section.lower(), key.lower())
311 for (section, key), v in flattened.items()
312 if v.get("sensitive") is True
313 }
314 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options}
315 depr_section = {
316 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections
317 }
318 sensitive.update(depr_section, depr_option)
319 return sensitive
320
321 def _update_defaults_from_string(self, config_string: str) -> None:
322 """
323 Update the defaults in _default_values based on values in config_string ("ini" format).
324
325 Override shared parser's method to add validation for template variables.
326 Note that those values are not validated and cannot contain variables because we are using
327 regular config parser to load them. This method is used to test the config parser in unit tests.
328
329 :param config_string: ini-formatted config string
330 """
331 parser = ConfigParser()
332 parser.read_string(config_string)
333 for section in parser.sections():
334 if section not in self._default_values.sections():
335 self._default_values.add_section(section)
336 errors = False
337 for key, value in parser.items(section):
338 if not self.is_template(section, key) and "{" in value:
339 errors = True
340 log.error(
341 "The %s.%s value %s read from string contains variable. This is not supported",
342 section,
343 key,
344 value,
345 )
346 self._default_values.set(section, key, value)
347 if errors:
348 raise AirflowConfigException(
349 f"The string config passed as default contains variables. "
350 f"This is not supported. String config: {config_string}"
351 )
352
353 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
354 """
355 Retrieve default value from default config parser.
356
357 This will retrieve the default value from the default config parser. Optionally a raw, stored
358 value can be retrieved by setting skip_interpolation to True. This is useful for example when
359 we want to write the default value to a file, and we don't want the interpolation to happen
360 as it is going to be done later when the config is read.
361
362 :param section: section of the config
363 :param key: key to use
364 :param fallback: fallback value to use
365 :param raw: if raw, then interpolation will be reversed
366 :param kwargs: other args
367 :return:
368 """
369 value = self._default_values.get(section, key, fallback=fallback, **kwargs)
370 if raw and value is not None:
371 return value.replace("%", "%%")
372 return value
373
374 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None:
375 """
376 Get Secret Backend if defined in airflow.cfg.
377
378 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.
379 """
380 section = "workers" if worker_mode else "secrets"
381 key = "secrets_backend" if worker_mode else "backend"
382 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs"
383
384 secrets_backend_cls = self.getimport(section=section, key=key)
385
386 if not secrets_backend_cls:
387 if worker_mode:
388 # if we find no secrets backend for worker, return that of secrets backend
389 secrets_backend_cls = self.getimport(section="secrets", key="backend")
390 if not secrets_backend_cls:
391 return None
392 # When falling back to secrets backend, use its kwargs
393 kwargs_key = "backend_kwargs"
394 section = "secrets"
395 else:
396 return None
397
398 try:
399 backend_kwargs = self.getjson(section=section, key=kwargs_key)
400 if not backend_kwargs:
401 backend_kwargs = {}
402 elif not isinstance(backend_kwargs, dict):
403 raise ValueError("not a dict")
404 except AirflowConfigException:
405 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key)
406 backend_kwargs = {}
407 except ValueError:
408 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key)
409 backend_kwargs = {}
410
411 return secrets_backend_cls(**backend_kwargs)
412
413 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
414 """
415 Get Config option values from Secret Backend.
416
417 Called by the shared parser's _get_secret_option() method as part of the lookup chain.
418 Uses _get_custom_secret_backend() to get the backend instance.
419
420 :param config_key: the config key to retrieve
421 :return: config value or None
422 """
423 try:
424 secrets_client = self._get_custom_secret_backend()
425 if not secrets_client:
426 return None
427 return secrets_client.get_config(config_key)
428 except Exception as e:
429 raise AirflowConfigException(
430 "Cannot retrieve config from alternative secrets backend. "
431 "Make sure it is configured properly and that the Backend "
432 "is accessible.\n"
433 f"{e}"
434 )
435
436 def _get_cmd_option_from_config_sources(
437 self, config_sources: ConfigSourcesType, section: str, key: str
438 ) -> str | None:
439 fallback_key = key + "_cmd"
440 if (section, key) in self.sensitive_config_values:
441 section_dict = config_sources.get(section)
442 if section_dict is not None:
443 command_value = section_dict.get(fallback_key)
444 if command_value is not None:
445 if isinstance(command_value, str):
446 command = command_value
447 else:
448 command = command_value[0]
449 return run_command(command)
450 return None
451
452 def _get_secret_option_from_config_sources(
453 self, config_sources: ConfigSourcesType, section: str, key: str
454 ) -> str | None:
455 fallback_key = key + "_secret"
456 if (section, key) in self.sensitive_config_values:
457 section_dict = config_sources.get(section)
458 if section_dict is not None:
459 secrets_path_value = section_dict.get(fallback_key)
460 if secrets_path_value is not None:
461 if isinstance(secrets_path_value, str):
462 secrets_path = secrets_path_value
463 else:
464 secrets_path = secrets_path_value[0]
465 return self._get_config_value_from_secret_backend(secrets_path)
466 return None
467
468 def _include_secrets(
469 self,
470 config_sources: ConfigSourcesType,
471 display_sensitive: bool,
472 display_source: bool,
473 raw: bool,
474 ):
475 for section, key in self.sensitive_config_values:
476 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
477 if value:
478 if not display_sensitive:
479 value = "< hidden >"
480 if display_source:
481 opt: str | tuple[str, str] = (value, "secret")
482 elif raw:
483 opt = value.replace("%", "%%")
484 else:
485 opt = value
486 config_sources.setdefault(section, {}).update({key: opt})
487 del config_sources[section][key + "_secret"]
488
489 def _include_commands(
490 self,
491 config_sources: ConfigSourcesType,
492 display_sensitive: bool,
493 display_source: bool,
494 raw: bool,
495 ):
496 for section, key in self.sensitive_config_values:
497 opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
498 if not opt:
499 continue
500 opt_to_set: str | tuple[str, str] | None = opt
501 if not display_sensitive:
502 opt_to_set = "< hidden >"
503 if display_source:
504 opt_to_set = (str(opt_to_set), "cmd")
505 elif raw:
506 opt_to_set = str(opt_to_set).replace("%", "%%")
507 if opt_to_set is not None:
508 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set}
509 config_sources.setdefault(section, {}).update(dict_to_update)
510 del config_sources[section][key + "_cmd"]
511
512 def _include_envs(
513 self,
514 config_sources: ConfigSourcesType,
515 display_sensitive: bool,
516 display_source: bool,
517 raw: bool,
518 ):
519 for env_var in [
520 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
521 ]:
522 try:
523 _, section, key = env_var.split("__", 2)
524 opt = self._get_env_var_option(section, key)
525 except ValueError:
526 continue
527 if opt is None:
528 log.warning("Ignoring unknown env var '%s'", env_var)
529 continue
530 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
531 # Don't hide cmd/secret values here
532 if not env_var.lower().endswith(("cmd", "secret")):
533 if (section, key) in self.sensitive_config_values:
534 opt = "< hidden >"
535 elif raw:
536 opt = opt.replace("%", "%%")
537 if display_source:
538 opt = (opt, "env var")
539
540 section = section.lower()
541 key = key.lower()
542 config_sources.setdefault(section, {}).update({key: opt})
543
544 def _filter_by_source(
545 self,
546 config_sources: ConfigSourcesType,
547 display_source: bool,
548 getter_func,
549 ):
550 """
551 Delete default configs from current configuration.
552
553 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values.
554
555 This is necessary because bare configs take precedence over the command
556 or secret key equivalents so if the current running config is
557 materialized with Airflow defaults they in turn override user set
558 command or secret key configs.
559
560 :param config_sources: The current configuration to operate on
561 :param display_source: If False, configuration options contain raw
562 values. If True, options are a tuple of (option_value, source).
563 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
564 :param getter_func: A callback function that gets the user configured
565 override value for a particular sensitive_config_values config.
566 :return: None, the given config_sources is filtered if necessary,
567 otherwise untouched.
568 """
569 for section, key in self.sensitive_config_values:
570 # Don't bother if we don't have section / key
571 if section not in config_sources or key not in config_sources[section]:
572 continue
573 # Check that there is something to override defaults
574 try:
575 getter_opt = getter_func(section, key)
576 except ValueError:
577 continue
578 if not getter_opt:
579 continue
580 # Check to see that there is a default value
581 if self.get_default_value(section, key) is None:
582 continue
583 # Check to see if bare setting is the same as defaults
584 if display_source:
585 # when display_source = true, we know that the config_sources contains tuple
586 opt, source = config_sources[section][key] # type: ignore
587 else:
588 opt = config_sources[section][key]
589 if opt == self.get_default_value(section, key):
590 del config_sources[section][key]
591
592 @staticmethod
593 def _deprecated_value_is_set_in_config(
594 deprecated_section: str,
595 deprecated_key: str,
596 configs: Iterable[tuple[str, ConfigParser]],
597 ) -> bool:
598 for config_type, config in configs:
599 if config_type != "default":
600 with contextlib.suppress(NoSectionError):
601 deprecated_section_array = config.items(section=deprecated_section, raw=True)
602 if any(key == deprecated_key for key, _ in deprecated_section_array):
603 return True
604 return False
605
606 @staticmethod
607 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
608 return (
609 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}")
610 is not None
611 )
612
613 @staticmethod
614 def _deprecated_command_is_set_in_config(
615 deprecated_section: str,
616 deprecated_key: str,
617 configs: Iterable[tuple[str, ConfigParser]],
618 ) -> bool:
619 return AirflowConfigParser._deprecated_value_is_set_in_config(
620 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs
621 )
622
623 @staticmethod
624 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
625 return (
626 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD")
627 is not None
628 )
629
630 @staticmethod
631 def _deprecated_secret_is_set_in_config(
632 deprecated_section: str,
633 deprecated_key: str,
634 configs: Iterable[tuple[str, ConfigParser]],
635 ) -> bool:
636 return AirflowConfigParser._deprecated_value_is_set_in_config(
637 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs
638 )
639
640 @staticmethod
641 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
642 return (
643 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET")
644 is not None
645 )
646
647 @staticmethod
648 def _replace_config_with_display_sources(
649 config_sources: ConfigSourcesType,
650 configs: Iterable[tuple[str, ConfigParser]],
651 configuration_description: dict[str, dict[str, Any]],
652 display_source: bool,
653 raw: bool,
654 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
655 include_env: bool,
656 include_cmds: bool,
657 include_secret: bool,
658 ):
659 for source_name, config in configs:
660 sections = config.sections()
661 for section in sections:
662 AirflowConfigParser._replace_section_config_with_display_sources(
663 config,
664 config_sources,
665 configuration_description,
666 display_source,
667 raw,
668 section,
669 source_name,
670 deprecated_options,
671 configs,
672 include_env=include_env,
673 include_cmds=include_cmds,
674 include_secret=include_secret,
675 )
676
677 @staticmethod
678 def _replace_section_config_with_display_sources(
679 config: ConfigParser,
680 config_sources: ConfigSourcesType,
681 configuration_description: dict[str, dict[str, Any]],
682 display_source: bool,
683 raw: bool,
684 section: str,
685 source_name: str,
686 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
687 configs: Iterable[tuple[str, ConfigParser]],
688 include_env: bool,
689 include_cmds: bool,
690 include_secret: bool,
691 ):
692 sect = config_sources.setdefault(section, {})
693 if isinstance(config, AirflowConfigParser):
694 with config.suppress_future_warnings():
695 items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
696 else:
697 items = config.items(section=section, raw=raw)
698 for k, val in items:
699 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
700 if deprecated_section and deprecated_key:
701 if source_name == "default":
702 # If deprecated entry has some non-default value set for any of the sources requested,
703 # We should NOT set default for the new entry (because it will override anything
704 # coming from the deprecated ones)
705 if AirflowConfigParser._deprecated_value_is_set_in_config(
706 deprecated_section, deprecated_key, configs
707 ):
708 continue
709 if include_env and AirflowConfigParser._deprecated_variable_is_set(
710 deprecated_section, deprecated_key
711 ):
712 continue
713 if include_cmds and (
714 AirflowConfigParser._deprecated_variable_command_is_set(
715 deprecated_section, deprecated_key
716 )
717 or AirflowConfigParser._deprecated_command_is_set_in_config(
718 deprecated_section, deprecated_key, configs
719 )
720 ):
721 continue
722 if include_secret and (
723 AirflowConfigParser._deprecated_variable_secret_is_set(
724 deprecated_section, deprecated_key
725 )
726 or AirflowConfigParser._deprecated_secret_is_set_in_config(
727 deprecated_section, deprecated_key, configs
728 )
729 ):
730 continue
731 if display_source:
732 updated_source_name = source_name
733 if source_name == "default":
734 # defaults can come from other sources (default-<PROVIDER>) that should be used here
735 source_description_section = configuration_description.get(section, {})
736 source_description_key = source_description_section.get("options", {}).get(k, {})
737 if source_description_key is not None:
738 updated_source_name = source_description_key.get("source", source_name)
739 sect[k] = (val, updated_source_name)
740 else:
741 sect[k] = val
742
743 def _warn_deprecate(
744 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
745 ):
746 """Warn about deprecated config option usage."""
747 if section == deprecated_section:
748 warnings.warn(
749 f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
750 f"the old setting has been used, but please update your config.",
751 DeprecationWarning,
752 stacklevel=4 + extra_stacklevel,
753 )
754 else:
755 warnings.warn(
756 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
757 f"in [{section}] - the old setting has been used, but please update your config.",
758 DeprecationWarning,
759 stacklevel=4 + extra_stacklevel,
760 )
761
762 @contextmanager
763 def suppress_future_warnings(self):
764 """
765 Context manager to temporarily suppress future warnings.
766
767 This is a stub used by the shared parser's lookup methods when checking deprecated options.
768 Subclasses can override this to customize warning suppression behavior.
769
770 :return: context manager that suppresses future warnings
771 """
772 suppress_future_warnings = self._suppress_future_warnings
773 self._suppress_future_warnings = True
774 yield self
775 self._suppress_future_warnings = suppress_future_warnings
776
777 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str:
778 """Generate environment variable name for a config option."""
779 team_component: str = f"{team_name.upper()}___" if team_name else ""
780 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}"
781
782 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None):
783 """Get config option from environment variable."""
784 env_var: str = self._env_var_name(section, key, team_name=team_name)
785 if env_var in os.environ:
786 return expand_env_var(os.environ[env_var])
787 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
788 env_var_cmd = env_var + "_CMD"
789 if env_var_cmd in os.environ:
790 # if this is a valid command key...
791 if (section, key) in self.sensitive_config_values:
792 return run_command(os.environ[env_var_cmd])
793 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
794 env_var_secret_path = env_var + "_SECRET"
795 if env_var_secret_path in os.environ:
796 # if this is a valid secret path...
797 if (section, key) in self.sensitive_config_values:
798 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path])
799 return None
800
801 def _get_cmd_option(self, section: str, key: str):
802 """Get config option from command execution."""
803 fallback_key = key + "_cmd"
804 if (section, key) in self.sensitive_config_values:
805 if super().has_option(section, fallback_key):
806 command = super().get(section, fallback_key)
807 try:
808 cmd_output = run_command(command)
809 except AirflowConfigException as e:
810 raise e
811 except Exception as e:
812 raise AirflowConfigException(
813 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd."
814 f" Please check the {fallback_key} value."
815 ) from e
816 return cmd_output
817 return None
818
819 def _get_secret_option(self, section: str, key: str) -> str | None:
820 """Get Config option values from Secret Backend."""
821 fallback_key = key + "_secret"
822 if (section, key) in self.sensitive_config_values:
823 if super().has_option(section, fallback_key):
824 secrets_path = super().get(section, fallback_key)
825 return self._get_config_value_from_secret_backend(secrets_path)
826 return None
827
828 def _get_environment_variables(
829 self,
830 deprecated_key: str | None,
831 deprecated_section: str | None,
832 key: str,
833 section: str,
834 issue_warning: bool = True,
835 extra_stacklevel: int = 0,
836 **kwargs,
837 ) -> str | ValueNotFound:
838 """Get config option from environment variables."""
839 team_name = kwargs.get("team_name", None)
840 option = self._get_env_var_option(section, key, team_name=team_name)
841 if option is not None:
842 return option
843 if deprecated_section and deprecated_key:
844 with self.suppress_future_warnings():
845 option = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name)
846 if option is not None:
847 if issue_warning:
848 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
849 return option
850 return VALUE_NOT_FOUND_SENTINEL
851
852 def _get_option_from_config_file(
853 self,
854 deprecated_key: str | None,
855 deprecated_section: str | None,
856 key: str,
857 section: str,
858 issue_warning: bool = True,
859 extra_stacklevel: int = 0,
860 **kwargs,
861 ) -> str | ValueNotFound:
862 """Get config option from config file."""
863 if team_name := kwargs.get("team_name", None):
864 section = f"{team_name}={section}"
865 # since this is the last lookup that supports team_name, pop it
866 kwargs.pop("team_name")
867 if super().has_option(section, key):
868 return expand_env_var(super().get(section, key, **kwargs))
869 if deprecated_section and deprecated_key:
870 if super().has_option(deprecated_section, deprecated_key):
871 if issue_warning:
872 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
873 with self.suppress_future_warnings():
874 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
875 return VALUE_NOT_FOUND_SENTINEL
876
877 def _get_option_from_commands(
878 self,
879 deprecated_key: str | None,
880 deprecated_section: str | None,
881 key: str,
882 section: str,
883 issue_warning: bool = True,
884 extra_stacklevel: int = 0,
885 **kwargs,
886 ) -> str | ValueNotFound:
887 """Get config option from command execution."""
888 option = self._get_cmd_option(section, key)
889 if option:
890 return option
891 if deprecated_section and deprecated_key:
892 with self.suppress_future_warnings():
893 option = self._get_cmd_option(deprecated_section, deprecated_key)
894 if option:
895 if issue_warning:
896 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
897 return option
898 return VALUE_NOT_FOUND_SENTINEL
899
900 def _get_option_from_secrets(
901 self,
902 deprecated_key: str | None,
903 deprecated_section: str | None,
904 key: str,
905 section: str,
906 issue_warning: bool = True,
907 extra_stacklevel: int = 0,
908 **kwargs,
909 ) -> str | ValueNotFound:
910 """Get config option from secrets backend."""
911 option = self._get_secret_option(section, key)
912 if option:
913 return option
914 if deprecated_section and deprecated_key:
915 with self.suppress_future_warnings():
916 option = self._get_secret_option(deprecated_section, deprecated_key)
917 if option:
918 if issue_warning:
919 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
920 return option
921 return VALUE_NOT_FOUND_SENTINEL
922
923 def _get_option_from_defaults(
924 self,
925 deprecated_key: str | None,
926 deprecated_section: str | None,
927 key: str,
928 section: str,
929 issue_warning: bool = True,
930 extra_stacklevel: int = 0,
931 team_name: str | None = None,
932 **kwargs,
933 ) -> str | ValueNotFound:
934 """Get config option from default values."""
935 if self.get_default_value(section, key) is not None or "fallback" in kwargs:
936 return expand_env_var(self.get_default_value(section, key, **kwargs))
937 return VALUE_NOT_FOUND_SENTINEL
938
939 def _resolve_deprecated_lookup(
940 self,
941 section: str,
942 key: str,
943 lookup_from_deprecated: bool,
944 extra_stacklevel: int = 0,
945 ) -> tuple[str, str, str | None, str | None, bool]:
946 """
947 Resolve deprecated section/key mappings and determine deprecated values.
948
949 :param section: Section name (will be lowercased)
950 :param key: Key name (will be lowercased)
951 :param lookup_from_deprecated: Whether to lookup from deprecated options
952 :param extra_stacklevel: Extra stack level for warnings
953 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted)
954 """
955 section = section.lower()
956 key = key.lower()
957 warning_emitted = False
958 deprecated_section: str | None = None
959 deprecated_key: str | None = None
960
961 if not lookup_from_deprecated:
962 return section, key, deprecated_section, deprecated_key, warning_emitted
963
964 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {})
965 if option_description.get("deprecated"):
966 deprecation_reason = option_description.get("deprecation_reason", "")
967 warnings.warn(
968 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
969 DeprecationWarning,
970 stacklevel=2 + extra_stacklevel,
971 )
972 # For the cases in which we rename whole sections
973 if section in self.inversed_deprecated_sections:
974 deprecated_section, deprecated_key = (section, key)
975 section = self.inversed_deprecated_sections[section]
976 if not self._suppress_future_warnings:
977 warnings.warn(
978 f"The config section [{deprecated_section}] has been renamed to "
979 f"[{section}]. Please update your `conf.get*` call to use the new name",
980 FutureWarning,
981 stacklevel=2 + extra_stacklevel,
982 )
983 # Don't warn about individual rename if the whole section is renamed
984 warning_emitted = True
985 elif (section, key) in self.inversed_deprecated_options:
986 # Handle using deprecated section/key instead of the new section/key
987 new_section, new_key = self.inversed_deprecated_options[(section, key)]
988 if not self._suppress_future_warnings and not warning_emitted:
989 warnings.warn(
990 f"section/key [{section}/{key}] has been deprecated, you should use"
991 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
992 "new name",
993 FutureWarning,
994 stacklevel=2 + extra_stacklevel,
995 )
996 warning_emitted = True
997 deprecated_section, deprecated_key = section, key
998 section, key = (new_section, new_key)
999 elif section in self.deprecated_sections:
1000 # When accessing the new section name, make sure we check under the old config name
1001 deprecated_key = key
1002 deprecated_section = self.deprecated_sections[section][0]
1003 else:
1004 deprecated_section, deprecated_key, _ = self.deprecated_options.get(
1005 (section, key), (None, None, None)
1006 )
1007
1008 return section, key, deprecated_section, deprecated_key, warning_emitted
1009
1010 @overload # type: ignore[override]
1011 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...
1012
1013 @overload # type: ignore[override]
1014 def get(self, section: str, key: str, **kwargs) -> str | None: ...
1015
1016 def get( # type: ignore[misc, override]
1017 self,
1018 section: str,
1019 key: str,
1020 suppress_warnings: bool = False,
1021 lookup_from_deprecated: bool = True,
1022 _extra_stacklevel: int = 0,
1023 team_name: str | None = None,
1024 **kwargs,
1025 ) -> str | None:
1026 """
1027 Get config value by iterating through lookup sequence.
1028
1029 Priority order is defined by _lookup_sequence property.
1030 """
1031 section, key, deprecated_section, deprecated_key, warning_emitted = self._resolve_deprecated_lookup(
1032 section=section,
1033 key=key,
1034 lookup_from_deprecated=lookup_from_deprecated,
1035 extra_stacklevel=_extra_stacklevel,
1036 )
1037
1038 if team_name is not None:
1039 kwargs["team_name"] = team_name
1040
1041 for lookup_method in self._lookup_sequence:
1042 value = lookup_method(
1043 deprecated_key=deprecated_key,
1044 deprecated_section=deprecated_section,
1045 key=key,
1046 section=section,
1047 issue_warning=not warning_emitted,
1048 extra_stacklevel=_extra_stacklevel,
1049 **kwargs,
1050 )
1051 if value is not VALUE_NOT_FOUND_SENTINEL:
1052 return value
1053
1054 # Check if fallback was explicitly provided (even if None)
1055 if "fallback" in kwargs:
1056 return kwargs["fallback"]
1057
1058 if not suppress_warnings:
1059 log.warning("section/key [%s/%s] not found in config", section, key)
1060
1061 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
1062
1063 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override]
1064 """Get config value as boolean."""
1065 val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
1066 if "#" in val:
1067 val = val.split("#")[0].strip()
1068 if val in ("t", "true", "1"):
1069 return True
1070 if val in ("f", "false", "0"):
1071 return False
1072 raise AirflowConfigException(
1073 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
1074 f'Current value: "{val}".'
1075 )
1076
1077 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override]
1078 """Get config value as integer."""
1079 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1080 if val is None:
1081 raise AirflowConfigException(
1082 f"Failed to convert value None to int. "
1083 f'Please check "{key}" key in "{section}" section is set.'
1084 )
1085 try:
1086 return int(val)
1087 except ValueError:
1088 try:
1089 if (float_val := float(val)) != (int_val := int(float_val)):
1090 raise ValueError
1091 return int_val
1092 except (ValueError, OverflowError):
1093 raise AirflowConfigException(
1094 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1095 f'Current value: "{val}".'
1096 )
1097
1098 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override]
1099 """Get config value as float."""
1100 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1101 if val is None:
1102 raise AirflowConfigException(
1103 f"Failed to convert value None to float. "
1104 f'Please check "{key}" key in "{section}" section is set.'
1105 )
1106 try:
1107 return float(val)
1108 except ValueError:
1109 raise AirflowConfigException(
1110 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
1111 f'Current value: "{val}".'
1112 )
1113
1114 def getlist(self, section: str, key: str, delimiter=",", **kwargs):
1115 """Get config value as list."""
1116 val = self.get(section, key, **kwargs)
1117
1118 if isinstance(val, list) or val is None:
1119 # `get` will always return a (possibly-empty) string, so the only way we can
1120 # have these types is with `fallback=` was specified. So just return it.
1121 return val
1122
1123 if val == "":
1124 return []
1125
1126 try:
1127 return [item.strip() for item in val.split(delimiter)]
1128 except Exception:
1129 raise AirflowConfigException(
1130 f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
1131 f'Current value: "{val}".'
1132 )
1133
1134 E = TypeVar("E", bound=Enum)
1135
1136 def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
1137 """Get config value as enum."""
1138 val = self.get(section, key, **kwargs)
1139 enum_names = [enum_item.name for enum_item in enum_class]
1140
1141 if val is None:
1142 raise AirflowConfigException(
1143 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1144 f'Current value: "{val}" and it must be one of {", ".join(enum_names)}'
1145 )
1146
1147 try:
1148 return enum_class[val]
1149 except KeyError:
1150 if "fallback" in kwargs and kwargs["fallback"] in enum_names:
1151 return enum_class[kwargs["fallback"]]
1152 raise AirflowConfigException(
1153 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1154 f"the value must be one of {', '.join(enum_names)}"
1155 )
1156
1157 def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
1158 """Get config value as list of enums."""
1159 kwargs.setdefault("fallback", [])
1160 string_list = self.getlist(section, key, delimiter, **kwargs)
1161
1162 enum_names = [enum_item.name for enum_item in enum_class]
1163 enum_list = []
1164
1165 for val in string_list:
1166 try:
1167 enum_list.append(enum_class[val])
1168 except KeyError:
1169 log.warning(
1170 "Failed to convert value %r. Please check %s key in %s section. "
1171 "it must be one of %s, if not the value is ignored",
1172 val,
1173 key,
1174 section,
1175 ", ".join(enum_names),
1176 )
1177
1178 return enum_list
1179
1180 def getimport(self, section: str, key: str, **kwargs) -> Any:
1181 """
1182 Read options, import the full qualified name, and return the object.
1183
1184 In case of failure, it throws an exception with the key and section names
1185
1186 :return: The object or None, if the option is empty
1187 """
1188 # Fixed: use self.get() instead of conf.get()
1189 full_qualified_path = self.get(section=section, key=key, **kwargs)
1190 if not full_qualified_path:
1191 return None
1192
1193 try:
1194 # Import here to avoid circular dependency
1195 from ..module_loading import import_string
1196
1197 return import_string(full_qualified_path)
1198 except ImportError as e:
1199 log.warning(e)
1200 raise AirflowConfigException(
1201 f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
1202 f'Current value: "{full_qualified_path}".'
1203 )
1204
1205 def getjson(
1206 self, section: str, key: str, fallback=None, **kwargs
1207 ) -> dict | list | str | int | float | None:
1208 """
1209 Return a config value parsed from a JSON string.
1210
1211 ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.
1212 """
1213 try:
1214 data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
1215 except (NoSectionError, NoOptionError):
1216 data = None
1217
1218 if data is None or data == "":
1219 return fallback
1220
1221 try:
1222 return json.loads(data)
1223 except JSONDecodeError as e:
1224 raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
1225
1226 def gettimedelta(
1227 self, section: str, key: str, fallback: Any = None, **kwargs
1228 ) -> datetime.timedelta | None:
1229 """
1230 Get the config value for the given section and key, and convert it into datetime.timedelta object.
1231
1232 If the key is missing, then it is considered as `None`.
1233
1234 :param section: the section from the config
1235 :param key: the key defined in the given section
1236 :param fallback: fallback value when no config value is given, defaults to None
1237 :raises AirflowConfigException: raised because ValueError or OverflowError
1238 :return: datetime.timedelta(seconds=<config_value>) or None
1239 """
1240 val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
1241
1242 if val:
1243 # the given value must be convertible to integer
1244 try:
1245 int_val = int(val)
1246 except ValueError:
1247 raise AirflowConfigException(
1248 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1249 f'Current value: "{val}".'
1250 )
1251
1252 try:
1253 return datetime.timedelta(seconds=int_val)
1254 except OverflowError as err:
1255 raise AirflowConfigException(
1256 f"Failed to convert value to timedelta in `seconds`. "
1257 f"{err}. "
1258 f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
1259 )
1260
1261 return fallback
1262
1263 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
1264 """Get mandatory config value, raising ValueError if not found."""
1265 value = self.get(section, key, _extra_stacklevel=1, **kwargs)
1266 if value is None:
1267 raise ValueError(f"The value {section}/{key} should be set!")
1268 return value
1269
1270 def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
1271 """Get mandatory config value as list, raising ValueError if not found."""
1272 value = self.getlist(section, key, **kwargs)
1273 if value is None:
1274 raise ValueError(f"The value {section}/{key} should be set!")
1275 return value
1276
1277 def read(
1278 self,
1279 filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
1280 encoding: str | None = None,
1281 ) -> list[str]:
1282 return super().read(filenames=filenames, encoding=encoding)
1283
1284 def read_dict( # type: ignore[override]
1285 self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
1286 ) -> None:
1287 """
1288 We define a different signature here to add better type hints and checking.
1289
1290 :param dictionary: dictionary to read from
1291 :param source: source to be used to store the configuration
1292 :return:
1293 """
1294 super().read_dict(dictionary=dictionary, source=source)
1295
1296 def get_sections_including_defaults(self) -> list[str]:
1297 """
1298 Retrieve all sections from the configuration parser, including sections defined by built-in defaults.
1299
1300 :return: list of section names
1301 """
1302 sections_from_config = self.sections()
1303 sections_from_description = list(self.configuration_description.keys())
1304 return list(dict.fromkeys(itertools.chain(sections_from_description, sections_from_config)))
1305
1306 def get_options_including_defaults(self, section: str) -> list[str]:
1307 """
1308 Retrieve all possible options from the configuration parser for the section given.
1309
1310 Includes options defined by built-in defaults.
1311
1312 :param section: section name
1313 :return: list of option names for the section given
1314 """
1315 my_own_options = self.options(section) if self.has_section(section) else []
1316 all_options_from_defaults = list(
1317 self.configuration_description.get(section, {}).get("options", {}).keys()
1318 )
1319 return list(dict.fromkeys(itertools.chain(all_options_from_defaults, my_own_options)))
1320
1321 def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs) -> bool:
1322 """
1323 Check if option is defined.
1324
1325 Uses self.get() to avoid reimplementing the priority order of config variables
1326 (env, config, cmd, defaults).
1327
1328 :param section: section to get option from
1329 :param option: option to get
1330 :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
1331 :param kwargs: additional keyword arguments to pass to get(), such as team_name
1332 :return:
1333 """
1334 try:
1335 value = self.get(
1336 section,
1337 option,
1338 fallback=None,
1339 _extra_stacklevel=1,
1340 suppress_warnings=True,
1341 lookup_from_deprecated=lookup_from_deprecated,
1342 **kwargs,
1343 )
1344 if value is None:
1345 return False
1346 return True
1347 except (NoOptionError, NoSectionError, AirflowConfigException):
1348 return False
1349
1350 def set(self, section: str, option: str, value: str | None = None) -> None:
1351 """
1352 Set an option to the given value.
1353
1354 This override just makes sure the section and option are lower case, to match what we do in `get`.
1355 """
1356 section = section.lower()
1357 option = option.lower()
1358 defaults = self.configuration_description or {}
1359 if not self.has_section(section) and section in defaults:
1360 # Trying to set a key in a section that exists in default, but not in the user config;
1361 # automatically create it
1362 self.add_section(section)
1363 super().set(section, option, value)
1364
1365 def remove_option(self, section: str, option: str, remove_default: bool = True):
1366 """
1367 Remove an option if it exists in config from a file or default config.
1368
1369 If both of config have the same option, this removes the option
1370 in both configs unless remove_default=False.
1371 """
1372 section = section.lower()
1373 option = option.lower()
1374 if super().has_option(section, option):
1375 super().remove_option(section, option)
1376
1377 if self.get_default_value(section, option) is not None and remove_default:
1378 self._default_values.remove_option(section, option)
1379
1380 def optionxform(self, optionstr: str) -> str:
1381 """
1382 Transform option names on every read, get, or set operation.
1383
1384 This changes from the default behaviour of ConfigParser from lower-casing
1385 to instead be case-preserving.
1386
1387 :param optionstr:
1388 :return:
1389 """
1390 return optionstr
1391
1392 def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
1393 """
1394 Get list of config sources to use in as_dict().
1395
1396 Subclasses can override to add additional sources (e.g., provider configs).
1397 """
1398 return [
1399 ("default", self._default_values),
1400 ("airflow.cfg", self),
1401 ]
1402
1403 def as_dict(
1404 self,
1405 display_source: bool = False,
1406 display_sensitive: bool = False,
1407 raw: bool = False,
1408 include_env: bool = True,
1409 include_cmds: bool = True,
1410 include_secret: bool = True,
1411 ) -> ConfigSourcesType:
1412 """
1413 Return the current configuration as an OrderedDict of OrderedDicts.
1414
1415 When materializing current configuration Airflow defaults are
1416 materialized along with user set configs. If any of the `include_*`
1417 options are False then the result of calling command or secret key
1418 configs do not override Airflow defaults and instead are passed through.
1419 In order to then avoid Airflow defaults from overwriting user set
1420 command or secret key configs we filter out bare sensitive_config_values
1421 that are set to Airflow defaults when command or secret key configs
1422 produce different values.
1423
1424 :param display_source: If False, the option value is returned. If True,
1425 a tuple of (option_value, source) is returned. Source is either
1426 'airflow.cfg', 'default', 'env var', or 'cmd'.
1427 :param display_sensitive: If True, the values of options set by env
1428 vars and bash commands will be displayed. If False, those options
1429 are shown as '< hidden >'
1430 :param raw: Should the values be output as interpolated values, or the
1431 "raw" form that can be fed back in to ConfigParser
1432 :param include_env: Should the value of configuration from AIRFLOW__
1433 environment variables be included or not
1434 :param include_cmds: Should the result of calling any ``*_cmd`` config be
1435 set (True, default), or should the _cmd options be left as the
1436 command to run (False)
1437 :param include_secret: Should the result of calling any ``*_secret`` config be
1438 set (True, default), or should the _secret options be left as the
1439 path to get the secret from (False)
1440 :return: Dictionary, where the key is the name of the section and the content is
1441 the dictionary with the name of the parameter and its value.
1442 """
1443 if not display_sensitive:
1444 # We want to hide the sensitive values at the appropriate methods
1445 # since envs from cmds, secrets can be read at _include_envs method
1446 if not all([include_env, include_cmds, include_secret]):
1447 raise ValueError(
1448 "If display_sensitive is false, then include_env, "
1449 "include_cmds, include_secret must all be set as True"
1450 )
1451
1452 config_sources: ConfigSourcesType = {}
1453
1454 # We check sequentially all those sources and the last one we saw it in will "win"
1455 configs = self._get_config_sources_for_as_dict()
1456
1457 self._replace_config_with_display_sources(
1458 config_sources,
1459 configs,
1460 self.configuration_description,
1461 display_source,
1462 raw,
1463 self.deprecated_options,
1464 include_cmds=include_cmds,
1465 include_env=include_env,
1466 include_secret=include_secret,
1467 )
1468
1469 # add env vars and overwrite because they have priority
1470 if include_env:
1471 self._include_envs(config_sources, display_sensitive, display_source, raw)
1472 else:
1473 self._filter_by_source(config_sources, display_source, self._get_env_var_option)
1474
1475 # add bash commands
1476 if include_cmds:
1477 self._include_commands(config_sources, display_sensitive, display_source, raw)
1478 else:
1479 self._filter_by_source(config_sources, display_source, self._get_cmd_option)
1480
1481 # add config from secret backends
1482 if include_secret:
1483 self._include_secrets(config_sources, display_sensitive, display_source, raw)
1484 else:
1485 self._filter_by_source(config_sources, display_source, self._get_secret_option)
1486
1487 if not display_sensitive:
1488 # This ensures the ones from config file is hidden too
1489 # if they are not provided through env, cmd and secret
1490 hidden = "< hidden >"
1491 for section, key in self.sensitive_config_values:
1492 if config_sources.get(section):
1493 if config_sources[section].get(key, None):
1494 if display_source:
1495 source = config_sources[section][key][1]
1496 config_sources[section][key] = (hidden, source)
1497 else:
1498 config_sources[section][key] = hidden
1499
1500 return config_sources
1501
1502 def _write_option_header(
1503 self,
1504 file: IO[str],
1505 option: str,
1506 extra_spacing: bool,
1507 include_descriptions: bool,
1508 include_env_vars: bool,
1509 include_examples: bool,
1510 include_sources: bool,
1511 section_config_description: dict[str, dict[str, Any]],
1512 section_to_write: str,
1513 sources_dict: ConfigSourcesType,
1514 ) -> tuple[bool, bool]:
1515 """
1516 Write header for configuration option.
1517
1518 Returns tuple of (should_continue, needs_separation) where needs_separation should be
1519 set if the option needs additional separation to visually separate it from the next option.
1520 """
1521 option_config_description = (
1522 section_config_description.get("options", {}).get(option, {})
1523 if section_config_description
1524 else {}
1525 )
1526 description = option_config_description.get("description")
1527 needs_separation = False
1528 if description and include_descriptions:
1529 for line in description.splitlines():
1530 file.write(f"# {line}\n")
1531 needs_separation = True
1532 example = option_config_description.get("example")
1533 if example is not None and include_examples:
1534 if extra_spacing:
1535 file.write("#\n")
1536 example_lines = example.splitlines()
1537 example = "\n# ".join(example_lines)
1538 file.write(f"# Example: {option} = {example}\n")
1539 needs_separation = True
1540 if include_sources and sources_dict:
1541 sources_section = sources_dict.get(section_to_write)
1542 value_with_source = sources_section.get(option) if sources_section else None
1543 if value_with_source is None:
1544 file.write("#\n# Source: not defined\n")
1545 else:
1546 file.write(f"#\n# Source: {value_with_source[1]}\n")
1547 needs_separation = True
1548 if include_env_vars:
1549 file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
1550 if extra_spacing:
1551 file.write("#\n")
1552 needs_separation = True
1553 return True, needs_separation
1554
1555 def is_template(self, section: str, key) -> bool:
1556 """
1557 Return whether the value is templated.
1558
1559 :param section: section of the config
1560 :param key: key in the section
1561 :return: True if the value is templated
1562 """
1563 return _is_template(self.configuration_description, section, key)
1564
1565 def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None:
1566 """
1567 Return the section as a dict.
1568
1569 Values are converted to int, float, bool as required.
1570
1571 :param section: section from the config
1572 :param team_name: optional team name for team-specific configuration lookup
1573 """
1574 # Handle team-specific section lookup for config file
1575 config_section = f"{team_name}={section}" if team_name else section
1576
1577 if not self.has_section(config_section) and not self._default_values.has_section(config_section):
1578 return None
1579 if self._default_values.has_section(config_section):
1580 _section: ConfigOptionsDictType = dict(self._default_values.items(config_section))
1581 else:
1582 _section = {}
1583
1584 if self.has_section(config_section):
1585 _section.update(self.items(config_section))
1586
1587 # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name
1588 section_prefix = self._env_var_name(section, "", team_name=team_name)
1589 for env_var in sorted(os.environ.keys()):
1590 if env_var.startswith(section_prefix):
1591 key = env_var.replace(section_prefix, "")
1592 if key.endswith("_CMD"):
1593 key = key[:-4]
1594 key = key.lower()
1595 _section[key] = self._get_env_var_option(section, key, team_name=team_name)
1596
1597 for key, val in _section.items():
1598 if val is None:
1599 raise AirflowConfigException(
1600 f"Failed to convert value automatically. "
1601 f'Please check "{key}" key in "{section}" section is set.'
1602 )
1603 try:
1604 _section[key] = int(val)
1605 except ValueError:
1606 try:
1607 _section[key] = float(val)
1608 except ValueError:
1609 if isinstance(val, str) and val.lower() in ("t", "true"):
1610 _section[key] = True
1611 elif isinstance(val, str) and val.lower() in ("f", "false"):
1612 _section[key] = False
1613 return _section
1614
1615 @staticmethod
1616 def _write_section_header(
1617 file: IO[str],
1618 include_descriptions: bool,
1619 section_config_description: dict[str, str],
1620 section_to_write: str,
1621 ) -> None:
1622 """Write header for configuration section."""
1623 file.write(f"[{section_to_write}]\n")
1624 section_description = section_config_description.get("description")
1625 if section_description and include_descriptions:
1626 for line in section_description.splitlines():
1627 file.write(f"# {line}\n")
1628 file.write("\n")
1629
1630 def _write_value(
1631 self,
1632 file: IO[str],
1633 option: str,
1634 comment_out_everything: bool,
1635 needs_separation: bool,
1636 only_defaults: bool,
1637 section_to_write: str,
1638 hide_sensitive: bool,
1639 is_sensitive: bool,
1640 show_values: bool = False,
1641 ):
1642 default_value = self.get_default_value(section_to_write, option, raw=True)
1643 if only_defaults:
1644 value = default_value
1645 else:
1646 value = self.get(section_to_write, option, fallback=default_value, raw=True)
1647 if not show_values:
1648 file.write(f"# {option} = \n")
1649 else:
1650 if hide_sensitive and is_sensitive:
1651 value = "< hidden >"
1652 else:
1653 pass
1654 if value is None:
1655 file.write(f"# {option} = \n")
1656 else:
1657 if comment_out_everything:
1658 value_lines = value.splitlines()
1659 value = "\n# ".join(value_lines)
1660 file.write(f"# {option} = {value}\n")
1661 else:
1662 if "\n" in value:
1663 try:
1664 value = json.dumps(json.loads(value), indent=4)
1665 value = value.replace(
1666 "\n", "\n "
1667 ) # indent multi-line JSON to satisfy configparser format
1668 except JSONDecodeError:
1669 pass
1670 file.write(f"{option} = {value}\n")
1671 if needs_separation:
1672 file.write("\n")
1673
1674 def write( # type: ignore[override]
1675 self,
1676 file: IO[str],
1677 section: str | None = None,
1678 include_examples: bool = True,
1679 include_descriptions: bool = True,
1680 include_sources: bool = True,
1681 include_env_vars: bool = True,
1682 include_providers: bool = True,
1683 comment_out_everything: bool = False,
1684 hide_sensitive: bool = False,
1685 extra_spacing: bool = True,
1686 only_defaults: bool = False,
1687 show_values: bool = False,
1688 **kwargs: Any,
1689 ) -> None:
1690 """
1691 Write configuration with comments and examples to a file.
1692
1693 :param file: file to write to
1694 :param section: section of the config to write, defaults to all sections
1695 :param include_examples: Include examples in the output
1696 :param include_descriptions: Include descriptions in the output
1697 :param include_sources: Include the source of each config option
1698 :param include_env_vars: Include environment variables corresponding to each config option
1699 :param include_providers: Include providers configuration
1700 :param comment_out_everything: Comment out all values
1701 :param hide_sensitive_values: Include sensitive values in the output
1702 :param extra_spacing: Add extra spacing before examples and after variables
1703 :param only_defaults: Only include default values when writing the config, not the actual values
1704 """
1705 sources_dict = {}
1706 if include_sources:
1707 sources_dict = self.as_dict(display_source=True)
1708 with self.make_sure_configuration_loaded(with_providers=include_providers):
1709 for section_to_write in self.get_sections_including_defaults():
1710 section_config_description = self.configuration_description.get(section_to_write, {})
1711 if section_to_write != section and section is not None:
1712 continue
1713 if self._default_values.has_section(section_to_write) or self.has_section(section_to_write):
1714 self._write_section_header(
1715 file, include_descriptions, section_config_description, section_to_write
1716 )
1717 for option in self.get_options_including_defaults(section_to_write):
1718 should_continue, needs_separation = self._write_option_header(
1719 file=file,
1720 option=option,
1721 extra_spacing=extra_spacing,
1722 include_descriptions=include_descriptions,
1723 include_env_vars=include_env_vars,
1724 include_examples=include_examples,
1725 include_sources=include_sources,
1726 section_config_description=section_config_description,
1727 section_to_write=section_to_write,
1728 sources_dict=sources_dict,
1729 )
1730 is_sensitive = (
1731 section_to_write.lower(),
1732 option.lower(),
1733 ) in self.sensitive_config_values
1734 self._write_value(
1735 file=file,
1736 option=option,
1737 comment_out_everything=comment_out_everything,
1738 needs_separation=needs_separation,
1739 only_defaults=only_defaults,
1740 section_to_write=section_to_write,
1741 hide_sensitive=hide_sensitive,
1742 is_sensitive=is_sensitive,
1743 show_values=show_values,
1744 )
1745 if include_descriptions and not needs_separation:
1746 # extra separation between sections in case last option did not need it
1747 file.write("\n")
1748
1749 @contextmanager
1750 def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
1751 """
1752 Make sure configuration is loaded with or without providers.
1753
1754 This happens regardless if the provider configuration has been loaded before or not.
1755 Restores configuration to the state before entering the context.
1756
1757 :param with_providers: whether providers should be loaded
1758 """
1759 needs_reload = False
1760 if with_providers:
1761 self._ensure_providers_config_loaded()
1762 else:
1763 needs_reload = self._ensure_providers_config_unloaded()
1764 yield
1765 if needs_reload:
1766 self._reload_provider_configs()
1767
1768 def _ensure_providers_config_loaded(self) -> None:
1769 """Ensure providers configurations are loaded."""
1770 raise NotImplementedError("Subclasses must implement _ensure_providers_config_loaded method")
1771
1772 def _ensure_providers_config_unloaded(self) -> bool:
1773 """Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded."""
1774 raise NotImplementedError("Subclasses must implement _ensure_providers_config_unloaded method")
1775
1776 def _reload_provider_configs(self) -> None:
1777 """Reload providers configuration."""
1778 raise NotImplementedError("Subclasses must implement _reload_provider_configs method")