1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
18
19import logging
20import os
21import pathlib
22import re
23import shlex
24import stat
25import subprocess
26import sys
27import warnings
28from base64 import b64encode
29from collections.abc import Callable
30from configparser import ConfigParser
31from inspect import ismodule
32from io import StringIO
33from re import Pattern
34from typing import IO, TYPE_CHECKING, Any
35from urllib.parse import urlsplit
36
37from typing_extensions import overload
38
39from airflow._shared.configuration.parser import (
40 AirflowConfigParser as _SharedAirflowConfigParser,
41 configure_parser_from_configuration_description,
42)
43from airflow._shared.module_loading import import_string
44from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning
45from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH
46from airflow.task.weight_rule import WeightRule
47from airflow.utils import yaml
48
49if TYPE_CHECKING:
50 from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
51 from airflow.secrets import BaseSecretsBackend
52
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
# The filters are only installed when ``sys.warnoptions`` is empty, so warning options that were
# already supplied to the interpreter are left untouched.
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Captures the leading dotted run of digits (named group ``version``) of a version string and
# tolerates any trailing suffix; consumed by ``_parse_sqlite_version``.
_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases describing configuration values and where they came from.
ConfigType = str | int | float | bool
ConfigOptionsDictType = dict[str, ConfigType]
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
ConfigSourcesType = dict[str, ConfigSectionSourcesType]

# NOTE(review): presumably the prefix of environment variables that override config options;
# it is not used in the visible part of this module -- confirm against the shared parser.
ENV_VAR_PREFIX = "AIRFLOW__"
68
69
class _SecretKeys:
    """
    Holds the secret keys used in Airflow during runtime.

    Both attributes are class-level and start out empty. This module assigns generated values to
    them in ``AirflowConfigParser.load_test_config`` and in
    ``write_default_airflow_configuration_if_needed``, and reads them back in
    ``get_all_expansion_variables``.
    """

    fernet_key: str = ""  # Set only if needed when generating a new file
    # Exposed to configuration templates as ``JWT_SECRET_KEY`` by ``get_all_expansion_variables``.
    jwt_secret_key: str = ""
75
76
class ConfigModifications:
    """
    Collect the edits to apply while a configuration file is being written out.

    Three independent collections are kept, all keyed by ``(section, option)`` pairs:

    * ``rename`` -- old ``(section, option)`` mapped to the new ``(section, option)``
    * ``remove`` -- ``(section, option)`` pairs that must be dropped
    * ``default_updates`` -- ``(section, option)`` mapped to its new default value
    """

    def __init__(self) -> None:
        self.rename: dict[tuple[str, str], tuple[str, str]] = dict()
        self.remove: set[tuple[str, str]] = set()
        self.default_updates: dict[tuple[str, str], str] = dict()

    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
        """Record that ``old_section.old_option`` is now ``new_section.new_option``."""
        source = (old_section, old_option)
        target = (new_section, new_option)
        self.rename[source] = target

    def add_remove(self, section: str, option: str) -> None:
        """Record that ``section.option`` must be removed."""
        entry = (section, option)
        self.remove.add(entry)

    def add_default_update(self, section: str, option: str, new_default: str) -> None:
        """Record ``new_default`` as the new default value of ``section.option``."""
        self.default_updates.update({(section, option): new_default})
99
100
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """
    Convert a SQLite version string into a tuple of integers.

    :param s: version string to parse
    :return: the dotted numeric prefix as integers, or ``()`` when ``_SQLITE3_VERSION_PATTERN``
        does not match
    """
    found = _SQLITE3_VERSION_PATTERN.match(s)
    if found is not None:
        return tuple(map(int, found.group("version").split(".")))
    return ()
106
107
@overload
def expand_env_var(env_var: None) -> None: ...


@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand (potentially nested) env vars.

    ``os.path.expandvars`` followed by ``os.path.expanduser`` is applied over and over until a
    pass leaves the value unchanged. Falsy values and non-string values are handed back as-is.
    """
    if not env_var or not isinstance(env_var, str):
        return env_var
    current = env_var
    # Keep expanding until a fixed point is reached (handles variables that expand to variables).
    while (expanded := os.path.expanduser(os.path.expandvars(str(current)))) != current:
        current = expanded
    return expanded
130
131
def run_command(command: str) -> str:
    """
    Run command and returns stdout.

    The command string is tokenised with ``shlex.split`` and executed without a shell.

    :param command: command line to execute
    :return: the decoded standard output of the process
    :raises AirflowConfigException: when the process exits with a non-zero return code
    """
    completed = subprocess.run(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
    encoding = sys.getdefaultencoding()
    # Undecodable bytes are dropped rather than raising.
    output = completed.stdout.decode(encoding, "ignore")
    stderr = completed.stderr.decode(encoding, "ignore")

    if completed.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {completed.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )
146
147
def _default_config_file_path(file_name: str) -> str:
    """Return the path of ``file_name`` inside the ``config_templates`` folder next to this module."""
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, "config_templates", file_name)
151
152
def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Read Airflow configuration description from YAML file.

    The core description (``config.yml`` from the config templates folder) is loaded first;
    provider descriptions are merged on top of it with ``dict.update``.

    :param include_airflow: Include Airflow configs
    :param include_providers: Include provider configs
    :param selected_provider: If specified, include selected provider only
    :return: Python dictionary containing configs & their info
    """
    description: dict[str, dict[str, Any]] = {}

    if include_airflow:
        core_description_path = _default_config_file_path("config.yml")
        with open(core_description_path) as config_file:
            core_description = yaml.safe_load(config_file)
        description.update(core_description)

    if include_providers:
        # Imported lazily, only when provider configs are actually requested.
        from airflow.providers_manager import ProvidersManager

        for provider_name, provider_config in ProvidersManager().provider_configs:
            if selected_provider and provider_name != selected_provider:
                continue
            description.update(provider_config)

    return description
177
178
class AirflowConfigParser(_SharedAirflowConfigParser):
    """
    Custom Airflow Configparser supporting defaults and deprecated options.

    This is a subclass of the shared AirflowConfigParser that adds Core-specific initialization
    and functionality (providers, validation, writing, etc.).

    The defaults are stored in the ``_default_values``. The configuration description keeps
    description of all the options available in Airflow (description follow config.yaml.schema);
    it is loaded internally from the core ``config.yml`` (providers excluded).

    :param default_config: default configuration (in the form of ini file).

    Any further positional and keyword arguments are forwarded to the shared parser.
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        """
        Build the parser from the core configuration description.

        :param default_config: optional ini-formatted string whose values update the defaults
        """
        _configuration_description = retrieve_configuration_description(include_providers=False)
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
        _default_values = create_default_config_parser(_configuration_description)
        from airflow.providers_manager import ProvidersManager

        super().__init__(
            _configuration_description,
            _default_values,
            ProvidersManager,
            create_default_config_parser,
            _default_config_file_path("provider_config_fallback_defaults.cfg"),
            *args,
            **kwargs,
        )
        self._configuration_description = _configuration_description
        self._default_values = _default_values
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False

    @property
    def _validators(self) -> list[Callable[[], None]]:
        """Override ``_validators`` from the shared base class to add core-specific validators."""
        return [
            self._validate_sqlite3_version,
            self._validate_enums,
            self._validate_deprecated_values,
            self._upgrade_postgres_metastore_conn,
        ]

    def _update_logging_deprecated_template_to_one_from_defaults(self):
        """
        Point the deprecated ``log_filename_template`` replacement at the current default.

        The placeholder replacement stored in ``deprecated_values`` is swapped for the default
        value of ``[logging] log_filename_template`` when such a default is available.
        Note that ``deprecated_values`` is a class-level mapping, so the update is visible to
        every instance.
        """
        default = self.get_default_value("logging", "log_filename_template")
        if default:
            # Tuple does not support item assignment, so we have to create a new tuple and replace it
            original_replacement = self.deprecated_values["logging"]["log_filename_template"]
            self.deprecated_values["logging"]["log_filename_template"] = (
                original_replacement[0],
                default,
            )

    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
        "logging": {
            "log_filename_template": (
                re.compile(
                    re.escape(
                        "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log"
                    )
                ),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
            ),
        },
        "core": {
            "executor": (re.compile(re.escape("SequentialExecutor")), "LocalExecutor"),
        },
    }

    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    # Accepted values per (section, option); enforced by ``_validate_enums``.
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("dag_processor", "file_parsing_sort_mode"): [
            "modified_time",
            "random_seeded_by_host",
            "alphabetical",
        ],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        # uvicorn and gunicorn logging levels for web servers
        ("logging", "uvicorn_logging_level"): _available_logging_levels,
        ("logging", "gunicorn_logging_level"): _available_logging_levels,
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
        ("api", "grid_view_sorting_order"): ["topological", "hierarchical_alphabetical"],
        ("logging", "dag_processor_log_target"): ["file", "stdout"],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section,option) to the old value that was upgraded"""

    def write_custom_config(
        self,
        file: IO[str],
        comment_out_defaults: bool = True,
        include_descriptions: bool = True,
        extra_spacing: bool = True,
        modifications: ConfigModifications | None = None,
    ) -> None:
        """
        Write a configuration file using a ConfigModifications object.

        This method includes only options from the current airflow.cfg. For each option:
          - If it's marked for removal, omit it.
          - If renamed, output it under its new name and add a comment indicating its original location.
          - If a default update is specified, apply the new default and output the option as a commented line.
          - Otherwise, if the current value equals the default and comment_out_defaults is True, output it as a comment.
        Options absent from the current airflow.cfg are omitted.

        :param file: File to write the configuration.
        :param comment_out_defaults: If True, options whose value equals the default are written as comments.
        :param include_descriptions: Whether to include section descriptions.
        :param extra_spacing: Whether to insert an extra blank line after each option.
        :param modifications: ConfigModifications instance with rename, remove, and default updates.
        """
        modifications = modifications or ConfigModifications()
        # section name (lower-cased) -> list of (option, value, is_default, modification comment)
        output: dict[str, list[tuple[str, str, bool, str]]] = {}

        # First pass: apply the modifications and group the surviving options by target section.
        for section in self._sections:  # type: ignore[attr-defined]  # accessing _sections from ConfigParser
            for option, orig_value in self._sections[section].items():  # type: ignore[attr-defined]
                key = (section.lower(), option.lower())
                if key in modifications.remove:
                    continue

                mod_comment = ""
                if key in modifications.rename:
                    new_sec, new_opt = modifications.rename[key]
                    effective_section = new_sec
                    effective_option = new_opt
                    mod_comment += f"# Renamed from {section}.{option}\n"
                else:
                    effective_section = section
                    effective_option = option

                value = orig_value
                if key in modifications.default_updates:
                    mod_comment += (
                        f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
                    )
                    value = modifications.default_updates[key]

                default_value = self.get_default_value(effective_section, effective_option, fallback="")
                is_default = str(value) == str(default_value)
                output.setdefault(effective_section.lower(), []).append(
                    (effective_option, str(value), is_default, mod_comment)
                )

        # Second pass: render each section into a buffer and append it to the target file.
        for section, options in output.items():
            section_buffer = StringIO()
            section_buffer.write(f"[{section}]\n")
            if include_descriptions:
                description = self.configuration_description.get(section, {}).get("description", "")
                if description:
                    for line in description.splitlines():
                        section_buffer.write(f"# {line}\n")
                    section_buffer.write("\n")
            for option, value_str, is_default, mod_comment in options:
                key = (section.lower(), option.lower())
                if key in modifications.default_updates and comment_out_defaults:
                    section_buffer.write(f"# {option} = {value_str}\n")
                else:
                    if mod_comment:
                        section_buffer.write(mod_comment)
                    if is_default and comment_out_defaults:
                        section_buffer.write(f"# {option} = {value_str}\n")
                    else:
                        section_buffer.write(f"{option} = {value_str}\n")
                if extra_spacing:
                    section_buffer.write("\n")
            content = section_buffer.getvalue().strip()
            if content:
                file.write(f"{content}\n\n")

    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade legacy PostgreSQL URL schemes in ``[database] sql_alchemy_conn``.

        As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
        must be replaced; they are rewritten to `postgresql+psycopg2`.

        The canonical `postgresql` scheme (with or without an explicit driver) is a valid
        SQLAlchemy dialect name, so it is left untouched and no warning is emitted for it.
        """
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        # Only the legacy ``postgres`` aliases are upgraded.
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql+psycopg2"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration [database] sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
                stacklevel=1,
            )
            self.upgraded_values[(section, key)] = old_value
            new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)

    def _validate_enums(self):
        """
        Validate that enum type config has an accepted value.

        Options that are not set, or whose value is empty, are skipped.

        :raises AirflowConfigException: when an option listed in ``enums_options`` holds a value
            outside of its accepted values
        """
        for (section_key, option_key), enum_options in self.enums_options.items():
            if self.has_option(section_key, option_key):
                value = self.get(section_key, option_key, fallback=None)
                if value and value not in enum_options:
                    raise AirflowConfigException(
                        f"`[{section_key}] {option_key}` should not be "
                        f"{value!r}. Possible values: {', '.join(enum_options)}."
                    )

    def _validate_sqlite3_version(self):
        """
        Validate SQLite version.

        Some features in storing rendered fields require SQLite >= 3.15.0.

        The check only runs when ``[database] sql_alchemy_conn`` contains ``sqlite``.

        :raises AirflowConfigException: when the SQLite C library is older than the minimum
        """
        if "sqlite" not in self.get("database", "sql_alchemy_conn"):
            return

        import sqlite3

        min_sqlite_version = (3, 15, 0)
        if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
            return

        from airflow.utils.docs import get_docs_url

        min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
        raise AirflowConfigException(
            f"error: SQLite C library too old (< {min_sqlite_version_str}). "
            f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
        )

    def _get_custom_secret_backend(self, worker_mode: bool | None = None) -> Any | None:
        """Delegate to the shared implementation, treating ``worker_mode=None`` as ``False``."""
        return super()._get_custom_secret_backend(
            worker_mode=worker_mode if worker_mode is not None else False
        )

    def mask_secrets(self):
        """
        Register sensitive configuration values with both secret maskers.

        Every ``(section, key)`` in ``sensitive_config_values`` is read and passed to the core and
        SDK ``mask_secret`` functions; values that cannot be retrieved are skipped. The same is
        done for the values of the per-key secrets backend kwarg environment variables.
        """
        from airflow._shared.configuration.parser import _build_kwarg_env_prefix, _collect_kwarg_env_vars
        from airflow._shared.secrets_masker import mask_secret as mask_secret_core
        from airflow.sdk.log import mask_secret as mask_secret_sdk

        for section, key in self.sensitive_config_values:
            try:
                with self.suppress_future_warnings():
                    value = self.get(section, key, suppress_warnings=True)
            except AirflowConfigException:
                log.debug(
                    "Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.",
                    section,
                    key,
                )
                continue
            mask_secret_core(value)
            mask_secret_sdk(value)

        # Mask per-key backend kwarg env vars (AIRFLOW__SECRETS__BACKEND_KWARG__* etc.).
        # These are not in sensitive_config_values but may contain sensitive values.
        for _section, _kwargs_key in [
            ("secrets", "backend_kwargs"),
            ("workers", "secrets_backend_kwargs"),
        ]:
            _prefix = _build_kwarg_env_prefix(_section, _kwargs_key)
            for _value in _collect_kwarg_env_vars(_prefix).values():
                mask_secret_core(_value)
                mask_secret_sdk(_value)

    def load_test_config(self):
        """
        Use test configuration rather than the configuration coming from airflow defaults.

        When running tests we use special the unit_test configuration to avoid accidental modifications and
        different behaviours when running the tests. Values for those test configuration are stored in
        the "unit_tests.cfg" configuration file in the ``airflow/config_templates`` folder
        and you need to change values there if you want to make some specific configuration to be used
        """
        from cryptography.fernet import Fernet

        unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
        unit_test_config = unit_test_config_file.read_text()
        self.remove_all_read_configurations()
        with StringIO(unit_test_config) as test_config_file:
            self.read_file(test_config_file)

        # We need those globals before we run "get_all_expansion_variables" because this is where
        # the variables are expanded from in the configuration - set to random values for tests
        _SecretKeys.fernet_key = Fernet.generate_key().decode()
        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        self.expand_all_configuration_values()
        # The message names the file that is actually read above.
        log.info("Unit test configuration loaded from 'unit_tests.cfg'")

    def expand_all_configuration_values(self):
        """
        Expand all configuration values using global and local variables defined in this module.

        Every non-``None`` value is re-set on the parser. String values of non-template options
        are passed through ``str.format`` with the variables from
        ``get_all_expansion_variables``; template options and non-string values are stored as-is.
        """
        all_vars = get_all_expansion_variables()
        for section in self.sections():
            for key, value in self.items(section):
                if value is not None:
                    if self.has_option(section, key):
                        self.remove_option(section, key, remove_default=False)
                    if self.is_template(section, key) or not isinstance(value, str):
                        self.set(section, key, value)
                    else:
                        self.set(section, key, value.format(**all_vars))

    def remove_all_read_configurations(self):
        """Remove all read configurations, leaving only default values in the config."""
        for section in self.sections():
            self.remove_section(section)

    def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
        """
        Override to use module-level function that reads from global conf.

        This ensures as_dict() and other methods use the same secrets backend
        configuration as the global conf instance (set via conf_vars in tests).

        :param config_key: key to look up in the secrets backend
        :return: the value returned by the backend, or ``None`` when no custom backend is configured
        :raises AirflowConfigException: when the backend lookup fails; the original exception
            is chained as the cause
        """
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        try:
            return secrets_client.get_config(config_key)
        except Exception as e:
            raise AirflowConfigException(
                "Cannot retrieve config from alternative secrets backend. "
                "Make sure it is configured properly and that the Backend "
                "is accessible.\n"
                f"{e}"
            ) from e

    def __getstate__(self) -> dict[str, Any]:
        """Return the state of the object as a dictionary for pickling."""
        return {
            name: getattr(self, name)
            for name in [
                "_sections",
                "is_validated",
                "configuration_description",
                "upgraded_values",
                "_default_values",
            ]
        }

    def __setstate__(self, state) -> None:
        """
        Restore the state of the object from a dictionary representation.

        The instance is re-initialised first, the pickled sections are loaded with
        ``read_dict`` and the remaining entries are copied into ``__dict__``.
        Note that ``_sections`` is popped from the ``state`` mapping passed in.
        """
        self.__init__()  # type: ignore[misc]
        config = state.pop("_sections")
        self.read_dict(config)
        self.__dict__.update(state)
551
def get_airflow_home() -> str:
    """Get path to Airflow Home: ``AIRFLOW_HOME`` from the environment (default ``~/airflow``), expanded."""
    home_setting = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(home_setting)
555
556
def get_airflow_config(airflow_home: str) -> str:
    """
    Get Path to airflow.cfg path.

    :param airflow_home: directory used when ``AIRFLOW_CONFIG`` is not set in the environment
    :return: the expanded ``AIRFLOW_CONFIG`` value, or ``<airflow_home>/airflow.cfg``
    """
    configured_path = os.environ.get("AIRFLOW_CONFIG")
    if configured_path is not None:
        return expand_env_var(configured_path)
    return os.path.join(airflow_home, "airflow.cfg")
563
564
def get_all_expansion_variables() -> dict[str, Any]:
    """
    Collect the variables available for expanding configuration values.

    The result starts with ``FERNET_KEY`` and ``JWT_SECRET_KEY`` taken from ``_SecretKeys`` and is
    then extended with every module global whose name does not start with an underscore and
    whose value is neither callable nor a module. A global with one of the two key names
    overrides the ``_SecretKeys`` value.
    """
    expansion_vars: dict[str, Any] = {
        "FERNET_KEY": _SecretKeys.fernet_key,
        "JWT_SECRET_KEY": _SecretKeys.jwt_secret_key,
    }
    for name, value in globals().items():
        if name.startswith("_") or callable(value) or ismodule(value):
            continue
        expansion_vars[name] = value
    return expansion_vars
575
576
def _generate_fernet_key() -> str:
    """Generate a new Fernet key and return it decoded to ``str``."""
    # Imported lazily so ``cryptography`` is only loaded when a key is actually generated.
    from cryptography.fernet import Fernet

    key_bytes = Fernet.generate_key()
    return key_bytes.decode()
581
582
def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Create default config parser based on configuration description.

    A fresh ``ConfigParser`` is populated by ``configure_parser_from_configuration_description``
    using the given description and the variables from ``get_all_expansion_variables``.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    default_parser = ConfigParser()
    configure_parser_from_configuration_description(
        default_parser, configuration_description, get_all_expansion_variables()
    )
    return default_parser
598
599
def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Create the Airflow configuration file with default values when it does not exist yet.

    Nothing is written when the file at ``AIRFLOW_CONFIG`` already exists. Otherwise a missing
    parent directory is created (only if it is located under ``AIRFLOW_HOME``), a Fernet key is
    generated when none is configured, a new JWT secret is generated, and the defaults are
    written to the new file after its permissions have been restricted.

    :return: the global ``conf`` object
    :raises IsADirectoryError: when ``AIRFLOW_CONFIG`` points to a directory
    :raises FileNotFoundError: when the parent directory is missing and is not under ``AIRFLOW_HOME``
    """
    config_path = pathlib.Path(AIRFLOW_CONFIG)
    if config_path.is_dir():
        raise IsADirectoryError(
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {config_path.__fspath__()!r}."
        )
    if config_path.exists():
        # The file is already there - nothing to generate.
        return conf

    config_path_str = config_path.__fspath__()
    log.debug("Creating new Airflow config file in: %s", config_path_str)

    parent_dir = config_path.parent
    if not parent_dir.exists():
        parent_dir_str = parent_dir.__fspath__()
        if not parent_dir.is_relative_to(AIRFLOW_HOME):
            raise FileNotFoundError(
                f"Config directory {parent_dir_str!r} not exists "
                f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                "Please create this directory first."
            ) from None
        log.debug("Create directory %r for Airflow config", parent_dir_str)
        parent_dir.mkdir(parents=True, exist_ok=True)

    if not conf.get("core", "fernet_key"):
        # We know that fernet_key is not set, so we can generate it, set as global key
        # and also write it to the config file so that same key will be used next time
        _SecretKeys.fernet_key = _generate_fernet_key()
        fernet_option = conf._configuration_description["core"]["options"]["fernet_key"]
        fernet_option["default"] = _SecretKeys.fernet_key
        conf._default_values.set("core", "fernet_key", _SecretKeys.fernet_key)

    _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
    jwt_option = conf._configuration_description["api_auth"]["options"]["jwt_secret"]
    jwt_option["default"] = _SecretKeys.jwt_secret_key
    conf._default_values.set("api_auth", "jwt_secret", _SecretKeys.jwt_secret_key)
    # Invalidate cached configuration_description so it recomputes with the updated base
    conf.invalidate_cache()

    # Create the file and restrict its permissions before any content is written to it.
    pathlib.Path(config_path_str).touch()
    make_group_other_inaccessible(config_path_str)
    with open(config_path, "w") as file:
        conf.write(
            file,
            include_sources=False,
            include_env_vars=True,
            include_providers=True,
            extra_spacing=True,
            only_defaults=True,
            show_values=True,
        )
    return conf
650
651
def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Load standard airflow configuration.

    The file at ``AIRFLOW_CONFIG`` is read into the parser. When the file carries a
    ``[core] airflow_home`` entry a ``RemovedInAirflow4Warning`` is emitted, and - if the
    ``AIRFLOW_HOME`` environment variable is not set and the entry differs from the current
    value - the module-level ``AIRFLOW_HOME`` is replaced by the value from the file.

    :param airflow_config_parser: parser to which the configuration will be loaded

    """
    global AIRFLOW_HOME  # to be cleaned in Airflow 4.0
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if not airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        return

    both_specified_msg = (
        "Specifying both AIRFLOW_HOME environment variable and airflow_home "
        "in the config file is deprecated. Please use only the AIRFLOW_HOME "
        "environment variable and remove the config file entry."
    )
    if "AIRFLOW_HOME" in os.environ:
        warnings.warn(both_specified_msg, category=RemovedInAirflow4Warning, stacklevel=1)
        return

    if airflow_config_parser.get("core", "airflow_home") == AIRFLOW_HOME:
        warnings.warn(
            "Specifying airflow_home in the config file is deprecated. As you "
            "have left it at the default value you should remove the setting "
            "from your airflow.cfg and suffer no change in behaviour.",
            category=RemovedInAirflow4Warning,
            stacklevel=1,
        )
        return

    # The config file value wins over the computed default.
    AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")
    warnings.warn(both_specified_msg, category=RemovedInAirflow4Warning, stacklevel=1)
685
686
def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.

    :return: a new parser loaded either with the unit test configuration or with the standard one
    """
    parser = AirflowConfigParser()
    if not parser.getboolean("core", "unit_test_mode"):
        load_standard_airflow_configuration(parser)
        # If the user set unit_test_mode in the airflow.cfg, we still
        # want to respect that and then load the default unit test configuration
        # file on top of it.
        if not parser.getboolean("core", "unit_test_mode"):
            return parser
    parser.load_test_config()
    return parser
704
705
def make_group_other_inaccessible(file_path: str):
    """
    Restrict the permissions of ``file_path`` to at most owner read/write.

    The current mode is masked with ``S_IRUSR | S_IWUSR``. This is best effort: any failure is
    logged as a warning and the file keeps its original permissions.

    :param file_path: path of the file whose mode is changed
    """
    owner_read_write = stat.S_IRUSR | stat.S_IWUSR
    try:
        current_mode = os.stat(file_path).st_mode
        os.chmod(file_path, current_mode & owner_read_write)
    except Exception as e:
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )
716
717
def ensure_secrets_loaded(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    The module-level ``secrets_backend_list`` is returned unless it holds exactly 2 backends or
    ``default_backends`` differs from ``DEFAULT_SECRETS_SEARCH_PATH``; in those cases the
    backends are initialized again and the fresh list is returned.

    :param default_backends: class paths of the default backends to load
    """
    # A non-default search path means we are loading the backends for a worker.
    already_loaded = len(secrets_backend_list) != 2 and default_backends == DEFAULT_SECRETS_SEARCH_PATH
    if already_loaded:
        return secrets_backend_list
    return initialize_secrets_backends(default_backends=default_backends)
733
734
def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | None:
    """
    Get Secret Backend if defined in airflow.cfg.

    Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.

    This is a convenience function that calls conf._get_custom_secret_backend().

    :param worker_mode: forwarded unchanged to ``conf._get_custom_secret_backend``
    :return: whatever ``conf._get_custom_secret_backend`` returns for the global ``conf``
    """
    # Relies on the module-level ``conf`` being initialized before this is called.
    return conf._get_custom_secret_backend(worker_mode=worker_mode)
744
745
def initialize_secrets_backends(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * import secrets backend classes
    * instantiate them and return them in a list

    The custom backend (when one is configured) comes first in the returned list, followed by
    one instance per entry of ``default_backends``. Worker mode is assumed whenever
    ``default_backends`` differs from ``DEFAULT_SECRETS_SEARCH_PATH``.

    :param default_backends: class paths of the default backends to instantiate
    """
    worker_mode = default_backends != DEFAULT_SECRETS_SEARCH_PATH
    backends = []

    custom_backend = get_custom_secret_backend(worker_mode)
    if custom_backend is not None:
        # Imported lazily, only when there is a backend to attach it to.
        from airflow.models import Connection

        custom_backend._set_connection_class(Connection)
        backends.append(custom_backend)

    for backend_path in default_backends:
        from airflow.models import Connection

        backend_class = import_string(backend_path)
        default_backend = backend_class()
        default_backend._set_connection_class(Connection)
        backends.append(default_backend)

    return backends
777
778
def initialize_auth_manager() -> BaseAuthManager:
    """
    Initialize auth manager.

    * import user manager class
    * instantiate it and return it

    :raises AirflowConfigException: when ``[core] auth_manager`` does not resolve to a class
    """
    manager_class = conf.getimport(section="core", key="auth_manager")
    if manager_class:
        return manager_class()

    raise AirflowConfigException(
        "No auth manager defined in the config. Please specify one using section/key [core/auth_manager]."
    )
794
795
# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
# (the candidate is "<parent of this module's directory>/tests/dags"; when it is missing the
# "dags" folder under AIRFLOW_HOME is used instead)
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
if os.path.exists(_TEST_DAGS_FOLDER):
    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags")

# Set up plugins folder for unit tests
# (same lookup as above, with "plugins" under AIRFLOW_HOME as the fallback)
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
if os.path.exists(_TEST_PLUGINS_FOLDER):
    TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
    TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins")

# Base64 text of 16 random bytes, regenerated on every import of this module.
SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")

# Import-time bootstrap. The order matters: ``initialize_secrets_backends`` reaches the global
# ``conf`` through ``get_custom_secret_backend``, so ``conf`` has to exist first. The public
# names defined above (AIRFLOW_HOME, TEST_DAGS_FOLDER, SECRET_KEY, ...) are picked up by
# ``get_all_expansion_variables`` because it scans the module globals.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
# NOTE(review): ``validate`` is not defined in this module; presumably the shared base class runs
# the ``_validators`` listed on AirflowConfigParser -- confirm against the shared parser.
conf.validate()