Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/_shared/configuration/parser.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

744 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Base configuration parser with pure parsing logic.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import datetime 

24import functools 

25import itertools 

26import json 

27import logging 

28import os 

29import shlex 

30import subprocess 

31import sys 

32import warnings 

33from collections.abc import Callable, Generator, Iterable 

34from configparser import ConfigParser, NoOptionError, NoSectionError 

35from contextlib import contextmanager 

36from enum import Enum 

37from json.decoder import JSONDecodeError 

38from re import Pattern 

39from typing import IO, Any, TypeVar, overload 

40 

41from .exceptions import AirflowConfigException 

42 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# Scalar types a single configuration value may take.
ConfigType = str | int | float | bool
# Options of one section: option name -> value.
ConfigOptionsDictType = dict[str, ConfigType]
# Options of one section as displayed: option name -> value, or a (value, source) tuple.
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
# Whole configuration as displayed: section name -> that section's options.
ConfigSourcesType = dict[str, ConfigSectionSourcesType]
# Prefix of every environment variable that carries a configuration option.
ENV_VAR_PREFIX = "AIRFLOW__"

51 

52 

class ValueNotFound:
    """Marker type used to signal that a configuration value could not be found."""


# Shared marker instance returned by lookup methods that found nothing.
VALUE_NOT_FOUND_SENTINEL = ValueNotFound()

60 

61 

@overload
def expand_env_var(env_var: None) -> None: ...
@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Resolve environment variables and ``~`` in a string, including nested references.

    ``expandvars`` followed by ``expanduser`` is applied over and over until a pass
    leaves the text unchanged. Empty or non-string input is handed back untouched.
    """
    if not env_var or not isinstance(env_var, str):
        return env_var
    current = env_var
    expanded = os.path.expanduser(os.path.expandvars(str(current)))
    # Keep expanding until a fixed point is reached.
    while expanded != current:
        current = expanded
        expanded = os.path.expanduser(os.path.expandvars(str(current)))
    return expanded

82 

83 

def run_command(command: str) -> str:
    """
    Execute ``command`` (split with ``shlex``, no shell) and return its decoded stdout.

    :param command: command line to run
    :return: stdout decoded with the default encoding, undecodable bytes dropped
    :raises AirflowConfigException: when the process exits with a non-zero code
    """
    completed = subprocess.run(
        shlex.split(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
    )
    encoding = sys.getdefaultencoding()
    output = completed.stdout.decode(encoding, "ignore")
    stderr = completed.stderr.decode(encoding, "ignore")

    if completed.returncode != 0:
        raise AirflowConfigException(
            f"Cannot execute {command}. Error code is: {completed.returncode}. "
            f"Output: {output}, Stderr: {stderr}"
        )

    return output

98 

99 

100def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool: 

101 """ 

102 Check if the config is a template. 

103 

104 :param configuration_description: description of configuration 

105 :param section: section 

106 :param key: key 

107 :return: True if the config is a template 

108 """ 

109 return configuration_description.get(section, {}).get(key, {}).get("is_template", False) 

110 

111 

class AirflowConfigParser(ConfigParser):
    """
    Base configuration parser with pure parsing logic.

    This class provides the core parsing methods that work with:
    - configuration_description: dict describing config options (required in __init__)
    - _default_values: ConfigParser with default values (required in __init__)
    - deprecated_values: class attribute mapping section -> setting -> (old pattern, replacement)
    - deprecated_options: class attribute mapping new -> old options
    - deprecated_sections: class attribute mapping new -> old sections
    """

    # A mapping of section -> setting -> (old, replace) for deprecated default values:
    # "old" is a compiled pattern matched against the current value, "replace" is the substitution.
    # Subclasses can override this to define deprecated values that should be upgraded.
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {}

    # A mapping of (new section, new option) -> (old section, old option, since_version).
    # When reading new option, the old option will be checked to see if it exists. If it does a
    # DeprecationWarning will be issued and the old option will be used instead
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
        ("dag_processor", "dag_file_processor_timeout"): ("core", "dag_file_processor_timeout", "3.0"),
        ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
        ("api", "base_url"): ("webserver", "base_url", "3.0"),
        ("api", "host"): ("webserver", "web_server_host", "3.0"),
        ("api", "port"): ("webserver", "web_server_port", "3.0"),
        ("api", "workers"): ("webserver", "workers", "3.0"),
        ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"),
        ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"),
        ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"),
        ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"),
        ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"),
        ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"),
        ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"),
        ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"),
        ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"),
        ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"),
        ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"),
        ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"),
        ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
        ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"),
        ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"),
        ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"),
        ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"),
        ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"),
        ("api", "page_size"): ("webserver", "page_size", "3.1.0"),
        ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"),
        ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"),
        ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"),
        ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
        # NOTE(review): the old option here is itself a *new* option above ("api", "access_logfile")
        # - presumably an intentional rename chain; confirm.
        ("api", "log_config"): ("api", "access_logfile", "3.1.0"),
        ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"),
    }

    # A mapping of new section -> (old section, since_version).
    deprecated_sections: dict[str, tuple[str, str]] = {}

166 

167 @property 

168 def _lookup_sequence(self) -> list[Callable]: 

169 """ 

170 Define the sequence of lookup methods for get(). The definition here does not have provider lookup. 

171 

172 Subclasses can override this to customise lookup order. 

173 """ 

174 return [ 

175 self._get_environment_variables, 

176 self._get_option_from_config_file, 

177 self._get_option_from_commands, 

178 self._get_option_from_secrets, 

179 self._get_option_from_defaults, 

180 ] 

181 

182 @property 

183 def _validators(self) -> list[Callable[[], None]]: 

184 """ 

185 Return list of validators defined on a config parser class. Base class will return an empty list. 

186 

187 Subclasses can override this to customize the validators that are run during validation on the 

188 config parser instance. 

189 """ 

190 return [] 

191 

192 def validate(self) -> None: 

193 """Run all registered validators.""" 

194 for validator in self._validators: 

195 validator() 

196 self.is_validated = True 

197 

198 def _validate_deprecated_values(self) -> None: 

199 """Validate and upgrade deprecated default values.""" 

200 for section, replacement in self.deprecated_values.items(): 

201 for name, info in replacement.items(): 

202 old, new = info 

203 current_value = self.get(section, name, fallback="") 

204 if self._using_old_value(old, current_value): 

205 self.upgraded_values[(section, name)] = current_value 

206 new_value = old.sub(new, current_value) 

207 self._update_env_var(section=section, name=name, new_value=new_value) 

208 self._create_future_warning( 

209 name=name, 

210 section=section, 

211 current_value=current_value, 

212 new_value=new_value, 

213 ) 

214 

215 def _using_old_value(self, old: Pattern, current_value: str) -> bool: 

216 """Check if current_value matches the old pattern.""" 

217 return old.search(current_value) is not None 

218 

219 def _update_env_var(self, section: str, name: str, new_value: str) -> None: 

220 """Update environment variable with new value.""" 

221 env_var = self._env_var_name(section, name) 

222 # Set it as an env var so that any subprocesses keep the same override! 

223 os.environ[env_var] = new_value 

224 

225 @staticmethod 

226 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None: 

227 """Create a FutureWarning for deprecated default values.""" 

228 warnings.warn( 

229 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " 

230 f"This value has been changed to {new_value!r} in the running config, but please update your config.", 

231 FutureWarning, 

232 stacklevel=3, 

233 ) 

234 

235 def __init__( 

236 self, 

237 configuration_description: dict[str, dict[str, Any]], 

238 _default_values: ConfigParser, 

239 *args, 

240 **kwargs, 

241 ): 

242 """ 

243 Initialize the parser. 

244 

245 :param configuration_description: Description of configuration options 

246 :param _default_values: ConfigParser with default values 

247 """ 

248 super().__init__(*args, **kwargs) 

249 self.configuration_description = configuration_description 

250 self._default_values = _default_values 

251 self._suppress_future_warnings = False 

252 self.upgraded_values: dict[tuple[str, str], str] = {} 

253 

254 @functools.cached_property 

255 def inversed_deprecated_options(self): 

256 """Build inverse mapping from old options to new options.""" 

257 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()} 

258 

259 @functools.cached_property 

260 def inversed_deprecated_sections(self): 

261 """Build inverse mapping from old sections to new sections.""" 

262 return { 

263 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items() 

264 } 

265 

266 @functools.cached_property 

267 def sensitive_config_values(self) -> set[tuple[str, str]]: 

268 """Get set of sensitive config values that should be masked.""" 

269 flattened = { 

270 (s, k): item 

271 for s, s_c in self.configuration_description.items() 

272 for k, item in s_c.get("options", {}).items() 

273 } 

274 sensitive = { 

275 (section.lower(), key.lower()) 

276 for (section, key), v in flattened.items() 

277 if v.get("sensitive") is True 

278 } 

279 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options} 

280 depr_section = { 

281 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections 

282 } 

283 sensitive.update(depr_section, depr_option) 

284 return sensitive 

285 

286 def _update_defaults_from_string(self, config_string: str) -> None: 

287 """ 

288 Update the defaults in _default_values based on values in config_string ("ini" format). 

289 

290 Override shared parser's method to add validation for template variables. 

291 Note that those values are not validated and cannot contain variables because we are using 

292 regular config parser to load them. This method is used to test the config parser in unit tests. 

293 

294 :param config_string: ini-formatted config string 

295 """ 

296 parser = ConfigParser() 

297 parser.read_string(config_string) 

298 for section in parser.sections(): 

299 if section not in self._default_values.sections(): 

300 self._default_values.add_section(section) 

301 errors = False 

302 for key, value in parser.items(section): 

303 if not self.is_template(section, key) and "{" in value: 

304 errors = True 

305 log.error( 

306 "The %s.%s value %s read from string contains variable. This is not supported", 

307 section, 

308 key, 

309 value, 

310 ) 

311 self._default_values.set(section, key, value) 

312 if errors: 

313 raise AirflowConfigException( 

314 f"The string config passed as default contains variables. " 

315 f"This is not supported. String config: {config_string}" 

316 ) 

317 

318 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any: 

319 """ 

320 Retrieve default value from default config parser. 

321 

322 This will retrieve the default value from the default config parser. Optionally a raw, stored 

323 value can be retrieved by setting skip_interpolation to True. This is useful for example when 

324 we want to write the default value to a file, and we don't want the interpolation to happen 

325 as it is going to be done later when the config is read. 

326 

327 :param section: section of the config 

328 :param key: key to use 

329 :param fallback: fallback value to use 

330 :param raw: if raw, then interpolation will be reversed 

331 :param kwargs: other args 

332 :return: 

333 """ 

334 value = self._default_values.get(section, key, fallback=fallback, **kwargs) 

335 if raw and value is not None: 

336 return value.replace("%", "%%") 

337 return value 

338 

339 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None: 

340 """ 

341 Get Secret Backend if defined in airflow.cfg. 

342 

343 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not. 

344 """ 

345 section = "workers" if worker_mode else "secrets" 

346 key = "secrets_backend" if worker_mode else "backend" 

347 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs" 

348 

349 secrets_backend_cls = self.getimport(section=section, key=key) 

350 

351 if not secrets_backend_cls: 

352 if worker_mode: 

353 # if we find no secrets backend for worker, return that of secrets backend 

354 secrets_backend_cls = self.getimport(section="secrets", key="backend") 

355 if not secrets_backend_cls: 

356 return None 

357 # When falling back to secrets backend, use its kwargs 

358 kwargs_key = "backend_kwargs" 

359 section = "secrets" 

360 else: 

361 return None 

362 

363 try: 

364 backend_kwargs = self.getjson(section=section, key=kwargs_key) 

365 if not backend_kwargs: 

366 backend_kwargs = {} 

367 elif not isinstance(backend_kwargs, dict): 

368 raise ValueError("not a dict") 

369 except AirflowConfigException: 

370 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key) 

371 backend_kwargs = {} 

372 except ValueError: 

373 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key) 

374 backend_kwargs = {} 

375 

376 return secrets_backend_cls(**backend_kwargs) 

377 

378 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None: 

379 """ 

380 Get Config option values from Secret Backend. 

381 

382 Called by the shared parser's _get_secret_option() method as part of the lookup chain. 

383 Uses _get_custom_secret_backend() to get the backend instance. 

384 

385 :param config_key: the config key to retrieve 

386 :return: config value or None 

387 """ 

388 try: 

389 secrets_client = self._get_custom_secret_backend() 

390 if not secrets_client: 

391 return None 

392 return secrets_client.get_config(config_key) 

393 except Exception as e: 

394 raise AirflowConfigException( 

395 "Cannot retrieve config from alternative secrets backend. " 

396 "Make sure it is configured properly and that the Backend " 

397 "is accessible.\n" 

398 f"{e}" 

399 ) 

400 

401 def _get_cmd_option_from_config_sources( 

402 self, config_sources: ConfigSourcesType, section: str, key: str 

403 ) -> str | None: 

404 fallback_key = key + "_cmd" 

405 if (section, key) in self.sensitive_config_values: 

406 section_dict = config_sources.get(section) 

407 if section_dict is not None: 

408 command_value = section_dict.get(fallback_key) 

409 if command_value is not None: 

410 if isinstance(command_value, str): 

411 command = command_value 

412 else: 

413 command = command_value[0] 

414 return run_command(command) 

415 return None 

416 

417 def _get_secret_option_from_config_sources( 

418 self, config_sources: ConfigSourcesType, section: str, key: str 

419 ) -> str | None: 

420 fallback_key = key + "_secret" 

421 if (section, key) in self.sensitive_config_values: 

422 section_dict = config_sources.get(section) 

423 if section_dict is not None: 

424 secrets_path_value = section_dict.get(fallback_key) 

425 if secrets_path_value is not None: 

426 if isinstance(secrets_path_value, str): 

427 secrets_path = secrets_path_value 

428 else: 

429 secrets_path = secrets_path_value[0] 

430 return self._get_config_value_from_secret_backend(secrets_path) 

431 return None 

432 

433 def _include_secrets( 

434 self, 

435 config_sources: ConfigSourcesType, 

436 display_sensitive: bool, 

437 display_source: bool, 

438 raw: bool, 

439 ): 

440 for section, key in self.sensitive_config_values: 

441 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key) 

442 if value: 

443 if not display_sensitive: 

444 value = "< hidden >" 

445 if display_source: 

446 opt: str | tuple[str, str] = (value, "secret") 

447 elif raw: 

448 opt = value.replace("%", "%%") 

449 else: 

450 opt = value 

451 config_sources.setdefault(section, {}).update({key: opt}) 

452 del config_sources[section][key + "_secret"] 

453 

454 def _include_commands( 

455 self, 

456 config_sources: ConfigSourcesType, 

457 display_sensitive: bool, 

458 display_source: bool, 

459 raw: bool, 

460 ): 

461 for section, key in self.sensitive_config_values: 

462 opt = self._get_cmd_option_from_config_sources(config_sources, section, key) 

463 if not opt: 

464 continue 

465 opt_to_set: str | tuple[str, str] | None = opt 

466 if not display_sensitive: 

467 opt_to_set = "< hidden >" 

468 if display_source: 

469 opt_to_set = (str(opt_to_set), "cmd") 

470 elif raw: 

471 opt_to_set = str(opt_to_set).replace("%", "%%") 

472 if opt_to_set is not None: 

473 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set} 

474 config_sources.setdefault(section, {}).update(dict_to_update) 

475 del config_sources[section][key + "_cmd"] 

476 

477 def _include_envs( 

478 self, 

479 config_sources: ConfigSourcesType, 

480 display_sensitive: bool, 

481 display_source: bool, 

482 raw: bool, 

483 ): 

484 for env_var in [ 

485 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX) 

486 ]: 

487 try: 

488 _, section, key = env_var.split("__", 2) 

489 opt = self._get_env_var_option(section, key) 

490 except ValueError: 

491 continue 

492 if opt is None: 

493 log.warning("Ignoring unknown env var '%s'", env_var) 

494 continue 

495 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"): 

496 # Don't hide cmd/secret values here 

497 if not env_var.lower().endswith(("cmd", "secret")): 

498 if (section, key) in self.sensitive_config_values: 

499 opt = "< hidden >" 

500 elif raw: 

501 opt = opt.replace("%", "%%") 

502 if display_source: 

503 opt = (opt, "env var") 

504 

505 section = section.lower() 

506 key = key.lower() 

507 config_sources.setdefault(section, {}).update({key: opt}) 

508 

509 def _filter_by_source( 

510 self, 

511 config_sources: ConfigSourcesType, 

512 display_source: bool, 

513 getter_func, 

514 ): 

515 """ 

516 Delete default configs from current configuration. 

517 

518 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values. 

519 

520 This is necessary because bare configs take precedence over the command 

521 or secret key equivalents so if the current running config is 

522 materialized with Airflow defaults they in turn override user set 

523 command or secret key configs. 

524 

525 :param config_sources: The current configuration to operate on 

526 :param display_source: If False, configuration options contain raw 

527 values. If True, options are a tuple of (option_value, source). 

528 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'. 

529 :param getter_func: A callback function that gets the user configured 

530 override value for a particular sensitive_config_values config. 

531 :return: None, the given config_sources is filtered if necessary, 

532 otherwise untouched. 

533 """ 

534 for section, key in self.sensitive_config_values: 

535 # Don't bother if we don't have section / key 

536 if section not in config_sources or key not in config_sources[section]: 

537 continue 

538 # Check that there is something to override defaults 

539 try: 

540 getter_opt = getter_func(section, key) 

541 except ValueError: 

542 continue 

543 if not getter_opt: 

544 continue 

545 # Check to see that there is a default value 

546 if self.get_default_value(section, key) is None: 

547 continue 

548 # Check to see if bare setting is the same as defaults 

549 if display_source: 

550 # when display_source = true, we know that the config_sources contains tuple 

551 opt, source = config_sources[section][key] # type: ignore 

552 else: 

553 opt = config_sources[section][key] 

554 if opt == self.get_default_value(section, key): 

555 del config_sources[section][key] 

556 

557 @staticmethod 

558 def _deprecated_value_is_set_in_config( 

559 deprecated_section: str, 

560 deprecated_key: str, 

561 configs: Iterable[tuple[str, ConfigParser]], 

562 ) -> bool: 

563 for config_type, config in configs: 

564 if config_type != "default": 

565 with contextlib.suppress(NoSectionError): 

566 deprecated_section_array = config.items(section=deprecated_section, raw=True) 

567 if any(key == deprecated_key for key, _ in deprecated_section_array): 

568 return True 

569 return False 

570 

571 @staticmethod 

572 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

573 return ( 

574 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}") 

575 is not None 

576 ) 

577 

578 @staticmethod 

579 def _deprecated_command_is_set_in_config( 

580 deprecated_section: str, 

581 deprecated_key: str, 

582 configs: Iterable[tuple[str, ConfigParser]], 

583 ) -> bool: 

584 return AirflowConfigParser._deprecated_value_is_set_in_config( 

585 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs 

586 ) 

587 

588 @staticmethod 

589 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

590 return ( 

591 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD") 

592 is not None 

593 ) 

594 

595 @staticmethod 

596 def _deprecated_secret_is_set_in_config( 

597 deprecated_section: str, 

598 deprecated_key: str, 

599 configs: Iterable[tuple[str, ConfigParser]], 

600 ) -> bool: 

601 return AirflowConfigParser._deprecated_value_is_set_in_config( 

602 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs 

603 ) 

604 

605 @staticmethod 

606 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

607 return ( 

608 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET") 

609 is not None 

610 ) 

611 

612 @staticmethod 

613 def _replace_config_with_display_sources( 

614 config_sources: ConfigSourcesType, 

615 configs: Iterable[tuple[str, ConfigParser]], 

616 configuration_description: dict[str, dict[str, Any]], 

617 display_source: bool, 

618 raw: bool, 

619 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

620 include_env: bool, 

621 include_cmds: bool, 

622 include_secret: bool, 

623 ): 

624 for source_name, config in configs: 

625 sections = config.sections() 

626 for section in sections: 

627 AirflowConfigParser._replace_section_config_with_display_sources( 

628 config, 

629 config_sources, 

630 configuration_description, 

631 display_source, 

632 raw, 

633 section, 

634 source_name, 

635 deprecated_options, 

636 configs, 

637 include_env=include_env, 

638 include_cmds=include_cmds, 

639 include_secret=include_secret, 

640 ) 

641 

    @staticmethod
    def _replace_section_config_with_display_sources(
        config: ConfigParser,
        config_sources: ConfigSourcesType,
        configuration_description: dict[str, dict[str, Any]],
        display_source: bool,
        raw: bool,
        section: str,
        source_name: str,
        deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
        configs: Iterable[tuple[str, ConfigParser]],
        include_env: bool,
        include_cmds: bool,
        include_secret: bool,
    ):
        """
        Copy the options of one section of ``config`` into ``config_sources``.

        :param config: parser whose section is read
        :param config_sources: configuration to update in place
        :param configuration_description: description of configuration options; consulted for a
            per-option ``source`` when a default is displayed
        :param display_source: when True entries are stored as ``(value, source_name)`` tuples
        :param raw: passed to ``config.items`` to read values without interpolation
        :param section: section to copy
        :param source_name: name of the source ``config`` stands for (e.g. "default")
        :param deprecated_options: mapping (new section, new option) -> (old section, old option, version)
        :param configs: all ``(source_name, config)`` pairs, used to detect deprecated options in use
        :param include_env: also treat a deprecated option set via environment variable as "in use"
        :param include_cmds: also treat a deprecated ``_cmd`` option as "in use"
        :param include_secret: also treat a deprecated ``_secret`` option as "in use"
        """
        sect = config_sources.setdefault(section, {})
        if isinstance(config, AirflowConfigParser):
            # Reading the items must not trigger FutureWarnings for upgraded values.
            with config.suppress_future_warnings():
                items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
        else:
            items = config.items(section=section, raw=raw)
        for k, val in items:
            # (None, None, None) when (section, k) does not replace a deprecated option.
            deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
            if deprecated_section and deprecated_key:
                if source_name == "default":
                    # If deprecated entry has some non-default value set for any of the sources requested,
                    # We should NOT set default for the new entry (because it will override anything
                    # coming from the deprecated ones)
                    if AirflowConfigParser._deprecated_value_is_set_in_config(
                        deprecated_section, deprecated_key, configs
                    ):
                        continue
                    if include_env and AirflowConfigParser._deprecated_variable_is_set(
                        deprecated_section, deprecated_key
                    ):
                        continue
                    if include_cmds and (
                        AirflowConfigParser._deprecated_variable_command_is_set(
                            deprecated_section, deprecated_key
                        )
                        or AirflowConfigParser._deprecated_command_is_set_in_config(
                            deprecated_section, deprecated_key, configs
                        )
                    ):
                        continue
                    if include_secret and (
                        AirflowConfigParser._deprecated_variable_secret_is_set(
                            deprecated_section, deprecated_key
                        )
                        or AirflowConfigParser._deprecated_secret_is_set_in_config(
                            deprecated_section, deprecated_key, configs
                        )
                    ):
                        continue
            if display_source:
                updated_source_name = source_name
                if source_name == "default":
                    # defaults can come from other sources (default-<PROVIDER>) that should be used here
                    source_description_section = configuration_description.get(section, {})
                    source_description_key = source_description_section.get("options", {}).get(k, {})
                    # An option explicitly described as None keeps the plain source name.
                    if source_description_key is not None:
                        updated_source_name = source_description_key.get("source", source_name)
                sect[k] = (val, updated_source_name)
            else:
                sect[k] = val

707 

708 def _warn_deprecate( 

709 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int 

710 ): 

711 """Warn about deprecated config option usage.""" 

712 if section == deprecated_section: 

713 warnings.warn( 

714 f"The {deprecated_name} option in [{section}] has been renamed to {key} - " 

715 f"the old setting has been used, but please update your config.", 

716 DeprecationWarning, 

717 stacklevel=4 + extra_stacklevel, 

718 ) 

719 else: 

720 warnings.warn( 

721 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option " 

722 f"in [{section}] - the old setting has been used, but please update your config.", 

723 DeprecationWarning, 

724 stacklevel=4 + extra_stacklevel, 

725 ) 

726 

727 @contextmanager 

728 def suppress_future_warnings(self): 

729 """ 

730 Context manager to temporarily suppress future warnings. 

731 

732 This is a stub used by the shared parser's lookup methods when checking deprecated options. 

733 Subclasses can override this to customize warning suppression behavior. 

734 

735 :return: context manager that suppresses future warnings 

736 """ 

737 suppress_future_warnings = self._suppress_future_warnings 

738 self._suppress_future_warnings = True 

739 yield self 

740 self._suppress_future_warnings = suppress_future_warnings 

741 

742 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str: 

743 """Generate environment variable name for a config option.""" 

744 team_component: str = f"{team_name.upper()}___" if team_name else "" 

745 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}" 

746 

747 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None): 

748 """Get config option from environment variable.""" 

749 env_var: str = self._env_var_name(section, key, team_name=team_name) 

750 if env_var in os.environ: 

751 return expand_env_var(os.environ[env_var]) 

752 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command) 

753 env_var_cmd = env_var + "_CMD" 

754 if env_var_cmd in os.environ: 

755 # if this is a valid command key... 

756 if (section, key) in self.sensitive_config_values: 

757 return run_command(os.environ[env_var_cmd]) 

758 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend) 

759 env_var_secret_path = env_var + "_SECRET" 

760 if env_var_secret_path in os.environ: 

761 # if this is a valid secret path... 

762 if (section, key) in self.sensitive_config_values: 

763 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path]) 

764 return None 

765 

766 def _get_cmd_option(self, section: str, key: str): 

767 """Get config option from command execution.""" 

768 fallback_key = key + "_cmd" 

769 if (section, key) in self.sensitive_config_values: 

770 if super().has_option(section, fallback_key): 

771 command = super().get(section, fallback_key) 

772 try: 

773 cmd_output = run_command(command) 

774 except AirflowConfigException as e: 

775 raise e 

776 except Exception as e: 

777 raise AirflowConfigException( 

778 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd." 

779 f" Please check the {fallback_key} value." 

780 ) from e 

781 return cmd_output 

782 return None 

783 

def _get_secret_option(self, section: str, key: str) -> str | None:
    """
    Get config option value from the secrets backend.

    Only ``(section, key)`` pairs listed in ``sensitive_config_values`` are considered.
    The companion ``{key}_secret`` option is read through the parent class and its
    value is handed to ``_get_config_value_from_secret_backend``.

    :param section: section of the option
    :param key: name of the option, without the ``_secret`` suffix
    :return: value from the secrets backend, or None when the pair is not sensitive or
        no ``{key}_secret`` option is set
    """
    secret_key = key + "_secret"
    if (section, key) not in self.sensitive_config_values:
        return None
    if not super().has_option(section, secret_key):
        return None
    path_in_backend = super().get(section, secret_key)
    return self._get_config_value_from_secret_backend(path_in_backend)

792 

def _get_environment_variables(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from environment variables.

    The current ``section``/``key`` is tried first; when it yields nothing and a
    deprecated pair is given, the deprecated pair is tried with future warnings
    suppressed. ``team_name`` is read from ``kwargs`` and forwarded to both lookups.

    :return: the value found, or ``VALUE_NOT_FOUND_SENTINEL`` when neither lookup
        returns a value
    """
    team = kwargs.get("team_name", None)

    current = self._get_env_var_option(section, key, team_name=team)
    if current is not None:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team)
    if legacy is None:
        return VALUE_NOT_FOUND_SENTINEL

    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

816 

def _get_option_from_config_file(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from config file.

    When a truthy ``team_name`` is passed in ``kwargs`` the lookup is done in the
    ``{team_name}={section}`` section. The current option is tried first, then the
    deprecated one when a deprecated pair is given.

    :return: the value found (passed through ``expand_env_var``), or
        ``VALUE_NOT_FOUND_SENTINEL`` when the option is set in neither place
    """
    # Always remove team_name before the remaining kwargs are forwarded to the parent
    # get(): a present-but-falsy value (e.g. "") used to be left in kwargs.
    # NOTE(review): assumes the parent get() does not accept team_name - confirm.
    team_name = kwargs.pop("team_name", None)
    if team_name:
        section = f"{team_name}={section}"

    if super().has_option(section, key):
        return expand_env_var(super().get(section, key, **kwargs))

    if deprecated_section and deprecated_key:
        if super().has_option(deprecated_section, deprecated_key):
            if issue_warning:
                self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
            with self.suppress_future_warnings():
                return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))

    return VALUE_NOT_FOUND_SENTINEL

841 

def _get_option_from_commands(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from command execution.

    Tries ``_get_cmd_option`` for the current pair and then, with future warnings
    suppressed, for the deprecated pair when one is given. Empty results are treated
    as "not found".

    :return: the command output, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    current = self._get_cmd_option(section, key)
    if current:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_cmd_option(deprecated_section, deprecated_key)
    if not legacy:
        return VALUE_NOT_FOUND_SENTINEL

    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

864 

def _get_option_from_secrets(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from secrets backend.

    Tries ``_get_secret_option`` for the current pair and then, with future warnings
    suppressed, for the deprecated pair when one is given. Empty results are treated
    as "not found".

    :return: the secret value, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    current = self._get_secret_option(section, key)
    if current:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_secret_option(deprecated_section, deprecated_key)
    if not legacy:
        return VALUE_NOT_FOUND_SENTINEL

    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

887 

def _get_option_from_defaults(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from default values.

    The default is used when ``get_default_value`` returns something for the pair or
    when the caller supplied a ``fallback`` keyword; the remaining ``kwargs`` are then
    forwarded to ``get_default_value``.

    :return: the default (passed through ``expand_env_var``), or
        ``VALUE_NOT_FOUND_SENTINEL``
    """
    default_exists = self.get_default_value(section, key) is not None
    if not default_exists and "fallback" not in kwargs:
        return VALUE_NOT_FOUND_SENTINEL
    return expand_env_var(self.get_default_value(section, key, **kwargs))

903 

def _resolve_deprecated_lookup(
    self,
    section: str,
    key: str,
    lookup_from_deprecated: bool,
    extra_stacklevel: int = 0,
) -> tuple[str, str, str | None, str | None, bool]:
    """
    Resolve deprecated section/key mappings and determine deprecated values.

    :param section: Section name (will be lowercased)
    :param key: Key name (will be lowercased)
    :param lookup_from_deprecated: Whether to lookup from deprecated options; when False
        only the lowercasing is applied
    :param extra_stacklevel: Extra stack level for warnings
    :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted)
    """
    section = section.lower()
    key = key.lower()
    warning_emitted = False
    deprecated_section: str | None = None
    deprecated_key: str | None = None

    if not lookup_from_deprecated:
        return section, key, deprecated_section, deprecated_key, warning_emitted

    option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {})
    if option_description.get("deprecated"):
        deprecation_reason = option_description.get("deprecation_reason", "")
        warnings.warn(
            f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
            DeprecationWarning,
            stacklevel=2 + extra_stacklevel,
        )

    if section in self.inversed_deprecated_sections:
        # The caller used the old name of a section that was renamed as a whole
        deprecated_section, deprecated_key = (section, key)
        section = self.inversed_deprecated_sections[section]
        if not self._suppress_future_warnings:
            warnings.warn(
                f"The config section [{deprecated_section}] has been renamed to "
                f"[{section}]. Please update your `conf.get*` call to use the new name",
                FutureWarning,
                stacklevel=2 + extra_stacklevel,
            )
        # Don't warn about individual rename if the whole section is renamed
        warning_emitted = True
    elif (section, key) in self.inversed_deprecated_options:
        # Handle using deprecated section/key instead of the new section/key
        new_section, new_key = self.inversed_deprecated_options[(section, key)]
        if not self._suppress_future_warnings and not warning_emitted:
            warnings.warn(
                # A separating space after "use" was missing in the original message.
                f"section/key [{section}/{key}] has been deprecated, you should use "
                f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
                "new name",
                FutureWarning,
                stacklevel=2 + extra_stacklevel,
            )
            warning_emitted = True
        deprecated_section, deprecated_key = section, key
        section, key = (new_section, new_key)
    elif section in self.deprecated_sections:
        # When accessing the new section name, make sure we check under the old config name
        deprecated_key = key
        deprecated_section = self.deprecated_sections[section][0]
    else:
        deprecated_section, deprecated_key, _ = self.deprecated_options.get(
            (section, key), (None, None, None)
        )

    return section, key, deprecated_section, deprecated_key, warning_emitted

974 

@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...


@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: ...


def get(  # type: ignore[misc, override]
    self,
    section: str,
    key: str,
    suppress_warnings: bool = False,
    lookup_from_deprecated: bool = True,
    _extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | None:
    """
    Get config value by walking the lookup sequence.

    The section/key pair is first normalised by ``_resolve_deprecated_lookup``; each
    method of ``_lookup_sequence`` is then asked in turn and the first one that does
    not answer with ``VALUE_NOT_FOUND_SENTINEL`` wins.

    :param suppress_warnings: do not log a warning when the option is not found
    :param lookup_from_deprecated: also consider deprecated sections/options
    :param team_name: forwarded to the lookup methods when not None
    :return: the value found, or the ``fallback`` keyword when one was supplied
    :raises AirflowConfigException: when nothing is found and no ``fallback`` was given
    """
    resolved = self._resolve_deprecated_lookup(
        section=section,
        key=key,
        lookup_from_deprecated=lookup_from_deprecated,
        extra_stacklevel=_extra_stacklevel,
    )
    section, key, deprecated_section, deprecated_key, warning_emitted = resolved

    if team_name is not None:
        kwargs["team_name"] = team_name

    for lookup in self._lookup_sequence:
        found = lookup(
            deprecated_key=deprecated_key,
            deprecated_section=deprecated_section,
            key=key,
            section=section,
            issue_warning=not warning_emitted,
            extra_stacklevel=_extra_stacklevel,
            **kwargs,
        )
        if found is VALUE_NOT_FOUND_SENTINEL:
            continue
        return found

    # An explicitly supplied fallback is honoured even when it is None
    try:
        return kwargs["fallback"]
    except KeyError:
        pass

    if not suppress_warnings:
        log.warning("section/key [%s/%s] not found in config", section, key)

    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")

1027 

def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
    """
    Get config value as boolean.

    The value is stringified, lowercased and stripped; anything from the first ``#``
    onwards is discarded. ``t``/``true``/``1`` map to True, ``f``/``false``/``0`` to
    False.

    :raises AirflowConfigException: when the value is none of the accepted spellings
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    val = str(raw).lower().strip().partition("#")[0].strip()

    truthy = ("t", "true", "1")
    falsy = ("f", "false", "0")
    if val in truthy:
        return True
    if val in falsy:
        return False

    raise AirflowConfigException(
        f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
        f'Current value: "{val}".'
    )

1041 

def getint(self, section: str, key: str, **kwargs) -> int:  # type: ignore[override]
    """
    Get config value as integer.

    :raises AirflowConfigException: when the value is None or ``int()`` rejects it
        with a ValueError
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw is None:
        raise AirflowConfigException(
            "Failed to convert value None to int. "
            f'Please check "{key}" key in "{section}" section is set.'
        )

    try:
        number = int(raw)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )
    return number

1057 

def getfloat(self, section: str, key: str, **kwargs) -> float:  # type: ignore[override]
    """
    Get config value as float.

    :raises AirflowConfigException: when the value is None or ``float()`` rejects it
        with a ValueError
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw is None:
        raise AirflowConfigException(
            "Failed to convert value None to float. "
            f'Please check "{key}" key in "{section}" section is set.'
        )

    try:
        number = float(raw)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )
    return number

1073 

def getlist(self, section: str, key: str, delimiter=",", **kwargs):
    """
    Get config value as list.

    A list or None coming back from ``get`` is returned unchanged, an empty string
    gives an empty list, anything else is split on ``delimiter`` and each item is
    stripped.

    :raises AirflowConfigException: when splitting or stripping the value fails
    """
    raw = self.get(section, key, **kwargs)

    # Non-string results can only originate from a caller-supplied fallback
    if raw is None or isinstance(raw, list):
        return raw

    if raw == "":
        return []

    try:
        items = []
        for piece in raw.split(delimiter):
            items.append(piece.strip())
        return items
    except Exception:
        raise AirflowConfigException(
            f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )

1093 

E = TypeVar("E", bound=Enum)


def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
    """
    Get config value as enum.

    The value is looked up by member name in ``enum_class``. When it is not a member
    name and a ``fallback`` keyword naming a valid member was supplied, that member
    is returned instead.

    :raises AirflowConfigException: when the value is None, or is not a member name
        and no usable fallback was supplied
    """
    raw = self.get(section, key, **kwargs)
    enum_names = [member.name for member in enum_class]

    if raw is None:
        raise AirflowConfigException(
            f'Failed to convert value. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}" and it must be one of {", ".join(enum_names)}'
        )

    try:
        member = enum_class[raw]
    except KeyError:
        if "fallback" in kwargs and kwargs["fallback"] in enum_names:
            return enum_class[kwargs["fallback"]]
        raise AirflowConfigException(
            f'Failed to convert value. Please check "{key}" key in "{section}" section. '
            f"the value must be one of {', '.join(enum_names)}"
        )
    return member

1116 

def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
    """
    Get config value as list of enums.

    The option is read with ``getlist`` (``fallback`` defaults to an empty list) and
    every item is looked up by member name in ``enum_class``. Items that are not
    member names are logged as warnings and skipped.

    :param enum_class: enum whose member names are accepted
    :param delimiter: separator passed to ``getlist``
    :return: the matching enum members, in the order they were listed
    """
    kwargs.setdefault("fallback", [])
    string_list = self.getlist(section, key, delimiter, **kwargs)

    enum_names = [enum_item.name for enum_item in enum_class]
    enum_list = []

    # An explicit fallback=None makes getlist return None; treat that as "no values"
    # instead of failing on iteration.
    for val in string_list or []:
        try:
            enum_list.append(enum_class[val])
        except KeyError:
            log.warning(
                "Failed to convert value %r. Please check %s key in %s section. "
                "it must be one of %s, if not the value is ignored",
                val,
                key,
                section,
                ", ".join(enum_names),
            )

    return enum_list

1139 

def getimport(self, section: str, key: str, **kwargs) -> Any:
    """
    Read the option, import the fully qualified name it holds, and return the object.

    :return: the imported object, or None when the option is empty
    :raises AirflowConfigException: when the import fails; the message names the key,
        the section and the offending value
    """
    dotted_path = self.get(section=section, key=key, **kwargs)
    if not dotted_path:
        return None

    try:
        # Import here to avoid circular dependency
        from ..module_loading import import_string

        imported = import_string(dotted_path)
    except ImportError as e:
        log.warning(e)
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{dotted_path}".'
        )
    return imported

1164 

def getjson(
    self, section: str, key: str, fallback=None, **kwargs
) -> dict | list | str | int | float | None:
    """
    Return a config value parsed from a JSON string.

    ``fallback`` is *not* JSON parsed; it is returned verbatim when the option is
    missing, None or an empty string.

    :raises AirflowConfigException: when the value is not valid JSON
    """
    raw = None
    with contextlib.suppress(NoSectionError, NoOptionError):
        raw = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)

    if raw is None or raw == "":
        return fallback

    try:
        parsed = json.loads(raw)
    except JSONDecodeError as e:
        raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
    return parsed

1185 

def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Get the config value for the given section and key as a ``datetime.timedelta``.

    A missing or empty value yields ``fallback``.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: value returned when no config value is given, defaults to None
    :raises AirflowConfigException: when the value is not an integer (ValueError) or
        is too large for a timedelta (OverflowError)
    :return: datetime.timedelta(seconds=<config_value>) or ``fallback``
    """
    raw = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
    if not raw:
        return fallback

    # the configured value is interpreted as a whole number of seconds
    try:
        seconds = int(raw)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )

    try:
        delta = datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            "Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{raw}".'
        )
    return delta

1222 

def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
    """
    Get a config value that must be set.

    :raises ValueError: when ``get`` returns None for the option
    """
    if (found := self.get(section, key, _extra_stacklevel=1, **kwargs)) is None:
        raise ValueError(f"The value {section}/{key} should be set!")
    return found

1229 

def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
    """
    Get a config value as list that must be set.

    :raises ValueError: when ``getlist`` returns None for the option
    """
    if (found := self.getlist(section, key, **kwargs)) is None:
        raise ValueError(f"The value {section}/{key} should be set!")
    return found

1236 

def read(
    self,
    filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
    encoding: str | None = None,
) -> list[str]:
    """
    Read configuration from the given file(s) by delegating to the parent class.

    :param filenames: a single path or an iterable of paths
    :param encoding: encoding forwarded to the parent ``read``
    :return: whatever the parent ``read`` returns
    """
    files_read = super().read(filenames=filenames, encoding=encoding)
    return files_read

1243 

def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
) -> None:
    """
    Load configuration from a dictionary by delegating to the parent class.

    The signature is redefined here only to carry more precise type hints.

    :param dictionary: dictionary to read from
    :param source: source to be used to store the configuration
    """
    parent = super()
    parent.read_dict(dictionary=dictionary, source=source)

1255 

def get_sections_including_defaults(self) -> list[str]:
    """
    Retrieve all section names, including those that only exist in the configuration description.

    Sections from ``configuration_description`` come first, followed by sections of
    this parser that were not already listed; duplicates are dropped.

    :return: list of section names
    """
    from_config = self.sections()
    from_description = list(self.configuration_description.keys())

    # a dict keeps first-seen order while discarding repeated names
    unique: dict[str, None] = {}
    for name in itertools.chain(from_description, from_config):
        unique.setdefault(name, None)
    return list(unique)

1265 

def get_options_including_defaults(self, section: str) -> list[str]:
    """
    Retrieve all option names for a section, including those that only exist in the configuration description.

    Options from ``configuration_description`` come first, followed by options of
    this parser that were not already listed; duplicates are dropped.

    :param section: section name
    :return: list of option names for the section given
    """
    if self.has_section(section):
        own_options = self.options(section)
    else:
        own_options = []
    described_options = list(self.configuration_description.get(section, {}).get("options", {}).keys())

    # a dict keeps first-seen order while discarding repeated names
    unique: dict[str, None] = {}
    for name in itertools.chain(described_options, own_options):
        unique.setdefault(name, None)
    return list(unique)

1280 

def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs) -> bool:
    """
    Check if option is defined.

    Delegates to ``self.get()`` (with ``fallback=None`` and warnings suppressed) so
    the priority order of the config sources is not reimplemented here.

    :param section: section to get option from
    :param option: option to get
    :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
    :param kwargs: additional keyword arguments to pass to get(), such as team_name
    :return: True when ``get`` returns something other than None; False when it
        returns None or raises NoOptionError, NoSectionError or AirflowConfigException
    """
    try:
        found = self.get(
            section,
            option,
            fallback=None,
            _extra_stacklevel=1,
            suppress_warnings=True,
            lookup_from_deprecated=lookup_from_deprecated,
            **kwargs,
        )
    except (NoOptionError, NoSectionError, AirflowConfigException):
        return False
    return found is not None

1309 

def set(self, section: str, option: str, value: str | None = None) -> None:
    """
    Set an option to the given value.

    Section and option are lowercased first, to match what `get` does. A section that
    is missing from this parser but present in ``configuration_description`` is
    created on the fly before the value is stored through the parent class.
    """
    section, option = section.lower(), option.lower()

    known_sections = self.configuration_description or {}
    section_missing = not self.has_section(section)
    if section_missing and section in known_sections:
        # The section only exists in the defaults so far; create it automatically
        self.add_section(section)

    super().set(section, option, value)

1324 

def remove_option(self, section: str, option: str, remove_default: bool = True):
    """
    Remove an option from this parser and, optionally, from the default values.

    Section and option are lowercased first. The option is removed from this parser
    when the parent class reports it; it is also removed from ``_default_values``
    when a default exists, unless ``remove_default=False``.
    """
    section, option = section.lower(), option.lower()

    parent = super()
    if parent.has_option(section, option):
        parent.remove_option(section, option)

    default_exists = self.get_default_value(section, option) is not None
    if default_exists and remove_default:
        self._default_values.remove_option(section, option)

1339 

def optionxform(self, optionstr: str) -> str:
    """
    Return the option name unchanged.

    ConfigParser lower-cases option names by default; this override keeps them
    case-preserving instead.

    :param optionstr: option name as given
    :return: the same option name
    """
    return optionstr

1351 

def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
    """
    Return the (name, parser) pairs that ``as_dict()`` reads from, in order.

    Subclasses can override to add additional sources (e.g., provider configs).
    """
    sources: list[tuple[str, ConfigParser]] = []
    sources.append(("default", self._default_values))
    sources.append(("airflow.cfg", self))
    return sources

1362 

def as_dict(
    self,
    display_source: bool = False,
    display_sensitive: bool = False,
    raw: bool = False,
    include_env: bool = True,
    include_cmds: bool = True,
    include_secret: bool = True,
) -> ConfigSourcesType:
    """
    Return the current configuration as a dictionary of dictionaries.

    Defaults are materialized together with user-set configs. For every ``include_*``
    flag that is False the matching source is not resolved; ``_filter_by_source`` is
    applied for it instead.

    :param display_source: If False, the option value is returned. If True,
        a tuple of (option_value, source) is returned. Source is either
        'airflow.cfg', 'default', 'env var', or 'cmd'.
    :param display_sensitive: If True, the values of options set by env
        vars and bash commands will be displayed. If False, those options
        are shown as '< hidden >'
    :param raw: Should the values be output as interpolated values, or the
        "raw" form that can be fed back in to ConfigParser
    :param include_env: Should the value of configuration from AIRFLOW__
        environment variables be included or not
    :param include_cmds: Should the result of calling any ``*_cmd`` config be
        set (True, default), or should the _cmd options be left as the
        command to run (False)
    :param include_secret: Should the result of calling any ``*_secret`` config be
        set (True, default), or should the _secret options be left as the
        path to get the secret from (False)
    :raises ValueError: when ``display_sensitive`` is False and any ``include_*`` flag
        is False
    :return: Dictionary, where the key is the name of the section and the content is
        the dictionary with the name of the parameter and its value.
    """
    # Hiding of sensitive values happens in the include steps, so none may be skipped
    if not display_sensitive and not (include_env and include_cmds and include_secret):
        raise ValueError(
            "If display_sensitive is false, then include_env, "
            "include_cmds, include_secret must all be set as True"
        )

    config_sources: ConfigSourcesType = {}

    # Sources are applied one after another; the last one seen for an option "wins"
    self._replace_config_with_display_sources(
        config_sources,
        self._get_config_sources_for_as_dict(),
        self.configuration_description,
        display_source,
        raw,
        self.deprecated_options,
        include_cmds=include_cmds,
        include_env=include_env,
        include_secret=include_secret,
    )

    # env vars, then bash commands, then secret backends
    stages = (
        (include_env, self._include_envs, self._get_env_var_option),
        (include_cmds, self._include_commands, self._get_cmd_option),
        (include_secret, self._include_secrets, self._get_secret_option),
    )
    for enabled, include, getter in stages:
        if enabled:
            include(config_sources, display_sensitive, display_source, raw)
        else:
            self._filter_by_source(config_sources, display_source, getter)

    if display_sensitive:
        return config_sources

    # Also mask sensitive values that came from the config file rather than from
    # env, cmd or secret
    hidden = "< hidden >"
    for section, key in self.sensitive_config_values:
        section_values = config_sources.get(section)
        if not section_values or not section_values.get(key, None):
            continue
        if display_source:
            section_values[key] = (hidden, section_values[key][1])
        else:
            section_values[key] = hidden

    return config_sources

1461 

def _write_option_header(
    self,
    file: IO[str],
    option: str,
    extra_spacing: bool,
    include_descriptions: bool,
    include_env_vars: bool,
    include_examples: bool,
    include_sources: bool,
    section_config_description: dict[str, dict[str, Any]],
    section_to_write: str,
    sources_dict: ConfigSourcesType,
) -> tuple[bool, bool]:
    """
    Write header for configuration option.

    Depending on the ``include_*`` flags the header contains the option description,
    an example, the source of the current value and the environment variable name,
    each as ``#`` comment lines.

    :param file: file to write to
    :param option: name of the option
    :param extra_spacing: add ``#`` spacer lines before the example and after the variable
    :param section_config_description: description of the section holding the option
    :param section_to_write: name of the section holding the option
    :param sources_dict: mapping of section -> option -> (value, source)
    :return: tuple of (should_continue, needs_separation); should_continue is always
        True here, needs_separation is set when anything was written and the option
        therefore needs visual separation from the next one
    """
    option_config_description = (
        section_config_description.get("options", {}).get(option, {})
        if section_config_description
        else {}
    )
    description = option_config_description.get("description")
    needs_separation = False
    if description and include_descriptions:
        for line in description.splitlines():
            file.write(f"# {line}\n")
        needs_separation = True
    example = option_config_description.get("example")
    if example is not None and include_examples:
        if extra_spacing:
            file.write("#\n")
        example_lines = example.splitlines()
        example = "\n# ".join(example_lines)
        file.write(f"# Example: {option} = {example}\n")
        needs_separation = True
    if include_sources and sources_dict:
        sources_section = sources_dict.get(section_to_write)
        value_with_source = sources_section.get(option) if sources_section else None
        if value_with_source is None:
            file.write("#\n# Source: not defined\n")
        else:
            file.write(f"#\n# Source: {value_with_source[1]}\n")
        needs_separation = True
    if include_env_vars:
        # Dots in the section are replaced with underscores, as _env_var_name does
        # when building the variable name, so the documented name matches it.
        env_section = section_to_write.replace(".", "_").upper()
        file.write(f"#\n# Variable: AIRFLOW__{env_section}__{option.upper()}\n")
        if extra_spacing:
            file.write("#\n")
        needs_separation = True
    return True, needs_separation

1514 

def is_template(self, section: str, key) -> bool:
    """
    Tell whether the value of an option is templated.

    Delegates to ``_is_template`` with this parser's ``configuration_description``.

    :param section: section of the config
    :param key: key in the section
    :return: True if the value is templated
    """
    description = self.configuration_description
    return _is_template(description, section, key)

1524 

def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None:
    """
    Return the section as a dict.

    Default values are taken first, then overridden by values of this parser and
    finally by environment variables whose name starts with the section's prefix.
    Values are converted to int, float, bool as required.

    :param section: section from the config
    :param team_name: optional team name for team-specific configuration lookup
    :return: the options of the section, or None when the section exists neither in
        this parser nor in the default values
    :raises AirflowConfigException: when an option resolves to None
    """
    # Handle team-specific section lookup for config file
    config_section = f"{team_name}={section}" if team_name else section

    if not self.has_section(config_section) and not self._default_values.has_section(config_section):
        return None
    if self._default_values.has_section(config_section):
        _section: ConfigOptionsDictType = dict(self._default_values.items(config_section))
    else:
        _section = {}

    if self.has_section(config_section):
        _section.update(self.items(config_section))

    # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name
    section_prefix = self._env_var_name(section, "", team_name=team_name)
    for env_var in sorted(os.environ):
        if not env_var.startswith(section_prefix):
            continue
        # Strip only the leading prefix; str.replace would also remove later
        # occurrences of the same text inside the option name.
        key = env_var.removeprefix(section_prefix)
        if key.endswith("_CMD"):
            key = key[:-4]
        key = key.lower()
        _section[key] = self._get_env_var_option(section, key, team_name=team_name)

    for key, val in _section.items():
        if val is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        # Try int first, then float, then the textual booleans; otherwise keep the value
        try:
            _section[key] = int(val)
        except ValueError:
            try:
                _section[key] = float(val)
            except ValueError:
                if isinstance(val, str) and val.lower() in ("t", "true"):
                    _section[key] = True
                elif isinstance(val, str) and val.lower() in ("f", "false"):
                    _section[key] = False
    return _section

1574 

1575 @staticmethod 

1576 def _write_section_header( 

1577 file: IO[str], 

1578 include_descriptions: bool, 

1579 section_config_description: dict[str, str], 

1580 section_to_write: str, 

1581 ) -> None: 

1582 """Write header for configuration section.""" 

1583 file.write(f"[{section_to_write}]\n") 

1584 section_description = section_config_description.get("description") 

1585 if section_description and include_descriptions: 

1586 for line in section_description.splitlines(): 

1587 file.write(f"# {line}\n") 

1588 file.write("\n") 

1589 

def _write_value(
    self,
    file: IO[str],
    option: str,
    comment_out_everything: bool,
    needs_separation: bool,
    only_defaults: bool,
    section_to_write: str,
    hide_sensitive: bool,
    is_sensitive: bool,
    show_values: bool = False,
):
    """
    Write the ``option = value`` line for one option.

    The value is the default when ``only_defaults`` is set, otherwise the current
    value with the default as fallback. Without ``show_values`` (or when the value is
    None) only a commented-out, empty assignment is written. Sensitive values are
    replaced by ``< hidden >`` when ``hide_sensitive`` is set. A trailing blank line is
    written when ``needs_separation`` is set.
    """
    default_value = self.get_default_value(section_to_write, option, raw=True)
    value = (
        default_value
        if only_defaults
        else self.get(section_to_write, option, fallback=default_value, raw=True)
    )

    if show_values and hide_sensitive and is_sensitive:
        value = "< hidden >"

    if not show_values or value is None:
        file.write(f"# {option} = \n")
    elif comment_out_everything:
        # every continuation line of a multi-line value is commented out as well
        value = "\n# ".join(value.splitlines())
        file.write(f"# {option} = {value}\n")
    else:
        if "\n" in value:
            try:
                # indent multi-line JSON to satisfy configparser format
                value = json.dumps(json.loads(value), indent=4).replace("\n", "\n ")
            except JSONDecodeError:
                pass
        file.write(f"{option} = {value}\n")

    if needs_separation:
        file.write("\n")

1633 

1634 def write( # type: ignore[override] 

1635 self, 

1636 file: IO[str], 

1637 section: str | None = None, 

1638 include_examples: bool = True, 

1639 include_descriptions: bool = True, 

1640 include_sources: bool = True, 

1641 include_env_vars: bool = True, 

1642 include_providers: bool = True, 

1643 comment_out_everything: bool = False, 

1644 hide_sensitive: bool = False, 

1645 extra_spacing: bool = True, 

1646 only_defaults: bool = False, 

1647 show_values: bool = False, 

1648 **kwargs: Any, 

1649 ) -> None: 

1650 """ 

1651 Write configuration with comments and examples to a file. 

1652 

1653 :param file: file to write to 

1654 :param section: section of the config to write, defaults to all sections 

1655 :param include_examples: Include examples in the output 

1656 :param include_descriptions: Include descriptions in the output 

1657 :param include_sources: Include the source of each config option 

1658 :param include_env_vars: Include environment variables corresponding to each config option 

1659 :param include_providers: Include providers configuration 

1660 :param comment_out_everything: Comment out all values 

1661 :param hide_sensitive_values: Include sensitive values in the output 

1662 :param extra_spacing: Add extra spacing before examples and after variables 

1663 :param only_defaults: Only include default values when writing the config, not the actual values 

1664 """ 

1665 sources_dict = {} 

1666 if include_sources: 

1667 sources_dict = self.as_dict(display_source=True) 

1668 with self.make_sure_configuration_loaded(with_providers=include_providers): 

1669 for section_to_write in self.get_sections_including_defaults(): 

1670 section_config_description = self.configuration_description.get(section_to_write, {}) 

1671 if section_to_write != section and section is not None: 

1672 continue 

1673 if self._default_values.has_section(section_to_write) or self.has_section(section_to_write): 

1674 self._write_section_header( 

1675 file, include_descriptions, section_config_description, section_to_write 

1676 ) 

1677 for option in self.get_options_including_defaults(section_to_write): 

1678 should_continue, needs_separation = self._write_option_header( 

1679 file=file, 

1680 option=option, 

1681 extra_spacing=extra_spacing, 

1682 include_descriptions=include_descriptions, 

1683 include_env_vars=include_env_vars, 

1684 include_examples=include_examples, 

1685 include_sources=include_sources, 

1686 section_config_description=section_config_description, 

1687 section_to_write=section_to_write, 

1688 sources_dict=sources_dict, 

1689 ) 

1690 is_sensitive = ( 

1691 section_to_write.lower(), 

1692 option.lower(), 

1693 ) in self.sensitive_config_values 

1694 self._write_value( 

1695 file=file, 

1696 option=option, 

1697 comment_out_everything=comment_out_everything, 

1698 needs_separation=needs_separation, 

1699 only_defaults=only_defaults, 

1700 section_to_write=section_to_write, 

1701 hide_sensitive=hide_sensitive, 

1702 is_sensitive=is_sensitive, 

1703 show_values=show_values, 

1704 ) 

1705 if include_descriptions and not needs_separation: 

1706 # extra separation between sections in case last option did not need it 

1707 file.write("\n") 

1708 

@contextmanager
def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
    """
    Make sure configuration is loaded with or without providers.

    This happens regardless if the provider configuration has been loaded before or not.
    Restores configuration to the state before entering the context, also when the
    managed block raises.

    :param with_providers: whether providers should be loaded
    """
    needs_reload = False
    if with_providers:
        self._ensure_providers_config_loaded()
    else:
        # The hook reports whether it actually unloaded anything that must be put back.
        needs_reload = self._ensure_providers_config_unloaded()
    try:
        yield
    finally:
        # Without the ``finally`` an exception inside the ``with`` body would skip the
        # reload and leave the provider configuration unloaded.
        if needs_reload:
            self._reload_provider_configs()

1727 

def _ensure_providers_config_loaded(self) -> None:
    """
    Ensure providers configurations are loaded.

    Hook for subclasses: this base implementation always raises.

    :raises NotImplementedError: always, until a subclass overrides the method
    """
    message = "Subclasses must implement _ensure_providers_config_loaded method"
    raise NotImplementedError(message)

1731 

def _ensure_providers_config_unloaded(self) -> bool:
    """
    Ensure providers configurations are unloaded temporarily to load core configs.

    Hook for subclasses: this base implementation always raises.

    :return: True if providers get unloaded
    :raises NotImplementedError: always, until a subclass overrides the method
    """
    message = "Subclasses must implement _ensure_providers_config_unloaded method"
    raise NotImplementedError(message)

1735 

def _reload_provider_configs(self) -> None:
    """
    Reload providers configuration.

    Hook for subclasses: this base implementation always raises.

    :raises NotImplementedError: always, until a subclass overrides the method
    """
    message = "Subclasses must implement _reload_provider_configs method"
    raise NotImplementedError(message)