Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/configuration.py: 41%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import contextlib
20import datetime
21import functools
22import itertools
23import json
24import logging
25import multiprocessing
26import os
27import pathlib
28import shlex
29import stat
30import subprocess
31import sys
32import warnings
33from base64 import b64encode
34from configparser import ConfigParser, NoOptionError, NoSectionError
35from contextlib import contextmanager
36from copy import deepcopy
37from io import StringIO
38from json.decoder import JSONDecodeError
39from typing import IO, TYPE_CHECKING, Any, Dict, Generator, Iterable, Pattern, Set, Tuple, Union
40from urllib.parse import urlsplit
42import re2
43from packaging.version import parse as parse_version
44from typing_extensions import overload
46from airflow.exceptions import AirflowConfigException
47from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH
48from airflow.utils import yaml
49from airflow.utils.empty_set import _get_empty_set_for_configuration
50from airflow.utils.module_loading import import_string
51from airflow.utils.providers_configuration_loader import providers_configuration_loaded
52from airflow.utils.weight_rule import WeightRule
54if TYPE_CHECKING:
55 from airflow.auth.managers.base_auth_manager import BaseAuthManager
56 from airflow.secrets import BaseSecretsBackend
# Module-level logger named after this module.
log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
# (the filters are installed only when ``sys.warnoptions`` is empty)
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Matches a leading dotted run of digits (captured as the named group ``version``);
# used by ``_parse_sqlite_version``.
_SQLITE3_VERSION_PATTERN = re2.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases describing configuration values and their sources.
ConfigType = Union[str, int, float, bool]
ConfigOptionsDictType = Dict[str, ConfigType]
ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]]
ConfigSourcesType = Dict[str, ConfigSectionSourcesType]

# Prefix string for Airflow environment variable names.
ENV_VAR_PREFIX = "AIRFLOW__"
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """
    Turn the leading dotted version of ``s`` into a tuple of integers.

    :param s: version string to parse
    :return: tuple of the numeric components, or an empty tuple when ``s`` does not
        match ``_SQLITE3_VERSION_PATTERN``
    """
    found = _SQLITE3_VERSION_PATTERN.match(s)
    if found is not None:
        return tuple(map(int, found.group("version").split(".")))
    return ()
@overload
def expand_env_var(env_var: None) -> None: ...


@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand (potentially nested) env vars.

    ``expandvars`` and ``expanduser`` are applied repeatedly until a pass leaves
    the value unchanged.

    :param env_var: value to expand; a falsy value is returned as-is
    :return: the fully expanded value
    """
    if not env_var:
        return env_var
    current = env_var
    expanded = os.path.expanduser(os.path.expandvars(str(current)))
    # Keep expanding until a fixed point is reached.
    while expanded != current:
        current = expanded
        expanded = os.path.expanduser(os.path.expandvars(str(current)))
    return expanded
def run_command(command: str) -> str:
    """
    Run command and returns stdout.

    :param command: command line; it is split with ``shlex.split`` before execution
    :return: decoded standard output of the command
    :raises AirflowConfigException: when the command exits with a non-zero return code
    """
    process = subprocess.Popen(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
    raw_stdout, raw_stderr = process.communicate()
    encoding = sys.getdefaultencoding()
    # Undecodable bytes are dropped rather than raising.
    output = raw_stdout.decode(encoding, "ignore")
    stderr = raw_stderr.decode(encoding, "ignore")

    if process.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {process.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )
def _get_config_value_from_secret_backend(config_key: str) -> str | None:
    """
    Get Config option values from Secret Backend.

    :param config_key: key passed to the backend's ``get_config``
    :return: the value returned by the backend, or None when no custom secrets
        backend is available
    :raises AirflowConfigException: when obtaining the backend or reading from it fails;
        the original exception is attached as the cause
    """
    try:
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        return secrets_client.get_config(config_key)
    except Exception as e:
        # Chain explicitly so the traceback shows the backend failure as the direct cause.
        raise AirflowConfigException(
            "Cannot retrieve config from alternative secrets backend. "
            "Make sure it is configured properly and that the Backend "
            "is accessible.\n"
            f"{e}"
        ) from e
139def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool:
140 """
141 Check if the config is a template.
143 :param configuration_description: description of configuration
144 :param section: section
145 :param key: key
146 :return: True if the config is a template
147 """
148 return configuration_description.get(section, {}).get(key, {}).get("is_template", False)
151def _default_config_file_path(file_name: str) -> str:
152 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates")
153 return os.path.join(templates_dir, file_name)
def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Read Airflow configuration description from YAML file.

    :param include_airflow: Include Airflow configs
    :param include_providers: Include provider configs
    :param selected_provider: If specified, include selected provider only
    :return: Python dictionary containing configs & their info
    """
    description: dict[str, dict[str, Any]] = {}
    if include_airflow:
        config_yml_path = _default_config_file_path("config.yml")
        with open(config_yml_path) as yaml_stream:
            description.update(yaml.safe_load(yaml_stream))
    if not include_providers:
        return description

    # Imported here rather than at module import time.
    from airflow.providers_manager import ProvidersManager

    for provider_name, provider_config in ProvidersManager().provider_configs:
        if selected_provider and provider_name != selected_provider:
            continue
        description.update(provider_config)
    return description
182class AirflowConfigParser(ConfigParser):
183 """
184 Custom Airflow Configparser supporting defaults and deprecated options.
186 This is a subclass of ConfigParser that supports defaults and deprecated options.
188 The defaults are stored in the ``_default_values`` ConfigParser. The configuration description keeps
189 descriptions of all the options available in Airflow (descriptions follow config.yaml.schema).
191 :param default_config: default configuration (in the form of ini file).
192 :param configuration_description: description of configuration to use
193 """
def __init__(
    self,
    default_config: str | None = None,
    *args,
    **kwargs,
):
    """
    Initialise the parser with core defaults.

    :param default_config: optional ini-formatted string; when given, its values are
        applied on top of the defaults via ``_update_defaults_from_string``
    :param args: forwarded to ``ConfigParser.__init__``
    :param kwargs: forwarded to ``ConfigParser.__init__``
    """
    super().__init__(*args, **kwargs)
    # Only the core description is loaded here (providers are excluded).
    self.configuration_description = retrieve_configuration_description(include_providers=False)
    self.upgraded_values = {}
    # For those who would like to use a different data structure to keep defaults:
    # We have to keep the default values in a ConfigParser rather than in any other
    # data structure, because the values we have might contain %% which are ConfigParser
    # interpolation placeholders. The _default_values config parser will interpolate them
    # properly when we call get() on it.
    self._default_values = create_default_config_parser(self.configuration_description)
    self._pre_2_7_default_values = create_pre_2_7_defaults()
    if default_config is not None:
        self._update_defaults_from_string(default_config)
    # Must run after the defaults exist: it reads the default log_filename_template.
    self._update_logging_deprecated_template_to_one_from_defaults()
    self.is_validated = False
    self._suppress_future_warnings = False
    self._providers_configuration_loaded = False
218 def _update_logging_deprecated_template_to_one_from_defaults(self):
219 default = self.get_default_value("logging", "log_filename_template")
220 if default:
221 # Tuple does not support item assignment, so we have to create a new tuple and replace it
222 original_replacement = self.deprecated_values["logging"]["log_filename_template"]
223 self.deprecated_values["logging"]["log_filename_template"] = (
224 original_replacement[0],
225 default,
226 original_replacement[2],
227 )
def is_template(self, section: str, key) -> bool:
    """
    Return whether the value is templated.

    :param section: section of the config
    :param key: key in the section
    :return: True if the value is templated; always False when no configuration
        description is set
    """
    description = self.configuration_description
    return False if description is None else _is_template(description, section, key)
241 def _update_defaults_from_string(self, config_string: str):
242 """
243 Update the defaults in _default_values based on values in config_string ("ini" format).
245 Note that those values are not validated and cannot contain variables because we are using
246 regular config parser to load them. This method is used to test the config parser in unit tests.
248 :param config_string: ini-formatted config string
249 """
250 parser = ConfigParser()
251 parser.read_string(config_string)
252 for section in parser.sections():
253 if section not in self._default_values.sections():
254 self._default_values.add_section(section)
255 errors = False
256 for key, value in parser.items(section):
257 if not self.is_template(section, key) and "{" in value:
258 errors = True
259 log.error(
260 "The %s.%s value %s read from string contains variable. This is not supported",
261 section,
262 key,
263 value,
264 )
265 self._default_values.set(section, key, value)
266 if errors:
267 raise AirflowConfigException(
268 f"The string config passed as default contains variables. "
269 f"This is not supported. String config: {config_string}"
270 )
def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
    """
    Retrieve default value from default config parser.

    The value is read (and interpolated) by the default config parser. When ``raw`` is True,
    every ``%`` in a string result is doubled again, so the value can be written to a file
    and interpolated later when the config is read.

    :param section: section of the config
    :param key: key to use
    :param fallback: fallback value to use when the section or key is missing
    :param raw: if True, ``%`` characters in a string result are escaped as ``%%``
    :param kwargs: other args passed to the default config parser's ``get``
    :return: the default value, or ``fallback`` when it is not defined
    """
    value = self._default_values.get(section, key, fallback=fallback, **kwargs)
    # Only strings can be escaped; a non-string fallback is returned untouched.
    if raw and isinstance(value, str):
        return value.replace("%", "%%")
    return value
def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any:
    """
    Get pre 2.7 default config values.

    :param section: section of the config
    :param key: key to use
    :param kwargs: other args passed to the parser's ``get``
    :return: the stored value, or None when it is not defined
    """
    pre_2_7_defaults = self._pre_2_7_default_values
    return pre_2_7_defaults.get(section, key, fallback=None, **kwargs)
# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}_cmd" pattern, the idea behind this
# is to not store password on boxes in text files.
# These configs can also be fetched from Secrets backend
# following the "{section}__{name}__secret" pattern
@functools.cached_property
def sensitive_config_values(self) -> Set[tuple[str, str]]:  # noqa: UP006
    """
    Collect the lower-cased (section, key) pairs of options marked ``sensitive``.

    The result also contains the deprecated option and deprecated section
    equivalents of those pairs.
    """
    description = self.configuration_description
    if description is None:
        # we can't use set() here because set is defined below  # ¯\_(ツ)_/¯
        return _get_empty_set_for_configuration()

    sensitive = set()
    for section, section_description in description.items():
        options = section_description.get("options")
        for key, option_description in options.items():  # type: ignore[union-attr]
            if option_description.get("sensitive") is True:
                sensitive.add((section.lower(), key.lower()))

    # Old (section, option) names of sensitive options; the trailing version item is dropped.
    legacy_options = {
        self.deprecated_options[pair][:-1] for pair in sensitive if pair in self.deprecated_options
    }
    # The same keys under the old name of a renamed section.
    legacy_sections = {
        (self.deprecated_sections[section][0], key)
        for section, key in sensitive
        if section in self.deprecated_sections
    }
    sensitive.update(legacy_sections, legacy_options)
    return sensitive
# A mapping of (new section, new option) -> (old section, old option, since_version).
# When reading new option, the old option will be checked to see if it exists. If it does a
# DeprecationWarning will be issued and the old option will be used instead
deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
    ("celery", "worker_precheck"): ("core", "worker_precheck", "2.0.0"),
    ("logging", "interleave_timestamp_parser"): ("core", "interleave_timestamp_parser", "2.6.1"),
    ("logging", "base_log_folder"): ("core", "base_log_folder", "2.0.0"),
    ("logging", "remote_logging"): ("core", "remote_logging", "2.0.0"),
    ("logging", "remote_log_conn_id"): ("core", "remote_log_conn_id", "2.0.0"),
    ("logging", "remote_base_log_folder"): ("core", "remote_base_log_folder", "2.0.0"),
    ("logging", "encrypt_s3_logs"): ("core", "encrypt_s3_logs", "2.0.0"),
    ("logging", "logging_level"): ("core", "logging_level", "2.0.0"),
    ("logging", "fab_logging_level"): ("core", "fab_logging_level", "2.0.0"),
    ("logging", "logging_config_class"): ("core", "logging_config_class", "2.0.0"),
    ("logging", "colored_console_log"): ("core", "colored_console_log", "2.0.0"),
    ("logging", "colored_log_format"): ("core", "colored_log_format", "2.0.0"),
    ("logging", "colored_formatter_class"): ("core", "colored_formatter_class", "2.0.0"),
    ("logging", "log_format"): ("core", "log_format", "2.0.0"),
    ("logging", "simple_log_format"): ("core", "simple_log_format", "2.0.0"),
    ("logging", "task_log_prefix_template"): ("core", "task_log_prefix_template", "2.0.0"),
    ("logging", "log_filename_template"): ("core", "log_filename_template", "2.0.0"),
    ("logging", "log_processor_filename_template"): ("core", "log_processor_filename_template", "2.0.0"),
    ("logging", "dag_processor_manager_log_location"): (
        "core",
        "dag_processor_manager_log_location",
        "2.0.0",
    ),
    ("logging", "task_log_reader"): ("core", "task_log_reader", "2.0.0"),
    ("metrics", "metrics_allow_list"): ("metrics", "statsd_allow_list", "2.6.0"),
    ("metrics", "metrics_block_list"): ("metrics", "statsd_block_list", "2.6.0"),
    ("metrics", "statsd_on"): ("scheduler", "statsd_on", "2.0.0"),
    ("metrics", "statsd_host"): ("scheduler", "statsd_host", "2.0.0"),
    ("metrics", "statsd_port"): ("scheduler", "statsd_port", "2.0.0"),
    ("metrics", "statsd_prefix"): ("scheduler", "statsd_prefix", "2.0.0"),
    ("metrics", "statsd_allow_list"): ("scheduler", "statsd_allow_list", "2.0.0"),
    ("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"),
    ("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"),
    ("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"),
    ("metrics", "statsd_datadog_metrics_tags"): ("scheduler", "statsd_datadog_metrics_tags", "2.6.0"),
    ("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"),
    ("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"),
    ("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"),
    ("operators", "default_queue"): ("celery", "default_queue", "2.1.0"),
    ("core", "hide_sensitive_var_conn_fields"): ("admin", "hide_sensitive_variable_fields", "2.1.0"),
    ("core", "sensitive_var_conn_names"): ("admin", "sensitive_variable_fields", "2.1.0"),
    ("core", "default_pool_task_slot_count"): ("core", "non_pooled_task_slot_count", "1.10.4"),
    ("core", "max_active_tasks_per_dag"): ("core", "dag_concurrency", "2.2.0"),
    ("logging", "worker_log_server_port"): ("celery", "worker_log_server_port", "2.2.0"),
    ("api", "access_control_allow_origins"): ("api", "access_control_allow_origin", "2.2.0"),
    ("api", "auth_backends"): ("api", "auth_backend", "2.3.0"),
    ("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "2.3.0"),
    ("database", "sql_engine_encoding"): ("core", "sql_engine_encoding", "2.3.0"),
    ("database", "sql_engine_collation_for_ids"): ("core", "sql_engine_collation_for_ids", "2.3.0"),
    ("database", "sql_alchemy_pool_enabled"): ("core", "sql_alchemy_pool_enabled", "2.3.0"),
    ("database", "sql_alchemy_pool_size"): ("core", "sql_alchemy_pool_size", "2.3.0"),
    ("database", "sql_alchemy_max_overflow"): ("core", "sql_alchemy_max_overflow", "2.3.0"),
    ("database", "sql_alchemy_pool_recycle"): ("core", "sql_alchemy_pool_recycle", "2.3.0"),
    ("database", "sql_alchemy_pool_pre_ping"): ("core", "sql_alchemy_pool_pre_ping", "2.3.0"),
    ("database", "sql_alchemy_schema"): ("core", "sql_alchemy_schema", "2.3.0"),
    ("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"),
    ("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"),
    ("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"),
    ("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
    ("scheduler", "task_queued_timeout_check_interval"): (
        "kubernetes_executor",
        "worker_pods_pending_timeout_check_interval",
        "2.6.0",
    ),
}

# A mapping of a new configuration to the list of old configurations it replaces, for the
# case where one configuration deprecates more than one other configuration. Each old entry
# has the same (old section, old option, since_version) layout as in ``deprecated_options``.
# The deprecation logic for these configurations is defined in SchedulerJobRunner.
many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
    ("scheduler", "task_queued_timeout"): [
        ("celery", "stalled_task_timeout", "2.6.0"),
        ("celery", "task_adoption_timeout", "2.6.0"),
        ("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"),
    ]
}

# A mapping of new section -> (old section, since_version).
deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")}
# Now build the inverse so we can go from old_section/old_key to new_section/new_key
# if someone tries to retrieve it based on old_section/old_key
@functools.cached_property
def inversed_deprecated_options(self):
    """Map each (old section, old option) pair to its (new section, new option) key."""
    inverse = {}
    for new_location, (old_section, old_name, _since) in self.deprecated_options.items():
        inverse[(old_section, old_name)] = new_location
    return inverse
@functools.cached_property
def inversed_deprecated_sections(self):
    """Map each old section name to the new section name that replaces it."""
    inverse = {}
    for new_section, (old_section, _since) in self.deprecated_sections.items():
        inverse[old_section] = new_section
    return inverse
# A mapping of old default values that we want to change and warn the user
# about. Mapping of section -> setting -> { old, replace, by_version }
# ``old`` is a compiled pattern; ``validate`` substitutes ``replace`` for its matches.
deprecated_values: dict[str, dict[str, tuple[Pattern, str, str]]] = {
    "core": {
        "hostname_callable": (re2.compile(r":"), r".", "2.1"),
    },
    "webserver": {
        "navbar_color": (re2.compile(r"(?i)\A#007A87\z"), "#fff", "2.1"),
        "dag_default_view": (re2.compile(r"^tree$"), "grid", "3.0"),
    },
    "email": {
        "email_backend": (
            re2.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"),
            r"airflow.providers.sendgrid.utils.emailer.send_email",
            "2.1",
        ),
    },
    "logging": {
        "log_filename_template": (
            re2.compile(re2.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")),
            # The actual replacement value will be updated after defaults are loaded from config.yml
            "XX-set-after-default-config-loaded-XX",
            "3.0",
        ),
    },
    "api": {
        "auth_backends": (
            re2.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
            "airflow.api.auth.backend.session",
            "3.0",
        ),
    },
    "elasticsearch": {
        "log_id_template": (
            re2.compile("^" + re2.escape("{dag_id}-{task_id}-{execution_date}-{try_number}") + "$"),
            "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
            "3.0",
        )
    },
}

# Logging level names accepted by the level-type options listed in ``enums_options``.
_available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
# Mapping of (section, option) -> accepted values; checked by ``_validate_enums``.
enums_options = {
    ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
    ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
    ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
    ("scheduler", "file_parsing_sort_mode"): ["modified_time", "random_seeded_by_host", "alphabetical"],
    ("logging", "logging_level"): _available_logging_levels,
    ("logging", "fab_logging_level"): _available_logging_levels,
    # celery_logging_level can be empty, which uses logging_level as fallback
    ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
    ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
}

# Filled in by ``validate`` and the ``_upgrade_*`` helpers when a configured value is replaced.
upgraded_values: dict[tuple[str, str], str]
"""Mapping of (section,option) to the old value that was upgraded"""
def get_sections_including_defaults(self) -> list[str]:
    """
    Retrieve all sections from the configuration parser, including sections defined by built-in defaults.

    Sections from the configuration description come first, followed by the parser's own
    sections that were not already listed.

    :return: list of section names
    """
    # A dict is used as an insertion-ordered set to drop duplicates.
    ordered_sections = dict.fromkeys(self.configuration_description)
    ordered_sections.update(dict.fromkeys(self.sections()))
    return list(ordered_sections)
def get_options_including_defaults(self, section: str) -> list[str]:
    """
    Retrieve all possible option from the configuration parser for the section given.

    Includes options defined by built-in defaults. Options from the configuration
    description come first, followed by the parser's own options not already listed.

    :param section: section to list options for
    :return: list of option names for the section given
    """
    own_options = self.options(section) if self.has_section(section) else []
    described_options = self.configuration_description.get(section, {}).get("options", {})
    # A dict is used as an insertion-ordered set to drop duplicates.
    ordered_options = dict.fromkeys(described_options)
    ordered_options.update(dict.fromkeys(own_options))
    return list(ordered_options)
def optionxform(self, optionstr: str) -> str:
    """
    Transform option names on every read, get, or set operation.

    ConfigParser lower-cases option names by default; this override keeps them
    case-preserving by returning the name untouched.

    :param optionstr: option name as given
    :return: the same option name, unchanged
    """
    unchanged = optionstr
    return unchanged
@contextmanager
def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
    """
    Make sure configuration is loaded with or without providers.

    This happens regardless if the provider configuration has been loaded before or not.
    When providers were loaded and ``with_providers`` is False, the core-only configuration
    is restored for the duration of the context and the providers configuration is loaded
    again on exit, including when the body of the ``with`` block raises.

    :param with_providers: whether providers should be loaded
    """
    reload_providers_when_leaving = False
    if with_providers and not self._providers_configuration_loaded:
        # make sure providers are initialized
        from airflow.providers_manager import ProvidersManager

        # run internal method to initialize providers configuration in order to not trigger the
        # initialize_providers_configuration cache (because we will be unloading it now)
        ProvidersManager()._initialize_providers_configuration()
    elif not with_providers and self._providers_configuration_loaded:
        reload_providers_when_leaving = True
        self.restore_core_default_configuration()
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions, so the parser is not left in core-only state.
        if reload_providers_when_leaving:
            self.load_providers_configuration()
535 @staticmethod
536 def _write_section_header(
537 file: IO[str],
538 include_descriptions: bool,
539 section_config_description: dict[str, str],
540 section_to_write: str,
541 ) -> None:
542 """Write header for configuration section."""
543 file.write(f"[{section_to_write}]\n")
544 section_description = section_config_description.get("description")
545 if section_description and include_descriptions:
546 for line in section_description.splitlines():
547 file.write(f"# {line}\n")
548 file.write("\n")
def _write_option_header(
    self,
    file: IO[str],
    option: str,
    extra_spacing: bool,
    include_descriptions: bool,
    include_env_vars: bool,
    include_examples: bool,
    include_sources: bool,
    section_config_description: dict[str, dict[str, Any]],
    section_to_write: str,
    sources_dict: ConfigSourcesType,
) -> tuple[bool, bool]:
    """
    Write header for configuration option.

    Depending on the ``include_*`` flags, the header consists of comment lines with
    the option description, an example, the source of the value and the name of the
    matching environment variable.

    Returns tuple of (should_continue, needs_separation) where needs_separation should be
    set if the option needs additional separation to visually separate it from the next option.
    ``(False, False)`` is returned, and nothing is written, when the option's
    ``version_added`` is newer than the base version of the running Airflow.
    """
    from airflow import __version__ as airflow_version

    if section_config_description:
        option_description = section_config_description.get("options", {}).get(option, {})
    else:
        option_description = {}

    added_in = option_description.get("version_added")
    if added_in is not None:
        running_base_version = parse_version(parse_version(airflow_version).base_version)
        if parse_version(added_in) > running_base_version:
            # skip if option is going to be added in the future version
            return False, False

    wrote_header = False

    description = option_description.get("description")
    if description and include_descriptions:
        for description_line in description.splitlines():
            file.write(f"# {description_line}\n")
        wrote_header = True

    example = option_description.get("example")
    if example is not None and include_examples:
        if extra_spacing:
            file.write("#\n")
        file.write(f"# Example: {option} = {example}\n")
        wrote_header = True

    if include_sources and sources_dict:
        section_sources = sources_dict.get(section_to_write)
        value_with_source = section_sources.get(option) if section_sources else None
        if value_with_source is not None:
            # Index 1 of the entry is what gets reported as the source.
            file.write(f"#\n# Source: {value_with_source[1]}\n")
        else:
            file.write("#\n# Source: not defined\n")
        wrote_header = True

    if include_env_vars:
        file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
        if extra_spacing:
            file.write("#\n")
        wrote_header = True

    return True, wrote_header
609 def _write_value(
610 self,
611 file: IO[str],
612 option: str,
613 comment_out_everything: bool,
614 needs_separation: bool,
615 only_defaults: bool,
616 section_to_write: str,
617 ):
618 if self._default_values is None:
619 default_value = None
620 else:
621 default_value = self.get_default_value(section_to_write, option, raw=True)
622 if only_defaults:
623 value = default_value
624 else:
625 value = self.get(section_to_write, option, fallback=default_value, raw=True)
626 if value is None:
627 file.write(f"# {option} = \n")
628 else:
629 if comment_out_everything:
630 file.write(f"# {option} = {value}\n")
631 else:
632 file.write(f"{option} = {value}\n")
633 if needs_separation:
634 file.write("\n")
def write(  # type: ignore[override]
    self,
    file: IO[str],
    section: str | None = None,
    include_examples: bool = True,
    include_descriptions: bool = True,
    include_sources: bool = True,
    include_env_vars: bool = True,
    include_providers: bool = True,
    comment_out_everything: bool = False,
    hide_sensitive_values: bool = False,
    extra_spacing: bool = True,
    only_defaults: bool = False,
    **kwargs: Any,
) -> None:
    """
    Write configuration with comments and examples to a file.

    Options whose header writer reports that they should be skipped (for example options
    added in a future version) are omitted entirely: neither a header nor a value is written.

    :param file: file to write to
    :param section: section of the config to write, defaults to all sections
    :param include_examples: Include examples in the output
    :param include_descriptions: Include descriptions in the output
    :param include_sources: Include the source of each config option
    :param include_env_vars: Include environment variables corresponding to each config option
    :param include_providers: Include providers configuration
    :param comment_out_everything: Comment out all values
    :param hide_sensitive_values: Hide sensitive values in the output.
        NOTE(review): this method never reads the flag, so no masking happens here -
        confirm whether masking is expected to be applied elsewhere.
    :param extra_spacing: Add extra spacing before examples and after variables
    :param only_defaults: Only include default values when writing the config, not the actual values
    :raises RuntimeError: when no default values or no configuration description are set
    """
    sources_dict = {}
    if include_sources:
        sources_dict = self.as_dict(display_source=True)
    if self._default_values is None:
        raise RuntimeError("Cannot write default config, no default config set")
    if self.configuration_description is None:
        raise RuntimeError("Cannot write default config, no default configuration description set")
    with self.make_sure_configuration_loaded(with_providers=include_providers):
        for section_to_write in self.get_sections_including_defaults():
            section_config_description = self.configuration_description.get(section_to_write, {})
            if section_to_write != section and section is not None:
                continue
            if not (
                self._default_values.has_section(section_to_write) or self.has_section(section_to_write)
            ):
                continue
            self._write_section_header(
                file, include_descriptions, section_config_description, section_to_write
            )
            # Reset per section so an empty section neither fails nor reuses the value
            # left over from the previous section.
            needs_separation = False
            for option in self.get_options_including_defaults(section_to_write):
                should_continue, option_needs_separation = self._write_option_header(
                    file=file,
                    option=option,
                    extra_spacing=extra_spacing,
                    include_descriptions=include_descriptions,
                    include_env_vars=include_env_vars,
                    include_examples=include_examples,
                    include_sources=include_sources,
                    section_config_description=section_config_description,
                    section_to_write=section_to_write,
                    sources_dict=sources_dict,
                )
                if not should_continue:
                    # The header writer asked to skip this option; do not emit its value either.
                    continue
                needs_separation = option_needs_separation
                self._write_value(
                    file=file,
                    option=option,
                    comment_out_everything=comment_out_everything,
                    needs_separation=needs_separation,
                    only_defaults=only_defaults,
                    section_to_write=section_to_write,
                )
            if include_descriptions and not needs_separation:
                # extra separation between sections in case last option did not need it
                file.write("\n")
def restore_core_default_configuration(self) -> None:
    """Restore default configuration for core Airflow.

    The configuration description and the default values are rebuilt without providers,
    and the parser is marked as not having providers configuration loaded.

    It does not restore configuration for providers. If you want to restore configuration for
    providers, you need to call ``load_providers_configuration`` method.
    """
    core_description = retrieve_configuration_description(include_providers=False)
    self.configuration_description = core_description
    self._default_values = create_default_config_parser(core_description)
    self._providers_configuration_loaded = False
def validate(self):
    """
    Validate the configuration and upgrade deprecated values.

    After the sqlite version and enum checks, every entry of ``deprecated_values`` whose
    current value still matches the old pattern is rewritten: the old value is recorded
    in ``upgraded_values``, the new value is pushed through ``_update_env_var`` and a
    future warning is created. Finally the auth backend and postgres connection upgrades
    run and the parser is marked as validated.
    """
    self._validate_sqlite3_version()
    self._validate_enums()

    for section, section_replacements in self.deprecated_values.items():
        for name, (old_pattern, replacement, version) in section_replacements.items():
            current_value = self.get(section, name, fallback="")
            if not self._using_old_value(old_pattern, current_value):
                continue
            self.upgraded_values[(section, name)] = current_value
            upgraded = old_pattern.sub(replacement, current_value)
            self._update_env_var(section=section, name=name, new_value=upgraded)
            self._create_future_warning(
                name=name,
                section=section,
                current_value=current_value,
                new_value=upgraded,
                version=version,
            )

    self._upgrade_auth_backends()
    self._upgrade_postgres_metastore_conn()
    self.is_validated = True
741 def _upgrade_auth_backends(self):
742 """
743 Ensure a custom auth_backends setting contains session.
745 This is required by the UI for ajax queries.
746 """
747 old_value = self.get("api", "auth_backends", fallback="")
748 if old_value in ("airflow.api.auth.backend.default", ""):
749 # handled by deprecated_values
750 pass
751 elif old_value.find("airflow.api.auth.backend.session") == -1:
752 new_value = old_value + ",airflow.api.auth.backend.session"
753 self._update_env_var(section="api", name="auth_backends", new_value=new_value)
754 self.upgraded_values[("api", "auth_backends")] = old_value
756 # if the old value is set via env var, we need to wipe it
757 # otherwise, it'll "win" over our adjusted value
758 old_env_var = self._env_var_name("api", "auth_backend")
759 os.environ.pop(old_env_var, None)
761 warnings.warn(
762 "The auth_backends setting in [api] has had airflow.api.auth.backend.session added "
763 "in the running config, which is needed by the UI. Please update your config before "
764 "Apache Airflow 3.0.",
765 FutureWarning,
766 stacklevel=1,
767 )
769 def _upgrade_postgres_metastore_conn(self):
770 """
771 Upgrade SQL schemas.
773 As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
774 must be replaced with `postgresql`.
775 """
776 section, key = "database", "sql_alchemy_conn"
777 old_value = self.get(section, key, _extra_stacklevel=1)
778 bad_schemes = ["postgres+psycopg2", "postgres"]
779 good_scheme = "postgresql"
780 parsed = urlsplit(old_value)
781 if parsed.scheme in bad_schemes:
782 warnings.warn(
783 f"Bad scheme in Airflow configuration core > sql_alchemy_conn: `{parsed.scheme}`. "
784 "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
785 f"change to `{good_scheme}` before the next Airflow release.",
786 FutureWarning,
787 stacklevel=1,
788 )
789 self.upgraded_values[(section, key)] = old_value
790 new_value = re2.sub("^" + re2.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
791 self._update_env_var(section=section, name=key, new_value=new_value)
793 # if the old value is set via env var, we need to wipe it
794 # otherwise, it'll "win" over our adjusted value
795 old_env_var = self._env_var_name("core", key)
796 os.environ.pop(old_env_var, None)
798 def _validate_enums(self):
799 """Validate that enum type config has an accepted value."""
800 for (section_key, option_key), enum_options in self.enums_options.items():
801 if self.has_option(section_key, option_key):
802 value = self.get(section_key, option_key, fallback=None)
803 if value and value not in enum_options:
804 raise AirflowConfigException(
805 f"`[{section_key}] {option_key}` should not be "
806 f"{value!r}. Possible values: {', '.join(enum_options)}."
807 )
809 def _validate_sqlite3_version(self):
810 """Validate SQLite version.
812 Some features in storing rendered fields require SQLite >= 3.15.0.
813 """
814 if "sqlite" not in self.get("database", "sql_alchemy_conn"):
815 return
817 import sqlite3
819 min_sqlite_version = (3, 15, 0)
820 if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
821 return
823 from airflow.utils.docs import get_docs_url
825 min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
826 raise AirflowConfigException(
827 f"error: SQLite C library too old (< {min_sqlite_version_str}). "
828 f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
829 )
831 def _using_old_value(self, old: Pattern, current_value: str) -> bool:
832 return old.search(current_value) is not None
834 def _update_env_var(self, section: str, name: str, new_value: str):
835 env_var = self._env_var_name(section, name)
836 # Set it as an env var so that any subprocesses keep the same override!
837 os.environ[env_var] = new_value
839 @staticmethod
840 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any, version: str):
841 warnings.warn(
842 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
843 f"This value has been changed to {new_value!r} in the running config, but "
844 f"please update your config before Apache Airflow {version}.",
845 FutureWarning,
846 stacklevel=3,
847 )
def _env_var_name(self, section: str, key: str) -> str:
    """Build the environment variable name for ``section``/``key``.

    Dots in the section become underscores, both parts are upper-cased and
    joined with a double underscore after ``ENV_VAR_PREFIX``.
    """
    section_part = section.replace(".", "_").upper()
    key_part = key.upper()
    return f"{ENV_VAR_PREFIX}{section_part}__{key_part}"
852 def _get_env_var_option(self, section: str, key: str):
853 # must have format AIRFLOW__{SECTION}__{KEY} (note double underscore)
854 env_var = self._env_var_name(section, key)
855 if env_var in os.environ:
856 return expand_env_var(os.environ[env_var])
857 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
858 env_var_cmd = env_var + "_CMD"
859 if env_var_cmd in os.environ:
860 # if this is a valid command key...
861 if (section, key) in self.sensitive_config_values:
862 return run_command(os.environ[env_var_cmd])
863 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
864 env_var_secret_path = env_var + "_SECRET"
865 if env_var_secret_path in os.environ:
866 # if this is a valid secret path...
867 if (section, key) in self.sensitive_config_values:
868 return _get_config_value_from_secret_backend(os.environ[env_var_secret_path])
869 return None
871 def _get_cmd_option(self, section: str, key: str):
872 fallback_key = key + "_cmd"
873 if (section, key) in self.sensitive_config_values:
874 if super().has_option(section, fallback_key):
875 command = super().get(section, fallback_key)
876 return run_command(command)
877 return None
879 def _get_cmd_option_from_config_sources(
880 self, config_sources: ConfigSourcesType, section: str, key: str
881 ) -> str | None:
882 fallback_key = key + "_cmd"
883 if (section, key) in self.sensitive_config_values:
884 section_dict = config_sources.get(section)
885 if section_dict is not None:
886 command_value = section_dict.get(fallback_key)
887 if command_value is not None:
888 if isinstance(command_value, str):
889 command = command_value
890 else:
891 command = command_value[0]
892 return run_command(command)
893 return None
895 def _get_secret_option(self, section: str, key: str) -> str | None:
896 """Get Config option values from Secret Backend."""
897 fallback_key = key + "_secret"
898 if (section, key) in self.sensitive_config_values:
899 if super().has_option(section, fallback_key):
900 secrets_path = super().get(section, fallback_key)
901 return _get_config_value_from_secret_backend(secrets_path)
902 return None
904 def _get_secret_option_from_config_sources(
905 self, config_sources: ConfigSourcesType, section: str, key: str
906 ) -> str | None:
907 fallback_key = key + "_secret"
908 if (section, key) in self.sensitive_config_values:
909 section_dict = config_sources.get(section)
910 if section_dict is not None:
911 secrets_path_value = section_dict.get(fallback_key)
912 if secrets_path_value is not None:
913 if isinstance(secrets_path_value, str):
914 secrets_path = secrets_path_value
915 else:
916 secrets_path = secrets_path_value[0]
917 return _get_config_value_from_secret_backend(secrets_path)
918 return None
def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
    """Return the option value, raising ``ValueError`` when it resolves to ``None``."""
    value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if value is not None:
        return value
    raise ValueError(f"The value {section}/{key} should be set!")
def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
    """Return the option parsed by ``getlist``, raising ``ValueError`` when it is ``None``."""
    items = self.getlist(section, key, **kwargs)
    if items is not None:
        return items
    raise ValueError(f"The value {section}/{key} should be set!")
@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...

@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: ...

def get(  # type: ignore[override,misc]
    self,
    section: str,
    key: str,
    suppress_warnings: bool = False,
    _extra_stacklevel: int = 0,
    **kwargs,
) -> str | None:
    """
    Resolve a configuration value, honouring deprecated section/option names.

    Section and key are lower-cased first. Sources are consulted in this
    order and the first non-``None`` result is returned: environment
    variables, the config file, ``_cmd`` commands, secret backends, the
    default config and finally the pre-2.7 defaults.

    :param section: section to read from
    :param key: option name inside the section
    :param suppress_warnings: when True, skip the log warning emitted just
        before raising for a missing option
    :param _extra_stacklevel: added to the ``stacklevel`` of every warning so
        that wrappers can attribute the warning to their own caller
    :param kwargs: forwarded to the config-file and default-value lookups;
        the presence of ``fallback`` makes the default-config branch return
    :raises AirflowConfigException: when no source provides the option
    """
    section = section.lower()
    key = key.lower()
    warning_emitted = False
    deprecated_section: str | None
    deprecated_key: str | None

    # Options flagged as deprecated in the configuration description always warn.
    option_description = self.configuration_description.get(section, {}).get(key, {})
    if option_description.get("deprecated"):
        deprecation_reason = option_description.get("deprecation_reason", "")
        warnings.warn(
            f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
            DeprecationWarning,
            stacklevel=2 + _extra_stacklevel,
        )
    # For when we rename whole sections
    if section in self.inversed_deprecated_sections:
        # Caller used the old section name: look up under the new one, keep the old as fallback.
        deprecated_section, deprecated_key = (section, key)
        section = self.inversed_deprecated_sections[section]
        if not self._suppress_future_warnings:
            warnings.warn(
                f"The config section [{deprecated_section}] has been renamed to "
                f"[{section}]. Please update your `conf.get*` call to use the new name",
                FutureWarning,
                stacklevel=2 + _extra_stacklevel,
            )
            # Don't warn about individual rename if the whole section is renamed
            warning_emitted = True
    elif (section, key) in self.inversed_deprecated_options:
        # Handle using deprecated section/key instead of the new section/key
        new_section, new_key = self.inversed_deprecated_options[(section, key)]
        if not self._suppress_future_warnings and not warning_emitted:
            warnings.warn(
                f"section/key [{section}/{key}] has been deprecated, you should use"
                f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
                "new name",
                FutureWarning,
                stacklevel=2 + _extra_stacklevel,
            )
            warning_emitted = True
        deprecated_section, deprecated_key = section, key
        section, key = (new_section, new_key)
    elif section in self.deprecated_sections:
        # When accessing the new section name, make sure we check under the old config name
        deprecated_key = key
        deprecated_section = self.deprecated_sections[section][0]
    else:
        # Caller used the current name: fetch the old section/key (if any) to use as a fallback.
        deprecated_section, deprecated_key, _ = self.deprecated_options.get(
            (section, key), (None, None, None)
        )
    # first check environment variables
    option = self._get_environment_variables(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then the config file
    option = self._get_option_from_config_file(
        deprecated_key,
        deprecated_section,
        key,
        kwargs,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then commands
    option = self._get_option_from_commands(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then from secret backends
    option = self._get_option_from_secrets(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then the default config
    if self.get_default_value(section, key) is not None or "fallback" in kwargs:
        return expand_env_var(self.get_default_value(section, key, **kwargs))

    if self.get_default_pre_2_7_value(section, key) is not None:
        # no expansion needed
        return self.get_default_pre_2_7_value(section, key, **kwargs)

    if not suppress_warnings:
        log.warning("section/key [%s/%s] not found in config", section, key)

    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
1057 def _get_option_from_secrets(
1058 self,
1059 deprecated_key: str | None,
1060 deprecated_section: str | None,
1061 key: str,
1062 section: str,
1063 issue_warning: bool = True,
1064 extra_stacklevel: int = 0,
1065 ) -> str | None:
1066 option = self._get_secret_option(section, key)
1067 if option:
1068 return option
1069 if deprecated_section and deprecated_key:
1070 with self.suppress_future_warnings():
1071 option = self._get_secret_option(deprecated_section, deprecated_key)
1072 if option:
1073 if issue_warning:
1074 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
1075 return option
1076 return None
1078 def _get_option_from_commands(
1079 self,
1080 deprecated_key: str | None,
1081 deprecated_section: str | None,
1082 key: str,
1083 section: str,
1084 issue_warning: bool = True,
1085 extra_stacklevel: int = 0,
1086 ) -> str | None:
1087 option = self._get_cmd_option(section, key)
1088 if option:
1089 return option
1090 if deprecated_section and deprecated_key:
1091 with self.suppress_future_warnings():
1092 option = self._get_cmd_option(deprecated_section, deprecated_key)
1093 if option:
1094 if issue_warning:
1095 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
1096 return option
1097 return None
def _get_option_from_config_file(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    kwargs: dict[str, Any],
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
) -> str | None:
    """Read the option from the parsed config file, falling back to its deprecated name.

    The parent class' ``has_option``/``get`` are used on purpose so that
    only the actual config is consulted, not the default config. Values are
    passed through ``expand_env_var``. Returns ``None`` when the option is
    present under neither name.
    """
    if super().has_option(section, key):
        value = super().get(section, key, **kwargs)
        return expand_env_var(value)

    has_legacy_name = deprecated_section and deprecated_key
    if has_legacy_name and super().has_option(deprecated_section, deprecated_key):
        if issue_warning:
            self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
        with self.suppress_future_warnings():
            return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
    return None
1121 def _get_environment_variables(
1122 self,
1123 deprecated_key: str | None,
1124 deprecated_section: str | None,
1125 key: str,
1126 section: str,
1127 issue_warning: bool = True,
1128 extra_stacklevel: int = 0,
1129 ) -> str | None:
1130 option = self._get_env_var_option(section, key)
1131 if option is not None:
1132 return option
1133 if deprecated_section and deprecated_key:
1134 with self.suppress_future_warnings():
1135 option = self._get_env_var_option(deprecated_section, deprecated_key)
1136 if option is not None:
1137 if issue_warning:
1138 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
1139 return option
1140 return None
def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
    """Return the option as a bool.

    The value is lower-cased and stripped, and anything from the first ``#``
    onwards is discarded. ``t``/``true``/``1`` map to True and
    ``f``/``false``/``0`` to False; anything else raises
    ``AirflowConfigException``.
    """
    raw_value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    # partition() keeps the whole string when there is no "#", so one expression covers both cases.
    val = str(raw_value).lower().strip().partition("#")[0].strip()
    if val in ("t", "true", "1"):
        return True
    if val in ("f", "false", "0"):
        return False
    raise AirflowConfigException(
        f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
        f'Current value: "{val}".'
    )
def getint(self, section: str, key: str, **kwargs) -> int:  # type: ignore[override]
    """Return the option converted with ``int``.

    :raises AirflowConfigException: when the value is ``None`` or ``int``
        raises ``ValueError``
    """
    raw_value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw_value is None:
        raise AirflowConfigException(
            f"Failed to convert value None to int. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        converted = int(raw_value)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw_value}".'
        )
    return converted
def getfloat(self, section: str, key: str, **kwargs) -> float:  # type: ignore[override]
    """Return the option converted with ``float``.

    :raises AirflowConfigException: when the value is ``None`` or ``float``
        raises ``ValueError``
    """
    raw_value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw_value is None:
        raise AirflowConfigException(
            f"Failed to convert value None to float. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        converted = float(raw_value)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw_value}".'
        )
    return converted
def getlist(self, section: str, key: str, delimiter=",", **kwargs):
    """Return the option split on ``delimiter`` with each item stripped.

    :raises AirflowConfigException: when the value is ``None`` or splitting
        fails for any reason
    """
    raw_value = self.get(section, key, **kwargs)
    if raw_value is None:
        raise AirflowConfigException(
            f"Failed to convert value None to list. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        items = []
        for piece in raw_value.split(delimiter):
            items.append(piece.strip())
        return items
    except Exception:
        raise AirflowConfigException(
            f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw_value}".'
        )
def getimport(self, section: str, key: str, **kwargs) -> Any:
    """
    Read an option, import the fully qualified name it holds, and return the object.

    The option is read from this parser instance (``self``), not from the
    module-level ``conf`` singleton, so the method works on any instance.

    :param section: section to read from
    :param key: option holding the dotted import path
    :param kwargs: forwarded to ``get``
    :raises AirflowConfigException: when the import fails; the message names
        the key and section, and the original ``ImportError`` is chained
    :return: The object or None, if the option is empty
    """
    full_qualified_path = self.get(section=section, key=key, **kwargs)
    if not full_qualified_path:
        return None

    try:
        return import_string(full_qualified_path)
    except ImportError as e:
        log.warning(e)
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{full_qualified_path}".'
        ) from e
def getjson(
    self, section: str, key: str, fallback=None, **kwargs
) -> dict | list | str | int | float | None:
    """
    Parse the option as JSON and return the decoded value.

    ``fallback`` is returned as-is (it is *not* JSON parsed) when the option
    is missing, ``None`` or an empty string.

    :raises AirflowConfigException: when the value is not valid JSON
    """
    try:
        data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
    except (NoSectionError, NoOptionError):
        return fallback

    if data is None or data == "":
        return fallback

    try:
        parsed = json.loads(data)
    except JSONDecodeError as e:
        raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
    return parsed
def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Read the option as a number of seconds and return it as ``datetime.timedelta``.

    A falsy value (missing, empty, ...) yields ``fallback`` unchanged.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: fallback value when no config value is given, defaults to None
    :raises AirflowConfigException: raised because ValueError or OverflowError
    :return: datetime.timedelta(seconds=<config_value>) or None
    """
    val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
    if not val:
        return fallback

    # the given value must be convertible to integer
    try:
        seconds = int(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

    try:
        return datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            f"Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
        )
def read(
    self,
    filenames: (str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike]),
    encoding=None,
):
    """
    Read and parse one filename or an iterable of filenames.

    This override only narrows the type hints. The parent's result is passed
    through so callers still receive the list of successfully read files, as
    documented for ``ConfigParser.read``.

    :param filenames: a single path or an iterable of paths
    :param encoding: encoding forwarded to the parent implementation
    :return: whatever the parent ``read`` returns
    """
    return super().read(filenames=filenames, encoding=encoding)
def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
):
    """
    Load configuration from a nested dictionary.

    The signature differs from the parent's only to provide better type
    hints and checking; the call is delegated unchanged.

    :param dictionary: dictionary to read from
    :param source: source to be used to store the configuration
    :return:
    """
    super().read_dict(dictionary, source)
def has_option(self, section: str, option: str) -> bool:
    """
    Tell whether an option resolves to a value.

    Delegates to ``self.get()`` so the priority order of config sources
    (env, config, cmd, defaults) is not reimplemented here.

    :param section: section to get option from
    :param option: option to get
    :return: False when the lookup yields ``None`` or raises
        ``NoOptionError``/``NoSectionError``, True otherwise
    """
    try:
        value = self.get(section, option, fallback=None, _extra_stacklevel=1, suppress_warnings=True)
    except (NoOptionError, NoSectionError):
        return False
    return value is not None
def set(self, section: str, option: str, value: str | None = None) -> None:
    """
    Store ``value`` under the lower-cased section and option.

    Lower-casing mirrors what ``get`` does with its arguments.
    """
    super().set(section.lower(), option.lower(), value)
def remove_option(self, section: str, option: str, remove_default: bool = True):
    """
    Drop an option from the file-based config and, optionally, from the defaults.

    Section and option are lower-cased first. The option is removed from the
    default config too when it has a default value and ``remove_default`` is
    true.
    """
    section, option = section.lower(), option.lower()
    if super().has_option(section, option):
        super().remove_option(section, option)

    has_default = self.get_default_value(section, option) is not None
    if has_default and remove_default:
        self._default_values.remove_option(section, option)
def getsection(self, section: str) -> ConfigOptionsDictType | None:
    """
    Return the section as a dict, or ``None`` when it exists in neither config.

    Defaults are overlaid by the loaded config and then by matching
    environment variables. Values are converted to int, float or bool where
    possible; other strings are left as they are.

    :param section: section from the config
    :raises AirflowConfigException: when a collected value is ``None``
    """
    in_config = self.has_section(section)
    in_defaults = self._default_values.has_section(section)
    if not in_config and not in_defaults:
        return None

    merged = dict(self._default_values.items(section)) if in_defaults else {}
    if in_config:
        merged.update(self.items(section))

    section_prefix = self._env_var_name(section, "")
    for env_var in sorted(os.environ):
        if not env_var.startswith(section_prefix):
            continue
        option = env_var.replace(section_prefix, "")
        if option.endswith("_CMD"):
            option = option[:-4]
        option = option.lower()
        merged[option] = self._get_env_var_option(section, option)

    for option, raw in merged.items():
        if raw is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{option}" key in "{section}" section is set.'
            )
        # Try the narrowest type first: int, then float, then bool literals.
        try:
            merged[option] = int(raw)
            continue
        except ValueError:
            pass
        try:
            merged[option] = float(raw)
            continue
        except ValueError:
            pass
        if isinstance(raw, str):
            lowered = raw.lower()
            if lowered in ("t", "true"):
                merged[option] = True
            elif lowered in ("f", "false"):
                merged[option] = False
    return merged
def as_dict(
    self,
    display_source: bool = False,
    display_sensitive: bool = False,
    raw: bool = False,
    include_env: bool = True,
    include_cmds: bool = True,
    include_secret: bool = True,
) -> ConfigSourcesType:
    """
    Materialize the current configuration as a dict of section dicts.

    Airflow defaults are materialized together with user-set configs. When
    one of the ``include_*`` flags is False, the corresponding command or
    secret options are passed through instead of being resolved; to stop
    defaults from overriding them, bare sensitive options that still equal
    the Airflow default are filtered out in that case.

    :param display_source: If False, the option value is returned. If True,
        a tuple of (option_value, source) is returned. Source is either
        'airflow.cfg', 'default', 'env var', or 'cmd'.
    :param display_sensitive: If True, the values of options set by env
        vars and bash commands will be displayed. If False, those options
        are shown as '< hidden >'
    :param raw: Should the values be output as interpolated values, or the
        "raw" form that can be fed back in to ConfigParser
    :param include_env: Should the value of configuration from AIRFLOW__
        environment variables be included or not
    :param include_cmds: Should the result of calling any *_cmd config be
        set (True, default), or should the _cmd options be left as the
        command to run (False)
    :param include_secret: Should the result of calling any *_secret config be
        set (True, default), or should the _secret options be left as the
        path to get the secret from (False)
    :raises ValueError: when ``display_sensitive`` is False but one of the
        ``include_*`` flags is also False
    :return: Dictionary, where the key is the name of the section and the content is
        the dictionary with the name of the parameter and its value.
    """
    # Sensitive values are hidden inside the include_* helpers, so all of them
    # must run when hiding is requested.
    if not display_sensitive and not (include_env and include_cmds and include_secret):
        raise ValueError(
            "If display_sensitive is false, then include_env, "
            "include_cmds, include_secret must all be set as True"
        )

    config_sources: ConfigSourcesType = {}

    # Sources are applied in order; the last one that defines an option "wins".
    ordered_sources: Iterable[tuple[str, ConfigParser]] = [
        ("default-pre-2-7", self._pre_2_7_default_values),
        ("default", self._default_values),
        ("airflow.cfg", self),
    ]
    description = self.configuration_description if self.configuration_description else {}

    self._replace_config_with_display_sources(
        config_sources,
        ordered_sources,
        description,
        display_source,
        raw,
        self.deprecated_options,
        include_cmds=include_cmds,
        include_env=include_env,
        include_secret=include_secret,
    )

    # Env vars overwrite what is there because they have priority.
    if include_env:
        self._include_envs(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_env_var_option)

    # Bash commands.
    if include_cmds:
        self._include_commands(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_cmd_option)

    # Secret backends.
    if include_secret:
        self._include_secrets(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_secret_option)

    if display_sensitive:
        return config_sources

    # Also hide sensitive values that came straight from the config file
    # rather than through env, cmd or secret.
    hidden = "< hidden >"
    for section, key in self.sensitive_config_values:
        section_values = config_sources.get(section)
        if not section_values or not section_values.get(key, None):
            continue
        if display_source:
            source = section_values[key][1]
            section_values[key] = (hidden, source)
        else:
            section_values[key] = hidden

    return config_sources
1491 def _include_secrets(
1492 self,
1493 config_sources: ConfigSourcesType,
1494 display_sensitive: bool,
1495 display_source: bool,
1496 raw: bool,
1497 ):
1498 for section, key in self.sensitive_config_values:
1499 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
1500 if value:
1501 if not display_sensitive:
1502 value = "< hidden >"
1503 if display_source:
1504 opt: str | tuple[str, str] = (value, "secret")
1505 elif raw:
1506 opt = value.replace("%", "%%")
1507 else:
1508 opt = value
1509 config_sources.setdefault(section, {}).update({key: opt})
1510 del config_sources[section][key + "_secret"]
1512 def _include_commands(
1513 self,
1514 config_sources: ConfigSourcesType,
1515 display_sensitive: bool,
1516 display_source: bool,
1517 raw: bool,
1518 ):
1519 for section, key in self.sensitive_config_values:
1520 opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
1521 if not opt:
1522 continue
1523 opt_to_set: str | tuple[str, str] | None = opt
1524 if not display_sensitive:
1525 opt_to_set = "< hidden >"
1526 if display_source:
1527 opt_to_set = (str(opt_to_set), "cmd")
1528 elif raw:
1529 opt_to_set = str(opt_to_set).replace("%", "%%")
1530 if opt_to_set is not None:
1531 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set}
1532 config_sources.setdefault(section, {}).update(dict_to_update)
1533 del config_sources[section][key + "_cmd"]
def _include_envs(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """Overlay values from environment variables starting with ``ENV_VAR_PREFIX``.

    ``config_sources`` is modified in place. Variables that do not split into
    prefix, section and key on ``__`` are skipped, and variables for which
    ``_get_env_var_option`` returns ``None`` are logged and ignored.
    """
    airflow_env_vars = [name for name in os.environ if name.startswith(ENV_VAR_PREFIX)]
    for env_var in airflow_env_vars:
        try:
            _, section, key = env_var.split("__", 2)
            opt = self._get_env_var_option(section, key)
        except ValueError:
            continue
        if opt is None:
            log.warning("Ignoring unknown env var '%s'", env_var)
            continue

        if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
            # Don't hide cmd/secret values here
            is_indirect = env_var.lower().endswith(("cmd", "secret"))
            if not is_indirect and (section, key) in self.sensitive_config_values:
                opt = "< hidden >"
        elif raw:
            opt = opt.replace("%", "%%")
        if display_source:
            opt = (opt, "env var")

        section = section.lower()
        # Keys of kubernetes_environment_variables must keep their case:
        # lower-casing them would make it impossible to set any Airflow
        # environment variables, since Airflow only parses variables that
        # start with AIRFLOW_.
        if section != "kubernetes_environment_variables":
            key = key.lower()
        config_sources.setdefault(section, {})[key] = opt
1572 def _filter_by_source(
1573 self,
1574 config_sources: ConfigSourcesType,
1575 display_source: bool,
1576 getter_func,
1577 ):
1578 """
1579 Delete default configs from current configuration.
1581 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values.
1583 This is necessary because bare configs take precedence over the command
1584 or secret key equivalents so if the current running config is
1585 materialized with Airflow defaults they in turn override user set
1586 command or secret key configs.
1588 :param config_sources: The current configuration to operate on
1589 :param display_source: If False, configuration options contain raw
1590 values. If True, options are a tuple of (option_value, source).
1591 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
1592 :param getter_func: A callback function that gets the user configured
1593 override value for a particular sensitive_config_values config.
1594 :return: None, the given config_sources is filtered if necessary,
1595 otherwise untouched.
1596 """
1597 for section, key in self.sensitive_config_values:
1598 # Don't bother if we don't have section / key
1599 if section not in config_sources or key not in config_sources[section]:
1600 continue
1601 # Check that there is something to override defaults
1602 try:
1603 getter_opt = getter_func(section, key)
1604 except ValueError:
1605 continue
1606 if not getter_opt:
1607 continue
1608 # Check to see that there is a default value
1609 if self.get_default_value(section, key) is None:
1610 continue
1611 # Check to see if bare setting is the same as defaults
1612 if display_source:
1613 # when display_source = true, we know that the config_sources contains tuple
1614 opt, source = config_sources[section][key] # type: ignore
1615 else:
1616 opt = config_sources[section][key]
1617 if opt == self.get_default_value(section, key):
1618 del config_sources[section][key]
@staticmethod
def _replace_config_with_display_sources(
    config_sources: ConfigSourcesType,
    configs: Iterable[tuple[str, ConfigParser]],
    configuration_description: dict[str, dict[str, Any]],
    display_source: bool,
    raw: bool,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Fold every section of every given parser into ``config_sources``.

    The ``(source_name, parser)`` pairs are visited in the order given and each section of each
    parser is delegated to ``_replace_section_config_with_display_sources``.

    :param config_sources: mapping of section -> options that is updated in place
    :param configs: ``(source_name, parser)`` pairs to read from; it is iterated here and
        also handed down to the per-section helper
    :param configuration_description: description of the known configuration options
    :param display_source: forwarded unchanged to the per-section helper
    :param raw: forwarded unchanged to the per-section helper
    :param deprecated_options: forwarded unchanged to the per-section helper
    :param include_env: forwarded unchanged to the per-section helper
    :param include_cmds: forwarded unchanged to the per-section helper
    :param include_secret: forwarded unchanged to the per-section helper
    """
    replace_section = AirflowConfigParser._replace_section_config_with_display_sources
    for origin, parser in configs:
        for section_name in parser.sections():
            replace_section(
                parser,
                config_sources,
                configuration_description,
                display_source,
                raw,
                section_name,
                origin,
                deprecated_options,
                configs,
                include_env=include_env,
                include_cmds=include_cmds,
                include_secret=include_secret,
            )
1650 @staticmethod
1651 def _deprecated_value_is_set_in_config(
1652 deprecated_section: str,
1653 deprecated_key: str,
1654 configs: Iterable[tuple[str, ConfigParser]],
1655 ) -> bool:
1656 for config_type, config in configs:
1657 if config_type != "default":
1658 with contextlib.suppress(NoSectionError):
1659 deprecated_section_array = config.items(section=deprecated_section, raw=True)
1660 if any(key == deprecated_key for key, _ in deprecated_section_array):
1661 return True
1662 else:
1663 return False
@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the environment variable for a deprecated option is present.

    The variable name is ``ENV_VAR_PREFIX`` + upper-cased section + ``__`` + upper-cased key.
    """
    env_var_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}"
    return env_var_name in os.environ
@staticmethod
def _deprecated_command_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """
    Tell whether the ``<deprecated_key>_cmd`` option is present in a non-default source.

    Thin wrapper around ``_deprecated_value_is_set_in_config`` with ``"_cmd"`` appended to the key.
    """
    command_key = deprecated_key + "_cmd"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section, command_key, configs
    )
@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the ``..._CMD`` environment variable for a deprecated option is present.

    The variable name is ``ENV_VAR_PREFIX`` + upper-cased section + ``__`` + upper-cased key + ``_CMD``.
    """
    env_var_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD"
    return env_var_name in os.environ
@staticmethod
def _deprecated_secret_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """
    Tell whether the ``<deprecated_key>_secret`` option is present in a non-default source.

    Thin wrapper around ``_deprecated_value_is_set_in_config`` with ``"_secret"`` appended to the key.
    """
    secret_key = deprecated_key + "_secret"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section, secret_key, configs
    )
@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the ``..._SECRET`` environment variable for a deprecated option is present.

    The variable name is ``ENV_VAR_PREFIX`` + upper-cased section + ``__`` + upper-cased key + ``_SECRET``.
    """
    env_var_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET"
    return env_var_name in os.environ
@contextmanager
def suppress_future_warnings(self):
    """
    Temporarily set ``self._suppress_future_warnings`` to True.

    The previous value of the flag is restored when the ``with`` block exits, including
    when the block raises, so a failure inside the block cannot leave the flag stuck on.

    :return: context manager yielding ``self``
    """
    previous = self._suppress_future_warnings
    self._suppress_future_warnings = True
    try:
        yield self
    finally:
        # Restore even on exceptions; without this the flag would stay True forever.
        self._suppress_future_warnings = previous
@staticmethod
def _replace_section_config_with_display_sources(
    config: ConfigParser,
    config_sources: ConfigSourcesType,
    configuration_description: dict[str, dict[str, Any]],
    display_source: bool,
    raw: bool,
    section: str,
    source_name: str,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    configs: Iterable[tuple[str, ConfigParser]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Copy the options of one section of one parser into ``config_sources``.

    :param config: parser to read the section from
    :param config_sources: mapping of section -> options, updated in place
    :param configuration_description: description of known options; used to look up the
        ``source`` of an option when ``source_name`` is ``"default"``
    :param display_source: if True each stored value is a ``(value, source)`` tuple,
        otherwise the bare value is stored
    :param raw: passed to ``config.items`` as its ``raw`` argument
    :param section: name of the section to copy
    :param source_name: name of the source ``config`` represents
    :param deprecated_options: maps a ``(section, key)`` to its deprecated
        ``(section, key, ...)`` predecessor
    :param configs: all ``(source_name, parser)`` pairs, used to check whether a deprecated
        option is set in any of them
    :param include_env: also consider the deprecated option's environment variable
    :param include_cmds: also consider the deprecated option's ``_cmd`` variants
    :param include_secret: also consider the deprecated option's ``_secret`` variants
    """
    # Options are written into the (possibly freshly created) dict for this section.
    sect = config_sources.setdefault(section, {})
    if isinstance(config, AirflowConfigParser):
        # Read the items with the parser's future-warning suppression flag switched on.
        with config.suppress_future_warnings():
            items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
    else:
        items = config.items(section=section, raw=raw)
    for k, val in items:
        deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
        if deprecated_section and deprecated_key:
            if source_name == "default":
                # If deprecated entry has some non-default value set for any of the sources requested,
                # We should NOT set default for the new entry (because it will override anything
                # coming from the deprecated ones)
                if AirflowConfigParser._deprecated_value_is_set_in_config(
                    deprecated_section, deprecated_key, configs
                ):
                    continue
                if include_env and AirflowConfigParser._deprecated_variable_is_set(
                    deprecated_section, deprecated_key
                ):
                    continue
                if include_cmds and (
                    AirflowConfigParser._deprecated_variable_command_is_set(
                        deprecated_section, deprecated_key
                    )
                    or AirflowConfigParser._deprecated_command_is_set_in_config(
                        deprecated_section, deprecated_key, configs
                    )
                ):
                    continue
                if include_secret and (
                    AirflowConfigParser._deprecated_variable_secret_is_set(
                        deprecated_section, deprecated_key
                    )
                    or AirflowConfigParser._deprecated_secret_is_set_in_config(
                        deprecated_section, deprecated_key, configs
                    )
                ):
                    continue
        if display_source:
            updated_source_name = source_name
            if source_name == "default":
                # defaults can come from other sources (default-<PROVIDER>) that should be used here
                source_description_section = configuration_description.get(section, {})
                source_description_key = source_description_section.get("options", {}).get(k, {})
                if source_description_key is not None:
                    updated_source_name = source_description_key.get("source", source_name)
            sect[k] = (val, updated_source_name)
        else:
            sect[k] = val
def load_test_config(self):
    """
    Use test configuration rather than the configuration coming from airflow defaults.

    When running tests we use the special unit-test configuration to avoid accidental modifications
    and different behaviours when running the tests. Values for that test configuration are stored in
    the "unit_tests.cfg" configuration file in the ``config_templates`` folder next to this module,
    and you need to change values there if you want some specific configuration to be used.

    All previously read sections are removed, the unit-test file is read, the module-level
    ``FERNET_KEY`` is replaced by a freshly generated key and all values are expanded.
    """
    # We need those globals before we run "get_all_expansion_variables" because this is where
    # the variables are expanded from in the configuration
    global FERNET_KEY, AIRFLOW_HOME
    from cryptography.fernet import Fernet

    unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
    unit_test_config = unit_test_config_file.read_text()
    self.remove_all_read_configurations()
    with StringIO(unit_test_config) as test_config_file:
        self.read_file(test_config_file)
    # set fernet key to a random value
    FERNET_KEY = Fernet.generate_key().decode()
    self.expand_all_configuration_values()
    # Report the file that was actually read rather than a hard-coded (and stale) name.
    log.info("Unit test configuration loaded from '%s'", unit_test_config_file)
def expand_all_configuration_values(self):
    """
    Expand all configuration values using the variables from ``get_all_expansion_variables``.

    Every option with a non-None value is removed and set again. Template options and
    non-string values are stored unchanged; other strings go through ``str.format`` first.
    """
    expansion_vars = get_all_expansion_variables()
    for section_name in self.sections():
        for option, current in self.items(section_name):
            if current is None:
                continue
            if self.has_option(section_name, option):
                self.remove_option(section_name, option)
            keep_verbatim = self.is_template(section_name, option) or not isinstance(current, str)
            expanded = current if keep_verbatim else current.format(**expansion_vars)
            self.set(section_name, option, expanded)
def remove_all_read_configurations(self):
    """Drop every section currently held by this parser, so only default values remain."""
    section_names = self.sections()
    for name in section_names:
        self.remove_section(name)
@property
def providers_configuration_loaded(self) -> bool:
    """Return the flag recording whether provider configuration has been loaded."""
    loaded = self._providers_configuration_loaded
    return loaded
def load_providers_configuration(self):
    """
    Load configuration for providers.

    This should be done after the initial configuration has been performed. Initializing and
    discovering providers is an expensive operation and cannot be performed when we load
    configuration for the first time when airflow starts, because we initialize configuration very
    early, during importing of the `airflow` package, and the module is not yet ready to be used
    when it happens and until configuration and settings are loaded. Therefore, in order to reload
    provider configuration we need to additionally load provider-specific configuration.

    The core defaults are restored first, each provider section is added to
    ``configuration_description`` tagged with the source ``default-<provider>``, the default
    values parser is rebuilt and the cached ``sensitive_config_values`` is invalidated.

    :raises AirflowConfigException: if a provider contributes a section that is already present
        in the configuration description
    """
    log.debug("Loading providers configuration")
    from airflow.providers_manager import ProvidersManager

    self.restore_core_default_configuration()
    for provider, config in ProvidersManager().already_initialized_provider_configs:
        provider_source = f"default-{provider}"
        for provider_section, provider_section_content in config.items():
            provider_options = provider_section_content["options"]
            existing_section = self.configuration_description.get(provider_section)
            if existing_section:
                section_source = existing_section.get("source", "Airflow's core package").split(
                    "default-"
                )[-1]
                # Only the message is passed: an extra positional argument would make the
                # exception render as a tuple repr instead of the readable message.
                raise AirflowConfigException(
                    f"The provider {provider} is attempting to contribute "
                    f"configuration section {provider_section} that "
                    f"has already been added before. The source of it: {section_source}. "
                    "This is forbidden. A provider can only add new sections. It "
                    "cannot contribute options to existing sections or override other "
                    "provider's configuration."
                )
            # Work on a deep copy so the provider's own description is never mutated.
            new_section = deepcopy(provider_section_content)
            self.configuration_description[provider_section] = new_section
            new_section["source"] = provider_source
            for option in provider_options:
                new_section["options"][option]["source"] = provider_source
    self._default_values = create_default_config_parser(self.configuration_description)
    # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete
    # the cached values, and it will be refreshed on next access. This has been an implementation
    # detail in Python 3.8 but as of Python 3.9 it is documented behaviour.
    # See https://docs.python.org/3/library/functools.html#functools.cached_property
    with contextlib.suppress(AttributeError):
        # no problem if cache is not set yet
        del self.sensitive_config_values
    self._providers_configuration_loaded = True
1877 @staticmethod
1878 def _warn_deprecate(
1879 section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
1880 ):
1881 if section == deprecated_section:
1882 warnings.warn(
1883 f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
1884 f"the old setting has been used, but please update your config.",
1885 DeprecationWarning,
1886 stacklevel=4 + extra_stacklevel,
1887 )
1888 else:
1889 warnings.warn(
1890 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
1891 f"in [{section}] - the old setting has been used, but please update your config.",
1892 DeprecationWarning,
1893 stacklevel=4 + extra_stacklevel,
1894 )
1896 def __getstate__(self) -> dict[str, Any]:
1897 """Return the state of the object as a dictionary for pickling."""
1898 return {
1899 name: getattr(self, name)
1900 for name in [
1901 "_sections",
1902 "is_validated",
1903 "configuration_description",
1904 "upgraded_values",
1905 "_default_values",
1906 ]
1907 }
1909 def __setstate__(self, state) -> None:
1910 """Restore the state of the object from a dictionary representation."""
1911 self.__init__() # type: ignore[misc]
1912 config = state.pop("_sections")
1913 self.read_dict(config)
1914 self.__dict__.update(state)
def get_airflow_home() -> str:
    """Return the Airflow home: ``AIRFLOW_HOME`` from the environment, else ``~/airflow``, expanded."""
    configured_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(configured_home)
def get_airflow_config(airflow_home: str) -> str:
    """
    Return the path to the airflow.cfg file.

    :param airflow_home: directory used when ``AIRFLOW_CONFIG`` is not set in the environment
    :return: the expanded ``AIRFLOW_CONFIG`` value if set, else ``<airflow_home>/airflow.cfg``
    """
    try:
        configured_path = os.environ["AIRFLOW_CONFIG"]
    except KeyError:
        return os.path.join(airflow_home, "airflow.cfg")
    return expand_env_var(configured_path)
def get_all_expansion_variables() -> dict[str, Any]:
    """Return a new dict merging this module's ``globals()`` with this function's ``locals()``."""
    return {**globals(), **locals()}
def _generate_fernet_key() -> str:
    """Generate a new Fernet key and return it decoded to ``str``."""
    from cryptography.fernet import Fernet

    raw_key = Fernet.generate_key()
    return raw_key.decode()
def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Build a ConfigParser holding the default value of every described option.

    A section is added for every entry of the description. Options whose default is None
    are left out. String defaults of non-template options are expanded with ``str.format``
    using the variables from ``get_all_expansion_variables``; template options and
    non-string defaults are stored unchanged.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    parser = ConfigParser()
    expansion_vars = get_all_expansion_variables()
    for section, section_desc in configuration_description.items():
        parser.add_section(section)
        for option_name, option_desc in section_desc["options"].items():
            default = option_desc["default"]
            is_template = option_desc.get("is_template", False)
            if default is None:
                continue
            if isinstance(default, str) and not is_template:
                default = default.format(**expansion_vars)
            parser.set(section, option_name, default)
    return parser
def create_pre_2_7_defaults() -> ConfigParser:
    """
    Create parser using the old defaults from Airflow < 2.7.0.

    This is used in order to be able to fall back to those defaults when an old version of a
    provider, not supporting "config contribution", is installed with Airflow 2.7.0+. The values
    are read as-is from the ``pre_2_7_defaults.cfg`` default config file; no variable expansion
    is applied here.
    """
    legacy_defaults = ConfigParser()
    legacy_defaults_path = _default_config_file_path("pre_2_7_defaults.cfg")
    legacy_defaults.read(legacy_defaults_path)
    return legacy_defaults
def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Write the configuration to ``AIRFLOW_CONFIG`` when that file does not exist yet.

    Nothing is written if the file already exists. A missing parent directory is created
    only when it lies under ``AIRFLOW_HOME``. If no ``[core] fernet_key`` is configured, a new
    key is generated, stored in the module-level ``FERNET_KEY`` and set on ``conf`` before
    the file is written.

    :return: the module-level ``conf`` object
    :raises IsADirectoryError: if ``AIRFLOW_CONFIG`` points at a directory
    :raises FileNotFoundError: if the parent directory is missing and not under ``AIRFLOW_HOME``
    """
    global FERNET_KEY
    config_path = pathlib.Path(AIRFLOW_CONFIG)
    config_path_str = os.fspath(config_path)
    if config_path.is_dir():
        raise IsADirectoryError(
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {config_path_str!r}."
        )
    if config_path.exists():
        return conf

    log.debug("Creating new Airflow config file in: %s", config_path_str)
    parent_dir = config_path.parent
    if not parent_dir.exists():
        parent_dir_str = os.fspath(parent_dir)
        # Compatibility with Python 3.8, ``PurePath.is_relative_to`` was added in Python 3.9
        try:
            parent_dir.relative_to(AIRFLOW_HOME)
        except ValueError:
            raise FileNotFoundError(
                f"Config directory {parent_dir_str!r} not exists "
                f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                "Please create this directory first."
            ) from None
        log.debug("Create directory %r for Airflow config", parent_dir_str)
        parent_dir.mkdir(parents=True, exist_ok=True)

    if conf.get("core", "fernet_key", fallback=None) is None:
        # We know that FERNET_KEY is not set, so we can generate it, set as global key
        # and also write it to the config file so that same key will be used next time
        FERNET_KEY = _generate_fernet_key()
        conf.remove_option("core", "fernet_key")
        conf.set("core", "fernet_key", FERNET_KEY)

    # Create the file and tighten its permissions before any content is written to it.
    config_path.touch()
    make_group_other_inaccessible(config_path_str)
    with open(config_path, "w") as config_file:
        conf.write(
            config_file,
            include_sources=False,
            include_env_vars=True,
            include_providers=True,
            extra_spacing=True,
            only_defaults=True,
        )
    return conf
def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Read the ``AIRFLOW_CONFIG`` file into the given parser.

    If the file defines ``[core] airflow_home`` a ``DeprecationWarning`` is emitted. When the
    ``AIRFLOW_HOME`` environment variable is not set and the configured value differs from the
    current module-level ``AIRFLOW_HOME``, that global is replaced by the configured value.

    :param airflow_config_parser: parser to which the configuration will be loaded
    """
    global AIRFLOW_HOME
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if not airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        return

    both_set_message = (
        "Specifying both AIRFLOW_HOME environment variable and airflow_home "
        "in the config file is deprecated. Please use only the AIRFLOW_HOME "
        "environment variable and remove the config file entry."
    )
    if "AIRFLOW_HOME" in os.environ:
        warnings.warn(both_set_message, category=DeprecationWarning, stacklevel=1)
        return
    if airflow_config_parser.get("core", "airflow_home") == AIRFLOW_HOME:
        warnings.warn(
            "Specifying airflow_home in the config file is deprecated. As you "
            "have left it at the default value you should remove the setting "
            "from your airflow.cfg and suffer no change in behaviour.",
            category=DeprecationWarning,
            stacklevel=1,
        )
        return
    # The config file holds a different home and no env var is set: the file value wins.
    AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")  # type: ignore[assignment]
    warnings.warn(both_set_message, category=DeprecationWarning, stacklevel=1)
def initialize_config() -> AirflowConfigParser:
    """
    Create the Airflow config parser and load the configuration into it.

    Called for you automatically as part of the Airflow boot process. Also sets the
    module-level ``WEBSERVER_CONFIG`` from ``[webserver] config_file``.
    """
    global WEBSERVER_CONFIG
    parser = AirflowConfigParser()
    if not parser.getboolean("core", "unit_test_mode"):
        load_standard_airflow_configuration(parser)
        # If the user set unit_test_mode in the airflow.cfg, we still
        # want to respect that and then load the default unit test configuration
        # file on top of it.
        if parser.getboolean("core", "unit_test_mode"):
            parser.load_test_config()
    else:
        parser.load_test_config()
    # Set the WEBSERVER_CONFIG variable
    WEBSERVER_CONFIG = parser.get("webserver", "config_file")
    return parser
@providers_configuration_loaded
def write_webserver_configuration_if_needed(airflow_config_parser: AirflowConfigParser):
    """
    Copy the default webserver config to ``[webserver] config_file`` if no file exists there.

    Missing parent directories of the target are created first.

    :param airflow_config_parser: parser the ``[webserver] config_file`` path is read from
    """
    target = airflow_config_parser.get("webserver", "config_file")
    if os.path.isfile(target):
        return

    import shutil

    pathlib.Path(target).parent.mkdir(parents=True, exist_ok=True)
    log.info("Creating new FAB webserver config file in: %s", target)
    template = _default_config_file_path("default_webserver_config.py")
    shutil.copy(template, target)
def make_group_other_inaccessible(file_path: str):
    """
    Strip every permission bit except owner read/write from the given file.

    Best effort: any failure is logged as a warning and the file keeps its permissions.

    :param file_path: path of the file whose mode is restricted
    """
    owner_read_write = stat.S_IRUSR | stat.S_IWUSR
    try:
        current_mode = os.stat(file_path).st_mode
        os.chmod(file_path, current_mode & owner_read_write)
    except Exception as e:
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )
def get(*args, **kwargs) -> ConfigType | None:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.get``."""
    message = (
        "Accessing configuration method 'get' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.get'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.get(*args, **kwargs)
def getboolean(*args, **kwargs) -> bool:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.getboolean``."""
    message = (
        "Accessing configuration method 'getboolean' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getboolean'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.getboolean(*args, **kwargs)
def getfloat(*args, **kwargs) -> float:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.getfloat``."""
    message = (
        "Accessing configuration method 'getfloat' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getfloat'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.getfloat(*args, **kwargs)
def getint(*args, **kwargs) -> int:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.getint``."""
    message = (
        "Accessing configuration method 'getint' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getint'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.getint(*args, **kwargs)
def getsection(*args, **kwargs) -> ConfigOptionsDictType | None:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.getsection``."""
    message = (
        "Accessing configuration method 'getsection' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getsection'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.getsection(*args, **kwargs)
def has_option(*args, **kwargs) -> bool:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.has_option``."""
    message = (
        "Accessing configuration method 'has_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.has_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.has_option(*args, **kwargs)
def remove_option(*args, **kwargs) -> bool:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.remove_option``."""
    message = (
        "Accessing configuration method 'remove_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.remove_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.remove_option(*args, **kwargs)
def as_dict(*args, **kwargs) -> ConfigSourcesType:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.as_dict``."""
    message = (
        "Accessing configuration method 'as_dict' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.as_dict'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.as_dict(*args, **kwargs)
def set(*args, **kwargs) -> None:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.set``."""
    message = (
        "Accessing configuration method 'set' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.set'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    conf.set(*args, **kwargs)
def ensure_secrets_loaded() -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    When ``secrets_backend_list`` holds exactly 2 entries (only the default backends) a newly
    initialized list of backends is returned; otherwise the existing list is returned as is.
    """
    only_defaults_loaded = len(secrets_backend_list) == 2
    return initialize_secrets_backends() if only_defaults_loaded else secrets_backend_list
def get_custom_secret_backend() -> BaseSecretsBackend | None:
    """
    Instantiate the secrets backend configured in ``[secrets] backend``, if any.

    ``[secrets] backend_kwargs`` is read as JSON and passed as keyword arguments. If it cannot
    be parsed or is not a dict, a warning is logged and the backend is created without kwargs.

    :return: the backend instance, or None when no backend is configured
    """
    backend_cls = conf.getimport(section="secrets", key="backend")
    if not backend_cls:
        return None

    try:
        kwargs = conf.getjson(section="secrets", key="backend_kwargs") or {}
        if not isinstance(kwargs, dict):
            raise ValueError("not a dict")
    except AirflowConfigException:
        log.warning("Failed to parse [secrets] backend_kwargs as JSON, defaulting to no kwargs.")
        kwargs = {}
    except ValueError:
        log.warning("Failed to parse [secrets] backend_kwargs into a dict, defaulting to no kwargs.")
        kwargs = {}

    return backend_cls(**kwargs)
def initialize_secrets_backends() -> list[BaseSecretsBackend]:
    """
    Build the list of secrets backend instances.

    The custom backend (when one is configured) comes first, followed by one instance of
    every class named in ``DEFAULT_SECRETS_SEARCH_PATH``, in that order.
    """
    custom_backend = get_custom_secret_backend()
    backends = [] if custom_backend is None else [custom_backend]
    backends.extend(import_string(class_path)() for class_path in DEFAULT_SECRETS_SEARCH_PATH)
    return backends
@functools.lru_cache(maxsize=None)
def _DEFAULT_CONFIG() -> str:
    """Return the text of the ``default_airflow.cfg`` default config file (cached after first read)."""
    config_path = _default_config_file_path("default_airflow.cfg")
    with open(config_path) as config_file:
        content = config_file.read()
    return content
@functools.lru_cache(maxsize=None)
def _TEST_CONFIG() -> str:
    """Return the text of the ``default_test.cfg`` default config file (cached after first read)."""
    config_path = _default_config_file_path("default_test.cfg")
    with open(config_path) as config_file:
        content = config_file.read()
    return content
# Deprecated module attributes, resolved lazily by the module-level ``__getattr__``.
# Each name maps to a zero-argument callable that produces the attribute's value.
_deprecated = {
    "DEFAULT_CONFIG": _DEFAULT_CONFIG,
    "TEST_CONFIG": _TEST_CONFIG,
    "TEST_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_test.cfg"),
    "DEFAULT_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_airflow.cfg"),
}
def __getattr__(name):
    """
    Resolve deprecated module attributes on demand.

    Names listed in ``_deprecated`` trigger a ``DeprecationWarning`` and return the result of
    calling their factory; any other name raises ``AttributeError``.
    """
    if name not in _deprecated:
        raise AttributeError(f"module {__name__} has no attribute {name}")
    warnings.warn(
        f"{__name__}.{name} is deprecated and will be removed in future",
        DeprecationWarning,
        stacklevel=2,
    )
    factory = _deprecated[name]
    return factory()
def initialize_auth_manager() -> BaseAuthManager:
    """
    Import the class configured in ``[core] auth_manager`` and return a new instance of it.

    :raises AirflowConfigException: if no auth manager class is configured
    """
    manager_cls = conf.getimport(section="core", key="auth_manager")
    if manager_cls:
        return manager_cls()

    raise AirflowConfigException(
        "No auth manager defined in the config. "
        "Please specify one using section/key [core/auth_manager]."
    )
# AIRFLOW_HOME and AIRFLOW_CONFIG come from the environment variables of the same name,
# falling back to "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# DAGs folder for unit tests: "tests/dags" one level above this package when present
# (this directory won't exist if users install via pip), else "dags" under AIRFLOW_HOME.
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
TEST_DAGS_FOLDER = (
    _TEST_DAGS_FOLDER if os.path.exists(_TEST_DAGS_FOLDER) else os.path.join(AIRFLOW_HOME, "dags")
)

# Plugins folder for unit tests, chosen the same way.
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
TEST_PLUGINS_FOLDER = (
    _TEST_PLUGINS_FOLDER
    if os.path.exists(_TEST_PLUGINS_FOLDER)
    else os.path.join(AIRFLOW_HOME, "plugins")
)

SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
FERNET_KEY = ""  # Set only if needed when generating a new file
WEBSERVER_CONFIG = ""  # Set by initialize_config

# Build the global configuration object, then the secrets backends, then validate the config.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()