Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/configuration.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

402 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import contextlib 

20import logging 

21import multiprocessing 

22import os 

23import pathlib 

24import re 

25import shlex 

26import stat 

27import subprocess 

28import sys 

29import warnings 

30from base64 import b64encode 

31from collections.abc import Callable 

32from configparser import ConfigParser 

33from copy import deepcopy 

34from inspect import ismodule 

35from io import StringIO 

36from re import Pattern 

37from typing import IO, TYPE_CHECKING, Any 

38from urllib.parse import urlsplit 

39 

40from typing_extensions import overload 

41 

42from airflow._shared.configuration.parser import ( 

43 VALUE_NOT_FOUND_SENTINEL, 

44 AirflowConfigParser as _SharedAirflowConfigParser, 

45 ValueNotFound, 

46 configure_parser_from_configuration_description, 

47) 

48from airflow._shared.module_loading import import_string 

49from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning 

50from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH 

51from airflow.task.weight_rule import WeightRule 

52from airflow.utils import yaml 

53 

54if TYPE_CHECKING: 

55 from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager 

56 from airflow.secrets import BaseSecretsBackend 

57 

log = logging.getLogger(__name__)

# Show Airflow's deprecation warnings unless the user already configured
# warning filters explicitly (python -W ... sets sys.warnoptions).
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Extracts the leading dotted numeric portion of a SQLite version string,
# e.g. "3.15.2" out of "3.15.2alpha".
_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases describing configuration values and where they came from
# (section -> option -> value or (value, source)).
ConfigType = str | int | float | bool
ConfigOptionsDictType = dict[str, ConfigType]
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
ConfigSourcesType = dict[str, ConfigSectionSourcesType]

# Prefix of environment variables that override config options
# (AIRFLOW__{SECTION}__{OPTION}).
ENV_VAR_PREFIX = "AIRFLOW__"

73 

74 

class _SecretKeys:
    """
    Holds the secret keys used in Airflow during runtime.

    Stored as class attributes so every consumer in this module (notably the
    config-template expansion in ``get_all_expansion_variables``) sees the same
    values without needing an instance.
    """

    fernet_key: str = ""  # Set only if needed when generating a new file
    jwt_secret_key: str = ""

80 

81 

class ConfigModifications:
    """
    Collects the edits to apply when a configuration file is written out.

    :param rename: Mapping from (old_section, old_option) to (new_section, new_option)
    :param remove: Set of (section, option) to remove
    :param default_updates: Mapping from (section, option) to new default value
    """

    def __init__(self) -> None:
        self.rename: dict[tuple[str, str], tuple[str, str]] = {}
        self.remove: set[tuple[str, str]] = set()
        self.default_updates: dict[tuple[str, str], str] = {}

    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
        """Record that the option should be written out under a new section/name."""
        self.rename[old_section, old_option] = (new_section, new_option)

    def add_remove(self, section: str, option: str) -> None:
        """Record that the option should be dropped from the written config."""
        self.remove.add((section, option))

    def add_default_update(self, section: str, option: str, new_default: str) -> None:
        """Record a replacement default value for the option."""
        self.default_updates[section, option] = new_default

104 

105 

def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """Parse a SQLite version string into a tuple of ints; empty tuple if unparsable."""
    matched = _SQLITE3_VERSION_PATTERN.match(s)
    if not matched:
        return ()
    version_parts = matched.group("version").split(".")
    return tuple(map(int, version_parts))

111 

112 

113@overload 

114def expand_env_var(env_var: None) -> None: ... 

115 

116 

117@overload 

118def expand_env_var(env_var: str) -> str: ... 

119 

120 

121def expand_env_var(env_var: str | None) -> str | None: 

122 """ 

123 Expand (potentially nested) env vars. 

124 

125 Repeat and apply `expandvars` and `expanduser` until 

126 interpolation stops having any effect. 

127 """ 

128 if not env_var or not isinstance(env_var, str): 

129 return env_var 

130 while True: 

131 interpolated = os.path.expanduser(os.path.expandvars(str(env_var))) 

132 if interpolated == env_var: 

133 return interpolated 

134 env_var = interpolated 

135 

136 

def run_command(command: str) -> str:
    """
    Run *command* in a subprocess and return its decoded stdout.

    The command line is tokenized with ``shlex`` and executed without a shell.

    :param command: command line to execute.
    :return: the command's stdout, decoded with the default encoding
        (undecodable bytes are dropped).
    :raises AirflowConfigException: if the command exits with a non-zero code.
    """
    # subprocess.run waits for completion and always reaps the child and closes
    # its pipes — the previous bare Popen could leak them if communicate() raised.
    process = subprocess.run(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
    encoding = sys.getdefaultencoding()
    output = process.stdout.decode(encoding, "ignore")
    stderr = process.stderr.decode(encoding, "ignore")

    if process.returncode != 0:
        raise AirflowConfigException(
            f"Cannot execute {command}. Error code is: {process.returncode}. "
            f"Output: {output}, Stderr: {stderr}"
        )

    return output

151 

152 

153def _default_config_file_path(file_name: str) -> str: 

154 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates") 

155 return os.path.join(templates_dir, file_name) 

156 

157 

def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Read Airflow configuration description from YAML file.

    :param include_airflow: Include Airflow configs
    :param include_providers: Include provider configs
    :param selected_provider: If specified, include selected provider only
    :return: Python dictionary containing configs & their info
    """
    description: dict[str, dict[str, Any]] = {}
    if include_airflow:
        with open(_default_config_file_path("config.yml")) as config_file:
            description.update(yaml.safe_load(config_file))
    if include_providers:
        # Imported lazily: discovering providers is expensive and must not
        # happen at module import time.
        from airflow.providers_manager import ProvidersManager

        for provider_name, provider_config in ProvidersManager().provider_configs:
            if selected_provider and provider_name != selected_provider:
                continue
            description.update(provider_config)
    return description

182 

183 

class AirflowConfigParser(_SharedAirflowConfigParser):
    """
    Custom Airflow Configparser supporting defaults and deprecated options.

    This is a subclass of the shared AirflowConfigParser that adds Core-specific initialization
    and functionality (providers, validation, writing, etc.).

    The defaults are stored in the ``_default_values``. The configuration description keeps
    description of all the options available in Airflow (description follow config.yaml.schema).

    :param default_config: default configuration (in the form of ini file).
    :param configuration_description: description of configuration to use
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        configuration_description = retrieve_configuration_description(include_providers=False)
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
        _default_values = create_default_config_parser(configuration_description)
        super().__init__(configuration_description, _default_values, *args, **kwargs)
        self.configuration_description = configuration_description
        self._default_values = _default_values
        self._provider_config_fallback_default_values = create_provider_config_fallback_defaults()
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False
        self._providers_configuration_loaded = False

    @property
    def _validators(self) -> list[Callable[[], None]]:
        """Overriding _validators from shared base class to add core-specific validators."""
        return [
            self._validate_sqlite3_version,
            self._validate_enums,
            self._validate_deprecated_values,
            self._upgrade_postgres_metastore_conn,
        ]

    @property
    def _lookup_sequence(self) -> list[Callable]:
        """Overriding _lookup_sequence from shared base class to add provider fallbacks."""
        return super()._lookup_sequence + [self._get_option_from_provider_fallbacks]

    def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
        """Override the base method to add provider fallbacks."""
        return [
            ("provider-fallback-defaults", self._provider_config_fallback_default_values),
            ("default", self._default_values),
            ("airflow.cfg", self),
        ]

    def _get_option_from_provider_fallbacks(
        self,
        deprecated_key: str | None,
        deprecated_section: str | None,
        key: str,
        section: str,
        issue_warning: bool = True,
        extra_stacklevel: int = 0,
        **kwargs,
    ) -> str | ValueNotFound:
        """Get config option from provider fallback defaults."""
        # Look the value up exactly once (the previous implementation performed
        # the lookup twice: once without **kwargs as a guard and again with them).
        value = self.get_provider_config_fallback_defaults(section, key, **kwargs)
        if value is not None:
            # no expansion needed
            return value
        return VALUE_NOT_FOUND_SENTINEL

    def _update_logging_deprecated_template_to_one_from_defaults(self):
        """Replace the deprecated log_filename_template target with the current default."""
        default = self.get_default_value("logging", "log_filename_template")
        if default:
            # Tuple does not support item assignment, so we have to create a new tuple and replace it
            original_replacement = self.deprecated_values["logging"]["log_filename_template"]
            self.deprecated_values["logging"]["log_filename_template"] = (
                original_replacement[0],
                default,
            )

    def get_provider_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
        """Get provider config fallback default values."""
        return self._provider_config_fallback_default_values.get(section, key, fallback=None, **kwargs)

    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
        "logging": {
            "log_filename_template": (
                re.compile(
                    re.escape(
                        "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log"
                    )
                ),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
            ),
        },
        "core": {
            "executor": (re.compile(re.escape("SequentialExecutor")), "LocalExecutor"),
        },
    }

    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
        ("dag_processor", "file_parsing_sort_mode"): [
            "modified_time",
            "random_seeded_by_host",
            "alphabetical",
        ],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        # uvicorn and gunicorn logging levels for web servers
        ("logging", "uvicorn_logging_level"): _available_logging_levels,
        ("logging", "gunicorn_logging_level"): _available_logging_levels,
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
        ("api", "grid_view_sorting_order"): ["topological", "hierarchical_alphabetical"],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section,option) to the old value that was upgraded"""

    def write_custom_config(
        self,
        file: IO[str],
        comment_out_defaults: bool = True,
        include_descriptions: bool = True,
        extra_spacing: bool = True,
        modifications: ConfigModifications | None = None,
    ) -> None:
        """
        Write a configuration file using a ConfigModifications object.

        This method includes only options from the current airflow.cfg. For each option:
        - If it's marked for removal, omit it.
        - If renamed, output it under its new name and add a comment indicating its original location.
        - If a default update is specified, apply the new default and output the option as a commented line.
        - Otherwise, if the current value equals the default and comment_out_defaults is True, output it as a comment.
        Options absent from the current airflow.cfg are omitted.

        :param file: File to write the configuration.
        :param comment_out_defaults: If True, options whose value equals the default are written as comments.
        :param include_descriptions: Whether to include section descriptions.
        :param extra_spacing: Whether to insert an extra blank line after each option.
        :param modifications: ConfigModifications instance with rename, remove, and default updates.
        """
        modifications = modifications or ConfigModifications()
        output: dict[str, list[tuple[str, str, bool, str]]] = {}

        for section in self._sections:  # type: ignore[attr-defined] # accessing _sections from ConfigParser
            for option, orig_value in self._sections[section].items():  # type: ignore[attr-defined]
                key = (section.lower(), option.lower())
                if key in modifications.remove:
                    continue

                mod_comment = ""
                if key in modifications.rename:
                    new_sec, new_opt = modifications.rename[key]
                    effective_section = new_sec
                    effective_option = new_opt
                    mod_comment += f"# Renamed from {section}.{option}\n"
                else:
                    effective_section = section
                    effective_option = option

                value = orig_value
                if key in modifications.default_updates:
                    mod_comment += (
                        f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
                    )
                    value = modifications.default_updates[key]

                default_value = self.get_default_value(effective_section, effective_option, fallback="")
                is_default = str(value) == str(default_value)
                output.setdefault(effective_section.lower(), []).append(
                    (effective_option, str(value), is_default, mod_comment)
                )

        for section, options in output.items():
            section_buffer = StringIO()
            section_buffer.write(f"[{section}]\n")
            if include_descriptions:
                description = self.configuration_description.get(section, {}).get("description", "")
                if description:
                    for line in description.splitlines():
                        section_buffer.write(f"# {line}\n")
                    section_buffer.write("\n")
            for option, value_str, is_default, mod_comment in options:
                key = (section.lower(), option.lower())
                if key in modifications.default_updates and comment_out_defaults:
                    section_buffer.write(f"# {option} = {value_str}\n")
                else:
                    if mod_comment:
                        section_buffer.write(mod_comment)
                    if is_default and comment_out_defaults:
                        section_buffer.write(f"# {option} = {value_str}\n")
                    else:
                        section_buffer.write(f"{option} = {value_str}\n")
                if extra_spacing:
                    section_buffer.write("\n")
            content = section_buffer.getvalue().strip()
            if content:
                file.write(f"{content}\n\n")

    def _ensure_providers_config_loaded(self) -> None:
        """Ensure providers configurations are loaded."""
        if not self._providers_configuration_loaded:
            from airflow.providers_manager import ProvidersManager

            ProvidersManager()._initialize_providers_configuration()

    def _ensure_providers_config_unloaded(self) -> bool:
        """Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded."""
        if self._providers_configuration_loaded:
            self.restore_core_default_configuration()
            return True
        return False

    def _reload_provider_configs(self) -> None:
        """Reload providers configuration."""
        self.load_providers_configuration()

    def restore_core_default_configuration(self) -> None:
        """
        Restore default configuration for core Airflow.

        It does not restore configuration for providers. If you want to restore configuration for
        providers, you need to call ``load_providers_configuration`` method.
        """
        self.configuration_description = retrieve_configuration_description(include_providers=False)
        self._default_values = create_default_config_parser(self.configuration_description)
        self._providers_configuration_loaded = False

    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade SQL schemas.

        As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
        must be replaced with `postgresql`.
        """
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration [database] sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
                stacklevel=1,
            )
            self.upgraded_values[(section, key)] = old_value
            new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            # NOTE(review): this wipes the [core] env var — presumably the legacy
            # pre-2.3 location of sql_alchemy_conn; confirm against the shared parser.
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)

    def _validate_enums(self):
        """Validate that enum type config has an accepted value."""
        for (section_key, option_key), enum_options in self.enums_options.items():
            if self.has_option(section_key, option_key):
                value = self.get(section_key, option_key, fallback=None)
                if value and value not in enum_options:
                    raise AirflowConfigException(
                        f"`[{section_key}] {option_key}` should not be "
                        f"{value!r}. Possible values: {', '.join(enum_options)}."
                    )

    def _validate_sqlite3_version(self):
        """
        Validate SQLite version.

        Some features in storing rendered fields require SQLite >= 3.15.0.
        """
        if "sqlite" not in self.get("database", "sql_alchemy_conn"):
            return

        import sqlite3

        min_sqlite_version = (3, 15, 0)
        if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
            return

        from airflow.utils.docs import get_docs_url

        min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
        raise AirflowConfigException(
            f"error: SQLite C library too old (< {min_sqlite_version_str}). "
            f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
        )

    def mask_secrets(self):
        """Register every sensitive config value with both secrets maskers."""
        from airflow._shared.secrets_masker import mask_secret as mask_secret_core
        from airflow.sdk.log import mask_secret as mask_secret_sdk

        for section, key in self.sensitive_config_values:
            try:
                with self.suppress_future_warnings():
                    value = self.get(section, key, suppress_warnings=True)
            except AirflowConfigException:
                log.debug(
                    "Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.",
                    section,
                    key,
                )
                continue
            mask_secret_core(value)
            mask_secret_sdk(value)

    def load_test_config(self):
        """
        Use test configuration rather than the configuration coming from airflow defaults.

        When running tests we use special the unit_test configuration to avoid accidental modifications and
        different behaviours when running the tests. Values for those test configuration are stored in
        the "unit_tests.cfg" configuration file in the ``airflow/config_templates`` folder
        and you need to change values there if you want to make some specific configuration to be used
        """
        from cryptography.fernet import Fernet

        unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
        unit_test_config = unit_test_config_file.read_text()
        self.remove_all_read_configurations()
        with StringIO(unit_test_config) as test_config_file:
            self.read_file(test_config_file)

        # We need those globals before we run "get_all_expansion_variables" because this is where
        # the variables are expanded from in the configuration - set to random values for tests
        _SecretKeys.fernet_key = Fernet.generate_key().decode()
        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        self.expand_all_configuration_values()
        # Fixed: the log message previously named a non-existent file
        # ("config_unit_tests.cfg"); the file actually read is unit_tests.cfg.
        log.info("Unit test configuration loaded from 'unit_tests.cfg'")

    def expand_all_configuration_values(self):
        """Expand all configuration values using global and local variables defined in this module."""
        all_vars = get_all_expansion_variables()
        for section in self.sections():
            for key, value in self.items(section):
                if value is not None:
                    if self.has_option(section, key):
                        self.remove_option(section, key)
                    if self.is_template(section, key) or not isinstance(value, str):
                        self.set(section, key, value)
                    else:
                        self.set(section, key, value.format(**all_vars))

    def remove_all_read_configurations(self):
        """Remove all read configurations, leaving only default values in the config."""
        for section in self.sections():
            self.remove_section(section)

    @property
    def providers_configuration_loaded(self) -> bool:
        """Checks if providers have been loaded."""
        return self._providers_configuration_loaded

    def load_providers_configuration(self):
        """
        Load configuration for providers.

        This should be done after initial configuration have been performed. Initializing and discovering
        providers is an expensive operation and cannot be performed when we load configuration for the first
        time when airflow starts, because we initialize configuration very early, during importing of the
        `airflow` package and the module is not yet ready to be used when it happens and until configuration
        and settings are loaded. Therefore, in order to reload provider configuration we need to additionally
        load provider - specific configuration.
        """
        log.debug("Loading providers configuration")
        from airflow.providers_manager import ProvidersManager

        self.restore_core_default_configuration()
        for provider, config in ProvidersManager().already_initialized_provider_configs:
            for provider_section, provider_section_content in config.items():
                provider_options = provider_section_content["options"]
                section_in_current_config = self.configuration_description.get(provider_section)
                if not section_in_current_config:
                    self.configuration_description[provider_section] = deepcopy(provider_section_content)
                    section_in_current_config = self.configuration_description.get(provider_section)
                    section_in_current_config["source"] = f"default-{provider}"
                    for option in provider_options:
                        section_in_current_config["options"][option]["source"] = f"default-{provider}"
                else:
                    section_source = section_in_current_config.get("source", "Airflow's core package").split(
                        "default-"
                    )[-1]
                    # Fixed: a stray ``UserWarning`` positional argument (leftover
                    # from a warnings.warn call) was previously passed to the
                    # exception constructor.
                    raise AirflowConfigException(
                        f"The provider {provider} is attempting to contribute "
                        f"configuration section {provider_section} that "
                        f"has already been added before. The source of it: {section_source}. "
                        "This is forbidden. A provider can only add new sections. It "
                        "cannot contribute options to existing sections or override other "
                        "provider's configuration."
                    )
        self._default_values = create_default_config_parser(self.configuration_description)
        # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete
        # the cached values, and it will be refreshed on next access.
        with contextlib.suppress(AttributeError):
            # no problem if cache is not set yet
            del self.sensitive_config_values
        self._providers_configuration_loaded = True

    def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
        """
        Override to use module-level function that reads from global conf.

        This ensures as_dict() and other methods use the same secrets backend
        configuration as the global conf instance (set via conf_vars in tests).
        """
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        try:
            return secrets_client.get_config(config_key)
        except Exception as e:
            raise AirflowConfigException(
                "Cannot retrieve config from alternative secrets backend. "
                "Make sure it is configured properly and that the Backend "
                "is accessible.\n"
                f"{e}"
            )

    def __getstate__(self) -> dict[str, Any]:
        """Return the state of the object as a dictionary for pickling."""
        return {
            name: getattr(self, name)
            for name in [
                "_sections",
                "is_validated",
                "configuration_description",
                "upgraded_values",
                "_default_values",
            ]
        }

    def __setstate__(self, state) -> None:
        """Restore the state of the object from a dictionary representation."""
        self.__init__()  # type: ignore[misc]
        config = state.pop("_sections")
        self.read_dict(config)
        self.__dict__.update(state)

642 

643 

def get_airflow_home() -> str:
    """Get path to Airflow Home."""
    raw_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(raw_home)

647 

648 

def get_airflow_config(airflow_home: str) -> str:
    """Get Path to airflow.cfg path."""
    configured_path = os.environ.get("AIRFLOW_CONFIG")
    if configured_path is not None:
        return expand_env_var(configured_path)
    # No explicit AIRFLOW_CONFIG: default to airflow.cfg inside the home dir.
    return os.path.join(airflow_home, "airflow.cfg")

655 

656 

def get_all_expansion_variables() -> dict[str, Any]:
    """Collect the names usable when expanding templated default config values."""
    expansion_vars: dict[str, Any] = {
        "FERNET_KEY": _SecretKeys.fernet_key,
        "JWT_SECRET_KEY": _SecretKeys.jwt_secret_key,
    }
    # Expose every public, non-callable, non-module name from this module;
    # such globals override the two explicit keys above if names collide.
    for name, value in globals().items():
        if name.startswith("_") or callable(value) or ismodule(value):
            continue
        expansion_vars[name] = value
    return expansion_vars

667 

668 

def _generate_fernet_key() -> str:
    """Create a fresh Fernet key, decoded to ``str`` for storing in the config file."""
    from cryptography.fernet import Fernet

    key_bytes = Fernet.generate_key()
    return key_bytes.decode()

673 

674 

def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Create default config parser based on configuration description.

    It creates ConfigParser with all default values retrieved from the configuration description and
    expands all the variables from the global and local variables defined in this module.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    expansion_vars = get_all_expansion_variables()
    default_parser = ConfigParser()
    configure_parser_from_configuration_description(
        default_parser, configuration_description, expansion_vars
    )
    return default_parser

690 

691 

def create_provider_config_fallback_defaults() -> ConfigParser:
    """
    Create fallback defaults.

    This parser contains provider defaults for Airflow configuration, containing fallback default values
    that might be needed when provider classes are being imported - before provider's configuration
    is loaded.

    Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead
    to retrieving provider configuration before the defaults for the provider are loaded.

    Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or
    environment variables) those will be used as usual.

    NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication,
    at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed.

    You've been warned!
    """
    fallback_path = _default_config_file_path("provider_config_fallback_defaults.cfg")
    fallback_parser = ConfigParser()
    fallback_parser.read(fallback_path)
    return fallback_parser

714 

715 

def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Create the Airflow config file with default values if it does not exist yet.

    Uses the module-level ``conf`` parser and ``AIRFLOW_CONFIG`` path. When a new
    file is written, fresh ``fernet_key`` and ``jwt_secret`` defaults are generated
    first so the same secrets are persisted and reused on subsequent runs.

    :return: the global ``conf`` configuration parser.
    :raises IsADirectoryError: if ``AIRFLOW_CONFIG`` points at a directory.
    :raises FileNotFoundError: if the parent directory is missing and not under AIRFLOW_HOME.
    """
    airflow_config = pathlib.Path(AIRFLOW_CONFIG)
    if airflow_config.is_dir():
        msg = (
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {airflow_config.__fspath__()!r}."
        )
        raise IsADirectoryError(msg)
    if not airflow_config.exists():
        log.debug("Creating new Airflow config file in: %s", airflow_config.__fspath__())
        config_directory = airflow_config.parent
        if not config_directory.exists():
            # Only auto-create missing directories that live under AIRFLOW_HOME;
            # anything outside must be created by the user explicitly.
            if not config_directory.is_relative_to(AIRFLOW_HOME):
                msg = (
                    f"Config directory {config_directory.__fspath__()!r} not exists "
                    f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                    "Please create this directory first."
                )
                raise FileNotFoundError(msg) from None
            log.debug("Create directory %r for Airflow config", config_directory.__fspath__())
            config_directory.mkdir(parents=True, exist_ok=True)
        if not conf.get("core", "fernet_key"):
            # We know that fernet_key is not set, so we can generate it, set as global key
            # and also write it to the config file so that same key will be used next time
            _SecretKeys.fernet_key = _generate_fernet_key()
            conf.configuration_description["core"]["options"]["fernet_key"]["default"] = (
                _SecretKeys.fernet_key
            )

        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        conf.configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = (
            _SecretKeys.jwt_secret_key
        )
        # Create the (empty) file and lock down its permissions before any
        # secret-bearing content is written into it.
        pathlib.Path(airflow_config.__fspath__()).touch()
        make_group_other_inaccessible(airflow_config.__fspath__())
        with open(airflow_config, "w") as file:
            conf.write(
                file,
                include_sources=False,
                include_env_vars=True,
                include_providers=True,
                extra_spacing=True,
                only_defaults=True,
                show_values=True,
            )
    return conf

762 

763 

def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Load standard airflow configuration.

    In case it finds that the configuration file is missing, it will create it and write the default
    configuration values there, based on defaults passed, and will add the comments and examples
    from the default configuration.

    :param airflow_config_parser: parser to which the configuration will be loaded
    """
    global AIRFLOW_HOME  # to be cleaned in Airflow 4.0
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if not airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        return

    # The config-file airflow_home setting is deprecated; which warning we
    # raise depends on whether the env var is also set and on the value.
    msg = (
        "Specifying both AIRFLOW_HOME environment variable and airflow_home "
        "in the config file is deprecated. Please use only the AIRFLOW_HOME "
        "environment variable and remove the config file entry."
    )
    if "AIRFLOW_HOME" in os.environ:
        warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)
        return

    configured_home = airflow_config_parser.get("core", "airflow_home")
    if configured_home == AIRFLOW_HOME:
        warnings.warn(
            "Specifying airflow_home in the config file is deprecated. As you "
            "have left it at the default value you should remove the setting "
            "from your airflow.cfg and suffer no change in behaviour.",
            category=RemovedInAirflow4Warning,
            stacklevel=1,
        )
    else:
        # Honour the config-file value but still warn about the deprecation.
        AIRFLOW_HOME = configured_home
        warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)

797 

798 

def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.
    """
    parser = AirflowConfigParser()
    if parser.getboolean("core", "unit_test_mode"):
        parser.load_test_config()
        return parser

    load_standard_airflow_configuration(parser)
    # If the user set unit_test_mode inside airflow.cfg, respect it and layer
    # the default unit test configuration on top of what was just read.
    if parser.getboolean("core", "unit_test_mode"):
        parser.load_test_config()
    return parser

816 

817 

def make_group_other_inaccessible(file_path: str):
    """
    Restrict *file_path* to owner read/write only (at most ``0o600``).

    Clears every group/other permission bit while keeping the owner's
    read and write bits. Failures (e.g. an unsupported filesystem) are
    logged and ignored so config-file creation stays best-effort.

    :param file_path: path of the file whose permissions should be tightened
    """
    try:
        permissions = os.stat(file_path)
        os.chmod(file_path, permissions.st_mode & (stat.S_IRUSR | stat.S_IWUSR))
    except OSError as e:
        # os.stat/os.chmod raise only OSError subclasses; catching just
        # OSError keeps the best-effort behaviour without hiding
        # unrelated programming errors.
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )

828 

829 

def ensure_secrets_loaded(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    If the secrets_backend_list contains only 2 default backends, reload it.

    :param default_backends: search path to load; a non-default value (as used
        by workers) always triggers a reload
    """
    # Reload when only the two built-in backends are present, or when a
    # non-default search path (e.g. the worker's) was requested.
    needs_reload = (
        len(secrets_backend_list) == 2 or default_backends != DEFAULT_SECRETS_SEARCH_PATH
    )
    if needs_reload:
        return initialize_secrets_backends(default_backends=default_backends)
    return secrets_backend_list

845 

846 

def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | None:
    """
    Get Secret Backend if defined in airflow.cfg.

    Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.

    This is a convenience function that calls conf._get_custom_secret_backend().

    :param worker_mode: whether the lookup should use the worker-specific config keys
    :return: the configured backend instance, or None when none is configured
    """
    backend = conf._get_custom_secret_backend(worker_mode=worker_mode)
    return backend

856 

857 

def initialize_secrets_backends(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * import secrets backend classes
    * instantiate them and return them in a list

    :param default_backends: dotted paths of the backend classes to instantiate
    :return: the instantiated backends, custom backend (if any) first
    """
    # A non-default search path signals that we are initializing for a worker.
    worker_mode = default_backends != DEFAULT_SECRETS_SEARCH_PATH

    backends: list[BaseSecretsBackend] = []
    custom_backend = get_custom_secret_backend(worker_mode)
    if custom_backend is not None:
        backends.append(custom_backend)

    backends.extend(import_string(class_name)() for class_name in default_backends)
    return backends

882 

883 

def initialize_auth_manager() -> BaseAuthManager:
    """
    Initialize auth manager.

    * import user manager class
    * instantiate it and return it

    :raises AirflowConfigException: if [core/auth_manager] resolves to nothing
    """
    auth_manager_cls = conf.getimport(section="core", key="auth_manager")
    if auth_manager_cls:
        return auth_manager_cls()
    raise AirflowConfigException(
        "No auth manager defined in the config. Please specify one using section/key [core/auth_manager]."
    )

899 

900 

# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
if os.path.exists(_TEST_DAGS_FOLDER):
    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
    # Installed package: fall back to the dags folder under AIRFLOW_HOME.
    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags")

# Set up plugins folder for unit tests
# (same source-checkout-vs-installed fallback as for dags above)
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
if os.path.exists(_TEST_PLUGINS_FOLDER):
    TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
    TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins")

# Fresh random value generated on every process start (not persisted).
# NOTE(review): its consumers are not visible in this module — confirm usage
# before changing how it is generated.
SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")

# Module-level singletons created at import time, in this order:
# 1. the global configuration parser, 2. the secrets backends (which read the
# parser via get_custom_secret_backend), 3. validation of the loaded config.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()