Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/configuration.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

994 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import contextlib 

20import datetime 

21import functools 

22import itertools 

23import json 

24import logging 

25import multiprocessing 

26import os 

27import pathlib 

28import shlex 

29import stat 

30import subprocess 

31import sys 

32import warnings 

33from base64 import b64encode 

34from configparser import ConfigParser, NoOptionError, NoSectionError 

35from contextlib import contextmanager 

36from copy import deepcopy 

37from io import StringIO 

38from json.decoder import JSONDecodeError 

39from typing import IO, TYPE_CHECKING, Any, Dict, Generator, Iterable, Pattern, Set, Tuple, Union 

40from urllib.parse import urlsplit 

41 

42import re2 

43from packaging.version import parse as parse_version 

44from typing_extensions import overload 

45 

46from airflow.exceptions import AirflowConfigException 

47from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH 

48from airflow.utils import yaml 

49from airflow.utils.empty_set import _get_empty_set_for_configuration 

50from airflow.utils.module_loading import import_string 

51from airflow.utils.providers_configuration_loader import providers_configuration_loaded 

52from airflow.utils.weight_rule import WeightRule 

53 

54if TYPE_CHECKING: 

55 from airflow.auth.managers.base_auth_manager import BaseAuthManager 

56 from airflow.secrets import BaseSecretsBackend 

57 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
# The two filters are only installed when ``sys.warnoptions`` is empty.
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Captures the leading dotted run of digits (e.g. "3.31.1") in the named group ``version``;
# whatever follows it is matched but not captured.
_SQLITE3_VERSION_PATTERN = re2.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases used in signatures of the configuration code.
ConfigType = Union[str, int, float, bool]
ConfigOptionsDictType = Dict[str, ConfigType]
ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]]
ConfigSourcesType = Dict[str, ConfigSectionSourcesType]

# Prefix string for Airflow environment variable names.
ENV_VAR_PREFIX = "AIRFLOW__"

73 

74 

def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """
    Turn the leading dotted-numeric part of a version string into a tuple of ints.

    :param s: version string to parse
    :return: tuple of the numeric components, or an empty tuple when the pattern does not match
    """
    matched = _SQLITE3_VERSION_PATTERN.match(s)
    if matched is not None:
        return tuple(map(int, matched.group("version").split(".")))
    return ()

80 

81 

@overload
def expand_env_var(env_var: None) -> None: ...


@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Resolve environment-variable and ``~`` references in a string, including nested ones.

    ``os.path.expandvars`` followed by ``os.path.expanduser`` is re-applied until a pass
    leaves the text unchanged.

    :param env_var: text to expand; a falsy value (``None`` or ``""``) is returned untouched
    :return: the fully expanded text
    """
    if not env_var:
        return env_var
    current = env_var
    while True:
        expanded = os.path.expanduser(os.path.expandvars(str(current)))
        if expanded == current:
            # A fixed point was reached: nothing left to substitute.
            return expanded
        current = expanded

105 

106 

def run_command(command: str) -> str:
    """
    Execute ``command`` (split with ``shlex``) and hand back its decoded stdout.

    :param command: command line to run; it is not passed through a shell
    :return: stdout decoded with the interpreter's default encoding, undecodable bytes dropped
    :raises AirflowConfigException: when the process exits with a non-zero return code
    """
    completed = subprocess.run(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
    encoding = sys.getdefaultencoding()
    output = completed.stdout.decode(encoding, "ignore")
    stderr = completed.stderr.decode(encoding, "ignore")

    if completed.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {completed.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )

121 

122 

def _get_config_value_from_secret_backend(config_key: str) -> str | None:
    """
    Get Config option values from Secret Backend.

    :param config_key: key passed to the backend's ``get_config``
    :return: the value returned by the backend, or ``None`` when no custom backend is available
    :raises AirflowConfigException: when obtaining the backend or reading the value fails;
        the original exception is attached as the cause
    """
    try:
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        return secrets_client.get_config(config_key)
    except Exception as e:
        # Chain explicitly so the underlying backend failure stays visible in the traceback.
        raise AirflowConfigException(
            "Cannot retrieve config from alternative secrets backend. "
            "Make sure it is configured properly and that the Backend "
            "is accessible.\n"
            f"{e}"
        ) from e

137 

138 

139def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool: 

140 """ 

141 Check if the config is a template. 

142 

143 :param configuration_description: description of configuration 

144 :param section: section 

145 :param key: key 

146 :return: True if the config is a template 

147 """ 

148 return configuration_description.get(section, {}).get(key, {}).get("is_template", False) 

149 

150 

151def _default_config_file_path(file_name: str) -> str: 

152 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates") 

153 return os.path.join(templates_dir, file_name) 

154 

155 

def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Assemble the configuration description from the bundled YAML file and provider configs.

    :param include_airflow: load the core description from ``config.yml``
    :param include_providers: merge in the descriptions exposed by ``ProvidersManager``
    :param selected_provider: when set, only that provider's description is merged
    :return: dictionary with the merged description
    """
    description: dict[str, dict[str, Any]] = {}
    if include_airflow:
        with open(_default_config_file_path("config.yml")) as yaml_file:
            description.update(yaml.safe_load(yaml_file))
    if not include_providers:
        return description

    # Imported lazily, only when provider configuration is actually requested.
    from airflow.providers_manager import ProvidersManager

    for provider_name, provider_config in ProvidersManager().provider_configs:
        if selected_provider and provider_name != selected_provider:
            continue
        description.update(provider_config)
    return description

180 

181 

182class AirflowConfigParser(ConfigParser): 

183 """ 

184 Custom Airflow Configparser supporting defaults and deprecated options. 

185 

186 This is a subclass of ConfigParser that supports defaults and deprecated options. 

187 

188 The defaults are stored in the ``_default_values`` ConfigParser. The configuration description keeps 

189 description of all the options available in Airflow (description follow config.yaml.schema). 

190 

191 :param default_config: default configuration (in the form of ini file). 

192 :param configuration_description: description of configuration to use 

193 """ 

194 

def __init__(
    self,
    default_config: str | None = None,
    *args,
    **kwargs,
):
    """
    Create the parser and populate its built-in defaults.

    :param default_config: optional ini-formatted string whose values are merged into the defaults
    :param args: positional arguments forwarded to ``ConfigParser.__init__``
    :param kwargs: keyword arguments forwarded to ``ConfigParser.__init__``
    """
    super().__init__(*args, **kwargs)
    # Only the core description is loaded here; provider descriptions are not included.
    self.configuration_description = retrieve_configuration_description(include_providers=False)
    self.upgraded_values = {}
    # For those who would like to use a different data structure to keep defaults:
    # We have to keep the default values in a ConfigParser rather than in any other
    # data structure, because the values we have might contain %% which are ConfigParser
    # interpolation placeholders. The _default_values config parser will interpolate them
    # properly when we call get() on it.
    self._default_values = create_default_config_parser(self.configuration_description)
    self._pre_2_7_default_values = create_pre_2_7_defaults()
    if default_config is not None:
        self._update_defaults_from_string(default_config)
    # Must run after the defaults exist: it reads the default ``log_filename_template``.
    self._update_logging_deprecated_template_to_one_from_defaults()
    self.is_validated = False
    self._suppress_future_warnings = False
    self._providers_configuration_loaded = False

217 

218 def _update_logging_deprecated_template_to_one_from_defaults(self): 

219 default = self.get_default_value("logging", "log_filename_template") 

220 if default: 

221 # Tuple does not support item assignment, so we have to create a new tuple and replace it 

222 original_replacement = self.deprecated_values["logging"]["log_filename_template"] 

223 self.deprecated_values["logging"]["log_filename_template"] = ( 

224 original_replacement[0], 

225 default, 

226 original_replacement[2], 

227 ) 

228 

def is_template(self, section: str, key) -> bool:
    """
    Tell whether the given option is marked as a template in the configuration description.

    :param section: section of the config
    :param key: key in the section
    :return: True if the value is templated; always False when no description is set
    """
    description = self.configuration_description
    return False if description is None else _is_template(description, section, key)

240 

241 def _update_defaults_from_string(self, config_string: str): 

242 """ 

243 Update the defaults in _default_values based on values in config_string ("ini" format). 

244 

245 Note that those values are not validated and cannot contain variables because we are using 

246 regular config parser to load them. This method is used to test the config parser in unit tests. 

247 

248 :param config_string: ini-formatted config string 

249 """ 

250 parser = ConfigParser() 

251 parser.read_string(config_string) 

252 for section in parser.sections(): 

253 if section not in self._default_values.sections(): 

254 self._default_values.add_section(section) 

255 errors = False 

256 for key, value in parser.items(section): 

257 if not self.is_template(section, key) and "{" in value: 

258 errors = True 

259 log.error( 

260 "The %s.%s value %s read from string contains variable. This is not supported", 

261 section, 

262 key, 

263 value, 

264 ) 

265 self._default_values.set(section, key, value) 

266 if errors: 

267 raise AirflowConfigException( 

268 f"The string config passed as default contains variables. " 

269 f"This is not supported. String config: {config_string}" 

270 ) 

271 

def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
    """
    Retrieve default value from default config parser.

    The value is read through the ``_default_values`` parser, which interpolates it. With
    ``raw=True`` every ``%`` in the resulting string is doubled again, so the value can be
    written to a file and interpolated later when the config is read.

    :param section: section of the config
    :param key: key to use
    :param fallback: fallback value to use when the option is not present in the defaults
    :param raw: if True, re-escape ``%`` as ``%%`` in the returned string
    :param kwargs: other args, forwarded to the default parser's ``get``
    :return: the default value, or ``fallback`` when the option is missing
    """
    value = self._default_values.get(section, key, fallback=fallback, **kwargs)
    # Only strings can be re-escaped; a non-string fallback is returned unchanged.
    if raw and isinstance(value, str):
        return value.replace("%", "%%")
    return value

292 

def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any:
    """
    Look up an option in the pre-2.7 defaults parser.

    :param section: section of the config
    :param key: key to use
    :param kwargs: extra arguments forwarded to the parser's ``get``
    :return: the stored value, or ``None`` when it is not present
    """
    legacy_defaults = self._pre_2_7_default_values
    return legacy_defaults.get(section, key, fallback=None, **kwargs)

296 

# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}_cmd" pattern, the idea behind this
# is to not store password on boxes in text files.
# These configs can also be fetched from Secrets backend
# following the "{section}__{name}__secret" pattern
@functools.cached_property
def sensitive_config_values(self) -> Set[tuple[str, str]]:  # noqa: UP006
    """
    Collect the lower-cased ``(section, option)`` pairs flagged ``sensitive`` in the description.

    The deprecated counterparts of those pairs, taken from ``deprecated_options`` and
    ``deprecated_sections``, are added as well. Sections whose description has no
    ``"options"`` mapping contribute nothing.

    :return: set of ``(section, option)`` tuples
    """
    if self.configuration_description is None:
        return (
            _get_empty_set_for_configuration()
        )  # we can't use set() here because set is defined below # ¯\_(ツ)_/¯
    sensitive = {
        (section.lower(), key.lower())
        for section, section_description in self.configuration_description.items()
        # ``or {}`` keeps a section without options from raising on ``.items()``.
        for key, option_description in (section_description.get("options") or {}).items()
        if option_description.get("sensitive") is True
    }
    # Old (section, option) names, with the trailing version element dropped.
    depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options}
    # Same option name under the old section name.
    depr_section = {
        (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections
    }
    sensitive.update(depr_section, depr_option)
    return sensitive

324 

# A mapping of (new section, new option) -> (old section, old option, since_version).
# When reading new option, the old option will be checked to see if it exists. If it does a
# DeprecationWarning will be issued and the old option will be used instead
# Multi-line entries below are formatted that way only for line length; every value is a 3-tuple.
deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
    ("celery", "worker_precheck"): ("core", "worker_precheck", "2.0.0"),
    ("logging", "interleave_timestamp_parser"): ("core", "interleave_timestamp_parser", "2.6.1"),
    ("logging", "base_log_folder"): ("core", "base_log_folder", "2.0.0"),
    ("logging", "remote_logging"): ("core", "remote_logging", "2.0.0"),
    ("logging", "remote_log_conn_id"): ("core", "remote_log_conn_id", "2.0.0"),
    ("logging", "remote_base_log_folder"): ("core", "remote_base_log_folder", "2.0.0"),
    ("logging", "encrypt_s3_logs"): ("core", "encrypt_s3_logs", "2.0.0"),
    ("logging", "logging_level"): ("core", "logging_level", "2.0.0"),
    ("logging", "fab_logging_level"): ("core", "fab_logging_level", "2.0.0"),
    ("logging", "logging_config_class"): ("core", "logging_config_class", "2.0.0"),
    ("logging", "colored_console_log"): ("core", "colored_console_log", "2.0.0"),
    ("logging", "colored_log_format"): ("core", "colored_log_format", "2.0.0"),
    ("logging", "colored_formatter_class"): ("core", "colored_formatter_class", "2.0.0"),
    ("logging", "log_format"): ("core", "log_format", "2.0.0"),
    ("logging", "simple_log_format"): ("core", "simple_log_format", "2.0.0"),
    ("logging", "task_log_prefix_template"): ("core", "task_log_prefix_template", "2.0.0"),
    ("logging", "log_filename_template"): ("core", "log_filename_template", "2.0.0"),
    ("logging", "log_processor_filename_template"): ("core", "log_processor_filename_template", "2.0.0"),
    ("logging", "dag_processor_manager_log_location"): (
        "core",
        "dag_processor_manager_log_location",
        "2.0.0",
    ),
    ("logging", "task_log_reader"): ("core", "task_log_reader", "2.0.0"),
    ("metrics", "metrics_allow_list"): ("metrics", "statsd_allow_list", "2.6.0"),
    ("metrics", "metrics_block_list"): ("metrics", "statsd_block_list", "2.6.0"),
    ("metrics", "statsd_on"): ("scheduler", "statsd_on", "2.0.0"),
    ("metrics", "statsd_host"): ("scheduler", "statsd_host", "2.0.0"),
    ("metrics", "statsd_port"): ("scheduler", "statsd_port", "2.0.0"),
    ("metrics", "statsd_prefix"): ("scheduler", "statsd_prefix", "2.0.0"),
    ("metrics", "statsd_allow_list"): ("scheduler", "statsd_allow_list", "2.0.0"),
    ("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"),
    ("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"),
    ("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"),
    ("metrics", "statsd_datadog_metrics_tags"): ("scheduler", "statsd_datadog_metrics_tags", "2.6.0"),
    ("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"),
    ("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"),
    ("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"),
    ("operators", "default_queue"): ("celery", "default_queue", "2.1.0"),
    ("core", "hide_sensitive_var_conn_fields"): ("admin", "hide_sensitive_variable_fields", "2.1.0"),
    ("core", "sensitive_var_conn_names"): ("admin", "sensitive_variable_fields", "2.1.0"),
    ("core", "default_pool_task_slot_count"): ("core", "non_pooled_task_slot_count", "1.10.4"),
    ("core", "max_active_tasks_per_dag"): ("core", "dag_concurrency", "2.2.0"),
    ("logging", "worker_log_server_port"): ("celery", "worker_log_server_port", "2.2.0"),
    ("api", "access_control_allow_origins"): ("api", "access_control_allow_origin", "2.2.0"),
    ("api", "auth_backends"): ("api", "auth_backend", "2.3.0"),
    ("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "2.3.0"),
    ("database", "sql_engine_encoding"): ("core", "sql_engine_encoding", "2.3.0"),
    ("database", "sql_engine_collation_for_ids"): ("core", "sql_engine_collation_for_ids", "2.3.0"),
    ("database", "sql_alchemy_pool_enabled"): ("core", "sql_alchemy_pool_enabled", "2.3.0"),
    ("database", "sql_alchemy_pool_size"): ("core", "sql_alchemy_pool_size", "2.3.0"),
    ("database", "sql_alchemy_max_overflow"): ("core", "sql_alchemy_max_overflow", "2.3.0"),
    ("database", "sql_alchemy_pool_recycle"): ("core", "sql_alchemy_pool_recycle", "2.3.0"),
    ("database", "sql_alchemy_pool_pre_ping"): ("core", "sql_alchemy_pool_pre_ping", "2.3.0"),
    ("database", "sql_alchemy_schema"): ("core", "sql_alchemy_schema", "2.3.0"),
    ("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"),
    ("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"),
    ("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"),
    ("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
    ("scheduler", "task_queued_timeout_check_interval"): (
        "kubernetes_executor",
        "worker_pods_pending_timeout_check_interval",
        "2.6.0",
    ),
}

394 

# A mapping of new configurations to a list of old configurations for when one configuration
# deprecates more than one other deprecation. The deprecation logic for these configurations
# is defined in SchedulerJobRunner.
# Each list element is an (old section, old option, since_version) tuple.
many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
    ("scheduler", "task_queued_timeout"): [
        ("celery", "stalled_task_timeout", "2.6.0"),
        ("celery", "task_adoption_timeout", "2.6.0"),
        ("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"),
    ]
}

405 

# A mapping of new section -> (old section, since_version).
# Here: the "kubernetes_executor" section was previously named "kubernetes".
deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")}

408 

# Now build the inverse so we can go from old_section/old_key to new_section/new_key
# if someone tries to retrieve it based on old_section/old_key
@functools.cached_property
def inversed_deprecated_options(self):
    """Return ``deprecated_options`` inverted: ``(old section, old option)`` -> ``(new section, new option)``."""
    inverse = {}
    for new_location, (old_section, old_name, _since_version) in self.deprecated_options.items():
        inverse[(old_section, old_name)] = new_location
    return inverse

414 

@functools.cached_property
def inversed_deprecated_sections(self):
    """Return ``deprecated_sections`` inverted: old section name -> new section name."""
    inverse = {}
    for new_section, (old_section, _since_version) in self.deprecated_sections.items():
        inverse[old_section] = new_section
    return inverse

420 

# A mapping of old default values that we want to change and warn the user
# about. Mapping of section -> setting -> { old, replace, by_version }
# ``old`` is a compiled pattern matched against the current value, ``replace`` is the text
# substituted for the match, and ``by_version`` is a version string.
deprecated_values: dict[str, dict[str, tuple[Pattern, str, str]]] = {
    "core": {
        "hostname_callable": (re2.compile(r":"), r".", "2.1"),
    },
    "webserver": {
        "navbar_color": (re2.compile(r"(?i)\A#007A87\z"), "#fff", "2.1"),
        "dag_default_view": (re2.compile(r"^tree$"), "grid", "3.0"),
    },
    "email": {
        "email_backend": (
            re2.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"),
            r"airflow.providers.sendgrid.utils.emailer.send_email",
            "2.1",
        ),
    },
    "logging": {
        "log_filename_template": (
            re2.compile(re2.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")),
            # The actual replacement value will be updated after defaults are loaded from config.yml
            "XX-set-after-default-config-loaded-XX",
            "3.0",
        ),
    },
    "api": {
        "auth_backends": (
            # Matches either the deny_all backend or an empty value.
            re2.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
            "airflow.api.auth.backend.session",
            "3.0",
        ),
    },
    "elasticsearch": {
        "log_id_template": (
            re2.compile("^" + re2.escape("{dag_id}-{task_id}-{execution_date}-{try_number}") + "$"),
            "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
            "3.0",
        )
    },
}

461 

# Level names accepted by the logging-level options listed in ``enums_options``.
_available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
# Mapping of (section, option) -> list of accepted values for that option.
# The values are computed once, when the class body is executed.
enums_options = {
    ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
    ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
    ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
    ("scheduler", "file_parsing_sort_mode"): ["modified_time", "random_seeded_by_host", "alphabetical"],
    ("logging", "logging_level"): _available_logging_levels,
    ("logging", "fab_logging_level"): _available_logging_levels,
    # celery_logging_level can be empty, which uses logging_level as fallback
    ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
    ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
}

474 

# Annotation only: the attribute itself is assigned per instance in ``__init__``.
upgraded_values: dict[tuple[str, str], str]
"""Mapping of (section,option) to the old value that was upgraded"""

477 

def get_sections_including_defaults(self) -> list[str]:
    """
    List every section known to the parser or to the configuration description.

    Sections from the description come first, followed by the parser's own sections;
    duplicates are dropped while the first occurrence keeps its position.

    :return: list of section names
    """
    ordered_unique = dict.fromkeys(self.configuration_description)
    ordered_unique.update(dict.fromkeys(self.sections()))
    return list(ordered_unique)

485 

def get_options_including_defaults(self, section: str) -> list[str]:
    """
    List every option of a section, from the configuration description and the parser itself.

    Options from the description come first, followed by options set on the parser;
    duplicates are dropped while the first occurrence keeps its position.

    :param section: section to list options for
    :return: list of option names for the section given
    """
    explicit_options = self.options(section) if self.has_section(section) else []
    described_options = self.configuration_description.get(section, {}).get("options", {})
    return list(dict.fromkeys([*described_options, *explicit_options]))

497 

def optionxform(self, optionstr: str) -> str:
    """
    Return the option name unchanged.

    Overrides ``ConfigParser``'s default lower-casing so that option names keep their case.

    :param optionstr: option name as given
    :return: the same option name
    """
    return optionstr

509 

@contextmanager
def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
    """
    Make sure configuration is loaded with or without providers.

    This happens regardless if the provider configuration has been loaded before or not.
    If provider configuration had to be dropped on entry, it is loaded again on exit,
    including when the body of the ``with`` block raises. Provider configuration that
    was initialized on entry (``with_providers=True``) is not undone on exit.

    :param with_providers: whether providers should be loaded
    """
    reload_providers_when_leaving = False
    if with_providers and not self._providers_configuration_loaded:
        # make sure providers are initialized
        from airflow.providers_manager import ProvidersManager

        # run the internal method to initialize providers configuration in order not to
        # trigger the initialize_providers_configuration cache
        ProvidersManager()._initialize_providers_configuration()
    elif not with_providers and self._providers_configuration_loaded:
        reload_providers_when_leaving = True
        self.restore_core_default_configuration()
    try:
        yield
    finally:
        # Runs on error as well, so a failing caller cannot leave providers unloaded.
        if reload_providers_when_leaving:
            self.load_providers_configuration()

534 

535 @staticmethod 

536 def _write_section_header( 

537 file: IO[str], 

538 include_descriptions: bool, 

539 section_config_description: dict[str, str], 

540 section_to_write: str, 

541 ) -> None: 

542 """Write header for configuration section.""" 

543 file.write(f"[{section_to_write}]\n") 

544 section_description = section_config_description.get("description") 

545 if section_description and include_descriptions: 

546 for line in section_description.splitlines(): 

547 file.write(f"# {line}\n") 

548 file.write("\n") 

549 

def _write_option_header(
    self,
    file: IO[str],
    option: str,
    extra_spacing: bool,
    include_descriptions: bool,
    include_env_vars: bool,
    include_examples: bool,
    include_sources: bool,
    section_config_description: dict[str, dict[str, Any]],
    section_to_write: str,
    sources_dict: ConfigSourcesType,
) -> tuple[bool, bool]:
    """
    Write header for configuration option.

    Returns tuple of (should_continue, needs_separation) where needs_separation should be
    set if the option needs additional separation to visually separate it from the next option.

    ``should_continue`` is False only when the option's ``version_added`` is newer than the
    base version of the running Airflow; nothing is written in that case.

    :param file: file to write to
    :param option: name of the option
    :param extra_spacing: write extra ``#`` lines before the example and after the variable
    :param include_descriptions: write the option description as comment lines
    :param include_env_vars: write the name of the matching environment variable
    :param include_examples: write the example, when the description has one
    :param include_sources: write the source of the value, taken from ``sources_dict``
    :param section_config_description: description entry of the section being written
    :param section_to_write: name of the section being written
    :param sources_dict: mapping of section -> option -> value with source
    """
    from airflow import __version__ as airflow_version

    option_config_description = (
        section_config_description.get("options", {}).get(option, {})
        if section_config_description
        else {}
    )
    version_added = option_config_description.get("version_added")
    # The running version is reduced to its base version before the comparison.
    if version_added is not None and parse_version(version_added) > parse_version(
        parse_version(airflow_version).base_version
    ):
        # skip if option is going to be added in the future version
        return False, False
    description = option_config_description.get("description")
    needs_separation = False
    if description and include_descriptions:
        for line in description.splitlines():
            file.write(f"# {line}\n")
        needs_separation = True
    example = option_config_description.get("example")
    if example is not None and include_examples:
        if extra_spacing:
            file.write("#\n")
        file.write(f"# Example: {option} = {example}\n")
        needs_separation = True
    if include_sources and sources_dict:
        sources_section = sources_dict.get(section_to_write)
        value_with_source = sources_section.get(option) if sources_section else None
        if value_with_source is None:
            file.write("#\n# Source: not defined\n")
        else:
            # Index 1 of the entry is written as the source.
            file.write(f"#\n# Source: {value_with_source[1]}\n")
        needs_separation = True
    if include_env_vars:
        file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
        if extra_spacing:
            file.write("#\n")
        needs_separation = True
    return True, needs_separation

608 

609 def _write_value( 

610 self, 

611 file: IO[str], 

612 option: str, 

613 comment_out_everything: bool, 

614 needs_separation: bool, 

615 only_defaults: bool, 

616 section_to_write: str, 

617 ): 

618 if self._default_values is None: 

619 default_value = None 

620 else: 

621 default_value = self.get_default_value(section_to_write, option, raw=True) 

622 if only_defaults: 

623 value = default_value 

624 else: 

625 value = self.get(section_to_write, option, fallback=default_value, raw=True) 

626 if value is None: 

627 file.write(f"# {option} = \n") 

628 else: 

629 if comment_out_everything: 

630 file.write(f"# {option} = {value}\n") 

631 else: 

632 file.write(f"{option} = {value}\n") 

633 if needs_separation: 

634 file.write("\n") 

635 

def write(  # type: ignore[override]
    self,
    file: IO[str],
    section: str | None = None,
    include_examples: bool = True,
    include_descriptions: bool = True,
    include_sources: bool = True,
    include_env_vars: bool = True,
    include_providers: bool = True,
    comment_out_everything: bool = False,
    hide_sensitive_values: bool = False,
    extra_spacing: bool = True,
    only_defaults: bool = False,
    **kwargs: Any,
) -> None:
    """
    Write configuration with comments and examples to a file.

    :param file: file to write to
    :param section: section of the config to write, defaults to all sections
    :param include_examples: Include examples in the output
    :param include_descriptions: Include descriptions in the output
    :param include_sources: Include the source of each config option
    :param include_env_vars: Include environment variables corresponding to each config option
    :param include_providers: Include providers configuration
    :param comment_out_everything: Comment out all values
    :param hide_sensitive_values: Accepted for interface compatibility; this method body
        does not consult it
    :param extra_spacing: Add extra spacing before examples and after variables
    :param only_defaults: Only include default values when writing the config, not the actual values
    :param kwargs: Accepted and ignored
    :raises RuntimeError: when no default values or no configuration description are set
    """
    sources_dict = {}
    if include_sources:
        sources_dict = self.as_dict(display_source=True)
    if self._default_values is None:
        raise RuntimeError("Cannot write default config, no default config set")
    if self.configuration_description is None:
        raise RuntimeError("Cannot write default config, no default configuration description set")
    with self.make_sure_configuration_loaded(with_providers=include_providers):
        for section_to_write in self.get_sections_including_defaults():
            section_config_description = self.configuration_description.get(section_to_write, {})
            if section_to_write != section and section is not None:
                continue
            if self._default_values.has_section(section_to_write) or self.has_section(section_to_write):
                self._write_section_header(
                    file, include_descriptions, section_config_description, section_to_write
                )
                # Reset per section: a section without options must neither fail with an
                # unbound name nor inherit the flag left over from the previous section.
                needs_separation = False
                for option in self.get_options_including_defaults(section_to_write):
                    # NOTE(review): ``should_continue`` is not acted upon, so the value line
                    # is written even when the header was skipped - confirm this is intended.
                    should_continue, needs_separation = self._write_option_header(
                        file=file,
                        option=option,
                        extra_spacing=extra_spacing,
                        include_descriptions=include_descriptions,
                        include_env_vars=include_env_vars,
                        include_examples=include_examples,
                        include_sources=include_sources,
                        section_config_description=section_config_description,
                        section_to_write=section_to_write,
                        sources_dict=sources_dict,
                    )
                    self._write_value(
                        file=file,
                        option=option,
                        comment_out_everything=comment_out_everything,
                        needs_separation=needs_separation,
                        only_defaults=only_defaults,
                        section_to_write=section_to_write,
                    )
                if include_descriptions and not needs_separation:
                    # extra separation between sections in case last option did not need it
                    file.write("\n")

706 

def restore_core_default_configuration(self) -> None:
    """Reset the description and default values to those of core Airflow only.

    Provider configuration is not restored by this call; use ``load_providers_configuration``
    for that.
    """
    core_description = retrieve_configuration_description(include_providers=False)
    self.configuration_description = core_description
    self._default_values = create_default_config_parser(core_description)
    self._providers_configuration_loaded = False

716 

def validate(self):
    """Run the configuration checks and upgrade values that still use an old default.

    For every entry of ``deprecated_values`` whose current value matches the old
    pattern, the previous value is remembered in ``upgraded_values``, the replacement
    is exported through the environment and a ``FutureWarning`` is emitted.
    """
    self._validate_sqlite3_version()
    self._validate_enums()

    for section, replacements in self.deprecated_values.items():
        for name, (old_pattern, substitute, version) in replacements.items():
            current_value = self.get(section, name, fallback="")
            if not self._using_old_value(old_pattern, current_value):
                continue
            self.upgraded_values[(section, name)] = current_value
            new_value = old_pattern.sub(substitute, current_value)
            self._update_env_var(section=section, name=name, new_value=new_value)
            self._create_future_warning(
                name=name,
                section=section,
                current_value=current_value,
                new_value=new_value,
                version=version,
            )

    self._upgrade_auth_backends()
    self._upgrade_postgres_metastore_conn()
    self.is_validated = True

740 

741 def _upgrade_auth_backends(self): 

742 """ 

743 Ensure a custom auth_backends setting contains session. 

744 

745 This is required by the UI for ajax queries. 

746 """ 

747 old_value = self.get("api", "auth_backends", fallback="") 

748 if old_value in ("airflow.api.auth.backend.default", ""): 

749 # handled by deprecated_values 

750 pass 

751 elif old_value.find("airflow.api.auth.backend.session") == -1: 

752 new_value = old_value + ",airflow.api.auth.backend.session" 

753 self._update_env_var(section="api", name="auth_backends", new_value=new_value) 

754 self.upgraded_values[("api", "auth_backends")] = old_value 

755 

756 # if the old value is set via env var, we need to wipe it 

757 # otherwise, it'll "win" over our adjusted value 

758 old_env_var = self._env_var_name("api", "auth_backend") 

759 os.environ.pop(old_env_var, None) 

760 

761 warnings.warn( 

762 "The auth_backends setting in [api] has had airflow.api.auth.backend.session added " 

763 "in the running config, which is needed by the UI. Please update your config before " 

764 "Apache Airflow 3.0.", 

765 FutureWarning, 

766 stacklevel=1, 

767 ) 

768 

769 def _upgrade_postgres_metastore_conn(self): 

770 """ 

771 Upgrade SQL schemas. 

772 

773 As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres` 

774 must be replaced with `postgresql`. 

775 """ 

776 section, key = "database", "sql_alchemy_conn" 

777 old_value = self.get(section, key, _extra_stacklevel=1) 

778 bad_schemes = ["postgres+psycopg2", "postgres"] 

779 good_scheme = "postgresql" 

780 parsed = urlsplit(old_value) 

781 if parsed.scheme in bad_schemes: 

782 warnings.warn( 

783 f"Bad scheme in Airflow configuration core > sql_alchemy_conn: `{parsed.scheme}`. " 

784 "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must " 

785 f"change to `{good_scheme}` before the next Airflow release.", 

786 FutureWarning, 

787 stacklevel=1, 

788 ) 

789 self.upgraded_values[(section, key)] = old_value 

790 new_value = re2.sub("^" + re2.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value) 

791 self._update_env_var(section=section, name=key, new_value=new_value) 

792 

793 # if the old value is set via env var, we need to wipe it 

794 # otherwise, it'll "win" over our adjusted value 

795 old_env_var = self._env_var_name("core", key) 

796 os.environ.pop(old_env_var, None) 

797 

798 def _validate_enums(self): 

799 """Validate that enum type config has an accepted value.""" 

800 for (section_key, option_key), enum_options in self.enums_options.items(): 

801 if self.has_option(section_key, option_key): 

802 value = self.get(section_key, option_key, fallback=None) 

803 if value and value not in enum_options: 

804 raise AirflowConfigException( 

805 f"`[{section_key}] {option_key}` should not be " 

806 f"{value!r}. Possible values: {', '.join(enum_options)}." 

807 ) 

808 

809 def _validate_sqlite3_version(self): 

810 """Validate SQLite version. 

811 

812 Some features in storing rendered fields require SQLite >= 3.15.0. 

813 """ 

814 if "sqlite" not in self.get("database", "sql_alchemy_conn"): 

815 return 

816 

817 import sqlite3 

818 

819 min_sqlite_version = (3, 15, 0) 

820 if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version: 

821 return 

822 

823 from airflow.utils.docs import get_docs_url 

824 

825 min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version) 

826 raise AirflowConfigException( 

827 f"error: SQLite C library too old (< {min_sqlite_version_str}). " 

828 f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}" 

829 ) 

830 

831 def _using_old_value(self, old: Pattern, current_value: str) -> bool: 

832 return old.search(current_value) is not None 

833 

834 def _update_env_var(self, section: str, name: str, new_value: str): 

835 env_var = self._env_var_name(section, name) 

836 # Set it as an env var so that any subprocesses keep the same override! 

837 os.environ[env_var] = new_value 

838 

839 @staticmethod 

840 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any, version: str): 

841 warnings.warn( 

842 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " 

843 f"This value has been changed to {new_value!r} in the running config, but " 

844 f"please update your config before Apache Airflow {version}.", 

845 FutureWarning, 

846 stacklevel=3, 

847 ) 

848 

def _env_var_name(self, section: str, key: str) -> str:
    """Build the environment variable name for ``section``/``key``.

    Dots in the section name become underscores and both parts are upper-cased.
    """
    section_part = section.replace(".", "_").upper()
    key_part = key.upper()
    return f"{ENV_VAR_PREFIX}{section_part}__{key_part}"

851 

852 def _get_env_var_option(self, section: str, key: str): 

853 # must have format AIRFLOW__{SECTION}__{KEY} (note double underscore) 

854 env_var = self._env_var_name(section, key) 

855 if env_var in os.environ: 

856 return expand_env_var(os.environ[env_var]) 

857 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command) 

858 env_var_cmd = env_var + "_CMD" 

859 if env_var_cmd in os.environ: 

860 # if this is a valid command key... 

861 if (section, key) in self.sensitive_config_values: 

862 return run_command(os.environ[env_var_cmd]) 

863 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend) 

864 env_var_secret_path = env_var + "_SECRET" 

865 if env_var_secret_path in os.environ: 

866 # if this is a valid secret path... 

867 if (section, key) in self.sensitive_config_values: 

868 return _get_config_value_from_secret_backend(os.environ[env_var_secret_path]) 

869 return None 

870 

871 def _get_cmd_option(self, section: str, key: str): 

872 fallback_key = key + "_cmd" 

873 if (section, key) in self.sensitive_config_values: 

874 if super().has_option(section, fallback_key): 

875 command = super().get(section, fallback_key) 

876 return run_command(command) 

877 return None 

878 

879 def _get_cmd_option_from_config_sources( 

880 self, config_sources: ConfigSourcesType, section: str, key: str 

881 ) -> str | None: 

882 fallback_key = key + "_cmd" 

883 if (section, key) in self.sensitive_config_values: 

884 section_dict = config_sources.get(section) 

885 if section_dict is not None: 

886 command_value = section_dict.get(fallback_key) 

887 if command_value is not None: 

888 if isinstance(command_value, str): 

889 command = command_value 

890 else: 

891 command = command_value[0] 

892 return run_command(command) 

893 return None 

894 

895 def _get_secret_option(self, section: str, key: str) -> str | None: 

896 """Get Config option values from Secret Backend.""" 

897 fallback_key = key + "_secret" 

898 if (section, key) in self.sensitive_config_values: 

899 if super().has_option(section, fallback_key): 

900 secrets_path = super().get(section, fallback_key) 

901 return _get_config_value_from_secret_backend(secrets_path) 

902 return None 

903 

904 def _get_secret_option_from_config_sources( 

905 self, config_sources: ConfigSourcesType, section: str, key: str 

906 ) -> str | None: 

907 fallback_key = key + "_secret" 

908 if (section, key) in self.sensitive_config_values: 

909 section_dict = config_sources.get(section) 

910 if section_dict is not None: 

911 secrets_path_value = section_dict.get(fallback_key) 

912 if secrets_path_value is not None: 

913 if isinstance(secrets_path_value, str): 

914 secrets_path = secrets_path_value 

915 else: 

916 secrets_path = secrets_path_value[0] 

917 return _get_config_value_from_secret_backend(secrets_path) 

918 return None 

919 

def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
    """Return the value of ``section``/``key`` or raise ``ValueError`` when it is None."""
    found = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if found is not None:
        return found
    raise ValueError(f"The value {section}/{key} should be set!")

925 

def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
    """Return the list value of ``section``/``key`` or raise ``ValueError`` when it is None."""
    items = self.getlist(section, key, **kwargs)
    if items is not None:
        return items
    raise ValueError(f"The value {section}/{key} should be set!")

931 

@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...

@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: ...

def get(  # type: ignore[override,misc]
    self,
    section: str,
    key: str,
    suppress_warnings: bool = False,
    _extra_stacklevel: int = 0,
    **kwargs,
) -> str | None:
    """
    Return the value of ``section``/``key``, taking deprecated names into account.

    Sources are consulted in this order and the first one that yields a value wins:
    environment variables, the parsed config, ``_cmd`` commands, secret backends,
    the default values and finally the pre-2.7 default values.

    :param section: section name (lower-cased before use)
    :param key: option name (lower-cased before use)
    :param suppress_warnings: if True, do not log a warning when the option is not found
    :param _extra_stacklevel: extra frames to skip when attributing emitted warnings
    :param kwargs: forwarded to the underlying lookups; a ``fallback`` entry makes the
        default-value lookup return instead of the method raising
    :raises AirflowConfigException: if no source provides the option
    """
    section = section.lower()
    key = key.lower()
    warning_emitted = False
    deprecated_section: str | None
    deprecated_key: str | None

    option_description = self.configuration_description.get(section, {}).get(key, {})
    if option_description.get("deprecated"):
        deprecation_reason = option_description.get("deprecation_reason", "")
        warnings.warn(
            f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
            DeprecationWarning,
            stacklevel=2 + _extra_stacklevel,
        )
    # For when we rename whole sections
    if section in self.inversed_deprecated_sections:
        deprecated_section, deprecated_key = (section, key)
        section = self.inversed_deprecated_sections[section]
        if not self._suppress_future_warnings:
            warnings.warn(
                f"The config section [{deprecated_section}] has been renamed to "
                f"[{section}]. Please update your `conf.get*` call to use the new name",
                FutureWarning,
                stacklevel=2 + _extra_stacklevel,
            )
        # Don't warn about individual rename if the whole section is renamed
        warning_emitted = True
    elif (section, key) in self.inversed_deprecated_options:
        # Handle using deprecated section/key instead of the new section/key
        new_section, new_key = self.inversed_deprecated_options[(section, key)]
        if not self._suppress_future_warnings and not warning_emitted:
            warnings.warn(
                # The trailing space after "use" keeps the two message parts from
                # running together ("use[section/key]").
                f"section/key [{section}/{key}] has been deprecated, you should use "
                f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
                "new name",
                FutureWarning,
                stacklevel=2 + _extra_stacklevel,
            )
            warning_emitted = True
        deprecated_section, deprecated_key = section, key
        section, key = (new_section, new_key)
    elif section in self.deprecated_sections:
        # When accessing the new section name, make sure we check under the old config name
        deprecated_key = key
        deprecated_section = self.deprecated_sections[section][0]
    else:
        deprecated_section, deprecated_key, _ = self.deprecated_options.get(
            (section, key), (None, None, None)
        )
    # first check environment variables
    option = self._get_environment_variables(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then the config file
    option = self._get_option_from_config_file(
        deprecated_key,
        deprecated_section,
        key,
        kwargs,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then commands
    option = self._get_option_from_commands(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then from secret backends
    option = self._get_option_from_secrets(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then the default config
    if self.get_default_value(section, key) is not None or "fallback" in kwargs:
        return expand_env_var(self.get_default_value(section, key, **kwargs))

    if self.get_default_pre_2_7_value(section, key) is not None:
        # no expansion needed
        return self.get_default_pre_2_7_value(section, key, **kwargs)

    if not suppress_warnings:
        log.warning("section/key [%s/%s] not found in config", section, key)

    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")

1056 

1057 def _get_option_from_secrets( 

1058 self, 

1059 deprecated_key: str | None, 

1060 deprecated_section: str | None, 

1061 key: str, 

1062 section: str, 

1063 issue_warning: bool = True, 

1064 extra_stacklevel: int = 0, 

1065 ) -> str | None: 

1066 option = self._get_secret_option(section, key) 

1067 if option: 

1068 return option 

1069 if deprecated_section and deprecated_key: 

1070 with self.suppress_future_warnings(): 

1071 option = self._get_secret_option(deprecated_section, deprecated_key) 

1072 if option: 

1073 if issue_warning: 

1074 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

1075 return option 

1076 return None 

1077 

1078 def _get_option_from_commands( 

1079 self, 

1080 deprecated_key: str | None, 

1081 deprecated_section: str | None, 

1082 key: str, 

1083 section: str, 

1084 issue_warning: bool = True, 

1085 extra_stacklevel: int = 0, 

1086 ) -> str | None: 

1087 option = self._get_cmd_option(section, key) 

1088 if option: 

1089 return option 

1090 if deprecated_section and deprecated_key: 

1091 with self.suppress_future_warnings(): 

1092 option = self._get_cmd_option(deprecated_section, deprecated_key) 

1093 if option: 

1094 if issue_warning: 

1095 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

1096 return option 

1097 return None 

1098 

def _get_option_from_config_file(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    kwargs: dict[str, Any],
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
) -> str | None:
    """Read the option from the parsed config, current name first, then the deprecated one.

    :return: the value with environment variables expanded, or None if neither name is set
    """
    # The parent's methods are used on purpose: they see only the actual config,
    # which keeps it separate from the default config.
    if super().has_option(section, key):
        return expand_env_var(super().get(section, key, **kwargs))

    has_deprecated_name = deprecated_section and deprecated_key
    if has_deprecated_name and super().has_option(deprecated_section, deprecated_key):
        if issue_warning:
            self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
        with self.suppress_future_warnings():
            return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
    return None

1120 

1121 def _get_environment_variables( 

1122 self, 

1123 deprecated_key: str | None, 

1124 deprecated_section: str | None, 

1125 key: str, 

1126 section: str, 

1127 issue_warning: bool = True, 

1128 extra_stacklevel: int = 0, 

1129 ) -> str | None: 

1130 option = self._get_env_var_option(section, key) 

1131 if option is not None: 

1132 return option 

1133 if deprecated_section and deprecated_key: 

1134 with self.suppress_future_warnings(): 

1135 option = self._get_env_var_option(deprecated_section, deprecated_key) 

1136 if option is not None: 

1137 if issue_warning: 

1138 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

1139 return option 

1140 return None 

1141 

def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
    """Return the option as a bool.

    The value is compared case-insensitively after stripping whitespace and anything
    following a ``#``. Accepted spellings are t/true/1 and f/false/0.

    :raises AirflowConfigException: if the value is none of the accepted spellings
    """
    val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
    if "#" in val:
        # Drop an inline comment.
        val = val.partition("#")[0].strip()

    if val in ("t", "true", "1"):
        return True
    if val in ("f", "false", "0"):
        return False
    raise AirflowConfigException(
        f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
        f'Current value: "{val}".'
    )

1155 

def getint(self, section: str, key: str, **kwargs) -> int:  # type: ignore[override]
    """Return the option converted to ``int``.

    :raises AirflowConfigException: if the option is None or not convertible to int
    """
    val = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if val is None:
        raise AirflowConfigException(
            f"Failed to convert value None to int. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        number = int(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )
    return number

1170 

def getfloat(self, section: str, key: str, **kwargs) -> float:  # type: ignore[override]
    """Return the option converted to ``float``.

    :raises AirflowConfigException: if the option is None or not convertible to float
    """
    val = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if val is None:
        raise AirflowConfigException(
            f"Failed to convert value None to float. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        number = float(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )
    return number

1185 

def getlist(self, section: str, key: str, delimiter=",", **kwargs):
    """Return the option split on ``delimiter`` with whitespace stripped from each item.

    :param section: section from the config
    :param key: key defined in the given section
    :param delimiter: separator between the items, a comma by default
    :param kwargs: forwarded to ``get``
    :raises AirflowConfigException: if the option is None or cannot be split
    """
    # Like the other typed getters, skip this helper's frame when ``get`` attributes
    # its warnings. An explicitly passed ``_extra_stacklevel`` is still honoured.
    kwargs.setdefault("_extra_stacklevel", 1)
    val = self.get(section, key, **kwargs)
    if val is None:
        raise AirflowConfigException(
            f"Failed to convert value None to list. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        return [item.strip() for item in val.split(delimiter)]
    except Exception:
        raise AirflowConfigException(
            f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

1200 

def getimport(self, section: str, key: str, **kwargs) -> Any:
    """
    Read the option, import the fully qualified name it holds and return the object.

    The option is read from this parser instance, so the result reflects the
    configuration of the object the method is called on.

    :param section: section from the config
    :param key: key defined in the given section
    :param kwargs: forwarded to ``get``
    :return: The object or None, if the option is empty
    :raises AirflowConfigException: if the import fails; the message names the key and section
    """
    full_qualified_path = self.get(section=section, key=key, **kwargs)
    if not full_qualified_path:
        return None

    try:
        return import_string(full_qualified_path)
    except ImportError as e:
        log.warning(e)
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{full_qualified_path}".'
        ) from e

1221 

def getjson(
    self, section: str, key: str, fallback=None, **kwargs
) -> dict | list | str | int | float | None:
    """
    Parse the option as JSON and return the result.

    ``fallback`` is returned verbatim (it is *not* JSON parsed) when the option is
    missing, None or an empty string.

    :raises AirflowConfigException: if the value is not valid JSON
    """
    try:
        raw = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
    except (NoSectionError, NoOptionError):
        raw = None

    if raw is None or raw == "":
        return fallback

    try:
        parsed = json.loads(raw)
    except JSONDecodeError as e:
        raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
    return parsed

1242 

def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Read the option as a number of seconds and return it as a ``datetime.timedelta``.

    A missing or empty value yields ``fallback``.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: fallback value when no config value is given, defaults to None
    :raises AirflowConfigException: raised because ValueError or OverflowError
    :return: datetime.timedelta(seconds=<config_value>) or None
    """
    val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
    if not val:
        return fallback

    # the given value must be convertible to integer
    try:
        seconds = int(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

    try:
        return datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            f"Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
        )

1279 

def read(
    self,
    filenames: (str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike]),
    encoding=None,
):
    """
    Read and parse a filename or an iterable of filenames.

    This override only narrows the type hints; the work is done by the parent class.

    :param filenames: a single path or an iterable of paths to read
    :param encoding: encoding handed to the parent ``read``
    :return: the parent's result (``ConfigParser.read`` returns the list of
        successfully parsed filenames)
    """
    # Propagate the parent's result instead of silently discarding it.
    return super().read(filenames=filenames, encoding=encoding)

1286 

def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
):
    """
    Load configuration from a dictionary of sections.

    The signature differs from the parent's only to provide better type hints and checking.

    :param dictionary: dictionary to read from
    :param source: source to be used to store the configuration
    :return:
    """
    super().read_dict(dictionary, source)

1298 

def has_option(self, section: str, option: str) -> bool:
    """
    Tell whether the option resolves to a value.

    The check goes through ``self.get()`` so that the priority order of the config
    sources (env, config, cmd, defaults) is not reimplemented here.

    :param section: section to get option from
    :param option: option to get
    :return: True if ``get`` returns something other than None, False otherwise
    """
    try:
        value = self.get(section, option, fallback=None, _extra_stacklevel=1, suppress_warnings=True)
    except (NoOptionError, NoSectionError):
        return False
    return value is not None

1317 

def set(self, section: str, option: str, value: str | None = None) -> None:
    """
    Store ``value`` under ``section``/``option``.

    Both names are lower-cased first, to match what `get` does.
    """
    super().set(section.lower(), option.lower(), value)

1327 

def remove_option(self, section: str, option: str, remove_default: bool = True):
    """
    Remove an option from the parsed config and, optionally, from the defaults.

    The option is dropped from whichever of the two holds it. Pass
    ``remove_default=False`` to leave the default value in place.
    """
    section, option = section.lower(), option.lower()

    if super().has_option(section, option):
        super().remove_option(section, option)

    has_default = self.get_default_value(section, option) is not None
    if has_default and remove_default:
        self._default_values.remove_option(section, option)

1342 

def getsection(self, section: str) -> ConfigOptionsDictType | None:
    """
    Return the section as a dict.

    Default values, this parser's own items and environment variables named with the
    section's prefix are merged in that order, so later sources win. Each value is
    then converted to int, float or bool when it parses as one; other values are
    left unchanged.

    :param section: section from the config
    :return: the merged section, or None if neither the parser nor the defaults
        have the section
    :raises AirflowConfigException: if an option ends up with a None value
    """
    if not self.has_section(section) and not self._default_values.has_section(section):
        return None
    if self._default_values.has_section(section):
        _section: ConfigOptionsDictType = dict(self._default_values.items(section))
    else:
        _section = {}

    if self.has_section(section):
        _section.update(self.items(section))

    section_prefix = self._env_var_name(section, "")
    for env_var in sorted(os.environ.keys()):
        if env_var.startswith(section_prefix):
            # Strip only the leading prefix. ``str.replace`` would also remove any
            # later occurrence of the prefix inside the option name itself.
            key = env_var[len(section_prefix) :]
            if key.endswith("_CMD"):
                key = key[:-4]
            key = key.lower()
            _section[key] = self._get_env_var_option(section, key)

    # Only existing keys are reassigned below, so iterating the dict is safe.
    for key, val in _section.items():
        if val is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        try:
            _section[key] = int(val)
        except ValueError:
            try:
                _section[key] = float(val)
            except ValueError:
                if isinstance(val, str) and val.lower() in ("t", "true"):
                    _section[key] = True
                elif isinstance(val, str) and val.lower() in ("f", "false"):
                    _section[key] = False
    return _section

1387 

1388 def as_dict( 

1389 self, 

1390 display_source: bool = False, 

1391 display_sensitive: bool = False, 

1392 raw: bool = False, 

1393 include_env: bool = True, 

1394 include_cmds: bool = True, 

1395 include_secret: bool = True, 

1396 ) -> ConfigSourcesType: 

1397 """ 

1398 Return the current configuration as an OrderedDict of OrderedDicts. 

1399 

1400 When materializing current configuration Airflow defaults are 

1401 materialized along with user set configs. If any of the `include_*` 

1402 options are False then the result of calling command or secret key 

1403 configs do not override Airflow defaults and instead are passed through. 

1404 In order to then avoid Airflow defaults from overwriting user set 

1405 command or secret key configs we filter out bare sensitive_config_values 

1406 that are set to Airflow defaults when command or secret key configs 

1407 produce different values. 

1408 

1409 :param display_source: If False, the option value is returned. If True, 

1410 a tuple of (option_value, source) is returned. Source is either 

1411 'airflow.cfg', 'default', 'env var', or 'cmd'. 

1412 :param display_sensitive: If True, the values of options set by env 

1413 vars and bash commands will be displayed. If False, those options 

1414 are shown as '< hidden >' 

1415 :param raw: Should the values be output as interpolated values, or the 

1416 "raw" form that can be fed back in to ConfigParser 

1417 :param include_env: Should the value of configuration from AIRFLOW__ 

1418 environment variables be included or not 

1419 :param include_cmds: Should the result of calling any *_cmd config be 

1420 set (True, default), or should the _cmd options be left as the 

1421 command to run (False) 

1422 :param include_secret: Should the result of calling any *_secret config be 

1423 set (True, default), or should the _secret options be left as the 

1424 path to get the secret from (False) 

1425 :return: Dictionary, where the key is the name of the section and the content is 

1426 the dictionary with the name of the parameter and its value. 

1427 """ 

1428 if not display_sensitive: 

1429 # We want to hide the sensitive values at the appropriate methods 

1430 # since envs from cmds, secrets can be read at _include_envs method 

1431 if not all([include_env, include_cmds, include_secret]): 

1432 raise ValueError( 

1433 "If display_sensitive is false, then include_env, " 

1434 "include_cmds, include_secret must all be set as True" 

1435 ) 

1436 

1437 config_sources: ConfigSourcesType = {} 

1438 

1439 # We check sequentially all those sources and the last one we saw it in will "win" 

1440 configs: Iterable[tuple[str, ConfigParser]] = [ 

1441 ("default-pre-2-7", self._pre_2_7_default_values), 

1442 ("default", self._default_values), 

1443 ("airflow.cfg", self), 

1444 ] 

1445 

1446 self._replace_config_with_display_sources( 

1447 config_sources, 

1448 configs, 

1449 self.configuration_description if self.configuration_description else {}, 

1450 display_source, 

1451 raw, 

1452 self.deprecated_options, 

1453 include_cmds=include_cmds, 

1454 include_env=include_env, 

1455 include_secret=include_secret, 

1456 ) 

1457 

1458 # add env vars and overwrite because they have priority 

1459 if include_env: 

1460 self._include_envs(config_sources, display_sensitive, display_source, raw) 

1461 else: 

1462 self._filter_by_source(config_sources, display_source, self._get_env_var_option) 

1463 

1464 # add bash commands 

1465 if include_cmds: 

1466 self._include_commands(config_sources, display_sensitive, display_source, raw) 

1467 else: 

1468 self._filter_by_source(config_sources, display_source, self._get_cmd_option) 

1469 

1470 # add config from secret backends 

1471 if include_secret: 

1472 self._include_secrets(config_sources, display_sensitive, display_source, raw) 

1473 else: 

1474 self._filter_by_source(config_sources, display_source, self._get_secret_option) 

1475 

1476 if not display_sensitive: 

1477 # This ensures the ones from config file is hidden too 

1478 # if they are not provided through env, cmd and secret 

1479 hidden = "< hidden >" 

1480 for section, key in self.sensitive_config_values: 

1481 if config_sources.get(section): 

1482 if config_sources[section].get(key, None): 

1483 if display_source: 

1484 source = config_sources[section][key][1] 

1485 config_sources[section][key] = (hidden, source) 

1486 else: 

1487 config_sources[section][key] = hidden 

1488 

1489 return config_sources 

1490 

1491 def _include_secrets( 

1492 self, 

1493 config_sources: ConfigSourcesType, 

1494 display_sensitive: bool, 

1495 display_source: bool, 

1496 raw: bool, 

1497 ): 

1498 for section, key in self.sensitive_config_values: 

1499 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key) 

1500 if value: 

1501 if not display_sensitive: 

1502 value = "< hidden >" 

1503 if display_source: 

1504 opt: str | tuple[str, str] = (value, "secret") 

1505 elif raw: 

1506 opt = value.replace("%", "%%") 

1507 else: 

1508 opt = value 

1509 config_sources.setdefault(section, {}).update({key: opt}) 

1510 del config_sources[section][key + "_secret"] 

1511 

1512 def _include_commands( 

1513 self, 

1514 config_sources: ConfigSourcesType, 

1515 display_sensitive: bool, 

1516 display_source: bool, 

1517 raw: bool, 

1518 ): 

1519 for section, key in self.sensitive_config_values: 

1520 opt = self._get_cmd_option_from_config_sources(config_sources, section, key) 

1521 if not opt: 

1522 continue 

1523 opt_to_set: str | tuple[str, str] | None = opt 

1524 if not display_sensitive: 

1525 opt_to_set = "< hidden >" 

1526 if display_source: 

1527 opt_to_set = (str(opt_to_set), "cmd") 

1528 elif raw: 

1529 opt_to_set = str(opt_to_set).replace("%", "%%") 

1530 if opt_to_set is not None: 

1531 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set} 

1532 config_sources.setdefault(section, {}).update(dict_to_update) 

1533 del config_sources[section][key + "_cmd"] 

1534 

def _include_envs(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """Overlay options set through prefixed environment variables onto ``config_sources``.

    Variables that do not split into prefix, section and key are skipped. Those that
    resolve to None are skipped with a logged warning. ``config_sources`` is modified
    in place.
    """
    # Snapshot the matching names before looping over them.
    prefixed_names = [name for name in os.environ if name.startswith(ENV_VAR_PREFIX)]
    unit_test_mode_var = None
    for env_var in prefixed_names:
        try:
            _, section, key = env_var.split("__", 2)
            opt = self._get_env_var_option(section, key)
        except ValueError:
            continue
        if opt is None:
            log.warning("Ignoring unknown env var '%s'", env_var)
            continue

        if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
            # Don't hide cmd/secret values here
            # NOTE(review): section/key are not lower-cased until further down; confirm
            # that sensitive_config_values is meant to be matched against this casing.
            is_cmd_or_secret = env_var.lower().endswith(("cmd", "secret"))
            if not is_cmd_or_secret and (section, key) in self.sensitive_config_values:
                opt = "< hidden >"
        elif raw:
            opt = opt.replace("%", "%%")

        if display_source:
            opt = (opt, "env var")

        section = section.lower()
        # if we lower key for kubernetes_environment_variables section,
        # then we won't be able to set any Airflow environment
        # variables. Airflow only parse environment variables starts
        # with AIRFLOW_. Therefore, we need to make it a special case.
        if section != "kubernetes_environment_variables":
            key = key.lower()
        config_sources.setdefault(section, {})[key] = opt

1571 

1572 def _filter_by_source( 

1573 self, 

1574 config_sources: ConfigSourcesType, 

1575 display_source: bool, 

1576 getter_func, 

1577 ): 

1578 """ 

1579 Delete default configs from current configuration. 

1580 

1581 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values. 

1582 

1583 This is necessary because bare configs take precedence over the command 

1584 or secret key equivalents so if the current running config is 

1585 materialized with Airflow defaults they in turn override user set 

1586 command or secret key configs. 

1587 

1588 :param config_sources: The current configuration to operate on 

1589 :param display_source: If False, configuration options contain raw 

1590 values. If True, options are a tuple of (option_value, source). 

1591 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'. 

1592 :param getter_func: A callback function that gets the user configured 

1593 override value for a particular sensitive_config_values config. 

1594 :return: None, the given config_sources is filtered if necessary, 

1595 otherwise untouched. 

1596 """ 

1597 for section, key in self.sensitive_config_values: 

1598 # Don't bother if we don't have section / key 

1599 if section not in config_sources or key not in config_sources[section]: 

1600 continue 

1601 # Check that there is something to override defaults 

1602 try: 

1603 getter_opt = getter_func(section, key) 

1604 except ValueError: 

1605 continue 

1606 if not getter_opt: 

1607 continue 

1608 # Check to see that there is a default value 

1609 if self.get_default_value(section, key) is None: 

1610 continue 

1611 # Check to see if bare setting is the same as defaults 

1612 if display_source: 

1613 # when display_source = true, we know that the config_sources contains tuple 

1614 opt, source = config_sources[section][key] # type: ignore 

1615 else: 

1616 opt = config_sources[section][key] 

1617 if opt == self.get_default_value(section, key): 

1618 del config_sources[section][key] 

1619 

@staticmethod
def _replace_config_with_display_sources(
    config_sources: ConfigSourcesType,
    configs: Iterable[tuple[str, ConfigParser]],
    configuration_description: dict[str, dict[str, Any]],
    display_source: bool,
    raw: bool,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Copy every section of every given parser into ``config_sources``.

    Walks ``configs`` in order and delegates each section of each parser to
    ``_replace_section_config_with_display_sources`` with the arguments passed through.

    :param config_sources: configuration mapping to fill, modified in place
    :param configs: ``(source_name, parser)`` pairs to read from
    :param configuration_description: description of the known options, passed through
    :param display_source: passed through to the per-section helper
    :param raw: passed through to the per-section helper
    :param deprecated_options: mapping of ``(section, key)`` to its deprecated location
    :param include_env: passed through to the per-section helper
    :param include_cmds: passed through to the per-section helper
    :param include_secret: passed through to the per-section helper
    """
    for source_name, config in configs:
        for section in config.sections():
            AirflowConfigParser._replace_section_config_with_display_sources(
                config=config,
                config_sources=config_sources,
                configuration_description=configuration_description,
                display_source=display_source,
                raw=raw,
                section=section,
                source_name=source_name,
                deprecated_options=deprecated_options,
                configs=configs,
                include_env=include_env,
                include_cmds=include_cmds,
                include_secret=include_secret,
            )

1649 

1650 @staticmethod 

1651 def _deprecated_value_is_set_in_config( 

1652 deprecated_section: str, 

1653 deprecated_key: str, 

1654 configs: Iterable[tuple[str, ConfigParser]], 

1655 ) -> bool: 

1656 for config_type, config in configs: 

1657 if config_type != "default": 

1658 with contextlib.suppress(NoSectionError): 

1659 deprecated_section_array = config.items(section=deprecated_section, raw=True) 

1660 if any(key == deprecated_key for key, _ in deprecated_section_array): 

1661 return True 

1662 else: 

1663 return False 

1664 

@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the environment variable for the deprecated option is defined.

    :param deprecated_section: section that held the option under its old name
    :param deprecated_key: old option name
    :return: True if ``<ENV_VAR_PREFIX><SECTION>__<KEY>`` exists in the environment
    """
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}"
    return env_name in os.environ

1671 

@staticmethod
def _deprecated_command_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """
    Tell whether any non-default parser defines ``<deprecated_key>_cmd``.

    :param deprecated_section: section that held the option under its old name
    :param deprecated_key: old option name, without the ``_cmd`` suffix
    :param configs: ``(source_name, parser)`` pairs to inspect
    :return: result of ``_deprecated_value_is_set_in_config`` for the ``_cmd`` key
    """
    command_key = deprecated_key + "_cmd"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section,
        deprecated_key=command_key,
        configs=configs,
    )

1681 

@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the ``_CMD`` environment variable for the deprecated option is defined.

    :param deprecated_section: section that held the option under its old name
    :param deprecated_key: old option name
    :return: True if ``<ENV_VAR_PREFIX><SECTION>__<KEY>_CMD`` exists in the environment
    """
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD"
    return env_name in os.environ

1688 

@staticmethod
def _deprecated_secret_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """
    Tell whether any non-default parser defines ``<deprecated_key>_secret``.

    :param deprecated_section: section that held the option under its old name
    :param deprecated_key: old option name, without the ``_secret`` suffix
    :param configs: ``(source_name, parser)`` pairs to inspect
    :return: result of ``_deprecated_value_is_set_in_config`` for the ``_secret`` key
    """
    secret_key = deprecated_key + "_secret"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section,
        deprecated_key=secret_key,
        configs=configs,
    )

1698 

@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the ``_SECRET`` environment variable for the deprecated option is defined.

    :param deprecated_section: section that held the option under its old name
    :param deprecated_key: old option name
    :return: True if ``<ENV_VAR_PREFIX><SECTION>__<KEY>_SECRET`` exists in the environment
    """
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET"
    return env_name in os.environ

1705 

@contextmanager
def suppress_future_warnings(self):
    """
    Temporarily set ``_suppress_future_warnings`` to True for the duration of a ``with`` block.

    The previous value of the flag is restored when the block exits, including when
    the block raises, so a failure inside the block cannot leave the flag switched on.

    :return: context manager yielding ``self``
    """
    previous = self._suppress_future_warnings
    self._suppress_future_warnings = True
    try:
        yield self
    finally:
        # Restore even on error; without this an exception raised inside the
        # ``with`` body would leave the flag permanently set to True.
        self._suppress_future_warnings = previous

1712 

@staticmethod
def _replace_section_config_with_display_sources(
    config: ConfigParser,
    config_sources: ConfigSourcesType,
    configuration_description: dict[str, dict[str, Any]],
    display_source: bool,
    raw: bool,
    section: str,
    source_name: str,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    configs: Iterable[tuple[str, ConfigParser]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Copy the options of one section of one parser into ``config_sources``.

    Values read from the ``"default"`` source are skipped when the option has a deprecated
    predecessor that is set somewhere else (non-default parser, environment variable,
    command or secret, depending on the ``include_*`` flags); otherwise the default
    would override the value coming from the deprecated option.

    :param config: parser to read the section from
    :param config_sources: configuration mapping to fill, modified in place
    :param configuration_description: description of the known options; used to look up
        the ``source`` recorded for options read from the ``"default"`` source
    :param display_source: when True store ``(value, source)`` tuples, else bare values
    :param raw: passed to ``config.items`` as its ``raw`` argument
    :param section: name of the section to copy
    :param source_name: name of the source ``config`` represents
    :param deprecated_options: mapping of ``(section, key)`` to its deprecated location
    :param configs: all ``(source_name, parser)`` pairs, used for the deprecation checks
    :param include_env: also consider environment variables of the deprecated option
    :param include_cmds: also consider ``_cmd`` variants of the deprecated option
    :param include_secret: also consider ``_secret`` variants of the deprecated option
    """
    parser_cls = AirflowConfigParser
    section_values = config_sources.setdefault(section, {})

    if isinstance(config, parser_cls):
        with config.suppress_future_warnings():
            section_items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
    else:
        section_items = config.items(section=section, raw=raw)

    is_default_source = source_name == "default"
    for option, value in section_items:
        old_section, old_key, _ = deprecated_options.get((section, option), (None, None, None))
        if is_default_source and old_section and old_key:
            # If the deprecated entry has some non-default value set in any of the requested
            # sources, the default for the new entry must NOT be stored, because it would
            # override whatever comes from the deprecated one.
            old_option_in_use = (
                parser_cls._deprecated_value_is_set_in_config(old_section, old_key, configs)
                or (include_env and parser_cls._deprecated_variable_is_set(old_section, old_key))
                or (
                    include_cmds
                    and (
                        parser_cls._deprecated_variable_command_is_set(old_section, old_key)
                        or parser_cls._deprecated_command_is_set_in_config(old_section, old_key, configs)
                    )
                )
                or (
                    include_secret
                    and (
                        parser_cls._deprecated_variable_secret_is_set(old_section, old_key)
                        or parser_cls._deprecated_secret_is_set_in_config(old_section, old_key, configs)
                    )
                )
            )
            if old_option_in_use:
                continue

        if not display_source:
            section_values[option] = value
            continue

        shown_source = source_name
        if is_default_source:
            # defaults can come from other sources (default-<PROVIDER>) that should be used here
            option_description = (
                configuration_description.get(section, {}).get("options", {}).get(option, {})
            )
            if option_description is not None:
                shown_source = option_description.get("source", source_name)
        section_values[option] = (value, shown_source)

1778 

def load_test_config(self):
    """
    Use test configuration rather than the configuration coming from airflow defaults.

    When running tests the special unit-test configuration is used to avoid accidental
    modifications and different behaviours between test runs. Its values are stored in the
    "unit_tests.cfg" file in the ``airflow/config_templates`` folder; change values there
    if a specific configuration should be used by the tests.

    All previously read configuration is removed, the unit-test file is read, the
    module-level ``FERNET_KEY`` is set to a freshly generated key and all configuration
    values are expanded.
    """
    # FERNET_KEY is rebound below, and it has to be rebound before
    # expand_all_configuration_values() runs because the values are expanded from
    # the module globals.
    global FERNET_KEY
    from cryptography.fernet import Fernet

    unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
    unit_test_config = unit_test_config_file.read_text()
    self.remove_all_read_configurations()
    with StringIO(unit_test_config) as test_config_file:
        self.read_file(test_config_file)
    # set fernet key to a random value
    FERNET_KEY = Fernet.generate_key().decode()
    self.expand_all_configuration_values()
    # Report the file that was actually read (the old message named a different file).
    log.info("Unit test configuration loaded from '%s'", unit_test_config_file.name)

1803 

def expand_all_configuration_values(self):
    """
    Expand all configuration values using the variables from ``get_all_expansion_variables``.

    Every option whose value is not None is removed and set again. String values of
    options that are not templates are passed through ``str.format`` with the expansion
    variables; template options and non-string values are stored unchanged.
    """
    expansion_vars = get_all_expansion_variables()
    for section in self.sections():
        for key, value in self.items(section):
            if value is None:
                continue
            if self.has_option(section, key):
                self.remove_option(section, key)
            keep_verbatim = self.is_template(section, key) or not isinstance(value, str)
            self.set(section, key, value if keep_verbatim else value.format(**expansion_vars))

1816 

def remove_all_read_configurations(self):
    """Remove every section that has been read, leaving only default values in the config."""
    for section_name in self.sections():
        self.remove_section(section_name)

1821 

@property
def providers_configuration_loaded(self) -> bool:
    """Return the flag that records whether the providers configuration has been loaded."""
    loaded = self._providers_configuration_loaded
    return loaded

1826 

def load_providers_configuration(self):
    """
    Load configuration for providers.

    This should be done after the initial configuration has been performed. Initializing
    and discovering providers is an expensive operation and cannot be performed when the
    configuration is loaded for the first time at Airflow start-up, because configuration
    is initialized very early, while the ``airflow`` package is being imported and the
    module is not yet ready to be used. Therefore the provider-specific configuration has
    to be loaded additionally, in a separate step.

    The core default configuration is restored first; then every section contributed by
    an already-initialized provider is added to ``configuration_description`` with its
    ``source`` (and the ``source`` of each of its options) set to ``default-<provider>``.
    Afterwards the default values and the cached ``sensitive_config_values`` are refreshed.

    :raises AirflowConfigException: when a provider contributes a section that already
        exists in the configuration description
    """
    log.debug("Loading providers configuration")
    from airflow.providers_manager import ProvidersManager

    self.restore_core_default_configuration()
    for provider, config in ProvidersManager().already_initialized_provider_configs:
        for provider_section, provider_section_content in config.items():
            provider_options = provider_section_content["options"]
            existing_section = self.configuration_description.get(provider_section)
            if existing_section:
                section_source = existing_section.get("source", "Airflow's core package").split(
                    "default-"
                )[-1]
                # Only the message is passed: a stray second argument (UserWarning) used
                # to be supplied here, which made ``str(exc)`` render as a tuple.
                raise AirflowConfigException(
                    f"The provider {provider} is attempting to contribute "
                    f"configuration section {provider_section} that "
                    f"has already been added before. The source of it: {section_source}. "
                    "This is forbidden. A provider can only add new sections. It "
                    "cannot contribute options to existing sections or override other "
                    "provider's configuration."
                )
            # Store a private copy so that tagging the source does not touch the
            # provider's own description.
            new_section = deepcopy(provider_section_content)
            self.configuration_description[provider_section] = new_section
            provider_source = f"default-{provider}"
            new_section["source"] = provider_source
            for option in provider_options:
                new_section["options"][option]["source"] = provider_source
    self._default_values = create_default_config_parser(self.configuration_description)
    # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete
    # the cached values, and it will be refreshed on next access. This has been an implementation
    # detail in Python 3.8 but as of Python 3.9 it is documented behaviour.
    # See https://docs.python.org/3/library/functools.html#functools.cached_property
    try:
        del self.sensitive_config_values
    except AttributeError:
        # no problem if cache is not set yet
        pass
    self._providers_configuration_loaded = True

1876 

1877 @staticmethod 

1878 def _warn_deprecate( 

1879 section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int 

1880 ): 

1881 if section == deprecated_section: 

1882 warnings.warn( 

1883 f"The {deprecated_name} option in [{section}] has been renamed to {key} - " 

1884 f"the old setting has been used, but please update your config.", 

1885 DeprecationWarning, 

1886 stacklevel=4 + extra_stacklevel, 

1887 ) 

1888 else: 

1889 warnings.warn( 

1890 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option " 

1891 f"in [{section}] - the old setting has been used, but please update your config.", 

1892 DeprecationWarning, 

1893 stacklevel=4 + extra_stacklevel, 

1894 ) 

1895 

1896 def __getstate__(self) -> dict[str, Any]: 

1897 """Return the state of the object as a dictionary for pickling.""" 

1898 return { 

1899 name: getattr(self, name) 

1900 for name in [ 

1901 "_sections", 

1902 "is_validated", 

1903 "configuration_description", 

1904 "upgraded_values", 

1905 "_default_values", 

1906 ] 

1907 } 

1908 

1909 def __setstate__(self, state) -> None: 

1910 """Restore the state of the object from a dictionary representation.""" 

1911 self.__init__() # type: ignore[misc] 

1912 config = state.pop("_sections") 

1913 self.read_dict(config) 

1914 self.__dict__.update(state) 

1915 

1916 

def get_airflow_home() -> str:
    """
    Get path to Airflow Home.

    :return: the ``AIRFLOW_HOME`` environment variable (``"~/airflow"`` when unset),
        passed through ``expand_env_var``
    """
    configured_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(configured_home)

1920 

1921 

def get_airflow_config(airflow_home: str) -> str:
    """
    Get path to the airflow.cfg file.

    :param airflow_home: Airflow home directory, used when ``AIRFLOW_CONFIG`` is not set
    :return: the ``AIRFLOW_CONFIG`` environment variable passed through
        ``expand_env_var`` when it is set, otherwise ``<airflow_home>/airflow.cfg``
    """
    explicit_path = os.environ.get("AIRFLOW_CONFIG")
    if explicit_path is not None:
        return expand_env_var(explicit_path)
    return os.path.join(airflow_home, "airflow.cfg")

1928 

1929 

def get_all_expansion_variables() -> dict[str, Any]:
    """Return a new dict with this module's globals merged with this function's locals."""
    # No local names are bound at this point, so locals() contributes nothing extra.
    return {**globals(), **locals()}

1932 

1933 

def _generate_fernet_key() -> str:
    """Generate a new Fernet key and return it decoded to ``str``."""
    from cryptography.fernet import Fernet

    key_bytes = Fernet.generate_key()
    return key_bytes.decode()

1938 

1939 

def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Create default config parser based on configuration description.

    Builds a ``ConfigParser`` holding the default value of every described option.
    String defaults of options that are not templates are passed through ``str.format``
    with the variables from ``get_all_expansion_variables``; options whose default is
    None are left out.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    default_parser = ConfigParser()
    expansion_vars = get_all_expansion_variables()
    for section, section_desc in configuration_description.items():
        default_parser.add_section(section)
        for key, option_desc in section_desc["options"].items():
            default_value = option_desc["default"]
            is_template = option_desc.get("is_template", False)
            if default_value is None:
                continue
            if not is_template and isinstance(default_value, str):
                default_value = default_value.format(**expansion_vars)
            default_parser.set(section, key, default_value)
    return default_parser

1965 

1966 

def create_pre_2_7_defaults() -> ConfigParser:
    """
    Create parser using the old defaults from Airflow < 2.7.0.

    This makes it possible to fall back to those defaults when an old version of a
    provider, not supporting "config contribution", is installed with Airflow 2.7.0+.
    This "default" configuration does not support variable expansion; the values are
    pretty much hard-coded defaults to fall back to in such a case.

    :return: parser loaded from the ``pre_2_7_defaults.cfg`` default config file
    """
    legacy_defaults = ConfigParser()
    legacy_defaults.read(_default_config_file_path("pre_2_7_defaults.cfg"))
    return legacy_defaults

1979 

1980 

def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Write the default Airflow configuration file when ``AIRFLOW_CONFIG`` does not exist yet.

    When the file is missing, its parent directory is created if needed (only when that
    directory is located under ``AIRFLOW_HOME``), a Fernet key is generated when none is
    configured, and the defaults are written to the file after its permissions have been
    restricted with ``make_group_other_inaccessible``.

    :return: the module-level ``conf`` object
    :raises IsADirectoryError: when ``AIRFLOW_CONFIG`` points to a directory
    :raises FileNotFoundError: when the parent directory is missing and is not relative
        to ``AIRFLOW_HOME``
    """
    global FERNET_KEY

    airflow_config = pathlib.Path(AIRFLOW_CONFIG)
    config_path = airflow_config.__fspath__()
    if airflow_config.is_dir():
        raise IsADirectoryError(
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {config_path!r}."
        )
    if airflow_config.exists():
        return conf

    log.debug("Creating new Airflow config file in: %s", config_path)
    config_directory = airflow_config.parent
    if not config_directory.exists():
        directory_path = config_directory.__fspath__()
        # Compatibility with Python 3.8, ``PurePath.is_relative_to`` was added in Python 3.9
        try:
            config_directory.relative_to(AIRFLOW_HOME)
        except ValueError:
            raise FileNotFoundError(
                f"Config directory {directory_path!r} not exists "
                f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                "Please create this directory first."
            ) from None
        log.debug("Create directory %r for Airflow config", directory_path)
        config_directory.mkdir(parents=True, exist_ok=True)

    if conf.get("core", "fernet_key", fallback=None) is None:
        # FERNET_KEY is not set, so generate it, keep it as the global key and also
        # write it to the config file so that the same key is used next time.
        FERNET_KEY = _generate_fernet_key()
        conf.remove_option("core", "fernet_key")
        conf.set("core", "fernet_key", FERNET_KEY)

    # Create the file and restrict its permissions before any content is written.
    pathlib.Path(config_path).touch()
    make_group_other_inaccessible(config_path)
    with open(airflow_config, "w") as file:
        conf.write(
            file,
            include_sources=False,
            include_env_vars=True,
            include_providers=True,
            extra_spacing=True,
            only_defaults=True,
        )
    return conf

2024 

2025 

def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Load standard airflow configuration.

    Reads the file at ``AIRFLOW_CONFIG`` into the given parser. If the file defines the
    deprecated ``[core] airflow_home`` option, a ``DeprecationWarning`` is emitted; when
    the ``AIRFLOW_HOME`` environment variable is not set and the configured value differs
    from the current ``AIRFLOW_HOME``, the module-level ``AIRFLOW_HOME`` is replaced by
    the configured value.

    :param airflow_config_parser: parser to which the configuration will be loaded
    """
    global AIRFLOW_HOME
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if not airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        return

    both_set_message = (
        "Specifying both AIRFLOW_HOME environment variable and airflow_home "
        "in the config file is deprecated. Please use only the AIRFLOW_HOME "
        "environment variable and remove the config file entry."
    )
    if "AIRFLOW_HOME" in os.environ:
        warnings.warn(both_set_message, category=DeprecationWarning, stacklevel=1)
        return

    configured_home = airflow_config_parser.get("core", "airflow_home")
    if configured_home == AIRFLOW_HOME:
        warnings.warn(
            "Specifying airflow_home in the config file is deprecated. As you "
            "have left it at the default value you should remove the setting "
            "from your airflow.cfg and suffer no change in behaviour.",
            category=DeprecationWarning,
            stacklevel=1,
        )
        return

    # The config file names a different home than the current one: adopt it.
    AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")  # type: ignore[assignment]
    warnings.warn(both_set_message, category=DeprecationWarning, stacklevel=1)

2060 

2061 

def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process. Also sets the
    module-level ``WEBSERVER_CONFIG`` from ``[webserver] config_file``.

    :return: the initialised configuration parser
    """
    global WEBSERVER_CONFIG

    parser = AirflowConfigParser()
    use_test_config = parser.getboolean("core", "unit_test_mode")
    if not use_test_config:
        load_standard_airflow_configuration(parser)
        # If the user set unit_test_mode in the airflow.cfg, we still
        # want to respect that and then load the default unit test configuration
        # file on top of it.
        use_test_config = parser.getboolean("core", "unit_test_mode")
    if use_test_config:
        parser.load_test_config()

    # Set the WEBSERVER_CONFIG variable
    WEBSERVER_CONFIG = parser.get("webserver", "config_file")
    return parser

2082 

2083 

@providers_configuration_loaded
def write_webserver_configuration_if_needed(airflow_config_parser: AirflowConfigParser):
    """
    Create the FAB webserver config file from the default template when it is missing.

    :param airflow_config_parser: parser providing ``[webserver] config_file``
    """
    webserver_config = airflow_config_parser.get("webserver", "config_file")
    if os.path.isfile(webserver_config):
        return

    import shutil

    target_directory = pathlib.Path(webserver_config).parent
    target_directory.mkdir(parents=True, exist_ok=True)
    log.info("Creating new FAB webserver config file in: %s", webserver_config)
    template_path = _default_config_file_path("default_webserver_config.py")
    shutil.copy(template_path, webserver_config)

2093 

2094 

def make_group_other_inaccessible(file_path: str):
    """
    Restrict the permissions of ``file_path`` to the owner's read/write bits.

    The current mode is masked with ``S_IRUSR | S_IWUSR``, so bits are only ever removed.
    Any failure is logged as a warning and otherwise ignored.

    :param file_path: path of the file whose permissions should be restricted
    """
    owner_read_write = stat.S_IRUSR | stat.S_IWUSR
    try:
        current_mode = os.stat(file_path).st_mode
        os.chmod(file_path, current_mode & owner_read_write)
    except Exception as e:
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )

2105 

2106 

def get(*args, **kwargs) -> ConfigType | None:
    """Historical get: warn about the deprecated module-level access and forward to ``conf.get``."""
    deprecation_message = (
        "Accessing configuration method 'get' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.get'"
    )
    # stacklevel=2 attributes the warning to the caller of this wrapper.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return conf.get(*args, **kwargs)

2117 

2118 

def getboolean(*args, **kwargs) -> bool:
    """Historical getboolean: warn about the deprecated access and forward to ``conf.getboolean``."""
    deprecation_message = (
        "Accessing configuration method 'getboolean' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getboolean'"
    )
    # stacklevel=2 attributes the warning to the caller of this wrapper.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return conf.getboolean(*args, **kwargs)

2129 

2130 

def getfloat(*args, **kwargs) -> float:
    """Historical getfloat: warn about the deprecated access and forward to ``conf.getfloat``."""
    deprecation_message = (
        "Accessing configuration method 'getfloat' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getfloat'"
    )
    # stacklevel=2 attributes the warning to the caller of this wrapper.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return conf.getfloat(*args, **kwargs)

2141 

2142 

def getint(*args, **kwargs) -> int:
    """Historical getint: warn about the deprecated access and forward to ``conf.getint``."""
    deprecation_message = (
        "Accessing configuration method 'getint' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getint'"
    )
    # stacklevel=2 attributes the warning to the caller of this wrapper.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return conf.getint(*args, **kwargs)

2153 

2154 

def getsection(*args, **kwargs) -> ConfigOptionsDictType | None:
    """Historical getsection: warn about the deprecated access and forward to ``conf.getsection``."""
    deprecation_message = (
        "Accessing configuration method 'getsection' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getsection'"
    )
    # stacklevel=2 attributes the warning to the caller of this wrapper.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return conf.getsection(*args, **kwargs)

2165 

2166 

def has_option(*args, **kwargs) -> bool:
    """Historical has_option: warn about the deprecated access and forward to ``conf.has_option``."""
    deprecation_message = (
        "Accessing configuration method 'has_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.has_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this wrapper.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return conf.has_option(*args, **kwargs)

2177 

2178 

def remove_option(*args, **kwargs) -> bool:
    """Historical remove_option: warn about the deprecated access and forward to ``conf.remove_option``."""
    deprecation_message = (
        "Accessing configuration method 'remove_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.remove_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this wrapper.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return conf.remove_option(*args, **kwargs)

2189 

2190 

def as_dict(*args, **kwargs) -> ConfigSourcesType:
    """Historical as_dict: warn about the deprecated access and forward to ``conf.as_dict``."""
    deprecation_message = (
        "Accessing configuration method 'as_dict' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.as_dict'"
    )
    # stacklevel=2 attributes the warning to the caller of this wrapper.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return conf.as_dict(*args, **kwargs)

2201 

2202 

def set(*args, **kwargs) -> None:
    """Historical set: warn about the deprecated access and forward to ``conf.set``."""
    deprecation_message = (
        "Accessing configuration method 'set' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.set'"
    )
    # stacklevel=2 attributes the warning to the caller of this wrapper.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    conf.set(*args, **kwargs)

2213 

2214 

def ensure_secrets_loaded() -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    If ``secrets_backend_list`` contains exactly 2 backends (taken to be only the
    default ones), the backends are initialised again and the new list is returned;
    otherwise the existing list is returned.
    """
    only_defaults_loaded = len(secrets_backend_list) == 2
    return initialize_secrets_backends() if only_defaults_loaded else secrets_backend_list

2225 

2226 

def get_custom_secret_backend() -> BaseSecretsBackend | None:
    """
    Get Secret Backend if defined in airflow.cfg.

    The backend class comes from ``[secrets] backend`` and is instantiated with the
    keyword arguments from ``[secrets] backend_kwargs``. When the kwargs cannot be read
    or are not a dict, a warning is logged and the backend is created without kwargs.

    :return: the backend instance, or None when no backend class is configured
    """
    backend_cls = conf.getimport(section="secrets", key="backend")
    if not backend_cls:
        return None

    not_a_dict_message = "Failed to parse [secrets] backend_kwargs into a dict, defaulting to no kwargs."
    backend_kwargs = {}
    try:
        parsed_kwargs = conf.getjson(section="secrets", key="backend_kwargs")
    except AirflowConfigException:
        log.warning("Failed to parse [secrets] backend_kwargs as JSON, defaulting to no kwargs.")
    except ValueError:
        log.warning(not_a_dict_message)
    else:
        if parsed_kwargs:
            if isinstance(parsed_kwargs, dict):
                backend_kwargs = parsed_kwargs
            else:
                log.warning(not_a_dict_message)

    return backend_cls(**backend_kwargs)

2248 

2249 

def initialize_secrets_backends() -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * the custom backend (when one is configured) comes first
    * every class named in ``DEFAULT_SECRETS_SEARCH_PATH`` is imported, instantiated
      without arguments and appended in order

    :return: list of the instantiated backends
    """
    backends = []

    custom_backend = get_custom_secret_backend()
    if custom_backend is not None:
        backends.append(custom_backend)

    backends.extend(import_string(class_name)() for class_name in DEFAULT_SECRETS_SEARCH_PATH)
    return backends

2269 

2270 

@functools.lru_cache(maxsize=None)
def _DEFAULT_CONFIG() -> str:
    """Return the text of the ``default_airflow.cfg`` default config file (result is cached)."""
    config_path = _default_config_file_path("default_airflow.cfg")
    return pathlib.Path(config_path).read_text()

2276 

2277 

@functools.lru_cache(maxsize=None)
def _TEST_CONFIG() -> str:
    """Return the text of the ``default_test.cfg`` default config file (result is cached)."""
    config_path = _default_config_file_path("default_test.cfg")
    return pathlib.Path(config_path).read_text()

2283 

2284 

# Deprecated module attributes, mapped to zero-argument callables that produce their
# value. The module-level ``__getattr__`` consults this table.
_deprecated = {
    "DEFAULT_CONFIG": _DEFAULT_CONFIG,
    "TEST_CONFIG": _TEST_CONFIG,
    "TEST_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_test.cfg"),
    "DEFAULT_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_airflow.cfg"),
}

2291 

2292 

def __getattr__(name):
    """
    Resolve deprecated module attributes lazily.

    :param name: attribute name being looked up on the module
    :return: the value produced by the callable registered for ``name`` in ``_deprecated``
    :raises AttributeError: when ``name`` is not a deprecated attribute
    """
    if name not in _deprecated:
        raise AttributeError(f"module {__name__} has no attribute {name}")
    warnings.warn(
        f"{__name__}.{name} is deprecated and will be removed in future",
        DeprecationWarning,
        stacklevel=2,
    )
    factory = _deprecated[name]
    return factory()

2302 

2303 

def initialize_auth_manager() -> BaseAuthManager:
    """
    Initialize auth manager.

    * import the class configured in ``[core] auth_manager``
    * instantiate it without arguments and return it

    :raises AirflowConfigException: when no auth manager is configured
    """
    auth_manager_cls = conf.getimport(section="core", key="auth_manager")
    if auth_manager_cls:
        return auth_manager_cls()

    raise AirflowConfigException(
        "No auth manager defined in the config. "
        "Please specify one using section/key [core/auth_manager]."
    )

2320 

2321 

# AIRFLOW_HOME and AIRFLOW_CONFIG come from the environment variables of the same name,
# with "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Dags folder for unit tests: use the "tests/dags" directory next to the package when it
# exists (it won't exist if users install via pip), else "$AIRFLOW_HOME/dags".
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
TEST_DAGS_FOLDER = (
    _TEST_DAGS_FOLDER if os.path.exists(_TEST_DAGS_FOLDER) else os.path.join(AIRFLOW_HOME, "dags")
)

# Plugins folder for unit tests, resolved the same way as the dags folder.
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
TEST_PLUGINS_FOLDER = (
    _TEST_PLUGINS_FOLDER
    if os.path.exists(_TEST_PLUGINS_FOLDER)
    else os.path.join(AIRFLOW_HOME, "plugins")
)

SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
FERNET_KEY = ""  # Set only if needed when generating a new file
WEBSERVER_CONFIG = ""  # Set by initialize_config

# Order matters: the configuration is initialised first, then the secrets backends,
# and only then is the configuration validated.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()
2352conf.validate()