Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/configuration/parser.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

872 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Base configuration parser with pure parsing logic.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import datetime 

24import functools 

25import itertools 

26import json 

27import logging 

28import os 

29import shlex 

30import subprocess 

31import sys 

32import warnings 

33from collections.abc import Callable, Generator, Iterable 

34from configparser import ConfigParser, NoOptionError, NoSectionError 

35from contextlib import contextmanager 

36from copy import deepcopy 

37from enum import Enum 

38from json.decoder import JSONDecodeError 

39from re import Pattern 

40from typing import IO, TYPE_CHECKING, Any, TypeVar, overload 

41 

42from .exceptions import AirflowConfigException 

43 

44log = logging.getLogger(__name__) 

45 

46 

47def _build_kwarg_env_prefix(section: str, kwargs_key: str) -> str: 

48 """ 

49 Build env prefix for per-key backend kwargs. 

50 

51 ("secrets", "backend_kwargs") -> "AIRFLOW__SECRETS__BACKEND_KWARG__" 

52 ("workers", "secrets_backend_kwargs") -> "AIRFLOW__WORKERS__SECRETS_BACKEND_KWARG__" 

53 """ 

54 singular_key = kwargs_key.replace("_kwargs", "_kwarg") 

55 return f"{ENV_VAR_PREFIX}{section.upper()}__{singular_key.upper()}__" 

56 

57 

58def _collect_kwarg_env_vars(prefix: str) -> dict[str, str]: 

59 """ 

60 Scan os.environ for per-key secrets backend kwargs. 

61 

62 AIRFLOW__SECRETS__BACKEND_KWARG__ROLE_ID -> {"role_id": value} 

63 Values are raw strings (not JSON-parsed). 

64 Empty keys (trailing __ with no suffix) are ignored. 

65 """ 

66 overrides: dict[str, str] = {} 

67 for env_var, value in os.environ.items(): 

68 if env_var.startswith(prefix): 

69 kwarg_key = env_var[len(prefix) :].lower() 

70 if kwarg_key: 

71 overrides[kwarg_key] = value 

72 return overrides 

73 

74 

75ConfigType = str | int | float | bool 

76ConfigOptionsDictType = dict[str, ConfigType] 

77ConfigSectionSourcesType = dict[str, str | tuple[str, str]] 

78ConfigSourcesType = dict[str, ConfigSectionSourcesType] 

79ENV_VAR_PREFIX = "AIRFLOW__" 

80 

81 

82if TYPE_CHECKING: 

83 from airflow.providers_manager import ProvidersManager 

84 from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime 

85 

86 

87class ValueNotFound: 

88 """Object of this is raised when a configuration value cannot be found.""" 

89 

90 pass 

91 

92 

93VALUE_NOT_FOUND_SENTINEL = ValueNotFound() 

94 

95 

96@overload 

97def expand_env_var(env_var: None) -> None: ... 

98@overload 

99def expand_env_var(env_var: str) -> str: ... 

100 

101 

102def expand_env_var(env_var: str | None) -> str | None: 

103 """ 

104 Expand (potentially nested) env vars. 

105 

106 Repeat and apply `expandvars` and `expanduser` until 

107 interpolation stops having any effect. 

108 """ 

109 if not env_var or not isinstance(env_var, str): 

110 return env_var 

111 while True: 

112 interpolated = os.path.expanduser(os.path.expandvars(str(env_var))) 

113 if interpolated == env_var: 

114 return interpolated 

115 env_var = interpolated 

116 

117 

118def run_command(command: str) -> str: 

119 """Run command and returns stdout.""" 

120 process = subprocess.Popen( 

121 shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True 

122 ) 

123 output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate()) 

124 

125 if process.returncode != 0: 

126 raise AirflowConfigException( 

127 f"Cannot execute {command}. Error code is: {process.returncode}. " 

128 f"Output: {output}, Stderr: {stderr}" 

129 ) 

130 

131 return output 

132 

133 

134def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool: 

135 """ 

136 Check if the config is a template. 

137 

138 :param configuration_description: description of configuration 

139 :param section: section 

140 :param key: key 

141 :return: True if the config is a template 

142 """ 

143 return configuration_description.get(section, {}).get(key, {}).get("is_template", False) 

144 

145 

146def configure_parser_from_configuration_description( 

147 parser: ConfigParser, 

148 configuration_description: dict[str, dict[str, Any]], 

149 all_vars: dict[str, Any], 

150) -> None: 

151 """ 

152 Configure a ConfigParser based on configuration description. 

153 

154 :param parser: ConfigParser to configure 

155 :param configuration_description: configuration description from config.yml 

156 """ 

157 for section, section_desc in configuration_description.items(): 

158 parser.add_section(section) 

159 options = section_desc["options"] 

160 for key in options: 

161 default_value = options[key]["default"] 

162 is_template = options[key].get("is_template", False) 

163 if (default_value is not None) and not ( 

164 options[key].get("version_deprecated") or options[key].get("deprecation_reason") 

165 ): 

166 if is_template or not isinstance(default_value, str): 

167 parser.set(section, key, str(default_value)) 

168 else: 

169 try: 

170 parser.set(section, key, default_value.format(**all_vars)) 

171 except (KeyError, ValueError): 

172 parser.set(section, key, default_value) 

173 

174 

175def create_provider_cfg_config_fallback_defaults( 

176 provider_config_fallback_defaults_cfg_path: str, 

177) -> ConfigParser: 

178 """ 

179 Create fallback defaults for configuration. 

180 

181 This parser contains provider defaults for Airflow configuration, containing fallback default values 

182 that might be needed when provider classes are being imported - before provider's configuration 

183 is loaded. 

184 

185 Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead 

186 to retrieving provider configuration before the defaults for the provider are loaded. 

187 

188 Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or 

189 environment variables) those will be used as usual. 

190 

191 NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication, 

192 at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed. 

193 

194 You've been warned! 

195 

196 :param provider_config_fallback_defaults_cfg_path: path to the provider config fallback defaults .cfg file 

197 """ 

198 config_parser = ConfigParser() 

199 config_parser.read(provider_config_fallback_defaults_cfg_path) 

200 return config_parser 

201 

202 

203class AirflowConfigParser(ConfigParser): 

204 """ 

205 Base configuration parser with pure parsing logic. 

206 

207 This class provides the core parsing methods that work with: 

208 - configuration_description: dict describing config options (required in __init__) 

209 - _default_values: ConfigParser with default values (required in __init__) 

210 - deprecated_options: class attribute mapping new -> old options 

211 - deprecated_sections: class attribute mapping new -> old sections 

212 """ 

213 

214 # A mapping of section -> setting -> { old, replace } for deprecated default values. 

215 # Subclasses can override this to define deprecated values that should be upgraded. 

216 deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {} 

217 

218 # A mapping of (new section, new option) -> (old section, old option, since_version). 

219 # When reading new option, the old option will be checked to see if it exists. If it does a 

220 # DeprecationWarning will be issued and the old option will be used instead 

221 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = { 

222 ("dag_processor", "dag_file_processor_timeout"): ("core", "dag_file_processor_timeout", "3.0"), 

223 ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"), 

224 ("api", "base_url"): ("webserver", "base_url", "3.0"), 

225 ("api", "host"): ("webserver", "web_server_host", "3.0"), 

226 ("api", "port"): ("webserver", "web_server_port", "3.0"), 

227 ("api", "workers"): ("webserver", "workers", "3.0"), 

228 ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"), 

229 ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"), 

230 ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"), 

231 ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"), 

232 ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"), 

233 ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"), 

234 ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"), 

235 ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"), 

236 ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"), 

237 ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"), 

238 ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"), 

239 ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"), 

240 ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"), 

241 ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"), 

242 ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"), 

243 ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"), 

244 ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"), 

245 ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"), 

246 ("core", "num_dag_runs_to_retain_rendered_fields"): ( 

247 "core", 

248 "max_num_rendered_ti_fields_per_task", 

249 "3.2.0", 

250 ), 

251 ("api", "page_size"): ("webserver", "page_size", "3.1.0"), 

252 ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"), 

253 ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"), 

254 ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"), 

255 ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"), 

256 ("api", "log_config"): ("api", "access_logfile", "3.1.0"), 

257 ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"), 

258 ("api", "fallback_page_limit"): ("api", "page_size", "3.2.0"), 

259 ("workers", "missing_dag_retries"): ("workers", "missing_dag_retires", "3.1.8"), 

260 ("core", "execution_api_server_url"): ("workers", "execution_api_server_url", "3.0"), 

261 ("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "3.0"), 

262 } 

263 

264 # A mapping of new section -> (old section, since_version). 

265 deprecated_sections: dict[str, tuple[str, str]] = {} 

266 

267 @property 

268 def _lookup_sequence(self) -> list[Callable]: 

269 """ 

270 Define the sequence of lookup methods for get(). The definition here does not have provider lookup. 

271 

272 Subclasses can override this to customise lookup order. 

273 """ 

274 lookup_methods = [ 

275 self._get_environment_variables, 

276 self._get_option_from_config_file, 

277 self._get_option_from_commands, 

278 self._get_option_from_secrets, 

279 self._get_option_from_defaults, 

280 ] 

281 if self._use_providers_configuration: 

282 # Provider fallback lookups are last so they have the lowest priority in the lookup sequence. 

283 lookup_methods += [ 

284 self._get_option_from_provider_metadata_config_fallbacks, 

285 self._get_option_from_provider_cfg_config_fallbacks, 

286 ] 

287 return lookup_methods 

288 

289 @functools.cached_property 

290 def configuration_description(self) -> dict[str, dict[str, Any]]: 

291 """ 

292 Return configuration description from multiple sources. 

293 

294 Respects the ``_use_providers_configuration`` flag to decide whether to include 

295 provider configuration. 

296 

297 The merged description is built as follows: 

298 

299 1. Start from the base configuration description provided in ``__init__``, usually 

300 loaded from ``config.yml`` in core. Values defined here are never overridden. 

301 2. Merge provider metadata from ``_provider_metadata_configuration_description``, 

302 loaded from provider packages' ``get_provider_info`` method. Only adds missing 

303 sections/options; does not overwrite existing entries from the base configuration. 

304 3. Merge default values from ``_provider_cfg_config_fallback_default_values``, 

305 loaded from ``provider_config_fallback_defaults.cfg``. Only sets ``"default"`` 

306 (and heuristically ``"sensitive"``) for options that do not already define them. 

307 

308 Base configuration takes precedence, then provider metadata fills in missing 

309 descriptions/options, and finally cfg-based fallbacks provide defaults only where 

310 none are defined. 

311 

312 We use ``cached_property`` to cache the merged result; clear this cache (via 

313 ``invalidate_cache``) when toggling ``_use_providers_configuration``. 

314 """ 

315 if not self._use_providers_configuration: 

316 return self._configuration_description 

317 

318 merged_description: dict[str, dict[str, Any]] = deepcopy(self._configuration_description) 

319 

320 # Merge full provider config descriptions (with metadata like sensitive, description, etc.) 

321 # from provider packages' get_provider_info method, reusing the cached raw dict. 

322 for section, section_content in self._provider_metadata_configuration_description.items(): 

323 if section not in merged_description: 

324 merged_description[section] = deepcopy(section_content) 

325 else: 

326 existing_options = merged_description[section].setdefault("options", {}) 

327 for option, option_content in section_content.get("options", {}).items(): 

328 if option not in existing_options: 

329 existing_options[option] = deepcopy(option_content) 

330 

331 # Merge default values from cfg-based fallbacks (key=value only, no metadata). 

332 # Uses setdefault so provider metadata values above take priority. 

333 cfg = self._provider_cfg_config_fallback_default_values 

334 for section in cfg.sections(): 

335 section_options = merged_description.setdefault(section, {"options": {}}).setdefault( 

336 "options", {} 

337 ) 

338 for option in cfg.options(section): 

339 opt_dict = section_options.setdefault(option, {}) 

340 opt_dict.setdefault("default", cfg.get(section, option)) 

341 # For cfg-only options with no provider metadata, infer sensitivity from name. 

342 if "sensitive" not in opt_dict and option.endswith(("password", "secret")): 

343 opt_dict["sensitive"] = True 

344 

345 return merged_description 

346 

347 @property 

348 def _config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]: 

349 """Override the base method to add provider fallbacks when providers are loaded.""" 

350 sources: list[tuple[str, ConfigParser]] = [] 

351 if self._use_providers_configuration: 

352 # Provider fallback defaults are listed first so they have the lowest priority 

353 # in as_dict()'s "last source wins" semantics. 

354 sources += [ 

355 ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values), 

356 ( 

357 "provider-metadata-fallback-defaults", 

358 self._provider_metadata_config_fallback_default_values, 

359 ), 

360 ] 

361 sources += [ 

362 ("default", self._default_values), 

363 ("airflow.cfg", self), 

364 ] 

365 return sources 

366 

367 def _get_option_from_provider_cfg_config_fallbacks( 

368 self, 

369 deprecated_key: str | None, 

370 deprecated_section: str | None, 

371 key: str, 

372 section: str, 

373 issue_warning: bool = True, 

374 extra_stacklevel: int = 0, 

375 **kwargs, 

376 ) -> str | ValueNotFound: 

377 """Get config option from provider fallback defaults.""" 

378 value = self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs) 

379 if value is not VALUE_NOT_FOUND_SENTINEL: 

380 return value 

381 return VALUE_NOT_FOUND_SENTINEL 

382 

383 def _get_option_from_provider_metadata_config_fallbacks( 

384 self, 

385 deprecated_key: str | None, 

386 deprecated_section: str | None, 

387 key: str, 

388 section: str, 

389 issue_warning: bool = True, 

390 extra_stacklevel: int = 0, 

391 **kwargs, 

392 ) -> str | ValueNotFound: 

393 """Get config option from provider metadata fallback defaults.""" 

394 value = self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs) 

395 if value is not VALUE_NOT_FOUND_SENTINEL: 

396 return value 

397 return VALUE_NOT_FOUND_SENTINEL 

398 

399 def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: 

400 """Get provider config fallback default values.""" 

401 raw = kwargs.get("raw", False) 

402 vars_ = kwargs.get("vars") 

403 return self._provider_cfg_config_fallback_default_values.get( 

404 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, raw=raw, vars=vars_ 

405 ) 

406 

407 @functools.cached_property 

408 def _provider_metadata_configuration_description(self) -> dict[str, dict[str, Any]]: 

409 """Raw provider configuration descriptions with full metadata (sensitive, description, etc.).""" 

410 result: dict[str, dict[str, Any]] = {} 

411 for _, config in self._provider_manager_type().provider_configs: 

412 result.update(config) 

413 return result 

414 

415 @functools.cached_property 

416 def _provider_metadata_config_fallback_default_values(self) -> ConfigParser: 

417 """Return Provider metadata config fallback default values.""" 

418 return self._create_default_config_parser_callable(self._provider_metadata_configuration_description) 

419 

420 def get_from_provider_metadata_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: 

421 """Get provider metadata config fallback default values.""" 

422 raw = kwargs.get("raw", False) 

423 vars_ = kwargs.get("vars") 

424 return self._provider_metadata_config_fallback_default_values.get( 

425 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, raw=raw, vars=vars_ 

426 ) 

427 

428 @property 

429 def _validators(self) -> list[Callable[[], None]]: 

430 """ 

431 Return list of validators defined on a config parser class. Base class will return an empty list. 

432 

433 Subclasses can override this to customize the validators that are run during validation on the 

434 config parser instance. 

435 """ 

436 return [] 

437 

438 def validate(self) -> None: 

439 """Run all registered validators.""" 

440 for validator in self._validators: 

441 validator() 

442 self.is_validated = True 

443 

444 def _validate_deprecated_values(self) -> None: 

445 """Validate and upgrade deprecated default values.""" 

446 for section, replacement in self.deprecated_values.items(): 

447 for name, info in replacement.items(): 

448 old, new = info 

449 current_value = self.get(section, name, fallback="") 

450 if self._using_old_value(old, current_value): 

451 self.upgraded_values[(section, name)] = current_value 

452 new_value = old.sub(new, current_value) 

453 self._update_env_var(section=section, name=name, new_value=new_value) 

454 self._create_future_warning( 

455 name=name, 

456 section=section, 

457 current_value=current_value, 

458 new_value=new_value, 

459 ) 

460 

461 def _using_old_value(self, old: Pattern, current_value: str) -> bool: 

462 """Check if current_value matches the old pattern.""" 

463 return old.search(current_value) is not None 

464 

465 def _update_env_var(self, section: str, name: str, new_value: str) -> None: 

466 """Update environment variable with new value.""" 

467 env_var = self._env_var_name(section, name) 

468 # Set it as an env var so that any subprocesses keep the same override! 

469 os.environ[env_var] = new_value 

470 

471 @staticmethod 

472 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None: 

473 """Create a FutureWarning for deprecated default values.""" 

474 warnings.warn( 

475 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " 

476 f"This value has been changed to {new_value!r} in the running config, but please update your config.", 

477 FutureWarning, 

478 stacklevel=3, 

479 ) 

480 

481 def __init__( 

482 self, 

483 configuration_description: dict[str, dict[str, Any]], 

484 _default_values: ConfigParser, 

485 provider_manager_type: type[ProvidersManager] | type[ProvidersManagerTaskRuntime], 

486 create_default_config_parser_callable: Callable[[dict[str, dict[str, Any]]], ConfigParser], 

487 provider_config_fallback_defaults_cfg_path: str, 

488 *args, 

489 **kwargs, 

490 ): 

491 """ 

492 Initialize the parser. 

493 

494 :param configuration_description: Description of configuration options 

495 :param _default_values: ConfigParser with default values 

496 :param provider_manager_type: Either ProvidersManager or ProvidersManagerTaskRuntime, depending on the context of the caller. 

497 :param create_default_config_parser_callable: The `create_default_config_parser` function from core or SDK, depending on the context of the caller. 

498 :param provider_config_fallback_defaults_cfg_path: Path to the `provider_config_fallback_defaults.cfg` file. 

499 """ 

500 super().__init__(*args, **kwargs) 

501 self._configuration_description = configuration_description 

502 self._default_values = _default_values 

503 self._provider_manager_type = provider_manager_type 

504 self._create_default_config_parser_callable = create_default_config_parser_callable 

505 self._provider_cfg_config_fallback_default_values = create_provider_cfg_config_fallback_defaults( 

506 provider_config_fallback_defaults_cfg_path 

507 ) 

508 self._suppress_future_warnings = False 

509 self.upgraded_values: dict[tuple[str, str], str] = {} 

510 # The _use_providers_configuration flag will always be True unless we call `write(include_providers=False)` or `with self.make_sure_configuration_loaded(with_providers=False)`. 

511 # Even when we call those methods, the flag will be set back to True after the method is done, so it only affects the current call to `as_dict()` and does not have any effect on subsequent calls. 

512 self._use_providers_configuration = True 

513 

514 def invalidate_cache(self) -> None: 

515 """ 

516 Clear all ``functools.cached_property`` entries on this instance. 

517 

518 Call this after mutating class-level attributes (e.g. ``deprecated_options``) 

519 so that derived cached properties are recomputed on next access. 

520 """ 

521 for attr_name in ( 

522 name 

523 for name in dir(type(self)) 

524 if isinstance(getattr(type(self), name, None), functools.cached_property) 

525 ): 

526 self.__dict__.pop(attr_name, None) 

527 

528 def _invalidate_provider_flag_caches(self) -> None: 

529 """Invalidate caches related to provider configuration flags.""" 

530 self.__dict__.pop("configuration_description", None) 

531 self.__dict__.pop("sensitive_config_values", None) 

532 

533 @functools.cached_property 

534 def inversed_deprecated_options(self): 

535 """Build inverse mapping from old options to new options.""" 

536 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()} 

537 

538 @functools.cached_property 

539 def inversed_deprecated_sections(self): 

540 """Build inverse mapping from old sections to new sections.""" 

541 return { 

542 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items() 

543 } 

544 

545 @functools.cached_property 

546 def sensitive_config_values(self) -> set[tuple[str, str]]: 

547 """Get set of sensitive config values that should be masked.""" 

548 flattened = { 

549 (s, k): item 

550 for s, s_c in self.configuration_description.items() 

551 for k, item in s_c.get("options", {}).items() 

552 } 

553 sensitive = { 

554 (section.lower(), key.lower()) 

555 for (section, key), v in flattened.items() 

556 if v.get("sensitive") is True 

557 } 

558 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options} 

559 depr_section = { 

560 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections 

561 } 

562 sensitive.update(depr_section, depr_option) 

563 return sensitive 

564 

565 def _update_defaults_from_string(self, config_string: str) -> None: 

566 """ 

567 Update the defaults in _default_values based on values in config_string ("ini" format). 

568 

569 Override shared parser's method to add validation for template variables. 

570 Note that those values are not validated and cannot contain variables because we are using 

571 regular config parser to load them. This method is used to test the config parser in unit tests. 

572 

573 :param config_string: ini-formatted config string 

574 """ 

575 parser = ConfigParser() 

576 parser.read_string(config_string) 

577 for section in parser.sections(): 

578 if section not in self._default_values.sections(): 

579 self._default_values.add_section(section) 

580 errors = False 

581 for key, value in parser.items(section): 

582 if not self.is_template(section, key) and "{" in value: 

583 errors = True 

584 log.error( 

585 "The %s.%s value %s read from string contains variable. This is not supported", 

586 section, 

587 key, 

588 value, 

589 ) 

590 self._default_values.set(section, key, value) 

591 if errors: 

592 raise AirflowConfigException( 

593 f"The string config passed as default contains variables. " 

594 f"This is not supported. String config: {config_string}" 

595 ) 

596 

597 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any: 

598 """ 

599 Retrieve default value from default config parser, including provider fallbacks. 

600 

601 This will retrieve the default value from the core default config parser first. If not found 

602 and providers configuration is loaded, it also checks provider fallback defaults. 

603 Optionally a raw, stored value can be retrieved by setting skip_interpolation to True. 

604 This is useful for example when we want to write the default value to a file, and we don't 

605 want the interpolation to happen as it is going to be done later when the config is read. 

606 

607 :param section: section of the config 

608 :param key: key to use 

609 :param fallback: fallback value to use 

610 :param raw: if raw, then interpolation will be reversed 

611 :param kwargs: other args 

612 :return: 

613 """ 

614 value = self._default_values.get(section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs) 

615 # Provider metadata has higher priority than cfg fallback — check it first. 

616 if value is VALUE_NOT_FOUND_SENTINEL and self._use_providers_configuration: 

617 value = self._provider_metadata_config_fallback_default_values.get( 

618 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs 

619 ) 

620 if value is VALUE_NOT_FOUND_SENTINEL and self._use_providers_configuration: 

621 value = self._provider_cfg_config_fallback_default_values.get( 

622 section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs 

623 ) 

624 if value is VALUE_NOT_FOUND_SENTINEL: 

625 value = fallback 

626 if raw and isinstance(value, str): 

627 return value.replace("%", "%%") 

628 return value 

629 

630 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None: 

631 """ 

632 Get Secret Backend if defined in airflow.cfg. 

633 

634 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not. 

635 """ 

636 section = "workers" if worker_mode else "secrets" 

637 key = "secrets_backend" if worker_mode else "backend" 

638 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs" 

639 

640 secrets_backend_cls = self.getimport(section=section, key=key) 

641 

642 if not secrets_backend_cls: 

643 if worker_mode: 

644 # if we find no secrets backend for worker, return that of secrets backend 

645 secrets_backend_cls = self.getimport(section="secrets", key="backend") 

646 if not secrets_backend_cls: 

647 return None 

648 # When falling back to secrets backend, use its kwargs 

649 kwargs_key = "backend_kwargs" 

650 section = "secrets" 

651 else: 

652 return None 

653 

654 try: 

655 backend_kwargs = self.getjson(section=section, key=kwargs_key) 

656 if not backend_kwargs: 

657 backend_kwargs = {} 

658 elif not isinstance(backend_kwargs, dict): 

659 raise ValueError("not a dict") 

660 except AirflowConfigException: 

661 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key) 

662 backend_kwargs = {} 

663 except ValueError: 

664 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key) 

665 backend_kwargs = {} 

666 

667 # Collect per-key overrides; they take precedence over the JSON blob. 

668 env_prefix = _build_kwarg_env_prefix(section, kwargs_key) 

669 backend_kwargs.update(_collect_kwarg_env_vars(env_prefix)) 

670 

671 return secrets_backend_cls(**backend_kwargs) 

672 

673 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None: 

674 """ 

675 Get Config option values from Secret Backend. 

676 

677 Called by the shared parser's _get_secret_option() method as part of the lookup chain. 

678 Uses _get_custom_secret_backend() to get the backend instance. 

679 

680 :param config_key: the config key to retrieve 

681 :return: config value or None 

682 """ 

683 try: 

684 secrets_client = self._get_custom_secret_backend() 

685 if not secrets_client: 

686 return None 

687 return secrets_client.get_config(config_key) 

688 except Exception as e: 

689 raise AirflowConfigException( 

690 "Cannot retrieve config from alternative secrets backend. " 

691 "Make sure it is configured properly and that the Backend " 

692 "is accessible.\n" 

693 f"{e}" 

694 ) 

695 

696 def _get_cmd_option_from_config_sources( 

697 self, config_sources: ConfigSourcesType, section: str, key: str 

698 ) -> str | None: 

699 fallback_key = key + "_cmd" 

700 if (section, key) in self.sensitive_config_values: 

701 section_dict = config_sources.get(section) 

702 if section_dict is not None: 

703 command_value = section_dict.get(fallback_key) 

704 if command_value is not None: 

705 if isinstance(command_value, str): 

706 command = command_value 

707 else: 

708 command = command_value[0] 

709 return run_command(command) 

710 return None 

711 

712 def _get_secret_option_from_config_sources( 

713 self, config_sources: ConfigSourcesType, section: str, key: str 

714 ) -> str | None: 

715 fallback_key = key + "_secret" 

716 if (section, key) in self.sensitive_config_values: 

717 section_dict = config_sources.get(section) 

718 if section_dict is not None: 

719 secrets_path_value = section_dict.get(fallback_key) 

720 if secrets_path_value is not None: 

721 if isinstance(secrets_path_value, str): 

722 secrets_path = secrets_path_value 

723 else: 

724 secrets_path = secrets_path_value[0] 

725 return self._get_config_value_from_secret_backend(secrets_path) 

726 return None 

727 

728 def _include_secrets( 

729 self, 

730 config_sources: ConfigSourcesType, 

731 display_sensitive: bool, 

732 display_source: bool, 

733 raw: bool, 

734 ): 

735 for section, key in self.sensitive_config_values: 

736 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key) 

737 if value: 

738 if not display_sensitive: 

739 value = "< hidden >" 

740 if display_source: 

741 opt: str | tuple[str, str] = (value, "secret") 

742 elif raw: 

743 opt = value.replace("%", "%%") 

744 else: 

745 opt = value 

746 config_sources.setdefault(section, {}).update({key: opt}) 

747 del config_sources[section][key + "_secret"] 

748 

749 def _include_commands( 

750 self, 

751 config_sources: ConfigSourcesType, 

752 display_sensitive: bool, 

753 display_source: bool, 

754 raw: bool, 

755 ): 

756 for section, key in self.sensitive_config_values: 

757 opt = self._get_cmd_option_from_config_sources(config_sources, section, key) 

758 if not opt: 

759 continue 

760 opt_to_set: str | tuple[str, str] | None = opt 

761 if not display_sensitive: 

762 opt_to_set = "< hidden >" 

763 if display_source: 

764 opt_to_set = (str(opt_to_set), "cmd") 

765 elif raw: 

766 opt_to_set = str(opt_to_set).replace("%", "%%") 

767 if opt_to_set is not None: 

768 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set} 

769 config_sources.setdefault(section, {}).update(dict_to_update) 

770 del config_sources[section][key + "_cmd"] 

771 

772 def _include_envs( 

773 self, 

774 config_sources: ConfigSourcesType, 

775 display_sensitive: bool, 

776 display_source: bool, 

777 raw: bool, 

778 ): 

779 for env_var in [ 

780 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX) 

781 ]: 

782 try: 

783 _, section, key = env_var.split("__", 2) 

784 opt = self._get_env_var_option(section, key) 

785 except ValueError: 

786 continue 

787 if opt is None: 

788 log.warning("Ignoring unknown env var '%s'", env_var) 

789 continue 

790 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"): 

791 # Don't hide cmd/secret values here 

792 if not env_var.lower().endswith(("cmd", "secret")): 

793 if (section, key) in self.sensitive_config_values: 

794 opt = "< hidden >" 

795 elif raw: 

796 opt = opt.replace("%", "%%") 

797 if display_source: 

798 opt = (opt, "env var") 

799 

800 section = section.lower() 

801 key = key.lower() 

802 config_sources.setdefault(section, {}).update({key: opt}) 

803 

804 def _filter_by_source( 

805 self, 

806 config_sources: ConfigSourcesType, 

807 display_source: bool, 

808 getter_func, 

809 ): 

810 """ 

811 Delete default configs from current configuration. 

812 

813 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values. 

814 

815 This is necessary because bare configs take precedence over the command 

816 or secret key equivalents so if the current running config is 

817 materialized with Airflow defaults they in turn override user set 

818 command or secret key configs. 

819 

820 :param config_sources: The current configuration to operate on 

821 :param display_source: If False, configuration options contain raw 

822 values. If True, options are a tuple of (option_value, source). 

823 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'. 

824 :param getter_func: A callback function that gets the user configured 

825 override value for a particular sensitive_config_values config. 

826 :return: None, the given config_sources is filtered if necessary, 

827 otherwise untouched. 

828 """ 

829 for section, key in self.sensitive_config_values: 

830 # Don't bother if we don't have section / key 

831 if section not in config_sources or key not in config_sources[section]: 

832 continue 

833 # Check that there is something to override defaults 

834 try: 

835 getter_opt = getter_func(section, key) 

836 except ValueError: 

837 continue 

838 if not getter_opt: 

839 continue 

840 # Check to see that there is a default value 

841 if self.get_default_value(section, key) is None: 

842 continue 

843 # Check to see if bare setting is the same as defaults 

844 if display_source: 

845 # when display_source = true, we know that the config_sources contains tuple 

846 opt, source = config_sources[section][key] # type: ignore 

847 else: 

848 opt = config_sources[section][key] # type: ignore[assignment] 

849 if opt == self.get_default_value(section, key): 

850 del config_sources[section][key] 

851 

852 @staticmethod 

853 def _deprecated_value_is_set_in_config( 

854 deprecated_section: str, 

855 deprecated_key: str, 

856 configs: Iterable[tuple[str, ConfigParser]], 

857 ) -> bool: 

858 for config_type, config in configs: 

859 if config_type != "default": 

860 with contextlib.suppress(NoSectionError): 

861 deprecated_section_array = config.items(section=deprecated_section, raw=True) 

862 if any(key == deprecated_key for key, _ in deprecated_section_array): 

863 return True 

864 return False 

865 

866 @staticmethod 

867 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

868 return ( 

869 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}") 

870 is not None 

871 ) 

872 

873 @staticmethod 

874 def _deprecated_command_is_set_in_config( 

875 deprecated_section: str, 

876 deprecated_key: str, 

877 configs: Iterable[tuple[str, ConfigParser]], 

878 ) -> bool: 

879 return AirflowConfigParser._deprecated_value_is_set_in_config( 

880 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs 

881 ) 

882 

883 @staticmethod 

884 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

885 return ( 

886 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD") 

887 is not None 

888 ) 

889 

890 @staticmethod 

891 def _deprecated_secret_is_set_in_config( 

892 deprecated_section: str, 

893 deprecated_key: str, 

894 configs: Iterable[tuple[str, ConfigParser]], 

895 ) -> bool: 

896 return AirflowConfigParser._deprecated_value_is_set_in_config( 

897 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs 

898 ) 

899 

900 @staticmethod 

901 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

902 return ( 

903 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET") 

904 is not None 

905 ) 

906 

907 @staticmethod 

908 def _replace_config_with_display_sources( 

909 config_sources: ConfigSourcesType, 

910 configs: Iterable[tuple[str, ConfigParser]], 

911 configuration_description: dict[str, dict[str, Any]], 

912 display_source: bool, 

913 raw: bool, 

914 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

915 include_env: bool, 

916 include_cmds: bool, 

917 include_secret: bool, 

918 ): 

919 for source_name, config in configs: 

920 sections = config.sections() 

921 for section in sections: 

922 AirflowConfigParser._replace_section_config_with_display_sources( 

923 config, 

924 config_sources, 

925 configuration_description, 

926 display_source, 

927 raw, 

928 section, 

929 source_name, 

930 deprecated_options, 

931 configs, 

932 include_env=include_env, 

933 include_cmds=include_cmds, 

934 include_secret=include_secret, 

935 ) 

936 

937 @staticmethod 

938 def _replace_section_config_with_display_sources( 

939 config: ConfigParser, 

940 config_sources: ConfigSourcesType, 

941 configuration_description: dict[str, dict[str, Any]], 

942 display_source: bool, 

943 raw: bool, 

944 section: str, 

945 source_name: str, 

946 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

947 configs: Iterable[tuple[str, ConfigParser]], 

948 include_env: bool, 

949 include_cmds: bool, 

950 include_secret: bool, 

951 ): 

952 sect = config_sources.setdefault(section, {}) 

953 if isinstance(config, AirflowConfigParser): 

954 with config.suppress_future_warnings(): 

955 items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw) 

956 else: 

957 items = config.items(section=section, raw=raw) 

958 for k, val in items: 

959 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None)) 

960 if deprecated_section and deprecated_key: 

961 if source_name == "default": 

962 # If deprecated entry has some non-default value set for any of the sources requested, 

963 # We should NOT set default for the new entry (because it will override anything 

964 # coming from the deprecated ones) 

965 if AirflowConfigParser._deprecated_value_is_set_in_config( 

966 deprecated_section, deprecated_key, configs 

967 ): 

968 continue 

969 if include_env and AirflowConfigParser._deprecated_variable_is_set( 

970 deprecated_section, deprecated_key 

971 ): 

972 continue 

973 if include_cmds and ( 

974 AirflowConfigParser._deprecated_variable_command_is_set( 

975 deprecated_section, deprecated_key 

976 ) 

977 or AirflowConfigParser._deprecated_command_is_set_in_config( 

978 deprecated_section, deprecated_key, configs 

979 ) 

980 ): 

981 continue 

982 if include_secret and ( 

983 AirflowConfigParser._deprecated_variable_secret_is_set( 

984 deprecated_section, deprecated_key 

985 ) 

986 or AirflowConfigParser._deprecated_secret_is_set_in_config( 

987 deprecated_section, deprecated_key, configs 

988 ) 

989 ): 

990 continue 

991 if display_source: 

992 updated_source_name = source_name 

993 if source_name == "default": 

994 # defaults can come from other sources (default-<PROVIDER>) that should be used here 

995 source_description_section = configuration_description.get(section, {}) 

996 source_description_key = source_description_section.get("options", {}).get(k, {}) 

997 if source_description_key is not None: 

998 updated_source_name = source_description_key.get("source", source_name) 

999 sect[k] = (val, updated_source_name) 

1000 else: 

1001 sect[k] = val 

1002 

1003 def _warn_deprecate( 

1004 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int 

1005 ): 

1006 """Warn about deprecated config option usage.""" 

1007 if section == deprecated_section: 

1008 warnings.warn( 

1009 f"The {deprecated_name} option in [{section}] has been renamed to {key} - " 

1010 f"the old setting has been used, but please update your config.", 

1011 DeprecationWarning, 

1012 stacklevel=4 + extra_stacklevel, 

1013 ) 

1014 else: 

1015 warnings.warn( 

1016 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option " 

1017 f"in [{section}] - the old setting has been used, but please update your config.", 

1018 DeprecationWarning, 

1019 stacklevel=4 + extra_stacklevel, 

1020 ) 

1021 

1022 @contextmanager 

1023 def suppress_future_warnings(self): 

1024 """ 

1025 Context manager to temporarily suppress future warnings. 

1026 

1027 This is a stub used by the shared parser's lookup methods when checking deprecated options. 

1028 Subclasses can override this to customize warning suppression behavior. 

1029 

1030 :return: context manager that suppresses future warnings 

1031 """ 

1032 suppress_future_warnings = self._suppress_future_warnings 

1033 self._suppress_future_warnings = True 

1034 yield self 

1035 self._suppress_future_warnings = suppress_future_warnings 

1036 

1037 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str: 

1038 """Generate environment variable name for a config option.""" 

1039 team_component: str = f"{team_name.upper()}___" if team_name else "" 

1040 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}" 

1041 

1042 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None): 

1043 """Get config option from environment variable.""" 

1044 env_var: str = self._env_var_name(section, key, team_name=team_name) 

1045 if env_var in os.environ: 

1046 return expand_env_var(os.environ[env_var]) 

1047 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command) 

1048 env_var_cmd = env_var + "_CMD" 

1049 if env_var_cmd in os.environ: 

1050 # if this is a valid command key... 

1051 if (section, key) in self.sensitive_config_values: 

1052 return run_command(os.environ[env_var_cmd]) 

1053 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend) 

1054 env_var_secret_path = env_var + "_SECRET" 

1055 if env_var_secret_path in os.environ: 

1056 # if this is a valid secret path... 

1057 if (section, key) in self.sensitive_config_values: 

1058 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path]) 

1059 return None 

1060 

1061 def _get_cmd_option(self, section: str, key: str): 

1062 """Get config option from command execution.""" 

1063 fallback_key = key + "_cmd" 

1064 if (section, key) in self.sensitive_config_values: 

1065 if super().has_option(section, fallback_key): 

1066 command = super().get(section, fallback_key) 

1067 try: 

1068 cmd_output = run_command(command) 

1069 except AirflowConfigException as e: 

1070 raise e 

1071 except Exception as e: 

1072 raise AirflowConfigException( 

1073 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd." 

1074 f" Please check the {fallback_key} value." 

1075 ) from e 

1076 return cmd_output 

1077 return None 

1078 

1079 def _get_secret_option(self, section: str, key: str) -> str | None: 

1080 """Get Config option values from Secret Backend.""" 

1081 fallback_key = key + "_secret" 

1082 if (section, key) in self.sensitive_config_values: 

1083 if super().has_option(section, fallback_key): 

1084 secrets_path = super().get(section, fallback_key) 

1085 return self._get_config_value_from_secret_backend(secrets_path) 

1086 return None 

1087 

1088 def _get_environment_variables( 

1089 self, 

1090 deprecated_key: str | None, 

1091 deprecated_section: str | None, 

1092 key: str, 

1093 section: str, 

1094 issue_warning: bool = True, 

1095 extra_stacklevel: int = 0, 

1096 **kwargs, 

1097 ) -> str | ValueNotFound: 

1098 """Get config option from environment variables.""" 

1099 team_name = kwargs.get("team_name", None) 

1100 option = self._get_env_var_option(section, key, team_name=team_name) 

1101 if option is not None: 

1102 return option 

1103 if deprecated_section and deprecated_key: 

1104 with self.suppress_future_warnings(): 

1105 option = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name) 

1106 if option is not None: 

1107 if issue_warning: 

1108 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

1109 return option 

1110 return VALUE_NOT_FOUND_SENTINEL 

1111 

1112 def _get_option_from_config_file( 

1113 self, 

1114 deprecated_key: str | None, 

1115 deprecated_section: str | None, 

1116 key: str, 

1117 section: str, 

1118 issue_warning: bool = True, 

1119 extra_stacklevel: int = 0, 

1120 **kwargs, 

1121 ) -> str | ValueNotFound: 

1122 """Get config option from config file.""" 

1123 if team_name := kwargs.get("team_name", None): 

1124 section = f"{team_name}={section}" 

1125 # since this is the last lookup that supports team_name, pop it 

1126 kwargs.pop("team_name") 

1127 if super().has_option(section, key): 

1128 return expand_env_var(super().get(section, key, **kwargs)) 

1129 if deprecated_section and deprecated_key: 

1130 if super().has_option(deprecated_section, deprecated_key): 

1131 if issue_warning: 

1132 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

1133 with self.suppress_future_warnings(): 

1134 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs)) 

1135 return VALUE_NOT_FOUND_SENTINEL 

1136 

1137 def _get_option_from_commands( 

1138 self, 

1139 deprecated_key: str | None, 

1140 deprecated_section: str | None, 

1141 key: str, 

1142 section: str, 

1143 issue_warning: bool = True, 

1144 extra_stacklevel: int = 0, 

1145 **kwargs, 

1146 ) -> str | ValueNotFound: 

1147 """Get config option from command execution.""" 

1148 if kwargs.get("team_name", None): 

1149 # Commands based team config fetching is not currently supported 

1150 return VALUE_NOT_FOUND_SENTINEL 

1151 option = self._get_cmd_option(section, key) 

1152 if option: 

1153 return option 

1154 if deprecated_section and deprecated_key: 

1155 with self.suppress_future_warnings(): 

1156 option = self._get_cmd_option(deprecated_section, deprecated_key) 

1157 if option: 

1158 if issue_warning: 

1159 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

1160 return option 

1161 return VALUE_NOT_FOUND_SENTINEL 

1162 

1163 def _get_option_from_secrets( 

1164 self, 

1165 deprecated_key: str | None, 

1166 deprecated_section: str | None, 

1167 key: str, 

1168 section: str, 

1169 issue_warning: bool = True, 

1170 extra_stacklevel: int = 0, 

1171 **kwargs, 

1172 ) -> str | ValueNotFound: 

1173 """Get config option from secrets backend.""" 

1174 if kwargs.get("team_name", None): 

1175 # Secrets based team config fetching is not currently supported 

1176 return VALUE_NOT_FOUND_SENTINEL 

1177 option = self._get_secret_option(section, key) 

1178 if option: 

1179 return option 

1180 if deprecated_section and deprecated_key: 

1181 with self.suppress_future_warnings(): 

1182 option = self._get_secret_option(deprecated_section, deprecated_key) 

1183 if option: 

1184 if issue_warning: 

1185 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

1186 return option 

1187 return VALUE_NOT_FOUND_SENTINEL 

1188 

1189 def _get_option_from_defaults( 

1190 self, 

1191 deprecated_key: str | None, 

1192 deprecated_section: str | None, 

1193 key: str, 

1194 section: str, 

1195 issue_warning: bool = True, 

1196 extra_stacklevel: int = 0, 

1197 team_name: str | None = None, 

1198 **kwargs, 

1199 ) -> str | ValueNotFound: 

1200 """Get config option from default values.""" 

1201 if self.get_default_value(section, key) is not None or "fallback" in kwargs: 

1202 return expand_env_var(self.get_default_value(section, key, **kwargs)) 

1203 return VALUE_NOT_FOUND_SENTINEL 

1204 

1205 def _resolve_deprecated_lookup( 

1206 self, 

1207 section: str, 

1208 key: str, 

1209 lookup_from_deprecated: bool, 

1210 extra_stacklevel: int = 0, 

1211 ) -> tuple[str, str, str | None, str | None, bool]: 

1212 """ 

1213 Resolve deprecated section/key mappings and determine deprecated values. 

1214 

1215 :param section: Section name (will be lowercased) 

1216 :param key: Key name (will be lowercased) 

1217 :param lookup_from_deprecated: Whether to lookup from deprecated options 

1218 :param extra_stacklevel: Extra stack level for warnings 

1219 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted) 

1220 """ 

1221 section = section.lower() 

1222 key = key.lower() 

1223 warning_emitted = False 

1224 deprecated_section: str | None = None 

1225 deprecated_key: str | None = None 

1226 

1227 if not lookup_from_deprecated: 

1228 return section, key, deprecated_section, deprecated_key, warning_emitted 

1229 

1230 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {}) 

1231 if option_description.get("deprecated"): 

1232 deprecation_reason = option_description.get("deprecation_reason", "") 

1233 warnings.warn( 

1234 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}", 

1235 DeprecationWarning, 

1236 stacklevel=2 + extra_stacklevel, 

1237 ) 

1238 # For the cases in which we rename whole sections 

1239 if section in self.inversed_deprecated_sections: 

1240 deprecated_section, deprecated_key = (section, key) 

1241 section = self.inversed_deprecated_sections[section] 

1242 if not self._suppress_future_warnings: 

1243 warnings.warn( 

1244 f"The config section [{deprecated_section}] has been renamed to " 

1245 f"[{section}]. Please update your `conf.get*` call to use the new name", 

1246 FutureWarning, 

1247 stacklevel=2 + extra_stacklevel, 

1248 ) 

1249 # Don't warn about individual rename if the whole section is renamed 

1250 warning_emitted = True 

1251 elif (section, key) in self.inversed_deprecated_options: 

1252 # Handle using deprecated section/key instead of the new section/key 

1253 new_section, new_key = self.inversed_deprecated_options[(section, key)] 

1254 if not self._suppress_future_warnings and not warning_emitted: 

1255 warnings.warn( 

1256 f"section/key [{section}/{key}] has been deprecated, you should use" 

1257 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the " 

1258 "new name", 

1259 FutureWarning, 

1260 stacklevel=2 + extra_stacklevel, 

1261 ) 

1262 warning_emitted = True 

1263 deprecated_section, deprecated_key = section, key 

1264 section, key = (new_section, new_key) 

1265 elif section in self.deprecated_sections: 

1266 # When accessing the new section name, make sure we check under the old config name 

1267 deprecated_key = key 

1268 deprecated_section = self.deprecated_sections[section][0] 

1269 else: 

1270 deprecated_section, deprecated_key, _ = self.deprecated_options.get( 

1271 (section, key), (None, None, None) 

1272 ) 

1273 

1274 return section, key, deprecated_section, deprecated_key, warning_emitted 

1275 

1276 def load_providers_configuration(self) -> None: 

1277 """ 

1278 Load configuration for providers. 

1279 

1280 .. deprecated:: 3.2.0 

1281 Provider configuration is now loaded lazily via the ``configuration_description`` 

1282 cached property. This method is kept for backwards compatibility and will be 

1283 removed in a future version. 

1284 """ 

1285 warnings.warn( 

1286 "load_providers_configuration() is deprecated. " 

1287 "Provider configuration is now loaded lazily via the " 

1288 "`configuration_description` cached property.", 

1289 DeprecationWarning, 

1290 stacklevel=2, 

1291 ) 

1292 self._use_providers_configuration = True 

1293 self._invalidate_provider_flag_caches() 

1294 

1295 def restore_core_default_configuration(self) -> None: 

1296 """ 

1297 Restore the parser state before provider-contributed sections were loaded. 

1298 

1299 .. deprecated:: 3.2.0 

1300 Use ``make_sure_configuration_loaded(with_providers=False)`` context manager 

1301 instead. This method is kept for backwards compatibility and will be removed 

1302 in a future version. 

1303 """ 

1304 warnings.warn( 

1305 "restore_core_default_configuration() is deprecated. " 

1306 "Use `make_sure_configuration_loaded(with_providers=False)` instead.", 

1307 DeprecationWarning, 

1308 stacklevel=2, 

1309 ) 

1310 self._use_providers_configuration = False 

1311 self._invalidate_provider_flag_caches() 

1312 

1313 @overload # type: ignore[override] 

1314 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ... 

1315 

1316 @overload # type: ignore[override] 

1317 def get(self, section: str, key: str, **kwargs) -> str | None: ... 

1318 

1319 def get( # type: ignore[misc, override] 

1320 self, 

1321 section: str, 

1322 key: str, 

1323 suppress_warnings: bool = False, 

1324 lookup_from_deprecated: bool = True, 

1325 _extra_stacklevel: int = 0, 

1326 team_name: str | None = None, 

1327 **kwargs, 

1328 ) -> str | None: 

1329 """ 

1330 Get config value by iterating through lookup sequence. 

1331 

1332 Priority order is defined by _lookup_sequence property. 

1333 """ 

1334 section, key, deprecated_section, deprecated_key, warning_emitted = self._resolve_deprecated_lookup( 

1335 section=section, 

1336 key=key, 

1337 lookup_from_deprecated=lookup_from_deprecated, 

1338 extra_stacklevel=_extra_stacklevel, 

1339 ) 

1340 

1341 if team_name is not None: 

1342 kwargs["team_name"] = team_name 

1343 

1344 for lookup_method in self._lookup_sequence: 

1345 value = lookup_method( 

1346 deprecated_key=deprecated_key, 

1347 deprecated_section=deprecated_section, 

1348 key=key, 

1349 section=section, 

1350 issue_warning=not warning_emitted, 

1351 extra_stacklevel=_extra_stacklevel, 

1352 **kwargs, 

1353 ) 

1354 if value is not VALUE_NOT_FOUND_SENTINEL: 

1355 return value 

1356 

1357 # Check if fallback was explicitly provided (even if None) 

1358 if "fallback" in kwargs: 

1359 return kwargs["fallback"] 

1360 

1361 if not suppress_warnings: 

1362 log.warning("section/key [%s/%s] not found in config", section, key) 

1363 

1364 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config") 

1365 

1366 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override] 

1367 """Get config value as boolean.""" 

1368 val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip() 

1369 if "#" in val: 

1370 val = val.split("#")[0].strip() 

1371 if val in ("t", "true", "1"): 

1372 return True 

1373 if val in ("f", "false", "0"): 

1374 return False 

1375 raise AirflowConfigException( 

1376 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. ' 

1377 f'Current value: "{val}".' 

1378 ) 

1379 

1380 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override] 

1381 """Get config value as integer.""" 

1382 val = self.get(section, key, _extra_stacklevel=1, **kwargs) 

1383 if val is None: 

1384 raise AirflowConfigException( 

1385 f"Failed to convert value None to int. " 

1386 f'Please check "{key}" key in "{section}" section is set.' 

1387 ) 

1388 try: 

1389 return int(val) 

1390 except ValueError: 

1391 try: 

1392 if (float_val := float(val)) != (int_val := int(float_val)): 

1393 raise ValueError 

1394 return int_val 

1395 except (ValueError, OverflowError): 

1396 raise AirflowConfigException( 

1397 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. ' 

1398 f'Current value: "{val}".' 

1399 ) 

1400 

1401 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override] 

1402 """Get config value as float.""" 

1403 val = self.get(section, key, _extra_stacklevel=1, **kwargs) 

1404 if val is None: 

1405 raise AirflowConfigException( 

1406 f"Failed to convert value None to float. " 

1407 f'Please check "{key}" key in "{section}" section is set.' 

1408 ) 

1409 try: 

1410 return float(val) 

1411 except ValueError: 

1412 raise AirflowConfigException( 

1413 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. ' 

1414 f'Current value: "{val}".' 

1415 ) 

1416 

1417 def getlist(self, section: str, key: str, delimiter=",", **kwargs): 

1418 """Get config value as list.""" 

1419 val = self.get(section, key, **kwargs) 

1420 

1421 if isinstance(val, list) or val is None: 

1422 # `get` will always return a (possibly-empty) string, so the only way we can 

1423 # have these types is with `fallback=` was specified. So just return it. 

1424 return val 

1425 

1426 if val == "": 

1427 return [] 

1428 

1429 try: 

1430 return [item.strip() for item in val.split(delimiter)] 

1431 except Exception: 

1432 raise AirflowConfigException( 

1433 f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. ' 

1434 f'Current value: "{val}".' 

1435 ) 

1436 

1437 E = TypeVar("E", bound=Enum) 

1438 

1439 def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E: 

1440 """Get config value as enum.""" 

1441 val = self.get(section, key, **kwargs) 

1442 enum_names = [enum_item.name for enum_item in enum_class] 

1443 

1444 if val is None: 

1445 raise AirflowConfigException( 

1446 f'Failed to convert value. Please check "{key}" key in "{section}" section. ' 

1447 f'Current value: "{val}" and it must be one of {", ".join(enum_names)}' 

1448 ) 

1449 

1450 try: 

1451 return enum_class[val] 

1452 except KeyError: 

1453 if "fallback" in kwargs and kwargs["fallback"] in enum_names: 

1454 return enum_class[kwargs["fallback"]] 

1455 raise AirflowConfigException( 

1456 f'Failed to convert value. Please check "{key}" key in "{section}" section. ' 

1457 f"the value must be one of {', '.join(enum_names)}" 

1458 ) 

1459 

1460 def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]: 

1461 """Get config value as list of enums.""" 

1462 kwargs.setdefault("fallback", []) 

1463 string_list = self.getlist(section, key, delimiter, **kwargs) 

1464 

1465 enum_names = [enum_item.name for enum_item in enum_class] 

1466 enum_list = [] 

1467 

1468 for val in string_list: 

1469 try: 

1470 enum_list.append(enum_class[val]) 

1471 except KeyError: 

1472 log.warning( 

1473 "Failed to convert value %r. Please check %s key in %s section. " 

1474 "it must be one of %s, if not the value is ignored", 

1475 val, 

1476 key, 

1477 section, 

1478 ", ".join(enum_names), 

1479 ) 

1480 

1481 return enum_list 

1482 

1483 def getimport(self, section: str, key: str, **kwargs) -> Any: 

1484 """ 

1485 Read options, import the full qualified name, and return the object. 

1486 

1487 In case of failure, it throws an exception with the key and section names 

1488 

1489 :return: The object or None, if the option is empty 

1490 """ 

1491 # Fixed: use self.get() instead of conf.get() 

1492 full_qualified_path = self.get(section=section, key=key, **kwargs) 

1493 if not full_qualified_path: 

1494 return None 

1495 

1496 try: 

1497 # Import here to avoid circular dependency 

1498 from ..module_loading import import_string 

1499 

1500 return import_string(full_qualified_path) 

1501 except ImportError as e: 

1502 log.warning(e) 

1503 raise AirflowConfigException( 

1504 f'The object could not be loaded. Please check "{key}" key in "{section}" section. ' 

1505 f'Current value: "{full_qualified_path}".' 

1506 ) 

1507 

1508 def getjson( 

1509 self, section: str, key: str, fallback=None, **kwargs 

1510 ) -> dict | list | str | int | float | None: 

1511 """ 

1512 Return a config value parsed from a JSON string. 

1513 

1514 ``fallback`` is *not* JSON parsed but used verbatim when no config value is given. 

1515 """ 

1516 try: 

1517 data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs) 

1518 except (NoSectionError, NoOptionError): 

1519 data = None 

1520 

1521 if data is None or data == "": 

1522 return fallback 

1523 

1524 try: 

1525 return json.loads(data) 

1526 except JSONDecodeError as e: 

1527 raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e 

1528 

1529 def gettimedelta( 

1530 self, section: str, key: str, fallback: Any = None, **kwargs 

1531 ) -> datetime.timedelta | None: 

1532 """ 

1533 Get the config value for the given section and key, and convert it into datetime.timedelta object. 

1534 

1535 If the key is missing, then it is considered as `None`. 

1536 

1537 :param section: the section from the config 

1538 :param key: the key defined in the given section 

1539 :param fallback: fallback value when no config value is given, defaults to None 

1540 :raises AirflowConfigException: raised because ValueError or OverflowError 

1541 :return: datetime.timedelta(seconds=<config_value>) or None 

1542 """ 

1543 val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs) 

1544 

1545 if val: 

1546 # the given value must be convertible to integer 

1547 try: 

1548 int_val = int(val) 

1549 except ValueError: 

1550 raise AirflowConfigException( 

1551 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. ' 

1552 f'Current value: "{val}".' 

1553 ) 

1554 

1555 try: 

1556 return datetime.timedelta(seconds=int_val) 

1557 except OverflowError as err: 

1558 raise AirflowConfigException( 

1559 f"Failed to convert value to timedelta in `seconds`. " 

1560 f"{err}. " 

1561 f'Please check "{key}" key in "{section}" section. Current value: "{val}".' 

1562 ) 

1563 

1564 return fallback 

1565 

1566 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str: 

1567 """Get mandatory config value, raising ValueError if not found.""" 

1568 value = self.get(section, key, _extra_stacklevel=1, **kwargs) 

1569 if value is None: 

1570 raise ValueError(f"The value {section}/{key} should be set!") 

1571 return value 

1572 

1573 def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]: 

1574 """Get mandatory config value as list, raising ValueError if not found.""" 

1575 value = self.getlist(section, key, **kwargs) 

1576 if value is None: 

1577 raise ValueError(f"The value {section}/{key} should be set!") 

1578 return value 

1579 

1580 def read( # type: ignore[override] 

1581 self, 

1582 filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike], 

1583 encoding: str | None = None, 

1584 ) -> list[str]: 

1585 return super().read(filenames=filenames, encoding=encoding) # type: ignore[arg-type,return-value] 

1586 

1587 def read_dict( # type: ignore[override] 

1588 self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>" 

1589 ) -> None: 

1590 """ 

1591 We define a different signature here to add better type hints and checking. 

1592 

1593 :param dictionary: dictionary to read from 

1594 :param source: source to be used to store the configuration 

1595 :return: 

1596 """ 

1597 super().read_dict(dictionary=dictionary, source=source) 

1598 

1599 def _has_section_in_any_defaults(self, section: str) -> bool: 

1600 """Check if section exists in core defaults or provider fallback defaults.""" 

1601 if self._default_values.has_section(section): 

1602 return True 

1603 if self._use_providers_configuration: 

1604 if self._provider_cfg_config_fallback_default_values.has_section(section): 

1605 return True 

1606 if self._provider_metadata_config_fallback_default_values.has_section(section): 

1607 return True 

1608 return False 

1609 

1610 def get_sections_including_defaults(self) -> list[str]: 

1611 """ 

1612 Retrieve all sections from the configuration parser, including sections defined by built-in defaults. 

1613 

1614 :return: list of section names 

1615 """ 

1616 sections_from_config = self.sections() 

1617 sections_from_description = list(self.configuration_description.keys()) 

1618 return list(dict.fromkeys(itertools.chain(sections_from_description, sections_from_config))) 

1619 

1620 def get_options_including_defaults(self, section: str) -> list[str]: 

1621 """ 

1622 Retrieve all possible options from the configuration parser for the section given. 

1623 

1624 Includes options defined by built-in defaults. 

1625 

1626 :param section: section name 

1627 :return: list of option names for the section given 

1628 """ 

1629 my_own_options = self.options(section) if self.has_section(section) else [] 

1630 all_options_from_defaults = list( 

1631 self.configuration_description.get(section, {}).get("options", {}).keys() 

1632 ) 

1633 return list(dict.fromkeys(itertools.chain(all_options_from_defaults, my_own_options))) 

1634 

1635 def has_option( # type: ignore[override] 

1636 self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs 

1637 ) -> bool: 

1638 """ 

1639 Check if option is defined. 

1640 

1641 Uses self.get() to avoid reimplementing the priority order of config variables 

1642 (env, config, cmd, defaults). 

1643 

1644 :param section: section to get option from 

1645 :param option: option to get 

1646 :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections 

1647 :param kwargs: additional keyword arguments to pass to get(), such as team_name 

1648 :return: 

1649 """ 

1650 try: 

1651 value = self.get( 

1652 section, 

1653 option, 

1654 fallback=VALUE_NOT_FOUND_SENTINEL, 

1655 _extra_stacklevel=1, 

1656 suppress_warnings=True, 

1657 lookup_from_deprecated=lookup_from_deprecated, 

1658 **kwargs, 

1659 ) 

1660 if value is VALUE_NOT_FOUND_SENTINEL: 

1661 return False 

1662 return True 

1663 except (NoOptionError, NoSectionError, AirflowConfigException): 

1664 return False 

1665 

1666 def set(self, section: str, option: str, value: str | None = None) -> None: # type: ignore[override] 

1667 """ 

1668 Set an option to the given value. 

1669 

1670 This override just makes sure the section and option are lower case, to match what we do in `get`. 

1671 """ 

1672 section = section.lower() 

1673 option = option.lower() 

1674 defaults = self.configuration_description or {} 

1675 if not self.has_section(section) and section in defaults: 

1676 # Trying to set a key in a section that exists in default, but not in the user config; 

1677 # automatically create it 

1678 self.add_section(section) 

1679 super().set(section, option, value) 

1680 

1681 def remove_option(self, section: str, option: str, remove_default: bool = True): # type: ignore[override] 

1682 """ 

1683 Remove an option if it exists in config from a file or default config. 

1684 

1685 If both of config have the same option, this removes the option 

1686 in both configs unless remove_default=False. 

1687 """ 

1688 section = section.lower() 

1689 option = option.lower() 

1690 if super().has_option(section, option): 

1691 super().remove_option(section, option) 

1692 

1693 if remove_default and self._default_values.has_option(section, option): 

1694 self._default_values.remove_option(section, option) 

1695 

1696 def optionxform(self, optionstr: str) -> str: 

1697 """ 

1698 Transform option names on every read, get, or set operation. 

1699 

1700 This changes from the default behaviour of ConfigParser from lower-casing 

1701 to instead be case-preserving. 

1702 

1703 :param optionstr: 

1704 :return: 

1705 """ 

1706 return optionstr 

1707 

1708 def as_dict( 

1709 self, 

1710 display_source: bool = False, 

1711 display_sensitive: bool = False, 

1712 raw: bool = False, 

1713 include_env: bool = True, 

1714 include_cmds: bool = True, 

1715 include_secret: bool = True, 

1716 ) -> ConfigSourcesType: 

1717 """ 

1718 Return the current configuration as an OrderedDict of OrderedDicts. 

1719 

1720 When materializing current configuration Airflow defaults are 

1721 materialized along with user set configs. If any of the `include_*` 

1722 options are False then the result of calling command or secret key 

1723 configs do not override Airflow defaults and instead are passed through. 

1724 In order to then avoid Airflow defaults from overwriting user set 

1725 command or secret key configs we filter out bare sensitive_config_values 

1726 that are set to Airflow defaults when command or secret key configs 

1727 produce different values. 

1728 

1729 :param display_source: If False, the option value is returned. If True, 

1730 a tuple of (option_value, source) is returned. Source is either 

1731 'airflow.cfg', 'default', 'env var', or 'cmd'. 

1732 :param display_sensitive: If True, the values of options set by env 

1733 vars and bash commands will be displayed. If False, those options 

1734 are shown as '< hidden >' 

1735 :param raw: Should the values be output as interpolated values, or the 

1736 "raw" form that can be fed back in to ConfigParser 

1737 :param include_env: Should the value of configuration from AIRFLOW__ 

1738 environment variables be included or not 

1739 :param include_cmds: Should the result of calling any ``*_cmd`` config be 

1740 set (True, default), or should the _cmd options be left as the 

1741 command to run (False) 

1742 :param include_secret: Should the result of calling any ``*_secret`` config be 

1743 set (True, default), or should the _secret options be left as the 

1744 path to get the secret from (False) 

1745 :return: Dictionary, where the key is the name of the section and the content is 

1746 the dictionary with the name of the parameter and its value. 

1747 """ 

1748 if not display_sensitive: 

1749 # We want to hide the sensitive values at the appropriate methods 

1750 # since envs from cmds, secrets can be read at _include_envs method 

1751 if not all([include_env, include_cmds, include_secret]): 

1752 raise ValueError( 

1753 "If display_sensitive is false, then include_env, " 

1754 "include_cmds, include_secret must all be set as True" 

1755 ) 

1756 

1757 config_sources: ConfigSourcesType = {} 

1758 

1759 # We check sequentially all those sources and the last one we saw it in will "win" 

1760 configs = self._config_sources_for_as_dict 

1761 

1762 self._replace_config_with_display_sources( 

1763 config_sources, 

1764 configs, 

1765 self.configuration_description, 

1766 display_source, 

1767 raw, 

1768 self.deprecated_options, 

1769 include_cmds=include_cmds, 

1770 include_env=include_env, 

1771 include_secret=include_secret, 

1772 ) 

1773 

1774 # add env vars and overwrite because they have priority 

1775 if include_env: 

1776 self._include_envs(config_sources, display_sensitive, display_source, raw) 

1777 else: 

1778 self._filter_by_source(config_sources, display_source, self._get_env_var_option) 

1779 

1780 # add bash commands 

1781 if include_cmds: 

1782 self._include_commands(config_sources, display_sensitive, display_source, raw) 

1783 else: 

1784 self._filter_by_source(config_sources, display_source, self._get_cmd_option) 

1785 

1786 # add config from secret backends 

1787 if include_secret: 

1788 self._include_secrets(config_sources, display_sensitive, display_source, raw) 

1789 else: 

1790 self._filter_by_source(config_sources, display_source, self._get_secret_option) 

1791 

1792 if not display_sensitive: 

1793 # This ensures the ones from config file is hidden too 

1794 # if they are not provided through env, cmd and secret 

1795 hidden = "< hidden >" 

1796 for section, key in self.sensitive_config_values: 

1797 if config_sources.get(section): 

1798 if config_sources[section].get(key, None): 

1799 if display_source: 

1800 source = config_sources[section][key][1] 

1801 config_sources[section][key] = (hidden, source) 

1802 else: 

1803 config_sources[section][key] = hidden 

1804 

1805 return config_sources 

1806 

1807 def _write_option_header( 

1808 self, 

1809 file: IO[str], 

1810 option: str, 

1811 extra_spacing: bool, 

1812 include_descriptions: bool, 

1813 include_env_vars: bool, 

1814 include_examples: bool, 

1815 include_sources: bool, 

1816 section_config_description: dict[str, dict[str, Any]], 

1817 section_to_write: str, 

1818 sources_dict: ConfigSourcesType, 

1819 ) -> tuple[bool, bool]: 

1820 """ 

1821 Write header for configuration option. 

1822 

1823 Returns tuple of (should_continue, needs_separation) where needs_separation should be 

1824 set if the option needs additional separation to visually separate it from the next option. 

1825 """ 

1826 option_config_description = ( 

1827 section_config_description.get("options", {}).get(option, {}) 

1828 if section_config_description 

1829 else {} 

1830 ) 

1831 description = option_config_description.get("description") 

1832 needs_separation = False 

1833 if description and include_descriptions: 

1834 for line in description.splitlines(): 

1835 file.write(f"# {line}\n") 

1836 needs_separation = True 

1837 example = option_config_description.get("example") 

1838 if example is not None and include_examples: 

1839 if extra_spacing: 

1840 file.write("#\n") 

1841 example_lines = example.splitlines() 

1842 example = "\n# ".join(example_lines) 

1843 file.write(f"# Example: {option} = {example}\n") 

1844 needs_separation = True 

1845 if include_sources and sources_dict: 

1846 sources_section = sources_dict.get(section_to_write) 

1847 value_with_source = sources_section.get(option) if sources_section else None 

1848 if value_with_source is None: 

1849 file.write("#\n# Source: not defined\n") 

1850 else: 

1851 file.write(f"#\n# Source: {value_with_source[1]}\n") 

1852 needs_separation = True 

1853 if include_env_vars: 

1854 file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n") 

1855 if extra_spacing: 

1856 file.write("#\n") 

1857 needs_separation = True 

1858 return True, needs_separation 

1859 

1860 def is_template(self, section: str, key) -> bool: 

1861 """ 

1862 Return whether the value is templated. 

1863 

1864 :param section: section of the config 

1865 :param key: key in the section 

1866 :return: True if the value is templated 

1867 """ 

1868 return _is_template(self.configuration_description, section, key) 

1869 

1870 def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None: 

1871 """ 

1872 Return the section as a dict. 

1873 

1874 Values are converted to int, float, bool as required. 

1875 

1876 :param section: section from the config 

1877 :param team_name: optional team name for team-specific configuration lookup 

1878 """ 

1879 # Handle team-specific section lookup for config file 

1880 config_section = f"{team_name}={section}" if team_name else section 

1881 

1882 if not self.has_section(config_section) and not self._default_values.has_section(config_section): 

1883 return None 

1884 if self._default_values.has_section(config_section): 

1885 _section: ConfigOptionsDictType = dict(self._default_values.items(config_section)) 

1886 else: 

1887 _section = {} 

1888 

1889 if self.has_section(config_section): 

1890 _section.update(self.items(config_section)) 

1891 

1892 # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name 

1893 section_prefix = self._env_var_name(section, "", team_name=team_name) 

1894 for env_var in sorted(os.environ.keys()): 

1895 if env_var.startswith(section_prefix): 

1896 key = env_var.replace(section_prefix, "") 

1897 if key.endswith("_CMD"): 

1898 key = key[:-4] 

1899 key = key.lower() 

1900 _section[key] = self._get_env_var_option(section, key, team_name=team_name) 

1901 

1902 for key, val in _section.items(): 

1903 if val is None: 

1904 raise AirflowConfigException( 

1905 f"Failed to convert value automatically. " 

1906 f'Please check "{key}" key in "{section}" section is set.' 

1907 ) 

1908 try: 

1909 _section[key] = int(val) 

1910 except ValueError: 

1911 try: 

1912 _section[key] = float(val) 

1913 except ValueError: 

1914 if isinstance(val, str) and val.lower() in ("t", "true"): 

1915 _section[key] = True 

1916 elif isinstance(val, str) and val.lower() in ("f", "false"): 

1917 _section[key] = False 

1918 return _section 

1919 

1920 @staticmethod 

1921 def _write_section_header( 

1922 file: IO[str], 

1923 include_descriptions: bool, 

1924 section_config_description: dict[str, str], 

1925 section_to_write: str, 

1926 ) -> None: 

1927 """Write header for configuration section.""" 

1928 file.write(f"[{section_to_write}]\n") 

1929 section_description = section_config_description.get("description") 

1930 if section_description and include_descriptions: 

1931 for line in section_description.splitlines(): 

1932 file.write(f"# {line}\n") 

1933 file.write("\n") 

1934 

1935 def _write_value( 

1936 self, 

1937 file: IO[str], 

1938 option: str, 

1939 comment_out_everything: bool, 

1940 needs_separation: bool, 

1941 only_defaults: bool, 

1942 section_to_write: str, 

1943 hide_sensitive: bool, 

1944 is_sensitive: bool, 

1945 show_values: bool = False, 

1946 ): 

1947 default_value = self.get_default_value(section_to_write, option, raw=True) 

1948 if only_defaults: 

1949 value = default_value 

1950 else: 

1951 value = self.get(section_to_write, option, fallback=default_value, raw=True) 

1952 if not show_values: 

1953 file.write(f"# {option} = \n") 

1954 else: 

1955 if hide_sensitive and is_sensitive: 

1956 value = "< hidden >" 

1957 else: 

1958 pass 

1959 if value is None: 

1960 file.write(f"# {option} = \n") 

1961 else: 

1962 if comment_out_everything: 

1963 value_lines = value.splitlines() 

1964 value = "\n# ".join(value_lines) 

1965 file.write(f"# {option} = {value}\n") 

1966 else: 

1967 if "\n" in value: 

1968 try: 

1969 value = json.dumps(json.loads(value), indent=4) 

1970 value = value.replace( 

1971 "\n", "\n " 

1972 ) # indent multi-line JSON to satisfy configparser format 

1973 except JSONDecodeError: 

1974 pass 

1975 file.write(f"{option} = {value}\n") 

1976 if needs_separation: 

1977 file.write("\n") 

1978 

1979 def write( # type: ignore[override] 

1980 self, 

1981 file: IO[str], 

1982 section: str | None = None, 

1983 include_examples: bool = True, 

1984 include_descriptions: bool = True, 

1985 include_sources: bool = True, 

1986 include_env_vars: bool = True, 

1987 include_providers: bool = True, 

1988 comment_out_everything: bool = False, 

1989 hide_sensitive: bool = False, 

1990 extra_spacing: bool = True, 

1991 only_defaults: bool = False, 

1992 show_values: bool = False, 

1993 **kwargs: Any, 

1994 ) -> None: 

1995 """ 

1996 Write configuration with comments and examples to a file. 

1997 

1998 :param file: file to write to 

1999 :param section: section of the config to write, defaults to all sections 

2000 :param include_examples: Include examples in the output 

2001 :param include_descriptions: Include descriptions in the output 

2002 :param include_sources: Include the source of each config option 

2003 :param include_env_vars: Include environment variables corresponding to each config option 

2004 :param include_providers: Include providers configuration 

2005 :param comment_out_everything: Comment out all values 

2006 :param hide_sensitive_values: Include sensitive values in the output 

2007 :param extra_spacing: Add extra spacing before examples and after variables 

2008 :param only_defaults: Only include default values when writing the config, not the actual values 

2009 """ 

2010 with self.make_sure_configuration_loaded(with_providers=include_providers): 

2011 sources_dict = {} 

2012 if include_sources: 

2013 sources_dict = self.as_dict(display_source=True) 

2014 for section_to_write in self.get_sections_including_defaults(): 

2015 section_config_description = self.configuration_description.get(section_to_write, {}) 

2016 if section_to_write != section and section is not None: 

2017 continue 

2018 if self._has_section_in_any_defaults(section_to_write) or self.has_section(section_to_write): 

2019 self._write_section_header( 

2020 file, include_descriptions, section_config_description, section_to_write 

2021 ) 

2022 for option in self.get_options_including_defaults(section_to_write): 

2023 should_continue, needs_separation = self._write_option_header( 

2024 file=file, 

2025 option=option, 

2026 extra_spacing=extra_spacing, 

2027 include_descriptions=include_descriptions, 

2028 include_env_vars=include_env_vars, 

2029 include_examples=include_examples, 

2030 include_sources=include_sources, 

2031 section_config_description=section_config_description, 

2032 section_to_write=section_to_write, 

2033 sources_dict=sources_dict, 

2034 ) 

2035 is_sensitive = ( 

2036 section_to_write.lower(), 

2037 option.lower(), 

2038 ) in self.sensitive_config_values 

2039 self._write_value( 

2040 file=file, 

2041 option=option, 

2042 comment_out_everything=comment_out_everything, 

2043 needs_separation=needs_separation, 

2044 only_defaults=only_defaults, 

2045 section_to_write=section_to_write, 

2046 hide_sensitive=hide_sensitive, 

2047 is_sensitive=is_sensitive, 

2048 show_values=show_values, 

2049 ) 

2050 if include_descriptions and not needs_separation: 

2051 # extra separation between sections in case last option did not need it 

2052 file.write("\n") 

2053 

2054 @contextmanager 

2055 def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]: 

2056 """ 

2057 Make sure configuration is loaded with or without providers. 

2058 

2059 The context manager will only toggle the `self._use_providers_configuration` flag if `with_providers` is False, and will reset `self._use_providers_configuration` to True after the context block. 

2060 Nop for `with_providers=True` as the configuration already loads providers configuration by default. 

2061 

2062 :param with_providers: whether providers should be loaded 

2063 """ 

2064 if not with_providers: 

2065 self._use_providers_configuration = False 

2066 # Only invalidate cached properties that depend on _use_providers_configuration. 

2067 # Do NOT use invalidate_cache() here — it would also evict expensive provider-discovery 

2068 # caches (_provider_metadata_configuration_description, _provider_metadata_config_fallback_default_values) 

2069 # that don't depend on this flag. 

2070 self._invalidate_provider_flag_caches() 

2071 try: 

2072 yield 

2073 finally: 

2074 if not with_providers: 

2075 self._use_providers_configuration = True 

2076 self._invalidate_provider_flag_caches()