1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
18
19import contextlib
20import logging
21import multiprocessing
22import os
23import pathlib
24import re
25import shlex
26import stat
27import subprocess
28import sys
29import warnings
30from base64 import b64encode
31from collections.abc import Callable
32from configparser import ConfigParser
33from copy import deepcopy
34from inspect import ismodule
35from io import StringIO
36from re import Pattern
37from typing import IO, TYPE_CHECKING, Any
38from urllib.parse import urlsplit
39
40from typing_extensions import overload
41
42from airflow._shared.configuration.parser import (
43 VALUE_NOT_FOUND_SENTINEL,
44 AirflowConfigParser as _SharedAirflowConfigParser,
45 ValueNotFound,
46)
47from airflow._shared.module_loading import import_string
48from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning
49from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH
50from airflow.task.weight_rule import WeightRule
51from airflow.utils import yaml
52
53if TYPE_CHECKING:
54 from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
55 from airflow.secrets import BaseSecretsBackend
56
log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Extracts the leading dotted-integer part from a SQLite version string (e.g. "3.39.4 2022-...").
_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases for configuration values and the nested structures built from them.
ConfigType = str | int | float | bool
ConfigOptionsDictType = dict[str, ConfigType]
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
ConfigSourcesType = dict[str, ConfigSectionSourcesType]

# Prefix for environment variables that carry Airflow configuration overrides.
ENV_VAR_PREFIX = "AIRFLOW__"
72
73
class _SecretKeys:
    """Holds the secret keys used in Airflow during runtime."""

    # Populated at runtime (e.g. by load_test_config() or when a fresh config file is written)
    # and consumed through get_all_expansion_variables() when expanding config templates.
    fernet_key: str | None = None
    jwt_secret_key: str | None = None
79
80
class ConfigModifications:
    """
    Holds modifications to be applied when writing out the config.

    :param rename: Mapping from (old_section, old_option) to (new_section, new_option)
    :param remove: Set of (section, option) to remove
    :param default_updates: Mapping from (section, option) to new default value
    """

    def __init__(self) -> None:
        self.rename: dict[tuple[str, str], tuple[str, str]] = {}
        self.remove: set[tuple[str, str]] = set()
        self.default_updates: dict[tuple[str, str], str] = {}

    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
        """Record that (old_section, old_option) should be written as (new_section, new_option)."""
        old_key = (old_section, old_option)
        self.rename[old_key] = (new_section, new_option)

    def add_remove(self, section: str, option: str) -> None:
        """Record that (section, option) should be omitted when writing the config."""
        self.remove.add((section, option))

    def add_default_update(self, section: str, option: str, new_default: str) -> None:
        """Record a replacement default value for (section, option)."""
        key = (section, option)
        self.default_updates[key] = new_default
103
104
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """Parse a SQLite version string into a tuple of ints; return an empty tuple if unparsable."""
    parsed = _SQLITE3_VERSION_PATTERN.match(s)
    if parsed is None:
        return ()
    return tuple(map(int, parsed.group("version").split(".")))
110
111
@overload
def expand_env_var(env_var: None) -> None: ...


@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand (potentially nested) env vars.

    Repeat and apply `expandvars` and `expanduser` until
    interpolation stops having any effect.
    """
    # Non-string and falsy inputs (None, empty string) pass through untouched.
    if not env_var or not isinstance(env_var, str):
        return env_var
    current = env_var
    while True:
        expanded = os.path.expanduser(os.path.expandvars(str(current)))
        if expanded == current:
            return expanded
        current = expanded
134
135
def run_command(command: str) -> str:
    """
    Run command and return its stdout.

    :param command: command line to execute; split with :func:`shlex.split`,
        so no shell interpolation happens.
    :raises AirflowConfigException: if the command exits with a non-zero code.
    :return: decoded stdout of the command.
    """
    # subprocess.run waits for completion and closes the pipe file descriptors for us,
    # unlike the previous bare Popen usage which relied on GC for cleanup.
    process = subprocess.run(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, check=False
    )
    encoding = sys.getdefaultencoding()
    output = process.stdout.decode(encoding, "ignore")
    stderr = process.stderr.decode(encoding, "ignore")

    if process.returncode != 0:
        raise AirflowConfigException(
            f"Cannot execute {command}. Error code is: {process.returncode}. "
            f"Output: {output}, Stderr: {stderr}"
        )

    return output
150
151
152def _default_config_file_path(file_name: str) -> str:
153 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates")
154 return os.path.join(templates_dir, file_name)
155
156
def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Read Airflow configuration description from YAML file.

    :param include_airflow: Include Airflow configs
    :param include_providers: Include provider configs
    :param selected_provider: If specified, include selected provider only
    :return: Python dictionary containing configs & their info
    """
    description: dict[str, dict[str, Any]] = {}
    if include_airflow:
        with open(_default_config_file_path("config.yml")) as config_file:
            description.update(yaml.safe_load(config_file))
    if include_providers:
        # Imported lazily: the providers manager is expensive and not needed for core-only reads.
        from airflow.providers_manager import ProvidersManager

        for provider_name, provider_config in ProvidersManager().provider_configs:
            if selected_provider and provider_name != selected_provider:
                continue
            description.update(provider_config)
    return description
181
182
class AirflowConfigParser(_SharedAirflowConfigParser):
    """
    Custom Airflow Configparser supporting defaults and deprecated options.

    This is a subclass of the shared AirflowConfigParser that adds Core-specific initialization
    and functionality (providers, validation, writing, etc.).

    The defaults are stored in the ``_default_values``. The configuration description keeps
    descriptions of all the options available in Airflow (descriptions follow config.yaml.schema).

    :param default_config: default configuration (in the form of ini file).
    :param configuration_description: description of configuration to use
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        configuration_description = retrieve_configuration_description(include_providers=False)
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
        _default_values = create_default_config_parser(configuration_description)
        super().__init__(configuration_description, _default_values, *args, **kwargs)
        self.configuration_description = configuration_description
        self._default_values = _default_values
        # Fallback defaults for provider options that may be read before the provider's own
        # configuration has been loaded (see create_provider_config_fallback_defaults()).
        self._provider_config_fallback_default_values = create_provider_config_fallback_defaults()
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False
        self._providers_configuration_loaded = False

    @property
    def _validators(self) -> list[Callable[[], None]]:
        """Override _validators from the shared base class to add core-specific validators."""
        return [
            self._validate_sqlite3_version,
            self._validate_enums,
            self._validate_deprecated_values,
            self._upgrade_postgres_metastore_conn,
        ]

    @property
    def _lookup_sequence(self) -> list[Callable]:
        """Override _lookup_sequence from the shared base class to add provider fallbacks."""
        # Provider fallbacks are appended, so they are consulted last in the lookup chain.
        return super()._lookup_sequence + [self._get_option_from_provider_fallbacks]

    def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
        """Override the base method to add provider fallbacks."""
        # NOTE(review): ordering presumably defines precedence when the shared base builds the
        # dict (later sources overriding earlier ones) — confirm against the base implementation.
        return [
            ("provider-fallback-defaults", self._provider_config_fallback_default_values),
            ("default", self._default_values),
            ("airflow.cfg", self),
        ]

    def _get_option_from_provider_fallbacks(
        self,
        deprecated_key: str | None,
        deprecated_section: str | None,
        key: str,
        section: str,
        issue_warning: bool = True,
        extra_stacklevel: int = 0,
        **kwargs,
    ) -> str | ValueNotFound:
        """Get config option from provider fallback defaults."""
        # NOTE(review): the fallback value is looked up twice — once (without kwargs) to test
        # for presence, then again with kwargs to return it. Harmless but could be one call.
        if self.get_provider_config_fallback_defaults(section, key) is not None:
            # no expansion needed
            return self.get_provider_config_fallback_defaults(section, key, **kwargs)
        return VALUE_NOT_FOUND_SENTINEL

    def _update_logging_deprecated_template_to_one_from_defaults(self):
        # Replaces the "XX-set-after-default-config-loaded-XX" placeholder in deprecated_values
        # with the real default loaded from config.yml.
        default = self.get_default_value("logging", "log_filename_template")
        if default:
            # Tuple does not support item assignment, so we have to create a new tuple and replace it
            original_replacement = self.deprecated_values["logging"]["log_filename_template"]
            self.deprecated_values["logging"]["log_filename_template"] = (
                original_replacement[0],
                default,
            )

    def get_provider_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
        """Get provider config fallback default values (None when there is no fallback)."""
        return self._provider_config_fallback_default_values.get(section, key, fallback=None, **kwargs)

    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
        "logging": {
            "log_filename_template": (
                re.compile(
                    re.escape(
                        "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log"
                    )
                ),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
            ),
        },
        "core": {
            "executor": (re.compile(re.escape("SequentialExecutor")), "LocalExecutor"),
        },
    }

    # Accepted values for all *_logging_level options below.
    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    # Mapping of (section, option) -> list of accepted values, enforced by _validate_enums().
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
        ("dag_processor", "file_parsing_sort_mode"): [
            "modified_time",
            "random_seeded_by_host",
            "alphabetical",
        ],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        # uvicorn and gunicorn logging levels for web servers
        ("logging", "uvicorn_logging_level"): _available_logging_levels,
        ("logging", "gunicorn_logging_level"): _available_logging_levels,
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
        ("api", "grid_view_sorting_order"): ["topological", "hierarchical_alphabetical"],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section,option) to the old value that was upgraded"""

    def write_custom_config(
        self,
        file: IO[str],
        comment_out_defaults: bool = True,
        include_descriptions: bool = True,
        extra_spacing: bool = True,
        modifications: ConfigModifications | None = None,
    ) -> None:
        """
        Write a configuration file using a ConfigModifications object.

        This method includes only options from the current airflow.cfg. For each option:
        - If it's marked for removal, omit it.
        - If renamed, output it under its new name and add a comment indicating its original location.
        - If a default update is specified, apply the new default and output the option as a commented line.
        - Otherwise, if the current value equals the default and comment_out_defaults is True, output it as a comment.
        Options absent from the current airflow.cfg are omitted.

        :param file: File to write the configuration.
        :param comment_out_defaults: If True, options whose value equals the default are written as comments.
        :param include_descriptions: Whether to include section descriptions.
        :param extra_spacing: Whether to insert an extra blank line after each option.
        :param modifications: ConfigModifications instance with rename, remove, and default updates.
        """
        modifications = modifications or ConfigModifications()
        # section (lowercased) -> list of (option, value, is_default, modification comment)
        output: dict[str, list[tuple[str, str, bool, str]]] = {}

        # First pass: collect the effective (possibly renamed/updated) options per section.
        for section in self._sections:  # type: ignore[attr-defined] # accessing _sections from ConfigParser
            for option, orig_value in self._sections[section].items():  # type: ignore[attr-defined]
                key = (section.lower(), option.lower())
                if key in modifications.remove:
                    continue

                mod_comment = ""
                if key in modifications.rename:
                    new_sec, new_opt = modifications.rename[key]
                    effective_section = new_sec
                    effective_option = new_opt
                    mod_comment += f"# Renamed from {section}.{option}\n"
                else:
                    effective_section = section
                    effective_option = option

                value = orig_value
                if key in modifications.default_updates:
                    mod_comment += (
                        f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
                    )
                    value = modifications.default_updates[key]

                default_value = self.get_default_value(effective_section, effective_option, fallback="")
                is_default = str(value) == str(default_value)
                output.setdefault(effective_section.lower(), []).append(
                    (effective_option, str(value), is_default, mod_comment)
                )

        # Second pass: render each section to a buffer and emit only non-empty sections.
        for section, options in output.items():
            section_buffer = StringIO()
            section_buffer.write(f"[{section}]\n")
            if include_descriptions:
                description = self.configuration_description.get(section, {}).get("description", "")
                if description:
                    for line in description.splitlines():
                        section_buffer.write(f"# {line}\n")
                    section_buffer.write("\n")
            for option, value_str, is_default, mod_comment in options:
                key = (section.lower(), option.lower())
                if key in modifications.default_updates and comment_out_defaults:
                    section_buffer.write(f"# {option} = {value_str}\n")
                else:
                    if mod_comment:
                        section_buffer.write(mod_comment)
                    if is_default and comment_out_defaults:
                        section_buffer.write(f"# {option} = {value_str}\n")
                    else:
                        section_buffer.write(f"{option} = {value_str}\n")
                if extra_spacing:
                    section_buffer.write("\n")
            content = section_buffer.getvalue().strip()
            if content:
                file.write(f"{content}\n\n")

    def _ensure_providers_config_loaded(self) -> None:
        """Ensure providers configurations are loaded."""
        if not self._providers_configuration_loaded:
            from airflow.providers_manager import ProvidersManager

            ProvidersManager()._initialize_providers_configuration()

    def _ensure_providers_config_unloaded(self) -> bool:
        """Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded."""
        if self._providers_configuration_loaded:
            self.restore_core_default_configuration()
            return True
        return False

    def _reload_provider_configs(self) -> None:
        """Reload providers configuration."""
        self.load_providers_configuration()

    def restore_core_default_configuration(self) -> None:
        """
        Restore default configuration for core Airflow.

        It does not restore configuration for providers. If you want to restore configuration for
        providers, you need to call ``load_providers_configuration`` method.
        """
        self.configuration_description = retrieve_configuration_description(include_providers=False)
        self._default_values = create_default_config_parser(self.configuration_description)
        self._providers_configuration_loaded = False

    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade SQL schemas.

        As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
        must be replaced with `postgresql`.
        """
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration [database] sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
                stacklevel=1,
            )
            self.upgraded_values[(section, key)] = old_value
            # Only rewrite the scheme at the start of the URL; the rest of the DSN is untouched.
            new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)

    def _validate_enums(self):
        """Validate that enum type config has an accepted value."""
        for (section_key, option_key), enum_options in self.enums_options.items():
            if self.has_option(section_key, option_key):
                value = self.get(section_key, option_key, fallback=None)
                if value and value not in enum_options:
                    raise AirflowConfigException(
                        f"`[{section_key}] {option_key}` should not be "
                        f"{value!r}. Possible values: {', '.join(enum_options)}."
                    )

    def _validate_sqlite3_version(self):
        """
        Validate SQLite version.

        Some features in storing rendered fields require SQLite >= 3.15.0.
        """
        if "sqlite" not in self.get("database", "sql_alchemy_conn"):
            return

        import sqlite3

        min_sqlite_version = (3, 15, 0)
        if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
            return

        from airflow.utils.docs import get_docs_url

        min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
        raise AirflowConfigException(
            f"error: SQLite C library too old (< {min_sqlite_version_str}). "
            f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
        )

    def mask_secrets(self):
        """Register every sensitive config value with both secrets maskers for log redaction."""
        from airflow._shared.secrets_masker import mask_secret as mask_secret_core
        from airflow.sdk.log import mask_secret as mask_secret_sdk

        for section, key in self.sensitive_config_values:
            try:
                with self.suppress_future_warnings():
                    value = self.get(section, key, suppress_warnings=True)
            except AirflowConfigException:
                # Best effort: a value we cannot read simply isn't redacted.
                log.debug(
                    "Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.",
                    section,
                    key,
                )
                continue
            mask_secret_core(value)
            mask_secret_sdk(value)

    def load_test_config(self):
        """
        Use test configuration rather than the configuration coming from airflow defaults.

        When running tests we use the special unit_test configuration to avoid accidental modifications and
        different behaviours when running the tests. Values for those test configuration are stored in
        the "unit_tests.cfg" configuration file in the ``airflow/config_templates`` folder
        and you need to change values there if you want to make some specific configuration to be used
        """
        from cryptography.fernet import Fernet

        unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
        unit_test_config = unit_test_config_file.read_text()
        self.remove_all_read_configurations()
        with StringIO(unit_test_config) as test_config_file:
            self.read_file(test_config_file)

        # We need those globals before we run "get_all_expansion_variables" because this is where
        # the variables are expanded from in the configuration - set to random values for tests
        _SecretKeys.fernet_key = Fernet.generate_key().decode()
        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        self.expand_all_configuration_values()
        log.info("Unit test configuration loaded from 'config_unit_tests.cfg'")

    def expand_all_configuration_values(self):
        """Expand all configuration values using global and local variables defined in this module."""
        all_vars = get_all_expansion_variables()
        for section in self.sections():
            for key, value in self.items(section):
                if value is not None:
                    # Remove first, then re-set: the read value may shadow a default entry.
                    if self.has_option(section, key):
                        self.remove_option(section, key)
                    if self.is_template(section, key) or not isinstance(value, str):
                        self.set(section, key, value)
                    else:
                        self.set(section, key, value.format(**all_vars))

    def remove_all_read_configurations(self):
        """Remove all read configurations, leaving only default values in the config."""
        for section in self.sections():
            self.remove_section(section)

    @property
    def providers_configuration_loaded(self) -> bool:
        """Check whether providers' configuration has been loaded."""
        return self._providers_configuration_loaded

    def load_providers_configuration(self):
        """
        Load configuration for providers.

        This should be done after initial configuration have been performed. Initializing and discovering
        providers is an expensive operation and cannot be performed when we load configuration for the first
        time when airflow starts, because we initialize configuration very early, during importing of the
        `airflow` package and the module is not yet ready to be used when it happens and until configuration
        and settings are loaded. Therefore, in order to reload provider configuration we need to additionally
        load provider - specific configuration.
        """
        log.debug("Loading providers configuration")
        from airflow.providers_manager import ProvidersManager

        self.restore_core_default_configuration()
        for provider, config in ProvidersManager().already_initialized_provider_configs:
            for provider_section, provider_section_content in config.items():
                provider_options = provider_section_content["options"]
                section_in_current_config = self.configuration_description.get(provider_section)
                if not section_in_current_config:
                    self.configuration_description[provider_section] = deepcopy(provider_section_content)
                    section_in_current_config = self.configuration_description.get(provider_section)
                    # Tag the section and each option with the provider that contributed it.
                    section_in_current_config["source"] = f"default-{provider}"
                    for option in provider_options:
                        section_in_current_config["options"][option]["source"] = f"default-{provider}"
                else:
                    section_source = section_in_current_config.get("source", "Airflow's core package").split(
                        "default-"
                    )[-1]
                    # NOTE(review): UserWarning is passed as a second positional argument to the
                    # exception constructor — it becomes part of the exception args, not a warning
                    # category. This looks like a leftover from a warnings.warn call; confirm intent.
                    raise AirflowConfigException(
                        f"The provider {provider} is attempting to contribute "
                        f"configuration section {provider_section} that "
                        f"has already been added before. The source of it: {section_source}. "
                        "This is forbidden. A provider can only add new sections. It "
                        "cannot contribute options to existing sections or override other "
                        "provider's configuration.",
                        UserWarning,
                    )
        self._default_values = create_default_config_parser(self.configuration_description)
        # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete
        # the cached values, and it will be refreshed on next access.
        with contextlib.suppress(AttributeError):
            # no problem if cache is not set yet
            del self.sensitive_config_values
        self._providers_configuration_loaded = True

    def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
        """
        Override to use module-level function that reads from global conf.

        This ensures as_dict() and other methods use the same secrets backend
        configuration as the global conf instance (set via conf_vars in tests).
        """
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        try:
            return secrets_client.get_config(config_key)
        except Exception as e:
            raise AirflowConfigException(
                "Cannot retrieve config from alternative secrets backend. "
                "Make sure it is configured properly and that the Backend "
                "is accessible.\n"
                f"{e}"
            )

    def __getstate__(self) -> dict[str, Any]:
        """Return the state of the object as a dictionary for pickling."""
        return {
            name: getattr(self, name)
            for name in [
                "_sections",
                "is_validated",
                "configuration_description",
                "upgraded_values",
                "_default_values",
            ]
        }

    def __setstate__(self, state) -> None:
        """Restore the state of the object from a dictionary representation."""
        # Re-run __init__ to rebuild defaults/description, then layer the pickled sections on top.
        self.__init__()  # type: ignore[misc]
        config = state.pop("_sections")
        self.read_dict(config)
        self.__dict__.update(state)
641
642
def get_airflow_home() -> str:
    """Get path to Airflow Home."""
    home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(home)
646
647
def get_airflow_config(airflow_home: str) -> str:
    """Get Path to airflow.cfg path."""
    configured_path = os.environ.get("AIRFLOW_CONFIG")
    if configured_path is not None:
        return expand_env_var(configured_path)
    return os.path.join(airflow_home, "airflow.cfg")
654
655
def get_all_expansion_variables() -> dict[str, Any]:
    """Collect the variables usable for str.format expansion of configuration values."""
    variables: dict[str, Any] = {
        "FERNET_KEY": _SecretKeys.fernet_key,
        "JWT_SECRET_KEY": _SecretKeys.jwt_secret_key,
    }
    # Expose public, non-callable, non-module globals of this module as expansion variables.
    for name, value in globals().items():
        if name.startswith("_") or callable(value) or ismodule(value):
            continue
        variables[name] = value
    return variables
666
667
def _generate_fernet_key() -> str:
    """Generate a fresh random Fernet key and return it decoded to str."""
    from cryptography.fernet import Fernet

    key = Fernet.generate_key()
    return key.decode()
672
673
def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Create default config parser based on configuration description.

    It creates ConfigParser with all default values retrieved from the configuration description and
    expands all the variables from the global and local variables defined in this module.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    expansion_vars = get_all_expansion_variables()
    parser = ConfigParser()
    for section_name, section_desc in configuration_description.items():
        parser.add_section(section_name)
        for option_name, option_desc in section_desc["options"].items():
            default = option_desc["default"]
            if default is None:
                continue
            # Template values (and non-strings) are stored verbatim; plain strings get expanded.
            if option_desc.get("is_template", False) or not isinstance(default, str):
                parser.set(section_name, option_name, default)
            else:
                parser.set(section_name, option_name, default.format(**expansion_vars))
    return parser
699
700
def create_provider_config_fallback_defaults() -> ConfigParser:
    """
    Create fallback defaults.

    This parser contains provider defaults for Airflow configuration, containing fallback default values
    that might be needed when provider classes are being imported - before provider's configuration
    is loaded.

    Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead
    to retrieving provider configuration before the defaults for the provider are loaded.

    Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or
    environment variables) those will be used as usual.

    NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication,
    at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed.

    You've been warned!
    """
    fallback_parser = ConfigParser()
    fallback_defaults_path = _default_config_file_path("provider_config_fallback_defaults.cfg")
    fallback_parser.read(fallback_defaults_path)
    return fallback_parser
723
724
def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Create the airflow.cfg file with default values if it does not exist yet.

    Fresh secrets (fernet key and JWT secret) are generated and injected into the defaults
    before writing, so the same keys are read back on subsequent runs.

    :raises IsADirectoryError: if AIRFLOW_CONFIG points at a directory.
    :raises FileNotFoundError: if the config directory does not exist and is not under AIRFLOW_HOME.
    :return: the module-level ``conf`` parser (unchanged when the file already existed).
    """
    airflow_config = pathlib.Path(AIRFLOW_CONFIG)
    if airflow_config.is_dir():
        msg = (
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {airflow_config.__fspath__()!r}."
        )
        raise IsADirectoryError(msg)
    if not airflow_config.exists():
        log.debug("Creating new Airflow config file in: %s", airflow_config.__fspath__())
        config_directory = airflow_config.parent
        if not config_directory.exists():
            # Only auto-create directories that live under AIRFLOW_HOME; anything else must
            # be created explicitly by the user.
            if not config_directory.is_relative_to(AIRFLOW_HOME):
                msg = (
                    f"Config directory {config_directory.__fspath__()!r} not exists "
                    f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                    "Please create this directory first."
                )
                raise FileNotFoundError(msg) from None
            log.debug("Create directory %r for Airflow config", config_directory.__fspath__())
            config_directory.mkdir(parents=True, exist_ok=True)
        if conf.get("core", "fernet_key", fallback=None) in (None, ""):
            # We know that fernet_key is not set, so we can generate it, set as global key
            # and also write it to the config file so that same key will be used next time
            _SecretKeys.fernet_key = _generate_fernet_key()
            conf.configuration_description["core"]["options"]["fernet_key"]["default"] = (
                _SecretKeys.fernet_key
            )

        # The JWT secret is always regenerated for a fresh config file.
        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        conf.configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = (
            _SecretKeys.jwt_secret_key
        )
        # Create the file first and lock down permissions before any content is written.
        pathlib.Path(airflow_config.__fspath__()).touch()
        make_group_other_inaccessible(airflow_config.__fspath__())
        with open(airflow_config, "w") as file:
            conf.write(
                file,
                include_sources=False,
                include_env_vars=True,
                include_providers=True,
                extra_spacing=True,
                only_defaults=True,
            )
    return conf
770
771
def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Load standard airflow configuration.

    In case it finds that the configuration file is missing, it will create it and write the default
    configuration values there, based on defaults passed, and will add the comments and examples
    from the default configuration.

    :param airflow_config_parser: parser to which the configuration will be loaded

    """
    global AIRFLOW_HOME  # to be cleaned in Airflow 4.0
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    # Deprecated: airflow_home used to be configurable inside the config file itself.
    # Three cases below: env var also set (warn), config value equals the default
    # AIRFLOW_HOME (warn, no behavior change), or config value overrides the global (warn).
    if airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        msg = (
            "Specifying both AIRFLOW_HOME environment variable and airflow_home "
            "in the config file is deprecated. Please use only the AIRFLOW_HOME "
            "environment variable and remove the config file entry."
        )
        if "AIRFLOW_HOME" in os.environ:
            warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)
        elif airflow_config_parser.get("core", "airflow_home") == AIRFLOW_HOME:
            warnings.warn(
                "Specifying airflow_home in the config file is deprecated. As you "
                "have left it at the default value you should remove the setting "
                "from your airflow.cfg and suffer no change in behaviour.",
                category=RemovedInAirflow4Warning,
                stacklevel=1,
            )
        else:
            # The config file value wins: rebind the module-level AIRFLOW_HOME.
            AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")
            warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)
805
806
def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.
    """
    parser = AirflowConfigParser()
    if parser.getboolean("core", "unit_test_mode"):
        # unit_test_mode came from the environment/defaults: skip reading
        # the standard config file entirely.
        parser.load_test_config()
        return parser
    load_standard_airflow_configuration(parser)
    # If the user set unit_test_mode in the airflow.cfg, we still
    # want to respect that and then load the default unit test configuration
    # file on top of it.
    if parser.getboolean("core", "unit_test_mode"):
        parser.load_test_config()
    return parser
824
825
def make_group_other_inaccessible(file_path: str):
    """
    Restrict *file_path* so that only its owner can read or write it.

    Keeps just the owner read/write bits of the current mode. Any failure
    (e.g. unsupported filesystem) is logged and ignored so the caller can
    continue with the original permissions.
    """
    owner_only = stat.S_IRUSR | stat.S_IWUSR
    try:
        current_mode = os.stat(file_path).st_mode
        os.chmod(file_path, current_mode & owner_only)
    except Exception as e:
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )
836
837
def ensure_secrets_loaded(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    If the secrets_backend_list contains only 2 default backends, reload it.
    """
    # Exactly 2 entries means only the default backends are present; a
    # non-default search path means we are loading backends for a worker.
    # Either condition calls for a rebuild of the backend list.
    needs_rebuild = (
        len(secrets_backend_list) == 2 or default_backends != DEFAULT_SECRETS_SEARCH_PATH
    )
    if needs_rebuild:
        return initialize_secrets_backends(default_backends=default_backends)
    return secrets_backend_list
853
854
def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | None:
    """
    Get Secret Backend if defined in airflow.cfg.

    Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.

    This is a convenience function that calls conf._get_custom_secret_backend().
    """
    backend = conf._get_custom_secret_backend(worker_mode=worker_mode)
    return backend
864
865
def initialize_secrets_backends(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * import secrets backend classes
    * instantiate them and return them in a list
    """
    # Any non-default search path means this is being initialized for a worker.
    worker_mode = default_backends != DEFAULT_SECRETS_SEARCH_PATH

    backends: list[BaseSecretsBackend] = []
    custom_backend = get_custom_secret_backend(worker_mode)
    if custom_backend is not None:
        # A user-configured backend is consulted before the defaults.
        backends.append(custom_backend)

    backends.extend(import_string(class_name)() for class_name in default_backends)
    return backends
890
891
def initialize_auth_manager() -> BaseAuthManager:
    """
    Initialize auth manager.

    * import user manager class
    * instantiate it and return it
    """
    auth_manager_cls = conf.getimport(section="core", key="auth_manager")
    if auth_manager_cls:
        return auth_manager_cls()
    raise AirflowConfigException(
        "No auth manager defined in the config. Please specify one using section/key [core/auth_manager]."
    )
907
908
# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
if os.path.exists(_TEST_DAGS_FOLDER):
    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
    # Source tree not present (pip install): fall back to the dags folder
    # under AIRFLOW_HOME.
    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags")

# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
if os.path.exists(_TEST_PLUGINS_FOLDER):
    TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
    # Same fallback as for dags: use the plugins folder under AIRFLOW_HOME.
    TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins")

# Random per-process secret (16 urandom bytes, base64-encoded).
SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")

# Module-level bootstrap: order matters here. `conf` must exist before the
# secrets backends are built (they read it), and validation runs last.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()