Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/configuration.py: 49%
777 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import datetime
20import functools
21import json
22import logging
23import multiprocessing
24import os
25import pathlib
26import re
27import shlex
28import stat
29import subprocess
30import sys
31import warnings
32from base64 import b64encode
33from collections import OrderedDict
35# Ignored Mypy on configparser because it thinks the configparser module has no _UNSET attribute
36from configparser import _UNSET, ConfigParser, NoOptionError, NoSectionError # type: ignore
37from contextlib import contextmanager, suppress
38from json.decoder import JSONDecodeError
39from re import Pattern
40from typing import IO, Any, Dict, Iterable, Tuple, Union
41from urllib.parse import urlsplit
43from typing_extensions import overload
45from airflow.exceptions import AirflowConfigException
46from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, BaseSecretsBackend
47from airflow.utils import yaml
48from airflow.utils.module_loading import import_string
49from airflow.utils.weight_rule import WeightRule
51log = logging.getLogger(__name__)
53# show Airflow's deprecation warnings
54if not sys.warnoptions:
55 warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
56 warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")
58_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")
60ConfigType = Union[str, int, float, bool]
61ConfigOptionsDictType = Dict[str, ConfigType]
62ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]]
63ConfigSourcesType = Dict[str, ConfigSectionSourcesType]
65ENV_VAR_PREFIX = "AIRFLOW__"
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """
    Convert a SQLite version string into a tuple of integers.

    :param s: version string to parse
    :return: the leading dotted-number part as ints, or ``()`` when
        ``_SQLITE3_VERSION_PATTERN`` does not match
    """
    found = _SQLITE3_VERSION_PATTERN.match(s)
    if found is not None:
        return tuple(map(int, found.group("version").split(".")))
    return ()
@overload
def expand_env_var(env_var: None) -> None:
    ...


@overload
def expand_env_var(env_var: str) -> str:
    ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand environment variables and ``~`` in a value, including nested references.

    ``os.path.expandvars`` and ``os.path.expanduser`` are applied repeatedly
    until a pass leaves the value unchanged.

    :param env_var: value to expand; falsy values are returned untouched
    :return: the fully expanded string
    """
    if not env_var:
        return env_var
    previous = env_var
    while True:
        expanded = os.path.expanduser(os.path.expandvars(str(previous)))
        if expanded == previous:
            # Fixed point reached: nothing left to interpolate.
            return expanded
        previous = expanded
def run_command(command: str) -> str:
    """
    Run a command (split with ``shlex``) and return its decoded stdout.

    :param command: command line to execute
    :return: stdout decoded with the interpreter's default encoding,
        undecodable bytes dropped
    :raises AirflowConfigException: when the process exits with a non-zero code
    """
    argv = shlex.split(command)
    process = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    raw_stdout, raw_stderr = process.communicate()

    encoding = sys.getdefaultencoding()
    output = raw_stdout.decode(encoding, "ignore")
    stderr = raw_stderr.decode(encoding, "ignore")

    if process.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {process.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )
def _get_config_value_from_secret_backend(config_key: str) -> str | None:
    """
    Get a config option value from the custom secrets backend.

    :param config_key: key handed to the backend's ``get_config``
    :return: the backend's answer, or ``None`` when no custom backend is available
    :raises AirflowConfigException: when obtaining the backend or querying it fails;
        the original error is attached as the cause
    """
    try:
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        return secrets_client.get_config(config_key)
    except Exception as e:
        # Chain explicitly so the traceback shows the backend failure as the direct cause.
        raise AirflowConfigException(
            "Cannot retrieve config from alternative secrets backend. "
            "Make sure it is configured properly and that the Backend "
            "is accessible.\n"
            f"{e}"
        ) from e
134def _default_config_file_path(file_name: str) -> str:
135 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates")
136 return os.path.join(templates_dir, file_name)
def default_config_yaml() -> dict[str, Any]:
    """
    Load the bundled ``config.yml`` template.

    :return: the document parsed by ``yaml.safe_load``
    """
    config_path = _default_config_file_path("config.yml")
    with open(config_path) as config_file:
        parsed = yaml.safe_load(config_file)
    return parsed
# (section, option) pairs treated as sensitive. AirflowConfigParser checks
# membership in this set before resolving the "_cmd" / "_secret" variants of an option.
SENSITIVE_CONFIG_VALUES = {
    ("atlas", "password"),
    ("celery", "broker_url"),
    ("celery", "flower_basic_auth"),
    ("celery", "result_backend"),
    ("core", "fernet_key"),
    ("database", "sql_alchemy_conn"),
    ("smtp", "smtp_password"),
    ("webserver", "secret_key"),
    # The following options are deprecated
    ("core", "sql_alchemy_conn"),
}
163class AirflowConfigParser(ConfigParser):
164 """Custom Airflow Configparser supporting defaults and deprecated options."""
166 # These configuration elements can be fetched as the stdout of commands
167 # following the "{section}__{name}_cmd" pattern, the idea behind this
168 # is to not store password on boxes in text files.
169 # These configs can also be fetched from Secrets backend
170 # following the "{section}__{name}__secret" pattern
172 sensitive_config_values: set[tuple[str, str]] = SENSITIVE_CONFIG_VALUES
174 # A mapping of (new section, new option) -> (old section, old option, since_version).
175 # When reading new option, the old option will be checked to see if it exists. If it does a
176 # DeprecationWarning will be issued and the old option will be used instead
177 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
178 ("celery", "worker_precheck"): ("core", "worker_precheck", "2.0.0"),
179 ("logging", "interleave_timestamp_parser"): ("core", "interleave_timestamp_parser", "2.6.1"),
180 ("logging", "base_log_folder"): ("core", "base_log_folder", "2.0.0"),
181 ("logging", "remote_logging"): ("core", "remote_logging", "2.0.0"),
182 ("logging", "remote_log_conn_id"): ("core", "remote_log_conn_id", "2.0.0"),
183 ("logging", "remote_base_log_folder"): ("core", "remote_base_log_folder", "2.0.0"),
184 ("logging", "encrypt_s3_logs"): ("core", "encrypt_s3_logs", "2.0.0"),
185 ("logging", "logging_level"): ("core", "logging_level", "2.0.0"),
186 ("logging", "fab_logging_level"): ("core", "fab_logging_level", "2.0.0"),
187 ("logging", "logging_config_class"): ("core", "logging_config_class", "2.0.0"),
188 ("logging", "colored_console_log"): ("core", "colored_console_log", "2.0.0"),
189 ("logging", "colored_log_format"): ("core", "colored_log_format", "2.0.0"),
190 ("logging", "colored_formatter_class"): ("core", "colored_formatter_class", "2.0.0"),
191 ("logging", "log_format"): ("core", "log_format", "2.0.0"),
192 ("logging", "simple_log_format"): ("core", "simple_log_format", "2.0.0"),
193 ("logging", "task_log_prefix_template"): ("core", "task_log_prefix_template", "2.0.0"),
194 ("logging", "log_filename_template"): ("core", "log_filename_template", "2.0.0"),
195 ("logging", "log_processor_filename_template"): ("core", "log_processor_filename_template", "2.0.0"),
196 ("logging", "dag_processor_manager_log_location"): (
197 "core",
198 "dag_processor_manager_log_location",
199 "2.0.0",
200 ),
201 ("logging", "task_log_reader"): ("core", "task_log_reader", "2.0.0"),
202 ("metrics", "metrics_allow_list"): ("metrics", "statsd_allow_list", "2.6.0"),
203 ("metrics", "metrics_block_list"): ("metrics", "statsd_block_list", "2.6.0"),
204 ("metrics", "statsd_on"): ("scheduler", "statsd_on", "2.0.0"),
205 ("metrics", "statsd_host"): ("scheduler", "statsd_host", "2.0.0"),
206 ("metrics", "statsd_port"): ("scheduler", "statsd_port", "2.0.0"),
207 ("metrics", "statsd_prefix"): ("scheduler", "statsd_prefix", "2.0.0"),
208 ("metrics", "statsd_allow_list"): ("scheduler", "statsd_allow_list", "2.0.0"),
209 ("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"),
210 ("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"),
211 ("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"),
212 ("metrics", "statsd_datadog_metrics_tags"): ("scheduler", "statsd_datadog_metrics_tags", "2.6.0"),
213 ("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"),
214 ("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"),
215 ("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"),
216 ("operators", "default_queue"): ("celery", "default_queue", "2.1.0"),
217 ("core", "hide_sensitive_var_conn_fields"): ("admin", "hide_sensitive_variable_fields", "2.1.0"),
218 ("core", "sensitive_var_conn_names"): ("admin", "sensitive_variable_fields", "2.1.0"),
219 ("core", "default_pool_task_slot_count"): ("core", "non_pooled_task_slot_count", "1.10.4"),
220 ("core", "max_active_tasks_per_dag"): ("core", "dag_concurrency", "2.2.0"),
221 ("logging", "worker_log_server_port"): ("celery", "worker_log_server_port", "2.2.0"),
222 ("api", "access_control_allow_origins"): ("api", "access_control_allow_origin", "2.2.0"),
223 ("api", "auth_backends"): ("api", "auth_backend", "2.3.0"),
224 ("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "2.3.0"),
225 ("database", "sql_engine_encoding"): ("core", "sql_engine_encoding", "2.3.0"),
226 ("database", "sql_engine_collation_for_ids"): ("core", "sql_engine_collation_for_ids", "2.3.0"),
227 ("database", "sql_alchemy_pool_enabled"): ("core", "sql_alchemy_pool_enabled", "2.3.0"),
228 ("database", "sql_alchemy_pool_size"): ("core", "sql_alchemy_pool_size", "2.3.0"),
229 ("database", "sql_alchemy_max_overflow"): ("core", "sql_alchemy_max_overflow", "2.3.0"),
230 ("database", "sql_alchemy_pool_recycle"): ("core", "sql_alchemy_pool_recycle", "2.3.0"),
231 ("database", "sql_alchemy_pool_pre_ping"): ("core", "sql_alchemy_pool_pre_ping", "2.3.0"),
232 ("database", "sql_alchemy_schema"): ("core", "sql_alchemy_schema", "2.3.0"),
233 ("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"),
234 ("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"),
235 ("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"),
236 ("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
237 ("scheduler", "task_queued_timeout_check_interval"): (
238 "kubernetes_executor",
239 "worker_pods_pending_timeout_check_interval",
240 "2.6.0",
241 ),
242 }
244 # A mapping of new configurations to a list of old configurations for when one configuration
245 # deprecates more than one other deprecation. The deprecation logic for these configurations
246 # is defined in SchedulerJobRunner.
247 many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
248 ("scheduler", "task_queued_timeout"): [
249 ("celery", "stalled_task_timeout", "2.6.0"),
250 ("celery", "task_adoption_timeout", "2.6.0"),
251 ("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"),
252 ]
253 }
255 # A mapping of new section -> (old section, since_version).
256 deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")}
258 # Now build the inverse so we can go from old_section/old_key to new_section/new_key
259 # if someone tries to retrieve it based on old_section/old_key
260 @functools.cached_property
261 def inversed_deprecated_options(self):
262 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}
264 @functools.cached_property
265 def inversed_deprecated_sections(self):
266 return {
267 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
268 }
270 # A mapping of old default values that we want to change and warn the user
271 # about. Mapping of section -> setting -> { old, replace, by_version }
272 deprecated_values: dict[str, dict[str, tuple[Pattern, str, str]]] = {
273 "core": {
274 "hostname_callable": (re.compile(r":"), r".", "2.1"),
275 },
276 "webserver": {
277 "navbar_color": (re.compile(r"\A#007A87\Z", re.IGNORECASE), "#fff", "2.1"),
278 "dag_default_view": (re.compile(r"^tree$"), "grid", "3.0"),
279 },
280 "email": {
281 "email_backend": (
282 re.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"),
283 r"airflow.providers.sendgrid.utils.emailer.send_email",
284 "2.1",
285 ),
286 },
287 "logging": {
288 "log_filename_template": (
289 re.compile(re.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")),
290 "XX-set-after-default-config-loaded-XX",
291 "3.0",
292 ),
293 },
294 "api": {
295 "auth_backends": (
296 re.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
297 "airflow.api.auth.backend.session",
298 "3.0",
299 ),
300 },
301 "elasticsearch": {
302 "log_id_template": (
303 re.compile("^" + re.escape("{dag_id}-{task_id}-{execution_date}-{try_number}") + "$"),
304 "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
305 "3.0",
306 )
307 },
308 }
310 _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
311 enums_options = {
312 ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
313 ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
314 ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
315 ("scheduler", "file_parsing_sort_mode"): ["modified_time", "random_seeded_by_host", "alphabetical"],
316 ("logging", "logging_level"): _available_logging_levels,
317 ("logging", "fab_logging_level"): _available_logging_levels,
318 # celery_logging_level can be empty, which uses logging_level as fallback
319 ("logging", "celery_logging_level"): _available_logging_levels + [""],
320 ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", ""],
321 }
323 upgraded_values: dict[tuple[str, str], str]
324 """Mapping of (section,option) to the old value that was upgraded"""
326 # This method transforms option names on every read, get, or set operation.
327 # This changes from the default behaviour of ConfigParser from lower-casing
328 # to instead be case-preserving
329 def optionxform(self, optionstr: str) -> str:
330 return optionstr
332 def __init__(self, default_config: str | None = None, *args, **kwargs):
333 super().__init__(*args, **kwargs)
334 self.upgraded_values = {}
336 self.airflow_defaults = ConfigParser(*args, **kwargs)
337 if default_config is not None:
338 self.airflow_defaults.read_string(default_config)
339 # Set the upgrade value based on the current loaded default
340 default = self.airflow_defaults.get("logging", "log_filename_template", fallback=None)
341 if default:
342 replacement = self.deprecated_values["logging"]["log_filename_template"]
343 self.deprecated_values["logging"]["log_filename_template"] = (
344 replacement[0],
345 default,
346 replacement[2],
347 )
348 else:
349 # In case of tests it might not exist
350 with suppress(KeyError):
351 del self.deprecated_values["logging"]["log_filename_template"]
352 else:
353 with suppress(KeyError):
354 del self.deprecated_values["logging"]["log_filename_template"]
356 self.is_validated = False
357 self._suppress_future_warnings = False
359 def validate(self):
360 self._validate_sqlite3_version()
361 self._validate_enums()
363 for section, replacement in self.deprecated_values.items():
364 for name, info in replacement.items():
365 old, new, version = info
366 current_value = self.get(section, name, fallback="")
367 if self._using_old_value(old, current_value):
368 self.upgraded_values[(section, name)] = current_value
369 new_value = old.sub(new, current_value)
370 self._update_env_var(section=section, name=name, new_value=new_value)
371 self._create_future_warning(
372 name=name,
373 section=section,
374 current_value=current_value,
375 new_value=new_value,
376 version=version,
377 )
379 self._upgrade_auth_backends()
380 self._upgrade_postgres_metastore_conn()
381 self.is_validated = True
383 def _upgrade_auth_backends(self):
384 """
385 Ensure a custom auth_backends setting contains session.
387 This is required by the UI for ajax queries.
388 """
389 old_value = self.get("api", "auth_backends", fallback="")
390 if old_value in ("airflow.api.auth.backend.default", ""):
391 # handled by deprecated_values
392 pass
393 elif old_value.find("airflow.api.auth.backend.session") == -1:
394 new_value = old_value + ",airflow.api.auth.backend.session"
395 self._update_env_var(section="api", name="auth_backends", new_value=new_value)
396 self.upgraded_values[("api", "auth_backends")] = old_value
398 # if the old value is set via env var, we need to wipe it
399 # otherwise, it'll "win" over our adjusted value
400 old_env_var = self._env_var_name("api", "auth_backend")
401 os.environ.pop(old_env_var, None)
403 warnings.warn(
404 "The auth_backends setting in [api] has had airflow.api.auth.backend.session added "
405 "in the running config, which is needed by the UI. Please update your config before "
406 "Apache Airflow 3.0.",
407 FutureWarning,
408 )
410 def _upgrade_postgres_metastore_conn(self):
411 """
412 Upgrade SQL schemas.
414 As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
415 must be replaced with `postgresql`.
416 """
417 section, key = "database", "sql_alchemy_conn"
418 old_value = self.get(section, key, _extra_stacklevel=1)
419 bad_schemes = ["postgres+psycopg2", "postgres"]
420 good_scheme = "postgresql"
421 parsed = urlsplit(old_value)
422 if parsed.scheme in bad_schemes:
423 warnings.warn(
424 f"Bad scheme in Airflow configuration core > sql_alchemy_conn: `{parsed.scheme}`. "
425 "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
426 f"change to `{good_scheme}` before the next Airflow release.",
427 FutureWarning,
428 )
429 self.upgraded_values[(section, key)] = old_value
430 new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
431 self._update_env_var(section=section, name=key, new_value=new_value)
433 # if the old value is set via env var, we need to wipe it
434 # otherwise, it'll "win" over our adjusted value
435 old_env_var = self._env_var_name("core", key)
436 os.environ.pop(old_env_var, None)
438 def _validate_enums(self):
439 """Validate that enum type config has an accepted value."""
440 for (section_key, option_key), enum_options in self.enums_options.items():
441 if self.has_option(section_key, option_key):
442 value = self.get(section_key, option_key)
443 if value not in enum_options:
444 raise AirflowConfigException(
445 f"`[{section_key}] {option_key}` should not be "
446 f"{value!r}. Possible values: {', '.join(enum_options)}."
447 )
449 def _validate_sqlite3_version(self):
450 """Validate SQLite version.
452 Some features in storing rendered fields require SQLite >= 3.15.0.
453 """
454 if "sqlite" not in self.get("database", "sql_alchemy_conn"):
455 return
457 import sqlite3
459 min_sqlite_version = (3, 15, 0)
460 if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
461 return
463 from airflow.utils.docs import get_docs_url
465 min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
466 raise AirflowConfigException(
467 f"error: SQLite C library too old (< {min_sqlite_version_str}). "
468 f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
469 )
471 def _using_old_value(self, old: Pattern, current_value: str) -> bool:
472 return old.search(current_value) is not None
474 def _update_env_var(self, section: str, name: str, new_value: str):
475 env_var = self._env_var_name(section, name)
476 # Set it as an env var so that any subprocesses keep the same override!
477 os.environ[env_var] = new_value
479 @staticmethod
480 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any, version: str):
481 warnings.warn(
482 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
483 f"This value has been changed to {new_value!r} in the running config, but "
484 f"please update your config before Apache Airflow {version}.",
485 FutureWarning,
486 )
488 def _env_var_name(self, section: str, key: str) -> str:
489 return f"{ENV_VAR_PREFIX}{section.upper()}__{key.upper()}"
491 def _get_env_var_option(self, section: str, key: str):
492 # must have format AIRFLOW__{SECTION}__{KEY} (note double underscore)
493 env_var = self._env_var_name(section, key)
494 if env_var in os.environ:
495 return expand_env_var(os.environ[env_var])
496 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
497 env_var_cmd = env_var + "_CMD"
498 if env_var_cmd in os.environ:
499 # if this is a valid command key...
500 if (section, key) in self.sensitive_config_values:
501 return run_command(os.environ[env_var_cmd])
502 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
503 env_var_secret_path = env_var + "_SECRET"
504 if env_var_secret_path in os.environ:
505 # if this is a valid secret path...
506 if (section, key) in self.sensitive_config_values:
507 return _get_config_value_from_secret_backend(os.environ[env_var_secret_path])
508 return None
510 def _get_cmd_option(self, section: str, key: str):
511 fallback_key = key + "_cmd"
512 if (section, key) in self.sensitive_config_values:
513 if super().has_option(section, fallback_key):
514 command = super().get(section, fallback_key)
515 return run_command(command)
516 return None
518 def _get_cmd_option_from_config_sources(
519 self, config_sources: ConfigSourcesType, section: str, key: str
520 ) -> str | None:
521 fallback_key = key + "_cmd"
522 if (section, key) in self.sensitive_config_values:
523 section_dict = config_sources.get(section)
524 if section_dict is not None:
525 command_value = section_dict.get(fallback_key)
526 if command_value is not None:
527 if isinstance(command_value, str):
528 command = command_value
529 else:
530 command = command_value[0]
531 return run_command(command)
532 return None
534 def _get_secret_option(self, section: str, key: str) -> str | None:
535 """Get Config option values from Secret Backend."""
536 fallback_key = key + "_secret"
537 if (section, key) in self.sensitive_config_values:
538 if super().has_option(section, fallback_key):
539 secrets_path = super().get(section, fallback_key)
540 return _get_config_value_from_secret_backend(secrets_path)
541 return None
543 def _get_secret_option_from_config_sources(
544 self, config_sources: ConfigSourcesType, section: str, key: str
545 ) -> str | None:
546 fallback_key = key + "_secret"
547 if (section, key) in self.sensitive_config_values:
548 section_dict = config_sources.get(section)
549 if section_dict is not None:
550 secrets_path_value = section_dict.get(fallback_key)
551 if secrets_path_value is not None:
552 if isinstance(secrets_path_value, str):
553 secrets_path = secrets_path_value
554 else:
555 secrets_path = secrets_path_value[0]
556 return _get_config_value_from_secret_backend(secrets_path)
557 return None
559 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
560 value = self.get(section, key, _extra_stacklevel=1, **kwargs)
561 if value is None:
562 raise ValueError(f"The value {section}/{key} should be set!")
563 return value
565 @overload # type: ignore[override]
566 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: # type: ignore[override]
567 ...
569 @overload # type: ignore[override]
570 def get(self, section: str, key: str, **kwargs) -> str | None: # type: ignore[override]
571 ...
573 def get( # type: ignore[override, misc]
574 self,
575 section: str,
576 key: str,
577 _extra_stacklevel: int = 0,
578 **kwargs,
579 ) -> str | None:
580 section = str(section).lower()
581 key = str(key).lower()
582 warning_emitted = False
583 deprecated_section: str | None
584 deprecated_key: str | None
586 # For when we rename whole sections
587 if section in self.inversed_deprecated_sections:
588 deprecated_section, deprecated_key = (section, key)
589 section = self.inversed_deprecated_sections[section]
590 if not self._suppress_future_warnings:
591 warnings.warn(
592 f"The config section [{deprecated_section}] has been renamed to "
593 f"[{section}]. Please update your `conf.get*` call to use the new name",
594 FutureWarning,
595 stacklevel=2 + _extra_stacklevel,
596 )
597 # Don't warn about individual rename if the whole section is renamed
598 warning_emitted = True
599 elif (section, key) in self.inversed_deprecated_options:
600 # Handle using deprecated section/key instead of the new section/key
601 new_section, new_key = self.inversed_deprecated_options[(section, key)]
602 if not self._suppress_future_warnings and not warning_emitted:
603 warnings.warn(
604 f"section/key [{section}/{key}] has been deprecated, you should use"
605 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
606 "new name",
607 FutureWarning,
608 stacklevel=2 + _extra_stacklevel,
609 )
610 warning_emitted = True
611 deprecated_section, deprecated_key = section, key
612 section, key = (new_section, new_key)
613 elif section in self.deprecated_sections:
614 # When accessing the new section name, make sure we check under the old config name
615 deprecated_key = key
616 deprecated_section = self.deprecated_sections[section][0]
617 else:
618 deprecated_section, deprecated_key, _ = self.deprecated_options.get(
619 (section, key), (None, None, None)
620 )
622 # first check environment variables
623 option = self._get_environment_variables(
624 deprecated_key,
625 deprecated_section,
626 key,
627 section,
628 issue_warning=not warning_emitted,
629 extra_stacklevel=_extra_stacklevel,
630 )
631 if option is not None:
632 return option
634 # ...then the config file
635 option = self._get_option_from_config_file(
636 deprecated_key,
637 deprecated_section,
638 key,
639 kwargs,
640 section,
641 issue_warning=not warning_emitted,
642 extra_stacklevel=_extra_stacklevel,
643 )
644 if option is not None:
645 return option
647 # ...then commands
648 option = self._get_option_from_commands(
649 deprecated_key,
650 deprecated_section,
651 key,
652 section,
653 issue_warning=not warning_emitted,
654 extra_stacklevel=_extra_stacklevel,
655 )
656 if option is not None:
657 return option
659 # ...then from secret backends
660 option = self._get_option_from_secrets(
661 deprecated_key,
662 deprecated_section,
663 key,
664 section,
665 issue_warning=not warning_emitted,
666 extra_stacklevel=_extra_stacklevel,
667 )
668 if option is not None:
669 return option
671 # ...then the default config
672 if self.airflow_defaults.has_option(section, key) or "fallback" in kwargs:
673 return expand_env_var(self.airflow_defaults.get(section, key, **kwargs))
675 log.warning("section/key [%s/%s] not found in config", section, key)
677 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
679 def _get_option_from_secrets(
680 self,
681 deprecated_key: str | None,
682 deprecated_section: str | None,
683 key: str,
684 section: str,
685 issue_warning: bool = True,
686 extra_stacklevel: int = 0,
687 ) -> str | None:
688 option = self._get_secret_option(section, key)
689 if option:
690 return option
691 if deprecated_section and deprecated_key:
692 with self.suppress_future_warnings():
693 option = self._get_secret_option(deprecated_section, deprecated_key)
694 if option:
695 if issue_warning:
696 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
697 return option
698 return None
700 def _get_option_from_commands(
701 self,
702 deprecated_key: str | None,
703 deprecated_section: str | None,
704 key: str,
705 section: str,
706 issue_warning: bool = True,
707 extra_stacklevel: int = 0,
708 ) -> str | None:
709 option = self._get_cmd_option(section, key)
710 if option:
711 return option
712 if deprecated_section and deprecated_key:
713 with self.suppress_future_warnings():
714 option = self._get_cmd_option(deprecated_section, deprecated_key)
715 if option:
716 if issue_warning:
717 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
718 return option
719 return None
721 def _get_option_from_config_file(
722 self,
723 deprecated_key: str | None,
724 deprecated_section: str | None,
725 key: str,
726 kwargs: dict[str, Any],
727 section: str,
728 issue_warning: bool = True,
729 extra_stacklevel: int = 0,
730 ) -> str | None:
731 if super().has_option(section, key):
732 # Use the parent's methods to get the actual config here to be able to
733 # separate the config from default config.
734 return expand_env_var(super().get(section, key, **kwargs))
735 if deprecated_section and deprecated_key:
736 if super().has_option(deprecated_section, deprecated_key):
737 if issue_warning:
738 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
739 with self.suppress_future_warnings():
740 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
741 return None
743 def _get_environment_variables(
744 self,
745 deprecated_key: str | None,
746 deprecated_section: str | None,
747 key: str,
748 section: str,
749 issue_warning: bool = True,
750 extra_stacklevel: int = 0,
751 ) -> str | None:
752 option = self._get_env_var_option(section, key)
753 if option is not None:
754 return option
755 if deprecated_section and deprecated_key:
756 with self.suppress_future_warnings():
757 option = self._get_env_var_option(deprecated_section, deprecated_key)
758 if option is not None:
759 if issue_warning:
760 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
761 return option
762 return None
764 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override]
765 val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
766 if "#" in val:
767 val = val.split("#")[0].strip()
768 if val in ("t", "true", "1"):
769 return True
770 elif val in ("f", "false", "0"):
771 return False
772 else:
773 raise AirflowConfigException(
774 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
775 f'Current value: "{val}".'
776 )
778 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override]
779 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
780 if val is None:
781 raise AirflowConfigException(
782 f"Failed to convert value None to int. "
783 f'Please check "{key}" key in "{section}" section is set.'
784 )
785 try:
786 return int(val)
787 except ValueError:
788 raise AirflowConfigException(
789 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
790 f'Current value: "{val}".'
791 )
793 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override]
794 val = self.get(section, key, _extra_stacklevel=1, **kwargs)
795 if val is None:
796 raise AirflowConfigException(
797 f"Failed to convert value None to float. "
798 f'Please check "{key}" key in "{section}" section is set.'
799 )
800 try:
801 return float(val)
802 except ValueError:
803 raise AirflowConfigException(
804 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
805 f'Current value: "{val}".'
806 )
def getimport(self, section: str, key: str, **kwargs) -> Any:
    """
    Reads options, imports the full qualified name, and returns the object.

    In case of failure, it throws an exception with the key and section names

    :param section: section from the config
    :param key: option name holding the dotted path of the object to import
    :raises AirflowConfigException: when importing the configured path raises ImportError
    :return: The object or None, if the option is empty
    """
    # Read through this parser instance, not the module-level ``conf`` singleton, so the
    # method works on any instance (including one built before ``conf`` is assigned).
    full_qualified_path = self.get(section=section, key=key, **kwargs)
    if not full_qualified_path:
        return None

    try:
        return import_string(full_qualified_path)
    except ImportError as e:
        log.error(e)
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{full_qualified_path}".'
        ) from e
def getjson(
    self, section: str, key: str, fallback=_UNSET, **kwargs
) -> dict | list | str | int | float | None:
    """
    Return a config value parsed from a JSON string.

    ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.

    :param section: section from the config
    :param key: option name inside the section
    :param fallback: object returned as-is when the option is missing or empty
    :raises AirflowConfigException: when the stored value is not valid JSON
    """
    # ``get`` would hand the fallback back as a string, so the caller's object is kept
    # aside here and ``get`` is always asked without a fallback.
    default = fallback

    try:
        data = self.get(section=section, key=key, fallback=_UNSET, _extra_stacklevel=1, **kwargs)
    except (NoSectionError, NoOptionError):
        return default

    if not data:
        return None if default is _UNSET else default

    try:
        parsed = json.loads(data)
    except JSONDecodeError as e:
        raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
    return parsed
def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Read an option holding a number of seconds and return it as a datetime.timedelta.

    A missing or empty value yields ``fallback``.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: fallback value when no config value is given, defaults to None
    :raises AirflowConfigException: raised because ValueError or OverflowError
    :return: datetime.timedelta(seconds=<config_value>) or the fallback
    """
    val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
    if not val:
        return fallback

    # The configured value has to be a whole number of seconds
    try:
        seconds = int(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

    try:
        return datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            f"Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
        )
def read(
    self,
    filenames: (str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike]),
    encoding=None,
):
    """
    Read and parse one filename or an iterable of filenames.

    Delegates to the parent ``read`` and hands back its result (the list of
    successfully parsed files) instead of discarding it.

    :param filenames: a single path or an iterable of paths
    :param encoding: passed through to the parent implementation
    :return: whatever the parent ``read`` returns
    """
    return super().read(filenames=filenames, encoding=encoding)
# RawConfigParser annotates this argument with "Mapping" from collections.abc, which is
# not subscriptable - so we have to use dict here.
def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
):
    """
    Load configuration from a dictionary by delegating to the parent ``read_dict``.

    :param dictionary: mapping of section name to a mapping of option name to value
    :param source: label passed through to the parent implementation
    """
    super().read_dict(dictionary=dictionary, source=source)
def has_option(self, section: str, option: str) -> bool:
    """
    Tell whether ``self.get`` can resolve the option.

    :param section: section from the config
    :param option: option name inside the section
    :return: False when ``get`` raises NoOptionError / NoSectionError, True otherwise
    """
    # Going through self.get() keeps the lookup priority (env, config, cmd, defaults)
    # in one place; the _UNSET fallback avoids logging a warning for missing values.
    try:
        self.get(section, option, fallback=_UNSET, _extra_stacklevel=1)
    except (NoOptionError, NoSectionError):
        return False
    return True
def remove_option(self, section: str, option: str, remove_default: bool = True) -> bool:
    """
    Remove an option if it exists in config from a file or default config.

    If both of config have the same option, this removes the option
    in both configs unless remove_default=False.

    :param section: section from the config
    :param option: option name inside the section
    :param remove_default: also remove the option from ``airflow_defaults``
    :return: True when the option was removed from at least one of the two configs
    """
    removed = False
    # super() is used on purpose: the overridden has_option() also consults other sources
    if super().has_option(section, option):
        removed = bool(super().remove_option(section, option))

    if self.airflow_defaults.has_option(section, option) and remove_default:
        removed = bool(self.airflow_defaults.remove_option(section, option)) or removed
    return removed
def getsection(self, section: str) -> ConfigOptionsDictType | None:
    """
    Returns the section as a dict.

    Options come from ``airflow_defaults`` first, are overridden by this parser's own
    section, and finally by environment variables whose name starts with the section
    prefix. Values are converted to int, float, bool as required.

    :param section: section from the config
    :raises AirflowConfigException: when an option resolves to None
    :return: the options of the section, or None when neither this parser nor
        ``airflow_defaults`` has the section
    """
    if not self.has_section(section) and not self.airflow_defaults.has_section(section):
        return None
    if self.airflow_defaults.has_section(section):
        _section: ConfigOptionsDictType = OrderedDict(self.airflow_defaults.items(section))
    else:
        _section = OrderedDict()

    if self.has_section(section):
        _section.update(OrderedDict(self.items(section)))

    section_prefix = self._env_var_name(section, "")
    prefix_length = len(section_prefix)
    for env_var in sorted(os.environ.keys()):
        if env_var.startswith(section_prefix):
            # Cut off only the leading prefix; str.replace would also remove any later
            # occurrence of the prefix inside the option name.
            key = env_var[prefix_length:]
            if key.endswith("_CMD"):
                key = key[:-4]
            key = key.lower()
            _section[key] = self._get_env_var_option(section, key)

    for key, val in _section.items():
        if val is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        # Try int, then float, then the textual booleans; anything else stays untouched
        try:
            _section[key] = int(val)
        except ValueError:
            try:
                _section[key] = float(val)
            except ValueError:
                if isinstance(val, str) and val.lower() in ("t", "true"):
                    _section[key] = True
                elif isinstance(val, str) and val.lower() in ("f", "false"):
                    _section[key] = False
    return _section
def write(  # type: ignore[override]
    self, fp: IO, space_around_delimiters: bool = True, section: str | None = None
) -> None:
    """
    Write the configuration to ``fp``, one section after another.

    Modelled on configparser.RawConfigParser.write, but section contents come from
    ``getsection`` so that options read from environment variables are included.

    :param fp: file-like object handed to ``_write_section``
    :param space_around_delimiters: surround the key/value delimiter with spaces
    :param section: when given, only this section is written
    """
    # The type ignores below deal with less-than-perfect RawConfigParser superclass typing
    separator = self._delimiters[0]  # type: ignore[attr-defined]
    delimiter = f" {separator} " if space_around_delimiters else separator

    if self._defaults:  # type: ignore
        self._write_section(  # type: ignore[attr-defined]
            fp, self.default_section, self._defaults.items(), delimiter  # type: ignore[attr-defined]
        )

    if section:
        sections = {section: dict(self.getsection(section))}  # type: ignore[arg-type]
    else:
        sections = self._sections  # type: ignore[attr-defined]

    for sect in sections:
        item_section: ConfigOptionsDictType = self.getsection(sect)  # type: ignore[assignment]
        self._write_section(fp, sect, item_section.items(), delimiter)  # type: ignore[attr-defined]
def as_dict(
    self,
    display_source: bool = False,
    display_sensitive: bool = False,
    raw: bool = False,
    include_env: bool = True,
    include_cmds: bool = True,
    include_secret: bool = True,
) -> ConfigSourcesType:
    """
    Returns the current configuration as a dictionary of per-section dictionaries.

    When materializing current configuration Airflow defaults are
    materialized along with user set configs. If any of the `include_*`
    options are False then the result of calling command or secret key
    configs do not override Airflow defaults and instead are passed through.
    In order to then avoid Airflow defaults from overwriting user set
    command or secret key configs we filter out bare sensitive_config_values
    that are set to Airflow defaults when command or secret key configs
    produce different values.

    :param display_source: If False, the option value is returned. If True,
        a tuple of (option_value, source) is returned. Source is either
        'airflow.cfg', 'default', 'env var', 'cmd', or 'secret'.
    :param display_sensitive: If True, the values of options set by env
        vars and bash commands will be displayed. If False, those options
        are shown as '< hidden >'
    :param raw: Should the values be output as interpolated values, or the
        "raw" form that can be fed back in to ConfigParser
    :param include_env: Should the value of configuration from AIRFLOW__
        environment variables be included or not
    :param include_cmds: Should the result of calling any *_cmd config be
        set (True, default), or should the _cmd options be left as the
        command to run (False)
    :param include_secret: Should the result of calling any *_secret config be
        set (True, default), or should the _secret options be left as the
        path to get the secret from (False)
    :raises ValueError: when display_sensitive is False and any of include_env,
        include_cmds, include_secret is False
    :return: Dictionary, where the key is the name of the section and the content is
        the dictionary with the name of the parameter and its value.
    """
    if not display_sensitive:
        # We want to hide the sensitive values at the appropriate methods
        # since envs from cmds, secrets can be read at _include_envs method
        if not all([include_env, include_cmds, include_secret]):
            raise ValueError(
                "If display_sensitive is false, then include_env, "
                "include_cmds, include_secret must all be set as True"
            )

    config_sources: ConfigSourcesType = {}
    # Order matters: defaults are materialized first, this parser's own values second
    configs = [
        ("default", self.airflow_defaults),
        ("airflow.cfg", self),
    ]

    self._replace_config_with_display_sources(
        config_sources,
        configs,
        display_source,
        raw,
        self.deprecated_options,
        include_cmds=include_cmds,
        include_env=include_env,
        include_secret=include_secret,
    )

    # add env vars and overwrite because they have priority
    if include_env:
        self._include_envs(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_env_var_option)

    # add bash commands
    if include_cmds:
        self._include_commands(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_cmd_option)

    # add config from secret backends
    if include_secret:
        self._include_secrets(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_secret_option)

    if not display_sensitive:
        # This ensures the ones from config file is hidden too
        # if they are not provided through env, cmd and secret
        hidden = "< hidden >"
        for section, key in self.sensitive_config_values:
            if not config_sources.get(section):
                continue
            if config_sources[section].get(key, None):
                if display_source:
                    # Keep the recorded source, replace only the value part of the tuple
                    source = config_sources[section][key][1]
                    config_sources[section][key] = (hidden, source)
                else:
                    config_sources[section][key] = hidden

    return config_sources
def _include_secrets(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Replace ``<key>_secret`` entries of sensitive options with the resolved secret value.

    :param config_sources: the configuration being built; modified in place
    :param display_sensitive: when False the value is stored as '< hidden >'
    :param display_source: when True the entry is stored as ``(value, "secret")``
    :param raw: when True (and display_source is False) ``%`` is escaped as ``%%``
    """
    for section, key in self.sensitive_config_values:
        secret_value: str | None = self._get_secret_option_from_config_sources(
            config_sources, section, key
        )
        if not secret_value:
            continue

        shown = secret_value if display_sensitive else "< hidden >"
        entry: str | tuple[str, str]
        if display_source:
            entry = (shown, "secret")
        elif raw:
            entry = shown.replace("%", "%%")
        else:
            entry = shown

        section_options = config_sources.setdefault(section, OrderedDict())
        section_options.update({key: entry})
        # The resolved value supersedes the "<key>_secret" pointer
        del section_options[key + "_secret"]
def _include_commands(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Replace ``<key>_cmd`` entries of sensitive options with the resolved command value.

    :param config_sources: the configuration being built; modified in place
    :param display_sensitive: when False the value is stored as '< hidden >'
    :param display_source: when True the entry is stored as ``(value, "cmd")``
    :param raw: when True (and display_source is False) ``%`` is escaped as ``%%``
    """
    for section, key in self.sensitive_config_values:
        cmd_value = self._get_cmd_option_from_config_sources(config_sources, section, key)
        if not cmd_value:
            continue

        entry: str | tuple[str, str] | None = cmd_value if display_sensitive else "< hidden >"
        if display_source:
            entry = (str(entry), "cmd")
        elif raw:
            entry = str(entry).replace("%", "%%")

        section_options = config_sources.setdefault(section, OrderedDict())
        section_options.update({key: entry})
        # The resolved value supersedes the "<key>_cmd" entry
        del section_options[key + "_cmd"]
def _include_envs(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Overlay options coming from prefixed environment variables onto ``config_sources``.

    Every environment variable starting with ``ENV_VAR_PREFIX`` is split into
    section and key on ``__``; variables that do not split into three parts, or that
    ``_get_env_var_option`` resolves to None, are skipped.

    :param config_sources: the configuration being built; modified in place
    :param display_sensitive: when False, values of sensitive options are stored as
        '< hidden >' (cmd/secret variables are left for the dedicated methods)
    :param display_source: when True the entry is stored as ``(value, "env var")``
    :param raw: when True ``%`` is escaped as ``%%``
    """
    # Iterate over a snapshot of the matching names
    for env_var in [
        os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
    ]:
        try:
            _, section, key = env_var.split("__", 2)
            opt = self._get_env_var_option(section, key)
        except ValueError:
            continue
        if opt is None:
            log.warning("Ignoring unknown env var '%s'", env_var)
            continue
        if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
            # Don't hide cmd/secret values here
            if not env_var.lower().endswith("cmd") and not env_var.lower().endswith("secret"):
                # section/key still carry the case of the environment variable name,
                # while sensitive_config_values is keyed like the config itself, so
                # compare the lowered pair.
                if (section.lower(), key.lower()) in self.sensitive_config_values:
                    opt = "< hidden >"
        # NOTE(review): because this is an ``elif`` of the check above, ``raw`` escaping
        # is skipped whenever display_sensitive is False - confirm this is intended.
        elif raw:
            opt = opt.replace("%", "%%")
        if display_source:
            opt = (opt, "env var")

        section = section.lower()
        # if we lower key for kubernetes_environment_variables section,
        # then we won't be able to set any Airflow environment
        # variables. Airflow only parse environment variables starts
        # with AIRFLOW_. Therefore, we need to make it a special case.
        if section != "kubernetes_environment_variables":
            key = key.lower()
        config_sources.setdefault(section, OrderedDict()).update({key: opt})
def _filter_by_source(
    self,
    config_sources: ConfigSourcesType,
    display_source: bool,
    getter_func,
):
    """
    Drop sensitive options that merely repeat the Airflow default while an override exists.

    For every entry of ``sensitive_config_values`` present in ``config_sources``: when
    ``getter_func`` yields a value and the bare option equals the value stored in
    ``airflow_defaults``, the bare option is deleted so that the default does not take
    precedence over the user-set command or secret key config.

    :param config_sources: The current configuration to operate on
    :param display_source: If False, configuration options contain raw
        values. If True, options are a tuple of (option_value, source).
    :param getter_func: A callback function that gets the user configured
        override value for a particular sensitive_config_values config.
    :return: None, the given config_sources is filtered if necessary,
        otherwise untouched.
    """
    for section, key in self.sensitive_config_values:
        # Nothing to filter without the section / key
        if section not in config_sources:
            continue
        if key not in config_sources[section]:
            continue

        # Is there an override at all?
        try:
            override = getter_func(section, key)
        except ValueError:
            continue
        if not override:
            continue

        # Without a default there is nothing to compare against
        if not self.airflow_defaults.has_option(section, key):
            continue

        stored = config_sources[section][key]
        if display_source:
            # with display_source the stored entry is a (value, source) tuple
            current, _source = stored  # type: ignore
        else:
            current = stored

        if current == self.airflow_defaults.get(section, key):
            del config_sources[section][key]
@staticmethod
def _replace_config_with_display_sources(
    config_sources: ConfigSourcesType,
    configs: Iterable[tuple[str, ConfigParser]],
    display_source: bool,
    raw: bool,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Materialize every section of every given parser into ``config_sources``.

    The per-section work is done by ``_replace_section_config_with_display_sources``;
    this method only walks the (source name, parser) pairs and their sections.
    """
    for source_name, parser in configs:
        for section_name in parser.sections():
            AirflowConfigParser._replace_section_config_with_display_sources(
                parser,
                config_sources,
                display_source,
                raw,
                section_name,
                source_name,
                deprecated_options,
                configs,
                include_env=include_env,
                include_cmds=include_cmds,
                include_secret=include_secret,
            )
@staticmethod
def _deprecated_value_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """
    Tell whether any non-default parser defines the deprecated option.

    :param deprecated_section: section the deprecated option lives in
    :param deprecated_key: name of the deprecated option
    :param configs: (source name, parser) pairs; the pair named "default" is ignored
    :return: True as soon as one parser lists the key in that section
    """
    for source_name, parser in configs:
        if source_name == "default":
            continue
        # A parser without the section simply cannot define the option
        with suppress(NoSectionError):
            options = parser.items(section=deprecated_section, raw=True)
            if any(name == deprecated_key for name, _ in options):
                return True
    return False
@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Tell whether the environment variable of the deprecated option is present."""
    variable_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}"
    return os.environ.get(variable_name) is not None
@staticmethod
def _deprecated_command_is_set_in_config(
    deprecated_section: str, deprecated_key: str, configs: Iterable[tuple[str, ConfigParser]]
) -> bool:
    """Tell whether any non-default parser defines the ``_cmd`` variant of the deprecated option."""
    command_key = deprecated_key + "_cmd"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section, deprecated_key=command_key, configs=configs
    )
@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Tell whether the ``_CMD`` environment variable of the deprecated option is present."""
    variable_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD"
    return os.environ.get(variable_name) is not None
@staticmethod
def _deprecated_secret_is_set_in_config(
    deprecated_section: str, deprecated_key: str, configs: Iterable[tuple[str, ConfigParser]]
) -> bool:
    """Tell whether any non-default parser defines the ``_secret`` variant of the deprecated option."""
    secret_key = deprecated_key + "_secret"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section, deprecated_key=secret_key, configs=configs
    )
@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Tell whether the ``_SECRET`` environment variable of the deprecated option is present."""
    variable_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET"
    return os.environ.get(variable_name) is not None
@contextmanager
def suppress_future_warnings(self):
    """
    Context manager that sets ``_suppress_future_warnings`` to True for the with-body.

    The previous value of the flag is put back when the body finishes, including when
    it raises.

    :return: yields this object
    """
    previous = self._suppress_future_warnings
    self._suppress_future_warnings = True
    try:
        yield self
    finally:
        # Restore even on error so one failing caller does not leave the flag set
        self._suppress_future_warnings = previous
@staticmethod
def _replace_section_config_with_display_sources(
    config: ConfigParser,
    config_sources: ConfigSourcesType,
    display_source: bool,
    raw: bool,
    section: str,
    source_name: str,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    configs: Iterable[tuple[str, ConfigParser]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Copy the options of one section of ``config`` into ``config_sources[section]``.

    Each option is stored as its value, or as ``(value, source_name)`` when
    ``display_source`` is True. A value from the "default" source is skipped when the
    option replaces a deprecated one and that deprecated option is set through a
    non-default config, or through its environment variable, ``_cmd`` or ``_secret``
    variant (the latter three only when the matching ``include_*`` flag is True).

    :param config: parser the section is read from
    :param config_sources: the configuration being built; modified in place
    :param display_source: store ``(value, source_name)`` tuples instead of bare values
    :param raw: passed to ``config.items`` as its ``raw`` argument
    :param section: name of the section to copy
    :param source_name: label of ``config``, e.g. "default"
    :param deprecated_options: maps (section, key) to (deprecated_section, deprecated_key, _)
    :param configs: all (source name, parser) pairs, used to look for deprecated values
    :param include_env: consider environment variables of deprecated options
    :param include_cmds: consider ``_cmd`` variants of deprecated options
    :param include_secret: consider ``_secret`` variants of deprecated options
    """
    sect = config_sources.setdefault(section, OrderedDict())
    if isinstance(config, AirflowConfigParser):
        # Reading the items is done inside the parser's own suppress_future_warnings() context
        with config.suppress_future_warnings():
            items = config.items(section=section, raw=raw)
    else:
        items = config.items(section=section, raw=raw)
    for k, val in items:
        deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
        if deprecated_section and deprecated_key:
            if source_name == "default":
                # If deprecated entry has some non-default value set for any of the sources requested,
                # We should NOT set default for the new entry (because it will override anything
                # coming from the deprecated ones)
                if AirflowConfigParser._deprecated_value_is_set_in_config(
                    deprecated_section, deprecated_key, configs
                ):
                    continue
                if include_env and AirflowConfigParser._deprecated_variable_is_set(
                    deprecated_section, deprecated_key
                ):
                    continue
                if include_cmds and (
                    AirflowConfigParser._deprecated_variable_command_is_set(
                        deprecated_section, deprecated_key
                    )
                    or AirflowConfigParser._deprecated_command_is_set_in_config(
                        deprecated_section, deprecated_key, configs
                    )
                ):
                    continue
                if include_secret and (
                    AirflowConfigParser._deprecated_variable_secret_is_set(
                        deprecated_section, deprecated_key
                    )
                    or AirflowConfigParser._deprecated_secret_is_set_in_config(
                        deprecated_section, deprecated_key, configs
                    )
                ):
                    continue
        if display_source:
            sect[k] = (val, source_name)
        else:
            sect[k] = val
def load_test_config(self):
    """
    Replace the loaded configuration with the unit test configuration.

    Note: this is not reversible.
    """
    # Drop every loaded section so that lookups fall back to the defaults
    for section_name in self.sections():
        self.remove_section(section_name)

    # Load the bundled test defaults ...
    test_template_path = _default_config_file_path("default_test.cfg")
    log.info("Reading default test configuration from %s", test_template_path)
    rendered_test_config = _parameterized_config_from_template("default_test.cfg")
    self.read_string(rendered_test_config)

    # ... followed by any "custom" test settings
    log.info("Reading test configuration from %s", TEST_CONFIG_FILE)
    self.read(TEST_CONFIG_FILE)
@staticmethod
def _warn_deprecate(
    section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
):
    """
    Emit a DeprecationWarning about an option that was renamed or moved.

    :param section: section of the current option
    :param key: name of the current option
    :param deprecated_section: section the old option lived in
    :param deprecated_name: name of the old option
    :param extra_stacklevel: added to the base stacklevel of 4
    """
    if section == deprecated_section:
        # Same section: the option was only renamed
        message = (
            f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
            f"the old setting has been used, but please update your config."
        )
    else:
        message = (
            f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
            f"in [{section}] - the old setting has been used, but please update your config."
        )
    warnings.warn(message, DeprecationWarning, stacklevel=4 + extra_stacklevel)
def __getstate__(self):
    """Return the attributes kept when this object is pickled, keyed by attribute name."""
    persisted_attributes = ("_sections", "is_validated", "airflow_defaults")
    return {name: getattr(self, name) for name in persisted_attributes}
def __setstate__(self, state):
    """
    Rebuild the object from the mapping produced by ``__getstate__``.

    The object is re-initialized first, the stored sections are loaded through
    ``read_dict`` and the remaining entries are put straight into ``__dict__``.
    """
    self.__init__()
    stored_sections = state.pop("_sections")
    self.read_dict(stored_sections)
    self.__dict__.update(state)
def get_airflow_home() -> str:
    """Return AIRFLOW_HOME from the environment ("~/airflow" when unset), run through expand_env_var."""
    configured_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(configured_home)
def get_airflow_config(airflow_home) -> str:
    """
    Return the path of airflow.cfg.

    The AIRFLOW_CONFIG environment variable wins when it is set; otherwise the file is
    expected directly inside ``airflow_home``.
    """
    configured_path = os.environ.get("AIRFLOW_CONFIG")
    if configured_path is not None:
        return expand_env_var(configured_path)
    return os.path.join(airflow_home, "airflow.cfg")
def _parameterized_config_from_template(filename) -> str:
    """
    Render the part of a bundled config file that follows the template marker line.

    :param filename: name passed to ``_default_config_file_path``
    :raises RuntimeError: when the file has no template marker line
    """
    TEMPLATE_START = "# ----------------------- TEMPLATE BEGINS HERE -----------------------\n"

    path = _default_config_file_path(filename)
    with open(path) as template_file:
        for current_line in template_file:
            if current_line == TEMPLATE_START:
                # Everything after the marker is the template itself
                remainder = template_file.read()
                return parameterized_config(remainder.strip())
    raise RuntimeError(f"Template marker not found in {path!r}")
def parameterized_config(template) -> str:
    """
    Generates configuration from provided template & variables defined in current scope.

    The substitution values are the module globals overlaid with this function's locals.

    :param template: a config content templated with {{variables}}
    """
    # Must stay the first statement: locals() should contain nothing but ``template``
    all_vars = {**globals(), **locals()}
    return template.format(**all_vars)
def get_airflow_test_config(airflow_home) -> str:
    """
    Return the path of unittests.cfg.

    The AIRFLOW_TEST_CONFIG environment variable wins when it is set; otherwise the file
    is expected directly inside ``airflow_home``.
    """
    try:
        configured_path = os.environ["AIRFLOW_TEST_CONFIG"]
    except KeyError:
        return os.path.join(airflow_home, "unittests.cfg")
    # It will never return None
    return expand_env_var(configured_path)  # type: ignore[return-value]
def _generate_fernet_key() -> str:
    """Generate a new Fernet key and return it decoded to ``str``."""
    # Imported lazily, only when a key is actually needed
    from cryptography.fernet import Fernet

    key_bytes = Fernet.generate_key()
    return key_bytes.decode()
def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.

    Creates the test config, airflow.cfg and the webserver config file when they do not
    exist yet, and may assign the module globals FERNET_KEY, AIRFLOW_HOME and
    WEBSERVER_CONFIG.

    :return: the parser loaded with either the test or the regular configuration
    """
    global FERNET_KEY, AIRFLOW_HOME, WEBSERVER_CONFIG

    default_config = _parameterized_config_from_template("default_airflow.cfg")

    local_conf = AirflowConfigParser(default_config=default_config)

    if local_conf.getboolean("core", "unit_test_mode"):
        # Load test config only
        if not os.path.isfile(TEST_CONFIG_FILE):
            from cryptography.fernet import Fernet

            log.info("Creating new Airflow config file for unit tests in: %s", TEST_CONFIG_FILE)
            pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True)

            # Assigned before the template is rendered below, which reads module globals
            FERNET_KEY = Fernet.generate_key().decode()

            with open(TEST_CONFIG_FILE, "w") as file:
                cfg = _parameterized_config_from_template("default_test.cfg")
                file.write(cfg)
            make_group_other_inaccessible(TEST_CONFIG_FILE)

        local_conf.load_test_config()
    else:
        # Load normal config
        if not os.path.isfile(AIRFLOW_CONFIG):
            from cryptography.fernet import Fernet

            log.info("Creating new Airflow config file in: %s", AIRFLOW_CONFIG)
            pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True)

            # NOTE(review): default_config was rendered at the top of this function,
            # before this assignment - if the template interpolates FERNET_KEY, the file
            # written below would not contain the key generated here. TODO confirm.
            FERNET_KEY = Fernet.generate_key().decode()

            with open(AIRFLOW_CONFIG, "w") as file:
                file.write(default_config)
            make_group_other_inaccessible(AIRFLOW_CONFIG)

        log.info("Reading the config from %s", AIRFLOW_CONFIG)

        local_conf.read(AIRFLOW_CONFIG)

        if local_conf.has_option("core", "AIRFLOW_HOME"):
            msg = (
                "Specifying both AIRFLOW_HOME environment variable and airflow_home "
                "in the config file is deprecated. Please use only the AIRFLOW_HOME "
                "environment variable and remove the config file entry."
            )
            if "AIRFLOW_HOME" in os.environ:
                warnings.warn(msg, category=DeprecationWarning)
            elif local_conf.get("core", "airflow_home") == AIRFLOW_HOME:
                warnings.warn(
                    "Specifying airflow_home in the config file is deprecated. As you "
                    "have left it at the default value you should remove the setting "
                    "from your airflow.cfg and suffer no change in behaviour.",
                    category=DeprecationWarning,
                )
            else:
                # Only the config file sets a (non-default) home: adopt it, but still warn
                AIRFLOW_HOME = local_conf.get("core", "airflow_home")  # type: ignore[assignment]
                warnings.warn(msg, category=DeprecationWarning)

        # They _might_ have set unit_test_mode in the airflow.cfg, we still
        # want to respect that and then load the unittests.cfg
        if local_conf.getboolean("core", "unit_test_mode"):
            local_conf.load_test_config()

    WEBSERVER_CONFIG = local_conf.get("webserver", "config_file")
    if not os.path.isfile(WEBSERVER_CONFIG):
        import shutil

        log.info("Creating new FAB webserver config file in: %s", WEBSERVER_CONFIG)
        shutil.copy(_default_config_file_path("default_webserver_config.py"), WEBSERVER_CONFIG)
    return local_conf
def make_group_other_inaccessible(file_path: str):
    """
    Restrict the permission bits of a file to owner read/write.

    The current mode is masked with ``S_IRUSR | S_IWUSR``, so no permission is added.
    This is best effort: any failure is logged as a warning and the file keeps its
    original permissions.

    :param file_path: path of the file to restrict
    """
    try:
        permissions = os.stat(file_path)
        os.chmod(file_path, permissions.st_mode & (stat.S_IRUSR | stat.S_IWUSR))
    except Exception as e:
        # The placeholder is required: an argument without one makes the logging module
        # report a formatting error instead of emitting the message.
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )
1571# Historical convenience functions to access config entries
def load_test_config():
    """Deprecated module-level shortcut for ``conf.load_test_config``."""
    message = (
        "Accessing configuration method 'load_test_config' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.load_test_config'"
    )
    # stacklevel=2 attributes the warning to the caller of this shortcut
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    conf.load_test_config()
def get(*args, **kwargs) -> ConfigType | None:
    """Deprecated module-level shortcut for ``conf.get``."""
    message = (
        "Accessing configuration method 'get' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.get'"
    )
    # stacklevel=2 attributes the warning to the caller of this shortcut
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.get(*args, **kwargs)
def getboolean(*args, **kwargs) -> bool:
    """Deprecated module-level shortcut for ``conf.getboolean``."""
    message = (
        "Accessing configuration method 'getboolean' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getboolean'"
    )
    # stacklevel=2 attributes the warning to the caller of this shortcut
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.getboolean(*args, **kwargs)
def getfloat(*args, **kwargs) -> float:
    """Deprecated module-level shortcut for ``conf.getfloat``."""
    message = (
        "Accessing configuration method 'getfloat' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getfloat'"
    )
    # stacklevel=2 attributes the warning to the caller of this shortcut
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.getfloat(*args, **kwargs)
def getint(*args, **kwargs) -> int:
    """Deprecated module-level shortcut for ``conf.getint``."""
    message = (
        "Accessing configuration method 'getint' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getint'"
    )
    # stacklevel=2 attributes the warning to the caller of this shortcut
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.getint(*args, **kwargs)
def getsection(*args, **kwargs) -> ConfigOptionsDictType | None:
    """Deprecated module-level shortcut for ``conf.getsection``."""
    message = (
        "Accessing configuration method 'getsection' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getsection'"
    )
    # stacklevel=2 attributes the warning to the caller of this shortcut
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.getsection(*args, **kwargs)
def has_option(*args, **kwargs) -> bool:
    """Deprecated module-level shortcut for ``conf.has_option``."""
    message = (
        "Accessing configuration method 'has_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.has_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this shortcut
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.has_option(*args, **kwargs)
def remove_option(*args, **kwargs) -> bool:
    """Deprecated module-level shortcut for ``conf.remove_option``."""
    message = (
        "Accessing configuration method 'remove_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.remove_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this shortcut
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.remove_option(*args, **kwargs)
def as_dict(*args, **kwargs) -> ConfigSourcesType:
    """Deprecated module-level shortcut for ``conf.as_dict``."""
    message = (
        "Accessing configuration method 'as_dict' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.as_dict'"
    )
    # stacklevel=2 attributes the warning to the caller of this shortcut
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.as_dict(*args, **kwargs)
def set(*args, **kwargs) -> None:
    """Deprecated module-level shortcut for ``conf.set``."""
    message = (
        "Accessing configuration method 'set' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.set'"
    )
    # stacklevel=2 attributes the warning to the caller of this shortcut
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    conf.set(*args, **kwargs)
def ensure_secrets_loaded() -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    A ``secrets_backend_list`` of exactly two entries is taken to hold only the two
    default backends, in which case the backends are initialized again.
    """
    if len(secrets_backend_list) != 2:
        return secrets_backend_list
    # Exactly two entries: only the defaults are loaded, so reload
    return initialize_secrets_backends()
def get_custom_secret_backend() -> BaseSecretsBackend | None:
    """
    Instantiate the secrets backend configured under [secrets] backend.

    The keyword arguments come from [secrets] backend_kwargs; when they cannot be parsed
    as JSON, or are not a dict, a warning is logged and no kwargs are used.

    :return: the backend instance, or None when no backend class is configured
    """
    secrets_backend_cls = conf.getimport(section="secrets", key="backend")
    if not secrets_backend_cls:
        return None

    try:
        parsed_kwargs = conf.getjson(section="secrets", key="backend_kwargs")
        if parsed_kwargs and not isinstance(parsed_kwargs, dict):
            raise ValueError("not a dict")
        backend_kwargs = parsed_kwargs or {}
    except AirflowConfigException:
        log.warning("Failed to parse [secrets] backend_kwargs as JSON, defaulting to no kwargs.")
        backend_kwargs = {}
    except ValueError:
        log.warning("Failed to parse [secrets] backend_kwargs into a dict, defaulting to no kwargs.")
        backend_kwargs = {}

    return secrets_backend_cls(**backend_kwargs)
def initialize_secrets_backends() -> list[BaseSecretsBackend]:
    """
    Build the list of secrets backend instances.

    * the custom backend from configuration (when one is defined) comes first
    * every class path in ``DEFAULT_SECRETS_SEARCH_PATH`` is then imported and
      instantiated with no arguments, in order
    """
    custom_backend = get_custom_secret_backend()
    backends = [] if custom_backend is None else [custom_backend]

    # The generator is consumed lazily, so each class is imported and instantiated in turn.
    backends.extend(import_string(class_path)() for class_path in DEFAULT_SECRETS_SEARCH_PATH)

    return backends
@functools.lru_cache(maxsize=None)
def _DEFAULT_CONFIG() -> str:
    """Return the text of ``default_airflow.cfg``; the result is cached after the first call."""
    config_path = _default_config_file_path("default_airflow.cfg")
    return pathlib.Path(config_path).read_text()
@functools.lru_cache(maxsize=None)
def _TEST_CONFIG() -> str:
    """Return the text of ``default_test.cfg``; the result is cached after the first call."""
    config_path = _default_config_file_path("default_test.cfg")
    return pathlib.Path(config_path).read_text()
# Deprecated module attributes, keyed by attribute name. Each value is a
# zero-argument callable that produces the attribute's value when invoked.
_deprecated = dict(
    DEFAULT_CONFIG=_DEFAULT_CONFIG,
    TEST_CONFIG=_TEST_CONFIG,
    TEST_CONFIG_FILE_PATH=functools.partial(_default_config_file_path, "default_test.cfg"),
    DEFAULT_CONFIG_FILE_PATH=functools.partial(_default_config_file_path, "default_airflow.cfg"),
)
def __getattr__(name):
    """
    Module-level attribute hook that serves the names listed in ``_deprecated``.

    For a known name, emits a ``DeprecationWarning`` (reported against the caller
    via ``stacklevel=2``) and returns the result of calling the registered factory.

    :raises AttributeError: if ``name`` is not a deprecated attribute
    """
    if name not in _deprecated:
        raise AttributeError(f"module {__name__} has no attribute {name}")

    warnings.warn(
        f"{__name__}.{name} is deprecated and will be removed in future",
        DeprecationWarning,
        stacklevel=2,
    )
    factory = _deprecated[name]
    return factory()
# Resolve AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, with
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as the defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# DAGs folder for unit tests. The in-tree "tests/dags" directory won't exist
# if users install via pip, so fall back to "<AIRFLOW_HOME>/dags".
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
TEST_DAGS_FOLDER = (
    _TEST_DAGS_FOLDER if os.path.exists(_TEST_DAGS_FOLDER) else os.path.join(AIRFLOW_HOME, "dags")
)

# Plugins folder for unit tests, with the same fallback to "<AIRFLOW_HOME>/plugins".
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
TEST_PLUGINS_FOLDER = (
    _TEST_PLUGINS_FOLDER
    if os.path.exists(_TEST_PLUGINS_FOLDER)
    else os.path.join(AIRFLOW_HOME, "plugins")
)

TEST_CONFIG_FILE = get_airflow_test_config(AIRFLOW_HOME)

SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
FERNET_KEY = ""  # Set only if needed when generating a new file
WEBSERVER_CONFIG = ""  # Set by initialize_config

# Order matters: the configuration object is created first, then the secrets
# backends are initialized, and only then is the configuration validated.
conf = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()