1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Base configuration parser with pure parsing logic."""
19
20from __future__ import annotations
21
22import contextlib
23import datetime
24import functools
25import itertools
26import json
27import logging
28import os
29import shlex
30import subprocess
31import sys
32import warnings
33from collections.abc import Callable, Generator, Iterable
34from configparser import ConfigParser, NoOptionError, NoSectionError
35from contextlib import contextmanager
36from enum import Enum
37from json.decoder import JSONDecodeError
38from re import Pattern
39from typing import IO, Any, TypeVar, overload
40
41from .exceptions import AirflowConfigException
42
43log = logging.getLogger(__name__)
44
45
46ConfigType = str | int | float | bool
47ConfigOptionsDictType = dict[str, ConfigType]
48ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
49ConfigSourcesType = dict[str, ConfigSectionSourcesType]
50ENV_VAR_PREFIX = "AIRFLOW__"
51
52
53class ValueNotFound:
54 """Object of this is raised when a configuration value cannot be found."""
55
56 pass
57
58
59VALUE_NOT_FOUND_SENTINEL = ValueNotFound()
60
61
62@overload
63def expand_env_var(env_var: None) -> None: ...
64@overload
65def expand_env_var(env_var: str) -> str: ...
66
67
68def expand_env_var(env_var: str | None) -> str | None:
69 """
70 Expand (potentially nested) env vars.
71
72 Repeat and apply `expandvars` and `expanduser` until
73 interpolation stops having any effect.
74 """
75 if not env_var or not isinstance(env_var, str):
76 return env_var
77 while True:
78 interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
79 if interpolated == env_var:
80 return interpolated
81 env_var = interpolated
82
83
84def run_command(command: str) -> str:
85 """Run command and returns stdout."""
86 process = subprocess.Popen(
87 shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
88 )
89 output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate())
90
91 if process.returncode != 0:
92 raise AirflowConfigException(
93 f"Cannot execute {command}. Error code is: {process.returncode}. "
94 f"Output: {output}, Stderr: {stderr}"
95 )
96
97 return output
98
99
100def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool:
101 """
102 Check if the config is a template.
103
104 :param configuration_description: description of configuration
105 :param section: section
106 :param key: key
107 :return: True if the config is a template
108 """
109 return configuration_description.get(section, {}).get(key, {}).get("is_template", False)
110
111
112class AirflowConfigParser(ConfigParser):
113 """
114 Base configuration parser with pure parsing logic.
115
116 This class provides the core parsing methods that work with:
117 - configuration_description: dict describing config options (required in __init__)
118 - _default_values: ConfigParser with default values (required in __init__)
119 - deprecated_options: class attribute mapping new -> old options
120 - deprecated_sections: class attribute mapping new -> old sections
121 """
122
123 # A mapping of section -> setting -> { old, replace } for deprecated default values.
124 # Subclasses can override this to define deprecated values that should be upgraded.
125 deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {}
126
127 # A mapping of (new section, new option) -> (old section, old option, since_version).
128 # When reading new option, the old option will be checked to see if it exists. If it does a
129 # DeprecationWarning will be issued and the old option will be used instead
130 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
131 ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
132 ("api", "base_url"): ("webserver", "base_url", "3.0"),
133 ("api", "host"): ("webserver", "web_server_host", "3.0"),
134 ("api", "port"): ("webserver", "web_server_port", "3.0"),
135 ("api", "workers"): ("webserver", "workers", "3.0"),
136 ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"),
137 ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"),
138 ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"),
139 ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"),
140 ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"),
141 ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"),
142 ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"),
143 ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"),
144 ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"),
145 ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"),
146 ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"),
147 ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"),
148 ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
149 ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"),
150 ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"),
151 ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"),
152 ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"),
153 ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"),
154 ("api", "page_size"): ("webserver", "page_size", "3.1.0"),
155 ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"),
156 ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"),
157 ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"),
158 ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
159 ("api", "log_config"): ("api", "access_logfile", "3.1.0"),
160 ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"),
161 }
162
163 # A mapping of new section -> (old section, since_version).
164 deprecated_sections: dict[str, tuple[str, str]] = {}
165
166 @property
167 def _lookup_sequence(self) -> list[Callable]:
168 """
169 Define the sequence of lookup methods for get(). The definition here does not have provider lookup.
170
171 Subclasses can override this to customise lookup order.
172 """
173 return [
174 self._get_environment_variables,
175 self._get_option_from_config_file,
176 self._get_option_from_commands,
177 self._get_option_from_secrets,
178 self._get_option_from_defaults,
179 ]
180
181 @property
182 def _validators(self) -> list[Callable[[], None]]:
183 """
184 Return list of validators defined on a config parser class. Base class will return an empty list.
185
186 Subclasses can override this to customize the validators that are run during validation on the
187 config parser instance.
188 """
189 return []
190
191 def validate(self) -> None:
192 """Run all registered validators."""
193 for validator in self._validators:
194 validator()
195 self.is_validated = True
196
197 def _validate_deprecated_values(self) -> None:
198 """Validate and upgrade deprecated default values."""
199 for section, replacement in self.deprecated_values.items():
200 for name, info in replacement.items():
201 old, new = info
202 current_value = self.get(section, name, fallback="")
203 if self._using_old_value(old, current_value):
204 self.upgraded_values[(section, name)] = current_value
205 new_value = old.sub(new, current_value)
206 self._update_env_var(section=section, name=name, new_value=new_value)
207 self._create_future_warning(
208 name=name,
209 section=section,
210 current_value=current_value,
211 new_value=new_value,
212 )
213
214 def _using_old_value(self, old: Pattern, current_value: str) -> bool:
215 """Check if current_value matches the old pattern."""
216 return old.search(current_value) is not None
217
218 def _update_env_var(self, section: str, name: str, new_value: str) -> None:
219 """Update environment variable with new value."""
220 env_var = self._env_var_name(section, name)
221 # Set it as an env var so that any subprocesses keep the same override!
222 os.environ[env_var] = new_value
223
224 @staticmethod
225 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None:
226 """Create a FutureWarning for deprecated default values."""
227 warnings.warn(
228 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
229 f"This value has been changed to {new_value!r} in the running config, but please update your config.",
230 FutureWarning,
231 stacklevel=3,
232 )
233
234 def __init__(
235 self,
236 configuration_description: dict[str, dict[str, Any]],
237 _default_values: ConfigParser,
238 *args,
239 **kwargs,
240 ):
241 """
242 Initialize the parser.
243
244 :param configuration_description: Description of configuration options
245 :param _default_values: ConfigParser with default values
246 """
247 super().__init__(*args, **kwargs)
248 self.configuration_description = configuration_description
249 self._default_values = _default_values
250 self._suppress_future_warnings = False
251 self.upgraded_values: dict[tuple[str, str], str] = {}
252
253 @functools.cached_property
254 def inversed_deprecated_options(self):
255 """Build inverse mapping from old options to new options."""
256 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}
257
258 @functools.cached_property
259 def inversed_deprecated_sections(self):
260 """Build inverse mapping from old sections to new sections."""
261 return {
262 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
263 }
264
265 @functools.cached_property
266 def sensitive_config_values(self) -> set[tuple[str, str]]:
267 """Get set of sensitive config values that should be masked."""
268 flattened = {
269 (s, k): item
270 for s, s_c in self.configuration_description.items()
271 for k, item in s_c.get("options", {}).items()
272 }
273 sensitive = {
274 (section.lower(), key.lower())
275 for (section, key), v in flattened.items()
276 if v.get("sensitive") is True
277 }
278 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options}
279 depr_section = {
280 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections
281 }
282 sensitive.update(depr_section, depr_option)
283 return sensitive
284
285 def _update_defaults_from_string(self, config_string: str) -> None:
286 """
287 Update the defaults in _default_values based on values in config_string ("ini" format).
288
289 Override shared parser's method to add validation for template variables.
290 Note that those values are not validated and cannot contain variables because we are using
291 regular config parser to load them. This method is used to test the config parser in unit tests.
292
293 :param config_string: ini-formatted config string
294 """
295 parser = ConfigParser()
296 parser.read_string(config_string)
297 for section in parser.sections():
298 if section not in self._default_values.sections():
299 self._default_values.add_section(section)
300 errors = False
301 for key, value in parser.items(section):
302 if not self.is_template(section, key) and "{" in value:
303 errors = True
304 log.error(
305 "The %s.%s value %s read from string contains variable. This is not supported",
306 section,
307 key,
308 value,
309 )
310 self._default_values.set(section, key, value)
311 if errors:
312 raise AirflowConfigException(
313 f"The string config passed as default contains variables. "
314 f"This is not supported. String config: {config_string}"
315 )
316
317 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
318 """
319 Retrieve default value from default config parser.
320
321 This will retrieve the default value from the default config parser. Optionally a raw, stored
322 value can be retrieved by setting skip_interpolation to True. This is useful for example when
323 we want to write the default value to a file, and we don't want the interpolation to happen
324 as it is going to be done later when the config is read.
325
326 :param section: section of the config
327 :param key: key to use
328 :param fallback: fallback value to use
329 :param raw: if raw, then interpolation will be reversed
330 :param kwargs: other args
331 :return:
332 """
333 value = self._default_values.get(section, key, fallback=fallback, **kwargs)
334 if raw and value is not None:
335 return value.replace("%", "%%")
336 return value
337
338 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None:
339 """
340 Get Secret Backend if defined in airflow.cfg.
341
342 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.
343 """
344 section = "workers" if worker_mode else "secrets"
345 key = "secrets_backend" if worker_mode else "backend"
346 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs"
347
348 secrets_backend_cls = self.getimport(section=section, key=key)
349
350 if not secrets_backend_cls:
351 if worker_mode:
352 # if we find no secrets backend for worker, return that of secrets backend
353 secrets_backend_cls = self.getimport(section="secrets", key="backend")
354 if not secrets_backend_cls:
355 return None
356 # When falling back to secrets backend, use its kwargs
357 kwargs_key = "backend_kwargs"
358 section = "secrets"
359 else:
360 return None
361
362 try:
363 backend_kwargs = self.getjson(section=section, key=kwargs_key)
364 if not backend_kwargs:
365 backend_kwargs = {}
366 elif not isinstance(backend_kwargs, dict):
367 raise ValueError("not a dict")
368 except AirflowConfigException:
369 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key)
370 backend_kwargs = {}
371 except ValueError:
372 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key)
373 backend_kwargs = {}
374
375 return secrets_backend_cls(**backend_kwargs)
376
377 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
378 """
379 Get Config option values from Secret Backend.
380
381 Called by the shared parser's _get_secret_option() method as part of the lookup chain.
382 Uses _get_custom_secret_backend() to get the backend instance.
383
384 :param config_key: the config key to retrieve
385 :return: config value or None
386 """
387 try:
388 secrets_client = self._get_custom_secret_backend()
389 if not secrets_client:
390 return None
391 return secrets_client.get_config(config_key)
392 except Exception as e:
393 raise AirflowConfigException(
394 "Cannot retrieve config from alternative secrets backend. "
395 "Make sure it is configured properly and that the Backend "
396 "is accessible.\n"
397 f"{e}"
398 )
399
400 def _get_cmd_option_from_config_sources(
401 self, config_sources: ConfigSourcesType, section: str, key: str
402 ) -> str | None:
403 fallback_key = key + "_cmd"
404 if (section, key) in self.sensitive_config_values:
405 section_dict = config_sources.get(section)
406 if section_dict is not None:
407 command_value = section_dict.get(fallback_key)
408 if command_value is not None:
409 if isinstance(command_value, str):
410 command = command_value
411 else:
412 command = command_value[0]
413 return run_command(command)
414 return None
415
416 def _get_secret_option_from_config_sources(
417 self, config_sources: ConfigSourcesType, section: str, key: str
418 ) -> str | None:
419 fallback_key = key + "_secret"
420 if (section, key) in self.sensitive_config_values:
421 section_dict = config_sources.get(section)
422 if section_dict is not None:
423 secrets_path_value = section_dict.get(fallback_key)
424 if secrets_path_value is not None:
425 if isinstance(secrets_path_value, str):
426 secrets_path = secrets_path_value
427 else:
428 secrets_path = secrets_path_value[0]
429 return self._get_config_value_from_secret_backend(secrets_path)
430 return None
431
432 def _include_secrets(
433 self,
434 config_sources: ConfigSourcesType,
435 display_sensitive: bool,
436 display_source: bool,
437 raw: bool,
438 ):
439 for section, key in self.sensitive_config_values:
440 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
441 if value:
442 if not display_sensitive:
443 value = "< hidden >"
444 if display_source:
445 opt: str | tuple[str, str] = (value, "secret")
446 elif raw:
447 opt = value.replace("%", "%%")
448 else:
449 opt = value
450 config_sources.setdefault(section, {}).update({key: opt})
451 del config_sources[section][key + "_secret"]
452
453 def _include_commands(
454 self,
455 config_sources: ConfigSourcesType,
456 display_sensitive: bool,
457 display_source: bool,
458 raw: bool,
459 ):
460 for section, key in self.sensitive_config_values:
461 opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
462 if not opt:
463 continue
464 opt_to_set: str | tuple[str, str] | None = opt
465 if not display_sensitive:
466 opt_to_set = "< hidden >"
467 if display_source:
468 opt_to_set = (str(opt_to_set), "cmd")
469 elif raw:
470 opt_to_set = str(opt_to_set).replace("%", "%%")
471 if opt_to_set is not None:
472 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set}
473 config_sources.setdefault(section, {}).update(dict_to_update)
474 del config_sources[section][key + "_cmd"]
475
476 def _include_envs(
477 self,
478 config_sources: ConfigSourcesType,
479 display_sensitive: bool,
480 display_source: bool,
481 raw: bool,
482 ):
483 for env_var in [
484 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
485 ]:
486 try:
487 _, section, key = env_var.split("__", 2)
488 opt = self._get_env_var_option(section, key)
489 except ValueError:
490 continue
491 if opt is None:
492 log.warning("Ignoring unknown env var '%s'", env_var)
493 continue
494 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
495 # Don't hide cmd/secret values here
496 if not env_var.lower().endswith(("cmd", "secret")):
497 if (section, key) in self.sensitive_config_values:
498 opt = "< hidden >"
499 elif raw:
500 opt = opt.replace("%", "%%")
501 if display_source:
502 opt = (opt, "env var")
503
504 section = section.lower()
505 key = key.lower()
506 config_sources.setdefault(section, {}).update({key: opt})
507
508 def _filter_by_source(
509 self,
510 config_sources: ConfigSourcesType,
511 display_source: bool,
512 getter_func,
513 ):
514 """
515 Delete default configs from current configuration.
516
517 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values.
518
519 This is necessary because bare configs take precedence over the command
520 or secret key equivalents so if the current running config is
521 materialized with Airflow defaults they in turn override user set
522 command or secret key configs.
523
524 :param config_sources: The current configuration to operate on
525 :param display_source: If False, configuration options contain raw
526 values. If True, options are a tuple of (option_value, source).
527 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
528 :param getter_func: A callback function that gets the user configured
529 override value for a particular sensitive_config_values config.
530 :return: None, the given config_sources is filtered if necessary,
531 otherwise untouched.
532 """
533 for section, key in self.sensitive_config_values:
534 # Don't bother if we don't have section / key
535 if section not in config_sources or key not in config_sources[section]:
536 continue
537 # Check that there is something to override defaults
538 try:
539 getter_opt = getter_func(section, key)
540 except ValueError:
541 continue
542 if not getter_opt:
543 continue
544 # Check to see that there is a default value
545 if self.get_default_value(section, key) is None:
546 continue
547 # Check to see if bare setting is the same as defaults
548 if display_source:
549 # when display_source = true, we know that the config_sources contains tuple
550 opt, source = config_sources[section][key] # type: ignore
551 else:
552 opt = config_sources[section][key]
553 if opt == self.get_default_value(section, key):
554 del config_sources[section][key]
555
556 @staticmethod
557 def _deprecated_value_is_set_in_config(
558 deprecated_section: str,
559 deprecated_key: str,
560 configs: Iterable[tuple[str, ConfigParser]],
561 ) -> bool:
562 for config_type, config in configs:
563 if config_type != "default":
564 with contextlib.suppress(NoSectionError):
565 deprecated_section_array = config.items(section=deprecated_section, raw=True)
566 if any(key == deprecated_key for key, _ in deprecated_section_array):
567 return True
568 return False
569
570 @staticmethod
571 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
572 return (
573 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}")
574 is not None
575 )
576
577 @staticmethod
578 def _deprecated_command_is_set_in_config(
579 deprecated_section: str,
580 deprecated_key: str,
581 configs: Iterable[tuple[str, ConfigParser]],
582 ) -> bool:
583 return AirflowConfigParser._deprecated_value_is_set_in_config(
584 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs
585 )
586
587 @staticmethod
588 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
589 return (
590 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD")
591 is not None
592 )
593
594 @staticmethod
595 def _deprecated_secret_is_set_in_config(
596 deprecated_section: str,
597 deprecated_key: str,
598 configs: Iterable[tuple[str, ConfigParser]],
599 ) -> bool:
600 return AirflowConfigParser._deprecated_value_is_set_in_config(
601 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs
602 )
603
604 @staticmethod
605 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
606 return (
607 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET")
608 is not None
609 )
610
611 @staticmethod
612 def _replace_config_with_display_sources(
613 config_sources: ConfigSourcesType,
614 configs: Iterable[tuple[str, ConfigParser]],
615 configuration_description: dict[str, dict[str, Any]],
616 display_source: bool,
617 raw: bool,
618 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
619 include_env: bool,
620 include_cmds: bool,
621 include_secret: bool,
622 ):
623 for source_name, config in configs:
624 sections = config.sections()
625 for section in sections:
626 AirflowConfigParser._replace_section_config_with_display_sources(
627 config,
628 config_sources,
629 configuration_description,
630 display_source,
631 raw,
632 section,
633 source_name,
634 deprecated_options,
635 configs,
636 include_env=include_env,
637 include_cmds=include_cmds,
638 include_secret=include_secret,
639 )
640
641 @staticmethod
642 def _replace_section_config_with_display_sources(
643 config: ConfigParser,
644 config_sources: ConfigSourcesType,
645 configuration_description: dict[str, dict[str, Any]],
646 display_source: bool,
647 raw: bool,
648 section: str,
649 source_name: str,
650 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
651 configs: Iterable[tuple[str, ConfigParser]],
652 include_env: bool,
653 include_cmds: bool,
654 include_secret: bool,
655 ):
656 sect = config_sources.setdefault(section, {})
657 if isinstance(config, AirflowConfigParser):
658 with config.suppress_future_warnings():
659 items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
660 else:
661 items = config.items(section=section, raw=raw)
662 for k, val in items:
663 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
664 if deprecated_section and deprecated_key:
665 if source_name == "default":
666 # If deprecated entry has some non-default value set for any of the sources requested,
667 # We should NOT set default for the new entry (because it will override anything
668 # coming from the deprecated ones)
669 if AirflowConfigParser._deprecated_value_is_set_in_config(
670 deprecated_section, deprecated_key, configs
671 ):
672 continue
673 if include_env and AirflowConfigParser._deprecated_variable_is_set(
674 deprecated_section, deprecated_key
675 ):
676 continue
677 if include_cmds and (
678 AirflowConfigParser._deprecated_variable_command_is_set(
679 deprecated_section, deprecated_key
680 )
681 or AirflowConfigParser._deprecated_command_is_set_in_config(
682 deprecated_section, deprecated_key, configs
683 )
684 ):
685 continue
686 if include_secret and (
687 AirflowConfigParser._deprecated_variable_secret_is_set(
688 deprecated_section, deprecated_key
689 )
690 or AirflowConfigParser._deprecated_secret_is_set_in_config(
691 deprecated_section, deprecated_key, configs
692 )
693 ):
694 continue
695 if display_source:
696 updated_source_name = source_name
697 if source_name == "default":
698 # defaults can come from other sources (default-<PROVIDER>) that should be used here
699 source_description_section = configuration_description.get(section, {})
700 source_description_key = source_description_section.get("options", {}).get(k, {})
701 if source_description_key is not None:
702 updated_source_name = source_description_key.get("source", source_name)
703 sect[k] = (val, updated_source_name)
704 else:
705 sect[k] = val
706
707 def _warn_deprecate(
708 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
709 ):
710 """Warn about deprecated config option usage."""
711 if section == deprecated_section:
712 warnings.warn(
713 f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
714 f"the old setting has been used, but please update your config.",
715 DeprecationWarning,
716 stacklevel=4 + extra_stacklevel,
717 )
718 else:
719 warnings.warn(
720 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
721 f"in [{section}] - the old setting has been used, but please update your config.",
722 DeprecationWarning,
723 stacklevel=4 + extra_stacklevel,
724 )
725
726 @contextmanager
727 def suppress_future_warnings(self):
728 """
729 Context manager to temporarily suppress future warnings.
730
731 This is a stub used by the shared parser's lookup methods when checking deprecated options.
732 Subclasses can override this to customize warning suppression behavior.
733
734 :return: context manager that suppresses future warnings
735 """
736 suppress_future_warnings = self._suppress_future_warnings
737 self._suppress_future_warnings = True
738 yield self
739 self._suppress_future_warnings = suppress_future_warnings
740
741 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str:
742 """Generate environment variable name for a config option."""
743 team_component: str = f"{team_name.upper()}___" if team_name else ""
744 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}"
745
746 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None):
747 """Get config option from environment variable."""
748 env_var: str = self._env_var_name(section, key, team_name=team_name)
749 if env_var in os.environ:
750 return expand_env_var(os.environ[env_var])
751 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
752 env_var_cmd = env_var + "_CMD"
753 if env_var_cmd in os.environ:
754 # if this is a valid command key...
755 if (section, key) in self.sensitive_config_values:
756 return run_command(os.environ[env_var_cmd])
757 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
758 env_var_secret_path = env_var + "_SECRET"
759 if env_var_secret_path in os.environ:
760 # if this is a valid secret path...
761 if (section, key) in self.sensitive_config_values:
762 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path])
763 return None
764
765 def _get_cmd_option(self, section: str, key: str):
766 """Get config option from command execution."""
767 fallback_key = key + "_cmd"
768 if (section, key) in self.sensitive_config_values:
769 if super().has_option(section, fallback_key):
770 command = super().get(section, fallback_key)
771 try:
772 cmd_output = run_command(command)
773 except AirflowConfigException as e:
774 raise e
775 except Exception as e:
776 raise AirflowConfigException(
777 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd."
778 f" Please check the {fallback_key} value."
779 ) from e
780 return cmd_output
781 return None
782
783 def _get_secret_option(self, section: str, key: str) -> str | None:
784 """Get Config option values from Secret Backend."""
785 fallback_key = key + "_secret"
786 if (section, key) in self.sensitive_config_values:
787 if super().has_option(section, fallback_key):
788 secrets_path = super().get(section, fallback_key)
789 return self._get_config_value_from_secret_backend(secrets_path)
790 return None
791
792 def _get_environment_variables(
793 self,
794 deprecated_key: str | None,
795 deprecated_section: str | None,
796 key: str,
797 section: str,
798 issue_warning: bool = True,
799 extra_stacklevel: int = 0,
800 **kwargs,
801 ) -> str | ValueNotFound:
802 """Get config option from environment variables."""
803 team_name = kwargs.get("team_name", None)
804 option = self._get_env_var_option(section, key, team_name=team_name)
805 if option is not None:
806 return option
807 if deprecated_section and deprecated_key:
808 with self.suppress_future_warnings():
809 option = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name)
810 if option is not None:
811 if issue_warning:
812 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
813 return option
814 return VALUE_NOT_FOUND_SENTINEL
815
816 def _get_option_from_config_file(
817 self,
818 deprecated_key: str | None,
819 deprecated_section: str | None,
820 key: str,
821 section: str,
822 issue_warning: bool = True,
823 extra_stacklevel: int = 0,
824 **kwargs,
825 ) -> str | ValueNotFound:
826 """Get config option from config file."""
827 if team_name := kwargs.get("team_name", None):
828 section = f"{team_name}={section}"
829 # since this is the last lookup that supports team_name, pop it
830 kwargs.pop("team_name")
831 if super().has_option(section, key):
832 return expand_env_var(super().get(section, key, **kwargs))
833 if deprecated_section and deprecated_key:
834 if super().has_option(deprecated_section, deprecated_key):
835 if issue_warning:
836 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
837 with self.suppress_future_warnings():
838 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
839 return VALUE_NOT_FOUND_SENTINEL
840
841 def _get_option_from_commands(
842 self,
843 deprecated_key: str | None,
844 deprecated_section: str | None,
845 key: str,
846 section: str,
847 issue_warning: bool = True,
848 extra_stacklevel: int = 0,
849 **kwargs,
850 ) -> str | ValueNotFound:
851 """Get config option from command execution."""
852 option = self._get_cmd_option(section, key)
853 if option:
854 return option
855 if deprecated_section and deprecated_key:
856 with self.suppress_future_warnings():
857 option = self._get_cmd_option(deprecated_section, deprecated_key)
858 if option:
859 if issue_warning:
860 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
861 return option
862 return VALUE_NOT_FOUND_SENTINEL
863
864 def _get_option_from_secrets(
865 self,
866 deprecated_key: str | None,
867 deprecated_section: str | None,
868 key: str,
869 section: str,
870 issue_warning: bool = True,
871 extra_stacklevel: int = 0,
872 **kwargs,
873 ) -> str | ValueNotFound:
874 """Get config option from secrets backend."""
875 option = self._get_secret_option(section, key)
876 if option:
877 return option
878 if deprecated_section and deprecated_key:
879 with self.suppress_future_warnings():
880 option = self._get_secret_option(deprecated_section, deprecated_key)
881 if option:
882 if issue_warning:
883 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
884 return option
885 return VALUE_NOT_FOUND_SENTINEL
886
887 def _get_option_from_defaults(
888 self,
889 deprecated_key: str | None,
890 deprecated_section: str | None,
891 key: str,
892 section: str,
893 issue_warning: bool = True,
894 extra_stacklevel: int = 0,
895 team_name: str | None = None,
896 **kwargs,
897 ) -> str | ValueNotFound:
898 """Get config option from default values."""
899 if self.get_default_value(section, key) is not None or "fallback" in kwargs:
900 return expand_env_var(self.get_default_value(section, key, **kwargs))
901 return VALUE_NOT_FOUND_SENTINEL
902
903 def _resolve_deprecated_lookup(
904 self,
905 section: str,
906 key: str,
907 lookup_from_deprecated: bool,
908 extra_stacklevel: int = 0,
909 ) -> tuple[str, str, str | None, str | None, bool]:
910 """
911 Resolve deprecated section/key mappings and determine deprecated values.
912
913 :param section: Section name (will be lowercased)
914 :param key: Key name (will be lowercased)
915 :param lookup_from_deprecated: Whether to lookup from deprecated options
916 :param extra_stacklevel: Extra stack level for warnings
917 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted)
918 """
919 section = section.lower()
920 key = key.lower()
921 warning_emitted = False
922 deprecated_section: str | None = None
923 deprecated_key: str | None = None
924
925 if not lookup_from_deprecated:
926 return section, key, deprecated_section, deprecated_key, warning_emitted
927
928 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {})
929 if option_description.get("deprecated"):
930 deprecation_reason = option_description.get("deprecation_reason", "")
931 warnings.warn(
932 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
933 DeprecationWarning,
934 stacklevel=2 + extra_stacklevel,
935 )
936 # For the cases in which we rename whole sections
937 if section in self.inversed_deprecated_sections:
938 deprecated_section, deprecated_key = (section, key)
939 section = self.inversed_deprecated_sections[section]
940 if not self._suppress_future_warnings:
941 warnings.warn(
942 f"The config section [{deprecated_section}] has been renamed to "
943 f"[{section}]. Please update your `conf.get*` call to use the new name",
944 FutureWarning,
945 stacklevel=2 + extra_stacklevel,
946 )
947 # Don't warn about individual rename if the whole section is renamed
948 warning_emitted = True
949 elif (section, key) in self.inversed_deprecated_options:
950 # Handle using deprecated section/key instead of the new section/key
951 new_section, new_key = self.inversed_deprecated_options[(section, key)]
952 if not self._suppress_future_warnings and not warning_emitted:
953 warnings.warn(
954 f"section/key [{section}/{key}] has been deprecated, you should use"
955 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
956 "new name",
957 FutureWarning,
958 stacklevel=2 + extra_stacklevel,
959 )
960 warning_emitted = True
961 deprecated_section, deprecated_key = section, key
962 section, key = (new_section, new_key)
963 elif section in self.deprecated_sections:
964 # When accessing the new section name, make sure we check under the old config name
965 deprecated_key = key
966 deprecated_section = self.deprecated_sections[section][0]
967 else:
968 deprecated_section, deprecated_key, _ = self.deprecated_options.get(
969 (section, key), (None, None, None)
970 )
971
972 return section, key, deprecated_section, deprecated_key, warning_emitted
973
974 @overload # type: ignore[override]
975 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...
976
977 @overload # type: ignore[override]
978 def get(self, section: str, key: str, **kwargs) -> str | None: ...
979
980 def get( # type: ignore[misc, override]
981 self,
982 section: str,
983 key: str,
984 suppress_warnings: bool = False,
985 lookup_from_deprecated: bool = True,
986 _extra_stacklevel: int = 0,
987 team_name: str | None = None,
988 **kwargs,
989 ) -> str | None:
990 """
991 Get config value by iterating through lookup sequence.
992
993 Priority order is defined by _lookup_sequence property.
994 """
995 section, key, deprecated_section, deprecated_key, warning_emitted = self._resolve_deprecated_lookup(
996 section=section,
997 key=key,
998 lookup_from_deprecated=lookup_from_deprecated,
999 extra_stacklevel=_extra_stacklevel,
1000 )
1001
1002 if team_name is not None:
1003 kwargs["team_name"] = team_name
1004
1005 for lookup_method in self._lookup_sequence:
1006 value = lookup_method(
1007 deprecated_key=deprecated_key,
1008 deprecated_section=deprecated_section,
1009 key=key,
1010 section=section,
1011 issue_warning=not warning_emitted,
1012 extra_stacklevel=_extra_stacklevel,
1013 **kwargs,
1014 )
1015 if value is not VALUE_NOT_FOUND_SENTINEL:
1016 return value
1017
1018 # Check if fallback was explicitly provided (even if None)
1019 if "fallback" in kwargs:
1020 return kwargs["fallback"]
1021
1022 if not suppress_warnings:
1023 log.warning("section/key [%s/%s] not found in config", section, key)
1024
1025 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
1026
1027 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override]
1028 """Get config value as boolean."""
1029 val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
1030 if "#" in val:
1031 val = val.split("#")[0].strip()
1032 if val in ("t", "true", "1"):
1033 return True
1034 if val in ("f", "false", "0"):
1035 return False
1036 raise AirflowConfigException(
1037 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
1038 f'Current value: "{val}".'
1039 )
1040
1041 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override]
1042 """Get config value as integer."""
1043 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1044 if val is None:
1045 raise AirflowConfigException(
1046 f"Failed to convert value None to int. "
1047 f'Please check "{key}" key in "{section}" section is set.'
1048 )
1049 try:
1050 return int(val)
1051 except ValueError:
1052 raise AirflowConfigException(
1053 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1054 f'Current value: "{val}".'
1055 )
1056
1057 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override]
1058 """Get config value as float."""
1059 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1060 if val is None:
1061 raise AirflowConfigException(
1062 f"Failed to convert value None to float. "
1063 f'Please check "{key}" key in "{section}" section is set.'
1064 )
1065 try:
1066 return float(val)
1067 except ValueError:
1068 raise AirflowConfigException(
1069 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
1070 f'Current value: "{val}".'
1071 )
1072
1073 def getlist(self, section: str, key: str, delimiter=",", **kwargs):
1074 """Get config value as list."""
1075 val = self.get(section, key, **kwargs)
1076
1077 if isinstance(val, list) or val is None:
1078 # `get` will always return a (possibly-empty) string, so the only way we can
1079 # have these types is with `fallback=` was specified. So just return it.
1080 return val
1081
1082 if val == "":
1083 return []
1084
1085 try:
1086 return [item.strip() for item in val.split(delimiter)]
1087 except Exception:
1088 raise AirflowConfigException(
1089 f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
1090 f'Current value: "{val}".'
1091 )
1092
1093 E = TypeVar("E", bound=Enum)
1094
1095 def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
1096 """Get config value as enum."""
1097 val = self.get(section, key, **kwargs)
1098 enum_names = [enum_item.name for enum_item in enum_class]
1099
1100 if val is None:
1101 raise AirflowConfigException(
1102 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1103 f'Current value: "{val}" and it must be one of {", ".join(enum_names)}'
1104 )
1105
1106 try:
1107 return enum_class[val]
1108 except KeyError:
1109 if "fallback" in kwargs and kwargs["fallback"] in enum_names:
1110 return enum_class[kwargs["fallback"]]
1111 raise AirflowConfigException(
1112 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1113 f"the value must be one of {', '.join(enum_names)}"
1114 )
1115
1116 def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
1117 """Get config value as list of enums."""
1118 kwargs.setdefault("fallback", [])
1119 string_list = self.getlist(section, key, delimiter, **kwargs)
1120
1121 enum_names = [enum_item.name for enum_item in enum_class]
1122 enum_list = []
1123
1124 for val in string_list:
1125 try:
1126 enum_list.append(enum_class[val])
1127 except KeyError:
1128 log.warning(
1129 "Failed to convert value %r. Please check %s key in %s section. "
1130 "it must be one of %s, if not the value is ignored",
1131 val,
1132 key,
1133 section,
1134 ", ".join(enum_names),
1135 )
1136
1137 return enum_list
1138
1139 def getimport(self, section: str, key: str, **kwargs) -> Any:
1140 """
1141 Read options, import the full qualified name, and return the object.
1142
1143 In case of failure, it throws an exception with the key and section names
1144
1145 :return: The object or None, if the option is empty
1146 """
1147 # Fixed: use self.get() instead of conf.get()
1148 full_qualified_path = self.get(section=section, key=key, **kwargs)
1149 if not full_qualified_path:
1150 return None
1151
1152 try:
1153 # Import here to avoid circular dependency
1154 from ..module_loading import import_string
1155
1156 return import_string(full_qualified_path)
1157 except ImportError as e:
1158 log.warning(e)
1159 raise AirflowConfigException(
1160 f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
1161 f'Current value: "{full_qualified_path}".'
1162 )
1163
1164 def getjson(
1165 self, section: str, key: str, fallback=None, **kwargs
1166 ) -> dict | list | str | int | float | None:
1167 """
1168 Return a config value parsed from a JSON string.
1169
1170 ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.
1171 """
1172 try:
1173 data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
1174 except (NoSectionError, NoOptionError):
1175 data = None
1176
1177 if data is None or data == "":
1178 return fallback
1179
1180 try:
1181 return json.loads(data)
1182 except JSONDecodeError as e:
1183 raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
1184
1185 def gettimedelta(
1186 self, section: str, key: str, fallback: Any = None, **kwargs
1187 ) -> datetime.timedelta | None:
1188 """
1189 Get the config value for the given section and key, and convert it into datetime.timedelta object.
1190
1191 If the key is missing, then it is considered as `None`.
1192
1193 :param section: the section from the config
1194 :param key: the key defined in the given section
1195 :param fallback: fallback value when no config value is given, defaults to None
1196 :raises AirflowConfigException: raised because ValueError or OverflowError
1197 :return: datetime.timedelta(seconds=<config_value>) or None
1198 """
1199 val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
1200
1201 if val:
1202 # the given value must be convertible to integer
1203 try:
1204 int_val = int(val)
1205 except ValueError:
1206 raise AirflowConfigException(
1207 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1208 f'Current value: "{val}".'
1209 )
1210
1211 try:
1212 return datetime.timedelta(seconds=int_val)
1213 except OverflowError as err:
1214 raise AirflowConfigException(
1215 f"Failed to convert value to timedelta in `seconds`. "
1216 f"{err}. "
1217 f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
1218 )
1219
1220 return fallback
1221
1222 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
1223 """Get mandatory config value, raising ValueError if not found."""
1224 value = self.get(section, key, _extra_stacklevel=1, **kwargs)
1225 if value is None:
1226 raise ValueError(f"The value {section}/{key} should be set!")
1227 return value
1228
1229 def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
1230 """Get mandatory config value as list, raising ValueError if not found."""
1231 value = self.getlist(section, key, **kwargs)
1232 if value is None:
1233 raise ValueError(f"The value {section}/{key} should be set!")
1234 return value
1235
1236 def read(
1237 self,
1238 filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
1239 encoding: str | None = None,
1240 ) -> list[str]:
1241 return super().read(filenames=filenames, encoding=encoding)
1242
1243 def read_dict( # type: ignore[override]
1244 self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
1245 ) -> None:
1246 """
1247 We define a different signature here to add better type hints and checking.
1248
1249 :param dictionary: dictionary to read from
1250 :param source: source to be used to store the configuration
1251 :return:
1252 """
1253 super().read_dict(dictionary=dictionary, source=source)
1254
1255 def get_sections_including_defaults(self) -> list[str]:
1256 """
1257 Retrieve all sections from the configuration parser, including sections defined by built-in defaults.
1258
1259 :return: list of section names
1260 """
1261 sections_from_config = self.sections()
1262 sections_from_description = list(self.configuration_description.keys())
1263 return list(dict.fromkeys(itertools.chain(sections_from_description, sections_from_config)))
1264
1265 def get_options_including_defaults(self, section: str) -> list[str]:
1266 """
1267 Retrieve all possible options from the configuration parser for the section given.
1268
1269 Includes options defined by built-in defaults.
1270
1271 :param section: section name
1272 :return: list of option names for the section given
1273 """
1274 my_own_options = self.options(section) if self.has_section(section) else []
1275 all_options_from_defaults = list(
1276 self.configuration_description.get(section, {}).get("options", {}).keys()
1277 )
1278 return list(dict.fromkeys(itertools.chain(all_options_from_defaults, my_own_options)))
1279
1280 def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs) -> bool:
1281 """
1282 Check if option is defined.
1283
1284 Uses self.get() to avoid reimplementing the priority order of config variables
1285 (env, config, cmd, defaults).
1286
1287 :param section: section to get option from
1288 :param option: option to get
1289 :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
1290 :param kwargs: additional keyword arguments to pass to get(), such as team_name
1291 :return:
1292 """
1293 try:
1294 value = self.get(
1295 section,
1296 option,
1297 fallback=None,
1298 _extra_stacklevel=1,
1299 suppress_warnings=True,
1300 lookup_from_deprecated=lookup_from_deprecated,
1301 **kwargs,
1302 )
1303 if value is None:
1304 return False
1305 return True
1306 except (NoOptionError, NoSectionError, AirflowConfigException):
1307 return False
1308
1309 def set(self, section: str, option: str, value: str | None = None) -> None:
1310 """
1311 Set an option to the given value.
1312
1313 This override just makes sure the section and option are lower case, to match what we do in `get`.
1314 """
1315 section = section.lower()
1316 option = option.lower()
1317 defaults = self.configuration_description or {}
1318 if not self.has_section(section) and section in defaults:
1319 # Trying to set a key in a section that exists in default, but not in the user config;
1320 # automatically create it
1321 self.add_section(section)
1322 super().set(section, option, value)
1323
1324 def remove_option(self, section: str, option: str, remove_default: bool = True):
1325 """
1326 Remove an option if it exists in config from a file or default config.
1327
1328 If both of config have the same option, this removes the option
1329 in both configs unless remove_default=False.
1330 """
1331 section = section.lower()
1332 option = option.lower()
1333 if super().has_option(section, option):
1334 super().remove_option(section, option)
1335
1336 if self.get_default_value(section, option) is not None and remove_default:
1337 self._default_values.remove_option(section, option)
1338
1339 def optionxform(self, optionstr: str) -> str:
1340 """
1341 Transform option names on every read, get, or set operation.
1342
1343 This changes from the default behaviour of ConfigParser from lower-casing
1344 to instead be case-preserving.
1345
1346 :param optionstr:
1347 :return:
1348 """
1349 return optionstr
1350
1351 def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
1352 """
1353 Get list of config sources to use in as_dict().
1354
1355 Subclasses can override to add additional sources (e.g., provider configs).
1356 """
1357 return [
1358 ("default", self._default_values),
1359 ("airflow.cfg", self),
1360 ]
1361
1362 def as_dict(
1363 self,
1364 display_source: bool = False,
1365 display_sensitive: bool = False,
1366 raw: bool = False,
1367 include_env: bool = True,
1368 include_cmds: bool = True,
1369 include_secret: bool = True,
1370 ) -> ConfigSourcesType:
1371 """
1372 Return the current configuration as an OrderedDict of OrderedDicts.
1373
1374 When materializing current configuration Airflow defaults are
1375 materialized along with user set configs. If any of the `include_*`
1376 options are False then the result of calling command or secret key
1377 configs do not override Airflow defaults and instead are passed through.
1378 In order to then avoid Airflow defaults from overwriting user set
1379 command or secret key configs we filter out bare sensitive_config_values
1380 that are set to Airflow defaults when command or secret key configs
1381 produce different values.
1382
1383 :param display_source: If False, the option value is returned. If True,
1384 a tuple of (option_value, source) is returned. Source is either
1385 'airflow.cfg', 'default', 'env var', or 'cmd'.
1386 :param display_sensitive: If True, the values of options set by env
1387 vars and bash commands will be displayed. If False, those options
1388 are shown as '< hidden >'
1389 :param raw: Should the values be output as interpolated values, or the
1390 "raw" form that can be fed back in to ConfigParser
1391 :param include_env: Should the value of configuration from AIRFLOW__
1392 environment variables be included or not
1393 :param include_cmds: Should the result of calling any ``*_cmd`` config be
1394 set (True, default), or should the _cmd options be left as the
1395 command to run (False)
1396 :param include_secret: Should the result of calling any ``*_secret`` config be
1397 set (True, default), or should the _secret options be left as the
1398 path to get the secret from (False)
1399 :return: Dictionary, where the key is the name of the section and the content is
1400 the dictionary with the name of the parameter and its value.
1401 """
1402 if not display_sensitive:
1403 # We want to hide the sensitive values at the appropriate methods
1404 # since envs from cmds, secrets can be read at _include_envs method
1405 if not all([include_env, include_cmds, include_secret]):
1406 raise ValueError(
1407 "If display_sensitive is false, then include_env, "
1408 "include_cmds, include_secret must all be set as True"
1409 )
1410
1411 config_sources: ConfigSourcesType = {}
1412
1413 # We check sequentially all those sources and the last one we saw it in will "win"
1414 configs = self._get_config_sources_for_as_dict()
1415
1416 self._replace_config_with_display_sources(
1417 config_sources,
1418 configs,
1419 self.configuration_description,
1420 display_source,
1421 raw,
1422 self.deprecated_options,
1423 include_cmds=include_cmds,
1424 include_env=include_env,
1425 include_secret=include_secret,
1426 )
1427
1428 # add env vars and overwrite because they have priority
1429 if include_env:
1430 self._include_envs(config_sources, display_sensitive, display_source, raw)
1431 else:
1432 self._filter_by_source(config_sources, display_source, self._get_env_var_option)
1433
1434 # add bash commands
1435 if include_cmds:
1436 self._include_commands(config_sources, display_sensitive, display_source, raw)
1437 else:
1438 self._filter_by_source(config_sources, display_source, self._get_cmd_option)
1439
1440 # add config from secret backends
1441 if include_secret:
1442 self._include_secrets(config_sources, display_sensitive, display_source, raw)
1443 else:
1444 self._filter_by_source(config_sources, display_source, self._get_secret_option)
1445
1446 if not display_sensitive:
1447 # This ensures the ones from config file is hidden too
1448 # if they are not provided through env, cmd and secret
1449 hidden = "< hidden >"
1450 for section, key in self.sensitive_config_values:
1451 if config_sources.get(section):
1452 if config_sources[section].get(key, None):
1453 if display_source:
1454 source = config_sources[section][key][1]
1455 config_sources[section][key] = (hidden, source)
1456 else:
1457 config_sources[section][key] = hidden
1458
1459 return config_sources
1460
1461 def _write_option_header(
1462 self,
1463 file: IO[str],
1464 option: str,
1465 extra_spacing: bool,
1466 include_descriptions: bool,
1467 include_env_vars: bool,
1468 include_examples: bool,
1469 include_sources: bool,
1470 section_config_description: dict[str, dict[str, Any]],
1471 section_to_write: str,
1472 sources_dict: ConfigSourcesType,
1473 ) -> tuple[bool, bool]:
1474 """
1475 Write header for configuration option.
1476
1477 Returns tuple of (should_continue, needs_separation) where needs_separation should be
1478 set if the option needs additional separation to visually separate it from the next option.
1479 """
1480 option_config_description = (
1481 section_config_description.get("options", {}).get(option, {})
1482 if section_config_description
1483 else {}
1484 )
1485 description = option_config_description.get("description")
1486 needs_separation = False
1487 if description and include_descriptions:
1488 for line in description.splitlines():
1489 file.write(f"# {line}\n")
1490 needs_separation = True
1491 example = option_config_description.get("example")
1492 if example is not None and include_examples:
1493 if extra_spacing:
1494 file.write("#\n")
1495 example_lines = example.splitlines()
1496 example = "\n# ".join(example_lines)
1497 file.write(f"# Example: {option} = {example}\n")
1498 needs_separation = True
1499 if include_sources and sources_dict:
1500 sources_section = sources_dict.get(section_to_write)
1501 value_with_source = sources_section.get(option) if sources_section else None
1502 if value_with_source is None:
1503 file.write("#\n# Source: not defined\n")
1504 else:
1505 file.write(f"#\n# Source: {value_with_source[1]}\n")
1506 needs_separation = True
1507 if include_env_vars:
1508 file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
1509 if extra_spacing:
1510 file.write("#\n")
1511 needs_separation = True
1512 return True, needs_separation
1513
1514 def is_template(self, section: str, key) -> bool:
1515 """
1516 Return whether the value is templated.
1517
1518 :param section: section of the config
1519 :param key: key in the section
1520 :return: True if the value is templated
1521 """
1522 return _is_template(self.configuration_description, section, key)
1523
1524 def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None:
1525 """
1526 Return the section as a dict.
1527
1528 Values are converted to int, float, bool as required.
1529
1530 :param section: section from the config
1531 :param team_name: optional team name for team-specific configuration lookup
1532 """
1533 # Handle team-specific section lookup for config file
1534 config_section = f"{team_name}={section}" if team_name else section
1535
1536 if not self.has_section(config_section) and not self._default_values.has_section(config_section):
1537 return None
1538 if self._default_values.has_section(config_section):
1539 _section: ConfigOptionsDictType = dict(self._default_values.items(config_section))
1540 else:
1541 _section = {}
1542
1543 if self.has_section(config_section):
1544 _section.update(self.items(config_section))
1545
1546 # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name
1547 section_prefix = self._env_var_name(section, "", team_name=team_name)
1548 for env_var in sorted(os.environ.keys()):
1549 if env_var.startswith(section_prefix):
1550 key = env_var.replace(section_prefix, "")
1551 if key.endswith("_CMD"):
1552 key = key[:-4]
1553 key = key.lower()
1554 _section[key] = self._get_env_var_option(section, key, team_name=team_name)
1555
1556 for key, val in _section.items():
1557 if val is None:
1558 raise AirflowConfigException(
1559 f"Failed to convert value automatically. "
1560 f'Please check "{key}" key in "{section}" section is set.'
1561 )
1562 try:
1563 _section[key] = int(val)
1564 except ValueError:
1565 try:
1566 _section[key] = float(val)
1567 except ValueError:
1568 if isinstance(val, str) and val.lower() in ("t", "true"):
1569 _section[key] = True
1570 elif isinstance(val, str) and val.lower() in ("f", "false"):
1571 _section[key] = False
1572 return _section
1573
1574 @staticmethod
1575 def _write_section_header(
1576 file: IO[str],
1577 include_descriptions: bool,
1578 section_config_description: dict[str, str],
1579 section_to_write: str,
1580 ) -> None:
1581 """Write header for configuration section."""
1582 file.write(f"[{section_to_write}]\n")
1583 section_description = section_config_description.get("description")
1584 if section_description and include_descriptions:
1585 for line in section_description.splitlines():
1586 file.write(f"# {line}\n")
1587 file.write("\n")
1588
1589 def _write_value(
1590 self,
1591 file: IO[str],
1592 option: str,
1593 comment_out_everything: bool,
1594 needs_separation: bool,
1595 only_defaults: bool,
1596 section_to_write: str,
1597 ):
1598 default_value = self.get_default_value(section_to_write, option, raw=True)
1599 if only_defaults:
1600 value = default_value
1601 else:
1602 value = self.get(section_to_write, option, fallback=default_value, raw=True)
1603 if value is None:
1604 file.write(f"# {option} = \n")
1605 else:
1606 if comment_out_everything:
1607 value_lines = value.splitlines()
1608 value = "\n# ".join(value_lines)
1609 file.write(f"# {option} = {value}\n")
1610 else:
1611 if "\n" in value:
1612 try:
1613 value = json.dumps(json.loads(value), indent=4)
1614 value = value.replace(
1615 "\n", "\n "
1616 ) # indent multi-line JSON to satisfy configparser format
1617 except JSONDecodeError:
1618 pass
1619 file.write(f"{option} = {value}\n")
1620 if needs_separation:
1621 file.write("\n")
1622
1623 def write( # type: ignore[override]
1624 self,
1625 file: IO[str],
1626 section: str | None = None,
1627 include_examples: bool = True,
1628 include_descriptions: bool = True,
1629 include_sources: bool = True,
1630 include_env_vars: bool = True,
1631 include_providers: bool = True,
1632 comment_out_everything: bool = False,
1633 hide_sensitive_values: bool = False,
1634 extra_spacing: bool = True,
1635 only_defaults: bool = False,
1636 **kwargs: Any,
1637 ) -> None:
1638 """
1639 Write configuration with comments and examples to a file.
1640
1641 :param file: file to write to
1642 :param section: section of the config to write, defaults to all sections
1643 :param include_examples: Include examples in the output
1644 :param include_descriptions: Include descriptions in the output
1645 :param include_sources: Include the source of each config option
1646 :param include_env_vars: Include environment variables corresponding to each config option
1647 :param include_providers: Include providers configuration
1648 :param comment_out_everything: Comment out all values
1649 :param hide_sensitive_values: Include sensitive values in the output
1650 :param extra_spacing: Add extra spacing before examples and after variables
1651 :param only_defaults: Only include default values when writing the config, not the actual values
1652 """
1653 sources_dict = {}
1654 if include_sources:
1655 sources_dict = self.as_dict(display_source=True)
1656 with self.make_sure_configuration_loaded(with_providers=include_providers):
1657 for section_to_write in self.get_sections_including_defaults():
1658 section_config_description = self.configuration_description.get(section_to_write, {})
1659 if section_to_write != section and section is not None:
1660 continue
1661 if self._default_values.has_section(section_to_write) or self.has_section(section_to_write):
1662 self._write_section_header(
1663 file, include_descriptions, section_config_description, section_to_write
1664 )
1665 for option in self.get_options_including_defaults(section_to_write):
1666 should_continue, needs_separation = self._write_option_header(
1667 file=file,
1668 option=option,
1669 extra_spacing=extra_spacing,
1670 include_descriptions=include_descriptions,
1671 include_env_vars=include_env_vars,
1672 include_examples=include_examples,
1673 include_sources=include_sources,
1674 section_config_description=section_config_description,
1675 section_to_write=section_to_write,
1676 sources_dict=sources_dict,
1677 )
1678 self._write_value(
1679 file=file,
1680 option=option,
1681 comment_out_everything=comment_out_everything,
1682 needs_separation=needs_separation,
1683 only_defaults=only_defaults,
1684 section_to_write=section_to_write,
1685 )
1686 if include_descriptions and not needs_separation:
1687 # extra separation between sections in case last option did not need it
1688 file.write("\n")
1689
1690 @contextmanager
1691 def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
1692 """
1693 Make sure configuration is loaded with or without providers.
1694
1695 This happens regardless if the provider configuration has been loaded before or not.
1696 Restores configuration to the state before entering the context.
1697
1698 :param with_providers: whether providers should be loaded
1699 """
1700 needs_reload = False
1701 if with_providers:
1702 self._ensure_providers_config_loaded()
1703 else:
1704 needs_reload = self._ensure_providers_config_unloaded()
1705 yield
1706 if needs_reload:
1707 self._reload_provider_configs()
1708
1709 def _ensure_providers_config_loaded(self) -> None:
1710 """Ensure providers configurations are loaded."""
1711 raise NotImplementedError("Subclasses must implement _ensure_providers_config_loaded method")
1712
1713 def _ensure_providers_config_unloaded(self) -> bool:
1714 """Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded."""
1715 raise NotImplementedError("Subclasses must implement _ensure_providers_config_unloaded method")
1716
1717 def _reload_provider_configs(self) -> None:
1718 """Reload providers configuration."""
1719 raise NotImplementedError("Subclasses must implement _reload_provider_configs method")