Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/configuration.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

361 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import logging 

20import os 

21import pathlib 

22import re 

23import shlex 

24import stat 

25import subprocess 

26import sys 

27import warnings 

28from base64 import b64encode 

29from collections.abc import Callable 

30from configparser import ConfigParser 

31from inspect import ismodule 

32from io import StringIO 

33from re import Pattern 

34from typing import IO, TYPE_CHECKING, Any 

35from urllib.parse import urlsplit 

36 

37from typing_extensions import overload 

38 

39from airflow._shared.configuration.parser import ( 

40 AirflowConfigParser as _SharedAirflowConfigParser, 

41 configure_parser_from_configuration_description, 

42) 

43from airflow._shared.module_loading import import_string 

44from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning 

45from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH 

46from airflow.task.weight_rule import WeightRule 

47from airflow.utils import yaml 

48 

49if TYPE_CHECKING: 

50 from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager 

51 from airflow.secrets import BaseSecretsBackend 

52 

log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
# Python hides DeprecationWarning/PendingDeprecationWarning by default; re-enable
# them for the "airflow" module unless the user passed their own -W options
# (in which case sys.warnoptions is non-empty and we leave their choice alone).
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Extracts the leading dotted-integer version component from strings such as
# "3.36.0" or "3.36.0beta2" (everything after the digits/dots is ignored).
_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases for configuration values and for the nested structures used when
# reporting where each option's value came from.
ConfigType = str | int | float | bool
ConfigOptionsDictType = dict[str, ConfigType]
ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
ConfigSourcesType = dict[str, ConfigSectionSourcesType]

# Prefix of environment variables that override configuration options
# (e.g. AIRFLOW__CORE__DAGS_FOLDER).
ENV_VAR_PREFIX = "AIRFLOW__"

69 

class _SecretKeys:
    """Holds the secret keys used in Airflow during runtime."""

    # Fernet encryption key; populated when a new config file is generated or
    # when the unit-test configuration is loaded.
    fernet_key: str = ""  # Set only if needed when generating a new file
    # JWT signing secret; populated the same way as fernet_key above.
    jwt_secret_key: str = ""

75 

76 

class ConfigModifications:
    """
    Collects edits to apply when the configuration is written out.

    Three kinds of edit are tracked, all keyed by ``(section, option)`` tuples:

    :param rename: Mapping from (old_section, old_option) to (new_section, new_option)
    :param remove: Set of (section, option) to remove
    :param default_updates: Mapping from (section, option) to new default value
    """

    def __init__(self) -> None:
        # All three structures start empty; callers populate them via add_*.
        self.rename: dict[tuple[str, str], tuple[str, str]] = {}
        self.remove: set[tuple[str, str]] = set()
        self.default_updates: dict[tuple[str, str], str] = {}

    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
        """Record that ``old_section.old_option`` should be emitted as ``new_section.new_option``."""
        source_key = (old_section, old_option)
        self.rename[source_key] = (new_section, new_option)

    def add_remove(self, section: str, option: str) -> None:
        """Record that ``section.option`` should be dropped from the output."""
        self.remove.add((section, option))

    def add_default_update(self, section: str, option: str, new_default: str) -> None:
        """Record a replacement default value for ``section.option``."""
        self.default_updates[(section, option)] = new_default

99 

100 

def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """Parse a SQLite version string into a tuple of ints; return () when unparsable."""
    parsed = _SQLITE3_VERSION_PATTERN.match(s)
    if parsed is None:
        return ()
    version_text = parsed.group("version")
    return tuple(map(int, version_text.split(".")))

106 

107 

108@overload 

109def expand_env_var(env_var: None) -> None: ... 

110 

111 

112@overload 

113def expand_env_var(env_var: str) -> str: ... 

114 

115 

116def expand_env_var(env_var: str | None) -> str | None: 

117 """ 

118 Expand (potentially nested) env vars. 

119 

120 Repeat and apply `expandvars` and `expanduser` until 

121 interpolation stops having any effect. 

122 """ 

123 if not env_var or not isinstance(env_var, str): 

124 return env_var 

125 while True: 

126 interpolated = os.path.expanduser(os.path.expandvars(str(env_var))) 

127 if interpolated == env_var: 

128 return interpolated 

129 env_var = interpolated 

130 

131 

def run_command(command: str) -> str:
    """
    Run command and returns stdout.

    :param command: command line to execute; it is tokenized with
        :func:`shlex.split`, so no shell is involved.
    :return: decoded standard output of the process.
    :raises AirflowConfigException: if the process exits with a non-zero code.
    """
    # subprocess.run replaces the manual Popen/communicate dance and waits for
    # completion; close_fds=True is the default for subprocess on all supported
    # platforms, so it no longer needs to be passed explicitly.
    process = subprocess.run(shlex.split(command), capture_output=True, check=False)
    encoding = sys.getdefaultencoding()
    # Decode leniently: config commands may emit bytes that are not valid in
    # the default encoding, and we would rather return mangled text than fail.
    output = process.stdout.decode(encoding, "ignore")
    stderr = process.stderr.decode(encoding, "ignore")

    if process.returncode != 0:
        raise AirflowConfigException(
            f"Cannot execute {command}. Error code is: {process.returncode}. "
            f"Output: {output}, Stderr: {stderr}"
        )

    return output

146 

147 

148def _default_config_file_path(file_name: str) -> str: 

149 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates") 

150 return os.path.join(templates_dir, file_name) 

151 

152 

153def retrieve_configuration_description( 

154 include_airflow: bool = True, 

155 include_providers: bool = True, 

156 selected_provider: str | None = None, 

157) -> dict[str, dict[str, Any]]: 

158 """ 

159 Read Airflow configuration description from YAML file. 

160 

161 :param include_airflow: Include Airflow configs 

162 :param include_providers: Include provider configs 

163 :param selected_provider: If specified, include selected provider only 

164 :return: Python dictionary containing configs & their info 

165 """ 

166 base_configuration_description: dict[str, dict[str, Any]] = {} 

167 if include_airflow: 

168 with open(_default_config_file_path("config.yml")) as config_file: 

169 base_configuration_description.update(yaml.safe_load(config_file)) 

170 if include_providers: 

171 from airflow.providers_manager import ProvidersManager 

172 

173 for provider, config in ProvidersManager().provider_configs: 

174 if not selected_provider or provider == selected_provider: 

175 base_configuration_description.update(config) 

176 return base_configuration_description 

177 

178 

class AirflowConfigParser(_SharedAirflowConfigParser):
    """
    Custom Airflow Configparser supporting defaults and deprecated options.

    This is a subclass of the shared AirflowConfigParser that adds Core-specific initialization
    and functionality (providers, validation, writing, etc.).

    The defaults are stored in the ``_default_values``. The configuration description keeps
    description of all the options available in Airflow (description follow config.yaml.schema).

    :param default_config: default configuration (in the form of ini file).
    :param configuration_description: description of configuration to use
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        # Providers are excluded here; the shared base class receives the
        # ProvidersManager callable below and merges provider config itself.
        _configuration_description = retrieve_configuration_description(include_providers=False)
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
        _default_values = create_default_config_parser(_configuration_description)
        from airflow.providers_manager import ProvidersManager

        super().__init__(
            _configuration_description,
            _default_values,
            ProvidersManager,
            create_default_config_parser,
            _default_config_file_path("provider_config_fallback_defaults.cfg"),
            *args,
            **kwargs,
        )
        self._configuration_description = _configuration_description
        self._default_values = _default_values
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        # The real replacement value for the deprecated log_filename_template can
        # only be known once the defaults above have been loaded.
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False

    @property
    def _validators(self) -> list[Callable[[], None]]:
        """Overriding _validators from shared base class to add core-specific validators."""
        return [
            self._validate_sqlite3_version,
            self._validate_enums,
            self._validate_deprecated_values,
            self._upgrade_postgres_metastore_conn,
        ]

    def _update_logging_deprecated_template_to_one_from_defaults(self):
        # Replace the "XX-set-after-default-config-loaded-XX" placeholder in
        # deprecated_values with the actual default from config.yml.
        default = self.get_default_value("logging", "log_filename_template")
        if default:
            # Tuple does not support item assignment, so we have to create a new tuple and replace it
            original_replacement = self.deprecated_values["logging"]["log_filename_template"]
            self.deprecated_values["logging"]["log_filename_template"] = (
                original_replacement[0],
                default,
            )

    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
        "logging": {
            "log_filename_template": (
                re.compile(
                    re.escape(
                        "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log"
                    )
                ),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
            ),
        },
        "core": {
            "executor": (re.compile(re.escape("SequentialExecutor")), "LocalExecutor"),
        },
    }

    # Accepted values for the various *_logging_level enum options below.
    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    # Options restricted to a closed set of values; checked in _validate_enums.
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("dag_processor", "file_parsing_sort_mode"): [
            "modified_time",
            "random_seeded_by_host",
            "alphabetical",
        ],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        # uvicorn and gunicorn logging levels for web servers
        ("logging", "uvicorn_logging_level"): _available_logging_levels,
        ("logging", "gunicorn_logging_level"): _available_logging_levels,
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
        ("api", "grid_view_sorting_order"): ["topological", "hierarchical_alphabetical"],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section,option) to the old value that was upgraded"""

    def write_custom_config(
        self,
        file: IO[str],
        comment_out_defaults: bool = True,
        include_descriptions: bool = True,
        extra_spacing: bool = True,
        modifications: ConfigModifications | None = None,
    ) -> None:
        """
        Write a configuration file using a ConfigModifications object.

        This method includes only options from the current airflow.cfg. For each option:
        - If it's marked for removal, omit it.
        - If renamed, output it under its new name and add a comment indicating its original location.
        - If a default update is specified, apply the new default and output the option as a commented line.
        - Otherwise, if the current value equals the default and comment_out_defaults is True, output it as a comment.
        Options absent from the current airflow.cfg are omitted.

        :param file: File to write the configuration.
        :param comment_out_defaults: If True, options whose value equals the default are written as comments.
        :param include_descriptions: Whether to include section descriptions.
        :param extra_spacing: Whether to insert an extra blank line after each option.
        :param modifications: ConfigModifications instance with rename, remove, and default updates.
        """
        modifications = modifications or ConfigModifications()
        # section -> list of (option, value, is_default, per-option comment)
        output: dict[str, list[tuple[str, str, bool, str]]] = {}

        # First pass: collect surviving options, applying removals, renames and
        # default updates.
        for section in self._sections:  # type: ignore[attr-defined] # accessing _sections from ConfigParser
            for option, orig_value in self._sections[section].items():  # type: ignore[attr-defined]
                key = (section.lower(), option.lower())
                if key in modifications.remove:
                    continue

                mod_comment = ""
                if key in modifications.rename:
                    new_sec, new_opt = modifications.rename[key]
                    effective_section = new_sec
                    effective_option = new_opt
                    mod_comment += f"# Renamed from {section}.{option}\n"
                else:
                    effective_section = section
                    effective_option = option

                value = orig_value
                if key in modifications.default_updates:
                    mod_comment += (
                        f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
                    )
                    value = modifications.default_updates[key]

                default_value = self.get_default_value(effective_section, effective_option, fallback="")
                is_default = str(value) == str(default_value)
                output.setdefault(effective_section.lower(), []).append(
                    (effective_option, str(value), is_default, mod_comment)
                )

        # Second pass: render each section, optionally preceded by its
        # description, commenting out defaults where requested.
        for section, options in output.items():
            section_buffer = StringIO()
            section_buffer.write(f"[{section}]\n")
            if include_descriptions:
                description = self.configuration_description.get(section, {}).get("description", "")
                if description:
                    for line in description.splitlines():
                        section_buffer.write(f"# {line}\n")
                    section_buffer.write("\n")
            for option, value_str, is_default, mod_comment in options:
                # NOTE(review): this lookup key uses the *effective* (possibly
                # renamed) section/option, while default_updates in the first
                # pass was keyed on the original name -- a renamed option with a
                # default update may therefore be rendered inconsistently here.
                # Confirm intended behaviour.
                key = (section.lower(), option.lower())
                if key in modifications.default_updates and comment_out_defaults:
                    section_buffer.write(f"# {option} = {value_str}\n")
                else:
                    if mod_comment:
                        section_buffer.write(mod_comment)
                    if is_default and comment_out_defaults:
                        section_buffer.write(f"# {option} = {value_str}\n")
                    else:
                        section_buffer.write(f"{option} = {value_str}\n")
                if extra_spacing:
                    section_buffer.write("\n")
            content = section_buffer.getvalue().strip()
            if content:
                file.write(f"{content}\n\n")

    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade SQL schemas.

        As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
        must be replaced with `postgresql`.
        """
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration [database] sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
                stacklevel=1,
            )
            self.upgraded_values[(section, key)] = old_value
            # Only the scheme prefix is rewritten; the rest of the URL is kept.
            new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)

    def _validate_enums(self):
        """Validate that enum type config has an accepted value."""
        for (section_key, option_key), enum_options in self.enums_options.items():
            if self.has_option(section_key, option_key):
                value = self.get(section_key, option_key, fallback=None)
                # Empty/None values never raise here; options that accept ""
                # (e.g. celery_logging_level) list it explicitly anyway.
                if value and value not in enum_options:
                    raise AirflowConfigException(
                        f"`[{section_key}] {option_key}` should not be "
                        f"{value!r}. Possible values: {', '.join(enum_options)}."
                    )

    def _validate_sqlite3_version(self):
        """
        Validate SQLite version.

        Some features in storing rendered fields require SQLite >= 3.15.0.
        """
        if "sqlite" not in self.get("database", "sql_alchemy_conn"):
            return

        import sqlite3

        min_sqlite_version = (3, 15, 0)
        if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
            return

        from airflow.utils.docs import get_docs_url

        min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
        raise AirflowConfigException(
            f"error: SQLite C library too old (< {min_sqlite_version_str}). "
            f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
        )

    def _get_custom_secret_backend(self, worker_mode: bool | None = None) -> Any | None:
        # Adapter over the shared base class: callers may pass None, which is
        # normalized to False before delegating.
        return super()._get_custom_secret_backend(
            worker_mode=worker_mode if worker_mode is not None else False
        )

    def mask_secrets(self):
        # Register every readable sensitive config value with both the core and
        # SDK secret maskers so it is redacted from logs.
        from airflow._shared.configuration.parser import _build_kwarg_env_prefix, _collect_kwarg_env_vars
        from airflow._shared.secrets_masker import mask_secret as mask_secret_core
        from airflow.sdk.log import mask_secret as mask_secret_sdk

        for section, key in self.sensitive_config_values:
            try:
                with self.suppress_future_warnings():
                    value = self.get(section, key, suppress_warnings=True)
            except AirflowConfigException:
                # Best effort: unreadable options are skipped, not fatal.
                log.debug(
                    "Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.",
                    section,
                    key,
                )
                continue
            mask_secret_core(value)
            mask_secret_sdk(value)

        # Mask per-key backend kwarg env vars (AIRFLOW__SECRETS__BACKEND_KWARG__* etc.).
        # These are not in sensitive_config_values but may contain sensitive values.
        for _section, _kwargs_key in [
            ("secrets", "backend_kwargs"),
            ("workers", "secrets_backend_kwargs"),
        ]:
            _prefix = _build_kwarg_env_prefix(_section, _kwargs_key)
            for _value in _collect_kwarg_env_vars(_prefix).values():
                mask_secret_core(_value)
                mask_secret_sdk(_value)

    def load_test_config(self):
        """
        Use test configuration rather than the configuration coming from airflow defaults.

        When running tests we use special the unit_test configuration to avoid accidental modifications and
        different behaviours when running the tests. Values for those test configuration are stored in
        the "unit_tests.cfg" configuration file in the ``airflow/config_templates`` folder
        and you need to change values there if you want to make some specific configuration to be used
        """
        from cryptography.fernet import Fernet

        unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
        unit_test_config = unit_test_config_file.read_text()
        # Drop anything previously read so only unit_tests.cfg + defaults remain.
        self.remove_all_read_configurations()
        with StringIO(unit_test_config) as test_config_file:
            self.read_file(test_config_file)

        # We need those globals before we run "get_all_expansion_variables" because this is where
        # the variables are expanded from in the configuration - set to random values for tests
        _SecretKeys.fernet_key = Fernet.generate_key().decode()
        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        self.expand_all_configuration_values()
        # NOTE(review): the file name in this message does not match the
        # "unit_tests.cfg" file actually read above -- confirm which is right.
        log.info("Unit test configuration loaded from 'config_unit_tests.cfg'")

    def expand_all_configuration_values(self):
        """Expand all configuration values using global and local variables defined in this module."""
        all_vars = get_all_expansion_variables()
        for section in self.sections():
            for key, value in self.items(section):
                if value is not None:
                    # Remove the read value (but keep the default) before writing
                    # the expanded value back in its place.
                    if self.has_option(section, key):
                        self.remove_option(section, key, remove_default=False)
                    # Templated options and non-string values are stored verbatim;
                    # everything else is run through str.format expansion.
                    if self.is_template(section, key) or not isinstance(value, str):
                        self.set(section, key, value)
                    else:
                        self.set(section, key, value.format(**all_vars))

    def remove_all_read_configurations(self):
        """Remove all read configurations, leaving only default values in the config."""
        for section in self.sections():
            self.remove_section(section)

    def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
        """
        Override to use module-level function that reads from global conf.

        This ensures as_dict() and other methods use the same secrets backend
        configuration as the global conf instance (set via conf_vars in tests).
        """
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        try:
            return secrets_client.get_config(config_key)
        except Exception as e:
            raise AirflowConfigException(
                "Cannot retrieve config from alternative secrets backend. "
                "Make sure it is configured properly and that the Backend "
                "is accessible.\n"
                f"{e}"
            )

    def __getstate__(self) -> dict[str, Any]:
        """Return the state of the object as a dictionary for pickling."""
        # Only this subset of attributes is pickled; __setstate__ rebuilds the
        # rest by re-running __init__.
        return {
            name: getattr(self, name)
            for name in [
                "_sections",
                "is_validated",
                "configuration_description",
                "upgraded_values",
                "_default_values",
            ]
        }

    def __setstate__(self, state) -> None:
        """Restore the state of the object from a dictionary representation."""
        # Re-initialize from scratch, then replay the pickled sections and
        # overwrite the remaining pickled attributes on top.
        self.__init__()  # type: ignore[misc]
        config = state.pop("_sections")
        self.read_dict(config)
        self.__dict__.update(state)

548 

549 

def get_airflow_home() -> str:
    """Get path to Airflow Home."""
    raw_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(raw_home)

553 

554 

def get_airflow_config(airflow_home: str) -> str:
    """Get Path to airflow.cfg path."""
    configured_path = os.environ.get("AIRFLOW_CONFIG")
    if configured_path is not None:
        return expand_env_var(configured_path)
    # Default: airflow.cfg lives directly under the Airflow home directory.
    return os.path.join(airflow_home, "airflow.cfg")

561 

562 

def get_all_expansion_variables() -> dict[str, Any]:
    """Collect the variables available for ``str.format`` expansion of config defaults."""
    # Start from the runtime secret keys, then layer on every public,
    # non-callable, non-module name from this module's globals (which may
    # therefore override the two entries below).
    variables: dict[str, Any] = {
        "FERNET_KEY": _SecretKeys.fernet_key,
        "JWT_SECRET_KEY": _SecretKeys.jwt_secret_key,
    }
    for name, value in globals().items():
        if name.startswith("_") or callable(value) or ismodule(value):
            continue
        variables[name] = value
    return variables

573 

574 

def _generate_fernet_key() -> str:
    """Generate a fresh Fernet key and return it as a decoded ASCII string."""
    # Imported locally so the cryptography package is only required when a key
    # actually needs to be generated.
    from cryptography.fernet import Fernet

    return Fernet.generate_key().decode()

579 

580 

def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Create default config parser based on configuration description.

    It creates ConfigParser with all default values retrieved from the configuration description and
    expands all the variables from the global and local variables defined in this module.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    default_parser = ConfigParser()
    expansion_vars = get_all_expansion_variables()
    configure_parser_from_configuration_description(
        default_parser, configuration_description, expansion_vars
    )
    return default_parser

596 

597 

def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Create AIRFLOW_CONFIG with default values if it does not exist yet.

    When a new file is written, fresh fernet/JWT secrets are generated and
    stored both in the runtime ``_SecretKeys`` and in the written file, so the
    same keys are used on subsequent runs. Returns the global ``conf`` object.
    """
    airflow_config = pathlib.Path(AIRFLOW_CONFIG)
    if airflow_config.is_dir():
        msg = (
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {airflow_config.__fspath__()!r}."
        )
        raise IsADirectoryError(msg)
    if not airflow_config.exists():
        log.debug("Creating new Airflow config file in: %s", airflow_config.__fspath__())
        config_directory = airflow_config.parent
        if not config_directory.exists():
            # Only auto-create directories that live under AIRFLOW_HOME; anything
            # else must be created explicitly by the user.
            if not config_directory.is_relative_to(AIRFLOW_HOME):
                msg = (
                    f"Config directory {config_directory.__fspath__()!r} not exists "
                    f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                    "Please create this directory first."
                )
                raise FileNotFoundError(msg) from None
            log.debug("Create directory %r for Airflow config", config_directory.__fspath__())
            config_directory.mkdir(parents=True, exist_ok=True)
        if not conf.get("core", "fernet_key"):
            # We know that fernet_key is not set, so we can generate it, set as global key
            # and also write it to the config file so that same key will be used next time
            _SecretKeys.fernet_key = _generate_fernet_key()
            conf._configuration_description["core"]["options"]["fernet_key"]["default"] = (
                _SecretKeys.fernet_key
            )
            conf._default_values.set("core", "fernet_key", _SecretKeys.fernet_key)

        # Always generate a fresh JWT secret for a newly written config file.
        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        conf._configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = (
            _SecretKeys.jwt_secret_key
        )
        conf._default_values.set("api_auth", "jwt_secret", _SecretKeys.jwt_secret_key)
        # Invalidate cached configuration_description so it recomputes with the updated base
        conf.invalidate_cache()
        # Create the file first and lock down permissions before any content is
        # written, since it will contain secrets.
        pathlib.Path(airflow_config.__fspath__()).touch()
        make_group_other_inaccessible(airflow_config.__fspath__())
        with open(airflow_config, "w") as file:
            conf.write(
                file,
                include_sources=False,
                include_env_vars=True,
                include_providers=True,
                extra_spacing=True,
                only_defaults=True,
                show_values=True,
            )
    return conf

648 

649 

def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Load standard airflow configuration.

    In case it finds that the configuration file is missing, it will create it and write the default
    configuration values there, based on defaults passed, and will add the comments and examples
    from the default configuration.

    :param airflow_config_parser: parser to which the configuration will be loaded

    """
    global AIRFLOW_HOME  # to be cleaned in Airflow 4.0
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    # Deprecated: airflow_home used to be settable from the config file itself.
    if airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        msg = (
            "Specifying both AIRFLOW_HOME environment variable and airflow_home "
            "in the config file is deprecated. Please use only the AIRFLOW_HOME "
            "environment variable and remove the config file entry."
        )
        if "AIRFLOW_HOME" in os.environ:
            # Env var wins; the config entry is ignored but warned about.
            warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)
        elif airflow_config_parser.get("core", "airflow_home") == AIRFLOW_HOME:
            warnings.warn(
                "Specifying airflow_home in the config file is deprecated. As you "
                "have left it at the default value you should remove the setting "
                "from your airflow.cfg and suffer no change in behaviour.",
                category=RemovedInAirflow4Warning,
                stacklevel=1,
            )
        else:
            # No env var and a non-default value: honour the config file entry
            # by mutating the module-level AIRFLOW_HOME.
            AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")
            warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)

684 

def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.
    """
    parser = AirflowConfigParser()
    if parser.getboolean("core", "unit_test_mode"):
        parser.load_test_config()
        return parser
    load_standard_airflow_configuration(parser)
    # The user's airflow.cfg may itself enable unit_test_mode; honour that by
    # layering the default unit test configuration on top of what was read.
    if parser.getboolean("core", "unit_test_mode"):
        parser.load_test_config()
    return parser

702 

703 

def make_group_other_inaccessible(file_path: str):
    """Restrict *file_path* to the owner's read/write permission bits (best effort)."""
    try:
        current_mode = os.stat(file_path).st_mode
        os.chmod(file_path, current_mode & (stat.S_IRUSR | stat.S_IWUSR))
    except Exception as e:
        # Best effort: on platforms/filesystems where chmod fails we keep the
        # original permissions instead of aborting startup.
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )

714 

715 

def ensure_secrets_loaded(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    If the secrets_backend_list contains only 2 default backends, reload it.
    """
    # Exactly two entries means only the default backends were initialized.
    # A non-default search path signals we are loading backends for a worker.
    # Either condition triggers a fresh initialization.
    needs_reload = len(secrets_backend_list) == 2 or default_backends != DEFAULT_SECRETS_SEARCH_PATH
    if needs_reload:
        return initialize_secrets_backends(default_backends=default_backends)
    return secrets_backend_list

731 

732 

def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | None:
    """
    Get Secret Backend if defined in airflow.cfg.

    Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.

    This is a convenience function that calls conf._get_custom_secret_backend().

    :param worker_mode: True when resolving the backend for a worker process.
    :return: the configured custom secrets backend, or None when not configured.
    """
    return conf._get_custom_secret_backend(worker_mode=worker_mode)

742 

743 

def initialize_secrets_backends(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * import secrets backend classes
    * instantiate them and return them in a list
    """
    # A non-default search path signals we are initializing for a worker.
    worker_mode = default_backends != DEFAULT_SECRETS_SEARCH_PATH

    backends: list[BaseSecretsBackend] = []

    custom_backend = get_custom_secret_backend(worker_mode)
    if custom_backend is not None:
        from airflow.models import Connection

        custom_backend._set_connection_class(Connection)
        backends.append(custom_backend)

    for class_name in default_backends:
        from airflow.models import Connection

        backend_cls = import_string(class_name)
        backend = backend_cls()
        backend._set_connection_class(Connection)
        backends.append(backend)

    return backends

775 

776 

def initialize_auth_manager() -> BaseAuthManager:
    """
    Initialize auth manager.

    * import user manager class
    * instantiate it and return it
    """
    auth_manager_class = conf.getimport(section="core", key="auth_manager")
    if auth_manager_class:
        return auth_manager_class()
    # getimport returned nothing: the option is missing or empty.
    raise AirflowConfigException(
        "No auth manager defined in the config. Please specify one using section/key [core/auth_manager]."
    )

792 

793 

# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
if os.path.exists(_TEST_DAGS_FOLDER):
    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
    # Installed from a package: fall back to the user's dags folder.
    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags")

# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
if os.path.exists(_TEST_PLUGINS_FOLDER):
    TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
    # Installed from a package: fall back to the user's plugins folder.
    TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins")

# Random per-process secret, regenerated on every import of this module.
SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")

# Module-level side effects: build the global configuration object, initialize
# the secrets backends from it, then validate the loaded configuration.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()