Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/configuration.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

361 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import logging 

20import os 

21import pathlib 

22import re 

23import shlex 

24import stat 

25import subprocess 

26import sys 

27import warnings 

28from base64 import b64encode 

29from collections.abc import Callable 

30from configparser import ConfigParser 

31from inspect import ismodule 

32from io import StringIO 

33from re import Pattern 

34from typing import IO, TYPE_CHECKING, Any 

35from urllib.parse import urlsplit 

36 

37from typing_extensions import overload 

38 

39from airflow._shared.configuration.parser import ( 

40 AirflowConfigParser as _SharedAirflowConfigParser, 

41 configure_parser_from_configuration_description, 

42) 

43from airflow._shared.module_loading import import_string 

44from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning 

45from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH 

46from airflow.task.weight_rule import WeightRule 

47from airflow.utils import yaml 

48 

49if TYPE_CHECKING: 

50 from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager 

51 from airflow.secrets import BaseSecretsBackend 

52 

53log = logging.getLogger(__name__) 

54 

55# show Airflow's deprecation warnings 

56if not sys.warnoptions: 

57 warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow") 

58 warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow") 

59 

60_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$") 

61 

62ConfigType = str | int | float | bool 

63ConfigOptionsDictType = dict[str, ConfigType] 

64ConfigSectionSourcesType = dict[str, str | tuple[str, str]] 

65ConfigSourcesType = dict[str, ConfigSectionSourcesType] 

66 

67ENV_VAR_PREFIX = "AIRFLOW__" 

68 

69 

class _SecretKeys:
    """
    Namespace for secret keys generated at runtime.

    Both attributes start out empty and are assigned by other code in this
    module when a new key has to be generated.
    """

    # Set only if needed when generating a new file
    fernet_key: str = ""
    jwt_secret_key: str = ""

75 

76 

class ConfigModifications:
    """
    Collect the changes to apply while a configuration file is written out.

    Three independent collections are kept, all keyed by ``(section, option)``:

    * ``rename`` maps an old ``(section, option)`` to its new ``(section, option)``
    * ``remove`` is the set of ``(section, option)`` pairs to drop
    * ``default_updates`` maps ``(section, option)`` to its new default value
    """

    def __init__(self) -> None:
        self.rename: dict[tuple[str, str], tuple[str, str]] = dict()
        self.remove: set[tuple[str, str]] = set()
        self.default_updates: dict[tuple[str, str], str] = dict()

    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
        """Register that ``old_section.old_option`` is now ``new_section.new_option``."""
        source = (old_section, old_option)
        target = (new_section, new_option)
        self.rename[source] = target

    def add_remove(self, section: str, option: str) -> None:
        """Register ``section.option`` for removal."""
        removed = (section, option)
        self.remove.add(removed)

    def add_default_update(self, section: str, option: str, new_default: str) -> None:
        """Register ``new_default`` as the new default value of ``section.option``."""
        self.default_updates.update({(section, option): new_default})

99 

100 

def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """
    Turn a version string into a tuple of integers.

    :param s: version string to parse
    :return: the leading dotted-number part of ``s`` as integers, or an empty
        tuple when ``s`` does not start with a number
    """
    parsed = _SQLITE3_VERSION_PATTERN.match(s)
    if parsed is not None:
        return tuple(map(int, parsed.group("version").split(".")))
    return ()

106 

107 

@overload
def expand_env_var(env_var: None) -> None: ...


@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand (potentially nested) environment variables and ``~`` in a string.

    ``expandvars`` and ``expanduser`` are applied over and over until a pass
    leaves the value unchanged, so a variable whose value refers to another
    variable is resolved as well.

    :param env_var: value to expand; falsy or non-string values are returned as is
    :return: the fully expanded value
    """
    if not env_var or not isinstance(env_var, str):
        return env_var
    current = env_var
    expanded = os.path.expanduser(os.path.expandvars(str(current)))
    # Keep going until one more pass is a no-op.
    while expanded != current:
        current = expanded
        expanded = os.path.expanduser(os.path.expandvars(str(current)))
    return expanded

130 

131 

def run_command(command: str) -> str:
    """
    Run a command and return what it wrote to stdout.

    The command line is split with ``shlex`` and executed without a shell.

    :param command: command line to execute
    :return: decoded standard output of the command
    :raises AirflowConfigException: if the command exits with a non-zero code
    """
    argv = shlex.split(command)
    process = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    raw_stdout, raw_stderr = process.communicate()

    # Bytes that cannot be decoded are dropped rather than raising.
    encoding = sys.getdefaultencoding()
    output = raw_stdout.decode(encoding, "ignore")
    stderr = raw_stderr.decode(encoding, "ignore")

    if process.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {process.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )

146 

147 

def _default_config_file_path(file_name: str) -> str:
    """
    Build the path of a file in the ``config_templates`` directory next to this module.

    :param file_name: name of the file inside ``config_templates``
    :return: path to that file (its existence is not checked)
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, "config_templates", file_name)

151 

152 

def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Read Airflow configuration description from YAML file.

    :param include_airflow: Include Airflow configs
    :param include_providers: Include provider configs
    :param selected_provider: If specified, include selected provider only
    :return: Python dictionary containing configs & their info
    """
    base_configuration_description: dict[str, dict[str, Any]] = {}
    if include_airflow:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # locale of the process reading the bundled description file.
        with open(_default_config_file_path("config.yml"), encoding="utf-8") as config_file:
            base_configuration_description.update(yaml.safe_load(config_file))
    if include_providers:
        # Imported lazily, only when provider configs are actually requested.
        from airflow.providers_manager import ProvidersManager

        for provider, config in ProvidersManager().provider_configs:
            # An empty/None ``selected_provider`` means "all providers".
            if not selected_provider or provider == selected_provider:
                base_configuration_description.update(config)
    return base_configuration_description

177 

178 

class AirflowConfigParser(_SharedAirflowConfigParser):
    """
    Custom Airflow Configparser supporting defaults and deprecated options.

    This is a subclass of the shared AirflowConfigParser that adds Core-specific initialization
    and functionality (providers, validation, writing, etc.).

    The defaults are stored in the ``_default_values``. The configuration description keeps
    description of all the options available in Airflow (description follow config.yaml.schema);
    it is always loaded from the bundled ``config.yml`` (without providers) by ``__init__``.

    :param default_config: default configuration (in the form of ini file).
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        _configuration_description = retrieve_configuration_description(include_providers=False)
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
        _default_values = create_default_config_parser(_configuration_description)
        from airflow.providers_manager import ProvidersManager

        super().__init__(
            _configuration_description,
            _default_values,
            ProvidersManager,
            create_default_config_parser,
            _default_config_file_path("provider_config_fallback_defaults.cfg"),
            *args,
            **kwargs,
        )
        self._configuration_description = _configuration_description
        self._default_values = _default_values
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False

    @property
    def _validators(self) -> list[Callable[[], None]]:
        """Overriding _validators from shared base class to add core-specific validators."""
        return [
            self._validate_sqlite3_version,
            self._validate_enums,
            self._validate_deprecated_values,
            self._upgrade_postgres_metastore_conn,
        ]

    def _update_logging_deprecated_template_to_one_from_defaults(self):
        """Replace the placeholder replacement of ``log_filename_template`` with the real default."""
        default = self.get_default_value("logging", "log_filename_template")
        if default:
            # Tuple does not support item assignment, so we have to create a new tuple and replace it
            original_replacement = self.deprecated_values["logging"]["log_filename_template"]
            self.deprecated_values["logging"]["log_filename_template"] = (
                original_replacement[0],
                default,
            )

    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
        "logging": {
            "log_filename_template": (
                re.compile(
                    re.escape(
                        "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log"
                    )
                ),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
            ),
        },
        "core": {
            "executor": (re.compile(re.escape("SequentialExecutor")), "LocalExecutor"),
        },
    }

    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    # Options restricted to a closed set of values; checked by ``_validate_enums``.
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("dag_processor", "file_parsing_sort_mode"): [
            "modified_time",
            "random_seeded_by_host",
            "alphabetical",
        ],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        # uvicorn and gunicorn logging levels for web servers
        ("logging", "uvicorn_logging_level"): _available_logging_levels,
        ("logging", "gunicorn_logging_level"): _available_logging_levels,
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
        ("api", "grid_view_sorting_order"): ["topological", "hierarchical_alphabetical"],
        ("logging", "dag_processor_log_target"): ["file", "stdout"],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section,option) to the old value that was upgraded"""

    def write_custom_config(
        self,
        file: IO[str],
        comment_out_defaults: bool = True,
        include_descriptions: bool = True,
        extra_spacing: bool = True,
        modifications: ConfigModifications | None = None,
    ) -> None:
        """
        Write a configuration file using a ConfigModifications object.

        This method includes only options from the current airflow.cfg. For each option:
          - If it's marked for removal, omit it.
          - If renamed, output it under its new name and add a comment indicating its original location.
          - If a default update is specified, apply the new default and output the option as a commented line.
          - Otherwise, if the current value equals the default and comment_out_defaults is True, output it as a comment.
        Options absent from the current airflow.cfg are omitted.

        :param file: File to write the configuration.
        :param comment_out_defaults: If True, options whose value equals the default are written as comments.
        :param include_descriptions: Whether to include section descriptions.
        :param extra_spacing: Whether to insert an extra blank line after each option.
        :param modifications: ConfigModifications instance with rename, remove, and default updates.
        """
        modifications = modifications or ConfigModifications()
        # effective section -> list of (option, value, is_default, modification comment)
        output: dict[str, list[tuple[str, str, bool, str]]] = {}

        # First pass: decide, per option read from the file, where and how it is written.
        for section in self._sections:  # type: ignore[attr-defined]  # accessing _sections from ConfigParser
            for option, orig_value in self._sections[section].items():  # type: ignore[attr-defined]
                key = (section.lower(), option.lower())
                if key in modifications.remove:
                    continue

                mod_comment = ""
                if key in modifications.rename:
                    new_sec, new_opt = modifications.rename[key]
                    effective_section = new_sec
                    effective_option = new_opt
                    mod_comment += f"# Renamed from {section}.{option}\n"
                else:
                    effective_section = section
                    effective_option = option

                value = orig_value
                if key in modifications.default_updates:
                    mod_comment += (
                        f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
                    )
                    value = modifications.default_updates[key]

                default_value = self.get_default_value(effective_section, effective_option, fallback="")
                is_default = str(value) == str(default_value)
                output.setdefault(effective_section.lower(), []).append(
                    (effective_option, str(value), is_default, mod_comment)
                )

        # Second pass: render each section into a buffer and write the non-empty ones.
        for section, options in output.items():
            section_buffer = StringIO()
            section_buffer.write(f"[{section}]\n")
            if include_descriptions:
                description = self.configuration_description.get(section, {}).get("description", "")
                if description:
                    for line in description.splitlines():
                        section_buffer.write(f"# {line}\n")
                    section_buffer.write("\n")
            for option, value_str, is_default, mod_comment in options:
                key = (section.lower(), option.lower())
                if key in modifications.default_updates and comment_out_defaults:
                    section_buffer.write(f"# {option} = {value_str}\n")
                else:
                    if mod_comment:
                        section_buffer.write(mod_comment)
                    if is_default and comment_out_defaults:
                        section_buffer.write(f"# {option} = {value_str}\n")
                    else:
                        section_buffer.write(f"{option} = {value_str}\n")
                if extra_spacing:
                    section_buffer.write("\n")
            content = section_buffer.getvalue().strip()
            if content:
                file.write(f"{content}\n\n")

    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade a removed PostgreSQL scheme in ``[database] sql_alchemy_conn``.

        As of SQLAlchemy 1.4 the ``postgres`` dialect name is no longer accepted, so
        URLs using ``postgres://`` or ``postgres+psycopg2://`` are rewritten to
        ``postgresql+psycopg2://`` and a ``FutureWarning`` is emitted. URLs that already
        use the supported ``postgresql`` dialect name are left untouched and do not warn.
        """
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        # Only the removed ``postgres`` dialect name is bad; ``postgresql`` is the
        # supported spelling and must not trigger the "no longer supported" warning.
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql+psycopg2"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration [database] sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
                stacklevel=1,
            )
            self.upgraded_values[(section, key)] = old_value
            new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)

    def _validate_enums(self):
        """
        Validate that enum type config has an accepted value.

        :raises AirflowConfigException: if an option listed in ``enums_options`` is set
            to a non-empty value outside its allowed values
        """
        for (section_key, option_key), enum_options in self.enums_options.items():
            if self.has_option(section_key, option_key):
                value = self.get(section_key, option_key, fallback=None)
                # Empty/None values are not validated here.
                if value and value not in enum_options:
                    raise AirflowConfigException(
                        f"`[{section_key}] {option_key}` should not be "
                        f"{value!r}. Possible values: {', '.join(enum_options)}."
                    )

    def _validate_sqlite3_version(self):
        """
        Validate SQLite version.

        Some features in storing rendered fields require SQLite >= 3.15.0.

        :raises AirflowConfigException: if the connection string mentions ``sqlite``
            and the SQLite C library is older than the minimum version
        """
        if "sqlite" not in self.get("database", "sql_alchemy_conn"):
            return

        import sqlite3

        min_sqlite_version = (3, 15, 0)
        if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
            return

        from airflow.utils.docs import get_docs_url

        min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
        raise AirflowConfigException(
            f"error: SQLite C library too old (< {min_sqlite_version_str}). "
            f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
        )

    def _get_custom_secret_backend(self, worker_mode: bool | None = None) -> Any | None:
        """Delegate to the base class, treating ``worker_mode=None`` as ``False``."""
        return super()._get_custom_secret_backend(
            worker_mode=worker_mode if worker_mode is not None else False
        )

    def mask_secrets(self):
        """
        Register the values of sensitive options with both secret maskers.

        Options whose value cannot be retrieved are skipped with a debug log entry.
        """
        from airflow._shared.configuration.parser import _build_kwarg_env_prefix, _collect_kwarg_env_vars
        from airflow._shared.secrets_masker import mask_secret as mask_secret_core
        from airflow.sdk.log import mask_secret as mask_secret_sdk

        for section, key in self.sensitive_config_values:
            try:
                with self.suppress_future_warnings():
                    value = self.get(section, key, suppress_warnings=True)
            except AirflowConfigException:
                log.debug(
                    "Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.",
                    section,
                    key,
                )
                continue
            mask_secret_core(value)
            mask_secret_sdk(value)

        # Mask per-key backend kwarg env vars (AIRFLOW__SECRETS__BACKEND_KWARG__* etc.).
        # These are not in sensitive_config_values but may contain sensitive values.
        for _section, _kwargs_key in [
            ("secrets", "backend_kwargs"),
            ("workers", "secrets_backend_kwargs"),
        ]:
            _prefix = _build_kwarg_env_prefix(_section, _kwargs_key)
            for _value in _collect_kwarg_env_vars(_prefix).values():
                mask_secret_core(_value)
                mask_secret_sdk(_value)

    def load_test_config(self):
        """
        Use test configuration rather than the configuration coming from airflow defaults.

        When running tests we use special the unit_test configuration to avoid accidental modifications and
        different behaviours when running the tests. Values for those test configuration are stored in
        the "unit_tests.cfg" configuration file in the ``airflow/config_templates`` folder
        and you need to change values there if you want to make some specific configuration to be used
        """
        from cryptography.fernet import Fernet

        unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
        unit_test_config = unit_test_config_file.read_text()
        self.remove_all_read_configurations()
        with StringIO(unit_test_config) as test_config_file:
            self.read_file(test_config_file)

        # We need those globals before we run "get_all_expansion_variables" because this is where
        # the variables are expanded from in the configuration - set to random values for tests
        _SecretKeys.fernet_key = Fernet.generate_key().decode()
        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        self.expand_all_configuration_values()
        # Name the file that was actually read above.
        log.info("Unit test configuration loaded from 'unit_tests.cfg'")

    def expand_all_configuration_values(self):
        """Expand all configuration values using global and local variables defined in this module."""
        all_vars = get_all_expansion_variables()
        for section in self.sections():
            for key, value in self.items(section):
                if value is not None:
                    if self.has_option(section, key):
                        self.remove_option(section, key, remove_default=False)
                    # Templates and non-string values are stored back untouched;
                    # only plain strings go through ``str.format`` expansion.
                    if self.is_template(section, key) or not isinstance(value, str):
                        self.set(section, key, value)
                    else:
                        self.set(section, key, value.format(**all_vars))

    def remove_all_read_configurations(self):
        """Remove all read configurations, leaving only default values in the config."""
        for section in self.sections():
            self.remove_section(section)

    def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
        """
        Override to use module-level function that reads from global conf.

        This ensures as_dict() and other methods use the same secrets backend
        configuration as the global conf instance (set via conf_vars in tests).

        :param config_key: key to look up in the secrets backend
        :return: the value from the backend, or ``None`` when no custom backend is configured
        :raises AirflowConfigException: if the backend lookup fails for any reason
        """
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        try:
            return secrets_client.get_config(config_key)
        except Exception as e:
            # Chain explicitly so the backend error is recorded as the direct cause.
            raise AirflowConfigException(
                "Cannot retrieve config from alternative secrets backend. "
                "Make sure it is configured properly and that the Backend "
                "is accessible.\n"
                f"{e}"
            ) from e

    def __getstate__(self) -> dict[str, Any]:
        """Return the state of the object as a dictionary for pickling."""
        return {
            name: getattr(self, name)
            for name in [
                "_sections",
                "is_validated",
                "configuration_description",
                "upgraded_values",
                "_default_values",
            ]
        }

    def __setstate__(self, state) -> None:
        """Restore the state of the object from a dictionary representation."""
        # Re-run normal initialisation first, then layer the pickled state on top.
        self.__init__()  # type: ignore[misc]
        config = state.pop("_sections")
        self.read_dict(config)
        self.__dict__.update(state)

550 

551 

def get_airflow_home() -> str:
    """
    Get path to Airflow Home.

    Taken from the ``AIRFLOW_HOME`` environment variable, falling back to
    ``~/airflow``; the value is passed through ``expand_env_var``.
    """
    configured_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(configured_home)

555 

556 

def get_airflow_config(airflow_home: str) -> str:
    """
    Get path to the airflow.cfg file.

    :param airflow_home: Airflow home directory, used when ``AIRFLOW_CONFIG`` is not set
    :return: the expanded ``AIRFLOW_CONFIG`` environment variable if it is set,
        otherwise ``airflow.cfg`` inside ``airflow_home``
    """
    configured_path = os.environ.get("AIRFLOW_CONFIG")
    if configured_path is not None:
        return expand_env_var(configured_path)
    return os.path.join(airflow_home, "airflow.cfg")

563 

564 

def get_all_expansion_variables() -> dict[str, Any]:
    """
    Collect the variables available when expanding configuration values.

    The result holds the two runtime secret keys plus every global of this
    module whose name does not start with an underscore and whose value is
    neither callable nor a module. Module globals are applied last, so they
    take precedence on a name clash.
    """
    variables: dict[str, Any] = {
        "FERNET_KEY": _SecretKeys.fernet_key,
        "JWT_SECRET_KEY": _SecretKeys.jwt_secret_key,
    }
    variables.update(
        (name, value)
        for name, value in globals().items()
        if not (name.startswith("_") or callable(value) or ismodule(value))
    )
    return variables

575 

576 

def _generate_fernet_key() -> str:
    """Generate a new Fernet key and return it decoded to ``str``."""
    # Imported lazily so ``cryptography`` is only loaded when a key is needed.
    from cryptography.fernet import Fernet

    key_bytes = Fernet.generate_key()
    return key_bytes.decode()

581 

582 

def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Create default config parser based on configuration description.

    A fresh ``ConfigParser`` is populated from the configuration description,
    with the expansion variables of this module made available to it.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    default_parser = ConfigParser()
    expansion_variables = get_all_expansion_variables()
    configure_parser_from_configuration_description(
        default_parser,
        configuration_description,
        expansion_variables,
    )
    return default_parser

598 

599 

def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Create the Airflow config file with default values if it does not exist yet.

    When the file is missing, its parent directory is created if needed (only when
    it lies under ``AIRFLOW_HOME``), fresh secret keys are generated and stored as
    defaults, and the defaults are written to a file restricted to its owner.

    :return: the global ``conf`` object
    :raises IsADirectoryError: if ``AIRFLOW_CONFIG`` points at a directory
    :raises FileNotFoundError: if the config directory is missing and is not
        located under ``AIRFLOW_HOME``
    """
    config_path = pathlib.Path(AIRFLOW_CONFIG)
    if config_path.is_dir():
        raise IsADirectoryError(
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {os.fspath(config_path)!r}."
        )
    if config_path.exists():
        # Nothing to do: an existing file is never overwritten.
        return conf

    log.debug("Creating new Airflow config file in: %s", os.fspath(config_path))
    parent_dir = config_path.parent
    if not parent_dir.exists():
        if not parent_dir.is_relative_to(AIRFLOW_HOME):
            raise FileNotFoundError(
                f"Config directory {os.fspath(parent_dir)!r} not exists "
                f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                "Please create this directory first."
            ) from None
        log.debug("Create directory %r for Airflow config", os.fspath(parent_dir))
        parent_dir.mkdir(parents=True, exist_ok=True)

    if not conf.get("core", "fernet_key"):
        # We know that fernet_key is not set, so we can generate it, set as global key
        # and also write it to the config file so that same key will be used next time
        _SecretKeys.fernet_key = _generate_fernet_key()
        fernet_option = conf._configuration_description["core"]["options"]["fernet_key"]
        fernet_option["default"] = _SecretKeys.fernet_key
        conf._default_values.set("core", "fernet_key", _SecretKeys.fernet_key)

    _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
    jwt_option = conf._configuration_description["api_auth"]["options"]["jwt_secret"]
    jwt_option["default"] = _SecretKeys.jwt_secret_key
    conf._default_values.set("api_auth", "jwt_secret", _SecretKeys.jwt_secret_key)
    # Invalidate cached configuration_description so it recomputes with the updated base
    conf.invalidate_cache()

    # Create the file empty and tighten its permissions before any content is written.
    config_path_str = os.fspath(config_path)
    pathlib.Path(config_path_str).touch()
    make_group_other_inaccessible(config_path_str)
    with open(config_path, "w") as config_file:
        conf.write(
            config_file,
            include_sources=False,
            include_env_vars=True,
            include_providers=True,
            extra_spacing=True,
            only_defaults=True,
            show_values=True,
        )
    return conf

650 

651 

def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Load standard airflow configuration.

    Reads the file at ``AIRFLOW_CONFIG`` into the parser. If that file still
    defines the deprecated ``[core] airflow_home`` option, a deprecation warning
    is emitted, and - when the ``AIRFLOW_HOME`` environment variable is not set
    and the configured value differs - the module-level ``AIRFLOW_HOME`` is
    replaced by the value from the file.

    :param airflow_config_parser: parser to which the configuration will be loaded
    """
    global AIRFLOW_HOME  # to be cleaned in Airflow 4.0
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if not airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        return

    both_specified_msg = (
        "Specifying both AIRFLOW_HOME environment variable and airflow_home "
        "in the config file is deprecated. Please use only the AIRFLOW_HOME "
        "environment variable and remove the config file entry."
    )
    if "AIRFLOW_HOME" in os.environ:
        warnings.warn(both_specified_msg, category=RemovedInAirflow4Warning, stacklevel=1)
        return

    if airflow_config_parser.get("core", "airflow_home") == AIRFLOW_HOME:
        warnings.warn(
            "Specifying airflow_home in the config file is deprecated. As you "
            "have left it at the default value you should remove the setting "
            "from your airflow.cfg and suffer no change in behaviour.",
            category=RemovedInAirflow4Warning,
            stacklevel=1,
        )
        return

    # The file value differs from the current home and no env var overrides it.
    AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")
    warnings.warn(both_specified_msg, category=RemovedInAirflow4Warning, stacklevel=1)

685 

686 

def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.

    :return: a freshly created parser holding either the standard or the
        unit test configuration
    """
    parser = AirflowConfigParser()
    if not parser.getboolean("core", "unit_test_mode"):
        load_standard_airflow_configuration(parser)
        # If the user set unit_test_mode in the airflow.cfg, we still
        # want to respect that and then load the default unit test configuration
        # file on top of it.
        if not parser.getboolean("core", "unit_test_mode"):
            return parser
    parser.load_test_config()
    return parser

704 

705 

def make_group_other_inaccessible(file_path: str):
    """
    Strip every permission except owner read/write from a file.

    Best effort: any failure is logged as a warning and the file keeps its
    original permissions.

    :param file_path: path of the file to restrict
    """
    owner_read_write = stat.S_IRUSR | stat.S_IWUSR
    try:
        current_mode = os.stat(file_path).st_mode
        os.chmod(file_path, current_mode & owner_read_write)
    except Exception as e:
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )

716 

717 

def ensure_secrets_loaded(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    The backends are (re)initialized when the module-level
    ``secrets_backend_list`` contains only 2 backends, or when
    ``default_backends`` is not the standard search path (which is how loading
    for a worker is detected). Otherwise the existing list is returned.

    :param default_backends: import paths of the default backends to load
    :return: the list of secrets backends
    """
    uses_standard_path = default_backends == DEFAULT_SECRETS_SEARCH_PATH
    if len(secrets_backend_list) != 2 and uses_standard_path:
        return secrets_backend_list
    return initialize_secrets_backends(default_backends=default_backends)

733 

734 

def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | None:
    """
    Get Secret Backend if defined in airflow.cfg.

    Convenience wrapper around ``conf._get_custom_secret_backend()``, which
    picks the section, key and kwargs key depending on ``worker_mode``.

    :param worker_mode: whether the backend is requested from a worker
    :return: whatever ``conf._get_custom_secret_backend()`` returns
    """
    custom_backend = conf._get_custom_secret_backend(worker_mode=worker_mode)
    return custom_backend

744 

745 

def initialize_secrets_backends(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * import secrets backend classes
    * instantiate them and return them in a list

    The custom backend (if one is configured) comes first, followed by the
    default backends in the given order.

    :param default_backends: import paths of the default backend classes
    :return: instantiated backends, each with its connection class set
    """
    # Anything other than the standard search path switches on worker mode.
    worker_mode = default_backends != DEFAULT_SECRETS_SEARCH_PATH
    backends = []

    custom_backend = get_custom_secret_backend(worker_mode)
    if custom_backend is not None:
        from airflow.models import Connection

        custom_backend._set_connection_class(Connection)
        backends.append(custom_backend)

    for dotted_path in default_backends:
        from airflow.models import Connection

        default_backend = import_string(dotted_path)()
        default_backend._set_connection_class(Connection)
        backends.append(default_backend)

    return backends

777 

778 

def initialize_auth_manager() -> BaseAuthManager:
    """
    Initialize auth manager.

    * import user manager class
    * instantiate it and return it

    :raises AirflowConfigException: if ``[core] auth_manager`` is not configured
    """
    manager_class = conf.getimport(section="core", key="auth_manager")
    if manager_class:
        return manager_class()

    raise AirflowConfigException(
        "No auth manager defined in the config. Please specify one using section/key [core/auth_manager]."
    )

794 

795 

# AIRFLOW_HOME and AIRFLOW_CONFIG come from the environment variables of the
# same name, defaulting to "~/airflow" and "$AIRFLOW_HOME/airflow.cfg".
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Dags folder for unit tests: use the source-tree "tests/dags" directory when
# it exists (it won't if users install via pip), else "dags" under AIRFLOW_HOME.
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
TEST_DAGS_FOLDER = (
    _TEST_DAGS_FOLDER if os.path.exists(_TEST_DAGS_FOLDER) else os.path.join(AIRFLOW_HOME, "dags")
)

# Plugins folder for unit tests, chosen the same way.
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
TEST_PLUGINS_FOLDER = (
    _TEST_PLUGINS_FOLDER
    if os.path.exists(_TEST_PLUGINS_FOLDER)
    else os.path.join(AIRFLOW_HOME, "plugins")
)

# Random value generated once per import of this module.
SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")

# Order matters: the config is loaded first, the secrets backends are built
# next, and only then is the config validated.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()

824conf.validate()