Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/_shared/configuration/parser.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

738 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Base configuration parser with pure parsing logic.""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import datetime 

24import functools 

25import itertools 

26import json 

27import logging 

28import os 

29import shlex 

30import subprocess 

31import sys 

32import warnings 

33from collections.abc import Callable, Generator, Iterable 

34from configparser import ConfigParser, NoOptionError, NoSectionError 

35from contextlib import contextmanager 

36from enum import Enum 

37from json.decoder import JSONDecodeError 

38from re import Pattern 

39from typing import IO, Any, TypeVar, overload 

40 

41from .exceptions import AirflowConfigException 

42 

43log = logging.getLogger(__name__) 

44 

45 

46ConfigType = str | int | float | bool 

47ConfigOptionsDictType = dict[str, ConfigType] 

48ConfigSectionSourcesType = dict[str, str | tuple[str, str]] 

49ConfigSourcesType = dict[str, ConfigSectionSourcesType] 

50ENV_VAR_PREFIX = "AIRFLOW__" 

51 

52 

class ValueNotFound:
    """Marker type; its shared instance is returned by lookups that found no configuration value."""


# Single shared instance returned by the lookup methods when nothing was found.
VALUE_NOT_FOUND_SENTINEL = ValueNotFound()

60 

61 

@overload
def expand_env_var(env_var: None) -> None: ...
@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand (potentially nested) env vars.

    Repeat and apply `expandvars` and `expanduser` until
    interpolation stops having any effect.

    Expansion also stops when a previously seen intermediate value shows up again
    (mutually referencing variables such as ``A=$B`` / ``B=$A``) or after a fixed
    number of passes (self-growing references such as ``A=x$A``). In both cases the
    partially expanded value is returned instead of looping forever.

    :param env_var: value to expand; falsy or non-string values are returned unchanged
    :return: the expanded value
    """
    if not env_var or not isinstance(env_var, str):
        return env_var
    # Real configurations nest a handful of levels at most; the cap only guards
    # against references that can never converge.
    max_passes = 100
    seen = {env_var}
    for _ in range(max_passes):
        interpolated = os.path.expanduser(os.path.expandvars(env_var))
        if interpolated == env_var:
            return interpolated
        if interpolated in seen:
            # The value itself is not logged because it may contain secrets.
            logging.getLogger(__name__).warning(
                "Cyclic environment variable reference detected while expanding a config value."
            )
            return interpolated
        seen.add(interpolated)
        env_var = interpolated
    logging.getLogger(__name__).warning(
        "Environment variable expansion did not converge after %d passes.", max_passes
    )
    return env_var

82 

83 

def run_command(command: str) -> str:
    """
    Execute ``command`` (split with shell-like rules, no shell involved) and return its stdout.

    :param command: command line to run
    :return: decoded standard output of the process
    :raises AirflowConfigException: if the process exits with a non-zero return code
    """
    completed = subprocess.run(
        shlex.split(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        check=False,
    )
    encoding = sys.getdefaultencoding()
    # Undecodable bytes are dropped rather than raising.
    output = completed.stdout.decode(encoding, "ignore")
    stderr = completed.stderr.decode(encoding, "ignore")

    if completed.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {completed.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )

98 

99 

100def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool: 

101 """ 

102 Check if the config is a template. 

103 

104 :param configuration_description: description of configuration 

105 :param section: section 

106 :param key: key 

107 :return: True if the config is a template 

108 """ 

109 return configuration_description.get(section, {}).get(key, {}).get("is_template", False) 

110 

111 

112class AirflowConfigParser(ConfigParser): 

113 """ 

114 Base configuration parser with pure parsing logic. 

115 

116 This class provides the core parsing methods that work with: 

117 - configuration_description: dict describing config options (required in __init__) 

118 - _default_values: ConfigParser with default values (required in __init__) 

119 - deprecated_options: class attribute mapping new -> old options 

120 - deprecated_sections: class attribute mapping new -> old sections 

121 """ 

122 

123 # A mapping of section -> setting -> { old, replace } for deprecated default values. 

124 # Subclasses can override this to define deprecated values that should be upgraded. 

125 deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {} 

126 

127 # A mapping of (new section, new option) -> (old section, old option, since_version). 

128 # When reading new option, the old option will be checked to see if it exists. If it does a 

129 # DeprecationWarning will be issued and the old option will be used instead 

130 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = { 

131 ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"), 

132 ("api", "base_url"): ("webserver", "base_url", "3.0"), 

133 ("api", "host"): ("webserver", "web_server_host", "3.0"), 

134 ("api", "port"): ("webserver", "web_server_port", "3.0"), 

135 ("api", "workers"): ("webserver", "workers", "3.0"), 

136 ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"), 

137 ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"), 

138 ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"), 

139 ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"), 

140 ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"), 

141 ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"), 

142 ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"), 

143 ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"), 

144 ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"), 

145 ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"), 

146 ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"), 

147 ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"), 

148 ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"), 

149 ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"), 

150 ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"), 

151 ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"), 

152 ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"), 

153 ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"), 

154 ("api", "page_size"): ("webserver", "page_size", "3.1.0"), 

155 ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"), 

156 ("api", "auto_refresh_interval"): ("webserver", "auto_refresh_interval", "3.1.0"), 

157 ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"), 

158 ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"), 

159 ("api", "log_config"): ("api", "access_logfile", "3.1.0"), 

160 ("scheduler", "ti_metrics_interval"): ("scheduler", "running_metrics_interval", "3.2.0"), 

161 } 

162 

163 # A mapping of new section -> (old section, since_version). 

164 deprecated_sections: dict[str, tuple[str, str]] = {} 

165 

166 @property 

167 def _lookup_sequence(self) -> list[Callable]: 

168 """ 

169 Define the sequence of lookup methods for get(). The definition here does not have provider lookup. 

170 

171 Subclasses can override this to customise lookup order. 

172 """ 

173 return [ 

174 self._get_environment_variables, 

175 self._get_option_from_config_file, 

176 self._get_option_from_commands, 

177 self._get_option_from_secrets, 

178 self._get_option_from_defaults, 

179 ] 

180 

181 @property 

182 def _validators(self) -> list[Callable[[], None]]: 

183 """ 

184 Return list of validators defined on a config parser class. Base class will return an empty list. 

185 

186 Subclasses can override this to customize the validators that are run during validation on the 

187 config parser instance. 

188 """ 

189 return [] 

190 

191 def validate(self) -> None: 

192 """Run all registered validators.""" 

193 for validator in self._validators: 

194 validator() 

195 self.is_validated = True 

196 

197 def _validate_deprecated_values(self) -> None: 

198 """Validate and upgrade deprecated default values.""" 

199 for section, replacement in self.deprecated_values.items(): 

200 for name, info in replacement.items(): 

201 old, new = info 

202 current_value = self.get(section, name, fallback="") 

203 if self._using_old_value(old, current_value): 

204 self.upgraded_values[(section, name)] = current_value 

205 new_value = old.sub(new, current_value) 

206 self._update_env_var(section=section, name=name, new_value=new_value) 

207 self._create_future_warning( 

208 name=name, 

209 section=section, 

210 current_value=current_value, 

211 new_value=new_value, 

212 ) 

213 

214 def _using_old_value(self, old: Pattern, current_value: str) -> bool: 

215 """Check if current_value matches the old pattern.""" 

216 return old.search(current_value) is not None 

217 

218 def _update_env_var(self, section: str, name: str, new_value: str) -> None: 

219 """Update environment variable with new value.""" 

220 env_var = self._env_var_name(section, name) 

221 # Set it as an env var so that any subprocesses keep the same override! 

222 os.environ[env_var] = new_value 

223 

224 @staticmethod 

225 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any) -> None: 

226 """Create a FutureWarning for deprecated default values.""" 

227 warnings.warn( 

228 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " 

229 f"This value has been changed to {new_value!r} in the running config, but please update your config.", 

230 FutureWarning, 

231 stacklevel=3, 

232 ) 

233 

234 def __init__( 

235 self, 

236 configuration_description: dict[str, dict[str, Any]], 

237 _default_values: ConfigParser, 

238 *args, 

239 **kwargs, 

240 ): 

241 """ 

242 Initialize the parser. 

243 

244 :param configuration_description: Description of configuration options 

245 :param _default_values: ConfigParser with default values 

246 """ 

247 super().__init__(*args, **kwargs) 

248 self.configuration_description = configuration_description 

249 self._default_values = _default_values 

250 self._suppress_future_warnings = False 

251 self.upgraded_values: dict[tuple[str, str], str] = {} 

252 

253 @functools.cached_property 

254 def inversed_deprecated_options(self): 

255 """Build inverse mapping from old options to new options.""" 

256 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()} 

257 

258 @functools.cached_property 

259 def inversed_deprecated_sections(self): 

260 """Build inverse mapping from old sections to new sections.""" 

261 return { 

262 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items() 

263 } 

264 

265 @functools.cached_property 

266 def sensitive_config_values(self) -> set[tuple[str, str]]: 

267 """Get set of sensitive config values that should be masked.""" 

268 flattened = { 

269 (s, k): item 

270 for s, s_c in self.configuration_description.items() 

271 for k, item in s_c.get("options", {}).items() 

272 } 

273 sensitive = { 

274 (section.lower(), key.lower()) 

275 for (section, key), v in flattened.items() 

276 if v.get("sensitive") is True 

277 } 

278 depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options} 

279 depr_section = { 

280 (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections 

281 } 

282 sensitive.update(depr_section, depr_option) 

283 return sensitive 

284 

285 def _update_defaults_from_string(self, config_string: str) -> None: 

286 """ 

287 Update the defaults in _default_values based on values in config_string ("ini" format). 

288 

289 Override shared parser's method to add validation for template variables. 

290 Note that those values are not validated and cannot contain variables because we are using 

291 regular config parser to load them. This method is used to test the config parser in unit tests. 

292 

293 :param config_string: ini-formatted config string 

294 """ 

295 parser = ConfigParser() 

296 parser.read_string(config_string) 

297 for section in parser.sections(): 

298 if section not in self._default_values.sections(): 

299 self._default_values.add_section(section) 

300 errors = False 

301 for key, value in parser.items(section): 

302 if not self.is_template(section, key) and "{" in value: 

303 errors = True 

304 log.error( 

305 "The %s.%s value %s read from string contains variable. This is not supported", 

306 section, 

307 key, 

308 value, 

309 ) 

310 self._default_values.set(section, key, value) 

311 if errors: 

312 raise AirflowConfigException( 

313 f"The string config passed as default contains variables. " 

314 f"This is not supported. String config: {config_string}" 

315 ) 

316 

317 def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any: 

318 """ 

319 Retrieve default value from default config parser. 

320 

321 This will retrieve the default value from the default config parser. Optionally a raw, stored 

322 value can be retrieved by setting skip_interpolation to True. This is useful for example when 

323 we want to write the default value to a file, and we don't want the interpolation to happen 

324 as it is going to be done later when the config is read. 

325 

326 :param section: section of the config 

327 :param key: key to use 

328 :param fallback: fallback value to use 

329 :param raw: if raw, then interpolation will be reversed 

330 :param kwargs: other args 

331 :return: 

332 """ 

333 value = self._default_values.get(section, key, fallback=fallback, **kwargs) 

334 if raw and value is not None: 

335 return value.replace("%", "%%") 

336 return value 

337 

338 def _get_custom_secret_backend(self, worker_mode: bool = False) -> Any | None: 

339 """ 

340 Get Secret Backend if defined in airflow.cfg. 

341 

342 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not. 

343 """ 

344 section = "workers" if worker_mode else "secrets" 

345 key = "secrets_backend" if worker_mode else "backend" 

346 kwargs_key = "secrets_backend_kwargs" if worker_mode else "backend_kwargs" 

347 

348 secrets_backend_cls = self.getimport(section=section, key=key) 

349 

350 if not secrets_backend_cls: 

351 if worker_mode: 

352 # if we find no secrets backend for worker, return that of secrets backend 

353 secrets_backend_cls = self.getimport(section="secrets", key="backend") 

354 if not secrets_backend_cls: 

355 return None 

356 # When falling back to secrets backend, use its kwargs 

357 kwargs_key = "backend_kwargs" 

358 section = "secrets" 

359 else: 

360 return None 

361 

362 try: 

363 backend_kwargs = self.getjson(section=section, key=kwargs_key) 

364 if not backend_kwargs: 

365 backend_kwargs = {} 

366 elif not isinstance(backend_kwargs, dict): 

367 raise ValueError("not a dict") 

368 except AirflowConfigException: 

369 log.warning("Failed to parse [%s] %s as JSON, defaulting to no kwargs.", section, kwargs_key) 

370 backend_kwargs = {} 

371 except ValueError: 

372 log.warning("Failed to parse [%s] %s into a dict, defaulting to no kwargs.", section, kwargs_key) 

373 backend_kwargs = {} 

374 

375 return secrets_backend_cls(**backend_kwargs) 

376 

377 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None: 

378 """ 

379 Get Config option values from Secret Backend. 

380 

381 Called by the shared parser's _get_secret_option() method as part of the lookup chain. 

382 Uses _get_custom_secret_backend() to get the backend instance. 

383 

384 :param config_key: the config key to retrieve 

385 :return: config value or None 

386 """ 

387 try: 

388 secrets_client = self._get_custom_secret_backend() 

389 if not secrets_client: 

390 return None 

391 return secrets_client.get_config(config_key) 

392 except Exception as e: 

393 raise AirflowConfigException( 

394 "Cannot retrieve config from alternative secrets backend. " 

395 "Make sure it is configured properly and that the Backend " 

396 "is accessible.\n" 

397 f"{e}" 

398 ) 

399 

400 def _get_cmd_option_from_config_sources( 

401 self, config_sources: ConfigSourcesType, section: str, key: str 

402 ) -> str | None: 

403 fallback_key = key + "_cmd" 

404 if (section, key) in self.sensitive_config_values: 

405 section_dict = config_sources.get(section) 

406 if section_dict is not None: 

407 command_value = section_dict.get(fallback_key) 

408 if command_value is not None: 

409 if isinstance(command_value, str): 

410 command = command_value 

411 else: 

412 command = command_value[0] 

413 return run_command(command) 

414 return None 

415 

416 def _get_secret_option_from_config_sources( 

417 self, config_sources: ConfigSourcesType, section: str, key: str 

418 ) -> str | None: 

419 fallback_key = key + "_secret" 

420 if (section, key) in self.sensitive_config_values: 

421 section_dict = config_sources.get(section) 

422 if section_dict is not None: 

423 secrets_path_value = section_dict.get(fallback_key) 

424 if secrets_path_value is not None: 

425 if isinstance(secrets_path_value, str): 

426 secrets_path = secrets_path_value 

427 else: 

428 secrets_path = secrets_path_value[0] 

429 return self._get_config_value_from_secret_backend(secrets_path) 

430 return None 

431 

432 def _include_secrets( 

433 self, 

434 config_sources: ConfigSourcesType, 

435 display_sensitive: bool, 

436 display_source: bool, 

437 raw: bool, 

438 ): 

439 for section, key in self.sensitive_config_values: 

440 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key) 

441 if value: 

442 if not display_sensitive: 

443 value = "< hidden >" 

444 if display_source: 

445 opt: str | tuple[str, str] = (value, "secret") 

446 elif raw: 

447 opt = value.replace("%", "%%") 

448 else: 

449 opt = value 

450 config_sources.setdefault(section, {}).update({key: opt}) 

451 del config_sources[section][key + "_secret"] 

452 

453 def _include_commands( 

454 self, 

455 config_sources: ConfigSourcesType, 

456 display_sensitive: bool, 

457 display_source: bool, 

458 raw: bool, 

459 ): 

460 for section, key in self.sensitive_config_values: 

461 opt = self._get_cmd_option_from_config_sources(config_sources, section, key) 

462 if not opt: 

463 continue 

464 opt_to_set: str | tuple[str, str] | None = opt 

465 if not display_sensitive: 

466 opt_to_set = "< hidden >" 

467 if display_source: 

468 opt_to_set = (str(opt_to_set), "cmd") 

469 elif raw: 

470 opt_to_set = str(opt_to_set).replace("%", "%%") 

471 if opt_to_set is not None: 

472 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set} 

473 config_sources.setdefault(section, {}).update(dict_to_update) 

474 del config_sources[section][key + "_cmd"] 

475 

476 def _include_envs( 

477 self, 

478 config_sources: ConfigSourcesType, 

479 display_sensitive: bool, 

480 display_source: bool, 

481 raw: bool, 

482 ): 

483 for env_var in [ 

484 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX) 

485 ]: 

486 try: 

487 _, section, key = env_var.split("__", 2) 

488 opt = self._get_env_var_option(section, key) 

489 except ValueError: 

490 continue 

491 if opt is None: 

492 log.warning("Ignoring unknown env var '%s'", env_var) 

493 continue 

494 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"): 

495 # Don't hide cmd/secret values here 

496 if not env_var.lower().endswith(("cmd", "secret")): 

497 if (section, key) in self.sensitive_config_values: 

498 opt = "< hidden >" 

499 elif raw: 

500 opt = opt.replace("%", "%%") 

501 if display_source: 

502 opt = (opt, "env var") 

503 

504 section = section.lower() 

505 key = key.lower() 

506 config_sources.setdefault(section, {}).update({key: opt}) 

507 

508 def _filter_by_source( 

509 self, 

510 config_sources: ConfigSourcesType, 

511 display_source: bool, 

512 getter_func, 

513 ): 

514 """ 

515 Delete default configs from current configuration. 

516 

517 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values. 

518 

519 This is necessary because bare configs take precedence over the command 

520 or secret key equivalents so if the current running config is 

521 materialized with Airflow defaults they in turn override user set 

522 command or secret key configs. 

523 

524 :param config_sources: The current configuration to operate on 

525 :param display_source: If False, configuration options contain raw 

526 values. If True, options are a tuple of (option_value, source). 

527 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'. 

528 :param getter_func: A callback function that gets the user configured 

529 override value for a particular sensitive_config_values config. 

530 :return: None, the given config_sources is filtered if necessary, 

531 otherwise untouched. 

532 """ 

533 for section, key in self.sensitive_config_values: 

534 # Don't bother if we don't have section / key 

535 if section not in config_sources or key not in config_sources[section]: 

536 continue 

537 # Check that there is something to override defaults 

538 try: 

539 getter_opt = getter_func(section, key) 

540 except ValueError: 

541 continue 

542 if not getter_opt: 

543 continue 

544 # Check to see that there is a default value 

545 if self.get_default_value(section, key) is None: 

546 continue 

547 # Check to see if bare setting is the same as defaults 

548 if display_source: 

549 # when display_source = true, we know that the config_sources contains tuple 

550 opt, source = config_sources[section][key] # type: ignore 

551 else: 

552 opt = config_sources[section][key] 

553 if opt == self.get_default_value(section, key): 

554 del config_sources[section][key] 

555 

556 @staticmethod 

557 def _deprecated_value_is_set_in_config( 

558 deprecated_section: str, 

559 deprecated_key: str, 

560 configs: Iterable[tuple[str, ConfigParser]], 

561 ) -> bool: 

562 for config_type, config in configs: 

563 if config_type != "default": 

564 with contextlib.suppress(NoSectionError): 

565 deprecated_section_array = config.items(section=deprecated_section, raw=True) 

566 if any(key == deprecated_key for key, _ in deprecated_section_array): 

567 return True 

568 return False 

569 

570 @staticmethod 

571 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

572 return ( 

573 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}") 

574 is not None 

575 ) 

576 

577 @staticmethod 

578 def _deprecated_command_is_set_in_config( 

579 deprecated_section: str, 

580 deprecated_key: str, 

581 configs: Iterable[tuple[str, ConfigParser]], 

582 ) -> bool: 

583 return AirflowConfigParser._deprecated_value_is_set_in_config( 

584 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs 

585 ) 

586 

587 @staticmethod 

588 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

589 return ( 

590 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD") 

591 is not None 

592 ) 

593 

594 @staticmethod 

595 def _deprecated_secret_is_set_in_config( 

596 deprecated_section: str, 

597 deprecated_key: str, 

598 configs: Iterable[tuple[str, ConfigParser]], 

599 ) -> bool: 

600 return AirflowConfigParser._deprecated_value_is_set_in_config( 

601 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs 

602 ) 

603 

604 @staticmethod 

605 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

606 return ( 

607 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET") 

608 is not None 

609 ) 

610 

611 @staticmethod 

612 def _replace_config_with_display_sources( 

613 config_sources: ConfigSourcesType, 

614 configs: Iterable[tuple[str, ConfigParser]], 

615 configuration_description: dict[str, dict[str, Any]], 

616 display_source: bool, 

617 raw: bool, 

618 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

619 include_env: bool, 

620 include_cmds: bool, 

621 include_secret: bool, 

622 ): 

623 for source_name, config in configs: 

624 sections = config.sections() 

625 for section in sections: 

626 AirflowConfigParser._replace_section_config_with_display_sources( 

627 config, 

628 config_sources, 

629 configuration_description, 

630 display_source, 

631 raw, 

632 section, 

633 source_name, 

634 deprecated_options, 

635 configs, 

636 include_env=include_env, 

637 include_cmds=include_cmds, 

638 include_secret=include_secret, 

639 ) 

640 

641 @staticmethod 

642 def _replace_section_config_with_display_sources( 

643 config: ConfigParser, 

644 config_sources: ConfigSourcesType, 

645 configuration_description: dict[str, dict[str, Any]], 

646 display_source: bool, 

647 raw: bool, 

648 section: str, 

649 source_name: str, 

650 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

651 configs: Iterable[tuple[str, ConfigParser]], 

652 include_env: bool, 

653 include_cmds: bool, 

654 include_secret: bool, 

655 ): 

656 sect = config_sources.setdefault(section, {}) 

657 if isinstance(config, AirflowConfigParser): 

658 with config.suppress_future_warnings(): 

659 items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw) 

660 else: 

661 items = config.items(section=section, raw=raw) 

662 for k, val in items: 

663 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None)) 

664 if deprecated_section and deprecated_key: 

665 if source_name == "default": 

666 # If deprecated entry has some non-default value set for any of the sources requested, 

667 # We should NOT set default for the new entry (because it will override anything 

668 # coming from the deprecated ones) 

669 if AirflowConfigParser._deprecated_value_is_set_in_config( 

670 deprecated_section, deprecated_key, configs 

671 ): 

672 continue 

673 if include_env and AirflowConfigParser._deprecated_variable_is_set( 

674 deprecated_section, deprecated_key 

675 ): 

676 continue 

677 if include_cmds and ( 

678 AirflowConfigParser._deprecated_variable_command_is_set( 

679 deprecated_section, deprecated_key 

680 ) 

681 or AirflowConfigParser._deprecated_command_is_set_in_config( 

682 deprecated_section, deprecated_key, configs 

683 ) 

684 ): 

685 continue 

686 if include_secret and ( 

687 AirflowConfigParser._deprecated_variable_secret_is_set( 

688 deprecated_section, deprecated_key 

689 ) 

690 or AirflowConfigParser._deprecated_secret_is_set_in_config( 

691 deprecated_section, deprecated_key, configs 

692 ) 

693 ): 

694 continue 

695 if display_source: 

696 updated_source_name = source_name 

697 if source_name == "default": 

698 # defaults can come from other sources (default-<PROVIDER>) that should be used here 

699 source_description_section = configuration_description.get(section, {}) 

700 source_description_key = source_description_section.get("options", {}).get(k, {}) 

701 if source_description_key is not None: 

702 updated_source_name = source_description_key.get("source", source_name) 

703 sect[k] = (val, updated_source_name) 

704 else: 

705 sect[k] = val 

706 

707 def _warn_deprecate( 

708 self, section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int 

709 ): 

710 """Warn about deprecated config option usage.""" 

711 if section == deprecated_section: 

712 warnings.warn( 

713 f"The {deprecated_name} option in [{section}] has been renamed to {key} - " 

714 f"the old setting has been used, but please update your config.", 

715 DeprecationWarning, 

716 stacklevel=4 + extra_stacklevel, 

717 ) 

718 else: 

719 warnings.warn( 

720 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option " 

721 f"in [{section}] - the old setting has been used, but please update your config.", 

722 DeprecationWarning, 

723 stacklevel=4 + extra_stacklevel, 

724 ) 

725 

726 @contextmanager 

727 def suppress_future_warnings(self): 

728 """ 

729 Context manager to temporarily suppress future warnings. 

730 

731 This is a stub used by the shared parser's lookup methods when checking deprecated options. 

732 Subclasses can override this to customize warning suppression behavior. 

733 

734 :return: context manager that suppresses future warnings 

735 """ 

736 suppress_future_warnings = self._suppress_future_warnings 

737 self._suppress_future_warnings = True 

738 yield self 

739 self._suppress_future_warnings = suppress_future_warnings 

740 

741 def _env_var_name(self, section: str, key: str, team_name: str | None = None) -> str: 

742 """Generate environment variable name for a config option.""" 

743 team_component: str = f"{team_name.upper()}___" if team_name else "" 

744 return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.', '_').upper()}__{key.upper()}" 

745 

746 def _get_env_var_option(self, section: str, key: str, team_name: str | None = None): 

747 """Get config option from environment variable.""" 

748 env_var: str = self._env_var_name(section, key, team_name=team_name) 

749 if env_var in os.environ: 

750 return expand_env_var(os.environ[env_var]) 

751 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command) 

752 env_var_cmd = env_var + "_CMD" 

753 if env_var_cmd in os.environ: 

754 # if this is a valid command key... 

755 if (section, key) in self.sensitive_config_values: 

756 return run_command(os.environ[env_var_cmd]) 

757 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend) 

758 env_var_secret_path = env_var + "_SECRET" 

759 if env_var_secret_path in os.environ: 

760 # if this is a valid secret path... 

761 if (section, key) in self.sensitive_config_values: 

762 return self._get_config_value_from_secret_backend(os.environ[env_var_secret_path]) 

763 return None 

764 

765 def _get_cmd_option(self, section: str, key: str): 

766 """Get config option from command execution.""" 

767 fallback_key = key + "_cmd" 

768 if (section, key) in self.sensitive_config_values: 

769 if super().has_option(section, fallback_key): 

770 command = super().get(section, fallback_key) 

771 try: 

772 cmd_output = run_command(command) 

773 except AirflowConfigException as e: 

774 raise e 

775 except Exception as e: 

776 raise AirflowConfigException( 

777 f"Cannot run the command for the config section [{section}]{fallback_key}_cmd." 

778 f" Please check the {fallback_key} value." 

779 ) from e 

780 return cmd_output 

781 return None 

782 

783 def _get_secret_option(self, section: str, key: str) -> str | None: 

784 """Get Config option values from Secret Backend.""" 

785 fallback_key = key + "_secret" 

786 if (section, key) in self.sensitive_config_values: 

787 if super().has_option(section, fallback_key): 

788 secrets_path = super().get(section, fallback_key) 

789 return self._get_config_value_from_secret_backend(secrets_path) 

790 return None 

791 

792 def _get_environment_variables( 

793 self, 

794 deprecated_key: str | None, 

795 deprecated_section: str | None, 

796 key: str, 

797 section: str, 

798 issue_warning: bool = True, 

799 extra_stacklevel: int = 0, 

800 **kwargs, 

801 ) -> str | ValueNotFound: 

802 """Get config option from environment variables.""" 

803 team_name = kwargs.get("team_name", None) 

804 option = self._get_env_var_option(section, key, team_name=team_name) 

805 if option is not None: 

806 return option 

807 if deprecated_section and deprecated_key: 

808 with self.suppress_future_warnings(): 

809 option = self._get_env_var_option(deprecated_section, deprecated_key, team_name=team_name) 

810 if option is not None: 

811 if issue_warning: 

812 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

813 return option 

814 return VALUE_NOT_FOUND_SENTINEL 

815 

def _get_option_from_config_file(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from config file.

    When a truthy ``team_name`` is supplied the lookup targets the
    ``<team_name>=<section>`` section instead of ``section``.

    :param deprecated_key: former option name, if any
    :param deprecated_section: former section name, if any
    :param key: current option name
    :param section: current section name
    :param issue_warning: whether to emit a deprecation warning when the
        deprecated name supplies the value
    :param extra_stacklevel: extra stack level forwarded to the warning
    :param kwargs: forwarded to the parent parser's ``get``; ``team_name`` is
        consumed here
    :return: the value with environment variables expanded, or
        ``VALUE_NOT_FOUND_SENTINEL`` when the file does not define the option
    """
    # Always strip ``team_name`` before delegating: the parent ``get`` does not accept it,
    # and a falsy-but-present value (e.g. "") must not leak through as an unknown keyword.
    team_name = kwargs.pop("team_name", None)
    if team_name:
        section = f"{team_name}={section}"

    if super().has_option(section, key):
        return expand_env_var(super().get(section, key, **kwargs))

    if deprecated_section and deprecated_key:
        if super().has_option(deprecated_section, deprecated_key):
            if issue_warning:
                self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
            with self.suppress_future_warnings():
                return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))

    return VALUE_NOT_FOUND_SENTINEL

840 

841 def _get_option_from_commands( 

842 self, 

843 deprecated_key: str | None, 

844 deprecated_section: str | None, 

845 key: str, 

846 section: str, 

847 issue_warning: bool = True, 

848 extra_stacklevel: int = 0, 

849 **kwargs, 

850 ) -> str | ValueNotFound: 

851 """Get config option from command execution.""" 

852 option = self._get_cmd_option(section, key) 

853 if option: 

854 return option 

855 if deprecated_section and deprecated_key: 

856 with self.suppress_future_warnings(): 

857 option = self._get_cmd_option(deprecated_section, deprecated_key) 

858 if option: 

859 if issue_warning: 

860 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

861 return option 

862 return VALUE_NOT_FOUND_SENTINEL 

863 

864 def _get_option_from_secrets( 

865 self, 

866 deprecated_key: str | None, 

867 deprecated_section: str | None, 

868 key: str, 

869 section: str, 

870 issue_warning: bool = True, 

871 extra_stacklevel: int = 0, 

872 **kwargs, 

873 ) -> str | ValueNotFound: 

874 """Get config option from secrets backend.""" 

875 option = self._get_secret_option(section, key) 

876 if option: 

877 return option 

878 if deprecated_section and deprecated_key: 

879 with self.suppress_future_warnings(): 

880 option = self._get_secret_option(deprecated_section, deprecated_key) 

881 if option: 

882 if issue_warning: 

883 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

884 return option 

885 return VALUE_NOT_FOUND_SENTINEL 

886 

def _get_option_from_defaults(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | ValueNotFound:
    """
    Get config option from default values.

    The deprecated names, warning flags and ``team_name`` are accepted for
    signature parity with the other lookup methods but are not used here.

    :param key: option name
    :param section: section name
    :param kwargs: forwarded to ``get_default_value``; an explicit ``fallback``
        makes this lookup always answer
    :return: the default (or fallback) with environment variables expanded, or
        ``VALUE_NOT_FOUND_SENTINEL`` when there is neither
    """
    default_is_known = self.get_default_value(section, key) is not None
    if not default_is_known and "fallback" not in kwargs:
        return VALUE_NOT_FOUND_SENTINEL
    return expand_env_var(self.get_default_value(section, key, **kwargs))

902 

903 def _resolve_deprecated_lookup( 

904 self, 

905 section: str, 

906 key: str, 

907 lookup_from_deprecated: bool, 

908 extra_stacklevel: int = 0, 

909 ) -> tuple[str, str, str | None, str | None, bool]: 

910 """ 

911 Resolve deprecated section/key mappings and determine deprecated values. 

912 

913 :param section: Section name (will be lowercased) 

914 :param key: Key name (will be lowercased) 

915 :param lookup_from_deprecated: Whether to lookup from deprecated options 

916 :param extra_stacklevel: Extra stack level for warnings 

917 :return: Tuple of (resolved_section, resolved_key, deprecated_section, deprecated_key, warning_emitted) 

918 """ 

919 section = section.lower() 

920 key = key.lower() 

921 warning_emitted = False 

922 deprecated_section: str | None = None 

923 deprecated_key: str | None = None 

924 

925 if not lookup_from_deprecated: 

926 return section, key, deprecated_section, deprecated_key, warning_emitted 

927 

928 option_description = self.configuration_description.get(section, {}).get("options", {}).get(key, {}) 

929 if option_description.get("deprecated"): 

930 deprecation_reason = option_description.get("deprecation_reason", "") 

931 warnings.warn( 

932 f"The '{key}' option in section {section} is deprecated. {deprecation_reason}", 

933 DeprecationWarning, 

934 stacklevel=2 + extra_stacklevel, 

935 ) 

936 # For the cases in which we rename whole sections 

937 if section in self.inversed_deprecated_sections: 

938 deprecated_section, deprecated_key = (section, key) 

939 section = self.inversed_deprecated_sections[section] 

940 if not self._suppress_future_warnings: 

941 warnings.warn( 

942 f"The config section [{deprecated_section}] has been renamed to " 

943 f"[{section}]. Please update your `conf.get*` call to use the new name", 

944 FutureWarning, 

945 stacklevel=2 + extra_stacklevel, 

946 ) 

947 # Don't warn about individual rename if the whole section is renamed 

948 warning_emitted = True 

949 elif (section, key) in self.inversed_deprecated_options: 

950 # Handle using deprecated section/key instead of the new section/key 

951 new_section, new_key = self.inversed_deprecated_options[(section, key)] 

952 if not self._suppress_future_warnings and not warning_emitted: 

953 warnings.warn( 

954 f"section/key [{section}/{key}] has been deprecated, you should use" 

955 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the " 

956 "new name", 

957 FutureWarning, 

958 stacklevel=2 + extra_stacklevel, 

959 ) 

960 warning_emitted = True 

961 deprecated_section, deprecated_key = section, key 

962 section, key = (new_section, new_key) 

963 elif section in self.deprecated_sections: 

964 # When accessing the new section name, make sure we check under the old config name 

965 deprecated_key = key 

966 deprecated_section = self.deprecated_sections[section][0] 

967 else: 

968 deprecated_section, deprecated_key, _ = self.deprecated_options.get( 

969 (section, key), (None, None, None) 

970 ) 

971 

972 return section, key, deprecated_section, deprecated_key, warning_emitted 

973 

@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...


@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: ...


def get(  # type: ignore[misc, override]
    self,
    section: str,
    key: str,
    suppress_warnings: bool = False,
    lookup_from_deprecated: bool = True,
    _extra_stacklevel: int = 0,
    team_name: str | None = None,
    **kwargs,
) -> str | None:
    """
    Get a config value by trying each lookup in ``_lookup_sequence`` in turn.

    The first lookup that returns something other than
    ``VALUE_NOT_FOUND_SENTINEL`` wins.

    :param section: section name
    :param key: option name
    :param suppress_warnings: skip the log warning emitted when nothing is found
    :param lookup_from_deprecated: whether deprecated names are consulted
    :param _extra_stacklevel: extra stack level for emitted warnings
    :param team_name: optional team name, forwarded to the lookups when not None
    :param kwargs: forwarded to every lookup; an explicit ``fallback`` (even
        None) is returned when no lookup answers
    :raises AirflowConfigException: when nothing is found and no fallback was given
    """
    resolved = self._resolve_deprecated_lookup(
        section=section,
        key=key,
        lookup_from_deprecated=lookup_from_deprecated,
        extra_stacklevel=_extra_stacklevel,
    )
    section, key, deprecated_section, deprecated_key, warning_emitted = resolved

    if team_name is not None:
        kwargs["team_name"] = team_name

    for lookup in self._lookup_sequence:
        found = lookup(
            deprecated_key=deprecated_key,
            deprecated_section=deprecated_section,
            key=key,
            section=section,
            issue_warning=not warning_emitted,
            extra_stacklevel=_extra_stacklevel,
            **kwargs,
        )
        if found is not VALUE_NOT_FOUND_SENTINEL:
            return found

    # Presence of the keyword matters, not its value: ``fallback=None`` is a valid answer.
    if "fallback" in kwargs:
        return kwargs["fallback"]

    if not suppress_warnings:
        log.warning("section/key [%s/%s] not found in config", section, key)

    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")

1026 

def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
    """
    Get config value as boolean.

    The value is lower-cased and stripped, and anything from the first ``#``
    onwards is discarded. ``t``/``true``/``1`` map to True and
    ``f``/``false``/``0`` map to False.

    :raises AirflowConfigException: for any other value
    """
    raw = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
    # Drop a trailing inline comment, if any.
    val = raw.partition("#")[0].strip()

    truthy = ("t", "true", "1")
    falsy = ("f", "false", "0")
    if val in truthy:
        return True
    if val in falsy:
        return False
    raise AirflowConfigException(
        f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
        f'Current value: "{val}".'
    )

1040 

def getint(self, section: str, key: str, **kwargs) -> int:  # type: ignore[override]
    """
    Get config value as integer.

    :raises AirflowConfigException: when the value is None or ``int()`` rejects it
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw is None:
        raise AirflowConfigException(
            f"Failed to convert value None to int. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        number = int(raw)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )
    return number

1056 

def getfloat(self, section: str, key: str, **kwargs) -> float:  # type: ignore[override]
    """
    Get config value as float.

    :raises AirflowConfigException: when the value is None or ``float()`` rejects it
    """
    raw = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw is None:
        raise AirflowConfigException(
            f"Failed to convert value None to float. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        number = float(raw)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw}".'
        )
    return number

1072 

def getlist(self, section: str, key: str, delimiter=",", **kwargs):
    """
    Get config value as list.

    :param delimiter: separator used to split the string value
    :return: the stripped items; ``[]`` for an empty string; a list or None
        value is returned untouched
    :raises AirflowConfigException: when splitting the value fails
    """
    val = self.get(section, key, **kwargs)

    # A list or None can only have come from a caller-supplied ``fallback=``; hand it back as-is.
    if val is None or isinstance(val, list):
        return val

    if val == "":
        return []

    try:
        pieces = val.split(delimiter)
        return [piece.strip() for piece in pieces]
    except Exception:
        raise AirflowConfigException(
            f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

1092 

E = TypeVar("E", bound=Enum)


def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
    """
    Get config value as a member of ``enum_class``, looked up by member name.

    If the configured value is not a member name but ``fallback`` is, the
    fallback member is returned instead.

    :param enum_class: enum type to convert to
    :raises AirflowConfigException: when the value is None or names no member
    """
    val = self.get(section, key, **kwargs)
    enum_names = [member.name for member in enum_class]
    allowed = ", ".join(enum_names)

    if val is None:
        raise AirflowConfigException(
            f'Failed to convert value. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}" and it must be one of {allowed}'
        )

    try:
        return enum_class[val]
    except KeyError:
        if "fallback" in kwargs and kwargs["fallback"] in enum_names:
            return enum_class[kwargs["fallback"]]
        raise AirflowConfigException(
            f'Failed to convert value. Please check "{key}" key in "{section}" section. '
            f"the value must be one of {allowed}"
        )

1115 

def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
    """
    Get config value as list of enums.

    Items are looked up by member name; items that name no member are logged
    and skipped.

    :param enum_class: enum type to convert each item to
    :param delimiter: separator forwarded to ``getlist``
    :param kwargs: forwarded to ``getlist``; ``fallback`` defaults to ``[]``
    :return: the converted members, in order; empty when nothing is configured
    """
    kwargs.setdefault("fallback", [])
    # ``getlist`` hands back None when the caller passed ``fallback=None`` and nothing is
    # configured; treat that as "no items" instead of failing on iteration.
    string_list = self.getlist(section, key, delimiter, **kwargs) or []

    enum_names = [enum_item.name for enum_item in enum_class]
    enum_list = []

    for val in string_list:
        try:
            enum_list.append(enum_class[val])
        except KeyError:
            log.warning(
                "Failed to convert value %r. Please check %s key in %s section. "
                "it must be one of %s, if not the value is ignored",
                val,
                key,
                section,
                ", ".join(enum_names),
            )

    return enum_list

1138 

def getimport(self, section: str, key: str, **kwargs) -> Any:
    """
    Read the option, import the fully qualified name it holds, and return the object.

    :return: The imported object, or None if the option is empty
    :raises AirflowConfigException: when the import fails; the message names
        the key and section
    """
    full_qualified_path = self.get(section=section, key=key, **kwargs)
    if not full_qualified_path:
        return None

    try:
        # Imported lazily to avoid a circular dependency
        from ..module_loading import import_string

        imported = import_string(full_qualified_path)
    except ImportError as e:
        log.warning(e)
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{full_qualified_path}".'
        )
    return imported

1163 

def getjson(
    self, section: str, key: str, fallback=None, **kwargs
) -> dict | list | str | int | float | None:
    """
    Return a config value parsed from a JSON string.

    ``fallback`` is *not* JSON parsed; it is returned verbatim when the option
    is missing, None or an empty string.

    :raises AirflowConfigException: when the value is not valid JSON
    """
    try:
        raw = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
    except (NoSectionError, NoOptionError):
        raw = None

    if raw is None or raw == "":
        return fallback

    try:
        parsed = json.loads(raw)
    except JSONDecodeError as e:
        raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
    return parsed

1184 

def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Read the option and interpret it as a number of seconds.

    A missing or otherwise falsy value yields ``fallback``.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: fallback value when no config value is given, defaults to None
    :raises AirflowConfigException: raised because ValueError or OverflowError
    :return: datetime.timedelta(seconds=<config_value>) or the fallback
    """
    val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
    if not val:
        return fallback

    # the given value must be convertible to integer
    try:
        seconds = int(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

    try:
        return datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            f"Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
        )

1221 

def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
    """
    Get a config value that must be set.

    :raises ValueError: when ``get`` returns None
    """
    found = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if found is not None:
        return found
    raise ValueError(f"The value {section}/{key} should be set!")

1228 

def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
    """
    Get a list-valued config option that must be set.

    :raises ValueError: when ``getlist`` returns None
    """
    items = self.getlist(section, key, **kwargs)
    if items is not None:
        return items
    raise ValueError(f"The value {section}/{key} should be set!")

1235 

def read(
    self,
    filenames: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
    encoding: str | None = None,
) -> list[str]:
    """
    Delegate to the parent parser's ``read`` with the same arguments.

    :param filenames: a single path or an iterable of paths
    :param encoding: encoding passed through to the parent
    :return: whatever the parent ``read`` returns
    """
    parsed_files = super().read(filenames=filenames, encoding=encoding)
    return parsed_files

1242 

def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
) -> None:
    """
    Load configuration from a nested dictionary via the parent parser.

    The override exists only to narrow the signature for better type hints
    and checking.

    :param dictionary: dictionary to read from
    :param source: source to be used to store the configuration
    :return: None
    """
    parent = super()
    parent.read_dict(dictionary=dictionary, source=source)

1254 

def get_sections_including_defaults(self) -> list[str]:
    """
    List every known section: those in ``configuration_description`` first, then parser-only ones.

    Duplicates are dropped while keeping first-seen order.

    :return: list of section names
    """
    parser_sections = self.sections()
    ordered = dict.fromkeys(self.configuration_description.keys())
    # ``update`` keeps the position of keys already present, so described sections stay first.
    ordered.update(dict.fromkeys(parser_sections))
    return list(ordered)

1264 

def get_options_including_defaults(self, section: str) -> list[str]:
    """
    List every known option of a section.

    Options described in ``configuration_description`` come first, followed by
    options only present in the parser; duplicates are dropped while keeping
    first-seen order.

    :param section: section name
    :return: list of option names for the section given
    """
    if self.has_section(section):
        parser_options = self.options(section)
    else:
        parser_options = []
    described = self.configuration_description.get(section, {}).get("options", {})
    ordered = dict.fromkeys(described.keys())
    # ``update`` keeps the position of keys already present, so described options stay first.
    ordered.update(dict.fromkeys(parser_options))
    return list(ordered)

1279 

def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs) -> bool:
    """
    Check if option is defined.

    Delegates to ``self.get()`` so the priority order of config sources
    (env, config, cmd, defaults) is not reimplemented here.

    :param section: section to get option from
    :param option: option to get
    :param lookup_from_deprecated: If True, check if the option is defined in deprecated sections
    :param kwargs: additional keyword arguments to pass to get(), such as team_name
    :return: True when ``get`` yields a non-None value; False when it yields
        None or raises one of the handled lookup errors
    """
    try:
        found = self.get(
            section,
            option,
            fallback=None,
            _extra_stacklevel=1,
            suppress_warnings=True,
            lookup_from_deprecated=lookup_from_deprecated,
            **kwargs,
        )
    except (NoOptionError, NoSectionError, AirflowConfigException):
        return False
    return found is not None

1308 

def set(self, section: str, option: str, value: str | None = None) -> None:
    """
    Set an option to the given value.

    Section and option are lower-cased first, to match what ``get`` does. A
    section that is described in ``configuration_description`` but missing
    from the parser is created on the fly.
    """
    section, option = section.lower(), option.lower()
    described_sections = self.configuration_description or {}
    section_is_missing = not self.has_section(section)
    if section_is_missing and section in described_sections:
        # Known from the defaults but absent from the user config: create it so the set succeeds.
        self.add_section(section)
    super().set(section, option, value)

1323 

def remove_option(self, section: str, option: str, remove_default: bool = True):
    """
    Remove an option from the loaded config and, optionally, from the default config.

    Section and option are lower-cased first. When both configs hold the
    option it is removed from both, unless ``remove_default=False``.

    :param section: section name
    :param option: option name
    :param remove_default: also remove the option from the default values
    """
    section, option = section.lower(), option.lower()

    parent = super()
    if parent.has_option(section, option):
        parent.remove_option(section, option)

    default_exists = self.get_default_value(section, option) is not None
    if default_exists and remove_default:
        self._default_values.remove_option(section, option)

1338 

def optionxform(self, optionstr: str) -> str:
    """
    Return the option name unchanged.

    ``ConfigParser`` applies this hook to option names on read, get and set;
    its default lower-cases them, whereas this override is case-preserving.

    :param optionstr: option name as given
    :return: the same string, untouched
    """
    preserved = optionstr
    return preserved

1350 

1351 def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]: 

1352 """ 

1353 Get list of config sources to use in as_dict(). 

1354 

1355 Subclasses can override to add additional sources (e.g., provider configs). 

1356 """ 

1357 return [ 

1358 ("default", self._default_values), 

1359 ("airflow.cfg", self), 

1360 ] 

1361 

def as_dict(
    self,
    display_source: bool = False,
    display_sensitive: bool = False,
    raw: bool = False,
    include_env: bool = True,
    include_cmds: bool = True,
    include_secret: bool = True,
) -> ConfigSourcesType:
    """
    Return the current configuration as a nested dictionary (section -> option -> value).

    Airflow defaults are materialized along with user set configs. If any of
    the ``include_*`` options is False, the result of the matching command or
    secret key config does not override the Airflow default and is passed
    through instead. To keep Airflow defaults from overwriting user set command
    or secret key configs, bare sensitive_config_values that are set to Airflow
    defaults are filtered out when command or secret key configs produce
    different values.

    :param display_source: If False, the option value is returned. If True,
        a tuple of (option_value, source) is returned. Source is either
        'airflow.cfg', 'default', 'env var', or 'cmd'.
    :param display_sensitive: If True, the values of options set by env
        vars and bash commands will be displayed. If False, those options
        are shown as '< hidden >'
    :param raw: Should the values be output as interpolated values, or the
        "raw" form that can be fed back in to ConfigParser
    :param include_env: Should the value of configuration from AIRFLOW__
        environment variables be included or not
    :param include_cmds: Should the result of calling any ``*_cmd`` config be
        set (True, default), or should the _cmd options be left as the
        command to run (False)
    :param include_secret: Should the result of calling any ``*_secret`` config be
        set (True, default), or should the _secret options be left as the
        path to get the secret from (False)
    :raises ValueError: when ``display_sensitive`` is False while any of the
        ``include_*`` flags is False
    :return: Dictionary, where the key is the name of the section and the content is
        the dictionary with the name of the parameter and its value.
    """
    # Hiding is applied by the include-steps themselves, because values coming from
    # commands and secrets can surface while env vars are processed; so all three
    # steps must run whenever sensitive values are to be hidden.
    if not display_sensitive and not (include_env and include_cmds and include_secret):
        raise ValueError(
            "If display_sensitive is false, then include_env, "
            "include_cmds, include_secret must all be set as True"
        )

    config_sources: ConfigSourcesType = {}

    # Sources are applied in order; the last one defining an option wins.
    ordered_sources = self._get_config_sources_for_as_dict()
    self._replace_config_with_display_sources(
        config_sources,
        ordered_sources,
        self.configuration_description,
        display_source,
        raw,
        self.deprecated_options,
        include_cmds=include_cmds,
        include_env=include_env,
        include_secret=include_secret,
    )

    # Environment variables take priority and overwrite what was collected so far.
    if include_env:
        self._include_envs(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_env_var_option)

    # Values produced by bash commands.
    if include_cmds:
        self._include_commands(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_cmd_option)

    # Values fetched from secret backends.
    if include_secret:
        self._include_secrets(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_secret_option)

    if display_sensitive:
        return config_sources

    # Also mask sensitive values that came straight from the config file rather than
    # through env, cmd or secret.
    hidden = "< hidden >"
    for section, key in self.sensitive_config_values:
        section_values = config_sources.get(section)
        if not section_values or not section_values.get(key, None):
            continue
        if display_source:
            origin = section_values[key][1]
            section_values[key] = (hidden, origin)
        else:
            section_values[key] = hidden

    return config_sources

1460 

1461 def _write_option_header( 

1462 self, 

1463 file: IO[str], 

1464 option: str, 

1465 extra_spacing: bool, 

1466 include_descriptions: bool, 

1467 include_env_vars: bool, 

1468 include_examples: bool, 

1469 include_sources: bool, 

1470 section_config_description: dict[str, dict[str, Any]], 

1471 section_to_write: str, 

1472 sources_dict: ConfigSourcesType, 

1473 ) -> tuple[bool, bool]: 

1474 """ 

1475 Write header for configuration option. 

1476 

1477 Returns tuple of (should_continue, needs_separation) where needs_separation should be 

1478 set if the option needs additional separation to visually separate it from the next option. 

1479 """ 

1480 option_config_description = ( 

1481 section_config_description.get("options", {}).get(option, {}) 

1482 if section_config_description 

1483 else {} 

1484 ) 

1485 description = option_config_description.get("description") 

1486 needs_separation = False 

1487 if description and include_descriptions: 

1488 for line in description.splitlines(): 

1489 file.write(f"# {line}\n") 

1490 needs_separation = True 

1491 example = option_config_description.get("example") 

1492 if example is not None and include_examples: 

1493 if extra_spacing: 

1494 file.write("#\n") 

1495 example_lines = example.splitlines() 

1496 example = "\n# ".join(example_lines) 

1497 file.write(f"# Example: {option} = {example}\n") 

1498 needs_separation = True 

1499 if include_sources and sources_dict: 

1500 sources_section = sources_dict.get(section_to_write) 

1501 value_with_source = sources_section.get(option) if sources_section else None 

1502 if value_with_source is None: 

1503 file.write("#\n# Source: not defined\n") 

1504 else: 

1505 file.write(f"#\n# Source: {value_with_source[1]}\n") 

1506 needs_separation = True 

1507 if include_env_vars: 

1508 file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n") 

1509 if extra_spacing: 

1510 file.write("#\n") 

1511 needs_separation = True 

1512 return True, needs_separation 

1513 

def is_template(self, section: str, key) -> bool:
    """
    Tell whether the option's value is templated.

    Delegates to the module-level ``_is_template`` helper, passing this
    parser's ``configuration_description``.

    :param section: section of the config
    :param key: key in the section
    :return: True if the value is templated
    """
    description = self.configuration_description
    return _is_template(description, section, key)

1523 

def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None:
    """
    Return the section as a dict.

    Default values are overlaid with values from the loaded config and then
    with values from matching environment variables. Values are converted to
    int, float, bool as required.

    :param section: section from the config
    :param team_name: optional team name for team-specific configuration lookup
    :return: the section's options, or None when the section exists neither in
        the loaded config nor in the defaults
    :raises AirflowConfigException: when an option resolves to None
    """
    # Handle team-specific section lookup for config file
    config_section = f"{team_name}={section}" if team_name else section

    if not self.has_section(config_section) and not self._default_values.has_section(config_section):
        return None
    if self._default_values.has_section(config_section):
        _section: ConfigOptionsDictType = dict(self._default_values.items(config_section))
    else:
        _section = {}

    if self.has_section(config_section):
        _section.update(self.items(config_section))

    # Use section (not config_section) for env var lookup - team_name is handled by _env_var_name
    section_prefix = self._env_var_name(section, "", team_name=team_name)
    for env_var in sorted(os.environ.keys()):
        if env_var.startswith(section_prefix):
            # Strip only the leading prefix: a blanket ``replace`` would also eat later
            # occurrences of the prefix inside the option name.
            key = env_var.removeprefix(section_prefix).removesuffix("_CMD").lower()
            _section[key] = self._get_env_var_option(section, key, team_name=team_name)

    # Only existing keys are reassigned below, so mutating while iterating is safe.
    for key, val in _section.items():
        if val is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        try:
            _section[key] = int(val)
        except ValueError:
            try:
                _section[key] = float(val)
            except ValueError:
                if isinstance(val, str) and val.lower() in ("t", "true"):
                    _section[key] = True
                elif isinstance(val, str) and val.lower() in ("f", "false"):
                    _section[key] = False
    return _section

1573 

1574 @staticmethod 

1575 def _write_section_header( 

1576 file: IO[str], 

1577 include_descriptions: bool, 

1578 section_config_description: dict[str, str], 

1579 section_to_write: str, 

1580 ) -> None: 

1581 """Write header for configuration section.""" 

1582 file.write(f"[{section_to_write}]\n") 

1583 section_description = section_config_description.get("description") 

1584 if section_description and include_descriptions: 

1585 for line in section_description.splitlines(): 

1586 file.write(f"# {line}\n") 

1587 file.write("\n") 

1588 

def _write_value(
    self,
    file: IO[str],
    option: str,
    comment_out_everything: bool,
    needs_separation: bool,
    only_defaults: bool,
    section_to_write: str,
) -> None:
    """
    Write a single ``option = value`` line (possibly spanning several lines) to ``file``.

    The value is the raw default from ``get_default_value`` when ``only_defaults`` is set,
    otherwise the raw result of ``get`` with that default as fallback.

    :param file: text stream that receives the output
    :param option: name of the option being written
    :param comment_out_everything: prefix the option line and every continuation line with ``# ``
    :param needs_separation: append one blank line after the value
    :param only_defaults: write the default value instead of the currently effective one
    :param section_to_write: section the option is looked up in
    """
    default_value = self.get_default_value(section_to_write, option, raw=True)
    if only_defaults:
        value = default_value
    else:
        value = self.get(section_to_write, option, fallback=default_value, raw=True)

    if value is None:
        # No value at all: leave a commented-out placeholder for the option.
        file.write(f"# {option} = \n")
    elif comment_out_everything:
        # Every continuation line must carry its own comment marker.
        commented_value = "\n# ".join(value.splitlines())
        file.write(f"# {option} = {commented_value}\n")
    else:
        if "\n" in value:
            try:
                # Multi-line JSON is normalised to a stable, pretty-printed form.
                value = json.dumps(json.loads(value), indent=4)
            except JSONDecodeError:
                # Not JSON: keep the text unchanged, it is still indented below.
                pass
            # configparser only treats indented lines as a continuation of the value;
            # an unindented continuation line would be parsed as a new (broken) option.
            # This applies to every multi-line value, not just JSON ones.
            value = value.replace("\n", "\n ")
        file.write(f"{option} = {value}\n")

    if needs_separation:
        file.write("\n")

1622 

1623 def write( # type: ignore[override] 

1624 self, 

1625 file: IO[str], 

1626 section: str | None = None, 

1627 include_examples: bool = True, 

1628 include_descriptions: bool = True, 

1629 include_sources: bool = True, 

1630 include_env_vars: bool = True, 

1631 include_providers: bool = True, 

1632 comment_out_everything: bool = False, 

1633 hide_sensitive_values: bool = False, 

1634 extra_spacing: bool = True, 

1635 only_defaults: bool = False, 

1636 **kwargs: Any, 

1637 ) -> None: 

1638 """ 

1639 Write configuration with comments and examples to a file. 

1640 

1641 :param file: file to write to 

1642 :param section: section of the config to write, defaults to all sections 

1643 :param include_examples: Include examples in the output 

1644 :param include_descriptions: Include descriptions in the output 

1645 :param include_sources: Include the source of each config option 

1646 :param include_env_vars: Include environment variables corresponding to each config option 

1647 :param include_providers: Include providers configuration 

1648 :param comment_out_everything: Comment out all values 

1649 :param hide_sensitive_values: Include sensitive values in the output 

1650 :param extra_spacing: Add extra spacing before examples and after variables 

1651 :param only_defaults: Only include default values when writing the config, not the actual values 

1652 """ 

1653 sources_dict = {} 

1654 if include_sources: 

1655 sources_dict = self.as_dict(display_source=True) 

1656 with self.make_sure_configuration_loaded(with_providers=include_providers): 

1657 for section_to_write in self.get_sections_including_defaults(): 

1658 section_config_description = self.configuration_description.get(section_to_write, {}) 

1659 if section_to_write != section and section is not None: 

1660 continue 

1661 if self._default_values.has_section(section_to_write) or self.has_section(section_to_write): 

1662 self._write_section_header( 

1663 file, include_descriptions, section_config_description, section_to_write 

1664 ) 

1665 for option in self.get_options_including_defaults(section_to_write): 

1666 should_continue, needs_separation = self._write_option_header( 

1667 file=file, 

1668 option=option, 

1669 extra_spacing=extra_spacing, 

1670 include_descriptions=include_descriptions, 

1671 include_env_vars=include_env_vars, 

1672 include_examples=include_examples, 

1673 include_sources=include_sources, 

1674 section_config_description=section_config_description, 

1675 section_to_write=section_to_write, 

1676 sources_dict=sources_dict, 

1677 ) 

1678 self._write_value( 

1679 file=file, 

1680 option=option, 

1681 comment_out_everything=comment_out_everything, 

1682 needs_separation=needs_separation, 

1683 only_defaults=only_defaults, 

1684 section_to_write=section_to_write, 

1685 ) 

1686 if include_descriptions and not needs_separation: 

1687 # extra separation between sections in case last option did not need it 

1688 file.write("\n") 

1689 

@contextmanager
def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
    """
    Make sure configuration is loaded with or without providers.

    This happens regardless if the provider configuration has been loaded before or not.
    Restores configuration to the state before entering the context, also when the
    body of the ``with`` block raises.

    :param with_providers: whether providers should be loaded
    """
    needs_reload = False
    if with_providers:
        self._ensure_providers_config_loaded()
    else:
        # The helper reports whether it actually unloaded anything; only then is
        # there something to restore on exit.
        needs_reload = self._ensure_providers_config_unloaded()
    try:
        yield
    finally:
        # Run the restore step on every exit path. Without try/finally an exception
        # raised inside the ``with`` body would skip the reload.
        if needs_reload:
            self._reload_provider_configs()

1708 

def _ensure_providers_config_loaded(self) -> None:
    """
    Ensure providers configurations are loaded.

    This base implementation only raises; subclasses are expected to override it.

    :raises NotImplementedError: always, in this base implementation
    """
    message = "Subclasses must implement _ensure_providers_config_loaded method"
    raise NotImplementedError(message)

1712 

def _ensure_providers_config_unloaded(self) -> bool:
    """
    Ensure providers configurations are unloaded temporarily to load core configs.

    This base implementation only raises; subclasses are expected to override it.

    :return: True if providers get unloaded
    :raises NotImplementedError: always, in this base implementation
    """
    message = "Subclasses must implement _ensure_providers_config_unloaded method"
    raise NotImplementedError(message)

1716 

def _reload_provider_configs(self) -> None:
    """
    Reload providers configuration.

    This base implementation only raises; subclasses are expected to override it.

    :raises NotImplementedError: always, in this base implementation
    """
    message = "Subclasses must implement _reload_provider_configs method"
    raise NotImplementedError(message)