Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/configuration/parser.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

763 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Base configuration parser with pure parsing logic.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import datetime 

24import functools 

25import itertools 

26import json 

27import logging 

28import os 

29import shlex 

30import subprocess 

31import sys 

32import warnings 

33from collections.abc import Callable, Generator, Iterable 

34from configparser import ConfigParser, NoOptionError, NoSectionError 

35from contextlib import contextmanager 

36from enum import Enum 

37from json.decoder import JSONDecodeError 

38from re import Pattern 

39from typing import IO, Any, TypeVar, overload 

40 

41from .exceptions import AirflowConfigException 

42 

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)


# A single scalar configuration value.
ConfigType = str | int | float | bool
# Option name -> scalar value for one section.
ConfigOptionsDictType = dict[str, ConfigType]
# Option name -> either a bare string value or a two-string tuple
# (the display helpers in this module store ``(value, source)`` pairs).
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
# Section name -> the per-section mapping above.
ConfigSourcesType = dict[str, ConfigSectionSourcesType]
# Prefix of every environment variable this module inspects.
ENV_VAR_PREFIX = "AIRFLOW__"

51 

52 

class ValueNotFound:
    """
    Marker type meaning "no value was found".

    This is a plain class, not an exception, so instances cannot be raised.
    Lookup helpers in this module are annotated as returning
    ``str | ValueNotFound``; callers compare against the shared
    ``VALUE_NOT_FOUND_SENTINEL`` instance defined below.
    """


# Single shared instance used as the "not found" marker.
VALUE_NOT_FOUND_SENTINEL = ValueNotFound()

60 

61 

@overload
def expand_env_var(env_var: None) -> None: ...
@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Resolve environment-variable and ``~`` references in a string until it is stable.

    ``os.path.expandvars`` followed by ``os.path.expanduser`` is applied
    repeatedly, so a variable whose value mentions another variable is fully
    resolved. Falsy or non-string input is returned unchanged.

    :param env_var: text to expand, or ``None``
    :return: the fully expanded text, or the input itself when it is falsy or not a string
    """
    if not env_var or not isinstance(env_var, str):
        return env_var
    current = env_var
    expanded = os.path.expanduser(os.path.expandvars(str(current)))
    # Keep going until one more pass changes nothing.
    while expanded != current:
        current = expanded
        expanded = os.path.expanduser(os.path.expandvars(str(current)))
    return expanded

82 

83 

def run_command(command: str) -> str:
    """
    Execute ``command`` (split with ``shlex``, no shell) and return its stdout.

    Both output streams are captured and decoded with the interpreter's
    default encoding, ignoring undecodable bytes.

    :param command: command line to run
    :return: decoded standard output of the process
    :raises AirflowConfigException: if the process exits with a non-zero code
    """
    proc = subprocess.Popen(
        shlex.split(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
    )
    raw_out, raw_err = proc.communicate()
    output = raw_out.decode(sys.getdefaultencoding(), "ignore")
    stderr = raw_err.decode(sys.getdefaultencoding(), "ignore")

    if proc.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {proc.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )

98 

99 

100def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool: 

101 """ 

102 Check if the config is a template. 

103 

104 :param configuration_description: description of configuration 

105 :param section: section 

106 :param key: key 

107 :return: True if the config is a template 

108 """ 

109 return configuration_description.get(section, {}).get(key, {}).get("is_template", False) 

110 

111 

def configure_parser_from_configuration_description(
    parser: ConfigParser,
    configuration_description: dict[str, dict[str, Any]],
    all_vars: dict[str, Any],
) -> None:
    """
    Configure a ConfigParser based on configuration description.

    Every section of the description is added to ``parser``. An option gets a
    value only when its ``default`` is not ``None`` and it carries neither
    ``version_deprecated`` nor ``deprecation_reason``. Template options and
    non-string defaults are stored as ``str(default)``; other string defaults
    are first passed through ``str.format(**all_vars)``.

    :param parser: ConfigParser to configure
    :param configuration_description: configuration description from config.yml
    :param all_vars: names available for ``{name}`` placeholders in string defaults
    """
    for section, section_desc in configuration_description.items():
        parser.add_section(section)
        for key, option in section_desc["options"].items():
            default_value = option["default"]
            if default_value is None:
                continue
            if option.get("version_deprecated") or option.get("deprecation_reason"):
                continue
            if option.get("is_template", False) or not isinstance(default_value, str):
                parser.set(section, key, str(default_value))
                continue
            try:
                parser.set(section, key, default_value.format(**all_vars))
            except (KeyError, ValueError, IndexError):
                # The default is not a usable format string (unknown name, stray
                # brace, or a positional field such as "{}", which raises
                # IndexError); store it verbatim instead.
                parser.set(section, key, default_value)

139 

140 

class AirflowConfigParser(ConfigParser):
    """
    Base configuration parser with pure parsing logic.

    This class provides the core parsing methods that work with:
    - configuration_description: dict describing config options (required in __init__)
    - _default_values: ConfigParser with default values (required in __init__)
    - deprecated_values: class attribute mapping section -> setting -> (old pattern, replacement)
    - deprecated_options: class attribute mapping new -> old options
    - deprecated_sections: class attribute mapping new -> old sections
    """

    # A mapping of section -> setting -> (old, replace) for deprecated default values:
    # ``old`` is a compiled pattern matched against the current value and ``replace``
    # is the substitution applied to it.
    # Subclasses can override this to define deprecated values that should be upgraded.
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {}

    # A mapping of (new section, new option) -> (old section, old option, since_version).
    # When reading new option, the old option will be checked to see if it exists. If it does a
    # DeprecationWarning will be issued and the old option will be used instead
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
        ("dag_processor", "dag_file_processor_timeout"): ("core", "dag_file_processor_timeout", "3.0"),
        ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
        ("api", "base_url"): ("webserver", "base_url", "3.0"),
        ("api", "host"): ("webserver", "web_server_host", "3.0"),
        ("api", "port"): ("webserver", "web_server_port", "3.0"),
        ("api", "workers"): ("webserver", "workers", "3.0"),
        ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"),
        ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"),
        ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"),
        ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"),
        ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"),
        ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"),
        ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"),
        ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"),
        ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"),
        ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"),
        ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"),
        ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"),
        ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
        ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"),
        ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"),
        ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"),
        ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"),
        ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"),
        ("core", "num_dag_runs_to_retain_rendered_fields"): (
            "core",
            "max_num_rendered_ti_fields_per_task",
            "3.2.0",
        ),
        ("api", "page_size"): ("webserver", "page_size", "3.1.0"),
        ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"),
        ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"),
        ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"),
        ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
        ("api", "log_config"): ("api", "access_logfile", "3.1.0"),
        ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"),
        ("api", "fallback_page_limit"): ("api", "page_size", "3.2.0"),
    }

    # A mapping of new section -> (old section, since_version).
    # Empty in this base class.
    deprecated_sections: dict[str, tuple[str, str]] = {}

201 

202 @property 

203 def _lookup_sequence(self) -> list[Callable]: 

204 """ 

205 Define the sequence of lookup methods for get(). The definition here does not have provider lookup. 

206 

207 Subclasses can override this to customise lookup order. 

208 """ 

209 return [ 

210 self._get_environment_variables, 

211 self._get_option_from_config_file, 

212 self._get_option_from_commands, 

213 self._get_option_from_secrets, 

214 self._get_option_from_defaults, 

215 ] 

216 

217 @property 

218 def _validators(self) -> list[Callable[[], None]]: 

219 """ 

220 Return list of validators defined on a config parser class. Base class will return an empty list. 

221 

222 Subclasses can override this to customize the validators that are run during validation on the 

223 config parser instance. 

224 """ 

225 return [] 

226 

def validate(self) -> None:
    """Call each validator from ``_validators`` in order, then set ``is_validated`` to True."""
    registered = self._validators
    for run_validator in registered:
        run_validator()
    self.is_validated = True

232 

233 def _validate_deprecated_values(self) -> None: 

234 """Validate and upgrade deprecated default values.""" 

235 for section, replacement in self.deprecated_values.items(): 

236 for name, info in replacement.items(): 

237 old, new = info 

238 current_value = self.get(section, name, fallback="") 

239 if self._using_old_value(old, current_value): 

240 self.upgraded_values[(section, name)] = current_value 

241 new_value = old.sub(new, current_value) 

242 self._update_env_var(section=section, name=name, new_value=new_value) 

243 self._create_future_warning( 

244 name=name, 

245 section=section, 

246 current_value=current_value, 

247 new_value=new_value, 

248 ) 

249 

250 def _using_old_value(self, old: Pattern, current_value: str) -> bool: 

251 """Check if current_value matches the old pattern.""" 

252 return old.search(current_value) is not None 

253 

254 def _update_env_var(self, section: str, name: str, new_value: str) -> None: 

255 """Update environment variable with new value.""" 

256 env_var = self._env_var_name(section, name) 

257 # Set it as an env var so that any subprocesses keep the same override! 

258 os.environ[env_var] = new_value 

259 

260 @staticmethod 

261 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None: 

262 """Create a FutureWarning for deprecated default values.""" 

263 warnings.warn( 

264 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " 

265 f"This value has been changed to {new_value!r} in the running config, but please update your config.", 

266 FutureWarning, 

267 stacklevel=3, 

268 ) 

269 

270 def __init__( 

271 self, 

272 configuration_description: dict[str, dict[str, Any]], 

273 _default_values: ConfigParser, 

274 *args, 

275 **kwargs, 

276 ): 

277 """ 

278 Initialize the parser. 

279 

280 :param configuration_description: Description of configuration options 

281 :param _default_values: ConfigParser with default values 

282 """ 

283 super().__init__(*args, **kwargs) 

284 self.configuration_description = configuration_description 

285 self._default_values = _default_values 

286 self._suppress_future_warnings = False 

287 self.upgraded_values: dict[tuple[str, str], str] = {} 

288 

289 @functools.cached_property 

290 def inversed_deprecated_options(self): 

291 """Build inverse mapping from old options to new options.""" 

292 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()} 

293 

294 @functools.cached_property 

295 def inversed_deprecated_sections(self): 

296 """Build inverse mapping from old sections to new sections.""" 

297 return { 

298 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items() 

299 } 

300 

301 @functools.cached_property 

302 def sensitive_config_values(self) -> set[tuple[str, str]]: 

303 """Get set of sensitive config values that should be masked.""" 

304 flattened = { 

305 (s, k): item 

306 for s, s_c in self.configuration_description.items() 

307 for k, item in s_c.get("options", {}).items() 

308 } 

309 sensitive = { 

310 (section.lower(), key.lower()) 

311 for (section, key), v in flattened.items() 

312 if v.get("sensitive") is True 

313 } 

314 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options} 

315 depr_section = { 

316 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections 

317 } 

318 sensitive.update(depr_section, depr_option) 

319 return sensitive 

320 

321 def _update_defaults_from_string(self, config_string: str) -> None: 

322 """ 

323 Update the defaults in _default_values based on values in config_string ("ini" format). 

324 

325 Override shared parser's method to add validation for template variables. 

326 Note that those values are not validated and cannot contain variables because we are using 

327 regular config parser to load them. This method is used to test the config parser in unit tests. 

328 

329 :param config_string: ini-formatted config string 

330 """ 

331 parser = ConfigParser() 

332 parser.read_string(config_string) 

333 for section in parser.sections(): 

334 if section not in self._default_values.sections(): 

335 self._default_values.add_section(section) 

336 errors = False 

337 for key, value in parser.items(section): 

338 if not self.is_template(section, key) and "{" in value: 

339 errors = True 

340 log.error( 

341 "The %s.%s value %s read from string contains variable. This is not supported", 

342 section, 

343 key, 

344 value, 

345 ) 

346 self._default_values.set(section, key, value) 

347 if errors: 

348 raise AirflowConfigException( 

349 f"The string config passed as default contains variables. " 

350 f"This is not supported. String config: {config_string}" 

351 ) 

352 

def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
    """
    Retrieve default value from default config parser.

    The value is read from ``_default_values``. With ``raw=True`` every ``%``
    in a string result is doubled again, which reverses the un-escaping done
    by interpolation. This is useful for example when the default value is
    written to a file and interpolation is going to happen later, when the
    config is read.

    Only string results are escaped; any other result, such as ``None`` or a
    non-string ``fallback``, is returned unchanged.

    :param section: section of the config
    :param key: key to use
    :param fallback: fallback value to use
    :param raw: if raw, then interpolation will be reversed
    :param kwargs: other args, forwarded to the default parser's ``get``
    :return: the default value, or ``fallback`` when the option is not found
    """
    value = self._default_values.get(section, key, fallback=fallback, **kwargs)
    if raw and isinstance(value, str):
        return value.replace("%", "%%")
    return value

373 

374 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None: 

375 """ 

376 Get Secret Backend if defined in airflow.cfg. 

377 

378 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not. 

379 """ 

380 section = "workers" if worker_mode else "secrets" 

381 key = "secrets_backend" if worker_mode else "backend" 

382 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs" 

383 

384 secrets_backend_cls = self.getimport(section=section, key=key) 

385 

386 if not secrets_backend_cls: 

387 if worker_mode: 

388 # if we find no secrets backend for worker, return that of secrets backend 

389 secrets_backend_cls = self.getimport(section="secrets", key="backend") 

390 if not secrets_backend_cls: 

391 return None 

392 # When falling back to secrets backend, use its kwargs 

393 kwargs_key = "backend_kwargs" 

394 section = "secrets" 

395 else: 

396 return None 

397 

398 try: 

399 backend_kwargs = self.getjson(section=section, key=kwargs_key) 

400 if not backend_kwargs: 

401 backend_kwargs = {} 

402 elif not isinstance(backend_kwargs, dict): 

403 raise ValueError("not a dict") 

404 except AirflowConfigException: 

405 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key) 

406 backend_kwargs = {} 

407 except ValueError: 

408 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key) 

409 backend_kwargs = {} 

410 

411 return secrets_backend_cls(**backend_kwargs) 

412 

def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
    """
    Get Config option values from Secret Backend.

    The backend instance comes from ``_get_custom_secret_backend()``; when
    there is none, ``None`` is returned. Any exception raised while obtaining
    the backend or reading from it is re-raised as ``AirflowConfigException``
    with the original exception attached as its explicit cause.

    :param config_key: the config key to retrieve
    :return: config value or None
    :raises AirflowConfigException: if the backend cannot be created or queried
    """
    try:
        secrets_client = self._get_custom_secret_backend()
        if not secrets_client:
            return None
        return secrets_client.get_config(config_key)
    except Exception as e:
        raise AirflowConfigException(
            "Cannot retrieve config from alternative secrets backend. "
            "Make sure it is configured properly and that the Backend "
            "is accessible.\n"
            f"{e}"
        ) from e

435 

436 def _get_cmd_option_from_config_sources( 

437 self, config_sources: ConfigSourcesType, section: str, key: str 

438 ) -> str | None: 

439 fallback_key = key + "_cmd" 

440 if (section, key) in self.sensitive_config_values: 

441 section_dict = config_sources.get(section) 

442 if section_dict is not None: 

443 command_value = section_dict.get(fallback_key) 

444 if command_value is not None: 

445 if isinstance(command_value, str): 

446 command = command_value 

447 else: 

448 command = command_value[0] 

449 return run_command(command) 

450 return None 

451 

452 def _get_secret_option_from_config_sources( 

453 self, config_sources: ConfigSourcesType, section: str, key: str 

454 ) -> str | None: 

455 fallback_key = key + "_secret" 

456 if (section, key) in self.sensitive_config_values: 

457 section_dict = config_sources.get(section) 

458 if section_dict is not None: 

459 secrets_path_value = section_dict.get(fallback_key) 

460 if secrets_path_value is not None: 

461 if isinstance(secrets_path_value, str): 

462 secrets_path = secrets_path_value 

463 else: 

464 secrets_path = secrets_path_value[0] 

465 return self._get_config_value_from_secret_backend(secrets_path) 

466 return None 

467 

468 def _include_secrets( 

469 self, 

470 config_sources: ConfigSourcesType, 

471 display_sensitive: bool, 

472 display_source: bool, 

473 raw: bool, 

474 ): 

475 for section, key in self.sensitive_config_values: 

476 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key) 

477 if value: 

478 if not display_sensitive: 

479 value = "< hidden >" 

480 if display_source: 

481 opt: str | tuple[str, str] = (value, "secret") 

482 elif raw: 

483 opt = value.replace("%", "%%") 

484 else: 

485 opt = value 

486 config_sources.setdefault(section, {}).update({key: opt}) 

487 del config_sources[section][key + "_secret"] 

488 

489 def _include_commands( 

490 self, 

491 config_sources: ConfigSourcesType, 

492 display_sensitive: bool, 

493 display_source: bool, 

494 raw: bool, 

495 ): 

496 for section, key in self.sensitive_config_values: 

497 opt = self._get_cmd_option_from_config_sources(config_sources, section, key) 

498 if not opt: 

499 continue 

500 opt_to_set: str | tuple[str, str] | None = opt 

501 if not display_sensitive: 

502 opt_to_set = "< hidden >" 

503 if display_source: 

504 opt_to_set = (str(opt_to_set), "cmd") 

505 elif raw: 

506 opt_to_set = str(opt_to_set).replace("%", "%%") 

507 if opt_to_set is not None: 

508 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set} 

509 config_sources.setdefault(section, {}).update(dict_to_update) 

510 del config_sources[section][key + "_cmd"] 

511 

def _include_envs(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Add options set through ``AIRFLOW__<SECTION>__<KEY>`` environment variables.

    Each matching variable is split into section and key, resolved through
    ``_get_env_var_option`` and stored in ``config_sources`` under the
    lower-cased section and key.

    When ``display_sensitive`` is False, values of options listed in
    ``sensitive_config_values`` are replaced by ``"< hidden >"``. Variables
    ending in ``cmd``/``secret`` and the ``core.unit_test_mode`` variable are
    never hidden. The sensitivity check uses the lower-cased section and key,
    because ``sensitive_config_values`` stores lower-cased pairs while the
    names split from the environment are upper-case.

    :param config_sources: mapping to update in place
    :param display_sensitive: whether sensitive values may be shown
    :param display_source: when True the entry becomes ``(value, "env var")``
    :param raw: when True and the hiding branch is not taken, ``%`` is doubled
    """
    unit_test_mode_var = self._env_var_name("core", "unit_test_mode")
    # Snapshot the names first so the environment is not iterated while being resolved.
    for env_var in [
        os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
    ]:
        try:
            _, section, key = env_var.split("__", 2)
            opt = self._get_env_var_option(section, key)
        except ValueError:
            # Name does not have the PREFIX__SECTION__KEY shape.
            continue
        if opt is None:
            log.warning("Ignoring unknown env var '%s'", env_var)
            continue

        section = section.lower()
        key = key.lower()

        if not display_sensitive and env_var != unit_test_mode_var:
            # Don't hide cmd/secret values here
            if not env_var.lower().endswith(("cmd", "secret")):
                if (section, key) in self.sensitive_config_values:
                    opt = "< hidden >"
        elif raw:
            opt = opt.replace("%", "%%")
        if display_source:
            opt = (opt, "env var")

        config_sources.setdefault(section, {}).update({key: opt})

543 

544 def _filter_by_source( 

545 self, 

546 config_sources: ConfigSourcesType, 

547 display_source: bool, 

548 getter_func, 

549 ): 

550 """ 

551 Delete default configs from current configuration. 

552 

553 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values. 

554 

555 This is necessary because bare configs take precedence over the command 

556 or secret key equivalents so if the current running config is 

557 materialized with Airflow defaults they in turn override user set 

558 command or secret key configs. 

559 

560 :param config_sources: The current configuration to operate on 

561 :param display_source: If False, configuration options contain raw 

562 values. If True, options are a tuple of (option_value, source). 

563 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'. 

564 :param getter_func: A callback function that gets the user configured 

565 override value for a particular sensitive_config_values config. 

566 :return: None, the given config_sources is filtered if necessary, 

567 otherwise untouched. 

568 """ 

569 for section, key in self.sensitive_config_values: 

570 # Don't bother if we don't have section / key 

571 if section not in config_sources or key not in config_sources[section]: 

572 continue 

573 # Check that there is something to override defaults 

574 try: 

575 getter_opt = getter_func(section, key) 

576 except ValueError: 

577 continue 

578 if not getter_opt: 

579 continue 

580 # Check to see that there is a default value 

581 if self.get_default_value(section, key) is None: 

582 continue 

583 # Check to see if bare setting is the same as defaults 

584 if display_source: 

585 # when display_source = true, we know that the config_sources contains tuple 

586 opt, source = config_sources[section][key] # type: ignore 

587 else: 

588 opt = config_sources[section][key] 

589 if opt == self.get_default_value(section, key): 

590 del config_sources[section][key] 

591 

592 @staticmethod 

593 def _deprecated_value_is_set_in_config( 

594 deprecated_section: str, 

595 deprecated_key: str, 

596 configs: Iterable[tuple[str, ConfigParser]], 

597 ) -> bool: 

598 for config_type, config in configs: 

599 if config_type != "default": 

600 with contextlib.suppress(NoSectionError): 

601 deprecated_section_array = config.items(section=deprecated_section, raw=True) 

602 if any(key == deprecated_key for key, _ in deprecated_section_array): 

603 return True 

604 return False 

605 

@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Tell whether the environment variable for the deprecated option is set."""
    variable_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}"
    return os.environ.get(variable_name) is not None

612 

613 @staticmethod 

614 def _deprecated_command_is_set_in_config( 

615 deprecated_section: str, 

616 deprecated_key: str, 

617 configs: Iterable[tuple[str, ConfigParser]], 

618 ) -> bool: 

619 return AirflowConfigParser._deprecated_value_is_set_in_config( 

620 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs 

621 ) 

622 

@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Tell whether the ``_CMD`` environment variable for the deprecated option is set."""
    variable_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD"
    return os.environ.get(variable_name) is not None

629 

630 @staticmethod 

631 def _deprecated_secret_is_set_in_config( 

632 deprecated_section: str, 

633 deprecated_key: str, 

634 configs: Iterable[tuple[str, ConfigParser]], 

635 ) -> bool: 

636 return AirflowConfigParser._deprecated_value_is_set_in_config( 

637 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs 

638 ) 

639 

@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Tell whether the ``_SECRET`` environment variable for the deprecated option is set."""
    variable_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET"
    return os.environ.get(variable_name) is not None

646 

647 @staticmethod 

648 def _replace_config_with_display_sources( 

649 config_sources: ConfigSourcesType, 

650 configs: Iterable[tuple[str, ConfigParser]], 

651 configuration_description: dict[str, dict[str, Any]], 

652 display_source: bool, 

653 raw: bool, 

654 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

655 include_env: bool, 

656 include_cmds: bool, 

657 include_secret: bool, 

658 ): 

659 for source_name, config in configs: 

660 sections = config.sections() 

661 for section in sections: 

662 AirflowConfigParser._replace_section_config_with_display_sources( 

663 config, 

664 config_sources, 

665 configuration_description, 

666 display_source, 

667 raw, 

668 section, 

669 source_name, 

670 deprecated_options, 

671 configs, 

672 include_env=include_env, 

673 include_cmds=include_cmds, 

674 include_secret=include_secret, 

675 ) 

676 

677 @staticmethod 

678 def _replace_section_config_with_display_sources( 

679 config: ConfigParser, 

680 config_sources: ConfigSourcesType, 

681 configuration_description: dict[str, dict[str, Any]], 

682 display_source: bool, 

683 raw: bool, 

684 section: str, 

685 source_name: str, 

686 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

687 configs: Iterable[tuple[str, ConfigParser]], 

688 include_env: bool, 

689 include_cmds: bool, 

690 include_secret: bool, 

691 ): 

692 sect = config_sources.setdefault(section, {}) 

693 if isinstance(config, AirflowConfigParser): 

694 with config.suppress_future_warnings(): 

695 items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw) 

696 else: 

697 items = config.items(section=section, raw=raw) 

698 for k, val in items: 

699 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None)) 

700 if deprecated_section and deprecated_key: 

701 if source_name == "default": 

702 # If deprecated entry has some non-default value set for any of the sources requested, 

703 # We should NOT set default for the new entry (because it will override anything 

704 # coming from the deprecated ones) 

705 if AirflowConfigParser._deprecated_value_is_set_in_config( 

706 deprecated_section, deprecated_key, configs 

707 ): 

708 continue 

709 if include_env and AirflowConfigParser._deprecated_variable_is_set( 

710 deprecated_section, deprecated_key 

711 ): 

712 continue 

713 if include_cmds and ( 

714 AirflowConfigParser._deprecated_variable_command_is_set( 

715 deprecated_section, deprecated_key 

716 ) 

717 or AirflowConfigParser._deprecated_command_is_set_in_config( 

718 deprecated_section, deprecated_key, configs 

719 ) 

720 ): 

721 continue 

722 if include_secret and ( 

723 AirflowConfigParser._deprecated_variable_secret_is_set( 

724 deprecated_section, deprecated_key 

725 ) 

726 or AirflowConfigParser._deprecated_secret_is_set_in_config( 

727 deprecated_section, deprecated_key, configs 

728 ) 

729 ): 

730 continue 

731 if display_source: 

732 updated_source_name = source_name 

733 if source_name == "default": 

734 # defaults can come from other sources (default-<PROVIDER>) that should be used here 

735 source_description_section = configuration_description.get(section, {}) 

736 source_description_key = source_description_section.get("options", {}).get(k, {}) 

737 if source_description_key is not None: 

738 updated_source_name = source_description_key.get("source", source_name) 

739 sect[k] = (val, updated_source_name) 

740 else: 

741 sect[k] = val 

742 

743 def _warn_deprecate( 

744 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int 

745 ): 

746 """Warn about deprecated config option usage.""" 

747 if section == deprecated_section: 

748 warnings.warn( 

749 f"The {deprecated_name} option in [{section}] has been renamed to {key} - " 

750 f"the old setting has been used, but please update your config.", 

751 DeprecationWarning, 

752 stacklevel=4 + extra_stacklevel, 

753 ) 

754 else: 

755 warnings.warn( 

756 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option " 

757 f"in [{section}] - the old setting has been used, but please update your config.", 

758 DeprecationWarning, 

759 stacklevel=4 + extra_stacklevel, 

760 ) 

761 

@contextmanager
def suppress_future_warnings(self):
    """
    Context manager that sets ``_suppress_future_warnings`` to True for its body.

    The previous value of the flag is restored on exit, including when the
    body raises, so nested use and failures leave the flag as it was found.
    Subclasses can override this to customize warning suppression behavior.

    :return: context manager yielding this parser
    """
    previous = self._suppress_future_warnings
    self._suppress_future_warnings = True
    try:
        yield self
    finally:
        # Restore even when the body raised, otherwise the flag stays True for good.
        self._suppress_future_warnings = previous

776 

def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str:
    """
    Build the environment variable name for a config option.

    The name is the prefix, an optional upper-cased team name followed by
    three underscores, the upper-cased section with dots turned into
    underscores, two underscores, and the upper-cased key.
    """
    team_component: str = ""
    if team_name:
        team_component = f"{team_name.upper()}___"
    section_component = section.replace(".", "_").upper()
    return f"{ENV_VAR_PREFIX}{team_component}{section_component}__{key.upper()}"

781 

782 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None): 

783 """Get config option from environment variable.""" 

784 env_var: str = self._env_var_name(section, key, team_name=team_name) 

785 if env_var in os.environ: 

786 return expand_env_var(os.environ[env_var]) 

787 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command) 

788 env_var_cmd = env_var + "_CMD" 

789 if env_var_cmd in os.environ: 

790 # if this is a valid command key... 

791 if (section, key) in self.sensitive_config_values: 

792 return run_command(os.environ[env_var_cmd]) 

793 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend) 

794 env_var_secret_path = env_var + "_SECRET" 

795 if env_var_secret_path in os.environ: 

796 # if this is a valid secret path... 

797 if (section, key) in self.sensitive_config_values: 

798 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path]) 

799 return None 

800 

def _get_cmd_option(self, section: str, key: str):
    """
    Get a config option by running the command stored in its ``<key>_cmd`` option.

    Only keys listed in ``sensitive_config_values`` may be resolved this way.

    :param section: section of the option
    :param key: option name, without the ``_cmd`` suffix
    :return: output of ``run_command``, or None when the key is not eligible or no
        ``<key>_cmd`` option exists in the config file
    :raises AirflowConfigException: when the command cannot be run
    """
    if (section, key) not in self.sensitive_config_values:
        return None
    fallback_key = key + "_cmd"
    if not super().has_option(section, fallback_key):
        return None
    command = super().get(section, fallback_key)
    try:
        return run_command(command)
    except AirflowConfigException:
        # Already the right exception type: re-raise untouched.
        raise
    except Exception as e:
        # ``fallback_key`` already carries the ``_cmd`` suffix, so it is not appended again.
        raise AirflowConfigException(
            f"Cannot run the command for the config section [{section}]{fallback_key}."
            f" Please check the {fallback_key} value."
        ) from e

818 

def _get_secret_option(self, section: str, key: str) -> str | None:
    """
    Get a config option value from the secrets backend.

    Only keys listed in ``sensitive_config_values`` are eligible; the secret path is read
    from the ``<key>_secret`` option of the config file.

    :param section: section of the option
    :param key: option name, without the ``_secret`` suffix
    :return: result of the secrets backend lookup, or None when not applicable
    """
    secret_option = key + "_secret"
    if (section, key) not in self.sensitive_config_values:
        return None
    if not super().has_option(section, secret_option):
        return None
    path_in_backend = super().get(section, secret_option)
    return self._get_config_value_from_secret_backend(path_in_backend)

827 

def _get_environment_variables(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Look the option up in environment variables, falling back to its deprecated name.

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: current option name
    :param section: current section name
    :param issue_warning: whether to warn when the deprecated name supplied the value
    :param extra_stacklevel: extra stack level forwarded to the deprecation warning
    :param kwargs: may carry ``team_name``; other entries are ignored here
    :return: the value, or ``VALUE_NOT_FOUND_SENTINEL`` when neither name is set
    """
    team = kwargs.get("team_name", None)

    current = self._get_env_var_option(section, key, team_name=team)
    if current is not None:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team)
    if legacy is None:
        return VALUE_NOT_FOUND_SENTINEL

    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

851 

def _get_option_from_config_file(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Look the option up in the config file, falling back to its deprecated name.

    When a ``team_name`` is supplied the current option is read from the
    ``<team_name>=<section>`` section.

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: current option name
    :param section: current section name
    :param issue_warning: whether to warn when the deprecated name supplied the value
    :param extra_stacklevel: extra stack level forwarded to the deprecation warning
    :param kwargs: ``team_name`` is consumed here; the rest is forwarded to the parent ``get``
    :return: the value, or ``VALUE_NOT_FOUND_SENTINEL`` when neither name is set
    """
    # This is the last lookup that supports team_name, so always remove it from kwargs:
    # a present-but-falsy value (e.g. "") must not leak into the parent ``get`` call below.
    team_name = kwargs.pop("team_name", None)
    if team_name:
        section = f"{team_name}={section}"

    if super().has_option(section, key):
        return expand_env_var(super().get(section, key, **kwargs))

    if deprecated_section and deprecated_key:
        if super().has_option(deprecated_section, deprecated_key):
            if issue_warning:
                self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
            with self.suppress_future_warnings():
                return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))

    return VALUE_NOT_FOUND_SENTINEL

876 

def _get_option_from_commands(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Look the option up via ``_get_cmd_option``, falling back to its deprecated name.

    An empty or None command result counts as "not found".

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: current option name
    :param section: current section name
    :param issue_warning: whether to warn when the deprecated name supplied the value
    :param extra_stacklevel: extra stack level forwarded to the deprecation warning
    :return: the value, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    current = self._get_cmd_option(section, key)
    if current:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_cmd_option(deprecated_section, deprecated_key)
    if not legacy:
        return VALUE_NOT_FOUND_SENTINEL

    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

899 

def _get_option_from_secrets(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Look the option up via ``_get_secret_option``, falling back to its deprecated name.

    An empty or None secret counts as "not found".

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: current option name
    :param section: current section name
    :param issue_warning: whether to warn when the deprecated name supplied the value
    :param extra_stacklevel: extra stack level forwarded to the deprecation warning
    :return: the value, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    current = self._get_secret_option(section, key)
    if current:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_secret_option(deprecated_section, deprecated_key)
    if not legacy:
        return VALUE_NOT_FOUND_SENTINEL

    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

922 

def _get_option_from_defaults(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | ValueNotFound:
    """
    Look the option up in the default values.

    The deprecated names, warning flags and ``team_name`` are accepted for signature
    parity with the other lookups but are not used here.

    :param key: option name
    :param section: section name
    :param kwargs: forwarded to ``get_default_value``; a ``fallback`` entry forces the lookup
    :return: the default value, or ``VALUE_NOT_FOUND_SENTINEL`` when there is neither a
        default nor a ``fallback``
    """
    no_default = self.get_default_value(section, key) is None
    if no_default and "fallback" not in kwargs:
        return VALUE_NOT_FOUND_SENTINEL
    return expand_env_var(self.get_default_value(section, key, **kwargs))

938 

939 def _resolve_deprecated_lookup( 

940 self, 

941 section: str, 

942 key: str, 

943 lookup_from_deprecated: bool, 

944 extra_stacklevel: int = 0, 

945 ) -> tuple[str, str, str | None, str | None, bool]: 

946 """ 

947 Resolve deprecated section/key mappings and determine deprecated values. 

948 

949 :param section: Section name (will be lowercased) 

950 :param key: Key name (will be lowercased) 

951 :param lookup_from_deprecated: Whether to lookup from deprecated options 

952 :param extra_stacklevel: Extra stack level for warnings 

953 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted) 

954 """ 

955 section = section.lower() 

956 key = key.lower() 

957 warning_emitted = False 

958 deprecated_section: str | None = None 

959 deprecated_key: str | None = None 

960 

961 if not lookup_from_deprecated: 

962 return section, key, deprecated_section, deprecated_key, warning_emitted 

963 

964 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {}) 

965 if option_description.get("deprecated"): 

966 deprecation_reason = option_description.get("deprecation_reason", "") 

967 warnings.warn( 

968 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}", 

969 DeprecationWarning, 

970 stacklevel=2 + extra_stacklevel, 

971 ) 

972 # For the cases in which we rename whole sections 

973 if section in self.inversed_deprecated_sections: 

974 deprecated_section, deprecated_key = (section, key) 

975 section = self.inversed_deprecated_sections[section] 

976 if not self._suppress_future_warnings: 

977 warnings.warn( 

978 f"The config section [{deprecated_section}] has been renamed to " 

979 f"[{section}]. Please update your `conf.get*` call to use the new name", 

980 FutureWarning, 

981 stacklevel=2 + extra_stacklevel, 

982 ) 

983 # Don't warn about individual rename if the whole section is renamed 

984 warning_emitted = True 

985 elif (section, key) in self.inversed_deprecated_options: 

986 # Handle using deprecated section/key instead of the new section/key 

987 new_section, new_key = self.inversed_deprecated_options[(section, key)] 

988 if not self._suppress_future_warnings and not warning_emitted: 

989 warnings.warn( 

990 f"section/key [{section}/{key}] has been deprecated, you should use" 

991 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the " 

992 "new name", 

993 FutureWarning, 

994 stacklevel=2 + extra_stacklevel, 

995 ) 

996 warning_emitted = True 

997 deprecated_section, deprecated_key = section, key 

998 section, key = (new_section, new_key) 

999 elif section in self.deprecated_sections: 

1000 # When accessing the new section name, make sure we check under the old config name 

1001 deprecated_key = key 

1002 deprecated_section = self.deprecated_sections[section][0] 

1003 else: 

1004 deprecated_section, deprecated_key, _ = self.deprecated_options.get( 

1005 (section, key), (None, None, None) 

1006 ) 

1007 

1008 return section, key, deprecated_section, deprecated_key, warning_emitted 

1009 

@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...


@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: ...


def get(  # type: ignore[misc, override]
    self,
    section: str,
    key: str,
    suppress_warnings: bool = False,
    lookup_from_deprecated: bool = True,
    _extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | None:
    """
    Get a config value by trying each lookup of ``_lookup_sequence`` in order.

    The first lookup that returns something other than ``VALUE_NOT_FOUND_SENTINEL`` wins.

    :param section: section name
    :param key: option name
    :param suppress_warnings: do not log a warning when the option is not found
    :param lookup_from_deprecated: whether deprecated names are resolved and consulted
    :param _extra_stacklevel: extra stack level for warnings emitted during the lookup
    :param team_name: optional team name, forwarded to the lookups when not None
    :param kwargs: forwarded to every lookup; ``fallback`` is returned when nothing is found
    :raises AirflowConfigException: when the option is not found and no ``fallback`` was given
    """
    resolved = self._resolve_deprecated_lookup(
        section=section,
        key=key,
        lookup_from_deprecated=lookup_from_deprecated,
        extra_stacklevel=_extra_stacklevel,
    )
    section, key, deprecated_section, deprecated_key, warning_emitted = resolved

    if team_name is not None:
        kwargs["team_name"] = team_name

    for lookup in self._lookup_sequence:
        found = lookup(
            deprecated_key=deprecated_key,
            deprecated_section=deprecated_section,
            key=key,
            section=section,
            issue_warning=not warning_emitted,
            extra_stacklevel=_extra_stacklevel,
            **kwargs,
        )
        if found is VALUE_NOT_FOUND_SENTINEL:
            continue
        return found

    # An explicitly provided fallback is honoured even when it is None.
    if "fallback" in kwargs:
        return kwargs["fallback"]

    if not suppress_warnings:
        log.warning("section/key [%s/%s] not found in config", section, key)

    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")

1062 

def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
    """
    Get a config value as a boolean.

    The value is lower-cased and stripped, and anything from the first ``#`` onwards is dropped.
    ``t``/``true``/``1`` map to True and ``f``/``false``/``0`` to False.

    :raises AirflowConfigException: when the value is none of the accepted spellings
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    val = str(raw).lower().strip()
    if "#" in val:
        val = val.split("#")[0].strip()

    truthy = ("t", "true", "1")
    falsy = ("f", "false", "0")
    if val in truthy:
        return True
    if val in falsy:
        return False
    raise AirflowConfigException(
        f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
        f'Current value: "{val}".'
    )

1076 

def getint(self, section: str, key: str, **kwargs) -> int:  # type: ignore[override]
    """
    Get a config value as an integer.

    Values that ``int()`` rejects are still accepted when they parse as a float with no
    fractional part (e.g. ``"3.0"``).

    :raises AirflowConfigException: when the value is None or cannot be converted
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw is None:
        raise AirflowConfigException(
            f"Failed to convert value None to int. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        return int(raw)
    except ValueError:
        try:
            as_float = float(raw)
            as_int = int(as_float)
            if as_float != as_int:
                raise ValueError
            return as_int
        except (ValueError, OverflowError):
            raise AirflowConfigException(
                f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
                f'Current value: "{raw}".'
            )

1097 

def getfloat(self, section: str, key: str, **kwargs) -> float:  # type: ignore[override]
    """
    Get a config value as a float.

    :raises AirflowConfigException: when the value is None or cannot be converted
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw is None:
        raise AirflowConfigException(
            f"Failed to convert value None to float. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        number = float(raw)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )
    return number

1113 

def getlist(self, section: str, key: str, delimiter=",", **kwargs):
    """
    Get a config value as a list of stripped strings.

    :param delimiter: separator used to split the raw value
    :return: list of items; ``[]`` for an empty string; a list or None coming back from
        ``get`` is returned unchanged
    :raises AirflowConfigException: when the value cannot be split
    """
    raw = self.get(section, key, **kwargs)

    # A list or None can only come from an explicit ``fallback=``; hand it back untouched.
    if raw is None or isinstance(raw, list):
        return raw

    if raw == "":
        return []

    try:
        return [piece.strip() for piece in raw.split(delimiter)]
    except Exception:
        raise AirflowConfigException(
            f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )

1133 

E = TypeVar("E", bound=Enum)


def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
    """
    Get a config value as a member of ``enum_class``, looked up by member name.

    When the configured name is unknown, a ``fallback`` keyword that names a valid member
    is used instead.

    :param enum_class: enum type whose member names are the accepted values
    :raises AirflowConfigException: when the value is None, or is not a member name and no
        valid ``fallback`` name was given
    """
    raw = self.get(section, key, **kwargs)
    valid_names = [member.name for member in enum_class]

    if raw is None:
        raise AirflowConfigException(
            f'Failed to convert value. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}" and it must be one of {", ".join(valid_names)}'
        )

    try:
        return enum_class[raw]
    except KeyError:
        fallback_usable = "fallback" in kwargs and kwargs["fallback"] in valid_names
        if not fallback_usable:
            raise AirflowConfigException(
                f'Failed to convert value. Please check "{key}" key in "{section}" section. '
                f"the value must be one of {', '.join(valid_names)}"
            )
        return enum_class[kwargs["fallback"]]

1156 

def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
    """
    Get a config value as a list of ``enum_class`` members, looked up by member name.

    Names that are not members of ``enum_class`` are logged and skipped. ``fallback``
    defaults to an empty list.

    :param enum_class: enum type whose member names are the accepted values
    :param delimiter: separator used to split the raw value
    :return: list of enum members, in configuration order
    """
    kwargs.setdefault("fallback", [])
    string_list = self.getlist(section, key, delimiter, **kwargs)
    if string_list is None:
        # ``getlist`` passes an explicit ``fallback=None`` straight through; treat it as empty
        # rather than failing on iteration.
        return []

    enum_names = [enum_item.name for enum_item in enum_class]
    enum_list = []

    for val in string_list:
        try:
            enum_list.append(enum_class[val])
        except KeyError:
            log.warning(
                "Failed to convert value %r. Please check %s key in %s section. "
                "it must be one of %s, if not the value is ignored",
                val,
                key,
                section,
                ", ".join(enum_names),
            )

    return enum_list

1179 

def getimport(self, section: str, key: str, **kwargs) -> Any:
    """
    Read the option, import the fully qualified name it holds, and return the object.

    :param section: section name
    :param key: option name
    :return: the imported object, or None if the option is empty
    :raises AirflowConfigException: when the import fails; the message names the key and section
    """
    dotted_path = self.get(section=section, key=key, **kwargs)
    if not dotted_path:
        return None

    try:
        # Imported lazily to avoid a circular dependency
        from ..module_loading import import_string

        imported = import_string(dotted_path)
    except ImportError as e:
        log.warning(e)
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{dotted_path}".'
        )
    return imported

1204 

def getjson(
    self, section: str, key: str, fallback=None, **kwargs
) -> dict | list | str | int | float | None:
    """
    Return a config value parsed from a JSON string.

    ``fallback`` is *not* JSON parsed but used verbatim when the config value is missing
    or an empty string.

    :raises AirflowConfigException: when the value is not valid JSON
    """
    try:
        raw = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
    except (NoSectionError, NoOptionError):
        raw = None

    nothing_configured = raw is None or raw == ""
    if nothing_configured:
        return fallback

    try:
        parsed = json.loads(raw)
    except JSONDecodeError as e:
        raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
    return parsed

1225 

def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Get the config value for the given section and key as a ``datetime.timedelta`` in seconds.

    A missing or empty value yields ``fallback`` unchanged.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: fallback value when no config value is given, defaults to None
    :raises AirflowConfigException: when the value is not an integer or overflows timedelta
    :return: datetime.timedelta(seconds=<config_value>) or ``fallback``
    """
    raw = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
    if not raw:
        return fallback

    # the given value must be convertible to integer
    try:
        seconds = int(raw)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )

    try:
        return datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            f"Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{raw}".'
        )

1262 

def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
    """
    Get a config value that must be set.

    :raises ValueError: when ``get`` returns None
    """
    found = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if found is not None:
        return found
    raise ValueError(f"The value {section}/{key} should be set!")

1269 

def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
    """
    Get a config value as a list that must be set.

    :raises ValueError: when ``getlist`` returns None
    """
    items = self.getlist(section, key, **kwargs)
    if items is not None:
        return items
    raise ValueError(f"The value {section}/{key} should be set!")

1276 

def read(
    self,
    filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
    encoding: str | None = None,
) -> list[str]:
    """
    Read configuration from the given file name(s) by delegating to the parent ``read``.

    :param filenames: a single path or an iterable of paths
    :param encoding: encoding passed to the parent implementation
    :return: whatever the parent ``read`` returns
    """
    parsed_files = super().read(filenames=filenames, encoding=encoding)
    return parsed_files

1283 

def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
) -> None:
    """
    Load configuration from a dictionary by delegating to the parent ``read_dict``.

    The signature is redeclared only to provide stricter type hints.

    :param dictionary: mapping of section name to a mapping of option name to value
    :param source: source label to be used to store the configuration
    :return: None
    """
    parent = super()
    parent.read_dict(dictionary=dictionary, source=source)

1295 

def get_sections_including_defaults(self) -> list[str]:
    """
    Retrieve all section names, including those known only from ``configuration_description``.

    Described sections come first, followed by sections present only in the parser;
    duplicates are removed while preserving that order.

    :return: list of section names
    """
    configured = self.sections()
    described = list(self.configuration_description.keys())
    ordered_unique = dict.fromkeys(itertools.chain(described, configured))
    return list(ordered_unique)

1305 

def get_options_including_defaults(self, section: str) -> list[str]:
    """
    Retrieve all option names for a section, including those known only from
    ``configuration_description``.

    Described options come first, followed by options present only in the parser;
    duplicates are removed while preserving that order.

    :param section: section name
    :return: list of option names for the section given
    """
    if self.has_section(section):
        configured = self.options(section)
    else:
        configured = []
    section_description = self.configuration_description.get(section, {})
    described = list(section_description.get("options", {}).keys())
    ordered_unique = dict.fromkeys(itertools.chain(described, configured))
    return list(ordered_unique)

1320 

def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs) -> bool:
    """
    Check if option is defined.

    Delegates to ``self.get()`` so the priority order of config sources is not reimplemented
    here. An option whose resolved value is None counts as not defined.

    :param section: section to get option from
    :param option: option to get
    :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
    :param kwargs: additional keyword arguments to pass to get(), such as team_name
    :return: True when ``get`` yields a non-None value, False otherwise
    """
    try:
        resolved = self.get(
            section,
            option,
            fallback=None,
            _extra_stacklevel=1,
            suppress_warnings=True,
            lookup_from_deprecated=lookup_from_deprecated,
            **kwargs,
        )
    except (NoOptionError, NoSectionError, AirflowConfigException):
        return False
    return resolved is not None

1349 

def set(self, section: str, option: str, value: str | None = None) -> None:
    """
    Set an option to the given value.

    Section and option are lower-cased first, to match what ``get`` does. A section that is
    missing from the parser but present in ``configuration_description`` is created on the fly.
    """
    section, option = section.lower(), option.lower()
    described_sections = self.configuration_description or {}
    section_missing = not self.has_section(section)
    if section_missing and section in described_sections:
        # Known from the defaults but absent from the user config: create it automatically.
        self.add_section(section)
    super().set(section, option, value)

1364 

def remove_option(self, section: str, option: str, remove_default: bool = True):
    """
    Remove an option from the parsed config and, optionally, from the default values.

    Section and option are lower-cased first.

    :param section: section name
    :param option: option name
    :param remove_default: when True, also remove the option from ``_default_values`` if a
        default exists for it
    """
    section, option = section.lower(), option.lower()

    parent = super()
    if parent.has_option(section, option):
        parent.remove_option(section, option)

    has_default = self.get_default_value(section, option) is not None
    if has_default and remove_default:
        self._default_values.remove_option(section, option)

1379 

def optionxform(self, optionstr: str) -> str:
    """
    Return the option name unchanged.

    ConfigParser calls this hook on every read, get, or set operation and lower-cases by
    default; this override makes option names case-preserving instead.

    :param optionstr: option name as given
    :return: the same option name
    """
    preserved = optionstr
    return preserved

1391 

def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
    """
    Return the (label, parser) pairs consumed by ``as_dict()``.

    The default values come first and this parser second. Subclasses can override to add
    additional sources (e.g., provider configs).
    """
    sources: list[tuple[str, ConfigParser]] = []
    sources.append(("default", self._default_values))
    sources.append(("airflow.cfg", self))
    return sources

1402 

def as_dict(
    self,
    display_source: bool = False,
    display_sensitive: bool = False,
    raw: bool = False,
    include_env: bool = True,
    include_cmds: bool = True,
    include_secret: bool = True,
) -> ConfigSourcesType:
    """
    Return the current configuration as a nested mapping of section -> option -> value.

    Defaults are materialized together with user-set configs. For each of env vars,
    commands and secrets, the corresponding ``_include_*`` helper is applied when its
    ``include_*`` flag is True; otherwise ``_filter_by_source`` is applied with the matching
    getter, so that defaults do not overwrite user-set command or secret configs.

    :param display_source: If False, the option value is returned. If True,
        a tuple of (option_value, source) is returned. Source is either
        'airflow.cfg', 'default', 'env var', or 'cmd'.
    :param display_sensitive: If True, the values of options set by env
        vars and bash commands will be displayed. If False, those options
        are shown as '< hidden >'
    :param raw: Should the values be output as interpolated values, or the
        "raw" form that can be fed back in to ConfigParser
    :param include_env: Should the value of configuration from AIRFLOW__
        environment variables be included or not
    :param include_cmds: Should the result of calling any ``*_cmd`` config be
        set (True, default), or should the _cmd options be left as the
        command to run (False)
    :param include_secret: Should the result of calling any ``*_secret`` config be
        set (True, default), or should the _secret options be left as the
        path to get the secret from (False)
    :raises ValueError: when ``display_sensitive`` is False and any ``include_*`` flag is False
    :return: Dictionary, where the key is the name of the section and the content is
        the dictionary with the name of the parameter and its value.
    """
    # Hiding happens in the include helpers, so they must all run when values are hidden.
    if not display_sensitive and not (include_env and include_cmds and include_secret):
        raise ValueError(
            "If display_sensitive is false, then include_env, "
            "include_cmds, include_secret must all be set as True"
        )

    config_sources: ConfigSourcesType = {}

    # Sources are checked sequentially; the last one an option is seen in "wins".
    configs = self._get_config_sources_for_as_dict()
    self._replace_config_with_display_sources(
        config_sources,
        configs,
        self.configuration_description,
        display_source,
        raw,
        self.deprecated_options,
        include_cmds=include_cmds,
        include_env=include_env,
        include_secret=include_secret,
    )

    # Applied in priority order: env vars, then bash commands, then secret backends.
    overlays = (
        (include_env, self._include_envs, self._get_env_var_option),
        (include_cmds, self._include_commands, self._get_cmd_option),
        (include_secret, self._include_secrets, self._get_secret_option),
    )
    for enabled, include, getter in overlays:
        if enabled:
            include(config_sources, display_sensitive, display_source, raw)
        else:
            self._filter_by_source(config_sources, display_source, getter)

    if display_sensitive:
        return config_sources

    # Also hide sensitive values that came from the config file rather than env/cmd/secret.
    hidden = "< hidden >"
    for section, key in self.sensitive_config_values:
        section_values = config_sources.get(section)
        if not section_values:
            continue
        if not section_values.get(key, None):
            continue
        if display_source:
            section_values[key] = (hidden, section_values[key][1])
        else:
            section_values[key] = hidden

    return config_sources

1501 

def _write_option_header(
    self,
    file: IO[str],
    option: str,
    extra_spacing: bool,
    include_descriptions: bool,
    include_env_vars: bool,
    include_examples: bool,
    include_sources: bool,
    section_config_description: dict[str, dict[str, Any]],
    section_to_write: str,
    sources_dict: ConfigSourcesType,
) -> tuple[bool, bool]:
    """
    Write the comment header (description, example, source, env var) for one option.

    :param file: text stream to write to
    :param option: option name
    :param extra_spacing: emit additional ``#`` spacer lines around the example and env var
    :param include_descriptions: write the option description, when there is one
    :param include_env_vars: write the ``AIRFLOW__<SECTION>__<OPTION>`` variable name
    :param include_examples: write the option example, when there is one
    :param include_sources: write the source of the value taken from ``sources_dict``
    :param section_config_description: description of the section holding an ``options`` mapping
    :param section_to_write: section name
    :param sources_dict: mapping of section -> option -> (value, source)
    :return: tuple of (should_continue, needs_separation); the first item is always True and
        the second is set when anything was written, meaning the option needs additional
        separation from the next option
    """
    if section_config_description:
        option_description = section_config_description.get("options", {}).get(option, {})
    else:
        option_description = {}

    needs_separation = False

    description = option_description.get("description")
    if description and include_descriptions:
        for description_line in description.splitlines():
            file.write(f"# {description_line}\n")
        needs_separation = True

    example = option_description.get("example")
    if example is not None and include_examples:
        if extra_spacing:
            file.write("#\n")
        # Continuation lines of a multi-line example stay commented out.
        commented_example = "\n# ".join(example.splitlines())
        file.write(f"# Example: {option} = {commented_example}\n")
        needs_separation = True

    if include_sources and sources_dict:
        section_sources = sources_dict.get(section_to_write)
        value_with_source = section_sources.get(option) if section_sources else None
        if value_with_source is not None:
            file.write(f"#\n# Source: {value_with_source[1]}\n")
        else:
            file.write("#\n# Source: not defined\n")
        needs_separation = True

    if include_env_vars:
        file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
        if extra_spacing:
            file.write("#\n")
        needs_separation = True

    return True, needs_separation

1554 

def is_template(self, section: str, key) -> bool:
    """
    Return whether the value is templated, as reported by ``_is_template``.

    :param section: section of the config
    :param key: key in the section
    :return: True if the value is templated
    """
    description = self.configuration_description
    return _is_template(description, section, key)

1564 

def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None:
    """
    Return the section as a dict.

    Default values are overlaid with values from the parser and then with values from
    environment variables whose name starts with the section's env var prefix.
    Values are converted to int, float, bool as required.

    :param section: section from the config
    :param team_name: optional team name for team-specific configuration lookup
    :return: mapping of option name to converted value, or None when the section exists
        neither in the parser nor in the default values
    :raises AirflowConfigException: when any collected value is None
    """
    # Handle team-specific section lookup for config file
    config_section = f"{team_name}={section}" if team_name else section

    if not self.has_section(config_section) and not self._default_values.has_section(config_section):
        return None
    if self._default_values.has_section(config_section):
        _section: ConfigOptionsDictType = dict(self._default_values.items(config_section))
    else:
        _section = {}

    if self.has_section(config_section):
        _section.update(self.items(config_section))

    # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name
    section_prefix = self._env_var_name(section, "", team_name=team_name)
    for env_var in sorted(os.environ):
        if not env_var.startswith(section_prefix):
            continue
        # Strip only the leading prefix: ``str.replace`` would also delete any later
        # occurrence of the prefix inside the variable name and corrupt the key.
        key = env_var.removeprefix(section_prefix)
        if key.endswith("_CMD"):
            key = key[:-4]
        key = key.lower()
        _section[key] = self._get_env_var_option(section, key, team_name=team_name)

    # Only existing keys are reassigned below, so mutating while iterating is safe.
    for key, val in _section.items():
        if val is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        try:
            _section[key] = int(val)
        except ValueError:
            try:
                _section[key] = float(val)
            except ValueError:
                if isinstance(val, str) and val.lower() in ("t", "true"):
                    _section[key] = True
                elif isinstance(val, str) and val.lower() in ("f", "false"):
                    _section[key] = False
    return _section

1614 

1615 @staticmethod 

1616 def _write_section_header( 

1617 file: IO[str], 

1618 include_descriptions: bool, 

1619 section_config_description: dict[str, str], 

1620 section_to_write: str, 

1621 ) -> None: 

1622 """Write header for configuration section.""" 

1623 file.write(f"[{section_to_write}]\n") 

1624 section_description = section_config_description.get("description") 

1625 if section_description and include_descriptions: 

1626 for line in section_description.splitlines(): 

1627 file.write(f"# {line}\n") 

1628 file.write("\n") 

1629 

def _write_value(
    self,
    file: IO[str],
    option: str,
    comment_out_everything: bool,
    needs_separation: bool,
    only_defaults: bool,
    section_to_write: str,
    hide_sensitive: bool,
    is_sensitive: bool,
    show_values: bool = False,
):
    """
    Write one option line (``option = value`` or its commented-out form) to the file.

    :param file: file to write to
    :param option: name of the option
    :param comment_out_everything: prefix the line, and every further line of a
        multi-line value, with ``# ``
    :param needs_separation: write an empty line after the option
    :param only_defaults: write the default value instead of the current value
    :param section_to_write: section the option belongs to
    :param hide_sensitive: together with ``is_sensitive``, replace the value with ``< hidden >``
    :param is_sensitive: whether the option is considered sensitive
    :param show_values: when False, only a commented-out line with an empty value is written
    """
    fallback_value = self.get_default_value(section_to_write, option, raw=True)
    if only_defaults:
        resolved = fallback_value
    else:
        resolved = self.get(section_to_write, option, fallback=fallback_value, raw=True)

    # Masking only matters when values are shown at all.
    if show_values and hide_sensitive and is_sensitive:
        resolved = "< hidden >"

    if not show_values or resolved is None:
        rendered = f"# {option} = \n"
    elif comment_out_everything:
        commented = "\n# ".join(resolved.splitlines())
        rendered = f"# {option} = {commented}\n"
    else:
        if "\n" in resolved:
            # Multi-line values that are valid JSON are re-serialised and every
            # following line is indented to satisfy configparser format; other
            # multi-line values are written unchanged.
            try:
                resolved = json.dumps(json.loads(resolved), indent=4).replace("\n", "\n ")
            except JSONDecodeError:
                pass
        rendered = f"{option} = {resolved}\n"

    file.write(rendered)
    if needs_separation:
        file.write("\n")

1673 

def write(  # type: ignore[override]
    self,
    file: IO[str],
    section: str | None = None,
    include_examples: bool = True,
    include_descriptions: bool = True,
    include_sources: bool = True,
    include_env_vars: bool = True,
    include_providers: bool = True,
    comment_out_everything: bool = False,
    hide_sensitive: bool = False,
    extra_spacing: bool = True,
    only_defaults: bool = False,
    show_values: bool = False,
    **kwargs: Any,
) -> None:
    """
    Write configuration with comments and examples to a file.

    :param file: file to write to
    :param section: section of the config to write, defaults to all sections
    :param include_examples: Include examples in the output
    :param include_descriptions: Include descriptions in the output
    :param include_sources: Include the source of each config option
    :param include_env_vars: Include environment variables corresponding to each config option
    :param include_providers: Include providers configuration
    :param comment_out_everything: Comment out all values
    :param hide_sensitive: Replace sensitive values with a placeholder in the output
    :param extra_spacing: Add extra spacing before examples and after variables
    :param only_defaults: Only include default values when writing the config, not the actual values
    :param show_values: Write the option values; when False every option is written
        commented out with an empty value
    :param kwargs: accepted for signature compatibility; not used by this method
    """
    sources_dict = self.as_dict(display_source=True) if include_sources else {}
    with self.make_sure_configuration_loaded(with_providers=include_providers):
        for section_to_write in self.get_sections_including_defaults():
            if section is not None and section_to_write != section:
                continue
            if not (
                self._default_values.has_section(section_to_write) or self.has_section(section_to_write)
            ):
                continue
            section_config_description = self.configuration_description.get(section_to_write, {})
            self._write_section_header(
                file, include_descriptions, section_config_description, section_to_write
            )
            # Reset for every section: without this, a section with no options would
            # read an unbound name (first section) or a stale value left over from
            # the previous section in the check after the loop.
            needs_separation = False
            for option in self.get_options_including_defaults(section_to_write):
                # NOTE(review): the first element of the returned tuple is not acted
                # upon here - confirm that this is intended.
                _should_continue, needs_separation = self._write_option_header(
                    file=file,
                    option=option,
                    extra_spacing=extra_spacing,
                    include_descriptions=include_descriptions,
                    include_env_vars=include_env_vars,
                    include_examples=include_examples,
                    include_sources=include_sources,
                    section_config_description=section_config_description,
                    section_to_write=section_to_write,
                    sources_dict=sources_dict,
                )
                is_sensitive = (
                    section_to_write.lower(),
                    option.lower(),
                ) in self.sensitive_config_values
                self._write_value(
                    file=file,
                    option=option,
                    comment_out_everything=comment_out_everything,
                    needs_separation=needs_separation,
                    only_defaults=only_defaults,
                    section_to_write=section_to_write,
                    hide_sensitive=hide_sensitive,
                    is_sensitive=is_sensitive,
                    show_values=show_values,
                )
            if include_descriptions and not needs_separation:
                # extra separation between sections in case last option did not need it
                file.write("\n")

1748 

@contextmanager
def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
    """
    Make sure configuration is loaded with or without providers.

    This happens regardless if the provider configuration has been loaded before or not.
    Restores configuration to the state before entering the context, also when the
    body of the ``with`` block raises.

    :param with_providers: whether providers should be loaded
    """
    needs_reload = False
    if with_providers:
        self._ensure_providers_config_loaded()
    else:
        # Remember whether providers were actually unloaded so they can be put back.
        needs_reload = self._ensure_providers_config_unloaded()
    try:
        yield
    finally:
        # An exception raised inside the with-block is re-raised at the yield;
        # the finally clause makes sure the reload still happens in that case.
        if needs_reload:
            self._reload_provider_configs()

1767 

def _ensure_providers_config_loaded(self) -> None:
    """
    Ensure providers configurations are loaded.

    This base implementation always raises; subclasses must override it.

    :raises NotImplementedError: always
    """
    message = "Subclasses must implement _ensure_providers_config_loaded method"
    raise NotImplementedError(message)

1771 

def _ensure_providers_config_unloaded(self) -> bool:
    """
    Ensure providers configurations are unloaded temporarily to load core configs.

    This base implementation always raises; subclasses must override it.

    :return: True if providers get unloaded
    :raises NotImplementedError: always
    """
    message = "Subclasses must implement _ensure_providers_config_unloaded method"
    raise NotImplementedError(message)

1775 

def _reload_provider_configs(self) -> None:
    """
    Reload providers configuration.

    This base implementation always raises; subclasses must override it.

    :raises NotImplementedError: always
    """
    message = "Subclasses must implement _reload_provider_configs method"
    raise NotImplementedError(message)