Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/configuration.py: 48%
769 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import datetime
20import functools
21import json
22import logging
23import multiprocessing
24import os
25import pathlib
26import re
27import shlex
28import subprocess
29import sys
30import warnings
31from base64 import b64encode
32from collections import OrderedDict
34# Ignored Mypy on configparser because it thinks the configparser module has no _UNSET attribute
35from configparser import _UNSET, ConfigParser, NoOptionError, NoSectionError # type: ignore
36from contextlib import contextmanager, suppress
37from json.decoder import JSONDecodeError
38from re import Pattern
39from typing import IO, Any, Dict, Iterable, Tuple, Union
40from urllib.parse import urlsplit
42from typing_extensions import overload
44from airflow.compat.functools import cached_property
45from airflow.exceptions import AirflowConfigException
46from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, BaseSecretsBackend
47from airflow.utils import yaml
48from airflow.utils.module_loading import import_string
49from airflow.utils.weight_rule import WeightRule
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
# (only when the interpreter was started without explicit warning options)
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Captures the leading dotted-numeric part of a version string in the ``version`` group.
_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases used in annotations for config values and config-source mappings.
ConfigType = Union[str, int, float, bool]
ConfigOptionsDictType = Dict[str, ConfigType]
ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]]
ConfigSourcesType = Dict[str, ConfigSectionSourcesType]

# Prefix of every environment variable name built by ``AirflowConfigParser._env_var_name``.
ENV_VAR_PREFIX = "AIRFLOW__"
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """
    Extract the leading dotted numeric version from a version string.

    :param s: version string to parse
    :return: the numeric components as a tuple of ints, or an empty tuple
        when ``_SQLITE3_VERSION_PATTERN`` does not match ``s``
    """
    found = _SQLITE3_VERSION_PATTERN.match(s)
    if found:
        return tuple(map(int, found.group("version").split(".")))
    return ()
@overload
def expand_env_var(env_var: None) -> None:
    ...


@overload
def expand_env_var(env_var: str) -> str:
    ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand (potentially nested) environment variables and ``~`` in a value.

    ``expandvars`` and ``expanduser`` are applied repeatedly until a pass
    leaves the value unchanged. Falsy input (``None`` or ``""``) is returned
    as-is.

    :param env_var: the value to expand
    :return: the fully expanded value
    """
    if not env_var:
        return env_var

    current = env_var
    expanded = os.path.expanduser(os.path.expandvars(str(current)))
    # Keep expanding until we reach a fixed point.
    while expanded != current:
        current = expanded
        expanded = os.path.expanduser(os.path.expandvars(str(current)))
    return expanded
def run_command(command: str) -> str:
    """
    Run ``command`` (split with ``shlex``, no shell involved) and return its stdout.

    :param command: the command line to execute
    :return: the decoded standard output of the process
    :raises AirflowConfigException: if the process exits with a non-zero return code
    """
    argv = shlex.split(command)
    process = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    raw_stdout, raw_stderr = process.communicate()

    # Undecodable bytes are dropped rather than raising.
    encoding = sys.getdefaultencoding()
    output = raw_stdout.decode(encoding, "ignore")
    stderr = raw_stderr.decode(encoding, "ignore")

    if process.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {process.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )
def _get_config_value_from_secret_backend(config_key: str) -> str | None:
    """
    Get a config option value from the custom secrets backend.

    :param config_key: the key passed to the backend's ``get_config``
    :return: the value returned by the backend, or ``None`` when
        ``get_custom_secret_backend()`` returns a falsy value
    :raises AirflowConfigException: if obtaining the backend or querying it raises;
        the original exception is attached as ``__cause__``
    """
    try:
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        return secrets_client.get_config(config_key)
    except Exception as e:
        # Chain explicitly so the traceback shows the backend failure as the direct cause.
        raise AirflowConfigException(
            "Cannot retrieve config from alternative secrets backend. "
            "Make sure it is configured properly and that the Backend "
            "is accessible.\n"
            f"{e}"
        ) from e
134def _default_config_file_path(file_name: str) -> str:
135 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates")
136 return os.path.join(templates_dir, file_name)
def default_config_yaml() -> dict[str, Any]:
    """
    Load the Airflow config description from the bundled ``config.yml`` template.

    :return: Python dictionary containing configs & their info
    """
    yaml_path = _default_config_file_path("config.yml")
    with open(yaml_path) as stream:
        parsed = yaml.safe_load(stream)
    return parsed
# (section, option) pairs treated as sensitive. ``AirflowConfigParser`` uses this set as its
# ``sensitive_config_values`` and only honours the ``_cmd`` / ``_secret`` variants of an
# option when the pair is listed here.
SENSITIVE_CONFIG_VALUES = {
    ("database", "sql_alchemy_conn"),
    ("core", "fernet_key"),
    ("celery", "broker_url"),
    ("celery", "flower_basic_auth"),
    ("celery", "result_backend"),
    ("atlas", "password"),
    ("smtp", "smtp_password"),
    ("webserver", "secret_key"),
    # The following options are deprecated
    ("core", "sql_alchemy_conn"),
}
163class AirflowConfigParser(ConfigParser):
164 """Custom Airflow Configparser supporting defaults and deprecated options."""
166 # These configuration elements can be fetched as the stdout of commands
167 # following the "{section}__{name}_cmd" pattern, the idea behind this
168 # is to not store password on boxes in text files.
169 # These configs can also be fetched from Secrets backend
170 # following the "{section}__{name}__secret" pattern
172 sensitive_config_values: set[tuple[str, str]] = SENSITIVE_CONFIG_VALUES
174 # A mapping of (new section, new option) -> (old section, old option, since_version).
175 # When reading new option, the old option will be checked to see if it exists. If it does a
176 # DeprecationWarning will be issued and the old option will be used instead
177 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
178 ("celery", "worker_precheck"): ("core", "worker_precheck", "2.0.0"),
179 ("logging", "base_log_folder"): ("core", "base_log_folder", "2.0.0"),
180 ("logging", "remote_logging"): ("core", "remote_logging", "2.0.0"),
181 ("logging", "remote_log_conn_id"): ("core", "remote_log_conn_id", "2.0.0"),
182 ("logging", "remote_base_log_folder"): ("core", "remote_base_log_folder", "2.0.0"),
183 ("logging", "encrypt_s3_logs"): ("core", "encrypt_s3_logs", "2.0.0"),
184 ("logging", "logging_level"): ("core", "logging_level", "2.0.0"),
185 ("logging", "fab_logging_level"): ("core", "fab_logging_level", "2.0.0"),
186 ("logging", "logging_config_class"): ("core", "logging_config_class", "2.0.0"),
187 ("logging", "colored_console_log"): ("core", "colored_console_log", "2.0.0"),
188 ("logging", "colored_log_format"): ("core", "colored_log_format", "2.0.0"),
189 ("logging", "colored_formatter_class"): ("core", "colored_formatter_class", "2.0.0"),
190 ("logging", "log_format"): ("core", "log_format", "2.0.0"),
191 ("logging", "simple_log_format"): ("core", "simple_log_format", "2.0.0"),
192 ("logging", "task_log_prefix_template"): ("core", "task_log_prefix_template", "2.0.0"),
193 ("logging", "log_filename_template"): ("core", "log_filename_template", "2.0.0"),
194 ("logging", "log_processor_filename_template"): ("core", "log_processor_filename_template", "2.0.0"),
195 ("logging", "dag_processor_manager_log_location"): (
196 "core",
197 "dag_processor_manager_log_location",
198 "2.0.0",
199 ),
200 ("logging", "task_log_reader"): ("core", "task_log_reader", "2.0.0"),
201 ("metrics", "statsd_on"): ("scheduler", "statsd_on", "2.0.0"),
202 ("metrics", "statsd_host"): ("scheduler", "statsd_host", "2.0.0"),
203 ("metrics", "statsd_port"): ("scheduler", "statsd_port", "2.0.0"),
204 ("metrics", "statsd_prefix"): ("scheduler", "statsd_prefix", "2.0.0"),
205 ("metrics", "statsd_allow_list"): ("scheduler", "statsd_allow_list", "2.0.0"),
206 ("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"),
207 ("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"),
208 ("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"),
209 ("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"),
210 ("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"),
211 ("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"),
212 ("operators", "default_queue"): ("celery", "default_queue", "2.1.0"),
213 ("core", "hide_sensitive_var_conn_fields"): ("admin", "hide_sensitive_variable_fields", "2.1.0"),
214 ("core", "sensitive_var_conn_names"): ("admin", "sensitive_variable_fields", "2.1.0"),
215 ("core", "default_pool_task_slot_count"): ("core", "non_pooled_task_slot_count", "1.10.4"),
216 ("core", "max_active_tasks_per_dag"): ("core", "dag_concurrency", "2.2.0"),
217 ("logging", "worker_log_server_port"): ("celery", "worker_log_server_port", "2.2.0"),
218 ("api", "access_control_allow_origins"): ("api", "access_control_allow_origin", "2.2.0"),
219 ("api", "auth_backends"): ("api", "auth_backend", "2.3.0"),
220 ("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "2.3.0"),
221 ("database", "sql_engine_encoding"): ("core", "sql_engine_encoding", "2.3.0"),
222 ("database", "sql_engine_collation_for_ids"): ("core", "sql_engine_collation_for_ids", "2.3.0"),
223 ("database", "sql_alchemy_pool_enabled"): ("core", "sql_alchemy_pool_enabled", "2.3.0"),
224 ("database", "sql_alchemy_pool_size"): ("core", "sql_alchemy_pool_size", "2.3.0"),
225 ("database", "sql_alchemy_max_overflow"): ("core", "sql_alchemy_max_overflow", "2.3.0"),
226 ("database", "sql_alchemy_pool_recycle"): ("core", "sql_alchemy_pool_recycle", "2.3.0"),
227 ("database", "sql_alchemy_pool_pre_ping"): ("core", "sql_alchemy_pool_pre_ping", "2.3.0"),
228 ("database", "sql_alchemy_schema"): ("core", "sql_alchemy_schema", "2.3.0"),
229 ("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"),
230 ("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"),
231 ("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"),
232 ("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
233 }
235 # A mapping of new section -> (old section, since_version).
236 deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")}
238 # Now build the inverse so we can go from old_section/old_key to new_section/new_key
239 # if someone tries to retrieve it based on old_section/old_key
240 @cached_property
241 def inversed_deprecated_options(self):
242 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}
244 @cached_property
245 def inversed_deprecated_sections(self):
246 return {
247 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
248 }
250 # A mapping of old default values that we want to change and warn the user
251 # about. Mapping of section -> setting -> { old, replace, by_version }
252 deprecated_values: dict[str, dict[str, tuple[Pattern, str, str]]] = {
253 "core": {
254 "hostname_callable": (re.compile(r":"), r".", "2.1"),
255 },
256 "webserver": {
257 "navbar_color": (re.compile(r"\A#007A87\Z", re.IGNORECASE), "#fff", "2.1"),
258 "dag_default_view": (re.compile(r"^tree$"), "grid", "3.0"),
259 },
260 "email": {
261 "email_backend": (
262 re.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"),
263 r"airflow.providers.sendgrid.utils.emailer.send_email",
264 "2.1",
265 ),
266 },
267 "logging": {
268 "log_filename_template": (
269 re.compile(re.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")),
270 "XX-set-after-default-config-loaded-XX",
271 "3.0",
272 ),
273 },
274 "api": {
275 "auth_backends": (
276 re.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
277 "airflow.api.auth.backend.session",
278 "3.0",
279 ),
280 },
281 "elasticsearch": {
282 "log_id_template": (
283 re.compile("^" + re.escape("{dag_id}-{task_id}-{execution_date}-{try_number}") + "$"),
284 "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
285 "3.0",
286 )
287 },
288 }
290 _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
291 enums_options = {
292 ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
293 ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
294 ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
295 ("scheduler", "file_parsing_sort_mode"): ["modified_time", "random_seeded_by_host", "alphabetical"],
296 ("logging", "logging_level"): _available_logging_levels,
297 ("logging", "fab_logging_level"): _available_logging_levels,
298 # celery_logging_level can be empty, which uses logging_level as fallback
299 ("logging", "celery_logging_level"): _available_logging_levels + [""],
300 ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", ""],
301 }
303 upgraded_values: dict[tuple[str, str], str]
304 """Mapping of (section,option) to the old value that was upgraded"""
306 # This method transforms option names on every read, get, or set operation.
307 # This changes from the default behaviour of ConfigParser from lower-casing
308 # to instead be case-preserving
    def optionxform(self, optionstr: str) -> str:
        """
        Return the option name unchanged.

        :param optionstr: the option name as given
        :return: ``optionstr`` itself, so option names keep their original case
        """
        return optionstr
    def __init__(self, default_config: str | None = None, *args, **kwargs):
        """
        Create the parser and a second, plain ``ConfigParser`` holding the defaults.

        :param default_config: config text loaded into ``self.airflow_defaults``; when ``None``
            the defaults parser stays empty
        :param args: forwarded to both ``ConfigParser`` constructors
        :param kwargs: forwarded to both ``ConfigParser`` constructors
        """
        super().__init__(*args, **kwargs)
        # Filled in by ``validate`` and the ``_upgrade_*`` helpers.
        self.upgraded_values = {}

        self.airflow_defaults = ConfigParser(*args, **kwargs)
        # NOTE(review): ``deprecated_values`` is declared on the class and is mutated through
        # ``self`` below, so the change is visible to every instance — confirm this is intended.
        if default_config is not None:
            self.airflow_defaults.read_string(default_config)
            # Set the upgrade value based on the current loaded default
            default = self.airflow_defaults.get("logging", "log_filename_template", fallback=None)
            if default:
                # Keep the pattern and version, swap in the loaded default as the replacement value.
                replacement = self.deprecated_values["logging"]["log_filename_template"]
                self.deprecated_values["logging"]["log_filename_template"] = (
                    replacement[0],
                    default,
                    replacement[2],
                )
            else:
                # In case of tests it might not exist
                with suppress(KeyError):
                    del self.deprecated_values["logging"]["log_filename_template"]
        else:
            # No defaults loaded: there is nothing to upgrade the template to, so drop the entry.
            with suppress(KeyError):
                del self.deprecated_values["logging"]["log_filename_template"]

        self.is_validated = False
        self._suppress_future_warnings = False
339 def validate(self):
340 self._validate_config_dependencies()
341 self._validate_enums()
343 for section, replacement in self.deprecated_values.items():
344 for name, info in replacement.items():
345 old, new, version = info
346 current_value = self.get(section, name, fallback="")
347 if self._using_old_value(old, current_value):
348 self.upgraded_values[(section, name)] = current_value
349 new_value = old.sub(new, current_value)
350 self._update_env_var(section=section, name=name, new_value=new_value)
351 self._create_future_warning(
352 name=name,
353 section=section,
354 current_value=current_value,
355 new_value=new_value,
356 version=version,
357 )
359 self._upgrade_auth_backends()
360 self._upgrade_postgres_metastore_conn()
361 self.is_validated = True
363 def _upgrade_auth_backends(self):
364 """
365 Ensure a custom auth_backends setting contains session.
367 This is required by the UI for ajax queries.
368 """
369 old_value = self.get("api", "auth_backends", fallback="")
370 if old_value in ("airflow.api.auth.backend.default", ""):
371 # handled by deprecated_values
372 pass
373 elif old_value.find("airflow.api.auth.backend.session") == -1:
374 new_value = old_value + ",airflow.api.auth.backend.session"
375 self._update_env_var(section="api", name="auth_backends", new_value=new_value)
376 self.upgraded_values[("api", "auth_backends")] = old_value
378 # if the old value is set via env var, we need to wipe it
379 # otherwise, it'll "win" over our adjusted value
380 old_env_var = self._env_var_name("api", "auth_backend")
381 os.environ.pop(old_env_var, None)
383 warnings.warn(
384 "The auth_backends setting in [api] has had airflow.api.auth.backend.session added "
385 "in the running config, which is needed by the UI. Please update your config before "
386 "Apache Airflow 3.0.",
387 FutureWarning,
388 )
390 def _upgrade_postgres_metastore_conn(self):
391 """
392 Upgrade SQL schemas.
394 As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
395 must be replaced with `postgresql`.
396 """
397 section, key = "database", "sql_alchemy_conn"
398 old_value = self.get(section, key)
399 bad_schemes = ["postgres+psycopg2", "postgres"]
400 good_scheme = "postgresql"
401 parsed = urlsplit(old_value)
402 if parsed.scheme in bad_schemes:
403 warnings.warn(
404 f"Bad scheme in Airflow configuration core > sql_alchemy_conn: `{parsed.scheme}`. "
405 "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
406 f"change to `{good_scheme}` before the next Airflow release.",
407 FutureWarning,
408 )
409 self.upgraded_values[(section, key)] = old_value
410 new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
411 self._update_env_var(section=section, name=key, new_value=new_value)
413 # if the old value is set via env var, we need to wipe it
414 # otherwise, it'll "win" over our adjusted value
415 old_env_var = self._env_var_name("core", key)
416 os.environ.pop(old_env_var, None)
418 def _validate_enums(self):
419 """Validate that enum type config has an accepted value."""
420 for (section_key, option_key), enum_options in self.enums_options.items():
421 if self.has_option(section_key, option_key):
422 value = self.get(section_key, option_key)
423 if value not in enum_options:
424 raise AirflowConfigException(
425 f"`[{section_key}] {option_key}` should not be "
426 f"{value!r}. Possible values: {', '.join(enum_options)}."
427 )
429 def _validate_config_dependencies(self):
430 """
431 Validate that config based on condition.
433 Values are considered invalid when they conflict with other config values
434 or system-level limitations and requirements.
435 """
436 is_executor_without_sqlite_support = self.get("core", "executor") not in (
437 "DebugExecutor",
438 "SequentialExecutor",
439 )
440 is_sqlite = "sqlite" in self.get("database", "sql_alchemy_conn")
441 if is_sqlite and is_executor_without_sqlite_support:
442 raise AirflowConfigException(f"error: cannot use sqlite with the {self.get('core', 'executor')}")
443 if is_sqlite:
444 import sqlite3
446 from airflow.utils.docs import get_docs_url
448 # Some features in storing rendered fields require sqlite version >= 3.15.0
449 min_sqlite_version = (3, 15, 0)
450 if _parse_sqlite_version(sqlite3.sqlite_version) < min_sqlite_version:
451 min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
452 raise AirflowConfigException(
453 f"error: sqlite C library version too old (< {min_sqlite_version_str}). "
454 f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
455 )
457 def _using_old_value(self, old: Pattern, current_value: str) -> bool:
458 return old.search(current_value) is not None
460 def _update_env_var(self, section: str, name: str, new_value: str):
461 env_var = self._env_var_name(section, name)
462 # Set it as an env var so that any subprocesses keep the same override!
463 os.environ[env_var] = new_value
465 @staticmethod
466 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any, version: str):
467 warnings.warn(
468 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
469 f"This value has been changed to {new_value!r} in the running config, but "
470 f"please update your config before Apache Airflow {version}.",
471 FutureWarning,
472 )
474 def _env_var_name(self, section: str, key: str) -> str:
475 return f"{ENV_VAR_PREFIX}{section.upper()}__{key.upper()}"
477 def _get_env_var_option(self, section: str, key: str):
478 # must have format AIRFLOW__{SECTION}__{KEY} (note double underscore)
479 env_var = self._env_var_name(section, key)
480 if env_var in os.environ:
481 return expand_env_var(os.environ[env_var])
482 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
483 env_var_cmd = env_var + "_CMD"
484 if env_var_cmd in os.environ:
485 # if this is a valid command key...
486 if (section, key) in self.sensitive_config_values:
487 return run_command(os.environ[env_var_cmd])
488 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
489 env_var_secret_path = env_var + "_SECRET"
490 if env_var_secret_path in os.environ:
491 # if this is a valid secret path...
492 if (section, key) in self.sensitive_config_values:
493 return _get_config_value_from_secret_backend(os.environ[env_var_secret_path])
494 return None
496 def _get_cmd_option(self, section: str, key: str):
497 fallback_key = key + "_cmd"
498 if (section, key) in self.sensitive_config_values:
499 if super().has_option(section, fallback_key):
500 command = super().get(section, fallback_key)
501 return run_command(command)
502 return None
504 def _get_cmd_option_from_config_sources(
505 self, config_sources: ConfigSourcesType, section: str, key: str
506 ) -> str | None:
507 fallback_key = key + "_cmd"
508 if (section, key) in self.sensitive_config_values:
509 section_dict = config_sources.get(section)
510 if section_dict is not None:
511 command_value = section_dict.get(fallback_key)
512 if command_value is not None:
513 if isinstance(command_value, str):
514 command = command_value
515 else:
516 command = command_value[0]
517 return run_command(command)
518 return None
520 def _get_secret_option(self, section: str, key: str) -> str | None:
521 """Get Config option values from Secret Backend."""
522 fallback_key = key + "_secret"
523 if (section, key) in self.sensitive_config_values:
524 if super().has_option(section, fallback_key):
525 secrets_path = super().get(section, fallback_key)
526 return _get_config_value_from_secret_backend(secrets_path)
527 return None
529 def _get_secret_option_from_config_sources(
530 self, config_sources: ConfigSourcesType, section: str, key: str
531 ) -> str | None:
532 fallback_key = key + "_secret"
533 if (section, key) in self.sensitive_config_values:
534 section_dict = config_sources.get(section)
535 if section_dict is not None:
536 secrets_path_value = section_dict.get(fallback_key)
537 if secrets_path_value is not None:
538 if isinstance(secrets_path_value, str):
539 secrets_path = secrets_path_value
540 else:
541 secrets_path = secrets_path_value[0]
542 return _get_config_value_from_secret_backend(secrets_path)
543 return None
545 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
546 value = self.get(section, key, **kwargs)
547 if value is None:
548 raise ValueError(f"The value {section}/{key} should be set!")
549 return value
551 @overload # type: ignore[override]
552 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: # type: ignore[override]
554 ...
556 @overload # type: ignore[override]
557 def get(self, section: str, key: str, **kwargs) -> str | None: # type: ignore[override]
559 ...
561 def get(self, section: str, key: str, **kwargs) -> str | None: # type: ignore[override, misc]
562 section = str(section).lower()
563 key = str(key).lower()
564 warning_emitted = False
565 deprecated_section: str | None
566 deprecated_key: str | None
568 # For when we rename whole sections
569 if section in self.inversed_deprecated_sections:
570 deprecated_section, deprecated_key = (section, key)
571 section = self.inversed_deprecated_sections[section]
572 if not self._suppress_future_warnings:
573 warnings.warn(
574 f"The config section [{deprecated_section}] has been renamed to "
575 f"[{section}]. Please update your `conf.get*` call to use the new name",
576 FutureWarning,
577 stacklevel=2,
578 )
579 # Don't warn about individual rename if the whole section is renamed
580 warning_emitted = True
581 elif (section, key) in self.inversed_deprecated_options:
582 # Handle using deprecated section/key instead of the new section/key
583 new_section, new_key = self.inversed_deprecated_options[(section, key)]
584 if not self._suppress_future_warnings and not warning_emitted:
585 warnings.warn(
586 f"section/key [{section}/{key}] has been deprecated, you should use"
587 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
588 "new name",
589 FutureWarning,
590 stacklevel=2,
591 )
592 warning_emitted = True
593 deprecated_section, deprecated_key = section, key
594 section, key = (new_section, new_key)
595 elif section in self.deprecated_sections:
596 # When accessing the new section name, make sure we check under the old config name
597 deprecated_key = key
598 deprecated_section = self.deprecated_sections[section][0]
599 else:
600 deprecated_section, deprecated_key, _ = self.deprecated_options.get(
601 (section, key), (None, None, None)
602 )
604 # first check environment variables
605 option = self._get_environment_variables(
606 deprecated_key, deprecated_section, key, section, issue_warning=not warning_emitted
607 )
608 if option is not None:
609 return option
611 # ...then the config file
612 option = self._get_option_from_config_file(
613 deprecated_key, deprecated_section, key, kwargs, section, issue_warning=not warning_emitted
614 )
615 if option is not None:
616 return option
618 # ...then commands
619 option = self._get_option_from_commands(
620 deprecated_key, deprecated_section, key, section, issue_warning=not warning_emitted
621 )
622 if option is not None:
623 return option
625 # ...then from secret backends
626 option = self._get_option_from_secrets(
627 deprecated_key, deprecated_section, key, section, issue_warning=not warning_emitted
628 )
629 if option is not None:
630 return option
632 # ...then the default config
633 if self.airflow_defaults.has_option(section, key) or "fallback" in kwargs:
634 return expand_env_var(self.airflow_defaults.get(section, key, **kwargs))
636 log.warning("section/key [%s/%s] not found in config", section, key)
638 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
640 def _get_option_from_secrets(
641 self,
642 deprecated_key: str | None,
643 deprecated_section: str | None,
644 key: str,
645 section: str,
646 issue_warning: bool = True,
647 ) -> str | None:
648 option = self._get_secret_option(section, key)
649 if option:
650 return option
651 if deprecated_section and deprecated_key:
652 with self.suppress_future_warnings():
653 option = self._get_secret_option(deprecated_section, deprecated_key)
654 if option:
655 if issue_warning:
656 self._warn_deprecate(section, key, deprecated_section, deprecated_key)
657 return option
658 return None
660 def _get_option_from_commands(
661 self,
662 deprecated_key: str | None,
663 deprecated_section: str | None,
664 key: str,
665 section: str,
666 issue_warning: bool = True,
667 ) -> str | None:
668 option = self._get_cmd_option(section, key)
669 if option:
670 return option
671 if deprecated_section and deprecated_key:
672 with self.suppress_future_warnings():
673 option = self._get_cmd_option(deprecated_section, deprecated_key)
674 if option:
675 if issue_warning:
676 self._warn_deprecate(section, key, deprecated_section, deprecated_key)
677 return option
678 return None
680 def _get_option_from_config_file(
681 self,
682 deprecated_key: str | None,
683 deprecated_section: str | None,
684 key: str,
685 kwargs: dict[str, Any],
686 section: str,
687 issue_warning: bool = True,
688 ) -> str | None:
689 if super().has_option(section, key):
690 # Use the parent's methods to get the actual config here to be able to
691 # separate the config from default config.
692 return expand_env_var(super().get(section, key, **kwargs))
693 if deprecated_section and deprecated_key:
694 if super().has_option(deprecated_section, deprecated_key):
695 if issue_warning:
696 self._warn_deprecate(section, key, deprecated_section, deprecated_key)
697 with self.suppress_future_warnings():
698 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
699 return None
701 def _get_environment_variables(
702 self,
703 deprecated_key: str | None,
704 deprecated_section: str | None,
705 key: str,
706 section: str,
707 issue_warning: bool = True,
708 ) -> str | None:
709 option = self._get_env_var_option(section, key)
710 if option is not None:
711 return option
712 if deprecated_section and deprecated_key:
713 with self.suppress_future_warnings():
714 option = self._get_env_var_option(deprecated_section, deprecated_key)
715 if option is not None:
716 if issue_warning:
717 self._warn_deprecate(section, key, deprecated_section, deprecated_key)
718 return option
719 return None
721 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override]
722 val = str(self.get(section, key, **kwargs)).lower().strip()
723 if "#" in val:
724 val = val.split("#")[0].strip()
725 if val in ("t", "true", "1"):
726 return True
727 elif val in ("f", "false", "0"):
728 return False
729 else:
730 raise AirflowConfigException(
731 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
732 f'Current value: "{val}".'
733 )
735 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override]
736 val = self.get(section, key, **kwargs)
737 if val is None:
738 raise AirflowConfigException(
739 f"Failed to convert value None to int. "
740 f'Please check "{key}" key in "{section}" section is set.'
741 )
742 try:
743 return int(val)
744 except ValueError:
745 raise AirflowConfigException(
746 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
747 f'Current value: "{val}".'
748 )
750 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override]
751 val = self.get(section, key, **kwargs)
752 if val is None:
753 raise AirflowConfigException(
754 f"Failed to convert value None to float. "
755 f'Please check "{key}" key in "{section}" section is set.'
756 )
757 try:
758 return float(val)
759 except ValueError:
760 raise AirflowConfigException(
761 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
762 f'Current value: "{val}".'
763 )
765 def getimport(self, section: str, key: str, **kwargs) -> Any:
766 """
767 Reads options, imports the full qualified name, and returns the object.
769 In case of failure, it throws an exception with the key and section names
771 :return: The object or None, if the option is empty
772 """
773 full_qualified_path = conf.get(section=section, key=key, **kwargs)
774 if not full_qualified_path:
775 return None
777 try:
778 return import_string(full_qualified_path)
779 except ImportError as e:
780 log.error(e)
781 raise AirflowConfigException(
782 f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
783 f'Current value: "{full_qualified_path}".'
784 )
def getjson(
    self, section: str, key: str, fallback=_UNSET, **kwargs
) -> dict | list | str | int | float | None:
    """
    Return a config value parsed from a JSON string.

    ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.
    When the option is missing or empty and no ``fallback`` was supplied, ``None``
    is returned; the internal ``_UNSET`` sentinel is never handed to the caller.

    :param section: section of the config that holds the option
    :param key: name of the option inside ``section``
    :param fallback: object returned as-is when the option is missing or empty
    :param kwargs: extra keyword arguments handed unchanged to ``self.get``
    :raises AirflowConfigException: if the stored value is not valid JSON
    """
    # get always returns the fallback value as a string, so for this if
    # someone gives us an object we want to keep that
    default = _UNSET
    if fallback is not _UNSET:
        default = fallback
        fallback = _UNSET

    try:
        data = self.get(section=section, key=key, fallback=fallback, **kwargs)
    except (NoSectionError, NoOptionError):
        # No value anywhere: fall through to the same "no data" handling as an empty value.
        data = None

    if not data:
        return default if default is not _UNSET else None

    try:
        return json.loads(data)
    except JSONDecodeError as e:
        raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Read an option holding a number of seconds and return it as ``datetime.timedelta``.

    An empty or missing value yields ``fallback`` unchanged.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: fallback value when no config value is given, defaults to None
    :raises AirflowConfigException: if the value is not an integer, or the integer
        overflows ``datetime.timedelta``
    :return: datetime.timedelta(seconds=<config_value>) or ``fallback``
    """
    raw_value = self.get(section, key, fallback=fallback, **kwargs)
    if not raw_value:
        return fallback

    # The configured value must be a whole number of seconds.
    try:
        seconds = int(raw_value)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw_value}".'
        )

    try:
        return datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            f"Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{raw_value}".'
        )
def read(
    self,
    filenames: (str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike]),
    encoding=None,
) -> list[str]:
    """
    Read and parse one filename or an iterable of filenames.

    Thin wrapper over the parent ``read`` that keeps its return value instead of
    discarding it.

    :param filenames: a single path or an iterable of paths to read
    :param encoding: text encoding forwarded to the parent implementation
    :return: whatever the parent ``read`` returns (for ``configparser`` this is
        the list of files that were successfully parsed)
    """
    return super().read(filenames=filenames, encoding=encoding)
# RawConfigParser annotates this parameter with "Mapping" from collections.abc,
# which is not subscriptable there, so a plain dict annotation is used here instead.
def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
):
    """
    Load configuration from a ``{section: {option: value}}`` dictionary.

    :param dictionary: mapping of section names to option mappings
    :param source: label forwarded to the parent implementation together with the data
    """
    super().read_dict(dictionary, source)
def has_option(self, section: str, option: str) -> bool:
    """
    Tell whether ``self.get`` can produce a value for ``option`` in ``section``.

    :param section: section to look in
    :param option: option name to look for
    :return: True if ``self.get`` returns, False if it raises
        ``NoOptionError`` or ``NoSectionError``
    """
    # Delegating to self.get() keeps the priority order of config sources
    # (env, config, cmd, defaults) in a single place. Passing _UNSET as the
    # fallback avoids logging a warning about missing values.
    try:
        self.get(section, option, fallback=_UNSET)
    except (NoOptionError, NoSectionError):
        return False
    else:
        return True
def remove_option(self, section: str, option: str, remove_default: bool = True) -> bool:
    """
    Remove an option if it exists in config from a file or default config.

    If both of config have the same option, this removes the option
    in both configs unless remove_default=False.

    :param section: section that holds the option
    :param option: name of the option to remove
    :param remove_default: also remove the option from ``self.airflow_defaults``
    :return: True if the option was removed from at least one of the two
        configs, False if it was found in neither (the stdlib parent method
        reports removal the same way; previously this override returned None)
    """
    removed = False
    if super().has_option(section, option):
        super().remove_option(section, option)
        removed = True

    if remove_default and self.airflow_defaults.has_option(section, option):
        self.airflow_defaults.remove_option(section, option)
        removed = True
    return removed
def getsection(self, section: str) -> ConfigOptionsDictType | None:
    """
    Returns the section as a dict.

    Options come from ``self.airflow_defaults`` first, are overridden by this
    parser's own section, and finally by environment variables whose name starts
    with the prefix that ``self._env_var_name(section, "")`` yields.
    Values are converted to int, float, bool as required.

    :param section: section from the config
    :raises AirflowConfigException: if any collected option has the value ``None``
    :return: ordered mapping of option name to converted value, or None when the
        section exists neither in this parser nor in the defaults
    """
    if not self.has_section(section) and not self.airflow_defaults.has_section(section):
        return None
    if self.airflow_defaults.has_section(section):
        _section: ConfigOptionsDictType = OrderedDict(self.airflow_defaults.items(section))
    else:
        _section = OrderedDict()

    if self.has_section(section):
        _section.update(OrderedDict(self.items(section)))

    section_prefix = self._env_var_name(section, "")
    prefix_len = len(section_prefix)
    for env_var in sorted(os.environ.keys()):
        if not env_var.startswith(section_prefix):
            continue
        # Strip only the leading prefix; str.replace would also delete any later
        # occurrence of the same text inside the option name.
        key = env_var[prefix_len:]
        if key.endswith("_CMD"):
            key = key[:-4]
        key = key.lower()
        _section[key] = self._get_env_var_option(section, key)

    # Only values of existing keys are reassigned below, so iterating while
    # assigning is safe (the dict never changes size).
    for key, val in _section.items():
        if val is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        try:
            _section[key] = int(val)
        except ValueError:
            try:
                _section[key] = float(val)
            except ValueError:
                if isinstance(val, str) and val.lower() in ("t", "true"):
                    _section[key] = True
                elif isinstance(val, str) and val.lower() in ("f", "false"):
                    _section[key] = False
    return _section
def write(  # type: ignore[override]
    self, fp: IO, space_around_delimiters: bool = True, section: str | None = None
) -> None:
    """
    Write the configuration to ``fp``, section by section.

    Modelled on ``configparser.RawConfigParser.write``, but each section is
    obtained through ``self.getsection`` so that options coming from
    environment variables are written too.

    :param fp: writable file object handed to ``self._write_section``
    :param space_around_delimiters: surround the key/value delimiter with spaces
    :param section: when given, write only this section; otherwise write every
        section stored in this parser
    """
    # The type ignores below deal with less-than-perfect RawConfigParser superclass typing.
    bare_delimiter = self._delimiters[0]  # type: ignore[attr-defined]
    delimiter = f" {bare_delimiter} " if space_around_delimiters else bare_delimiter

    if self._defaults:  # type: ignore
        self._write_section(  # type: ignore[attr-defined]
            fp, self.default_section, self._defaults.items(), delimiter  # type: ignore[attr-defined]
        )

    if section:
        sections = {section: dict(self.getsection(section))}  # type: ignore[arg-type]
    else:
        sections = self._sections  # type: ignore[attr-defined]

    for name in sections:
        options: ConfigOptionsDictType = self.getsection(name)  # type: ignore[assignment]
        self._write_section(fp, name, options.items(), delimiter)  # type: ignore[attr-defined]
def as_dict(
    self,
    display_source: bool = False,
    display_sensitive: bool = False,
    raw: bool = False,
    include_env: bool = True,
    include_cmds: bool = True,
    include_secret: bool = True,
) -> ConfigSourcesType:
    """
    Returns the current configuration as a dictionary of per-section dictionaries.

    Airflow defaults are materialized together with the user-set configs. If any
    of the ``include_*`` options is False, the result of the corresponding
    environment / command / secret lookup does not replace the Airflow default
    but is passed through as configured. So that defaults then do not mask the
    user-set command or secret configs, bare ``sensitive_config_values`` that
    still equal the Airflow default are filtered out when such an override exists.

    :param display_source: If False, the option value is returned. If True,
        a tuple of (option_value, source) is returned. Source is either
        'airflow.cfg', 'default', 'env var', or 'cmd'.
    :param display_sensitive: If True, the values of options set by env
        vars and bash commands will be displayed. If False, those options
        are shown as '< hidden >'
    :param raw: Should the values be output as interpolated values, or the
        "raw" form that can be fed back in to ConfigParser
    :param include_env: Should the value of configuration from AIRFLOW__
        environment variables be included or not
    :param include_cmds: Should the result of calling any *_cmd config be
        set (True, default), or should the _cmd options be left as the
        command to run (False)
    :param include_secret: Should the result of calling any *_secret config be
        set (True, default), or should the _secret options be left as the
        path to get the secret from (False)
    :raises ValueError: if ``display_sensitive`` is False while any of the
        ``include_*`` flags is False
    :return: Dictionary, where the key is the name of the section and the content is
        the dictionary with the name of the parameter and its value.
    """
    # Sensitive values are hidden inside the _include_* helpers, which also read
    # the env/cmd/secret sources, so hiding requires all of them to be enabled.
    if not display_sensitive and not (include_env and include_cmds and include_secret):
        raise ValueError(
            "If display_sensitive is false, then include_env, "
            "include_cmds, include_secret must all be set as True"
        )

    config_sources: ConfigSourcesType = {}
    layered_parsers = [
        ("default", self.airflow_defaults),
        ("airflow.cfg", self),
    ]

    self._replace_config_with_display_sources(
        config_sources,
        layered_parsers,
        display_source,
        raw,
        self.deprecated_options,
        include_cmds=include_cmds,
        include_env=include_env,
        include_secret=include_secret,
    )

    # Environment variables are applied next and overwrite, because they have priority.
    if include_env:
        self._include_envs(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_env_var_option)

    # Then the results of bash commands.
    if include_cmds:
        self._include_commands(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_cmd_option)

    # Then values from secret backends.
    if include_secret:
        self._include_secrets(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_secret_option)

    if display_sensitive:
        return config_sources

    # Also mask sensitive values that came straight from the config file
    # rather than through env, cmd or secret.
    hidden = "< hidden >"
    for section, key in self.sensitive_config_values:
        section_values = config_sources.get(section)
        if not section_values or not section_values.get(key, None):
            continue
        if display_source:
            section_values[key] = (hidden, section_values[key][1])
        else:
            section_values[key] = hidden

    return config_sources
def _include_secrets(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Replace ``<key>_secret`` entries in ``config_sources`` by the resolved ``<key>`` value.

    For every pair in ``self.sensitive_config_values`` the value is obtained from
    ``self._get_secret_option_from_config_sources``; pairs without a value are skipped.

    :param config_sources: configuration being assembled; modified in place
    :param display_sensitive: when False the value is stored as '< hidden >'
    :param display_source: when True the entry becomes a ``(value, "secret")`` tuple
    :param raw: when True (and ``display_source`` is False) ``%`` is escaped as ``%%``
    """
    for section, key in self.sensitive_config_values:
        secret: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
        if not secret:
            continue
        if not display_sensitive:
            secret = "< hidden >"

        entry: str | tuple[str, str]
        if display_source:
            entry = (secret, "secret")
        elif raw:
            entry = secret.replace("%", "%%")
        else:
            entry = secret

        section_values = config_sources.setdefault(section, OrderedDict())
        section_values.update({key: entry})
        # The pointer to the secret is superseded by the resolved value.
        del section_values[key + "_secret"]
def _include_commands(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Replace ``<key>_cmd`` entries in ``config_sources`` by the resolved ``<key>`` value.

    For every pair in ``self.sensitive_config_values`` the value is obtained from
    ``self._get_cmd_option_from_config_sources``; pairs without a value are skipped.

    :param config_sources: configuration being assembled; modified in place
    :param display_sensitive: when False the value is stored as '< hidden >'
    :param display_source: when True the entry becomes a ``(value, "cmd")`` tuple
    :param raw: when True (and ``display_source`` is False) ``%`` is escaped as ``%%``
    """
    for section, key in self.sensitive_config_values:
        cmd_output = self._get_cmd_option_from_config_sources(config_sources, section, key)
        if not cmd_output:
            continue

        entry: str | tuple[str, str] | None = cmd_output if display_sensitive else "< hidden >"
        if display_source:
            entry = (str(entry), "cmd")
        elif raw:
            entry = str(entry).replace("%", "%%")

        if entry is not None:
            replacement: dict[str, str | tuple[str, str]] = {key: entry}
            config_sources.setdefault(section, OrderedDict()).update(replacement)
            # The command itself is superseded by its output.
            del config_sources[section][key + "_cmd"]
def _include_envs(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Overlay values taken from ``ENV_VAR_PREFIX`` environment variables onto ``config_sources``.

    Variable names are split on ``__`` into prefix, section and key; names that
    do not split into three parts are skipped, and names for which
    ``self._get_env_var_option`` returns None are skipped with a warning.

    :param config_sources: configuration being assembled; modified in place
    :param display_sensitive: when False values are stored as '< hidden >', except
        for the unit_test_mode variable and names ending in "cmd" or "secret"
    :param display_source: when True the entry becomes a ``(value, "env var")`` tuple
    :param raw: when True ``%`` is escaped as ``%%`` (only applied on the path
        where the value is not being hidden)
    """
    airflow_env_vars = [name for name in os.environ if name.startswith(ENV_VAR_PREFIX)]
    for env_var in airflow_env_vars:
        try:
            _, section, key = env_var.split("__", 2)
            opt = self._get_env_var_option(section, key)
        except ValueError:
            continue
        if opt is None:
            log.warning("Ignoring unknown env var '%s'", env_var)
            continue

        if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
            # Don't hide cmd/secret values here
            if not env_var.lower().endswith(("cmd", "secret")):
                opt = "< hidden >"
        elif raw:
            opt = opt.replace("%", "%%")

        if display_source:
            opt = (opt, "env var")

        section = section.lower()
        # Keys of the kubernetes_environment_variables section keep their case:
        # lowering them would make it impossible to set any Airflow environment
        # variable through that section, since Airflow only parses environment
        # variables starting with AIRFLOW_.
        if section != "kubernetes_environment_variables":
            key = key.lower()
        config_sources.setdefault(section, OrderedDict()).update({key: opt})
def _filter_by_source(
    self,
    config_sources: ConfigSourcesType,
    display_source: bool,
    getter_func,
):
    """
    Drop default-valued sensitive options from ``config_sources`` when an override exists.

    Bare configs take precedence over their command or secret key equivalents.
    If the running config is materialized with Airflow defaults, those defaults
    would therefore mask user-set command or secret key configs, so a bare
    option that still equals the default is removed whenever ``getter_func``
    reports a user-configured override for it.

    :param config_sources: The current configuration to operate on
    :param display_source: If False, configuration options contain raw
        values. If True, options are a tuple of (option_value, source).
        Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
    :param getter_func: A callback function that gets the user configured
        override value for a particular sensitive_config_values config.
    :return: None, the given config_sources is filtered if necessary,
        otherwise untouched.
    """
    for section, key in self.sensitive_config_values:
        # Nothing to filter if the option is not materialized at all.
        if section not in config_sources or key not in config_sources[section]:
            continue

        # An override must exist for the default to be in the way.
        try:
            override = getter_func(section, key)
        except ValueError:
            continue
        if not override:
            continue

        # Without a default there is nothing to compare against.
        if not self.airflow_defaults.has_option(section, key):
            continue

        current = config_sources[section][key]
        if display_source:
            # With display_source the stored entry is a (value, source) tuple.
            current, _source = current  # type: ignore

        if current == self.airflow_defaults.get(section, key):
            del config_sources[section][key]
@staticmethod
def _replace_config_with_display_sources(
    config_sources: ConfigSourcesType,
    configs: Iterable[tuple[str, ConfigParser]],
    display_source: bool,
    raw: bool,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Fill ``config_sources`` from every section of every parser in ``configs``.

    Each ``(source_name, parser)`` pair is processed in order and each of its
    sections is delegated to ``_replace_section_config_with_display_sources``.

    :param config_sources: configuration being assembled; modified in place
    :param configs: ``(source_name, parser)`` pairs; iterated here and also handed
        to the per-section helper
    :param display_source: forwarded to the per-section helper
    :param raw: forwarded to the per-section helper
    :param deprecated_options: forwarded to the per-section helper
    :param include_env: forwarded to the per-section helper
    :param include_cmds: forwarded to the per-section helper
    :param include_secret: forwarded to the per-section helper
    """
    fill_section = AirflowConfigParser._replace_section_config_with_display_sources
    for source_name, parser in configs:
        for section in parser.sections():
            fill_section(
                parser,
                config_sources,
                display_source,
                raw,
                section,
                source_name,
                deprecated_options,
                configs,
                include_env=include_env,
                include_cmds=include_cmds,
                include_secret=include_secret,
            )
@staticmethod
def _deprecated_value_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """
    Tell whether any non-default parser defines ``deprecated_key`` in ``deprecated_section``.

    :param deprecated_section: section name to inspect
    :param deprecated_key: option name to look for
    :param configs: ``(config_type, parser)`` pairs; entries whose type is
        "default" are ignored
    :return: True if the option is present in at least one inspected parser
    """
    for config_type, parser in configs:
        if config_type == "default":
            continue
        try:
            entries = parser.items(section=deprecated_section, raw=True)
        except NoSectionError:
            # This parser does not have the section at all; try the next one.
            continue
        if any(name == deprecated_key for name, _ in entries):
            return True
    return False
@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the environment variable for a deprecated option is defined.

    :param deprecated_section: deprecated section name (upper-cased for the lookup)
    :param deprecated_key: deprecated option name (upper-cased for the lookup)
    """
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}"
    return env_name in os.environ
@staticmethod
def _deprecated_command_is_set_in_config(
    deprecated_section: str, deprecated_key: str, configs: Iterable[tuple[str, ConfigParser]]
) -> bool:
    """
    Tell whether any non-default parser defines the ``<deprecated_key>_cmd`` option.

    :param deprecated_section: section name to inspect
    :param deprecated_key: option name without the ``_cmd`` suffix
    :param configs: ``(config_type, parser)`` pairs to inspect
    """
    cmd_key = deprecated_key + "_cmd"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section, deprecated_key=cmd_key, configs=configs
    )
@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the ``_CMD`` environment variable for a deprecated option is defined.

    :param deprecated_section: deprecated section name (upper-cased for the lookup)
    :param deprecated_key: deprecated option name (upper-cased for the lookup)
    """
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD"
    return env_name in os.environ
@staticmethod
def _deprecated_secret_is_set_in_config(
    deprecated_section: str, deprecated_key: str, configs: Iterable[tuple[str, ConfigParser]]
) -> bool:
    """
    Tell whether any non-default parser defines the ``<deprecated_key>_secret`` option.

    :param deprecated_section: section name to inspect
    :param deprecated_key: option name without the ``_secret`` suffix
    :param configs: ``(config_type, parser)`` pairs to inspect
    """
    secret_key = deprecated_key + "_secret"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section, deprecated_key=secret_key, configs=configs
    )
@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the ``_SECRET`` environment variable for a deprecated option is defined.

    :param deprecated_section: deprecated section name (upper-cased for the lookup)
    :param deprecated_key: deprecated option name (upper-cased for the lookup)
    """
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET"
    return env_name in os.environ
@contextmanager
def suppress_future_warnings(self):
    """
    Context manager that sets ``self._suppress_future_warnings`` to True for its duration.

    The previous value of the flag is restored on exit, including when the
    body of the ``with`` block raises.

    :return: yields this parser instance
    """
    previous = self._suppress_future_warnings
    self._suppress_future_warnings = True
    try:
        yield self
    finally:
        # Without the finally an exception in the with-body would leave the flag stuck on True.
        self._suppress_future_warnings = previous
@staticmethod
def _replace_section_config_with_display_sources(
    config: ConfigParser,
    config_sources: ConfigSourcesType,
    display_source: bool,
    raw: bool,
    section: str,
    source_name: str,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    configs: Iterable[tuple[str, ConfigParser]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Copy the options of one section of ``config`` into ``config_sources``.

    When ``source_name`` is "default", an option that replaces a deprecated one
    is skipped if the deprecated option is set anywhere among the requested
    sources, so the default of the new option does not override the value the
    user supplied under the deprecated name.

    :param config: parser to read the section from
    :param config_sources: configuration being assembled; modified in place
    :param display_source: store ``(value, source_name)`` tuples instead of bare values
    :param raw: passed to ``config.items`` as its ``raw`` argument
    :param section: name of the section to copy
    :param source_name: label of ``config``; "default" enables the deprecation check
    :param deprecated_options: maps ``(section, key)`` to a 3-tuple whose first two
        items are the deprecated section and key
    :param configs: ``(source_name, parser)`` pairs searched for deprecated options
    :param include_env: also look for the deprecated option in environment variables
    :param include_cmds: also look for the deprecated ``_cmd`` variant
    :param include_secret: also look for the deprecated ``_secret`` variant
    """
    parser_cls = AirflowConfigParser
    target = config_sources.setdefault(section, OrderedDict())

    if isinstance(config, parser_cls):
        with config.suppress_future_warnings():
            entries = config.items(section=section, raw=raw)
    else:
        entries = config.items(section=section, raw=raw)

    is_default_source = source_name == "default"
    for option, value in entries:
        old_section, old_key, _ = deprecated_options.get((section, option), (None, None, None))
        if is_default_source and old_section and old_key:
            # If deprecated entry has some non-default value set for any of the sources requested,
            # We should NOT set default for the new entry (because it will override anything
            # coming from the deprecated ones)
            if parser_cls._deprecated_value_is_set_in_config(old_section, old_key, configs):
                continue
            if include_env and parser_cls._deprecated_variable_is_set(old_section, old_key):
                continue
            if include_cmds and (
                parser_cls._deprecated_variable_command_is_set(old_section, old_key)
                or parser_cls._deprecated_command_is_set_in_config(old_section, old_key, configs)
            ):
                continue
            if include_secret and (
                parser_cls._deprecated_variable_secret_is_set(old_section, old_key)
                or parser_cls._deprecated_secret_is_set_in_config(old_section, old_key, configs)
            ):
                continue
        target[option] = (value, source_name) if display_source else value
def load_test_config(self):
    """
    Replace the loaded configuration by the unit test configuration.

    Every section currently held by this parser is removed, then the bundled
    ``default_test.cfg`` template is read, followed by ``TEST_CONFIG_FILE``.

    Note: this is not reversible.
    """
    # Drop everything loaded so far, so that lookups fall back to the defaults.
    for existing_section in self.sections():
        self.remove_section(existing_section)

    # Bundled test defaults first ...
    default_test_path = _default_config_file_path("default_test.cfg")
    log.info("Reading default test configuration from %s", default_test_path)
    self.read_string(_parameterized_config_from_template("default_test.cfg"))

    # ... then any "custom" test settings.
    log.info("Reading test configuration from %s", TEST_CONFIG_FILE)
    self.read(TEST_CONFIG_FILE)
@staticmethod
def _warn_deprecate(section: str, key: str, deprecated_section: str, deprecated_name: str):
    """
    Emit a ``DeprecationWarning`` saying that an option was renamed or moved.

    :param section: section of the current option
    :param key: name of the current option
    :param deprecated_section: section the deprecated option lives in
    :param deprecated_name: name of the deprecated option
    """
    if section == deprecated_section:
        message = (
            f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
            f"the old setting has been used, but please update your config."
        )
    else:
        message = (
            f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
            f"in [{section}] - the old setting has been used, but please update your config."
        )
    warnings.warn(message, DeprecationWarning, stacklevel=4)
def __getstate__(self):
    """
    Return the picklable state: the ``_sections``, ``is_validated`` and ``airflow_defaults`` attributes.

    :return: dictionary mapping each of those attribute names to its current value
    """
    pickled_attrs = ("_sections", "is_validated", "airflow_defaults")
    state = {}
    for attr in pickled_attrs:
        state[attr] = getattr(self, attr)
    return state
def __setstate__(self, state):
    """
    Restore the parser from a state dictionary.

    The instance is re-initialised with ``__init__()``, the ``_sections`` entry
    is loaded through ``read_dict`` and the remaining entries are copied onto
    the instance as attributes.

    :param state: state dictionary; its ``_sections`` entry is popped, so the
        passed dictionary is modified
    """
    self.__init__()
    stored_sections = state.pop("_sections")
    self.read_dict(stored_sections)
    vars(self).update(state)
def get_airflow_home() -> str:
    """Return the Airflow home directory: ``AIRFLOW_HOME`` from the environment, else ``~/airflow``."""
    configured_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(configured_home)
def get_airflow_config(airflow_home) -> str:
    """
    Return the path of ``airflow.cfg``.

    :param airflow_home: directory used when ``AIRFLOW_CONFIG`` is not set
    :return: the expanded ``AIRFLOW_CONFIG`` environment variable if it is set,
        otherwise ``<airflow_home>/airflow.cfg``
    """
    airflow_config_var = os.environ.get("AIRFLOW_CONFIG")
    if airflow_config_var is not None:
        return expand_env_var(airflow_config_var)
    return os.path.join(airflow_home, "airflow.cfg")
def _parameterized_config_from_template(filename) -> str:
    """
    Render the part of a bundled config file that follows the template marker line.

    :param filename: name resolved through ``_default_config_file_path``
    :raises RuntimeError: if the file contains no template marker line
    :return: the text after the marker, stripped and passed through ``parameterized_config``
    """
    marker = "# ----------------------- TEMPLATE BEGINS HERE -----------------------\n"

    path = _default_config_file_path(filename)
    with open(path) as fh:
        for line in fh:
            if line == marker:
                # Everything after the marker line is the template proper.
                return parameterized_config(fh.read().strip())
    raise RuntimeError(f"Template marker not found in {path!r}")
def parameterized_config(template) -> str:
    """
    Generates configuration from provided template & variables defined in current scope.

    The template is rendered with ``str.format`` using this module's globals
    overlaid with this function's locals.

    :param template: a config content templated with {{variables}}
    """
    # locals() is evaluated before ``all_vars`` is bound, so it only holds ``template``.
    all_vars = {**globals(), **locals()}
    return template.format(**all_vars)
def get_airflow_test_config(airflow_home) -> str:
    """
    Return the path of ``unittests.cfg``.

    :param airflow_home: directory used when ``AIRFLOW_TEST_CONFIG`` is not set
    :return: the expanded ``AIRFLOW_TEST_CONFIG`` environment variable if it is
        set, otherwise ``<airflow_home>/unittests.cfg``
    """
    if "AIRFLOW_TEST_CONFIG" in os.environ:
        # It will never return None
        return expand_env_var(os.environ["AIRFLOW_TEST_CONFIG"])  # type: ignore[return-value]
    return os.path.join(airflow_home, "unittests.cfg")
def _generate_fernet_key() -> str:
    """Return a freshly generated Fernet key decoded to ``str``."""
    # Imported lazily so that cryptography is only loaded when a key is actually needed.
    from cryptography.fernet import Fernet

    key_bytes = Fernet.generate_key()
    return key_bytes.decode()
def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.

    Besides building the parser this function may create ``AIRFLOW_HOME``,
    write a new ``airflow.cfg`` or unit test config file, copy the default
    webserver config into place, and it rebinds the module globals
    ``FERNET_KEY``, ``AIRFLOW_HOME`` and ``WEBSERVER_CONFIG``.

    :return: the populated configuration parser
    """
    global FERNET_KEY, AIRFLOW_HOME, WEBSERVER_CONFIG

    default_config = _parameterized_config_from_template("default_airflow.cfg")
    local_conf = AirflowConfigParser(default_config=default_config)

    if local_conf.getboolean("core", "unit_test_mode"):
        # Unit test mode: only the test configuration is loaded.
        if not os.path.isfile(TEST_CONFIG_FILE):
            from cryptography.fernet import Fernet

            log.info("Creating new Airflow config file for unit tests in: %s", TEST_CONFIG_FILE)
            pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True)

            # Bound before the template is rendered below.
            FERNET_KEY = Fernet.generate_key().decode()

            with open(TEST_CONFIG_FILE, "w") as file:
                rendered_test_config = _parameterized_config_from_template("default_test.cfg")
                file.write(rendered_test_config)

        local_conf.load_test_config()
    else:
        # Regular mode: create airflow.cfg from the defaults if missing, then read it.
        if not os.path.isfile(AIRFLOW_CONFIG):
            from cryptography.fernet import Fernet

            log.info("Creating new Airflow config file in: %s", AIRFLOW_CONFIG)
            pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True)

            FERNET_KEY = Fernet.generate_key().decode()

            with open(AIRFLOW_CONFIG, "w") as file:
                file.write(default_config)

        log.info("Reading the config from %s", AIRFLOW_CONFIG)

        local_conf.read(AIRFLOW_CONFIG)

        if local_conf.has_option("core", "AIRFLOW_HOME"):
            msg = (
                "Specifying both AIRFLOW_HOME environment variable and airflow_home "
                "in the config file is deprecated. Please use only the AIRFLOW_HOME "
                "environment variable and remove the config file entry."
            )
            if "AIRFLOW_HOME" in os.environ:
                warnings.warn(msg, category=DeprecationWarning)
            elif local_conf.get("core", "airflow_home") == AIRFLOW_HOME:
                warnings.warn(
                    "Specifying airflow_home in the config file is deprecated. As you "
                    "have left it at the default value you should remove the setting "
                    "from your airflow.cfg and suffer no change in behaviour.",
                    category=DeprecationWarning,
                )
            else:
                # Only the config file sets a home and it differs from the
                # current one: adopt the value from the file.
                AIRFLOW_HOME = local_conf.get("core", "airflow_home")  # type: ignore[assignment]
                warnings.warn(msg, category=DeprecationWarning)

        # They _might_ have set unit_test_mode in the airflow.cfg, we still
        # want to respect that and then load the unittests.cfg
        if local_conf.getboolean("core", "unit_test_mode"):
            local_conf.load_test_config()

    # Make it no longer a proxy variable, just set it to an actual string
    WEBSERVER_CONFIG = AIRFLOW_HOME + "/webserver_config.py"

    if not os.path.isfile(WEBSERVER_CONFIG):
        import shutil

        log.info("Creating new FAB webserver config file in: %s", WEBSERVER_CONFIG)
        shutil.copy(_default_config_file_path("default_webserver_config.py"), WEBSERVER_CONFIG)
    return local_conf
# Historical convenience functions to access config entries
def load_test_config():
    """Deprecated module-level alias of ``conf.load_test_config``; warns and delegates."""
    message = (
        "Accessing configuration method 'load_test_config' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.load_test_config'"
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    conf.load_test_config()
def get(*args, **kwargs) -> ConfigType | None:
    """Deprecated module-level alias of ``conf.get``; warns and delegates."""
    message = (
        "Accessing configuration method 'get' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.get'"
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.get(*args, **kwargs)
def getboolean(*args, **kwargs) -> bool:
    """Deprecated module-level alias of ``conf.getboolean``; warns and delegates."""
    message = (
        "Accessing configuration method 'getboolean' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getboolean'"
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.getboolean(*args, **kwargs)
def getfloat(*args, **kwargs) -> float:
    """Deprecated module-level alias of ``conf.getfloat``; warns and delegates."""
    message = (
        "Accessing configuration method 'getfloat' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getfloat'"
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.getfloat(*args, **kwargs)
def getint(*args, **kwargs) -> int:
    """Deprecated module-level alias of ``conf.getint``; warns and delegates."""
    message = (
        "Accessing configuration method 'getint' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getint'"
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.getint(*args, **kwargs)
def getsection(*args, **kwargs) -> ConfigOptionsDictType | None:
    """Deprecated module-level alias of ``conf.getsection``; warns and delegates."""
    message = (
        "Accessing configuration method 'getsection' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getsection'"
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.getsection(*args, **kwargs)
def has_option(*args, **kwargs) -> bool:
    """Deprecated module-level alias of ``conf.has_option``; warns and delegates."""
    message = (
        "Accessing configuration method 'has_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.has_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.has_option(*args, **kwargs)
def remove_option(*args, **kwargs) -> bool:
    """Deprecated module-level alias of ``conf.remove_option``; warns and delegates."""
    message = (
        "Accessing configuration method 'remove_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.remove_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.remove_option(*args, **kwargs)
def as_dict(*args, **kwargs) -> ConfigSourcesType:
    """Deprecated module-level alias of ``conf.as_dict``; warns and delegates."""
    message = (
        "Accessing configuration method 'as_dict' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.as_dict'"
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.as_dict(*args, **kwargs)
def set(*args, **kwargs) -> None:
    """Deprecated module-level alias of ``conf.set``; warns and delegates."""
    message = (
        "Accessing configuration method 'set' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.set'"
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    conf.set(*args, **kwargs)
def ensure_secrets_loaded() -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    If the secrets_backend_list contains only 2 default backends, reload it.

    :return: the result of ``initialize_secrets_backends()`` when the list holds
        exactly two backends, otherwise the existing ``secrets_backend_list``
    """
    only_defaults_loaded = len(secrets_backend_list) == 2
    if not only_defaults_loaded:
        return secrets_backend_list
    return initialize_secrets_backends()
def get_custom_secret_backend() -> BaseSecretsBackend | None:
    """
    Instantiate the secrets backend configured in ``[secrets] backend``, if any.

    ``[secrets] backend_kwargs`` is read as JSON and passed to the backend class
    as keyword arguments. If it is not valid JSON or is not a JSON object, a
    warning is logged and the backend is created without keyword arguments.

    :return: the backend instance, or None when no backend class is configured
    """
    backend_cls = conf.getimport(section="secrets", key="backend")
    if not backend_cls:
        return None

    try:
        backend_kwargs = conf.getjson(section="secrets", key="backend_kwargs")
        if backend_kwargs and not isinstance(backend_kwargs, dict):
            raise ValueError("not a dict")
        # Empty or missing kwargs mean "no kwargs".
        backend_kwargs = backend_kwargs or {}
    except AirflowConfigException:
        log.warning("Failed to parse [secrets] backend_kwargs as JSON, defaulting to no kwargs.")
        backend_kwargs = {}
    except ValueError:
        log.warning("Failed to parse [secrets] backend_kwargs into a dict, defaulting to no kwargs.")
        backend_kwargs = {}

    return backend_cls(**backend_kwargs)
def initialize_secrets_backends() -> list[BaseSecretsBackend]:
    """
    Build the list of secrets backend instances.

    * the custom backend from the configuration comes first, when one is defined
    * then one instance of every class named in ``DEFAULT_SECRETS_SEARCH_PATH``,
      imported and instantiated in order
    """
    custom_backend = get_custom_secret_backend()
    backends = [] if custom_backend is None else [custom_backend]
    backends.extend(import_string(class_name)() for class_name in DEFAULT_SECRETS_SEARCH_PATH)
    return backends
@functools.lru_cache(maxsize=None)
def _DEFAULT_CONFIG() -> str:
    """Return the text of the bundled ``default_airflow.cfg``; the result is cached after the first call."""
    config_path = pathlib.Path(_default_config_file_path("default_airflow.cfg"))
    return config_path.read_text()
@functools.lru_cache(maxsize=None)
def _TEST_CONFIG() -> str:
    """Return the text of the bundled ``default_test.cfg``; the result is cached after the first call."""
    config_path = pathlib.Path(_default_config_file_path("default_test.cfg"))
    return config_path.read_text()
# Deprecated module attributes, each mapped to a zero-argument callable that
# produces its value; the module-level __getattr__ looks names up here.
_deprecated = {
    # File contents.
    "DEFAULT_CONFIG": _DEFAULT_CONFIG,
    "TEST_CONFIG": _TEST_CONFIG,
    # File locations.
    "TEST_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_test.cfg"),
    "DEFAULT_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_airflow.cfg"),
}
def __getattr__(name):
    """
    Module-level attribute hook serving the names listed in ``_deprecated``.

    :param name: attribute name that was not found on the module
    :raises AttributeError: if ``name`` is not one of the deprecated attributes
    :return: the value produced by the callable registered for ``name``
    """
    if name not in _deprecated:
        raise AttributeError(f"module {__name__} has no attribute {name}")

    # stacklevel=2 attributes the warning to the code that accessed the attribute.
    warnings.warn(
        f"{__name__}.{name} is deprecated and will be removed in future",
        DeprecationWarning,
        stacklevel=2,
    )
    return _deprecated[name]()
# AIRFLOW_HOME and AIRFLOW_CONFIG come from the environment variables of the
# same name, with "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" as the defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)


# Dags folder for unit tests: the in-repo tests/dags directory when it exists
# (it won't if users install via pip), otherwise "dags" under AIRFLOW_HOME.
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
TEST_DAGS_FOLDER = (
    _TEST_DAGS_FOLDER if os.path.exists(_TEST_DAGS_FOLDER) else os.path.join(AIRFLOW_HOME, "dags")
)

# Plugins folder for unit tests, chosen the same way.
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
TEST_PLUGINS_FOLDER = (
    _TEST_PLUGINS_FOLDER
    if os.path.exists(_TEST_PLUGINS_FOLDER)
    else os.path.join(AIRFLOW_HOME, "plugins")
)


TEST_CONFIG_FILE = get_airflow_test_config(AIRFLOW_HOME)

SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
FERNET_KEY = ""  # Set only if needed when generating a new file
WEBSERVER_CONFIG = ""  # Set by initialize_config

# Order matters: the parser is built first, the secrets backends are
# initialised next, and validation runs last.
conf = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()