Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/configuration/parser.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

872 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Base configuration parser with pure parsing logic.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import datetime 

24import functools 

25import itertools 

26import json 

27import logging 

28import os 

29import shlex 

30import subprocess 

31import sys 

32import warnings 

33from collections.abc import Callable, Generator, Iterable 

34from configparser import ConfigParser, NoOptionError, NoSectionError 

35from contextlib import contextmanager 

36from copy import deepcopy 

37from enum import Enum 

38from json.decoder import JSONDecodeError 

39from re import Pattern 

40from typing import IO, TYPE_CHECKING, Any, TypeVar, overload 

41 

42from .exceptions import AirflowConfigException 

43 

44log = logging.getLogger(__name__) 

45 

46 

def _build_kwarg_env_prefix(section: str, kwargs_key: str) -> str:
    """
    Return the environment-variable prefix used for per-key backend kwargs.

    The ``_kwargs`` part of the option name is turned into ``_kwarg``, section and
    option are upper-cased, and a trailing ``__`` separator is appended:

    ("secrets", "backend_kwargs") -> "AIRFLOW__SECRETS__BACKEND_KWARG__"
    ("workers", "secrets_backend_kwargs") -> "AIRFLOW__WORKERS__SECRETS_BACKEND_KWARG__"

    :param section: config section name
    :param kwargs_key: name of the kwargs option inside that section
    :return: prefix that per-key override environment variables start with
    """
    section_part = section.upper()
    option_part = kwargs_key.replace("_kwargs", "_kwarg").upper()
    return ENV_VAR_PREFIX + section_part + "__" + option_part + "__"

56 

57 

def _collect_kwarg_env_vars(prefix: str) -> dict[str, str]:
    """
    Gather per-key backend kwargs from ``os.environ``.

    Every environment variable whose name starts with ``prefix`` contributes one entry:
    the remainder of the name, lower-cased, becomes the key, e.g.
    AIRFLOW__SECRETS__BACKEND_KWARG__ROLE_ID -> {"role_id": value}.

    Values are kept as raw strings (no JSON parsing). A variable that is exactly the
    prefix (nothing after the trailing ``__``) is skipped.

    :param prefix: environment variable name prefix to match
    :return: mapping of lower-cased kwarg name to the raw environment value
    """
    prefix_len = len(prefix)
    return {
        name[prefix_len:].lower(): raw_value
        for name, raw_value in os.environ.items()
        # A name no longer than the prefix would yield an empty kwarg key.
        if name.startswith(prefix) and len(name) > prefix_len
    }

73 

74 

# Scalar types a single configuration option value may have.
ConfigType = str | int | float | bool
# Option name -> option value.
ConfigOptionsDictType = dict[str, ConfigType]
# Option name -> value, or -> (value, source) tuple when the source is displayed
# (see e.g. the "secret" / "cmd" tuples built by the _include_* methods).
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
# Section name -> per-section mapping described above.
ConfigSourcesType = dict[str, ConfigSectionSourcesType]
# Prefix shared by the environment variables this module scans for.
ENV_VAR_PREFIX = "AIRFLOW__"

80 

81 

82if TYPE_CHECKING: 

83 from airflow.providers_manager import ProvidersManager 

84 from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime 

85 

86 

class ValueNotFound:
    """Marker type whose single instance signals that a configuration value was not found."""


# Shared sentinel; lookups compare against it by identity (``is``).
VALUE_NOT_FOUND_SENTINEL = ValueNotFound()

94 

95 

96@overload 

97def expand_env_var(env_var: None) -> None: ... 

98@overload 

99def expand_env_var(env_var: str) -> str: ... 

100 

101 

102def expand_env_var(env_var: str | None) -> str | None: 

103 """ 

104 Expand (potentially nested) env vars. 

105 

106 Repeat and apply `expandvars` and `expanduser` until 

107 interpolation stops having any effect. 

108 """ 

109 if not env_var or not isinstance(env_var, str): 

110 return env_var 

111 while True: 

112 interpolated = os.path.expanduser(os.path.expandvars(str(env_var))) 

113 if interpolated == env_var: 

114 return interpolated 

115 env_var = interpolated 

116 

117 

def run_command(command: str) -> str:
    """
    Execute ``command`` (split with ``shlex``, no shell) and return its decoded stdout.

    :param command: command line to run
    :return: stdout decoded with the default encoding, undecodable bytes ignored
    :raises AirflowConfigException: if the process exits with a non-zero return code
    """
    argv = shlex.split(command)
    process = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    raw_stdout, raw_stderr = process.communicate()

    encoding = sys.getdefaultencoding()
    output = raw_stdout.decode(encoding, "ignore")
    stderr = raw_stderr.decode(encoding, "ignore")

    if process.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {process.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )

132 

133 

def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool:
    """
    Tell whether an option is flagged as a template in the configuration description.

    :param configuration_description: description of configuration
    :param section: section
    :param key: key
    :return: the ``is_template`` entry found under ``section`` -> ``key``; False when
        the section, the key or the flag is missing
    """
    # NOTE(review): other code in this file reads option metadata under
    # description[section]["options"][key]; this lookup goes through
    # description[section][key] directly - confirm which layout callers pass in.
    section_entry = configuration_description.get(section, {})
    option_entry = section_entry.get(key, {})
    return option_entry.get("is_template", False)

144 

145 

def configure_parser_from_configuration_description(
    parser: ConfigParser,
    configuration_description: dict[str, dict[str, Any]],
    all_vars: dict[str, Any],
) -> None:
    """
    Populate a ConfigParser with the defaults listed in a configuration description.

    Each section of the description is added to the parser. An option is set only when
    its default is not None and it carries neither ``version_deprecated`` nor
    ``deprecation_reason``. String defaults of non-template options are rendered with
    ``str.format(**all_vars)``; if rendering or storing the rendered value fails with
    KeyError/ValueError the unformatted default is stored instead.

    :param parser: ConfigParser to configure
    :param configuration_description: configuration description from config.yml
    :param all_vars: variables available to ``str.format`` when rendering defaults
    """
    for section, section_desc in configuration_description.items():
        parser.add_section(section)
        for key, option in section_desc["options"].items():
            default_value = option["default"]
            if default_value is None:
                continue
            if option.get("version_deprecated") or option.get("deprecation_reason"):
                continue
            # Templates and non-string defaults are stored verbatim (stringified).
            if option.get("is_template", False) or not isinstance(default_value, str):
                parser.set(section, key, str(default_value))
                continue
            try:
                parser.set(section, key, default_value.format(**all_vars))
            except (KeyError, ValueError):
                parser.set(section, key, default_value)

173 

174 

def create_provider_cfg_config_fallback_defaults(
    provider_config_fallback_defaults_cfg_path: str,
) -> ConfigParser:
    """
    Build the parser holding provider fallback defaults read from a .cfg file.

    The returned parser carries default values for provider configuration that may be
    requested while provider classes are being imported, i.e. before the providers'
    own configuration has been loaded. Airflow does a lot of work at import time and
    some of it can read provider configuration that early.

    These are defaults only: "real" values from the .cfg file or environment variables
    are still used as usual.

    NOTE!! Do NOT remove these fallbacks on the assumption that they merely duplicate
    other defaults - not until the import-time behaviour described above is fixed.

    :param provider_config_fallback_defaults_cfg_path: path to the provider config fallback defaults .cfg file
    :return: a plain ConfigParser loaded from that path
    """
    fallback_parser = ConfigParser()
    fallback_parser.read(provider_config_fallback_defaults_cfg_path)
    return fallback_parser

201 

202 

203class AirflowConfigParser(ConfigParser): 

204 """ 

205 Base configuration parser with pure parsing logic. 

206 

207 This class provides the core parsing methods that work with: 

208 - configuration_description: dict describing config options (required in __init__) 

209 - _default_values: ConfigParser with default values (required in __init__) 

210 - deprecated_options: class attribute mapping new -> old options 

211 - deprecated_sections: class attribute mapping new -> old sections 

212 """ 

213 

214 # A mapping of section -> setting -> { old, replace } for deprecated default values. 

215 # Subclasses can override this to define deprecated values that should be upgraded. 

216 deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {} 

217 

218 # A mapping of (new section, new option) -> (old section, old option, since_version). 

219 # When reading new option, the old option will be checked to see if it exists. If it does a 

220 # DeprecationWarning will be issued and the old option will be used instead 

221 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = { 

222 ("dag_processor", "dag_file_processor_timeout"): ("core", "dag_file_processor_timeout", "3.0"), 

223 ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"), 

224 ("api", "base_url"): ("webserver", "base_url", "3.0"), 

225 ("api", "host"): ("webserver", "web_server_host", "3.0"), 

226 ("api", "port"): ("webserver", "web_server_port", "3.0"), 

227 ("api", "workers"): ("webserver", "workers", "3.0"), 

228 ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"), 

229 ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"), 

230 ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"), 

231 ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"), 

232 ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"), 

233 ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"), 

234 ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"), 

235 ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"), 

236 ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"), 

237 ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"), 

238 ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"), 

239 ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"), 

240 ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"), 

241 ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"), 

242 ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"), 

243 ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"), 

244 ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"), 

245 ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"), 

246 ("core", "num_dag_runs_to_retain_rendered_fields"): ( 

247 "core", 

248 "max_num_rendered_ti_fields_per_task", 

249 "3.2.0", 

250 ), 

251 ("api", "page_size"): ("webserver", "page_size", "3.1.0"), 

252 ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"), 

253 ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"), 

254 ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"), 

255 ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"), 

256 ("api", "log_config"): ("api", "access_logfile", "3.1.0"), 

257 ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"), 

258 ("api", "fallback_page_limit"): ("api", "page_size", "3.2.0"), 

259 ("workers", "missing_dag_retries"): ("workers", "missing_dag_retires", "3.1.8"), 

260 } 

261 

262 # A mapping of new section -> (old section, since_version). 

263 deprecated_sections: dict[str, tuple[str, str]] = {} 

264 

265 @property 

266 def _lookup_sequence(self) -> list[Callable]: 

267 """ 

268 Define the sequence of lookup methods for get(). The definition here does not have provider lookup. 

269 

270 Subclasses can override this to customise lookup order. 

271 """ 

272 lookup_methods = [ 

273 self._get_environment_variables, 

274 self._get_option_from_config_file, 

275 self._get_option_from_commands, 

276 self._get_option_from_secrets, 

277 self._get_option_from_defaults, 

278 ] 

279 if self._use_providers_configuration: 

280 # Provider fallback lookups are last so they have the lowest priority in the lookup sequence. 

281 lookup_methods += [ 

282 self._get_option_from_provider_metadata_config_fallbacks, 

283 self._get_option_from_provider_cfg_config_fallbacks, 

284 ] 

285 return lookup_methods 

286 

287 @functools.cached_property 

288 def configuration_description(self) -> dict[str, dict[str, Any]]: 

289 """ 

290 Return configuration description from multiple sources. 

291 

292 Respects the ``_use_providers_configuration`` flag to decide whether to include 

293 provider configuration. 

294 

295 The merged description is built as follows: 

296 

297 1. Start from the base configuration description provided in ``__init__``, usually 

298 loaded from ``config.yml`` in core. Values defined here are never overridden. 

299 2. Merge provider metadata from ``_provider_metadata_configuration_description``, 

300 loaded from provider packages' ``get_provider_info`` method. Only adds missing 

301 sections/options; does not overwrite existing entries from the base configuration. 

302 3. Merge default values from ``_provider_cfg_config_fallback_default_values``, 

303 loaded from ``provider_config_fallback_defaults.cfg``. Only sets ``"default"`` 

304 (and heuristically ``"sensitive"``) for options that do not already define them. 

305 

306 Base configuration takes precedence, then provider metadata fills in missing 

307 descriptions/options, and finally cfg-based fallbacks provide defaults only where 

308 none are defined. 

309 

310 We use ``cached_property`` to cache the merged result; clear this cache (via 

311 ``invalidate_cache``) when toggling ``_use_providers_configuration``. 

312 """ 

313 if not self._use_providers_configuration: 

314 return self._configuration_description 

315 

316 merged_description: dict[str, dict[str, Any]] = deepcopy(self._configuration_description) 

317 

318 # Merge full provider config descriptions (with metadata like sensitive, description, etc.) 

319 # from provider packages' get_provider_info method, reusing the cached raw dict. 

320 for section, section_content in self._provider_metadata_configuration_description.items(): 

321 if section not in merged_description: 

322 merged_description[section] = deepcopy(section_content) 

323 else: 

324 existing_options = merged_description[section].setdefault("options", {}) 

325 for option, option_content in section_content.get("options", {}).items(): 

326 if option not in existing_options: 

327 existing_options[option] = deepcopy(option_content) 

328 

329 # Merge default values from cfg-based fallbacks (key=value only, no metadata). 

330 # Uses setdefault so provider metadata values above take priority. 

331 cfg = self._provider_cfg_config_fallback_default_values 

332 for section in cfg.sections(): 

333 section_options = merged_description.setdefault(section, {"options": {}}).setdefault( 

334 "options", {} 

335 ) 

336 for option in cfg.options(section): 

337 opt_dict = section_options.setdefault(option, {}) 

338 opt_dict.setdefault("default", cfg.get(section, option)) 

339 # For cfg-only options with no provider metadata, infer sensitivity from name. 

340 if "sensitive" not in opt_dict and option.endswith(("password", "secret")): 

341 opt_dict["sensitive"] = True 

342 

343 return merged_description 

344 

345 @property 

346 def _config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]: 

347 """Override the base method to add provider fallbacks when providers are loaded.""" 

348 sources: list[tuple[str, ConfigParser]] = [] 

349 if self._use_providers_configuration: 

350 # Provider fallback defaults are listed first so they have the lowest priority 

351 # in as_dict()'s "last source wins" semantics. 

352 sources += [ 

353 ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values), 

354 ( 

355 "provider-metadata-fallback-defaults", 

356 self._provider_metadata_config_fallback_default_values, 

357 ), 

358 ] 

359 sources += [ 

360 ("default", self._default_values), 

361 ("airflow.cfg", self), 

362 ] 

363 return sources 

364 

def _get_option_from_provider_cfg_config_fallbacks(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Look the option up in the provider cfg fallback defaults.

    Only ``section``, ``key`` and ``**kwargs`` are used; the remaining parameters are
    accepted so the signature matches the other lookup methods.

    :return: the value found, or the not-found sentinel returned by the underlying lookup
    """
    # The underlying lookup already yields the sentinel when nothing is found.
    return self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs)

380 

def _get_option_from_provider_metadata_config_fallbacks(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Look the option up in the provider metadata fallback defaults.

    Only ``section``, ``key`` and ``**kwargs`` are used; the remaining parameters are
    accepted so the signature matches the other lookup methods.

    :return: the value found, or the not-found sentinel returned by the underlying lookup
    """
    # The underlying lookup already yields the sentinel when nothing is found.
    return self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs)

396 

def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
    """
    Read a value from the provider cfg fallback defaults parser.

    Only the ``raw`` and ``vars`` entries of ``kwargs`` are forwarded.

    :return: the stored value, or ``VALUE_NOT_FOUND_SENTINEL`` when it is absent
    """
    fallback_parser = self._provider_cfg_config_fallback_default_values
    return fallback_parser.get(
        section,
        key,
        fallback=VALUE_NOT_FOUND_SENTINEL,
        raw=kwargs.get("raw", False),
        vars=kwargs.get("vars"),
    )

404 

405 @functools.cached_property 

406 def _provider_metadata_configuration_description(self) -> dict[str, dict[str, Any]]: 

407 """Raw provider configuration descriptions with full metadata (sensitive, description, etc.).""" 

408 result: dict[str, dict[str, Any]] = {} 

409 for _, config in self._provider_manager_type().provider_configs: 

410 result.update(config) 

411 return result 

412 

413 @functools.cached_property 

414 def _provider_metadata_config_fallback_default_values(self) -> ConfigParser: 

415 """Return Provider metadata config fallback default values.""" 

416 return self._create_default_config_parser_callable(self._provider_metadata_configuration_description) 

417 

def get_from_provider_metadata_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
    """
    Read a value from the provider metadata fallback defaults parser.

    Only the ``raw`` and ``vars`` entries of ``kwargs`` are forwarded.

    :return: the stored value, or ``VALUE_NOT_FOUND_SENTINEL`` when it is absent
    """
    fallback_parser = self._provider_metadata_config_fallback_default_values
    return fallback_parser.get(
        section,
        key,
        fallback=VALUE_NOT_FOUND_SENTINEL,
        raw=kwargs.get("raw", False),
        vars=kwargs.get("vars"),
    )

425 

426 @property 

427 def _validators(self) -> list[Callable[[], None]]: 

428 """ 

429 Return list of validators defined on a config parser class. Base class will return an empty list. 

430 

431 Subclasses can override this to customize the validators that are run during validation on the 

432 config parser instance. 

433 """ 

434 return [] 

435 

436 def validate(self) -> None: 

437 """Run all registered validators.""" 

438 for validator in self._validators: 

439 validator() 

440 self.is_validated = True 

441 

442 def _validate_deprecated_values(self) -> None: 

443 """Validate and upgrade deprecated default values.""" 

444 for section, replacement in self.deprecated_values.items(): 

445 for name, info in replacement.items(): 

446 old, new = info 

447 current_value = self.get(section, name, fallback="") 

448 if self._using_old_value(old, current_value): 

449 self.upgraded_values[(section, name)] = current_value 

450 new_value = old.sub(new, current_value) 

451 self._update_env_var(section=section, name=name, new_value=new_value) 

452 self._create_future_warning( 

453 name=name, 

454 section=section, 

455 current_value=current_value, 

456 new_value=new_value, 

457 ) 

458 

459 def _using_old_value(self, old: Pattern, current_value: str) -> bool: 

460 """Check if current_value matches the old pattern.""" 

461 return old.search(current_value) is not None 

462 

463 def _update_env_var(self, section: str, name: str, new_value: str) -> None: 

464 """Update environment variable with new value.""" 

465 env_var = self._env_var_name(section, name) 

466 # Set it as an env var so that any subprocesses keep the same override! 

467 os.environ[env_var] = new_value 

468 

469 @staticmethod 

470 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None: 

471 """Create a FutureWarning for deprecated default values.""" 

472 warnings.warn( 

473 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " 

474 f"This value has been changed to {new_value!r} in the running config, but please update your config.", 

475 FutureWarning, 

476 stacklevel=3, 

477 ) 

478 

def __init__(
    self,
    configuration_description: dict[str, dict[str, Any]],
    _default_values: ConfigParser,
    provider_manager_type: type[ProvidersManager] | type[ProvidersManagerTaskRuntime],
    create_default_config_parser_callable: Callable[[dict[str, dict[str, Any]]], ConfigParser],
    provider_config_fallback_defaults_cfg_path: str,
    *args,
    **kwargs,
):
    """
    Set up the parser state; remaining positional/keyword arguments go to ConfigParser.

    :param configuration_description: Description of configuration options
    :param _default_values: ConfigParser with default values
    :param provider_manager_type: Either ProvidersManager or ProvidersManagerTaskRuntime, depending on the context of the caller.
    :param create_default_config_parser_callable: The `create_default_config_parser` function from core or SDK, depending on the context of the caller.
    :param provider_config_fallback_defaults_cfg_path: Path to the `provider_config_fallback_defaults.cfg` file.
    """
    super().__init__(*args, **kwargs)

    # Inputs kept for later lookups.
    self._configuration_description = configuration_description
    self._default_values = _default_values
    self._provider_manager_type = provider_manager_type
    self._create_default_config_parser_callable = create_default_config_parser_callable

    # The cfg-based provider fallbacks are read right away from the given path.
    cfg_fallback_parser = create_provider_cfg_config_fallback_defaults(
        provider_config_fallback_defaults_cfg_path
    )
    self._provider_cfg_config_fallback_default_values = cfg_fallback_parser

    self._suppress_future_warnings = False
    self.upgraded_values: dict[tuple[str, str], str] = {}

    # Provider configuration is used by default. Per the original note, the flag is
    # only switched off temporarily by `write(include_providers=False)` or
    # `with self.make_sure_configuration_loaded(with_providers=False)`, and is set
    # back to True afterwards, so it does not affect subsequent calls.
    self._use_providers_configuration = True

511 

512 def invalidate_cache(self) -> None: 

513 """ 

514 Clear all ``functools.cached_property`` entries on this instance. 

515 

516 Call this after mutating class-level attributes (e.g. ``deprecated_options``) 

517 so that derived cached properties are recomputed on next access. 

518 """ 

519 for attr_name in ( 

520 name 

521 for name in dir(type(self)) 

522 if isinstance(getattr(type(self), name, None), functools.cached_property) 

523 ): 

524 self.__dict__.pop(attr_name, None) 

525 

526 def _invalidate_provider_flag_caches(self) -> None: 

527 """Invalidate caches related to provider configuration flags.""" 

528 self.__dict__.pop("configuration_description", None) 

529 self.__dict__.pop("sensitive_config_values", None) 

530 

531 @functools.cached_property 

532 def inversed_deprecated_options(self): 

533 """Build inverse mapping from old options to new options.""" 

534 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()} 

535 

536 @functools.cached_property 

537 def inversed_deprecated_sections(self): 

538 """Build inverse mapping from old sections to new sections.""" 

539 return { 

540 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items() 

541 } 

542 

543 @functools.cached_property 

544 def sensitive_config_values(self) -> set[tuple[str, str]]: 

545 """Get set of sensitive config values that should be masked.""" 

546 flattened = { 

547 (s, k): item 

548 for s, s_c in self.configuration_description.items() 

549 for k, item in s_c.get("options", {}).items() 

550 } 

551 sensitive = { 

552 (section.lower(), key.lower()) 

553 for (section, key), v in flattened.items() 

554 if v.get("sensitive") is True 

555 } 

556 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options} 

557 depr_section = { 

558 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections 

559 } 

560 sensitive.update(depr_section, depr_option) 

561 return sensitive 

562 

563 def _update_defaults_from_string(self, config_string: str) -> None: 

564 """ 

565 Update the defaults in _default_values based on values in config_string ("ini" format). 

566 

567 Override shared parser's method to add validation for template variables. 

568 Note that those values are not validated and cannot contain variables because we are using 

569 regular config parser to load them. This method is used to test the config parser in unit tests. 

570 

571 :param config_string: ini-formatted config string 

572 """ 

573 parser = ConfigParser() 

574 parser.read_string(config_string) 

575 for section in parser.sections(): 

576 if section not in self._default_values.sections(): 

577 self._default_values.add_section(section) 

578 errors = False 

579 for key, value in parser.items(section): 

580 if not self.is_template(section, key) and "{" in value: 

581 errors = True 

582 log.error( 

583 "The %s.%s value %s read from string contains variable. This is not supported", 

584 section, 

585 key, 

586 value, 

587 ) 

588 self._default_values.set(section, key, value) 

589 if errors: 

590 raise AirflowConfigException( 

591 f"The string config passed as default contains variables. " 

592 f"This is not supported. String config: {config_string}" 

593 ) 

594 

def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
    """
    Retrieve a default value, consulting provider fallbacks when enabled.

    The core defaults parser is checked first. If the option is not found there and
    ``_use_providers_configuration`` is set, the provider metadata fallbacks are
    checked next and the provider cfg fallbacks last. If no source has the option,
    ``fallback`` is used.

    With ``raw=True`` every ``%`` in a string result is doubled (``%%``), which is
    useful when the value is going to be written to a file and interpolated again
    when that file is read. Non-string results (e.g. a non-string ``fallback``) are
    returned unchanged.

    :param section: section of the config
    :param key: key to use
    :param fallback: value returned when no default is defined anywhere
    :param raw: if True, escape ``%`` in the returned string
    :param kwargs: extra keyword arguments forwarded to each parser's ``get``
    :return: the default value, or ``fallback``
    """
    value = self._default_values.get(section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs)
    if value is VALUE_NOT_FOUND_SENTINEL and self._use_providers_configuration:
        # Provider metadata has higher priority than the cfg fallback - check it first.
        value = self._provider_metadata_config_fallback_default_values.get(
            section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs
        )
        if value is VALUE_NOT_FOUND_SENTINEL:
            value = self._provider_cfg_config_fallback_default_values.get(
                section, key, fallback=VALUE_NOT_FOUND_SENTINEL, **kwargs
            )
    if value is VALUE_NOT_FOUND_SENTINEL:
        value = fallback
    # Only strings can be escaped; a non-string fallback must not crash on .replace().
    if raw and isinstance(value, str):
        return value.replace("%", "%%")
    return value

627 

def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None:
    """
    Instantiate the configured secrets backend, if there is one.

    In worker mode the ``[workers] secrets_backend`` / ``secrets_backend_kwargs``
    options are used, falling back to ``[secrets] backend`` / ``backend_kwargs`` when
    no worker backend is configured. Otherwise only the ``[secrets]`` options are read.

    The kwargs option is read as JSON; if it cannot be parsed or is not a dict a
    warning is logged and no kwargs are used. Per-key environment overrides are then
    applied on top.

    :param worker_mode: whether the worker-specific options should be consulted first
    :return: the backend instance, or None when no backend class is configured
    """
    if worker_mode:
        section, key, kwargs_key = "workers", "secrets_backend", "secrets_backend_kwargs"
    else:
        section, key, kwargs_key = "secrets", "backend", "backend_kwargs"

    secrets_backend_cls = self.getimport(section=section, key=key)

    if not secrets_backend_cls:
        if not worker_mode:
            return None
        # No worker-specific backend: fall back to the general secrets backend.
        secrets_backend_cls = self.getimport(section="secrets", key="backend")
        if not secrets_backend_cls:
            return None
        # The fallback backend comes with its own kwargs option.
        section, kwargs_key = "secrets", "backend_kwargs"

    try:
        backend_kwargs = self.getjson(section=section, key=kwargs_key) or {}
        if not isinstance(backend_kwargs, dict):
            raise ValueError("not a dict")
    except AirflowConfigException:
        log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key)
        backend_kwargs = {}
    except ValueError:
        log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key)
        backend_kwargs = {}

    # Per-key environment overrides take precedence over the JSON blob.
    per_key_overrides = _collect_kwarg_env_vars(_build_kwarg_env_prefix(section, kwargs_key))
    backend_kwargs.update(per_key_overrides)

    return secrets_backend_cls(**backend_kwargs)

670 

671 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None: 

672 """ 

673 Get Config option values from Secret Backend. 

674 

675 Called by the shared parser's _get_secret_option() method as part of the lookup chain. 

676 Uses _get_custom_secret_backend() to get the backend instance. 

677 

678 :param config_key: the config key to retrieve 

679 :return: config value or None 

680 """ 

681 try: 

682 secrets_client = self._get_custom_secret_backend() 

683 if not secrets_client: 

684 return None 

685 return secrets_client.get_config(config_key) 

686 except Exception as e: 

687 raise AirflowConfigException( 

688 "Cannot retrieve config from alternative secrets backend. " 

689 "Make sure it is configured properly and that the Backend " 

690 "is accessible.\n" 

691 f"{e}" 

692 ) 

693 

694 def _get_cmd_option_from_config_sources( 

695 self, config_sources: ConfigSourcesType, section: str, key: str 

696 ) -> str | None: 

697 fallback_key = key + "_cmd" 

698 if (section, key) in self.sensitive_config_values: 

699 section_dict = config_sources.get(section) 

700 if section_dict is not None: 

701 command_value = section_dict.get(fallback_key) 

702 if command_value is not None: 

703 if isinstance(command_value, str): 

704 command = command_value 

705 else: 

706 command = command_value[0] 

707 return run_command(command) 

708 return None 

709 

710 def _get_secret_option_from_config_sources( 

711 self, config_sources: ConfigSourcesType, section: str, key: str 

712 ) -> str | None: 

713 fallback_key = key + "_secret" 

714 if (section, key) in self.sensitive_config_values: 

715 section_dict = config_sources.get(section) 

716 if section_dict is not None: 

717 secrets_path_value = section_dict.get(fallback_key) 

718 if secrets_path_value is not None: 

719 if isinstance(secrets_path_value, str): 

720 secrets_path = secrets_path_value 

721 else: 

722 secrets_path = secrets_path_value[0] 

723 return self._get_config_value_from_secret_backend(secrets_path) 

724 return None 

725 

726 def _include_secrets( 

727 self, 

728 config_sources: ConfigSourcesType, 

729 display_sensitive: bool, 

730 display_source: bool, 

731 raw: bool, 

732 ): 

733 for section, key in self.sensitive_config_values: 

734 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key) 

735 if value: 

736 if not display_sensitive: 

737 value = "< hidden >" 

738 if display_source: 

739 opt: str | tuple[str, str] = (value, "secret") 

740 elif raw: 

741 opt = value.replace("%", "%%") 

742 else: 

743 opt = value 

744 config_sources.setdefault(section, {}).update({key: opt}) 

745 del config_sources[section][key + "_secret"] 

746 

747 def _include_commands( 

748 self, 

749 config_sources: ConfigSourcesType, 

750 display_sensitive: bool, 

751 display_source: bool, 

752 raw: bool, 

753 ): 

754 for section, key in self.sensitive_config_values: 

755 opt = self._get_cmd_option_from_config_sources(config_sources, section, key) 

756 if not opt: 

757 continue 

758 opt_to_set: str | tuple[str, str] | None = opt 

759 if not display_sensitive: 

760 opt_to_set = "< hidden >" 

761 if display_source: 

762 opt_to_set = (str(opt_to_set), "cmd") 

763 elif raw: 

764 opt_to_set = str(opt_to_set).replace("%", "%%") 

765 if opt_to_set is not None: 

766 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set} 

767 config_sources.setdefault(section, {}).update(dict_to_update) 

768 del config_sources[section][key + "_cmd"] 

769 

def _include_envs(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Merge options given through prefixed environment variables into ``config_sources``.

    Every environment variable starting with ``ENV_VAR_PREFIX`` is split on ``"__"``
    into section and key, resolved via ``self._get_env_var_option`` and stored under
    the lower-cased section and key.

    :param config_sources: mapping of section name to its options, modified in place
    :param display_sensitive: when False, sensitive values are replaced by ``"< hidden >"``
    :param display_source: when True the entry is stored as ``(value, "env var")``
    :param raw: when True ``%`` is escaped as ``%%`` (only on the branch where hiding is skipped)
    """
    # Take a snapshot of the matching names before processing them.
    prefixed_names = [name for name in os.environ if name.startswith(ENV_VAR_PREFIX)]
    for env_var in prefixed_names:
        try:
            # Names that do not split into exactly three parts raise ValueError and are skipped.
            _, section, key = env_var.split("__", 2)
            opt = self._get_env_var_option(section, key)
        except ValueError:
            continue
        if opt is None:
            log.warning("Ignoring unknown env var '%s'", env_var)
            continue
        if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
            # Don't hide cmd/secret values here
            # NOTE(review): section/key still carry the env var's original case at this
            # point; confirm sensitive_config_values is expected to match that case.
            if not env_var.lower().endswith(("cmd", "secret")) and (
                section,
                key,
            ) in self.sensitive_config_values:
                opt = "< hidden >"
        elif raw:
            opt = opt.replace("%", "%%")
        if display_source:
            opt = (opt, "env var")

        config_sources.setdefault(section.lower(), {})[key.lower()] = opt

801 

802 def _filter_by_source( 

803 self, 

804 config_sources: ConfigSourcesType, 

805 display_source: bool, 

806 getter_func, 

807 ): 

808 """ 

809 Delete default configs from current configuration. 

810 

811 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values. 

812 

813 This is necessary because bare configs take precedence over the command 

814 or secret key equivalents so if the current running config is 

815 materialized with Airflow defaults they in turn override user set 

816 command or secret key configs. 

817 

818 :param config_sources: The current configuration to operate on 

819 :param display_source: If False, configuration options contain raw 

820 values. If True, options are a tuple of (option_value, source). 

821 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'. 

822 :param getter_func: A callback function that gets the user configured 

823 override value for a particular sensitive_config_values config. 

824 :return: None, the given config_sources is filtered if necessary, 

825 otherwise untouched. 

826 """ 

827 for section, key in self.sensitive_config_values: 

828 # Don't bother if we don't have section / key 

829 if section not in config_sources or key not in config_sources[section]: 

830 continue 

831 # Check that there is something to override defaults 

832 try: 

833 getter_opt = getter_func(section, key) 

834 except ValueError: 

835 continue 

836 if not getter_opt: 

837 continue 

838 # Check to see that there is a default value 

839 if self.get_default_value(section, key) is None: 

840 continue 

841 # Check to see if bare setting is the same as defaults 

842 if display_source: 

843 # when display_source = true, we know that the config_sources contains tuple 

844 opt, source = config_sources[section][key] # type: ignore 

845 else: 

846 opt = config_sources[section][key] 

847 if opt == self.get_default_value(section, key): 

848 del config_sources[section][key] 

849 

850 @staticmethod 

851 def _deprecated_value_is_set_in_config( 

852 deprecated_section: str, 

853 deprecated_key: str, 

854 configs: Iterable[tuple[str, ConfigParser]], 

855 ) -> bool: 

856 for config_type, config in configs: 

857 if config_type != "default": 

858 with contextlib.suppress(NoSectionError): 

859 deprecated_section_array = config.items(section=deprecated_section, raw=True) 

860 if any(key == deprecated_key for key, _ in deprecated_section_array): 

861 return True 

862 return False 

863 

@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Return True if the environment variable for the deprecated option is set."""
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}"
    value = os.environ.get(env_name)
    return value is not None

870 

@staticmethod
def _deprecated_command_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """Return True if the ``<deprecated_key>_cmd`` option is set in a non-default config source."""
    cmd_key = deprecated_key + "_cmd"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section,
        deprecated_key=cmd_key,
        configs=configs,
    )

880 

@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Return True if the ``_CMD`` environment variable for the deprecated option is set."""
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD"
    value = os.environ.get(env_name)
    return value is not None

887 

@staticmethod
def _deprecated_secret_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """Return True if the ``<deprecated_key>_secret`` option is set in a non-default config source."""
    secret_key = deprecated_key + "_secret"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section,
        deprecated_key=secret_key,
        configs=configs,
    )

897 

@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Return True if the ``_SECRET`` environment variable for the deprecated option is set."""
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET"
    value = os.environ.get(env_name)
    return value is not None

904 

@staticmethod
def _replace_config_with_display_sources(
    config_sources: ConfigSourcesType,
    configs: Iterable[tuple[str, ConfigParser]],
    configuration_description: dict[str, dict[str, Any]],
    display_source: bool,
    raw: bool,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Fill ``config_sources`` from every section of every parser in ``configs``.

    Each (parser, section) pair is delegated to
    ``_replace_section_config_with_display_sources`` with the same arguments.

    :param config_sources: mapping of section name to its options, modified in place
    :param configs: pairs of (source name, parser), processed in iteration order
    :param configuration_description: option descriptions keyed by section
    :param display_source: whether entries are stored as (value, source) tuples
    :param raw: forwarded to the parser's ``items`` call
    :param deprecated_options: mapping of (section, key) to its deprecated
        (section, key, version-like third element)
    :param include_env: whether deprecated env vars block defaults
    :param include_cmds: whether deprecated ``_cmd`` settings block defaults
    :param include_secret: whether deprecated ``_secret`` settings block defaults
    """
    for source_name, config in configs:
        for section in config.sections():
            AirflowConfigParser._replace_section_config_with_display_sources(
                config,
                config_sources,
                configuration_description,
                display_source,
                raw,
                section,
                source_name,
                deprecated_options,
                configs,
                include_env=include_env,
                include_cmds=include_cmds,
                include_secret=include_secret,
            )

934 

@staticmethod
def _replace_section_config_with_display_sources(
    config: ConfigParser,
    config_sources: ConfigSourcesType,
    configuration_description: dict[str, dict[str, Any]],
    display_source: bool,
    raw: bool,
    section: str,
    source_name: str,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    configs: Iterable[tuple[str, ConfigParser]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Copy one section of ``config`` into ``config_sources``.

    Default values are skipped for options whose deprecated counterpart is set in a
    requested source, so that the default does not override the deprecated setting.

    :param config: parser to read the section from
    :param config_sources: mapping of section name to its options, modified in place
    :param configuration_description: option descriptions keyed by section; an
        option's ``"source"`` entry replaces the ``"default"`` source name
    :param display_source: when True entries are stored as ``(value, source)``
    :param raw: forwarded to ``config.items``
    :param section: section to copy
    :param source_name: name of the source ``config`` represents
    :param deprecated_options: mapping of (section, key) to its deprecated counterpart
    :param configs: all (source name, parser) pairs, used for the deprecated checks
    :param include_env: consider deprecated environment variables
    :param include_cmds: consider deprecated ``_cmd`` settings
    :param include_secret: consider deprecated ``_secret`` settings
    """
    cls = AirflowConfigParser
    target = config_sources.setdefault(section, {})
    items: Iterable[tuple[str, Any]]
    if isinstance(config, cls):
        with config.suppress_future_warnings():
            items = config.items(section=section, raw=raw)
    else:
        items = config.items(section=section, raw=raw)

    from_default = source_name == "default"
    for option, value in items:
        old_section, old_key, _ = deprecated_options.get((section, option), (None, None, None))
        if old_section and old_key and from_default:
            # If deprecated entry has some non-default value set for any of the sources requested,
            # we should NOT set default for the new entry (because it will override anything
            # coming from the deprecated ones)
            if (
                cls._deprecated_value_is_set_in_config(old_section, old_key, configs)
                or (include_env and cls._deprecated_variable_is_set(old_section, old_key))
                or (
                    include_cmds
                    and (
                        cls._deprecated_variable_command_is_set(old_section, old_key)
                        or cls._deprecated_command_is_set_in_config(old_section, old_key, configs)
                    )
                )
                or (
                    include_secret
                    and (
                        cls._deprecated_variable_secret_is_set(old_section, old_key)
                        or cls._deprecated_secret_is_set_in_config(old_section, old_key, configs)
                    )
                )
            ):
                continue
        if not display_source:
            target[option] = value
            continue
        shown_source = source_name
        if from_default:
            # defaults can come from other sources (default-<PROVIDER>) that should be used here
            option_description = (
                configuration_description.get(section, {}).get("options", {}).get(option, {})
            )
            if option_description is not None:
                shown_source = option_description.get("source", source_name)
        target[option] = (value, shown_source)

1000 

1001 def _warn_deprecate( 

1002 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int 

1003 ): 

1004 """Warn about deprecated config option usage.""" 

1005 if section == deprecated_section: 

1006 warnings.warn( 

1007 f"The {deprecated_name} option in [{section}] has been renamed to {key} - " 

1008 f"the old setting has been used, but please update your config.", 

1009 DeprecationWarning, 

1010 stacklevel=4 + extra_stacklevel, 

1011 ) 

1012 else: 

1013 warnings.warn( 

1014 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option " 

1015 f"in [{section}] - the old setting has been used, but please update your config.", 

1016 DeprecationWarning, 

1017 stacklevel=4 + extra_stacklevel, 

1018 ) 

1019 

@contextmanager
def suppress_future_warnings(self):
    """
    Context manager to temporarily suppress future warnings.

    Sets ``self._suppress_future_warnings`` to True for the duration of the ``with``
    block and restores the previous value afterwards, including when the block
    raises. Subclasses can override this to customize warning suppression behavior.

    :return: context manager yielding ``self``
    """
    previous = self._suppress_future_warnings
    self._suppress_future_warnings = True
    try:
        yield self
    finally:
        # Restore even on error so suppression cannot get stuck at True.
        self._suppress_future_warnings = previous

1034 

def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str:
    """
    Generate environment variable name for a config option.

    The name is ``ENV_VAR_PREFIX``, an optional ``<TEAM>___`` component, the
    upper-cased section with dots replaced by underscores, ``__`` and the
    upper-cased key.
    """
    if team_name:
        team_component: str = f"{team_name.upper()}___"
    else:
        team_component = ""
    section_component = section.replace(".", "_").upper()
    key_component = key.upper()
    return f"{ENV_VAR_PREFIX}{team_component}{section_component}__{key_component}"

1039 

def _get_env_var_option(self, section: str, key: str, team_name: str | None = None):
    """
    Get config option from environment variable.

    Checks, in order, the plain variable, its ``_CMD`` variant and its ``_SECRET``
    variant. The latter two are honoured only when ``(section, key)`` is in
    ``self.sensitive_config_values``.

    :return: the resolved value, or None if no applicable variable is set
    """
    base_name: str = self._env_var_name(section, key, team_name=team_name)
    if base_name in os.environ:
        return expand_env_var(os.environ[base_name])

    # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
    cmd_name = base_name + "_CMD"
    if cmd_name in os.environ and (section, key) in self.sensitive_config_values:
        return run_command(os.environ[cmd_name])

    # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
    secret_name = base_name + "_SECRET"
    if secret_name in os.environ and (section, key) in self.sensitive_config_values:
        return self._get_config_value_from_secret_backend(os.environ[secret_name])

    return None

1058 

def _get_cmd_option(self, section: str, key: str):
    """
    Get config option from command execution.

    Applies only to ``(section, key)`` pairs in ``self.sensitive_config_values`` for
    which the underlying parser has a ``<key>_cmd`` option; that option's value is
    passed to ``run_command``.

    :param section: section of the option
    :param key: option name without the ``_cmd`` suffix
    :return: the output of ``run_command`` or None when no command option applies
    :raises AirflowConfigException: re-raised unchanged from ``run_command``, or
        wrapping any other exception it raises
    """
    fallback_key = key + "_cmd"
    if (section, key) in self.sensitive_config_values:
        if super().has_option(section, fallback_key):
            command = super().get(section, fallback_key)
            try:
                cmd_output = run_command(command)
            except AirflowConfigException:
                # Already the right exception type: propagate untouched.
                raise
            except Exception as e:
                # fallback_key already carries the "_cmd" suffix; do not append it again.
                raise AirflowConfigException(
                    f"Cannot run the command for the config section [{section}]{fallback_key}."
                    f" Please check the {fallback_key} value."
                ) from e
            return cmd_output
    return None

1076 

def _get_secret_option(self, section: str, key: str) -> str | None:
    """
    Get Config option values from Secret Backend.

    Applies only to ``(section, key)`` pairs in ``self.sensitive_config_values`` for
    which the underlying parser has a ``<key>_secret`` option; that option's value is
    used as the path passed to ``self._get_config_value_from_secret_backend``.
    """
    secret_key = key + "_secret"
    if (section, key) not in self.sensitive_config_values:
        return None
    if not super().has_option(section, secret_key):
        return None
    secrets_path = super().get(section, secret_key)
    return self._get_config_value_from_secret_backend(secrets_path)

1085 

def _get_environment_variables(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from environment variables.

    The current section/key is tried first, then the deprecated one (if given).

    :param deprecated_key: deprecated option name, if any
    :param deprecated_section: deprecated section name, if any
    :param key: option name
    :param section: section name
    :param issue_warning: whether to warn when the deprecated variable is used
    :param extra_stacklevel: forwarded to ``_warn_deprecate``
    :param kwargs: only ``team_name`` is read
    :return: the value, or ``VALUE_NOT_FOUND_SENTINEL`` when no variable provides one
    """
    team_name = kwargs.get("team_name", None)
    current = self._get_env_var_option(section, key, team_name=team_name)
    if current is not None:
        return current
    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name)
    if legacy is None:
        return VALUE_NOT_FOUND_SENTINEL
    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

1109 

def _get_option_from_config_file(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from config file.

    With a truthy ``team_name`` in ``kwargs`` the section looked up is
    ``"<team_name>=<section>"``. The deprecated section/key is consulted when the
    current one is absent.

    :param deprecated_key: deprecated option name, if any
    :param deprecated_section: deprecated section name, if any
    :param key: option name
    :param section: section name
    :param issue_warning: whether to warn when the deprecated option is used
    :param extra_stacklevel: forwarded to ``_warn_deprecate``
    :param kwargs: remaining keyword arguments are forwarded to the parent ``get``
    :return: the value, or ``VALUE_NOT_FOUND_SENTINEL`` when the file has no such option
    """
    team_name = kwargs.get("team_name", None)
    if team_name:
        section = f"{team_name}={section}"
        # since this is the last lookup that supports team_name, pop it
        del kwargs["team_name"]

    if super().has_option(section, key):
        current = super().get(section, key, **kwargs)
        return expand_env_var(current)

    has_legacy = bool(deprecated_section and deprecated_key) and super().has_option(
        deprecated_section, deprecated_key
    )
    if not has_legacy:
        return VALUE_NOT_FOUND_SENTINEL

    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    with self.suppress_future_warnings():
        return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))

1134 

def _get_option_from_commands(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from command execution.

    The current section/key is tried first, then the deprecated one (if given).
    Falsy command results are treated as "not found".

    :param deprecated_key: deprecated option name, if any
    :param deprecated_section: deprecated section name, if any
    :param key: option name
    :param section: section name
    :param issue_warning: whether to warn when the deprecated option is used
    :param extra_stacklevel: forwarded to ``_warn_deprecate``
    :param kwargs: only ``team_name`` is read
    :return: the value, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    if kwargs.get("team_name", None):
        # Commands based team config fetching is not currently supported
        return VALUE_NOT_FOUND_SENTINEL

    current = self._get_cmd_option(section, key)
    if current:
        return current
    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_cmd_option(deprecated_section, deprecated_key)
    if not legacy:
        return VALUE_NOT_FOUND_SENTINEL
    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

1160 

def _get_option_from_secrets(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from secrets backend.

    The current section/key is tried first, then the deprecated one (if given).
    Falsy lookup results are treated as "not found".

    :param deprecated_key: deprecated option name, if any
    :param deprecated_section: deprecated section name, if any
    :param key: option name
    :param section: section name
    :param issue_warning: whether to warn when the deprecated option is used
    :param extra_stacklevel: forwarded to ``_warn_deprecate``
    :param kwargs: only ``team_name`` is read
    :return: the value, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    if kwargs.get("team_name", None):
        # Secrets based team config fetching is not currently supported
        return VALUE_NOT_FOUND_SENTINEL

    current = self._get_secret_option(section, key)
    if current:
        return current
    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_secret_option(deprecated_section, deprecated_key)
    if not legacy:
        return VALUE_NOT_FOUND_SENTINEL
    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

1186 

def _get_option_from_defaults(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from default values.

    A value is returned when ``get_default_value`` has a non-None default or when
    ``fallback`` is among ``kwargs`` (which are forwarded to ``get_default_value``).
    The deprecated names, warning flags and ``team_name`` are accepted but unused here.

    :return: the default passed through ``expand_env_var``, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    has_default = self.get_default_value(section, key) is not None
    if not has_default and "fallback" not in kwargs:
        return VALUE_NOT_FOUND_SENTINEL
    return expand_env_var(self.get_default_value(section, key, **kwargs))

1202 

1203 def _resolve_deprecated_lookup( 

1204 self, 

1205 section: str, 

1206 key: str, 

1207 lookup_from_deprecated: bool, 

1208 extra_stacklevel: int = 0, 

1209 ) -> tuple[str, str, str | None, str | None, bool]: 

1210 """ 

1211 Resolve deprecated section/key mappings and determine deprecated values. 

1212 

1213 :param section: Section name (will be lowercased) 

1214 :param key: Key name (will be lowercased) 

1215 :param lookup_from_deprecated: Whether to lookup from deprecated options 

1216 :param extra_stacklevel: Extra stack level for warnings 

1217 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted) 

1218 """ 

1219 section = section.lower() 

1220 key = key.lower() 

1221 warning_emitted = False 

1222 deprecated_section: str | None = None 

1223 deprecated_key: str | None = None 

1224 

1225 if not lookup_from_deprecated: 

1226 return section, key, deprecated_section, deprecated_key, warning_emitted 

1227 

1228 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {}) 

1229 if option_description.get("deprecated"): 

1230 deprecation_reason = option_description.get("deprecation_reason", "") 

1231 warnings.warn( 

1232 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}", 

1233 DeprecationWarning, 

1234 stacklevel=2 + extra_stacklevel, 

1235 ) 

1236 # For the cases in which we rename whole sections 

1237 if section in self.inversed_deprecated_sections: 

1238 deprecated_section, deprecated_key = (section, key) 

1239 section = self.inversed_deprecated_sections[section] 

1240 if not self._suppress_future_warnings: 

1241 warnings.warn( 

1242 f"The config section [{deprecated_section}] has been renamed to " 

1243 f"[{section}]. Please update your `conf.get*` call to use the new name", 

1244 FutureWarning, 

1245 stacklevel=2 + extra_stacklevel, 

1246 ) 

1247 # Don't warn about individual rename if the whole section is renamed 

1248 warning_emitted = True 

1249 elif (section, key) in self.inversed_deprecated_options: 

1250 # Handle using deprecated section/key instead of the new section/key 

1251 new_section, new_key = self.inversed_deprecated_options[(section, key)] 

1252 if not self._suppress_future_warnings and not warning_emitted: 

1253 warnings.warn( 

1254 f"section/key [{section}/{key}] has been deprecated, you should use" 

1255 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the " 

1256 "new name", 

1257 FutureWarning, 

1258 stacklevel=2 + extra_stacklevel, 

1259 ) 

1260 warning_emitted = True 

1261 deprecated_section, deprecated_key = section, key 

1262 section, key = (new_section, new_key) 

1263 elif section in self.deprecated_sections: 

1264 # When accessing the new section name, make sure we check under the old config name 

1265 deprecated_key = key 

1266 deprecated_section = self.deprecated_sections[section][0] 

1267 else: 

1268 deprecated_section, deprecated_key, _ = self.deprecated_options.get( 

1269 (section, key), (None, None, None) 

1270 ) 

1271 

1272 return section, key, deprecated_section, deprecated_key, warning_emitted 

1273 

def load_providers_configuration(self) -> None:
    """
    Load configuration for providers.

    Emits a DeprecationWarning, sets ``_use_providers_configuration`` to True and
    calls ``_invalidate_provider_flag_caches()``.

    .. deprecated:: 3.2.0
        Provider configuration is now loaded lazily via the ``configuration_description``
        cached property. This method is kept for backwards compatibility and will be
        removed in a future version.
    """
    message = (
        "load_providers_configuration() is deprecated. "
        "Provider configuration is now loaded lazily via the "
        "`configuration_description` cached property."
    )
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    self._use_providers_configuration = True
    self._invalidate_provider_flag_caches()

1292 

def restore_core_default_configuration(self) -> None:
    """
    Restore the parser state before provider-contributed sections were loaded.

    Emits a DeprecationWarning, sets ``_use_providers_configuration`` to False and
    calls ``_invalidate_provider_flag_caches()``.

    .. deprecated:: 3.2.0
        Use ``make_sure_configuration_loaded(with_providers=False)`` context manager
        instead. This method is kept for backwards compatibility and will be removed
        in a future version.
    """
    message = (
        "restore_core_default_configuration() is deprecated. "
        "Use `make_sure_configuration_loaded(with_providers=False)` instead."
    )
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    self._use_providers_configuration = False
    self._invalidate_provider_flag_caches()

1310 

@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...

@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: ...

def get(  # type: ignore[misc, override]
    self,
    section: str,
    key: str,
    suppress_warnings: bool = False,
    lookup_from_deprecated: bool = True,
    _extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | None:
    """
    Get config value by iterating through lookup sequence.

    Priority order is defined by _lookup_sequence property.

    :param section: section name
    :param key: option name
    :param suppress_warnings: when True, no log warning is written before raising
    :param lookup_from_deprecated: whether deprecated names are also consulted
    :param _extra_stacklevel: extra stack level for emitted warnings
    :param team_name: forwarded to the lookup methods as ``team_name`` when not None
    :param kwargs: forwarded to the lookup methods; an explicit ``fallback`` (even
        None) is returned when no lookup finds a value
    :raises AirflowConfigException: when nothing is found and no fallback was given
    """
    resolved = self._resolve_deprecated_lookup(
        section=section,
        key=key,
        lookup_from_deprecated=lookup_from_deprecated,
        extra_stacklevel=_extra_stacklevel,
    )
    section, key, deprecated_section, deprecated_key, warning_emitted = resolved

    if team_name is not None:
        kwargs["team_name"] = team_name

    # Do not warn again in the lookups if resolving already did.
    issue_warning = not warning_emitted
    for lookup_method in self._lookup_sequence:
        found = lookup_method(
            deprecated_key=deprecated_key,
            deprecated_section=deprecated_section,
            key=key,
            section=section,
            issue_warning=issue_warning,
            extra_stacklevel=_extra_stacklevel,
            **kwargs,
        )
        if found is not VALUE_NOT_FOUND_SENTINEL:
            return found

    # Check if fallback was explicitly provided (even if None)
    if "fallback" in kwargs:
        return kwargs["fallback"]

    if not suppress_warnings:
        log.warning("section/key [%s/%s] not found in config", section, key)

    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")

1363 

def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
    """
    Get config value as boolean.

    The value is stringified, lower-cased and stripped; anything from the first
    ``#`` on is discarded. ``t``/``true``/``1`` map to True and ``f``/``false``/``0``
    to False.

    :raises AirflowConfigException: for any other value
    """
    raw_value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    # Keep only the part before an inline "#" comment.
    val = str(raw_value).lower().strip().partition("#")[0].strip()
    known = {"t": True, "true": True, "1": True, "f": False, "false": False, "0": False}
    parsed = known.get(val)
    if parsed is None:
        raise AirflowConfigException(
            f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )
    return parsed

1377 

def getint(self, section: str, key: str, **kwargs) -> int:  # type: ignore[override]
    """
    Get config value as integer.

    Values that ``int()`` rejects are retried through ``float()`` and accepted only
    when the float is integral (e.g. ``"3.0"``).

    :raises AirflowConfigException: when the value is None or not convertible
    """
    val = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if val is None:
        raise AirflowConfigException(
            f"Failed to convert value None to int. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        return int(val)
    except ValueError:
        try:
            as_float = float(val)
            as_int = int(as_float)
            if as_float != as_int:
                # A fractional part means the value is not an integer.
                raise ValueError
            return as_int
        except (ValueError, OverflowError):
            raise AirflowConfigException(
                f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
                f'Current value: "{val}".'
            )

1398 

def getfloat(self, section: str, key: str, **kwargs) -> float:  # type: ignore[override]
    """
    Get config value as float.

    :raises AirflowConfigException: when the value is None or ``float()`` rejects it
    """
    val = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if val is None:
        raise AirflowConfigException(
            f"Failed to convert value None to float. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        converted = float(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )
    return converted

1414 

def getlist(self, section: str, key: str, delimiter=",", **kwargs):
    """
    Get config value as list.

    The value is split on ``delimiter`` and each item is stripped. An empty string
    gives ``[]``; a list or None (which can only come from ``fallback=``) is
    returned unchanged.

    :raises AirflowConfigException: if splitting the value fails
    """
    val = self.get(section, key, **kwargs)

    # `get` will always return a (possibly-empty) string, so the only way we can
    # have these types is with `fallback=` was specified. So just return it.
    if val is None or isinstance(val, list):
        return val

    if val == "":
        return []

    try:
        pieces = [piece.strip() for piece in val.split(delimiter)]
    except Exception:
        raise AirflowConfigException(
            f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )
    return pieces

1434 

# Type variable bound to Enum: lets getenum()/getenumlist() return members of the
# same enum class that is passed in as ``enum_class``.
E = TypeVar("E", bound=Enum)

1436 

def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
    """
    Get config value as enum.

    The value is looked up by member name in ``enum_class``. If it is not a member
    name and ``fallback`` in ``kwargs`` is one, the fallback member is returned.

    :raises AirflowConfigException: when the value is None, or neither the value nor
        the fallback names a member
    """
    val = self.get(section, key, **kwargs)
    enum_names = [member.name for member in enum_class]

    if val is None:
        raise AirflowConfigException(
            f'Failed to convert value. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}" and it must be one of {", ".join(enum_names)}'
        )

    try:
        return enum_class[val]
    except KeyError:
        fallback_usable = "fallback" in kwargs and kwargs["fallback"] in enum_names
        if not fallback_usable:
            raise AirflowConfigException(
                f'Failed to convert value. Please check "{key}" key in "{section}" section. '
                f"the value must be one of {', '.join(enum_names)}"
            )
        return enum_class[kwargs["fallback"]]

1457 

def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
    """
    Get config value as list of enums.

    The value is read with ``getlist`` (``fallback`` defaults to ``[]``) and each
    item is looked up by member name in ``enum_class``. Items that are not member
    names are logged as warnings and skipped.

    :param section: section name
    :param key: option name
    :param enum_class: enum whose members are returned
    :param delimiter: separator passed to ``getlist``
    :return: the matching enum members, in order; empty when there is no value
    """
    kwargs.setdefault("fallback", [])
    string_list = self.getlist(section, key, delimiter, **kwargs)
    if not string_list:
        # Covers both an empty list and None (getlist returns an explicit
        # ``fallback=None`` unchanged), so the declared list type always holds.
        return []

    enum_names = [enum_item.name for enum_item in enum_class]
    enum_list = []

    for val in string_list:
        try:
            enum_list.append(enum_class[val])
        except KeyError:
            log.warning(
                "Failed to convert value %r. Please check %s key in %s section. "
                "it must be one of %s, if not the value is ignored",
                val,
                key,
                section,
                ", ".join(enum_names),
            )

    return enum_list

1480 

def getimport(self, section: str, key: str, **kwargs) -> Any:
    """
    Read options, import the full qualified name, and return the object.

    In case of failure, it throws an exception with the key and section names

    :return: The object or None, if the option is empty
    :raises AirflowConfigException: if importing raises ImportError
    """
    dotted_path = self.get(section=section, key=key, **kwargs)
    if not dotted_path:
        return None

    try:
        # Import here to avoid circular dependency
        from ..module_loading import import_string

        imported = import_string(dotted_path)
    except ImportError as e:
        log.warning(e)
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{dotted_path}".'
        )
    return imported

1505 

def getjson(
    self, section: str, key: str, fallback=None, **kwargs
) -> dict | list | str | int | float | None:
    """
    Return a config value parsed from a JSON string.

    ``fallback`` is *not* JSON parsed but used verbatim when no config value is given
    (missing section/option, None, or an empty string).

    :raises AirflowConfigException: when the value is not valid JSON
    """
    try:
        raw_json = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
    except (NoSectionError, NoOptionError):
        return fallback

    if raw_json is None or raw_json == "":
        return fallback

    try:
        parsed = json.loads(raw_json)
    except JSONDecodeError as e:
        raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
    return parsed

1526 

def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Get the config value for the given section and key, and convert it into datetime.timedelta object.

    A falsy value (missing, None, empty) yields ``fallback``.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: fallback value when no config value is given, defaults to None
    :raises AirflowConfigException: raised because ValueError or OverflowError
    :return: datetime.timedelta(seconds=<config_value>) or None
    """
    val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
    if not val:
        return fallback

    # the given value must be convertible to integer
    try:
        seconds = int(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

    try:
        return datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            f"Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
        )

1563 

def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
    """
    Return the value of a required option.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param kwargs: additional keyword arguments forwarded to ``get()``
    :raises ValueError: if ``get()`` returns ``None``
    :return: the configured value
    """
    found = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if found is not None:
        return found
    raise ValueError(f"The value {section}/{key} should be set!")

1570 

def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
    """
    Return the list value of a required option.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param kwargs: additional keyword arguments forwarded to ``getlist()``
    :raises ValueError: if ``getlist()`` returns ``None``
    :return: the configured list
    """
    items = self.getlist(section, key, **kwargs)
    if items is not None:
        return items
    raise ValueError(f"The value {section}/{key} should be set!")

1577 

def read(
    self,
    filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
    encoding: str | None = None,
) -> list[str]:
    """
    Read configuration from the given file name(s) by delegating to the parent ``read()``.

    :param filenames: a single path or an iterable of paths to read
    :param encoding: encoding passed through to the parent implementation
    :return: whatever the parent ``read()`` returns
    """
    files_read = super().read(filenames=filenames, encoding=encoding)
    return files_read

1584 

def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
) -> None:
    """
    Load configuration from a nested dictionary by delegating to the parent ``read_dict()``.

    We define a different signature here to add better type hints and checking.

    :param dictionary: dictionary to read from
    :param source: source to be used to store the configuration
    :return: None
    """
    # Pure pass-through: both arguments are forwarded unchanged to the parent class.
    super().read_dict(dictionary=dictionary, source=source)

1596 

1597 def _has_section_in_any_defaults(self, section: str) -> bool: 

1598 """Check if section exists in core defaults or provider fallback defaults.""" 

1599 if self._default_values.has_section(section): 

1600 return True 

1601 if self._use_providers_configuration: 

1602 if self._provider_cfg_config_fallback_default_values.has_section(section): 

1603 return True 

1604 if self._provider_metadata_config_fallback_default_values.has_section(section): 

1605 return True 

1606 return False 

1607 

def get_sections_including_defaults(self) -> list[str]:
    """
    List every section known to the parser or to the configuration description.

    Sections from ``self.configuration_description`` come first, followed by sections from
    ``self.sections()`` that were not already listed; duplicates are dropped.

    :return: list of section names
    """
    configured = self.sections()
    described = self.configuration_description
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return [*dict.fromkeys([*described, *configured])]

1617 

def get_options_including_defaults(self, section: str) -> list[str]:
    """
    List every option known for ``section``.

    Options listed under ``"options"`` in the configuration description for the section come
    first, followed by options from the parser's own section (if it has one) that were not
    already listed; duplicates are dropped.

    :param section: section name
    :return: list of option names for the section given
    """
    own_options = self.options(section) if self.has_section(section) else []
    described_options = self.configuration_description.get(section, {}).get("options", {})
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return [*dict.fromkeys([*described_options, *own_options])]

1632 

def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs) -> bool:
    """
    Check if option is defined.

    Delegates to ``self.get()`` with a sentinel fallback so the priority order of config
    sources (env, config, cmd, defaults) does not have to be reimplemented here.

    :param section: section to get option from
    :param option: option to get
    :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
    :param kwargs: additional keyword arguments to pass to get(), such as team_name
    :return: False if ``get()`` returned the sentinel or raised ``NoOptionError``,
        ``NoSectionError`` or ``AirflowConfigException``; True otherwise
    """
    try:
        found = self.get(
            section,
            option,
            fallback=VALUE_NOT_FOUND_SENTINEL,
            _extra_stacklevel=1,
            suppress_warnings=True,
            lookup_from_deprecated=lookup_from_deprecated,
            **kwargs,
        )
    except (NoOptionError, NoSectionError, AirflowConfigException):
        return False
    # The sentinel coming back means nothing supplied a value for this option.
    return found is not VALUE_NOT_FOUND_SENTINEL

1661 

def set(self, section: str, option: str, value: str | None = None) -> None:
    """
    Set an option to the given value.

    Section and option names are lower-cased first, to match what we do in `get`. When the
    parser has no such section yet but the section is a key of ``self.configuration_description``,
    the section is created before the value is stored.

    :param section: section name (case-insensitive)
    :param option: option name (case-insensitive)
    :param value: value to store
    """
    section, option = section.lower(), option.lower()
    described_sections = self.configuration_description or {}
    section_missing = not self.has_section(section)
    if section_missing and section in described_sections:
        # Known from the description but absent from the user config: create it on demand.
        self.add_section(section)
    super().set(section, option, value)

1676 

def remove_option(self, section: str, option: str, remove_default: bool = True):
    """
    Remove an option from this parser and, optionally, from the default values.

    Section and option names are lower-cased before the lookup. The option is removed from
    this parser if present; unless ``remove_default`` is False it is also removed from
    ``self._default_values`` if present there.

    :param section: section name (case-insensitive)
    :param option: option name (case-insensitive)
    :param remove_default: whether to also remove the option from the default values
    """
    section, option = section.lower(), option.lower()

    if super().has_option(section, option):
        super().remove_option(section, option)

    if not remove_default:
        return
    default_values = self._default_values
    if default_values.has_option(section, option):
        default_values.remove_option(section, option)

1691 

def optionxform(self, optionstr: str) -> str:
    """
    Return the option name unchanged.

    ConfigParser applies this hook to option names on every read, get, or set operation and
    lower-cases them by default; this override makes the parser case-preserving instead.

    :param optionstr: option name as given
    :return: the same option name, untouched
    """
    return optionstr

1703 

def as_dict(
    self,
    display_source: bool = False,
    display_sensitive: bool = False,
    raw: bool = False,
    include_env: bool = True,
    include_cmds: bool = True,
    include_secret: bool = True,
) -> ConfigSourcesType:
    """
    Return the current configuration as a dictionary of dictionaries.

    When materializing current configuration Airflow defaults are
    materialized along with user set configs. If any of the `include_*`
    options are False then the result of calling command or secret key
    configs do not override Airflow defaults and instead are passed through.
    In order to then avoid Airflow defaults from overwriting user set
    command or secret key configs we filter out bare sensitive_config_values
    that are set to Airflow defaults when command or secret key configs
    produce different values.

    :param display_source: If False, the option value is returned. If True,
        a tuple of (option_value, source) is returned. Source is either
        'airflow.cfg', 'default', 'env var', or 'cmd'.
    :param display_sensitive: If True, the values of options set by env
        vars and bash commands will be displayed. If False, those options
        are shown as '< hidden >'
    :param raw: Should the values be output as interpolated values, or the
        "raw" form that can be fed back in to ConfigParser
    :param include_env: Should the value of configuration from AIRFLOW__
        environment variables be included or not
    :param include_cmds: Should the result of calling any ``*_cmd`` config be
        set (True, default), or should the _cmd options be left as the
        command to run (False)
    :param include_secret: Should the result of calling any ``*_secret`` config be
        set (True, default), or should the _secret options be left as the
        path to get the secret from (False)
    :raises ValueError: if ``display_sensitive`` is False while any of the
        ``include_*`` flags is False
    :return: Dictionary, where the key is the name of the section and the content is
        the dictionary with the name of the parameter and its value.
    """
    # Hiding happens inside the include_* helpers (env/cmd/secret values are read there),
    # so hiding sensitive values is only possible when all of them are enabled.
    everything_included = include_env and include_cmds and include_secret
    if not display_sensitive and not everything_included:
        raise ValueError(
            "If display_sensitive is false, then include_env, "
            "include_cmds, include_secret must all be set as True"
        )

    config_sources: ConfigSourcesType = {}

    # Sources are checked sequentially; the last one an option is seen in "wins".
    ordered_sources = self._config_sources_for_as_dict
    self._replace_config_with_display_sources(
        config_sources,
        ordered_sources,
        self.configuration_description,
        display_source,
        raw,
        self.deprecated_options,
        include_cmds=include_cmds,
        include_env=include_env,
        include_secret=include_secret,
    )

    # Overlay env vars, then bash commands, then secret backends. Each overlay is either
    # applied on top of config_sources or, when disabled, filtered by its option getter.
    overlays = (
        (include_env, self._include_envs, self._get_env_var_option),
        (include_cmds, self._include_commands, self._get_cmd_option),
        (include_secret, self._include_secrets, self._get_secret_option),
    )
    for enabled, include_source, option_getter in overlays:
        if enabled:
            include_source(config_sources, display_sensitive, display_source, raw)
        else:
            self._filter_by_source(config_sources, display_source, option_getter)

    if display_sensitive:
        return config_sources

    # Sensitive values that came from the config file itself (not env, cmd or secret)
    # still need to be masked.
    hidden = "< hidden >"
    for section, key in self.sensitive_config_values:
        section_values = config_sources.get(section)
        if not section_values or not section_values.get(key, None):
            continue
        if display_source:
            section_values[key] = (hidden, section_values[key][1])
        else:
            section_values[key] = hidden

    return config_sources

1802 

1803 def _write_option_header( 

1804 self, 

1805 file: IO[str], 

1806 option: str, 

1807 extra_spacing: bool, 

1808 include_descriptions: bool, 

1809 include_env_vars: bool, 

1810 include_examples: bool, 

1811 include_sources: bool, 

1812 section_config_description: dict[str, dict[str, Any]], 

1813 section_to_write: str, 

1814 sources_dict: ConfigSourcesType, 

1815 ) -> tuple[bool, bool]: 

1816 """ 

1817 Write header for configuration option. 

1818 

1819 Returns tuple of (should_continue, needs_separation) where needs_separation should be 

1820 set if the option needs additional separation to visually separate it from the next option. 

1821 """ 

1822 option_config_description = ( 

1823 section_config_description.get("options", {}).get(option, {}) 

1824 if section_config_description 

1825 else {} 

1826 ) 

1827 description = option_config_description.get("description") 

1828 needs_separation = False 

1829 if description and include_descriptions: 

1830 for line in description.splitlines(): 

1831 file.write(f"# {line}\n") 

1832 needs_separation = True 

1833 example = option_config_description.get("example") 

1834 if example is not None and include_examples: 

1835 if extra_spacing: 

1836 file.write("#\n") 

1837 example_lines = example.splitlines() 

1838 example = "\n# ".join(example_lines) 

1839 file.write(f"# Example: {option} = {example}\n") 

1840 needs_separation = True 

1841 if include_sources and sources_dict: 

1842 sources_section = sources_dict.get(section_to_write) 

1843 value_with_source = sources_section.get(option) if sources_section else None 

1844 if value_with_source is None: 

1845 file.write("#\n# Source: not defined\n") 

1846 else: 

1847 file.write(f"#\n# Source: {value_with_source[1]}\n") 

1848 needs_separation = True 

1849 if include_env_vars: 

1850 file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n") 

1851 if extra_spacing: 

1852 file.write("#\n") 

1853 needs_separation = True 

1854 return True, needs_separation 

1855 

def is_template(self, section: str, key) -> bool:
    """
    Tell whether the given option is templated.

    Delegates to the module-level ``_is_template`` helper using this parser's
    ``configuration_description``.

    :param section: section of the config
    :param key: key in the section
    :return: True if the value is templated
    """
    description = self.configuration_description
    return _is_template(description, section, key)

1865 

def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None:
    """
    Return the section as a dict.

    Values are taken from the default values first, then overridden by this parser's own
    section, then by matching environment variables. Each value is converted to int, float
    or bool where possible; other values are left as they are.

    :param section: section from the config
    :param team_name: optional team name for team-specific configuration lookup
    :raises AirflowConfigException: if any collected value is ``None``
    :return: mapping of option name to converted value, or ``None`` when neither this parser
        nor the default values have the section
    """
    # Handle team-specific section lookup for config file
    config_section = f"{team_name}={section}" if team_name else section

    if not self.has_section(config_section) and not self._default_values.has_section(config_section):
        return None
    if self._default_values.has_section(config_section):
        _section: ConfigOptionsDictType = dict(self._default_values.items(config_section))
    else:
        _section = {}

    if self.has_section(config_section):
        _section.update(self.items(config_section))

    # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name
    section_prefix = self._env_var_name(section, "", team_name=team_name)
    for env_var in sorted(os.environ):
        if not env_var.startswith(section_prefix):
            continue
        # Strip only the leading prefix: str.replace would also remove later occurrences of
        # the prefix text inside the option name and map the variable to the wrong key.
        key = env_var.removeprefix(section_prefix).removesuffix("_CMD").lower()
        _section[key] = self._get_env_var_option(section, key, team_name=team_name)

    # Only existing keys are reassigned below, so mutating while iterating is safe.
    for key, val in _section.items():
        if val is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        try:
            _section[key] = int(val)
        except ValueError:
            try:
                _section[key] = float(val)
            except ValueError:
                if isinstance(val, str) and val.lower() in ("t", "true"):
                    _section[key] = True
                elif isinstance(val, str) and val.lower() in ("f", "false"):
                    _section[key] = False
    return _section

1915 

1916 @staticmethod 

1917 def _write_section_header( 

1918 file: IO[str], 

1919 include_descriptions: bool, 

1920 section_config_description: dict[str, str], 

1921 section_to_write: str, 

1922 ) -> None: 

1923 """Write header for configuration section.""" 

1924 file.write(f"[{section_to_write}]\n") 

1925 section_description = section_config_description.get("description") 

1926 if section_description and include_descriptions: 

1927 for line in section_description.splitlines(): 

1928 file.write(f"# {line}\n") 

1929 file.write("\n") 

1930 

1931 def _write_value( 

1932 self, 

1933 file: IO[str], 

1934 option: str, 

1935 comment_out_everything: bool, 

1936 needs_separation: bool, 

1937 only_defaults: bool, 

1938 section_to_write: str, 

1939 hide_sensitive: bool, 

1940 is_sensitive: bool, 

1941 show_values: bool = False, 

1942 ): 

1943 default_value = self.get_default_value(section_to_write, option, raw=True) 

1944 if only_defaults: 

1945 value = default_value 

1946 else: 

1947 value = self.get(section_to_write, option, fallback=default_value, raw=True) 

1948 if not show_values: 

1949 file.write(f"# {option} = \n") 

1950 else: 

1951 if hide_sensitive and is_sensitive: 

1952 value = "< hidden >" 

1953 else: 

1954 pass 

1955 if value is None: 

1956 file.write(f"# {option} = \n") 

1957 else: 

1958 if comment_out_everything: 

1959 value_lines = value.splitlines() 

1960 value = "\n# ".join(value_lines) 

1961 file.write(f"# {option} = {value}\n") 

1962 else: 

1963 if "\n" in value: 

1964 try: 

1965 value = json.dumps(json.loads(value), indent=4) 

1966 value = value.replace( 

1967 "\n", "\n " 

1968 ) # indent multi-line JSON to satisfy configparser format 

1969 except JSONDecodeError: 

1970 pass 

1971 file.write(f"{option} = {value}\n") 

1972 if needs_separation: 

1973 file.write("\n") 

1974 

def write(  # type: ignore[override]
    self,
    file: IO[str],
    section: str | None = None,
    include_examples: bool = True,
    include_descriptions: bool = True,
    include_sources: bool = True,
    include_env_vars: bool = True,
    include_providers: bool = True,
    comment_out_everything: bool = False,
    hide_sensitive: bool = False,
    extra_spacing: bool = True,
    only_defaults: bool = False,
    show_values: bool = False,
    **kwargs: Any,
) -> None:
    """
    Write configuration with comments and examples to a file.

    :param file: file to write to
    :param section: section of the config to write, defaults to all sections
    :param include_examples: Include examples in the output
    :param include_descriptions: Include descriptions in the output
    :param include_sources: Include the source of each config option
    :param include_env_vars: Include environment variables corresponding to each config option
    :param include_providers: Include providers configuration
    :param comment_out_everything: Comment out all values
    :param hide_sensitive: Replace the values of sensitive options with ``< hidden >``
    :param extra_spacing: Add extra spacing before examples and after variables
    :param only_defaults: Only include default values when writing the config, not the actual values
    :param show_values: Write option values; when False only commented-out placeholders are written
    :param kwargs: accepted for signature compatibility; not used by this method
    """
    with self.make_sure_configuration_loaded(with_providers=include_providers):
        sources_dict = self.as_dict(display_source=True) if include_sources else {}
        for section_to_write in self.get_sections_including_defaults():
            if section is not None and section_to_write != section:
                continue
            if not (
                self._has_section_in_any_defaults(section_to_write) or self.has_section(section_to_write)
            ):
                continue
            section_config_description = self.configuration_description.get(section_to_write, {})
            self._write_section_header(
                file, include_descriptions, section_config_description, section_to_write
            )
            # Reset per section: a section without options would otherwise hit an unbound
            # local (first section) or reuse the value left over from the previous section.
            needs_separation = False
            for option in self.get_options_including_defaults(section_to_write):
                _, needs_separation = self._write_option_header(
                    file=file,
                    option=option,
                    extra_spacing=extra_spacing,
                    include_descriptions=include_descriptions,
                    include_env_vars=include_env_vars,
                    include_examples=include_examples,
                    include_sources=include_sources,
                    section_config_description=section_config_description,
                    section_to_write=section_to_write,
                    sources_dict=sources_dict,
                )
                is_sensitive = (
                    section_to_write.lower(),
                    option.lower(),
                ) in self.sensitive_config_values
                self._write_value(
                    file=file,
                    option=option,
                    comment_out_everything=comment_out_everything,
                    needs_separation=needs_separation,
                    only_defaults=only_defaults,
                    section_to_write=section_to_write,
                    hide_sensitive=hide_sensitive,
                    is_sensitive=is_sensitive,
                    show_values=show_values,
                )
            if include_descriptions and not needs_separation:
                # extra separation between sections in case last option did not need it
                file.write("\n")

2049 

@contextmanager
def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
    """
    Run the enclosed block with or without providers configuration.

    With ``with_providers=True`` this is a no-op, as the configuration already loads providers
    configuration by default. With ``with_providers=False`` the
    ``self._use_providers_configuration`` flag is set to False for the duration of the block
    and reset to True afterwards (even if the block raises).

    :param with_providers: whether providers should be loaded
    """
    if with_providers:
        yield
        return

    self._use_providers_configuration = False
    # Only invalidate cached properties that depend on _use_providers_configuration.
    # Do NOT use invalidate_cache() here — it would also evict expensive provider-discovery
    # caches (_provider_metadata_configuration_description, _provider_metadata_config_fallback_default_values)
    # that don't depend on this flag.
    self._invalidate_provider_flag_caches()
    try:
        yield
    finally:
        self._use_providers_configuration = True
        self._invalidate_provider_flag_caches()

2072 self._invalidate_provider_flag_caches()