1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
18
19import logging
20import os
21import pathlib
22import re
23import shlex
24import stat
25import subprocess
26import sys
27import warnings
28from base64 import b64encode
29from collections.abc import Callable
30from configparser import ConfigParser
31from inspect import ismodule
32from io import StringIO
33from re import Pattern
34from typing import IO, TYPE_CHECKING, Any
35from urllib.parse import urlsplit
36
37from typing_extensions import overload
38
39from airflow._shared.configuration.parser import (
40 AirflowConfigParser as _SharedAirflowConfigParser,
41 configure_parser_from_configuration_description,
42)
43from airflow._shared.module_loading import import_string
44from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning
45from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH
46from airflow.task.weight_rule import WeightRule
47from airflow.utils import yaml
48
49if TYPE_CHECKING:
50 from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
51 from airflow.secrets import BaseSecretsBackend
52
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
# The filters are only installed when ``sys.warnoptions`` is empty, so warning
# options already supplied to the interpreter are left in charge.
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Captures the leading dotted-number part of a version string in the named group
# ``version``; anything after it (an optional non-digit and the rest) is ignored.
_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases describing configuration values and where they were sourced from.
ConfigType = str | int | float | bool
ConfigOptionsDictType = dict[str, ConfigType]
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
ConfigSourcesType = dict[str, ConfigSectionSourcesType]

# NOTE(review): presumably the prefix of environment variables that override
# config options; it is not referenced in the rest of this file - confirm usage.
ENV_VAR_PREFIX = "AIRFLOW__"
68
69
class _SecretKeys:
    """
    Container for the secret keys Airflow uses while it is running.

    Both attributes are class-level strings that start out empty and are
    assigned by other code in this module when a key is generated.
    """

    # Set only if needed when generating a new file
    fernet_key: str = ""
    # Secret used for JWT signing; empty until generated.
    jwt_secret_key: str = ""
75
76
class ConfigModifications:
    """
    Collect the changes that should be applied while a config file is written out.

    Every collection is keyed by ``(section, option)`` tuples.

    :param rename: Mapping from (old_section, old_option) to (new_section, new_option)
    :param remove: Set of (section, option) to remove
    :param default_updates: Mapping from (section, option) to new default value
    """

    def __init__(self) -> None:
        self.rename: dict[tuple[str, str], tuple[str, str]] = dict()
        self.remove: set[tuple[str, str]] = set()
        self.default_updates: dict[tuple[str, str], str] = dict()

    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
        """Record that ``old_section.old_option`` must be written as ``new_section.new_option``."""
        source = (old_section, old_option)
        target = (new_section, new_option)
        self.rename[source] = target

    def add_remove(self, section: str, option: str) -> None:
        """Record that ``section.option`` must be left out of the written config."""
        target = (section, option)
        self.remove.add(target)

    def add_default_update(self, section: str, option: str, new_default: str) -> None:
        """Record a new default value for ``section.option``."""
        target = (section, option)
        self.default_updates[target] = new_default
99
100
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """
    Convert a SQLite version string into a tuple of integers.

    :param s: version string to parse
    :return: the dotted numeric prefix as integers, or an empty tuple when the
        string does not match the module-level version pattern
    """
    found = _SQLITE3_VERSION_PATTERN.match(s)
    if found:
        # Only the leading "N.N.N" part is captured; any suffix is dropped.
        return tuple(map(int, found.group("version").split(".")))
    return ()
106
107
@overload
def expand_env_var(env_var: None) -> None: ...


@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Resolve environment variables and ``~`` in a string, including nested references.

    ``expandvars`` followed by ``expanduser`` is applied over and over until the
    result no longer changes.

    :param env_var: value to expand; falsy or non-string values are returned untouched
    :return: the fully expanded string, or the input itself when nothing can be expanded
    """
    if not (env_var and isinstance(env_var, str)):
        return env_var

    current = env_var
    expanded = os.path.expanduser(os.path.expandvars(str(current)))
    # Keep expanding until a fixed point is reached.
    while expanded != current:
        current = expanded
        expanded = os.path.expanduser(os.path.expandvars(str(current)))
    return expanded
130
131
def run_command(command: str) -> str:
    """
    Execute a command (without a shell) and hand back what it printed to stdout.

    :param command: command line, split into arguments with ``shlex.split``
    :return: decoded standard output of the command
    :raises AirflowConfigException: when the command exits with a non-zero code
    """
    child = subprocess.Popen(
        shlex.split(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
    )
    raw_stdout, raw_stderr = child.communicate()
    # Undecodable bytes are dropped rather than raising.
    output = raw_stdout.decode(sys.getdefaultencoding(), "ignore")
    stderr = raw_stderr.decode(sys.getdefaultencoding(), "ignore")

    if child.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {child.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )
146
147
def _default_config_file_path(file_name: str) -> str:
    """
    Build the path of a file inside the ``config_templates`` folder next to this module.

    :param file_name: name of the template file
    :return: path obtained by joining this module's directory, ``config_templates`` and ``file_name``
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, "config_templates", file_name)
151
152
def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Assemble the configuration description from the bundled YAML and from providers.

    :param include_airflow: Include Airflow configs
    :param include_providers: Include provider configs
    :param selected_provider: If specified, include selected provider only
    :return: Python dictionary containing configs & their info
    """
    description: dict[str, dict[str, Any]] = {}

    if include_airflow:
        with open(_default_config_file_path("config.yml")) as core_yaml:
            core_description = yaml.safe_load(core_yaml)
        description.update(core_description)

    if not include_providers:
        return description

    # Imported lazily: only needed when provider configs are requested.
    from airflow.providers_manager import ProvidersManager

    for provider_name, provider_description in ProvidersManager().provider_configs:
        if selected_provider and provider_name != selected_provider:
            continue
        # Later entries overwrite earlier ones that use the same top-level key.
        description.update(provider_description)
    return description
177
178
class AirflowConfigParser(_SharedAirflowConfigParser):
    """
    Custom Airflow Configparser supporting defaults and deprecated options.

    This is a subclass of the shared AirflowConfigParser that adds Core-specific initialization
    and functionality (providers, validation, writing, etc.).

    The defaults are stored in the ``_default_values``. The configuration description keeps
    description of all the options available in Airflow (description follow config.yaml.schema).

    :param default_config: default configuration (in the form of ini file).
    :param args: extra positional arguments, forwarded to the shared base class.
    :param kwargs: extra keyword arguments, forwarded to the shared base class.
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        # Providers are left out here; ProvidersManager is handed to the base class instead.
        _configuration_description = retrieve_configuration_description(include_providers=False)
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
        _default_values = create_default_config_parser(_configuration_description)
        from airflow.providers_manager import ProvidersManager

        super().__init__(
            _configuration_description,
            _default_values,
            ProvidersManager,
            create_default_config_parser,
            _default_config_file_path("provider_config_fallback_defaults.cfg"),
            *args,
            **kwargs,
        )
        self._configuration_description = _configuration_description
        self._default_values = _default_values
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False

    @property
    def _validators(self) -> list[Callable[[], None]]:
        """Override ``_validators`` from the shared base class to add core-specific validators."""
        return [
            self._validate_sqlite3_version,
            self._validate_enums,
            self._validate_deprecated_values,
            self._upgrade_postgres_metastore_conn,
        ]

    def _update_logging_deprecated_template_to_one_from_defaults(self):
        """
        Point the deprecated ``log_filename_template`` replacement at the current default.

        Does nothing when no default value is available for ``[logging] log_filename_template``.
        """
        default = self.get_default_value("logging", "log_filename_template")
        if default:
            # Tuple does not support item assignment, so we have to create a new tuple and replace it
            # NOTE(review): ``deprecated_values`` is declared at class level below; unless the base
            # class rebinds it per instance, this assignment updates the shared mapping.
            original_replacement = self.deprecated_values["logging"]["log_filename_template"]
            self.deprecated_values["logging"]["log_filename_template"] = (
                original_replacement[0],
                default,
            )

    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
        "logging": {
            "log_filename_template": (
                re.compile(
                    re.escape(
                        "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log"
                    )
                ),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
            ),
        },
        "core": {
            "executor": (re.compile(re.escape("SequentialExecutor")), "LocalExecutor"),
        },
    }

    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    # Options restricted to a fixed set of values, keyed by (section, option).
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("dag_processor", "file_parsing_sort_mode"): [
            "modified_time",
            "random_seeded_by_host",
            "alphabetical",
        ],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        # uvicorn and gunicorn logging levels for web servers
        ("logging", "uvicorn_logging_level"): _available_logging_levels,
        ("logging", "gunicorn_logging_level"): _available_logging_levels,
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
        ("api", "grid_view_sorting_order"): ["topological", "hierarchical_alphabetical"],
        ("logging", "dag_processor_log_target"): ["file", "stdout"],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section,option) to the old value that was upgraded"""

    def write_custom_config(
        self,
        file: IO[str],
        comment_out_defaults: bool = True,
        include_descriptions: bool = True,
        extra_spacing: bool = True,
        modifications: ConfigModifications | None = None,
    ) -> None:
        """
        Write a configuration file using a ConfigModifications object.

        This method includes only options from the current airflow.cfg. For each option:
          - If it's marked for removal, omit it.
          - If renamed, output it under its new name and add a comment indicating its original location.
          - If a default update is specified, apply the new default and output the option as a commented line.
          - Otherwise, if the current value equals the default and comment_out_defaults is True, output it as a comment.
        Options absent from the current airflow.cfg are omitted.

        :param file: File to write the configuration.
        :param comment_out_defaults: If True, options whose value equals the default are written as comments.
        :param include_descriptions: Whether to include section descriptions.
        :param extra_spacing: Whether to insert an extra blank line after each option.
        :param modifications: ConfigModifications instance with rename, remove, and default updates.
        """
        modifications = modifications or ConfigModifications()
        # effective section -> list of (option, value, is_default, modification comment)
        output: dict[str, list[tuple[str, str, bool, str]]] = {}

        # First pass: apply removals, renames and default updates to every option that was read.
        for section in self._sections:  # type: ignore[attr-defined]  # accessing _sections from ConfigParser
            for option, orig_value in self._sections[section].items():  # type: ignore[attr-defined]
                key = (section.lower(), option.lower())
                if key in modifications.remove:
                    continue

                mod_comment = ""
                if key in modifications.rename:
                    new_sec, new_opt = modifications.rename[key]
                    effective_section = new_sec
                    effective_option = new_opt
                    mod_comment += f"# Renamed from {section}.{option}\n"
                else:
                    effective_section = section
                    effective_option = option

                value = orig_value
                if key in modifications.default_updates:
                    mod_comment += (
                        f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
                    )
                    value = modifications.default_updates[key]

                default_value = self.get_default_value(effective_section, effective_option, fallback="")
                is_default = str(value) == str(default_value)
                output.setdefault(effective_section.lower(), []).append(
                    (effective_option, str(value), is_default, mod_comment)
                )

        # Second pass: render each section into a buffer and write the non-empty ones.
        for section, options in output.items():
            section_buffer = StringIO()
            section_buffer.write(f"[{section}]\n")
            if include_descriptions:
                description = self.configuration_description.get(section, {}).get("description", "")
                if description:
                    for line in description.splitlines():
                        section_buffer.write(f"# {line}\n")
                    section_buffer.write("\n")
            for option, value_str, is_default, mod_comment in options:
                # The lookup here uses the effective (possibly renamed) section and option.
                key = (section.lower(), option.lower())
                if key in modifications.default_updates and comment_out_defaults:
                    section_buffer.write(f"# {option} = {value_str}\n")
                else:
                    if mod_comment:
                        section_buffer.write(mod_comment)
                    if is_default and comment_out_defaults:
                        section_buffer.write(f"# {option} = {value_str}\n")
                    else:
                        section_buffer.write(f"{option} = {value_str}\n")
                if extra_spacing:
                    section_buffer.write("\n")
            content = section_buffer.getvalue().strip()
            if content:
                file.write(f"{content}\n\n")

    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade SQL schemas.

        As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
        must be replaced with `postgresql`.
        """
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration [database] sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
                stacklevel=1,
            )
            # Remember the original value before it is replaced.
            self.upgraded_values[(section, key)] = old_value
            new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)

    def _validate_enums(self):
        """
        Validate that enum type config has an accepted value.

        :raises AirflowConfigException: when a set, non-empty option is not one of its allowed values
        """
        for (section_key, option_key), enum_options in self.enums_options.items():
            if self.has_option(section_key, option_key):
                value = self.get(section_key, option_key, fallback=None)
                if value and value not in enum_options:
                    raise AirflowConfigException(
                        f"`[{section_key}] {option_key}` should not be "
                        f"{value!r}. Possible values: {', '.join(enum_options)}."
                    )

    def _validate_sqlite3_version(self):
        """
        Validate SQLite version.

        Some features in storing rendered fields require SQLite >= 3.15.0.

        :raises AirflowConfigException: when SQLite is configured and the linked library is too old
        """
        if "sqlite" not in self.get("database", "sql_alchemy_conn"):
            return

        import sqlite3

        min_sqlite_version = (3, 15, 0)
        if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
            return

        from airflow.utils.docs import get_docs_url

        min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
        raise AirflowConfigException(
            f"error: SQLite C library too old (< {min_sqlite_version_str}). "
            f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
        )

    def _get_custom_secret_backend(self, worker_mode: bool | None = None) -> Any | None:
        """Delegate to the shared implementation, treating ``worker_mode=None`` as ``False``."""
        return super()._get_custom_secret_backend(
            worker_mode=worker_mode if worker_mode is not None else False
        )

    def mask_secrets(self):
        """
        Register sensitive configuration values with both secret maskers.

        Values of ``sensitive_config_values`` that cannot be retrieved are skipped. Values
        collected from the per-key secrets-backend kwarg environment variables are masked too.
        """
        from airflow._shared.configuration.parser import _build_kwarg_env_prefix, _collect_kwarg_env_vars
        from airflow._shared.secrets_masker import mask_secret as mask_secret_core
        from airflow.sdk.log import mask_secret as mask_secret_sdk

        for section, key in self.sensitive_config_values:
            try:
                with self.suppress_future_warnings():
                    value = self.get(section, key, suppress_warnings=True)
            except AirflowConfigException:
                log.debug(
                    "Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.",
                    section,
                    key,
                )
                continue
            mask_secret_core(value)
            mask_secret_sdk(value)

        # Mask per-key backend kwarg env vars (AIRFLOW__SECRETS__BACKEND_KWARG__* etc.).
        # These are not in sensitive_config_values but may contain sensitive values.
        for _section, _kwargs_key in [
            ("secrets", "backend_kwargs"),
            ("workers", "secrets_backend_kwargs"),
        ]:
            _prefix = _build_kwarg_env_prefix(_section, _kwargs_key)
            for _value in _collect_kwarg_env_vars(_prefix).values():
                mask_secret_core(_value)
                mask_secret_sdk(_value)

    def load_test_config(self):
        """
        Use test configuration rather than the configuration coming from airflow defaults.

        When running tests we use the special unit test configuration to avoid accidental
        modifications and different behaviours when running the tests. Values for the test
        configuration are stored in the "unit_tests.cfg" configuration file in the
        ``airflow/config_templates`` folder and you need to change values there if you want
        to make some specific configuration to be used.
        """
        from cryptography.fernet import Fernet

        unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
        unit_test_config = unit_test_config_file.read_text()
        self.remove_all_read_configurations()
        with StringIO(unit_test_config) as test_config_file:
            self.read_file(test_config_file)

        # We need those globals before we run "get_all_expansion_variables" because this is where
        # the variables are expanded from in the configuration - set to random values for tests
        _SecretKeys.fernet_key = Fernet.generate_key().decode()
        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        self.expand_all_configuration_values()
        # The message names the file that is actually read above.
        log.info("Unit test configuration loaded from 'unit_tests.cfg'")

    def expand_all_configuration_values(self):
        """Expand all configuration values using global and local variables defined in this module."""
        all_vars = get_all_expansion_variables()
        for section in self.sections():
            for key, value in self.items(section):
                if value is not None:
                    if self.has_option(section, key):
                        self.remove_option(section, key, remove_default=False)
                    # Templates and non-string values are stored back unchanged.
                    if self.is_template(section, key) or not isinstance(value, str):
                        self.set(section, key, value)
                    else:
                        self.set(section, key, value.format(**all_vars))

    def remove_all_read_configurations(self):
        """Remove all read configurations, leaving only default values in the config."""
        for section in self.sections():
            self.remove_section(section)

    def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
        """
        Override to use module-level function that reads from global conf.

        This ensures as_dict() and other methods use the same secrets backend
        configuration as the global conf instance (set via conf_vars in tests).

        :param config_key: key to look up in the secrets backend
        :return: the value from the backend, or ``None`` when no custom backend is configured
        :raises AirflowConfigException: when the backend lookup fails; the original error is
            attached as the cause
        """
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        try:
            return secrets_client.get_config(config_key)
        except Exception as e:
            raise AirflowConfigException(
                "Cannot retrieve config from alternative secrets backend. "
                "Make sure it is configured properly and that the Backend "
                "is accessible.\n"
                f"{e}"
            ) from e

    def __getstate__(self) -> dict[str, Any]:
        """Return the state of the object as a dictionary for pickling."""
        return {
            name: getattr(self, name)
            for name in [
                "_sections",
                "is_validated",
                "configuration_description",
                "upgraded_values",
                "_default_values",
            ]
        }

    def __setstate__(self, state) -> None:
        """Restore the state of the object from a dictionary representation."""
        # Re-run the regular initialisation first, then overlay the pickled state.
        self.__init__()  # type: ignore[misc]
        config = state.pop("_sections")
        self.read_dict(config)
        self.__dict__.update(state)
549
550
def get_airflow_home() -> str:
    """
    Return the Airflow home directory.

    Taken from the ``AIRFLOW_HOME`` environment variable (``~/airflow`` when it is
    not set), with environment variables and ``~`` expanded.
    """
    configured_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(configured_home)
554
555
def get_airflow_config(airflow_home: str) -> str:
    """
    Return the path of the ``airflow.cfg`` file.

    :param airflow_home: Airflow home directory, used when ``AIRFLOW_CONFIG`` is not set
    :return: the expanded ``AIRFLOW_CONFIG`` environment variable when set, otherwise
        ``airflow.cfg`` inside ``airflow_home``
    """
    explicit_path = os.environ.get("AIRFLOW_CONFIG")
    if explicit_path is not None:
        return expand_env_var(explicit_path)
    return os.path.join(airflow_home, "airflow.cfg")
562
563
def get_all_expansion_variables() -> dict[str, Any]:
    """
    Collect the variables available for expanding configuration values.

    The result holds the two secret keys plus every module global whose name does
    not start with an underscore and whose value is neither callable nor a module.
    A global with the same name as one of the secret-key entries takes precedence.
    """
    variables: dict[str, Any] = {
        "FERNET_KEY": _SecretKeys.fernet_key,
        "JWT_SECRET_KEY": _SecretKeys.jwt_secret_key,
    }
    for name, value in globals().items():
        if name.startswith("_") or callable(value) or ismodule(value):
            continue
        variables[name] = value
    return variables
574
575
def _generate_fernet_key() -> str:
    """Generate a new Fernet key and return it decoded to ``str``."""
    # Imported lazily so cryptography is only loaded when a key is requested.
    from cryptography.fernet import Fernet

    raw_key = Fernet.generate_key()
    return raw_key.decode()
580
581
def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Build a ConfigParser holding the default values from a configuration description.

    The parser is filled from the configuration description, with the variables
    returned by ``get_all_expansion_variables`` made available for expansion.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    default_parser = ConfigParser()
    expansion_variables = get_all_expansion_variables()
    configure_parser_from_configuration_description(
        default_parser, configuration_description, expansion_variables
    )
    return default_parser
597
598
def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Create the Airflow config file with default values when it does not exist yet.

    Nothing is written when the file is already there. Otherwise a missing parent
    directory is created (only when it lies under ``AIRFLOW_HOME``), fresh secret
    keys are stored as defaults, and the defaults are written to the new file.

    :return: the module-level ``conf`` object
    :raises IsADirectoryError: when the configured path is a directory
    :raises FileNotFoundError: when the parent directory is missing and is not
        located under ``AIRFLOW_HOME``
    """
    airflow_config = pathlib.Path(AIRFLOW_CONFIG)
    config_path = os.fspath(airflow_config)

    if airflow_config.is_dir():
        raise IsADirectoryError(
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {config_path!r}."
        )
    if airflow_config.exists():
        return conf

    log.debug("Creating new Airflow config file in: %s", config_path)
    config_directory = airflow_config.parent
    if not config_directory.exists():
        directory_path = os.fspath(config_directory)
        if not config_directory.is_relative_to(AIRFLOW_HOME):
            raise FileNotFoundError(
                f"Config directory {directory_path!r} not exists "
                f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                "Please create this directory first."
            ) from None
        log.debug("Create directory %r for Airflow config", directory_path)
        config_directory.mkdir(parents=True, exist_ok=True)

    if not conf.get("core", "fernet_key"):
        # We know that fernet_key is not set, so we can generate it, set as global key
        # and also write it to the config file so that same key will be used next time
        _SecretKeys.fernet_key = _generate_fernet_key()
        fernet_option = conf._configuration_description["core"]["options"]["fernet_key"]
        fernet_option["default"] = _SecretKeys.fernet_key
        conf._default_values.set("core", "fernet_key", _SecretKeys.fernet_key)

    # The JWT secret is regenerated whenever a new file is written.
    _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
    jwt_option = conf._configuration_description["api_auth"]["options"]["jwt_secret"]
    jwt_option["default"] = _SecretKeys.jwt_secret_key
    conf._default_values.set("api_auth", "jwt_secret", _SecretKeys.jwt_secret_key)
    # Invalidate cached configuration_description so it recomputes with the updated base
    conf.invalidate_cache()

    # Create the file and tighten its permissions before any content is written.
    airflow_config.touch()
    make_group_other_inaccessible(config_path)
    with open(airflow_config, "w") as file:
        conf.write(
            file,
            include_sources=False,
            include_env_vars=True,
            include_providers=True,
            extra_spacing=True,
            only_defaults=True,
            show_values=True,
        )
    return conf
649
650
def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Read the Airflow config file into the given parser.

    After reading, a deprecated ``[core] airflow_home`` entry is handled: a warning
    is emitted, and when the ``AIRFLOW_HOME`` environment variable is absent and the
    configured value differs, the module-level ``AIRFLOW_HOME`` is replaced by it.

    :param airflow_config_parser: parser to which the configuration will be loaded
    """
    global AIRFLOW_HOME  # to be cleaned in Airflow 4.0
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if not airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        return

    both_set_message = (
        "Specifying both AIRFLOW_HOME environment variable and airflow_home "
        "in the config file is deprecated. Please use only the AIRFLOW_HOME "
        "environment variable and remove the config file entry."
    )

    if "AIRFLOW_HOME" in os.environ:
        warnings.warn(both_set_message, category=RemovedInAirflow4Warning, stacklevel=1)
        return

    if airflow_config_parser.get("core", "airflow_home") == AIRFLOW_HOME:
        warnings.warn(
            "Specifying airflow_home in the config file is deprecated. As you "
            "have left it at the default value you should remove the setting "
            "from your airflow.cfg and suffer no change in behaviour.",
            category=RemovedInAirflow4Warning,
            stacklevel=1,
        )
        return

    # The config file value wins when the environment variable is not set.
    AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")
    warnings.warn(both_set_message, category=RemovedInAirflow4Warning, stacklevel=1)
684
685
def initialize_config() -> AirflowConfigParser:
    """
    Build the configuration parser and load the Airflow config files into it.

    Called for you automatically as part of the Airflow boot process.

    :return: the loaded parser
    """
    parser = AirflowConfigParser()
    if not parser.getboolean("core", "unit_test_mode"):
        load_standard_airflow_configuration(parser)
        # If the user set unit_test_mode in the airflow.cfg, we still
        # want to respect that and then load the default unit test configuration
        # file on top of it.
        if not parser.getboolean("core", "unit_test_mode"):
            return parser
    parser.load_test_config()
    return parser
703
704
def make_group_other_inaccessible(file_path: str):
    """
    Strip every permission bit except owner read/write from a file.

    Failures are logged as a warning and otherwise ignored, so the file keeps its
    original permissions in that case.

    :param file_path: path of the file whose mode is restricted
    """
    owner_read_write = stat.S_IRUSR | stat.S_IWUSR
    try:
        current_mode = os.stat(file_path).st_mode
        os.chmod(file_path, current_mode & owner_read_write)
    except Exception as e:
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )
715
716
def ensure_secrets_loaded(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Return the loaded secrets backends, re-initializing them when needed.

    The backends are re-initialized when the module-level list holds exactly two
    entries, or when ``default_backends`` differs from ``DEFAULT_SECRETS_SEARCH_PATH``.

    :param default_backends: import paths of the default backend classes
    :return: list of secrets backend instances
    """
    # Exactly two entries means only the default backends are present.
    only_defaults_loaded = len(secrets_backend_list) == 2
    # A search path other than the default one signals loading for the worker too.
    if not only_defaults_loaded and default_backends == DEFAULT_SECRETS_SEARCH_PATH:
        return secrets_backend_list
    return initialize_secrets_backends(default_backends=default_backends)
732
733
def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | None:
    """
    Return the custom secrets backend configured on the global ``conf``, if any.

    This is a thin convenience wrapper around ``conf._get_custom_secret_backend()``.

    :param worker_mode: forwarded unchanged to ``conf._get_custom_secret_backend()``
    :return: whatever ``conf._get_custom_secret_backend()`` returns
    """
    backend = conf._get_custom_secret_backend(worker_mode=worker_mode)
    return backend
743
744
def initialize_secrets_backends(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Instantiate the secrets backends.

    The custom backend (when one is configured) comes first, followed by one
    instance per entry of ``default_backends``. Every backend gets the
    ``Connection`` model class set on it.

    :param default_backends: import paths of the default backend classes
    :return: list of backend instances
    """
    # A non-default search path switches the custom backend lookup to worker mode.
    worker_mode = default_backends != DEFAULT_SECRETS_SEARCH_PATH
    backends = []

    custom_backend = get_custom_secret_backend(worker_mode)
    if custom_backend is not None:
        from airflow.models import Connection

        custom_backend._set_connection_class(Connection)
        backends.append(custom_backend)

    for dotted_path in default_backends:
        from airflow.models import Connection

        backend_instance = import_string(dotted_path)()
        backend_instance._set_connection_class(Connection)
        backends.append(backend_instance)

    return backends
776
777
def initialize_auth_manager() -> BaseAuthManager:
    """
    Create the auth manager configured under ``[core] auth_manager``.

    :return: a new instance of the configured auth manager class
    :raises AirflowConfigException: when no auth manager class is configured
    """
    auth_manager_cls = conf.getimport(section="core", key="auth_manager")
    if auth_manager_cls:
        return auth_manager_cls()

    raise AirflowConfigException(
        "No auth manager defined in the config. Please specify one using section/key [core/auth_manager]."
    )
793
794
# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
# The public (non-underscore) names assigned in this section are module globals,
# so get_all_expansion_variables() exposes them for config value expansion.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
# Fall back to "dags" under AIRFLOW_HOME when the tests folder is absent.
if os.path.exists(_TEST_DAGS_FOLDER):
    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags")

# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
# Same fallback as above, for the plugins folder.
if os.path.exists(_TEST_PLUGINS_FOLDER):
    TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
    TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins")

# Random value (16 bytes, base64-encoded) generated anew each time this module is executed.
SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")

# Order matters: initialize_secrets_backends() reads the global ``conf`` through
# get_custom_secret_backend(), so ``conf`` has to be assigned first.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
# Validation is invoked last, after the secrets backends have been initialized.
conf.validate()