Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/configuration/parser.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

741 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Base configuration parser with pure parsing logic.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import datetime 

24import functools 

25import itertools 

26import json 

27import logging 

28import os 

29import shlex 

30import subprocess 

31import sys 

32import warnings 

33from collections.abc import Callable, Generator, Iterable 

34from configparser import ConfigParser, NoOptionError, NoSectionError 

35from contextlib import contextmanager 

36from enum import Enum 

37from json.decoder import JSONDecodeError 

38from re import Pattern 

39from typing import IO, Any, TypeVar, overload 

40 

41from .exceptions import AirflowConfigException 

42 

43log = logging.getLogger(__name__) 

44 

45 

46ConfigType = str | int | float | bool 

47ConfigOptionsDictType = dict[str, ConfigType] 

48ConfigSectionSourcesType = dict[str, str | tuple[str, str]] 

49ConfigSourcesType = dict[str, ConfigSectionSourcesType] 

50ENV_VAR_PREFIX = "AIRFLOW__" 

51 

52 

53class ValueNotFound: 

54 """Object of this is raised when a configuration value cannot be found.""" 

55 

56 pass 

57 

58 

59VALUE_NOT_FOUND_SENTINEL = ValueNotFound() 

60 

61 

62@overload 

63def expand_env_var(env_var: None) -> None: ... 

64@overload 

65def expand_env_var(env_var: str) -> str: ... 

66 

67 

68def expand_env_var(env_var: str | None) -> str | None: 

69 """ 

70 Expand (potentially nested) env vars. 

71 

72 Repeat and apply `expandvars` and `expanduser` until 

73 interpolation stops having any effect. 

74 """ 

75 if not env_var or not isinstance(env_var, str): 

76 return env_var 

77 while True: 

78 interpolated = os.path.expanduser(os.path.expandvars(str(env_var))) 

79 if interpolated == env_var: 

80 return interpolated 

81 env_var = interpolated 

82 

83 

84def run_command(command: str) -> str: 

85 """Run command and returns stdout.""" 

86 process = subprocess.Popen( 

87 shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True 

88 ) 

89 output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate()) 

90 

91 if process.returncode != 0: 

92 raise AirflowConfigException( 

93 f"Cannot execute {command}. Error code is: {process.returncode}. " 

94 f"Output: {output}, Stderr: {stderr}" 

95 ) 

96 

97 return output 

98 

99 

100def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool: 

101 """ 

102 Check if the config is a template. 

103 

104 :param configuration_description: description of configuration 

105 :param section: section 

106 :param key: key 

107 :return: True if the config is a template 

108 """ 

109 return configuration_description.get(section, {}).get(key, {}).get("is_template", False) 

110 

111 

112class AirflowConfigParser(ConfigParser): 

113 """ 

114 Base configuration parser with pure parsing logic. 

115 

116 This class provides the core parsing methods that work with: 

117 - configuration_description: dict describing config options (required in __init__) 

118 - _default_values: ConfigParser with default values (required in __init__) 

119 - deprecated_options: class attribute mapping new -> old options 

120 - deprecated_sections: class attribute mapping new -> old sections 

121 """ 

122 

123 # A mapping of section -> setting -> { old, replace } for deprecated default values. 

124 # Subclasses can override this to define deprecated values that should be upgraded. 

125 deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {} 

126 

127 # A mapping of (new section, new option) -> (old section, old option, since_version). 

128 # When reading new option, the old option will be checked to see if it exists. If it does a 

129 # DeprecationWarning will be issued and the old option will be used instead 

130 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = { 

131 ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"), 

132 ("api", "base_url"): ("webserver", "base_url", "3.0"), 

133 ("api", "host"): ("webserver", "web_server_host", "3.0"), 

134 ("api", "port"): ("webserver", "web_server_port", "3.0"), 

135 ("api", "workers"): ("webserver", "workers", "3.0"), 

136 ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"), 

137 ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"), 

138 ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"), 

139 ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"), 

140 ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"), 

141 ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"), 

142 ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"), 

143 ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"), 

144 ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"), 

145 ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"), 

146 ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"), 

147 ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"), 

148 ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"), 

149 ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"), 

150 ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"), 

151 ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"), 

152 ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"), 

153 ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"), 

154 ("api", "page_size"): ("webserver", "page_size", "3.1.0"), 

155 ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"), 

156 ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"), 

157 ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"), 

158 ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"), 

159 ("api", "log_config"): ("api", "access_logfile", "3.1.0"), 

160 ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"), 

161 } 

162 

163 # A mapping of new section -> (old section, since_version). 

164 deprecated_sections: dict[str, tuple[str, str]] = {} 

165 

166 @property 

167 def _lookup_sequence(self) -> list[Callable]: 

168 """ 

169 Define the sequence of lookup methods for get(). The definition here does not have provider lookup. 

170 

171 Subclasses can override this to customise lookup order. 

172 """ 

173 return [ 

174 self._get_environment_variables, 

175 self._get_option_from_config_file, 

176 self._get_option_from_commands, 

177 self._get_option_from_secrets, 

178 self._get_option_from_defaults, 

179 ] 

180 

181 @property 

182 def _validators(self) -> list[Callable[[], None]]: 

183 """ 

184 Return list of validators defined on a config parser class. Base class will return an empty list. 

185 

186 Subclasses can override this to customize the validators that are run during validation on the 

187 config parser instance. 

188 """ 

189 return [] 

190 

191 def validate(self) -> None: 

192 """Run all registered validators.""" 

193 for validator in self._validators: 

194 validator() 

195 self.is_validated = True 

196 

197 def _validate_deprecated_values(self) -> None: 

198 """Validate and upgrade deprecated default values.""" 

199 for section, replacement in self.deprecated_values.items(): 

200 for name, info in replacement.items(): 

201 old, new = info 

202 current_value = self.get(section, name, fallback="") 

203 if self._using_old_value(old, current_value): 

204 self.upgraded_values[(section, name)] = current_value 

205 new_value = old.sub(new, current_value) 

206 self._update_env_var(section=section, name=name, new_value=new_value) 

207 self._create_future_warning( 

208 name=name, 

209 section=section, 

210 current_value=current_value, 

211 new_value=new_value, 

212 ) 

213 

214 def _using_old_value(self, old: Pattern, current_value: str) -> bool: 

215 """Check if current_value matches the old pattern.""" 

216 return old.search(current_value) is not None 

217 

218 def _update_env_var(self, section: str, name: str, new_value: str) -> None: 

219 """Update environment variable with new value.""" 

220 env_var = self._env_var_name(section, name) 

221 # Set it as an env var so that any subprocesses keep the same override! 

222 os.environ[env_var] = new_value 

223 

224 @staticmethod 

225 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None: 

226 """Create a FutureWarning for deprecated default values.""" 

227 warnings.warn( 

228 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " 

229 f"This value has been changed to {new_value!r} in the running config, but please update your config.", 

230 FutureWarning, 

231 stacklevel=3, 

232 ) 

233 

234 def __init__( 

235 self, 

236 configuration_description: dict[str, dict[str, Any]], 

237 _default_values: ConfigParser, 

238 *args, 

239 **kwargs, 

240 ): 

241 """ 

242 Initialize the parser. 

243 

244 :param configuration_description: Description of configuration options 

245 :param _default_values: ConfigParser with default values 

246 """ 

247 super().__init__(*args, **kwargs) 

248 self.configuration_description = configuration_description 

249 self._default_values = _default_values 

250 self._suppress_future_warnings = False 

251 self.upgraded_values = {} 

252 

253 @functools.cached_property 

254 def inversed_deprecated_options(self): 

255 """Build inverse mapping from old options to new options.""" 

256 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()} 

257 

258 @functools.cached_property 

259 def inversed_deprecated_sections(self): 

260 """Build inverse mapping from old sections to new sections.""" 

261 return { 

262 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items() 

263 } 

264 

265 @functools.cached_property 

266 def sensitive_config_values(self) -> set[tuple[str, str]]: 

267 """Get set of sensitive config values that should be masked.""" 

268 flattened = { 

269 (s, k): item 

270 for s, s_c in self.configuration_description.items() 

271 for k, item in s_c.get("options", {}).items() 

272 } 

273 sensitive = { 

274 (section.lower(), key.lower()) 

275 for (section, key), v in flattened.items() 

276 if v.get("sensitive") is True 

277 } 

278 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options} 

279 depr_section = { 

280 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections 

281 } 

282 sensitive.update(depr_section, depr_option) 

283 return sensitive 

284 

285 @overload # type: ignore[override] 

286 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ... 

287 

288 @overload 

289 def get(self, section: str, key: str, **kwargs) -> str | None: ... 

290 

291 def _update_defaults_from_string(self, config_string: str) -> None: 

292 """ 

293 Update the defaults in _default_values based on values in config_string ("ini" format). 

294 

295 Override shared parser's method to add validation for template variables. 

296 Note that those values are not validated and cannot contain variables because we are using 

297 regular config parser to load them. This method is used to test the config parser in unit tests. 

298 

299 :param config_string: ini-formatted config string 

300 """ 

301 parser = ConfigParser() 

302 parser.read_string(config_string) 

303 for section in parser.sections(): 

304 if section not in self._default_values.sections(): 

305 self._default_values.add_section(section) 

306 errors = False 

307 for key, value in parser.items(section): 

308 if not self.is_template(section, key) and "{" in value: 

309 errors = True 

310 log.error( 

311 "The %s.%s value %s read from string contains variable. This is not supported", 

312 section, 

313 key, 

314 value, 

315 ) 

316 self._default_values.set(section, key, value) 

317 if errors: 

318 raise AirflowConfigException( 

319 f"The string config passed as default contains variables. " 

320 f"This is not supported. String config: {config_string}" 

321 ) 

322 

323 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any: 

324 """ 

325 Retrieve default value from default config parser. 

326 

327 This will retrieve the default value from the default config parser. Optionally a raw, stored 

328 value can be retrieved by setting skip_interpolation to True. This is useful for example when 

329 we want to write the default value to a file, and we don't want the interpolation to happen 

330 as it is going to be done later when the config is read. 

331 

332 :param section: section of the config 

333 :param key: key to use 

334 :param fallback: fallback value to use 

335 :param raw: if raw, then interpolation will be reversed 

336 :param kwargs: other args 

337 :return: 

338 """ 

339 value = self._default_values.get(section, key, fallback=fallback, **kwargs) 

340 if raw and value is not None: 

341 return value.replace("%", "%%") 

342 return value 

343 

344 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None: 

345 """ 

346 Get Secret Backend if defined in airflow.cfg. 

347 

348 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not. 

349 """ 

350 section = "workers" if worker_mode else "secrets" 

351 key = "secrets_backend" if worker_mode else "backend" 

352 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs" 

353 

354 secrets_backend_cls = self.getimport(section=section, key=key) 

355 

356 if not secrets_backend_cls: 

357 if worker_mode: 

358 # if we find no secrets backend for worker, return that of secrets backend 

359 secrets_backend_cls = self.getimport(section="secrets", key="backend") 

360 if not secrets_backend_cls: 

361 return None 

362 # When falling back to secrets backend, use its kwargs 

363 kwargs_key = "backend_kwargs" 

364 section = "secrets" 

365 else: 

366 return None 

367 

368 try: 

369 backend_kwargs = self.getjson(section=section, key=kwargs_key) 

370 if not backend_kwargs: 

371 backend_kwargs = {} 

372 elif not isinstance(backend_kwargs, dict): 

373 raise ValueError("not a dict") 

374 except AirflowConfigException: 

375 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key) 

376 backend_kwargs = {} 

377 except ValueError: 

378 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key) 

379 backend_kwargs = {} 

380 

381 return secrets_backend_cls(**backend_kwargs) 

382 

383 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None: 

384 """ 

385 Get Config option values from Secret Backend. 

386 

387 Called by the shared parser's _get_secret_option() method as part of the lookup chain. 

388 Uses _get_custom_secret_backend() to get the backend instance. 

389 

390 :param config_key: the config key to retrieve 

391 :return: config value or None 

392 """ 

393 try: 

394 secrets_client = self._get_custom_secret_backend() 

395 if not secrets_client: 

396 return None 

397 return secrets_client.get_config(config_key) 

398 except Exception as e: 

399 raise AirflowConfigException( 

400 "Cannot retrieve config from alternative secrets backend. " 

401 "Make sure it is configured properly and that the Backend " 

402 "is accessible.\n" 

403 f"{e}" 

404 ) 

405 

406 def _get_cmd_option_from_config_sources( 

407 self, config_sources: ConfigSourcesType, section: str, key: str 

408 ) -> str | None: 

409 fallback_key = key + "_cmd" 

410 if (section, key) in self.sensitive_config_values: 

411 section_dict = config_sources.get(section) 

412 if section_dict is not None: 

413 command_value = section_dict.get(fallback_key) 

414 if command_value is not None: 

415 if isinstance(command_value, str): 

416 command = command_value 

417 else: 

418 command = command_value[0] 

419 return run_command(command) 

420 return None 

421 

422 def _get_secret_option_from_config_sources( 

423 self, config_sources: ConfigSourcesType, section: str, key: str 

424 ) -> str | None: 

425 fallback_key = key + "_secret" 

426 if (section, key) in self.sensitive_config_values: 

427 section_dict = config_sources.get(section) 

428 if section_dict is not None: 

429 secrets_path_value = section_dict.get(fallback_key) 

430 if secrets_path_value is not None: 

431 if isinstance(secrets_path_value, str): 

432 secrets_path = secrets_path_value 

433 else: 

434 secrets_path = secrets_path_value[0] 

435 return self._get_config_value_from_secret_backend(secrets_path) 

436 return None 

437 

438 def _include_secrets( 

439 self, 

440 config_sources: ConfigSourcesType, 

441 display_sensitive: bool, 

442 display_source: bool, 

443 raw: bool, 

444 ): 

445 for section, key in self.sensitive_config_values: 

446 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key) 

447 if value: 

448 if not display_sensitive: 

449 value = "< hidden >" 

450 if display_source: 

451 opt: str | tuple[str, str] = (value, "secret") 

452 elif raw: 

453 opt = value.replace("%", "%%") 

454 else: 

455 opt = value 

456 config_sources.setdefault(section, {}).update({key: opt}) 

457 del config_sources[section][key + "_secret"] 

458 

459 def _include_commands( 

460 self, 

461 config_sources: ConfigSourcesType, 

462 display_sensitive: bool, 

463 display_source: bool, 

464 raw: bool, 

465 ): 

466 for section, key in self.sensitive_config_values: 

467 opt = self._get_cmd_option_from_config_sources(config_sources, section, key) 

468 if not opt: 

469 continue 

470 opt_to_set: str | tuple[str, str] | None = opt 

471 if not display_sensitive: 

472 opt_to_set = "< hidden >" 

473 if display_source: 

474 opt_to_set = (str(opt_to_set), "cmd") 

475 elif raw: 

476 opt_to_set = str(opt_to_set).replace("%", "%%") 

477 if opt_to_set is not None: 

478 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set} 

479 config_sources.setdefault(section, {}).update(dict_to_update) 

480 del config_sources[section][key + "_cmd"] 

481 

482 def _include_envs( 

483 self, 

484 config_sources: ConfigSourcesType, 

485 display_sensitive: bool, 

486 display_source: bool, 

487 raw: bool, 

488 ): 

489 for env_var in [ 

490 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX) 

491 ]: 

492 try: 

493 _, section, key = env_var.split("__", 2) 

494 opt = self._get_env_var_option(section, key) 

495 except ValueError: 

496 continue 

497 if opt is None: 

498 log.warning("Ignoring unknown env var '%s'", env_var) 

499 continue 

500 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"): 

501 # Don't hide cmd/secret values here 

502 if not env_var.lower().endswith(("cmd", "secret")): 

503 if (section, key) in self.sensitive_config_values: 

504 opt = "< hidden >" 

505 elif raw: 

506 opt = opt.replace("%", "%%") 

507 if display_source: 

508 opt = (opt, "env var") 

509 

510 section = section.lower() 

511 key = key.lower() 

512 config_sources.setdefault(section, {}).update({key: opt}) 

513 

514 def _filter_by_source( 

515 self, 

516 config_sources: ConfigSourcesType, 

517 display_source: bool, 

518 getter_func, 

519 ): 

520 """ 

521 Delete default configs from current configuration. 

522 

523 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values. 

524 

525 This is necessary because bare configs take precedence over the command 

526 or secret key equivalents so if the current running config is 

527 materialized with Airflow defaults they in turn override user set 

528 command or secret key configs. 

529 

530 :param config_sources: The current configuration to operate on 

531 :param display_source: If False, configuration options contain raw 

532 values. If True, options are a tuple of (option_value, source). 

533 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'. 

534 :param getter_func: A callback function that gets the user configured 

535 override value for a particular sensitive_config_values config. 

536 :return: None, the given config_sources is filtered if necessary, 

537 otherwise untouched. 

538 """ 

539 for section, key in self.sensitive_config_values: 

540 # Don't bother if we don't have section / key 

541 if section not in config_sources or key not in config_sources[section]: 

542 continue 

543 # Check that there is something to override defaults 

544 try: 

545 getter_opt = getter_func(section, key) 

546 except ValueError: 

547 continue 

548 if not getter_opt: 

549 continue 

550 # Check to see that there is a default value 

551 if self.get_default_value(section, key) is None: 

552 continue 

553 # Check to see if bare setting is the same as defaults 

554 if display_source: 

555 # when display_source = true, we know that the config_sources contains tuple 

556 opt, source = config_sources[section][key] # type: ignore 

557 else: 

558 opt = config_sources[section][key] 

559 if opt == self.get_default_value(section, key): 

560 del config_sources[section][key] 

561 

562 @staticmethod 

563 def _deprecated_value_is_set_in_config( 

564 deprecated_section: str, 

565 deprecated_key: str, 

566 configs: Iterable[tuple[str, ConfigParser]], 

567 ) -> bool: 

568 for config_type, config in configs: 

569 if config_type != "default": 

570 with contextlib.suppress(NoSectionError): 

571 deprecated_section_array = config.items(section=deprecated_section, raw=True) 

572 if any(key == deprecated_key for key, _ in deprecated_section_array): 

573 return True 

574 return False 

575 

576 @staticmethod 

577 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

578 return ( 

579 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}") 

580 is not None 

581 ) 

582 

583 @staticmethod 

584 def _deprecated_command_is_set_in_config( 

585 deprecated_section: str, 

586 deprecated_key: str, 

587 configs: Iterable[tuple[str, ConfigParser]], 

588 ) -> bool: 

589 return AirflowConfigParser._deprecated_value_is_set_in_config( 

590 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs 

591 ) 

592 

593 @staticmethod 

594 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

595 return ( 

596 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD") 

597 is not None 

598 ) 

599 

600 @staticmethod 

601 def _deprecated_secret_is_set_in_config( 

602 deprecated_section: str, 

603 deprecated_key: str, 

604 configs: Iterable[tuple[str, ConfigParser]], 

605 ) -> bool: 

606 return AirflowConfigParser._deprecated_value_is_set_in_config( 

607 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs 

608 ) 

609 

610 @staticmethod 

611 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

612 return ( 

613 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET") 

614 is not None 

615 ) 

616 

617 @staticmethod 

618 def _replace_config_with_display_sources( 

619 config_sources: ConfigSourcesType, 

620 configs: Iterable[tuple[str, ConfigParser]], 

621 configuration_description: dict[str, dict[str, Any]], 

622 display_source: bool, 

623 raw: bool, 

624 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

625 include_env: bool, 

626 include_cmds: bool, 

627 include_secret: bool, 

628 ): 

629 for source_name, config in configs: 

630 sections = config.sections() 

631 for section in sections: 

632 AirflowConfigParser._replace_section_config_with_display_sources( 

633 config, 

634 config_sources, 

635 configuration_description, 

636 display_source, 

637 raw, 

638 section, 

639 source_name, 

640 deprecated_options, 

641 configs, 

642 include_env=include_env, 

643 include_cmds=include_cmds, 

644 include_secret=include_secret, 

645 ) 

646 

647 @staticmethod 

648 def _replace_section_config_with_display_sources( 

649 config: ConfigParser, 

650 config_sources: ConfigSourcesType, 

651 configuration_description: dict[str, dict[str, Any]], 

652 display_source: bool, 

653 raw: bool, 

654 section: str, 

655 source_name: str, 

656 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

657 configs: Iterable[tuple[str, ConfigParser]], 

658 include_env: bool, 

659 include_cmds: bool, 

660 include_secret: bool, 

661 ): 

662 sect = config_sources.setdefault(section, {}) 

663 if isinstance(config, AirflowConfigParser): 

664 with config.suppress_future_warnings(): 

665 items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw) 

666 else: 

667 items = config.items(section=section, raw=raw) 

668 for k, val in items: 

669 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None)) 

670 if deprecated_section and deprecated_key: 

671 if source_name == "default": 

672 # If deprecated entry has some non-default value set for any of the sources requested, 

673 # We should NOT set default for the new entry (because it will override anything 

674 # coming from the deprecated ones) 

675 if AirflowConfigParser._deprecated_value_is_set_in_config( 

676 deprecated_section, deprecated_key, configs 

677 ): 

678 continue 

679 if include_env and AirflowConfigParser._deprecated_variable_is_set( 

680 deprecated_section, deprecated_key 

681 ): 

682 continue 

683 if include_cmds and ( 

684 AirflowConfigParser._deprecated_variable_command_is_set( 

685 deprecated_section, deprecated_key 

686 ) 

687 or AirflowConfigParser._deprecated_command_is_set_in_config( 

688 deprecated_section, deprecated_key, configs 

689 ) 

690 ): 

691 continue 

692 if include_secret and ( 

693 AirflowConfigParser._deprecated_variable_secret_is_set( 

694 deprecated_section, deprecated_key 

695 ) 

696 or AirflowConfigParser._deprecated_secret_is_set_in_config( 

697 deprecated_section, deprecated_key, configs 

698 ) 

699 ): 

700 continue 

701 if display_source: 

702 updated_source_name = source_name 

703 if source_name == "default": 

704 # defaults can come from other sources (default-<PROVIDER>) that should be used here 

705 source_description_section = configuration_description.get(section, {}) 

706 source_description_key = source_description_section.get("options", {}).get(k, {}) 

707 if source_description_key is not None: 

708 updated_source_name = source_description_key.get("source", source_name) 

709 sect[k] = (val, updated_source_name) 

710 else: 

711 sect[k] = val 

712 

713 def _warn_deprecate( 

714 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int 

715 ): 

716 """Warn about deprecated config option usage.""" 

717 if section == deprecated_section: 

718 warnings.warn( 

719 f"The {deprecated_name} option in [{section}] has been renamed to {key} - " 

720 f"the old setting has been used, but please update your config.", 

721 DeprecationWarning, 

722 stacklevel=4 + extra_stacklevel, 

723 ) 

724 else: 

725 warnings.warn( 

726 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option " 

727 f"in [{section}] - the old setting has been used, but please update your config.", 

728 DeprecationWarning, 

729 stacklevel=4 + extra_stacklevel, 

730 ) 

731 

732 @contextmanager 

733 def suppress_future_warnings(self): 

734 """ 

735 Context manager to temporarily suppress future warnings. 

736 

737 This is a stub used by the shared parser's lookup methods when checking deprecated options. 

738 Subclasses can override this to customize warning suppression behavior. 

739 

740 :return: context manager that suppresses future warnings 

741 """ 

742 suppress_future_warnings = self._suppress_future_warnings 

743 self._suppress_future_warnings = True 

744 yield self 

745 self._suppress_future_warnings = suppress_future_warnings 

746 

747 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str: 

748 """Generate environment variable name for a config option.""" 

749 team_component: str = f"{team_name.upper()}___" if team_name else "" 

750 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}" 

751 

752 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None): 

753 """Get config option from environment variable.""" 

754 env_var: str = self._env_var_name(section, key, team_name=team_name) 

755 if env_var in os.environ: 

756 return expand_env_var(os.environ[env_var]) 

757 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command) 

758 env_var_cmd = env_var + "_CMD" 

759 if env_var_cmd in os.environ: 

760 # if this is a valid command key... 

761 if (section, key) in self.sensitive_config_values: 

762 return run_command(os.environ[env_var_cmd]) 

763 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend) 

764 env_var_secret_path = env_var + "_SECRET" 

765 if env_var_secret_path in os.environ: 

766 # if this is a valid secret path... 

767 if (section, key) in self.sensitive_config_values: 

768 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path]) 

769 return None 

770 

771 def _get_cmd_option(self, section: str, key: str): 

772 """Get config option from command execution.""" 

773 fallback_key = key + "_cmd" 

774 if (section, key) in self.sensitive_config_values: 

775 if super().has_option(section, fallback_key): 

776 command = super().get(section, fallback_key) 

777 try: 

778 cmd_output = run_command(command) 

779 except AirflowConfigException as e: 

780 raise e 

781 except Exception as e: 

782 raise AirflowConfigException( 

783 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd." 

784 f" Please check the {fallback_key} value." 

785 ) from e 

786 return cmd_output 

787 return None 

788 

789 def _get_secret_option(self, section: str, key: str) -> str | None: 

790 """Get Config option values from Secret Backend.""" 

791 fallback_key = key + "_secret" 

792 if (section, key) in self.sensitive_config_values: 

793 if super().has_option(section, fallback_key): 

794 secrets_path = super().get(section, fallback_key) 

795 return self._get_config_value_from_secret_backend(secrets_path) 

796 return None 

797 

798 def _get_environment_variables( 

799 self, 

800 deprecated_key: str | None, 

801 deprecated_section: str | None, 

802 key: str, 

803 section: str, 

804 issue_warning: bool = True, 

805 extra_stacklevel: int = 0, 

806 **kwargs, 

807 ) -> str | ValueNotFound: 

808 """Get config option from environment variables.""" 

809 team_name = kwargs.get("team_name", None) 

810 option = self._get_env_var_option(section, key, team_name=team_name) 

811 if option is not None: 

812 return option 

813 if deprecated_section and deprecated_key: 

814 with self.suppress_future_warnings(): 

815 option = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name) 

816 if option is not None: 

817 if issue_warning: 

818 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

819 return option 

820 return VALUE_NOT_FOUND_SENTINEL 

821 

822 def _get_option_from_config_file( 

823 self, 

824 deprecated_key: str | None, 

825 deprecated_section: str | None, 

826 key: str, 

827 section: str, 

828 issue_warning: bool = True, 

829 extra_stacklevel: int = 0, 

830 **kwargs, 

831 ) -> str | ValueNotFound: 

832 """Get config option from config file.""" 

833 if team_name := kwargs.get("team_name", None): 

834 section = f"{team_name}={section}" 

835 # since this is the last lookup that supports team_name, pop it 

836 kwargs.pop("team_name") 

837 if super().has_option(section, key): 

838 return expand_env_var(super().get(section, key, **kwargs)) 

839 if deprecated_section and deprecated_key: 

840 if super().has_option(deprecated_section, deprecated_key): 

841 if issue_warning: 

842 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

843 with self.suppress_future_warnings(): 

844 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs)) 

845 return VALUE_NOT_FOUND_SENTINEL 

846 

847 def _get_option_from_commands( 

848 self, 

849 deprecated_key: str | None, 

850 deprecated_section: str | None, 

851 key: str, 

852 section: str, 

853 issue_warning: bool = True, 

854 extra_stacklevel: int = 0, 

855 **kwargs, 

856 ) -> str | ValueNotFound: 

857 """Get config option from command execution.""" 

858 option = self._get_cmd_option(section, key) 

859 if option: 

860 return option 

861 if deprecated_section and deprecated_key: 

862 with self.suppress_future_warnings(): 

863 option = self._get_cmd_option(deprecated_section, deprecated_key) 

864 if option: 

865 if issue_warning: 

866 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

867 return option 

868 return VALUE_NOT_FOUND_SENTINEL 

869 

870 def _get_option_from_secrets( 

871 self, 

872 deprecated_key: str | None, 

873 deprecated_section: str | None, 

874 key: str, 

875 section: str, 

876 issue_warning: bool = True, 

877 extra_stacklevel: int = 0, 

878 **kwargs, 

879 ) -> str | ValueNotFound: 

880 """Get config option from secrets backend.""" 

881 option = self._get_secret_option(section, key) 

882 if option: 

883 return option 

884 if deprecated_section and deprecated_key: 

885 with self.suppress_future_warnings(): 

886 option = self._get_secret_option(deprecated_section, deprecated_key) 

887 if option: 

888 if issue_warning: 

889 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

890 return option 

891 return VALUE_NOT_FOUND_SENTINEL 

892 

893 def _get_option_from_defaults( 

894 self, 

895 deprecated_key: str | None, 

896 deprecated_section: str | None, 

897 key: str, 

898 section: str, 

899 issue_warning: bool = True, 

900 extra_stacklevel: int = 0, 

901 team_name: str | None = None, 

902 **kwargs, 

903 ) -> str | ValueNotFound: 

904 """Get config option from default values.""" 

905 if self.get_default_value(section, key) is not None or "fallback" in kwargs: 

906 return expand_env_var(self.get_default_value(section, key, **kwargs)) 

907 return VALUE_NOT_FOUND_SENTINEL 

908 

909 def _resolve_deprecated_lookup( 

910 self, 

911 section: str, 

912 key: str, 

913 lookup_from_deprecated: bool, 

914 extra_stacklevel: int = 0, 

915 ) -> tuple[str, str, str | None, str | None, bool]: 

916 """ 

917 Resolve deprecated section/key mappings and determine deprecated values. 

918 

919 :param section: Section name (will be lowercased) 

920 :param key: Key name (will be lowercased) 

921 :param lookup_from_deprecated: Whether to lookup from deprecated options 

922 :param extra_stacklevel: Extra stack level for warnings 

923 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted) 

924 """ 

925 section = section.lower() 

926 key = key.lower() 

927 warning_emitted = False 

928 deprecated_section: str | None = None 

929 deprecated_key: str | None = None 

930 

931 if not lookup_from_deprecated: 

932 return section, key, deprecated_section, deprecated_key, warning_emitted 

933 

934 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {}) 

935 if option_description.get("deprecated"): 

936 deprecation_reason = option_description.get("deprecation_reason", "") 

937 warnings.warn( 

938 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}", 

939 DeprecationWarning, 

940 stacklevel=2 + extra_stacklevel, 

941 ) 

942 # For the cases in which we rename whole sections 

943 if section in self.inversed_deprecated_sections: 

944 deprecated_section, deprecated_key = (section, key) 

945 section = self.inversed_deprecated_sections[section] 

946 if not self._suppress_future_warnings: 

947 warnings.warn( 

948 f"The config section [{deprecated_section}] has been renamed to " 

949 f"[{section}]. Please update your `conf.get*` call to use the new name", 

950 FutureWarning, 

951 stacklevel=2 + extra_stacklevel, 

952 ) 

953 # Don't warn about individual rename if the whole section is renamed 

954 warning_emitted = True 

955 elif (section, key) in self.inversed_deprecated_options: 

956 # Handle using deprecated section/key instead of the new section/key 

957 new_section, new_key = self.inversed_deprecated_options[(section, key)] 

958 if not self._suppress_future_warnings and not warning_emitted: 

959 warnings.warn( 

960 f"section/key [{section}/{key}] has been deprecated, you should use" 

961 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the " 

962 "new name", 

963 FutureWarning, 

964 stacklevel=2 + extra_stacklevel, 

965 ) 

966 warning_emitted = True 

967 deprecated_section, deprecated_key = section, key 

968 section, key = (new_section, new_key) 

969 elif section in self.deprecated_sections: 

970 # When accessing the new section name, make sure we check under the old config name 

971 deprecated_key = key 

972 deprecated_section = self.deprecated_sections[section][0] 

973 else: 

974 deprecated_section, deprecated_key, _ = self.deprecated_options.get( 

975 (section, key), (None, None, None) 

976 ) 

977 

978 return section, key, deprecated_section, deprecated_key, warning_emitted 

979 

980 @overload # type: ignore[override] 

981 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ... 

982 

983 @overload # type: ignore[override] 

984 def get(self, section: str, key: str, **kwargs) -> str | None: ... 

985 

986 def get( # type: ignore[misc, override] 

987 self, 

988 section: str, 

989 key: str, 

990 suppress_warnings: bool = False, 

991 lookup_from_deprecated: bool = True, 

992 _extra_stacklevel: int = 0, 

993 team_name: str | None = None, 

994 **kwargs, 

995 ) -> str | None: 

996 """ 

997 Get config value by iterating through lookup sequence. 

998 

999 Priority order is defined by _lookup_sequence property. 

1000 """ 

1001 section, key, deprecated_section, deprecated_key, warning_emitted = self._resolve_deprecated_lookup( 

1002 section=section, 

1003 key=key, 

1004 lookup_from_deprecated=lookup_from_deprecated, 

1005 extra_stacklevel=_extra_stacklevel, 

1006 ) 

1007 

1008 if team_name is not None: 

1009 kwargs["team_name"] = team_name 

1010 

1011 for lookup_method in self._lookup_sequence: 

1012 value = lookup_method( 

1013 deprecated_key=deprecated_key, 

1014 deprecated_section=deprecated_section, 

1015 key=key, 

1016 section=section, 

1017 issue_warning=not warning_emitted, 

1018 extra_stacklevel=_extra_stacklevel, 

1019 **kwargs, 

1020 ) 

1021 if value is not VALUE_NOT_FOUND_SENTINEL: 

1022 return value 

1023 

1024 # Check if fallback was explicitly provided (even if None) 

1025 if "fallback" in kwargs: 

1026 return kwargs["fallback"] 

1027 

1028 if not suppress_warnings: 

1029 log.warning("section/key [%s/%s] not found in config", section, key) 

1030 

1031 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config") 

1032 

1033 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override] 

1034 """Get config value as boolean.""" 

1035 val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip() 

1036 if "#" in val: 

1037 val = val.split("#")[0].strip() 

1038 if val in ("t", "true", "1"): 

1039 return True 

1040 if val in ("f", "false", "0"): 

1041 return False 

1042 raise AirflowConfigException( 

1043 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. ' 

1044 f'Current value: "{val}".' 

1045 ) 

1046 

1047 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override] 

1048 """Get config value as integer.""" 

1049 val = self.get(section, key, _extra_stacklevel=1, **kwargs) 

1050 if val is None: 

1051 raise AirflowConfigException( 

1052 f"Failed to convert value None to int. " 

1053 f'Please check "{key}" key in "{section}" section is set.' 

1054 ) 

1055 try: 

1056 return int(val) 

1057 except ValueError: 

1058 raise AirflowConfigException( 

1059 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. ' 

1060 f'Current value: "{val}".' 

1061 ) 

1062 

1063 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override] 

1064 """Get config value as float.""" 

1065 val = self.get(section, key, _extra_stacklevel=1, **kwargs) 

1066 if val is None: 

1067 raise AirflowConfigException( 

1068 f"Failed to convert value None to float. " 

1069 f'Please check "{key}" key in "{section}" section is set.' 

1070 ) 

1071 try: 

1072 return float(val) 

1073 except ValueError: 

1074 raise AirflowConfigException( 

1075 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. ' 

1076 f'Current value: "{val}".' 

1077 ) 

1078 

1079 def getlist(self, section: str, key: str, delimiter=",", **kwargs): 

1080 """Get config value as list.""" 

1081 val = self.get(section, key, **kwargs) 

1082 if val is None: 

1083 if "fallback" in kwargs: 

1084 return kwargs["fallback"] 

1085 raise AirflowConfigException( 

1086 f"Failed to convert value None to list. " 

1087 f'Please check "{key}" key in "{section}" section is set.' 

1088 ) 

1089 try: 

1090 return [item.strip() for item in val.split(delimiter)] 

1091 except Exception: 

1092 raise AirflowConfigException( 

1093 f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. ' 

1094 f'Current value: "{val}".' 

1095 ) 

1096 

1097 E = TypeVar("E", bound=Enum) 

1098 

1099 def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E: 

1100 """Get config value as enum.""" 

1101 val = self.get(section, key, **kwargs) 

1102 enum_names = [enum_item.name for enum_item in enum_class] 

1103 

1104 if val is None: 

1105 raise AirflowConfigException( 

1106 f'Failed to convert value. Please check "{key}" key in "{section}" section. ' 

1107 f'Current value: "{val}" and it must be one of {", ".join(enum_names)}' 

1108 ) 

1109 

1110 try: 

1111 return enum_class[val] 

1112 except KeyError: 

1113 if "fallback" in kwargs and kwargs["fallback"] in enum_names: 

1114 return enum_class[kwargs["fallback"]] 

1115 raise AirflowConfigException( 

1116 f'Failed to convert value. Please check "{key}" key in "{section}" section. ' 

1117 f"the value must be one of {', '.join(enum_names)}" 

1118 ) 

1119 

1120 def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]: 

1121 """Get config value as list of enums.""" 

1122 string_list = self.getlist(section, key, delimiter, **kwargs) 

1123 enum_names = [enum_item.name for enum_item in enum_class] 

1124 enum_list = [] 

1125 

1126 for val in string_list: 

1127 try: 

1128 enum_list.append(enum_class[val]) 

1129 except KeyError: 

1130 log.warning( 

1131 "Failed to convert value. Please check %s key in %s section. " 

1132 "it must be one of %s, if not the value is ignored", 

1133 key, 

1134 section, 

1135 ", ".join(enum_names), 

1136 ) 

1137 

1138 return enum_list 

1139 

1140 def getimport(self, section: str, key: str, **kwargs) -> Any: 

1141 """ 

1142 Read options, import the full qualified name, and return the object. 

1143 

1144 In case of failure, it throws an exception with the key and section names 

1145 

1146 :return: The object or None, if the option is empty 

1147 """ 

1148 # Fixed: use self.get() instead of conf.get() 

1149 full_qualified_path = self.get(section=section, key=key, **kwargs) 

1150 if not full_qualified_path: 

1151 return None 

1152 

1153 try: 

1154 # Import here to avoid circular dependency 

1155 from ..module_loading import import_string 

1156 

1157 return import_string(full_qualified_path) 

1158 except ImportError as e: 

1159 log.warning(e) 

1160 raise AirflowConfigException( 

1161 f'The object could not be loaded. Please check "{key}" key in "{section}" section. ' 

1162 f'Current value: "{full_qualified_path}".' 

1163 ) 

1164 

1165 def getjson( 

1166 self, section: str, key: str, fallback=None, **kwargs 

1167 ) -> dict | list | str | int | float | None: 

1168 """ 

1169 Return a config value parsed from a JSON string. 

1170 

1171 ``fallback`` is *not* JSON parsed but used verbatim when no config value is given. 

1172 """ 

1173 try: 

1174 data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs) 

1175 except (NoSectionError, NoOptionError): 

1176 data = None 

1177 

1178 if data is None or data == "": 

1179 return fallback 

1180 

1181 try: 

1182 return json.loads(data) 

1183 except JSONDecodeError as e: 

1184 raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e 

1185 

1186 def gettimedelta( 

1187 self, section: str, key: str, fallback: Any = None, **kwargs 

1188 ) -> datetime.timedelta | None: 

1189 """ 

1190 Get the config value for the given section and key, and convert it into datetime.timedelta object. 

1191 

1192 If the key is missing, then it is considered as `None`. 

1193 

1194 :param section: the section from the config 

1195 :param key: the key defined in the given section 

1196 :param fallback: fallback value when no config value is given, defaults to None 

1197 :raises AirflowConfigException: raised because ValueError or OverflowError 

1198 :return: datetime.timedelta(seconds=<config_value>) or None 

1199 """ 

1200 val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs) 

1201 

1202 if val: 

1203 # the given value must be convertible to integer 

1204 try: 

1205 int_val = int(val) 

1206 except ValueError: 

1207 raise AirflowConfigException( 

1208 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. ' 

1209 f'Current value: "{val}".' 

1210 ) 

1211 

1212 try: 

1213 return datetime.timedelta(seconds=int_val) 

1214 except OverflowError as err: 

1215 raise AirflowConfigException( 

1216 f"Failed to convert value to timedelta in `seconds`. " 

1217 f"{err}. " 

1218 f'Please check "{key}" key in "{section}" section. Current value: "{val}".' 

1219 ) 

1220 

1221 return fallback 

1222 

1223 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str: 

1224 """Get mandatory config value, raising ValueError if not found.""" 

1225 value = self.get(section, key, _extra_stacklevel=1, **kwargs) 

1226 if value is None: 

1227 raise ValueError(f"The value {section}/{key} should be set!") 

1228 return value 

1229 

1230 def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]: 

1231 """Get mandatory config value as list, raising ValueError if not found.""" 

1232 value = self.getlist(section, key, **kwargs) 

1233 if value is None: 

1234 raise ValueError(f"The value {section}/{key} should be set!") 

1235 return value 

1236 

1237 def read( 

1238 self, 

1239 filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike], 

1240 encoding: str | None = None, 

1241 ) -> list[str]: 

1242 return super().read(filenames=filenames, encoding=encoding) 

1243 

1244 def read_dict( # type: ignore[override] 

1245 self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>" 

1246 ) -> None: 

1247 """ 

1248 We define a different signature here to add better type hints and checking. 

1249 

1250 :param dictionary: dictionary to read from 

1251 :param source: source to be used to store the configuration 

1252 :return: 

1253 """ 

1254 super().read_dict(dictionary=dictionary, source=source) 

1255 

1256 def get_sections_including_defaults(self) -> list[str]: 

1257 """ 

1258 Retrieve all sections from the configuration parser, including sections defined by built-in defaults. 

1259 

1260 :return: list of section names 

1261 """ 

1262 sections_from_config = self.sections() 

1263 sections_from_description = list(self.configuration_description.keys()) 

1264 return list(dict.fromkeys(itertools.chain(sections_from_description, sections_from_config))) 

1265 

1266 def get_options_including_defaults(self, section: str) -> list[str]: 

1267 """ 

1268 Retrieve all possible options from the configuration parser for the section given. 

1269 

1270 Includes options defined by built-in defaults. 

1271 

1272 :param section: section name 

1273 :return: list of option names for the section given 

1274 """ 

1275 my_own_options = self.options(section) if self.has_section(section) else [] 

1276 all_options_from_defaults = list( 

1277 self.configuration_description.get(section, {}).get("options", {}).keys() 

1278 ) 

1279 return list(dict.fromkeys(itertools.chain(all_options_from_defaults, my_own_options))) 

1280 

1281 def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs) -> bool: 

1282 """ 

1283 Check if option is defined. 

1284 

1285 Uses self.get() to avoid reimplementing the priority order of config variables 

1286 (env, config, cmd, defaults). 

1287 

1288 :param section: section to get option from 

1289 :param option: option to get 

1290 :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections 

1291 :param kwargs: additional keyword arguments to pass to get(), such as team_name 

1292 :return: 

1293 """ 

1294 try: 

1295 value = self.get( 

1296 section, 

1297 option, 

1298 fallback=None, 

1299 _extra_stacklevel=1, 

1300 suppress_warnings=True, 

1301 lookup_from_deprecated=lookup_from_deprecated, 

1302 **kwargs, 

1303 ) 

1304 if value is None: 

1305 return False 

1306 return True 

1307 except (NoOptionError, NoSectionError, AirflowConfigException): 

1308 return False 

1309 

1310 def set(self, section: str, option: str, value: str | None = None) -> None: 

1311 """ 

1312 Set an option to the given value. 

1313 

1314 This override just makes sure the section and option are lower case, to match what we do in `get`. 

1315 """ 

1316 section = section.lower() 

1317 option = option.lower() 

1318 defaults = self.configuration_description or {} 

1319 if not self.has_section(section) and section in defaults: 

1320 # Trying to set a key in a section that exists in default, but not in the user config; 

1321 # automatically create it 

1322 self.add_section(section) 

1323 super().set(section, option, value) 

1324 

1325 def remove_option(self, section: str, option: str, remove_default: bool = True): 

1326 """ 

1327 Remove an option if it exists in config from a file or default config. 

1328 

1329 If both of config have the same option, this removes the option 

1330 in both configs unless remove_default=False. 

1331 """ 

1332 section = section.lower() 

1333 option = option.lower() 

1334 if super().has_option(section, option): 

1335 super().remove_option(section, option) 

1336 

1337 if self.get_default_value(section, option) is not None and remove_default: 

1338 self._default_values.remove_option(section, option) 

1339 

1340 def optionxform(self, optionstr: str) -> str: 

1341 """ 

1342 Transform option names on every read, get, or set operation. 

1343 

1344 This changes from the default behaviour of ConfigParser from lower-casing 

1345 to instead be case-preserving. 

1346 

1347 :param optionstr: 

1348 :return: 

1349 """ 

1350 return optionstr 

1351 

1352 def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]: 

1353 """ 

1354 Get list of config sources to use in as_dict(). 

1355 

1356 Subclasses can override to add additional sources (e.g., provider configs). 

1357 """ 

1358 return [ 

1359 ("default", self._default_values), 

1360 ("airflow.cfg", self), 

1361 ] 

1362 

1363 def as_dict( 

1364 self, 

1365 display_source: bool = False, 

1366 display_sensitive: bool = False, 

1367 raw: bool = False, 

1368 include_env: bool = True, 

1369 include_cmds: bool = True, 

1370 include_secret: bool = True, 

1371 ) -> ConfigSourcesType: 

1372 """ 

1373 Return the current configuration as an OrderedDict of OrderedDicts. 

1374 

1375 When materializing current configuration Airflow defaults are 

1376 materialized along with user set configs. If any of the `include_*` 

1377 options are False then the result of calling command or secret key 

1378 configs do not override Airflow defaults and instead are passed through. 

1379 In order to then avoid Airflow defaults from overwriting user set 

1380 command or secret key configs we filter out bare sensitive_config_values 

1381 that are set to Airflow defaults when command or secret key configs 

1382 produce different values. 

1383 

1384 :param display_source: If False, the option value is returned. If True, 

1385 a tuple of (option_value, source) is returned. Source is either 

1386 'airflow.cfg', 'default', 'env var', or 'cmd'. 

1387 :param display_sensitive: If True, the values of options set by env 

1388 vars and bash commands will be displayed. If False, those options 

1389 are shown as '< hidden >' 

1390 :param raw: Should the values be output as interpolated values, or the 

1391 "raw" form that can be fed back in to ConfigParser 

1392 :param include_env: Should the value of configuration from AIRFLOW__ 

1393 environment variables be included or not 

1394 :param include_cmds: Should the result of calling any ``*_cmd`` config be 

1395 set (True, default), or should the _cmd options be left as the 

1396 command to run (False) 

1397 :param include_secret: Should the result of calling any ``*_secret`` config be 

1398 set (True, default), or should the _secret options be left as the 

1399 path to get the secret from (False) 

1400 :return: Dictionary, where the key is the name of the section and the content is 

1401 the dictionary with the name of the parameter and its value. 

1402 """ 

1403 if not display_sensitive: 

1404 # We want to hide the sensitive values at the appropriate methods 

1405 # since envs from cmds, secrets can be read at _include_envs method 

1406 if not all([include_env, include_cmds, include_secret]): 

1407 raise ValueError( 

1408 "If display_sensitive is false, then include_env, " 

1409 "include_cmds, include_secret must all be set as True" 

1410 ) 

1411 

1412 config_sources: ConfigSourcesType = {} 

1413 

1414 # We check sequentially all those sources and the last one we saw it in will "win" 

1415 configs = self._get_config_sources_for_as_dict() 

1416 

1417 self._replace_config_with_display_sources( 

1418 config_sources, 

1419 configs, 

1420 self.configuration_description, 

1421 display_source, 

1422 raw, 

1423 self.deprecated_options, 

1424 include_cmds=include_cmds, 

1425 include_env=include_env, 

1426 include_secret=include_secret, 

1427 ) 

1428 

1429 # add env vars and overwrite because they have priority 

1430 if include_env: 

1431 self._include_envs(config_sources, display_sensitive, display_source, raw) 

1432 else: 

1433 self._filter_by_source(config_sources, display_source, self._get_env_var_option) 

1434 

1435 # add bash commands 

1436 if include_cmds: 

1437 self._include_commands(config_sources, display_sensitive, display_source, raw) 

1438 else: 

1439 self._filter_by_source(config_sources, display_source, self._get_cmd_option) 

1440 

1441 # add config from secret backends 

1442 if include_secret: 

1443 self._include_secrets(config_sources, display_sensitive, display_source, raw) 

1444 else: 

1445 self._filter_by_source(config_sources, display_source, self._get_secret_option) 

1446 

1447 if not display_sensitive: 

1448 # This ensures the ones from config file is hidden too 

1449 # if they are not provided through env, cmd and secret 

1450 hidden = "< hidden >" 

1451 for section, key in self.sensitive_config_values: 

1452 if config_sources.get(section): 

1453 if config_sources[section].get(key, None): 

1454 if display_source: 

1455 source = config_sources[section][key][1] 

1456 config_sources[section][key] = (hidden, source) 

1457 else: 

1458 config_sources[section][key] = hidden 

1459 

1460 return config_sources 

1461 

1462 def _write_option_header( 

1463 self, 

1464 file: IO[str], 

1465 option: str, 

1466 extra_spacing: bool, 

1467 include_descriptions: bool, 

1468 include_env_vars: bool, 

1469 include_examples: bool, 

1470 include_sources: bool, 

1471 section_config_description: dict[str, dict[str, Any]], 

1472 section_to_write: str, 

1473 sources_dict: ConfigSourcesType, 

1474 ) -> tuple[bool, bool]: 

1475 """ 

1476 Write header for configuration option. 

1477 

1478 Returns tuple of (should_continue, needs_separation) where needs_separation should be 

1479 set if the option needs additional separation to visually separate it from the next option. 

1480 """ 

1481 option_config_description = ( 

1482 section_config_description.get("options", {}).get(option, {}) 

1483 if section_config_description 

1484 else {} 

1485 ) 

1486 description = option_config_description.get("description") 

1487 needs_separation = False 

1488 if description and include_descriptions: 

1489 for line in description.splitlines(): 

1490 file.write(f"# {line}\n") 

1491 needs_separation = True 

1492 example = option_config_description.get("example") 

1493 if example is not None and include_examples: 

1494 if extra_spacing: 

1495 file.write("#\n") 

1496 example_lines = example.splitlines() 

1497 example = "\n# ".join(example_lines) 

1498 file.write(f"# Example: {option} = {example}\n") 

1499 needs_separation = True 

1500 if include_sources and sources_dict: 

1501 sources_section = sources_dict.get(section_to_write) 

1502 value_with_source = sources_section.get(option) if sources_section else None 

1503 if value_with_source is None: 

1504 file.write("#\n# Source: not defined\n") 

1505 else: 

1506 file.write(f"#\n# Source: {value_with_source[1]}\n") 

1507 needs_separation = True 

1508 if include_env_vars: 

1509 file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n") 

1510 if extra_spacing: 

1511 file.write("#\n") 

1512 needs_separation = True 

1513 return True, needs_separation 

1514 

1515 def is_template(self, section: str, key) -> bool: 

1516 """ 

1517 Return whether the value is templated. 

1518 

1519 :param section: section of the config 

1520 :param key: key in the section 

1521 :return: True if the value is templated 

1522 """ 

1523 return _is_template(self.configuration_description, section, key) 

1524 

1525 def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None: 

1526 """ 

1527 Return the section as a dict. 

1528 

1529 Values are converted to int, float, bool as required. 

1530 

1531 :param section: section from the config 

1532 :param team_name: optional team name for team-specific configuration lookup 

1533 """ 

1534 # Handle team-specific section lookup for config file 

1535 config_section = f"{team_name}={section}" if team_name else section 

1536 

1537 if not self.has_section(config_section) and not self._default_values.has_section(config_section): 

1538 return None 

1539 if self._default_values.has_section(config_section): 

1540 _section: ConfigOptionsDictType = dict(self._default_values.items(config_section)) 

1541 else: 

1542 _section = {} 

1543 

1544 if self.has_section(config_section): 

1545 _section.update(self.items(config_section)) 

1546 

1547 # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name 

1548 section_prefix = self._env_var_name(section, "", team_name=team_name) 

1549 for env_var in sorted(os.environ.keys()): 

1550 if env_var.startswith(section_prefix): 

1551 key = env_var.replace(section_prefix, "") 

1552 if key.endswith("_CMD"): 

1553 key = key[:-4] 

1554 key = key.lower() 

1555 _section[key] = self._get_env_var_option(section, key, team_name=team_name) 

1556 

1557 for key, val in _section.items(): 

1558 if val is None: 

1559 raise AirflowConfigException( 

1560 f"Failed to convert value automatically. " 

1561 f'Please check "{key}" key in "{section}" section is set.' 

1562 ) 

1563 try: 

1564 _section[key] = int(val) 

1565 except ValueError: 

1566 try: 

1567 _section[key] = float(val) 

1568 except ValueError: 

1569 if isinstance(val, str) and val.lower() in ("t", "true"): 

1570 _section[key] = True 

1571 elif isinstance(val, str) and val.lower() in ("f", "false"): 

1572 _section[key] = False 

1573 return _section 

1574 

1575 @staticmethod 

1576 def _write_section_header( 

1577 file: IO[str], 

1578 include_descriptions: bool, 

1579 section_config_description: dict[str, str], 

1580 section_to_write: str, 

1581 ) -> None: 

1582 """Write header for configuration section.""" 

1583 file.write(f"[{section_to_write}]\n") 

1584 section_description = section_config_description.get("description") 

1585 if section_description and include_descriptions: 

1586 for line in section_description.splitlines(): 

1587 file.write(f"# {line}\n") 

1588 file.write("\n") 

1589 

1590 def _write_value( 

1591 self, 

1592 file: IO[str], 

1593 option: str, 

1594 comment_out_everything: bool, 

1595 needs_separation: bool, 

1596 only_defaults: bool, 

1597 section_to_write: str, 

1598 ): 

1599 default_value = self.get_default_value(section_to_write, option, raw=True) 

1600 if only_defaults: 

1601 value = default_value 

1602 else: 

1603 value = self.get(section_to_write, option, fallback=default_value, raw=True) 

1604 if value is None: 

1605 file.write(f"# {option} = \n") 

1606 else: 

1607 if comment_out_everything: 

1608 value_lines = value.splitlines() 

1609 value = "\n# ".join(value_lines) 

1610 file.write(f"# {option} = {value}\n") 

1611 else: 

1612 if "\n" in value: 

1613 try: 

1614 value = json.dumps(json.loads(value), indent=4) 

1615 value = value.replace( 

1616 "\n", "\n " 

1617 ) # indent multi-line JSON to satisfy configparser format 

1618 except JSONDecodeError: 

1619 pass 

1620 file.write(f"{option} = {value}\n") 

1621 if needs_separation: 

1622 file.write("\n") 

1623 

1624 def write( # type: ignore[override] 

1625 self, 

1626 file: IO[str], 

1627 section: str | None = None, 

1628 include_examples: bool = True, 

1629 include_descriptions: bool = True, 

1630 include_sources: bool = True, 

1631 include_env_vars: bool = True, 

1632 include_providers: bool = True, 

1633 comment_out_everything: bool = False, 

1634 hide_sensitive_values: bool = False, 

1635 extra_spacing: bool = True, 

1636 only_defaults: bool = False, 

1637 **kwargs: Any, 

1638 ) -> None: 

1639 """ 

1640 Write configuration with comments and examples to a file. 

1641 

1642 :param file: file to write to 

1643 :param section: section of the config to write, defaults to all sections 

1644 :param include_examples: Include examples in the output 

1645 :param include_descriptions: Include descriptions in the output 

1646 :param include_sources: Include the source of each config option 

1647 :param include_env_vars: Include environment variables corresponding to each config option 

1648 :param include_providers: Include providers configuration 

1649 :param comment_out_everything: Comment out all values 

1650 :param hide_sensitive_values: Include sensitive values in the output 

1651 :param extra_spacing: Add extra spacing before examples and after variables 

1652 :param only_defaults: Only include default values when writing the config, not the actual values 

1653 """ 

1654 sources_dict = {} 

1655 if include_sources: 

1656 sources_dict = self.as_dict(display_source=True) 

1657 with self.make_sure_configuration_loaded(with_providers=include_providers): 

1658 for section_to_write in self.get_sections_including_defaults(): 

1659 section_config_description = self.configuration_description.get(section_to_write, {}) 

1660 if section_to_write != section and section is not None: 

1661 continue 

1662 if self._default_values.has_section(section_to_write) or self.has_section(section_to_write): 

1663 self._write_section_header( 

1664 file, include_descriptions, section_config_description, section_to_write 

1665 ) 

1666 for option in self.get_options_including_defaults(section_to_write): 

1667 should_continue, needs_separation = self._write_option_header( 

1668 file=file, 

1669 option=option, 

1670 extra_spacing=extra_spacing, 

1671 include_descriptions=include_descriptions, 

1672 include_env_vars=include_env_vars, 

1673 include_examples=include_examples, 

1674 include_sources=include_sources, 

1675 section_config_description=section_config_description, 

1676 section_to_write=section_to_write, 

1677 sources_dict=sources_dict, 

1678 ) 

1679 self._write_value( 

1680 file=file, 

1681 option=option, 

1682 comment_out_everything=comment_out_everything, 

1683 needs_separation=needs_separation, 

1684 only_defaults=only_defaults, 

1685 section_to_write=section_to_write, 

1686 ) 

1687 if include_descriptions and not needs_separation: 

1688 # extra separation between sections in case last option did not need it 

1689 file.write("\n") 

1690 

1691 @contextmanager 

1692 def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]: 

1693 """ 

1694 Make sure configuration is loaded with or without providers. 

1695 

1696 This happens regardless if the provider configuration has been loaded before or not. 

1697 Restores configuration to the state before entering the context. 

1698 

1699 :param with_providers: whether providers should be loaded 

1700 """ 

1701 needs_reload = False 

1702 if with_providers: 

1703 self._ensure_providers_config_loaded() 

1704 else: 

1705 needs_reload = self._ensure_providers_config_unloaded() 

1706 yield 

1707 if needs_reload: 

1708 self._reload_provider_configs() 

1709 

1710 def _ensure_providers_config_loaded(self) -> None: 

1711 """Ensure providers configurations are loaded.""" 

1712 raise NotImplementedError("Subclasses must implement _ensure_providers_config_loaded method") 

1713 

1714 def _ensure_providers_config_unloaded(self) -> bool: 

1715 """Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded.""" 

1716 raise NotImplementedError("Subclasses must implement _ensure_providers_config_unloaded method") 

1717 

1718 def _reload_provider_configs(self) -> None: 

1719 """Reload providers configuration.""" 

1720 raise NotImplementedError("Subclasses must implement _reload_provider_configs method")