Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/configuration.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

361 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import logging 

20import os 

21import pathlib 

22import re 

23import shlex 

24import stat 

25import subprocess 

26import sys 

27import warnings 

28from base64 import b64encode 

29from collections.abc import Callable 

30from configparser import ConfigParser 

31from inspect import ismodule 

32from io import StringIO 

33from re import Pattern 

34from typing import IO, TYPE_CHECKING, Any 

35from urllib.parse import urlsplit 

36 

37from typing_extensions import overload 

38 

39from airflow._shared.configuration.parser import ( 

40 AirflowConfigParser as _SharedAirflowConfigParser, 

41 configure_parser_from_configuration_description, 

42) 

43from airflow._shared.module_loading import import_string 

44from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning 

45from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH 

46from airflow.task.weight_rule import WeightRule 

47from airflow.utils import yaml 

48 

49if TYPE_CHECKING: 

50 from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager 

51 from airflow.secrets import BaseSecretsBackend 

52 

53log = logging.getLogger(__name__) 

54 

55# show Airflow's deprecation warnings 

56if not sys.warnoptions: 

57 warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow") 

58 warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow") 

59 

60_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$") 

61 

62ConfigType = str | int | float | bool 

63ConfigOptionsDictType = dict[str, ConfigType] 

64ConfigSectionSourcesType = dict[str, str | tuple[str, str]] 

65ConfigSourcesType = dict[str, ConfigSectionSourcesType] 

66 

67ENV_VAR_PREFIX = "AIRFLOW__" 

68 

69 

class _SecretKeys:
    """Container for the secret keys Airflow uses at runtime."""

    # Both keys start out empty; the Fernet key is set only if needed when
    # generating a new file.
    fernet_key: str = ""
    jwt_secret_key: str = ""

75 

76 

class ConfigModifications:
    """
    Collect the modifications to apply when the config is written out.

    :param rename: Mapping from (old_section, old_option) to (new_section, new_option)
    :param remove: Set of (section, option) to remove
    :param default_updates: Mapping from (section, option) to new default value
    """

    def __init__(self) -> None:
        self.rename: dict[tuple[str, str], tuple[str, str]] = {}
        self.remove: set[tuple[str, str]] = set()
        self.default_updates: dict[tuple[str, str], str] = {}

    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
        """Register that ``old_section.old_option`` is now ``new_section.new_option``."""
        source = (old_section, old_option)
        target = (new_section, new_option)
        self.rename[source] = target

    def add_remove(self, section: str, option: str) -> None:
        """Register ``section.option`` for removal."""
        removed = (section, option)
        self.remove.add(removed)

    def add_default_update(self, section: str, option: str, new_default: str) -> None:
        """Register ``new_default`` as the new default value of ``section.option``."""
        updated = (section, option)
        self.default_updates[updated] = new_default

99 

100 

def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """
    Turn a version string into a tuple of its leading numeric components.

    :param s: version string, matched against ``_SQLITE3_VERSION_PATTERN``
    :return: tuple of ints from the ``version`` group, or an empty tuple when the
        string does not match the pattern
    """
    found = _SQLITE3_VERSION_PATTERN.match(s)
    if found is not None:
        return tuple(map(int, found.group("version").split(".")))
    return ()

106 

107 

@overload
def expand_env_var(env_var: None) -> None: ...


@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand (potentially nested) env vars.

    ``expandvars`` followed by ``expanduser`` is applied repeatedly until a pass
    leaves the value unchanged. Falsy or non-string input is returned as-is.
    """
    if not (env_var and isinstance(env_var, str)):
        return env_var
    current = env_var
    expanded = os.path.expanduser(os.path.expandvars(current))
    while expanded != current:
        current = expanded
        expanded = os.path.expanduser(os.path.expandvars(current))
    return expanded

130 

131 

def run_command(command: str) -> str:
    """
    Run command and return its stdout.

    The command string is split with ``shlex.split`` and executed without a shell.

    :param command: command line to execute
    :return: decoded standard output of the command
    :raises AirflowConfigException: when the command exits with a non-zero code
    """
    process = subprocess.Popen(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
    raw_stdout, raw_stderr = process.communicate()
    # Undecodable bytes are dropped rather than raising.
    encoding = sys.getdefaultencoding()
    output = raw_stdout.decode(encoding, "ignore")
    stderr = raw_stderr.decode(encoding, "ignore")

    if process.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {process.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )

146 

147 

def _default_config_file_path(file_name: str) -> str:
    """Return the path of ``file_name`` inside the ``config_templates`` directory next to this module."""
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, "config_templates", file_name)

151 

152 

def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Build the Airflow configuration description.

    The Airflow part is read from the bundled ``config.yml``; provider parts come from
    ``ProvidersManager().provider_configs``. Later updates overwrite earlier top-level keys.

    :param include_airflow: Include Airflow configs
    :param include_providers: Include provider configs
    :param selected_provider: If specified, include selected provider only
    :return: Python dictionary containing configs & their info
    """
    description: dict[str, dict[str, Any]] = {}
    if include_airflow:
        with open(_default_config_file_path("config.yml")) as config_file:
            description.update(yaml.safe_load(config_file))
    if not include_providers:
        return description

    # Imported lazily, only when provider configs are actually requested.
    from airflow.providers_manager import ProvidersManager

    for provider, config in ProvidersManager().provider_configs:
        if selected_provider and provider != selected_provider:
            continue
        description.update(config)
    return description

177 

178 

class AirflowConfigParser(_SharedAirflowConfigParser):
    """
    Custom Airflow Configparser supporting defaults and deprecated options.

    This is a subclass of the shared AirflowConfigParser that adds Core-specific initialization
    and functionality (providers, validation, writing, etc.).

    The defaults are stored in the ``_default_values``. The configuration description keeps
    description of all the options available in Airflow (description follow config.yaml.schema).

    :param default_config: default configuration (in the form of ini file).
    :param configuration_description: description of configuration to use
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        _configuration_description = retrieve_configuration_description(include_providers=False)
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
        _default_values = create_default_config_parser(_configuration_description)
        from airflow.providers_manager import ProvidersManager

        super().__init__(
            _configuration_description,
            _default_values,
            ProvidersManager,
            create_default_config_parser,
            _default_config_file_path("provider_config_fallback_defaults.cfg"),
            *args,
            **kwargs,
        )
        self._configuration_description = _configuration_description
        self._default_values = _default_values
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False

    @property
    def _validators(self) -> list[Callable[[], None]]:
        """Override ``_validators`` from the shared base class to add core-specific validators."""
        return [
            self._validate_sqlite3_version,
            self._validate_enums,
            self._validate_deprecated_values,
            self._upgrade_postgres_metastore_conn,
        ]

    def _update_logging_deprecated_template_to_one_from_defaults(self):
        """Replace the placeholder in ``deprecated_values`` with the default ``log_filename_template``."""
        default = self.get_default_value("logging", "log_filename_template")
        if default:
            # Tuple does not support item assignment, so we have to create a new tuple and replace it
            original_replacement = self.deprecated_values["logging"]["log_filename_template"]
            self.deprecated_values["logging"]["log_filename_template"] = (
                original_replacement[0],
                default,
            )

    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
        "logging": {
            "log_filename_template": (
                re.compile(
                    re.escape(
                        "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log"
                    )
                ),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
            ),
        },
        "core": {
            "executor": (re.compile(re.escape("SequentialExecutor")), "LocalExecutor"),
        },
    }

    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("dag_processor", "file_parsing_sort_mode"): [
            "modified_time",
            "random_seeded_by_host",
            "alphabetical",
        ],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        # uvicorn and gunicorn logging levels for web servers
        ("logging", "uvicorn_logging_level"): _available_logging_levels,
        ("logging", "gunicorn_logging_level"): _available_logging_levels,
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
        ("api", "grid_view_sorting_order"): ["topological", "hierarchical_alphabetical"],
        ("logging", "dag_processor_log_target"): ["file", "stdout"],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section,option) to the old value that was upgraded"""

    def write_custom_config(
        self,
        file: IO[str],
        comment_out_defaults: bool = True,
        include_descriptions: bool = True,
        extra_spacing: bool = True,
        modifications: ConfigModifications | None = None,
    ) -> None:
        """
        Write a configuration file using a ConfigModifications object.

        This method includes only options from the current airflow.cfg. For each option:
          - If it's marked for removal, omit it.
          - If renamed, output it under its new name and add a comment indicating its original location.
          - If a default update is specified, apply the new default and output the option as a commented line.
          - Otherwise, if the current value equals the default and comment_out_defaults is True, output it as a comment.
        Options absent from the current airflow.cfg are omitted.

        :param file: File to write the configuration.
        :param comment_out_defaults: If True, options whose value equals the default are written as comments.
        :param include_descriptions: Whether to include section descriptions.
        :param extra_spacing: Whether to insert an extra blank line after each option.
        :param modifications: ConfigModifications instance with rename, remove, and default updates.
        """
        modifications = modifications or ConfigModifications()
        # effective section -> list of (option, value, is_default, modification comment)
        output: dict[str, list[tuple[str, str, bool, str]]] = {}

        # First pass: apply removals, renames and default updates to the options read from file.
        for section in self._sections:  # type: ignore[attr-defined]  # accessing _sections from ConfigParser
            for option, orig_value in self._sections[section].items():  # type: ignore[attr-defined]
                key = (section.lower(), option.lower())
                if key in modifications.remove:
                    continue

                mod_comment = ""
                if key in modifications.rename:
                    new_sec, new_opt = modifications.rename[key]
                    effective_section = new_sec
                    effective_option = new_opt
                    mod_comment += f"# Renamed from {section}.{option}\n"
                else:
                    effective_section = section
                    effective_option = option

                value = orig_value
                if key in modifications.default_updates:
                    mod_comment += (
                        f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
                    )
                    value = modifications.default_updates[key]

                default_value = self.get_default_value(effective_section, effective_option, fallback="")
                is_default = str(value) == str(default_value)
                output.setdefault(effective_section.lower(), []).append(
                    (effective_option, str(value), is_default, mod_comment)
                )

        # Second pass: render every section into its own buffer, then write it to the file.
        for section, options in output.items():
            section_buffer = StringIO()
            section_buffer.write(f"[{section}]\n")
            if include_descriptions:
                description = self.configuration_description.get(section, {}).get("description", "")
                if description:
                    for line in description.splitlines():
                        section_buffer.write(f"# {line}\n")
                    section_buffer.write("\n")
            for option, value_str, is_default, mod_comment in options:
                key = (section.lower(), option.lower())
                if key in modifications.default_updates and comment_out_defaults:
                    section_buffer.write(f"# {option} = {value_str}\n")
                else:
                    if mod_comment:
                        section_buffer.write(mod_comment)
                    if is_default and comment_out_defaults:
                        section_buffer.write(f"# {option} = {value_str}\n")
                    else:
                        section_buffer.write(f"{option} = {value_str}\n")
                if extra_spacing:
                    section_buffer.write("\n")
            content = section_buffer.getvalue().strip()
            if content:
                file.write(f"{content}\n\n")

    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade deprecated PostgreSQL URI schemes in ``[database] sql_alchemy_conn``.

        As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
        must be replaced with `postgresql`.
        """
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration [database] sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported.  You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
                stacklevel=1,
            )
            self.upgraded_values[(section, key)] = old_value
            new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)

    def _validate_enums(self):
        """Validate that enum type config has an accepted value."""
        for (section_key, option_key), enum_options in self.enums_options.items():
            if self.has_option(section_key, option_key):
                value = self.get(section_key, option_key, fallback=None)
                # Empty/unset values are not checked against the allowed list.
                if value and value not in enum_options:
                    raise AirflowConfigException(
                        f"`[{section_key}] {option_key}` should not be "
                        f"{value!r}. Possible values: {', '.join(enum_options)}."
                    )

    def _validate_sqlite3_version(self):
        """
        Validate SQLite version.

        Some features in storing rendered fields require SQLite >= 3.15.0.

        :raises AirflowConfigException: when ``sql_alchemy_conn`` mentions sqlite and the
            SQLite library version is lower than the minimum
        """
        if "sqlite" not in self.get("database", "sql_alchemy_conn"):
            return

        import sqlite3

        min_sqlite_version = (3, 15, 0)
        if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
            return

        from airflow.utils.docs import get_docs_url

        min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
        raise AirflowConfigException(
            f"error: SQLite C library too old (< {min_sqlite_version_str}). "
            f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
        )

    def _get_custom_secret_backend(self, worker_mode: bool | None = None) -> Any | None:
        """Delegate to the shared implementation, treating ``worker_mode=None`` as ``False``."""
        return super()._get_custom_secret_backend(
            worker_mode=worker_mode if worker_mode is not None else False
        )

    def mask_secrets(self):
        """
        Register sensitive configuration values with both secret maskers.

        Values that cannot be retrieved (``AirflowConfigException``) are skipped.
        """
        from airflow._shared.configuration.parser import _build_kwarg_env_prefix, _collect_kwarg_env_vars
        from airflow._shared.secrets_masker import mask_secret as mask_secret_core
        from airflow.sdk.log import mask_secret as mask_secret_sdk

        for section, key in self.sensitive_config_values:
            try:
                with self.suppress_future_warnings():
                    value = self.get(section, key, suppress_warnings=True)
            except AirflowConfigException:
                log.debug(
                    "Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.",
                    section,
                    key,
                )
                continue
            mask_secret_core(value)
            mask_secret_sdk(value)

        # Mask per-key backend kwarg env vars (AIRFLOW__SECRETS__BACKEND_KWARG__* etc.).
        # These are not in sensitive_config_values but may contain sensitive values.
        for _section, _kwargs_key in [
            ("secrets", "backend_kwargs"),
            ("workers", "secrets_backend_kwargs"),
        ]:
            _prefix = _build_kwarg_env_prefix(_section, _kwargs_key)
            for _value in _collect_kwarg_env_vars(_prefix).values():
                mask_secret_core(_value)
                mask_secret_sdk(_value)

    def load_test_config(self):
        """
        Use test configuration rather than the configuration coming from airflow defaults.

        When running tests we use the special unit_test configuration to avoid accidental modifications and
        different behaviours when running the tests. Values for those test configuration are stored in
        the "unit_tests.cfg" configuration file in the ``airflow/config_templates`` folder
        and you need to change values there if you want to make some specific configuration to be used
        """
        from cryptography.fernet import Fernet

        unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
        unit_test_config = unit_test_config_file.read_text()
        self.remove_all_read_configurations()
        with StringIO(unit_test_config) as test_config_file:
            self.read_file(test_config_file)

        # We need those globals before we run "get_all_expansion_variables" because this is where
        # the variables are expanded from in the configuration - set to random values for tests
        _SecretKeys.fernet_key = Fernet.generate_key().decode()
        _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
        self.expand_all_configuration_values()
        # Log the name of the file that was actually read so the message cannot drift from the path.
        log.info("Unit test configuration loaded from '%s'", unit_test_config_file.name)

    def expand_all_configuration_values(self):
        """Expand all configuration values using global and local variables defined in this module."""
        all_vars = get_all_expansion_variables()
        for section in self.sections():
            for key, value in self.items(section):
                if value is not None:
                    if self.has_option(section, key):
                        self.remove_option(section, key, remove_default=False)
                    # Template options and non-string values are stored back without formatting.
                    if self.is_template(section, key) or not isinstance(value, str):
                        self.set(section, key, value)
                    else:
                        self.set(section, key, value.format(**all_vars))

    def remove_all_read_configurations(self):
        """Remove all read configurations, leaving only default values in the config."""
        for section in self.sections():
            self.remove_section(section)

    def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
        """
        Override to use module-level function that reads from global conf.

        This ensures as_dict() and other methods use the same secrets backend
        configuration as the global conf instance (set via conf_vars in tests).

        :param config_key: key to look up in the custom secrets backend
        :return: the value from the backend, or ``None`` when no custom backend is configured
        :raises AirflowConfigException: when the backend lookup fails; the original
            error is attached as the cause
        """
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        try:
            return secrets_client.get_config(config_key)
        except Exception as e:
            raise AirflowConfigException(
                "Cannot retrieve config from alternative secrets backend. "
                "Make sure it is configured properly and that the Backend "
                "is accessible.\n"
                f"{e}"
            ) from e

    def __getstate__(self) -> dict[str, Any]:
        """Return the state of the object as a dictionary for pickling."""
        return {
            name: getattr(self, name)
            for name in [
                "_sections",
                "is_validated",
                "configuration_description",
                "upgraded_values",
                "_default_values",
            ]
        }

    def __setstate__(self, state) -> None:
        """Restore the state of the object from a dictionary representation."""
        self.__init__()  # type: ignore[misc]
        config = state.pop("_sections")
        self.read_dict(config)
        self.__dict__.update(state)

549 

550 

def get_airflow_home() -> str:
    """Return the Airflow home: ``AIRFLOW_HOME`` from the environment or ``~/airflow``, expanded."""
    configured_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(configured_home)

554 

555 

def get_airflow_config(airflow_home: str) -> str:
    """
    Return the path to ``airflow.cfg``.

    :param airflow_home: directory used when ``AIRFLOW_CONFIG`` is not set
    :return: expanded ``AIRFLOW_CONFIG`` when set, otherwise ``<airflow_home>/airflow.cfg``
    """
    override = os.environ.get("AIRFLOW_CONFIG")
    if override is not None:
        return expand_env_var(override)
    return os.path.join(airflow_home, "airflow.cfg")

562 

563 

def get_all_expansion_variables() -> dict[str, Any]:
    """
    Collect the variables available for expanding configuration values.

    The result holds the two secret keys plus every module global whose name does not
    start with an underscore and whose value is neither callable nor a module. A module
    global with the same name overrides the secret-key entry.
    """
    variables: dict[str, Any] = {
        "FERNET_KEY": _SecretKeys.fernet_key,
        "JWT_SECRET_KEY": _SecretKeys.jwt_secret_key,
    }
    for name, value in globals().items():
        if name.startswith("_") or callable(value) or ismodule(value):
            continue
        variables[name] = value
    return variables

574 

575 

def _generate_fernet_key() -> str:
    """Generate a new Fernet key and return it decoded to ``str``."""
    from cryptography.fernet import Fernet

    raw_key = Fernet.generate_key()
    return raw_key.decode()

580 

581 

def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Create default config parser based on configuration description.

    A fresh ``ConfigParser`` is populated from the configuration description, using the
    expansion variables returned by ``get_all_expansion_variables()``.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    default_parser = ConfigParser()
    configure_parser_from_configuration_description(
        default_parser, configuration_description, get_all_expansion_variables()
    )
    return default_parser

597 

598 

def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Write the default configuration to ``AIRFLOW_CONFIG`` when that file does not exist.

    :return: the global ``conf`` object
    :raises IsADirectoryError: when ``AIRFLOW_CONFIG`` points to a directory
    :raises FileNotFoundError: when the config directory is missing and is not
        located under ``AIRFLOW_HOME``
    """
    airflow_config = pathlib.Path(AIRFLOW_CONFIG)
    config_path = airflow_config.__fspath__()
    if airflow_config.is_dir():
        raise IsADirectoryError(
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {config_path!r}."
        )
    if airflow_config.exists():
        # Nothing to generate: the config file is already there.
        return conf

    log.debug("Creating new Airflow config file in: %s", config_path)
    config_directory = airflow_config.parent
    if not config_directory.exists():
        directory_path = config_directory.__fspath__()
        if not config_directory.is_relative_to(AIRFLOW_HOME):
            raise FileNotFoundError(
                f"Config directory {directory_path!r} not exists "
                f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                "Please create this directory first."
            ) from None
        log.debug("Create directory %r for Airflow config", directory_path)
        config_directory.mkdir(parents=True, exist_ok=True)

    if not conf.get("core", "fernet_key"):
        # We know that fernet_key is not set, so we can generate it, set as global key
        # and also write it to the config file so that same key will be used next time
        new_fernet_key = _generate_fernet_key()
        _SecretKeys.fernet_key = new_fernet_key
        conf._configuration_description["core"]["options"]["fernet_key"]["default"] = new_fernet_key
        conf._default_values.set("core", "fernet_key", new_fernet_key)

    new_jwt_secret = b64encode(os.urandom(16)).decode("utf-8")
    _SecretKeys.jwt_secret_key = new_jwt_secret
    conf._configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = new_jwt_secret
    conf._default_values.set("api_auth", "jwt_secret", new_jwt_secret)
    # Invalidate cached configuration_description so it recomputes with the updated base
    conf.invalidate_cache()

    # Create the file and restrict its permissions before any content is written to it.
    pathlib.Path(config_path).touch()
    make_group_other_inaccessible(config_path)
    with open(airflow_config, "w") as file:
        conf.write(
            file,
            include_sources=False,
            include_env_vars=True,
            include_providers=True,
            extra_spacing=True,
            only_defaults=True,
            show_values=True,
        )
    return conf

649 

650 

def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Load standard airflow configuration.

    Reads ``AIRFLOW_CONFIG`` into the parser. When the file sets ``[core] airflow_home``,
    a deprecation warning is emitted; if the ``AIRFLOW_HOME`` environment variable is not
    set and the configured value differs from the module-level ``AIRFLOW_HOME``, the
    module-level value is replaced by the configured one.

    :param airflow_config_parser: parser to which the configuration will be loaded
    """
    global AIRFLOW_HOME  # to be cleaned in Airflow 4.0
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if not airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        return

    msg = (
        "Specifying both AIRFLOW_HOME environment variable and airflow_home "
        "in the config file is deprecated. Please use only the AIRFLOW_HOME "
        "environment variable and remove the config file entry."
    )
    if "AIRFLOW_HOME" in os.environ:
        warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)
        return

    if airflow_config_parser.get("core", "airflow_home") == AIRFLOW_HOME:
        warnings.warn(
            "Specifying airflow_home in the config file is deprecated. As you "
            "have left it at the default value you should remove the setting "
            "from your airflow.cfg and suffer no change in behaviour.",
            category=RemovedInAirflow4Warning,
            stacklevel=1,
        )
        return

    # The config file value takes over as the module-level AIRFLOW_HOME.
    AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")
    warnings.warn(msg, category=RemovedInAirflow4Warning, stacklevel=1)

684 

685 

def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.
    """
    parser = AirflowConfigParser()
    if not parser.getboolean("core", "unit_test_mode"):
        load_standard_airflow_configuration(parser)
        # If the user set unit_test_mode in the airflow.cfg, we still
        # want to respect that and then load the default unit test configuration
        # file on top of it.
        if not parser.getboolean("core", "unit_test_mode"):
            return parser
    parser.load_test_config()
    return parser

703 

704 

def make_group_other_inaccessible(file_path: str):
    """
    Strip every permission except owner read/write from ``file_path``.

    Failures are logged as a warning and otherwise ignored, leaving the original permissions.
    """
    owner_rw_mask = stat.S_IRUSR | stat.S_IWUSR
    try:
        current_mode = os.stat(file_path).st_mode
        os.chmod(file_path, current_mode & owner_rw_mask)
    except Exception as e:
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )

715 

716 

def ensure_secrets_loaded(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Ensure that all secrets backends are loaded.

    Backends are re-initialized when ``secrets_backend_list`` holds exactly 2 entries or when
    ``default_backends`` differs from ``DEFAULT_SECRETS_SEARCH_PATH`` (the latter is how
    loading for a worker is detected). Otherwise the existing list is returned.
    """
    if len(secrets_backend_list) != 2 and default_backends == DEFAULT_SECRETS_SEARCH_PATH:
        return secrets_backend_list
    return initialize_secrets_backends(default_backends=default_backends)

732 

733 

def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | None:
    """
    Get Secret Backend if defined in airflow.cfg.

    Convenience wrapper that forwards ``worker_mode`` to ``conf._get_custom_secret_backend()``
    on the global ``conf`` object.
    """
    backend = conf._get_custom_secret_backend(worker_mode=worker_mode)
    return backend

743 

744 

def initialize_secrets_backends(
    default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
) -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * look up the custom secrets backend, if any, and put it first
    * import the classes named in ``default_backends`` and instantiate them
    * set the ``Connection`` class on every backend and return them in a list
    """
    # A non-default search path is treated as being called in worker mode.
    worker_mode = default_backends != DEFAULT_SECRETS_SEARCH_PATH
    backends = []

    custom_secret_backend = get_custom_secret_backend(worker_mode)
    if custom_secret_backend is not None:
        from airflow.models import Connection

        custom_secret_backend._set_connection_class(Connection)
        backends.append(custom_secret_backend)

    for class_name in default_backends:
        from airflow.models import Connection

        backend = import_string(class_name)()
        backend._set_connection_class(Connection)
        backends.append(backend)

    return backends

776 

777 

def initialize_auth_manager() -> BaseAuthManager:
    """
    Initialize auth manager.

    * import the class configured under ``[core] auth_manager``
    * instantiate it and return it

    :raises AirflowConfigException: when no auth manager is configured
    """
    auth_manager_cls = conf.getimport(section="core", key="auth_manager")
    if auth_manager_cls:
        return auth_manager_cls()

    raise AirflowConfigException(
        "No auth manager defined in the config. Please specify one using section/key [core/auth_manager]."
    )

793 

794 

# AIRFLOW_HOME and AIRFLOW_CONFIG come from environment variables, with
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Dags folder for unit tests.
# The in-tree directory won't exist if users install via pip, so fall back to AIRFLOW_HOME.
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
TEST_DAGS_FOLDER = (
    _TEST_DAGS_FOLDER if os.path.exists(_TEST_DAGS_FOLDER) else os.path.join(AIRFLOW_HOME, "dags")
)

# Plugins folder for unit tests, resolved the same way.
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
TEST_PLUGINS_FOLDER = (
    _TEST_PLUGINS_FOLDER
    if os.path.exists(_TEST_PLUGINS_FOLDER)
    else os.path.join(AIRFLOW_HOME, "plugins")
)

SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")

# Order matters: the secrets backends are looked up through ``conf``.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()