1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Base configuration parser with pure parsing logic."""
19
20from __future__ import annotations
21
22import contextlib
23import datetime
24import functools
25import itertools
26import json
27import logging
28import os
29import shlex
30import subprocess
31import sys
32import warnings
33from collections.abc import Callable, Generator, Iterable
34from configparser import ConfigParser, NoOptionError, NoSectionError
35from contextlib import contextmanager
36from enum import Enum
37from json.decoder import JSONDecodeError
38from re import Pattern
39from typing import IO, Any, TypeVar, overload
40
41from .exceptions import AirflowConfigException
42
43log = logging.getLogger(__name__)
44
45
46ConfigType = str | int | float | bool
47ConfigOptionsDictType = dict[str, ConfigType]
48ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
49ConfigSourcesType = dict[str, ConfigSectionSourcesType]
50ENV_VAR_PREFIX = "AIRFLOW__"
51
52
53class ValueNotFound:
54 """Object of this is raised when a configuration value cannot be found."""
55
56 pass
57
58
59VALUE_NOT_FOUND_SENTINEL = ValueNotFound()
60
61
62@overload
63def expand_env_var(env_var: None) -> None: ...
64@overload
65def expand_env_var(env_var: str) -> str: ...
66
67
68def expand_env_var(env_var: str | None) -> str | None:
69 """
70 Expand (potentially nested) env vars.
71
72 Repeat and apply `expandvars` and `expanduser` until
73 interpolation stops having any effect.
74 """
75 if not env_var or not isinstance(env_var, str):
76 return env_var
77 while True:
78 interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
79 if interpolated == env_var:
80 return interpolated
81 env_var = interpolated
82
83
84def run_command(command: str) -> str:
85 """Run command and returns stdout."""
86 process = subprocess.Popen(
87 shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
88 )
89 output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate())
90
91 if process.returncode != 0:
92 raise AirflowConfigException(
93 f"Cannot execute {command}. Error code is: {process.returncode}. "
94 f"Output: {output}, Stderr: {stderr}"
95 )
96
97 return output
98
99
100def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool:
101 """
102 Check if the config is a template.
103
104 :param configuration_description: description of configuration
105 :param section: section
106 :param key: key
107 :return: True if the config is a template
108 """
109 return configuration_description.get(section, {}).get(key, {}).get("is_template", False)
110
111
112class AirflowConfigParser(ConfigParser):
113 """
114 Base configuration parser with pure parsing logic.
115
116 This class provides the core parsing methods that work with:
117 - configuration_description: dict describing config options (required in __init__)
118 - _default_values: ConfigParser with default values (required in __init__)
119 - deprecated_options: class attribute mapping new -> old options
120 - deprecated_sections: class attribute mapping new -> old sections
121 """
122
123 # A mapping of section -> setting -> { old, replace } for deprecated default values.
124 # Subclasses can override this to define deprecated values that should be upgraded.
125 deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {}
126
127 # A mapping of (new section, new option) -> (old section, old option, since_version).
128 # When reading new option, the old option will be checked to see if it exists. If it does a
129 # DeprecationWarning will be issued and the old option will be used instead
130 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
131 ("dag_processor", "dag_file_processor_timeout"): ("core", "dag_file_processor_timeout", "3.0"),
132 ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
133 ("api", "base_url"): ("webserver", "base_url", "3.0"),
134 ("api", "host"): ("webserver", "web_server_host", "3.0"),
135 ("api", "port"): ("webserver", "web_server_port", "3.0"),
136 ("api", "workers"): ("webserver", "workers", "3.0"),
137 ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"),
138 ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"),
139 ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"),
140 ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"),
141 ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"),
142 ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"),
143 ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"),
144 ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"),
145 ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"),
146 ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"),
147 ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"),
148 ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"),
149 ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
150 ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"),
151 ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"),
152 ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"),
153 ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"),
154 ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"),
155 ("api", "page_size"): ("webserver", "page_size", "3.1.0"),
156 ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"),
157 ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"),
158 ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"),
159 ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
160 ("api", "log_config"): ("api", "access_logfile", "3.1.0"),
161 ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"),
162 }
163
164 # A mapping of new section -> (old section, since_version).
165 deprecated_sections: dict[str, tuple[str, str]] = {}
166
167 @property
168 def _lookup_sequence(self) -> list[Callable]:
169 """
170 Define the sequence of lookup methods for get(). The definition here does not have provider lookup.
171
172 Subclasses can override this to customise lookup order.
173 """
174 return [
175 self._get_environment_variables,
176 self._get_option_from_config_file,
177 self._get_option_from_commands,
178 self._get_option_from_secrets,
179 self._get_option_from_defaults,
180 ]
181
182 @property
183 def _validators(self) -> list[Callable[[], None]]:
184 """
185 Return list of validators defined on a config parser class. Base class will return an empty list.
186
187 Subclasses can override this to customize the validators that are run during validation on the
188 config parser instance.
189 """
190 return []
191
192 def validate(self) -> None:
193 """Run all registered validators."""
194 for validator in self._validators:
195 validator()
196 self.is_validated = True
197
198 def _validate_deprecated_values(self) -> None:
199 """Validate and upgrade deprecated default values."""
200 for section, replacement in self.deprecated_values.items():
201 for name, info in replacement.items():
202 old, new = info
203 current_value = self.get(section, name, fallback="")
204 if self._using_old_value(old, current_value):
205 self.upgraded_values[(section, name)] = current_value
206 new_value = old.sub(new, current_value)
207 self._update_env_var(section=section, name=name, new_value=new_value)
208 self._create_future_warning(
209 name=name,
210 section=section,
211 current_value=current_value,
212 new_value=new_value,
213 )
214
215 def _using_old_value(self, old: Pattern, current_value: str) -> bool:
216 """Check if current_value matches the old pattern."""
217 return old.search(current_value) is not None
218
219 def _update_env_var(self, section: str, name: str, new_value: str) -> None:
220 """Update environment variable with new value."""
221 env_var = self._env_var_name(section, name)
222 # Set it as an env var so that any subprocesses keep the same override!
223 os.environ[env_var] = new_value
224
225 @staticmethod
226 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None:
227 """Create a FutureWarning for deprecated default values."""
228 warnings.warn(
229 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
230 f"This value has been changed to {new_value!r} in the running config, but please update your config.",
231 FutureWarning,
232 stacklevel=3,
233 )
234
235 def __init__(
236 self,
237 configuration_description: dict[str, dict[str, Any]],
238 _default_values: ConfigParser,
239 *args,
240 **kwargs,
241 ):
242 """
243 Initialize the parser.
244
245 :param configuration_description: Description of configuration options
246 :param _default_values: ConfigParser with default values
247 """
248 super().__init__(*args, **kwargs)
249 self.configuration_description = configuration_description
250 self._default_values = _default_values
251 self._suppress_future_warnings = False
252 self.upgraded_values: dict[tuple[str, str], str] = {}
253
254 @functools.cached_property
255 def inversed_deprecated_options(self):
256 """Build inverse mapping from old options to new options."""
257 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}
258
259 @functools.cached_property
260 def inversed_deprecated_sections(self):
261 """Build inverse mapping from old sections to new sections."""
262 return {
263 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
264 }
265
266 @functools.cached_property
267 def sensitive_config_values(self) -> set[tuple[str, str]]:
268 """Get set of sensitive config values that should be masked."""
269 flattened = {
270 (s, k): item
271 for s, s_c in self.configuration_description.items()
272 for k, item in s_c.get("options", {}).items()
273 }
274 sensitive = {
275 (section.lower(), key.lower())
276 for (section, key), v in flattened.items()
277 if v.get("sensitive") is True
278 }
279 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options}
280 depr_section = {
281 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections
282 }
283 sensitive.update(depr_section, depr_option)
284 return sensitive
285
286 def _update_defaults_from_string(self, config_string: str) -> None:
287 """
288 Update the defaults in _default_values based on values in config_string ("ini" format).
289
290 Override shared parser's method to add validation for template variables.
291 Note that those values are not validated and cannot contain variables because we are using
292 regular config parser to load them. This method is used to test the config parser in unit tests.
293
294 :param config_string: ini-formatted config string
295 """
296 parser = ConfigParser()
297 parser.read_string(config_string)
298 for section in parser.sections():
299 if section not in self._default_values.sections():
300 self._default_values.add_section(section)
301 errors = False
302 for key, value in parser.items(section):
303 if not self.is_template(section, key) and "{" in value:
304 errors = True
305 log.error(
306 "The %s.%s value %s read from string contains variable. This is not supported",
307 section,
308 key,
309 value,
310 )
311 self._default_values.set(section, key, value)
312 if errors:
313 raise AirflowConfigException(
314 f"The string config passed as default contains variables. "
315 f"This is not supported. String config: {config_string}"
316 )
317
318 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
319 """
320 Retrieve default value from default config parser.
321
322 This will retrieve the default value from the default config parser. Optionally a raw, stored
323 value can be retrieved by setting skip_interpolation to True. This is useful for example when
324 we want to write the default value to a file, and we don't want the interpolation to happen
325 as it is going to be done later when the config is read.
326
327 :param section: section of the config
328 :param key: key to use
329 :param fallback: fallback value to use
330 :param raw: if raw, then interpolation will be reversed
331 :param kwargs: other args
332 :return:
333 """
334 value = self._default_values.get(section, key, fallback=fallback, **kwargs)
335 if raw and value is not None:
336 return value.replace("%", "%%")
337 return value
338
339 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None:
340 """
341 Get Secret Backend if defined in airflow.cfg.
342
343 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.
344 """
345 section = "workers" if worker_mode else "secrets"
346 key = "secrets_backend" if worker_mode else "backend"
347 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs"
348
349 secrets_backend_cls = self.getimport(section=section, key=key)
350
351 if not secrets_backend_cls:
352 if worker_mode:
353 # if we find no secrets backend for worker, return that of secrets backend
354 secrets_backend_cls = self.getimport(section="secrets", key="backend")
355 if not secrets_backend_cls:
356 return None
357 # When falling back to secrets backend, use its kwargs
358 kwargs_key = "backend_kwargs"
359 section = "secrets"
360 else:
361 return None
362
363 try:
364 backend_kwargs = self.getjson(section=section, key=kwargs_key)
365 if not backend_kwargs:
366 backend_kwargs = {}
367 elif not isinstance(backend_kwargs, dict):
368 raise ValueError("not a dict")
369 except AirflowConfigException:
370 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key)
371 backend_kwargs = {}
372 except ValueError:
373 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key)
374 backend_kwargs = {}
375
376 return secrets_backend_cls(**backend_kwargs)
377
378 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
379 """
380 Get Config option values from Secret Backend.
381
382 Called by the shared parser's _get_secret_option() method as part of the lookup chain.
383 Uses _get_custom_secret_backend() to get the backend instance.
384
385 :param config_key: the config key to retrieve
386 :return: config value or None
387 """
388 try:
389 secrets_client = self._get_custom_secret_backend()
390 if not secrets_client:
391 return None
392 return secrets_client.get_config(config_key)
393 except Exception as e:
394 raise AirflowConfigException(
395 "Cannot retrieve config from alternative secrets backend. "
396 "Make sure it is configured properly and that the Backend "
397 "is accessible.\n"
398 f"{e}"
399 )
400
401 def _get_cmd_option_from_config_sources(
402 self, config_sources: ConfigSourcesType, section: str, key: str
403 ) -> str | None:
404 fallback_key = key + "_cmd"
405 if (section, key) in self.sensitive_config_values:
406 section_dict = config_sources.get(section)
407 if section_dict is not None:
408 command_value = section_dict.get(fallback_key)
409 if command_value is not None:
410 if isinstance(command_value, str):
411 command = command_value
412 else:
413 command = command_value[0]
414 return run_command(command)
415 return None
416
417 def _get_secret_option_from_config_sources(
418 self, config_sources: ConfigSourcesType, section: str, key: str
419 ) -> str | None:
420 fallback_key = key + "_secret"
421 if (section, key) in self.sensitive_config_values:
422 section_dict = config_sources.get(section)
423 if section_dict is not None:
424 secrets_path_value = section_dict.get(fallback_key)
425 if secrets_path_value is not None:
426 if isinstance(secrets_path_value, str):
427 secrets_path = secrets_path_value
428 else:
429 secrets_path = secrets_path_value[0]
430 return self._get_config_value_from_secret_backend(secrets_path)
431 return None
432
433 def _include_secrets(
434 self,
435 config_sources: ConfigSourcesType,
436 display_sensitive: bool,
437 display_source: bool,
438 raw: bool,
439 ):
440 for section, key in self.sensitive_config_values:
441 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
442 if value:
443 if not display_sensitive:
444 value = "< hidden >"
445 if display_source:
446 opt: str | tuple[str, str] = (value, "secret")
447 elif raw:
448 opt = value.replace("%", "%%")
449 else:
450 opt = value
451 config_sources.setdefault(section, {}).update({key: opt})
452 del config_sources[section][key + "_secret"]
453
454 def _include_commands(
455 self,
456 config_sources: ConfigSourcesType,
457 display_sensitive: bool,
458 display_source: bool,
459 raw: bool,
460 ):
461 for section, key in self.sensitive_config_values:
462 opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
463 if not opt:
464 continue
465 opt_to_set: str | tuple[str, str] | None = opt
466 if not display_sensitive:
467 opt_to_set = "< hidden >"
468 if display_source:
469 opt_to_set = (str(opt_to_set), "cmd")
470 elif raw:
471 opt_to_set = str(opt_to_set).replace("%", "%%")
472 if opt_to_set is not None:
473 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set}
474 config_sources.setdefault(section, {}).update(dict_to_update)
475 del config_sources[section][key + "_cmd"]
476
477 def _include_envs(
478 self,
479 config_sources: ConfigSourcesType,
480 display_sensitive: bool,
481 display_source: bool,
482 raw: bool,
483 ):
484 for env_var in [
485 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
486 ]:
487 try:
488 _, section, key = env_var.split("__", 2)
489 opt = self._get_env_var_option(section, key)
490 except ValueError:
491 continue
492 if opt is None:
493 log.warning("Ignoring unknown env var '%s'", env_var)
494 continue
495 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
496 # Don't hide cmd/secret values here
497 if not env_var.lower().endswith(("cmd", "secret")):
498 if (section, key) in self.sensitive_config_values:
499 opt = "< hidden >"
500 elif raw:
501 opt = opt.replace("%", "%%")
502 if display_source:
503 opt = (opt, "env var")
504
505 section = section.lower()
506 key = key.lower()
507 config_sources.setdefault(section, {}).update({key: opt})
508
509 def _filter_by_source(
510 self,
511 config_sources: ConfigSourcesType,
512 display_source: bool,
513 getter_func,
514 ):
515 """
516 Delete default configs from current configuration.
517
518 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values.
519
520 This is necessary because bare configs take precedence over the command
521 or secret key equivalents so if the current running config is
522 materialized with Airflow defaults they in turn override user set
523 command or secret key configs.
524
525 :param config_sources: The current configuration to operate on
526 :param display_source: If False, configuration options contain raw
527 values. If True, options are a tuple of (option_value, source).
528 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
529 :param getter_func: A callback function that gets the user configured
530 override value for a particular sensitive_config_values config.
531 :return: None, the given config_sources is filtered if necessary,
532 otherwise untouched.
533 """
534 for section, key in self.sensitive_config_values:
535 # Don't bother if we don't have section / key
536 if section not in config_sources or key not in config_sources[section]:
537 continue
538 # Check that there is something to override defaults
539 try:
540 getter_opt = getter_func(section, key)
541 except ValueError:
542 continue
543 if not getter_opt:
544 continue
545 # Check to see that there is a default value
546 if self.get_default_value(section, key) is None:
547 continue
548 # Check to see if bare setting is the same as defaults
549 if display_source:
550 # when display_source = true, we know that the config_sources contains tuple
551 opt, source = config_sources[section][key] # type: ignore
552 else:
553 opt = config_sources[section][key]
554 if opt == self.get_default_value(section, key):
555 del config_sources[section][key]
556
557 @staticmethod
558 def _deprecated_value_is_set_in_config(
559 deprecated_section: str,
560 deprecated_key: str,
561 configs: Iterable[tuple[str, ConfigParser]],
562 ) -> bool:
563 for config_type, config in configs:
564 if config_type != "default":
565 with contextlib.suppress(NoSectionError):
566 deprecated_section_array = config.items(section=deprecated_section, raw=True)
567 if any(key == deprecated_key for key, _ in deprecated_section_array):
568 return True
569 return False
570
571 @staticmethod
572 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
573 return (
574 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}")
575 is not None
576 )
577
578 @staticmethod
579 def _deprecated_command_is_set_in_config(
580 deprecated_section: str,
581 deprecated_key: str,
582 configs: Iterable[tuple[str, ConfigParser]],
583 ) -> bool:
584 return AirflowConfigParser._deprecated_value_is_set_in_config(
585 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs
586 )
587
588 @staticmethod
589 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
590 return (
591 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD")
592 is not None
593 )
594
595 @staticmethod
596 def _deprecated_secret_is_set_in_config(
597 deprecated_section: str,
598 deprecated_key: str,
599 configs: Iterable[tuple[str, ConfigParser]],
600 ) -> bool:
601 return AirflowConfigParser._deprecated_value_is_set_in_config(
602 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs
603 )
604
605 @staticmethod
606 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
607 return (
608 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET")
609 is not None
610 )
611
612 @staticmethod
613 def _replace_config_with_display_sources(
614 config_sources: ConfigSourcesType,
615 configs: Iterable[tuple[str, ConfigParser]],
616 configuration_description: dict[str, dict[str, Any]],
617 display_source: bool,
618 raw: bool,
619 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
620 include_env: bool,
621 include_cmds: bool,
622 include_secret: bool,
623 ):
624 for source_name, config in configs:
625 sections = config.sections()
626 for section in sections:
627 AirflowConfigParser._replace_section_config_with_display_sources(
628 config,
629 config_sources,
630 configuration_description,
631 display_source,
632 raw,
633 section,
634 source_name,
635 deprecated_options,
636 configs,
637 include_env=include_env,
638 include_cmds=include_cmds,
639 include_secret=include_secret,
640 )
641
642 @staticmethod
643 def _replace_section_config_with_display_sources(
644 config: ConfigParser,
645 config_sources: ConfigSourcesType,
646 configuration_description: dict[str, dict[str, Any]],
647 display_source: bool,
648 raw: bool,
649 section: str,
650 source_name: str,
651 deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
652 configs: Iterable[tuple[str, ConfigParser]],
653 include_env: bool,
654 include_cmds: bool,
655 include_secret: bool,
656 ):
657 sect = config_sources.setdefault(section, {})
658 if isinstance(config, AirflowConfigParser):
659 with config.suppress_future_warnings():
660 items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
661 else:
662 items = config.items(section=section, raw=raw)
663 for k, val in items:
664 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
665 if deprecated_section and deprecated_key:
666 if source_name == "default":
667 # If deprecated entry has some non-default value set for any of the sources requested,
668 # We should NOT set default for the new entry (because it will override anything
669 # coming from the deprecated ones)
670 if AirflowConfigParser._deprecated_value_is_set_in_config(
671 deprecated_section, deprecated_key, configs
672 ):
673 continue
674 if include_env and AirflowConfigParser._deprecated_variable_is_set(
675 deprecated_section, deprecated_key
676 ):
677 continue
678 if include_cmds and (
679 AirflowConfigParser._deprecated_variable_command_is_set(
680 deprecated_section, deprecated_key
681 )
682 or AirflowConfigParser._deprecated_command_is_set_in_config(
683 deprecated_section, deprecated_key, configs
684 )
685 ):
686 continue
687 if include_secret and (
688 AirflowConfigParser._deprecated_variable_secret_is_set(
689 deprecated_section, deprecated_key
690 )
691 or AirflowConfigParser._deprecated_secret_is_set_in_config(
692 deprecated_section, deprecated_key, configs
693 )
694 ):
695 continue
696 if display_source:
697 updated_source_name = source_name
698 if source_name == "default":
699 # defaults can come from other sources (default-<PROVIDER>) that should be used here
700 source_description_section = configuration_description.get(section, {})
701 source_description_key = source_description_section.get("options", {}).get(k, {})
702 if source_description_key is not None:
703 updated_source_name = source_description_key.get("source", source_name)
704 sect[k] = (val, updated_source_name)
705 else:
706 sect[k] = val
707
708 def _warn_deprecate(
709 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
710 ):
711 """Warn about deprecated config option usage."""
712 if section == deprecated_section:
713 warnings.warn(
714 f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
715 f"the old setting has been used, but please update your config.",
716 DeprecationWarning,
717 stacklevel=4 + extra_stacklevel,
718 )
719 else:
720 warnings.warn(
721 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
722 f"in [{section}] - the old setting has been used, but please update your config.",
723 DeprecationWarning,
724 stacklevel=4 + extra_stacklevel,
725 )
726
727 @contextmanager
728 def suppress_future_warnings(self):
729 """
730 Context manager to temporarily suppress future warnings.
731
732 This is a stub used by the shared parser's lookup methods when checking deprecated options.
733 Subclasses can override this to customize warning suppression behavior.
734
735 :return: context manager that suppresses future warnings
736 """
737 suppress_future_warnings = self._suppress_future_warnings
738 self._suppress_future_warnings = True
739 yield self
740 self._suppress_future_warnings = suppress_future_warnings
741
742 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str:
743 """Generate environment variable name for a config option."""
744 team_component: str = f"{team_name.upper()}___" if team_name else ""
745 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}"
746
747 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None):
748 """Get config option from environment variable."""
749 env_var: str = self._env_var_name(section, key, team_name=team_name)
750 if env_var in os.environ:
751 return expand_env_var(os.environ[env_var])
752 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
753 env_var_cmd = env_var + "_CMD"
754 if env_var_cmd in os.environ:
755 # if this is a valid command key...
756 if (section, key) in self.sensitive_config_values:
757 return run_command(os.environ[env_var_cmd])
758 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
759 env_var_secret_path = env_var + "_SECRET"
760 if env_var_secret_path in os.environ:
761 # if this is a valid secret path...
762 if (section, key) in self.sensitive_config_values:
763 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path])
764 return None
765
766 def _get_cmd_option(self, section: str, key: str):
767 """Get config option from command execution."""
768 fallback_key = key + "_cmd"
769 if (section, key) in self.sensitive_config_values:
770 if super().has_option(section, fallback_key):
771 command = super().get(section, fallback_key)
772 try:
773 cmd_output = run_command(command)
774 except AirflowConfigException as e:
775 raise e
776 except Exception as e:
777 raise AirflowConfigException(
778 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd."
779 f" Please check the {fallback_key} value."
780 ) from e
781 return cmd_output
782 return None
783
784 def _get_secret_option(self, section: str, key: str) -> str | None:
785 """Get Config option values from Secret Backend."""
786 fallback_key = key + "_secret"
787 if (section, key) in self.sensitive_config_values:
788 if super().has_option(section, fallback_key):
789 secrets_path = super().get(section, fallback_key)
790 return self._get_config_value_from_secret_backend(secrets_path)
791 return None
792
793 def _get_environment_variables(
794 self,
795 deprecated_key: str | None,
796 deprecated_section: str | None,
797 key: str,
798 section: str,
799 issue_warning: bool = True,
800 extra_stacklevel: int = 0,
801 **kwargs,
802 ) -> str | ValueNotFound:
803 """Get config option from environment variables."""
804 team_name = kwargs.get("team_name", None)
805 option = self._get_env_var_option(section, key, team_name=team_name)
806 if option is not None:
807 return option
808 if deprecated_section and deprecated_key:
809 with self.suppress_future_warnings():
810 option = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name)
811 if option is not None:
812 if issue_warning:
813 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
814 return option
815 return VALUE_NOT_FOUND_SENTINEL
816
817 def _get_option_from_config_file(
818 self,
819 deprecated_key: str | None,
820 deprecated_section: str | None,
821 key: str,
822 section: str,
823 issue_warning: bool = True,
824 extra_stacklevel: int = 0,
825 **kwargs,
826 ) -> str | ValueNotFound:
827 """Get config option from config file."""
828 if team_name := kwargs.get("team_name", None):
829 section = f"{team_name}={section}"
830 # since this is the last lookup that supports team_name, pop it
831 kwargs.pop("team_name")
832 if super().has_option(section, key):
833 return expand_env_var(super().get(section, key, **kwargs))
834 if deprecated_section and deprecated_key:
835 if super().has_option(deprecated_section, deprecated_key):
836 if issue_warning:
837 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
838 with self.suppress_future_warnings():
839 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
840 return VALUE_NOT_FOUND_SENTINEL
841
842 def _get_option_from_commands(
843 self,
844 deprecated_key: str | None,
845 deprecated_section: str | None,
846 key: str,
847 section: str,
848 issue_warning: bool = True,
849 extra_stacklevel: int = 0,
850 **kwargs,
851 ) -> str | ValueNotFound:
852 """Get config option from command execution."""
853 option = self._get_cmd_option(section, key)
854 if option:
855 return option
856 if deprecated_section and deprecated_key:
857 with self.suppress_future_warnings():
858 option = self._get_cmd_option(deprecated_section, deprecated_key)
859 if option:
860 if issue_warning:
861 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
862 return option
863 return VALUE_NOT_FOUND_SENTINEL
864
865 def _get_option_from_secrets(
866 self,
867 deprecated_key: str | None,
868 deprecated_section: str | None,
869 key: str,
870 section: str,
871 issue_warning: bool = True,
872 extra_stacklevel: int = 0,
873 **kwargs,
874 ) -> str | ValueNotFound:
875 """Get config option from secrets backend."""
876 option = self._get_secret_option(section, key)
877 if option:
878 return option
879 if deprecated_section and deprecated_key:
880 with self.suppress_future_warnings():
881 option = self._get_secret_option(deprecated_section, deprecated_key)
882 if option:
883 if issue_warning:
884 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
885 return option
886 return VALUE_NOT_FOUND_SENTINEL
887
888 def _get_option_from_defaults(
889 self,
890 deprecated_key: str | None,
891 deprecated_section: str | None,
892 key: str,
893 section: str,
894 issue_warning: bool = True,
895 extra_stacklevel: int = 0,
896 team_name: str | None = None,
897 **kwargs,
898 ) -> str | ValueNotFound:
899 """Get config option from default values."""
900 if self.get_default_value(section, key) is not None or "fallback" in kwargs:
901 return expand_env_var(self.get_default_value(section, key, **kwargs))
902 return VALUE_NOT_FOUND_SENTINEL
903
904 def _resolve_deprecated_lookup(
905 self,
906 section: str,
907 key: str,
908 lookup_from_deprecated: bool,
909 extra_stacklevel: int = 0,
910 ) -> tuple[str, str, str | None, str | None, bool]:
911 """
912 Resolve deprecated section/key mappings and determine deprecated values.
913
914 :param section: Section name (will be lowercased)
915 :param key: Key name (will be lowercased)
916 :param lookup_from_deprecated: Whether to lookup from deprecated options
917 :param extra_stacklevel: Extra stack level for warnings
918 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted)
919 """
920 section = section.lower()
921 key = key.lower()
922 warning_emitted = False
923 deprecated_section: str | None = None
924 deprecated_key: str | None = None
925
926 if not lookup_from_deprecated:
927 return section, key, deprecated_section, deprecated_key, warning_emitted
928
929 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {})
930 if option_description.get("deprecated"):
931 deprecation_reason = option_description.get("deprecation_reason", "")
932 warnings.warn(
933 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
934 DeprecationWarning,
935 stacklevel=2 + extra_stacklevel,
936 )
937 # For the cases in which we rename whole sections
938 if section in self.inversed_deprecated_sections:
939 deprecated_section, deprecated_key = (section, key)
940 section = self.inversed_deprecated_sections[section]
941 if not self._suppress_future_warnings:
942 warnings.warn(
943 f"The config section [{deprecated_section}] has been renamed to "
944 f"[{section}]. Please update your `conf.get*` call to use the new name",
945 FutureWarning,
946 stacklevel=2 + extra_stacklevel,
947 )
948 # Don't warn about individual rename if the whole section is renamed
949 warning_emitted = True
950 elif (section, key) in self.inversed_deprecated_options:
951 # Handle using deprecated section/key instead of the new section/key
952 new_section, new_key = self.inversed_deprecated_options[(section, key)]
953 if not self._suppress_future_warnings and not warning_emitted:
954 warnings.warn(
955 f"section/key [{section}/{key}] has been deprecated, you should use"
956 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
957 "new name",
958 FutureWarning,
959 stacklevel=2 + extra_stacklevel,
960 )
961 warning_emitted = True
962 deprecated_section, deprecated_key = section, key
963 section, key = (new_section, new_key)
964 elif section in self.deprecated_sections:
965 # When accessing the new section name, make sure we check under the old config name
966 deprecated_key = key
967 deprecated_section = self.deprecated_sections[section][0]
968 else:
969 deprecated_section, deprecated_key, _ = self.deprecated_options.get(
970 (section, key), (None, None, None)
971 )
972
973 return section, key, deprecated_section, deprecated_key, warning_emitted
974
975 @overload # type: ignore[override]
976 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...
977
978 @overload # type: ignore[override]
979 def get(self, section: str, key: str, **kwargs) -> str | None: ...
980
981 def get( # type: ignore[misc, override]
982 self,
983 section: str,
984 key: str,
985 suppress_warnings: bool = False,
986 lookup_from_deprecated: bool = True,
987 _extra_stacklevel: int = 0,
988 team_name: str | None = None,
989 **kwargs,
990 ) -> str | None:
991 """
992 Get config value by iterating through lookup sequence.
993
994 Priority order is defined by _lookup_sequence property.
995 """
996 section, key, deprecated_section, deprecated_key, warning_emitted = self._resolve_deprecated_lookup(
997 section=section,
998 key=key,
999 lookup_from_deprecated=lookup_from_deprecated,
1000 extra_stacklevel=_extra_stacklevel,
1001 )
1002
1003 if team_name is not None:
1004 kwargs["team_name"] = team_name
1005
1006 for lookup_method in self._lookup_sequence:
1007 value = lookup_method(
1008 deprecated_key=deprecated_key,
1009 deprecated_section=deprecated_section,
1010 key=key,
1011 section=section,
1012 issue_warning=not warning_emitted,
1013 extra_stacklevel=_extra_stacklevel,
1014 **kwargs,
1015 )
1016 if value is not VALUE_NOT_FOUND_SENTINEL:
1017 return value
1018
1019 # Check if fallback was explicitly provided (even if None)
1020 if "fallback" in kwargs:
1021 return kwargs["fallback"]
1022
1023 if not suppress_warnings:
1024 log.warning("section/key [%s/%s] not found in config", section, key)
1025
1026 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
1027
1028 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override]
1029 """Get config value as boolean."""
1030 val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
1031 if "#" in val:
1032 val = val.split("#")[0].strip()
1033 if val in ("t", "true", "1"):
1034 return True
1035 if val in ("f", "false", "0"):
1036 return False
1037 raise AirflowConfigException(
1038 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
1039 f'Current value: "{val}".'
1040 )
1041
1042 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override]
1043 """Get config value as integer."""
1044 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1045 if val is None:
1046 raise AirflowConfigException(
1047 f"Failed to convert value None to int. "
1048 f'Please check "{key}" key in "{section}" section is set.'
1049 )
1050 try:
1051 return int(val)
1052 except ValueError:
1053 raise AirflowConfigException(
1054 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1055 f'Current value: "{val}".'
1056 )
1057
1058 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override]
1059 """Get config value as float."""
1060 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
1061 if val is None:
1062 raise AirflowConfigException(
1063 f"Failed to convert value None to float. "
1064 f'Please check "{key}" key in "{section}" section is set.'
1065 )
1066 try:
1067 return float(val)
1068 except ValueError:
1069 raise AirflowConfigException(
1070 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
1071 f'Current value: "{val}".'
1072 )
1073
1074 def getlist(self, section: str, key: str, delimiter=",", **kwargs):
1075 """Get config value as list."""
1076 val = self.get(section, key, **kwargs)
1077
1078 if isinstance(val, list) or val is None:
1079 # `get` will always return a (possibly-empty) string, so the only way we can
1080 # have these types is with `fallback=` was specified. So just return it.
1081 return val
1082
1083 if val == "":
1084 return []
1085
1086 try:
1087 return [item.strip() for item in val.split(delimiter)]
1088 except Exception:
1089 raise AirflowConfigException(
1090 f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
1091 f'Current value: "{val}".'
1092 )
1093
1094 E = TypeVar("E", bound=Enum)
1095
1096 def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
1097 """Get config value as enum."""
1098 val = self.get(section, key, **kwargs)
1099 enum_names = [enum_item.name for enum_item in enum_class]
1100
1101 if val is None:
1102 raise AirflowConfigException(
1103 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1104 f'Current value: "{val}" and it must be one of {", ".join(enum_names)}'
1105 )
1106
1107 try:
1108 return enum_class[val]
1109 except KeyError:
1110 if "fallback" in kwargs and kwargs["fallback"] in enum_names:
1111 return enum_class[kwargs["fallback"]]
1112 raise AirflowConfigException(
1113 f'Failed to convert value. Please check "{key}" key in "{section}" section. '
1114 f"the value must be one of {', '.join(enum_names)}"
1115 )
1116
1117 def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
1118 """Get config value as list of enums."""
1119 kwargs.setdefault("fallback", [])
1120 string_list = self.getlist(section, key, delimiter, **kwargs)
1121
1122 enum_names = [enum_item.name for enum_item in enum_class]
1123 enum_list = []
1124
1125 for val in string_list:
1126 try:
1127 enum_list.append(enum_class[val])
1128 except KeyError:
1129 log.warning(
1130 "Failed to convert value %r. Please check %s key in %s section. "
1131 "it must be one of %s, if not the value is ignored",
1132 val,
1133 key,
1134 section,
1135 ", ".join(enum_names),
1136 )
1137
1138 return enum_list
1139
1140 def getimport(self, section: str, key: str, **kwargs) -> Any:
1141 """
1142 Read options, import the full qualified name, and return the object.
1143
1144 In case of failure, it throws an exception with the key and section names
1145
1146 :return: The object or None, if the option is empty
1147 """
1148 # Fixed: use self.get() instead of conf.get()
1149 full_qualified_path = self.get(section=section, key=key, **kwargs)
1150 if not full_qualified_path:
1151 return None
1152
1153 try:
1154 # Import here to avoid circular dependency
1155 from ..module_loading import import_string
1156
1157 return import_string(full_qualified_path)
1158 except ImportError as e:
1159 log.warning(e)
1160 raise AirflowConfigException(
1161 f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
1162 f'Current value: "{full_qualified_path}".'
1163 )
1164
1165 def getjson(
1166 self, section: str, key: str, fallback=None, **kwargs
1167 ) -> dict | list | str | int | float | None:
1168 """
1169 Return a config value parsed from a JSON string.
1170
1171 ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.
1172 """
1173 try:
1174 data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
1175 except (NoSectionError, NoOptionError):
1176 data = None
1177
1178 if data is None or data == "":
1179 return fallback
1180
1181 try:
1182 return json.loads(data)
1183 except JSONDecodeError as e:
1184 raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
1185
1186 def gettimedelta(
1187 self, section: str, key: str, fallback: Any = None, **kwargs
1188 ) -> datetime.timedelta | None:
1189 """
1190 Get the config value for the given section and key, and convert it into datetime.timedelta object.
1191
1192 If the key is missing, then it is considered as `None`.
1193
1194 :param section: the section from the config
1195 :param key: the key defined in the given section
1196 :param fallback: fallback value when no config value is given, defaults to None
1197 :raises AirflowConfigException: raised because ValueError or OverflowError
1198 :return: datetime.timedelta(seconds=<config_value>) or None
1199 """
1200 val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
1201
1202 if val:
1203 # the given value must be convertible to integer
1204 try:
1205 int_val = int(val)
1206 except ValueError:
1207 raise AirflowConfigException(
1208 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
1209 f'Current value: "{val}".'
1210 )
1211
1212 try:
1213 return datetime.timedelta(seconds=int_val)
1214 except OverflowError as err:
1215 raise AirflowConfigException(
1216 f"Failed to convert value to timedelta in `seconds`. "
1217 f"{err}. "
1218 f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
1219 )
1220
1221 return fallback
1222
1223 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
1224 """Get mandatory config value, raising ValueError if not found."""
1225 value = self.get(section, key, _extra_stacklevel=1, **kwargs)
1226 if value is None:
1227 raise ValueError(f"The value {section}/{key} should be set!")
1228 return value
1229
1230 def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
1231 """Get mandatory config value as list, raising ValueError if not found."""
1232 value = self.getlist(section, key, **kwargs)
1233 if value is None:
1234 raise ValueError(f"The value {section}/{key} should be set!")
1235 return value
1236
1237 def read(
1238 self,
1239 filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
1240 encoding: str | None = None,
1241 ) -> list[str]:
1242 return super().read(filenames=filenames, encoding=encoding)
1243
1244 def read_dict( # type: ignore[override]
1245 self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
1246 ) -> None:
1247 """
1248 We define a different signature here to add better type hints and checking.
1249
1250 :param dictionary: dictionary to read from
1251 :param source: source to be used to store the configuration
1252 :return:
1253 """
1254 super().read_dict(dictionary=dictionary, source=source)
1255
1256 def get_sections_including_defaults(self) -> list[str]:
1257 """
1258 Retrieve all sections from the configuration parser, including sections defined by built-in defaults.
1259
1260 :return: list of section names
1261 """
1262 sections_from_config = self.sections()
1263 sections_from_description = list(self.configuration_description.keys())
1264 return list(dict.fromkeys(itertools.chain(sections_from_description, sections_from_config)))
1265
1266 def get_options_including_defaults(self, section: str) -> list[str]:
1267 """
1268 Retrieve all possible options from the configuration parser for the section given.
1269
1270 Includes options defined by built-in defaults.
1271
1272 :param section: section name
1273 :return: list of option names for the section given
1274 """
1275 my_own_options = self.options(section) if self.has_section(section) else []
1276 all_options_from_defaults = list(
1277 self.configuration_description.get(section, {}).get("options", {}).keys()
1278 )
1279 return list(dict.fromkeys(itertools.chain(all_options_from_defaults, my_own_options)))
1280
1281 def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs) -> bool:
1282 """
1283 Check if option is defined.
1284
1285 Uses self.get() to avoid reimplementing the priority order of config variables
1286 (env, config, cmd, defaults).
1287
1288 :param section: section to get option from
1289 :param option: option to get
1290 :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
1291 :param kwargs: additional keyword arguments to pass to get(), such as team_name
1292 :return:
1293 """
1294 try:
1295 value = self.get(
1296 section,
1297 option,
1298 fallback=None,
1299 _extra_stacklevel=1,
1300 suppress_warnings=True,
1301 lookup_from_deprecated=lookup_from_deprecated,
1302 **kwargs,
1303 )
1304 if value is None:
1305 return False
1306 return True
1307 except (NoOptionError, NoSectionError, AirflowConfigException):
1308 return False
1309
1310 def set(self, section: str, option: str, value: str | None = None) -> None:
1311 """
1312 Set an option to the given value.
1313
1314 This override just makes sure the section and option are lower case, to match what we do in `get`.
1315 """
1316 section = section.lower()
1317 option = option.lower()
1318 defaults = self.configuration_description or {}
1319 if not self.has_section(section) and section in defaults:
1320 # Trying to set a key in a section that exists in default, but not in the user config;
1321 # automatically create it
1322 self.add_section(section)
1323 super().set(section, option, value)
1324
1325 def remove_option(self, section: str, option: str, remove_default: bool = True):
1326 """
1327 Remove an option if it exists in config from a file or default config.
1328
1329 If both of config have the same option, this removes the option
1330 in both configs unless remove_default=False.
1331 """
1332 section = section.lower()
1333 option = option.lower()
1334 if super().has_option(section, option):
1335 super().remove_option(section, option)
1336
1337 if self.get_default_value(section, option) is not None and remove_default:
1338 self._default_values.remove_option(section, option)
1339
1340 def optionxform(self, optionstr: str) -> str:
1341 """
1342 Transform option names on every read, get, or set operation.
1343
1344 This changes from the default behaviour of ConfigParser from lower-casing
1345 to instead be case-preserving.
1346
1347 :param optionstr:
1348 :return:
1349 """
1350 return optionstr
1351
1352 def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
1353 """
1354 Get list of config sources to use in as_dict().
1355
1356 Subclasses can override to add additional sources (e.g., provider configs).
1357 """
1358 return [
1359 ("default", self._default_values),
1360 ("airflow.cfg", self),
1361 ]
1362
1363 def as_dict(
1364 self,
1365 display_source: bool = False,
1366 display_sensitive: bool = False,
1367 raw: bool = False,
1368 include_env: bool = True,
1369 include_cmds: bool = True,
1370 include_secret: bool = True,
1371 ) -> ConfigSourcesType:
1372 """
1373 Return the current configuration as an OrderedDict of OrderedDicts.
1374
1375 When materializing current configuration Airflow defaults are
1376 materialized along with user set configs. If any of the `include_*`
1377 options are False then the result of calling command or secret key
1378 configs do not override Airflow defaults and instead are passed through.
1379 In order to then avoid Airflow defaults from overwriting user set
1380 command or secret key configs we filter out bare sensitive_config_values
1381 that are set to Airflow defaults when command or secret key configs
1382 produce different values.
1383
1384 :param display_source: If False, the option value is returned. If True,
1385 a tuple of (option_value, source) is returned. Source is either
1386 'airflow.cfg', 'default', 'env var', or 'cmd'.
1387 :param display_sensitive: If True, the values of options set by env
1388 vars and bash commands will be displayed. If False, those options
1389 are shown as '< hidden >'
1390 :param raw: Should the values be output as interpolated values, or the
1391 "raw" form that can be fed back in to ConfigParser
1392 :param include_env: Should the value of configuration from AIRFLOW__
1393 environment variables be included or not
1394 :param include_cmds: Should the result of calling any ``*_cmd`` config be
1395 set (True, default), or should the _cmd options be left as the
1396 command to run (False)
1397 :param include_secret: Should the result of calling any ``*_secret`` config be
1398 set (True, default), or should the _secret options be left as the
1399 path to get the secret from (False)
1400 :return: Dictionary, where the key is the name of the section and the content is
1401 the dictionary with the name of the parameter and its value.
1402 """
1403 if not display_sensitive:
1404 # We want to hide the sensitive values at the appropriate methods
1405 # since envs from cmds, secrets can be read at _include_envs method
1406 if not all([include_env, include_cmds, include_secret]):
1407 raise ValueError(
1408 "If display_sensitive is false, then include_env, "
1409 "include_cmds, include_secret must all be set as True"
1410 )
1411
1412 config_sources: ConfigSourcesType = {}
1413
1414 # We check sequentially all those sources and the last one we saw it in will "win"
1415 configs = self._get_config_sources_for_as_dict()
1416
1417 self._replace_config_with_display_sources(
1418 config_sources,
1419 configs,
1420 self.configuration_description,
1421 display_source,
1422 raw,
1423 self.deprecated_options,
1424 include_cmds=include_cmds,
1425 include_env=include_env,
1426 include_secret=include_secret,
1427 )
1428
1429 # add env vars and overwrite because they have priority
1430 if include_env:
1431 self._include_envs(config_sources, display_sensitive, display_source, raw)
1432 else:
1433 self._filter_by_source(config_sources, display_source, self._get_env_var_option)
1434
1435 # add bash commands
1436 if include_cmds:
1437 self._include_commands(config_sources, display_sensitive, display_source, raw)
1438 else:
1439 self._filter_by_source(config_sources, display_source, self._get_cmd_option)
1440
1441 # add config from secret backends
1442 if include_secret:
1443 self._include_secrets(config_sources, display_sensitive, display_source, raw)
1444 else:
1445 self._filter_by_source(config_sources, display_source, self._get_secret_option)
1446
1447 if not display_sensitive:
1448 # This ensures the ones from config file is hidden too
1449 # if they are not provided through env, cmd and secret
1450 hidden = "< hidden >"
1451 for section, key in self.sensitive_config_values:
1452 if config_sources.get(section):
1453 if config_sources[section].get(key, None):
1454 if display_source:
1455 source = config_sources[section][key][1]
1456 config_sources[section][key] = (hidden, source)
1457 else:
1458 config_sources[section][key] = hidden
1459
1460 return config_sources
1461
1462 def _write_option_header(
1463 self,
1464 file: IO[str],
1465 option: str,
1466 extra_spacing: bool,
1467 include_descriptions: bool,
1468 include_env_vars: bool,
1469 include_examples: bool,
1470 include_sources: bool,
1471 section_config_description: dict[str, dict[str, Any]],
1472 section_to_write: str,
1473 sources_dict: ConfigSourcesType,
1474 ) -> tuple[bool, bool]:
1475 """
1476 Write header for configuration option.
1477
1478 Returns tuple of (should_continue, needs_separation) where needs_separation should be
1479 set if the option needs additional separation to visually separate it from the next option.
1480 """
1481 option_config_description = (
1482 section_config_description.get("options", {}).get(option, {})
1483 if section_config_description
1484 else {}
1485 )
1486 description = option_config_description.get("description")
1487 needs_separation = False
1488 if description and include_descriptions:
1489 for line in description.splitlines():
1490 file.write(f"# {line}\n")
1491 needs_separation = True
1492 example = option_config_description.get("example")
1493 if example is not None and include_examples:
1494 if extra_spacing:
1495 file.write("#\n")
1496 example_lines = example.splitlines()
1497 example = "\n# ".join(example_lines)
1498 file.write(f"# Example: {option} = {example}\n")
1499 needs_separation = True
1500 if include_sources and sources_dict:
1501 sources_section = sources_dict.get(section_to_write)
1502 value_with_source = sources_section.get(option) if sources_section else None
1503 if value_with_source is None:
1504 file.write("#\n# Source: not defined\n")
1505 else:
1506 file.write(f"#\n# Source: {value_with_source[1]}\n")
1507 needs_separation = True
1508 if include_env_vars:
1509 file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
1510 if extra_spacing:
1511 file.write("#\n")
1512 needs_separation = True
1513 return True, needs_separation
1514
1515 def is_template(self, section: str, key) -> bool:
1516 """
1517 Return whether the value is templated.
1518
1519 :param section: section of the config
1520 :param key: key in the section
1521 :return: True if the value is templated
1522 """
1523 return _is_template(self.configuration_description, section, key)
1524
1525 def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None:
1526 """
1527 Return the section as a dict.
1528
1529 Values are converted to int, float, bool as required.
1530
1531 :param section: section from the config
1532 :param team_name: optional team name for team-specific configuration lookup
1533 """
1534 # Handle team-specific section lookup for config file
1535 config_section = f"{team_name}={section}" if team_name else section
1536
1537 if not self.has_section(config_section) and not self._default_values.has_section(config_section):
1538 return None
1539 if self._default_values.has_section(config_section):
1540 _section: ConfigOptionsDictType = dict(self._default_values.items(config_section))
1541 else:
1542 _section = {}
1543
1544 if self.has_section(config_section):
1545 _section.update(self.items(config_section))
1546
1547 # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name
1548 section_prefix = self._env_var_name(section, "", team_name=team_name)
1549 for env_var in sorted(os.environ.keys()):
1550 if env_var.startswith(section_prefix):
1551 key = env_var.replace(section_prefix, "")
1552 if key.endswith("_CMD"):
1553 key = key[:-4]
1554 key = key.lower()
1555 _section[key] = self._get_env_var_option(section, key, team_name=team_name)
1556
1557 for key, val in _section.items():
1558 if val is None:
1559 raise AirflowConfigException(
1560 f"Failed to convert value automatically. "
1561 f'Please check "{key}" key in "{section}" section is set.'
1562 )
1563 try:
1564 _section[key] = int(val)
1565 except ValueError:
1566 try:
1567 _section[key] = float(val)
1568 except ValueError:
1569 if isinstance(val, str) and val.lower() in ("t", "true"):
1570 _section[key] = True
1571 elif isinstance(val, str) and val.lower() in ("f", "false"):
1572 _section[key] = False
1573 return _section
1574
1575 @staticmethod
1576 def _write_section_header(
1577 file: IO[str],
1578 include_descriptions: bool,
1579 section_config_description: dict[str, str],
1580 section_to_write: str,
1581 ) -> None:
1582 """Write header for configuration section."""
1583 file.write(f"[{section_to_write}]\n")
1584 section_description = section_config_description.get("description")
1585 if section_description and include_descriptions:
1586 for line in section_description.splitlines():
1587 file.write(f"# {line}\n")
1588 file.write("\n")
1589
1590 def _write_value(
1591 self,
1592 file: IO[str],
1593 option: str,
1594 comment_out_everything: bool,
1595 needs_separation: bool,
1596 only_defaults: bool,
1597 section_to_write: str,
1598 hide_sensitive: bool,
1599 is_sensitive: bool,
1600 show_values: bool = False,
1601 ):
1602 default_value = self.get_default_value(section_to_write, option, raw=True)
1603 if only_defaults:
1604 value = default_value
1605 else:
1606 value = self.get(section_to_write, option, fallback=default_value, raw=True)
1607 if not show_values:
1608 file.write(f"# {option} = \n")
1609 else:
1610 if hide_sensitive and is_sensitive:
1611 value = "< hidden >"
1612 else:
1613 pass
1614 if value is None:
1615 file.write(f"# {option} = \n")
1616 else:
1617 if comment_out_everything:
1618 value_lines = value.splitlines()
1619 value = "\n# ".join(value_lines)
1620 file.write(f"# {option} = {value}\n")
1621 else:
1622 if "\n" in value:
1623 try:
1624 value = json.dumps(json.loads(value), indent=4)
1625 value = value.replace(
1626 "\n", "\n "
1627 ) # indent multi-line JSON to satisfy configparser format
1628 except JSONDecodeError:
1629 pass
1630 file.write(f"{option} = {value}\n")
1631 if needs_separation:
1632 file.write("\n")
1633
1634 def write( # type: ignore[override]
1635 self,
1636 file: IO[str],
1637 section: str | None = None,
1638 include_examples: bool = True,
1639 include_descriptions: bool = True,
1640 include_sources: bool = True,
1641 include_env_vars: bool = True,
1642 include_providers: bool = True,
1643 comment_out_everything: bool = False,
1644 hide_sensitive: bool = False,
1645 extra_spacing: bool = True,
1646 only_defaults: bool = False,
1647 show_values: bool = False,
1648 **kwargs: Any,
1649 ) -> None:
1650 """
1651 Write configuration with comments and examples to a file.
1652
1653 :param file: file to write to
1654 :param section: section of the config to write, defaults to all sections
1655 :param include_examples: Include examples in the output
1656 :param include_descriptions: Include descriptions in the output
1657 :param include_sources: Include the source of each config option
1658 :param include_env_vars: Include environment variables corresponding to each config option
1659 :param include_providers: Include providers configuration
1660 :param comment_out_everything: Comment out all values
1661 :param hide_sensitive_values: Include sensitive values in the output
1662 :param extra_spacing: Add extra spacing before examples and after variables
1663 :param only_defaults: Only include default values when writing the config, not the actual values
1664 """
1665 sources_dict = {}
1666 if include_sources:
1667 sources_dict = self.as_dict(display_source=True)
1668 with self.make_sure_configuration_loaded(with_providers=include_providers):
1669 for section_to_write in self.get_sections_including_defaults():
1670 section_config_description = self.configuration_description.get(section_to_write, {})
1671 if section_to_write != section and section is not None:
1672 continue
1673 if self._default_values.has_section(section_to_write) or self.has_section(section_to_write):
1674 self._write_section_header(
1675 file, include_descriptions, section_config_description, section_to_write
1676 )
1677 for option in self.get_options_including_defaults(section_to_write):
1678 should_continue, needs_separation = self._write_option_header(
1679 file=file,
1680 option=option,
1681 extra_spacing=extra_spacing,
1682 include_descriptions=include_descriptions,
1683 include_env_vars=include_env_vars,
1684 include_examples=include_examples,
1685 include_sources=include_sources,
1686 section_config_description=section_config_description,
1687 section_to_write=section_to_write,
1688 sources_dict=sources_dict,
1689 )
1690 is_sensitive = (
1691 section_to_write.lower(),
1692 option.lower(),
1693 ) in self.sensitive_config_values
1694 self._write_value(
1695 file=file,
1696 option=option,
1697 comment_out_everything=comment_out_everything,
1698 needs_separation=needs_separation,
1699 only_defaults=only_defaults,
1700 section_to_write=section_to_write,
1701 hide_sensitive=hide_sensitive,
1702 is_sensitive=is_sensitive,
1703 show_values=show_values,
1704 )
1705 if include_descriptions and not needs_separation:
1706 # extra separation between sections in case last option did not need it
1707 file.write("\n")
1708
1709 @contextmanager
1710 def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
1711 """
1712 Make sure configuration is loaded with or without providers.
1713
1714 This happens regardless if the provider configuration has been loaded before or not.
1715 Restores configuration to the state before entering the context.
1716
1717 :param with_providers: whether providers should be loaded
1718 """
1719 needs_reload = False
1720 if with_providers:
1721 self._ensure_providers_config_loaded()
1722 else:
1723 needs_reload = self._ensure_providers_config_unloaded()
1724 yield
1725 if needs_reload:
1726 self._reload_provider_configs()
1727
1728 def _ensure_providers_config_loaded(self) -> None:
1729 """Ensure providers configurations are loaded."""
1730 raise NotImplementedError("Subclasses must implement _ensure_providers_config_loaded method")
1731
1732 def _ensure_providers_config_unloaded(self) -> bool:
1733 """Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded."""
1734 raise NotImplementedError("Subclasses must implement _ensure_providers_config_unloaded method")
1735
1736 def _reload_provider_configs(self) -> None:
1737 """Reload providers configuration."""
1738 raise NotImplementedError("Subclasses must implement _reload_provider_configs method")