1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
18
19import contextlib
20import logging
21import multiprocessing
22import os
23import pathlib
24import re
25import shlex
26import stat
27import subprocess
28import sys
29import warnings
30from base64 import b64encode
31from collections.abc import Callable
32from configparser import ConfigParser
33from copy import deepcopy
34from io import StringIO
35from re import Pattern
36from typing import IO, TYPE_CHECKING, Any
37from urllib.parse import urlsplit
38
39from typing_extensions import overload
40
41from airflow._shared.configuration.parser import (
42 VALUE_NOT_FOUND_SENTINEL,
43 AirflowConfigParser as _SharedAirflowConfigParser,
44 ValueNotFound,
45)
46from airflow._shared.module_loading import import_string
47from airflow.exceptions import AirflowConfigException
48from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH
49from airflow.task.weight_rule import WeightRule
50from airflow.utils import yaml
51
52if TYPE_CHECKING:
53 from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
54 from airflow.secrets import BaseSecretsBackend
55
log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Matches the leading dotted-numeric part of a sqlite version string, e.g. "3.15.2" in "3.15.2-beta".
_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases describing configuration values and the structures that track where each value came from.
ConfigType = str | int | float | bool
ConfigOptionsDictType = dict[str, ConfigType]
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
ConfigSourcesType = dict[str, ConfigSectionSourcesType]

# Prefix of environment variables that override config options
# (presumably combined with section/key by the shared parser's _env_var_name — confirm there).
ENV_VAR_PREFIX = "AIRFLOW__"
71
72
class ConfigModifications:
    """
    Holds modifications to be applied when writing out the config.

    :param rename: Mapping from (old_section, old_option) to (new_section, new_option)
    :param remove: Set of (section, option) to remove
    :param default_updates: Mapping from (section, option) to new default value
    """

    def __init__(self) -> None:
        # Every structure below is keyed by a (section, option) tuple.
        self.rename: dict[tuple[str, str], tuple[str, str]] = {}
        self.remove: set[tuple[str, str]] = set()
        self.default_updates: dict[tuple[str, str], str] = {}

    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
        """Record that (old_section, old_option) should be written out as (new_section, new_option)."""
        old_key = (old_section, old_option)
        self.rename[old_key] = (new_section, new_option)

    def add_remove(self, section: str, option: str) -> None:
        """Record that (section, option) should be omitted from the written config."""
        self.remove.add((section, option))

    def add_default_update(self, section: str, option: str, new_default: str) -> None:
        """Record a replacement default value for (section, option)."""
        self.default_updates[(section, option)] = new_default
95
96
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """Parse a sqlite version string (e.g. "3.15.2") into a tuple of ints; empty tuple if unparseable."""
    matched = _SQLITE3_VERSION_PATTERN.match(s)
    if matched is None:
        return ()
    version_part = matched.group("version")
    return tuple(map(int, version_part.split(".")))
102
103
@overload
def expand_env_var(env_var: None) -> None: ...


@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand (potentially nested) env vars.

    Repeat and apply `expandvars` and `expanduser` until
    interpolation stops having any effect.
    """
    # None and empty strings (or non-strings) pass through untouched.
    if not env_var or not isinstance(env_var, str):
        return env_var
    current = env_var
    while True:
        expanded = os.path.expanduser(os.path.expandvars(current))
        if expanded == current:
            # Fixed point reached: further expansion would change nothing.
            return expanded
        current = expanded
126
127
def run_command(command: str) -> str:
    """
    Run command and returns stdout.

    The command string is split with ``shlex`` and executed without a shell.

    :param command: command line to execute.
    :return: decoded stdout of the command.
    :raises AirflowConfigException: if the command exits with a non-zero return code.
    """
    # subprocess.run is the idiomatic replacement for Popen + communicate():
    # it waits for completion and collects both streams in one call.
    process = subprocess.run(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
    output = process.stdout.decode(sys.getdefaultencoding(), "ignore")
    stderr = process.stderr.decode(sys.getdefaultencoding(), "ignore")

    if process.returncode != 0:
        raise AirflowConfigException(
            f"Cannot execute {command}. Error code is: {process.returncode}. "
            f"Output: {output}, Stderr: {stderr}"
        )

    return output
142
143
144def _default_config_file_path(file_name: str) -> str:
145 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates")
146 return os.path.join(templates_dir, file_name)
147
148
def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Read Airflow configuration description from YAML file.

    :param include_airflow: Include Airflow configs
    :param include_providers: Include provider configs
    :param selected_provider: If specified, include selected provider only
    :return: Python dictionary containing configs & their info
    """
    description: dict[str, dict[str, Any]] = {}
    if include_airflow:
        # Core description ships as a YAML template next to this module.
        with open(_default_config_file_path("config.yml")) as config_file:
            description.update(yaml.safe_load(config_file))
    if include_providers:
        from airflow.providers_manager import ProvidersManager

        for provider, config in ProvidersManager().provider_configs:
            if not selected_provider or provider == selected_provider:
                description.update(config)
    return description
173
174
class AirflowConfigParser(_SharedAirflowConfigParser):
    """
    Custom Airflow Configparser supporting defaults and deprecated options.

    This is a subclass of the shared AirflowConfigParser that adds Core-specific initialization
    and functionality (providers, validation, writing, etc.).

    The defaults are stored in the ``_default_values``. The configuration description keeps
    description of all the options available in Airflow (description follow config.yaml.schema).

    :param default_config: default configuration (in the form of ini file).
    :param configuration_description: description of configuration to use
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        """Initialize with core defaults, provider fallback defaults and optional extra defaults."""
        configuration_description = retrieve_configuration_description(include_providers=False)
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
        _default_values = create_default_config_parser(configuration_description)
        super().__init__(configuration_description, _default_values, *args, **kwargs)
        self.configuration_description = configuration_description
        self._default_values = _default_values
        self._provider_config_fallback_default_values = create_provider_config_fallback_defaults()
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False
        # Provider configuration is loaded lazily (see load_providers_configuration)
        # because provider discovery is expensive and happens too early during boot.
        self._providers_configuration_loaded = False

    @property
    def _validators(self) -> list[Callable[[], None]]:
        """Overriding _validators from shared base class to add core-specific validators."""
        return [
            self._validate_sqlite3_version,
            self._validate_enums,
            self._validate_deprecated_values,
            self._upgrade_postgres_metastore_conn,
        ]

    @property
    def _lookup_sequence(self) -> list[Callable]:
        """Overriding _lookup_sequence from shared base class to add provider fallbacks."""
        return super()._lookup_sequence + [self._get_option_from_provider_fallbacks]

    def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
        """Override the base method to add provider fallbacks."""
        return [
            ("provider-fallback-defaults", self._provider_config_fallback_default_values),
            ("default", self._default_values),
            ("airflow.cfg", self),
        ]

    def _get_option_from_provider_fallbacks(
        self,
        deprecated_key: str | None,
        deprecated_section: str | None,
        key: str,
        section: str,
        issue_warning: bool = True,
        extra_stacklevel: int = 0,
        **kwargs,
    ) -> str | ValueNotFound:
        """Get config option from provider fallback defaults."""
        # NOTE(review): the fallback value is looked up twice here (presence check,
        # then the returned call); harmless, but could be collapsed into one lookup.
        if self.get_provider_config_fallback_defaults(section, key) is not None:
            # no expansion needed
            return self.get_provider_config_fallback_defaults(section, key, **kwargs)
        return VALUE_NOT_FOUND_SENTINEL

    def _update_logging_deprecated_template_to_one_from_defaults(self):
        """Point the deprecated log_filename_template replacement at the real loaded default."""
        default = self.get_default_value("logging", "log_filename_template")
        if default:
            # Tuple does not support item assignment, so we have to create a new tuple and replace it
            original_replacement = self.deprecated_values["logging"]["log_filename_template"]
            self.deprecated_values["logging"]["log_filename_template"] = (
                original_replacement[0],
                default,
            )

    def get_provider_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
        """Get provider config fallback default values."""
        return self._provider_config_fallback_default_values.get(section, key, fallback=None, **kwargs)

    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
        "logging": {
            "log_filename_template": (
                re.compile(
                    re.escape(
                        "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log"
                    )
                ),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
            ),
        },
        "core": {
            "executor": (re.compile(re.escape("SequentialExecutor")), "LocalExecutor"),
        },
    }

    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    # Options whose values must come from a fixed set; validated by _validate_enums.
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
        ("dag_processor", "file_parsing_sort_mode"): [
            "modified_time",
            "random_seeded_by_host",
            "alphabetical",
        ],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        # uvicorn and gunicorn logging levels for web servers
        ("logging", "uvicorn_logging_level"): _available_logging_levels,
        ("logging", "gunicorn_logging_level"): _available_logging_levels,
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
        ("api", "grid_view_sorting_order"): ["topological", "hierarchical_alphabetical"],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section,option) to the old value that was upgraded"""

    def write_custom_config(
        self,
        file: IO[str],
        comment_out_defaults: bool = True,
        include_descriptions: bool = True,
        extra_spacing: bool = True,
        modifications: ConfigModifications | None = None,
    ) -> None:
        """
        Write a configuration file using a ConfigModifications object.

        This method includes only options from the current airflow.cfg. For each option:
        - If it's marked for removal, omit it.
        - If renamed, output it under its new name and add a comment indicating its original location.
        - If a default update is specified, apply the new default and output the option as a commented line.
        - Otherwise, if the current value equals the default and comment_out_defaults is True, output it as a comment.
        Options absent from the current airflow.cfg are omitted.

        :param file: File to write the configuration.
        :param comment_out_defaults: If True, options whose value equals the default are written as comments.
        :param include_descriptions: Whether to include section descriptions.
        :param extra_spacing: Whether to insert an extra blank line after each option.
        :param modifications: ConfigModifications instance with rename, remove, and default updates.
        """
        modifications = modifications or ConfigModifications()
        # Pass 1: collect (option, value, is_default, comment) entries per effective section.
        output: dict[str, list[tuple[str, str, bool, str]]] = {}

        for section in self._sections:  # type: ignore[attr-defined] # accessing _sections from ConfigParser
            for option, orig_value in self._sections[section].items():  # type: ignore[attr-defined]
                key = (section.lower(), option.lower())
                if key in modifications.remove:
                    continue

                mod_comment = ""
                if key in modifications.rename:
                    new_sec, new_opt = modifications.rename[key]
                    effective_section = new_sec
                    effective_option = new_opt
                    mod_comment += f"# Renamed from {section}.{option}\n"
                else:
                    effective_section = section
                    effective_option = option

                value = orig_value
                if key in modifications.default_updates:
                    mod_comment += (
                        f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
                    )
                    value = modifications.default_updates[key]

                default_value = self.get_default_value(effective_section, effective_option, fallback="")
                is_default = str(value) == str(default_value)
                output.setdefault(effective_section.lower(), []).append(
                    (effective_option, str(value), is_default, mod_comment)
                )

        # Pass 2: render each section into a buffer and flush non-empty ones to the file.
        for section, options in output.items():
            section_buffer = StringIO()
            section_buffer.write(f"[{section}]\n")
            if include_descriptions:
                description = self.configuration_description.get(section, {}).get("description", "")
                if description:
                    for line in description.splitlines():
                        section_buffer.write(f"# {line}\n")
                    section_buffer.write("\n")
            for option, value_str, is_default, mod_comment in options:
                key = (section.lower(), option.lower())
                if key in modifications.default_updates and comment_out_defaults:
                    section_buffer.write(f"# {option} = {value_str}\n")
                else:
                    if mod_comment:
                        section_buffer.write(mod_comment)
                    if is_default and comment_out_defaults:
                        section_buffer.write(f"# {option} = {value_str}\n")
                    else:
                        section_buffer.write(f"{option} = {value_str}\n")
                if extra_spacing:
                    section_buffer.write("\n")
            content = section_buffer.getvalue().strip()
            if content:
                file.write(f"{content}\n\n")

    def _ensure_providers_config_loaded(self) -> None:
        """Ensure providers configurations are loaded."""
        if not self._providers_configuration_loaded:
            from airflow.providers_manager import ProvidersManager

            ProvidersManager()._initialize_providers_configuration()

    def _ensure_providers_config_unloaded(self) -> bool:
        """Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded."""
        if self._providers_configuration_loaded:
            self.restore_core_default_configuration()
            return True
        return False

    def _reload_provider_configs(self) -> None:
        """Reload providers configuration."""
        self.load_providers_configuration()

    def restore_core_default_configuration(self) -> None:
        """
        Restore default configuration for core Airflow.

        It does not restore configuration for providers. If you want to restore configuration for
        providers, you need to call ``load_providers_configuration`` method.
        """
        self.configuration_description = retrieve_configuration_description(include_providers=False)
        self._default_values = create_default_config_parser(self.configuration_description)
        self._providers_configuration_loaded = False

    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade SQL schemas.

        As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
        must be replaced with `postgresql`.
        """
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration [database] sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
                stacklevel=1,
            )
            self.upgraded_values[(section, key)] = old_value
            new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            # NOTE(review): this wipes the [core] env var name although the option lives in
            # [database]; presumably sql_alchemy_conn lived in core historically — confirm.
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)

    def _validate_enums(self):
        """Validate that enum type config has an accepted value."""
        for (section_key, option_key), enum_options in self.enums_options.items():
            if self.has_option(section_key, option_key):
                value = self.get(section_key, option_key, fallback=None)
                if value and value not in enum_options:
                    raise AirflowConfigException(
                        f"`[{section_key}] {option_key}` should not be "
                        f"{value!r}. Possible values: {', '.join(enum_options)}."
                    )

    def _validate_sqlite3_version(self):
        """
        Validate SQLite version.

        Some features in storing rendered fields require SQLite >= 3.15.0.
        """
        if "sqlite" not in self.get("database", "sql_alchemy_conn"):
            return

        import sqlite3

        min_sqlite_version = (3, 15, 0)
        if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
            return

        from airflow.utils.docs import get_docs_url

        min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
        raise AirflowConfigException(
            f"error: SQLite C library too old (< {min_sqlite_version_str}). "
            f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
        )

    def mask_secrets(self):
        """Register all sensitive config values with the core and task-SDK secret maskers."""
        from airflow._shared.secrets_masker import mask_secret as mask_secret_core
        from airflow.sdk.log import mask_secret as mask_secret_sdk

        for section, key in self.sensitive_config_values:
            try:
                with self.suppress_future_warnings():
                    value = self.get(section, key, suppress_warnings=True)
            except AirflowConfigException:
                # Best-effort: a value that cannot be read is simply not masked.
                log.debug(
                    "Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.",
                    section,
                    key,
                )
                continue
            mask_secret_core(value)
            mask_secret_sdk(value)

    def load_test_config(self):
        """
        Use test configuration rather than the configuration coming from airflow defaults.

        When running tests we use special the unit_test configuration to avoid accidental modifications and
        different behaviours when running the tests. Values for those test configuration are stored in
        the "unit_tests.cfg" configuration file in the ``airflow/config_templates`` folder
        and you need to change values there if you want to make some specific configuration to be used
        """
        # We need those globals before we run "get_all_expansion_variables" because this is where
        # the variables are expanded from in the configuration
        global FERNET_KEY, JWT_SECRET_KEY
        from cryptography.fernet import Fernet

        unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
        unit_test_config = unit_test_config_file.read_text()
        self.remove_all_read_configurations()
        with StringIO(unit_test_config) as test_config_file:
            self.read_file(test_config_file)
        # set fernet key to a random value
        FERNET_KEY = Fernet.generate_key().decode()
        JWT_SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
        self.expand_all_configuration_values()
        # NOTE(review): the message mentions 'config_unit_tests.cfg' but the file read
        # above is 'unit_tests.cfg' — the log text looks stale.
        log.info("Unit test configuration loaded from 'config_unit_tests.cfg'")

    def expand_all_configuration_values(self):
        """Expand all configuration values using global and local variables defined in this module."""
        all_vars = get_all_expansion_variables()
        for section in self.sections():
            for key, value in self.items(section):
                if value is not None:
                    # Re-set each option so templates are stored raw and plain
                    # strings are stored with {placeholders} expanded.
                    if self.has_option(section, key):
                        self.remove_option(section, key)
                    if self.is_template(section, key) or not isinstance(value, str):
                        self.set(section, key, value)
                    else:
                        self.set(section, key, value.format(**all_vars))

    def remove_all_read_configurations(self):
        """Remove all read configurations, leaving only default values in the config."""
        for section in self.sections():
            self.remove_section(section)

    @property
    def providers_configuration_loaded(self) -> bool:
        """Checks if providers have been loaded."""
        return self._providers_configuration_loaded

    def load_providers_configuration(self):
        """
        Load configuration for providers.

        This should be done after initial configuration have been performed. Initializing and discovering
        providers is an expensive operation and cannot be performed when we load configuration for the first
        time when airflow starts, because we initialize configuration very early, during importing of the
        `airflow` package and the module is not yet ready to be used when it happens and until configuration
        and settings are loaded. Therefore, in order to reload provider configuration we need to additionally
        load provider - specific configuration.
        """
        log.debug("Loading providers configuration")
        from airflow.providers_manager import ProvidersManager

        self.restore_core_default_configuration()
        for provider, config in ProvidersManager().already_initialized_provider_configs:
            for provider_section, provider_section_content in config.items():
                provider_options = provider_section_content["options"]
                section_in_current_config = self.configuration_description.get(provider_section)
                if not section_in_current_config:
                    self.configuration_description[provider_section] = deepcopy(provider_section_content)
                    section_in_current_config = self.configuration_description.get(provider_section)
                    # Tag the section and each option with the contributing provider.
                    section_in_current_config["source"] = f"default-{provider}"
                    for option in provider_options:
                        section_in_current_config["options"][option]["source"] = f"default-{provider}"
                else:
                    section_source = section_in_current_config.get("source", "Airflow's core package").split(
                        "default-"
                    )[-1]
                    # NOTE(review): UserWarning is passed as a second positional argument to
                    # AirflowConfigException — it just ends up in the exception args rather
                    # than acting as a warning category; likely unintended.
                    raise AirflowConfigException(
                        f"The provider {provider} is attempting to contribute "
                        f"configuration section {provider_section} that "
                        f"has already been added before. The source of it: {section_source}. "
                        "This is forbidden. A provider can only add new sections. It "
                        "cannot contribute options to existing sections or override other "
                        "provider's configuration.",
                        UserWarning,
                    )
        self._default_values = create_default_config_parser(self.configuration_description)
        # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete
        # the cached values, and it will be refreshed on next access.
        with contextlib.suppress(AttributeError):
            # no problem if cache is not set yet
            del self.sensitive_config_values
        self._providers_configuration_loaded = True

    def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
        """
        Override to use module-level function that reads from global conf.

        This ensures as_dict() and other methods use the same secrets backend
        configuration as the global conf instance (set via conf_vars in tests).
        """
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        try:
            return secrets_client.get_config(config_key)
        except Exception as e:
            raise AirflowConfigException(
                "Cannot retrieve config from alternative secrets backend. "
                "Make sure it is configured properly and that the Backend "
                "is accessible.\n"
                f"{e}"
            )

    def __getstate__(self) -> dict[str, Any]:
        """Return the state of the object as a dictionary for pickling."""
        return {
            name: getattr(self, name)
            for name in [
                "_sections",
                "is_validated",
                "configuration_description",
                "upgraded_values",
                "_default_values",
            ]
        }

    def __setstate__(self, state) -> None:
        """Restore the state of the object from a dictionary representation."""
        # Re-run __init__ to rebuild defaults, then layer the pickled state on top.
        self.__init__()  # type: ignore[misc]
        config = state.pop("_sections")
        self.read_dict(config)
        self.__dict__.update(state)
634
635
def get_airflow_home() -> str:
    """Get path to Airflow Home."""
    home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    # Expand env vars / "~" so callers always receive a concrete path.
    return expand_env_var(home)
639
640
def get_airflow_config(airflow_home: str) -> str:
    """Get Path to airflow.cfg path."""
    explicit_path = os.environ.get("AIRFLOW_CONFIG")
    if explicit_path is not None:
        # An explicitly configured path wins; expand env vars / "~" in it.
        return expand_env_var(explicit_path)
    return os.path.join(airflow_home, "airflow.cfg")
647
648
def get_all_expansion_variables() -> dict[str, Any]:
    """Return all public (non-underscore) names visible in this module, for config value expansion."""
    # Snapshot both namespaces up front; locals() is empty here, matching the
    # original behaviour of evaluating it before any local names are bound.
    namespaces = (globals(), locals())
    return {name: value for ns in namespaces for name, value in ns.items() if not name.startswith("_")}
651
652
def _generate_fernet_key() -> str:
    """Generate a fresh random Fernet key and return it as a decoded string."""
    from cryptography.fernet import Fernet

    key_bytes = Fernet.generate_key()
    return key_bytes.decode()
657
658
def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Create default config parser based on configuration description.

    It creates ConfigParser with all default values retrieved from the configuration description and
    expands all the variables from the global and local variables defined in this module.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    parser = ConfigParser()
    expansion_vars = get_all_expansion_variables()
    for section, section_desc in configuration_description.items():
        parser.add_section(section)
        for key, option_desc in section_desc["options"].items():
            default_value = option_desc["default"]
            if default_value is None:
                # Options without a default are simply absent from the parser.
                continue
            if option_desc.get("is_template", False) or not isinstance(default_value, str):
                # Templates (and non-strings) are stored verbatim, without expansion.
                parser.set(section, key, default_value)
            else:
                parser.set(section, key, default_value.format(**expansion_vars))
    return parser
684
685
def create_provider_config_fallback_defaults() -> ConfigParser:
    """
    Create fallback defaults.

    This parser contains provider defaults for Airflow configuration, containing fallback default values
    that might be needed when provider classes are being imported - before provider's configuration
    is loaded.

    Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead
    to retrieving provider configuration before the defaults for the provider are loaded.

    Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or
    environment variables) those will be used as usual.

    NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication,
    at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed.

    You've been warned!
    """
    fallback_parser = ConfigParser()
    fallback_path = _default_config_file_path("provider_config_fallback_defaults.cfg")
    fallback_parser.read(fallback_path)
    return fallback_parser
708
709
def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Write a default airflow.cfg to AIRFLOW_CONFIG if no config file exists there yet.

    Generates fresh FERNET_KEY / JWT_SECRET_KEY default values before writing, so that the
    same secrets are persisted in the file and reused on subsequent runs.

    :return: the module-level ``conf`` parser.
    :raises IsADirectoryError: if AIRFLOW_CONFIG points at a directory.
    :raises FileNotFoundError: if the parent directory is missing and outside AIRFLOW_HOME.
    """
    global FERNET_KEY, JWT_SECRET_KEY
    airflow_config = pathlib.Path(AIRFLOW_CONFIG)
    if airflow_config.is_dir():
        msg = (
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {airflow_config.__fspath__()!r}."
        )
        raise IsADirectoryError(msg)
    if not airflow_config.exists():
        log.debug("Creating new Airflow config file in: %s", airflow_config.__fspath__())
        config_directory = airflow_config.parent
        if not config_directory.exists():
            # Only auto-create the config directory when it lives under AIRFLOW_HOME;
            # anything outside is treated as a user mistake.
            if not config_directory.is_relative_to(AIRFLOW_HOME):
                msg = (
                    f"Config directory {config_directory.__fspath__()!r} not exists "
                    f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                    "Please create this directory first."
                )
                raise FileNotFoundError(msg) from None
            log.debug("Create directory %r for Airflow config", config_directory.__fspath__())
            config_directory.mkdir(parents=True, exist_ok=True)
        if conf.get("core", "fernet_key", fallback=None) in (None, ""):
            # We know that FERNET_KEY is not set, so we can generate it, set as global key
            # and also write it to the config file so that same key will be used next time
            FERNET_KEY = _generate_fernet_key()
            conf.configuration_description["core"]["options"]["fernet_key"]["default"] = FERNET_KEY

        JWT_SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
        conf.configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = JWT_SECRET_KEY
        # Create the file first so its permissions are locked down before any content is written.
        pathlib.Path(airflow_config.__fspath__()).touch()
        make_group_other_inaccessible(airflow_config.__fspath__())
        with open(airflow_config, "w") as file:
            conf.write(
                file,
                include_sources=False,
                include_env_vars=True,
                include_providers=True,
                extra_spacing=True,
                only_defaults=True,
            )
    return conf
752
753
def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Load standard airflow configuration.

    In case it finds that the configuration file is missing, it will create it and write the default
    configuration values there, based on defaults passed, and will add the comments and examples
    from the default configuration.

    :param airflow_config_parser: parser to which the configuration will be loaded

    """
    global AIRFLOW_HOME
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if not airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        return
    msg = (
        "Specifying both AIRFLOW_HOME environment variable and airflow_home "
        "in the config file is deprecated. Please use only the AIRFLOW_HOME "
        "environment variable and remove the config file entry."
    )
    if "AIRFLOW_HOME" in os.environ:
        # Environment variable takes precedence; just warn about the duplicate.
        warnings.warn(msg, category=DeprecationWarning, stacklevel=1)
        return
    config_home = airflow_config_parser.get("core", "airflow_home")
    if config_home == AIRFLOW_HOME:
        warnings.warn(
            "Specifying airflow_home in the config file is deprecated. As you "
            "have left it at the default value you should remove the setting "
            "from your airflow.cfg and suffer no change in behaviour.",
            category=DeprecationWarning,
            stacklevel=1,
        )
        return
    # Non-default value in the file and no env var: honour the file's value.
    AIRFLOW_HOME = config_home
    warnings.warn(msg, category=DeprecationWarning, stacklevel=1)
787
788
def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.

    :return: the fully loaded configuration parser
    """
    parser = AirflowConfigParser()
    if parser.getboolean("core", "unit_test_mode"):
        parser.load_test_config()
        return parser

    load_standard_airflow_configuration(parser)
    # The airflow.cfg just read may itself enable unit_test_mode; respect
    # that by layering the default unit-test configuration on top of it.
    if parser.getboolean("core", "unit_test_mode"):
        parser.load_test_config()
    return parser
806
807
def make_group_other_inaccessible(file_path: str):
    """
    Strip group/other permission bits from ``file_path``, best effort.

    Only the owner's read and write bits are preserved. Any failure is logged
    as a warning and otherwise ignored, so the caller continues with the
    file's original permissions.

    :param file_path: path of the file whose permissions should be tightened
    """
    try:
        current_mode = os.stat(file_path).st_mode
        owner_rw_only = current_mode & (stat.S_IRUSR | stat.S_IWUSR)
        os.chmod(file_path, owner_rw_only)
    except Exception as e:
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )
818
819
def ensure_secrets_loaded(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    If the secrets_backend_list contains only 2 default backends, reload it.

    :param default_backends: search path of backend class names; a value other
        than ``DEFAULT_SECRETS_SEARCH_PATH`` indicates a worker-side call
    :return: the list of instantiated secrets backends
    """
    # Exactly two entries means only the default backends are present; a
    # non-default search path means we are loading for a worker. In either
    # case the backends must be (re-)initialized rather than reused.
    only_defaults_loaded = len(secrets_backend_list) == 2
    worker_search_path = default_backends != DEFAULT_SECRETS_SEARCH_PATH
    if only_defaults_loaded or worker_search_path:
        return initialize_secrets_backends(default_backends=default_backends)
    return secrets_backend_list
835
836
def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | None:
    """
    Get Secret Backend if defined in airflow.cfg.

    Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.

    This is a convenience wrapper around ``conf._get_custom_secret_backend()``.

    :param worker_mode: whether the backend is being resolved for a worker
    :return: the configured custom backend instance, or ``None`` if none is set
    """
    backend = conf._get_custom_secret_backend(worker_mode=worker_mode)
    return backend
846
847
def initialize_secrets_backends(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    Imports each backend class and instantiates it. A custom backend
    configured in airflow.cfg (if any) is placed first in the returned list,
    followed by one instance per class name in ``default_backends``.

    :param default_backends: dotted-path class names to instantiate; a value
        other than ``DEFAULT_SECRETS_SEARCH_PATH`` indicates a worker-side call
    :return: list of instantiated secrets backends
    """
    # A non-default search path means we are initializing for a worker.
    worker_mode = default_backends != DEFAULT_SECRETS_SEARCH_PATH

    backends: list[BaseSecretsBackend] = []
    custom_backend = get_custom_secret_backend(worker_mode)
    if custom_backend is not None:
        backends.append(custom_backend)

    backends.extend(import_string(class_name)() for class_name in default_backends)
    return backends
872
873
def initialize_auth_manager() -> BaseAuthManager:
    """
    Initialize auth manager.

    Imports the auth manager class named by ``[core] auth_manager`` and
    returns a new instance of it.

    :raises AirflowConfigException: if no auth manager is configured
    :return: the instantiated auth manager
    """
    auth_manager_cls = conf.getimport(section="core", key="auth_manager")
    if auth_manager_cls:
        return auth_manager_cls()
    raise AirflowConfigException(
        "No auth manager defined in the config. Please specify one using section/key [core/auth_manager]."
    )
889
890
# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
if os.path.exists(_TEST_DAGS_FOLDER):
    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
    # Fall back to the regular dags folder under AIRFLOW_HOME (pip installs).
    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags")

# Set up plugins folder for unit tests
# this directory won't exist if users install via pip
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
if os.path.exists(_TEST_PLUGINS_FOLDER):
    TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
    # Fall back to the regular plugins folder under AIRFLOW_HOME (pip installs).
    TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins")

# Fresh random key generated on every import: 16 random bytes, base64-encoded.
SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
# Placeholders; these are only populated when a new default config file has to
# be generated (see the fernet_key / jwt_secret handling earlier in this file).
FERNET_KEY = ""  # Set only if needed when generating a new file
JWT_SECRET_KEY = ""

# Module-level singletons created at import time: parse the config, initialize
# the secrets backends from it, then validate the parsed configuration.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()