Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/_shared/configuration/parser.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

740 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Base configuration parser with pure parsing logic.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import datetime 

24import functools 

25import itertools 

26import json 

27import logging 

28import os 

29import shlex 

30import subprocess 

31import sys 

32import warnings 

33from collections.abc import Callable, Generator, Iterable 

34from configparser import ConfigParser, NoOptionError, NoSectionError 

35from contextlib import contextmanager 

36from enum import Enum 

37from json.decoder import JSONDecodeError 

38from re import Pattern 

39from typing import IO, Any, TypeVar, overload 

40 

41from .exceptions import AirflowConfigException 

42 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# Scalar types a single configuration value may have.
ConfigType = str | int | float | bool
# Option name -> value.
ConfigOptionsDictType = dict[str, ConfigType]
# Option name -> value, or -> (value, source) pair when sources are displayed.
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
# Section name -> that section's options.
ConfigSourcesType = dict[str, ConfigSectionSourcesType]
# Prefix shared by the environment variables this module reads and writes.
ENV_VAR_PREFIX = "AIRFLOW__"

51 

52 

class ValueNotFound:
    """Marker type; its module-level instance signals that a configuration value was not found."""


VALUE_NOT_FOUND_SENTINEL = ValueNotFound()

60 

61 

@overload
def expand_env_var(env_var: None) -> None: ...
@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand (potentially nested) env vars.

    Repeat and apply `expandvars` and `expanduser` until interpolation stops
    having any effect. Falsy or non-string input is returned unchanged.

    If the expansion revisits a value it already produced (for example two
    variables that reference each other), the loop stops and returns that
    value instead of spinning forever.

    :param env_var: string that may contain ``$VAR`` / ``~`` references, or None
    :return: the fully expanded string, or the input itself when falsy / not a string
    """
    if not env_var or not isinstance(env_var, str):
        return env_var
    # Every intermediate value seen so far; a repeat means the references form a cycle.
    seen = {env_var}
    while True:
        interpolated = os.path.expanduser(os.path.expandvars(env_var))
        if interpolated == env_var or interpolated in seen:
            return interpolated
        seen.add(interpolated)
        env_var = interpolated

82 

83 

def run_command(command: str) -> str:
    """
    Execute ``command`` (split with shlex, no shell) and return its decoded stdout.

    :param command: command line to run
    :return: stdout of the command, decoded with the interpreter's default encoding
    :raises AirflowConfigException: if the command exits with a non-zero code
    """
    encoding = sys.getdefaultencoding()
    completed = subprocess.run(
        shlex.split(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        check=False,
    )
    # Undecodable bytes are dropped rather than raising.
    output = completed.stdout.decode(encoding, "ignore")
    stderr = completed.stderr.decode(encoding, "ignore")

    if completed.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {completed.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )

98 

99 

100def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool: 

101 """ 

102 Check if the config is a template. 

103 

104 :param configuration_description: description of configuration 

105 :param section: section 

106 :param key: key 

107 :return: True if the config is a template 

108 """ 

109 return configuration_description.get(section, {}).get(key, {}).get("is_template", False) 

110 

111 

112class AirflowConfigParser(ConfigParser): 

113 """ 

114 Base configuration parser with pure parsing logic. 

115 

116 This class provides the core parsing methods that work with: 

117 - configuration_description: dict describing config options (required in __init__) 

118 - _default_values: ConfigParser with default values (required in __init__) 

119 - deprecated_options: class attribute mapping new -> old options 

120 - deprecated_sections: class attribute mapping new -> old sections 

121 """ 

122 

    # A mapping of section -> setting -> (old pattern, replacement) for deprecated default values.
    # A value matching the compiled pattern is rewritten with ``pattern.sub(replacement, value)``.
    # Subclasses can override this to define deprecated values that should be upgraded.
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {}

    # A mapping of (new section, new option) -> (old section, old option, since_version).
    # When reading new option, the old option will be checked to see if it exists. If it does a
    # DeprecationWarning will be issued and the old option will be used instead
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
        ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"),
        ("api", "host"): ("webserver", "web_server_host", "3.0"),
        ("api", "port"): ("webserver", "web_server_port", "3.0"),
        ("api", "workers"): ("webserver", "workers", "3.0"),
        ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"),
        ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"),
        ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"),
        ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"),
        ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"),
        ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"),
        ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"),
        ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"),
        ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"),
        ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"),
        ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"),
        ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"),
        ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
        ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"),
        ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"),
        ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"),
        ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"),
        ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"),
        ("api", "page_size"): ("webserver", "page_size", "3.1.0"),
        ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"),
        ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"),
        ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"),
        ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
        # NOTE(review): the old location here, ("api", "access_logfile"), is itself a key of this
        # mapping above - confirm the chained deprecation is intended.
        ("api", "log_config"): ("api", "access_logfile", "3.1.0"),
    }

    # A mapping of new section -> (old section, since_version).
    deprecated_sections: dict[str, tuple[str, str]] = {}

163 

164 @property 

165 def _lookup_sequence(self) -> list[Callable]: 

166 """ 

167 Define the sequence of lookup methods for get(). The definition here does not have provider lookup. 

168 

169 Subclasses can override this to customise lookup order. 

170 """ 

171 return [ 

172 self._get_environment_variables, 

173 self._get_option_from_config_file, 

174 self._get_option_from_commands, 

175 self._get_option_from_secrets, 

176 self._get_option_from_defaults, 

177 ] 

178 

179 @property 

180 def _validators(self) -> list[Callable[[], None]]: 

181 """ 

182 Return list of validators defined on a config parser class. Base class will return an empty list. 

183 

184 Subclasses can override this to customize the validators that are run during validation on the 

185 config parser instance. 

186 """ 

187 return [] 

188 

189 def validate(self) -> None: 

190 """Run all registered validators.""" 

191 for validator in self._validators: 

192 validator() 

193 self.is_validated = True 

194 

195 def _validate_deprecated_values(self) -> None: 

196 """Validate and upgrade deprecated default values.""" 

197 for section, replacement in self.deprecated_values.items(): 

198 for name, info in replacement.items(): 

199 old, new = info 

200 current_value = self.get(section, name, fallback="") 

201 if self._using_old_value(old, current_value): 

202 self.upgraded_values[(section, name)] = current_value 

203 new_value = old.sub(new, current_value) 

204 self._update_env_var(section=section, name=name, new_value=new_value) 

205 self._create_future_warning( 

206 name=name, 

207 section=section, 

208 current_value=current_value, 

209 new_value=new_value, 

210 ) 

211 

212 def _using_old_value(self, old: Pattern, current_value: str) -> bool: 

213 """Check if current_value matches the old pattern.""" 

214 return old.search(current_value) is not None 

215 

216 def _update_env_var(self, section: str, name: str, new_value: str) -> None: 

217 """Update environment variable with new value.""" 

218 env_var = self._env_var_name(section, name) 

219 # Set it as an env var so that any subprocesses keep the same override! 

220 os.environ[env_var] = new_value 

221 

222 @staticmethod 

223 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None: 

224 """Create a FutureWarning for deprecated default values.""" 

225 warnings.warn( 

226 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " 

227 f"This value has been changed to {new_value!r} in the running config, but please update your config.", 

228 FutureWarning, 

229 stacklevel=3, 

230 ) 

231 

232 def __init__( 

233 self, 

234 configuration_description: dict[str, dict[str, Any]], 

235 _default_values: ConfigParser, 

236 *args, 

237 **kwargs, 

238 ): 

239 """ 

240 Initialize the parser. 

241 

242 :param configuration_description: Description of configuration options 

243 :param _default_values: ConfigParser with default values 

244 """ 

245 super().__init__(*args, **kwargs) 

246 self.configuration_description = configuration_description 

247 self._default_values = _default_values 

248 self._suppress_future_warnings = False 

249 self.upgraded_values = {} 

250 

251 @functools.cached_property 

252 def inversed_deprecated_options(self): 

253 """Build inverse mapping from old options to new options.""" 

254 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()} 

255 

256 @functools.cached_property 

257 def inversed_deprecated_sections(self): 

258 """Build inverse mapping from old sections to new sections.""" 

259 return { 

260 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items() 

261 } 

262 

263 @functools.cached_property 

264 def sensitive_config_values(self) -> set[tuple[str, str]]: 

265 """Get set of sensitive config values that should be masked.""" 

266 flattened = { 

267 (s, k): item 

268 for s, s_c in self.configuration_description.items() 

269 for k, item in s_c.get("options", {}).items() 

270 } 

271 sensitive = { 

272 (section.lower(), key.lower()) 

273 for (section, key), v in flattened.items() 

274 if v.get("sensitive") is True 

275 } 

276 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options} 

277 depr_section = { 

278 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections 

279 } 

280 sensitive.update(depr_section, depr_option) 

281 return sensitive 

282 

    # Typing-only signature: when a string ``fallback`` is supplied the declared result is ``str``.
    @overload  # type: ignore[override]
    def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...

    # Typing-only signature: without ``fallback`` the declared result may be ``None``.
    # NOTE(review): the concrete ``get`` implementation is not part of this excerpt - confirm it
    # is defined later in the class.
    @overload
    def get(self, section: str, key: str, **kwargs) -> str | None: ...

288 

    def _update_defaults_from_string(self, config_string: str) -> None:
        """
        Update the defaults in _default_values based on values in config_string ("ini" format).

        Override shared parser's method to add validation for template variables.
        Note that those values are not validated and cannot contain variables because we are using
        regular config parser to load them. This method is used to test the config parser in unit tests.

        Every parsed option is written into ``_default_values``; missing sections are created first.
        A value containing ``{`` whose option ``self.is_template`` does not report as a template is
        logged as an error and leads to an exception.

        :param config_string: ini-formatted config string
        :raises AirflowConfigException: if a non-template value contains ``{``
        """
        # Parse with a plain ConfigParser so the input is read independently of this instance.
        parser = ConfigParser()
        parser.read_string(config_string)
        for section in parser.sections():
            if section not in self._default_values.sections():
                self._default_values.add_section(section)
            errors = False
            for key, value in parser.items(section):
                # NOTE(review): ``is_template`` is not defined in the visible part of the class -
                # confirm where it comes from.
                if not self.is_template(section, key) and "{" in value:
                    errors = True
                    log.error(
                        "The %s.%s value %s read from string contains variable. This is not supported",
                        section,
                        key,
                        value,
                    )
                # The value is stored even when it was just reported as invalid.
                self._default_values.set(section, key, value)
            # NOTE(review): nesting reconstructed from a whitespace-collapsed listing; this check
            # is assumed to run once per section, after that section's options were stored.
            if errors:
                raise AirflowConfigException(
                    f"The string config passed as default contains variables. "
                    f"This is not supported. String config: {config_string}"
                )

320 

321 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any: 

322 """ 

323 Retrieve default value from default config parser. 

324 

325 This will retrieve the default value from the default config parser. Optionally a raw, stored 

326 value can be retrieved by setting skip_interpolation to True. This is useful for example when 

327 we want to write the default value to a file, and we don't want the interpolation to happen 

328 as it is going to be done later when the config is read. 

329 

330 :param section: section of the config 

331 :param key: key to use 

332 :param fallback: fallback value to use 

333 :param raw: if raw, then interpolation will be reversed 

334 :param kwargs: other args 

335 :return: 

336 """ 

337 value = self._default_values.get(section, key, fallback=fallback, **kwargs) 

338 if raw and value is not None: 

339 return value.replace("%", "%%") 

340 return value 

341 

342 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None: 

343 """ 

344 Get Secret Backend if defined in airflow.cfg. 

345 

346 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not. 

347 """ 

348 section = "workers" if worker_mode else "secrets" 

349 key = "secrets_backend" if worker_mode else "backend" 

350 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs" 

351 

352 secrets_backend_cls = self.getimport(section=section, key=key) 

353 

354 if not secrets_backend_cls: 

355 if worker_mode: 

356 # if we find no secrets backend for worker, return that of secrets backend 

357 secrets_backend_cls = self.getimport(section="secrets", key="backend") 

358 if not secrets_backend_cls: 

359 return None 

360 # When falling back to secrets backend, use its kwargs 

361 kwargs_key = "backend_kwargs" 

362 section = "secrets" 

363 else: 

364 return None 

365 

366 try: 

367 backend_kwargs = self.getjson(section=section, key=kwargs_key) 

368 if not backend_kwargs: 

369 backend_kwargs = {} 

370 elif not isinstance(backend_kwargs, dict): 

371 raise ValueError("not a dict") 

372 except AirflowConfigException: 

373 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key) 

374 backend_kwargs = {} 

375 except ValueError: 

376 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key) 

377 backend_kwargs = {} 

378 

379 return secrets_backend_cls(**backend_kwargs) 

380 

381 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None: 

382 """ 

383 Get Config option values from Secret Backend. 

384 

385 Called by the shared parser's _get_secret_option() method as part of the lookup chain. 

386 Uses _get_custom_secret_backend() to get the backend instance. 

387 

388 :param config_key: the config key to retrieve 

389 :return: config value or None 

390 """ 

391 try: 

392 secrets_client = self._get_custom_secret_backend() 

393 if not secrets_client: 

394 return None 

395 return secrets_client.get_config(config_key) 

396 except Exception as e: 

397 raise AirflowConfigException( 

398 "Cannot retrieve config from alternative secrets backend. " 

399 "Make sure it is configured properly and that the Backend " 

400 "is accessible.\n" 

401 f"{e}" 

402 ) 

403 

404 def _get_cmd_option_from_config_sources( 

405 self, config_sources: ConfigSourcesType, section: str, key: str 

406 ) -> str | None: 

407 fallback_key = key + "_cmd" 

408 if (section, key) in self.sensitive_config_values: 

409 section_dict = config_sources.get(section) 

410 if section_dict is not None: 

411 command_value = section_dict.get(fallback_key) 

412 if command_value is not None: 

413 if isinstance(command_value, str): 

414 command = command_value 

415 else: 

416 command = command_value[0] 

417 return run_command(command) 

418 return None 

419 

420 def _get_secret_option_from_config_sources( 

421 self, config_sources: ConfigSourcesType, section: str, key: str 

422 ) -> str | None: 

423 fallback_key = key + "_secret" 

424 if (section, key) in self.sensitive_config_values: 

425 section_dict = config_sources.get(section) 

426 if section_dict is not None: 

427 secrets_path_value = section_dict.get(fallback_key) 

428 if secrets_path_value is not None: 

429 if isinstance(secrets_path_value, str): 

430 secrets_path = secrets_path_value 

431 else: 

432 secrets_path = secrets_path_value[0] 

433 return self._get_config_value_from_secret_backend(secrets_path) 

434 return None 

435 

436 def _include_secrets( 

437 self, 

438 config_sources: ConfigSourcesType, 

439 display_sensitive: bool, 

440 display_source: bool, 

441 raw: bool, 

442 ): 

443 for section, key in self.sensitive_config_values: 

444 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key) 

445 if value: 

446 if not display_sensitive: 

447 value = "< hidden >" 

448 if display_source: 

449 opt: str | tuple[str, str] = (value, "secret") 

450 elif raw: 

451 opt = value.replace("%", "%%") 

452 else: 

453 opt = value 

454 config_sources.setdefault(section, {}).update({key: opt}) 

455 del config_sources[section][key + "_secret"] 

456 

457 def _include_commands( 

458 self, 

459 config_sources: ConfigSourcesType, 

460 display_sensitive: bool, 

461 display_source: bool, 

462 raw: bool, 

463 ): 

464 for section, key in self.sensitive_config_values: 

465 opt = self._get_cmd_option_from_config_sources(config_sources, section, key) 

466 if not opt: 

467 continue 

468 opt_to_set: str | tuple[str, str] | None = opt 

469 if not display_sensitive: 

470 opt_to_set = "< hidden >" 

471 if display_source: 

472 opt_to_set = (str(opt_to_set), "cmd") 

473 elif raw: 

474 opt_to_set = str(opt_to_set).replace("%", "%%") 

475 if opt_to_set is not None: 

476 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set} 

477 config_sources.setdefault(section, {}).update(dict_to_update) 

478 del config_sources[section][key + "_cmd"] 

479 

480 def _include_envs( 

481 self, 

482 config_sources: ConfigSourcesType, 

483 display_sensitive: bool, 

484 display_source: bool, 

485 raw: bool, 

486 ): 

487 for env_var in [ 

488 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX) 

489 ]: 

490 try: 

491 _, section, key = env_var.split("__", 2) 

492 opt = self._get_env_var_option(section, key) 

493 except ValueError: 

494 continue 

495 if opt is None: 

496 log.warning("Ignoring unknown env var '%s'", env_var) 

497 continue 

498 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"): 

499 # Don't hide cmd/secret values here 

500 if not env_var.lower().endswith(("cmd", "secret")): 

501 if (section, key) in self.sensitive_config_values: 

502 opt = "< hidden >" 

503 elif raw: 

504 opt = opt.replace("%", "%%") 

505 if display_source: 

506 opt = (opt, "env var") 

507 

508 section = section.lower() 

509 key = key.lower() 

510 config_sources.setdefault(section, {}).update({key: opt}) 

511 

512 def _filter_by_source( 

513 self, 

514 config_sources: ConfigSourcesType, 

515 display_source: bool, 

516 getter_func, 

517 ): 

518 """ 

519 Delete default configs from current configuration. 

520 

521 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values. 

522 

523 This is necessary because bare configs take precedence over the command 

524 or secret key equivalents so if the current running config is 

525 materialized with Airflow defaults they in turn override user set 

526 command or secret key configs. 

527 

528 :param config_sources: The current configuration to operate on 

529 :param display_source: If False, configuration options contain raw 

530 values. If True, options are a tuple of (option_value, source). 

531 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'. 

532 :param getter_func: A callback function that gets the user configured 

533 override value for a particular sensitive_config_values config. 

534 :return: None, the given config_sources is filtered if necessary, 

535 otherwise untouched. 

536 """ 

537 for section, key in self.sensitive_config_values: 

538 # Don't bother if we don't have section / key 

539 if section not in config_sources or key not in config_sources[section]: 

540 continue 

541 # Check that there is something to override defaults 

542 try: 

543 getter_opt = getter_func(section, key) 

544 except ValueError: 

545 continue 

546 if not getter_opt: 

547 continue 

548 # Check to see that there is a default value 

549 if self.get_default_value(section, key) is None: 

550 continue 

551 # Check to see if bare setting is the same as defaults 

552 if display_source: 

553 # when display_source = true, we know that the config_sources contains tuple 

554 opt, source = config_sources[section][key] # type: ignore 

555 else: 

556 opt = config_sources[section][key] 

557 if opt == self.get_default_value(section, key): 

558 del config_sources[section][key] 

559 

560 @staticmethod 

561 def _deprecated_value_is_set_in_config( 

562 deprecated_section: str, 

563 deprecated_key: str, 

564 configs: Iterable[tuple[str, ConfigParser]], 

565 ) -> bool: 

566 for config_type, config in configs: 

567 if config_type != "default": 

568 with contextlib.suppress(NoSectionError): 

569 deprecated_section_array = config.items(section=deprecated_section, raw=True) 

570 if any(key == deprecated_key for key, _ in deprecated_section_array): 

571 return True 

572 return False 

573 

574 @staticmethod 

575 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

576 return ( 

577 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}") 

578 is not None 

579 ) 

580 

581 @staticmethod 

582 def _deprecated_command_is_set_in_config( 

583 deprecated_section: str, 

584 deprecated_key: str, 

585 configs: Iterable[tuple[str, ConfigParser]], 

586 ) -> bool: 

587 return AirflowConfigParser._deprecated_value_is_set_in_config( 

588 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs 

589 ) 

590 

591 @staticmethod 

592 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

593 return ( 

594 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD") 

595 is not None 

596 ) 

597 

598 @staticmethod 

599 def _deprecated_secret_is_set_in_config( 

600 deprecated_section: str, 

601 deprecated_key: str, 

602 configs: Iterable[tuple[str, ConfigParser]], 

603 ) -> bool: 

604 return AirflowConfigParser._deprecated_value_is_set_in_config( 

605 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs 

606 ) 

607 

608 @staticmethod 

609 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

610 return ( 

611 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET") 

612 is not None 

613 ) 

614 

615 @staticmethod 

616 def _replace_config_with_display_sources( 

617 config_sources: ConfigSourcesType, 

618 configs: Iterable[tuple[str, ConfigParser]], 

619 configuration_description: dict[str, dict[str, Any]], 

620 display_source: bool, 

621 raw: bool, 

622 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

623 include_env: bool, 

624 include_cmds: bool, 

625 include_secret: bool, 

626 ): 

627 for source_name, config in configs: 

628 sections = config.sections() 

629 for section in sections: 

630 AirflowConfigParser._replace_section_config_with_display_sources( 

631 config, 

632 config_sources, 

633 configuration_description, 

634 display_source, 

635 raw, 

636 section, 

637 source_name, 

638 deprecated_options, 

639 configs, 

640 include_env=include_env, 

641 include_cmds=include_cmds, 

642 include_secret=include_secret, 

643 ) 

644 

    @staticmethod
    def _replace_section_config_with_display_sources(
        config: ConfigParser,
        config_sources: ConfigSourcesType,
        configuration_description: dict[str, dict[str, Any]],
        display_source: bool,
        raw: bool,
        section: str,
        source_name: str,
        deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
        configs: Iterable[tuple[str, ConfigParser]],
        include_env: bool,
        include_cmds: bool,
        include_secret: bool,
    ):
        """
        Copy the options of one section of ``config`` into ``config_sources[section]``.

        A default value is skipped when the option has a deprecated predecessor that is set
        somewhere else (another config and, depending on the ``include_*`` flags, an env var,
        a ``_cmd`` or a ``_secret`` variant), so the default does not override it.

        :param config: parser the section is read from
        :param config_sources: mapping updated in place
        :param configuration_description: description of options, consulted for the ``source`` of defaults
        :param display_source: store ``(value, source)`` pairs instead of bare values
        :param raw: passed to ``config.items`` as ``raw``
        :param section: section to copy
        :param source_name: name of the source ``config`` represents, e.g. ``"default"``
        :param deprecated_options: (new section, new option) -> (old section, old option, version)
        :param configs: all (source name, parser) pairs, searched for deprecated options
        :param include_env: consider env vars of the deprecated option
        :param include_cmds: consider ``_cmd`` variants of the deprecated option
        :param include_secret: consider ``_secret`` variants of the deprecated option
        """
        # NOTE(review): nesting below is reconstructed from a whitespace-collapsed listing -
        # confirm against the real file.
        sect = config_sources.setdefault(section, {})
        if isinstance(config, AirflowConfigParser):
            # Read the items with the parser's future-warning suppression flag switched on.
            with config.suppress_future_warnings():
                items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
        else:
            items = config.items(section=section, raw=raw)
        for k, val in items:
            # (None, None, None) when this option has no deprecated predecessor.
            deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
            if deprecated_section and deprecated_key:
                if source_name == "default":
                    # If deprecated entry has some non-default value set for any of the sources requested,
                    # We should NOT set default for the new entry (because it will override anything
                    # coming from the deprecated ones)
                    if AirflowConfigParser._deprecated_value_is_set_in_config(
                        deprecated_section, deprecated_key, configs
                    ):
                        continue
                    if include_env and AirflowConfigParser._deprecated_variable_is_set(
                        deprecated_section, deprecated_key
                    ):
                        continue
                    if include_cmds and (
                        AirflowConfigParser._deprecated_variable_command_is_set(
                            deprecated_section, deprecated_key
                        )
                        or AirflowConfigParser._deprecated_command_is_set_in_config(
                            deprecated_section, deprecated_key, configs
                        )
                    ):
                        continue
                    if include_secret and (
                        AirflowConfigParser._deprecated_variable_secret_is_set(
                            deprecated_section, deprecated_key
                        )
                        or AirflowConfigParser._deprecated_secret_is_set_in_config(
                            deprecated_section, deprecated_key, configs
                        )
                    ):
                        continue
            if display_source:
                updated_source_name = source_name
                if source_name == "default":
                    # defaults can come from other sources (default-<PROVIDER>) that should be used here
                    source_description_section = configuration_description.get(section, {})
                    source_description_key = source_description_section.get("options", {}).get(k, {})
                    if source_description_key is not None:
                        # Falls back to the plain source name when the description has no "source".
                        updated_source_name = source_description_key.get("source", source_name)
                sect[k] = (val, updated_source_name)
            else:
                sect[k] = val

710 

711 def _warn_deprecate( 

712 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int 

713 ): 

714 """Warn about deprecated config option usage.""" 

715 if section == deprecated_section: 

716 warnings.warn( 

717 f"The {deprecated_name} option in [{section}] has been renamed to {key} - " 

718 f"the old setting has been used, but please update your config.", 

719 DeprecationWarning, 

720 stacklevel=4 + extra_stacklevel, 

721 ) 

722 else: 

723 warnings.warn( 

724 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option " 

725 f"in [{section}] - the old setting has been used, but please update your config.", 

726 DeprecationWarning, 

727 stacklevel=4 + extra_stacklevel, 

728 ) 

729 

730 @contextmanager 

731 def suppress_future_warnings(self): 

732 """ 

733 Context manager to temporarily suppress future warnings. 

734 

735 This is a stub used by the shared parser's lookup methods when checking deprecated options. 

736 Subclasses can override this to customize warning suppression behavior. 

737 

738 :return: context manager that suppresses future warnings 

739 """ 

740 suppress_future_warnings = self._suppress_future_warnings 

741 self._suppress_future_warnings = True 

742 yield self 

743 self._suppress_future_warnings = suppress_future_warnings 

744 

745 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str: 

746 """Generate environment variable name for a config option.""" 

747 team_component: str = f"{team_name.upper()}___" if team_name else "" 

748 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}" 

749 

750 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None): 

751 """Get config option from environment variable.""" 

752 env_var: str = self._env_var_name(section, key, team_name=team_name) 

753 if env_var in os.environ: 

754 return expand_env_var(os.environ[env_var]) 

755 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command) 

756 env_var_cmd = env_var + "_CMD" 

757 if env_var_cmd in os.environ: 

758 # if this is a valid command key... 

759 if (section, key) in self.sensitive_config_values: 

760 return run_command(os.environ[env_var_cmd]) 

761 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend) 

762 env_var_secret_path = env_var + "_SECRET" 

763 if env_var_secret_path in os.environ: 

764 # if this is a valid secret path... 

765 if (section, key) in self.sensitive_config_values: 

766 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path]) 

767 return None 

768 

769 def _get_cmd_option(self, section: str, key: str): 

770 """Get config option from command execution.""" 

771 fallback_key = key + "_cmd" 

772 if (section, key) in self.sensitive_config_values: 

773 if super().has_option(section, fallback_key): 

774 command = super().get(section, fallback_key) 

775 try: 

776 cmd_output = run_command(command) 

777 except AirflowConfigException as e: 

778 raise e 

779 except Exception as e: 

780 raise AirflowConfigException( 

781 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd." 

782 f" Please check the {fallback_key} value." 

783 ) from e 

784 return cmd_output 

785 return None 

786 

787 def _get_secret_option(self, section: str, key: str) -> str | None: 

788 """Get Config option values from Secret Backend.""" 

789 fallback_key = key + "_secret" 

790 if (section, key) in self.sensitive_config_values: 

791 if super().has_option(section, fallback_key): 

792 secrets_path = super().get(section, fallback_key) 

793 return self._get_config_value_from_secret_backend(secrets_path) 

794 return None 

795 

def _get_environment_variables(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Look the option up in environment variables.

    The current ``section``/``key`` is tried first; when it yields nothing and
    a deprecated location is given, that location is tried with future
    warnings suppressed.

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: option name
    :param section: section name
    :param issue_warning: whether to emit a deprecation warning when the
        value is found under the deprecated location
    :param extra_stacklevel: extra stack level passed to the warning helper
    :param kwargs: may carry ``team_name``; other entries are ignored here
    :return: the value, or ``VALUE_NOT_FOUND_SENTINEL`` when nothing is set
    """
    team_name = kwargs.get("team_name", None)

    current = self._get_env_var_option(section, key, team_name=team_name)
    if current is not None:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name)
    if legacy is None:
        return VALUE_NOT_FOUND_SENTINEL

    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

819 

def _get_option_from_config_file(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from config file.

    When a truthy ``team_name`` is supplied, the lookup targets the
    team-scoped section ``"{team_name}={section}"``. The deprecated location,
    if given, is consulted afterwards.

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: option name
    :param section: section name
    :param issue_warning: whether to emit a deprecation warning when the
        value is found under the deprecated location
    :param extra_stacklevel: extra stack level passed to the warning helper
    :param kwargs: ``team_name`` is consumed here; everything else is
        forwarded to the underlying parser's ``get``
    :return: the value, or ``VALUE_NOT_FOUND_SENTINEL`` when not present
    """
    # Always strip ``team_name`` before delegating: the underlying parser's
    # ``get`` does not accept it, and a falsy-but-present value (for example
    # an empty string) must not leak through as an unexpected keyword.
    team_name = kwargs.pop("team_name", None)
    if team_name:
        section = f"{team_name}={section}"

    if super().has_option(section, key):
        return expand_env_var(super().get(section, key, **kwargs))

    if deprecated_section and deprecated_key:
        if super().has_option(deprecated_section, deprecated_key):
            if issue_warning:
                self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
            with self.suppress_future_warnings():
                return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))

    return VALUE_NOT_FOUND_SENTINEL

844 

def _get_option_from_commands(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Look the option up through its ``_cmd`` command.

    A falsy command result (None or empty) counts as "not found". The
    deprecated location, if given, is tried second with future warnings
    suppressed.

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: option name
    :param section: section name
    :param issue_warning: whether to emit a deprecation warning when the
        value is found under the deprecated location
    :param extra_stacklevel: extra stack level passed to the warning helper
    :param kwargs: accepted for signature compatibility; unused here
    :return: the command output, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    current = self._get_cmd_option(section, key)
    if current:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_cmd_option(deprecated_section, deprecated_key)
    if not legacy:
        return VALUE_NOT_FOUND_SENTINEL

    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

867 

def _get_option_from_secrets(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Look the option up through its ``_secret`` secrets-backend path.

    A falsy result (None or empty) counts as "not found". The deprecated
    location, if given, is tried second with future warnings suppressed.

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: option name
    :param section: section name
    :param issue_warning: whether to emit a deprecation warning when the
        value is found under the deprecated location
    :param extra_stacklevel: extra stack level passed to the warning helper
    :param kwargs: accepted for signature compatibility; unused here
    :return: the secret value, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    current = self._get_secret_option(section, key)
    if current:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_secret_option(deprecated_section, deprecated_key)
    if not legacy:
        return VALUE_NOT_FOUND_SENTINEL

    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

890 

def _get_option_from_defaults(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | ValueNotFound:
    """
    Look the option up in the default values.

    A value is produced when a default exists or when the caller supplied a
    ``fallback`` keyword; in both cases ``get_default_value`` is called with
    the remaining keyword arguments and its result is passed through
    ``expand_env_var``.

    :param deprecated_key: unused by this lookup
    :param deprecated_section: unused by this lookup
    :param key: option name
    :param section: section name
    :param issue_warning: unused by this lookup
    :param extra_stacklevel: unused by this lookup
    :param team_name: unused by this lookup; named so it is not forwarded
    :param kwargs: forwarded to ``get_default_value``
    :return: the default value, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    default_exists = self.get_default_value(section, key) is not None
    if not default_exists and "fallback" not in kwargs:
        return VALUE_NOT_FOUND_SENTINEL
    return expand_env_var(self.get_default_value(section, key, **kwargs))

906 

907 def _resolve_deprecated_lookup( 

908 self, 

909 section: str, 

910 key: str, 

911 lookup_from_deprecated: bool, 

912 extra_stacklevel: int = 0, 

913 ) -> tuple[str, str, str | None, str | None, bool]: 

914 """ 

915 Resolve deprecated section/key mappings and determine deprecated values. 

916 

917 :param section: Section name (will be lowercased) 

918 :param key: Key name (will be lowercased) 

919 :param lookup_from_deprecated: Whether to lookup from deprecated options 

920 :param extra_stacklevel: Extra stack level for warnings 

921 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted) 

922 """ 

923 section = section.lower() 

924 key = key.lower() 

925 warning_emitted = False 

926 deprecated_section: str | None = None 

927 deprecated_key: str | None = None 

928 

929 if not lookup_from_deprecated: 

930 return section, key, deprecated_section, deprecated_key, warning_emitted 

931 

932 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {}) 

933 if option_description.get("deprecated"): 

934 deprecation_reason = option_description.get("deprecation_reason", "") 

935 warnings.warn( 

936 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}", 

937 DeprecationWarning, 

938 stacklevel=2 + extra_stacklevel, 

939 ) 

940 # For the cases in which we rename whole sections 

941 if section in self.inversed_deprecated_sections: 

942 deprecated_section, deprecated_key = (section, key) 

943 section = self.inversed_deprecated_sections[section] 

944 if not self._suppress_future_warnings: 

945 warnings.warn( 

946 f"The config section [{deprecated_section}] has been renamed to " 

947 f"[{section}]. Please update your `conf.get*` call to use the new name", 

948 FutureWarning, 

949 stacklevel=2 + extra_stacklevel, 

950 ) 

951 # Don't warn about individual rename if the whole section is renamed 

952 warning_emitted = True 

953 elif (section, key) in self.inversed_deprecated_options: 

954 # Handle using deprecated section/key instead of the new section/key 

955 new_section, new_key = self.inversed_deprecated_options[(section, key)] 

956 if not self._suppress_future_warnings and not warning_emitted: 

957 warnings.warn( 

958 f"section/key [{section}/{key}] has been deprecated, you should use" 

959 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the " 

960 "new name", 

961 FutureWarning, 

962 stacklevel=2 + extra_stacklevel, 

963 ) 

964 warning_emitted = True 

965 deprecated_section, deprecated_key = section, key 

966 section, key = (new_section, new_key) 

967 elif section in self.deprecated_sections: 

968 # When accessing the new section name, make sure we check under the old config name 

969 deprecated_key = key 

970 deprecated_section = self.deprecated_sections[section][0] 

971 else: 

972 deprecated_section, deprecated_key, _ = self.deprecated_options.get( 

973 (section, key), (None, None, None) 

974 ) 

975 

976 return section, key, deprecated_section, deprecated_key, warning_emitted 

977 

@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...


@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: ...


def get(  # type: ignore[misc, override]
    self,
    section: str,
    key: str,
    suppress_warnings: bool = False,
    lookup_from_deprecated: bool = True,
    _extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | None:
    """
    Return the value of ``section``/``key`` from the first source that has it.

    Sources are consulted in the order given by ``_lookup_sequence``; each
    one either returns a value or ``VALUE_NOT_FOUND_SENTINEL``.

    :param section: section name
    :param key: option name
    :param suppress_warnings: do not log a warning when the option is missing
    :param lookup_from_deprecated: also consider deprecated sections/options
    :param _extra_stacklevel: extra stack level for emitted warnings
    :param team_name: optional team name, forwarded to the lookups when not None
    :param kwargs: forwarded to every lookup; an explicit ``fallback`` entry
        (even None) is returned when no source has the option
    :raises AirflowConfigException: when the option is missing and no
        ``fallback`` was given
    """
    resolved = self._resolve_deprecated_lookup(
        section=section,
        key=key,
        lookup_from_deprecated=lookup_from_deprecated,
        extra_stacklevel=_extra_stacklevel,
    )
    section, key, deprecated_section, deprecated_key, warning_emitted = resolved

    if team_name is not None:
        kwargs["team_name"] = team_name

    for lookup in self._lookup_sequence:
        candidate = lookup(
            deprecated_key=deprecated_key,
            deprecated_section=deprecated_section,
            key=key,
            section=section,
            issue_warning=not warning_emitted,
            extra_stacklevel=_extra_stacklevel,
            **kwargs,
        )
        if candidate is VALUE_NOT_FOUND_SENTINEL:
            continue
        return candidate

    # An explicitly supplied fallback wins over raising, even when it is None.
    if "fallback" in kwargs:
        return kwargs["fallback"]

    if not suppress_warnings:
        log.warning("section/key [%s/%s] not found in config", section, key)

    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")

1030 

def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
    """
    Return the option interpreted as a boolean.

    The value is stringified, lowercased and stripped; anything from the
    first ``#`` onwards is discarded. ``t``/``true``/``1`` map to True and
    ``f``/``false``/``0`` to False.

    :raises AirflowConfigException: when the value is none of the above
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    val = str(raw).lower().strip()
    # Keep only the text before an inline comment marker, if there is one.
    val = val.partition("#")[0].strip()

    truthy = ("t", "true", "1")
    falsy = ("f", "false", "0")
    if val in truthy:
        return True
    if val in falsy:
        return False
    raise AirflowConfigException(
        f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
        f'Current value: "{val}".'
    )

1044 

def getint(self, section: str, key: str, **kwargs) -> int:  # type: ignore[override]
    """
    Return the option converted with ``int()``.

    :raises AirflowConfigException: when the value is None or cannot be
        converted
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw is None:
        raise AirflowConfigException(
            f"Failed to convert value None to int. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        number = int(raw)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )
    return number

1060 

def getfloat(self, section: str, key: str, **kwargs) -> float:  # type: ignore[override]
    """
    Return the option converted with ``float()``.

    :raises AirflowConfigException: when the value is None or cannot be
        converted
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw is None:
        raise AirflowConfigException(
            f"Failed to convert value None to float. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        number = float(raw)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )
    return number

1076 

def getlist(self, section: str, key: str, delimiter=",", **kwargs):
    """
    Return the option split on ``delimiter`` with each item stripped.

    When the value is None, an explicitly supplied ``fallback`` keyword is
    returned verbatim.

    :param delimiter: separator between list items
    :raises AirflowConfigException: when the value is None without a
        fallback, or when splitting fails
    """
    raw = self.get(section, key, **kwargs)
    if raw is None:
        if "fallback" in kwargs:
            return kwargs["fallback"]
        raise AirflowConfigException(
            f"Failed to convert value None to list. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        pieces = raw.split(delimiter)
        return [piece.strip() for piece in pieces]
    except Exception:
        raise AirflowConfigException(
            f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )

1094 

# Type variable bound to ``Enum``; lets the enum getters declare that they
# return members of the enum class they were given.
E = TypeVar("E", bound=Enum)

1096 

def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
    """
    Return the option as a member of ``enum_class``, looked up by member name.

    When the value is not a member name, a ``fallback`` keyword that names a
    member is used instead.

    :param enum_class: enum whose member names are the accepted values
    :raises AirflowConfigException: when the value is None, or is not a
        member name and no usable fallback was given
    """
    val = self.get(section, key, **kwargs)
    enum_names = [member.name for member in enum_class]
    allowed = ", ".join(enum_names)

    if val is None:
        raise AirflowConfigException(
            f'Failed to convert value. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}" and it must be one of {allowed}'
        )

    try:
        return enum_class[val]
    except KeyError:
        fallback_name = kwargs["fallback"] if "fallback" in kwargs else None
        if "fallback" in kwargs and fallback_name in enum_names:
            return enum_class[fallback_name]
        raise AirflowConfigException(
            f'Failed to convert value. Please check "{key}" key in "{section}" section. '
            f"the value must be one of {allowed}"
        )

1117 

def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
    """
    Return the option as a list of ``enum_class`` members.

    The value is split with ``getlist``; items that are not member names are
    logged and left out of the result.

    :param enum_class: enum whose member names are the accepted values
    :param delimiter: separator between list items
    """
    names = self.getlist(section, key, delimiter, **kwargs)
    valid_names = [member.name for member in enum_class]

    members = []
    for name in names:
        try:
            member = enum_class[name]
        except KeyError:
            log.warning(
                "Failed to convert value. Please check %s key in %s section. "
                "it must be one of %s, if not the value is ignored",
                key,
                section,
                ", ".join(valid_names),
            )
        else:
            members.append(member)

    return members

1137 

def getimport(self, section: str, key: str, **kwargs) -> Any:
    """
    Read the option as a fully qualified name and return the imported object.

    :param section: section name
    :param key: option name
    :return: the imported object, or None when the option is empty
    :raises AirflowConfigException: when the import fails; the message names
        the key, the section and the offending value
    """
    dotted_path = self.get(section=section, key=key, **kwargs)
    if not dotted_path:
        return None

    try:
        # Import here to avoid circular dependency
        from ..module_loading import import_string

        loaded = import_string(dotted_path)
    except ImportError as e:
        log.warning(e)
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{dotted_path}".'
        )
    return loaded

1162 

1163 def getjson( 

1164 self, section: str, key: str, fallback=None, **kwargs 

1165 ) -> dict | list | str | int | float | None: 

1166 """ 

1167 Return a config value parsed from a JSON string. 

1168 

1169 ``fallback`` is *not* JSON parsed but used verbatim when no config value is given. 

1170 """ 

1171 try: 

1172 data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs) 

1173 except (NoSectionError, NoOptionError): 

1174 data = None 

1175 

1176 if data is None or data == "": 

1177 return fallback 

1178 

1179 try: 

1180 return json.loads(data) 

1181 except JSONDecodeError as e: 

1182 raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e 

1183 

def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Return the option as a ``datetime.timedelta`` of that many seconds.

    A falsy value (missing, None, empty) yields ``fallback`` unchanged.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: fallback value when no config value is given, defaults to None
    :raises AirflowConfigException: raised because ValueError or OverflowError
    :return: datetime.timedelta(seconds=<config_value>) or None
    """
    val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
    if not val:
        return fallback

    # the given value must be convertible to integer
    try:
        seconds = int(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

    try:
        return datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            f"Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
        )

1220 

def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
    """
    Return the option, insisting that it is not None.

    :raises ValueError: when the resolved value is None
    """
    resolved = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if resolved is not None:
        return resolved
    raise ValueError(f"The value {section}/{key} should be set!")

1227 

def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
    """
    Return the option as a list, insisting that it is not None.

    :raises ValueError: when ``getlist`` returns None
    """
    items = self.getlist(section, key, **kwargs)
    if items is not None:
        return items
    raise ValueError(f"The value {section}/{key} should be set!")

1234 

def read(
    self,
    filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
    encoding: str | None = None,
) -> list[str]:
    """
    Read configuration from the given file name(s) via the parent parser.

    :param filenames: a single path or an iterable of paths
    :param encoding: encoding passed through to the parent implementation
    :return: whatever the parent ``read`` returns
    """
    files_read = super().read(filenames=filenames, encoding=encoding)
    return files_read

1241 

def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
) -> None:
    """
    Load configuration from a nested dictionary via the parent parser.

    The signature is narrowed compared to the parent's purely to give better
    type hints and checking.

    :param dictionary: dictionary to read from
    :param source: source to be used to store the configuration
    :return: None
    """
    parent = super()
    parent.read_dict(dictionary=dictionary, source=source)

1253 

def get_sections_including_defaults(self) -> list[str]:
    """
    List every known section name, without duplicates.

    Sections from the configuration description come first, followed by
    sections present only in the parser itself.

    :return: list of section names
    """
    loaded_sections = self.sections()
    described_sections = list(self.configuration_description.keys())
    # A dict preserves first-seen order, which gives an ordered de-duplication.
    ordered_unique = dict.fromkeys([*described_sections, *loaded_sections])
    return list(ordered_unique)

1263 

def get_options_including_defaults(self, section: str) -> list[str]:
    """
    List every known option name of ``section``, without duplicates.

    Options from the configuration description come first, followed by
    options present only in the parser itself.

    :param section: section name
    :return: list of option names for the section given
    """
    if self.has_section(section):
        loaded_options = self.options(section)
    else:
        loaded_options = []
    section_description = self.configuration_description.get(section, {})
    described_options = list(section_description.get("options", {}).keys())
    # A dict preserves first-seen order, which gives an ordered de-duplication.
    ordered_unique = dict.fromkeys([*described_options, *loaded_options])
    return list(ordered_unique)

1278 

def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True) -> bool:
    """
    Tell whether the option resolves to a non-None value.

    Delegates to ``self.get()`` so the priority order of the config sources
    (env, config, cmd, defaults) is not reimplemented here.

    :param section: section to get option from
    :param option: option to get
    :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
    :return: True when ``get`` yields a value other than None, False when it
        yields None or raises one of the "not found" exceptions
    """
    try:
        resolved = self.get(
            section,
            option,
            fallback=None,
            _extra_stacklevel=1,
            suppress_warnings=True,
            lookup_from_deprecated=lookup_from_deprecated,
        )
    except (NoOptionError, NoSectionError, AirflowConfigException):
        return False
    return resolved is not None

1305 

def set(self, section: str, option: str, value: str | None = None) -> None:
    """
    Set an option to the given value.

    Section and option names are lowercased first, to match what ``get``
    does. A section that is known to the configuration description but not
    yet present in the parser is created on the fly.

    :param section: section name
    :param option: option name
    :param value: value to store
    """
    section, option = section.lower(), option.lower()
    described_sections = self.configuration_description or {}
    missing_in_parser = not self.has_section(section)
    if missing_in_parser and section in described_sections:
        # Trying to set a key in a section that exists in default, but not in the user config;
        # automatically create it
        self.add_section(section)
    super().set(section, option, value)

1320 

def remove_option(self, section: str, option: str, remove_default: bool = True):
    """
    Remove an option from the parsed config and, optionally, from the defaults.

    Names are lowercased first. When the option exists in both places it is
    removed from both unless ``remove_default=False``.

    :param section: section name
    :param option: option name
    :param remove_default: also drop the option from the default values
    """
    section, option = section.lower(), option.lower()

    parent = super()
    if parent.has_option(section, option):
        parent.remove_option(section, option)

    default_present = self.get_default_value(section, option) is not None
    if default_present and remove_default:
        self._default_values.remove_option(section, option)

1335 

def optionxform(self, optionstr: str) -> str:
    """
    Return the option name unchanged.

    ``ConfigParser`` lowercases option names by default; this override makes
    option names case-preserving on every read, get, or set operation.

    :param optionstr: option name as given
    :return: the same option name
    """
    preserved = optionstr
    return preserved

1347 

def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
    """
    Build the ordered ``(label, parser)`` list consumed by ``as_dict()``.

    The defaults come first and this parser second. Subclasses can override
    to add additional sources (e.g., provider configs).
    """
    sources: list[tuple[str, ConfigParser]] = []
    sources.append(("default", self._default_values))
    sources.append(("airflow.cfg", self))
    return sources

1358 

def as_dict(
    self,
    display_source: bool = False,
    display_sensitive: bool = False,
    raw: bool = False,
    include_env: bool = True,
    include_cmds: bool = True,
    include_secret: bool = True,
) -> ConfigSourcesType:
    """
    Return the current configuration as a dictionary of dictionaries.

    Airflow defaults are materialized together with user-set configs. If any
    of the ``include_*`` options is False, the result of calling command or
    secret key configs does not override Airflow defaults and is passed
    through instead. To keep Airflow defaults from overwriting user-set
    command or secret key configs, bare sensitive config values that are set
    to Airflow defaults are filtered out when command or secret key configs
    produce different values.

    :param display_source: If False, the option value is returned. If True,
        a tuple of (option_value, source) is returned. Source is either
        'airflow.cfg', 'default', 'env var', or 'cmd'.
    :param display_sensitive: If True, the values of options set by env
        vars and bash commands will be displayed. If False, those options
        are shown as '< hidden >'
    :param raw: Should the values be output as interpolated values, or the
        "raw" form that can be fed back in to ConfigParser
    :param include_env: Should the value of configuration from AIRFLOW__
        environment variables be included or not
    :param include_cmds: Should the result of calling any ``*_cmd`` config be
        set (True, default), or should the _cmd options be left as the
        command to run (False)
    :param include_secret: Should the result of calling any ``*_secret`` config be
        set (True, default), or should the _secret options be left as the
        path to get the secret from (False)
    :raises ValueError: when ``display_sensitive`` is False while any of the
        ``include_*`` flags is False
    :return: Dictionary, where the key is the name of the section and the content is
        the dictionary with the name of the parameter and its value.
    """
    # Sensitive values are hidden inside the include methods (values coming
    # from cmds and secrets can surface in _include_envs), so hiding requires
    # every include flag to be on.
    every_source_included = all([include_env, include_cmds, include_secret])
    if not display_sensitive and not every_source_included:
        raise ValueError(
            "If display_sensitive is false, then include_env, "
            "include_cmds, include_secret must all be set as True"
        )

    config_sources: ConfigSourcesType = {}

    # Sources are applied one after another; the last one holding an option "wins".
    configs = self._get_config_sources_for_as_dict()
    self._replace_config_with_display_sources(
        config_sources,
        configs,
        self.configuration_description,
        display_source,
        raw,
        self.deprecated_options,
        include_cmds=include_cmds,
        include_env=include_env,
        include_secret=include_secret,
    )

    # env vars are added next and overwrite, because they have priority
    if include_env:
        self._include_envs(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_env_var_option)

    # then bash commands
    if include_cmds:
        self._include_commands(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_cmd_option)

    # then config from secret backends
    if include_secret:
        self._include_secrets(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_secret_option)

    if display_sensitive:
        return config_sources

    # Make sure sensitive values coming from the config file are hidden too,
    # when they were not provided through env, cmd or secret.
    hidden = "< hidden >"
    for section, key in self.sensitive_config_values:
        section_values = config_sources.get(section)
        if not section_values:
            continue
        if not section_values.get(key, None):
            continue
        if display_source:
            source = section_values[key][1]
            section_values[key] = (hidden, source)
        else:
            section_values[key] = hidden

    return config_sources

1457 

def _write_option_header(
    self,
    file: IO[str],
    option: str,
    extra_spacing: bool,
    include_descriptions: bool,
    include_env_vars: bool,
    include_examples: bool,
    include_sources: bool,
    section_config_description: dict[str, dict[str, Any]],
    section_to_write: str,
    sources_dict: ConfigSourcesType,
) -> tuple[bool, bool]:
    """
    Write header for configuration option.

    Depending on the ``include_*`` flags, the header consists of the option
    description, an example, the source of the current value and the name of
    the corresponding environment variable, all written as ``#`` comments.

    :param file: file to write to
    :param option: option name
    :param extra_spacing: add ``#`` spacer lines before the example and after
        the variable line
    :param include_descriptions: write the option description
    :param include_env_vars: write the environment variable name
    :param include_examples: write the example, when one is defined
    :param include_sources: write the source of the value
    :param section_config_description: description of the section; option
        descriptions are read from its ``options`` entry
    :param section_to_write: section name
    :param sources_dict: per-section mapping of option to (value, source)
    :return: tuple of (should_continue, needs_separation) where
        needs_separation is set if the option needs additional separation to
        visually separate it from the next option; should_continue is always
        True here
    """
    option_config_description = (
        section_config_description.get("options", {}).get(option, {})
        if section_config_description
        else {}
    )
    needs_separation = False

    description = option_config_description.get("description")
    if description and include_descriptions:
        for line in description.splitlines():
            file.write(f"# {line}\n")
        needs_separation = True

    example = option_config_description.get("example")
    if example is not None and include_examples:
        if extra_spacing:
            file.write("#\n")
        # Continuation lines of a multi-line example stay commented out.
        example = "\n# ".join(example.splitlines())
        file.write(f"# Example: {option} = {example}\n")
        needs_separation = True

    if include_sources and sources_dict:
        sources_section = sources_dict.get(section_to_write)
        value_with_source = sources_section.get(option) if sources_section else None
        if value_with_source is None:
            file.write("#\n# Source: not defined\n")
        else:
            file.write(f"#\n# Source: {value_with_source[1]}\n")
        needs_separation = True

    if include_env_vars:
        # Dots in a section name become underscores in the environment
        # variable name, the same way the env var lookup builds its names.
        env_section = section_to_write.replace(".", "_").upper()
        file.write(f"#\n# Variable: AIRFLOW__{env_section}__{option.upper()}\n")
        if extra_spacing:
            file.write("#\n")
        needs_separation = True

    return True, needs_separation

1510 

def is_template(self, section: str, key) -> bool:
    """
    Tell whether the value of ``section``/``key`` is templated.

    The decision is delegated to ``_is_template`` using this parser's
    configuration description.

    :param section: section of the config
    :param key: key in the section
    :return: True if the value is templated
    """
    description = self.configuration_description
    return _is_template(description, section, key)

1520 

def getsection(self, section: str) -> ConfigOptionsDictType | None:
    """
    Return the section as a dict.

    Default values are overlaid with values from this parser and then with
    values from environment variables whose name starts with the section's
    prefix. Values are converted to int, float, bool as required; anything
    else is left as it is.

    :param section: section from the config
    :return: mapping of option name to converted value, or None when the
        section exists neither in this parser nor in the defaults
    :raises AirflowConfigException: when an option resolves to None
    """
    in_user_config = self.has_section(section)
    in_defaults = self._default_values.has_section(section)
    if not in_user_config and not in_defaults:
        return None

    _section: ConfigOptionsDictType = dict(self._default_values.items(section)) if in_defaults else {}
    if in_user_config:
        _section.update(self.items(section))

    section_prefix = self._env_var_name(section, "")
    for env_var in sorted(os.environ):
        if not env_var.startswith(section_prefix):
            continue
        # Strip only the leading prefix (and a trailing ``_CMD``): the prefix
        # text may legitimately occur again inside the option name.
        key = env_var.removeprefix(section_prefix).removesuffix("_CMD").lower()
        _section[key] = self._get_env_var_option(section, key)

    # Only values are reassigned below, never keys, so mutating the dict
    # while iterating over its items is safe.
    for key, val in _section.items():
        if val is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        try:
            _section[key] = int(val)
        except ValueError:
            try:
                _section[key] = float(val)
            except ValueError:
                if isinstance(val, str) and val.lower() in ("t", "true"):
                    _section[key] = True
                elif isinstance(val, str) and val.lower() in ("f", "false"):
                    _section[key] = False
    return _section

1565 

1566 @staticmethod 

1567 def _write_section_header( 

1568 file: IO[str], 

1569 include_descriptions: bool, 

1570 section_config_description: dict[str, str], 

1571 section_to_write: str, 

1572 ) -> None: 

1573 """Write header for configuration section.""" 

1574 file.write(f"[{section_to_write}]\n") 

1575 section_description = section_config_description.get("description") 

1576 if section_description and include_descriptions: 

1577 for line in section_description.splitlines(): 

1578 file.write(f"# {line}\n") 

1579 file.write("\n") 

1580 

def _write_value(
    self,
    file: IO[str],
    option: str,
    comment_out_everything: bool,
    needs_separation: bool,
    only_defaults: bool,
    section_to_write: str,
):
    """
    Write the ``option = value`` line for one option.

    :param file: file to write to
    :param option: option name
    :param comment_out_everything: write the value commented out
    :param needs_separation: write a blank line after the value
    :param only_defaults: write the default value instead of the current one
    :param section_to_write: section the option belongs to
    """
    default_value = self.get_default_value(section_to_write, option, raw=True)
    value = (
        default_value
        if only_defaults
        else self.get(section_to_write, option, fallback=default_value, raw=True)
    )

    if value is None:
        file.write(f"# {option} = \n")
    elif comment_out_everything:
        # Continuation lines of a multi-line value stay commented out.
        commented = "\n# ".join(value.splitlines())
        file.write(f"# {option} = {commented}\n")
    else:
        if "\n" in value:
            try:
                pretty = json.dumps(json.loads(value), indent=4)
            except JSONDecodeError:
                # Not JSON: the multi-line value is written as it is.
                pass
            else:
                # indent multi-line JSON to satisfy configparser format
                value = pretty.replace("\n", "\n ")
        file.write(f"{option} = {value}\n")

    if needs_separation:
        file.write("\n")

1614 

def write(  # type: ignore[override]
    self,
    file: IO[str],
    section: str | None = None,
    include_examples: bool = True,
    include_descriptions: bool = True,
    include_sources: bool = True,
    include_env_vars: bool = True,
    include_providers: bool = True,
    comment_out_everything: bool = False,
    hide_sensitive_values: bool = False,
    extra_spacing: bool = True,
    only_defaults: bool = False,
    **kwargs: Any,
) -> None:
    """
    Write configuration with comments and examples to a file.

    :param file: file to write to
    :param section: section of the config to write, defaults to all sections
    :param include_examples: Include examples in the output
    :param include_descriptions: Include descriptions in the output
    :param include_sources: Include the source of each config option
    :param include_env_vars: Include environment variables corresponding to each config option
    :param include_providers: Include providers configuration
    :param comment_out_everything: Comment out all values
    :param hide_sensitive_values: Hide sensitive values in the output. Accepted for
        interface compatibility; this method body does not read it.
    :param extra_spacing: Add extra spacing before examples and after variables
    :param only_defaults: Only include default values when writing the config, not the actual values
    :param kwargs: Accepted for interface compatibility; ignored by this method body.
    """
    sources_dict = {}
    if include_sources:
        sources_dict = self.as_dict(display_source=True)
    with self.make_sure_configuration_loaded(with_providers=include_providers):
        for section_to_write in self.get_sections_including_defaults():
            section_config_description = self.configuration_description.get(section_to_write, {})
            if section_to_write != section and section is not None:
                continue
            if self._default_values.has_section(section_to_write) or self.has_section(section_to_write):
                self._write_section_header(
                    file, include_descriptions, section_config_description, section_to_write
                )
                # Reset per section: without this, a section with no options would
                # either hit an unbound local (first section) or reuse the value
                # left over from the previous section's last option.
                needs_separation = False
                for option in self.get_options_including_defaults(section_to_write):
                    # NOTE(review): should_continue is returned by _write_option_header
                    # but never consulted here - confirm whether options flagged this
                    # way are meant to be skipped.
                    should_continue, needs_separation = self._write_option_header(
                        file=file,
                        option=option,
                        extra_spacing=extra_spacing,
                        include_descriptions=include_descriptions,
                        include_env_vars=include_env_vars,
                        include_examples=include_examples,
                        include_sources=include_sources,
                        section_config_description=section_config_description,
                        section_to_write=section_to_write,
                        sources_dict=sources_dict,
                    )
                    self._write_value(
                        file=file,
                        option=option,
                        comment_out_everything=comment_out_everything,
                        needs_separation=needs_separation,
                        only_defaults=only_defaults,
                        section_to_write=section_to_write,
                    )
                if include_descriptions and not needs_separation:
                    # extra separation between sections in case last option did not need it
                    file.write("\n")

1681 

@contextmanager
def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
    """
    Make sure configuration is loaded with or without providers.

    This happens regardless if the provider configuration has been loaded before or not.
    Restores configuration to the state before entering the context, including when
    the body of the ``with`` block raises.

    :param with_providers: whether providers should be loaded
    """
    needs_reload = False
    if with_providers:
        self._ensure_providers_config_loaded()
    else:
        # True means provider configs were unloaded and must be put back on exit.
        needs_reload = self._ensure_providers_config_unloaded()
    try:
        yield
    finally:
        # Run the restore in ``finally`` so an exception inside the ``with`` body
        # cannot leave provider configs unloaded.
        if needs_reload:
            self._reload_provider_configs()

1700 

1701 def _ensure_providers_config_loaded(self) -> None: 

1702 """Ensure providers configurations are loaded.""" 

1703 raise NotImplementedError("Subclasses must implement _ensure_providers_config_loaded method") 

1704 

1705 def _ensure_providers_config_unloaded(self) -> bool: 

1706 """Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded.""" 

1707 raise NotImplementedError("Subclasses must implement _ensure_providers_config_unloaded method") 

1708 

1709 def _reload_provider_configs(self) -> None: 

1710 """Reload providers configuration.""" 

1711 raise NotImplementedError("Subclasses must implement _reload_provider_configs method")