Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/configuration.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

411 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import contextlib 

20import logging 

21import multiprocessing 

22import os 

23import pathlib 

24import re 

25import shlex 

26import stat 

27import subprocess 

28import sys 

29import warnings 

30from base64 import b64encode 

31from collections.abc import Callable 

32from configparser import ConfigParser 

33from copy import deepcopy 

34from inspect import ismodule 

35from io import StringIO 

36from re import Pattern 

37from typing import IO, TYPE_CHECKING, Any 

38from urllib.parse import urlsplit 

39 

40from typing_extensions import overload 

41 

42from airflow._shared.configuration.parser import ( 

43 VALUE_NOT_FOUND_SENTINEL, 

44 AirflowConfigParser as _SharedAirflowConfigParser, 

45 ValueNotFound, 

46) 

47from airflow._shared.module_loading import import_string 

48from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning 

49from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH 

50from airflow.task.weight_rule import WeightRule 

51from airflow.utils import yaml 

52 

if TYPE_CHECKING:
    from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
    from airflow.secrets import BaseSecretsBackend

log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Captures the leading dotted-numeric portion of a version string
# (e.g. "3.8.7b1" matches with group "version" == "3.8.7").
_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases for configuration values and "where did this value come from" mappings.
ConfigType = str | int | float | bool
ConfigOptionsDictType = dict[str, ConfigType]
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
ConfigSourcesType = dict[str, ConfigSectionSourcesType]

# Prefix for configuration-overriding environment variables.
ENV_VAR_PREFIX = "AIRFLOW__"

72 

73 

class _SecretKeys:
    """Holds the secret keys used in Airflow during runtime."""

    # Populated lazily, e.g. by load_test_config() or
    # write_default_airflow_configuration_if_needed(); None until generated.
    fernet_key: str | None = None
    jwt_secret_key: str | None = None

79 

80 

class ConfigModifications:
    """
    Holds modifications to be applied when writing out the config.

    :param rename: Mapping from (old_section, old_option) to (new_section, new_option)
    :param remove: Set of (section, option) to remove
    :param default_updates: Mapping from (section, option) to new default value
    """

    def __init__(self) -> None:
        # Each collection is keyed by (section, option) tuples.
        self.rename: dict[tuple[str, str], tuple[str, str]] = dict()
        self.default_updates: dict[tuple[str, str], str] = dict()
        self.remove: set[tuple[str, str]] = set()

    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
        """Record that (old_section, old_option) should be written as (new_section, new_option)."""
        self.rename[old_section, old_option] = (new_section, new_option)

    def add_remove(self, section: str, option: str) -> None:
        """Record that (section, option) should be omitted from the written config."""
        self.remove.add((section, option))

    def add_default_update(self, section: str, option: str, new_default: str) -> None:
        """Record a new default value for (section, option)."""
        self.default_updates[section, option] = new_default

103 

104 

105def _parse_sqlite_version(s: str) -> tuple[int, ...]: 

106 match = _SQLITE3_VERSION_PATTERN.match(s) 

107 if match is None: 

108 return () 

109 return tuple(int(p) for p in match.group("version").split(".")) 

110 

111 

112@overload 

113def expand_env_var(env_var: None) -> None: ... 

114 

115 

116@overload 

117def expand_env_var(env_var: str) -> str: ... 

118 

119 

120def expand_env_var(env_var: str | None) -> str | None: 

121 """ 

122 Expand (potentially nested) env vars. 

123 

124 Repeat and apply `expandvars` and `expanduser` until 

125 interpolation stops having any effect. 

126 """ 

127 if not env_var or not isinstance(env_var, str): 

128 return env_var 

129 while True: 

130 interpolated = os.path.expanduser(os.path.expandvars(str(env_var))) 

131 if interpolated == env_var: 

132 return interpolated 

133 env_var = interpolated 

134 

135 

def run_command(command: str) -> str:
    """
    Run *command* (tokenized with shlex) and return its decoded stdout.

    :param command: shell-like command string; it is split with ``shlex.split`` and
        executed without a shell.
    :raises AirflowConfigException: if the command exits with a non-zero status.
    """
    # subprocess.run replaces the manual Popen/communicate/returncode dance and
    # guarantees the process is waited on.
    process = subprocess.run(
        shlex.split(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        check=False,
    )
    encoding = sys.getdefaultencoding()
    output = process.stdout.decode(encoding, "ignore")
    stderr = process.stderr.decode(encoding, "ignore")

    if process.returncode != 0:
        raise AirflowConfigException(
            f"Cannot execute {command}. Error code is: {process.returncode}. "
            f"Output: {output}, Stderr: {stderr}"
        )

    return output

150 

151 

152def _default_config_file_path(file_name: str) -> str: 

153 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates") 

154 return os.path.join(templates_dir, file_name) 

155 

156 

157def retrieve_configuration_description( 

158 include_airflow: bool = True, 

159 include_providers: bool = True, 

160 selected_provider: str | None = None, 

161) -> dict[str, dict[str, Any]]: 

162 """ 

163 Read Airflow configuration description from YAML file. 

164 

165 :param include_airflow: Include Airflow configs 

166 :param include_providers: Include provider configs 

167 :param selected_provider: If specified, include selected provider only 

168 :return: Python dictionary containing configs & their info 

169 """ 

170 base_configuration_description: dict[str, dict[str, Any]] = {} 

171 if include_airflow: 

172 with open(_default_config_file_path("config.yml")) as config_file: 

173 base_configuration_description.update(yaml.safe_load(config_file)) 

174 if include_providers: 

175 from airflow.providers_manager import ProvidersManager 

176 

177 for provider, config in ProvidersManager().provider_configs: 

178 if not selected_provider or provider == selected_provider: 

179 base_configuration_description.update(config) 

180 return base_configuration_description 

181 

182 

class AirflowConfigParser(_SharedAirflowConfigParser):
    """
    Custom Airflow Configparser supporting defaults and deprecated options.

    This is a subclass of the shared AirflowConfigParser that adds Core-specific initialization
    and functionality (providers, validation, writing, etc.).

    The defaults are stored in the ``_default_values``. The configuration description keeps
    description of all the options available in Airflow (description follow config.yaml.schema).

    :param default_config: default configuration (in the form of ini file).
    :param configuration_description: description of configuration to use
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        # Load the core (non-provider) configuration description from config.yml.
        configuration_description = retrieve_configuration_description(include_providers=False)
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
        _default_values = create_default_config_parser(configuration_description)
        super().__init__(configuration_description, _default_values, *args, **kwargs)
        self.configuration_description = configuration_description
        self._default_values = _default_values
        # Fallback defaults used when provider configuration has not been loaded yet.
        self._provider_config_fallback_default_values = create_provider_config_fallback_defaults()
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        # Replace the placeholder replacement value for the deprecated
        # log_filename_template with the real default loaded above.
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False
        self._providers_configuration_loaded = False

220 

    @property
    def _validators(self) -> list[Callable[[], None]]:
        """Overriding _validators from shared base class to add core-specific validators."""
        # Validators are invoked in the order listed here.
        return [
            self._validate_sqlite3_version,
            self._validate_enums,
            self._validate_deprecated_values,
            self._upgrade_postgres_metastore_conn,
        ]

230 

    @property
    def _lookup_sequence(self) -> list[Callable]:
        """Overriding _lookup_sequence from shared base class to add provider fallbacks."""
        # Provider fallback defaults are consulted last, after the base lookups.
        return super()._lookup_sequence + [self._get_option_from_provider_fallbacks]

235 

    def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
        """Override the base method to add provider fallbacks."""
        # Each entry pairs a source label with the parser holding its values.
        return [
            ("provider-fallback-defaults", self._provider_config_fallback_default_values),
            ("default", self._default_values),
            ("airflow.cfg", self),
        ]

243 

244 def _get_option_from_provider_fallbacks( 

245 self, 

246 deprecated_key: str | None, 

247 deprecated_section: str | None, 

248 key: str, 

249 section: str, 

250 issue_warning: bool = True, 

251 extra_stacklevel: int = 0, 

252 **kwargs, 

253 ) -> str | ValueNotFound: 

254 """Get config option from provider fallback defaults.""" 

255 if self.get_provider_config_fallback_defaults(section, key) is not None: 

256 # no expansion needed 

257 return self.get_provider_config_fallback_defaults(section, key, **kwargs) 

258 return VALUE_NOT_FOUND_SENTINEL 

259 

    def _update_logging_deprecated_template_to_one_from_defaults(self):
        """Swap the placeholder replacement for logging.log_filename_template with the loaded default."""
        default = self.get_default_value("logging", "log_filename_template")
        if default:
            # Tuple does not support item assignment, so we have to create a new tuple and replace it
            original_replacement = self.deprecated_values["logging"]["log_filename_template"]
            self.deprecated_values["logging"]["log_filename_template"] = (
                original_replacement[0],
                default,
            )

269 

    def get_provider_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
        """Get provider config fallback default values (None if the option is absent)."""
        return self._provider_config_fallback_default_values.get(section, key, fallback=None, **kwargs)

273 

    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
        "logging": {
            "log_filename_template": (
                re.compile(
                    re.escape(
                        "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log"
                    )
                ),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
            ),
        },
        "core": {
            "executor": (re.compile(re.escape("SequentialExecutor")), "LocalExecutor"),
        },
    }

    # Accepted values for logging-level style options (shared by several entries below).
    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    # Enum-like options: (section, option) -> list of accepted values, enforced by _validate_enums.
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
        ("dag_processor", "file_parsing_sort_mode"): [
            "modified_time",
            "random_seeded_by_host",
            "alphabetical",
        ],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        # uvicorn and gunicorn logging levels for web servers
        ("logging", "uvicorn_logging_level"): _available_logging_levels,
        ("logging", "gunicorn_logging_level"): _available_logging_levels,
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
        ("api", "grid_view_sorting_order"): ["topological", "hierarchical_alphabetical"],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section,option) to the old value that was upgraded"""

316 

    def write_custom_config(
        self,
        file: IO[str],
        comment_out_defaults: bool = True,
        include_descriptions: bool = True,
        extra_spacing: bool = True,
        modifications: ConfigModifications | None = None,
    ) -> None:
        """
        Write a configuration file using a ConfigModifications object.

        This method includes only options from the current airflow.cfg. For each option:
        - If it's marked for removal, omit it.
        - If renamed, output it under its new name and add a comment indicating its original location.
        - If a default update is specified, apply the new default and output the option as a commented line.
        - Otherwise, if the current value equals the default and comment_out_defaults is True, output it as a comment.
        Options absent from the current airflow.cfg are omitted.

        :param file: File to write the configuration.
        :param comment_out_defaults: If True, options whose value equals the default are written as comments.
        :param include_descriptions: Whether to include section descriptions.
        :param extra_spacing: Whether to insert an extra blank line after each option.
        :param modifications: ConfigModifications instance with rename, remove, and default updates.
        """
        modifications = modifications or ConfigModifications()
        # Collected output: section -> list of (option, value, is_default, mod_comment).
        output: dict[str, list[tuple[str, str, bool, str]]] = {}

        # First pass: apply remove/rename/default-update modifications and compute
        # whether each option still equals its default.
        for section in self._sections:  # type: ignore[attr-defined] # accessing _sections from ConfigParser
            for option, orig_value in self._sections[section].items():  # type: ignore[attr-defined]
                key = (section.lower(), option.lower())
                if key in modifications.remove:
                    continue

                mod_comment = ""
                if key in modifications.rename:
                    new_sec, new_opt = modifications.rename[key]
                    effective_section = new_sec
                    effective_option = new_opt
                    mod_comment += f"# Renamed from {section}.{option}\n"
                else:
                    effective_section = section
                    effective_option = option

                value = orig_value
                if key in modifications.default_updates:
                    mod_comment += (
                        f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
                    )
                    value = modifications.default_updates[key]

                default_value = self.get_default_value(effective_section, effective_option, fallback="")
                is_default = str(value) == str(default_value)
                output.setdefault(effective_section.lower(), []).append(
                    (effective_option, str(value), is_default, mod_comment)
                )

        # Second pass: render each section, with optional descriptions and spacing.
        for section, options in output.items():
            section_buffer = StringIO()
            section_buffer.write(f"[{section}]\n")
            if include_descriptions:
                description = self.configuration_description.get(section, {}).get("description", "")
                if description:
                    for line in description.splitlines():
                        section_buffer.write(f"# {line}\n")
                    section_buffer.write("\n")
            for option, value_str, is_default, mod_comment in options:
                # NOTE(review): this key uses the *effective* (possibly renamed) names, while
                # modifications.default_updates is keyed by the original names — an option that
                # is both renamed and default-updated will not hit the branch below. Confirm
                # this is intended.
                key = (section.lower(), option.lower())
                if key in modifications.default_updates and comment_out_defaults:
                    section_buffer.write(f"# {option} = {value_str}\n")
                else:
                    if mod_comment:
                        section_buffer.write(mod_comment)
                    if is_default and comment_out_defaults:
                        section_buffer.write(f"# {option} = {value_str}\n")
                    else:
                        section_buffer.write(f"{option} = {value_str}\n")
                if extra_spacing:
                    section_buffer.write("\n")
            content = section_buffer.getvalue().strip()
            if content:
                file.write(f"{content}\n\n")

398 

399 def _ensure_providers_config_loaded(self) -> None: 

400 """Ensure providers configurations are loaded.""" 

401 if not self._providers_configuration_loaded: 

402 from airflow.providers_manager import ProvidersManager 

403 

404 ProvidersManager()._initialize_providers_configuration() 

405 

406 def _ensure_providers_config_unloaded(self) -> bool: 

407 """Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded.""" 

408 if self._providers_configuration_loaded: 

409 self.restore_core_default_configuration() 

410 return True 

411 return False 

412 

    def _reload_provider_configs(self) -> None:
        """Reload providers configuration."""
        # Thin wrapper kept for symmetry with _ensure_providers_config_unloaded.
        self.load_providers_configuration()

416 

    def restore_core_default_configuration(self) -> None:
        """
        Restore default configuration for core Airflow.

        It does not restore configuration for providers. If you want to restore configuration for
        providers, you need to call ``load_providers_configuration`` method.
        """
        # Rebuild description and defaults from the bundled config.yml only.
        self.configuration_description = retrieve_configuration_description(include_providers=False)
        self._default_values = create_default_config_parser(self.configuration_description)
        self._providers_configuration_loaded = False

427 

    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade SQL schemas.

        As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
        must be replaced with `postgresql`.
        """
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration [database] sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
                stacklevel=1,
            )
            # Remember the original value so callers can report what was upgraded.
            self.upgraded_values[(section, key)] = old_value
            # Only rewrite the scheme prefix; the rest of the URL is untouched.
            new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            # NOTE(review): this wipes the "core"-section env var (the pre-2.3 home of
            # sql_alchemy_conn), not "database" — presumably intentional legacy cleanup; confirm.
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)

456 

457 def _validate_enums(self): 

458 """Validate that enum type config has an accepted value.""" 

459 for (section_key, option_key), enum_options in self.enums_options.items(): 

460 if self.has_option(section_key, option_key): 

461 value = self.get(section_key, option_key, fallback=None) 

462 if value and value not in enum_options: 

463 raise AirflowConfigException( 

464 f"`[{section_key}] {option_key}` should not be " 

465 f"{value!r}. Possible values: {', '.join(enum_options)}." 

466 ) 

467 

468 def _validate_sqlite3_version(self): 

469 """ 

470 Validate SQLite version. 

471 

472 Some features in storing rendered fields require SQLite >= 3.15.0. 

473 """ 

474 if "sqlite" not in self.get("database", "sql_alchemy_conn"): 

475 return 

476 

477 import sqlite3 

478 

479 min_sqlite_version = (3, 15, 0) 

480 if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version: 

481 return 

482 

483 from airflow.utils.docs import get_docs_url 

484 

485 min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version) 

486 raise AirflowConfigException( 

487 f"error: SQLite C library too old (< {min_sqlite_version_str}). " 

488 f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}" 

489 ) 

490 

    def mask_secrets(self):
        """Register every readable sensitive config value with the core and SDK secret maskers."""
        from airflow._shared.secrets_masker import mask_secret as mask_secret_core
        from airflow.sdk.log import mask_secret as mask_secret_sdk

        for section, key in self.sensitive_config_values:
            try:
                with self.suppress_future_warnings():
                    value = self.get(section, key, suppress_warnings=True)
            except AirflowConfigException:
                # Best-effort: an unreadable option is skipped rather than failing startup.
                log.debug(
                    "Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.",
                    section,
                    key,
                )
                continue
            mask_secret_core(value)
            mask_secret_sdk(value)

508 

509 def load_test_config(self): 

510 """ 

511 Use test configuration rather than the configuration coming from airflow defaults. 

512 

513 When running tests we use special the unit_test configuration to avoid accidental modifications and 

514 different behaviours when running the tests. Values for those test configuration are stored in 

515 the "unit_tests.cfg" configuration file in the ``airflow/config_templates`` folder 

516 and you need to change values there if you want to make some specific configuration to be used 

517 """ 

518 from cryptography.fernet import Fernet 

519 

520 unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg" 

521 unit_test_config = unit_test_config_file.read_text() 

522 self.remove_all_read_configurations() 

523 with StringIO(unit_test_config) as test_config_file: 

524 self.read_file(test_config_file) 

525 

526 # We need those globals before we run "get_all_expansion_variables" because this is where 

527 # the variables are expanded from in the configuration - set to random values for tests 

528 _SecretKeys.fernet_key = Fernet.generate_key().decode() 

529 _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8") 

530 self.expand_all_configuration_values() 

531 log.info("Unit test configuration loaded from 'config_unit_tests.cfg'") 

532 

    def expand_all_configuration_values(self):
        """Expand all configuration values using global and local variables defined in this module."""
        all_vars = get_all_expansion_variables()
        for section in self.sections():
            for key, value in self.items(section):
                if value is not None:
                    # Remove first so the expanded value replaces the raw one in place.
                    if self.has_option(section, key):
                        self.remove_option(section, key)
                    # Templates and non-string values are stored verbatim; plain strings
                    # are expanded via str.format against the collected variables.
                    if self.is_template(section, key) or not isinstance(value, str):
                        self.set(section, key, value)
                    else:
                        self.set(section, key, value.format(**all_vars))

545 

546 def remove_all_read_configurations(self): 

547 """Remove all read configurations, leaving only default values in the config.""" 

548 for section in self.sections(): 

549 self.remove_section(section) 

550 

    @property
    def providers_configuration_loaded(self) -> bool:
        """Checks if providers have been loaded."""
        return self._providers_configuration_loaded

555 

    def load_providers_configuration(self):
        """
        Load configuration for providers.

        This should be done after initial configuration have been performed. Initializing and discovering
        providers is an expensive operation and cannot be performed when we load configuration for the first
        time when airflow starts, because we initialize configuration very early, during importing of the
        `airflow` package and the module is not yet ready to be used when it happens and until configuration
        and settings are loaded. Therefore, in order to reload provider configuration we need to additionally
        load provider - specific configuration.
        """
        log.debug("Loading providers configuration")
        from airflow.providers_manager import ProvidersManager

        # Start from a clean core description, then layer provider sections on top.
        self.restore_core_default_configuration()
        for provider, config in ProvidersManager().already_initialized_provider_configs:
            for provider_section, provider_section_content in config.items():
                provider_options = provider_section_content["options"]
                section_in_current_config = self.configuration_description.get(provider_section)
                if not section_in_current_config:
                    # New section: copy it in and tag each option with its provider source.
                    self.configuration_description[provider_section] = deepcopy(provider_section_content)
                    section_in_current_config = self.configuration_description.get(provider_section)
                    section_in_current_config["source"] = f"default-{provider}"
                    for option in provider_options:
                        section_in_current_config["options"][option]["source"] = f"default-{provider}"
                else:
                    section_source = section_in_current_config.get("source", "Airflow's core package").split(
                        "default-"
                    )[-1]
                    # NOTE(review): the extra UserWarning argument looks like a leftover from a
                    # warnings.warn() call — exceptions take no category parameter; confirm.
                    raise AirflowConfigException(
                        f"The provider {provider} is attempting to contribute "
                        f"configuration section {provider_section} that "
                        f"has already been added before. The source of it: {section_source}. "
                        "This is forbidden. A provider can only add new sections. It "
                        "cannot contribute options to existing sections or override other "
                        "provider's configuration.",
                        UserWarning,
                    )
        self._default_values = create_default_config_parser(self.configuration_description)
        # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete
        # the cached values, and it will be refreshed on next access.
        with contextlib.suppress(AttributeError):
            # no problem if cache is not set yet
            del self.sensitive_config_values
        self._providers_configuration_loaded = True

601 

602 def _get_config_value_from_secret_backend(self, config_key: str) -> str | None: 

603 """ 

604 Override to use module-level function that reads from global conf. 

605 

606 This ensures as_dict() and other methods use the same secrets backend 

607 configuration as the global conf instance (set via conf_vars in tests). 

608 """ 

609 secrets_client = get_custom_secret_backend() 

610 if not secrets_client: 

611 return None 

612 try: 

613 return secrets_client.get_config(config_key) 

614 except Exception as e: 

615 raise AirflowConfigException( 

616 "Cannot retrieve config from alternative secrets backend. " 

617 "Make sure it is configured properly and that the Backend " 

618 "is accessible.\n" 

619 f"{e}" 

620 ) 

621 

622 def __getstate__(self) -> dict[str, Any]: 

623 """Return the state of the object as a dictionary for pickling.""" 

624 return { 

625 name: getattr(self, name) 

626 for name in [ 

627 "_sections", 

628 "is_validated", 

629 "configuration_description", 

630 "upgraded_values", 

631 "_default_values", 

632 ] 

633 } 

634 

    def __setstate__(self, state) -> None:
        """Restore the state of the object from a dictionary representation."""
        # Re-run __init__ to rebuild defaults and descriptions, then replay the pickled
        # sections on top of it and restore the remaining attributes.
        self.__init__()  # type: ignore[misc]
        config = state.pop("_sections")
        self.read_dict(config)
        self.__dict__.update(state)

641 

642 

def get_airflow_home() -> str:
    """Get path to Airflow Home."""
    home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(home)

646 

647 

def get_airflow_config(airflow_home: str) -> str:
    """Get Path to airflow.cfg path."""
    airflow_config_var = os.environ.get("AIRFLOW_CONFIG")
    if airflow_config_var is not None:
        return expand_env_var(airflow_config_var)
    return os.path.join(airflow_home, "airflow.cfg")

654 

655 

def get_all_expansion_variables() -> dict[str, Any]:
    """Collect the variables available for str.format expansion of configuration values."""
    variables: dict[str, Any] = {
        "FERNET_KEY": _SecretKeys.fernet_key,
        "JWT_SECRET_KEY": _SecretKeys.jwt_secret_key,
    }
    # Expose every public, non-callable, non-module global of this module as well.
    for name, value in globals().items():
        if name.startswith("_") or callable(value) or ismodule(value):
            continue
        variables[name] = value
    return variables

666 

667 

def _generate_fernet_key() -> str:
    """Generate a new random Fernet key, decoded to a str."""
    # Imported lazily so the cryptography dependency is only needed when generating.
    from cryptography.fernet import Fernet

    return Fernet.generate_key().decode()

672 

673 

def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Create default config parser based on configuration description.

    It creates ConfigParser with all default values retrieved from the configuration description and
    expands all the variables from the global and local variables defined in this module.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    parser = ConfigParser()
    all_vars = get_all_expansion_variables()
    for section, section_desc in configuration_description.items():
        parser.add_section(section)
        for key, option in section_desc["options"].items():
            default_value = option["default"]
            if default_value is None:
                continue
            # Templates and non-string defaults are stored verbatim; plain strings are
            # expanded with the module-level expansion variables.
            if option.get("is_template", False) or not isinstance(default_value, str):
                parser.set(section, key, default_value)
            else:
                parser.set(section, key, default_value.format(**all_vars))
    return parser

699 

700 

def create_provider_config_fallback_defaults() -> ConfigParser:
    """
    Create fallback defaults.

    This parser contains provider defaults for Airflow configuration, containing fallback default values
    that might be needed when provider classes are being imported - before provider's configuration
    is loaded.

    Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead
    to retrieving provider configuration before the defaults for the provider are loaded.

    Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or
    environment variables) those will be used as usual.

    NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication,
    at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed.

    You've been warned!
    """
    config_parser = ConfigParser()
    # The fallback values are shipped as an .cfg template next to config.yml.
    config_parser.read(_default_config_file_path("provider_config_fallback_defaults.cfg"))
    return config_parser

723 

724 

def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """Create the AIRFLOW_CONFIG file with default values if it does not exist yet; return ``conf``."""
    airflow_config = pathlib.Path(AIRFLOW_CONFIG)
    if airflow_config.is_dir():
        msg = (
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {airflow_config.__fspath__()!r}."
        )
        raise IsADirectoryError(msg)
    if not airflow_config.exists():
        log.debug("Creating new Airflow config file in: %s", airflow_config.__fspath__())
        config_directory = airflow_config.parent
        if not config_directory.exists():
            # Only auto-create directories that live under AIRFLOW_HOME.
            if not config_directory.is_relative_to(AIRFLOW_HOME):
                msg = (
                    f"Config directory {config_directory.__fspath__()!r} not exists "
                    f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                    "Please create this directory first."
                )
                raise FileNotFoundError(msg) from None
            log.debug("Create directory %r for Airflow config", config_directory.__fspath__())
            config_directory.mkdir(parents=True, exist_ok=True)
        if conf.get("core", "fernet_key", fallback=None) in (None, ""):
            # We know that fernet_key is not set, so we can generate it, set as global key
            # and also write it to the config file so that same key will be used next time
            _SecretKeys.fernet_key = _generate_fernet_key()
            conf.configuration_description["core"]["options"]["fernet_key"]["default"] = (
                _SecretKeys.fernet_key
            )

        # A fresh JWT secret is generated for every newly written config file.
        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        conf.configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = (
            _SecretKeys.jwt_secret_key
        )
        # Create the file first and restrict permissions before writing secrets into it.
        pathlib.Path(airflow_config.__fspath__()).touch()
        make_group_other_inaccessible(airflow_config.__fspath__())
        with open(airflow_config, "w") as file:
            conf.write(
                file,
                include_sources=False,
                include_env_vars=True,
                include_providers=True,
                extra_spacing=True,
                only_defaults=True,
            )
    return conf

770 

771 

def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Read the standard airflow configuration file into the given parser.

    Also handles the deprecated ``[core] airflow_home`` option: depending on whether
    the ``AIRFLOW_HOME`` environment variable is set and whether the configured value
    differs from the current default, an appropriate deprecation warning is emitted
    (and, as a legacy fallback, the global ``AIRFLOW_HOME`` may be overridden).

    :param airflow_config_parser: parser to which the configuration will be loaded
    """
    global AIRFLOW_HOME  # to be cleaned in Airflow 4.0
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if not airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        return
    msg = (
        "Specifying both AIRFLOW_HOME environment variable and airflow_home "
        "in the config file is deprecated. Please use only the AIRFLOW_HOME "
        "environment variable and remove the config file entry."
    )
    if "AIRFLOW_HOME" in os.environ:
        # Env var wins; the config entry is ignored but still deprecated.
        warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)
        return
    configured_home = airflow_config_parser.get("core", "airflow_home")
    if configured_home == AIRFLOW_HOME:
        warnings.warn(
            "Specifying airflow_home in the config file is deprecated. As you "
            "have left it at the default value you should remove the setting "
            "from your airflow.cfg and suffer no change in behaviour.",
            category=RemovedInAirflow4Warning,
            stacklevel=1,
        )
        return
    # Legacy behaviour: honour the config-file value when no env var is set.
    AIRFLOW_HOME = configured_home
    warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)

805 

806 

def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.
    """
    parser = AirflowConfigParser()
    if not parser.getboolean("core", "unit_test_mode"):
        load_standard_airflow_configuration(parser)
        # The airflow.cfg read above may itself enable unit_test_mode; honour
        # that by layering the default unit test configuration on top.
        if parser.getboolean("core", "unit_test_mode"):
            parser.load_test_config()
    else:
        parser.load_test_config()
    return parser

824 

825 

def make_group_other_inaccessible(file_path: str):
    """
    Restrict *file_path* so that only the owner keeps read/write access.

    All group/other permission bits (and owner execute) are cleared. Failures are
    logged and swallowed on purpose: this is a best-effort hardening step and must
    not abort startup.
    """
    try:
        owner_rw_only = os.stat(file_path).st_mode & (stat.S_IRUSR | stat.S_IWUSR)
        os.chmod(file_path, owner_rw_only)
    except Exception as e:
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )

836 

837 

def ensure_secrets_loaded(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    If the secrets_backend_list contains only 2 default backends, reload it.
    A non-default ``default_backends`` search path (as used by workers) always
    triggers a fresh initialization instead of reusing the cached list.
    """
    using_default_path = default_backends == DEFAULT_SECRETS_SEARCH_PATH
    if using_default_path and len(secrets_backend_list) != 2:
        # Already fully initialized for the standard search path — reuse it.
        return secrets_backend_list
    return initialize_secrets_backends(default_backends=default_backends)

853 

854 

def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | None:
    """
    Get Secret Backend if defined in airflow.cfg.

    Convenience wrapper around ``conf._get_custom_secret_backend()``; the
    ``worker_mode`` flag is forwarded so the underlying lookup can select the
    worker-specific section/key/kwargs when called from a worker.
    """
    backend = conf._get_custom_secret_backend(worker_mode=worker_mode)
    return backend

864 

865 

def initialize_secrets_backends(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * import secrets backend classes
    * instantiate them and return them in a list

    Any custom backend configured in airflow.cfg is placed first, followed by one
    instance per class name in ``default_backends``.
    """
    # A non-standard search path means we are initializing for a worker.
    worker_mode = default_backends != DEFAULT_SECRETS_SEARCH_PATH

    backends: list[BaseSecretsBackend] = []
    custom_backend = get_custom_secret_backend(worker_mode)
    if custom_backend is not None:
        backends.append(custom_backend)

    backends.extend(import_string(class_name)() for class_name in default_backends)
    return backends

890 

891 

def initialize_auth_manager() -> BaseAuthManager:
    """
    Initialize auth manager.

    * import user manager class
    * instantiate it and return it

    :raises AirflowConfigException: if ``[core] auth_manager`` resolves to nothing
    """
    auth_manager_cls = conf.getimport(section="core", key="auth_manager")
    if auth_manager_cls:
        return auth_manager_cls()
    raise AirflowConfigException(
        "No auth manager defined in the config. Please specify one using section/key [core/auth_manager]."
    )

907 

908 

# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
# (in that case fall back to $AIRFLOW_HOME/dags below)
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
if os.path.exists(_TEST_DAGS_FOLDER):
    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags")

# Set up plugins folder for unit tests
# (same source-checkout-vs-pip-install fallback logic as for dags above)
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
if os.path.exists(_TEST_PLUGINS_FOLDER):
    TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
    TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins")

# Fresh random secret generated on every import of this module (16 random
# bytes, base64-encoded) — NOT persisted anywhere here.
SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")

# Module initialization order matters: `conf` must exist before the secrets
# backends are initialized (they read configuration) and before validation.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()