# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import contextlib
import logging
import multiprocessing
import os
import pathlib
import re
import shlex
import stat
import subprocess
import sys
import warnings
from base64 import b64encode
from collections.abc import Callable
from configparser import ConfigParser
from copy import deepcopy
from inspect import ismodule
from io import StringIO
from re import Pattern
from typing import IO, TYPE_CHECKING, Any
from urllib.parse import urlsplit

from typing_extensions import overload

from airflow._shared.configuration.parser import (
    VALUE_NOT_FOUND_SENTINEL,
    AirflowConfigParser as _SharedAirflowConfigParser,
    ValueNotFound,
    configure_parser_from_configuration_description,
)
from airflow._shared.module_loading import import_string
from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH
from airflow.task.weight_rule import WeightRule
from airflow.utils import yaml

if TYPE_CHECKING:
    from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
    from airflow.secrets import BaseSecretsBackend

log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

ConfigType = str | int | float | bool
ConfigOptionsDictType = dict[str, ConfigType]
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
ConfigSourcesType = dict[str, ConfigSectionSourcesType]

ENV_VAR_PREFIX = "AIRFLOW__"
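# Options can be overridden with environment variables named
# AIRFLOW__{SECTION}__{KEY}, e.g. AIRFLOW__CORE__PARALLELISM; appending "_CMD"
# or "_SECRET" fetches the value from a command or a secrets backend instead.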


class _SecretKeys:
    """Holds the secret keys used in Airflow during runtime."""

    fernet_key: str = ""  # Set only if needed when generating a new file
    jwt_secret_key: str = ""


class ConfigModifications:
    """
    Holds modifications to be applied when writing out the config.

    :param rename: Mapping from (old_section, old_option) to (new_section, new_option)
    :param remove: Set of (section, option) to remove
    :param default_updates: Mapping from (section, option) to new default value
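
    Example (illustrative; the option names here are made up)::

        mods = ConfigModifications()
        mods.add_rename("old_section", "old_option", "new_section", "new_option")
        mods.add_remove("some_section", "obsolete_option")
        mods.add_default_update("some_section", "some_option", "new-default")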
    """

    def __init__(self) -> None:
        self.rename: dict[tuple[str, str], tuple[str, str]] = {}
        self.remove: set[tuple[str, str]] = set()
        self.default_updates: dict[tuple[str, str], str] = {}

    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
        self.rename[(old_section, old_option)] = (new_section, new_option)

    def add_remove(self, section: str, option: str) -> None:
        self.remove.add((section, option))

    def add_default_update(self, section: str, option: str, new_default: str) -> None:
        self.default_updates[(section, option)] = new_default


def _parse_sqlite_version(s: str) -> tuple[int, ...]:
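    # e.g. "3.42.0 2023-05-16" -> (3, 42, 0); input that does not start with a
    # dotted version number yields an empty tuple.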
    match = _SQLITE3_VERSION_PATTERN.match(s)
    if match is None:
        return ()
    return tuple(int(p) for p in match.group("version").split("."))


@overload
def expand_env_var(env_var: None) -> None: ...


@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand (potentially nested) env vars.

    Repeat and apply `expandvars` and `expanduser` until
    interpolation stops having any effect.
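
    Example (illustrative; assumes HOME is set)::

        expand_env_var("$HOME/airflow")   # -> e.g. "/home/user/airflow"
        expand_env_var("~/airflow/logs")  # -> e.g. "/home/user/airflow/logs"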
    """
    if not env_var or not isinstance(env_var, str):
        return env_var
    while True:
        interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
        if interpolated == env_var:
            return interpolated
        env_var = interpolated


def run_command(command: str) -> str:
    """Run a command and return its decoded stdout."""
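    # Illustrative: this backs the "_cmd" config options, e.g. a value for
    # sql_alchemy_conn_cmd such as "cat /path/to/conn" is executed here and its
    # stdout becomes the option value.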
    process = subprocess.Popen(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
    output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate())

    if process.returncode != 0:
        raise AirflowConfigException(
            f"Cannot execute {command}. Error code is: {process.returncode}. "
            f"Output: {output}, Stderr: {stderr}"
        )

    return output


def _default_config_file_path(file_name: str) -> str:
    templates_dir = os.path.join(os.path.dirname(__file__), "config_templates")
    return os.path.join(templates_dir, file_name)


def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Read Airflow configuration description from YAML file.

    :param include_airflow: Include Airflow configs
    :param include_providers: Include provider configs
    :param selected_provider: If specified, include selected provider only
    :return: Python dictionary containing configs & their info
    """
    base_configuration_description: dict[str, dict[str, Any]] = {}
    if include_airflow:
        with open(_default_config_file_path("config.yml")) as config_file:
            base_configuration_description.update(yaml.safe_load(config_file))
    if include_providers:
        from airflow.providers_manager import ProvidersManager

        for provider, config in ProvidersManager().provider_configs:
            if not selected_provider or provider == selected_provider:
                base_configuration_description.update(config)
    return base_configuration_description
class AirflowConfigParser(_SharedAirflowConfigParser):
    """
    Custom Airflow ConfigParser supporting defaults and deprecated options.

    This is a subclass of the shared AirflowConfigParser that adds core-specific initialization
    and functionality (providers, validation, writing, etc.).

    The defaults are stored in ``_default_values``. The configuration description keeps
    the description of all the options available in Airflow (descriptions follow
    ``config.yml.schema.json``).

    :param default_config: default configuration (in the form of an ini file).
    :param configuration_description: description of configuration to use
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        configuration_description = retrieve_configuration_description(include_providers=False)
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
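        # (For example, a default log format such as "%%(asctime)s" is stored escaped
        # and comes back as "%(asctime)s" from ConfigParser's get().)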
        _default_values = create_default_config_parser(configuration_description)
        super().__init__(configuration_description, _default_values, *args, **kwargs)
        self.configuration_description = configuration_description
        self._default_values = _default_values
        self._provider_config_fallback_default_values = create_provider_config_fallback_defaults()
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False
        self._providers_configuration_loaded = False
    @property
    def _validators(self) -> list[Callable[[], None]]:
        """Override ``_validators`` from the shared base class to add core-specific validators."""
        return [
            self._validate_sqlite3_version,
            self._validate_enums,
            self._validate_deprecated_values,
            self._upgrade_postgres_metastore_conn,
        ]

    @property
    def _lookup_sequence(self) -> list[Callable]:
        """Override ``_lookup_sequence`` from the shared base class to add provider fallbacks."""
        return super()._lookup_sequence + [self._get_option_from_provider_fallbacks]

    def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
        """Override the base method to add provider fallbacks."""
        return [
            ("provider-fallback-defaults", self._provider_config_fallback_default_values),
            ("default", self._default_values),
            ("airflow.cfg", self),
        ]

    def _get_option_from_provider_fallbacks(
        self,
        deprecated_key: str | None,
        deprecated_section: str | None,
        key: str,
        section: str,
        issue_warning: bool = True,
        extra_stacklevel: int = 0,
        **kwargs,
    ) -> str | ValueNotFound:
        """Get config option from provider fallback defaults."""
        if self.get_provider_config_fallback_defaults(section, key) is not None:
            # no expansion needed
            return self.get_provider_config_fallback_defaults(section, key, **kwargs)
        return VALUE_NOT_FOUND_SENTINEL

    def _update_logging_deprecated_template_to_one_from_defaults(self):
        default = self.get_default_value("logging", "log_filename_template")
        if default:
            # Tuple does not support item assignment, so we have to create a new tuple and replace it
            original_replacement = self.deprecated_values["logging"]["log_filename_template"]
            self.deprecated_values["logging"]["log_filename_template"] = (
                original_replacement[0],
                default,
            )

    def get_provider_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
        """Get provider config fallback default values."""
        return self._provider_config_fallback_default_values.get(section, key, fallback=None, **kwargs)

    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
        "logging": {
            "log_filename_template": (
                re.compile(
                    re.escape(
                        "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log"
                    )
                ),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
            ),
        },
        "core": {
            "executor": (re.compile(re.escape("SequentialExecutor")), "LocalExecutor"),
        },
    }

    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
        ("dag_processor", "file_parsing_sort_mode"): [
            "modified_time",
            "random_seeded_by_host",
            "alphabetical",
        ],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        # uvicorn and gunicorn logging levels for web servers
        ("logging", "uvicorn_logging_level"): _available_logging_levels,
        ("logging", "gunicorn_logging_level"): _available_logging_levels,
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
        ("api", "grid_view_sorting_order"): ["topological", "hierarchical_alphabetical"],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section, option) to the old value that was upgraded"""

    def write_custom_config(
        self,
        file: IO[str],
        comment_out_defaults: bool = True,
        include_descriptions: bool = True,
        extra_spacing: bool = True,
        modifications: ConfigModifications | None = None,
    ) -> None:
        """
        Write a configuration file using a ConfigModifications object.

        This method includes only options from the current airflow.cfg. For each option:

        - If it's marked for removal, omit it.
        - If renamed, output it under its new name and add a comment indicating its original location.
        - If a default update is specified, apply the new default and output the option as a commented line.
        - Otherwise, if the current value equals the default and comment_out_defaults is True,
          output it as a comment.

        Options absent from the current airflow.cfg are omitted.

        :param file: File to write the configuration.
        :param comment_out_defaults: If True, options whose value equals the default are written as comments.
        :param include_descriptions: Whether to include section descriptions.
        :param extra_spacing: Whether to insert an extra blank line after each option.
        :param modifications: ConfigModifications instance with rename, remove, and default updates.
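
        Example (illustrative; ``core.sql_alchemy_conn`` was renamed to
        ``database.sql_alchemy_conn`` in Airflow 2.3)::

            mods = ConfigModifications()
            mods.add_rename("core", "sql_alchemy_conn", "database", "sql_alchemy_conn")
            with open("airflow_updated.cfg", "w") as f:
                conf.write_custom_config(f, modifications=mods)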
        """
        modifications = modifications or ConfigModifications()
        output: dict[str, list[tuple[str, str, bool, str]]] = {}

        for section in self._sections:  # type: ignore[attr-defined]  # accessing _sections from ConfigParser
            for option, orig_value in self._sections[section].items():  # type: ignore[attr-defined]
                key = (section.lower(), option.lower())
                if key in modifications.remove:
                    continue

                mod_comment = ""
                if key in modifications.rename:
                    new_sec, new_opt = modifications.rename[key]
                    effective_section = new_sec
                    effective_option = new_opt
                    mod_comment += f"# Renamed from {section}.{option}\n"
                else:
                    effective_section = section
                    effective_option = option

                value = orig_value
                if key in modifications.default_updates:
                    mod_comment += (
                        f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
                    )
                    value = modifications.default_updates[key]

                default_value = self.get_default_value(effective_section, effective_option, fallback="")
                is_default = str(value) == str(default_value)
                output.setdefault(effective_section.lower(), []).append(
                    (effective_option, str(value), is_default, mod_comment)
                )

        for section, options in output.items():
            section_buffer = StringIO()
            section_buffer.write(f"[{section}]\n")
            if include_descriptions:
                description = self.configuration_description.get(section, {}).get("description", "")
                if description:
                    for line in description.splitlines():
                        section_buffer.write(f"# {line}\n")
                    section_buffer.write("\n")
            for option, value_str, is_default, mod_comment in options:
                key = (section.lower(), option.lower())
                if key in modifications.default_updates and comment_out_defaults:
                    section_buffer.write(f"# {option} = {value_str}\n")
                else:
                    if mod_comment:
                        section_buffer.write(mod_comment)
                    if is_default and comment_out_defaults:
                        section_buffer.write(f"# {option} = {value_str}\n")
                    else:
                        section_buffer.write(f"{option} = {value_str}\n")
                if extra_spacing:
                    section_buffer.write("\n")
            content = section_buffer.getvalue().strip()
            if content:
                file.write(f"{content}\n\n")

    def _ensure_providers_config_loaded(self) -> None:
        """Ensure providers configurations are loaded."""
        if not self._providers_configuration_loaded:
            from airflow.providers_manager import ProvidersManager

            ProvidersManager()._initialize_providers_configuration()

    def _ensure_providers_config_unloaded(self) -> bool:
        """
        Temporarily unload provider configurations so that core configuration can be loaded.

        Returns True if providers were unloaded.
        """
        if self._providers_configuration_loaded:
            self.restore_core_default_configuration()
            return True
        return False

    def _reload_provider_configs(self) -> None:
        """Reload providers configuration."""
        self.load_providers_configuration()

    def restore_core_default_configuration(self) -> None:
        """
        Restore default configuration for core Airflow.

        It does not restore configuration for providers. If you want to restore configuration for
        providers, you need to call the ``load_providers_configuration`` method.
        """
        self.configuration_description = retrieve_configuration_description(include_providers=False)
        self._default_values = create_default_config_parser(self.configuration_description)
        self._providers_configuration_loaded = False

    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade SQLAlchemy connection URL schemes.

        As of SQLAlchemy 1.4, the schemes `postgres+psycopg2` and `postgres`
        must be replaced with `postgresql`.
        """
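        # e.g. "postgres+psycopg2://user:pass@host/db" -> "postgresql://user:pass@host/db"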
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration [database] sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
                stacklevel=1,
            )
            self.upgraded_values[(section, key)] = old_value
            new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)

    def _validate_enums(self):
        """Validate that enum type config has an accepted value."""
        for (section_key, option_key), enum_options in self.enums_options.items():
            if self.has_option(section_key, option_key):
                value = self.get(section_key, option_key, fallback=None)
                if value and value not in enum_options:
                    raise AirflowConfigException(
                        f"`[{section_key}] {option_key}` should not be "
                        f"{value!r}. Possible values: {', '.join(enum_options)}."
                    )

    def _validate_sqlite3_version(self):
        """
        Validate SQLite version.

        Some features in storing rendered fields require SQLite >= 3.15.0.
        """
        if "sqlite" not in self.get("database", "sql_alchemy_conn"):
            return

        import sqlite3

        min_sqlite_version = (3, 15, 0)
        if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
            return

        from airflow.utils.docs import get_docs_url

        min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
        raise AirflowConfigException(
            f"error: SQLite C library too old (< {min_sqlite_version_str}). "
            f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
        )

    def mask_secrets(self):
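        """Register all sensitive config values with the core and SDK secrets maskers."""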
        from airflow._shared.secrets_masker import mask_secret as mask_secret_core
        from airflow.sdk.log import mask_secret as mask_secret_sdk

        for section, key in self.sensitive_config_values:
            try:
                with self.suppress_future_warnings():
                    value = self.get(section, key, suppress_warnings=True)
            except AirflowConfigException:
                log.debug(
                    "Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.",
                    section,
                    key,
                )
                continue
            mask_secret_core(value)
            mask_secret_sdk(value)

    def load_test_config(self):
        """
        Use the test configuration rather than the configuration coming from Airflow defaults.

        When running tests we use the special unit test configuration to avoid accidental modifications
        and differing behaviours while the tests run. The values for this test configuration are stored
        in the "unit_tests.cfg" file in the ``airflow/config_templates`` folder, and you need to change
        the values there if you want specific configuration to be used in tests.
        """
        from cryptography.fernet import Fernet

        unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
        unit_test_config = unit_test_config_file.read_text()
        self.remove_all_read_configurations()
        with StringIO(unit_test_config) as test_config_file:
            self.read_file(test_config_file)

        # We need those globals before we run "get_all_expansion_variables" because this is where
        # the variables are expanded from in the configuration - set to random values for tests
        _SecretKeys.fernet_key = Fernet.generate_key().decode()
        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        self.expand_all_configuration_values()
        log.info("Unit test configuration loaded from 'unit_tests.cfg'")

    def expand_all_configuration_values(self):
        """Expand all configuration values using global and local variables defined in this module."""
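        # Values may contain str.format() templates (e.g. "{FERNET_KEY}") that are
        # resolved against the expansion variables returned by
        # get_all_expansion_variables() below.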
        all_vars = get_all_expansion_variables()
        for section in self.sections():
            for key, value in self.items(section):
                if value is not None:
                    if self.has_option(section, key):
                        self.remove_option(section, key)
                    if self.is_template(section, key) or not isinstance(value, str):
                        self.set(section, key, value)
                    else:
                        self.set(section, key, value.format(**all_vars))

    def remove_all_read_configurations(self):
        """Remove all read configurations, leaving only default values in the config."""
        for section in self.sections():
            self.remove_section(section)

    @property
    def providers_configuration_loaded(self) -> bool:
        """Check whether provider configurations have been loaded."""
        return self._providers_configuration_loaded

    def load_providers_configuration(self):
        """
        Load configuration for providers.

        This should be done after the initial configuration has been performed. Initializing and
        discovering providers is an expensive operation and cannot be performed when we load the
        configuration for the first time when Airflow starts: configuration is initialized very early,
        while the `airflow` package is being imported, and the module is not yet ready to be used at
        that point and until configuration and settings are loaded. Therefore, to reload provider
        configuration, we need to additionally load the provider-specific configuration.
        """
        log.debug("Loading providers configuration")
        from airflow.providers_manager import ProvidersManager

        self.restore_core_default_configuration()
        for provider, config in ProvidersManager().already_initialized_provider_configs:
            for provider_section, provider_section_content in config.items():
                provider_options = provider_section_content["options"]
                section_in_current_config = self.configuration_description.get(provider_section)
                if not section_in_current_config:
                    self.configuration_description[provider_section] = deepcopy(provider_section_content)
                    section_in_current_config = self.configuration_description.get(provider_section)
                    section_in_current_config["source"] = f"default-{provider}"
                    for option in provider_options:
                        section_in_current_config["options"][option]["source"] = f"default-{provider}"
                else:
                    section_source = section_in_current_config.get("source", "Airflow's core package").split(
                        "default-"
                    )[-1]
                    raise AirflowConfigException(
                        f"The provider {provider} is attempting to contribute "
                        f"configuration section {provider_section} that "
                        f"has already been added before. The source of it: {section_source}. "
                        "This is forbidden. A provider can only add new sections. It "
                        "cannot contribute options to existing sections or override other "
                        "providers' configuration."
                    )
        self._default_values = create_default_config_parser(self.configuration_description)
        # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete
        # the cached values, and it will be refreshed on next access.
        with contextlib.suppress(AttributeError):
            # no problem if cache is not set yet
            del self.sensitive_config_values
        self._providers_configuration_loaded = True

    def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
        """
        Override to use module-level function that reads from global conf.

        This ensures as_dict() and other methods use the same secrets backend
        configuration as the global conf instance (set via conf_vars in tests).
        """
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        try:
            return secrets_client.get_config(config_key)
        except Exception as e:
            raise AirflowConfigException(
                "Cannot retrieve config from alternative secrets backend. "
                "Make sure it is configured properly and that the Backend "
                "is accessible.\n"
                f"{e}"
            )

    def __getstate__(self) -> dict[str, Any]:
        """Return the state of the object as a dictionary for pickling."""
        return {
            name: getattr(self, name)
            for name in [
                "_sections",
                "is_validated",
                "configuration_description",
                "upgraded_values",
                "_default_values",
            ]
        }

    def __setstate__(self, state) -> None:
        """Restore the state of the object from a dictionary representation."""
        self.__init__()  # type: ignore[misc]
        config = state.pop("_sections")
        self.read_dict(config)
        self.__dict__.update(state)


def get_airflow_home() -> str:
    """Get path to Airflow Home."""
    return expand_env_var(os.environ.get("AIRFLOW_HOME", "~/airflow"))

def get_airflow_config(airflow_home: str) -> str:
    """Get the path to the airflow.cfg file."""
    airflow_config_var = os.environ.get("AIRFLOW_CONFIG")
    if airflow_config_var is None:
        return os.path.join(airflow_home, "airflow.cfg")
    return expand_env_var(airflow_config_var)


def get_all_expansion_variables() -> dict[str, Any]:
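    """Return the variables available for str.format() expansion of configuration values."""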
    return {
        "FERNET_KEY": _SecretKeys.fernet_key,
        "JWT_SECRET_KEY": _SecretKeys.jwt_secret_key,
        **{
            k: v
            for k, v in globals().items()
            if not k.startswith("_") and not callable(v) and not ismodule(v)
        },
    }


def _generate_fernet_key() -> str:
    from cryptography.fernet import Fernet

    return Fernet.generate_key().decode()


def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Create default config parser based on configuration description.

    It creates ConfigParser with all default values retrieved from the configuration description and
    expands all the variables from the global and local variables defined in this module.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    parser = ConfigParser()
    all_vars = get_all_expansion_variables()
    configure_parser_from_configuration_description(parser, configuration_description, all_vars)
    return parser


def create_provider_config_fallback_defaults() -> ConfigParser:
    """
    Create fallback defaults.

    This parser contains provider defaults for Airflow configuration: fallback default values
    that might be needed when provider classes are being imported - before the provider's
    configuration is loaded.

    Unfortunately Airflow currently performs a lot of work during import, and some of that might lead
    to retrieving provider configuration before the defaults for the provider are loaded.

    These are only defaults, so if you have "real" values configured in your configuration (.cfg file or
    environment variables), those will be used as usual.

    NOTE!! Do NOT attempt to remove these default fallbacks thinking that they are unnecessary
    duplication, at least not until we fix the way Airflow imports "do stuff". This is unlikely
    to succeed.

    You've been warned!
    """
    config_parser = ConfigParser()
    config_parser.read(_default_config_file_path("provider_config_fallback_defaults.cfg"))
    return config_parser


def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    airflow_config = pathlib.Path(AIRFLOW_CONFIG)
    if airflow_config.is_dir():
        msg = (
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {airflow_config.__fspath__()!r}."
        )
        raise IsADirectoryError(msg)
    if not airflow_config.exists():
        log.debug("Creating new Airflow config file in: %s", airflow_config.__fspath__())
        config_directory = airflow_config.parent
        if not config_directory.exists():
            if not config_directory.is_relative_to(AIRFLOW_HOME):
                msg = (
                    f"Config directory {config_directory.__fspath__()!r} does not exist, "
                    f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                    "Please create this directory first."
                )
                raise FileNotFoundError(msg) from None
            log.debug("Creating directory %r for Airflow config", config_directory.__fspath__())
            config_directory.mkdir(parents=True, exist_ok=True)
        if not conf.get("core", "fernet_key"):
            # We know that fernet_key is not set, so we can generate it, set as global key
            # and also write it to the config file so that same key will be used next time
            _SecretKeys.fernet_key = _generate_fernet_key()
            conf.configuration_description["core"]["options"]["fernet_key"]["default"] = (
                _SecretKeys.fernet_key
            )

        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        conf.configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = (
            _SecretKeys.jwt_secret_key
        )
        pathlib.Path(airflow_config.__fspath__()).touch()
        make_group_other_inaccessible(airflow_config.__fspath__())
        with open(airflow_config, "w") as file:
            conf.write(
                file,
                include_sources=False,
                include_env_vars=True,
                include_providers=True,
                extra_spacing=True,
                only_defaults=True,
                show_values=True,
            )
    return conf


def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Load the standard Airflow configuration from ``AIRFLOW_CONFIG``.

    It also handles the deprecated ``[core] airflow_home`` option: a deprecation warning is
    emitted when it is set, and the value is honoured when ``AIRFLOW_HOME`` comes from the
    config file alone.

    :param airflow_config_parser: parser to which the configuration will be loaded
    """
    global AIRFLOW_HOME  # to be cleaned in Airflow 4.0
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        msg = (
            "Specifying both AIRFLOW_HOME environment variable and airflow_home "
            "in the config file is deprecated. Please use only the AIRFLOW_HOME "
            "environment variable and remove the config file entry."
        )
        if "AIRFLOW_HOME" in os.environ:
            warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)
        elif airflow_config_parser.get("core", "airflow_home") == AIRFLOW_HOME:
            warnings.warn(
                "Specifying airflow_home in the config file is deprecated. As you "
                "have left it at the default value you should remove the setting "
                "from your airflow.cfg and suffer no change in behaviour.",
                category=RemovedInAirflow4Warning,
                stacklevel=1,
            )
        else:
            AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")
            warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)


def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.
    """
    airflow_config_parser = AirflowConfigParser()
    if airflow_config_parser.getboolean("core", "unit_test_mode"):
        airflow_config_parser.load_test_config()
    else:
        load_standard_airflow_configuration(airflow_config_parser)
        # If the user set unit_test_mode in the airflow.cfg, we still
        # want to respect that and then load the default unit test configuration
        # file on top of it.
        if airflow_config_parser.getboolean("core", "unit_test_mode"):
            airflow_config_parser.load_test_config()
    return airflow_config_parser


def make_group_other_inaccessible(file_path: str):
    try:
        permissions = os.stat(file_path)
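        # Keep only the owner's read/write bits, e.g. 0o644 becomes 0o600.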
        os.chmod(file_path, permissions.st_mode & (stat.S_IRUSR | stat.S_IWUSR))
    except Exception as e:
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )


def ensure_secrets_loaded(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    If ``secrets_backend_list`` still contains only the two default backends, reload it.
    """
    # Reload if the secrets_backend_list contains only the two default backends, or if
    # we are loading the backends for a worker (signalled by a non-default
    # ``default_backends`` argument).
    if len(secrets_backend_list) == 2 or default_backends != DEFAULT_SECRETS_SEARCH_PATH:
        return initialize_secrets_backends(default_backends=default_backends)
    return secrets_backend_list


def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | None:
    """
    Get Secret Backend if defined in airflow.cfg.

    Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.

    This is a convenience function that calls conf._get_custom_secret_backend().
    """
    return conf._get_custom_secret_backend(worker_mode=worker_mode)


def initialize_secrets_backends(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * import secrets backend classes
    * instantiate them and return them in a list
    """
    backend_list = []
    worker_mode = False
    if default_backends != DEFAULT_SECRETS_SEARCH_PATH:
        worker_mode = True

    custom_secret_backend = get_custom_secret_backend(worker_mode)

    if custom_secret_backend is not None:
        backend_list.append(custom_secret_backend)

    for class_name in default_backends:
        secrets_backend_cls = import_string(class_name)
        backend_list.append(secrets_backend_cls())

    return backend_list


def initialize_auth_manager() -> BaseAuthManager:
    """
    Initialize the auth manager.

    * import the auth manager class
    * instantiate it and return it
    """
    auth_manager_cls = conf.getimport(section="core", key="auth_manager")

    if not auth_manager_cls:
        raise AirflowConfigException(
            "No auth manager defined in the config. Please specify one using section/key [core/auth_manager]."
        )

    return auth_manager_cls()


# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
if os.path.exists(_TEST_DAGS_FOLDER):
    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags")

# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
if os.path.exists(_TEST_PLUGINS_FOLDER):
    TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
    TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins")

SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")

conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()