1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Base configuration parser with pure parsing logic."""
19
20from __future__ import annotations
21
22import contextlib
23import datetime
24import functools
25import itertools
26import json
27import logging
28import os
29import shlex
30import subprocess
31import sys
32import warnings
33from collections.abc import Callable, Generator, Iterable
34from configparser import ConfigParser, NoOptionError, NoSectionError
35from contextlib import contextmanager
36from enum import Enum
37from json.decoder import JSONDecodeError
38from re import Pattern
39from typing import IO, Any, TypeVar, overload
40
41from .exceptions import AirflowConfigException
42
43log = logging.getLogger(__name__)
44
45
46ConfigType = str | int | float | bool
47ConfigOptionsDictType = dict[str, ConfigType]
48ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
49ConfigSourcesType = dict[str, ConfigSectionSourcesType]
50ENV_VAR_PREFIX = "AIRFLOW__"
51
52
53class ValueNotFound:
54 """Object of this is raised when a configuration value cannot be found."""
55
56 pass
57
58
59VALUE_NOT_FOUND_SENTINEL = ValueNotFound()
60
61
62@overload
63def expand_env_var(env_var: None) -> None: ...
64@overload
65def expand_env_var(env_var: str) -> str: ...
66
67
68def expand_env_var(env_var: str | None) -> str | None:
69 """
70 Expand (potentially nested) env vars.
71
72 Repeat and apply `expandvars` and `expanduser` until
73 interpolation stops having any effect.
74 """
75 if not env_var or not isinstance(env_var, str):
76 return env_var
77 while True:
78 interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
79 if interpolated == env_var:
80 return interpolated
81 env_var = interpolated
82
83
84def run_command(command: str) -> str:
85 """Run command and returns stdout."""
86 process = subprocess.Popen(
87 shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
88 )
89 output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate())
90
91 if process.returncode != 0:
92 raise AirflowConfigException(
93 f"Cannot execute {command}. Error code is: {process.returncode}. "
94 f"Output: {output}, Stderr: {stderr}"
95 )
96
97 return output
98
99
100def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool:
101 """
102 Check if the config is a template.
103
104 :param configuration_description: description of configuration
105 :param section: section
106 :param key: key
107 :return: True if the config is a template
108 """
109 return configuration_description.get(section, {}).get(key, {}).get("is_template", False)
110
111
112class AirflowConfigParser(ConfigParser):
113 """
114 Base configuration parser with pure parsing logic.
115
116 This class provides the core parsing methods that work with:
117 - configuration_description: dict describing config options (required in __init__)
118 - _default_values: ConfigParser with default values (required in __init__)
119 - deprecated_options: class attribute mapping new -> old options
120 - deprecated_sections: class attribute mapping new -> old sections
121 """
122
123 # A mapping of section -> setting -> { old, replace } for deprecated default values.
124 # Subclasses can override this to define deprecated values that should be upgraded.
125 deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {}
126
127 # A mapping of (new section, new option) -> (old section, old option, since_version).
128 # When reading new option, the old option will be checked to see if it exists. If it does a
129 # DeprecationWarning will be issued and the old option will be used instead
130 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
131 ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
132 ("api", "host"): ("webserver", "web_server_host", "3.0"),
133 ("api", "port"): ("webserver", "web_server_port", "3.0"),
134 ("api", "workers"): ("webserver", "workers", "3.0"),
135 ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"),
136 ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"),
137 ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"),
138 ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"),
139 ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"),
140 ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"),
141 ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"),
142 ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"),
143 ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"),
144 ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"),
145 ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"),
146 ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"),
147 ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
148 ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"),
149 ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"),
150 ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"),
151 ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"),
152 ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"),
153 ("api", "page_size"): ("webserver", "page_size", "3.1.0"),
154 ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"),
155 ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"),
156 ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"),
157 ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
158 ("api", "log_config"): ("api", "access_logfile", "3.1.0"),
159 ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"),
160 }
161
162 # A mapping of new section -> (old section, since_version).
163 deprecated_sections: dict[str, tuple[str, str]] = {}
164
165 @property
166 def _lookup_sequence(self) -> list[Callable]:
167 """
168 Define the sequence of lookup methods for get(). The definition here does not have provider lookup.
169
170 Subclasses can override this to customise lookup order.
171 """
172 return [
173 self._get_environment_variables,
174 self._get_option_from_config_file,
175 self._get_option_from_commands,
176 self._get_option_from_secrets,
177 self._get_option_from_defaults,
178 ]
179
180 @property
181 def _validators(self) -> list[Callable[[], None]]:
182 """
183 Return list of validators defined on a config parser class. Base class will return an empty list.
184
185 Subclasses can override this to customize the validators that are run during validation on the
186 config parser instance.
187 """
188 return []
189
190 def validate(self) -> None:
191 """Run all registered validators."""
192 for validator in self._validators:
193 validator()
194 self.is_validated = True
195
196 def _validate_deprecated_values(self) -> None:
197 """Validate and upgrade deprecated default values."""
198 for section, replacement in self.deprecated_values.items():
199 for name, info in replacement.items():
200 old, new = info
201 current_value = self.get(section, name, fallback="")
202 if self._using_old_value(old, current_value):
203 self.upgraded_values[(section, name)] = current_value
204 new_value = old.sub(new, current_value)
205 self._update_env_var(section=section, name=name, new_value=new_value)
206 self._create_future_warning(
207 name=name,
208 section=section,
209 current_value=current_value,
210 new_value=new_value,
211 )
212
213 def _using_old_value(self, old: Pattern, current_value: str) -> bool:
214 """Check if current_value matches the old pattern."""
215 return old.search(current_value) is not None
216
217 def _update_env_var(self, section: str, name: str, new_value: str) -> None:
218 """Update environment variable with new value."""
219 env_var = self._env_var_name(section, name)
220 # Set it as an env var so that any subprocesses keep the same override!
221 os.environ[env_var] = new_value
222
223 @staticmethod
224 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None:
225 """Create a FutureWarning for deprecated default values."""
226 warnings.warn(
227 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
228 f"This value has been changed to {new_value!r} in the running config, but please update your config.",
229 FutureWarning,
230 stacklevel=3,
231 )
232
233 def __init__(
234 self,
235 configuration_description: dict[str, dict[str, Any]],
236 _default_values: ConfigParser,
237 *args,
238 **kwargs,
239 ):
240 """
241 Initialize the parser.
242
243 :param configuration_description: Description of configuration options
244 :param _default_values: ConfigParser with default values
245 """
246 super().__init__(*args, **kwargs)
247 self.configuration_description = configuration_description
248 self._default_values = _default_values
249 self._suppress_future_warnings = False
250 self.upgraded_values = {}
251
252 @functools.cached_property
253 def inversed_deprecated_options(self):
254 """Build inverse mapping from old options to new options."""
255 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}
256
257 @functools.cached_property
258 def inversed_deprecated_sections(self):
259 """Build inverse mapping from old sections to new sections."""
260 return {
261 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
262 }
263
264 @functools.cached_property
265 def sensitive_config_values(self) -> set[tuple[str, str]]:
266 """Get set of sensitive config values that should be masked."""
267 flattened = {
268 (s, k): item
269 for s, s_c in self.configuration_description.items()
270 for k, item in s_c.get("options", {}).items()
271 }
272 sensitive = {
273 (section.lower(), key.lower())
274 for (section, key), v in flattened.items()
275 if v.get("sensitive") is True
276 }
277 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options}
278 depr_section = {
279 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections
280 }
281 sensitive.update(depr_section, depr_option)
282 return sensitive
283
284 @overload # type: ignore[override]
285 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...
286
287 @overload
288 def get(self, section: str, key: str, **kwargs) -> str | None: ...
289
290 def _update_defaults_from_string(self, config_string: str) -> None:
291 """
292 Update the defaults in _default_values based on values in config_string ("ini" format).
293
294 Override shared parser's method to add validation for template variables.
295 Note that those values are not validated and cannot contain variables because we are using
296 regular config parser to load them. This method is used to test the config parser in unit tests.
297
298 :param config_string: ini-formatted config string
299 """
300 parser = ConfigParser()
301 parser.read_string(config_string)
302 for section in parser.sections():
303 if section not in self._default_values.sections():
304 self._default_values.add_section(section)
305 errors = False
306 for key, value in parser.items(section):
307 if not self.is_template(section, key) and "{" in value:
308 errors = True
309 log.error(
310 "The %s.%s value %s read from string contains variable. This is not supported",
311 section,
312 key,
313 value,
314 )
315 self._default_values.set(section, key, value)
316 if errors:
317 raise AirflowConfigException(
318 f"The string config passed as default contains variables. "
319 f"This is not supported. String config: {config_string}"
320 )
321
322 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
323 """
324 Retrieve default value from default config parser.
325
326 This will retrieve the default value from the default config parser. Optionally a raw, stored
327 value can be retrieved by setting skip_interpolation to True. This is useful for example when
328 we want to write the default value to a file, and we don't want the interpolation to happen
329 as it is going to be done later when the config is read.
330
331 :param section: section of the config
332 :param key: key to use
333 :param fallback: fallback value to use
334 :param raw: if raw, then interpolation will be reversed
335 :param kwargs: other args
336 :return:
337 """
338 value = self._default_values.get(section, key, fallback=fallback, **kwargs)
339 if raw and value is not None:
340 return value.replace("%", "%%")
341 return value
342
343 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None:
344 """
345 Get Secret Backend if defined in airflow.cfg.
346
347 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.
348 """
349 section = "workers" if worker_mode else "secrets"
350 key = "secrets_backend" if worker_mode else "backend"
351 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs"
352
353 secrets_backend_cls = self.getimport(section=section, key=key)
354
355 if not secrets_backend_cls:
356 if worker_mode:
357 # if we find no secrets backend for worker, return that of secrets backend
358 secrets_backend_cls = self.getimport(section="secrets", key="backend")
359 if not secrets_backend_cls:
360 return None
361 # When falling back to secrets backend, use its kwargs
362 kwargs_key = "backend_kwargs"
363 section = "secrets"
364 else:
365 return None
366
367 try:
368 backend_kwargs = self.getjson(section=section, key=kwargs_key)
369 if not backend_kwargs:
370 backend_kwargs = {}
371 elif not isinstance(backend_kwargs, dict):
372 raise ValueError("not a dict")
373 except AirflowConfigException:
374 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key)
375 backend_kwargs = {}
376 except ValueError:
377 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key)
378 backend_kwargs = {}
379
380 return secrets_backend_cls(**backend_kwargs)
381
382 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
383 """
384 Get Config option values from Secret Backend.
385
386 Called by the shared parser's _get_secret_option() method as part of the lookup chain.
387 Uses _get_custom_secret_backend() to get the backend instance.
388
389 :param config_key: the config key to retrieve
390 :return: config value or None
391 """
392 try:
393 secrets_client = self._get_custom_secret_backend()
394 if not secrets_client:
395 return None
396 return secrets_client.get_config(config_key)
397 except Exception as e:
398 raise AirflowConfigException(
399 "Cannot retrieve config from alternative secrets backend. "
400 "Make sure it is configured properly and that the Backend "
401 "is accessible.\n"
402 f"{e}"
403 )
404
405 def _get_cmd_option_from_config_sources(
406 self, config_sources: ConfigSourcesType, section: str, key: str
407 ) -> str | None:
408 fallback_key = key + "_cmd"
409 if (section, key) in self.sensitive_config_values:
410 section_dict = config_sources.get(section)
411 if section_dict is not None:
412 command_value = section_dict.get(fallback_key)
413 if command_value is not None:
414 if isinstance(command_value, str):
415 command = command_value
416 else:
417 command = command_value[0]
418 return run_command(command)
419 return None
420
421 def _get_secret_option_from_config_sources(
422 self, config_sources: ConfigSourcesType, section: str, key: str
423 ) -> str | None:
424 fallback_key = key + "_secret"
425 if (section, key) in self.sensitive_config_values:
426 section_dict = config_sources.get(section)
427 if section_dict is not None:
428 secrets_path_value = section_dict.get(fallback_key)
429 if secrets_path_value is not None:
430 if isinstance(secrets_path_value, str):
431 secrets_path = secrets_path_value
432 else:
433 secrets_path = secrets_path_value[0]
434 return self._get_config_value_from_secret_backend(secrets_path)
435 return None
436
437 def _include_secrets(
438 self,
439 config_sources: ConfigSourcesType,
440 display_sensitive: bool,
441 display_source: bool,
442 raw: bool,
443 ):
444 for section, key in self.sensitive_config_values:
445 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
446 if value:
447 if not display_sensitive:
448 value = "< hidden >"
449 if display_source:
450 opt: str | tuple[str, str] = (value, "secret")
451 elif raw:
452 opt = value.replace("%", "%%")
453 else:
454 opt = value
455 config_sources.setdefault(section, {}).update({key: opt})
456 del config_sources[section][key + "_secret"]
457
458 def _include_commands(
459 self,
460 config_sources: ConfigSourcesType,
461 display_sensitive: bool,
462 display_source: bool,
463 raw: bool,
464 ):
465 for section, key in self.sensitive_config_values:
466 opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
467 if not opt:
468 continue
469 opt_to_set: str | tuple[str, str] | None = opt
470 if not display_sensitive:
471 opt_to_set = "< hidden >"
472 if display_source:
473 opt_to_set = (str(opt_to_set), "cmd")
474 elif raw:
475 opt_to_set = str(opt_to_set).replace("%", "%%")
476 if opt_to_set is not None:
477 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set}
478 config_sources.setdefault(section, {}).update(dict_to_update)
479 del config_sources[section][key + "_cmd"]
480
481 def _include_envs(
482 self,
483 config_sources: ConfigSourcesType,
484 display_sensitive: bool,
485 display_source: bool,
486 raw: bool,
487 ):
488 for env_var in [
489 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
490 ]:
491 try:
492 _, section, key = env_var.split("__", 2)
493 opt = self._get_env_var_option(section, key)
494 except ValueError:
495 continue
496 if opt is None:
497 log.warning("Ignoring unknown env var '%s'", env_var)
498 continue
499 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
500 # Don't hide cmd/secret values here
501 if not env_var.lower().endswith(("cmd", "secret")):
502 if (section, key) in self.sensitive_config_values:
503 opt = "< hidden >"
504 elif raw:
505 opt = opt.replace("%", "%%")
506 if display_source:
507 opt = (opt, "env var")
508
509 section = section.lower()
510 key = key.lower()
511 config_sources.setdefault(section, {}).update({key: opt})
512
513 def _filter_by_source(
514 self,
515 config_sources: ConfigSourcesType,
516 display_source: bool,
517 getter_func,
518 ):
519 """
520 Delete default configs from current configuration.
521
522 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values.
523
524 This is necessary because bare configs take precedence over the command
525 or secret key equivalents so if the current running config is
526 materialized with Airflow defaults they in turn override user set
527 command or secret key configs.
528
529 :param config_sources: The current configuration to operate on
530 :param display_source: If False, configuration options contain raw
531 values. If True, options are a tuple of (option_value, source).
532 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
533 :param getter_func: A callback function that gets the user configured
534 override value for a particular sensitive_config_values config.
535 :return: None, the given config_sources is filtered if necessary,
536 otherwise untouched.
537 """
538 for section, key in self.sensitive_config_values:
539 # Don't bother if we don't have section / key
540 if section not in config_sources or key not in config_sources[section]:
541 continue
542 # Check that there is something to override defaults
543 try:
544 getter_opt = getter_func(section, key)
545 except ValueError:
546 continue
547 if not getter_opt:
548 continue
549 # Check to see that there is a default value
550 if self.get_default_value(section, key) is None:
551 continue
552 # Check to see if bare setting is the same as defaults
553 if display_source:
554 # when display_source = true, we know that the config_sources contains tuple
555 opt, source = config_sources[section][key] # type: ignore
556 else:
557 opt = config_sources[section][key]
558 if opt == self.get_default_value(section, key):
559 del config_sources[section][key]
560
561 @staticmethod
562 def _deprecated_value_is_set_in_config(
563 deprecated_section: str,
564 deprecated_key: str,
565 configs: Iterable[tuple[str, ConfigParser]],
566 ) -> bool:
567 for config_type, config in configs:
568 if config_type != "default":
569 with contextlib.suppress(NoSectionError):
570 deprecated_section_array = config.items(section=deprecated_section, raw=True)
571 if any(key == deprecated_key for key, _ in deprecated_section_array):
572 return True
573 return False
574
575 @staticmethod
576 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
577 return (
578 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}")
579 is not None
580 )
581
582 @staticmethod
583 def _deprecated_command_is_set_in_config(
584 deprecated_section: str,
585 deprecated_key: str,
586 configs: Iterable[tuple[str, ConfigParser]],
587 ) -> bool:
588 return AirflowConfigParser._deprecated_value_is_set_in_config(
589 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs
590 )
591
592 @staticmethod
593 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
594 return (
595 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD")
596 is not None
597 )
598
599 @staticmethod
600 def _deprecated_secret_is_set_in_config(
601 deprecated_section: str,
602 deprecated_key: str,
603 configs: Iterable[tuple[str, ConfigParser]],
604 ) -> bool:
605 return AirflowConfigParser._deprecated_value_is_set_in_config(
606 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs
607 )
608
609 @staticmethod
610 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
611 return (
612 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET")
613 is not None
614 )
615
616 @staticmethod
617 def _replace_config_with_display_sources(
618 config_sources: ConfigSourcesType,
619 configs: Iterable[tuple[str, ConfigParser]],
620 configuration_description: dict[str, dict[str, Any]],
621 display_source: bool,
622 raw: bool,
623 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
624 include_env: bool,
625 include_cmds: bool,
626 include_secret: bool,
627 ):
628 for source_name, config in configs:
629 sections = config.sections()
630 for section in sections:
631 AirflowConfigParser._replace_section_config_with_display_sources(
632 config,
633 config_sources,
634 configuration_description,
635 display_source,
636 raw,
637 section,
638 source_name,
639 deprecated_options,
640 configs,
641 include_env=include_env,
642 include_cmds=include_cmds,
643 include_secret=include_secret,
644 )
645
646 @staticmethod
647 def _replace_section_config_with_display_sources(
648 config: ConfigParser,
649 config_sources: ConfigSourcesType,
650 configuration_description: dict[str, dict[str, Any]],
651 display_source: bool,
652 raw: bool,
653 section: str,
654 source_name: str,
655 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
656 configs: Iterable[tuple[str, ConfigParser]],
657 include_env: bool,
658 include_cmds: bool,
659 include_secret: bool,
660 ):
661 sect = config_sources.setdefault(section, {})
662 if isinstance(config, AirflowConfigParser):
663 with config.suppress_future_warnings():
664 items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
665 else:
666 items = config.items(section=section, raw=raw)
667 for k, val in items:
668 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
669 if deprecated_section and deprecated_key:
670 if source_name == "default":
671 # If deprecated entry has some non-default value set for any of the sources requested,
672 # We should NOT set default for the new entry (because it will override anything
673 # coming from the deprecated ones)
674 if AirflowConfigParser._deprecated_value_is_set_in_config(
675 deprecated_section, deprecated_key, configs
676 ):
677 continue
678 if include_env and AirflowConfigParser._deprecated_variable_is_set(
679 deprecated_section, deprecated_key
680 ):
681 continue
682 if include_cmds and (
683 AirflowConfigParser._deprecated_variable_command_is_set(
684 deprecated_section, deprecated_key
685 )
686 or AirflowConfigParser._deprecated_command_is_set_in_config(
687 deprecated_section, deprecated_key, configs
688 )
689 ):
690 continue
691 if include_secret and (
692 AirflowConfigParser._deprecated_variable_secret_is_set(
693 deprecated_section, deprecated_key
694 )
695 or AirflowConfigParser._deprecated_secret_is_set_in_config(
696 deprecated_section, deprecated_key, configs
697 )
698 ):
699 continue
700 if display_source:
701 updated_source_name = source_name
702 if source_name == "default":
703 # defaults can come from other sources (default-<PROVIDER>) that should be used here
704 source_description_section = configuration_description.get(section, {})
705 source_description_key = source_description_section.get("options", {}).get(k, {})
706 if source_description_key is not None:
707 updated_source_name = source_description_key.get("source", source_name)
708 sect[k] = (val, updated_source_name)
709 else:
710 sect[k] = val
711
712 def _warn_deprecate(
713 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
714 ):
715 """Warn about deprecated config option usage."""
716 if section == deprecated_section:
717 warnings.warn(
718 f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
719 f"the old setting has been used, but please update your config.",
720 DeprecationWarning,
721 stacklevel=4 + extra_stacklevel,
722 )
723 else:
724 warnings.warn(
725 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
726 f"in [{section}] - the old setting has been used, but please update your config.",
727 DeprecationWarning,
728 stacklevel=4 + extra_stacklevel,
729 )
730
731 @contextmanager
732 def suppress_future_warnings(self):
733 """
734 Context manager to temporarily suppress future warnings.
735
736 This is a stub used by the shared parser's lookup methods when checking deprecated options.
737 Subclasses can override this to customize warning suppression behavior.
738
739 :return: context manager that suppresses future warnings
740 """
741 suppress_future_warnings = self._suppress_future_warnings
742 self._suppress_future_warnings = True
743 yield self
744 self._suppress_future_warnings = suppress_future_warnings
745
746 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str:
747 """Generate environment variable name for a config option."""
748 team_component: str = f"{team_name.upper()}___" if team_name else ""
749 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}"
750
751 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None):
752 """Get config option from environment variable."""
753 env_var: str = self._env_var_name(section, key, team_name=team_name)
754 if env_var in os.environ:
755 return expand_env_var(os.environ[env_var])
756 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
757 env_var_cmd = env_var + "_CMD"
758 if env_var_cmd in os.environ:
759 # if this is a valid command key...
760 if (section, key) in self.sensitive_config_values:
761 return run_command(os.environ[env_var_cmd])
762 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
763 env_var_secret_path = env_var + "_SECRET"
764 if env_var_secret_path in os.environ:
765 # if this is a valid secret path...
766 if (section, key) in self.sensitive_config_values:
767 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path])
768 return None
769
770 def _get_cmd_option(self, section: str, key: str):
771 """Get config option from command execution."""
772 fallback_key = key + "_cmd"
773 if (section, key) in self.sensitive_config_values:
774 if super().has_option(section, fallback_key):
775 command = super().get(section, fallback_key)
776 try:
777 cmd_output = run_command(command)
778 except AirflowConfigException as e:
779 raise e
780 except Exception as e:
781 raise AirflowConfigException(
782 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd."
783 f" Please check the {fallback_key} value."
784 ) from e
785 return cmd_output
786 return None
787
788 def _get_secret_option(self, section: str, key: str) -> str | None:
789 """Get Config option values from Secret Backend."""
790 fallback_key = key + "_secret"
791 if (section, key) in self.sensitive_config_values:
792 if super().has_option(section, fallback_key):
793 secrets_path = super().get(section, fallback_key)
794 return self._get_config_value_from_secret_backend(secrets_path)
795 return None
796
797 def _get_environment_variables(
798 self,
799 deprecated_key: str | None,
800 deprecated_section: str | None,
801 key: str,
802 section: str,
803 issue_warning: bool = True,
804 extra_stacklevel: int = 0,
805 **kwargs,
806 ) -> str | ValueNotFound:
807 """Get config option from environment variables."""
808 team_name = kwargs.get("team_name", None)
809 option = self._get_env_var_option(section, key, team_name=team_name)
810 if option is not None:
811 return option
812 if deprecated_section and deprecated_key:
813 with self.suppress_future_warnings():
814 option = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name)
815 if option is not None:
816 if issue_warning:
817 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
818 return option
819 return VALUE_NOT_FOUND_SENTINEL
820
821 def _get_option_from_config_file(
822 self,
823 deprecated_key: str | None,
824 deprecated_section: str | None,
825 key: str,
826 section: str,
827 issue_warning: bool = True,
828 extra_stacklevel: int = 0,
829 **kwargs,
830 ) -> str | ValueNotFound:
831 """Get config option from config file."""
832 if team_name := kwargs.get("team_name", None):
833 section = f"{team_name}={section}"
834 # since this is the last lookup that supports team_name, pop it
835 kwargs.pop("team_name")
836 if super().has_option(section, key):
837 return expand_env_var(super().get(section, key, **kwargs))
838 if deprecated_section and deprecated_key:
839 if super().has_option(deprecated_section, deprecated_key):
840 if issue_warning:
841 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
842 with self.suppress_future_warnings():
843 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
844 return VALUE_NOT_FOUND_SENTINEL
845
846 def _get_option_from_commands(
847 self,
848 deprecated_key: str | None,
849 deprecated_section: str | None,
850 key: str,
851 section: str,
852 issue_warning: bool = True,
853 extra_stacklevel: int = 0,
854 **kwargs,
855 ) -> str | ValueNotFound:
856 """Get config option from command execution."""
857 option = self._get_cmd_option(section, key)
858 if option:
859 return option
860 if deprecated_section and deprecated_key:
861 with self.suppress_future_warnings():
862 option = self._get_cmd_option(deprecated_section, deprecated_key)
863 if option:
864 if issue_warning:
865 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
866 return option
867 return VALUE_NOT_FOUND_SENTINEL
868
869 def _get_option_from_secrets(
870 self,
871 deprecated_key: str | None,
872 deprecated_section: str | None,
873 key: str,
874 section: str,
875 issue_warning: bool = True,
876 extra_stacklevel: int = 0,
877 **kwargs,
878 ) -> str | ValueNotFound:
879 """Get config option from secrets backend."""
880 option = self._get_secret_option(section, key)
881 if option:
882 return option
883 if deprecated_section and deprecated_key:
884 with self.suppress_future_warnings():
885 option = self._get_secret_option(deprecated_section, deprecated_key)
886 if option:
887 if issue_warning:
888 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
889 return option
890 return VALUE_NOT_FOUND_SENTINEL
891
892 def _get_option_from_defaults(
893 self,
894 deprecated_key: str | None,
895 deprecated_section: str | None,
896 key: str,
897 section: str,
898 issue_warning: bool = True,
899 extra_stacklevel: int = 0,
900 team_name: str | None = None,
901 **kwargs,
902 ) -> str | ValueNotFound:
903 """Get config option from default values."""
904 if self.get_default_value(section, key) is not None or "fallback" in kwargs:
905 return expand_env_var(self.get_default_value(section, key, **kwargs))
906 return VALUE_NOT_FOUND_SENTINEL
907
908 def _resolve_deprecated_lookup(
909 self,
910 section: str,
911 key: str,
912 lookup_from_deprecated: bool,
913 extra_stacklevel: int = 0,
914 ) -> tuple[str, str, str | None, str | None, bool]:
915 """
916 Resolve deprecated section/key mappings and determine deprecated values.
917
918 :param section: Section name (will be lowercased)
919 :param key: Key name (will be lowercased)
920 :param lookup_from_deprecated: Whether to lookup from deprecated options
921 :param extra_stacklevel: Extra stack level for warnings
922 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted)
923 """
924 section = section.lower()
925 key = key.lower()
926 warning_emitted = False
927 deprecated_section: str | None = None
928 deprecated_key: str | None = None
929
930 if not lookup_from_deprecated:
931 return section, key, deprecated_section, deprecated_key, warning_emitted
932
933 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {})
934 if option_description.get("deprecated"):
935 deprecation_reason = option_description.get("deprecation_reason", "")
936 warnings.warn(
937 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
938 DeprecationWarning,
939 stacklevel=2 + extra_stacklevel,
940 )
941 # For the cases in which we rename whole sections
942 if section in self.inversed_deprecated_sections:
943 deprecated_section, deprecated_key = (section, key)
944 section = self.inversed_deprecated_sections[section]
945 if not self._suppress_future_warnings:
946 warnings.warn(
947 f"The config section [{deprecated_section}] has been renamed to "
948 f"[{section}]. Please update your `conf.get*` call to use the new name",
949 FutureWarning,
950 stacklevel=2 + extra_stacklevel,
951 )
952 # Don't warn about individual rename if the whole section is renamed
953 warning_emitted = True
954 elif (section, key) in self.inversed_deprecated_options:
955 # Handle using deprecated section/key instead of the new section/key
956 new_section, new_key = self.inversed_deprecated_options[(section, key)]
957 if not self._suppress_future_warnings and not warning_emitted:
958 warnings.warn(
959 f"section/key [{section}/{key}] has been deprecated, you should use"
960 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
961 "new name",
962 FutureWarning,
963 stacklevel=2 + extra_stacklevel,
964 )
965 warning_emitted = True
966 deprecated_section, deprecated_key = section, key
967 section, key = (new_section, new_key)
968 elif section in self.deprecated_sections:
969 # When accessing the new section name, make sure we check under the old config name
970 deprecated_key = key
971 deprecated_section = self.deprecated_sections[section][0]
972 else:
973 deprecated_section, deprecated_key, _ = self.deprecated_options.get(
974 (section, key), (None, None, None)
975 )
976
977 return section, key, deprecated_section, deprecated_key, warning_emitted
978
979 @overload # type: ignore[override]
980 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...
981
982 @overload # type: ignore[override]
983 def get(self, section: str, key: str, **kwargs) -> str | None: ...
984
985 def get( # type: ignore[misc, override]
986 self,
987 section: str,
988 key: str,
989 suppress_warnings: bool = False,
990 lookup_from_deprecated: bool = True,
991 _extra_stacklevel: int = 0,
992 team_name: str | None = None,
993 **kwargs,
994 ) -> str | None:
995 """
996 Get config value by iterating through lookup sequence.
997
998 Priority order is defined by _lookup_sequence property.
999 """
1000 section, key, deprecated_section, deprecated_key, warning_emitted = self._resolve_deprecated_lookup(
1001 section=section,
1002 key=key,
1003 lookup_from_deprecated=lookup_from_deprecated,
1004 extra_stacklevel=_extra_stacklevel,
1005 )
1006
1007 if team_name is not None:
1008 kwargs["team_name"] = team_name
1009
1010 for lookup_method in self._lookup_sequence:
1011 value = lookup_method(
1012 deprecated_key=deprecated_key,
1013 deprecated_section=deprecated_section,
1014 key=key,
1015 section=section,
1016 issue_warning=not warning_emitted,
1017 extra_stacklevel=_extra_stacklevel,
1018 **kwargs,
1019 )
1020 if value is not VALUE_NOT_FOUND_SENTINEL:
1021 return value
1022
1023 # Check if fallback was explicitly provided (even if None)
1024 if "fallback" in kwargs:
1025 return kwargs["fallback"]
1026
1027 if not suppress_warnings:
1028 log.warning("section/key [%s/%s] not found in config", section, key)
1029
1030 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
1031
1032 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override]
1033 """Get config value as boolean."""
1034 val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
1035 if "#" in val:
1036 val = val.split("#")[0].strip()
1037 if val in ("t", "true", "1"):
1038 return True
1039 if val in ("f", "false", "0"):
1040 return False
1041 raise AirflowConfigException(
1042 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
1043 f'Current value: "{val}".'
1044 )
1045
1046 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override]
1047 """Get config value as integer."""
1048 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1049 if val is None:
1050 raise AirflowConfigException(
1051 f"Failed to convert value None to int. "
1052 f'Please check "{key}" key in "{section}" section is set.'
1053 )
1054 try:
1055 return int(val)
1056 except ValueError:
1057 raise AirflowConfigException(
1058 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1059 f'Current value: "{val}".'
1060 )
1061
1062 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override]
1063 """Get config value as float."""
1064 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1065 if val is None:
1066 raise AirflowConfigException(
1067 f"Failed to convert value None to float. "
1068 f'Please check "{key}" key in "{section}" section is set.'
1069 )
1070 try:
1071 return float(val)
1072 except ValueError:
1073 raise AirflowConfigException(
1074 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
1075 f'Current value: "{val}".'
1076 )
1077
1078 def getlist(self, section: str, key: str, delimiter=",", **kwargs):
1079 """Get config value as list."""
1080 val = self.get(section, key, **kwargs)
1081 if val is None:
1082 if "fallback" in kwargs:
1083 return kwargs["fallback"]
1084 raise AirflowConfigException(
1085 f"Failed to convert value None to list. "
1086 f'Please check "{key}" key in "{section}" section is set.'
1087 )
1088 try:
1089 return [item.strip() for item in val.split(delimiter)]
1090 except Exception:
1091 raise AirflowConfigException(
1092 f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
1093 f'Current value: "{val}".'
1094 )
1095
1096 E = TypeVar("E", bound=Enum)
1097
1098 def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
1099 """Get config value as enum."""
1100 val = self.get(section, key, **kwargs)
1101 enum_names = [enum_item.name for enum_item in enum_class]
1102
1103 if val is None:
1104 raise AirflowConfigException(
1105 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1106 f'Current value: "{val}" and it must be one of {", ".join(enum_names)}'
1107 )
1108
1109 try:
1110 return enum_class[val]
1111 except KeyError:
1112 if "fallback" in kwargs and kwargs["fallback"] in enum_names:
1113 return enum_class[kwargs["fallback"]]
1114 raise AirflowConfigException(
1115 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1116 f"the value must be one of {', '.join(enum_names)}"
1117 )
1118
1119 def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
1120 """Get config value as list of enums."""
1121 string_list = self.getlist(section, key, delimiter, **kwargs)
1122 enum_names = [enum_item.name for enum_item in enum_class]
1123 enum_list = []
1124
1125 for val in string_list:
1126 try:
1127 enum_list.append(enum_class[val])
1128 except KeyError:
1129 log.warning(
1130 "Failed to convert value. Please check %s key in %s section. "
1131 "it must be one of %s, if not the value is ignored",
1132 key,
1133 section,
1134 ", ".join(enum_names),
1135 )
1136
1137 return enum_list
1138
1139 def getimport(self, section: str, key: str, **kwargs) -> Any:
1140 """
1141 Read options, import the full qualified name, and return the object.
1142
1143 In case of failure, it throws an exception with the key and section names
1144
1145 :return: The object or None, if the option is empty
1146 """
1147 # Fixed: use self.get() instead of conf.get()
1148 full_qualified_path = self.get(section=section, key=key, **kwargs)
1149 if not full_qualified_path:
1150 return None
1151
1152 try:
1153 # Import here to avoid circular dependency
1154 from ..module_loading import import_string
1155
1156 return import_string(full_qualified_path)
1157 except ImportError as e:
1158 log.warning(e)
1159 raise AirflowConfigException(
1160 f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
1161 f'Current value: "{full_qualified_path}".'
1162 )
1163
1164 def getjson(
1165 self, section: str, key: str, fallback=None, **kwargs
1166 ) -> dict | list | str | int | float | None:
1167 """
1168 Return a config value parsed from a JSON string.
1169
1170 ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.
1171 """
1172 try:
1173 data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
1174 except (NoSectionError, NoOptionError):
1175 data = None
1176
1177 if data is None or data == "":
1178 return fallback
1179
1180 try:
1181 return json.loads(data)
1182 except JSONDecodeError as e:
1183 raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
1184
1185 def gettimedelta(
1186 self, section: str, key: str, fallback: Any = None, **kwargs
1187 ) -> datetime.timedelta | None:
1188 """
1189 Get the config value for the given section and key, and convert it into datetime.timedelta object.
1190
1191 If the key is missing, then it is considered as `None`.
1192
1193 :param section: the section from the config
1194 :param key: the key defined in the given section
1195 :param fallback: fallback value when no config value is given, defaults to None
1196 :raises AirflowConfigException: raised because ValueError or OverflowError
1197 :return: datetime.timedelta(seconds=<config_value>) or None
1198 """
1199 val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
1200
1201 if val:
1202 # the given value must be convertible to integer
1203 try:
1204 int_val = int(val)
1205 except ValueError:
1206 raise AirflowConfigException(
1207 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1208 f'Current value: "{val}".'
1209 )
1210
1211 try:
1212 return datetime.timedelta(seconds=int_val)
1213 except OverflowError as err:
1214 raise AirflowConfigException(
1215 f"Failed to convert value to timedelta in `seconds`. "
1216 f"{err}. "
1217 f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
1218 )
1219
1220 return fallback
1221
1222 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
1223 """Get mandatory config value, raising ValueError if not found."""
1224 value = self.get(section, key, _extra_stacklevel=1, **kwargs)
1225 if value is None:
1226 raise ValueError(f"The value {section}/{key} should be set!")
1227 return value
1228
1229 def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
1230 """Get mandatory config value as list, raising ValueError if not found."""
1231 value = self.getlist(section, key, **kwargs)
1232 if value is None:
1233 raise ValueError(f"The value {section}/{key} should be set!")
1234 return value
1235
1236 def read(
1237 self,
1238 filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
1239 encoding: str | None = None,
1240 ) -> list[str]:
1241 return super().read(filenames=filenames, encoding=encoding)
1242
1243 def read_dict( # type: ignore[override]
1244 self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
1245 ) -> None:
1246 """
1247 We define a different signature here to add better type hints and checking.
1248
1249 :param dictionary: dictionary to read from
1250 :param source: source to be used to store the configuration
1251 :return:
1252 """
1253 super().read_dict(dictionary=dictionary, source=source)
1254
1255 def get_sections_including_defaults(self) -> list[str]:
1256 """
1257 Retrieve all sections from the configuration parser, including sections defined by built-in defaults.
1258
1259 :return: list of section names
1260 """
1261 sections_from_config = self.sections()
1262 sections_from_description = list(self.configuration_description.keys())
1263 return list(dict.fromkeys(itertools.chain(sections_from_description, sections_from_config)))
1264
1265 def get_options_including_defaults(self, section: str) -> list[str]:
1266 """
1267 Retrieve all possible options from the configuration parser for the section given.
1268
1269 Includes options defined by built-in defaults.
1270
1271 :param section: section name
1272 :return: list of option names for the section given
1273 """
1274 my_own_options = self.options(section) if self.has_section(section) else []
1275 all_options_from_defaults = list(
1276 self.configuration_description.get(section, {}).get("options", {}).keys()
1277 )
1278 return list(dict.fromkeys(itertools.chain(all_options_from_defaults, my_own_options)))
1279
1280 def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True) -> bool:
1281 """
1282 Check if option is defined.
1283
1284 Uses self.get() to avoid reimplementing the priority order of config variables
1285 (env, config, cmd, defaults).
1286
1287 :param section: section to get option from
1288 :param option: option to get
1289 :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
1290 :return:
1291 """
1292 try:
1293 value = self.get(
1294 section,
1295 option,
1296 fallback=None,
1297 _extra_stacklevel=1,
1298 suppress_warnings=True,
1299 lookup_from_deprecated=lookup_from_deprecated,
1300 )
1301 if value is None:
1302 return False
1303 return True
1304 except (NoOptionError, NoSectionError, AirflowConfigException):
1305 return False
1306
1307 def set(self, section: str, option: str, value: str | None = None) -> None:
1308 """
1309 Set an option to the given value.
1310
1311 This override just makes sure the section and option are lower case, to match what we do in `get`.
1312 """
1313 section = section.lower()
1314 option = option.lower()
1315 defaults = self.configuration_description or {}
1316 if not self.has_section(section) and section in defaults:
1317 # Trying to set a key in a section that exists in default, but not in the user config;
1318 # automatically create it
1319 self.add_section(section)
1320 super().set(section, option, value)
1321
1322 def remove_option(self, section: str, option: str, remove_default: bool = True):
1323 """
1324 Remove an option if it exists in config from a file or default config.
1325
1326 If both of config have the same option, this removes the option
1327 in both configs unless remove_default=False.
1328 """
1329 section = section.lower()
1330 option = option.lower()
1331 if super().has_option(section, option):
1332 super().remove_option(section, option)
1333
1334 if self.get_default_value(section, option) is not None and remove_default:
1335 self._default_values.remove_option(section, option)
1336
1337 def optionxform(self, optionstr: str) -> str:
1338 """
1339 Transform option names on every read, get, or set operation.
1340
1341 This changes from the default behaviour of ConfigParser from lower-casing
1342 to instead be case-preserving.
1343
1344 :param optionstr:
1345 :return:
1346 """
1347 return optionstr
1348
1349 def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
1350 """
1351 Get list of config sources to use in as_dict().
1352
1353 Subclasses can override to add additional sources (e.g., provider configs).
1354 """
1355 return [
1356 ("default", self._default_values),
1357 ("airflow.cfg", self),
1358 ]
1359
1360 def as_dict(
1361 self,
1362 display_source: bool = False,
1363 display_sensitive: bool = False,
1364 raw: bool = False,
1365 include_env: bool = True,
1366 include_cmds: bool = True,
1367 include_secret: bool = True,
1368 ) -> ConfigSourcesType:
1369 """
1370 Return the current configuration as an OrderedDict of OrderedDicts.
1371
1372 When materializing current configuration Airflow defaults are
1373 materialized along with user set configs. If any of the `include_*`
1374 options are False then the result of calling command or secret key
1375 configs do not override Airflow defaults and instead are passed through.
1376 In order to then avoid Airflow defaults from overwriting user set
1377 command or secret key configs we filter out bare sensitive_config_values
1378 that are set to Airflow defaults when command or secret key configs
1379 produce different values.
1380
1381 :param display_source: If False, the option value is returned. If True,
1382 a tuple of (option_value, source) is returned. Source is either
1383 'airflow.cfg', 'default', 'env var', or 'cmd'.
1384 :param display_sensitive: If True, the values of options set by env
1385 vars and bash commands will be displayed. If False, those options
1386 are shown as '< hidden >'
1387 :param raw: Should the values be output as interpolated values, or the
1388 "raw" form that can be fed back in to ConfigParser
1389 :param include_env: Should the value of configuration from AIRFLOW__
1390 environment variables be included or not
1391 :param include_cmds: Should the result of calling any ``*_cmd`` config be
1392 set (True, default), or should the _cmd options be left as the
1393 command to run (False)
1394 :param include_secret: Should the result of calling any ``*_secret`` config be
1395 set (True, default), or should the _secret options be left as the
1396 path to get the secret from (False)
1397 :return: Dictionary, where the key is the name of the section and the content is
1398 the dictionary with the name of the parameter and its value.
1399 """
1400 if not display_sensitive:
1401 # We want to hide the sensitive values at the appropriate methods
1402 # since envs from cmds, secrets can be read at _include_envs method
1403 if not all([include_env, include_cmds, include_secret]):
1404 raise ValueError(
1405 "If display_sensitive is false, then include_env, "
1406 "include_cmds, include_secret must all be set as True"
1407 )
1408
1409 config_sources: ConfigSourcesType = {}
1410
1411 # We check sequentially all those sources and the last one we saw it in will "win"
1412 configs = self._get_config_sources_for_as_dict()
1413
1414 self._replace_config_with_display_sources(
1415 config_sources,
1416 configs,
1417 self.configuration_description,
1418 display_source,
1419 raw,
1420 self.deprecated_options,
1421 include_cmds=include_cmds,
1422 include_env=include_env,
1423 include_secret=include_secret,
1424 )
1425
1426 # add env vars and overwrite because they have priority
1427 if include_env:
1428 self._include_envs(config_sources, display_sensitive, display_source, raw)
1429 else:
1430 self._filter_by_source(config_sources, display_source, self._get_env_var_option)
1431
1432 # add bash commands
1433 if include_cmds:
1434 self._include_commands(config_sources, display_sensitive, display_source, raw)
1435 else:
1436 self._filter_by_source(config_sources, display_source, self._get_cmd_option)
1437
1438 # add config from secret backends
1439 if include_secret:
1440 self._include_secrets(config_sources, display_sensitive, display_source, raw)
1441 else:
1442 self._filter_by_source(config_sources, display_source, self._get_secret_option)
1443
1444 if not display_sensitive:
1445 # This ensures the ones from config file is hidden too
1446 # if they are not provided through env, cmd and secret
1447 hidden = "< hidden >"
1448 for section, key in self.sensitive_config_values:
1449 if config_sources.get(section):
1450 if config_sources[section].get(key, None):
1451 if display_source:
1452 source = config_sources[section][key][1]
1453 config_sources[section][key] = (hidden, source)
1454 else:
1455 config_sources[section][key] = hidden
1456
1457 return config_sources
1458
1459 def _write_option_header(
1460 self,
1461 file: IO[str],
1462 option: str,
1463 extra_spacing: bool,
1464 include_descriptions: bool,
1465 include_env_vars: bool,
1466 include_examples: bool,
1467 include_sources: bool,
1468 section_config_description: dict[str, dict[str, Any]],
1469 section_to_write: str,
1470 sources_dict: ConfigSourcesType,
1471 ) -> tuple[bool, bool]:
1472 """
1473 Write header for configuration option.
1474
1475 Returns tuple of (should_continue, needs_separation) where needs_separation should be
1476 set if the option needs additional separation to visually separate it from the next option.
1477 """
1478 option_config_description = (
1479 section_config_description.get("options", {}).get(option, {})
1480 if section_config_description
1481 else {}
1482 )
1483 description = option_config_description.get("description")
1484 needs_separation = False
1485 if description and include_descriptions:
1486 for line in description.splitlines():
1487 file.write(f"# {line}\n")
1488 needs_separation = True
1489 example = option_config_description.get("example")
1490 if example is not None and include_examples:
1491 if extra_spacing:
1492 file.write("#\n")
1493 example_lines = example.splitlines()
1494 example = "\n# ".join(example_lines)
1495 file.write(f"# Example: {option} = {example}\n")
1496 needs_separation = True
1497 if include_sources and sources_dict:
1498 sources_section = sources_dict.get(section_to_write)
1499 value_with_source = sources_section.get(option) if sources_section else None
1500 if value_with_source is None:
1501 file.write("#\n# Source: not defined\n")
1502 else:
1503 file.write(f"#\n# Source: {value_with_source[1]}\n")
1504 needs_separation = True
1505 if include_env_vars:
1506 file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
1507 if extra_spacing:
1508 file.write("#\n")
1509 needs_separation = True
1510 return True, needs_separation
1511
1512 def is_template(self, section: str, key) -> bool:
1513 """
1514 Return whether the value is templated.
1515
1516 :param section: section of the config
1517 :param key: key in the section
1518 :return: True if the value is templated
1519 """
1520 return _is_template(self.configuration_description, section, key)
1521
1522 def getsection(self, section: str) -> ConfigOptionsDictType | None:
1523 """
1524 Return the section as a dict.
1525
1526 Values are converted to int, float, bool as required.
1527
1528 :param section: section from the config
1529 """
1530 if not self.has_section(section) and not self._default_values.has_section(section):
1531 return None
1532 if self._default_values.has_section(section):
1533 _section: ConfigOptionsDictType = dict(self._default_values.items(section))
1534 else:
1535 _section = {}
1536
1537 if self.has_section(section):
1538 _section.update(self.items(section))
1539
1540 section_prefix = self._env_var_name(section, "")
1541 for env_var in sorted(os.environ.keys()):
1542 if env_var.startswith(section_prefix):
1543 key = env_var.replace(section_prefix, "")
1544 if key.endswith("_CMD"):
1545 key = key[:-4]
1546 key = key.lower()
1547 _section[key] = self._get_env_var_option(section, key)
1548
1549 for key, val in _section.items():
1550 if val is None:
1551 raise AirflowConfigException(
1552 f"Failed to convert value automatically. "
1553 f'Please check "{key}" key in "{section}" section is set.'
1554 )
1555 try:
1556 _section[key] = int(val)
1557 except ValueError:
1558 try:
1559 _section[key] = float(val)
1560 except ValueError:
1561 if isinstance(val, str) and val.lower() in ("t", "true"):
1562 _section[key] = True
1563 elif isinstance(val, str) and val.lower() in ("f", "false"):
1564 _section[key] = False
1565 return _section
1566
1567 @staticmethod
1568 def _write_section_header(
1569 file: IO[str],
1570 include_descriptions: bool,
1571 section_config_description: dict[str, str],
1572 section_to_write: str,
1573 ) -> None:
1574 """Write header for configuration section."""
1575 file.write(f"[{section_to_write}]\n")
1576 section_description = section_config_description.get("description")
1577 if section_description and include_descriptions:
1578 for line in section_description.splitlines():
1579 file.write(f"# {line}\n")
1580 file.write("\n")
1581
1582 def _write_value(
1583 self,
1584 file: IO[str],
1585 option: str,
1586 comment_out_everything: bool,
1587 needs_separation: bool,
1588 only_defaults: bool,
1589 section_to_write: str,
1590 ):
1591 default_value = self.get_default_value(section_to_write, option, raw=True)
1592 if only_defaults:
1593 value = default_value
1594 else:
1595 value = self.get(section_to_write, option, fallback=default_value, raw=True)
1596 if value is None:
1597 file.write(f"# {option} = \n")
1598 else:
1599 if comment_out_everything:
1600 value_lines = value.splitlines()
1601 value = "\n# ".join(value_lines)
1602 file.write(f"# {option} = {value}\n")
1603 else:
1604 if "\n" in value:
1605 try:
1606 value = json.dumps(json.loads(value), indent=4)
1607 value = value.replace(
1608 "\n", "\n "
1609 ) # indent multi-line JSON to satisfy configparser format
1610 except JSONDecodeError:
1611 pass
1612 file.write(f"{option} = {value}\n")
1613 if needs_separation:
1614 file.write("\n")
1615
1616 def write( # type: ignore[override]
1617 self,
1618 file: IO[str],
1619 section: str | None = None,
1620 include_examples: bool = True,
1621 include_descriptions: bool = True,
1622 include_sources: bool = True,
1623 include_env_vars: bool = True,
1624 include_providers: bool = True,
1625 comment_out_everything: bool = False,
1626 hide_sensitive_values: bool = False,
1627 extra_spacing: bool = True,
1628 only_defaults: bool = False,
1629 **kwargs: Any,
1630 ) -> None:
1631 """
1632 Write configuration with comments and examples to a file.
1633
1634 :param file: file to write to
1635 :param section: section of the config to write, defaults to all sections
1636 :param include_examples: Include examples in the output
1637 :param include_descriptions: Include descriptions in the output
1638 :param include_sources: Include the source of each config option
1639 :param include_env_vars: Include environment variables corresponding to each config option
1640 :param include_providers: Include providers configuration
1641 :param comment_out_everything: Comment out all values
1642 :param hide_sensitive_values: Include sensitive values in the output
1643 :param extra_spacing: Add extra spacing before examples and after variables
1644 :param only_defaults: Only include default values when writing the config, not the actual values
1645 """
1646 sources_dict = {}
1647 if include_sources:
1648 sources_dict = self.as_dict(display_source=True)
1649 with self.make_sure_configuration_loaded(with_providers=include_providers):
1650 for section_to_write in self.get_sections_including_defaults():
1651 section_config_description = self.configuration_description.get(section_to_write, {})
1652 if section_to_write != section and section is not None:
1653 continue
1654 if self._default_values.has_section(section_to_write) or self.has_section(section_to_write):
1655 self._write_section_header(
1656 file, include_descriptions, section_config_description, section_to_write
1657 )
1658 for option in self.get_options_including_defaults(section_to_write):
1659 should_continue, needs_separation = self._write_option_header(
1660 file=file,
1661 option=option,
1662 extra_spacing=extra_spacing,
1663 include_descriptions=include_descriptions,
1664 include_env_vars=include_env_vars,
1665 include_examples=include_examples,
1666 include_sources=include_sources,
1667 section_config_description=section_config_description,
1668 section_to_write=section_to_write,
1669 sources_dict=sources_dict,
1670 )
1671 self._write_value(
1672 file=file,
1673 option=option,
1674 comment_out_everything=comment_out_everything,
1675 needs_separation=needs_separation,
1676 only_defaults=only_defaults,
1677 section_to_write=section_to_write,
1678 )
1679 if include_descriptions and not needs_separation:
1680 # extra separation between sections in case last option did not need it
1681 file.write("\n")
1682
1683 @contextmanager
1684 def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
1685 """
1686 Make sure configuration is loaded with or without providers.
1687
1688 This happens regardless if the provider configuration has been loaded before or not.
1689 Restores configuration to the state before entering the context.
1690
1691 :param with_providers: whether providers should be loaded
1692 """
1693 needs_reload = False
1694 if with_providers:
1695 self._ensure_providers_config_loaded()
1696 else:
1697 needs_reload = self._ensure_providers_config_unloaded()
1698 yield
1699 if needs_reload:
1700 self._reload_provider_configs()
1701
1702 def _ensure_providers_config_loaded(self) -> None:
1703 """Ensure providers configurations are loaded."""
1704 raise NotImplementedError("Subclasses must implement _ensure_providers_config_loaded method")
1705
1706 def _ensure_providers_config_unloaded(self) -> bool:
1707 """Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded."""
1708 raise NotImplementedError("Subclasses must implement _ensure_providers_config_unloaded method")
1709
1710 def _reload_provider_configs(self) -> None:
1711 """Reload providers configuration."""
1712 raise NotImplementedError("Subclasses must implement _reload_provider_configs method")