Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/configuration.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

409 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import contextlib 

20import logging 

21import multiprocessing 

22import os 

23import pathlib 

24import re 

25import shlex 

26import stat 

27import subprocess 

28import sys 

29import warnings 

30from base64 import b64encode 

31from collections.abc import Callable 

32from configparser import ConfigParser 

33from copy import deepcopy 

34from io import StringIO 

35from re import Pattern 

36from typing import IO, TYPE_CHECKING, Any 

37from urllib.parse import urlsplit 

38 

39from typing_extensions import overload 

40 

41from airflow._shared.configuration.parser import ( 

42 VALUE_NOT_FOUND_SENTINEL, 

43 AirflowConfigParser as _SharedAirflowConfigParser, 

44 ValueNotFound, 

45) 

46from airflow._shared.module_loading import import_string 

47from airflow.exceptions import AirflowConfigException 

48from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH 

49from airflow.task.weight_rule import WeightRule 

50from airflow.utils import yaml 

51 

52if TYPE_CHECKING: 

53 from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager 

54 from airflow.secrets import BaseSecretsBackend 

55 

log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Extracts the leading dotted numeric part of a SQLite version string
# (e.g. "3.42.0" from "3.42.0-something") into the "version" group.
_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases describing configuration values and the nested structures returned
# by config-source introspection (section -> option -> value/source).
ConfigType = str | int | float | bool
ConfigOptionsDictType = dict[str, ConfigType]
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
ConfigSourcesType = dict[str, ConfigSectionSourcesType]

# Prefix of environment variables that override configuration options.
ENV_VAR_PREFIX = "AIRFLOW__"

71 

72 

class ConfigModifications:
    """
    Holds modifications to be applied when writing out the config.

    :param rename: Mapping from (old_section, old_option) to (new_section, new_option)
    :param remove: Set of (section, option) to remove
    :param default_updates: Mapping from (section, option) to new default value
    """

    def __init__(self) -> None:
        # Every key below is a (section, option) pair.
        self.rename: dict[tuple[str, str], tuple[str, str]] = {}
        self.remove: set[tuple[str, str]] = set()
        self.default_updates: dict[tuple[str, str], str] = {}

    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
        """Record that (old_section, old_option) should be emitted as (new_section, new_option)."""
        self.rename[old_section, old_option] = (new_section, new_option)

    def add_remove(self, section: str, option: str) -> None:
        """Record that (section, option) should be omitted entirely."""
        self.remove.add((section, option))

    def add_default_update(self, section: str, option: str, new_default: str) -> None:
        """Record a replacement default value for (section, option)."""
        self.default_updates[section, option] = new_default

95 

96 

def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """Parse a SQLite version string (e.g. ``"3.42.0"``) into an int tuple; ``()`` if unparseable."""
    match = _SQLITE3_VERSION_PATTERN.match(s)
    if not match:
        return ()
    return tuple(int(part) for part in match.group("version").split("."))

102 

103 

104@overload 

105def expand_env_var(env_var: None) -> None: ... 

106 

107 

108@overload 

109def expand_env_var(env_var: str) -> str: ... 

110 

111 

112def expand_env_var(env_var: str | None) -> str | None: 

113 """ 

114 Expand (potentially nested) env vars. 

115 

116 Repeat and apply `expandvars` and `expanduser` until 

117 interpolation stops having any effect. 

118 """ 

119 if not env_var or not isinstance(env_var, str): 

120 return env_var 

121 while True: 

122 interpolated = os.path.expanduser(os.path.expandvars(str(env_var))) 

123 if interpolated == env_var: 

124 return interpolated 

125 env_var = interpolated 

126 

127 

def run_command(command: str) -> str:
    """
    Run command and returns stdout.

    The command is split with ``shlex`` and executed without a shell.

    :param command: command line to execute
    :return: decoded stdout of the command
    :raises AirflowConfigException: if the command exits with a non-zero return code
    """
    # subprocess.run replaces the manual Popen/communicate dance and guarantees
    # the process is waited on and its pipes are closed.
    process = subprocess.run(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
    encoding = sys.getdefaultencoding()
    output = process.stdout.decode(encoding, "ignore")
    stderr = process.stderr.decode(encoding, "ignore")

    if process.returncode != 0:
        raise AirflowConfigException(
            f"Cannot execute {command}. Error code is: {process.returncode}. "
            f"Output: {output}, Stderr: {stderr}"
        )

    return output

142 

143 

144def _default_config_file_path(file_name: str) -> str: 

145 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates") 

146 return os.path.join(templates_dir, file_name) 

147 

148 

149def retrieve_configuration_description( 

150 include_airflow: bool = True, 

151 include_providers: bool = True, 

152 selected_provider: str | None = None, 

153) -> dict[str, dict[str, Any]]: 

154 """ 

155 Read Airflow configuration description from YAML file. 

156 

157 :param include_airflow: Include Airflow configs 

158 :param include_providers: Include provider configs 

159 :param selected_provider: If specified, include selected provider only 

160 :return: Python dictionary containing configs & their info 

161 """ 

162 base_configuration_description: dict[str, dict[str, Any]] = {} 

163 if include_airflow: 

164 with open(_default_config_file_path("config.yml")) as config_file: 

165 base_configuration_description.update(yaml.safe_load(config_file)) 

166 if include_providers: 

167 from airflow.providers_manager import ProvidersManager 

168 

169 for provider, config in ProvidersManager().provider_configs: 

170 if not selected_provider or provider == selected_provider: 

171 base_configuration_description.update(config) 

172 return base_configuration_description 

173 

174 

175class AirflowConfigParser(_SharedAirflowConfigParser): 

176 """ 

177 Custom Airflow Configparser supporting defaults and deprecated options. 

178 

179 This is a subclass of the shared AirflowConfigParser that adds Core-specific initialization 

180 and functionality (providers, validation, writing, etc.). 

181 

182 The defaults are stored in the ``_default_values``. The configuration description keeps 

183 description of all the options available in Airflow (description follow config.yaml.schema). 

184 

185 :param default_config: default configuration (in the form of ini file). 

186 :param configuration_description: description of configuration to use 

187 """ 

188 

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        """
        Initialize the parser with core configuration description and default values.

        :param default_config: optional default configuration (ini-file text) used to
            update the defaults parsed from the configuration description.
        """
        # Core-only description; provider configs are loaded later (see
        # load_providers_configuration) because provider discovery is expensive.
        configuration_description = retrieve_configuration_description(include_providers=False)
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
        _default_values = create_default_config_parser(configuration_description)
        super().__init__(configuration_description, _default_values, *args, **kwargs)
        self.configuration_description = configuration_description
        self._default_values = _default_values
        # Fallback defaults used before provider configuration is fully loaded.
        self._provider_config_fallback_default_values = create_provider_config_fallback_defaults()
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        # Replace the placeholder replacement value for the deprecated log template
        # with the real default that was just loaded.
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False
        self._providers_configuration_loaded = False

212 

213 @property 

214 def _validators(self) -> list[Callable[[], None]]: 

215 """Overring _validators from shared base class to add core-specific validators.""" 

216 return [ 

217 self._validate_sqlite3_version, 

218 self._validate_enums, 

219 self._validate_deprecated_values, 

220 self._upgrade_postgres_metastore_conn, 

221 ] 

222 

223 @property 

224 def _lookup_sequence(self) -> list[Callable]: 

225 """Overring _lookup_sequence from shared base class to add provider fallbacks.""" 

226 return super()._lookup_sequence + [self._get_option_from_provider_fallbacks] 

227 

228 def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]: 

229 """Override the base method to add provider fallbacks.""" 

230 return [ 

231 ("provider-fallback-defaults", self._provider_config_fallback_default_values), 

232 ("default", self._default_values), 

233 ("airflow.cfg", self), 

234 ] 

235 

236 def _get_option_from_provider_fallbacks( 

237 self, 

238 deprecated_key: str | None, 

239 deprecated_section: str | None, 

240 key: str, 

241 section: str, 

242 issue_warning: bool = True, 

243 extra_stacklevel: int = 0, 

244 **kwargs, 

245 ) -> str | ValueNotFound: 

246 """Get config option from provider fallback defaults.""" 

247 if self.get_provider_config_fallback_defaults(section, key) is not None: 

248 # no expansion needed 

249 return self.get_provider_config_fallback_defaults(section, key, **kwargs) 

250 return VALUE_NOT_FOUND_SENTINEL 

251 

252 def _update_logging_deprecated_template_to_one_from_defaults(self): 

253 default = self.get_default_value("logging", "log_filename_template") 

254 if default: 

255 # Tuple does not support item assignment, so we have to create a new tuple and replace it 

256 original_replacement = self.deprecated_values["logging"]["log_filename_template"] 

257 self.deprecated_values["logging"]["log_filename_template"] = ( 

258 original_replacement[0], 

259 default, 

260 ) 

261 

262 def get_provider_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: 

263 """Get provider config fallback default values.""" 

264 return self._provider_config_fallback_default_values.get(section, key, fallback=None, **kwargs) 

265 

    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
        "logging": {
            "log_filename_template": (
                re.compile(
                    re.escape(
                        "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log"
                    )
                ),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
            ),
        },
        "core": {
            "executor": (re.compile(re.escape("SequentialExecutor")), "LocalExecutor"),
        },
    }

    # Accepted logging level names, shared by the enum checks below.
    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    # Options restricted to a fixed set of values; checked by _validate_enums().
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
        ("dag_processor", "file_parsing_sort_mode"): [
            "modified_time",
            "random_seeded_by_host",
            "alphabetical",
        ],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        # uvicorn and gunicorn logging levels for web servers
        ("logging", "uvicorn_logging_level"): _available_logging_levels,
        ("logging", "gunicorn_logging_level"): _available_logging_levels,
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
        ("api", "grid_view_sorting_order"): ["topological", "hierarchical_alphabetical"],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section,option) to the old value that was upgraded"""

308 

    def write_custom_config(
        self,
        file: IO[str],
        comment_out_defaults: bool = True,
        include_descriptions: bool = True,
        extra_spacing: bool = True,
        modifications: ConfigModifications | None = None,
    ) -> None:
        """
        Write a configuration file using a ConfigModifications object.

        This method includes only options from the current airflow.cfg. For each option:
        - If it's marked for removal, omit it.
        - If renamed, output it under its new name and add a comment indicating its original location.
        - If a default update is specified, apply the new default and output the option as a commented line.
        - Otherwise, if the current value equals the default and comment_out_defaults is True, output it as a comment.
        Options absent from the current airflow.cfg are omitted.

        :param file: File to write the configuration.
        :param comment_out_defaults: If True, options whose value equals the default are written as comments.
        :param include_descriptions: Whether to include section descriptions.
        :param extra_spacing: Whether to insert an extra blank line after each option.
        :param modifications: ConfigModifications instance with rename, remove, and default updates.
        """
        modifications = modifications or ConfigModifications()
        # section -> list of (option, value, is_default, modification comment)
        output: dict[str, list[tuple[str, str, bool, str]]] = {}

        # Pass 1: collect the effective (possibly renamed/updated) options per section.
        for section in self._sections:  # type: ignore[attr-defined] # accessing _sections from ConfigParser
            for option, orig_value in self._sections[section].items():  # type: ignore[attr-defined]
                key = (section.lower(), option.lower())
                if key in modifications.remove:
                    continue

                mod_comment = ""
                if key in modifications.rename:
                    new_sec, new_opt = modifications.rename[key]
                    effective_section = new_sec
                    effective_option = new_opt
                    mod_comment += f"# Renamed from {section}.{option}\n"
                else:
                    effective_section = section
                    effective_option = option

                value = orig_value
                if key in modifications.default_updates:
                    mod_comment += (
                        f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
                    )
                    value = modifications.default_updates[key]

                # Compare against the default at the *new* location (after any rename).
                default_value = self.get_default_value(effective_section, effective_option, fallback="")
                is_default = str(value) == str(default_value)
                output.setdefault(effective_section.lower(), []).append(
                    (effective_option, str(value), is_default, mod_comment)
                )

        # Pass 2: render each section into a buffer, then emit non-empty sections.
        for section, options in output.items():
            section_buffer = StringIO()
            section_buffer.write(f"[{section}]\n")
            if include_descriptions:
                description = self.configuration_description.get(section, {}).get("description", "")
                if description:
                    for line in description.splitlines():
                        section_buffer.write(f"# {line}\n")
                    section_buffer.write("\n")
            for option, value_str, is_default, mod_comment in options:
                key = (section.lower(), option.lower())
                if key in modifications.default_updates and comment_out_defaults:
                    section_buffer.write(f"# {option} = {value_str}\n")
                else:
                    if mod_comment:
                        section_buffer.write(mod_comment)
                    if is_default and comment_out_defaults:
                        section_buffer.write(f"# {option} = {value_str}\n")
                    else:
                        section_buffer.write(f"{option} = {value_str}\n")
                if extra_spacing:
                    section_buffer.write("\n")
            content = section_buffer.getvalue().strip()
            if content:
                file.write(f"{content}\n\n")

390 

391 def _ensure_providers_config_loaded(self) -> None: 

392 """Ensure providers configurations are loaded.""" 

393 if not self._providers_configuration_loaded: 

394 from airflow.providers_manager import ProvidersManager 

395 

396 ProvidersManager()._initialize_providers_configuration() 

397 

398 def _ensure_providers_config_unloaded(self) -> bool: 

399 """Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded.""" 

400 if self._providers_configuration_loaded: 

401 self.restore_core_default_configuration() 

402 return True 

403 return False 

404 

405 def _reload_provider_configs(self) -> None: 

406 """Reload providers configuration.""" 

407 self.load_providers_configuration() 

408 

409 def restore_core_default_configuration(self) -> None: 

410 """ 

411 Restore default configuration for core Airflow. 

412 

413 It does not restore configuration for providers. If you want to restore configuration for 

414 providers, you need to call ``load_providers_configuration`` method. 

415 """ 

416 self.configuration_description = retrieve_configuration_description(include_providers=False) 

417 self._default_values = create_default_config_parser(self.configuration_description) 

418 self._providers_configuration_loaded = False 

419 

    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade SQL schemas.

        As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
        must be replaced with `postgresql`.
        """
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration [database] sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
                stacklevel=1,
            )
            # Remember the original value so callers can see what was upgraded.
            self.upgraded_values[(section, key)] = old_value
            # Rewrite only the leading "<scheme>://" part of the URL.
            new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            # NOTE(review): "core" appears intentional here — presumably the pre-2.3
            # section where sql_alchemy_conn lived; confirm against the deprecation map.
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)

448 

449 def _validate_enums(self): 

450 """Validate that enum type config has an accepted value.""" 

451 for (section_key, option_key), enum_options in self.enums_options.items(): 

452 if self.has_option(section_key, option_key): 

453 value = self.get(section_key, option_key, fallback=None) 

454 if value and value not in enum_options: 

455 raise AirflowConfigException( 

456 f"`[{section_key}] {option_key}` should not be " 

457 f"{value!r}. Possible values: {', '.join(enum_options)}." 

458 ) 

459 

460 def _validate_sqlite3_version(self): 

461 """ 

462 Validate SQLite version. 

463 

464 Some features in storing rendered fields require SQLite >= 3.15.0. 

465 """ 

466 if "sqlite" not in self.get("database", "sql_alchemy_conn"): 

467 return 

468 

469 import sqlite3 

470 

471 min_sqlite_version = (3, 15, 0) 

472 if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version: 

473 return 

474 

475 from airflow.utils.docs import get_docs_url 

476 

477 min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version) 

478 raise AirflowConfigException( 

479 f"error: SQLite C library too old (< {min_sqlite_version_str}). " 

480 f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}" 

481 ) 

482 

483 def mask_secrets(self): 

484 from airflow._shared.secrets_masker import mask_secret as mask_secret_core 

485 from airflow.sdk.log import mask_secret as mask_secret_sdk 

486 

487 for section, key in self.sensitive_config_values: 

488 try: 

489 with self.suppress_future_warnings(): 

490 value = self.get(section, key, suppress_warnings=True) 

491 except AirflowConfigException: 

492 log.debug( 

493 "Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.", 

494 section, 

495 key, 

496 ) 

497 continue 

498 mask_secret_core(value) 

499 mask_secret_sdk(value) 

500 

501 def load_test_config(self): 

502 """ 

503 Use test configuration rather than the configuration coming from airflow defaults. 

504 

505 When running tests we use special the unit_test configuration to avoid accidental modifications and 

506 different behaviours when running the tests. Values for those test configuration are stored in 

507 the "unit_tests.cfg" configuration file in the ``airflow/config_templates`` folder 

508 and you need to change values there if you want to make some specific configuration to be used 

509 """ 

510 # We need those globals before we run "get_all_expansion_variables" because this is where 

511 # the variables are expanded from in the configuration 

512 global FERNET_KEY, JWT_SECRET_KEY 

513 from cryptography.fernet import Fernet 

514 

515 unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg" 

516 unit_test_config = unit_test_config_file.read_text() 

517 self.remove_all_read_configurations() 

518 with StringIO(unit_test_config) as test_config_file: 

519 self.read_file(test_config_file) 

520 # set fernet key to a random value 

521 FERNET_KEY = Fernet.generate_key().decode() 

522 JWT_SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8") 

523 self.expand_all_configuration_values() 

524 log.info("Unit test configuration loaded from 'config_unit_tests.cfg'") 

525 

    def expand_all_configuration_values(self):
        """Expand all configuration values using global and local variables defined in this module."""
        all_vars = get_all_expansion_variables()
        for section in self.sections():
            for key, value in self.items(section):
                if value is not None:
                    # Remove then re-set the option so the (possibly expanded) value
                    # replaces the stored one.
                    if self.has_option(section, key):
                        self.remove_option(section, key)
                    if self.is_template(section, key) or not isinstance(value, str):
                        # Templates and non-strings are stored verbatim.
                        self.set(section, key, value)
                    else:
                        # Plain strings are treated as {name} templates expanded with
                        # the module-level expansion variables.
                        self.set(section, key, value.format(**all_vars))

538 

539 def remove_all_read_configurations(self): 

540 """Remove all read configurations, leaving only default values in the config.""" 

541 for section in self.sections(): 

542 self.remove_section(section) 

543 

544 @property 

545 def providers_configuration_loaded(self) -> bool: 

546 """Checks if providers have been loaded.""" 

547 return self._providers_configuration_loaded 

548 

549 def load_providers_configuration(self): 

550 """ 

551 Load configuration for providers. 

552 

553 This should be done after initial configuration have been performed. Initializing and discovering 

554 providers is an expensive operation and cannot be performed when we load configuration for the first 

555 time when airflow starts, because we initialize configuration very early, during importing of the 

556 `airflow` package and the module is not yet ready to be used when it happens and until configuration 

557 and settings are loaded. Therefore, in order to reload provider configuration we need to additionally 

558 load provider - specific configuration. 

559 """ 

560 log.debug("Loading providers configuration") 

561 from airflow.providers_manager import ProvidersManager 

562 

563 self.restore_core_default_configuration() 

564 for provider, config in ProvidersManager().already_initialized_provider_configs: 

565 for provider_section, provider_section_content in config.items(): 

566 provider_options = provider_section_content["options"] 

567 section_in_current_config = self.configuration_description.get(provider_section) 

568 if not section_in_current_config: 

569 self.configuration_description[provider_section] = deepcopy(provider_section_content) 

570 section_in_current_config = self.configuration_description.get(provider_section) 

571 section_in_current_config["source"] = f"default-{provider}" 

572 for option in provider_options: 

573 section_in_current_config["options"][option]["source"] = f"default-{provider}" 

574 else: 

575 section_source = section_in_current_config.get("source", "Airflow's core package").split( 

576 "default-" 

577 )[-1] 

578 raise AirflowConfigException( 

579 f"The provider {provider} is attempting to contribute " 

580 f"configuration section {provider_section} that " 

581 f"has already been added before. The source of it: {section_source}. " 

582 "This is forbidden. A provider can only add new sections. It " 

583 "cannot contribute options to existing sections or override other " 

584 "provider's configuration.", 

585 UserWarning, 

586 ) 

587 self._default_values = create_default_config_parser(self.configuration_description) 

588 # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete 

589 # the cached values, and it will be refreshed on next access. 

590 with contextlib.suppress(AttributeError): 

591 # no problem if cache is not set yet 

592 del self.sensitive_config_values 

593 self._providers_configuration_loaded = True 

594 

595 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None: 

596 """ 

597 Override to use module-level function that reads from global conf. 

598 

599 This ensures as_dict() and other methods use the same secrets backend 

600 configuration as the global conf instance (set via conf_vars in tests). 

601 """ 

602 secrets_client = get_custom_secret_backend() 

603 if not secrets_client: 

604 return None 

605 try: 

606 return secrets_client.get_config(config_key) 

607 except Exception as e: 

608 raise AirflowConfigException( 

609 "Cannot retrieve config from alternative secrets backend. " 

610 "Make sure it is configured properly and that the Backend " 

611 "is accessible.\n" 

612 f"{e}" 

613 ) 

614 

615 def __getstate__(self) -> dict[str, Any]: 

616 """Return the state of the object as a dictionary for pickling.""" 

617 return { 

618 name: getattr(self, name) 

619 for name in [ 

620 "_sections", 

621 "is_validated", 

622 "configuration_description", 

623 "upgraded_values", 

624 "_default_values", 

625 ] 

626 } 

627 

    def __setstate__(self, state) -> None:
        """Restore the state of the object from a dictionary representation."""
        # Re-run __init__ to rebuild defaults/description, then overlay the pickled
        # sections and remaining attributes on top of the fresh instance.
        self.__init__()  # type: ignore[misc]
        config = state.pop("_sections")
        self.read_dict(config)
        self.__dict__.update(state)

634 

635 

def get_airflow_home() -> str:
    """Get path to Airflow Home."""
    airflow_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(airflow_home)

639 

640 

def get_airflow_config(airflow_home: str) -> str:
    """Get Path to airflow.cfg path."""
    airflow_config_var = os.environ.get("AIRFLOW_CONFIG")
    if airflow_config_var is not None:
        return expand_env_var(airflow_config_var)
    return os.path.join(airflow_home, "airflow.cfg")

647 

648 

def get_all_expansion_variables() -> dict[str, Any]:
    """Return all public names in scope, used to expand ``{name}`` templates in config defaults."""
    # locals() is captured before any local variable is bound, matching the original
    # semantics where locals override globals on duplicate names.
    combined = {**globals(), **locals()}
    return {name: value for name, value in combined.items() if not name.startswith("_")}

651 

652 

def _generate_fernet_key() -> str:
    """Generate a fresh Fernet key and return it as a str."""
    from cryptography.fernet import Fernet

    key_bytes = Fernet.generate_key()
    return key_bytes.decode()

657 

658 

def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Create default config parser based on configuration description.

    It creates ConfigParser with all default values retrieved from the configuration description and
    expands all the variables from the global and local variables defined in this module.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    parser = ConfigParser()
    all_vars = get_all_expansion_variables()
    for section, section_desc in configuration_description.items():
        parser.add_section(section)
        for key, option_desc in section_desc["options"].items():
            default_value = option_desc["default"]
            if default_value is None:
                continue
            if option_desc.get("is_template", False) or not isinstance(default_value, str):
                # Templates and non-strings are stored verbatim.
                parser.set(section, key, default_value)
            else:
                parser.set(section, key, default_value.format(**all_vars))
    return parser

684 

685 

def create_provider_config_fallback_defaults() -> ConfigParser:
    """
    Create fallback defaults.

    This parser contains provider defaults for Airflow configuration, containing fallback default values
    that might be needed when provider classes are being imported - before provider's configuration
    is loaded.

    Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead
    to retrieving provider configuration before the defaults for the provider are loaded.

    Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or
    environment variables) those will be used as usual.

    NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication,
    at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed.

    You've been warned!
    """
    fallback_parser = ConfigParser()
    fallback_path = _default_config_file_path("provider_config_fallback_defaults.cfg")
    fallback_parser.read(fallback_path)
    return fallback_parser

708 

709 

def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Create the airflow.cfg file with default values when it does not exist yet.

    Also generates and stores FERNET_KEY / JWT_SECRET_KEY defaults so the same
    keys are reused on subsequent runs.

    :return: the global ``conf`` parser
    :raises IsADirectoryError: when AIRFLOW_CONFIG points at a directory
    :raises FileNotFoundError: when the config directory is missing and outside AIRFLOW_HOME
    """
    global FERNET_KEY, JWT_SECRET_KEY
    airflow_config = pathlib.Path(AIRFLOW_CONFIG)
    if airflow_config.is_dir():
        msg = (
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {airflow_config.__fspath__()!r}."
        )
        raise IsADirectoryError(msg)
    if not airflow_config.exists():
        log.debug("Creating new Airflow config file in: %s", airflow_config.__fspath__())
        config_directory = airflow_config.parent
        if not config_directory.exists():
            # Only auto-create directories that live under AIRFLOW_HOME.
            if not config_directory.is_relative_to(AIRFLOW_HOME):
                msg = (
                    f"Config directory {config_directory.__fspath__()!r} not exists "
                    f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                    "Please create this directory first."
                )
                raise FileNotFoundError(msg) from None
            log.debug("Create directory %r for Airflow config", config_directory.__fspath__())
            config_directory.mkdir(parents=True, exist_ok=True)
        if conf.get("core", "fernet_key", fallback=None) in (None, ""):
            # We know that FERNET_KEY is not set, so we can generate it, set as global key
            # and also write it to the config file so that same key will be used next time
            FERNET_KEY = _generate_fernet_key()
            conf.configuration_description["core"]["options"]["fernet_key"]["default"] = FERNET_KEY

        JWT_SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
        conf.configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = JWT_SECRET_KEY
        pathlib.Path(airflow_config.__fspath__()).touch()
        # Restrict permissions before writing any secrets into the file.
        make_group_other_inaccessible(airflow_config.__fspath__())
        with open(airflow_config, "w") as file:
            conf.write(
                file,
                include_sources=False,
                include_env_vars=True,
                include_providers=True,
                extra_spacing=True,
                only_defaults=True,
            )
    return conf

752 

753 

def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Read the standard airflow configuration file (``AIRFLOW_CONFIG``) into the parser.

    Also handles the deprecated ``[core] airflow_home`` option: depending on whether
    the ``AIRFLOW_HOME`` environment variable is set and whether the config value
    differs from the current ``AIRFLOW_HOME``, an appropriate ``DeprecationWarning``
    is emitted, and the global ``AIRFLOW_HOME`` may be updated from the config file.

    :param airflow_config_parser: parser to which the configuration will be loaded
    """
    global AIRFLOW_HOME

    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if not airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        return

    both_set_msg = (
        "Specifying both AIRFLOW_HOME environment variable and airflow_home "
        "in the config file is deprecated. Please use only the AIRFLOW_HOME "
        "environment variable and remove the config file entry."
    )
    if "AIRFLOW_HOME" in os.environ:
        # The env var wins; the config entry is redundant.
        warnings.warn(both_set_msg, category=DeprecationWarning, stacklevel=1)
    elif airflow_config_parser.get("core", "airflow_home") == AIRFLOW_HOME:
        # Config entry matches the computed default - removing it is a no-op.
        warnings.warn(
            "Specifying airflow_home in the config file is deprecated. As you "
            "have left it at the default value you should remove the setting "
            "from your airflow.cfg and suffer no change in behaviour.",
            category=DeprecationWarning,
            stacklevel=1,
        )
    else:
        # No env var and a non-default value: honour the config file entry.
        AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")
        warnings.warn(both_set_msg, category=DeprecationWarning, stacklevel=1)

787 

788 

def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.
    """
    airflow_config_parser = AirflowConfigParser()

    unit_test_mode = airflow_config_parser.getboolean("core", "unit_test_mode")
    if not unit_test_mode:
        load_standard_airflow_configuration(airflow_config_parser)
        # The airflow.cfg we just read may itself enable unit_test_mode; if so
        # we still want to layer the default unit test configuration on top.
        unit_test_mode = airflow_config_parser.getboolean("core", "unit_test_mode")
    if unit_test_mode:
        airflow_config_parser.load_test_config()

    return airflow_config_parser

806 

807 

def make_group_other_inaccessible(file_path: str):
    """
    Restrict *file_path* so only the owner can read/write it (best effort).

    Keeps at most the owner's read and write bits and clears everything else.
    Any failure (e.g. unsupported filesystem) is logged and the file is left
    with its original permissions.
    """
    try:
        current_mode = os.stat(file_path).st_mode
        os.chmod(file_path, current_mode & (stat.S_IRUSR | stat.S_IWUSR))
    except Exception as e:
        # Deliberately broad: permission hardening is best-effort and must not
        # break startup on exotic filesystems.
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )

818 

819 

def ensure_secrets_loaded(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    If the secrets_backend_list contains only 2 default backends, reload it.

    :param default_backends: search path to initialize from; a value different
        from ``DEFAULT_SECRETS_SEARCH_PATH`` indicates loading for a worker
    :return: the (possibly re-initialized) list of secrets backends
    """
    # Reload when the cached list holds only the 2 default backends, or when a
    # non-default search path was requested (worker case).
    must_reinitialize = (
        len(secrets_backend_list) == 2 or default_backends != DEFAULT_SECRETS_SEARCH_PATH
    )
    if must_reinitialize:
        return initialize_secrets_backends(default_backends=default_backends)
    return secrets_backend_list

835 

836 

def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | None:
    """
    Get the custom Secret Backend if one is defined in airflow.cfg.

    Convenience wrapper around ``conf._get_custom_secret_backend``, which selects
    the section, key and kwargs key depending on *worker_mode*.

    :param worker_mode: whether the backend is being resolved for a worker
    :return: the configured backend instance, or ``None`` if none is configured
    """
    backend = conf._get_custom_secret_backend(worker_mode=worker_mode)
    return backend

846 

847 

def initialize_secrets_backends(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * import secrets backend classes
    * instantiate them and return them in a list

    :param default_backends: dotted paths of backend classes to instantiate
    :return: list with the custom backend (if configured) followed by one
        instance of every class in *default_backends*
    """
    # Any search path other than the default means we are setting up backends
    # for a worker.
    worker_mode = default_backends != DEFAULT_SECRETS_SEARCH_PATH

    backends: list[BaseSecretsBackend] = []
    custom_backend = get_custom_secret_backend(worker_mode)
    if custom_backend is not None:
        backends.append(custom_backend)

    backends.extend(import_string(class_name)() for class_name in default_backends)
    return backends

872 

873 

def initialize_auth_manager() -> BaseAuthManager:
    """
    Initialize auth manager.

    * import user manager class
    * instantiate it and return it

    :raises AirflowConfigException: if ``[core] auth_manager`` resolves to nothing
    """
    auth_manager_cls = conf.getimport(section="core", key="auth_manager")
    if auth_manager_cls:
        return auth_manager_cls()
    raise AirflowConfigException(
        "No auth manager defined in the config. Please specify one using section/key [core/auth_manager]."
    )

889 

890 

# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
if os.path.exists(_TEST_DAGS_FOLDER):
    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
    # Source checkout layout not present (e.g. pip install): fall back to the
    # regular dags folder under AIRFLOW_HOME.
    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags")

# Set up plugins folder for unit tests
# this directory won't exist if users install via pip
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
if os.path.exists(_TEST_PLUGINS_FOLDER):
    TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
    TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins")

# Random per-process secret, generated fresh at import time.
SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
FERNET_KEY = ""  # Set only if needed when generating a new file
JWT_SECRET_KEY = ""  # Likewise set by write_default_airflow_configuration_if_needed()

# Import-time side effects: parse the config, load the secrets backends, then
# validate. Order matters - initialize_secrets_backends() reads from `conf`.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()