Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/configuration/parser.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

740 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Base configuration parser with pure parsing logic.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import datetime 

24import functools 

25import itertools 

26import json 

27import logging 

28import os 

29import shlex 

30import subprocess 

31import sys 

32import warnings 

33from collections.abc import Callable, Generator, Iterable 

34from configparser import ConfigParser, NoOptionError, NoSectionError 

35from contextlib import contextmanager 

36from enum import Enum 

37from json.decoder import JSONDecodeError 

38from re import Pattern 

39from typing import IO, Any, TypeVar, overload 

40 

41from .exceptions import AirflowConfigException 

42 

43log = logging.getLogger(__name__) 

44 

45 

46ConfigType = str | int | float | bool 

47ConfigOptionsDictType = dict[str, ConfigType] 

48ConfigSectionSourcesType = dict[str, str | tuple[str, str]] 

49ConfigSourcesType = dict[str, ConfigSectionSourcesType] 

50ENV_VAR_PREFIX = "AIRFLOW__" 

51 

52 

class ValueNotFound:
    """Marker type: an instance of it signals that a lookup step found no configuration value."""

57 

58 

59VALUE_NOT_FOUND_SENTINEL = ValueNotFound() 

60 

61 

@overload
def expand_env_var(env_var: None) -> None: ...
@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Resolve environment variable and ``~`` references, including nested ones.

    ``expandvars`` followed by ``expanduser`` is applied repeatedly until a pass
    leaves the value unchanged.

    :param env_var: value to expand; empty or non-string input is returned untouched
    :return: the fully expanded string, or the input itself when nothing can be expanded
    """
    if not isinstance(env_var, str) or not env_var:
        return env_var
    current = env_var
    expanded = os.path.expanduser(os.path.expandvars(str(current)))
    # Keep going until a full pass is a no-op, which means nested references are resolved.
    while expanded != current:
        current = expanded
        expanded = os.path.expanduser(os.path.expandvars(str(current)))
    return expanded

82 

83 

def run_command(command: str) -> str:
    """
    Execute ``command`` (split with ``shlex``, no shell) and return its decoded stdout.

    :param command: command line to run
    :return: standard output, decoded with the interpreter default encoding (undecodable bytes dropped)
    :raises AirflowConfigException: when the process exits with a non-zero return code
    """
    process = subprocess.Popen(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
    raw_stdout, raw_stderr = process.communicate()
    encoding = sys.getdefaultencoding()
    output = raw_stdout.decode(encoding, "ignore")
    stderr = raw_stderr.decode(encoding, "ignore")

    exit_code = process.returncode
    if exit_code == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {exit_code}. "
        f"Output: {output}, Stderr: {stderr}"
    )

98 

99 

100def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool: 

101 """ 

102 Check if the config is a template. 

103 

104 :param configuration_description: description of configuration 

105 :param section: section 

106 :param key: key 

107 :return: True if the config is a template 

108 """ 

109 return configuration_description.get(section, {}).get(key, {}).get("is_template", False) 

110 

111 

112class AirflowConfigParser(ConfigParser): 

113 """ 

114 Base configuration parser with pure parsing logic. 

115 

116 This class provides the core parsing methods that work with: 

117 - configuration_description: dict describing config options (required in __init__) 

118 - _default_values: ConfigParser with default values (required in __init__) 

119 - deprecated_options: class attribute mapping new -> old options 

120 - deprecated_sections: class attribute mapping new -> old sections 

121 """ 

122 

123 # A mapping of section -> setting -> { old, replace } for deprecated default values. 

124 # Subclasses can override this to define deprecated values that should be upgraded. 

125 deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {} 

126 

127 # A mapping of (new section, new option) -> (old section, old option, since_version). 

128 # When reading new option, the old option will be checked to see if it exists. If it does a 

129 # DeprecationWarning will be issued and the old option will be used instead 

130 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = { 

131 ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"), 

132 ("api", "host"): ("webserver", "web_server_host", "3.0"), 

133 ("api", "port"): ("webserver", "web_server_port", "3.0"), 

134 ("api", "workers"): ("webserver", "workers", "3.0"), 

135 ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"), 

136 ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"), 

137 ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"), 

138 ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"), 

139 ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"), 

140 ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"), 

141 ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"), 

142 ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"), 

143 ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"), 

144 ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"), 

145 ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"), 

146 ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"), 

147 ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"), 

148 ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"), 

149 ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"), 

150 ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"), 

151 ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"), 

152 ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"), 

153 ("api", "page_size"): ("webserver", "page_size", "3.1.0"), 

154 ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"), 

155 ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"), 

156 ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"), 

157 ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"), 

158 ("api", "log_config"): ("api", "access_logfile", "3.1.0"), 

159 ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"), 

160 } 

161 

162 # A mapping of new section -> (old section, since_version). 

163 deprecated_sections: dict[str, tuple[str, str]] = {} 

164 

165 @property 

166 def _lookup_sequence(self) -> list[Callable]: 

167 """ 

168 Define the sequence of lookup methods for get(). The definition here does not have provider lookup. 

169 

170 Subclasses can override this to customise lookup order. 

171 """ 

172 return [ 

173 self._get_environment_variables, 

174 self._get_option_from_config_file, 

175 self._get_option_from_commands, 

176 self._get_option_from_secrets, 

177 self._get_option_from_defaults, 

178 ] 

179 

180 @property 

181 def _validators(self) -> list[Callable[[], None]]: 

182 """ 

183 Return list of validators defined on a config parser class. Base class will return an empty list. 

184 

185 Subclasses can override this to customize the validators that are run during validation on the 

186 config parser instance. 

187 """ 

188 return [] 

189 

def validate(self) -> None:
    """Invoke each validator from ``_validators`` in order, then set ``is_validated`` to True."""
    registered_checks = self._validators
    for check in registered_checks:
        check()
    # Only reached when no validator raised.
    self.is_validated = True

195 

196 def _validate_deprecated_values(self) -> None: 

197 """Validate and upgrade deprecated default values.""" 

198 for section, replacement in self.deprecated_values.items(): 

199 for name, info in replacement.items(): 

200 old, new = info 

201 current_value = self.get(section, name, fallback="") 

202 if self._using_old_value(old, current_value): 

203 self.upgraded_values[(section, name)] = current_value 

204 new_value = old.sub(new, current_value) 

205 self._update_env_var(section=section, name=name, new_value=new_value) 

206 self._create_future_warning( 

207 name=name, 

208 section=section, 

209 current_value=current_value, 

210 new_value=new_value, 

211 ) 

212 

213 def _using_old_value(self, old: Pattern, current_value: str) -> bool: 

214 """Check if current_value matches the old pattern.""" 

215 return old.search(current_value) is not None 

216 

217 def _update_env_var(self, section: str, name: str, new_value: str) -> None: 

218 """Update environment variable with new value.""" 

219 env_var = self._env_var_name(section, name) 

220 # Set it as an env var so that any subprocesses keep the same override! 

221 os.environ[env_var] = new_value 

222 

223 @staticmethod 

224 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None: 

225 """Create a FutureWarning for deprecated default values.""" 

226 warnings.warn( 

227 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " 

228 f"This value has been changed to {new_value!r} in the running config, but please update your config.", 

229 FutureWarning, 

230 stacklevel=3, 

231 ) 

232 

def __init__(
    self,
    configuration_description: dict[str, dict[str, Any]],
    _default_values: ConfigParser,
    *args,
    **kwargs,
):
    """
    Set up the parser state.

    Remaining positional and keyword arguments are forwarded to the parent initializer.

    :param configuration_description: Description of configuration options
    :param _default_values: ConfigParser with default values
    """
    super().__init__(*args, **kwargs)
    # Starts empty; filled with original values keyed by (section, name) when deprecated values are upgraded.
    self.upgraded_values = {}
    # Flag toggled by suppress_future_warnings().
    self._suppress_future_warnings = False
    self._default_values = _default_values
    self.configuration_description = configuration_description

251 

@functools.cached_property
def inversed_deprecated_options(self):
    """Map each deprecated ``(section, option)`` pair to its replacement ``(section, option)``."""
    inverse = {}
    for new_location, (old_section, old_name, _since) in self.deprecated_options.items():
        inverse[(old_section, old_name)] = new_location
    return inverse

256 

@functools.cached_property
def inversed_deprecated_sections(self):
    """Map each deprecated section name to the section that replaced it."""
    inverse = {}
    for new_section, (old_section, _since) in self.deprecated_sections.items():
        inverse[old_section] = new_section
    return inverse

263 

@functools.cached_property
def sensitive_config_values(self) -> set[tuple[str, str]]:
    """
    Collect the ``(section, key)`` pairs whose values must be masked.

    A pair is included when its description carries ``sensitive: True``; pairs are lower-cased.
    The deprecated option and deprecated section counterparts of those pairs are included too.
    """
    sensitive: set[tuple[str, str]] = set()
    for section_name, section_description in self.configuration_description.items():
        for option_name, option_description in section_description.get("options", {}).items():
            if option_description.get("sensitive") is True:
                sensitive.add((section_name.lower(), option_name.lower()))

    # Old (section, option) locations of sensitive options: drop the trailing version element.
    legacy_options = {
        self.deprecated_options[pair][:-1] for pair in sensitive if pair in self.deprecated_options
    }
    # Same option name under the old section name.
    legacy_sections = {
        (self.deprecated_sections[section_name][0], option_name)
        for section_name, option_name in sensitive
        if section_name in self.deprecated_sections
    }
    return sensitive | legacy_sections | legacy_options

283 

284 @overload # type: ignore[override] 

285 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ... 

286 

287 @overload 

288 def get(self, section: str, key: str, **kwargs) -> str | None: ... 

289 

290 def _update_defaults_from_string(self, config_string: str) -> None: 

291 """ 

292 Update the defaults in _default_values based on values in config_string ("ini" format). 

293 

294 Override shared parser's method to add validation for template variables. 

295 Note that those values are not validated and cannot contain variables because we are using 

296 regular config parser to load them. This method is used to test the config parser in unit tests. 

297 

298 :param config_string: ini-formatted config string 

299 """ 

300 parser = ConfigParser() 

301 parser.read_string(config_string) 

302 for section in parser.sections(): 

303 if section not in self._default_values.sections(): 

304 self._default_values.add_section(section) 

305 errors = False 

306 for key, value in parser.items(section): 

307 if not self.is_template(section, key) and "{" in value: 

308 errors = True 

309 log.error( 

310 "The %s.%s value %s read from string contains variable. This is not supported", 

311 section, 

312 key, 

313 value, 

314 ) 

315 self._default_values.set(section, key, value) 

316 if errors: 

317 raise AirflowConfigException( 

318 f"The string config passed as default contains variables. " 

319 f"This is not supported. String config: {config_string}" 

320 ) 

321 

def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
    """
    Retrieve default value from default config parser.

    The value is read from ``_default_values``. With ``raw=True`` every ``%`` in the returned
    string is doubled to ``%%``, so the result can be written to a file and interpolated again
    when that file is read later.

    :param section: section of the config
    :param key: key to use
    :param fallback: fallback value to use when the option is missing
    :param raw: if True, re-escape ``%`` as ``%%`` in a string result
    :param kwargs: other args, forwarded to the default parser's ``get``
    :return: the default value, or ``fallback`` when the option is not defined
    """
    value = self._default_values.get(section, key, fallback=fallback, **kwargs)
    # ``fallback`` may be any object; only strings can (and need to) be re-escaped.
    if raw and isinstance(value, str):
        return value.replace("%", "%%")
    return value

342 

343 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None: 

344 """ 

345 Get Secret Backend if defined in airflow.cfg. 

346 

347 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not. 

348 """ 

349 section = "workers" if worker_mode else "secrets" 

350 key = "secrets_backend" if worker_mode else "backend" 

351 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs" 

352 

353 secrets_backend_cls = self.getimport(section=section, key=key) 

354 

355 if not secrets_backend_cls: 

356 if worker_mode: 

357 # if we find no secrets backend for worker, return that of secrets backend 

358 secrets_backend_cls = self.getimport(section="secrets", key="backend") 

359 if not secrets_backend_cls: 

360 return None 

361 # When falling back to secrets backend, use its kwargs 

362 kwargs_key = "backend_kwargs" 

363 section = "secrets" 

364 else: 

365 return None 

366 

367 try: 

368 backend_kwargs = self.getjson(section=section, key=kwargs_key) 

369 if not backend_kwargs: 

370 backend_kwargs = {} 

371 elif not isinstance(backend_kwargs, dict): 

372 raise ValueError("not a dict") 

373 except AirflowConfigException: 

374 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key) 

375 backend_kwargs = {} 

376 except ValueError: 

377 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key) 

378 backend_kwargs = {} 

379 

380 return secrets_backend_cls(**backend_kwargs) 

381 

382 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None: 

383 """ 

384 Get Config option values from Secret Backend. 

385 

386 Called by the shared parser's _get_secret_option() method as part of the lookup chain. 

387 Uses _get_custom_secret_backend() to get the backend instance. 

388 

389 :param config_key: the config key to retrieve 

390 :return: config value or None 

391 """ 

392 try: 

393 secrets_client = self._get_custom_secret_backend() 

394 if not secrets_client: 

395 return None 

396 return secrets_client.get_config(config_key) 

397 except Exception as e: 

398 raise AirflowConfigException( 

399 "Cannot retrieve config from alternative secrets backend. " 

400 "Make sure it is configured properly and that the Backend " 

401 "is accessible.\n" 

402 f"{e}" 

403 ) 

404 

405 def _get_cmd_option_from_config_sources( 

406 self, config_sources: ConfigSourcesType, section: str, key: str 

407 ) -> str | None: 

408 fallback_key = key + "_cmd" 

409 if (section, key) in self.sensitive_config_values: 

410 section_dict = config_sources.get(section) 

411 if section_dict is not None: 

412 command_value = section_dict.get(fallback_key) 

413 if command_value is not None: 

414 if isinstance(command_value, str): 

415 command = command_value 

416 else: 

417 command = command_value[0] 

418 return run_command(command) 

419 return None 

420 

421 def _get_secret_option_from_config_sources( 

422 self, config_sources: ConfigSourcesType, section: str, key: str 

423 ) -> str | None: 

424 fallback_key = key + "_secret" 

425 if (section, key) in self.sensitive_config_values: 

426 section_dict = config_sources.get(section) 

427 if section_dict is not None: 

428 secrets_path_value = section_dict.get(fallback_key) 

429 if secrets_path_value is not None: 

430 if isinstance(secrets_path_value, str): 

431 secrets_path = secrets_path_value 

432 else: 

433 secrets_path = secrets_path_value[0] 

434 return self._get_config_value_from_secret_backend(secrets_path) 

435 return None 

436 

437 def _include_secrets( 

438 self, 

439 config_sources: ConfigSourcesType, 

440 display_sensitive: bool, 

441 display_source: bool, 

442 raw: bool, 

443 ): 

444 for section, key in self.sensitive_config_values: 

445 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key) 

446 if value: 

447 if not display_sensitive: 

448 value = "< hidden >" 

449 if display_source: 

450 opt: str | tuple[str, str] = (value, "secret") 

451 elif raw: 

452 opt = value.replace("%", "%%") 

453 else: 

454 opt = value 

455 config_sources.setdefault(section, {}).update({key: opt}) 

456 del config_sources[section][key + "_secret"] 

457 

458 def _include_commands( 

459 self, 

460 config_sources: ConfigSourcesType, 

461 display_sensitive: bool, 

462 display_source: bool, 

463 raw: bool, 

464 ): 

465 for section, key in self.sensitive_config_values: 

466 opt = self._get_cmd_option_from_config_sources(config_sources, section, key) 

467 if not opt: 

468 continue 

469 opt_to_set: str | tuple[str, str] | None = opt 

470 if not display_sensitive: 

471 opt_to_set = "< hidden >" 

472 if display_source: 

473 opt_to_set = (str(opt_to_set), "cmd") 

474 elif raw: 

475 opt_to_set = str(opt_to_set).replace("%", "%%") 

476 if opt_to_set is not None: 

477 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set} 

478 config_sources.setdefault(section, {}).update(dict_to_update) 

479 del config_sources[section][key + "_cmd"] 

480 

def _include_envs(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Merge options supplied through ``AIRFLOW__{SECTION}__{KEY}`` env vars into ``config_sources``.

    Env vars whose name does not split into prefix, section and key are skipped, and those for
    which ``_get_env_var_option`` yields nothing are logged and ignored. Section and key are
    stored lower-cased.

    :param config_sources: mapping updated in place
    :param display_sensitive: when False, values of sensitive options are shown as ``< hidden >``
        (``*cmd`` / ``*secret`` env vars and the unit test mode env var are left as they are)
    :param display_source: when True the stored entry is a ``(value, "env var")`` tuple
    :param raw: when True and sensitive values are displayed, ``%`` is escaped as ``%%``
    """
    # Snapshot the names first so the loop does not iterate os.environ directly.
    airflow_env_vars = [name for name in os.environ if name.startswith(ENV_VAR_PREFIX)]
    for env_var in airflow_env_vars:
        try:
            _, section, key = env_var.split("__", 2)
            opt = self._get_env_var_option(section, key)
        except ValueError:
            continue
        if opt is None:
            log.warning("Ignoring unknown env var '%s'", env_var)
            continue

        # Pieces split from the env var name keep the env var's (upper) case, while
        # sensitive_config_values holds lower-cased pairs: normalise before the membership
        # test, otherwise sensitive values would never be hidden.
        section = section.lower()
        key = key.lower()

        if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
            # Don't hide cmd/secret values here
            if not env_var.lower().endswith(("cmd", "secret")):
                if (section, key) in self.sensitive_config_values:
                    opt = "< hidden >"
        elif raw:
            opt = opt.replace("%", "%%")
        if display_source:
            opt = (opt, "env var")

        config_sources.setdefault(section, {}).update({key: opt})

512 

513 def _filter_by_source( 

514 self, 

515 config_sources: ConfigSourcesType, 

516 display_source: bool, 

517 getter_func, 

518 ): 

519 """ 

520 Delete default configs from current configuration. 

521 

522 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values. 

523 

524 This is necessary because bare configs take precedence over the command 

525 or secret key equivalents so if the current running config is 

526 materialized with Airflow defaults they in turn override user set 

527 command or secret key configs. 

528 

529 :param config_sources: The current configuration to operate on 

530 :param display_source: If False, configuration options contain raw 

531 values. If True, options are a tuple of (option_value, source). 

532 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'. 

533 :param getter_func: A callback function that gets the user configured 

534 override value for a particular sensitive_config_values config. 

535 :return: None, the given config_sources is filtered if necessary, 

536 otherwise untouched. 

537 """ 

538 for section, key in self.sensitive_config_values: 

539 # Don't bother if we don't have section / key 

540 if section not in config_sources or key not in config_sources[section]: 

541 continue 

542 # Check that there is something to override defaults 

543 try: 

544 getter_opt = getter_func(section, key) 

545 except ValueError: 

546 continue 

547 if not getter_opt: 

548 continue 

549 # Check to see that there is a default value 

550 if self.get_default_value(section, key) is None: 

551 continue 

552 # Check to see if bare setting is the same as defaults 

553 if display_source: 

554 # when display_source = true, we know that the config_sources contains tuple 

555 opt, source = config_sources[section][key] # type: ignore 

556 else: 

557 opt = config_sources[section][key] 

558 if opt == self.get_default_value(section, key): 

559 del config_sources[section][key] 

560 

561 @staticmethod 

562 def _deprecated_value_is_set_in_config( 

563 deprecated_section: str, 

564 deprecated_key: str, 

565 configs: Iterable[tuple[str, ConfigParser]], 

566 ) -> bool: 

567 for config_type, config in configs: 

568 if config_type != "default": 

569 with contextlib.suppress(NoSectionError): 

570 deprecated_section_array = config.items(section=deprecated_section, raw=True) 

571 if any(key == deprecated_key for key, _ in deprecated_section_array): 

572 return True 

573 return False 

574 

@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Return True when the env var for the deprecated option exists in the environment."""
    env_var = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}"
    return env_var in os.environ

581 

@staticmethod
def _deprecated_command_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """Return True when a non-default config defines the ``_cmd`` variant of the deprecated option."""
    command_key = deprecated_key + "_cmd"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section, deprecated_key=command_key, configs=configs
    )

591 

@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Return True when the ``_CMD`` env var for the deprecated option exists in the environment."""
    env_var = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD"
    return env_var in os.environ

598 

@staticmethod
def _deprecated_secret_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """Return True when a non-default config defines the ``_secret`` variant of the deprecated option."""
    secret_key = deprecated_key + "_secret"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section, deprecated_key=secret_key, configs=configs
    )

608 

@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Return True when the ``_SECRET`` env var for the deprecated option exists in the environment."""
    env_var = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET"
    return env_var in os.environ

615 

616 @staticmethod 

617 def _replace_config_with_display_sources( 

618 config_sources: ConfigSourcesType, 

619 configs: Iterable[tuple[str, ConfigParser]], 

620 configuration_description: dict[str, dict[str, Any]], 

621 display_source: bool, 

622 raw: bool, 

623 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

624 include_env: bool, 

625 include_cmds: bool, 

626 include_secret: bool, 

627 ): 

628 for source_name, config in configs: 

629 sections = config.sections() 

630 for section in sections: 

631 AirflowConfigParser._replace_section_config_with_display_sources( 

632 config, 

633 config_sources, 

634 configuration_description, 

635 display_source, 

636 raw, 

637 section, 

638 source_name, 

639 deprecated_options, 

640 configs, 

641 include_env=include_env, 

642 include_cmds=include_cmds, 

643 include_secret=include_secret, 

644 ) 

645 

@staticmethod
def _replace_section_config_with_display_sources(
    config: ConfigParser,
    config_sources: ConfigSourcesType,
    configuration_description: dict[str, dict[str, Any]],
    display_source: bool,
    raw: bool,
    section: str,
    source_name: str,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    configs: Iterable[tuple[str, ConfigParser]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Copy one section of ``config`` into ``config_sources``.

    When the values come from the ``"default"`` source, an option that replaces a deprecated one
    is skipped if the deprecated option is set in a non-default config or, depending on the
    ``include_*`` flags, through its env var, ``_cmd`` or ``_secret`` variants; a default would
    otherwise override the deprecated setting. With ``display_source`` each entry is stored as
    ``(value, source)``; for defaults the source is taken from the option description when it
    names one.
    """
    sect = config_sources.setdefault(section, {})
    if isinstance(config, AirflowConfigParser):
        with config.suppress_future_warnings():
            items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
    else:
        items = config.items(section=section, raw=raw)

    from_defaults = source_name == "default"
    for option, value in items:
        old_section, old_key, _ = deprecated_options.get((section, option), (None, None, None))
        if from_defaults and old_section and old_key:
            # A non-default value for the deprecated entry must not be shadowed by the new
            # entry's default.
            if AirflowConfigParser._deprecated_value_is_set_in_config(old_section, old_key, configs):
                continue
            if include_env and AirflowConfigParser._deprecated_variable_is_set(old_section, old_key):
                continue
            if include_cmds and (
                AirflowConfigParser._deprecated_variable_command_is_set(old_section, old_key)
                or AirflowConfigParser._deprecated_command_is_set_in_config(old_section, old_key, configs)
            ):
                continue
            if include_secret and (
                AirflowConfigParser._deprecated_variable_secret_is_set(old_section, old_key)
                or AirflowConfigParser._deprecated_secret_is_set_in_config(old_section, old_key, configs)
            ):
                continue

        if not display_source:
            sect[option] = value
            continue

        shown_source = source_name
        if from_defaults:
            # defaults can come from other sources (default-<PROVIDER>) that should be used here
            option_description = (
                configuration_description.get(section, {}).get("options", {}).get(option, {})
            )
            if option_description is not None:
                shown_source = option_description.get("source", source_name)
        sect[option] = (value, shown_source)

711 

712 def _warn_deprecate( 

713 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int 

714 ): 

715 """Warn about deprecated config option usage.""" 

716 if section == deprecated_section: 

717 warnings.warn( 

718 f"The {deprecated_name} option in [{section}] has been renamed to {key} - " 

719 f"the old setting has been used, but please update your config.", 

720 DeprecationWarning, 

721 stacklevel=4 + extra_stacklevel, 

722 ) 

723 else: 

724 warnings.warn( 

725 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option " 

726 f"in [{section}] - the old setting has been used, but please update your config.", 

727 DeprecationWarning, 

728 stacklevel=4 + extra_stacklevel, 

729 ) 

730 

@contextmanager
def suppress_future_warnings(self):
    """
    Context manager to temporarily suppress future warnings.

    Sets ``_suppress_future_warnings`` to True for the duration of the ``with`` block and puts
    the previous value back afterwards, including when the block raises.
    Subclasses can override this to customize warning suppression behavior.

    :return: context manager that yields this parser
    """
    previous = self._suppress_future_warnings
    self._suppress_future_warnings = True
    try:
        yield self
    finally:
        # Restore even on error, otherwise one failing lookup would leave suppression switched on.
        self._suppress_future_warnings = previous

745 

def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str:
    """
    Build the environment variable name for a config option.

    The name is the prefix, an optional upper-cased team name followed by ``___``, the
    upper-cased section with ``.`` replaced by ``_``, ``__`` and the upper-cased key.
    """
    team_component = ""
    if team_name:
        team_component = f"{team_name.upper()}___"
    section_component = section.replace(".", "_").upper()
    return "".join((ENV_VAR_PREFIX, team_component, section_component, "__", key.upper()))

750 

751 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None): 

752 """Get config option from environment variable.""" 

753 env_var: str = self._env_var_name(section, key, team_name=team_name) 

754 if env_var in os.environ: 

755 return expand_env_var(os.environ[env_var]) 

756 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command) 

757 env_var_cmd = env_var + "_CMD" 

758 if env_var_cmd in os.environ: 

759 # if this is a valid command key... 

760 if (section, key) in self.sensitive_config_values: 

761 return run_command(os.environ[env_var_cmd]) 

762 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend) 

763 env_var_secret_path = env_var + "_SECRET" 

764 if env_var_secret_path in os.environ: 

765 # if this is a valid secret path... 

766 if (section, key) in self.sensitive_config_values: 

767 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path]) 

768 return None 

769 

770 def _get_cmd_option(self, section: str, key: str): 

771 """Get config option from command execution.""" 

772 fallback_key = key + "_cmd" 

773 if (section, key) in self.sensitive_config_values: 

774 if super().has_option(section, fallback_key): 

775 command = super().get(section, fallback_key) 

776 try: 

777 cmd_output = run_command(command) 

778 except AirflowConfigException as e: 

779 raise e 

780 except Exception as e: 

781 raise AirflowConfigException( 

782 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd." 

783 f" Please check the {fallback_key} value." 

784 ) from e 

785 return cmd_output 

786 return None 

787 

def _get_secret_option(self, section: str, key: str) -> str | None:
    """
    Resolve a config option through the secrets backend.

    Applies only when ``(section, key)`` is listed in ``self.sensitive_config_values``
    and the underlying parser has a ``<key>_secret`` option in ``section``; the value of
    that option is passed to ``_get_config_value_from_secret_backend``.

    :return: the value returned by the secrets backend lookup, or None when not applicable
    """
    secret_key = key + "_secret"
    is_sensitive = (section, key) in self.sensitive_config_values
    if is_sensitive and super().has_option(section, secret_key):
        return self._get_config_value_from_secret_backend(super().get(section, secret_key))
    return None

796 

def _get_environment_variables(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Resolve an option from environment variables, falling back to its deprecated name.

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: current option name
    :param section: current section name
    :param issue_warning: call ``_warn_deprecate`` when the deprecated name supplies the value
    :param extra_stacklevel: forwarded to ``_warn_deprecate``
    :param kwargs: only ``team_name`` is read
    :return: the value found, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    team = kwargs.get("team_name", None)

    current = self._get_env_var_option(section, key, team_name=team)
    if current is not None:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team)
    if legacy is None:
        return VALUE_NOT_FOUND_SENTINEL
    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

820 

def _get_option_from_config_file(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from config file.

    When a truthy ``team_name`` is supplied in ``kwargs`` the lookup is done in the
    ``"<team_name>=<section>"`` section. The deprecated section/key pair is consulted
    only when the current one is not present.

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: current option name
    :param section: current section name
    :param issue_warning: call ``_warn_deprecate`` when the deprecated name supplies the value
    :param extra_stacklevel: forwarded to ``_warn_deprecate``
    :param kwargs: ``team_name`` is consumed here; the rest is forwarded to the parent ``get``
    :return: the expanded value, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    # Always remove team_name: the remaining kwargs are forwarded to the parent get(),
    # so even a falsy value (None or "") must not leak into those calls.
    team_name = kwargs.pop("team_name", None)
    if team_name:
        section = f"{team_name}={section}"
    if super().has_option(section, key):
        return expand_env_var(super().get(section, key, **kwargs))
    if deprecated_section and deprecated_key:
        if super().has_option(deprecated_section, deprecated_key):
            if issue_warning:
                self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
            with self.suppress_future_warnings():
                return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
    return VALUE_NOT_FOUND_SENTINEL

845 

def _get_option_from_commands(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Resolve an option via ``_get_cmd_option``, falling back to its deprecated name.

    A falsy result from ``_get_cmd_option`` (None or an empty string) counts as not found.

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: current option name
    :param section: current section name
    :param issue_warning: call ``_warn_deprecate`` when the deprecated name supplies the value
    :param extra_stacklevel: forwarded to ``_warn_deprecate``
    :param kwargs: accepted for signature parity with the other lookups; unused here
    :return: the value found, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    current = self._get_cmd_option(section, key)
    if current:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_cmd_option(deprecated_section, deprecated_key)
    if not legacy:
        return VALUE_NOT_FOUND_SENTINEL
    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

868 

def _get_option_from_secrets(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Resolve an option via ``_get_secret_option``, falling back to its deprecated name.

    A falsy result from ``_get_secret_option`` (None or an empty string) counts as not found.

    :param deprecated_key: old option name, if any
    :param deprecated_section: old section name, if any
    :param key: current option name
    :param section: current section name
    :param issue_warning: call ``_warn_deprecate`` when the deprecated name supplies the value
    :param extra_stacklevel: forwarded to ``_warn_deprecate``
    :param kwargs: accepted for signature parity with the other lookups; unused here
    :return: the value found, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    current = self._get_secret_option(section, key)
    if current:
        return current

    if not (deprecated_section and deprecated_key):
        return VALUE_NOT_FOUND_SENTINEL

    with self.suppress_future_warnings():
        legacy = self._get_secret_option(deprecated_section, deprecated_key)
    if not legacy:
        return VALUE_NOT_FOUND_SENTINEL
    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy

891 

def _get_option_from_defaults(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | ValueNotFound:
    """
    Resolve an option from the built-in default values.

    A value is returned when a default exists for ``section``/``key`` or when the caller
    supplied a ``fallback`` keyword; in both cases the lookup is repeated with ``kwargs``
    forwarded and the result passed through ``expand_env_var``. The deprecation-related
    parameters and ``team_name`` are accepted for signature parity and not used here.

    :return: the expanded value, or ``VALUE_NOT_FOUND_SENTINEL``
    """
    if self.get_default_value(section, key) is None and "fallback" not in kwargs:
        return VALUE_NOT_FOUND_SENTINEL
    return expand_env_var(self.get_default_value(section, key, **kwargs))

907 

def _resolve_deprecated_lookup(
    self,
    section: str,
    key: str,
    lookup_from_deprecated: bool,
    extra_stacklevel: int = 0,
) -> tuple[str, str, str | None, str | None, bool]:
    """
    Resolve deprecated section/key mappings and determine deprecated values.

    The section and key are lower-cased first. When ``lookup_from_deprecated`` is False
    nothing else is done. Otherwise exactly one of four cases applies, checked in order:
    the section itself was renamed, the (section, key) pair was renamed, the section is
    the new name of a renamed section, or the pair has an entry in ``deprecated_options``.

    :param section: Section name (will be lowercased)
    :param key: Key name (will be lowercased)
    :param lookup_from_deprecated: Whether to lookup from deprecated options
    :param extra_stacklevel: Extra stack level for warnings
    :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted)
    """
    section = section.lower()
    key = key.lower()
    warning_emitted = False
    deprecated_section: str | None = None
    deprecated_key: str | None = None

    if not lookup_from_deprecated:
        return section, key, deprecated_section, deprecated_key, warning_emitted

    # An option flagged "deprecated" in the configuration description always warns,
    # independently of the rename handling below.
    option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {})
    if option_description.get("deprecated"):
        deprecation_reason = option_description.get("deprecation_reason", "")
        warnings.warn(
            f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
            DeprecationWarning,
            stacklevel=2 + extra_stacklevel,
        )
    # For the cases in which we rename whole sections
    if section in self.inversed_deprecated_sections:
        # Caller used the old section name: remember it and switch to the new one.
        deprecated_section, deprecated_key = (section, key)
        section = self.inversed_deprecated_sections[section]
        if not self._suppress_future_warnings:
            warnings.warn(
                f"The config section [{deprecated_section}] has been renamed to "
                f"[{section}]. Please update your `conf.get*` call to use the new name",
                FutureWarning,
                stacklevel=2 + extra_stacklevel,
            )
        # Don't warn about individual rename if the whole section is renamed
        warning_emitted = True
    elif (section, key) in self.inversed_deprecated_options:
        # Handle using deprecated section/key instead of the new section/key
        new_section, new_key = self.inversed_deprecated_options[(section, key)]
        if not self._suppress_future_warnings and not warning_emitted:
            warnings.warn(
                f"section/key [{section}/{key}] has been deprecated, you should use"
                f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
                "new name",
                FutureWarning,
                stacklevel=2 + extra_stacklevel,
            )
            warning_emitted = True
        deprecated_section, deprecated_key = section, key
        section, key = (new_section, new_key)
    elif section in self.deprecated_sections:
        # When accessing the new section name, make sure we check under the old config name
        deprecated_key = key
        deprecated_section = self.deprecated_sections[section][0]
    else:
        # Fall back to the per-option table; pairs without an entry yield (None, None).
        deprecated_section, deprecated_key, _ = self.deprecated_options.get(
            (section, key), (None, None, None)
        )

    return section, key, deprecated_section, deprecated_key, warning_emitted

978 

@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...

@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: ...

def get(  # type: ignore[misc, override]
    self,
    section: str,
    key: str,
    suppress_warnings: bool = False,
    lookup_from_deprecated: bool = True,
    _extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | None:
    """
    Get config value by iterating through lookup sequence.

    Priority order is defined by _lookup_sequence property.

    :param section: section name; resolved through ``_resolve_deprecated_lookup`` first
    :param key: option name; resolved through ``_resolve_deprecated_lookup`` first
    :param suppress_warnings: when True, skip the log warning emitted before raising
    :param lookup_from_deprecated: forwarded to ``_resolve_deprecated_lookup``
    :param _extra_stacklevel: forwarded to the resolver and to every lookup method
    :param team_name: when not None, passed to every lookup method as ``team_name``
    :param kwargs: forwarded to every lookup method; ``fallback`` is also used here
    :return: the first value that is not ``VALUE_NOT_FOUND_SENTINEL``, else ``fallback``
    :raises AirflowConfigException: when nothing is found and no ``fallback`` was given
    """
    section, key, deprecated_section, deprecated_key, warning_emitted = self._resolve_deprecated_lookup(
        section=section,
        key=key,
        lookup_from_deprecated=lookup_from_deprecated,
        extra_stacklevel=_extra_stacklevel,
    )

    # Only add team_name when it was actually given, so lookups can tell "absent" apart.
    if team_name is not None:
        kwargs["team_name"] = team_name

    for lookup_method in self._lookup_sequence:
        value = lookup_method(
            deprecated_key=deprecated_key,
            deprecated_section=deprecated_section,
            key=key,
            section=section,
            # the resolver may already have warned; do not warn twice
            issue_warning=not warning_emitted,
            extra_stacklevel=_extra_stacklevel,
            **kwargs,
        )
        if value is not VALUE_NOT_FOUND_SENTINEL:
            return value

    # Check if fallback was explicitly provided (even if None)
    if "fallback" in kwargs:
        return kwargs["fallback"]

    if not suppress_warnings:
        log.warning("section/key [%s/%s] not found in config", section, key)

    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")

1031 

def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
    """
    Get config value as boolean.

    The value is stringified, lower-cased and stripped; anything from the first ``#``
    onwards is discarded. ``t``/``true``/``1`` map to True and ``f``/``false``/``0`` to False.

    :raises AirflowConfigException: when the value is none of the accepted spellings
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    # partition() keeps the whole string when there is no "#", matching split("#")[0]
    val = str(raw).lower().strip().partition("#")[0].strip()
    spellings = {"t": True, "true": True, "1": True, "f": False, "false": False, "0": False}
    if val in spellings:
        return spellings[val]
    raise AirflowConfigException(
        f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
        f'Current value: "{val}".'
    )

1045 

def getint(self, section: str, key: str, **kwargs) -> int:  # type: ignore[override]
    """
    Get config value as integer.

    :raises AirflowConfigException: when the value is None or ``int()`` raises ValueError
    """
    val = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if val is not None:
        try:
            return int(val)
        except ValueError:
            raise AirflowConfigException(
                f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
                f'Current value: "{val}".'
            )
    raise AirflowConfigException(
        f"Failed to convert value None to int. "
        f'Please check "{key}" key in "{section}" section is set.'
    )

1061 

def getfloat(self, section: str, key: str, **kwargs) -> float:  # type: ignore[override]
    """
    Get config value as float.

    :raises AirflowConfigException: when the value is None or ``float()`` raises ValueError
    """
    val = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if val is not None:
        try:
            return float(val)
        except ValueError:
            raise AirflowConfigException(
                f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
                f'Current value: "{val}".'
            )
    raise AirflowConfigException(
        f"Failed to convert value None to float. "
        f'Please check "{key}" key in "{section}" section is set.'
    )

1077 

def getlist(self, section: str, key: str, delimiter=",", **kwargs):
    """
    Get config value as list.

    The value is split on ``delimiter`` and every item is stripped. When the value is None
    the ``fallback`` keyword, if given, is returned verbatim.

    :raises AirflowConfigException: when the value is None without a fallback, or splitting fails
    """
    val = self.get(section, key, **kwargs)
    if val is None:
        if "fallback" not in kwargs:
            raise AirflowConfigException(
                f"Failed to convert value None to list. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        return kwargs["fallback"]
    try:
        pieces = val.split(delimiter)
        return [piece.strip() for piece in pieces]
    except Exception:
        raise AirflowConfigException(
            f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

1095 

# Type variable tying the enum class argument to the returned member type.
E = TypeVar("E", bound=Enum)

def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
    """
    Get config value as enum.

    The value is looked up by member *name* in ``enum_class``. When that fails and a
    ``fallback`` keyword naming a valid member was given, that member is returned.

    :param enum_class: the Enum class to resolve the value against
    :raises AirflowConfigException: when the value is None or names no member
    """
    val = self.get(section, key, **kwargs)
    enum_names = [member.name for member in enum_class]

    if val is None:
        raise AirflowConfigException(
            f'Failed to convert value. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}" and it must be one of {", ".join(enum_names)}'
        )

    try:
        return enum_class[val]
    except KeyError:
        if "fallback" in kwargs:
            fallback_name = kwargs["fallback"]
            if fallback_name in enum_names:
                return enum_class[fallback_name]
        raise AirflowConfigException(
            f'Failed to convert value. Please check "{key}" key in "{section}" section. '
            f"the value must be one of {', '.join(enum_names)}"
        )

1118 

def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
    """
    Get config value as list of enums.

    Items come from ``getlist``; each one is looked up by member name in ``enum_class``.
    Items that name no member are skipped after a warning is logged.

    :param enum_class: the Enum class to resolve each item against
    :param delimiter: separator forwarded to ``getlist``
    """
    names = self.getlist(section, key, delimiter, **kwargs)
    enum_names = [member.name for member in enum_class]
    members = []

    for name in names:
        try:
            member = enum_class[name]
        except KeyError:
            log.warning(
                "Failed to convert value. Please check %s key in %s section. "
                "it must be one of %s, if not the value is ignored",
                key,
                section,
                ", ".join(enum_names),
            )
        else:
            members.append(member)

    return members

1138 

def getimport(self, section: str, key: str, **kwargs) -> Any:
    """
    Read an option holding a fully qualified name, import it and return the object.

    :return: the imported object, or None when the option is empty
    :raises AirflowConfigException: when importing fails; the message names key and section
    """
    dotted_path = self.get(section=section, key=key, **kwargs)
    if not dotted_path:
        return None

    try:
        # Imported lazily to avoid a circular dependency
        from ..module_loading import import_string

        loaded = import_string(dotted_path)
    except ImportError as e:
        log.warning(e)
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{dotted_path}".'
        )
    return loaded

1163 

def getjson(
    self, section: str, key: str, fallback=None, **kwargs
) -> dict | list | str | int | float | None:
    """
    Return a config value parsed from a JSON string.

    ``fallback`` is *not* JSON parsed but used verbatim when the config value is
    missing, None or an empty string.

    :raises AirflowConfigException: when the value is not valid JSON
    """
    try:
        raw = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
    except (NoSectionError, NoOptionError):
        return fallback

    if raw is None or raw == "":
        return fallback

    try:
        parsed = json.loads(raw)
    except JSONDecodeError as e:
        raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
    return parsed

1184 

def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Get the config value for the given section and key as a ``datetime.timedelta``.

    A falsy value (missing key, None, empty string) yields ``fallback`` unchanged.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: fallback value when no config value is given, defaults to None
    :raises AirflowConfigException: raised because ValueError or OverflowError
    :return: datetime.timedelta(seconds=<config_value>) or the fallback
    """
    val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
    if not val:
        return fallback

    # the given value must be convertible to integer
    try:
        seconds = int(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

    try:
        return datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            f"Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
        )

1221 

def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
    """
    Get a config value that must be set.

    :raises ValueError: when ``get`` returns None
    """
    value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if value is not None:
        return value
    raise ValueError(f"The value {section}/{key} should be set!")

1228 

def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
    """
    Get a config value as a list; the result must not be None.

    :raises ValueError: when ``getlist`` returns None
    """
    items = self.getlist(section, key, **kwargs)
    if items is not None:
        return items
    raise ValueError(f"The value {section}/{key} should be set!")

1235 

def read(
    self,
    filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
    encoding: str | None = None,
) -> list[str]:
    """
    Read configuration from the given file name(s).

    Delegates unchanged to the parent class; this override only declares the
    parameter and return types.

    :param filenames: a single path or an iterable of paths
    :param encoding: text encoding passed through to the parent implementation
    :return: whatever the parent ``read`` returns
    """
    return super().read(filenames=filenames, encoding=encoding)

1242 

def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
) -> None:
    """
    Load configuration from a dictionary of sections.

    We define a different signature here to add better type hints and checking;
    the call is delegated unchanged to the parent class.

    :param dictionary: dictionary to read from
    :param source: source to be used to store the configuration
    :return: None
    """
    super().read_dict(dictionary=dictionary, source=source)

1254 

def get_sections_including_defaults(self) -> list[str]:
    """
    List every section known to the parser or to the configuration description.

    Sections from the configuration description come first, followed by sections that
    only exist in the parser; duplicates are dropped while keeping first-seen order.

    :return: list of section names
    """
    from_config = self.sections()
    from_description = list(self.configuration_description.keys())
    # dict.fromkeys de-duplicates while preserving insertion order
    return list(dict.fromkeys([*from_description, *from_config]))

1264 

def get_options_including_defaults(self, section: str) -> list[str]:
    """
    List every option known for ``section``.

    Options from the configuration description come first, followed by options that only
    exist in the parser; duplicates are dropped while keeping first-seen order.

    :param section: section name
    :return: list of option names for the section given
    """
    if self.has_section(section):
        from_config = self.options(section)
    else:
        from_config = []
    described = self.configuration_description.get(section, {}).get("options", {})
    from_description = list(described.keys())
    # dict.fromkeys de-duplicates while preserving insertion order
    return list(dict.fromkeys([*from_description, *from_config]))

1279 

def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True) -> bool:
    """
    Check if option is defined.

    Relies on ``self.get()`` so the priority order of config sources
    (env, config, cmd, defaults) is not re-implemented here. An option whose resolved
    value is None counts as not defined.

    :param section: section to get option from
    :param option: option to get
    :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
    :return: True when a non-None value is found, False otherwise
    """
    try:
        value = self.get(
            section,
            option,
            fallback=None,
            _extra_stacklevel=1,
            suppress_warnings=True,
            lookup_from_deprecated=lookup_from_deprecated,
        )
    except (NoOptionError, NoSectionError, AirflowConfigException):
        return False
    return value is not None

1306 

def set(self, section: str, option: str, value: str | None = None) -> None:
    """
    Set an option to the given value.

    Section and option are lower-cased first, to match what `get` does. A section that is
    missing from the parser but present in the configuration description is created
    automatically before the value is stored.
    """
    section, option = section.lower(), option.lower()
    described_sections = self.configuration_description or {}
    needs_section = not self.has_section(section) and section in described_sections
    if needs_section:
        # Known from the defaults but absent from the user config: create it on the fly
        self.add_section(section)
    super().set(section, option, value)

1321 

def remove_option(self, section: str, option: str, remove_default: bool = True):
    """
    Remove an option from the loaded config and, optionally, from the default config.

    Section and option are lower-cased first. The option is removed from the parser when
    present there; it is additionally removed from ``self._default_values`` when a default
    exists and ``remove_default`` is True.
    """
    section, option = section.lower(), option.lower()
    if super().has_option(section, option):
        super().remove_option(section, option)

    has_default = self.get_default_value(section, option) is not None
    if has_default and remove_default:
        self._default_values.remove_option(section, option)

1336 

def optionxform(self, optionstr: str) -> str:
    """
    Return the option name unchanged.

    ConfigParser calls this hook on every read, get, or set operation and lower-cases
    names by default; this override keeps them case-preserving instead.

    :param optionstr: option name as given
    :return: the same string
    """
    return optionstr

1348 

def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
    """
    Return the (label, parser) pairs that ``as_dict()`` reads from, in order.

    The defaults come first and this parser second. Subclasses can override to add
    additional sources (e.g., provider configs).
    """
    sources: list[tuple[str, ConfigParser]] = []
    sources.append(("default", self._default_values))
    sources.append(("airflow.cfg", self))
    return sources

1359 

def as_dict(
    self,
    display_source: bool = False,
    display_sensitive: bool = False,
    raw: bool = False,
    include_env: bool = True,
    include_cmds: bool = True,
    include_secret: bool = True,
) -> ConfigSourcesType:
    """
    Return the current configuration as an OrderedDict of OrderedDicts.

    When materializing current configuration Airflow defaults are
    materialized along with user set configs. If any of the `include_*`
    options are False then the result of calling command or secret key
    configs do not override Airflow defaults and instead are passed through.
    In order to then avoid Airflow defaults from overwriting user set
    command or secret key configs we filter out bare sensitive_config_values
    that are set to Airflow defaults when command or secret key configs
    produce different values.

    :param display_source: If False, the option value is returned. If True,
        a tuple of (option_value, source) is returned. Source is either
        'airflow.cfg', 'default', 'env var', or 'cmd'.
    :param display_sensitive: If True, the values of options set by env
        vars and bash commands will be displayed. If False, those options
        are shown as '< hidden >'
    :param raw: Should the values be output as interpolated values, or the
        "raw" form that can be fed back in to ConfigParser
    :param include_env: Should the value of configuration from AIRFLOW__
        environment variables be included or not
    :param include_cmds: Should the result of calling any ``*_cmd`` config be
        set (True, default), or should the _cmd options be left as the
        command to run (False)
    :param include_secret: Should the result of calling any ``*_secret`` config be
        set (True, default), or should the _secret options be left as the
        path to get the secret from (False)
    :raises ValueError: if ``display_sensitive`` is False while any of
        ``include_env``, ``include_cmds`` or ``include_secret`` is False
    :return: Dictionary, where the key is the name of the section and the content is
        the dictionary with the name of the parameter and its value.
    """
    if not display_sensitive:
        # We want to hide the sensitive values at the appropriate methods
        # since envs from cmds, secrets can be read at _include_envs method
        if not all([include_env, include_cmds, include_secret]):
            raise ValueError(
                "If display_sensitive is false, then include_env, "
                "include_cmds, include_secret must all be set as True"
            )

    config_sources: ConfigSourcesType = {}

    # We check sequentially all those sources and the last one we saw it in will "win"
    configs = self._get_config_sources_for_as_dict()

    self._replace_config_with_display_sources(
        config_sources,
        configs,
        self.configuration_description,
        display_source,
        raw,
        self.deprecated_options,
        include_cmds=include_cmds,
        include_env=include_env,
        include_secret=include_secret,
    )

    # The three steps below run in a fixed order (env, then cmd, then secret); each one
    # either merges its source into config_sources or filters by that source's getter.

    # add env vars and overwrite because they have priority
    if include_env:
        self._include_envs(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_env_var_option)

    # add bash commands
    if include_cmds:
        self._include_commands(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_cmd_option)

    # add config from secret backends
    if include_secret:
        self._include_secrets(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_secret_option)

    if not display_sensitive:
        # This ensures the ones from config file is hidden too
        # if they are not provided through env, cmd and secret
        hidden = "< hidden >"
        for section, key in self.sensitive_config_values:
            if config_sources.get(section):
                # only mask options that are present with a truthy value
                if config_sources[section].get(key, None):
                    if display_source:
                        # keep the recorded source, replace only the value
                        source = config_sources[section][key][1]
                        config_sources[section][key] = (hidden, source)
                    else:
                        config_sources[section][key] = hidden

    return config_sources

1458 

def _write_option_header(
    self,
    file: IO[str],
    option: str,
    extra_spacing: bool,
    include_descriptions: bool,
    include_env_vars: bool,
    include_examples: bool,
    include_sources: bool,
    section_config_description: dict[str, dict[str, Any]],
    section_to_write: str,
    sources_dict: ConfigSourcesType,
) -> tuple[bool, bool]:
    """
    Write the comment header that precedes a configuration option.

    Depending on the ``include_*`` flags the header holds the option description, an
    example, the source of the current value and the matching environment variable name.

    :return: tuple of (should_continue, needs_separation); ``should_continue`` is always
        True here and ``needs_separation`` is True when anything was written, so the
        caller can visually separate this option from the next one.
    """
    option_meta = {}
    if section_config_description:
        option_meta = section_config_description.get("options", {}).get(option, {})
    needs_separation = False

    description = option_meta.get("description")
    if description and include_descriptions:
        for text_line in description.splitlines():
            file.write(f"# {text_line}\n")
        needs_separation = True

    example = option_meta.get("example")
    if example is not None and include_examples:
        if extra_spacing:
            file.write("#\n")
        # continuation lines of a multi-line example stay commented out
        example = "\n# ".join(example.splitlines())
        file.write(f"# Example: {option} = {example}\n")
        needs_separation = True

    if include_sources and sources_dict:
        section_sources = sources_dict.get(section_to_write)
        value_with_source = section_sources.get(option) if section_sources else None
        if value_with_source is not None:
            file.write(f"#\n# Source: {value_with_source[1]}\n")
        else:
            file.write("#\n# Source: not defined\n")
        needs_separation = True

    if include_env_vars:
        file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
        if extra_spacing:
            file.write("#\n")
        needs_separation = True

    return True, needs_separation

1511 

def is_template(self, section: str, key) -> bool:
    """
    Tell whether the value of ``section``/``key`` is templated.

    Delegates to ``_is_template`` with this parser's configuration description.

    :param section: section of the config
    :param key: key in the section
    :return: True if the value is templated
    """
    description = self.configuration_description
    return _is_template(description, section, key)

1521 

def getsection(self, section: str) -> ConfigOptionsDictType | None:
    """
    Return the section as a dict.

    Values are merged from the default values, then this parser, then environment
    variables whose name starts with the section's env-var prefix. Afterwards each value
    is converted to int, then float, then bool (``t``/``true``/``f``/``false``) where
    possible; anything else is left as is.

    :param section: section from the config
    :return: the merged section, or None when the section exists in neither the parser
        nor the default values
    :raises AirflowConfigException: when any merged value is None
    """
    if not self.has_section(section) and not self._default_values.has_section(section):
        return None
    if self._default_values.has_section(section):
        _section: ConfigOptionsDictType = dict(self._default_values.items(section))
    else:
        _section = {}

    if self.has_section(section):
        _section.update(self.items(section))

    section_prefix = self._env_var_name(section, "")
    for env_var in sorted(os.environ):
        if not env_var.startswith(section_prefix):
            continue
        # Strip only the leading prefix and a trailing "_CMD"; str.replace would also
        # remove any later occurrence of the prefix inside the option name.
        key = env_var.removeprefix(section_prefix).removesuffix("_CMD").lower()
        _section[key] = self._get_env_var_option(section, key)

    # Only existing keys are reassigned below, so the dict size is stable while iterating.
    for key, val in _section.items():
        if val is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        try:
            _section[key] = int(val)
        except ValueError:
            try:
                _section[key] = float(val)
            except ValueError:
                if isinstance(val, str) and val.lower() in ("t", "true"):
                    _section[key] = True
                elif isinstance(val, str) and val.lower() in ("f", "false"):
                    _section[key] = False
    return _section

1566 

1567 @staticmethod 

1568 def _write_section_header( 

1569 file: IO[str], 

1570 include_descriptions: bool, 

1571 section_config_description: dict[str, str], 

1572 section_to_write: str, 

1573 ) -> None: 

1574 """Write header for configuration section.""" 

1575 file.write(f"[{section_to_write}]\n") 

1576 section_description = section_config_description.get("description") 

1577 if section_description and include_descriptions: 

1578 for line in section_description.splitlines(): 

1579 file.write(f"# {line}\n") 

1580 file.write("\n") 

1581 

def _write_value(
    self,
    file: IO[str],
    option: str,
    comment_out_everything: bool,
    needs_separation: bool,
    only_defaults: bool,
    section_to_write: str,
):
    """
    Write the ``option = value`` line for one option.

    The value is the raw default when ``only_defaults`` is set, otherwise the raw current
    value with the default as fallback. A None value, or ``comment_out_everything``,
    produces a commented-out line. Multi-line values that parse as JSON are re-serialised
    with indentation.

    :param file: file to write to
    :param option: option name
    :param comment_out_everything: write the value commented out
    :param needs_separation: write an empty line after the value
    :param only_defaults: ignore the current value and write the default
    :param section_to_write: section the option belongs to
    """
    default_value = self.get_default_value(section_to_write, option, raw=True)
    value = (
        default_value
        if only_defaults
        else self.get(section_to_write, option, fallback=default_value, raw=True)
    )

    if value is None:
        file.write(f"# {option} = \n")
    elif comment_out_everything:
        commented = "\n# ".join(value.splitlines())
        file.write(f"# {option} = {commented}\n")
    else:
        if "\n" in value:
            # Values that are not valid JSON are written exactly as they are
            with contextlib.suppress(JSONDecodeError):
                # indent multi-line JSON to satisfy configparser format
                value = json.dumps(json.loads(value), indent=4).replace("\n", "\n ")
        file.write(f"{option} = {value}\n")

    if needs_separation:
        file.write("\n")

1615 

1616 def write( # type: ignore[override] 

1617 self, 

1618 file: IO[str], 

1619 section: str | None = None, 

1620 include_examples: bool = True, 

1621 include_descriptions: bool = True, 

1622 include_sources: bool = True, 

1623 include_env_vars: bool = True, 

1624 include_providers: bool = True, 

1625 comment_out_everything: bool = False, 

1626 hide_sensitive_values: bool = False, 

1627 extra_spacing: bool = True, 

1628 only_defaults: bool = False, 

1629 **kwargs: Any, 

1630 ) -> None: 

1631 """ 

1632 Write configuration with comments and examples to a file. 

1633 

1634 :param file: file to write to 

1635 :param section: section of the config to write, defaults to all sections 

1636 :param include_examples: Include examples in the output 

1637 :param include_descriptions: Include descriptions in the output 

1638 :param include_sources: Include the source of each config option 

1639 :param include_env_vars: Include environment variables corresponding to each config option 

1640 :param include_providers: Include providers configuration 

1641 :param comment_out_everything: Comment out all values 

1642 :param hide_sensitive_values: Include sensitive values in the output 

1643 :param extra_spacing: Add extra spacing before examples and after variables 

1644 :param only_defaults: Only include default values when writing the config, not the actual values 

1645 """ 

1646 sources_dict = {} 

1647 if include_sources: 

1648 sources_dict = self.as_dict(display_source=True) 

1649 with self.make_sure_configuration_loaded(with_providers=include_providers): 

1650 for section_to_write in self.get_sections_including_defaults(): 

1651 section_config_description = self.configuration_description.get(section_to_write, {}) 

1652 if section_to_write != section and section is not None: 

1653 continue 

1654 if self._default_values.has_section(section_to_write) or self.has_section(section_to_write): 

1655 self._write_section_header( 

1656 file, include_descriptions, section_config_description, section_to_write 

1657 ) 

1658 for option in self.get_options_including_defaults(section_to_write): 

1659 should_continue, needs_separation = self._write_option_header( 

1660 file=file, 

1661 option=option, 

1662 extra_spacing=extra_spacing, 

1663 include_descriptions=include_descriptions, 

1664 include_env_vars=include_env_vars, 

1665 include_examples=include_examples, 

1666 include_sources=include_sources, 

1667 section_config_description=section_config_description, 

1668 section_to_write=section_to_write, 

1669 sources_dict=sources_dict, 

1670 ) 

1671 self._write_value( 

1672 file=file, 

1673 option=option, 

1674 comment_out_everything=comment_out_everything, 

1675 needs_separation=needs_separation, 

1676 only_defaults=only_defaults, 

1677 section_to_write=section_to_write, 

1678 ) 

1679 if include_descriptions and not needs_separation: 

1680 # extra separation between sections in case last option did not need it 

1681 file.write("\n") 

1682 

@contextmanager
def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
    """
    Make sure configuration is loaded with or without providers.

    This happens regardless if the provider configuration has been loaded before or not.
    Restores configuration to the state before entering the context, also when the
    managed block raises.

    :param with_providers: whether providers should be loaded
    """
    needs_reload = False
    if with_providers:
        self._ensure_providers_config_loaded()
    else:
        # The unload hook reports whether providers were actually unloaded.
        needs_reload = self._ensure_providers_config_unloaded()
    try:
        yield
    finally:
        # Run the restore on every exit path so an exception inside the ``with``
        # body cannot leave the provider configuration unloaded.
        if needs_reload:
            self._reload_provider_configs()

1701 

def _ensure_providers_config_loaded(self) -> None:
    """
    Ensure providers configurations are loaded.

    The base implementation always raises; subclasses are expected to override it.
    """
    message = "Subclasses must implement _ensure_providers_config_loaded method"
    raise NotImplementedError(message)

1705 

def _ensure_providers_config_unloaded(self) -> bool:
    """
    Ensure providers configurations are unloaded temporarily to load core configs.

    The base implementation always raises; subclasses are expected to override it.

    :return: True if providers get unloaded
    """
    message = "Subclasses must implement _ensure_providers_config_unloaded method"
    raise NotImplementedError(message)

1709 

def _reload_provider_configs(self) -> None:
    """
    Reload providers configuration.

    The base implementation always raises; subclasses are expected to override it.
    """
    message = "Subclasses must implement _reload_provider_configs method"
    raise NotImplementedError(message)